#![warn(missing_docs)]
#[macro_use]
extern crate slog;
extern crate crossbeam_channel;
extern crate take_mut;
extern crate thread_local;
use crossbeam_channel::Sender;
use slog::{BorrowedKV, Level, Record, RecordStatic, SingleKV, KV};
use slog::{Key, OwnedKVList, Serializer};
use slog::Drain;
use std::error::Error;
use std::fmt;
use std::sync;
use std::{io, thread};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use take_mut::take;
/// Serializer that collects every emitted key-value pair into an owned,
/// boxed `KV + Send` value.
struct ToSendSerializer {
/// Everything collected so far; each emitted pair wraps the previous value
/// in a new boxed tuple.
kv: Box<dyn KV + Send>,
}
impl ToSendSerializer {
    /// Creates a serializer whose collected value starts out as the unit
    /// value `()`.
    fn new() -> Self {
        let empty: Box<dyn KV + Send> = Box::new(());
        Self { kv: empty }
    }

    /// Consumes the serializer and returns everything collected so far.
    fn finish(self) -> Box<dyn KV + Send> {
        let Self { kv } = self;
        kv
    }
}
impl Serializer for ToSendSerializer {
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_unit(&mut self, key: Key) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, ()))));
Ok(())
}
fn emit_none(&mut self, key: Key) -> slog::Result {
let val: Option<()> = None;
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
let val = val.to_owned();
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_arguments(
&mut self,
key: Key,
val: &fmt::Arguments,
) -> slog::Result {
let val = fmt::format(*val);
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
#[cfg(feature = "nested-values")]
fn emit_serde(
&mut self,
key: Key,
value: &slog::SerdeValue,
) -> slog::Result {
let val = value.to_sendable();
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
}
/// Errors returned by the asynchronous drains defined in this file.
#[derive(Debug)]
pub enum AsyncError {
/// A non-blocking send to the worker channel failed; this is what the
/// `From<crossbeam_channel::TrySendError<T>>` conversion produces.
Full,
/// Unrecoverable failure carrying the underlying cause; produced from a
/// failed blocking send or from a poisoned lock.
Fatal(Box<dyn std::error::Error>),
}
impl<T> From<crossbeam_channel::TrySendError<T>> for AsyncError {
    /// Converts a failed `try_send` into `AsyncError::Full`, discarding the
    /// message that could not be sent.
    ///
    /// NOTE(review): every `TrySendError` value is reported as `Full`,
    /// whatever the reason for the failure was — confirm that a disconnected
    /// channel is meant to be reported the same way as a full one.
    fn from(_unsent: crossbeam_channel::TrySendError<T>) -> Self {
        Self::Full
    }
}
impl<T> From<crossbeam_channel::SendError<T>> for AsyncError {
    /// Converts a failed blocking send into `AsyncError::Fatal`, wrapping a
    /// `BrokenPipe` I/O error. The unsent message is discarded.
    fn from(_unsent: crossbeam_channel::SendError<T>) -> Self {
        let cause = io::Error::new(
            io::ErrorKind::BrokenPipe,
            "The logger thread terminated",
        );
        Self::Fatal(Box::new(cause))
    }
}
impl<T> From<std::sync::PoisonError<T>> for AsyncError {
    /// Converts a lock-poisoning error into `AsyncError::Fatal`.
    ///
    /// The resulting cause is a `BrokenPipe` I/O error whose message is the
    /// `Display` text of the poison error.
    fn from(err: std::sync::PoisonError<T>) -> AsyncError {
        // `Error::description` is deprecated in favour of the `Display`
        // implementation, so the message is taken from `to_string()`.
        let message = err.to_string();
        AsyncError::Fatal(Box::new(io::Error::new(
            io::ErrorKind::BrokenPipe,
            message,
        )))
    }
}
/// `Result` alias whose error type is `AsyncError`.
pub type AsyncResult<T> = std::result::Result<T, AsyncError>;
/// Builder for `AsyncCore`.
///
/// Holds the wrapped drain together with the settings used when the worker
/// thread and its channel are created.
pub struct AsyncCoreBuilder<D>
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
// Capacity passed to `crossbeam_channel::bounded` when the worker is spawned.
chan_size: usize,
// Whether sending a record uses a blocking `send` instead of `try_send`.
blocking: bool,
// Drain that the worker thread logs received records to.
drain: D,
// Optional name given to the worker thread.
thread_name: Option<String>,
}
impl<D> AsyncCoreBuilder<D>
where
    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
    /// Creates a builder around `drain` with the defaults: channel capacity
    /// of 128, non-blocking sends, and no thread name.
    fn new(drain: D) -> Self {
        AsyncCoreBuilder {
            chan_size: 128,
            blocking: false,
            drain,
            thread_name: None,
        }
    }

    /// Sets the name of the worker thread.
    ///
    /// # Panics
    ///
    /// Panics if `name` contains a NUL (`'\0'`) character.
    pub fn thread_name(mut self, name: String) -> Self {
        assert!(!name.contains('\0'), "Name with '\\0' in it passed");
        self.thread_name = Some(name);
        self
    }

    /// Sets the capacity of the bounded channel feeding the worker thread.
    pub fn chan_size(mut self, s: usize) -> Self {
        self.chan_size = s;
        self
    }

    /// Chooses between blocking sends (`true`) and non-blocking sends that
    /// fail when the record cannot be queued (`false`).
    pub fn blocking(mut self, blocking: bool) -> Self {
        self.blocking = blocking;
        self
    }

    /// Creates the bounded channel and spawns the worker thread.
    ///
    /// Returns the worker's join handle and the sending half of the channel.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned.
    fn spawn_thread(self) -> (thread::JoinHandle<()>, Sender<AsyncMsg>) {
        let (tx, rx) = crossbeam_channel::bounded(self.chan_size);
        let mut builder = thread::Builder::new();
        if let Some(thread_name) = self.thread_name {
            builder = builder.name(thread_name);
        }
        let drain = self.drain;
        let join = builder
            .spawn(move || {
                // The loop ends on `AsyncMsg::Finish` and also when `recv`
                // fails. A failed `recv` means the channel is disconnected,
                // so there is nothing left to log; treat it as a shutdown
                // request instead of panicking inside the worker thread.
                while let Ok(AsyncMsg::Record(r)) = rx.recv() {
                    // The drain's error type is `slog::Never`.
                    r.log_to(&drain).unwrap();
                }
            })
            .unwrap();
        (join, tx)
    }

    /// Builds the `AsyncCore`; identical to `build_no_guard`.
    pub fn build(self) -> AsyncCore {
        self.build_no_guard()
    }

    /// Builds an `AsyncCore` that owns the worker's join handle itself, so
    /// the worker is shut down when the `AsyncCore` is dropped.
    pub fn build_no_guard(self) -> AsyncCore {
        let blocking = self.blocking;
        let (join, tx) = self.spawn_thread();
        AsyncCore {
            ref_sender: tx,
            tl_sender: thread_local::ThreadLocal::new(),
            join: Mutex::new(Some(join)),
            blocking,
        }
    }

    /// Builds an `AsyncCore` together with an `AsyncGuard`.
    ///
    /// The guard owns the worker's join handle; the returned `AsyncCore`
    /// holds none and therefore does not shut the worker down on drop.
    pub fn build_with_guard(self) -> (AsyncCore, AsyncGuard) {
        let blocking = self.blocking;
        let (join, tx) = self.spawn_thread();
        (
            AsyncCore {
                ref_sender: tx.clone(),
                tl_sender: thread_local::ThreadLocal::new(),
                join: Mutex::new(None),
                blocking,
            },
            AsyncGuard {
                join: Some(join),
                tx,
            },
        )
    }
}
/// Guard returned by the `build_with_guard` constructors.
///
/// Dropping it sends `AsyncMsg::Finish` to the worker thread and joins that
/// thread, unless the drop happens on the worker thread itself.
pub struct AsyncGuard {
// Join handle of the worker thread; taken out when the guard is dropped.
join: Option<thread::JoinHandle<()>>,
// Sender used to deliver the `Finish` message.
tx: Sender<AsyncMsg>,
}
impl Drop for AsyncGuard {
    /// Asks the worker thread to finish and waits for it to exit.
    ///
    /// Failures to send or to join are ignored.
    fn drop(&mut self) {
        let _ = self.tx.send(AsyncMsg::Finish);
        let worker = self.join.take().unwrap();
        // Joining the current thread from itself is skipped; the handle is
        // simply dropped in that case.
        if worker.thread().id() == thread::current().id() {
            return;
        }
        let _outcome: Result<(), io::Error> = worker.join().map_err(|_| {
            io::Error::new(
                io::ErrorKind::BrokenPipe,
                "Logging thread worker join error",
            )
        });
    }
}
/// Drain that converts each record into an owned `AsyncRecord` and sends it
/// over a channel to a worker thread.
pub struct AsyncCore {
// Original sender; cloned to create each thread's entry in `tl_sender`.
ref_sender: Sender<AsyncMsg>,
// Per-thread clones of `ref_sender`, created on first use by each thread.
tl_sender: thread_local::ThreadLocal<Sender<AsyncMsg>>,
// Worker join handle; `None` when an `AsyncGuard` owns the handle instead.
join: Mutex<Option<thread::JoinHandle<()>>>,
// Whether records are sent with a blocking `send` rather than `try_send`.
blocking: bool,
}
impl AsyncCore {
pub fn new<D>(drain: D) -> Self
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
D: std::panic::RefUnwindSafe,
{
AsyncCoreBuilder::new(drain).build()
}
pub fn custom<
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
>(
drain: D,
) -> AsyncCoreBuilder<D> {
AsyncCoreBuilder::new(drain)
}
fn get_sender(
&self,
) -> Result<
&crossbeam_channel::Sender<AsyncMsg>,
std::sync::PoisonError<
sync::MutexGuard<crossbeam_channel::Sender<AsyncMsg>>,
>,
> {
self.tl_sender.get_or_try(|| Ok(self.ref_sender.clone()))
}
fn send(&self, r: AsyncRecord) -> AsyncResult<()> {
let sender = self.get_sender()?;
if self.blocking {
sender.send(AsyncMsg::Record(r))?;
} else {
sender.try_send(AsyncMsg::Record(r))?;
}
Ok(())
}
}
impl Drain for AsyncCore {
    type Ok = ();
    type Err = AsyncError;

