use failure::Error;
use futures::{sync::oneshot, Future, Sink, Stream};
use rogcat::{parser, record::Record};
use std::{process::exit, str::FromStr};
use tokio::runtime::Runtime;
use tokio_signal::ctrl_c;
use url::Url;
mod cli;
mod filewriter;
mod filter;
mod lossy_lines;
mod profiles;
mod reader;
mod subcommands;
mod terminal;
mod utils;
/// Names of four log buffers: `main`, `events`, `crash` and `kernel`.
///
/// NOTE(review): presumably the buffers that are read when none are selected on the
/// command line - confirm at the use sites; nothing in this part of the file uses it.
const DEFAULT_BUFFER: [&str; 4] = ["main", "events", "crash", "kernel"];
/// One item emitted by an input source before it reaches the filter and the sink.
///
/// `run` turns every item into a [`Record`]: a `Line` is handed to the parser, a
/// `Record` is passed through unchanged.
#[derive(Clone, Debug)]
pub enum StreamData {
    /// A record that is already structured and needs no parsing.
    Record(Record),
    /// A raw text line that still has to be parsed into a record.
    Line(String),
}
/// Boxed, `Send` stream that yields [`StreamData`] items and fails with [`Error`].
type LogStream = Box<dyn Stream<Item = StreamData, Error = Error> + Send>;
/// Boxed, `Send` sink that accepts [`Record`]s and fails with [`Error`].
type LogSink = Box<dyn Sink<SinkItem = Record, SinkError = Error> + Send>;
fn run() -> Result<(), Error> {
let args = cli::cli().get_matches();
utils::config_init();
subcommands::run(&args);
let source = {
if args.is_present("input") {
reader::files(&args)?
} else {
match args.value_of("COMMAND") {
Some(c) => {
if c == "-" {
reader::stdin()
} else if let Ok(url) = Url::parse(c) {
match url.scheme() {
#[cfg(target_os = "linux")]
"can" => reader::can(url.host_str().expect("Invalid can device"))?,
"tcp" => reader::tcp(&url)?,
"serial" => reader::serial(&args),
_ => reader::process(&args)?,
}
} else {
reader::process(&args)?
}
}
None => reader::logcat(&args)?,
}
}
};
let profile = profiles::from_args(&args)?;
let sink = if args.is_present("output") {
filewriter::try_from(&args)?
} else {
terminal::try_from(&args, &profile)?
};
let mut head = args
.value_of("head")
.map(|v| usize::from_str(v).expect("Invalid head arguement"));
let filter = filter::from_args_profile(&args, &profile)?;
let mut parser = parser::Parser::default();
let mut runtime = Runtime::new()?;
let f = source
.map(move |a| match a {
StreamData::Line(l) => parser.parse(&l),
StreamData::Record(r) => r,
})
.filter(move |r| filter.filter(r))
.take_while(move |_| {
Ok(match head {
Some(0) => false,
Some(n) => {
head = Some(n - 1);
true
}
None => true,
})
})
.forward(sink)
.map(|_| exit(0))
.map_err(|e| eprintln!("{}", e));
let mut f = Some(oneshot::spawn(f, &runtime.executor()));
runtime.block_on(ctrl_c().flatten_stream().take(1).for_each(move |()| {
f.take();
Ok(())
}))?;
Ok(())
}
/// Program entry point: executes [`run`] and converts its outcome into the process
/// exit status (1 with the error printed to stderr on failure, 0 otherwise).
fn main() {
    if let Err(err) = run() {
        eprintln!("{}", err);
        exit(1);
    }
    exit(0);
}