use std::io;
use std::sync::Mutex;
use std::sync::{mpsc, Arc};
use std::thread;
use std::mem;
use super::{Level, Logger};
use super::format;
use super::logger::RecordInfo;
use super::{OwnedKeyValue, OwnedKeyValueNode};
use crossbeam::sync::ArcCell;
// Error types for the drain module, generated with the `error_chain!` macro.
// NOTE(review): `missing_docs` is presumably allowed here because the items
// the macro generates carry no doc comments — confirm.
#[allow(missing_docs)]
mod error {
use super::super::format;
use std::io;
error_chain! {
// Names given to the generated error type, error-kind enum, chaining trait
// and result alias.
types {
Error, ErrorKind, ChainErr, Result;
}
// Errors from the sibling `format` module are linked in as the `Format` kind.
links {
format::Error, format::ErrorKind, Format;
}
// `std::io::Error` is wrapped as the `Io` kind.
foreign_links {
io::Error, Io, "io error";
}
// Error kinds specific to drains.
errors {
// Used when a message could not be sent to another thread.
SendError {
description("sending to another thread failed")
display("sending to another thread failed")
}
// Used when taking a mutex lock failed.
LockError {
description("locking mutex failed")
display("locking mutex failed")
}
// NOTE(review): not constructed anywhere in the visible part of this file.
FailoverExhausted {
description("failover drains exhausted")
display("failover drains exhausted")
}
}
}
}
pub use self::error::{Error, Result, ErrorKind};
pub trait Drain: Send + Sync {
fn log(&self, buf: &mut Vec<u8>, info: &RecordInfo, &OwnedKeyValueNode) -> Result<()>;
}
/// Conversion of an owned, `'static` drain into a root `Logger`.
pub trait IntoLogger: Drain + Sized + 'static {
    /// Consume the drain and build a root logger around it.
    ///
    /// `values` are handed to `Logger::new_root` together with the drain.
    fn into_logger(self, values: Vec<OwnedKeyValue>) -> Logger {
        let root_drain = self;
        Logger::new_root(values, root_drain)
    }
}
/// Every sized, `'static` drain gets `into_logger` for free.
impl<D> IntoLogger for D where D: Drain + Sized + 'static {}
/// A boxed drain is itself a drain; logging is forwarded to the boxed value.
///
/// The `?Sized` bound lets this also cover boxed trait objects
/// (`Box<Drain>`), not only boxes of concrete drain types.
impl<D: Drain + ?Sized> Drain for Box<D> {
    /// Forward the record unchanged to the drain inside the box.
    fn log(&self, buf: &mut Vec<u8>, info: &RecordInfo, o: &OwnedKeyValueNode) -> Result<()> {
        (**self).log(buf, info, o)
    }
}
/// A reference-counted drain is itself a drain; logging is forwarded to the
/// shared value.
///
/// The `?Sized` bound lets this also cover shared trait objects
/// (`Arc<Drain>`), not only `Arc`s of concrete drain types.
impl<D: Drain + ?Sized> Drain for Arc<D> {
    /// Forward the record unchanged to the drain behind the `Arc`.
    fn log(&self, buf: &mut Vec<u8>, info: &RecordInfo, o: &OwnedKeyValueNode) -> Result<()> {
        (**self).log(buf, info, o)
    }
}
/// Drain that ignores every record it is given.
pub struct Discard;
impl Drain for Discard {
    /// Do nothing with the record and report success.
    fn log(&self,
           _buf: &mut Vec<u8>,
           _info: &RecordInfo,
           _logger_values: &OwnedKeyValueNode)
           -> Result<()> {
        Ok(())
    }
}
/// Control handle over a shared `ArcCell` holding a boxed drain; its methods
/// replace the drain stored in that cell.
pub struct AtomicSwitchCtrl(Arc<ArcCell<Box<Drain>>>);
/// Drain that logs to whatever drain is currently stored in the shared
/// `ArcCell`.
pub struct AtomicSwitch(Arc<ArcCell<Box<Drain>>>);
impl AtomicSwitchCtrl {
    /// Create a controller whose cell initially holds `d`.
    pub fn new<D: Drain + 'static>(d: D) -> Self {
        let a = Arc::new(ArcCell::new(Arc::new(Box::new(d) as Box<Drain>)));
        AtomicSwitchCtrl(a)
    }

    /// Create a controller around an already existing shared cell.
    pub fn new_from_arc(d: Arc<ArcCell<Box<Drain>>>) -> Self {
        AtomicSwitchCtrl(d)
    }

    /// Get an `AtomicSwitch` drain sharing this controller's cell.
    pub fn drain(&self) -> AtomicSwitch {
        AtomicSwitch(self.0.clone())
    }

    /// Replace the current drain with `drain`, discarding the previous one.
    ///
    /// `D` must be `'static`, like in `new`: the cell stores a `Box<Drain>`,
    /// which is a `'static` trait object, so a shorter-lived drain cannot be
    /// coerced into it.
    pub fn set<D: Drain + 'static>(&self, drain: D) {
        let _ = self.0.set(Arc::new(Box::new(drain) as Box<Drain>));
    }

    /// Replace the current drain with `drain` and return the previous one.
    pub fn swap(&self, drain: Arc<Box<Drain>>) -> Arc<Box<Drain>> {
        self.0.set(drain)
    }
}
impl Drain for AtomicSwitch {
    /// Fetch the drain currently stored in the shared cell and forward the
    /// record to it.
    fn log(&self,
           buf: &mut Vec<u8>,
           info: &RecordInfo,
           logger_values: &OwnedKeyValueNode)
           -> Result<()> {
        let current = self.0.get();
        current.log(buf, info, logger_values)
    }
}
/// Drain that formats each record and writes the bytes to a writer on the
/// calling thread.
pub struct Streamer<W: io::Write, F: format::Format> {
// Output writer; kept behind a mutex because `log` only receives `&self`.
io: Mutex<W>,
// Formatter that renders a record into the byte buffer.
format: F,
}
impl<W: io::Write, F: format::Format> Streamer<W, F> {
    /// Build a `Streamer` that renders records with `format` and writes them
    /// to `io`.
    pub fn new(io: W, format: F) -> Self {
        let guarded_io = Mutex::new(io);
        Streamer {
            io: guarded_io,
            format: format,
        }
    }
}
impl<W: 'static + io::Write + Send, F: format::Format + Send> Drain for Streamer<W, F> {
fn log(&self,
mut buf: &mut Vec<u8>,
info: &RecordInfo,
logger_values: &OwnedKeyValueNode)
-> Result<()> {
let res =
{
|| {
try!(self.format.format(&mut buf, info, logger_values));
{
let mut io = try!(self.io
.lock()
.map_err(|_| -> Error { ErrorKind::LockError.into() }));
try!(io.write_all(&buf));
}
Ok(())
}
}();
buf.clear();
res
}
}
/// Drain that formats each record on the calling thread and hands the bytes
/// to an `AsyncIoWriter` for writing.
pub struct AsyncStreamer<F: format::Format> {
// Formatter that renders a record into the byte buffer.
format: F,
// Handle to the background writer; kept behind a mutex because `log` only
// receives `&self`.
io: Mutex<AsyncIoWriter>,
}
impl<F: format::Format> AsyncStreamer<F> {
    /// Build an `AsyncStreamer` that renders records with `format` and passes
    /// the bytes to an `AsyncIoWriter` wrapping `io`.
    pub fn new<W: io::Write + Send + 'static>(io: W, format: F) -> Self {
        let writer = AsyncIoWriter::new(io);
        AsyncStreamer {
            io: Mutex::new(writer),
            format: format,
        }
    }
}
impl<F: format::Format + Send> Drain for AsyncStreamer<F> {
fn log(&self,
mut buf: &mut Vec<u8>,
info: &RecordInfo,
logger_values: &OwnedKeyValueNode)
-> Result<()> {
try!(self.format.format(&mut buf, info, logger_values));
{
let mut io = try!(self.io.lock().map_err(|_| -> Error { ErrorKind::LockError.into() }));
let mut new_buf = Vec::with_capacity(128);
mem::swap(buf, &mut new_buf);
try!(io.write_nocopy(new_buf));
}
Ok(())
}
}
/// Drain wrapper that forwards a record only when a predicate accepts it.
pub struct Filter<D: Drain> {
// Drain that receives the accepted records.
drain: D,
// Predicate evaluated on each record; `true` means the record is forwarded.
cond: Box<Fn(&RecordInfo) -> bool + 'static + Send + Sync>,
}
impl<D: Drain> Filter<D> {
    /// Wrap `drain` so that it only receives records for which `cond`
    /// returns `true`.
    pub fn new<F>(drain: D, cond: F) -> Self
        where F: 'static + Sync + Send + Fn(&RecordInfo) -> bool
    {
        let boxed_cond = Box::new(cond);
        Filter {
            drain: drain,
            cond: boxed_cond,
        }
    }
}
impl<D: Drain> Drain for Filter<D> {
    /// Forward the record to the wrapped drain when the predicate accepts it;
    /// otherwise report success without logging anything.
    fn log(&self,
           buf: &mut Vec<u8>,
           info: &RecordInfo,
           logger_values: &OwnedKeyValueNode)
           -> Result<()> {
        let accepted = (self.cond)(info);
        if !accepted {
            return Ok(());
        }
        self.drain.log(buf, info, logger_values)
    }
}
/// Drain wrapper that forwards a record only when its level passes an
/// `is_at_least` check against a configured level.
pub struct FilterLevel<D: Drain> {
// Level each record's level is compared against.
level: Level,
// Drain that receives the records passing the check.
drain: D,
}
impl<D: Drain> FilterLevel<D> {
    /// Wrap `drain` so that it only receives records whose level satisfies
    /// `is_at_least(level)`.
    pub fn new(drain: D, level: Level) -> Self {
        FilterLevel {
            drain: drain,
            level: level,
        }
    }
}
impl<D: Drain> Drain for FilterLevel<D> {
    /// Forward the record to the wrapped drain when
    /// `info.level().is_at_least(self.level)` holds; otherwise report success
    /// without logging anything.
    fn log(&self,
           buf: &mut Vec<u8>,
           info: &RecordInfo,
           logger_values: &OwnedKeyValueNode)
           -> Result<()> {
        let passes = info.level().is_at_least(self.level);
        if !passes {
            return Ok(());
        }
        self.drain.log(buf, info, logger_values)
    }
}
/// Drain that sends every record to two drains.
pub struct Duplicate<D1: Drain, D2: Drain> {
// First drain to receive each record.
drain1: D1,
// Second drain to receive each record.
drain2: D2,
}
impl<D1: Drain, D2: Drain> Duplicate<D1, D2> {
    /// Combine `drain1` and `drain2` into a drain that logs to both.
    pub fn new(drain1: D1, drain2: D2) -> Self {
        Duplicate {
            drain2: drain2,
            drain1: drain1,
        }
    }
}
impl<D1: Drain, D2: Drain> Drain for Duplicate<D1, D2> {
    /// Log the record to both drains, clearing `buf` in between.
    ///
    /// The second drain is invoked even when the first one failed. If only
    /// one drain fails its error is returned; if both fail the first drain's
    /// error is returned.
    fn log(&self,
           buf: &mut Vec<u8>,
           info: &RecordInfo,
           logger_values: &OwnedKeyValueNode)
           -> Result<()> {
        let first = self.drain1.log(buf, info, logger_values);
        buf.clear();
        let second = self.drain2.log(buf, info, logger_values);
        // `and` yields the first error if there is one, else the second result.
        first.and(second)
    }
}
/// Drain that logs to a primary drain and falls back to a second drain when
/// the primary returns an error.
pub struct Failover<D1: Drain, D2: Drain> {
// Primary drain, tried first for every record.
drain1: D1,
// Fallback drain, used only when the primary fails.
drain2: D2,
}
impl<D1: Drain, D2: Drain> Failover<D1, D2> {
    /// Build a drain that logs to `drain1` and uses `drain2` as the fallback.
    pub fn new(drain1: D1, drain2: D2) -> Self {
        Failover {
            drain2: drain2,
            drain1: drain1,
        }
    }
}
impl<D1: Drain, D2: Drain> Drain for Failover<D1, D2> {
    /// Log the record to the primary drain; if that fails, log it to the
    /// fallback drain and return the fallback's result.
    ///
    /// The primary's error is discarded. `buf` is cleared before the fallback
    /// runs so that bytes a failed primary may have left in the shared buffer
    /// are not emitted by the fallback.
    fn log(&self,
           buf: &mut Vec<u8>,
           info: &RecordInfo,
           logger_values: &OwnedKeyValueNode)
           -> Result<()> {
        match self.drain1.log(buf, info, logger_values) {
            Ok(_) => Ok(()),
            Err(_) => {
                buf.clear();
                self.drain2.log(buf, info, logger_values)
            }
        }
    }
}
/// Messages sent from an `AsyncIoWriter` to its background writer thread.
enum AsyncIoMsg {
/// Bytes the worker writes with `write_all`.
Bytes(Vec<u8>),
/// Asks the worker to call `flush` on its writer.
Flush,
/// Makes the worker loop return, ending the thread.
Eof,
}
/// Handle to a background thread that owns a writer and performs the actual
/// writes.
struct AsyncIoWriter {
// Channel used to send work to the background thread.
sender: mpsc::Sender<AsyncIoMsg>,
// Join handle of the background thread; an `Option` so `drop` can take it.
join: Option<thread::JoinHandle<()>>,
}
impl AsyncIoWriter {
pub fn new<W: io::Write + Send + 'static>(mut io: W) -> Self {
let (tx, rx) = mpsc::channel();
let join = thread::spawn(move || {
loop {
match rx.recv().unwrap() {
AsyncIoMsg::Bytes(buf) => io.write_all(&buf).unwrap(),
AsyncIoMsg::Flush => io.flush().unwrap(),
AsyncIoMsg::Eof => return,
}
}
});
AsyncIoWriter {
sender: tx,
join: Some(join),
}
}
pub fn write_nocopy(&mut self, buf: Vec<u8>) -> Result<()> {
try!(self.sender
.send(AsyncIoMsg::Bytes(buf))
.map_err(|_| -> Error { ErrorKind::SendError.into() }));
Ok(())
}
}
impl io::Write for AsyncIoWriter {
    /// Copy `buf` and queue the copy for writing on the background thread.
    ///
    /// Returns `buf.len()` once the bytes are queued; the actual write happens
    /// later on the worker. Fails with `BrokenPipe` (instead of panicking)
    /// when the message cannot be sent because the worker is gone.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.sender
            .send(AsyncIoMsg::Bytes(buf.to_vec()))
            .map(|_| buf.len())
            .map_err(|_| {
                io::Error::new(io::ErrorKind::BrokenPipe,
                               "async writer thread is not running")
            })
    }

    /// Queue a flush request for the background thread.
    ///
    /// Returns once the request is queued, not once the flush has happened.
    /// Fails with `BrokenPipe` when the request cannot be sent because the
    /// worker is gone, rather than silently reporting success.
    fn flush(&mut self) -> io::Result<()> {
        self.sender
            .send(AsyncIoMsg::Flush)
            .map_err(|_| {
                io::Error::new(io::ErrorKind::BrokenPipe,
                               "async writer thread is not running")
            })
    }
}
impl Drop for AsyncIoWriter {
    /// Tell the background thread to stop and wait for it to finish.
    fn drop(&mut self) {
        // A failed send means the worker is already gone; nothing to signal.
        let _ = self.sender.send(AsyncIoMsg::Eof);
        let worker = self.join.take().unwrap();
        // The join result (including a worker panic) is deliberately ignored.
        let _ = worker.join();
    }
}
/// Shorthand for `Streamer::new`: a drain that formats records with `format`
/// and writes them to `io`.
pub fn stream<W, F>(io: W, format: F) -> Streamer<W, F>
    where W: io::Write + Send,
          F: format::Format
{
    Streamer::new(io, format)
}
/// Build a drain that formats records with `format` and writes them to `io`.
///
/// NOTE(review): despite its name this returns a `Streamer` built with
/// `Streamer::new`, exactly like `stream`; it does not use `AsyncStreamer`.
/// Switching to `AsyncStreamer` would change the return type and add a
/// `'static` bound on `W`, so it is left as is — confirm the intent.
pub fn async_stream<W, F>(io: W, format: F) -> Streamer<W, F>
    where W: io::Write + Send,
          F: format::Format
{
    Streamer::new(io, format)
}
/// Shorthand for creating a `Discard` drain.
pub fn discard() -> Discard {
Discard
}
/// Shorthand for `Filter::new`: wrap `d` so that it only receives records for
/// which `cond` returns `true`.
///
/// Note the argument order: the predicate comes first here, while
/// `Filter::new` takes the drain first.
pub fn filter<D, F>(cond: F, d: D) -> Filter<D>
    where D: Drain,
          F: 'static + Send + Sync + Fn(&RecordInfo) -> bool
{
    Filter::new(d, cond)
}
/// Shorthand for `FilterLevel::new`: wrap `d` so that it only receives
/// records whose level satisfies `is_at_least(level)`.
///
/// Note the argument order: the level comes first here, while
/// `FilterLevel::new` takes the drain first.
pub fn filter_level<D>(level: Level, d: D) -> FilterLevel<D>
    where D: Drain
{
    FilterLevel::new(d, level)
}
/// Shorthand for `Duplicate::new`: a drain that logs every record to both
/// `d1` and `d2`.
pub fn duplicate<D1, D2>(d1: D1, d2: D2) -> Duplicate<D1, D2>
    where D1: Drain,
          D2: Drain
{
    Duplicate::new(d1, d2)
}
/// Shorthand for `Failover::new`: a drain that logs to `d1` and falls back to
/// `d2` when `d1` returns an error.
pub fn failover<D1, D2>(d1: D1, d2: D2) -> Failover<D1, D2>
    where D1: Drain,
          D2: Drain
{
    Failover::new(d1, d2)
}