use std::{fmt::Display, hash::Hash};
use async_trait::async_trait;
use tokio::sync::oneshot;
use tracing::Span;
use crate::{
batch::BatchItem,
error::Result,
worker::{Worker, WorkerHandle},
BatchError, BatchingStrategy,
};
#[derive(Debug)]
pub struct Batcher<K, I, O = (), E = String> {
worker: WorkerHandle<K, I, O, E>,
}
#[async_trait]
pub trait Processor<K, I, O = (), E = String>
where
E: Display,
{
async fn process(
&self,
key: K,
inputs: impl Iterator<Item = I> + Send,
) -> std::result::Result<Vec<O>, E>;
}
impl<K, I, O, E> Batcher<K, I, O, E>
where
K: 'static + Send + Eq + Hash + Clone,
I: 'static + Send,
O: 'static + Send,
E: 'static + Send + Clone + Display,
{
pub fn new<F>(processor: F, batching_strategy: BatchingStrategy) -> Self
where
F: 'static + Send + Clone + Processor<K, I, O, E>,
{
let handle = Worker::spawn(processor, batching_strategy);
Self { worker: handle }
}
pub async fn add(&self, key: K, input: I) -> Result<O, E> {
let span_id = Span::current().id();
let (tx, rx) = oneshot::channel();
self.worker
.send(BatchItem {
key,
input,
tx,
span_id,
})
.await?;
rx.await?.map_err(BatchError::BatchFailed)
}
}
impl<K, I, O> Clone for Batcher<K, I, O> {
fn clone(&self) -> Self {
Self {
worker: self.worker.clone(),
}
}
}