functiontrace-server 0.8.6

The server component that FunctionTrace (functiontrace.com) clients will spawn and connect to
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
use functiontrace_server::function_trace::{FunctionTrace, TraceInitialization};
use functiontrace_server::profile_generator::{
    FirefoxProfile, FirefoxProfileThreadId, FirefoxThread,
};
use functiontrace_server::trace_streamer::TraceSender;
use serde::de::Deserialize;
use std::io::{BufReader, Read, Seek};
use std::path::{Path, PathBuf};
use std::time::Duration;

use bytesize::ByteSize;
use color_eyre::eyre::{Result, WrapErr, eyre};
use owo_colors::OwoColorize;
use tokio::io::AsyncReadExt;

// Command-line arguments for the server, parsed in `main` via `argh::from_env()`.
//
// NOTE(review): `argh` appears to build the `--help` output from the doc comments on this struct
// and its fields, so they are user-facing text and are deliberately left untouched here - confirm
// against the argh documentation before rewording any of them.
#[derive(argh::FromArgs)]
/** functiontrace-server <https://functiontrace.com>
The profile generation server for functiontrace.  This should rarely be run manually.
*/
struct Args {
    // `main` canonicalizes this path and fails at startup if that isn't possible; the finished
    // profile is exported to the canonicalized location.
    /// the directory to write generated profiles to
    #[argh(option, short = 'd')]
    directory: String,

    // Only compiled in when the `debug-tracelog` feature is enabled.  When provided, `main`
    // replays the given file through `parse_thread_logs` and exits instead of listening for
    // clients.
    /// read debug output from a thread's previous run from the given file
    #[cfg(feature = "debug-tracelog")]
    #[argh(option)]
    debug_tracelog: Option<String>,
}

