1pub mod data_handler;
2pub mod handlers;
3pub mod node;
4pub mod node_visitor;
5pub mod pipeline_builder;
6pub mod stats_producer;
7
8#[cfg(test)]
9mod test {
10 use std::{fmt::Debug, time::Instant};
11
12 use serde_json::json;
13
14 use crate::{
15 data_handler::{DataObserver, SomeDataHandler},
16 handlers::static_demuxer::{ConditionalPath, StaticDemuxer},
17 node::Node,
18 node_visitor::StatsNodeVisitor,
19 pipeline_builder::PipelineBuilder,
20 };
21
/// Test-only observer state: a counter of how many items have been logged.
///
/// `Debug` is derived alongside `Default` so the logger can be printed in
/// diagnostics and assertion failure messages.
#[derive(Debug, Default)]
struct DataLogger {
    /// Number of items observed so far; starts at zero via `Default`.
    num_logs: u32,
}
26
27 impl<T> DataObserver<T> for DataLogger
28 where
29 T: Debug,
30 {
31 fn observe(&mut self, data: &T) {
32 self.num_logs += 1;
33 println!("{data:?}")
34 }
35
36 fn get_stats(&self) -> Option<serde_json::Value> {
37 Some(json!({
38 "num_logs": self.num_logs,
39 }))
40 }
41 }
42
43 impl<T> From<DataLogger> for SomeDataHandler<T>
44 where
45 T: Debug,
46 {
47 fn from(val: DataLogger) -> Self {
48 SomeDataHandler::Observer(Box::new(val))
49 }
50 }
51
/// Pushes a fixed number of integers through a linear chain of logging nodes,
/// prints the elapsed time and throughput, then prints the stats collected by
/// a `StatsNodeVisitor`.
#[test]
fn test() {
    let num_items = 1_000_000;
    let num_nodes = 10;

    // Chain `num_nodes` logging nodes, each named after its index.
    let mut builder = PipelineBuilder::default();
    for i in 0..num_nodes {
        builder = builder.attach(Node::new(format!("{i}"), DataLogger::default()));
    }
    let first_node = builder.build();

    let start = Instant::now();
    for i in 0..num_items {
        first_node.process_data(i);
    }
    let duration = start.elapsed();

    // A run that finishes in under one millisecond reports 0 ms; clamp the
    // divisor to 1 so the throughput calculation cannot divide by zero.
    let elapsed_ms = duration.as_millis();
    let items_per_ms = num_items as u128 / elapsed_ms.max(1);
    println!(
        "{num_nodes} nodes processed {num_items} items in {elapsed_ms}ms ({items_per_ms} items/msec)"
    );

    let mut stats = StatsNodeVisitor::default();
    first_node.visit(&mut stats);
    println!("{stats:#}");
}
80
/// Builds a pipeline whose demuxer has two conditional paths — one guarded by
/// an even-number predicate, one by an odd-number predicate — each leading to
/// a chain of three logging nodes. Feeds the integers 0..10 into it and prints
/// the stats collected by a `StatsNodeVisitor`.
#[test]
fn test_builder() {
    let root = PipelineBuilder::new();

    // Branch attached to the even-number predicate.
    let even_branch = PipelineBuilder::new()
        .attach(Node::new("1a", DataLogger::default()))
        .attach(Node::new("2a", DataLogger::default()))
        .attach(Node::new("3a", DataLogger::default()))
        .build();

    // Branch attached to the odd-number predicate.
    let odd_branch = PipelineBuilder::new()
        .attach(Node::new("1b", DataLogger::default()))
        .attach(Node::new("2b", DataLogger::default()))
        .attach(Node::new("3b", DataLogger::default()))
        .build();

    let routes = vec![
        ConditionalPath {
            predicate: Box::new(|num: &u32| num % 2 == 0),
            next: even_branch,
        },
        ConditionalPath {
            predicate: Box::new(|num: &u32| num % 2 == 1),
            next: odd_branch,
        },
    ];

    let pipeline = root
        .demux("odd/even demuxer", StaticDemuxer::new(routes))
        .build();

    for value in 0..10 {
        pipeline.process_data(value);
    }

    let mut collected = StatsNodeVisitor::default();
    pipeline.visit(&mut collected);
    println!("{collected:#}");
}
114}