// streaming_flow/streaming_flow.rs

//! Streaming Processing Example
//!
//! This example demonstrates stream-based node execution with:
//! - Backpressure handling
//! - Large dataset chunking
//! - Stream transformation operators (map, filter, fold)
//!
//! To run: cargo run --example streaming_flow

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use serde_json::Value;

use rust_logic_graph::streaming::{
    BackpressureConfig, ChunkConfig, FilterOperator, FoldOperator, MapOperator, StreamNode,
    StreamProcessor,
};
use rust_logic_graph::{Context, Node};
19
// Custom stream processor: doubles integer items and passes every other
// JSON value through unchanged (see the `StreamProcessor` impl below).
struct NumberProcessor;
22
23#[async_trait]
24impl StreamProcessor for NumberProcessor {
25    async fn process_item(&self, item: Value, _ctx: &Context) -> Result<Value, rust_logic_graph::RuleError> {
26        // Simulate some async processing
27        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
28
29        if let Some(n) = item.as_i64() {
30            Ok(Value::Number((n * 2).into()))
31        } else {
32            Ok(item)
33        }
34    }
35}
36
37#[tokio::main]
38async fn main() -> anyhow::Result<()> {
39    tracing_subscriber::fmt::init();
40
41    println!("šŸš€ Streaming Processing Example\n");
42
43    // Example 1: Basic streaming with backpressure
44    println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48    let processor = Arc::new(NumberProcessor);
49    let node = StreamNode::new("processor", processor)
50        .with_backpressure(BackpressureConfig {
51            buffer_size: 10,
52            max_concurrent: 5,
53        });
54
55    let ctx = Context {
56        data: HashMap::new(),
57    };
58
59    println!("Processing 100 numbers with backpressure...");
60    let start = std::time::Instant::now();
61    let result = node.process_stream(data, &ctx).await?;
62    let duration = start.elapsed();
63
64    if let Value::Array(results) = result {
65        println!("āœ“ Processed {} items in {:?}", results.len(), duration);
66        println!("  First: {:?}, Last: {:?}\n", results.first(), results.last());
67    }
68
69    // Example 2: Large dataset with chunking
70    println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72    let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74    let processor = Arc::new(NumberProcessor);
75    let node = StreamNode::new("chunked_processor", processor)
76        .with_chunking(ChunkConfig {
77            chunk_size: 1000,
78            overlap: 0,
79        })
80        .with_backpressure(BackpressureConfig {
81            buffer_size: 100,
82            max_concurrent: 10,
83        });
84
85    let ctx = Context {
86        data: HashMap::new(),
87    };
88
89    println!("Processing 10,000 numbers in chunks of 1,000...");
90    let start = std::time::Instant::now();
91    let result = node.process_stream(large_data, &ctx).await?;
92    let duration = start.elapsed();
93
94    if let Value::Array(results) = result {
95        println!("āœ“ Processed {} items in {:?}", results.len(), duration);
96        println!("  Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97    }
98
99    // Example 3: Map operator
100    println!("=== Example 3: Map Operator ===\n");
101
102    let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104    let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105        if let Some(n) = v.as_i64() {
106            Value::Number((n * n).into())
107        } else {
108            v
109        }
110    }));
111
112    let node = StreamNode::new("map_node", map_op);
113    let ctx = Context {
114        data: HashMap::new(),
115    };
116
117    println!("Squaring numbers 1-10...");
118    let result = node.process_stream(data, &ctx).await?;
119
120    if let Value::Array(results) = result {
121        println!("āœ“ Results: {:?}\n", results);
122    }
123
124    // Example 4: Filter operator
125    println!("=== Example 4: Filter Operator ===\n");
126
127    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129    let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131    }));
132
133    let node = StreamNode::new("filter_node", filter_op);
134    let ctx = Context {
135        data: HashMap::new(),
136    };
137
138    println!("Filtering even numbers from 1-20...");
139    let result = node.process_stream(data, &ctx).await?;
140
141    if let Value::Array(results) = result {
142        println!("āœ“ Even numbers: {:?}\n", results);
143    }
144
145    // Example 5: Fold operator (sum)
146    println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150    let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151        acc + v.as_i64().unwrap_or(0)
152    }));
153
154    let node = StreamNode::new("fold_node", fold_op)
155        .with_chunking(ChunkConfig {
156            chunk_size: 100,
157            overlap: 0,
158        })
159        .collect_results(false); // Only return final result
160
161    let ctx = Context {
162        data: HashMap::new(),
163    };
164
165    println!("Summing numbers 1-100...");
166    let result = node.process_stream(data, &ctx).await?;
167
168    println!("āœ“ Sum: {:?}\n", result);
169
170    // Example 6: Chained operations
171    println!("=== Example 6: Chained Operations ===\n");
172
173    println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175    // First: filter even numbers
176    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178    let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180    }));
181
182    let filter_node = StreamNode::new("filter", filter_op);
183    let ctx = Context {
184        data: HashMap::new(),
185    };
186
187    let filtered = filter_node.process_stream(data, &ctx).await?;
188
189    // Then: square the numbers
190    if let Value::Array(filtered_data) = filtered {
191        let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192            if let Some(n) = v.as_i64() {
193                Value::Number((n * n).into())
194            } else {
195                v
196            }
197        }));
198
199        let map_node = StreamNode::new("square", map_op);
200        let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202        println!("āœ“ Results: {:?}\n", result);
203    }
204
205    println!("=== Benefits of Streaming Processing ===");
206    println!("  • Memory efficient - processes items incrementally");
207    println!("  • Backpressure handling - prevents overwhelming consumers");
208    println!("  • Large dataset support - chunking for massive data");
209    println!("  • Concurrent processing - multiple items in parallel");
210    println!("  • Composable operators - chain transformations");
211    println!("\nšŸŽ‰ Example completed!");
212
213    Ok(())
214}