#[tokio::main]
async fn main() -> Result<()> {
    color_eyre::install()?;
    env_logger::Builder::from_default_env()
        .format_timestamp(None)
        .format_module_path(false)
        .init();
    let args: Args = argh::from_env();

    // Find the location we're supposed to output our profiles to.
    let output_dir = Path::new(&args.directory)
        .canonicalize()
        .wrap_err("The provided output directory doesn't exist")?;

    #[cfg(feature = "debug-tracelog")]
    if let Some(filename) = args.debug_tracelog {
        // We've been asked to read from a debug tracelog, rather than live input.
        log::info!("Parsing a previous set of inputs to functiontrace-server");

        // Attempt to parse the log, allowing us to make server-side changes to see the
        // impact on previous inputs.  We don't care about generating any real outputs
        // in this mode.
        let mut profile = FirefoxProfile::new(TraceInitialization {
            lang_version: "replayed".to_string(),
            platform: "replayed".to_string(),
            program_name: "replayed".to_string(),
            program_version: "replayed".to_string(),
            time: Duration::new(0, 0),
        });

        let tid = profile.register_thread();
        parse_thread_logs(
            std::fs::File::open(&filename)
                .wrap_err_with(|| format!("Failed to read debug tracelog {}", filename))?,
            &tid,
        )?;

        return Ok(());
    }

    // In the background, check if there's a more recent version of functiontrace available.
    // This can be disabled by setting `FUNCTIONTRACE_NOUPDATE`.
    //
    // NOTE: This is opt-out rather than opt-in because it hits `crates.io` rather than anything
    // under our control/visibility, so it doesn't seem too bad from a privacy perspective.
    // Nonetheless, let's make this configurable for people who care.
    let update_check = tokio::task::spawn(async {
        if std::env::var("FUNCTIONTRACE_NOUPDATE").is_ok() {
            // We've been asked not to check for updates.
            None
        } else {
            let latest =
                tokio::time::timeout(Duration::from_millis(500), check_latest::check_max_async!())
                    .await;

            // We intentionally ignore all errors here since this isn't key functionality.
            latest.ok().map(|x| x.ok()).flatten().unwrap_or(None)
        }
    });

    log::info!("Starting functiontrace-server");

    // Create a Unix socket to listen on for trace messages.
    let pipe = PathBuf::from(&format!(
        "/tmp/functiontrace-server.sock.{}",
        std::process::id()
    ));

    if pipe.exists() {
        // This socket already exists - remove it so we can rebind.
        std::fs::remove_file(&pipe)?;
        log::warn!(
            "Deleted an existing pipe - you should kill any other running instances of this application."
        );
    }

    let listener =
        tokio::net::UnixListener::bind(&pipe).wrap_err("Race condition creating pipe")?;

    // The first message we're expecting is an encoded TraceInitialization.
    let mut profile = {
        let (mut socket, _addr) = tokio::time::timeout(Duration::from_secs(5), listener.accept())
            .await
            .wrap_err("No clients initialized functiontrace-server before timeout")?
            .wrap_err("Failed to accept socket connection")?;

        let info: TraceInitialization = {
            let mut buf = Vec::with_capacity(std::mem::size_of::<TraceInitialization>());
            socket
                .read_to_end(&mut buf)
                .await
                .wrap_err("Invalid initialization message")?;

            // TODO: We should print out what it failed on
            rmp_serde::from_slice(&buf).wrap_err("Failed to parse initialization message")?
        };
        log::info!("Received a new trace connection: {:?}", info);

        FirefoxProfile::new(info)
    };

    let mut profile_threads = tokio::task::JoinSet::new();
    let mut client_tasks = tokio::task::JoinSet::new();
    let (mut spawned_tasks, mut retired_tasks) = (0, 0);

    loop {
        tokio::select! {
            // Listen to more connections, each of which corresponds to a client thread.
            connection = listener.accept() => {
                let mut client = connection.wrap_err("Error accepting a new client")?.0;

                // There's a new client ready to talk to us.  Register it, then spawn a task to
                // collect it's data and ferry it to blocking thread to be processed.
                let thread_info = profile.register_thread();
                log::info!("|{}| Connection from a new client", thread_info.tid);

                let mut sender = TraceSender::new(thread_info.tid);
                let receiver = sender.receiver();

                // Register this thread with the profile, then push the full log
                // parsing out into its own thread.
                profile_threads.spawn_blocking(move || {
                    parse_thread_logs(receiver, &thread_info)
                });

                spawned_tasks += 1;

                // Spawn a task to process the data this client sends us.
                client_tasks.spawn(async move {
                    let client_id = sender.client_id;

                    loop {
                        // Allocate a reasonably sized buffer to read data into.
                        let mut buf = Vec::new();
                        buf.resize(64 * 1024, 0);

                        match client.read(&mut buf).await {
                            Ok(0) => {
                                // Once we receive a wakeup for 0 bytes, we know that the other
                                // side disconnected.
                                let sent = sender.retire();
                                log::info!(
                                    "|{}| Retired client after reading {}",
                                    client_id,
                                    ByteSize(sent as u64).display().iec_short()
                                );
                                break;
                            }
                            Ok(len) => {
                                // We read some data, but there may be more.  Send this batch to
                                // the other consumer for now.
                                buf.truncate(len);
                                sender.send(buf);
                            }
                            Err(err) => {
                                if err.kind() == std::io::ErrorKind::WouldBlock {
                                    log::trace!("|{}| Blocking client", client_id);
                                    continue;
                                }

                                let sent = sender.retire();
                                log::error!("|{client_id}| Client disappeared with {err} after {}", 
                                    ByteSize(sent as u64).display().iec_short());
                                break;
                            }
                        }
                    }
                });
            },

            // One of our clients has disconnected and should be retired.
            Some(_) = client_tasks.join_next() => {
                retired_tasks += 1;

                if spawned_tasks == retired_tasks {
                    // We've registered clients and retired the same number that have registered.
                    // This means there are no clients left outstanding (and no others could be
                    // capable of talking to this socket), so we're done handling inbound messages.
                    break;
                }
            }
        };
    }

    // Wait for each of the threads to parse their traces, then add them to the profile.
    while let Some(t) = profile_threads.join_next().await {
        match t.expect("Parsing thread failed to join")? {
            ThreadTrace::ParsedThread(trace) => profile.finalize_thread(trace),
            // TODO: We should denote that this thread is corrupted in some fashion.
            // However, we've never observed this case in the wild, so don't know how it'll
            // manifest.
            ThreadTrace::CorruptedThread(trace) => profile.finalize_thread(trace),
        }
    }
    profile.export(output_dir)?;

    // Now that we've finished, print a notification if there's a newer version of functiontrace
    // for download.
    // NOTE: We don't want to print during execution since it might interleave poorly with other
    // text.
    if let Ok(Some(version)) = update_check.await {
        println!("{}", format!("functiontrace-server v{} has been released!  Consider updating to the newest release: {}", version, "cargo install functiontrace-server".italic()).bright_purple());
    }

    // Attempt to remove the socket we created, since we clearly aren't listening on it anymore.
    // If this fails it's fine, since it's in `/tmp` and will be removed on conflict in the future
    // either way.
    let _ = std::fs::remove_file(&pipe);

    Ok(())
}

