1#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../README.md");
24
25mod batch;
26mod batch_inner;
27mod batch_queue;
28mod batcher;
29mod error;
30mod policies;
31mod processor;
32mod timeout;
33mod worker;
34
35pub use batcher::Batcher;
36pub use error::BatchError;
37pub use policies::{BatchingPolicy, Limits, OnFull};
38pub use processor::Processor;
39pub use worker::WorkerHandle;
40
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::join;
    use tracing::{Instrument, span};

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Minimal [`Processor`] used by the tests in this module.
    ///
    /// The wrapped [`Duration`] is slept once while acquiring resources and once
    /// more while processing, so both phases take a non-zero amount of time.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    impl Processor for SimpleBatchProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        /// Sleeps for the configured duration; there are no real resources to acquire.
        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            tokio::time::sleep(self.0).await;
            Ok(())
        }

        /// Sleeps for the configured duration, then maps every input `s` to
        /// `"<s> processed for <key>"`. Never returns an error.
        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            tokio::time::sleep(self.0).await;
            Ok(inputs
                .map(|input| format!("{input} processed for {key}"))
                .collect())
        }
    }

    /// Adds two items under the same key from two separately instrumented
    /// handler spans, then checks the shape of the spans captured while the
    /// batch was processed:
    ///
    /// * one `process batch` span with a numeric `batch.size` of 2 that follows
    ///   from two spans,
    /// * one `acquire resources` span and one `process()` span, both children
    ///   of the `process batch` span,
    /// * two `batch finished` spans, each following from one span,
    /// * seven spans in total.
    #[tokio::test]
    #[ignore = "flaky"]
    async fn test_tracing() {
        use tracing::Level;
        use tracing_capture::{CaptureLayer, SharedStorage};
        use tracing_subscriber::layer::SubscriberExt;

        let subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .with_test_writer()
            .finish();
        let storage = SharedStorage::default();
        let subscriber = subscriber.with(CaptureLayer::new(&storage));

        // The subscriber stays installed only while this guard is alive.
        let _guard = tracing::subscriber::set_default(subscriber);

        let batcher = Batcher::builder()
            .name("test_tracing")
            .processor(SimpleBatchProcessor(Duration::from_millis(10)))
            .limits(Limits::default().with_max_batch_size(2))
            .batching_policy(BatchingPolicy::Size)
            .build();

        let h1 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span1");

                batcher
                    .add("A".to_string(), "1".to_string())
                    .instrument(span)
            })
        };
        let h2 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span2");

                batcher
                    .add("A".to_string(), "2".to_string())
                    .instrument(span)
            })
        };

        let (o1, o2) = join!(h1, h2);

        assert!(o1.is_ok(), "first add should succeed");
        assert!(o2.is_ok(), "second add should succeed");

        // Shut the worker down before inspecting the captured spans.
        let worker = batcher.worker_handle();
        worker.shut_down().await;
        tokio::time::timeout(Duration::from_secs(1), worker.wait_for_shutdown())
            .await
            .expect("Worker should shut down");

        let storage = storage.lock();

        // Collects every captured span whose name contains `needle`.
        let spans_named = |needle: &'static str| -> Vec<_> {
            storage
                .all_spans()
                .filter(move |span| span.metadata().name().contains(needle))
                .collect()
        };

        let outer_process_spans = spans_named("process batch");
        assert_eq!(
            outer_process_spans.len(),
            1,
            "should be a single outer span for processing the batch"
        );
        let process_span = outer_process_spans
            .first()
            .expect("length was asserted to be 1 above")
            .clone();

        assert_eq!(
            process_span["batch.size"], 2u64,
            "batch.size shouldn't be emitted as a string",
        );

        assert_eq!(
            process_span.follows_from().len(),
            2,
            "should follow from both handler spans"
        );

        let resource_spans = spans_named("acquire resources");
        assert_eq!(
            resource_spans.len(),
            1,
            "should be a single span for acquiring resources"
        );
        let resource_span_parent = resource_spans
            .first()
            .expect("length was asserted to be 1 above")
            .parent()
            .expect("resource span should have a parent");
        assert_eq!(
            resource_span_parent, process_span,
            "resource acquisition should be a child of the outer process span"
        );

        let inner_process_spans = spans_named("process()");
        assert_eq!(
            inner_process_spans.len(),
            1,
            "should be a single inner span for processing the batch"
        );
        let inner_span_parent = inner_process_spans
            .first()
            .expect("length was asserted to be 1 above")
            .parent()
            .expect("inner process span should have a parent");
        assert_eq!(
            inner_span_parent, process_span,
            "inner process span should be a child of the outer process span"
        );

        let link_back_spans = spans_named("batch finished");
        assert_eq!(
            link_back_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for span in link_back_spans {
            assert_eq!(
                span.follows_from().len(),
                1,
                "link back spans should follow from the process span"
            );
        }

        // 2 handler spans + 1 process batch + 1 acquire resources + 1 process()
        // + 2 batch finished.
        assert_eq!(storage.all_spans().len(), 7, "should be 7 spans in total");
    }
}