1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! Batch up multiple items for processing as a single unit.
//!
//! _I got 99 problems, but a batch ain't one..._
//!
//! Sometimes it is more efficient to process many items at once rather than one at a time,
//! especially when the processing step has overheads that can be shared between many items.
//!
//! A worker task runs in the background, and items are submitted to it for batching. Each batch
//! is processed in its own task, so separate batches are processed concurrently.
//!
//! See the README for an example.

#![deny(missing_docs)]

#[cfg(doctest)]
use doc_comment::doctest;
#[cfg(doctest)]
doctest!("../README.md");

mod batch;
mod batcher;
mod error;
mod policies;
mod worker;

pub use batcher::{Batcher, Processor};
pub use error::BatchError;
pub use policies::{BatchingPolicy, Limits, OnFull};

#[cfg(test)]
mod test {
    use std::time::Duration;

    use async_trait::async_trait;
    use tokio::join;
    use tracing::{span, Instrument, Level};
    use tracing_capture::{CaptureLayer, SharedStorage};
    use tracing_subscriber::layer::SubscriberExt;

    use crate::{Batcher, BatchingPolicy, Limits, Processor};

    /// Test processor that sleeps for the wrapped delay and then appends
    /// `" processed for <key>"` to every input in the batch.
    #[derive(Debug, Clone)]
    pub struct SimpleBatchProcessor(pub Duration);

    #[async_trait]
    impl Processor<String, String, String> for SimpleBatchProcessor {
        async fn process(
            &self,
            key: String,
            inputs: impl Iterator<Item = String> + Send,
        ) -> Result<Vec<String>, String> {
            let delay = self.0;
            tokio::time::sleep(delay).await;

            // Reuse each input's allocation rather than building a fresh string.
            let tagged: Vec<String> = inputs
                .map(|mut input| {
                    input.push_str(" processed for ");
                    input.push_str(&key);
                    input
                })
                .collect();
            Ok(tagged)
        }
    }

    /// Submits two items under the same key from two separately instrumented handlers and
    /// checks the captured spans. There should be exactly one "process batch" span that
    /// follows from both handler spans. There should also be two "batch finished" spans,
    /// each following from one span.
    #[tokio::test]
    async fn test_tracing() {
        // Pretty-print INFO-level output while also recording every span so the
        // assertions below can inspect the span graph.
        let captured = SharedStorage::default();
        let subscriber = tracing_subscriber::fmt()
            .pretty()
            .with_max_level(Level::INFO)
            .finish()
            .with(CaptureLayer::new(&captured));

        // Must stay bound (not `_`) so the subscriber remains the default for the whole test.
        let _subscriber_guard = tracing::subscriber::set_default(subscriber);

        // Size-based batching capped at two items, so the two requests below are expected
        // to land in a single batch.
        let batcher = Batcher::new(
            SimpleBatchProcessor(Duration::ZERO),
            Limits::default().max_batch_size(2),
            BatchingPolicy::Size,
        );

        let first_span = span!(Level::INFO, "test_handler_span1");
        let first = tokio_test::task::spawn(
            batcher
                .add("A".to_string(), "1".to_string())
                .instrument(first_span),
        );

        let second_span = span!(Level::INFO, "test_handler_span2");
        let second = tokio_test::task::spawn(
            batcher
                .add("A".to_string(), "2".to_string())
                .instrument(second_span),
        );

        let (_first_output, _second_output) = join!(first, second);

        let recorded = captured.lock();

        let process_spans: Vec<_> = recorded
            .all_spans()
            .filter(|s| s.metadata().name().contains("process batch"))
            .collect();
        assert_eq!(
            process_spans.len(),
            1,
            "should be a single span for processing the batch"
        );
        assert_eq!(
            process_spans[0].follows_from().len(),
            2,
            "should follow from both handler spans"
        );

        let link_back_spans: Vec<_> = recorded
            .all_spans()
            .filter(|s| s.metadata().name().contains("batch finished"))
            .collect();
        assert_eq!(
            link_back_spans.len(),
            2,
            "should be two spans for linking back to the process span"
        );

        for link_back in link_back_spans {
            assert_eq!(
                link_back.follows_from().len(),
                1,
                "should follow from the process span"
            );
        }
    }
}