use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
use logforth_core::Trap;
use logforth_core::kv;
use logforth_core::kv::Visitor;
use crate::Task;
use crate::channel::Receiver;
/// Consumer side of the task channel: receives [`Task`]s and applies them to
/// a fixed set of appenders.
///
/// The worker is driven by [`Worker::run`], which takes the worker by value.
pub(crate) struct Worker {
/// Appenders that every logged record is forwarded to and that are flushed
/// on a flush request, in iteration order.
appends: Vec<Box<dyn Append>>,
/// Channel end from which tasks are received.
receiver: Receiver<Task>,
/// Receives errors returned by the appenders while appending or flushing.
trap: Box<dyn Trap>,
}
impl Worker {
pub(crate) fn new(
appends: Vec<Box<dyn Append>>,
receiver: Receiver<Task>,
trap: Box<dyn Trap>,
) -> Self {
Self {
appends,
receiver,
trap,
}
}
pub(crate) fn run(self) {
let Self {
appends,
receiver,
trap,
} = self;
while let Ok(task) = receiver.recv() {
match task {
Task::Log { record, diags } => {
let diags: &[Box<dyn Diagnostic>] = if diags.is_empty() {
&[]
} else {
&[Box::new(AsyncDiagnostic(diags))]
};
record.with(|record| {
for append in appends.iter() {
if let Err(err) = append.append(&record, diags) {
let err = Error::new("failed to append record").with_source(err);
trap.trap(&err);
}
}
});
}
Task::Flush { done } => {
for append in appends.iter() {
if let Err(err) = append.flush() {
trap.trap(&err);
}
}
let _ = done.send(());
}
}
}
}
}
/// Owned key/value pairs carried by a log task, wrapped so they can be
/// handed to appenders as a [`Diagnostic`].
#[derive(Debug)]
struct AsyncDiagnostic(Vec<(kv::KeyOwned, kv::ValueOwned)>);
impl Diagnostic for AsyncDiagnostic {
    /// Presents every stored pair to `visitor` in insertion order, as
    /// borrowed views of the owned key and value.
    ///
    /// Stops at, and returns, the first error reported by the visitor;
    /// returns `Ok(())` once all pairs have been visited.
    fn visit(&self, visitor: &mut dyn Visitor) -> Result<(), Error> {
        self.0
            .iter()
            .try_for_each(|(key, value)| visitor.visit(key.view(), value.view()))
    }
}