use std::sync::Arc;
use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
use logforth_core::Trap;
use logforth_core::kv;
use logforth_core::kv::Visitor;
use logforth_core::record::Record;
use logforth_core::trap::BestEffortTrap;
use crate::Overflow;
use crate::Task;
use crate::state::AsyncState;
use crate::worker::Worker;
#[derive(Debug)]
pub struct Async {
appends: Arc<[Box<dyn Append>]>,
overflow: Overflow,
state: AsyncState,
trap: Arc<dyn Trap>,
}
impl Append for Async {
fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
let mut diagnostics = vec![];
let mut collector = DiagnosticCollector(&mut diagnostics);
for d in diags {
d.visit(&mut collector)?;
}
let overflow = self.overflow;
let task = Task::Log {
appends: self.appends.clone(),
record: Box::new(record.to_owned()),
diags: diagnostics,
};
self.state.send_task(task, overflow)
}
fn flush(&self) -> Result<(), Error> {
let overflow = self.overflow;
let task = Task::Flush {
appends: self.appends.clone(),
};
self.state.send_task(task, overflow)
}
fn exit(&self) -> Result<(), Error> {
self.state.destroy();
for append in self.appends.iter() {
if let Err(err) = append.exit() {
self.trap.trap(&err);
}
}
Ok(())
}
}
pub struct AsyncBuilder {
thread_name: String,
appends: Vec<Box<dyn Append>>,
buffered_lines_limit: Option<usize>,
trap: Arc<dyn Trap>,
overflow: Overflow,
}
impl AsyncBuilder {
pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
AsyncBuilder {
thread_name: thread_name.into(),
appends: vec![],
buffered_lines_limit: None,
trap: Arc::new(BestEffortTrap::default()),
overflow: Overflow::Block,
}
}
pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
self.buffered_lines_limit = buffered_lines_limit;
self
}
pub fn overflow_block(mut self) -> Self {
self.overflow = Overflow::Block;
self
}
pub fn overflow_drop_incoming(mut self) -> Self {
self.overflow = Overflow::DropIncoming;
self
}
pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
let trap = trap.into();
self.trap = trap.into();
self
}
pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
self.appends.push(append.into());
self
}
pub fn build(self) -> Async {
let Self {
thread_name,
appends,
buffered_lines_limit,
trap,
overflow,
} = self;
let appends = appends.into_boxed_slice().into();
let (sender, receiver) = match buffered_lines_limit {
Some(limit) => crossbeam_channel::bounded(limit),
None => crossbeam_channel::unbounded(),
};
let worker = Worker::new(receiver, trap.clone());
let thread_handle = std::thread::Builder::new()
.name(thread_name)
.spawn(move || worker.run())
.expect("failed to spawn async appender thread");
let state = AsyncState::new(sender, thread_handle);
Async {
appends,
overflow,
state,
trap,
}
}
}
struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);
impl<'a> Visitor for DiagnosticCollector<'a> {
fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> {
self.0.push((key.to_owned(), value.to_owned()));
Ok(())
}
}