use crate::formatter::Formatter;
use crate::layer::Tree;
use crate::processor::Processor;
use std::future::Future;
use std::io::Write;
use tokio::sync::mpsc;
use tracing_subscriber::fmt::MakeWriter;
pub struct AsyncProcessor {
tx: mpsc::UnboundedSender<Tree>,
}
impl AsyncProcessor {
pub fn spawn<F, W>(formatter: F, make_writer: W) -> (Self, impl Future<Output = ()>)
where
F: 'static + Formatter + Send,
W: 'static + for<'a> MakeWriter<'a> + Send,
{
let (tx, mut rx) = mpsc::unbounded_channel();
let handle = async move {
while let Some(tree) = rx.recv().await {
let mut buf = Vec::with_capacity(0);
#[allow(clippy::expect_used)]
{
formatter.fmt(tree, &mut buf).expect("formatting failed");
make_writer
.make_writer()
.write_all(&buf[..])
.expect("writing failed");
}
}
};
let processor = AsyncProcessor { tx };
(processor, handle)
}
}
impl From<mpsc::UnboundedSender<Tree>> for AsyncProcessor {
fn from(tx: mpsc::UnboundedSender<Tree>) -> Self {
AsyncProcessor { tx }
}
}
impl Processor for AsyncProcessor {
fn process(&self, tree: Tree) {
#[allow(clippy::expect_used)]
self.tx
.send(tree)
.expect("failed to send logs to processing thread");
}
}