use std::{fmt::Display, hash::Hash, sync::Arc};
use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use tracing::{span, Level, Span};
use crate::{
batch::BatchItem,
error::Result,
policies::{BatchingPolicy, Limits},
worker::{Worker, WorkerHandle},
};
/// Groups items to be processed in batches.
///
/// Takes inputs (`I`) grouped by a key (`K`) and processes multiple together in a batch. An output
/// (`O`) is produced for each input.
///
/// Errors (`E`) can be returned from a batch.
///
/// Cheap to clone.
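///
/// A minimal usage sketch. `MyProcessor` is a placeholder for a type implementing
/// `Processor`, and `limits` / `policy` stand in for your chosen `Limits` and
/// `BatchingPolicy` values (construction elided):
///
/// ```ignore
/// // Assumes `MyProcessor: Processor<String, String, String>`.
/// let batcher: Batcher<String, String, String> = Batcher::new(MyProcessor, limits, policy);
///
/// // Each call resolves to the output for its own input once the batch containing it
/// // has been processed.
/// let output = batcher.add("some-key".to_string(), "some input".to_string()).await?;
/// ```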
#[derive(Debug)]
pub struct Batcher<K, I, O = (), E = String>
where
K: 'static + Send + Eq + Hash + Clone,
I: 'static + Send,
O: 'static + Send,
E: 'static + Send + Clone + Display,
{
    /// Handle to the spawned worker task, shared between clones of this `Batcher`.
    worker: Arc<WorkerHandle>,
    /// Sends items to the worker to be grouped into batches.
    item_tx: mpsc::Sender<BatchItem<K, I, O, E>>,
}
/// Process a batch of inputs for a given key.
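///
/// A minimal sketch of an implementation; `Uppercase` is purely illustrative and assumes
/// `use async_trait::async_trait;` and this trait are in scope:
///
/// ```ignore
/// struct Uppercase;
///
/// #[async_trait]
/// impl Processor<String, String, String> for Uppercase {
///     async fn process(
///         &self,
///         _key: String,
///         inputs: impl Iterator<Item = String> + Send,
///     ) -> Result<Vec<String>, String> {
///         // Outputs must be returned in the same order as the inputs.
///         Ok(inputs.map(|s| s.to_uppercase()).collect())
///     }
/// }
/// ```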
#[async_trait]
pub trait Processor<K, I, O = (), E = String>
where
E: Display,
{
/// Process the batch.
///
/// The order of the outputs in the returned `Vec` must be the same as the order of the inputs
/// in the given iterator.
async fn process(
&self,
key: K,
inputs: impl Iterator<Item = I> + Send,
) -> std::result::Result<Vec<O>, E>;
}
impl<K, I, O, E> Batcher<K, I, O, E>
where
K: 'static + Send + Eq + Hash + Clone,
I: 'static + Send,
O: 'static + Send,
E: 'static + Send + Clone + Display,
{
/// Create a new batcher.
pub fn new<F>(processor: F, limits: Limits, batching_policy: BatchingPolicy) -> Self
where
F: 'static + Send + Clone + Processor<K, I, O, E>,
{
let (handle, item_tx) = Worker::spawn(processor, limits, batching_policy);
Self {
worker: Arc::new(handle),
item_tx,
}
}
/// Add an item to the batch and await the result.
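    ///
    /// A minimal sketch, assuming `batcher` is a `Batcher<String, String, String>`. Clones
    /// share the same worker, so concurrent calls that use the same key may be processed
    /// together in one batch:
    ///
    /// ```ignore
    /// let batcher2 = batcher.clone();
    /// tokio::spawn(async move {
    ///     let _ = batcher2.add("key".to_string(), "second input".to_string()).await;
    /// });
    /// let first = batcher.add("key".to_string(), "first input".to_string()).await?;
    /// ```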
pub async fn add(&self, key: K, input: I) -> Result<O, E> {
        // Record the current span's ID so the shared processing span can be linked to it.
let span_id = Span::current().id();
let (tx, rx) = oneshot::channel();
self.item_tx
.send(BatchItem {
key,
input,
tx,
span_id,
})
.await?;
let (o, batch_span_id) = rx.await?;
{
let link_back_span = span!(Level::INFO, "batch finished");
link_back_span.follows_from(batch_span_id);
link_back_span.in_scope(|| {
// Do nothing. This span is just here to work around a Honeycomb limitation:
//
// If the batch span is linked to a parent span like so:
//
// parent_span_1 <-link- batch_span
//
                // then in Honeycomb, the link is only shown on the batch span. It is not possible
// to click through to the batch span from the parent.
//
// So, here we link back to the batch to make this easier.
})
}
o
}
}
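// A manual impl (rather than `#[derive(Clone)]`) so that cloning does not add `Clone`
// bounds on the generic parameters; both fields are cheap handle clones (an `Arc` and a
// channel sender).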
impl<K, I, O, E> Clone for Batcher<K, I, O, E>
where
K: 'static + Send + Eq + Hash + Clone,
I: 'static + Send,
O: 'static + Send,
E: 'static + Send + Clone + Display,
{
fn clone(&self) -> Self {
Self {
worker: self.worker.clone(),
item_tx: self.item_tx.clone(),
}
}
}