use crate::fail;
use crate::layer::ForestLayer;
use crate::printer::PrettyPrinter;
use crate::processor::{self, Processor, WithFallback};
use crate::tag::{NoTag, TagParser};
use crate::tree::Tree;
use std::future::Future;
use std::iter;
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::oneshot;
use tracing::Subscriber;
use tracing_subscriber::layer::{Layered, SubscriberExt as _};
use tracing_subscriber::Registry;
/// Starts a [`Builder`] whose worker side is a [`PrettyPrinter`] wrapped in
/// [`WorkerTask`].
///
/// The builder's `is_global` flag starts out as `true`; it can be changed
/// with [`Builder::set_global`].
pub fn worker_task() -> Builder<InnerSender<impl Processor>, WorkerTask<PrettyPrinter>, NoTag> {
    let printer = WorkerTask(PrettyPrinter::new());
    worker_task_inner(printer, true)
}
/// Starts a [`Builder`] in capture mode: the worker side is the [`Capture`]
/// marker, so the resulting runtime hands the received trees back to the
/// caller instead of running a processor over them.
///
/// The builder's `is_global` flag starts out as `false`.
pub fn capture() -> Builder<InnerSender<impl Processor>, Capture, NoTag> {
    let marker = Capture(());
    worker_task_inner(marker, false)
}
fn worker_task_inner<P>(
worker_processor: P,
is_global: bool,
) -> Builder<InnerSender<impl Processor>, P, NoTag> {
let (tx, rx) = mpsc::unbounded_channel();
let sender_processor = processor::from_fn(move |tree| {
tx.send(tree).map_err(|err| {
let msg = err.to_string().into();
processor::error(err.0, msg)
})
});
Builder {
sender_processor: InnerSender(sender_processor),
worker_processor,
receiver: rx,
tag: NoTag,
is_global,
}
}
/// Accumulates the pieces needed to construct a [`Runtime`].
///
/// * `Tx` - type of the sender-side processor handed to the [`ForestLayer`].
/// * `Rx` - type occupying the worker slot ([`WorkerTask`] or [`Capture`]).
/// * `T`  - type of the tag parser handed to the [`ForestLayer`].
pub struct Builder<Tx, Rx, T> {
    /// Processor given to the `ForestLayer` when the runtime is built.
    sender_processor: Tx,
    /// Worker-side value carried over into the `Runtime`.
    worker_processor: Rx,
    /// Receiving end of the channel the sender processor writes into.
    receiver: UnboundedReceiver<Tree>,
    /// Tag parser given to the `ForestLayer` when the runtime is built.
    tag: T,
    /// Whether the subscriber is installed as the global default (`true`) or
    /// only as a scoped default (`false`) when the runtime is run.
    is_global: bool,
}
/// Marker stored in the worker slot by [`capture`]. A `Runtime<S, Capture>`
/// returns the received trees from `on` instead of processing them.
///
/// The private unit field keeps the type from being constructed outside this
/// module.
pub struct Capture(());
/// Wrapper marking the worker slot as holding a processor `P`. A
/// `Runtime<S, WorkerTask<P>>` moves that processor into a spawned task that
/// processes every received tree.
pub struct WorkerTask<P>(P);
/// Wrapper around the sender-side processor created by `worker_task_inner`.
///
/// It forwards `process` calls to the wrapped processor and implements the
/// private `Sealed` trait, which `Builder` requires of its sender type.
#[derive(Debug)]
pub struct InnerSender<P>(P);
impl<P: Processor> Processor for InnerSender<P> {
    /// Delegates to the wrapped processor without touching the tree.
    fn process(&self, tree: Tree) -> processor::Result {
        let Self(inner) = self;
        inner.process(tree)
    }
}
// Private module: the `Sealed` trait below is used as a bound on the sender
// type of `Builder`, and only this file provides implementations of it.
mod sealed {
/// Marker trait for sender-side processor types accepted by `Builder`.
pub trait Sealed {}
}
// `InnerSender` is sealed for every wrapped type.
impl<P> sealed::Sealed for InnerSender<P> {}
// `WithFallback` is sealed only when its primary processor `P` is sealed.
impl<P: sealed::Sealed, F> sealed::Sealed for WithFallback<P, F> {}
impl<Tx, P, T> Builder<Tx, WorkerTask<P>, T>
where
P: Processor,
{
pub fn map_receiver<F, P2>(self, f: F) -> Builder<Tx, WorkerTask<P2>, T>
where
F: FnOnce(P) -> P2,
P2: Processor,
{
Builder {
sender_processor: self.sender_processor,
worker_processor: WorkerTask(f(self.worker_processor.0)),
receiver: self.receiver,
tag: self.tag,
is_global: self.is_global,
}
}
}
impl<Tx, Rx, T> Builder<Tx, Rx, T>
where
    Tx: Processor + sealed::Sealed,
    T: TagParser,
{
    /// Replaces the sender-side processor with the result of applying `f` to
    /// the current one.
    ///
    /// The new processor type must also implement the private `Sealed` trait.
    pub fn map_sender<F, Tx2>(self, f: F) -> Builder<Tx2, Rx, T>
    where
        F: FnOnce(Tx) -> Tx2,
        Tx2: Processor + sealed::Sealed,
    {
        let Builder {
            sender_processor,
            worker_processor,
            receiver,
            tag,
            is_global,
        } = self;

        Builder {
            sender_processor: f(sender_processor),
            worker_processor,
            receiver,
            tag,
            is_global,
        }
    }

    /// Replaces the tag parser that will be handed to the [`ForestLayer`].
    pub fn set_tag<T2>(self, tag: T2) -> Builder<Tx, Rx, T2>
    where
        T2: TagParser,
    {
        // The previous tag parser is left behind in `self` and dropped.
        let Builder {
            sender_processor,
            worker_processor,
            receiver,
            is_global,
            ..
        } = self;

        Builder {
            sender_processor,
            worker_processor,
            receiver,
            tag,
            is_global,
        }
    }

    /// Chooses how the subscriber is installed when the runtime runs:
    /// `true` installs it as the global default, `false` as a scoped default.
    pub fn set_global(self, is_global: bool) -> Self {
        Builder { is_global, ..self }
    }

    /// Builds a [`Runtime`] whose subscriber is a default [`Registry`] with
    /// the [`ForestLayer`] stacked on top.
    pub fn build(self) -> Runtime<Layered<ForestLayer<Tx, T>, Registry>, Rx> {
        self.build_on(std::convert::identity)
    }

    /// Builds a [`Runtime`], letting `f` turn the registry-plus-layer stack
    /// into the final subscriber.
    pub fn build_on<F, S>(self, f: F) -> Runtime<S, Rx>
    where
        F: FnOnce(Layered<ForestLayer<Tx, T>, Registry>) -> S,
        S: Subscriber,
    {
        self.build_with(move |layer| {
            let stacked = Registry::default().with(layer);
            f(stacked)
        })
    }

    /// Builds a [`Runtime`], letting `f` turn the bare [`ForestLayer`] into
    /// the final subscriber.
    ///
    /// The layer is created from the builder's sender processor and tag
    /// parser; the worker value, receiver and global flag move into the
    /// runtime unchanged.
    pub fn build_with<F, S>(self, f: F) -> Runtime<S, Rx>
    where
        F: FnOnce(ForestLayer<Tx, T>) -> S,
        S: Subscriber,
    {
        let Builder {
            sender_processor,
            worker_processor,
            receiver,
            tag,
            is_global,
        } = self;

        let subscriber = f(ForestLayer::new(sender_processor, tag));

        Runtime {
            subscriber,
            worker_processor,
            receiver,
            is_global,
        }
    }
}
/// A fully built subscriber together with the worker-side state needed to
/// consume the trees it emits. Run it with `on`.
pub struct Runtime<S, P> {
    /// Subscriber installed as the default while the wrapped future runs.
    subscriber: S,
    /// Worker-side value ([`WorkerTask`] or [`Capture`]).
    worker_processor: P,
    /// Receiving end of the channel the subscriber's layer writes trees into.
    receiver: UnboundedReceiver<Tree>,
    /// Whether the subscriber is installed globally (`true`) or only for the
    /// duration of the wrapped future (`false`).
    is_global: bool,
}
impl<S, P> Runtime<S, WorkerTask<P>>
where
    S: Subscriber + Send + Sync,
    P: Processor + Send,
{
    /// Runs `f` with the subscriber installed while a spawned Tokio task
    /// feeds every received [`Tree`] to the worker processor.
    ///
    /// When `is_global` is set the subscriber becomes the global default;
    /// otherwise it is the scoped default only while `f` is awaited. After
    /// `f` completes, the worker is told to shut down, processes whatever is
    /// still buffered in the channel, and is joined before `f`'s output is
    /// returned.
    ///
    /// # Panics
    ///
    /// * If `is_global` is set and a global default subscriber already exists.
    /// * If the worker processor fails on a tree; the worker's own panic is
    ///   re-raised here so the real cause is reported.
    /// * If the worker task cannot be joined for any other reason.
    pub async fn on<F: Future>(self, f: F) -> F::Output {
        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
        let processor = self.worker_processor.0;
        let mut receiver = self.receiver;

        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(tree) = receiver.recv() => {
                        processor.process(tree).expect(fail::PROCESSING_ERROR)
                    }
                    Ok(()) = &mut shutdown_rx => break,
                    // Neither pattern can match any more.
                    else => break,
                }
            }

            // Close the channel, then process the trees that were already
            // queued so nothing emitted before shutdown is lost.
            receiver.close();
            while let Ok(tree) = receiver.try_recv() {
                processor.process(tree).expect(fail::PROCESSING_ERROR);
            }
        });

        let output = {
            let _guard = if self.is_global {
                tracing::subscriber::set_global_default(self.subscriber)
                    .expect("global default already set");
                None
            } else {
                Some(tracing::subscriber::set_default(self.subscriber))
            };
            f.await
        };

        // The shutdown receiver lives inside the worker task, so a failed send
        // means the worker has already terminated. That outcome is reported
        // by the join below, with the actual cause, so the send result itself
        // is deliberately ignored.
        let _ = shutdown_tx.send(());

        match handle.await {
            Ok(()) => {}
            // Propagate the worker's own panic (e.g. a processing error)
            // instead of masking it behind a generic message.
            Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()),
            Err(err) => panic!(
                "Failed to join the writing task, this is a bug: {:?}",
                err
            ),
        }

        output
    }
}
impl<S> Runtime<S, Capture>
where
S: Subscriber + Send + Sync,
{
pub async fn on(self, f: impl Future<Output = ()>) -> Vec<Tree> {
{
let _guard = if self.is_global {
tracing::subscriber::set_global_default(self.subscriber)
.expect("global default already set");
None
} else {
Some(tracing::subscriber::set_default(self.subscriber))
};
f.await;
}
let mut receiver = self.receiver;
receiver.close();
iter::from_fn(|| receiver.try_recv().ok()).collect()
}
}