Skip to main content

batch_aint_one/
lib.rs

1//! Batch up multiple items for processing as a single unit.
2//!
3//! _I got 99 problems, but a batch ain't one..._
4//!
5//! Sometimes it is more efficient to process many items at once rather than one at a time.
6//! Especially when the processing step has overheads which can be shared between many items.
7//!
8//! Often applications work with one item at a time, e.g. _select one row_ or _insert one row_. Many
9//! of these operations can be batched up into more efficient versions: _select many rows_ and
10//! _insert many rows_.
11//!
12//! A worker task is run in the background. Many client tasks (e.g. message handlers) can submit
13//! items to the worker and wait for them to be processed. The worker task batches together many
14//! items and processes them as one unit, before sending a result back to each calling task.
15//!
16//! See the README for an example.
17
18#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../../README.md");
24
/// Soft-check an invariant that is never expected to fail.
///
/// The condition is evaluated exactly once. If it does not hold, the formatted message is
/// emitted via `tracing::warn!`, and then `debug_assert!(false, ...)` runs with the same message.
/// As a result, debug builds (and tests) panic, while release builds merely log the warning and
/// carry on instead of taking the process down.
macro_rules! soft_assert {
    ($cond:expr, $($arg:tt)+) => {{
        // Evaluate once up front; `holds` is hygienic, so it cannot clash with caller names.
        let holds = $cond;
        if !holds {
            tracing::warn!($($arg)+);
            debug_assert!(false, $($arg)+);
        }
    }};
}
35
36mod batch;
37mod batch_inner;
38mod batch_queue;
39mod batcher;
40pub mod error;
41mod limits;
42mod policies;
43mod processor;
44mod timeout;
45mod worker;
46
47pub use batcher::Batcher;
48pub use error::BatchError;
49pub use limits::Limits;
50pub use policies::{BatchingPolicy, OnFull};
51pub use processor::Processor;
52pub use worker::WorkerHandle;
53
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::join;
    use tracing::{Instrument, span};

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Test processor: sleeps for the wrapped duration in both `acquire_resources` and
    /// `process`, then appends `" processed for <key>"` to every input.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    impl Processor for SimpleBatchProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            tokio::time::sleep(self.0).await;
            Ok(())
        }

        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            tokio::time::sleep(self.0).await;
            Ok(inputs.map(|s| s + " processed for " + &key).collect())
        }
    }

    /// Checks the span structure produced when two items for the same key are batched:
    /// one outer "process batch" span following from both handler spans, with
    /// "acquire resources" and "process()" as its children, plus one "batch finished"
    /// span per item linking back to it.
    #[tokio::test]
    #[ignore = "flaky"]
    async fn test_tracing() {
        use tracing::Level;
        use tracing_capture::{CaptureLayer, SharedStorage};
        use tracing_subscriber::layer::SubscriberExt;

        let subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .with_test_writer()
            .finish();
        // Add the capturing layer.
        let storage = SharedStorage::default();
        let subscriber = subscriber.with(CaptureLayer::new(&storage));

        // Capture tracing information.
        let _guard = tracing::subscriber::set_default(subscriber);

        // With max_batch_size(2) and BatchingPolicy::Size, the two items added below are
        // expected to form a single batch (checked via `batch.size` further down).
        let batcher = Batcher::builder()
            .name("test_tracing")
            .processor(SimpleBatchProcessor(Duration::from_millis(10)))
            .limits(Limits::builder().max_batch_size(2).build())
            .batching_policy(BatchingPolicy::Size)
            .build();

        let h1 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span1");

                batcher
                    .add("A".to_string(), "1".to_string())
                    .instrument(span)
            })
        };
        let h2 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span2");

                batcher
                    .add("A".to_string(), "2".to_string())
                    .instrument(span)
            })
        };

        let (o1, o2) = join!(h1, h2);

        assert!(o1.is_ok());
        assert!(o2.is_ok());

        // Shut the worker down so every span is closed before inspecting the storage.
        let worker = batcher.worker_handle();
        worker.shut_down().await;
        tokio::time::timeout(Duration::from_secs(1), worker.wait_for_shutdown())
            .await
            .expect("Worker should shut down");

        let storage = storage.lock();

        let outer_process_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("process batch"))
            .collect();
        assert_eq!(
            outer_process_spans.len(),
            1,
            "should be a single outer span for processing the batch"
        );
        let process_span = *outer_process_spans.first().unwrap();

        assert_eq!(
            process_span["batch.size"], 2u64,
            "batch.size shouldn't be emitted as a string",
        );

        assert_eq!(
            process_span.follows_from().len(),
            2,
            "should follow from both handler spans"
        );

        let resource_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("acquire resources"))
            .collect();
        assert_eq!(
            resource_spans.len(),
            1,
            "should be a single span for acquiring resources"
        );
        let resource_span_parent = resource_spans
            .first()
            .unwrap()
            .parent()
            .expect("resource span should have a parent");
        assert_eq!(
            resource_span_parent, process_span,
            "resource acquisition should be a child of the outer process span"
        );

        let inner_process_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("process()"))
            .collect();
        assert_eq!(
            inner_process_spans.len(),
            1,
            "should be a single inner span for processing the batch"
        );
        let inner_span_parent = inner_process_spans
            .first()
            .unwrap()
            .parent()
            .expect("inner process span should have a parent");
        assert_eq!(
            inner_span_parent, process_span,
            "inner processing should be a child of the outer process span"
        );

        let link_back_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("batch finished"))
            .collect();
        assert_eq!(
            link_back_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for span in link_back_spans {
            assert_eq!(
                span.follows_from().len(),
                1,
                "link back spans should follow from the process span"
            );
        }

        // 2 handler spans + outer "process batch" + "acquire resources" + inner "process()"
        // + 2 "batch finished" link-back spans = 7.
        assert_eq!(storage.all_spans().len(), 7, "should be 7 spans in total");
    }
}