#![deny(missing_docs)]
//! Crate root: declares the internal modules and re-exports the public API.
//!
//! The public surface consists of [`Batcher`] and [`Processor`], the
//! [`BatchError`] error type, and the configuration types [`BatchingPolicy`],
//! [`Limits`] and [`OnFull`].

// Under `cfg(doctest)` the README is pulled in as documentation so that the
// code examples it contains are compiled and run as doctests.
#[cfg(doctest)]
use doc_comment::doctest;
#[cfg(doctest)]
doctest!("../README.md");

// Private implementation modules; only the items re-exported below are part
// of the public API.
mod batch;
mod batch_queue;
mod batcher;
mod error;
mod policies;
mod worker;

// Public re-exports.
pub use batcher::{Batcher, Processor};
pub use error::BatchError;
pub use policies::{BatchingPolicy, Limits, OnFull};
35
36#[cfg(test)]
37mod tests {
38 use std::time::Duration;
39
40 use tokio::join;
41 use tracing::{span, Instrument};
42
43 use crate::{Batcher, BatchingPolicy, Limits, Processor};
44
45 #[derive(Debug, Clone)]
46 pub struct SimpleBatchProcessor(pub Duration);
47
48 impl Processor<String, String, String> for SimpleBatchProcessor {
49 async fn process(
50 &self,
51 key: String,
52 inputs: impl Iterator<Item = String> + Send,
53 ) -> Result<Vec<String>, String> {
54 tokio::time::sleep(self.0).await;
55 Ok(inputs.map(|s| s + " processed for " + &key).collect())
56 }
57 }
58
59 #[tokio::test]
60 async fn test_tracing() {
61 use tracing::Level;
62 use tracing_capture::{CaptureLayer, SharedStorage};
63 use tracing_subscriber::layer::SubscriberExt;
64
65 let subscriber = tracing_subscriber::fmt()
66 .pretty()
67 .with_max_level(Level::INFO)
68 .finish();
69 let storage = SharedStorage::default();
71 let subscriber = subscriber.with(CaptureLayer::new(&storage));
72
73 let _guard = tracing::subscriber::set_default(subscriber);
75
76 let batcher = Batcher::new(
77 SimpleBatchProcessor(Duration::ZERO),
78 Limits::default().max_batch_size(2),
79 BatchingPolicy::Size,
80 );
81
82 let h1 = {
83 tokio_test::task::spawn({
84 let span = span!(Level::INFO, "test_handler_span1");
85
86 batcher
87 .add("A".to_string(), "1".to_string())
88 .instrument(span)
89 })
90 };
91 let h2 = {
92 tokio_test::task::spawn({
93 let span = span!(Level::INFO, "test_handler_span2");
94
95 batcher
96 .add("A".to_string(), "2".to_string())
97 .instrument(span)
98 })
99 };
100
101 let (o1, o2) = join!(h1, h2);
102
103 assert!(o1.is_ok());
104 assert!(o2.is_ok());
105
106 let storage = storage.lock();
107
108 let process_spans: Vec<_> = storage
109 .all_spans()
110 .filter(|span| span.metadata().name().contains("process batch"))
111 .collect();
112 assert_eq!(
113 process_spans.len(),
114 1,
115 "should be a single span for processing the batch"
116 );
117
118 let process_span = process_spans.first().unwrap();
119
120 assert_eq!(
121 process_span["batch_size"], 2u64,
122 "batch_size shouldn't be emitted as a string",
123 );
124
125 assert_eq!(
126 process_span.follows_from().len(),
127 2,
128 "should follow from both handler spans"
129 );
130
131 let link_back_spans: Vec<_> = storage
132 .all_spans()
133 .filter(|span| span.metadata().name().contains("batch finished"))
134 .collect();
135 assert_eq!(
136 link_back_spans.len(),
137 2,
138 "should be two spans for linking back to the process span"
139 );
140
141 for span in link_back_spans {
142 assert_eq!(
143 span.follows_from().len(),
144 1,
145 "link back spans should follow from the process span"
146 );
147 }
148
149 assert_eq!(storage.all_spans().len(), 5, "should be 5 spans in total");
150 }
151}