use crate::{
lossy_lines::{lossy_lines, LossyLinesCodec},
utils::{adb, config_get},
LogStream, StreamData, DEFAULT_BUFFER,
};
use clap::{value_t, ArgMatches};
use failure::{err_msg, format_err, Error};
use futures::{stream::iter_ok, Async, Future, Stream};
#[cfg(target_os = "linux")]
use rogcat::record::{Record, Timestamp};
use std::{
borrow::ToOwned,
convert::Into,
io::BufReader,
net::ToSocketAddrs,
path::PathBuf,
process::{Command, Stdio},
};
use tokio::{
codec::{Decoder, FramedRead},
fs::File,
net::TcpStream,
};
use tokio_process::{Child, CommandExt};
use url::Url;
/// A command line that is run as a child process and whose output is exposed
/// through the `Stream` implementation for this type.
struct Process {
    /// Program to execute (first element) followed by its arguments.
    cmd: Vec<String>,
    /// When `true`, the command is spawned again once its output stream ends.
    respawn: bool,
    /// Handle of the most recently spawned child; `None` until the first spawn.
    child: Option<Child>,
    /// Merged stdout/stderr line stream of the current child; `None` until the first spawn.
    stream: Option<LogStream>,
}
pub fn files(args: &ArgMatches) -> Result<LogStream, Error> {
let files = args
.values_of("input")
.ok_or_else(|| err_msg("Missing input argument"))?
.map(PathBuf::from)
.collect::<Vec<PathBuf>>();
let f = iter_ok::<_, Error>(files)
.map(|f| {
File::open(f.clone())
.map(|s| Decoder::framed(LossyLinesCodec::new(), s))
.flatten_stream()
.map(StreamData::Line)
.map_err(move |e| format_err!("Failed to open {}: {}", f.display(), e))
})
.flatten();
Ok(Box::new(f))
}
/// Builds a stream that yields each line read from standard input as
/// `StreamData::Line`.
pub fn stdin() -> LogStream {
    let input = tokio::io::stdin();
    let framed = FramedRead::new(input, LossyLinesCodec::new());
    let lines = framed.map_err(Into::into).map(StreamData::Line);
    Box::new(lines)
}
/// Placeholder for a serial-port input source.
///
/// # Panics
///
/// Always panics: this input is not implemented.
pub fn serial(_args: &ArgMatches) -> LogStream {
unimplemented!()
}
/// Opens the CAN interface `dev` and builds a stream that turns every
/// received frame into a `StreamData::Record`.
///
/// Each record carries:
/// * `timestamp`: the time the frame was converted (taken per frame),
/// * `tag`: the frame id in lowercase hex with a `0x` prefix,
/// * `message`: an `E` marker for extended frames followed by the payload
///   bytes as space separated hex,
/// * `raw`: `(<seconds>.<fraction>) <dev> <ID>#<payload hex>`,
/// * `process`: the interface name.
///
/// # Errors
///
/// Returns an error if the CAN socket cannot be opened.
#[cfg(target_os = "linux")]
pub fn can(dev: &str) -> Result<LogStream, Error> {
    let process = dev.to_string();
    let stream = tokio_socketcan::CANSocket::open(dev)?
        .map_err(std::convert::Into::into)
        .map(move |frame| {
            // Read the clock for every frame. Reading it once outside of this
            // closure would stamp all records with the time the socket was opened.
            let now = time::now();
            let id = frame.id();
            let is_extended = frame.is_extended();
            let data = frame
                .data()
                .iter()
                .map(|b| format!("{:02x}", b))
                .collect::<Vec<String>>();
            let extended = if is_extended { "E" } else { " " };
            // Extended ids are zero padded to eight hex digits, standard ids are not padded.
            let raw_id = if is_extended {
                format!("{:08X}", id)
            } else {
                format!("{:X}", id)
            };
            StreamData::Record(Record {
                timestamp: Some(Timestamp::new(now)),
                message: format!("{} {} ", extended, data.join(" ")),
                tag: format!("0x{:x}", id),
                raw: format!(
                    "({}) {} {}#{}",
                    // The format string is a constant, so formatting cannot fail here.
                    now.strftime("%s.%f").unwrap(),
                    process,
                    raw_id,
                    data.join("")
                ),
                process: process.clone(),
                ..Default::default()
            })
        });
    Ok(Box::new(stream))
}
/// Connects to the first socket address that `addr` resolves to and builds a
/// stream that yields each received line as `StreamData::Line`.
///
/// # Errors
///
/// Returns an error if `addr` cannot be resolved or resolves to no address.
/// Connection failures are reported through the returned stream.
pub fn tcp(addr: &Url) -> Result<LogStream, Error> {
    let mut resolved = addr.to_socket_addrs()?;
    let target = match resolved.next() {
        Some(socket_addr) => socket_addr,
        None => return Err(err_msg("Failed to parse addr")),
    };

    let connection = TcpStream::connect(&target);
    let lines = connection
        .map(|socket| Decoder::framed(LossyLinesCodec::new(), socket))
        .flatten_stream()
        .map_err(|e| format_err!("Failed to connect: {}", e))
        .map(StreamData::Line);
    Ok(Box::new(lines))
}
/// Assembles an `adb logcat` command line from the command line arguments and
/// the configuration and wraps it into a process backed stream.
///
/// * `tail` adds `-t <count>` and disables respawning.
/// * `dump` adds `-d` and disables respawning.
/// * Buffers are taken from the `buffer` argument, else from the `buffer`
///   configuration entry, else from `DEFAULT_BUFFER`; each adds `-b <name>`.
///
/// # Errors
///
/// Returns an error if `adb()` fails.
pub fn logcat(args: &ArgMatches) -> Result<LogStream, Error> {
    let adb_binary = adb()?.display().to_string();
    let mut command: Vec<String> = vec![adb_binary, "logcat".to_string()];

    // Both sources are always consulted; respawning is on if either asks for it.
    let restart_flag = args.is_present("restart");
    let restart_config = config_get::<bool>("restart").unwrap_or(true);

    let tail = args.is_present("tail");
    if tail {
        let count = value_t!(args, "tail", u32).unwrap_or_else(|e| e.exit());
        command.extend(vec!["-t".to_string(), count.to_string()]);
    }

    let dump = args.is_present("dump");
    if dump {
        command.push("-d".to_string());
    }

    // `tail` and `dump` both override the restart setting.
    let respawn = (restart_flag | restart_config) && !tail && !dump;

    let buffers: Vec<String> = match args.values_of("buffer") {
        Some(values) => values.map(String::from).collect(),
        None => config_get::<Vec<String>>("buffer")
            .unwrap_or_else(|| DEFAULT_BUFFER.iter().map(|&s| s.to_owned()).collect()),
    };
    command.extend(
        buffers
            .into_iter()
            .flat_map(|name| vec!["-b".to_string(), name]),
    );

    Ok(Box::new(Process::with_cmd(command, respawn)))
}
/// Builds a process backed stream from the `COMMAND` argument.
///
/// The command string is split on whitespace; the first token is the program
/// and the remaining tokens are its arguments. The process is respawned when
/// the `restart` argument is present.
///
/// # Errors
///
/// Returns an error if the `COMMAND` argument cannot be read.
pub fn process(args: &ArgMatches) -> Result<LogStream, Error> {
    let restart = args.is_present("restart");
    let command_line = value_t!(args, "COMMAND", String)?;
    let argv: Vec<String> = command_line
        .split_whitespace()
        .map(String::from)
        .collect();
    Ok(Box::new(Process::with_cmd(argv, restart)))
}
impl Process {
fn with_cmd(cmd: Vec<String>, respawn: bool) -> Process {
Process {
cmd,
respawn,
child: None,
stream: None,
}
}
fn spawn(&mut self) -> Result<Async<Option<StreamData>>, Error> {
let mut child = Command::new(self.cmd[0].clone())
.args(&self.cmd[1..])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn_async()?;
let stdout = BufReader::new(child.stdout().take().unwrap());
let stderr = BufReader::new(child.stderr().take().unwrap());
self.child = Some(child);
let stdout = lossy_lines(stdout)
.map_err(Into::into)
.map(StreamData::Line);
let stderr = lossy_lines(stderr)
.map_err(Into::into)
.map(StreamData::Line);
let mut stream = stdout.select(stderr);
let poll = stream.poll();
self.stream = Some(Box::new(stream));
poll
}
}
/// Yields the output of the wrapped command.
impl Stream for Process {
    type Item = StreamData;
    type Error = Error;

    /// Polls the stream of the current child. If no child has been spawned
    /// yet, one is spawned first. If the current stream has ended and
    /// `respawn` is set, the command is spawned again; every other poll
    /// result is passed through unchanged.
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        let polled = match self.stream.as_mut() {
            Some(inner) => inner.poll(),
            // Nothing has been spawned so far.
            None => return self.spawn(),
        };
        match polled {
            Ok(Async::Ready(None)) if self.respawn => self.spawn(),
            other => other,
        }
    }
}