Skip to main content

batch_aint_one/
batcher.rs

1use std::{fmt::Debug, sync::Arc};
2
3use bon::bon;
4use tokio::sync::{mpsc, oneshot};
5use tracing::{Level, Span, span};
6
7use crate::{
8    batch::BatchItem,
9    error::BatchResult,
10    limits::Limits,
11    policies::BatchingPolicy,
12    processor::Processor,
13    worker::{Worker, WorkerDropGuard, WorkerHandle},
14};
15
/// Groups items to be processed in batches.
///
/// Takes inputs one at a time and sends them to a background worker task which groups them into
/// batches according to the specified [`BatchingPolicy`] and [`Limits`], and processes them using
/// the provided [`Processor`].
///
/// Cheap to clone. Cloned instances share the same background worker task.
///
/// ## Drop
///
/// When the last instance of a `Batcher` is dropped, the worker task will be aborted (ungracefully
/// shut down). Any callers still awaiting [`Batcher::add()`] will receive a
/// [`BatchError::Rx`](crate::BatchError::Rx) error.
///
/// If you want to shut down the worker gracefully, call [`WorkerHandle::shut_down()`].
#[derive(Debug)]
pub struct Batcher<P: Processor> {
    // Name supplied at construction; a copy is also handed to the worker when it is spawned.
    name: String,
    // Shared handle to the worker; this is what `worker_handle()` hands out to callers.
    worker: Arc<WorkerHandle>,
    // Shared drop guard returned when the worker is spawned. Presumably this is what aborts the
    // worker once the last clone of the batcher goes away (see "Drop" above) -- the guard itself
    // is defined in the `worker` module, so confirm there.
    worker_guard: Arc<WorkerDropGuard>,
    // Sending half of the channel that `add()` uses to submit items to the worker.
    item_tx: mpsc::Sender<BatchItem<P>>,
}
38
39#[bon]
40impl<P: Processor> Batcher<P> {
41    /// Create a new batcher.
42    ///
43    /// # Notes
44    ///
45    /// If `batching_policy` is `Balanced { min_size_hint }` where `min_size_hint` is greater than
46    /// `limits.max_batch_size`, the `min_size_hint` will be clamped to `max_batch_size`.
47    #[builder]
48    pub fn new(
49        name: impl Into<String>,
50        processor: P,
51        limits: Limits,
52        batching_policy: BatchingPolicy,
53    ) -> Self {
54        let name = name.into();
55
56        let batching_policy = batching_policy.normalise(limits);
57
58        let (handle, worker_guard, item_tx) =
59            Worker::spawn(name.clone(), processor, limits, batching_policy);
60
61        Self {
62            name,
63            worker: Arc::new(handle),
64            worker_guard: Arc::new(worker_guard),
65            item_tx,
66        }
67    }
68
69    /// Add an item to be batched and processed, and await the result.
70    pub async fn add(&self, key: P::Key, input: P::Input) -> BatchResult<P::Output, P::Error> {
71        // Record the span ID so we can link the shared processing span.
72        let requesting_span = Span::current().clone();
73
74        let (tx, rx) = oneshot::channel();
75        self.item_tx
76            .send(BatchItem {
77                key,
78                input,
79                submitted_at: tokio::time::Instant::now(),
80                tx,
81                requesting_span,
82            })
83            .await?;
84
85        let (output, batch_span) = rx.await?;
86
87        {
88            let link_back_span = span!(Level::INFO, "batch finished");
89            if let Some(span) = batch_span {
90                // WARNING: It's very important that we don't drop the span until _after_
91                // follows_from().
92                //
93                // If we did e.g. `.follows_from(span)` then the span would get converted into an ID
94                // and dropped. Any attempt to look up the span by ID _inside_ follows_from() would
95                // then panic, because the span will have been closed and no longer exist.
96                //
97                // Don't ask me how long this took me to debug.
98                link_back_span.follows_from(&span);
99                link_back_span.in_scope(|| {
100                    // Do nothing. This span is just here to work around a Honeycomb limitation:
101                    //
102                    // If the batch span is linked to a parent span like so:
103                    //
104                    // parent_span_1 <-link- batch_span
105                    //
106                    // then in Honeycomb, the link is only shown on the batch span. It it not possible
107                    // to click through to the batch span from the parent.
108                    //
109                    // So, here we link back to the batch to make this easier.
110                });
111            }
112        }
113        output
114    }
115
116    /// Get a handle to the worker.
117    pub fn worker_handle(&self) -> Arc<WorkerHandle> {
118        Arc::clone(&self.worker)
119    }
120}
121
122impl<P: Processor> Clone for Batcher<P> {
123    fn clone(&self) -> Self {
124        Self {
125            name: self.name.clone(),
126            worker: self.worker.clone(),
127            worker_guard: self.worker_guard.clone(),
128            item_tx: self.item_tx.clone(),
129        }
130    }
131}