batch_aint_one/
lib.rs

1//! Batch up multiple items for processing as a single unit.
2//!
3//! _I got 99 problems, but a batch ain't one..._
4//!
//! Sometimes it is more efficient to process many items at once rather than one at a time,
//! especially when the processing step has overheads which can be shared between many items.
7//!
8//! Often applications work with one item at a time, e.g. _select one row_ or _insert one row_. Many
9//! of these operations can be batched up into more efficient versions: _select many rows_ and
10//! _insert many rows_.
11//!
12//! A worker task is run in the background. Many client tasks (e.g. message handlers) can submit
13//! items to the worker and wait for them to be processed. The worker task batches together many
14//! items and processes them as one unit, before sending a result back to each calling task.
15//!
16//! See the README for an example.
17
18#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../README.md");
24
25mod batch;
26mod batch_inner;
27mod batch_queue;
28mod batcher;
29mod error;
30mod policies;
31mod processor;
32mod timeout;
33mod worker;
34
35pub use batcher::Batcher;
36pub use error::BatchError;
37pub use policies::{BatchingPolicy, Limits, OnFull};
38pub use processor::Processor;
39pub use worker::WorkerHandle;
40
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::join;
    use tracing::{Instrument, span};

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Minimal [`Processor`] used by the tests in this module.
    ///
    /// The wrapped [`Duration`] is slept for once while acquiring resources and once more while
    /// processing a batch.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    impl Processor for SimpleBatchProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        /// Sleeps for the configured duration, then succeeds with no resources.
        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            tokio::time::sleep(self.0).await;
            Ok(())
        }

        /// Sleeps for the configured duration, then returns one output per input, each being the
        /// input with `" processed for <key>"` appended.
        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            tokio::time::sleep(self.0).await;
            Ok(inputs.map(|s| s + " processed for " + &key).collect())
        }
    }

    /// Checks the shape of the tracing spans captured while two items with the same key are
    /// submitted from two separately instrumented handler futures.
    ///
    /// The assertions expect:
    /// - one outer "process batch" span with a numeric `batch.size` of 2, following from both
    ///   handler spans;
    /// - one "acquire resources" span and one "process()" span, each a child of the outer span;
    /// - two "batch finished" spans, each following from exactly one span;
    /// - seven captured spans in total.
    #[tokio::test]
    #[ignore = "flaky"]
    async fn test_tracing() {
        use tracing::Level;
        use tracing_capture::{CaptureLayer, SharedStorage};
        use tracing_subscriber::layer::SubscriberExt;

        let subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .with_test_writer()
            .finish();
        // Add the capturing layer.
        let storage = SharedStorage::default();
        let subscriber = subscriber.with(CaptureLayer::new(&storage));

        // Capture tracing information. The guard keeps the subscriber installed as the default
        // until the end of the test.
        let _guard = tracing::subscriber::set_default(subscriber);

        let batcher = Batcher::builder()
            .name("test_tracing")
            .processor(SimpleBatchProcessor(Duration::from_millis(10)))
            .limits(Limits::default().with_max_batch_size(2))
            .batching_policy(BatchingPolicy::Size)
            .build();

        // Each `add` call is instrumented with its own handler span so the test can check that
        // the batch span links back to both of them.
        let h1 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span1");

                batcher
                    .add("A".to_string(), "1".to_string())
                    .instrument(span)
            })
        };
        let h2 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span2");

                batcher
                    .add("A".to_string(), "2".to_string())
                    .instrument(span)
            })
        };

        let (o1, o2) = join!(h1, h2);

        assert!(o1.is_ok());
        assert!(o2.is_ok());

        // Shut the worker down before inspecting the captured spans.
        let worker = batcher.worker_handle();
        worker.shut_down().await;
        tokio::time::timeout(Duration::from_secs(1), worker.wait_for_shutdown())
            .await
            .expect("Worker should shut down");

        let storage = storage.lock();

        // Outer span covering the processing of the whole batch.
        let outer_process_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("process batch"))
            .collect();
        assert_eq!(
            outer_process_spans.len(),
            1,
            "should be a single outer span for processing the batch"
        );
        let process_span = outer_process_spans.first().unwrap().clone();

        assert_eq!(
            process_span["batch.size"], 2u64,
            "batch.size shouldn't be emitted as a string",
        );

        assert_eq!(
            process_span.follows_from().len(),
            2,
            "should follow from both handler spans"
        );

        // Resource acquisition must be nested under the outer process span.
        let resource_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("acquire resources"))
            .collect();
        assert_eq!(
            resource_spans.len(),
            1,
            "should be a single span for acquiring resources"
        );
        let resource_span_parent = resource_spans
            .first()
            .unwrap()
            .parent()
            .expect("resource span should have a parent");
        assert_eq!(
            resource_span_parent, process_span,
            "resource acquisition should be a child of the outer process span"
        );

        // The inner `process()` span must also be nested under the outer process span.
        let inner_process_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("process()"))
            .collect();
        assert_eq!(
            inner_process_spans.len(),
            1,
            "should be a single inner span for processing the batch"
        );
        let inner_span_parent = inner_process_spans
            .first()
            .unwrap()
            .parent()
            .expect("inner process span should have a parent");
        assert_eq!(
            inner_span_parent, process_span,
            "inner process span should be a child of the outer process span"
        );

        // One "batch finished" span is expected per submitted item.
        let link_back_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("batch finished"))
            .collect();
        assert_eq!(
            link_back_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for span in link_back_spans {
            assert_eq!(
                span.follows_from().len(),
                1,
                "link back spans should follow from the process span"
            );
        }

        // Matches the spans counted above: 2 handler + 1 outer process + 1 acquire resources
        // + 1 inner process + 2 batch finished.
        assert_eq!(storage.all_spans().len(), 7, "should be 7 spans in total");
    }
}
214}