streaming_flow/
streaming_flow.rs1use rust_logic_graph::{Context, Node};
11use rust_logic_graph::streaming::{
12 StreamNode, StreamProcessor, BackpressureConfig, ChunkConfig,
13 MapOperator, FilterOperator, FoldOperator,
14};
15use async_trait::async_trait;
16use serde_json::Value;
17use std::collections::HashMap;
18use std::sync::Arc;
19
20struct NumberProcessor;
22
23#[async_trait]
24impl StreamProcessor for NumberProcessor {
25 async fn process_item(&self, item: Value, _ctx: &Context) -> Result<Value, rust_logic_graph::RuleError> {
26 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
28
29 if let Some(n) = item.as_i64() {
30 Ok(Value::Number((n * 2).into()))
31 } else {
32 Ok(item)
33 }
34 }
35}
36
37#[tokio::main]
38async fn main() -> anyhow::Result<()> {
39 tracing_subscriber::fmt::init();
40
41 println!("š Streaming Processing Example\n");
42
43 println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48 let processor = Arc::new(NumberProcessor);
49 let node = StreamNode::new("processor", processor)
50 .with_backpressure(BackpressureConfig {
51 buffer_size: 10,
52 max_concurrent: 5,
53 });
54
55 let ctx = Context {
56 data: HashMap::new(),
57 };
58
59 println!("Processing 100 numbers with backpressure...");
60 let start = std::time::Instant::now();
61 let result = node.process_stream(data, &ctx).await?;
62 let duration = start.elapsed();
63
64 if let Value::Array(results) = result {
65 println!("ā Processed {} items in {:?}", results.len(), duration);
66 println!(" First: {:?}, Last: {:?}\n", results.first(), results.last());
67 }
68
69 println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72 let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74 let processor = Arc::new(NumberProcessor);
75 let node = StreamNode::new("chunked_processor", processor)
76 .with_chunking(ChunkConfig {
77 chunk_size: 1000,
78 overlap: 0,
79 })
80 .with_backpressure(BackpressureConfig {
81 buffer_size: 100,
82 max_concurrent: 10,
83 });
84
85 let ctx = Context {
86 data: HashMap::new(),
87 };
88
89 println!("Processing 10,000 numbers in chunks of 1,000...");
90 let start = std::time::Instant::now();
91 let result = node.process_stream(large_data, &ctx).await?;
92 let duration = start.elapsed();
93
94 if let Value::Array(results) = result {
95 println!("ā Processed {} items in {:?}", results.len(), duration);
96 println!(" Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97 }
98
99 println!("=== Example 3: Map Operator ===\n");
101
102 let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105 if let Some(n) = v.as_i64() {
106 Value::Number((n * n).into())
107 } else {
108 v
109 }
110 }));
111
112 let node = StreamNode::new("map_node", map_op);
113 let ctx = Context {
114 data: HashMap::new(),
115 };
116
117 println!("Squaring numbers 1-10...");
118 let result = node.process_stream(data, &ctx).await?;
119
120 if let Value::Array(results) = result {
121 println!("ā Results: {:?}\n", results);
122 }
123
124 println!("=== Example 4: Filter Operator ===\n");
126
127 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129 let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131 }));
132
133 let node = StreamNode::new("filter_node", filter_op);
134 let ctx = Context {
135 data: HashMap::new(),
136 };
137
138 println!("Filtering even numbers from 1-20...");
139 let result = node.process_stream(data, &ctx).await?;
140
141 if let Value::Array(results) = result {
142 println!("ā Even numbers: {:?}\n", results);
143 }
144
145 println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150 let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151 acc + v.as_i64().unwrap_or(0)
152 }));
153
154 let node = StreamNode::new("fold_node", fold_op)
155 .with_chunking(ChunkConfig {
156 chunk_size: 100,
157 overlap: 0,
158 })
159 .collect_results(false); let ctx = Context {
162 data: HashMap::new(),
163 };
164
165 println!("Summing numbers 1-100...");
166 let result = node.process_stream(data, &ctx).await?;
167
168 println!("ā Sum: {:?}\n", result);
169
170 println!("=== Example 6: Chained Operations ===\n");
172
173 println!("Pipeline: numbers ā filter(even) ā map(square) ā collect");
174
175 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178 let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180 }));
181
182 let filter_node = StreamNode::new("filter", filter_op);
183 let ctx = Context {
184 data: HashMap::new(),
185 };
186
187 let filtered = filter_node.process_stream(data, &ctx).await?;
188
189 if let Value::Array(filtered_data) = filtered {
191 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192 if let Some(n) = v.as_i64() {
193 Value::Number((n * n).into())
194 } else {
195 v
196 }
197 }));
198
199 let map_node = StreamNode::new("square", map_op);
200 let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202 println!("ā Results: {:?}\n", result);
203 }
204
205 println!("=== Benefits of Streaming Processing ===");
206 println!(" ⢠Memory efficient - processes items incrementally");
207 println!(" ⢠Backpressure handling - prevents overwhelming consumers");
208 println!(" ⢠Large dataset support - chunking for massive data");
209 println!(" ⢠Concurrent processing - multiple items in parallel");
210 println!(" ⢠Composable operators - chain transformations");
211 println!("\nš Example completed!");
212
213 Ok(())
214}