pub struct StreamNode {
pub id: String,
pub processor: Arc<dyn StreamProcessor>,
pub backpressure_config: BackpressureConfig,
pub chunk_config: Option<ChunkConfig>,
pub collect_results: bool,
}
Expand description
Stream node for processing data streams
Fields§
§id: String
§processor: Arc<dyn StreamProcessor>
§backpressure_config: BackpressureConfig
§chunk_config: Option<ChunkConfig>
§collect_results: bool
Implementations§
Source§impl StreamNode
impl StreamNode
Source§
pub fn new(id: impl Into<String>, processor: Arc<dyn StreamProcessor>) -> Self
pub fn new(id: impl Into<String>, processor: Arc<dyn StreamProcessor>) -> Self
Create a new stream node
Examples found in repository?
examples/streaming_flow.rs (line 49)
38async fn main() -> anyhow::Result<()> {
39 tracing_subscriber::fmt::init();
40
41 println!("🚀 Streaming Processing Example\n");
42
43 // Example 1: Basic streaming with backpressure
44 println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48 let processor = Arc::new(NumberProcessor);
49 let node = StreamNode::new("processor", processor)
50 .with_backpressure(BackpressureConfig {
51 buffer_size: 10,
52 max_concurrent: 5,
53 });
54
55 let ctx = Context {
56 data: HashMap::new(),
57 };
58
59 println!("Processing 100 numbers with backpressure...");
60 let start = std::time::Instant::now();
61 let result = node.process_stream(data, &ctx).await?;
62 let duration = start.elapsed();
63
64 if let Value::Array(results) = result {
65 println!("✓ Processed {} items in {:?}", results.len(), duration);
66 println!(" First: {:?}, Last: {:?}\n", results.first(), results.last());
67 }
68
69 // Example 2: Large dataset with chunking
70 println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72 let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74 let processor = Arc::new(NumberProcessor);
75 let node = StreamNode::new("chunked_processor", processor)
76 .with_chunking(ChunkConfig {
77 chunk_size: 1000,
78 overlap: 0,
79 })
80 .with_backpressure(BackpressureConfig {
81 buffer_size: 100,
82 max_concurrent: 10,
83 });
84
85 let ctx = Context {
86 data: HashMap::new(),
87 };
88
89 println!("Processing 10,000 numbers in chunks of 1,000...");
90 let start = std::time::Instant::now();
91 let result = node.process_stream(large_data, &ctx).await?;
92 let duration = start.elapsed();
93
94 if let Value::Array(results) = result {
95 println!("✓ Processed {} items in {:?}", results.len(), duration);
96 println!(" Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97 }
98
99 // Example 3: Map operator
100 println!("=== Example 3: Map Operator ===\n");
101
102 let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105 if let Some(n) = v.as_i64() {
106 Value::Number((n * n).into())
107 } else {
108 v
109 }
110 }));
111
112 let node = StreamNode::new("map_node", map_op);
113 let ctx = Context {
114 data: HashMap::new(),
115 };
116
117 println!("Squaring numbers 1-10...");
118 let result = node.process_stream(data, &ctx).await?;
119
120 if let Value::Array(results) = result {
121 println!("✓ Results: {:?}\n", results);
122 }
123
124 // Example 4: Filter operator
125 println!("=== Example 4: Filter Operator ===\n");
126
127 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129 let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131 }));
132
133 let node = StreamNode::new("filter_node", filter_op);
134 let ctx = Context {
135 data: HashMap::new(),
136 };
137
138 println!("Filtering even numbers from 1-20...");
139 let result = node.process_stream(data, &ctx).await?;
140
141 if let Value::Array(results) = result {
142 println!("✓ Even numbers: {:?}\n", results);
143 }
144
145 // Example 5: Fold operator (sum)
146 println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150 let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151 acc + v.as_i64().unwrap_or(0)
152 }));
153
154 let node = StreamNode::new("fold_node", fold_op)
155 .with_chunking(ChunkConfig {
156 chunk_size: 100,
157 overlap: 0,
158 })
159 .collect_results(false); // Only return final result
160
161 let ctx = Context {
162 data: HashMap::new(),
163 };
164
165 println!("Summing numbers 1-100...");
166 let result = node.process_stream(data, &ctx).await?;
167
168 println!("✓ Sum: {:?}\n", result);
169
170 // Example 6: Chained operations
171 println!("=== Example 6: Chained Operations ===\n");
172
173 println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175 // First: filter even numbers
176 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178 let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180 }));
181
182 let filter_node = StreamNode::new("filter", filter_op);
183 let ctx = Context {
184 data: HashMap::new(),
185 };
186
187 let filtered = filter_node.process_stream(data, &ctx).await?;
188
189 // Then: square the numbers
190 if let Value::Array(filtered_data) = filtered {
191 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192 if let Some(n) = v.as_i64() {
193 Value::Number((n * n).into())
194 } else {
195 v
196 }
197 }));
198
199 let map_node = StreamNode::new("square", map_op);
200 let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202 println!("✓ Results: {:?}\n", result);
203 }
204
205 println!("=== Benefits of Streaming Processing ===");
206 println!(" • Memory efficient - processes items incrementally");
207 println!(" • Backpressure handling - prevents overwhelming consumers");
208 println!(" • Large dataset support - chunking for massive data");
209 println!(" • Concurrent processing - multiple items in parallel");
210 println!(" • Composable operators - chain transformations");
211 println!("\n🎉 Example completed!");
212
213 Ok(())
214}
Source§
pub fn with_backpressure(self, config: BackpressureConfig) -> Self
pub fn with_backpressure(self, config: BackpressureConfig) -> Self
Configure backpressure
Examples found in repository?
examples/streaming_flow.rs (lines 50-53)
38async fn main() -> anyhow::Result<()> {
39 tracing_subscriber::fmt::init();
40
41 println!("🚀 Streaming Processing Example\n");
42
43 // Example 1: Basic streaming with backpressure
44 println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48 let processor = Arc::new(NumberProcessor);
49 let node = StreamNode::new("processor", processor)
50 .with_backpressure(BackpressureConfig {
51 buffer_size: 10,
52 max_concurrent: 5,
53 });
54
55 let ctx = Context {
56 data: HashMap::new(),
57 };
58
59 println!("Processing 100 numbers with backpressure...");
60 let start = std::time::Instant::now();
61 let result = node.process_stream(data, &ctx).await?;
62 let duration = start.elapsed();
63
64 if let Value::Array(results) = result {
65 println!("✓ Processed {} items in {:?}", results.len(), duration);
66 println!(" First: {:?}, Last: {:?}\n", results.first(), results.last());
67 }
68
69 // Example 2: Large dataset with chunking
70 println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72 let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74 let processor = Arc::new(NumberProcessor);
75 let node = StreamNode::new("chunked_processor", processor)
76 .with_chunking(ChunkConfig {
77 chunk_size: 1000,
78 overlap: 0,
79 })
80 .with_backpressure(BackpressureConfig {
81 buffer_size: 100,
82 max_concurrent: 10,
83 });
84
85 let ctx = Context {
86 data: HashMap::new(),
87 };
88
89 println!("Processing 10,000 numbers in chunks of 1,000...");
90 let start = std::time::Instant::now();
91 let result = node.process_stream(large_data, &ctx).await?;
92 let duration = start.elapsed();
93
94 if let Value::Array(results) = result {
95 println!("✓ Processed {} items in {:?}", results.len(), duration);
96 println!(" Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97 }
98
99 // Example 3: Map operator
100 println!("=== Example 3: Map Operator ===\n");
101
102 let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105 if let Some(n) = v.as_i64() {
106 Value::Number((n * n).into())
107 } else {
108 v
109 }
110 }));
111
112 let node = StreamNode::new("map_node", map_op);
113 let ctx = Context {
114 data: HashMap::new(),
115 };
116
117 println!("Squaring numbers 1-10...");
118 let result = node.process_stream(data, &ctx).await?;
119
120 if let Value::Array(results) = result {
121 println!("✓ Results: {:?}\n", results);
122 }
123
124 // Example 4: Filter operator
125 println!("=== Example 4: Filter Operator ===\n");
126
127 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129 let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131 }));
132
133 let node = StreamNode::new("filter_node", filter_op);
134 let ctx = Context {
135 data: HashMap::new(),
136 };
137
138 println!("Filtering even numbers from 1-20...");
139 let result = node.process_stream(data, &ctx).await?;
140
141 if let Value::Array(results) = result {
142 println!("✓ Even numbers: {:?}\n", results);
143 }
144
145 // Example 5: Fold operator (sum)
146 println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150 let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151 acc + v.as_i64().unwrap_or(0)
152 }));
153
154 let node = StreamNode::new("fold_node", fold_op)
155 .with_chunking(ChunkConfig {
156 chunk_size: 100,
157 overlap: 0,
158 })
159 .collect_results(false); // Only return final result
160
161 let ctx = Context {
162 data: HashMap::new(),
163 };
164
165 println!("Summing numbers 1-100...");
166 let result = node.process_stream(data, &ctx).await?;
167
168 println!("✓ Sum: {:?}\n", result);
169
170 // Example 6: Chained operations
171 println!("=== Example 6: Chained Operations ===\n");
172
173 println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175 // First: filter even numbers
176 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178 let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180 }));
181
182 let filter_node = StreamNode::new("filter", filter_op);
183 let ctx = Context {
184 data: HashMap::new(),
185 };
186
187 let filtered = filter_node.process_stream(data, &ctx).await?;
188
189 // Then: square the numbers
190 if let Value::Array(filtered_data) = filtered {
191 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192 if let Some(n) = v.as_i64() {
193 Value::Number((n * n).into())
194 } else {
195 v
196 }
197 }));
198
199 let map_node = StreamNode::new("square", map_op);
200 let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202 println!("✓ Results: {:?}\n", result);
203 }
204
205 println!("=== Benefits of Streaming Processing ===");
206 println!(" • Memory efficient - processes items incrementally");
207 println!(" • Backpressure handling - prevents overwhelming consumers");
208 println!(" • Large dataset support - chunking for massive data");
209 println!(" • Concurrent processing - multiple items in parallel");
210 println!(" • Composable operators - chain transformations");
211 println!("\n🎉 Example completed!");
212
213 Ok(())
214}
Source§
pub fn with_chunking(self, config: ChunkConfig) -> Self
pub fn with_chunking(self, config: ChunkConfig) -> Self
Enable chunked processing for large datasets
Examples found in repository?
examples/streaming_flow.rs (lines 76-79)
38async fn main() -> anyhow::Result<()> {
39 tracing_subscriber::fmt::init();
40
41 println!("🚀 Streaming Processing Example\n");
42
43 // Example 1: Basic streaming with backpressure
44 println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48 let processor = Arc::new(NumberProcessor);
49 let node = StreamNode::new("processor", processor)
50 .with_backpressure(BackpressureConfig {
51 buffer_size: 10,
52 max_concurrent: 5,
53 });
54
55 let ctx = Context {
56 data: HashMap::new(),
57 };
58
59 println!("Processing 100 numbers with backpressure...");
60 let start = std::time::Instant::now();
61 let result = node.process_stream(data, &ctx).await?;
62 let duration = start.elapsed();
63
64 if let Value::Array(results) = result {
65 println!("✓ Processed {} items in {:?}", results.len(), duration);
66 println!(" First: {:?}, Last: {:?}\n", results.first(), results.last());
67 }
68
69 // Example 2: Large dataset with chunking
70 println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72 let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74 let processor = Arc::new(NumberProcessor);
75 let node = StreamNode::new("chunked_processor", processor)
76 .with_chunking(ChunkConfig {
77 chunk_size: 1000,
78 overlap: 0,
79 })
80 .with_backpressure(BackpressureConfig {
81 buffer_size: 100,
82 max_concurrent: 10,
83 });
84
85 let ctx = Context {
86 data: HashMap::new(),
87 };
88
89 println!("Processing 10,000 numbers in chunks of 1,000...");
90 let start = std::time::Instant::now();
91 let result = node.process_stream(large_data, &ctx).await?;
92 let duration = start.elapsed();
93
94 if let Value::Array(results) = result {
95 println!("✓ Processed {} items in {:?}", results.len(), duration);
96 println!(" Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97 }
98
99 // Example 3: Map operator
100 println!("=== Example 3: Map Operator ===\n");
101
102 let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105 if let Some(n) = v.as_i64() {
106 Value::Number((n * n).into())
107 } else {
108 v
109 }
110 }));
111
112 let node = StreamNode::new("map_node", map_op);
113 let ctx = Context {
114 data: HashMap::new(),
115 };
116
117 println!("Squaring numbers 1-10...");
118 let result = node.process_stream(data, &ctx).await?;
119
120 if let Value::Array(results) = result {
121 println!("✓ Results: {:?}\n", results);
122 }
123
124 // Example 4: Filter operator
125 println!("=== Example 4: Filter Operator ===\n");
126
127 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129 let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131 }));
132
133 let node = StreamNode::new("filter_node", filter_op);
134 let ctx = Context {
135 data: HashMap::new(),
136 };
137
138 println!("Filtering even numbers from 1-20...");
139 let result = node.process_stream(data, &ctx).await?;
140
141 if let Value::Array(results) = result {
142 println!("✓ Even numbers: {:?}\n", results);
143 }
144
145 // Example 5: Fold operator (sum)
146 println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150 let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151 acc + v.as_i64().unwrap_or(0)
152 }));
153
154 let node = StreamNode::new("fold_node", fold_op)
155 .with_chunking(ChunkConfig {
156 chunk_size: 100,
157 overlap: 0,
158 })
159 .collect_results(false); // Only return final result
160
161 let ctx = Context {
162 data: HashMap::new(),
163 };
164
165 println!("Summing numbers 1-100...");
166 let result = node.process_stream(data, &ctx).await?;
167
168 println!("✓ Sum: {:?}\n", result);
169
170 // Example 6: Chained operations
171 println!("=== Example 6: Chained Operations ===\n");
172
173 println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175 // First: filter even numbers
176 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178 let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180 }));
181
182 let filter_node = StreamNode::new("filter", filter_op);
183 let ctx = Context {
184 data: HashMap::new(),
185 };
186
187 let filtered = filter_node.process_stream(data, &ctx).await?;
188
189 // Then: square the numbers
190 if let Value::Array(filtered_data) = filtered {
191 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192 if let Some(n) = v.as_i64() {
193 Value::Number((n * n).into())
194 } else {
195 v
196 }
197 }));
198
199 let map_node = StreamNode::new("square", map_op);
200 let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202 println!("✓ Results: {:?}\n", result);
203 }
204
205 println!("=== Benefits of Streaming Processing ===");
206 println!(" • Memory efficient - processes items incrementally");
207 println!(" • Backpressure handling - prevents overwhelming consumers");
208 println!(" • Large dataset support - chunking for massive data");
209 println!(" • Concurrent processing - multiple items in parallel");
210 println!(" • Composable operators - chain transformations");
211 println!("\n🎉 Example completed!");
212
213 Ok(())
214}
Source§
pub fn collect_results(self, collect: bool) -> Self
pub fn collect_results(self, collect: bool) -> Self
Set whether to collect all results (default: true). If false, only the last result is stored.
Examples found in repository?
examples/streaming_flow.rs (line 159)
38async fn main() -> anyhow::Result<()> {
39 tracing_subscriber::fmt::init();
40
41 println!("🚀 Streaming Processing Example\n");
42
43 // Example 1: Basic streaming with backpressure
44 println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48 let processor = Arc::new(NumberProcessor);
49 let node = StreamNode::new("processor", processor)
50 .with_backpressure(BackpressureConfig {
51 buffer_size: 10,
52 max_concurrent: 5,
53 });
54
55 let ctx = Context {
56 data: HashMap::new(),
57 };
58
59 println!("Processing 100 numbers with backpressure...");
60 let start = std::time::Instant::now();
61 let result = node.process_stream(data, &ctx).await?;
62 let duration = start.elapsed();
63
64 if let Value::Array(results) = result {
65 println!("✓ Processed {} items in {:?}", results.len(), duration);
66 println!(" First: {:?}, Last: {:?}\n", results.first(), results.last());
67 }
68
69 // Example 2: Large dataset with chunking
70 println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72 let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74 let processor = Arc::new(NumberProcessor);
75 let node = StreamNode::new("chunked_processor", processor)
76 .with_chunking(ChunkConfig {
77 chunk_size: 1000,
78 overlap: 0,
79 })
80 .with_backpressure(BackpressureConfig {
81 buffer_size: 100,
82 max_concurrent: 10,
83 });
84
85 let ctx = Context {
86 data: HashMap::new(),
87 };
88
89 println!("Processing 10,000 numbers in chunks of 1,000...");
90 let start = std::time::Instant::now();
91 let result = node.process_stream(large_data, &ctx).await?;
92 let duration = start.elapsed();
93
94 if let Value::Array(results) = result {
95 println!("✓ Processed {} items in {:?}", results.len(), duration);
96 println!(" Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97 }
98
99 // Example 3: Map operator
100 println!("=== Example 3: Map Operator ===\n");
101
102 let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105 if let Some(n) = v.as_i64() {
106 Value::Number((n * n).into())
107 } else {
108 v
109 }
110 }));
111
112 let node = StreamNode::new("map_node", map_op);
113 let ctx = Context {
114 data: HashMap::new(),
115 };
116
117 println!("Squaring numbers 1-10...");
118 let result = node.process_stream(data, &ctx).await?;
119
120 if let Value::Array(results) = result {
121 println!("✓ Results: {:?}\n", results);
122 }
123
124 // Example 4: Filter operator
125 println!("=== Example 4: Filter Operator ===\n");
126
127 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129 let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131 }));
132
133 let node = StreamNode::new("filter_node", filter_op);
134 let ctx = Context {
135 data: HashMap::new(),
136 };
137
138 println!("Filtering even numbers from 1-20...");
139 let result = node.process_stream(data, &ctx).await?;
140
141 if let Value::Array(results) = result {
142 println!("✓ Even numbers: {:?}\n", results);
143 }
144
145 // Example 5: Fold operator (sum)
146 println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150 let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151 acc + v.as_i64().unwrap_or(0)
152 }));
153
154 let node = StreamNode::new("fold_node", fold_op)
155 .with_chunking(ChunkConfig {
156 chunk_size: 100,
157 overlap: 0,
158 })
159 .collect_results(false); // Only return final result
160
161 let ctx = Context {
162 data: HashMap::new(),
163 };
164
165 println!("Summing numbers 1-100...");
166 let result = node.process_stream(data, &ctx).await?;
167
168 println!("✓ Sum: {:?}\n", result);
169
170 // Example 6: Chained operations
171 println!("=== Example 6: Chained Operations ===\n");
172
173 println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175 // First: filter even numbers
176 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178 let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180 }));
181
182 let filter_node = StreamNode::new("filter", filter_op);
183 let ctx = Context {
184 data: HashMap::new(),
185 };
186
187 let filtered = filter_node.process_stream(data, &ctx).await?;
188
189 // Then: square the numbers
190 if let Value::Array(filtered_data) = filtered {
191 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192 if let Some(n) = v.as_i64() {
193 Value::Number((n * n).into())
194 } else {
195 v
196 }
197 }));
198
199 let map_node = StreamNode::new("square", map_op);
200 let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202 println!("✓ Results: {:?}\n", result);
203 }
204
205 println!("=== Benefits of Streaming Processing ===");
206 println!(" • Memory efficient - processes items incrementally");
207 println!(" • Backpressure handling - prevents overwhelming consumers");
208 println!(" • Large dataset support - chunking for massive data");
209 println!(" • Concurrent processing - multiple items in parallel");
210 println!(" • Composable operators - chain transformations");
211 println!("\n🎉 Example completed!");
212
213 Ok(())
214}
Source§
pub async fn process_stream(
    &self,
    data: Vec<Value>,
    ctx: &Context,
) -> RuleResult
pub async fn process_stream(&self, data: Vec<Value>, ctx: &Context) -> RuleResult
Process a stream of data
Examples found in repository?
examples/streaming_flow.rs (line 61)
38async fn main() -> anyhow::Result<()> {
39 tracing_subscriber::fmt::init();
40
41 println!("🚀 Streaming Processing Example\n");
42
43 // Example 1: Basic streaming with backpressure
44 println!("=== Example 1: Basic Streaming with Backpressure ===\n");
45
46 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
47
48 let processor = Arc::new(NumberProcessor);
49 let node = StreamNode::new("processor", processor)
50 .with_backpressure(BackpressureConfig {
51 buffer_size: 10,
52 max_concurrent: 5,
53 });
54
55 let ctx = Context {
56 data: HashMap::new(),
57 };
58
59 println!("Processing 100 numbers with backpressure...");
60 let start = std::time::Instant::now();
61 let result = node.process_stream(data, &ctx).await?;
62 let duration = start.elapsed();
63
64 if let Value::Array(results) = result {
65 println!("✓ Processed {} items in {:?}", results.len(), duration);
66 println!(" First: {:?}, Last: {:?}\n", results.first(), results.last());
67 }
68
69 // Example 2: Large dataset with chunking
70 println!("=== Example 2: Large Dataset with Chunking ===\n");
71
72 let large_data: Vec<Value> = (1..=10_000).map(|i| Value::Number(i.into())).collect();
73
74 let processor = Arc::new(NumberProcessor);
75 let node = StreamNode::new("chunked_processor", processor)
76 .with_chunking(ChunkConfig {
77 chunk_size: 1000,
78 overlap: 0,
79 })
80 .with_backpressure(BackpressureConfig {
81 buffer_size: 100,
82 max_concurrent: 10,
83 });
84
85 let ctx = Context {
86 data: HashMap::new(),
87 };
88
89 println!("Processing 10,000 numbers in chunks of 1,000...");
90 let start = std::time::Instant::now();
91 let result = node.process_stream(large_data, &ctx).await?;
92 let duration = start.elapsed();
93
94 if let Value::Array(results) = result {
95 println!("✓ Processed {} items in {:?}", results.len(), duration);
96 println!(" Throughput: {:.0} items/sec\n", results.len() as f64 / duration.as_secs_f64());
97 }
98
99 // Example 3: Map operator
100 println!("=== Example 3: Map Operator ===\n");
101
102 let data: Vec<Value> = (1..=10).map(|i| Value::Number(i.into())).collect();
103
104 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
105 if let Some(n) = v.as_i64() {
106 Value::Number((n * n).into())
107 } else {
108 v
109 }
110 }));
111
112 let node = StreamNode::new("map_node", map_op);
113 let ctx = Context {
114 data: HashMap::new(),
115 };
116
117 println!("Squaring numbers 1-10...");
118 let result = node.process_stream(data, &ctx).await?;
119
120 if let Value::Array(results) = result {
121 println!("✓ Results: {:?}\n", results);
122 }
123
124 // Example 4: Filter operator
125 println!("=== Example 4: Filter Operator ===\n");
126
127 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
128
129 let filter_op = Arc::new(FilterOperator::new("even_only", |v: &Value| {
130 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
131 }));
132
133 let node = StreamNode::new("filter_node", filter_op);
134 let ctx = Context {
135 data: HashMap::new(),
136 };
137
138 println!("Filtering even numbers from 1-20...");
139 let result = node.process_stream(data, &ctx).await?;
140
141 if let Value::Array(results) = result {
142 println!("✓ Even numbers: {:?}\n", results);
143 }
144
145 // Example 5: Fold operator (sum)
146 println!("=== Example 5: Fold Operator (Sum) ===\n");
147
148 let data: Vec<Value> = (1..=100).map(|i| Value::Number(i.into())).collect();
149
150 let fold_op = Arc::new(FoldOperator::new("sum", 0i64, |acc: i64, v: Value| {
151 acc + v.as_i64().unwrap_or(0)
152 }));
153
154 let node = StreamNode::new("fold_node", fold_op)
155 .with_chunking(ChunkConfig {
156 chunk_size: 100,
157 overlap: 0,
158 })
159 .collect_results(false); // Only return final result
160
161 let ctx = Context {
162 data: HashMap::new(),
163 };
164
165 println!("Summing numbers 1-100...");
166 let result = node.process_stream(data, &ctx).await?;
167
168 println!("✓ Sum: {:?}\n", result);
169
170 // Example 6: Chained operations
171 println!("=== Example 6: Chained Operations ===\n");
172
173 println!("Pipeline: numbers → filter(even) → map(square) → collect");
174
175 // First: filter even numbers
176 let data: Vec<Value> = (1..=20).map(|i| Value::Number(i.into())).collect();
177
178 let filter_op = Arc::new(FilterOperator::new("even", |v: &Value| {
179 v.as_i64().map(|n| n % 2 == 0).unwrap_or(false)
180 }));
181
182 let filter_node = StreamNode::new("filter", filter_op);
183 let ctx = Context {
184 data: HashMap::new(),
185 };
186
187 let filtered = filter_node.process_stream(data, &ctx).await?;
188
189 // Then: square the numbers
190 if let Value::Array(filtered_data) = filtered {
191 let map_op = Arc::new(MapOperator::new("square", |v: Value| {
192 if let Some(n) = v.as_i64() {
193 Value::Number((n * n).into())
194 } else {
195 v
196 }
197 }));
198
199 let map_node = StreamNode::new("square", map_op);
200 let result = map_node.process_stream(filtered_data, &ctx).await?;
201
202 println!("✓ Results: {:?}\n", result);
203 }
204
205 println!("=== Benefits of Streaming Processing ===");
206 println!(" • Memory efficient - processes items incrementally");
207 println!(" • Backpressure handling - prevents overwhelming consumers");
208 println!(" • Large dataset support - chunking for massive data");
209 println!(" • Concurrent processing - multiple items in parallel");
210 println!(" • Composable operators - chain transformations");
211 println!("\n🎉 Example completed!");
212
213 Ok(())
214}
Trait Implementations§
Source§impl Clone for StreamNode
impl Clone for StreamNode
Source§fn clone(&self) -> StreamNode
fn clone(&self) -> StreamNode
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Source§
impl Debug for StreamNode
impl Debug for StreamNode
Source§impl Node for StreamNode
impl Node for StreamNode
Auto Trait Implementations§
impl Freeze for StreamNode
impl !RefUnwindSafe for StreamNode
impl Send for StreamNode
impl Sync for StreamNode
impl Unpin for StreamNode
impl !UnwindSafe for StreamNode
Blanket Implementations§
Source§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more