1#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../../README.md");
24
25mod batch;
26mod batch_inner;
27mod batch_queue;
28mod batcher;
29pub mod error;
30mod limits;
31mod policies;
32mod processor;
33mod timeout;
34mod worker;
35
36pub use batcher::Batcher;
37pub use error::BatchError;
38pub use limits::Limits;
39pub use policies::{BatchingPolicy, OnFull};
40pub use processor::Processor;
41pub use worker::WorkerHandle;
42
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::join;
    use tracing::{Instrument, span};

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Minimal [`Processor`] used by the tests in this module.
    ///
    /// The wrapped [`Duration`] is slept for once while acquiring resources and once more
    /// while processing, so both phases take a non-zero amount of time.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    impl Processor for SimpleBatchProcessor {
        type Key = String;
        type Input = String;
        type Output = String;
        type Error = String;
        type Resources = ();

        /// Sleeps for the configured duration; there are no real resources to acquire.
        async fn acquire_resources(&self, _key: String) -> Result<(), String> {
            tokio::time::sleep(self.0).await;
            Ok(())
        }

        /// Sleeps for the configured duration, then maps every input `s` to
        /// `"{s} processed for {key}"`.
        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
            _resources: (),
        ) -> Result<Vec<String>, String> {
            tokio::time::sleep(self.0).await;
            Ok(inputs.map(|s| s + " processed for " + &key).collect())
        }
    }

    /// Verifies the shape of the tracing spans emitted while a single batch of two items
    /// (submitted from two separately instrumented handler futures) is processed.
    #[tokio::test]
    #[ignore = "flaky"]
    async fn test_tracing() {
        use tracing::Level;
        use tracing_capture::{CaptureLayer, SharedStorage};
        use tracing_subscriber::layer::SubscriberExt;

        let subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .with_test_writer()
            .finish();
        let storage = SharedStorage::default();
        // Record every span into `storage` so it can be inspected once the batch is done.
        let subscriber = subscriber.with(CaptureLayer::new(&storage));

        // The guard must stay alive until the end of the test: dropping it uninstalls the
        // subscriber.
        let _guard = tracing::subscriber::set_default(subscriber);

        let batcher = Batcher::builder()
            .name("test_tracing")
            .processor(SimpleBatchProcessor(Duration::from_millis(10)))
            .limits(Limits::builder().max_batch_size(2).build())
            .batching_policy(BatchingPolicy::Size)
            .build();

        // Two items for the same key "A", each added from inside its own handler span.
        let h1 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span1");

                batcher
                    .add("A".to_string(), "1".to_string())
                    .instrument(span)
            })
        };
        let h2 = {
            tokio_test::task::spawn({
                let span = span!(Level::INFO, "test_handler_span2");

                batcher
                    .add("A".to_string(), "2".to_string())
                    .instrument(span)
            })
        };

        let (o1, o2) = join!(h1, h2);

        assert!(o1.is_ok());
        assert!(o2.is_ok());

        // Shut the worker down before inspecting the captured spans.
        let worker = batcher.worker_handle();
        worker.shut_down().await;
        tokio::time::timeout(Duration::from_secs(1), worker.wait_for_shutdown())
            .await
            .expect("Worker should shut down");

        let storage = storage.lock();

        // --- Outer "process batch" span -------------------------------------------------
        let outer_process_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("process batch"))
            .collect();
        assert_eq!(
            outer_process_spans.len(),
            1,
            "should be a single outer span for processing the batch"
        );
        let process_span = *outer_process_spans.first().unwrap();

        assert_eq!(
            process_span["batch.size"], 2u64,
            "batch.size shouldn't be emitted as a string",
        );

        assert_eq!(
            process_span.follows_from().len(),
            2,
            "should follow from both handler spans"
        );

        // --- "acquire resources" span ---------------------------------------------------
        let resource_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("acquire resources"))
            .collect();
        assert_eq!(
            resource_spans.len(),
            1,
            "should be a single span for acquiring resources"
        );
        let resource_span_parent = resource_spans
            .first()
            .unwrap()
            .parent()
            .expect("resource span should have a parent");
        assert_eq!(
            resource_span_parent, process_span,
            "resource acquisition should be a child of the outer process span"
        );

        // --- Inner "process()" span -----------------------------------------------------
        let inner_process_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("process()"))
            .collect();
        assert_eq!(
            inner_process_spans.len(),
            1,
            "should be a single inner span for processing the batch"
        );
        let inner_span_parent = inner_process_spans
            .first()
            .unwrap()
            .parent()
            .expect("inner process() span should have a parent");
        assert_eq!(
            inner_span_parent, process_span,
            "inner process() span should be a child of the outer process span"
        );

        // --- "batch finished" link-back spans -------------------------------------------
        let link_back_spans: Vec<_> = storage
            .all_spans()
            .filter(|span| span.metadata().name().contains("batch finished"))
            .collect();
        assert_eq!(
            link_back_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for span in link_back_spans {
            assert_eq!(
                span.follows_from().len(),
                1,
                "link back spans should follow from the process span"
            );
        }

        // 2 handler spans + 1 "process batch" + 1 "acquire resources" + 1 "process()"
        // + 2 "batch finished" = 7.
        assert_eq!(storage.all_spans().len(), 7, "should be 7 spans in total");
    }
}