1#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../README.md");
24
25mod batch;
26mod batch_queue;
27mod batcher;
28mod error;
29mod policies;
30mod processor;
31mod timeout;
32mod worker;
33
34pub use batcher::Batcher;
35pub use error::BatchError;
36pub use policies::{BatchingPolicy, Limits, OnFull};
37pub use processor::Processor;
38
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::join;
    use tracing::{Instrument, span};

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Minimal [`Processor`] used by the tests in this module.
    ///
    /// The wrapped [`Duration`] is slept for once while acquiring resources and
    /// once more while processing, so each step takes a measurable amount of
    /// time.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    impl Processor for SimpleBatchProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        /// Sleeps for the configured duration, then succeeds without producing
        /// any resources.
        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            tokio::time::sleep(self.0).await;
            Ok(())
        }

        /// Sleeps for the configured duration, then returns one output per
        /// input: the input followed by `" processed for "` and the batch key.
        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            tokio::time::sleep(self.0).await;
            Ok(inputs.map(|s| s + " processed for " + &key).collect())
        }
    }

    /// Adds two items with the same key from two separately instrumented
    /// futures and then inspects the captured spans: their names, their
    /// parent/child relationships and their `follows_from` links.
    #[tokio::test]
    #[ignore = "flaky"]
    async fn test_tracing() {
        use tracing::Level;
        use tracing_capture::{CaptureLayer, SharedStorage};
        use tracing_subscriber::layer::SubscriberExt;

        let subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .with_test_writer()
            .finish();
        // Record spans into `storage` so they can be inspected once the batch
        // has been processed.
        let storage = SharedStorage::default();
        let subscriber = subscriber.with(CaptureLayer::new(&storage));

        // The subscriber stays installed for as long as the guard is alive, so
        // the guard has to be held until the end of the test.
        let _guard = tracing::subscriber::set_default(subscriber);

        let batcher = Batcher::builder()
            .name("test_tracing")
            .processor(SimpleBatchProcessor(Duration::from_millis(10)))
            .limits(Limits::default().with_max_batch_size(2))
            .batching_policy(BatchingPolicy::Size)
            .build();

        // Each `add` call runs inside its own span so that the links between
        // the handler spans and the batch span can be checked below.
        let h1 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span1");

                batcher
                    .add("A".to_string(), "1".to_string())
                    .instrument(span)
            })
        };
        let h2 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span2");

                batcher
                    .add("A".to_string(), "2".to_string())
                    .instrument(span)
            })
        };

        let (o1, o2) = join!(h1, h2);

        assert!(o1.is_ok());
        assert!(o2.is_ok());

        // Shut the worker down before reading the captured spans.
        let worker = batcher.worker_handle();
        worker.shut_down().await;
        tokio::time::timeout(Duration::from_secs(1), worker.wait_for_shutdown())
            .await
            .expect("Worker should shut down");

        let storage = storage.lock();

        // Collects every captured span whose name contains `needle`.
        let spans_named = |needle: &str| -> Vec<_> {
            storage
                .all_spans()
                .filter(|span| span.metadata().name().contains(needle))
                .collect()
        };

        let outer_process_spans = spans_named("process batch");
        assert_eq!(
            outer_process_spans.len(),
            1,
            "should be a single outer span for processing the batch"
        );
        let process_span = outer_process_spans.first().unwrap().clone();

        assert_eq!(
            process_span["batch.size"], 2u64,
            "batch.size shouldn't be emitted as a string",
        );

        assert_eq!(
            process_span.follows_from().len(),
            2,
            "should follow from both handler spans"
        );

        let resource_spans = spans_named("acquire resources");
        assert_eq!(
            resource_spans.len(),
            1,
            "should be a single span for acquiring resources"
        );
        let resource_span_parent = resource_spans
            .first()
            .unwrap()
            .parent()
            .expect("resource span should have a parent");
        assert_eq!(
            resource_span_parent, process_span,
            "resource acquisition should be a child of the outer process span"
        );

        let inner_process_spans = spans_named("process()");
        assert_eq!(
            inner_process_spans.len(),
            1,
            "should be a single inner span for processing the batch"
        );
        let inner_span_parent = inner_process_spans
            .first()
            .unwrap()
            .parent()
            .expect("inner process span should have a parent");
        assert_eq!(
            inner_span_parent, process_span,
            "inner process span should be a child of the outer process span"
        );

        let link_back_spans = spans_named("batch finished");
        assert_eq!(
            link_back_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for span in link_back_spans {
            assert_eq!(
                span.follows_from().len(),
                1,
                "link back spans should follow from the process span"
            );
        }

        // Expected breakdown: 2 handler spans, 1 outer process span,
        // 1 resource span, 1 inner process span and 2 link-back spans.
        assert_eq!(storage.all_spans().len(), 7, "should be 7 spans in total");
    }
}