use std::borrow::Cow;
use std::error;
use std::fmt::{self, Debug, Display, Formatter};
use std::mem;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::thread::{self, JoinHandle};
use futures::{future, Stream};
use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio_core::reactor::Core;
use {Request, Service};
/// Service name used by `LoggerContext::default()`.
const DEFAULT_LOGGING_NAME: &str = "logging";
/// Messages sent over the channel to the background logging thread.
enum Event {
/// A serialized logging frame that the thread passes on to the logging service.
Write(Vec<u8>),
/// Tells the logging thread to stop processing events.
Close,
}
/// Owns the background logging thread together with the sender used to stop it on drop.
struct Inner {
/// Sender half of the event channel; `Drop` uses it to send `Event::Close`.
tx: mpsc::UnboundedSender<Event>,
/// Handle of the logging thread; taken and joined in `Drop`.
thread: Option<JoinHandle<()>>,
}
impl Inner {
fn new(name: Cow<'static, str>, tx: UnboundedSender<Event>, rx: UnboundedReceiver<Event>) -> Self {
let thread = thread::Builder::new().name("logging".into()).spawn(move || {
let mut core = Core::new().expect("failed to initialize logger event loop");
let handle = core.handle();
let service = Service::new(name, &handle);
let future = rx.and_then(|event| {
let future = match event {
Event::Write(buf) => {
service.call_mute(Request::from_buf(0, buf));
future::ok(())
}
Event::Close => future::err(())
};
Box::new(future)
});
drop(core.run(future.fold(0, |acc, _v| future::ok(acc))));
}).expect("failed to create logging thread");
Self { tx: tx, thread: Some(thread) }
}
}
impl Drop for Inner {
    /// Asks the logging thread to stop and waits for it to finish.
    ///
    /// Every failure is deliberately ignored: a destructor has no way to report an error, and
    /// panicking here while another panic is unwinding would abort the whole process.
    fn drop(&mut self) {
        // An error means the receiving side is already gone, so there is nothing left to stop.
        let _ = self.tx.unbounded_send(Event::Close);

        if let Some(thread) = self.thread.take() {
            // `join` returns an error when the thread panicked; nothing can be recovered here.
            let _ = thread.join();
        }
    }
}
/// Severity level attached to a logging event.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Severity {
/// Diagnostic details; converts to numeric level 0.
Debug,
/// Informational messages; converts to numeric level 1.
Info,
/// Warnings; converts to numeric level 2.
Warn,
/// Errors; converts to numeric level 3.
Error,
}
/// Maps a severity to its numeric level: `Debug` = 0, `Info` = 1, `Warn` = 2, `Error` = 3.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket implementation
/// still provides `Into<isize> for Severity`, so existing `sev.into()` calls keep working and
/// `isize::from(sev)` becomes available as well.
impl From<Severity> for isize {
    fn from(sev: Severity) -> isize {
        match sev {
            Severity::Debug => 0,
            Severity::Info => 1,
            Severity::Warn => 2,
            Severity::Error => 3,
        }
    }
}
impl Display for Severity {
    /// Writes the lowercase name of the severity (`debug`, `info`, `warn` or `error`).
    ///
    /// The name is written as-is; width and alignment flags of the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match *self {
            Severity::Debug => "debug",
            Severity::Info => "info",
            Severity::Warn => "warn",
            Severity::Error => "error",
        })
    }
}
/// Error returned when a string cannot be parsed into a `Severity`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SeverityParseError;
impl Display for SeverityParseError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
fmt.write_str(error::Error::description(self))
}
}
impl error::Error for SeverityParseError {
    /// Returns a fixed, human-readable message describing the parse failure.
    fn description(&self) -> &str {
        const MESSAGE: &str = "invalid severity syntax";
        MESSAGE
    }
}
impl FromStr for Severity {
type Err = SeverityParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"debug" => Ok(Severity::Debug),
"info" => Ok(Severity::Info),
"warn" | "warning" => Ok(Severity::Warn),
"error" => Ok(Severity::Error),
_ => Err(SeverityParseError),
}
}
}
/// Shared logging context: the channel to the logging thread plus a severity filter.
///
/// Clones share the same logging thread (through `inner`) and the same filter value.
#[derive(Clone)]
pub struct LoggerContext {
/// Sender used by loggers to queue `Event::Write` frames for the logging thread.
tx: UnboundedSender<Event>,
/// Name the context was created with; also passed to the logging service.
name: Cow<'static, str>,
/// Owner of the logging thread; the thread is stopped when the last clone is dropped.
inner: Arc<Inner>,
/// Severity threshold consulted by `Logger::filter`.
filter: Filter,
}
impl LoggerContext {
pub fn new<N>(name: N) -> Self
where N: Into<Cow<'static, str>>
{
let name = name.into();
let (tx, rx) = mpsc::unbounded();
Self {
tx: tx.clone(),
name: name.clone(),
inner: Arc::new(Inner::new(name, tx, rx)),
filter: Filter { sev: Arc::new(AtomicIsize::new(0)) },
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn create<T>(&self, source: T) -> Logger
where T: Into<Cow<'static, str>>
{
Logger {
parent: self.clone(),
source: source.into(),
}
}
pub fn filter(&self) -> &Filter {
&self.filter
}
}
impl Default for LoggerContext {
    /// Builds a context named after `DEFAULT_LOGGING_NAME`.
    fn default() -> Self {
        Self::new(DEFAULT_LOGGING_NAME)
    }
}
impl Debug for LoggerContext {
    /// Formats the context as a struct showing only its name and current filter level.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let level = self.filter.get();

        let mut repr = f.debug_struct("LoggerContext");
        repr.field("name", &self.name);
        repr.field("filter", &level);
        repr.finish()
    }
}
/// Verdict returned by `Log::filter` for an event of a given severity.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FilterResult {
/// The event is emitted by `cocaine_log!`.
Accept,
/// The event is skipped by `cocaine_log!`.
Reject,
/// No decision; `cocaine_log!` treats it the same as `Accept`.
Neutral,
}
/// Interface the `cocaine_log!` macro uses to filter and emit logging events.
pub trait Log {
/// Returns the source name that `cocaine_log!` places into every frame it builds.
fn source(&self) -> &str;
/// Decides whether an event with the given severity should be emitted.
fn filter(&self, sev: Severity) -> FilterResult;
/// Receives the serialized frame built by `cocaine_log!`.
///
/// The default implementation discards the buffer.
#[doc(hidden)]
fn __emit(&self, buf: Vec<u8>) {
mem::drop(buf);
}
}
/// Logger bound to a `LoggerContext` that tags its events with a fixed source name.
#[derive(Debug, Clone)]
pub struct Logger {
/// Context that provides the event channel and the severity filter.
parent: LoggerContext,
/// Source name returned by `Log::source`.
source: Cow<'static, str>,
}
impl Logger {
    /// Returns the name of the `LoggerContext` this logger belongs to.
    pub fn name(&self) -> &str {
        let context = &self.parent;
        context.name()
    }
}
impl Log for Logger {
    /// Returns the source name this logger was created with.
    fn source(&self) -> &str {
        &*self.source
    }

