use crossbeam_channel as cc;
use mio::unix::{EventedFd, UnixReady};
use mio::{Events, Poll, PollOpt, Ready, Token};
use nix::fcntl::{fcntl, FcntlArg::F_SETFL, OFlag};
use slab::Slab;
use std::fs::{File, OpenOptions};
use std::io::{self, BufRead, BufReader, Write};
use std::net::SocketAddr;
use std::os::unix::io::{FromRawFd, IntoRawFd, RawFd};
use std::thread;
use std::time::Duration;
use syslog::{self, Formatter3164, Logger, LoggerBackend, Severity::*};
#[derive(Debug, Clone)]
pub enum Address {
Tcp(SocketAddr),
Udp {
server: SocketAddr,
local: SocketAddr,
},
Unix(Option<String>),
}
#[derive(Debug, Clone)]
pub enum Stream {
File {
filename: String,
},
Syslog {
address: Address,
facility: syslog::Facility,
severity: u32, },
#[cfg(test)]
Stdout,
}
struct Connection {
source: BufReader<File>,
sink: Stream,
}
impl Connection {
fn new(fd: RawFd, stream: Stream) -> Connection {
fcntl(fd, F_SETFL(OFlag::O_NONBLOCK)).unwrap(); Connection {
source: BufReader::new(unsafe { File::from_raw_fd(fd) }),
sink: stream,
}
}
}
pub struct Handler {
channel: cc::Sender<Message>,
thread: Option<thread::JoinHandle<()>>,
}
impl Handler {
pub fn new() -> Handler {
let (tx, rx) = cc::unbounded();
Handler {
channel: tx,
thread: Some(thread::spawn(move || {
handler(&rx);
})),
}
}
pub fn add_stream(&self, fd: RawFd, stream: Stream) {
debug!("Adding stream for fd {}", fd);
self.channel.send(Message::Add(fd, stream))
}
}
impl Drop for Handler {
fn drop(&mut self) {
self.channel.send(Message::Close);
self.thread.take().unwrap().join().unwrap();
}
}
enum Message {
Add(RawFd, Stream),
Close,
}
fn handler(channel: &cc::Receiver<Message>) {
let mut connections = Slab::with_capacity(128);
let poll = Poll::new().unwrap();
let mut closed = false;
while !closed {
if let Some(message) = channel.try_recv() {
match message {
Message::Add(fd, stream) => {
if let Err(e) = poll.register(
&EventedFd(&fd),
Token(connections.insert(Connection::new(fd, stream))),
Ready::readable() | UnixReady::hup(),
PollOpt::edge(),
) {
error!("Failed to register stream with mio ({})", e);
}
}
Message::Close => closed = true,
}
}
let mut events = Events::with_capacity(128);
if let Err(e) = poll.poll(&mut events, Some(Duration::from_millis(50))) {
error!("Failed to poll mio ({}). Abandoning stream redirection.", e);
break;
}
for event in &events {
let Token(handle) = event.token();
if event.readiness().is_readable() {
let mut connection = connections.get_mut(handle).unwrap();
loop {
match read_line(&mut connection.source) {
Ok(None) => break, Ok(Some(ref line)) if line.is_empty() => break, Ok(Some(line)) => {
match write_line(&connection.sink, &line[..line.len() - 1]) {
Ok(()) => (),
Err(e) => warn!("Stream redirection failure ({}): {}", e, line),
}
}
Err(e) => {
warn!("Stream error {}", e);
break;
}
}
}
}
if UnixReady::from(event.readiness()).is_hup() {
let mut connection = connections.remove(handle);
poll.deregister(&EventedFd(&connection.source.into_inner().into_raw_fd()))
.unwrap();
}
}
}
}
fn read_line(source: &mut BufReader<File>) -> io::Result<Option<String>> {
let mut line = String::new();
match source.read_line(&mut line) {
Ok(_) => Ok(Some(line)),
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock => Ok(None),
_ => Err(e),
},
}
}
fn write_line(sink: &Stream, line: &str) -> io::Result<()> {
match sink {
Stream::File { filename } => {
OpenOptions::new()
.create_new(true)
.append(true)
.open(filename)?
.write_all(line.as_ref())?;
Ok(())
}
Stream::Syslog {
address,
facility,
severity,
} => {
let formatter = Formatter3164 {
facility: *facility,
hostname: None,
process: String::from("riffol"),
pid: 0,
};
let mut logger: syslog::Result<
Logger<LoggerBackend, String, Formatter3164>,
> = match address {
Address::Unix(address) => match address {
Some(address) => syslog::unix_custom(formatter, address),
None => syslog::unix(formatter),
},
Address::Tcp(server) => syslog::tcp(formatter, server),
Address::Udp { server, local } => syslog::udp(formatter, local, server),
};
let line = line.to_owned();
match logger {
Ok(mut logger) => match severity {
x if *x == LOG_EMERG as u32 => logger.emerg(line),
x if *x == LOG_ALERT as u32 => logger.alert(line),
x if *x == LOG_CRIT as u32 => logger.crit(line),
x if *x == LOG_ERR as u32 => logger.err(line),
x if *x == LOG_WARNING as u32 => logger.warning(line),
x if *x == LOG_NOTICE as u32 => logger.notice(line),
x if *x == LOG_INFO as u32 => logger.info(line),
_ => logger.debug(line),
}.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))),
_ => Err(io::Error::new(
io::ErrorKind::Other,
"Couldn't create logging instance",
)),
}
}
#[cfg(test)]
Stream::Stdout => Ok(println!("{}", line)),
}
}
#[cfg(test)]
mod test {
use super::Stream;
use std::os::unix::io::IntoRawFd;
use std::process::{Command, Stdio};
#[test]
fn test1() {
let handler = super::Handler::new();
let mut child1 = Command::new("ping")
.arg("-c2")
.arg("8.8.4.4")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let mut child2 = Command::new("ls")
.arg("-l")
.arg("/")
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
handler.add_stream(
child1.stdout.take().unwrap().into_raw_fd(),
Stream::Stdout.clone(),
);
handler.add_stream(
child1.stderr.take().unwrap().into_raw_fd(),
Stream::Stdout.clone(),
);
handler.add_stream(
child2.stdout.take().unwrap().into_raw_fd(),
Stream::Stdout.clone(),
);
handler.add_stream(
child2.stderr.take().unwrap().into_raw_fd(),
Stream::Stdout.clone(),
);
child2.wait().unwrap();
child1.wait().unwrap();
}
}