functiontrace-server 0.4.0

The server component that FunctionTrace (functiontrace.com) clients will spawn and connect to
Documentation
extern crate log;

use functiontrace_server::function_trace::{FunctionTrace, TraceInitialization};
use functiontrace_server::profile_generator::{
    FirefoxProfile, FirefoxProfileThreadId, FirefoxThread,
};
use functiontrace_server::trace_streamer::TraceSender;
use serde::de::Deserialize;
use std::io::{BufReader, Read};
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::{fs, process, thread};

use argh::FromArgs;

#[cfg(feature = "debug-tracelog")]
use std::fs::File;
#[cfg(feature = "debug-tracelog")]
use std::io::prelude::*;
#[cfg(feature = "debug-tracelog")]
use std::time::Duration;

// Command-line arguments for the server, parsed in `main` via `argh::from_env()`.
//
// NOTE(review): the doc comments on this struct and its fields are consumed by the
// `FromArgs` derive and are expected to surface as `--help` text, so treat them as
// user-facing strings rather than free-form documentation — confirm before rewording.
#[derive(FromArgs)]
/** functiontrace-server (https://functiontrace.com)
The profile generation server for functiontrace.  This should rarely be run manually.
*/
struct Args {
    // Required option (`-d` / `--directory`).  `main` canonicalizes this path and fails
    // early if that does not succeed; the finished profile is exported there.
    /// the directory to write generated profiles to
    #[argh(option, short = 'd')]
    directory: String,

    // Used by `main` as the capacity of the `mio::Events` buffer.
    /// max number of threads/subprocesses to support (default = 1024)
    #[argh(option, default = "1024")]
    max_threads: usize,

    // Only present when built with the `debug-tracelog` feature.  When given, `main`
    // parses this file with `parse_thread_logs` instead of listening on a socket.
    /// read debug output from a thread's previous run from the given file
    #[cfg(feature = "debug-tracelog")]
    #[argh(option)]
    debug_tracelog: Option<String>,
}