/// The outcome of parsing a single thread's trace stream.
///
/// A valid thread's trace can either be parsed entirely successfully or have corrupted data
/// somewhere in it.  Either way the variant carries the [`FirefoxThread`] built from everything
/// that was parsed, and `main` currently finalizes both variants into the profile identically.
pub enum ThreadTrace {
    /// The parsed thread logs, returned when the stream was consumed through to its end.
    ParsedThread(FirefoxThread),
    /// The given thread failed to be parsed properly, likely due to a crash, and the trace is
    /// terminated unexpectedly.  Holds the events that were parsed before the failure.
    CorruptedThread(FirefoxThread),
}

/// Parses the inbound trace stream for the given thread, returning the parsed data.
///
/// `logs` is decoded with `rmp_serde` as a sequence of [`FunctionTrace`] messages.  The first
/// message must be a [`FunctionTrace::RegisterThread`]; every following message is added to the
/// resulting [`FirefoxThread`] until the stream ends.
///
/// With the `debug-tracelog` feature enabled, every byte read from `logs` is recorded, and on a
/// deserialization error the recording is dumped to `functiontrace_raw.<tid>.dat` so it can be
/// replayed later.
///
/// # Returns
/// - [`ThreadTrace::ParsedThread`] once the decoder reports an invalid marker read, which is how
///   the end of the stream manifests.
/// - [`ThreadTrace::CorruptedThread`] if any other deserialization error occurs after the thread
///   was registered.  The events parsed up to that point are preserved.
///
/// # Errors
/// Returns an error if the first message is missing, fails to deserialize, or isn't a thread
/// registration, if a second registration arrives mid-stream, or (with `debug-tracelog`) if the
/// recorded bytes can't be dumped.
fn parse_thread_logs(logs: impl Read + Seek, tid: &FirefoxProfileThreadId) -> Result<ThreadTrace> {
    let mut decoder = rmp_serde::decode::Deserializer::new(BufReader::new({
        #[cfg(feature = "debug-tracelog")]
        {
            // If we're in debug mode, we'll record everything that flows through the underlying
            // reader so we can write it out and examine it later.
            debug::Recorder::new(logs)
        }

        #[cfg(not(feature = "debug-tracelog"))]
        logs
    }));

    let id = tid.tid;

    #[cfg(feature = "debug-tracelog")]
    let debug_file = format!("functiontrace_raw.{}.dat", id);

    #[cfg(feature = "debug-tracelog")]
    let output_error = format!(
        "Dumping output to {}.  This can be analyzed via the --debug-tracelog flag to functiontrace-server.",
        debug_file
    );

    // We should first receive a thread registration event.
    let mut thread = match FunctionTrace::deserialize(&mut decoder) {
        Ok(FunctionTrace::RegisterThread(registration)) => {
            log::info!("|{}| Parsing a new thread", id);
            FirefoxThread::new(registration, tid)
        }
        Ok(msg) => {
            log::error!("Received unexpected {msg:?}");
            return Err(eyre!("Missing ThreadRegistration event for thread {}", id));
        }
        Err(e) => {
            #[cfg(feature = "debug-tracelog")]
            {
                log::error!("|{}| {}", id, output_error);
                // Unwrap the decoder, then the `BufReader`, to get back to the recorder.
                decoder
                    .into_inner()
                    .into_inner()
                    .dump(debug_file)
                    .wrap_err("Failed to dump debug tracelog")?;
            }

            return Err(e)
                .wrap_err_with(|| format!("Deserialization error initializing thread {}", id));
        }
    };

    // After the thread is registered, we receive various trace events until the socket is closed.
    loop {
        match FunctionTrace::deserialize(&mut decoder) {
            Ok(FunctionTrace::RegisterThread(registration)) => {
                log::error!(
                    "|{}| Received an unexpected registration event: {:?}",
                    id,
                    registration
                );
                break Err(eyre!("Unexpected ThreadRegistration"));
            }
            Ok(trace_log) => {
                log::trace!("|{}| {:?}", id, trace_log);
                thread.add_trace(trace_log);
            }
            Err(rmp_serde::decode::Error::InvalidMarkerRead(_)) => {
                // We get an invalid read once we're at the end of the loop.
                log::info!("|{}| Fully parsed thread!", id);
                break Ok(ThreadTrace::ParsedThread(thread));
            }
            Err(e) => {
                // We received an error, but we've parsed previous data that we don't want to drop.
                // Mark that this trace buffer was corrupted, and return the current thread state.
                //
                // The stream offset is purely diagnostic, so failing to fetch it must not cause
                // us to discard the thread we've already parsed.
                match decoder.get_mut().stream_position() {
                    Ok(offset) => {
                        log::warn!("|{}| Deserialization error `{}` at {}", id, e, offset);
                    }
                    Err(seek_err) => {
                        log::warn!(
                            "|{}| Deserialization error `{}` at an unknown offset ({})",
                            id,
                            e,
                            seek_err
                        );
                    }
                }

                #[cfg(feature = "debug-tracelog")]
                {
                    log::error!("|{}| {}", id, output_error);
                    // Consuming the decoder here is fine since we always leave the loop below.
                    decoder
                        .into_inner()
                        .into_inner()
                        .dump(debug_file)
                        .wrap_err("Failed to dump debug tracelog")?;
                }

                break Ok(ThreadTrace::CorruptedThread(thread));
            }
        }
    }
}