    /// Converts `record` into an owned `AsyncRecord` and queues it for the
    /// worker thread.
    ///
    /// `AsyncRecord::from` already serializes the record's key-value pairs
    /// into an owned form, so no separate serialization pass is done here;
    /// each value is therefore serialized exactly once per logged record.
    fn log(
        &self,
        record: &Record,
        logger_values: &OwnedKVList,
    ) -> AsyncResult<()> {
        self.send(AsyncRecord::from(record, logger_values))
    }
}
/// Owned copy of a `slog::Record` plus its logger values, suitable for
/// sending to another thread.
pub struct AsyncRecord {
// Fully formatted log message.
msg: String,
// Level of the original record.
level: Level,
// Copy of the original record's source location.
location: Box<slog::RecordLocation>,
// Owned copy of the original record's tag.
tag: String,
// Clone of the logger's key-value list.
logger_values: OwnedKVList,
// Owned form of the record's own key-value pairs.
kv: Box<dyn KV + Send>,
}
impl AsyncRecord {
    /// Builds an owned record from `record` and `logger_values`.
    ///
    /// The message is formatted immediately, the record's key-value pairs
    /// are serialized into an owned form, and the logger values are cloned.
    pub fn from(record: &Record, logger_values: &OwnedKVList) -> Self {
        let mut collector = ToSendSerializer::new();
        let serialized = record.kv().serialize(record, &mut collector);
        serialized.expect("`ToSendSerializer` can't fail");

        let msg = fmt::format(*record.msg());
        let level = record.level();
        let location = Box::new(*record.location());
        let tag = record.tag().to_owned();
        AsyncRecord {
            msg,
            level,
            location,
            tag,
            logger_values: logger_values.clone(),
            kv: collector.finish(),
        }
    }

