use std::{fmt::Debug, sync::Arc};
use bon::bon;
use tokio::sync::{mpsc, oneshot};
use tracing::{Level, Span, span};
use crate::{
batch::BatchItem,
error::BatchResult,
limits::Limits,
policies::BatchingPolicy,
processor::Processor,
worker::{Worker, WorkerDropGuard, WorkerHandle},
};
/// Handle for submitting items to a worker created by `Worker::spawn`.
///
/// The manual `Clone` impl in this file produces clones that share the same
/// worker handle, drop guard, and item channel.
#[derive(Debug)]
pub struct Batcher<P: Processor> {
/// Name supplied at construction; a copy is also passed to `Worker::spawn`.
name: String,
/// Shared handle to the worker, handed out by `worker_handle`.
worker: Arc<WorkerHandle>,
/// Drop guard returned by `Worker::spawn`, shared between clones.
/// NOTE(review): presumably stops the worker once the last clone is dropped —
/// confirm against `WorkerDropGuard`.
worker_guard: Arc<WorkerDropGuard>,
/// Sending half of the channel over which `add` submits `BatchItem`s.
item_tx: mpsc::Sender<BatchItem<P>>,
}
#[bon]
impl<P: Processor> Batcher<P> {
    /// Creates a batcher and spawns its worker via `Worker::spawn`.
    ///
    /// `#[builder]` generates a builder whose setters correspond to these arguments:
    ///
    /// * `name` - name for this batcher; a copy is passed to the worker.
    /// * `processor` - the [`Processor`] handed to the worker.
    /// * `limits` - limits passed to the worker and used to normalise the policy.
    /// * `batching_policy` - policy, normalised against `limits` before use.
    #[builder]
    pub fn new(
        name: impl Into<String>,
        processor: P,
        limits: Limits,
        batching_policy: BatchingPolicy,
    ) -> Self {
        let name = name.into();
        // The worker only ever sees the policy after it has been normalised against the limits.
        let batching_policy = batching_policy.normalise(limits);

        let (handle, worker_guard, item_tx) =
            Worker::spawn(name.clone(), processor, limits, batching_policy);

        Self {
            name,
            worker: Arc::new(handle),
            worker_guard: Arc::new(worker_guard),
            item_tx,
        }
    }

    /// Submits one item to the worker and waits for its result.
    ///
    /// The item is sent together with `key`, the submission time, the caller's current
    /// tracing span, and a one-shot sender on which the worker replies.
    ///
    /// # Errors
    ///
    /// Returns early, via `?`, if the item cannot be sent on the worker channel or if the
    /// reply channel is closed before a result arrives. Otherwise returns the result the
    /// worker sent back, unchanged.
    pub async fn add(&self, key: P::Key, input: P::Input) -> BatchResult<P::Output, P::Error> {
        // `Span::current()` already returns an owned handle, so no extra clone is needed.
        let requesting_span = Span::current();

        let (tx, rx) = oneshot::channel();
        self.item_tx
            .send(BatchItem {
                key,
                input,
                submitted_at: tokio::time::Instant::now(),
                tx,
                requesting_span,
            })
            .await?;

        let (output, batch_span) = rx.await?;

        // Emit a "batch finished" span in the caller's context and, when the worker supplied
        // a batch span, link it back to that span. The block scopes the span so it is closed
        // before returning.
        {
            let link_back_span = span!(Level::INFO, "batch finished");
            if let Some(span) = batch_span {
                // Pass `span` by reference so the handle stays alive until the link is made.
                link_back_span.follows_from(&span);
                // Enter and immediately exit the span.
                // NOTE(review): presumably so subscribers that only report entered spans
                // still record it — confirm before changing.
                link_back_span.in_scope(|| {});
            }
        }

        output
    }

    /// Returns a new shared reference to this batcher's worker handle.
    pub fn worker_handle(&self) -> Arc<WorkerHandle> {
        Arc::clone(&self.worker)
    }
}
impl<P: Processor> Clone for Batcher<P> {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
worker: self.worker.clone(),
worker_guard: self.worker_guard.clone(),
item_tx: self.item_tx.clone(),
}
}
}