StreamNode

Struct StreamNode 

Source
pub struct StreamNode {
    pub id: String,
    pub processor: Arc<dyn StreamProcessor>,
    pub backpressure_config: BackpressureConfig,
    pub chunk_config: Option<ChunkConfig>,
    pub collect_results: bool,
}
Expand description

Stream node for processing data streams

Fields§

§ id: String
§ processor: Arc<dyn StreamProcessor>
§ backpressure_config: BackpressureConfig
§ chunk_config: Option<ChunkConfig>
§ collect_results: bool

Implementations§

Source§

impl StreamNode

Source

pub fn new(id: impl Into<String>, processor: Arc<dyn StreamProcessor>) -> Self

Create a new stream node

Examples found in repository:
examples/streaming_flow.rs (line 49)
38async fn main() -> anyhow::Result<()> {
39    tracing_subscriber::fmt::init();
40
41    println!("🚀 Streaming Processing Example\n");
42
43    // Example 1: Basic streaming with backpressure
44    println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48    let processor = Arc::new(NumberProcessor);
49    let node = StreamNode::new("processor", processor)
50        .with_backpressure(BackpressureConfig {
51            buffer_size: 10,
52            max_concurrent: 5,
53        });
54
55    let ctx = Context {
56        data: HashMap::new(),
57    };
58
59    println!("Processing 100 numbers with backpressure...");
60    let start = std::time::Instant::now();
61    let result = node.process_stream(data, &ctx).await?;
62    let duration = start.elapsed();
63
64    if let Value::Array(results) = result {
65        println!("✓ Processed {} items in {:?}", results.len(), duration);
66        println!("  First: {:?}, Last: {:?}\n", results.first(), results.last());
67    }
68
69    // Example 2: Large dataset with chunking
70    println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72    let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74    let processor = Arc::new(NumberProcessor);
75    let node = StreamNode::new("chunked_processor", processor)
76        .with_chunking(ChunkConfig {
77            chunk_size: 1000,
78            overlap: 0,
79        })
80        .with_backpressure(BackpressureConfig {
81            buffer_size: 100,
82            max_concurrent: 10,
83        });
84
85    let ctx = Context {
86        data: HashMap::new(),
87    };
88
89    println!("Processing 10,000 numbers in chunks of 1,000...");
90    let start = std::time::Instant::now();
91    let result = node.process_stream(large_data, &ctx).await?;
92    let duration = start.elapsed();
93
94    if let Value::Array(results) = result {
95        println!("✓ Processed {} items in {:?}", results.len(), duration);
96        println!("  Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97    }
98
99    // Example 3: Map operator
100    println!("=== Example 3: Map Operator ===\n");
101
102    let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104    let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105        if let Some(n) = v.as_i64() {
106            Value::Number((n * n).into())
107        } else {
108            v
109        }
110    }));
111
112    let node = StreamNode::new("map_node", map_op);
113    let ctx = Context {
114        data: HashMap::new(),
115    };
116
117    println!("Squaring numbers 1-10...");
118    let result = node.process_stream(data, &ctx).await?;
119
120    if let Value::Array(results) = result {
121        println!("✓ Results: {:?}\n", results);
122    }
123
124    // Example 4: Filter operator
125    println!("=== Example 4: Filter Operator ===\n");
126
127    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129    let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131    }));
132
133    let node = StreamNode::new("filter_node", filter_op);
134    let ctx = Context {
135        data: HashMap::new(),
136    };
137
138    println!("Filtering even numbers from 1-20...");
139    let result = node.process_stream(data, &ctx).await?;
140
141    if let Value::Array(results) = result {
142        println!("✓ Even numbers: {:?}\n", results);
143    }
144
145    // Example 5: Fold operator (sum)
146    println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150    let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151        acc + v.as_i64().unwrap_or(0)
152    }));
153
154    let node = StreamNode::new("fold_node", fold_op)
155        .with_chunking(ChunkConfig {
156            chunk_size: 100,
157            overlap: 0,
158        })
159        .collect_results(false); // Only return final result
160
161    let ctx = Context {
162        data: HashMap::new(),
163    };
164
165    println!("Summing numbers 1-100...");
166    let result = node.process_stream(data, &ctx).await?;
167
168    println!("✓ Sum: {:?}\n", result);
169
170    // Example 6: Chained operations
171    println!("=== Example 6: Chained Operations ===\n");
172
173    println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175    // First: filter even numbers
176    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178    let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180    }));
181
182    let filter_node = StreamNode::new("filter", filter_op);
183    let ctx = Context {
184        data: HashMap::new(),
185    };
186
187    let filtered = filter_node.process_stream(data, &ctx).await?;
188
189    // Then: square the numbers
190    if let Value::Array(filtered_data) = filtered {
191        let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192            if let Some(n) = v.as_i64() {
193                Value::Number((n * n).into())
194            } else {
195                v
196            }
197        }));
198
199        let map_node = StreamNode::new("square", map_op);
200        let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202        println!("✓ Results: {:?}\n", result);
203    }
204
205    println!("=== Benefits of Streaming Processing ===");
206    println!("  • Memory efficient - processes items incrementally");
207    println!("  • Backpressure handling - prevents overwhelming consumers");
208    println!("  • Large dataset support - chunking for massive data");
209    println!("  • Concurrent processing - multiple items in parallel");
210    println!("  • Composable operators - chain transformations");
211    println!("\n🎉 Example completed!");
212
213    Ok(())
214}
Source

