batch_aint_one/
batcher.rs

1use std::{fmt::Debug, sync::Arc};
2
3use bon::bon;
4use tokio::sync::{mpsc, oneshot};
5use tracing::{Level, Span, span};
6
7use crate::{
8    batch::BatchItem,
9    error::BatchResult,
10    limits::Limits,
11    policies::BatchingPolicy,
12    processor::Processor,
13    worker::{Worker, WorkerDropGuard, WorkerHandle},
14};
15
16/// Groups items to be processed in batches.
17///
18/// Takes inputs one at a time and sends them to a background worker task which groups them into
19/// batches according to the specified [`BatchingPolicy`] and [`Limits`], and processes them using
20/// the provided [`Processor`].
21///
22/// Cheap to clone. Cloned instances share the same background worker task.
23///
24/// ## Drop
25///
26/// When the last instance of a `Batcher` is dropped, the worker task will be aborted (ungracefully
27/// shut down).
28///
29/// If you want to shut down the worker gracefully, call [`WorkerHandle::shut_down()`].
30#[derive(Debug)]
31pub struct Batcher<P: Processor> {
32    name: String,
33    worker: Arc<WorkerHandle>,
34    worker_guard: Arc<WorkerDropGuard>,
35    item_tx: mpsc::Sender<BatchItem<P>>,
36}
37
38#[bon]
39impl<P: Processor> Batcher<P> {
40    /// Create a new batcher.
41    ///
42    /// # Notes
43    ///
44    /// If `batching_policy` is `Balanced { min_size_hint }` where `min_size_hint` is greater than
45    /// `limits.max_batch_size`, the `min_size_hint` will be clamped to `max_batch_size`.
46    #[builder]
47    pub fn new(
48        name: impl Into<String>,
49        processor: P,
50        limits: Limits,
51        batching_policy: BatchingPolicy,
52    ) -> Self {
53        let name = name.into();
54
55        let batching_policy = batching_policy.normalise(limits);
56
57        let (handle, worker_guard, item_tx) =
58            Worker::spawn(name.clone(), processor, limits, batching_policy);
59
60        Self {
61            name,
62            worker: Arc::new(handle),
63            worker_guard: Arc::new(worker_guard),
64            item_tx,
65        }
66    }
67
68    /// Add an item to be batched and processed, and await the result.
69    pub async fn add(&self, key: P::Key, input: P::Input) -> BatchResult<P::Output, P::Error> {
70        // Record the span ID so we can link the shared processing span.
71        let requesting_span = Span::current().clone();
72
73        let (tx, rx) = oneshot::channel();
74        self.item_tx
75            .send(BatchItem {
76                key,
77                input,
78                submitted_at: tokio::time::Instant::now(),
79                tx,
80                requesting_span,
81            })
82            .await?;
83
84        let (output, batch_span) = rx.await?;
85
86        {
87            let link_back_span = span!(Level::INFO, "batch finished");
88            if let Some(span) = batch_span {
89                // WARNING: It's very important that we don't drop the span until _after_
90                // follows_from().
91                //
92                // If we did e.g. `.follows_from(span)` then the span would get converted into an ID
93                // and dropped. Any attempt to look up the span by ID _inside_ follows_from() would
94                // then panic, because the span will have been closed and no longer exist.
95                //
96                // Don't ask me how long this took me to debug.
97                link_back_span.follows_from(&span);
98                link_back_span.in_scope(|| {
99                    // Do nothing. This span is just here to work around a Honeycomb limitation:
100                    //
101                    // If the batch span is linked to a parent span like so:
102                    //
103                    // parent_span_1 <-link- batch_span
104                    //
105                    // then in Honeycomb, the link is only shown on the batch span. It it not possible
106                    // to click through to the batch span from the parent.
107                    //
108                    // So, here we link back to the batch to make this easier.
109                });
110            }
111        }
112        output
113    }
114
115    /// Get a handle to the worker.
116    pub fn worker_handle(&self) -> Arc<WorkerHandle> {
117        Arc::clone(&self.worker)
118    }
119}
120
121impl<P: Processor> Clone for Batcher<P> {
122    fn clone(&self) -> Self {
123        Self {
124            name: self.name.clone(),
125            worker: self.worker.clone(),
126            worker_guard: self.worker_guard.clone(),
127            item_tx: self.item_tx.clone(),
128        }
129    }
130}