use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
use logforth_core::Trap;
use logforth_core::kv;
use logforth_core::kv::Visitor;
use logforth_core::record::Record;
use logforth_core::trap::BestEffortTrap;
use crate::Overflow;
use crate::Task;
use crate::channel::channel;
use crate::state::AsyncState;
use crate::worker::Worker;
#[derive(Debug)]
pub struct Async {
state: AsyncState,
}
impl Append for Async {
fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
let mut diagnostics = vec![];
let mut collector = DiagnosticCollector(&mut diagnostics);
for d in diags {
d.visit(&mut collector)?;
}
let task = Task::Log {
record: Box::new(record.to_owned()),
diags: diagnostics,
};
self.state.send_task(task)
}
fn flush(&self) -> Result<(), Error> {
let (done_tx, done_rx) = oneshot::channel();
let task = Task::Flush { done: done_tx };
self.state.send_task(task)?;
done_rx
.recv()
.map_err(|err| Error::new("worker exited before completing flush").with_source(err))
}
}
pub struct AsyncBuilder {
thread_name: String,
appends: Vec<Box<dyn Append>>,
buffered_lines_limit: Option<usize>,
trap: Box<dyn Trap>,
overflow: Overflow,
}
impl AsyncBuilder {
pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
AsyncBuilder {
thread_name: thread_name.into(),
appends: vec![],
buffered_lines_limit: None,
trap: Box::new(BestEffortTrap::default()),
overflow: Overflow::Block,
}
}
pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
self.buffered_lines_limit = buffered_lines_limit;
self
}
pub fn overflow_block(mut self) -> Self {
self.overflow = Overflow::Block;
self
}
pub fn overflow_drop_incoming(mut self) -> Self {
self.overflow = Overflow::DropIncoming;
self
}
pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
self.trap = trap.into();
self
}
pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
self.appends.push(append.into());
self
}
pub fn build(self) -> Async {
let Self {
thread_name,
appends,
buffered_lines_limit,
trap,
overflow,
} = self;
let (sender, receiver) = channel(buffered_lines_limit);
let worker = Worker::new(appends, receiver, trap);
let thread_handle = std::thread::Builder::new()
.name(thread_name)
.spawn(move || worker.run())
.expect("failed to spawn async appender thread");
let state = AsyncState::new(overflow, sender, thread_handle);
Async { state }
}
}
struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);
impl<'a> Visitor for DiagnosticCollector<'a> {
fn visit(&mut self, key: kv::KeyView, value: kv::ValueView) -> Result<(), Error> {
self.0.push((key.to_owned(), value.to_owned()));
Ok(())
}
}