pub fn with_backpressure(self, config: BackpressureConfig) -> Self

Configure backpressure

Examples found in repository:
examples/streaming_flow.rs (lines 50-53)
38async fn main() -> anyhow::Result<()> {
39    tracing_subscriber::fmt::init();
40
41    println!("🚀 Streaming Processing Example\n");
42
43    // Example 1: Basic streaming with backpressure
44    println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48    let processor = Arc::new(NumberProcessor);
49    let node = StreamNode::new("processor", processor)
50        .with_backpressure(BackpressureConfig {
51            buffer_size: 10,
52            max_concurrent: 5,
53        });
54
55    let ctx = Context {
56        data: HashMap::new(),
57    };
58
59    println!("Processing 100 numbers with backpressure...");
60    let start = std::time::Instant::now();
61    let result = node.process_stream(data, &ctx).await?;
62    let duration = start.elapsed();
63
64    if let Value::Array(results) = result {
65        println!("✓ Processed {} items in {:?}", results.len(), duration);
66        println!("  First: {:?}, Last: {:?}\n", results.first(), results.last());
67    }
68
69    // Example 2: Large dataset with chunking
70    println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72    let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74    let processor = Arc::new(NumberProcessor);
75    let node = StreamNode::new("chunked_processor", processor)
76        .with_chunking(ChunkConfig {
77            chunk_size: 1000,
78            overlap: 0,
79        })
80        .with_backpressure(BackpressureConfig {
81            buffer_size: 100,
82            max_concurrent: 10,
83        });
84
85    let ctx = Context {
86        data: HashMap::new(),
87    };
88
89    println!("Processing 10,000 numbers in chunks of 1,000...");
90    let start = std::time::Instant::now();
91    let result = node.process_stream(large_data, &ctx).await?;
92    let duration = start.elapsed();
93
94    if let Value::Array(results) = result {
95        println!("✓ Processed {} items in {:?}", results.len(), duration);
96        println!("  Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97    }
98
99    // Example 3: Map operator
100    println!("=== Example 3: Map Operator ===\n");
101
102    let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104    let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105        if let Some(n) = v.as_i64() {
106            Value::Number((n * n).into())
107        } else {
108            v
109        }
110    }));
111
112    let node = StreamNode::new("map_node", map_op);
113    let ctx = Context {
114        data: HashMap::new(),
115    };
116
117    println!("Squaring numbers 1-10...");
118    let result = node.process_stream(data, &ctx).await?;
119
120    if let Value::Array(results) = result {
121        println!("✓ Results: {:?}\n", results);
122    }
123
124    // Example 4: Filter operator
125    println!("=== Example 4: Filter Operator ===\n");
126
127    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129    let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131    }));
132
133    let node = StreamNode::new("filter_node", filter_op);
134    let ctx = Context {
135        data: HashMap::new(),
136    };
137
138    println!("Filtering even numbers from 1-20...");
139    let result = node.process_stream(data, &ctx).await?;
140
141    if let Value::Array(results) = result {
142        println!("✓ Even numbers: {:?}\n", results);
143    }
144
145    // Example 5: Fold operator (sum)
146    println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150    let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151        acc + v.as_i64().unwrap_or(0)
152    }));
153
154    let node = StreamNode::new("fold_node", fold_op)
155        .with_chunking(ChunkConfig {
156            chunk_size: 100,
157            overlap: 0,
158        })
159        .collect_results(false); // Only return final result
160
161    let ctx = Context {
162        data: HashMap::new(),
163    };
164
165    println!("Summing numbers 1-100...");
166    let result = node.process_stream(data, &ctx).await?;
167
168    println!("✓ Sum: {:?}\n", result);
169
170    // Example 6: Chained operations
171    println!("=== Example 6: Chained Operations ===\n");
172
173    println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175    // First: filter even numbers
176    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178    let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180    }));
181
182    let filter_node = StreamNode::new("filter", filter_op);
183    let ctx = Context {
184        data: HashMap::new(),
185    };
186
187    let filtered = filter_node.process_stream(data, &ctx).await?;
188
189    // Then: square the numbers
190    if let Value::Array(filtered_data) = filtered {
191        let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192            if let Some(n) = v.as_i64() {
193                Value::Number((n * n).into())
194            } else {
195                v
196            }
197        }));
198
199        let map_node = StreamNode::new("square", map_op);
200        let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202        println!("✓ Results: {:?}\n", result);
203    }
204
205    println!("=== Benefits of Streaming Processing ===");
206    println!("  • Memory efficient - processes items incrementally");
207    println!("  • Backpressure handling - prevents overwhelming consumers");
208    println!("  • Large dataset support - chunking for massive data");
209    println!("  • Concurrent processing - multiple items in parallel");
210    println!("  • Composable operators - chain transformations");
211    println!("\n🎉 Example completed!");
212
213    Ok(())
214}
Source

