extern crate clap;
extern crate pretty_env_logger;
use clap::{App, Arg};
use std::io::Read;
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::{fs, thread, process};
use functiontrace_server::profile_generator::{FirefoxProfile, FirefoxThread, FirefoxProfileThreadId};
use functiontrace_server::function_trace::{FunctionTrace, TraceInitialization};
use serde::de::Deserialize;
fn main() -> std::io::Result<()> {
pretty_env_logger::init();
let args = App::new("functiontrace-server")
.version("0.2.0")
.author("Matt Bryant <mbryant@programsareproofs.com>")
.about("The profile generation server for functiontrace. This should rarely be run manually.")
.arg(Arg::with_name("output_dir")
.short("d")
.long("directory")
.help("The directory to write generated profiles to")
.required(true)
.takes_value(true))
.get_matches();
let output_dir = match Path::new(args.value_of("output_dir").expect("Required")).canonicalize() {
Ok(path) => path,
Err(x) => {
log::error!("The provided output directory doesn't exist");
return Err(x);
}
};
log::info!("Starting functiontrace-server");
let pipe = Path::new(&format!("/tmp/functiontrace-server.sock.{}", process::id())).to_path_buf();
if pipe.exists() {
fs::remove_file(&pipe)?;
log::warn!("Deleted an existing pipe - you should kill any other running instances of this application.");
}
let listener = UnixListener::bind(pipe)?;
let mut profile = {
let (mut socket, _addr) = listener.accept()?;
let info : TraceInitialization = {
let mut buf = Vec::new();
socket.read_to_end(&mut buf).expect("Invalid initialization message");
rmp_serde::from_read_ref(&buf).expect("Failed to parse message")
};
log::info!("Received a new trace connection: {:?}", info);
FirefoxProfile::new(info)
};
let mut poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(64);
let mut clients: Vec<(mio::net::UnixStream, Vec<u8>)> = Vec::new();
let mut threads = Vec::new();
listener.set_nonblocking(true)?;
let mut client_listener = mio::net::UnixListener::from_std(listener);
const NEW_CLIENT: usize = std::usize::MAX;
poll.registry().register(&mut client_listener, mio::Token(NEW_CLIENT), mio::Interest::READABLE)?;
loop {
poll.poll(&mut events, None)?;
for event in events.iter() {
match event.token() {
mio::Token(NEW_CLIENT) => loop {
match client_listener.accept() {
Ok((mut stream, _)) => {
log::info!("Connection from a new thread");
poll.registry().register(&mut stream, mio::Token(clients.len()), mio::Interest::READABLE)?;
clients.push((stream, Vec::new()));
},
Err(err) => if err.kind() == std::io::ErrorKind::WouldBlock {
break
} else {
return Err(err)
}
}
},
mio::Token(x) => {
let (client, data) = clients.get_mut(x).expect("Can only listen to clients we're expecting");
loop {
let mut buf = [0; 64 * 1024];
match client.read(&mut buf) {
Ok(0) => {
log::info!("Retiring client {:?} ({} bytes)", x, data.len());
poll.registry().deregister(client)?;
let logs = std::mem::replace(data, Vec::new());
let tid = profile.register_thread();
threads.push(thread::spawn(move || parse_thread_logs(logs, tid)));
break;
},
Ok(len) => {
data.extend_from_slice(&buf[0..len]);
},
Err(err) => if err.kind() == std::io::ErrorKind::WouldBlock {
log::trace!("Blocking client {:?}", x);
break;
} else {
return Err(err)
}
}
}
}
}
}
if clients.len() > 0 && clients.len() == threads.len() {
break;
}
}
for t in threads.into_iter() {
profile.finalize_thread(t.join().expect("Thread failed to join")?);
}
profile.export(output_dir)?;
Ok(())
}
fn parse_thread_logs(logs: Vec<u8>, tid: FirefoxProfileThreadId) -> std::io::Result<FirefoxThread> {
let mut decoder = rmp_serde::decode::Deserializer::new(std::io::Cursor::new(logs));
let registration_error = Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize ThreadRegistration"));
let mut thread = match FunctionTrace::deserialize(&mut decoder) {
Ok(FunctionTrace::RegisterThread(registration)) => {
log::info!("Parsing a new thread: {:?}", registration);
FirefoxThread::new(registration, tid)
},
Ok(_) => {
log::error!("Missing ThreadRegistration event");
return registration_error;
},
Err(e) => {
log::error!("Deserialization error: {}", e);
return registration_error;
}
};
let deserialization_error = Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Failed to deserialize trace event"));
loop {
match FunctionTrace::deserialize(&mut decoder) {
Ok(FunctionTrace::RegisterThread(registration)) => {
log::error!("Received an unexpected registration event: {:?}", registration);
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Unexpected ThreadRegistration"));
},
Ok(trace_log) => {
log::trace!("{:?}", trace_log);
thread.add_trace(trace_log);
},
Err(rmp_serde::decode::Error::InvalidMarkerRead(_)) => {
break;
}
Err(e) => {
log::error!("Deserialization error at {}: {}", decoder.position(), e);
return deserialization_error;
}
}
}
Ok(thread)
}