    /// Accepts the event when its numeric severity is at or above the context's threshold,
    /// and rejects it otherwise.
    fn filter(&self, sev: Severity) -> FilterResult {
        let level: isize = sev.into();
        let threshold = self.parent.filter().get();

        if level < threshold {
            FilterResult::Reject
        } else {
            FilterResult::Accept
        }
    }

    /// Queues the serialized frame for the logging thread.
    ///
    /// # Panics
    ///
    /// Panics if the frame cannot be sent over the channel.
    fn __emit(&self, buf: Vec<u8>) {
        let event = Event::Write(buf);
        self.parent.tx.unbounded_send(event).unwrap();
    }
}
/// Handle to a numeric severity threshold; clones share the same underlying value.
#[derive(Clone, Debug)]
pub struct Filter {
/// The threshold, stored atomically behind an `Arc`.
sev: Arc<AtomicIsize>,
}
impl Filter {
pub fn get(&self) -> isize {
self.sev.load(Ordering::Relaxed)
}
pub fn set(&self, sev: isize) {
self.sev.store(sev, Ordering::Relaxed)
}
}
/// Emits a structured logging event through anything that implements `Log`.
///
/// Supported forms, where `log` implements `Log` and `sev` is a `Severity`:
///
/// ```ignore
/// cocaine_log!(log, sev, "message");
/// cocaine_log!(log, sev, "{} {}", a, b);
/// cocaine_log!(log, sev, "{} {}", a, b; { key: value, other: value, });
/// ```
///
/// The format string and its arguments are rendered with `format!`. The optional block after
/// `;` lists attributes as `name: value,` pairs; every pair, including the last one, must end
/// with a comma.
///
/// The event is first offered to `log.filter(sev)`; on `Accept` or `Neutral` it is serialized
/// with `rmp_serde` as the tuple `(severity, source, message, attributes)` and handed to
/// `log.__emit`. The `log` and `sev` expressions are evaluated exactly once per invocation.
///
/// # Panics
///
/// Panics if the frame cannot be serialized.
#[macro_export]
macro_rules! cocaine_log(
    // No attributes: serialize an empty sequence.
    (__unwrap # {}) => {
        &[0u8; 0]
    };
    // Attributes become a tuple of `(name, &value)` pairs.
    (__unwrap # {$($name:ident: $val:expr,)+}) => {
        ($((stringify!($name), &$val)),+,)
    };
    // Final step: render the message and serialize the whole frame.
    (__execute # {$($args:tt)*}, $src:expr, $sev:expr, $fmt:expr, {$($name:ident: $val:expr,)*}) => {{
        extern crate rmp_serde as rmps;
        rmps::to_vec(&($sev, $src, format!($fmt, $($args)*), cocaine_log!(__unwrap # {$($name: $val,)*})))
    }};
    // `__split` consumes the tokens after the format string one at a time, accumulating them as
    // format arguments until it reaches `; { ... }` or runs out of input.
    (__split # {$($args:tt)*}, $src:expr, $sev:expr, $fmt:expr, ; {$($name:ident: $val:expr,)*}) => {
        cocaine_log!(__execute # {$($args)*}, $src, $sev, $fmt, {$($name: $val,)*})
    };
    (__split # {$($args:tt)*}, $src:expr, $sev:expr, $fmt:expr, $arg:tt $($kwargs:tt)*) => {
        cocaine_log!(__split # {$($args)* $arg}, $src, $sev, $fmt, $($kwargs)*)
    };
    (__split # {$($args:tt)*}, $src:expr, $sev:expr, $fmt:expr; {$($name:ident: $val:expr,)*}) => {
        cocaine_log!(__execute # {$($args)*}, $src, $sev, $fmt, {$($name: $val,)*})
    };
    (__split # {$($args:tt)*}, $src:expr, $sev:expr, $fmt:expr,) => {
        cocaine_log!(__execute # {$($args)*}, $src, $sev, $fmt, {})
    };
    (__split # {$($args:tt)*}, $src:expr, $sev:expr, $fmt:expr) => {
        cocaine_log!(__execute # {$($args)*}, $src, $sev, $fmt, {})
    };
    // Test-only entry point: builds the frame without filtering or emitting it.
    (__test # $src:expr, $sev:expr, $($args:tt)*) => {
        cocaine_log!(__split # {}, $src, $sev, $($args)*)
    };
    // Public entry point.
    ($log:expr, $sev:expr, $($args:tt)*) => {{
        #[allow(unused)]
        use cocaine::logging::{FilterResult, Log};
        // Bind both arguments once so that expressions with side effects are not re-evaluated
        // by the filter check, the conversion and the emit call below.
        let log = &$log;
        let severity = $sev;
        match log.filter(severity) {
            FilterResult::Accept |
            FilterResult::Neutral => {
                let sev: isize = severity.into();
                log.__emit(cocaine_log!(__split # {}, log.source(), sev, $($args)*)
                    .expect("failed to serialize logging event frame"));
            }
            FilterResult::Reject => {}
        }
    }};
);
// Tests for the frame layout produced by `cocaine_log!`. Each test builds a frame through the
// `__test` entry point, decodes it with `rmpv` and compares it with the expected
// `[severity, source, message, attributes]` array.
#[cfg(test)]
mod tests {
extern crate rmpv;
use rmpv::Value;
// A message without format arguments or attributes yields an empty attribute array.
#[test]
fn test_macro_without_args() {
let buf = cocaine_log!(__test # "test", 1, "nginx/1.6 configured").unwrap();
let expected = Value::Array(vec![
Value::from(1),
Value::from("test"),
Value::from("nginx/1.6 configured"),
Value::Array(vec![])
]);
assert_eq!(expected, rmpv::decode::read_value(&mut &buf[..]).unwrap());
}
// Format arguments are rendered into the message string.
#[test]
fn test_macro_with_args() {
let buf = cocaine_log!(__test # "test", 1, "{} {} HTTP/1.1 {} {}", "GET", "/static/image.png", 404, 347).unwrap();
let expected = Value::Array(vec![
Value::from(1),
Value::from("test"),
Value::from("GET /static/image.png HTTP/1.1 404 347"),
Value::Array(vec![])
]);
assert_eq!(expected, rmpv::decode::read_value(&mut &buf[..]).unwrap());
}
// A single attribute is encoded as one `[name, value]` pair.
#[test]
fn test_macro_with_attribute() {
let buf = cocaine_log!(__test # "test", 1, "nginx/1.6 configured"; {
config: "/etc/nginx/nginx.conf",
}).unwrap();
let expected = Value::Array(vec![
Value::from(1),
Value::from("test"),
Value::from("nginx/1.6 configured"),
Value::Array(vec![
Value::Array(vec![
Value::from("config"),
Value::from("/etc/nginx/nginx.conf")
])
])
]);
assert_eq!(expected, rmpv::decode::read_value(&mut &buf[..]).unwrap());
}
// Several attributes keep their declaration order and their value types.
#[test]
fn test_macro_with_attributes() {
let buf = cocaine_log!(__test # "test", 1, "nginx/1.6 configured"; {
config: "/etc/nginx/nginx.conf",
elapsed: 42.15,
}).unwrap();
let expected = Value::Array(vec![
Value::from(1),
Value::from("test"),
Value::from("nginx/1.6 configured"),
Value::Array(vec![
Value::Array(vec![
Value::from("config"),
Value::from("/etc/nginx/nginx.conf")
]),
Value::Array(vec![
Value::from("elapsed"),
Value::from(42.15)
])
])
]);
assert_eq!(expected, rmpv::decode::read_value(&mut &buf[..]).unwrap());
}
// Format arguments and attributes can be combined in one invocation.
#[test]
fn test_macro_with_args_and_attributes() {
let buf = cocaine_log!(__test # "test", 1, "file does not exist: {}", "/var/www/favicon.ico"; {
path: "/",
cache: true,
method: "GET",
version: 1.1,
protocol: "HTTP",
}).unwrap();
let expected = Value::Array(vec![
Value::from(1),
Value::from("test"),
Value::from("file does not exist: /var/www/favicon.ico"),
Value::Array(vec![
Value::Array(vec![
Value::from("path"),
Value::from("/")
]),
Value::Array(vec![
Value::from("cache"),
Value::from(true)
]),
Value::Array(vec![
Value::from("method"),
Value::from("GET")
]),
Value::Array(vec![
Value::from("version"),
Value::from(1.1)
]),
Value::Array(vec![
Value::from("protocol"),
Value::from("HTTP")
])
])
]);
assert_eq!(expected, rmpv::decode::read_value(&mut &buf[..]).unwrap());
}
}