pub fn with_chunking(self, config: ChunkConfig) -> Self

Enable chunked processing for large datasets

Examples found in repository:
examples/streaming_flow.rs (lines 76-79)
38async fn main() -> anyhow::Result<()> {
39    tracing_subscriber::fmt::init();
40
41    println!("🚀 Streaming Processing Example\n");
42
43    // Example 1: Basic streaming with backpressure
44    println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48    let processor = Arc::new(NumberProcessor);
49    let node = StreamNode::new("processor", processor)
50        .with_backpressure(BackpressureConfig {
51            buffer_size: 10,
52            max_concurrent: 5,
53        });
54
55    let ctx = Context {
56        data: HashMap::new(),
57    };
58
59    println!("Processing 100 numbers with backpressure...");
60    let start = std::time::Instant::now();
61    let result = node.process_stream(data, &ctx).await?;
62    let duration = start.elapsed();
63
64    if let Value::Array(results) = result {
65        println!("✓ Processed {} items in {:?}", results.len(), duration);
66        println!("  First: {:?}, Last: {:?}\n", results.first(), results.last());
67    }
68
69    // Example 2: Large dataset with chunking
70    println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72    let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74    let processor = Arc::new(NumberProcessor);
75    let node = StreamNode::new("chunked_processor", processor)
76        .with_chunking(ChunkConfig {
77            chunk_size: 1000,
78            overlap: 0,
79        })
80        .with_backpressure(BackpressureConfig {
81            buffer_size: 100,
82            max_concurrent: 10,
83        });
84
85    let ctx = Context {
86        data: HashMap::new(),
87    };
88
89    println!("Processing 10,000 numbers in chunks of 1,000...");
90    let start = std::time::Instant::now();
91    let result = node.process_stream(large_data, &ctx).await?;
92    let duration = start.elapsed();
93
94    if let Value::Array(results) = result {
95        println!("✓ Processed {} items in {:?}", results.len(), duration);
96        println!("  Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97    }
98
99    // Example 3: Map operator
100    println!("=== Example 3: Map Operator ===\n");
101
102    let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104    let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105        if let Some(n) = v.as_i64() {
106            Value::Number((n * n).into())
107        } else {
108            v
109        }
110    }));
111
112    let node = StreamNode::new("map_node", map_op);
113    let ctx = Context {
114        data: HashMap::new(),
115    };
116
117    println!("Squaring numbers 1-10...");
118    let result = node.process_stream(data, &ctx).await?;
119
120    if let Value::Array(results) = result {
121        println!("✓ Results: {:?}\n", results);
122    }
123
124    // Example 4: Filter operator
125    println!("=== Example 4: Filter Operator ===\n");
126
127    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129    let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131    }));
132
133    let node = StreamNode::new("filter_node", filter_op);
134    let ctx = Context {
135        data: HashMap::new(),
136    };
137
138    println!("Filtering even numbers from 1-20...");
139    let result = node.process_stream(data, &ctx).await?;
140
141    if let Value::Array(results) = result {
142        println!("✓ Even numbers: {:?}\n", results);
143    }
144
145    // Example 5: Fold operator (sum)
146    println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150    let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151        acc + v.as_i64().unwrap_or(0)
152    }));
153
154    let node = StreamNode::new("fold_node", fold_op)
155        .with_chunking(ChunkConfig {
156            chunk_size: 100,
157            overlap: 0,
158        })
159        .collect_results(false); // Only return final result
160
161    let ctx = Context {
162        data: HashMap::new(),
163    };
164
165    println!("Summing numbers 1-100...");
166    let result = node.process_stream(data, &ctx).await?;
167
168    println!("✓ Sum: {:?}\n", result);
169
170    // Example 6: Chained operations
171    println!("=== Example 6: Chained Operations ===\n");
172
173    println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175    // First: filter even numbers
176    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178    let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180    }));
181
182    let filter_node = StreamNode::new("filter", filter_op);
183    let ctx = Context {
184        data: HashMap::new(),
185    };
186
187    let filtered = filter_node.process_stream(data, &ctx).await?;
188
189    // Then: square the numbers
190    if let Value::Array(filtered_data) = filtered {
191        let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192            if let Some(n) = v.as_i64() {
193                Value::Number((n * n).into())
194            } else {
195                v
196            }
197        }));
198
199        let map_node = StreamNode::new("square", map_op);
200        let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202        println!("✓ Results: {:?}\n", result);
203    }
204
205    println!("=== Benefits of Streaming Processing ===");
206    println!("  • Memory efficient - processes items incrementally");
207    println!("  • Backpressure handling - prevents overwhelming consumers");
208    println!("  • Large dataset support - chunking for massive data");
209    println!("  • Concurrent processing - multiple items in parallel");
210    println!("  • Composable operators - chain transformations");
211    println!("\n🎉 Example completed!");
212
213    Ok(())
214}
Source

