1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
extern crate log;
use functiontrace_server::function_trace::{FunctionTrace, TraceInitialization};
use functiontrace_server::profile_generator::{
FirefoxProfile, FirefoxProfileThreadId, FirefoxThread,
};
use functiontrace_server::trace_streamer::TraceSender;
use serde::de::Deserialize;
use std::io::{BufReader, Read};
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::{fs, process, thread};
use argh::FromArgs;
#[cfg(feature = "debug-tracelog")]
use std::fs::File;
#[cfg(feature = "debug-tracelog")]
use std::io::prelude::*;
#[cfg(feature = "debug-tracelog")]
use std::time::Duration;
// Command-line arguments for the server, parsed in `main` via `argh::from_env()`.
//
// NOTE(review): the doc comments on this struct and its fields are consumed by the `FromArgs`
// derive (presumably as the generated help text), so treat them as user-facing strings rather
// than ordinary comments - confirm before rewording them.
#[derive(FromArgs)]
/** functiontrace-server (https://functiontrace.com)
The profile generation server for functiontrace. This should rarely be run manually.
*/
struct Args {
// `main` canonicalizes this path, fails if that isn't possible, and exports the profile to it.
/// the directory to write generated profiles to
#[argh(option, short = 'd')]
directory: String,
// `main` currently only uses this value as the capacity of the `mio::Events` buffer; it does
// not reject connections beyond this count.
/// max number of threads/subprocesses to support (default = 1024)
#[argh(option, default = "1024")]
max_threads: usize,
// Only present when built with the `debug-tracelog` feature. When set, `main` parses the given
// file instead of listening for live connections, then exits without exporting a profile.
/// read debug output from a thread's previous run from the given file
#[cfg(feature = "debug-tracelog")]
#[argh(option)]
debug_tracelog: Option<String>,
}
fn main() -> std::io::Result<()> {
env_logger::Builder::from_default_env()
.format_timestamp(None)
.format_module_path(false)
.init();
let args: Args = argh::from_env();
// Find the location we're supposed to output our profiles to.
let output_dir = match Path::new(&args.directory).canonicalize() {
Ok(path) => path,
Err(x) => {
log::error!("The provided output directory doesn't exist");
return Err(x);
}
};
#[cfg(feature = "debug-tracelog")]
if let Some(filename) = args.debug_tracelog {
// We've been asked to read from a debug tracelog, rather than live input.
log::info!("Parsing a previous set of inputs to functiontrace-server");
// Attempt to parse the log, allowing us to make server-side changes to see the
// impact on previous inputs. We don't care about generating any real outputs
// in this mode.
let mut profile = FirefoxProfile::new(TraceInitialization {
lang_version: "replayed".to_string(),
platform: "replayed".to_string(),
program_name: "replayed".to_string(),
program_version: "replayed".to_string(),
time: Duration::new(0, 0),
});
let tid = profile.register_thread();
parse_thread_logs(File::open(filename)?, tid)?;
return Ok(());
}
log::info!("Starting functiontrace-server");
// Create a Unix socket to listen on for trace messages.
let pipe =
Path::new(&format!("/tmp/functiontrace-server.sock.{}", process::id())).to_path_buf();
if pipe.exists() {
// This socket already exists - remove it so we can rebind.
fs::remove_file(&pipe)?;
log::warn!("Deleted an existing pipe - you should kill any other running instances of this application.");
}
let listener = UnixListener::bind(pipe)?;
// The first message we're expecting is an encoded TraceInitialization.
let mut profile = {
let (mut socket, _addr) = listener.accept()?;
let info: TraceInitialization = {
let mut buf = Vec::new();
socket
.read_to_end(&mut buf)
.expect("Invalid initialization message");
// TODO: We should print out what it failed on
rmp_serde::from_read_ref(&buf).expect("Failed to parse message")
};
log::info!("Received a new trace connection: {:?}", info);
FirefoxProfile::new(info)
};
let mut poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(args.max_threads);
let mut clients: Vec<(mio::net::UnixStream, TraceSender)> = Vec::new();
let mut threads = Vec::new();
let mut retired_threads = 0;
// Listen to more connections, each of which corresponds to a client thread.
listener.set_nonblocking(true)?;
let mut client_listener = mio::net::UnixListener::from_std(listener);
const NEW_CLIENT: usize = std::usize::MAX;
poll.registry().register(
&mut client_listener,
mio::Token(NEW_CLIENT),
mio::Interest::READABLE,
)?;
loop {
poll.poll(&mut events, None)?;
for event in events.iter() {
match event.token() {
mio::Token(NEW_CLIENT) => loop {
match client_listener.accept() {
Ok((mut stream, _)) => {
// Listen on this client.
poll.registry().register(
&mut stream,
mio::Token(clients.len()),
mio::Interest::READABLE,
)?;
let thread_info = profile.register_thread();
log::info!("|{}| Connection from a new client", thread_info.tid);
let sender = TraceSender::new(thread_info.tid);
let receiver = sender.receiver();
clients.push((stream, sender));
// Register this thread with the profile, then push the full log
// parsing out into its own thread.
threads
.push(thread::spawn(|| parse_thread_logs(receiver, thread_info)));
}
Err(err) => {
if err.kind() == std::io::ErrorKind::WouldBlock {
// We aren't ready for more accepts, so go back to sleep.
break;
} else {
return Err(err);
}
}
}
},
mio::Token(x) => {
// This is a client sending us data.
let (client, sender) = clients
.get_mut(x)
.expect("Can only listen to clients we're expecting");
loop {
// Allocate a reasonably sized buffer to read data into.
let mut buf = Vec::new();
buf.resize(64 * 1024, 0);
match client.read(&mut buf) {
Ok(0) => {
// Once we receive a wakeup for 0 bytes, we know that the other
// side disconnected.
poll.registry().deregister(client)?;
let sent = sender.retire();
log::info!(
"|{}| Retired client after reading {}",
sender.client_id,
bytesize::to_string(sent as u64, false)
);
retired_threads += 1;
break;
}
Ok(len) => {
// We read some data, but there may be more. Send this batch to
// the other consumer for now.
buf.truncate(len);
sender.send(buf);
}
Err(err) => {
if err.kind() == std::io::ErrorKind::WouldBlock {
log::trace!("|{}| Blocking client", x);
break;
} else {
return Err(err);
}
}
}
}
}
}
}
if !clients.is_empty() && clients.len() == retired_threads {
// We've registered clients and retired the same number that have registered. This
// means there are no clients left outstanding (and no others could be capable of
// talking to this socket), so we're done handling inbound messages.
break;
}
}
// Wait for each of the threads to parse their traces, then add them to the profile.
for t in threads.into_iter() {
match t.join().expect("Thread failed to join")? {
ThreadTrace::ParsedThread(trace) => profile.finalize_thread(trace),
// TODO: We should denote that this thread is corrupted in some fashion.
// However, we've never observed this case in the wild, so don't know how it'll
// manifest.
ThreadTrace::CorruptedThread(trace) => profile.finalize_thread(trace),
}
}
profile.export(output_dir)?;
Ok(())
}
/// The outcome of parsing a single thread's trace stream: either the whole stream was parsed
/// successfully, or corrupted data was found somewhere in it.
pub enum ThreadTrace {
/// The thread's logs were parsed all the way to the end of the stream.
ParsedThread(FirefoxThread),
/// The given thread failed to be parsed properly, likely due to a crash, and the trace was
/// terminated unexpectedly. Holds the thread state built up before the failure.
CorruptedThread(FirefoxThread),
}
/// Decodes the MessagePack-encoded trace stream for one thread into a `FirefoxThread`.
///
/// The stream must start with a `RegisterThread` event, which creates the thread. Every
/// subsequent event is added to that thread until the decoder can no longer read a marker,
/// which is treated as the end of the stream.
///
/// # Errors
///
/// Returns `InvalidData` if the first event is missing, undecodable, or not a `RegisterThread`
/// event, or if a second `RegisterThread` event shows up later in the stream. Any other decode
/// failure after registration is not an error: the data parsed so far is returned as
/// `ThreadTrace::CorruptedThread`.
fn parse_thread_logs(logs: impl Read, tid: FirefoxProfileThreadId) -> std::io::Result<ThreadTrace> {
    let id = tid.tid;
    let mut decoder = rmp_serde::decode::Deserializer::new(BufReader::new(logs));

    #[cfg(feature = "debug-tracelog")]
    let debug_file = format!("functiontrace_raw.{}.dat", id);
    #[cfg(feature = "debug-tracelog")]
    let output_error = format!("Dumping output to {}. This can be analyzed via the --debug-tracelog flag to functiontrace-server.", debug_file);

    // Both ways of failing to register the thread surface the same error to the caller.
    let missing_registration = || {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "Received data without a ThreadRegistration event",
        )
    };

    // The very first event has to register the thread we're parsing.
    let mut thread = match FunctionTrace::deserialize(&mut decoder) {
        Ok(FunctionTrace::RegisterThread(registration)) => {
            log::info!("|{}| Parsing a new thread", id);
            FirefoxThread::new(registration, tid)
        }
        Ok(_) => {
            log::error!("|{}| Missing ThreadRegistration event", id);
            return Err(missing_registration());
        }
        Err(e) => {
            log::error!("|{}| Deserialization error: {:?}", id, e);

            #[cfg(feature = "debug-tracelog")]
            {
                // TODO: This is broken right now, as we aren't able to look backwards into the
                // decoder like this. We should probably just buffer all output in this mode to a
                // Vec, allowing us to dump it if needed.
                log::error!("|{}| {}", id, output_error);
                File::create(debug_file)?.write_all(decoder.get_ref().get_ref())?;
            }

            return Err(missing_registration());
        }
    };

    // Everything after the registration is a trace event, up until the socket is closed.
    loop {
        let event = match FunctionTrace::deserialize(&mut decoder) {
            Ok(event) => event,
            Err(rmp_serde::decode::Error::InvalidMarkerRead(_)) => {
                // Failing to read the next marker is how the end of the stream shows up.
                log::info!("|{}| Fully parsed thread!", id);
                return Ok(ThreadTrace::ParsedThread(thread));
            }
            Err(e) => {
                // Something went wrong mid-stream, but the events decoded so far are still
                // worth keeping. Flag the trace as corrupted and hand back what we have.
                //
                // XXX: We can't print the position of the deserializer because rmp only supports
                // `.position()` on std::io::Cursors for some reason.
                log::warn!("|{}| Deserialization error `{}` at (unknown offset)", id, e);

                #[cfg(feature = "debug-tracelog")]
                {
                    log::error!("|{}| {}", id, output_error);
                    File::create(debug_file)?.write_all(decoder.get_ref().get_ref())?;
                }

                return Ok(ThreadTrace::CorruptedThread(thread));
            }
        };

        // A thread may only be registered once per stream.
        if let FunctionTrace::RegisterThread(registration) = &event {
            log::error!(
                "|{}| Received an unexpected registration event: {:?}",
                id,
                registration
            );
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Unexpected ThreadRegistration",
            ));
        }

        log::trace!("|{}| {:?}", id, event);
        thread.add_trace(event);
    }
}