functiontrace-server 0.1.3

The server component that FunctionTrace (functiontrace.com) clients will spawn and connect to.
Documentation
extern crate clap;
extern crate pretty_env_logger;

use clap::{App, Arg};
use std::io::Read;
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::{fs, thread, process};
use functiontrace_server::profile_generator::{FirefoxProfile, FirefoxThread, FirefoxProfileThreadId};
use functiontrace_server::function_trace::{FunctionTrace, TraceInitialization};
use serde::de::Deserialize;

fn main() -> std::io::Result<()> {
    pretty_env_logger::init();

    let args = App::new("functiontrace-server")
        .version("0.2.0")
        .author("Matt Bryant <mbryant@programsareproofs.com>")
        .about("The profile generation server for functiontrace.  This should rarely be run manually.")
        .arg(Arg::with_name("output_dir")
             .short("d")
             .long("directory")
             .help("The directory to write generated profiles to")
             .required(true)
             .takes_value(true))
        .get_matches();

    // Find the location we're supposed to output our profiles to.
    let output_dir = match Path::new(args.value_of("output_dir").expect("Required")).canonicalize() {
        Ok(path) => path,
        Err(x) => {
            log::error!("The provided output directory doesn't exist");
            return Err(x);
        }
    };

    log::info!("Starting functiontrace-server");

    // Create a Unix socket to listen on for trace messages.
    let pipe = Path::new(&format!("/tmp/functiontrace-server.sock.{}", process::id())).to_path_buf();

    if pipe.exists() {
        // This socket already exists - remove it so we can rebind.
        fs::remove_file(&pipe)?;
        log::warn!("Deleted an existing pipe - you should kill any other running instances of this application.");
    }

    let listener = UnixListener::bind(pipe)?;

    // The first message we're expecting is an encoded TraceInitialization.
    let mut profile = {
        let (mut socket, _addr) = listener.accept()?;

        let info : TraceInitialization = {
            let mut buf = Vec::new();
            socket.read_to_end(&mut buf).expect("Invalid initialization message");

            // TODO: We should print out what it failed on
            rmp_serde::from_read_ref(&buf).expect("Failed to parse message")
        };
        log::info!("Received a new trace connection: {:?}", info);

        FirefoxProfile::new(info)
    };

    let mut poll = mio::Poll::new()?;
    let mut events = mio::Events::with_capacity(64);
    let mut clients: Vec<(mio::net::UnixStream, Vec<u8>)> = Vec::new();
    let mut threads = Vec::new();

    // Listen to more connections, each of which corresponds to a client thread.
    listener.set_nonblocking(true)?;
    let mut client_listener = mio::net::UnixListener::from_std(listener);

    const NEW_CLIENT: usize = std::usize::MAX;
    poll.registry().register(&mut client_listener, mio::Token(NEW_CLIENT), mio::Interest::READABLE)?;

    loop {
        poll.poll(&mut events, None)?;

        for event in events.iter() {
            match event.token() {
                mio::Token(NEW_CLIENT) => loop {
                    match client_listener.accept() {
                        Ok((mut stream, _)) => {
                            log::info!("Connection from a new thread");

                            // Listen on this client.
                            poll.registry().register(&mut stream, mio::Token(clients.len()), mio::Interest::READABLE)?;

                            clients.push((stream, Vec::new()));
                        },
                        Err(err) => if err.kind() == std::io::ErrorKind::WouldBlock {
                            // We aren't ready for more accepts, so go back to sleep.
                            break
                        } else {
                            return Err(err)
                        }
                    }
                },
                mio::Token(x) => {
                    // This is a client sending us data.
                    let (client, data) = clients.get_mut(x).expect("Can only listen to clients we're expecting");

                    loop {
                        // We read into this buffer then copy into our vector.  Ideally rustc is
                        // smart enough to read into the vector instead.
                        let mut buf = [0; 64 * 1024];

                        match client.read(&mut buf) {
                            Ok(0) => {
                                // Once we receive a wakeup for 0 bytes, we know that the other
                                // side disconnected.
                                log::info!("Retiring client {:?} ({} bytes)", x, data.len());
                                poll.registry().deregister(client)?;

                                // We can take ownership of our logs, since they won't be appended
                                // to anymore.
                                let logs = std::mem::replace(data, Vec::new());

                                // Register this thread with the profile, then push the full log
                                // parsing out into its own thread.
                                let tid = profile.register_thread();

                                threads.push(thread::spawn(move || parse_thread_logs(logs, tid)));
                                break;
                            },
                            Ok(len) => {
                                // We read some data, but there may be more.
                                data.extend_from_slice(&buf[0..len]);
                            },
                            Err(err) => if err.kind() == std::io::ErrorKind::WouldBlock {
                                log::trace!("Blocking client {:?}", x);
                                break;
                            } else {
                                return Err(err)
                            }
                        }
                    }
                }
            }
        }

        if clients.len() > 0 && clients.len() == threads.len() {
            // We've registered clients and retired the same number that have registered.
            // This means there are no clients left outstanding, so we're done handling them.
            break;
        }
    }

    // Wait for each of the threads to parse their traces, then add them to the profile.
    for t in threads.into_iter() {
        profile.finalize_thread(t.join().expect("Thread failed to join")?);
    }
    profile.export(output_dir)?;

    Ok(())
}

fn parse_thread_logs(logs: Vec<u8>, tid: FirefoxProfileThreadId) -> std::io::Result<FirefoxThread> {
    let mut decoder = rmp_serde::decode::Deserializer::new(std::io::Cursor::new(logs));

    // We should first receive a thread registration event.
    let registration_error = Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize ThreadRegistration"));
    let mut thread = match FunctionTrace::deserialize(&mut decoder) {
        Ok(FunctionTrace::RegisterThread(registration)) => {
            log::info!("Parsing a new thread: {:?}", registration);
            FirefoxThread::new(registration, tid)
        },
        Ok(_) => {
            log::error!("Missing ThreadRegistration event");
            return registration_error;
        },
        Err(e) => {
            log::error!("Deserialization error: {}", e);
            return registration_error;
        }
    };

    // After the thread is registered, we receive various trace events until the socket is closed.
    let deserialization_error = Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Failed to deserialize trace event"));
    loop {
        match FunctionTrace::deserialize(&mut decoder) {
            Ok(FunctionTrace::RegisterThread(registration)) => {
                log::error!("Received an unexpected registration event: {:?}", registration);
                return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Unexpected ThreadRegistration"));
            },
            Ok(trace_log) =>  {
                log::trace!("{:?}", trace_log);
                thread.add_trace(trace_log);
            },
            Err(rmp_serde::decode::Error::InvalidMarkerRead(_)) => {
                // We get an invalid read once we're at the end of the loop.
                break;
            }
            Err(e) => {
                log::error!("Deserialization error at {}: {}", decoder.position(), e);
                return deserialization_error;
            }
        }
    }

    Ok(thread)
}