fn main() -> std::io::Result<()> {
    env_logger::Builder::from_default_env()
        .format_timestamp(None)
        .format_module_path(false)
        .init();
    let args: Args = argh::from_env();

    // Find the location we're supposed to output our profiles to.
    let output_dir = match Path::new(&args.directory).canonicalize() {
        Ok(path) => path,
        Err(x) => {
            log::error!("The provided output directory doesn't exist");
            return Err(x);
        }
    };

    #[cfg(feature = "debug-tracelog")]
    if let Some(filename) = args.debug_tracelog {
        // We've been asked to read from a debug tracelog, rather than live input.
        log::info!("Parsing a previous set of inputs to functiontrace-server");

        // Attempt to parse the log, allowing us to make server-side changes to see the
        // impact on previous inputs.  We don't care about generating any real outputs
        // in this mode.
        let mut profile = FirefoxProfile::new(TraceInitialization {
            lang_version: "replayed".to_string(),
            platform: "replayed".to_string(),
            program_name: "replayed".to_string(),
            program_version: "replayed".to_string(),
            time: Duration::new(0, 0),
        });

        let tid = profile.register_thread();
        parse_thread_logs(File::open(filename)?, tid)?;

        return Ok(());
    }

    log::info!("Starting functiontrace-server");

    // Create a Unix socket to listen on for trace messages.
    let pipe =
        Path::new(&format!("/tmp/functiontrace-server.sock.{}", process::id())).to_path_buf();

    if pipe.exists() {
        // This socket already exists - remove it so we can rebind.
        fs::remove_file(&pipe)?;
        log::warn!("Deleted an existing pipe - you should kill any other running instances of this application.");
    }

    let listener = UnixListener::bind(pipe)?;

    // The first message we're expecting is an encoded TraceInitialization.
    let mut profile = {
        let (mut socket, _addr) = listener.accept()?;

        let info: TraceInitialization = {
            let mut buf = Vec::new();
            socket
                .read_to_end(&mut buf)
                .expect("Invalid initialization message");

            // TODO: We should print out what it failed on
            rmp_serde::from_read_ref(&buf).expect("Failed to parse message")
        };
        log::info!("Received a new trace connection: {:?}", info);

        FirefoxProfile::new(info)
    };

    let mut poll = mio::Poll::new()?;
    let mut events = mio::Events::with_capacity(args.max_threads);
    let mut clients: Vec<(mio::net::UnixStream, TraceSender)> = Vec::new();
    let mut threads = Vec::new();
    let mut retired_threads = 0;

    // Listen to more connections, each of which corresponds to a client thread.
    listener.set_nonblocking(true)?;
    let mut client_listener = mio::net::UnixListener::from_std(listener);

    const NEW_CLIENT: usize = std::usize::MAX;
    poll.registry().register(
        &mut client_listener,
        mio::Token(NEW_CLIENT),
        mio::Interest::READABLE,
    )?;

    loop {
        poll.poll(&mut events, None)?;

        for event in events.iter() {
            match event.token() {
                mio::Token(NEW_CLIENT) => loop {
                    match client_listener.accept() {
                        Ok((mut stream, _)) => {
                            // Listen on this client.
                            poll.registry().register(
                                &mut stream,
                                mio::Token(clients.len()),
                                mio::Interest::READABLE,
                            )?;

                            let thread_info = profile.register_thread();
                            log::info!("|{}| Connection from a new client", thread_info.tid);

                            let sender = TraceSender::new(thread_info.tid);
                            let receiver = sender.receiver();
                            clients.push((stream, sender));

                            // Register this thread with the profile, then push the full log
                            // parsing out into its own thread.
                            threads
                                .push(thread::spawn(|| parse_thread_logs(receiver, thread_info)));
                        }
                        Err(err) => {
                            if err.kind() == std::io::ErrorKind::WouldBlock {
                                // We aren't ready for more accepts, so go back to sleep.
                                break;
                            } else {
                                return Err(err);
                            }
                        }
                    }
                },
                mio::Token(x) => {
                    // This is a client sending us data.
                    let (client, sender) = clients
                        .get_mut(x)
                        .expect("Can only listen to clients we're expecting");

                    loop {
                        // Allocate a reasonably sized buffer to read data into.
                        let mut buf = Vec::new();
                        buf.resize(64 * 1024, 0);

                        match client.read(&mut buf) {
                            Ok(0) => {
                                // Once we receive a wakeup for 0 bytes, we know that the other
                                // side disconnected.
                                poll.registry().deregister(client)?;
                                let sent = sender.retire();
                                log::info!(
                                    "|{}| Retired client after reading {}",
                                    sender.client_id,
                                    bytesize::to_string(sent as u64, false)
                                );

                                retired_threads += 1;
                                break;
                            }
                            Ok(len) => {
                                // We read some data, but there may be more.  Send this batch to
                                // the other consumer for now.
                                buf.truncate(len);
                                sender.send(buf);
                            }
                            Err(err) => {
                                if err.kind() == std::io::ErrorKind::WouldBlock {
                                    log::trace!("|{}| Blocking client", x);
                                    break;
                                } else {
                                    return Err(err);
                                }
                            }
                        }
                    }
                }
            }
        }

        if !clients.is_empty() && clients.len() == retired_threads {
            // We've registered clients and retired the same number that have registered.  This
            // means there are no clients left outstanding (and no others could be capable of
            // talking to this socket), so we're done handling inbound messages.
            break;
        }
    }

    // Wait for each of the threads to parse their traces, then add them to the profile.
    for t in threads.into_iter() {
        match t.join().expect("Thread failed to join")? {
            ThreadTrace::ParsedThread(trace) => profile.finalize_thread(trace),
            // TODO: We should denote that this thread is corrupted in some fashion.
            // However, we've never observed this case in the wild, so don't know how it'll
            // manifest.
            ThreadTrace::CorruptedThread(trace) => profile.finalize_thread(trace),
        }
    }
    profile.export(output_dir)?;

    Ok(())
}