pub fn collect_results(self, collect: bool) -> Self

Set whether to collect all results (default: true). If false, only the last result is stored.

Examples found in repository:
examples/streaming_flow.rs (line 159)
38async fn main() -> anyhow::Result<()> {
39    tracing_subscriber::fmt::init();
40
41    println!("🚀 Streaming Processing Example\n");
42
43    // Example 1: Basic streaming with backpressure
44    println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48    let processor = Arc::new(NumberProcessor);
49    let node = StreamNode::new("processor", processor)
50        .with_backpressure(BackpressureConfig {
51            buffer_size: 10,
52            max_concurrent: 5,
53        });
54
55    let ctx = Context {
56        data: HashMap::new(),
57    };
58
59    println!("Processing 100 numbers with backpressure...");
60    let start = std::time::Instant::now();
61    let result = node.process_stream(data, &ctx).await?;
62    let duration = start.elapsed();
63
64    if let Value::Array(results) = result {
65        println!("✓ Processed {} items in {:?}", results.len(), duration);
66        println!("  First: {:?}, Last: {:?}\n", results.first(), results.last());
67    }
68
69    // Example 2: Large dataset with chunking
70    println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72    let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74    let processor = Arc::new(NumberProcessor);
75    let node = StreamNode::new("chunked_processor", processor)
76        .with_chunking(ChunkConfig {
77            chunk_size: 1000,
78            overlap: 0,
79        })
80        .with_backpressure(BackpressureConfig {
81            buffer_size: 100,
82            max_concurrent: 10,
83        });
84
85    let ctx = Context {
86        data: HashMap::new(),
87    };
88
89    println!("Processing 10,000 numbers in chunks of 1,000...");
90    let start = std::time::Instant::now();
91    let result = node.process_stream(large_data, &ctx).await?;
92    let duration = start.elapsed();
93
94    if let Value::Array(results) = result {
95        println!("✓ Processed {} items in {:?}", results.len(), duration);
96        println!("  Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97    }
98
99    // Example 3: Map operator
100    println!("=== Example 3: Map Operator ===\n");
101
102    let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104    let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105        if let Some(n) = v.as_i64() {
106            Value::Number((n * n).into())
107        } else {
108            v
109        }
110    }));
111
112    let node = StreamNode::new("map_node", map_op);
113    let ctx = Context {
114        data: HashMap::new(),
115    };
116
117    println!("Squaring numbers 1-10...");
118    let result = node.process_stream(data, &ctx).await?;
119
120    if let Value::Array(results) = result {
121        println!("✓ Results: {:?}\n", results);
122    }
123
124    // Example 4: Filter operator
125    println!("=== Example 4: Filter Operator ===\n");
126
127    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129    let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131    }));
132
133    let node = StreamNode::new("filter_node", filter_op);
134    let ctx = Context {
135        data: HashMap::new(),
136    };
137
138    println!("Filtering even numbers from 1-20...");
139    let result = node.process_stream(data, &ctx).await?;
140
141    if let Value::Array(results) = result {
142        println!("✓ Even numbers: {:?}\n", results);
143    }
144
145    // Example 5: Fold operator (sum)
146    println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150    let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151        acc + v.as_i64().unwrap_or(0)
152    }));
153
154    let node = StreamNode::new("fold_node", fold_op)
155        .with_chunking(ChunkConfig {
156            chunk_size: 100,
157            overlap: 0,
158        })
159        .collect_results(false); // Only return final result
160
161    let ctx = Context {
162        data: HashMap::new(),
163    };
164
165    println!("Summing numbers 1-100...");
166    let result = node.process_stream(data, &ctx).await?;
167
168    println!("✓ Sum: {:?}\n", result);
169
170    // Example 6: Chained operations
171    println!("=== Example 6: Chained Operations ===\n");
172
173    println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175    // First: filter even numbers
176    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178    let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180    }));
181
182    let filter_node = StreamNode::new("filter", filter_op);
183    let ctx = Context {
184        data: HashMap::new(),
185    };
186
187    let filtered = filter_node.process_stream(data, &ctx).await?;
188
189    // Then: square the numbers
190    if let Value::Array(filtered_data) = filtered {
191        let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192            if let Some(n) = v.as_i64() {
193                Value::Number((n * n).into())
194            } else {
195                v
196            }
197        }));
198
199        let map_node = StreamNode::new("square", map_op);
200        let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202        println!("✓ Results: {:?}\n", result);
203    }
204
205    println!("=== Benefits of Streaming Processing ===");
206    println!("  • Memory efficient - processes items incrementally");
207    println!("  • Backpressure handling - prevents overwhelming consumers");
208    println!("  • Large dataset support - chunking for massive data");
209    println!("  • Concurrent processing - multiple items in parallel");
210    println!("  • Composable operators - chain transformations");
211    println!("\n🎉 Example completed!");
212
213    Ok(())
214}
Source

