batch_aint_one/batcher.rs
1use std::{fmt::Debug, sync::Arc};
2
3use bon::bon;
4use tokio::sync::{mpsc, oneshot};
5use tracing::{Level, Span, span};
6
7use crate::{
8 batch::BatchItem,
9 error::BatchResult,
10 limits::Limits,
11 policies::BatchingPolicy,
12 processor::Processor,
13 worker::{Worker, WorkerDropGuard, WorkerHandle},
14};
15
16/// Groups items to be processed in batches.
17///
18/// Takes inputs one at a time and sends them to a background worker task which groups them into
19/// batches according to the specified [`BatchingPolicy`] and [`Limits`], and processes them using
20/// the provided [`Processor`].
21///
22/// Cheap to clone. Cloned instances share the same background worker task.
23///
24/// ## Drop
25///
26/// When the last instance of a `Batcher` is dropped, the worker task will be aborted (ungracefully
27/// shut down). Any callers still awaiting [`Batcher::add()`] will receive a
28/// [`BatchError::Rx`](crate::BatchError::Rx) error.
29///
30/// If you want to shut down the worker gracefully, call [`WorkerHandle::shut_down()`].
31#[derive(Debug)]
32pub struct Batcher<P: Processor> {
33 name: String,
34 worker: Arc<WorkerHandle>,
35 worker_guard: Arc<WorkerDropGuard>,
36 item_tx: mpsc::Sender<BatchItem<P>>,
37}
38
39#[bon]
40impl<P: Processor> Batcher<P> {
41 /// Create a new batcher.
42 ///
43 /// # Notes
44 ///
45 /// If `batching_policy` is `Balanced { min_size_hint }` where `min_size_hint` is greater than
46 /// `limits.max_batch_size`, the `min_size_hint` will be clamped to `max_batch_size`.
47 #[builder]
48 pub fn new(
49 name: impl Into<String>,
50 processor: P,
51 limits: Limits,
52 batching_policy: BatchingPolicy,
53 ) -> Self {
54 let name = name.into();
55
56 let batching_policy = batching_policy.normalise(limits);
57
58 let (handle, worker_guard, item_tx) =
59 Worker::spawn(name.clone(), processor, limits, batching_policy);
60
61 Self {
62 name,
63 worker: Arc::new(handle),
64 worker_guard: Arc::new(worker_guard),
65 item_tx,
66 }
67 }
68
69 /// Add an item to be batched and processed, and await the result.
70 pub async fn add(&self, key: P::Key, input: P::Input) -> BatchResult<P::Output, P::Error> {
71 // Record the span ID so we can link the shared processing span.
72 let requesting_span = Span::current().clone();
73
74 let (tx, rx) = oneshot::channel();
75 self.item_tx
76 .send(BatchItem {
77 key,
78 input,
79 submitted_at: tokio::time::Instant::now(),
80 tx,
81 requesting_span,
82 })
83 .await?;
84
85 let (output, batch_span) = rx.await?;
86
87 {
88 let link_back_span = span!(Level::INFO, "batch finished");
89 if let Some(span) = batch_span {
90 // WARNING: It's very important that we don't drop the span until _after_
91 // follows_from().
92 //
93 // If we did e.g. `.follows_from(span)` then the span would get converted into an ID
94 // and dropped. Any attempt to look up the span by ID _inside_ follows_from() would
95 // then panic, because the span will have been closed and no longer exist.
96 //
97 // Don't ask me how long this took me to debug.
98 link_back_span.follows_from(&span);
99 link_back_span.in_scope(|| {
100 // Do nothing. This span is just here to work around a Honeycomb limitation:
101 //
102 // If the batch span is linked to a parent span like so:
103 //
104 // parent_span_1 <-link- batch_span
105 //
106 // then in Honeycomb, the link is only shown on the batch span. It it not possible
107 // to click through to the batch span from the parent.
108 //
109 // So, here we link back to the batch to make this easier.
110 });
111 }
112 }
113 output
114 }
115
116 /// Get a handle to the worker.
117 pub fn worker_handle(&self) -> Arc<WorkerHandle> {
118 Arc::clone(&self.worker)
119 }
120}
121
122impl<P: Processor> Clone for Batcher<P> {
123 fn clone(&self) -> Self {
124 Self {
125 name: self.name.clone(),
126 worker: self.worker.clone(),
127 worker_guard: self.worker_guard.clone(),
128 item_tx: self.item_tx.clone(),
129 }
130 }
131}