1#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../../README.md");
24
25macro_rules! soft_assert {
28 ($cond:expr, $($arg:tt)+) => {
29 if !$cond {
30 tracing::warn!($($arg)+);
31 debug_assert!(false, $($arg)+);
32 }
33 };
34}
35
36mod batch;
37mod batch_inner;
38mod batch_queue;
39mod batcher;
40pub mod error;
41mod limits;
42mod policies;
43mod processor;
44mod timeout;
45mod worker;
46
47pub use batcher::Batcher;
48pub use error::BatchError;
49pub use limits::Limits;
50pub use policies::{BatchingPolicy, OnFull};
51pub use processor::Processor;
52pub use worker::WorkerHandle;
53
54#[cfg(test)]
55mod tests {
56 use std::time::Duration;
57
58 use tokio::join;
59 use tracing::{Instrument, span};
60
61 use crate::{Batcher, BatchingPolicy, Limits, Processor};
62
63 #[derive(Debug, Clone)]
64 pub struct SimpleBatchProcessor(pub Duration);
65
66 impl Processor for SimpleBatchProcessor {
67 type Key = String;
68 type Input = String;
69 type Output = String;
70 type Error = String;
71 type Resources = ();
72
73 async fn acquire_resources(&self, _key: String) -> Result<(), String> {
74 tokio::time::sleep(self.0).await;
75 Ok(())
76 }
77
78 async fn process(
79 &self,
80 key: String,
81 inputs: impl Iterator<Item = String> + Send,
82 _resources: (),
83 ) -> Result<Vec<String>, String> {
84 tokio::time::sleep(self.0).await;
85 Ok(inputs.map(|s| s + " processed for " + &key).collect())
86 }
87 }
88
89 #[tokio::test]
90 #[ignore = "flaky"]
91 async fn test_tracing() {
92 use tracing::Level;
93 use tracing_capture::{CaptureLayer, SharedStorage};
94 use tracing_subscriber::layer::SubscriberExt;
95
96 let subscriber = tracing_subscriber::fmt()
97 .pretty()
98 .with_max_level(Level::INFO)
99 .with_test_writer()
100 .finish();
101 let storage = SharedStorage::default();
103 let subscriber = subscriber.with(CaptureLayer::new(&storage));
104
105 let _guard = tracing::subscriber::set_default(subscriber);
107
108 let batcher = Batcher::builder()
109 .name("test_tracing")
110 .processor(SimpleBatchProcessor(Duration::from_millis(10)))
111 .limits(Limits::builder().max_batch_size(2).build())
112 .batching_policy(BatchingPolicy::Size)
113 .build();
114
115 let h1 = {
116 tokio_test::task::spawn({
117 let span = span!(Level::INFO, "test_handler_span1");
118
119 batcher
120 .add("A".to_string(), "1".to_string())
121 .instrument(span)
122 })
123 };
124 let h2 = {
125 tokio_test::task::spawn({
126 let span = span!(Level::INFO, "test_handler_span2");
127
128 batcher
129 .add("A".to_string(), "2".to_string())
130 .instrument(span)
131 })
132 };
133
134 let (o1, o2) = join!(h1, h2);
135
136 assert!(o1.is_ok());
137 assert!(o2.is_ok());
138
139 let worker = batcher.worker_handle();
140 worker.shut_down().await;
141 tokio::time::timeout(Duration::from_secs(1), worker.wait_for_shutdown())
142 .await
143 .expect("Worker should shut down");
144
145 let storage = storage.lock();
146
147 let outer_process_spans: Vec<_> = storage
148 .all_spans()
149 .filter(|span| span.metadata().name().contains("process batch"))
150 .collect();
151 assert_eq!(
152 outer_process_spans.len(),
153 1,
154 "should be a single outer span for processing the batch"
155 );
156 let process_span = *outer_process_spans.first().unwrap();
157
158 assert_eq!(
159 process_span["batch.size"], 2u64,
160 "batch.size shouldn't be emitted as a string",
161 );
162
163 assert_eq!(
164 process_span.follows_from().len(),
165 2,
166 "should follow from both handler spans"
167 );
168
169 let resource_spans: Vec<_> = storage
170 .all_spans()
171 .filter(|span| span.metadata().name().contains("acquire resources"))
172 .collect();
173 assert_eq!(
174 resource_spans.len(),
175 1,
176 "should be a single span for acquiring resources"
177 );
178 let resource_span_parent = resource_spans
179 .first()
180 .unwrap()
181 .parent()
182 .expect("resource span should have a parent");
183 assert_eq!(
184 resource_span_parent, process_span,
185 "resource acquisition should be a child of the outer process span"
186 );
187
188 let inner_process_spans: Vec<_> = storage
189 .all_spans()
190 .filter(|span| span.metadata().name().contains("process()"))
191 .collect();
192 assert_eq!(
193 inner_process_spans.len(),
194 1,
195 "should be a single inner span for processing the batch"
196 );
197 let inner_span_parent = inner_process_spans
198 .first()
199 .unwrap()
200 .parent()
201 .expect("resource span should have a parent");
202 assert_eq!(
203 inner_span_parent, process_span,
204 "resource acquisition should be a child of the outer process span"
205 );
206
207 let link_back_spans: Vec<_> = storage
208 .all_spans()
209 .filter(|span| span.metadata().name().contains("batch finished"))
210 .collect();
211 assert_eq!(
212 link_back_spans.len(),
213 2,
214 "should be two spans for linking back to the process span"
215 );
216
217 for span in link_back_spans {
218 assert_eq!(
219 span.follows_from().len(),
220 1,
221 "link back spans should follow from the process span"
222 );
223 }
224
225 assert_eq!(storage.all_spans().len(), 7, "should be 7 spans in total");
226 }
227}