pub async fn process_stream( &self, data: Vec<Value>, ctx: &Context, ) -> RuleResult

Process a stream of data

Examples found in repository:
examples/streaming_flow.rs (line 61)
38async fn main() -> anyhow::Result<()> {
39    tracing_subscriber::fmt::init();
40
41    println!("🚀 Streaming Processing Example\n");
42
43    // Example 1: Basic streaming with backpressure
44    println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48    let processor = Arc::new(NumberProcessor);
49    let node = StreamNode::new("processor", processor)
50        .with_backpressure(BackpressureConfig {
51            buffer_size: 10,
52            max_concurrent: 5,
53        });
54
55    let ctx = Context {
56        data: HashMap::new(),
57    };
58
59    println!("Processing 100 numbers with backpressure...");
60    let start = std::time::Instant::now();
61    let result = node.process_stream(data, &ctx).await?;
62    let duration = start.elapsed();
63
64    if let Value::Array(results) = result {
65        println!("✓ Processed {} items in {:?}", results.len(), duration);
66        println!("  First: {:?}, Last: {:?}\n", results.first(), results.last());
67    }
68
69    // Example 2: Large dataset with chunking
70    println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72    let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74    let processor = Arc::new(NumberProcessor);
75    let node = StreamNode::new("chunked_processor", processor)
76        .with_chunking(ChunkConfig {
77            chunk_size: 1000,
78            overlap: 0,
79        })
80        .with_backpressure(BackpressureConfig {
81            buffer_size: 100,
82            max_concurrent: 10,
83        });
84
85    let ctx = Context {
86        data: HashMap::new(),
87    };
88
89    println!("Processing 10,000 numbers in chunks of 1,000...");
90    let start = std::time::Instant::now();
91    let result = node.process_stream(large_data, &ctx).await?;
92    let duration = start.elapsed();
93
94    if let Value::Array(results) = result {
95        println!("✓ Processed {} items in {:?}", results.len(), duration);
96        println!("  Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97    }
98
99    // Example 3: Map operator
100    println!("=== Example 3: Map Operator ===\n");
101
102    let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104    let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105        if let Some(n) = v.as_i64() {
106            Value::Number((n * n).into())
107        } else {
108            v
109        }
110    }));
111
112    let node = StreamNode::new("map_node", map_op);
113    let ctx = Context {
114        data: HashMap::new(),
115    };
116
117    println!("Squaring numbers 1-10...");
118    let result = node.process_stream(data, &ctx).await?;
119
120    if let Value::Array(results) = result {
121        println!("✓ Results: {:?}\n", results);
122    }
123
124    // Example 4: Filter operator
125    println!("=== Example 4: Filter Operator ===\n");
126
127    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129    let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131    }));
132
133    let node = StreamNode::new("filter_node", filter_op);
134    let ctx = Context {
135        data: HashMap::new(),
136    };
137
138    println!("Filtering even numbers from 1-20...");
139    let result = node.process_stream(data, &ctx).await?;
140
141    if let Value::Array(results) = result {
142        println!("✓ Even numbers: {:?}\n", results);
143    }
144
145    // Example 5: Fold operator (sum)
146    println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148    let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150    let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151        acc + v.as_i64().unwrap_or(0)
152    }));
153
154    let node = StreamNode::new("fold_node", fold_op)
155        .with_chunking(ChunkConfig {
156            chunk_size: 100,
157            overlap: 0,
158        })
159        .collect_results(false); // Only return final result
160
161    let ctx = Context {
162        data: HashMap::new(),
163    };
164
165    println!("Summing numbers 1-100...");
166    let result = node.process_stream(data, &ctx).await?;
167
168    println!("✓ Sum: {:?}\n", result);
169
170    // Example 6: Chained operations
171    println!("=== Example 6: Chained Operations ===\n");
172
173    println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175    // First: filter even numbers
176    let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178    let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179        v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180    }));
181
182    let filter_node = StreamNode::new("filter", filter_op);
183    let ctx = Context {
184        data: HashMap::new(),
185    };
186
187    let filtered = filter_node.process_stream(data, &ctx).await?;
188
189    // Then: square the numbers
190    if let Value::Array(filtered_data) = filtered {
191        let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192            if let Some(n) = v.as_i64() {
193                Value::Number((n * n).into())
194            } else {
195                v
196            }
197        }));
198
199        let map_node = StreamNode::new("square", map_op);
200        let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202        println!("✓ Results: {:?}\n", result);
203    }
204
205    println!("=== Benefits of Streaming Processing ===");
206    println!("  • Memory efficient - processes items incrementally");
207    println!("  • Backpressure handling - prevents overwhelming consumers");
208    println!("  • Large dataset support - chunking for massive data");
209    println!("  • Concurrent processing - multiple items in parallel");
210    println!("  • Composable operators - chain transformations");
211    println!("\n🎉 Example completed!");
212
213    Ok(())
214}

Trait Implementations§

Source§

impl Clone for StreamNode

Source§

fn clone(&self) -> StreamNode

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for StreamNode

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Node for StreamNode

Source§

fn id(&self) -> &str

Source§

fn node_type(&self) -> NodeType

Source§

fn run<'life0, 'life1, 'async_trait>( &'life0 self, ctx: &'life1 mut Context, ) -> Pin<Box<dyn Future<Output = RuleResult> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more