batch_aint_one/
lib.rs

1//! Batch up multiple items for processing as a single unit.
2//!
3//! _I got 99 problems, but a batch ain't one..._
4//!
//! Sometimes it is more efficient to process many items at once rather than one at a time,
//! especially when the processing step has overheads which can be shared between many items.
7//!
8//! Often applications work with one item at a time, e.g. _select one row_ or _insert one row_. Many
9//! of these operations can be batched up into more efficient versions: _select many rows_ and
10//! _insert many rows_.
11//!
12//! A worker task is run in the background. Many client tasks (e.g. message handlers) can submit
13//! items to the worker and wait for them to be processed. The worker task batches together many
14//! items and processes them as one unit, before sending a result back to each calling task.
15//!
16//! See the README for an example.
17
18#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../../README.md");
24
25mod batch;
26mod batch_inner;
27mod batch_queue;
28mod batcher;
29pub mod error;
30mod limits;
31mod policies;
32mod processor;
33mod timeout;
34mod worker;
35
36pub use batcher::Batcher;
37pub use error::BatchError;
38pub use limits::Limits;
39pub use policies::{BatchingPolicy, OnFull};
40pub use processor::Processor;
41pub use worker::WorkerHandle;
42
43#[cfg(test)]
44mod tests {
45    use std::time::Duration;
46
47    use tokio::join;
48    use tracing::{Instrument, span};
49
50    use crate::{Batcher, BatchingPolicy, Limits, Processor};
51
52    #[derive(Debug, Clone)]
53    pub struct SimpleBatchProcessor(pub Duration);
54
55    impl Processor for SimpleBatchProcessor {
56        type Key = String;
57        type Input = String;
58        type Output = String;
59        type Error = String;
60        type Resources = ();
61
62        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
63            tokio::time::sleep(self.0).await;
64            Ok(())
65        }
66
67        async fn process(
68            &self,
69            key: String,
70            inputs: impl Iterator<Item = String> + Send,
71            _resources: (),
72        ) -> Result<Vec<String>, String> {
73            tokio::time::sleep(self.0).await;
74            Ok(inputs.map(|s| s + " processed for " + &key).collect())
75        }
76    }
77
78    #[tokio::test]
79    #[ignore = "flaky"]
80    async fn test_tracing() {
81        use tracing::Level;
82        use tracing_capture::{CaptureLayer, SharedStorage};
83        use tracing_subscriber::layer::SubscriberExt;
84
85        let subscriber = tracing_subscriber::fmt()
86            .pretty()
87            .with_max_level(Level::INFO)
88            .with_test_writer()
89            .finish();
90        // Add the capturing layer.
91        let storage = SharedStorage::default();
92        let subscriber = subscriber.with(CaptureLayer::new(&storage));
93
94        // Capture tracing information.
95        let _guard = tracing::subscriber::set_default(subscriber);
96
97        let batcher = Batcher::builder()
98            .name("test_tracing")
99            .processor(SimpleBatchProcessor(Duration::from_millis(10)))
100            .limits(Limits::builder().max_batch_size(2).build())
101            .batching_policy(BatchingPolicy::Size)
102            .build();
103
104        let h1 = {
105            tokio_test::task::spawn({
106                let span = span!(Level::INFO, "test_handler_span1");
107
108                batcher
109                    .add("A".to_string(), "1".to_string())
110                    .instrument(span)
111            })
112        };
113        let h2 = {
114            tokio_test::task::spawn({
115                let span = span!(Level::INFO, "test_handler_span2");
116
117                batcher
118                    .add("A".to_string(), "2".to_string())
119                    .instrument(span)
120            })
121        };
122
123        let (o1, o2) = join!(h1, h2);
124
125        assert!(o1.is_ok());
126        assert!(o2.is_ok());
127
128        let worker = batcher.worker_handle();
129        worker.shut_down().await;
130        tokio::time::timeout(Duration::from_secs(1), worker.wait_for_shutdown())
131            .await
132            .expect("Worker should shut down");
133
134        let storage = storage.lock();
135
136        let outer_process_spans: Vec<_> = storage
137            .all_spans()
138            .filter(|span| span.metadata().name().contains("process batch"))
139            .collect();
140        assert_eq!(
141            outer_process_spans.len(),
142            1,
143            "should be a single outer span for processing the batch"
144        );
145        let process_span = *outer_process_spans.first().unwrap();
146
147        assert_eq!(
148            process_span["batch.size"], 2u64,
149            "batch.size shouldn't be emitted as a string",
150        );
151
152        assert_eq!(
153            process_span.follows_from().len(),
154            2,
155            "should follow from both handler spans"
156        );
157
158        let resource_spans: Vec<_> = storage
159            .all_spans()
160            .filter(|span| span.metadata().name().contains("acquire resources"))
161            .collect();
162        assert_eq!(
163            resource_spans.len(),
164            1,
165            "should be a single span for acquiring resources"
166        );
167        let resource_span_parent = resource_spans
168            .first()
169            .unwrap()
170            .parent()
171            .expect("resource span should have a parent");
172        assert_eq!(
173            resource_span_parent, process_span,
174            "resource acquisition should be a child of the outer process span"
175        );
176
177        let inner_process_spans: Vec<_> = storage
178            .all_spans()
179            .filter(|span| span.metadata().name().contains("process()"))
180            .collect();
181        assert_eq!(
182            inner_process_spans.len(),
183            1,
184            "should be a single inner span for processing the batch"
185        );
186        let inner_span_parent = inner_process_spans
187            .first()
188            .unwrap()
189            .parent()
190            .expect("resource span should have a parent");
191        assert_eq!(
192            inner_span_parent, process_span,
193            "resource acquisition should be a child of the outer process span"
194        );
195
196        let link_back_spans: Vec<_> = storage
197            .all_spans()
198            .filter(|span| span.metadata().name().contains("batch finished"))
199            .collect();
200        assert_eq!(
201            link_back_spans.len(),
202            2,
203            "should be two spans for linking back to the process span"
204        );
205
206        for span in link_back_spans {
207            assert_eq!(
208                span.follows_from().len(),
209                1,
210                "link back spans should follow from the process span"
211            );
212        }
213
214        assert_eq!(storage.all_spans().len(), 7, "should be 7 spans in total");
215    }
216}