    /// Rebuilds a borrowed `Record` from the stored data and logs it to
    /// `drain`, returning whatever the drain returns.
    pub fn log_to<D: Drain>(self, drain: &D) -> Result<D::Ok, D::Err> {
        let static_part = RecordStatic {
            location: &*self.location,
            level: self.level,
            tag: self.tag.as_str(),
        };
        // The `Record` borrows the `format_args!` temporary, so it has to be
        // constructed inside the call expression.
        drain.log(
            &Record::new(
                &static_part,
                &format_args!("{}", self.msg),
                BorrowedKV(&self.kv),
            ),
            &self.logger_values,
        )
    }

    /// Rebuilds a borrowed `Record` from the stored data and passes it,
    /// together with the stored logger values, to `f`.
    pub fn as_record_values(&self, mut f: impl FnMut(&Record, &OwnedKVList)) {
        let static_part = RecordStatic {
            location: &*self.location,
            level: self.level,
            tag: self.tag.as_str(),
        };
        // Same temporary-lifetime constraint as in `log_to`.
        f(
            &Record::new(
                &static_part,
                &format_args!("{}", self.msg),
                BorrowedKV(&self.kv),
            ),
            &self.logger_values,
        )
    }
}
/// Messages sent over the channel to the worker thread.
enum AsyncMsg {
/// A record for the worker to log to the wrapped drain.
Record(AsyncRecord),
/// Tells the worker loop to return.
Finish,
}
impl Drop for AsyncCore {
    /// Shuts the worker thread down if this `AsyncCore` owns its join
    /// handle; does nothing otherwise. All failures are ignored.
    fn drop(&mut self) {
        // No handle (or a poisoned lock) means there is nothing to do here.
        let worker = match self.join.lock() {
            Ok(mut slot) => match slot.take() {
                Some(handle) => handle,
                None => return,
            },
            Err(_) => return,
        };

        match self.get_sender() {
            Ok(sender) => {
                let _ = sender.send(AsyncMsg::Finish);
            }
            Err(_) => return,
        }

        // Joining the current thread from itself is skipped; the handle is
        // simply dropped in that case.
        if worker.thread().id() != thread::current().id() {
            let _outcome: Result<(), io::Error> =
                worker.join().map_err(|_| {
                    io::Error::new(
                        io::ErrorKind::BrokenPipe,
                        "Logging thread worker join error",
                    )
                });
        }
    }
}
/// What `Async` does when a record cannot be queued for the worker thread.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum OverflowStrategy {
/// Send without blocking, count records that could not be queued, and
/// later log a message reporting that count.
DropAndReport,
/// Send without blocking and do not count records that could not be queued.
Drop,
/// Send with a blocking `send`.
Block,
// Hidden variant; passing it to `AsyncBuilder::overflow_strategy` panics.
#[doc(hidden)]
DoNotMatchAgainstThisAndReadTheDocs,
}
/// Builder for `Async`.
pub struct AsyncBuilder<D>
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
// Builder for the underlying `AsyncCore`.
core: AsyncCoreBuilder<D>,
// Whether records that could not be queued are counted for later reporting.
inc_dropped: bool,
}
impl<D> AsyncBuilder<D>
where
    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
    /// Creates a builder around `drain` with dropped-record counting
    /// enabled and the `AsyncCoreBuilder` defaults.
    fn new(drain: D) -> AsyncBuilder<D> {
        let core = AsyncCoreBuilder::new(drain);
        AsyncBuilder {
            core,
            inc_dropped: true,
        }
    }

    /// Sets the capacity of the channel feeding the worker thread.
    pub fn chan_size(self, s: usize) -> Self {
        let AsyncBuilder { core, inc_dropped } = self;
        AsyncBuilder {
            core: core.chan_size(s),
            inc_dropped,
        }
    }

    /// Selects how records are handled when they cannot be queued.
    ///
    /// # Panics
    ///
    /// Panics when given the hidden
    /// `OverflowStrategy::DoNotMatchAgainstThisAndReadTheDocs` variant.
    pub fn overflow_strategy(
        self,
        overflow_strategy: OverflowStrategy,
    ) -> Self {
        let blocking = match overflow_strategy {
            OverflowStrategy::Block => true,
            OverflowStrategy::Drop | OverflowStrategy::DropAndReport => false,
            OverflowStrategy::DoNotMatchAgainstThisAndReadTheDocs => {
                panic!("Invalid variant")
            }
        };
        // Only `DropAndReport` keeps a count of dropped records.
        let inc_dropped = overflow_strategy == OverflowStrategy::DropAndReport;
        AsyncBuilder {
            core: self.core.blocking(blocking),
            inc_dropped,
        }
    }

    /// Sets the name of the worker thread.
    pub fn thread_name(self, name: String) -> Self {
        let AsyncBuilder { core, inc_dropped } = self;
        AsyncBuilder {
            core: core.thread_name(name),
            inc_dropped,
        }
    }

    /// Builds the `Async` drain; identical to `build_no_guard`.
    pub fn build(self) -> Async {
        self.build_no_guard()
    }

    /// Builds an `Async` drain whose core owns the worker's join handle.
    pub fn build_no_guard(self) -> Async {
        let AsyncBuilder { core, inc_dropped } = self;
        Async {
            core: core.build_no_guard(),
            dropped: AtomicUsize::new(0),
            inc_dropped,
        }
    }

    /// Builds an `Async` drain together with the `AsyncGuard` that owns the
    /// worker's join handle.
    pub fn build_with_guard(self) -> (Async, AsyncGuard) {
        let AsyncBuilder { core, inc_dropped } = self;
        let (core, guard) = core.build_with_guard();
        let drain = Async {
            core,
            dropped: AtomicUsize::new(0),
            inc_dropped,
        };
        (drain, guard)
    }
}
/// Asynchronous drain built on `AsyncCore` that can count records it failed
/// to queue and report that count in a later log message.
pub struct Async {
// Underlying drain that sends records to the worker thread.
core: AsyncCore,
// Number of records not yet reported as dropped.
dropped: AtomicUsize,
// Whether a failed non-blocking send increments `dropped`.
inc_dropped: bool,
}
impl Async {
    /// Wraps `drain` in an `Async` built with the default settings.
    pub fn default<D>(drain: D) -> Self
    where
        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
    {
        Self::new(drain).build()
    }