/// The outcome of parsing a single thread's trace stream with `parse_thread_logs`.
///
/// A registered thread's trace is either parsed entirely successfully or found to
/// contain corrupted data somewhere in it.  Both variants carry the `FirefoxThread`
/// built from the events decoded so far, and `main` currently finalizes both the same
/// way.
pub enum ThreadTrace {
    /// The thread's logs were decoded through to the end of the stream.
    ParsedThread(FirefoxThread),
    /// The given thread failed to be parsed properly, likely due to a crash, and the
    /// trace terminated unexpectedly.  Contains everything decoded before the error.
    CorruptedThread(FirefoxThread),
}

/// Parses the inbound trace stream for the given thread, returning the parsed data.
fn parse_thread_logs(logs: impl Read, tid: FirefoxProfileThreadId) -> std::io::Result<ThreadTrace> {
    let mut decoder = rmp_serde::decode::Deserializer::new(BufReader::new(logs));
    let id = tid.tid;

    #[cfg(feature = "debug-tracelog")]
    let debug_file = format!("functiontrace_raw.{}.dat", id);
    #[cfg(feature = "debug-tracelog")]
    let output_error = format!("Dumping output to {}.  This can be analyzed via the --debug-tracelog flag to functiontrace-server.", debug_file);

    // We should first receive a thread registration event.
    let thread_registration = match FunctionTrace::deserialize(&mut decoder) {
        Ok(FunctionTrace::RegisterThread(registration)) => {
            log::info!("|{}| Parsing a new thread", id);
            Some(FirefoxThread::new(registration, tid))
        }
        Ok(_) => {
            log::error!("|{}| Missing ThreadRegistration event", id);
            None
        }
        Err(e) => {
            log::error!("|{}| Deserialization error: {:?}", id, e);

            #[cfg(feature = "debug-tracelog")]
            {
                // TODO: This is broken right now, as we aren't able to look backwards into the
                // decoder like this.  We should probably just buffer all output in this mode to a
                // Vec, allowing us to dump it if needed.
                log::error!("|{}| {}", id, output_error);
                File::create(debug_file)?.write_all(decoder.get_ref().get_ref())?;
            }

            None
        }
    };

    let mut thread = match thread_registration {
        Some(thread) => thread,
        None => {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Received data without a ThreadRegistration event",
            ))
        }
    };

    // After the thread is registered, we receive various trace events until the socket is closed.
    loop {
        match FunctionTrace::deserialize(&mut decoder) {
            Ok(FunctionTrace::RegisterThread(registration)) => {
                log::error!(
                    "|{}| Received an unexpected registration event: {:?}",
                    id,
                    registration
                );
                break Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "Unexpected ThreadRegistration",
                ));
            }
            Ok(trace_log) => {
                log::trace!("|{}| {:?}", id, trace_log);
                thread.add_trace(trace_log);
            }
            Err(rmp_serde::decode::Error::InvalidMarkerRead(_)) => {
                // We get an invalid read once we're at the end of the loop.
                log::info!("|{}| Fully parsed thread!", id);
                break Ok(ThreadTrace::ParsedThread(thread));
            }
            Err(e) => {
                // We received an error, but we've parsed previous data that we don't want to drop.
                // Mark that this trace buffer was corrupted, and return the current thread state.
                //
                // XXX: We can't print the position of the deserializer because rmp only supports
                // `.position()` on std::io::Cursors for some reason.
                log::warn!("|{}| Deserialization error `{}` at (unknown offset)", id, e);

                #[cfg(feature = "debug-tracelog")]
                {
                    log::error!("|{}| {}", id, output_error);
                    File::create(debug_file)?.write_all(decoder.get_ref().get_ref())?;
                }

                break Ok(ThreadTrace::CorruptedThread(thread));
            }
        }
    }
}