batch_aint_one/
lib.rs

//! Batch up multiple items for processing as a single unit.
//!
//! _I got 99 problems, but a batch ain't one..._
//!
//! Sometimes it is more efficient to process many items at once rather than one at a time,
//! especially when the processing step has overheads which can be shared between many items.
//!
//! Often applications work with one item at a time, e.g. _select one row_ or _insert one row_. Many
//! of these operations can be batched up into more efficient versions: _select many rows_ and
//! _insert many rows_.
//!
//! A worker task is run in the background. Many client tasks (e.g. message handlers) can submit
//! items to the worker and wait for them to be processed. The worker task batches together many
//! items and processes them as one unit, before sending a result back to each calling task.
//!
//! See the README for an example.

18#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../README.md");
24
25mod batch;
26mod batch_queue;
27mod batcher;
28mod error;
29mod policies;
30mod processor;
31mod timeout;
32mod worker;
33
34pub use batcher::Batcher;
35pub use error::BatchError;
36pub use policies::{BatchingPolicy, Limits, OnFull};
37pub use processor::Processor;
38
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::join;
    use tracing::{Instrument, span};

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Test processor which sleeps for the wrapped duration in both `acquire_resources` and
    /// `process`, then appends `" processed for <key>"` to every input.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    impl Processor for SimpleBatchProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            tokio::time::sleep(self.0).await;
            Ok(())
        }

        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            tokio::time::sleep(self.0).await;
            // e.g. input "1" under key "A" becomes "1 processed for A".
            Ok(inputs.map(|s| s + " processed for " + &key).collect())
        }
    }

    /// Checks the spans emitted when two items are processed as a single batch.
    ///
    /// Expected shape:
    /// - one outer "process batch" span that follows from both handler spans;
    /// - "acquire resources" and "process()" spans as children of that outer span;
    /// - one "batch finished" link-back span per item, each following from the outer span.
    #[tokio::test]
    // NOTE(review): the source of the flakiness isn't visible here. `set_default` installs a
    // thread-local subscriber, and tracing caches per-callsite interest process-wide, so
    // interaction with other tests is one plausible cause — confirm before un-ignoring.
    #[ignore = "flaky"]
    async fn test_tracing() {
        use tracing::Level;
        use tracing_capture::{CaptureLayer, SharedStorage};
        use tracing_subscriber::layer::SubscriberExt;

        let subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .with_test_writer()
            .finish();
        // Add the capturing layer.
        let storage = SharedStorage::default();
        let subscriber = subscriber.with(CaptureLayer::new(&storage));

        // Capture tracing information for the rest of this test.
        let _guard = tracing::subscriber::set_default(subscriber);

        // A size-based policy with max size 2 means both items below form one batch.
        let batcher = Batcher::builder()
            .name("test_tracing")
            .processor(SimpleBatchProcessor(Duration::from_millis(10)))
            .limits(Limits::default().with_max_batch_size(2))
            .batching_policy(BatchingPolicy::Size)
            .build();

        // Each item is submitted inside its own "handler" span. The span is created before
        // `add` is called, matching the order a real handler would use.
        let h1 = {
            let span = span!(Level::INFO, "test_handler_span1");
            tokio_test::task::spawn(
                batcher
                    .add("A".to_string(), "1".to_string())
                    .instrument(span),
            )
        };
        let h2 = {
            let span = span!(Level::INFO, "test_handler_span2");
            tokio_test::task::spawn(
                batcher
                    .add("A".to_string(), "2".to_string())
                    .instrument(span),
            )
        };

        let (o1, o2) = join!(h1, h2);

        assert!(o1.is_ok(), "first item should be processed successfully");
        assert!(o2.is_ok(), "second item should be processed successfully");

        let worker = batcher.worker_handle();
        worker.shut_down().await;
        tokio::time::timeout(Duration::from_secs(1), worker.wait_for_shutdown())
            .await
            .expect("Worker should shut down");

        let storage = storage.lock();

        // All captured spans whose name contains `needle`.
        let spans_named = |needle: &str| -> Vec<_> {
            storage
                .all_spans()
                .filter(|span| span.metadata().name().contains(needle))
                .collect()
        };

        let outer_process_spans = spans_named("process batch");
        assert_eq!(
            outer_process_spans.len(),
            1,
            "should be a single outer span for processing the batch"
        );
        let process_span = outer_process_spans.first().unwrap().clone();

        assert_eq!(
            process_span["batch.size"], 2u64,
            "batch.size shouldn't be emitted as a string",
        );

        assert_eq!(
            process_span.follows_from().len(),
            2,
            "should follow from both handler spans"
        );
        assert!(
            process_span
                .follows_from()
                .all(|from| from.metadata().name().starts_with("test_handler_span")),
            "outer process span should only follow from the handler spans"
        );

        let resource_spans = spans_named("acquire resources");
        assert_eq!(
            resource_spans.len(),
            1,
            "should be a single span for acquiring resources"
        );
        let resource_span_parent = resource_spans
            .first()
            .unwrap()
            .parent()
            .expect("resource span should have a parent");
        assert_eq!(
            resource_span_parent, process_span,
            "resource acquisition should be a child of the outer process span"
        );

        let inner_process_spans = spans_named("process()");
        assert_eq!(
            inner_process_spans.len(),
            1,
            "should be a single inner span for processing the batch"
        );
        let inner_span_parent = inner_process_spans
            .first()
            .unwrap()
            .parent()
            .expect("inner process span should have a parent");
        assert_eq!(
            inner_span_parent, process_span,
            "inner processing should be a child of the outer process span"
        );

        let link_back_spans = spans_named("batch finished");
        assert_eq!(
            link_back_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for span in link_back_spans {
            assert_eq!(
                span.follows_from().len(),
                1,
                "link back spans should follow from exactly one span"
            );
            assert!(
                span.follows_from().any(|from| from == process_span),
                "link back spans should follow from the process span"
            );
        }

        // 2 handler spans + 1 outer "process batch" + 1 "acquire resources"
        // + 1 inner "process()" + 2 "batch finished" link-back spans = 7.
        assert_eq!(storage.all_spans().len(), 7, "should be 7 spans in total");
    }
}