data_pipeline_rs/
lib.rs

1pub mod data_handler;
2pub mod handlers;
3pub mod node;
4pub mod node_visitor;
5pub mod pipeline_builder;
6pub mod stats_producer;
7
8#[cfg(test)]
9mod test {
10    use std::{fmt::Debug, time::Instant};
11
12    use serde_json::json;
13
14    use crate::{
15        data_handler::{DataObserver, SomeDataHandler},
16        handlers::static_demuxer::{ConditionalPath, StaticDemuxer},
17        node::Node,
18        node_visitor::StatsNodeVisitor,
19        pipeline_builder::PipelineBuilder,
20    };
21
22    #[derive(Default)]
23    struct DataLogger {
24        num_logs: u32,
25    }
26
27    impl<T> DataObserver<T> for DataLogger
28    where
29        T: Debug,
30    {
31        fn observe(&mut self, data: &T) {
32            self.num_logs += 1;
33            println!("{data:?}")
34        }
35
36        fn get_stats(&self) -> Option<serde_json::Value> {
37            Some(json!({
38                "num_logs": self.num_logs,
39            }))
40        }
41    }
42
43    impl<T> From<DataLogger> for SomeDataHandler<T>
44    where
45        T: Debug,
46    {
47        fn from(val: DataLogger) -> Self {
48            SomeDataHandler::Observer(Box::new(val))
49        }
50    }
51
52    // impl_conversion_to_some_data_handler!(DataLogger, Observer);
53
54    #[test]
55    fn test() {
56        let num_items = 1000000;
57        let num_nodes = 10;
58        // let first_node = NodeRef::new(Node::new("1"));
59        // let mut prev_node = first_node.clone();
60        let mut builder = PipelineBuilder::default();
61        for i in 0..num_nodes {
62            builder = builder.attach(Node::new(format!("{i}"), DataLogger::default()));
63        }
64
65        let first_node = builder.build();
66        let start = Instant::now();
67        for i in 0..num_items {
68            first_node.process_data(i);
69        }
70        let duration = Instant::now() - start;
71        println!(
72            "{num_nodes} nodes processed {num_items} items in {}ms ({} items/msec)",
73            duration.as_millis(),
74            (num_items as u128 / duration.as_millis())
75        );
76        let mut stats = StatsNodeVisitor::default();
77        first_node.visit(&mut stats);
78        println!("{:#}", stats);
79    }
80
81    #[test]
82    fn test_builder() {
83        let pipeline = PipelineBuilder::new()
84            .demux(
85                "odd/even demuxer",
86                StaticDemuxer::new(vec![
87                    ConditionalPath {
88                        predicate: Box::new(|num: &u32| num % 2 == 0),
89                        next: PipelineBuilder::new()
90                            .attach(Node::new("1a", DataLogger::default()))
91                            .attach(Node::new("2a", DataLogger::default()))
92                            .attach(Node::new("3a", DataLogger::default()))
93                            .build(),
94                    },
95                    ConditionalPath {
96                        predicate: Box::new(|num: &u32| num % 2 == 1),
97                        next: PipelineBuilder::new()
98                            .attach(Node::new("1b", DataLogger::default()))
99                            .attach(Node::new("2b", DataLogger::default()))
100                            .attach(Node::new("3b", DataLogger::default()))
101                            .build(),
102                    },
103                ]),
104            )
105            .build();
106
107        for i in 0..10 {
108            pipeline.process_data(i);
109        }
110        let mut stats = StatsNodeVisitor::default();
111        pipeline.visit(&mut stats);
112        println!("{:#}", stats);
113    }
114}