    /// Returns a builder for customising the `Async` drain around `drain`.
    pub fn new<D>(drain: D) -> AsyncBuilder<D>
    where
        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
    {
        AsyncBuilder::new(drain)
    }

    /// Logs a message reporting how many records were dropped, if any.
    ///
    /// The counter is reset to zero first. If the report itself cannot be
    /// queued, the count (plus one for the report) is added back and
    /// `Ok(())` is returned; any other error is passed through.
    fn push_dropped(&self, logger_values: &OwnedKVList) -> AsyncResult<()> {
        let dropped = self.dropped.swap(0, Ordering::Relaxed);
        if dropped == 0 {
            return Ok(());
        }

        match self.core.log(
            &record!(
                slog::Level::Error,
                "slog-async",
                &format_args!(
                    "slog-async: logger dropped messages \
                     due to channel \
                     overflow"
                ),
                b!("count" => dropped)
            ),
            logger_values,
        ) {
            Err(AsyncError::Full) => {
                self.dropped.fetch_add(dropped + 1, Ordering::Relaxed);
                Ok(())
            }
            other => other,
        }
    }
}
impl Drain for Async {
    type Ok = ();
    type Err = AsyncError;

    /// Reports any previously dropped records, then queues `record`.
    ///
    /// An `AsyncError::Full` from the core is swallowed (and counted when
    /// `inc_dropped` is set); every other error is returned to the caller.
    fn log(
        &self,
        record: &Record,
        logger_values: &OwnedKVList,
    ) -> AsyncResult<()> {
        self.push_dropped(logger_values)?;

        match self.core.log(record, logger_values) {
            Err(AsyncError::Full) => {
                if self.inc_dropped {
                    self.dropped.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            other => other,
        }
    }
}
impl Drop for Async {
fn drop(&mut self) {
let _ = self.push_dropped(&o!().into());
}
}