#[cfg(feature = "debug-tracelog")]
mod debug {
    use color_eyre::eyre::{Result, WrapErr};
    use std::io::{Read, Seek, SeekFrom, Write};

    /// Wraps a [`Read`] implementation and remembers every byte that has been read through it.
    ///
    /// Keeping the full history in memory is wasteful, so this isn't meant for general use.
    /// It exists so that the bytes leading up to a parse error (or similar) can be inspected
    /// after the fact.
    pub(crate) struct Recorder<R: Read> {
        /// The reader that actually produces the data.
        reader: R,

        /// A copy of everything the wrapped reader has handed out so far.
        ///
        /// TODO: If memory usage becomes a problem, this could be backed by a file instead.
        buffer: Vec<u8>,
    }

    impl<R: Read> Recorder<R> {
        /// Wrap `reader`, starting with an empty recording.
        pub fn new(reader: R) -> Self {
            // Reserve 1MB up front.  Most traces will likely outgrow that, but there's one
            // recorder per thread so we avoid reserving more, and since this is a debug mode
            // the cost of growing the buffer isn't a concern.
            let buffer = Vec::with_capacity(1024 * 1024);

            Self { reader, buffer }
        }

        /// Write everything recorded so far to the given file path.
        pub fn dump(&self, output_file: impl Into<std::path::PathBuf>) -> Result<()> {
            let destination: std::path::PathBuf = output_file.into();

            let mut file = std::fs::File::create(&destination)
                .wrap_err_with(|| format!("Failed to create file {}", destination.display()))?;

            file.write_all(&self.buffer)
                .wrap_err("Failed to write recorded bytes to file")
        }
    }

    /// Forward reads to the wrapped [`Read`] implementation, copying whatever it returned into
    /// the recording.
    impl<R: Read> Read for Recorder<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            self.reader.read(buf).map(|count| {
                // Only the first `count` bytes of `buf` were filled in by this read.
                self.buffer.extend_from_slice(&buf[..count]);
                count
            })
        }
    }

    /// Forward seeks to the wrapped [`Seek`] implementation.
    ///
    /// Since we hold the full read history, we could in theory offer a more capable [`Seek`]
    /// than the wrapped reader does.  `TraceReceiver` in particular doesn't retain its data, so
    /// it only really implements `stream_position`.  That's all we actually need, though.
    impl<R: Read + Seek> Seek for Recorder<R> {
        fn seek(&mut self, target: SeekFrom) -> std::io::Result<u64> {
            self.reader.seek(target)
        }
    }
}