batch_aint_one/batcher.rs
1use std::{fmt::Debug, sync::Arc};
2
3use bon::bon;
4use tokio::sync::{mpsc, oneshot};
5use tracing::{Level, Span, span};
6
7use crate::{
8 batch::BatchItem,
9 error::BatchResult,
10 limits::Limits,
11 policies::BatchingPolicy,
12 processor::Processor,
13 worker::{Worker, WorkerDropGuard, WorkerHandle},
14};
15
/// Groups items to be processed in batches.
///
/// Takes inputs one at a time and sends them to a background worker task which groups them into
/// batches according to the specified [`BatchingPolicy`] and [`Limits`], and processes them using
/// the provided [`Processor`].
///
/// Cheap to clone. Cloned instances share the same background worker task.
///
/// ## Drop
///
/// When the last instance of a `Batcher` is dropped, the worker task will be aborted (ungracefully
/// shut down).
///
/// If you want to shut down the worker gracefully, call [`WorkerHandle::shut_down()`].
#[derive(Debug)]
pub struct Batcher<P: Processor> {
    // Human-readable name for this batcher; also passed to the spawned worker task.
    name: String,
    // Shared handle to the background worker; exposed to callers via `worker_handle()`.
    worker: Arc<WorkerHandle>,
    // Shared drop guard: when the last clone of this `Batcher` is dropped, the worker
    // task is aborted (see the "Drop" section above).
    worker_guard: Arc<WorkerDropGuard>,
    // Sending half of the channel used by `add()` to submit items to the worker.
    item_tx: mpsc::Sender<BatchItem<P>>,
}
37
38#[bon]
39impl<P: Processor> Batcher<P> {
40 /// Create a new batcher.
41 ///
42 /// # Notes
43 ///
44 /// If `batching_policy` is `Balanced { min_size_hint }` where `min_size_hint` is greater than
45 /// `limits.max_batch_size`, the `min_size_hint` will be clamped to `max_batch_size`.
46 #[builder]
47 pub fn new(
48 name: impl Into<String>,
49 processor: P,
50 limits: Limits,
51 batching_policy: BatchingPolicy,
52 ) -> Self {
53 let name = name.into();
54
55 let batching_policy = batching_policy.normalise(limits);
56
57 let (handle, worker_guard, item_tx) =
58 Worker::spawn(name.clone(), processor, limits, batching_policy);
59
60 Self {
61 name,
62 worker: Arc::new(handle),
63 worker_guard: Arc::new(worker_guard),
64 item_tx,
65 }
66 }
67
68 /// Add an item to be batched and processed, and await the result.
69 pub async fn add(&self, key: P::Key, input: P::Input) -> BatchResult<P::Output, P::Error> {
70 // Record the span ID so we can link the shared processing span.
71 let requesting_span = Span::current().clone();
72
73 let (tx, rx) = oneshot::channel();
74 self.item_tx
75 .send(BatchItem {
76 key,
77 input,
78 submitted_at: tokio::time::Instant::now(),
79 tx,
80 requesting_span,
81 })
82 .await?;
83
84 let (output, batch_span) = rx.await?;
85
86 {
87 let link_back_span = span!(Level::INFO, "batch finished");
88 if let Some(span) = batch_span {
89 // WARNING: It's very important that we don't drop the span until _after_
90 // follows_from().
91 //
92 // If we did e.g. `.follows_from(span)` then the span would get converted into an ID
93 // and dropped. Any attempt to look up the span by ID _inside_ follows_from() would
94 // then panic, because the span will have been closed and no longer exist.
95 //
96 // Don't ask me how long this took me to debug.
97 link_back_span.follows_from(&span);
98 link_back_span.in_scope(|| {
99 // Do nothing. This span is just here to work around a Honeycomb limitation:
100 //
101 // If the batch span is linked to a parent span like so:
102 //
103 // parent_span_1 <-link- batch_span
104 //
105 // then in Honeycomb, the link is only shown on the batch span. It it not possible
106 // to click through to the batch span from the parent.
107 //
108 // So, here we link back to the batch to make this easier.
109 });
110 }
111 }
112 output
113 }
114
115 /// Get a handle to the worker.
116 pub fn worker_handle(&self) -> Arc<WorkerHandle> {
117 Arc::clone(&self.worker)
118 }
119}
120
121impl<P: Processor> Clone for Batcher<P> {
122 fn clone(&self) -> Self {
123 Self {
124 name: self.name.clone(),
125 worker: self.worker.clone(),
126 worker_guard: self.worker_guard.clone(),
127 item_tx: self.item_tx.clone(),
128 }
129 }
130}