1#![deny(missing_docs)]
19
20#[cfg(doctest)]
21use doc_comment::doctest;
22#[cfg(doctest)]
23doctest!("../README.md");
24
25mod batch;
26mod batch_queue;
27mod batcher;
28mod error;
29mod policies;
30mod worker;
31
32pub use batcher::{Batcher, Processor};
33pub use error::BatchError;
34pub use policies::{BatchingPolicy, Limits, OnFull};
35
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_trait::async_trait;
    use tokio::join;
    use tracing::{span, Instrument};

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Test processor that sleeps for the wrapped duration before answering.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    #[async_trait]
    impl Processor<String, String, String> for SimpleBatchProcessor {
        /// Sleeps for `self.0`, then returns every input suffixed with
        /// `" processed for "` and the batch key, preserving input order.
        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
        ) -> Result<Vec<String>, String> {
            tokio::time::sleep(self.0).await;

            let mut outputs = Vec::new();
            for input in inputs {
                outputs.push(input + " processed for " + &key);
            }
            Ok(outputs)
        }
    }

    /// Checks the spans recorded while two items are added under the same key.
    #[tokio::test]
    async fn test_tracing() {
        use tracing::Level;
        use tracing_capture::{CaptureLayer, SharedStorage};
        use tracing_subscriber::layer::SubscriberExt;

        // Install a subscriber that pretty-prints events and also records
        // spans into `storage` so they can be inspected afterwards.
        let storage = SharedStorage::default();
        let capturing_subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .finish()
            .with(CaptureLayer::new(&storage));
        let _guard = tracing::subscriber::set_default(capturing_subscriber);

        // Size-based policy with at most two items per batch; the two adds
        // below are expected to end up in a single batch.
        let batcher = Batcher::new(
            SimpleBatchProcessor(Duration::ZERO),
            Limits::default().max_batch_size(2),
            BatchingPolicy::Size,
        );

        // Each add runs inside its own handler span.
        let first_handler_span = span!(Level::INFO, "test_handler_span1");
        let first_add = tokio_test::task::spawn(
            batcher
                .add("A".to_string(), "1".to_string())
                .instrument(first_handler_span),
        );

        let second_handler_span = span!(Level::INFO, "test_handler_span2");
        let second_add = tokio_test::task::spawn(
            batcher
                .add("A".to_string(), "2".to_string())
                .instrument(second_handler_span),
        );

        let (first_result, second_result) = join!(first_add, second_add);

        assert!(first_result.is_ok());
        assert!(second_result.is_ok());

        let captured = storage.lock();

        // Collects every captured span whose name contains `needle`.
        let spans_named = |needle: &'static str| {
            captured
                .all_spans()
                .filter(move |candidate| candidate.metadata().name().contains(needle))
                .collect::<Vec<_>>()
        };

        let process_spans = spans_named("process batch");
        assert_eq!(
            process_spans.len(),
            1,
            "should be a single span for processing the batch"
        );

        // Safe to index: the assertion above guarantees exactly one element.
        let process_span = &process_spans[0];

        assert_eq!(
            process_span["batch_size"], 2u64,
            "batch_size shouldn't be emitted as a string",
        );

        assert_eq!(
            process_span.follows_from().len(),
            2,
            "should follow from both handler spans"
        );

        let link_back_spans = spans_named("batch finished");
        assert_eq!(
            link_back_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for link_back in &link_back_spans {
            assert_eq!(
                link_back.follows_from().len(),
                1,
                "link back spans should follow from the process span"
            );
        }

        // Two handler spans + one processing span + two link-back spans.
        assert_eq!(captured.all_spans().len(), 5, "should be 5 spans in total");
    }
}