batch_aint_one/
lib.rs

1//! Batch up multiple items for processing as a single unit.
2//!
3//! _I got 99 problems, but a batch ain't one..._
4//!
5//! Sometimes it is more efficient to process many items at once rather than one at a time.
6//! Especially when the processing step has overheads which can be shared between many items.
7//!
8//! Often applications work with one item at a time, e.g. _select one row_ or _insert one row_. Many
9//! of these operations can be batched up into more efficient versions: _select many rows_ and
10//! _insert many rows_.
11//!
12//! A worker task is run in the background. Many client tasks (e.g. message handlers) can submit
13//! items to the worker and wait for them to be processed. The worker task batches together many
14//! items and processes them as one unit, before sending a result back to each calling task.
15//!
16//! See the README for an example.
17
#![deny(missing_docs)]

// Compile the code samples in README.md as doctests so the README's example
// can never silently drift out of sync with the actual API.
#[cfg(doctest)]
use doc_comment::doctest;
#[cfg(doctest)]
doctest!("../README.md");

// Internal implementation modules; only the items re-exported below are
// part of the crate's public API (enforced by `deny(missing_docs)` above).
mod batch;
mod batch_queue;
mod batcher;
mod error;
mod policies;
mod worker;

// Public API surface.
pub use batcher::{Batcher, Processor};
pub use error::BatchError;
pub use policies::{BatchingPolicy, Limits, OnFull};
35
36#[cfg(test)]
37mod tests {
38    use std::time::Duration;
39
40    use async_trait::async_trait;
41    use tokio::join;
42    use tracing::{span, Instrument};
43
44    use crate::{Batcher, BatchingPolicy, Limits, Processor};
45
46    #[derive(Debug, Clone)]
47    pub struct SimpleBatchProcessor(pub Duration);
48
49    #[async_trait]
50    impl Processor<String, String, String> for SimpleBatchProcessor {
51        async fn process(
52            &self,
53            key: String,
54            inputs: impl Iterator<Item = String> + Send,
55        ) -> Result<Vec<String>, String> {
56            tokio::time::sleep(self.0).await;
57            Ok(inputs.map(|s| s + " processed for " + &key).collect())
58        }
59    }
60
61    #[tokio::test]
62    async fn test_tracing() {
63        use tracing::Level;
64        use tracing_capture::{CaptureLayer, SharedStorage};
65        use tracing_subscriber::layer::SubscriberExt;
66
67        let subscriber = tracing_subscriber::fmt()
68            .pretty()
69            .with_max_level(Level::INFO)
70            .finish();
71        // Add the capturing layer.
72        let storage = SharedStorage::default();
73        let subscriber = subscriber.with(CaptureLayer::new(&storage));
74
75        // Capture tracing information.
76        let _guard = tracing::subscriber::set_default(subscriber);
77
78        let batcher = Batcher::new(
79            SimpleBatchProcessor(Duration::ZERO),
80            Limits::default().max_batch_size(2),
81            BatchingPolicy::Size,
82        );
83
84        let h1 = {
85            tokio_test::task::spawn({
86                let span = span!(Level::INFO, "test_handler_span1");
87
88                batcher
89                    .add("A".to_string(), "1".to_string())
90                    .instrument(span)
91            })
92        };
93        let h2 = {
94            tokio_test::task::spawn({
95                let span = span!(Level::INFO, "test_handler_span2");
96
97                batcher
98                    .add("A".to_string(), "2".to_string())
99                    .instrument(span)
100            })
101        };
102
103        let (o1, o2) = join!(h1, h2);
104
105        assert!(o1.is_ok());
106        assert!(o2.is_ok());
107
108        let storage = storage.lock();
109
110        let process_spans: Vec<_> = storage
111            .all_spans()
112            .filter(|span| span.metadata().name().contains("process batch"))
113            .collect();
114        assert_eq!(
115            process_spans.len(),
116            1,
117            "should be a single span for processing the batch"
118        );
119
120        let process_span = process_spans.first().unwrap();
121
122        assert_eq!(
123            process_span["batch_size"], 2u64,
124            "batch_size shouldn't be emitted as a string",
125        );
126
127        assert_eq!(
128            process_span.follows_from().len(),
129            2,
130            "should follow from both handler spans"
131        );
132
133        let link_back_spans: Vec<_> = storage
134            .all_spans()
135            .filter(|span| span.metadata().name().contains("batch finished"))
136            .collect();
137        assert_eq!(
138            link_back_spans.len(),
139            2,
140            "should be two spans for linking back to the process span"
141        );
142
143        for span in link_back_spans {
144            assert_eq!(
145                span.follows_from().len(),
146                1,
147                "link back spans should follow from the process span"
148            );
149        }
150
151        assert_eq!(storage.all_spans().len(), 5, "should be 5 spans in total");
152    }
153}