use std::sync::Arc;
use crossbeam_channel::Receiver;
use logforth_core::Diagnostic;
use logforth_core::Error;
use logforth_core::Trap;
use logforth_core::kv;
use logforth_core::kv::Visitor;
use crate::Task;
/// Consumer side of a task channel: receives [`Task`]s and executes them
/// in [`Worker::run`].
pub(crate) struct Worker {
/// Channel end from which tasks are received.
receiver: Receiver<Task>,
/// Receives every error produced while appending or flushing.
trap: Arc<dyn Trap>,
}
impl Worker {
pub(crate) fn new(receiver: Receiver<Task>, trap: Arc<dyn Trap>) -> Self {
Self { receiver, trap }
}
pub(crate) fn run(self) {
let Self { receiver, trap } = self;
while let Ok(task) = receiver.recv() {
match task {
Task::Log {
appends,
record,
diags,
} => {
let diags: Vec<Box<dyn Diagnostic>> = vec![Box::new(OwnedDiagnostic(diags))];
let diags = diags.as_slice();
let record = record.as_record();
for append in appends.iter() {
if let Err(err) = append.append(&record, diags) {
let err = Error::new("failed to append record").set_source(err);
trap.trap(&err);
}
}
}
Task::Flush { appends } => {
for append in appends.iter() {
if let Err(err) = append.flush() {
let err = Error::new("failed to flush").set_source(err);
trap.trap(&err);
}
}
}
}
}
}
}
/// A [`Diagnostic`] backed by an owned list of key-value pairs.
///
/// Visiting yields the pairs in the order they are stored.
#[derive(Debug)]
struct OwnedDiagnostic(Vec<(kv::KeyOwned, kv::ValueOwned)>);
impl Diagnostic for OwnedDiagnostic {
    /// Feeds each stored key-value pair to `visitor`, in storage order.
    ///
    /// Stops at the first pair the visitor rejects and returns that error;
    /// returns `Ok(())` once every pair has been visited.
    fn visit(&self, visitor: &mut dyn Visitor) -> Result<(), Error> {
        self.0.iter().try_for_each(|(name, val)| {
            visitor.visit(name.by_ref(), val.by_ref())?;
            Ok(())
        })
    }
}