pub struct Graph { /* private fields */ }Expand description
Graph builder for constructing graphs with implicit node connections
Implementations§
Source§impl Graph
impl Graph
Sourcepub fn new() -> Self
pub fn new() -> Self
Create a new graph
Examples found in repository?
24fn demo_simple_output_access() {
25 println!("─────────────────────────────────────────────────────────");
26 println!("Demo 1: Simple Pipeline Output Access");
27 println!("─────────────────────────────────────────────────────────\n");
28
29 let mut graph = Graph::new();
30
31 // Node 1: Generate initial data
32 graph.add(
33 |_: &HashMap<String, String>, _| {
34 let mut result = HashMap::new();
35 result.insert("initial_value".to_string(), "100".to_string());
36 result
37 },
38 Some("Source"),
39 None,
40 Some(vec![("initial_value", "raw_data")]) // Maps initial_value → raw_data
41 );
42
43 // Node 2: Process the data
44 graph.add(
45 |inputs: &HashMap<String, String>, _| {
46 let value = inputs.get("input").unwrap().parse::<i32>().unwrap();
47 let mut result = HashMap::new();
48 result.insert("processed".to_string(), (value * 2).to_string());
49 result
50 },
51 Some("Process"),
52 Some(vec![("raw_data", "input")]), // Maps raw_data → input
53 Some(vec![("processed", "final_result")]) // Maps processed → final_result
54 );
55
56 // Build and execute
57 let dag = graph.build();
58 let context = dag.execute(false, None);
59
60 println!("📦 Execution Context (all variables):");
61 for (key, value) in &context {
62 println!(" {} = {}", key, value);
63 }
64
65 println!("\n🎯 Accessing specific outputs:");
66
67 // Access by broadcast variable name (what's in the graph context)
68 if let Some(raw_data) = context.get("raw_data") {
69 println!(" raw_data: {}", raw_data);
70 }
71
72 if let Some(final_result) = context.get("final_result") {
73 println!(" final_result: {}", final_result);
74 }
75
76 // Check if a variable exists
77 if context.contains_key("final_result") {
78 println!("\n✅ Final result is available in context");
79 }
80
81 println!();
82}
83
84fn demo_branch_output_access() {
85 println!("─────────────────────────────────────────────────────────");
86 println!("Demo 2: Branch Output Access (Parallel Paths)");
87 println!("─────────────────────────────────────────────────────────\n");
88
89 let mut graph = Graph::new();
90
91 // Source node
92 graph.add(
93 |_: &HashMap<String, String>, _| {
94 let mut result = HashMap::new();
95 result.insert("value".to_string(), "50".to_string());
96 result
97 },
98 Some("Source"),
99 None,
100 Some(vec![("value", "shared_data")])
101 );
102
103 // Branch A: Statistics computation
104 let mut branch_a = Graph::new();
105 branch_a.add(
106 |inputs: &HashMap<String, String>, _| {
107 let val = inputs.get("data").unwrap();
108 let mut result = HashMap::new();
109 result.insert("stats_output".to_string(), format!("Stats of {}", val));
110 result
111 },
112 Some("Stats"),
113 Some(vec![("shared_data", "data")]),
114 Some(vec![("stats_output", "statistics")]) // Branch A produces "statistics"
115 );
116
117 // Branch B: Model training
118 let mut branch_b = Graph::new();
119 branch_b.add(
120 |inputs: &HashMap<String, String>, _| {
121 let val = inputs.get("data").unwrap();
122 let mut result = HashMap::new();
123 result.insert("model_output".to_string(), format!("Model trained on {}", val));
124 result
125 },
126 Some("Train"),
127 Some(vec![("shared_data", "data")]),
128 Some(vec![("model_output", "model")]) // Branch B produces "model"
129 );
130
131 // Branch C: Visualization
132 let mut branch_c = Graph::new();
133 branch_c.add(
134 |inputs: &HashMap<String, String>, _| {
135 let val = inputs.get("data").unwrap();
136 let mut result = HashMap::new();
137 result.insert("viz_output".to_string(), format!("Plot of {}", val));
138 result
139 },
140 Some("Visualize"),
141 Some(vec![("shared_data", "data")]),
142 Some(vec![("viz_output", "visualization")]) // Branch C produces "visualization"
143 );
144
145 // Add branches to main graph
146 graph.branch(branch_a);
147 graph.branch(branch_b);
148 graph.branch(branch_c);
149
150 // Execute
151 let dag = graph.build();
152 let context = dag.execute(false, None);
153
154 println!("📦 All outputs from parallel branches:");
155
156 // Access each branch's output
157 if let Some(stats) = context.get("statistics") {
158 println!(" Branch A (Statistics): {}", stats);
159 }
160
161 if let Some(model) = context.get("model") {
162 println!(" Branch B (Model): {}", model);
163 }
164
165 if let Some(viz) = context.get("visualization") {
166 println!(" Branch C (Visualization): {}", viz);
167 }
168
169 println!("\n🔍 All variables in context:");
170 for (key, value) in &context {
171 println!(" {} = {}", key, value);
172 }
173
174 println!();
175}
176
177fn demo_variant_output_access() {
178 println!("─────────────────────────────────────────────────────────");
179 println!("Demo 3: Variant Output Access (Parameter Sweep)");
180 println!("─────────────────────────────────────────────────────────\n");
181
182 let mut graph = Graph::new();
183
184 // Source node
185 graph.add(
186 |_: &HashMap<String, String>, _| {
187 let mut result = HashMap::new();
188 result.insert("value".to_string(), "10.0".to_string());
189 result
190 },
191 Some("DataSource"),
192 None,
193 Some(vec![("value", "input_data")])
194 );
195
196 // Variant nodes: scale by different factors
197 // Factory function that creates a scaler for each factor
198 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
199 move |inputs: &HashMap<String, String>, _| {
200 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
201 let mut result = HashMap::new();
202 result.insert("scaled".to_string(), (value * factor).to_string());
203 result
204 }
205 }
206
207 graph.variant(
208 make_scaler,
209 vec![2.0, 3.0, 5.0],
210 Some("Scale"),
211 Some(vec![("input_data", "data")]),
212 Some(vec![("scaled", "result")]) // Each variant produces "result"
213 );
214
215 // Execute
216 let dag = graph.build();
217 let context = dag.execute(false, None);
218
219 println!("📦 Variant outputs:");
220 println!(" Note: Variants with same output name overwrite each other");
221 println!(" The last variant (factor=5.0) writes to 'result'\n");
222
223 // Access the result (will be from the last variant)
224 if let Some(result) = context.get("result") {
225 println!(" result = {} (from last variant: 10.0 * 5.0)", result);
226 }
227
228 println!("\n💡 Tip: To preserve all variant outputs, use unique output names:");
229 println!(" Option 1: Map each variant to a different broadcast variable");
230 println!(" Option 2: Collect results in merge node");
231 println!(" Option 3: Use variant_params or closure capture to distinguish");
232
233 // Better approach: unique output names per variant
234 let mut graph2 = Graph::new();
235
236 graph2.add(
237 |_: &HashMap<String, String>, _| {
238 let mut result = HashMap::new();
239 result.insert("value".to_string(), "10.0".to_string());
240 result
241 },
242 Some("DataSource"),
243 None,
244 Some(vec![("value", "input_data")])
245 );
246
247 // Variant 1: 2x
248 fn make_scaler_unique(label: &str, factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> + '_ {
249 move |inputs: &HashMap<String, String>, _| {
250 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
251 let mut result = HashMap::new();
252 result.insert(label.to_string(), (value * factor).to_string());
253 result
254 }
255 }
256
257 graph2.variant(
258 |_x: &str| make_scaler_unique("scaled_2x", 2.0),
259 vec!["2x"],
260 Some("Scale2x"),
261 Some(vec![("input_data", "data")]),
262 Some(vec![("scaled_2x", "result_2x")])
263 );
264
265 graph2.variant(
266 |_x: &str| make_scaler_unique("scaled_3x", 3.0),
267 vec!["3x"],
268 Some("Scale3x"),
269 Some(vec![("input_data", "data")]),
270 Some(vec![("scaled_3x", "result_3x")])
271 );
272
273 let dag2 = graph2.build();
274 let context2 = dag2.execute(false, None);
275
276 println!("\n✅ Better approach - unique output names:");
277 if let Some(result_2x) = context2.get("result_2x") {
278 println!(" result_2x = {}", result_2x);
279 }
280 if let Some(result_3x) = context2.get("result_3x") {
281 println!(" result_3x = {}", result_3x);
282 }
283
284 println!();
285}
286
287fn demo_multiple_outputs() {
288 println!("─────────────────────────────────────────────────────────");
289 println!("Demo 4: Multiple Outputs from Single Node");
290 println!("─────────────────────────────────────────────────────────\n");
291
292 let mut graph = Graph::new();
293
294 // Node that produces multiple outputs
295 graph.add(
296 |_: &HashMap<String, String>, _| {
297 let mut result = HashMap::new();
298 result.insert("mean".to_string(), "50.5".to_string());
299 result.insert("median".to_string(), "48.0".to_string());
300 result.insert("stddev".to_string(), "12.3".to_string());
301 result.insert("count".to_string(), "100".to_string());
302 result
303 },
304 Some("Statistics"),
305 None,
306 Some(vec![
307 ("mean", "stat_mean"),
308 ("median", "stat_median"),
309 ("stddev", "stat_stddev"),
310 ("count", "sample_count")
311 ])
312 );
313
314 // Execute
315 let dag = graph.build();
316 let context = dag.execute(false, None);
317
318 println!("📊 Multiple outputs from single node:");
319
320 // Access each output individually
321 println!(" Mean: {}", context.get("stat_mean").unwrap());
322 println!(" Median: {}", context.get("stat_median").unwrap());
323 println!(" StdDev: {}", context.get("stat_stddev").unwrap());
324 println!(" Count: {}", context.get("sample_count").unwrap());
325
326 println!("\n📋 Complete execution context:");
327 for (key, value) in &context {
328 println!(" {} = {}", key, value);
329 }
330
331 println!("\n💡 Summary:");
332 println!(" ✓ dag.execute(false, None) returns HashMap<String, String>");
333 println!(" ✓ Keys are broadcast variable names (from output mappings)");
334 println!(" ✓ Use context.get(\"variable_name\") to access specific outputs");
335 println!(" ✓ All outputs accumulate in the context throughout execution");
336
337 println!();
338}More examples
28fn demo_simple_pipeline() {
29 println!("─────────────────────────────────────────────────────────");
30 println!("Demo 1: Simple Sequential Pipeline");
31 println!("─────────────────────────────────────────────────────────");
32
33 let mut graph = Graph::new();
34
35 // Data source node
36 graph.add(
37 |_: &HashMap<String, String>, _| {
38 let mut result = HashMap::new();
39 result.insert("value".to_string(), "42".to_string());
40 result
41 },
42 Some("DataSource"),
43 None, // No inputs (source node)
44 Some(vec![("value", "data")]) // (impl_var, broadcast_var)
45 );
46
47 // Processing node: multiply by 2
48 graph.add(
49 |inputs: &HashMap<String, String>, _| {
50 let mut result = HashMap::new();
51 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
52 result.insert("doubled".to_string(), (val * 2).to_string());
53 }
54 result
55 },
56 Some("Multiply"),
57 Some(vec![("data", "x")]), // (broadcast_var, impl_var)
58 Some(vec![("doubled", "result")]) // (impl_var, broadcast_var)
59 );
60
61 // Final processing: add 10
62 graph.add(
63 |inputs: &HashMap<String, String>, _| {
64 let mut result = HashMap::new();
65 if let Some(val) = inputs.get("num").and_then(|s| s.parse::<i32>().ok()) {
66 result.insert("sum".to_string(), (val + 10).to_string());
67 }
68 result
69 },
70 Some("AddTen"),
71 Some(vec![("result", "num")]),
72 Some(vec![("sum", "final")])
73 );
74
75 let dag = graph.build();
76 println!("\n📊 Execution:");
77 let context = dag.execute(false, None);
78
79 println!(" Input: data = {}", context.get("data").unwrap());
80 println!(" Step 1: result = {} (data * 2)", context.get("result").unwrap());
81 println!(" Step 2: final = {} (result + 10)", context.get("final").unwrap());
82
83 println!("\n📈 DAG Statistics:");
84 let stats = dag.stats();
85 println!("{}", stats.summary());
86
87 println!("\n🔍 Mermaid Visualization:");
88 println!("{}", dag.to_mermaid());
89 println!();
90}
91
92fn demo_branching() {
93 println!("─────────────────────────────────────────────────────────");
94 println!("Demo 2: Parallel Branching (Fan-Out)");
95 println!("─────────────────────────────────────────────────────────");
96
97 let mut graph = Graph::new();
98
99 // Source node
100 graph.add(
101 |_: &HashMap<String, String>, _| {
102 let mut result = HashMap::new();
103 result.insert("dataset".to_string(), "100".to_string());
104 result
105 },
106 Some("Source"),
107 None,
108 Some(vec![("dataset", "data")])
109 );
110
111 // Branch A: Compute statistics
112 let mut branch_a = Graph::new();
113 branch_a.add(
114 |inputs: &HashMap<String, String>, _| {
115 let mut result = HashMap::new();
116 if let Some(val) = inputs.get("input") {
117 result.insert("stats".to_string(), format!("Mean: {}", val));
118 }
119 result
120 },
121 Some("Statistics"),
122 Some(vec![("data", "input")]),
123 Some(vec![("stats", "stats_result")])
124 );
125
126 // Branch B: Train model
127 let mut branch_b = Graph::new();
128 branch_b.add(
129 |inputs: &HashMap<String, String>, _| {
130 let mut result = HashMap::new();
131 if let Some(val) = inputs.get("input") {
132 result.insert("model".to_string(), format!("Model trained on {}", val));
133 }
134 result
135 },
136 Some("MLModel"),
137 Some(vec![("data", "input")]),
138 Some(vec![("model", "model_result")])
139 );
140
141 // Branch C: Generate visualization
142 let mut branch_c = Graph::new();
143 branch_c.add(
144 |inputs: &HashMap<String, String>, _| {
145 let mut result = HashMap::new();
146 if let Some(val) = inputs.get("input") {
147 result.insert("plot".to_string(), format!("Plot of {}", val));
148 }
149 result
150 },
151 Some("Visualization"),
152 Some(vec![("data", "input")]),
153 Some(vec![("plot", "viz_result")])
154 );
155
156 graph.branch(branch_a);
157 graph.branch(branch_b);
158 graph.branch(branch_c);
159
160 let dag = graph.build();
161 println!("\n📊 Execution:");
162 let context = dag.execute(false, None);
163
164 println!(" Source: data = {}", context.get("data").unwrap());
165 println!(" Branch A (Stats): {}", context.get("stats_result").unwrap());
166 println!(" Branch B (Model): {}", context.get("model_result").unwrap());
167 println!(" Branch C (Viz): {}", context.get("viz_result").unwrap());
168
169 println!("\n📈 DAG Statistics:");
170 let stats = dag.stats();
171 println!("{}", stats.summary());
172 println!(" ⚡ All 3 branches can execute in parallel!");
173
174 println!("\n🔍 Mermaid Visualization:");
175 println!("{}", dag.to_mermaid());
176 println!();
177}
178
179fn demo_merging() {
180 println!("─────────────────────────────────────────────────────────");
181 println!("Demo 3: Branching + Merging (Fan-Out + Fan-In)");
182 println!("─────────────────────────────────────────────────────────");
183
184 let mut graph = Graph::new();
185
186 // Source
187 graph.add(
188 |_: &HashMap<String, String>, _| {
189 let mut result = HashMap::new();
190 result.insert("value".to_string(), "50".to_string());
191 result
192 },
193 Some("Source"),
194 None,
195 Some(vec![("value", "data")])
196 );
197
198 // Branch A: Add 10
199 let mut branch_a = Graph::new();
200 branch_a.add(
201 |inputs: &HashMap<String, String>, _| {
202 let mut result = HashMap::new();
203 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
204 result.insert("output".to_string(), (val + 10).to_string());
205 }
206 result
207 },
208 Some("PathA (+10)"),
209 Some(vec![("data", "x")]),
210 Some(vec![("output", "result")]) // Both branches use same output name!
211 );
212
213 // Branch B: Add 20
214 let mut branch_b = Graph::new();
215 branch_b.add(
216 |inputs: &HashMap<String, String>, _| {
217 let mut result = HashMap::new();
218 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
219 result.insert("output".to_string(), (val + 20).to_string());
220 }
221 result
222 },
223 Some("PathB (+20)"),
224 Some(vec![("data", "x")]),
225 Some(vec![("output", "result")]) // Both branches use same output name!
226 );
227
228 let branch_a_id = graph.branch(branch_a);
229 let branch_b_id = graph.branch(branch_b);
230
231 // Merge node: Combine results from both branches
232 graph.merge(
233 |inputs: &HashMap<String, String>, _| {
234 let mut result = HashMap::new();
235 let a = inputs.get("from_a").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
236 let b = inputs.get("from_b").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
237 result.insert("combined".to_string(), format!("{} + {} = {}", a, b, a + b));
238 result
239 },
240 Some("Merge"),
241 vec![
242 (branch_a_id, "result", "from_a"), // Map branch A's "result" to merge fn's "from_a"
243 (branch_b_id, "result", "from_b") // Map branch B's "result" to merge fn's "from_b"
244 ],
245 Some(vec![("combined", "final")])
246 );
247
248 let dag = graph.build();
249 println!("\n📊 Execution:");
250 let context = dag.execute(false, None);
251
252 println!(" Source: data = {}", context.get("data").unwrap());
253 println!(" Branch A: 50 + 10 = 60");
254 println!(" Branch B: 50 + 20 = 70");
255 println!(" Merged: {}", context.get("final").unwrap());
256
257 println!("\n📈 DAG Statistics:");
258 let stats = dag.stats();
259 println!("{}", stats.summary());
260
261 println!("\n🔍 Mermaid Visualization:");
262 println!("{}", dag.to_mermaid());
263 println!();
264}
265
266fn demo_variants() {
267 println!("─────────────────────────────────────────────────────────");
268 println!("Demo 4: Parameter Sweep with Variants");
269 println!("─────────────────────────────────────────────────────────");
270
271 let mut graph = Graph::new();
272
273 // Source node
274 graph.add(
275 |_: &HashMap<String, String>, _| {
276 let mut result = HashMap::new();
277 result.insert("base_value".to_string(), "10.0".to_string());
278 result
279 },
280 Some("DataSource"),
281 None,
282 Some(vec![("base_value", "data")])
283 );
284
285 // Variant factory: Scale by different learning rates
286 fn make_scaler(learning_rate: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
287 move |inputs: &HashMap<String, String>, _| {
288 let mut result = HashMap::new();
289 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
290 let scaled = val * learning_rate;
291 result.insert("scaled_value".to_string(), format!("{:.2}", scaled));
292 }
293 result
294 }
295 }
296
297 // Create variants using Linspace for learning rate sweep
298 graph.variant(
299 make_scaler,
300 vec![0.001, 0.01, 0.1, 1.0],
301 Some("ScaleLR"),
302 Some(vec![("data", "input")]),
303 Some(vec![("scaled_value", "result")])
304 );
305
306 let dag = graph.build();
307 println!("\n📊 Execution:");
308 let context = dag.execute(false, None);
309
310 println!(" Source: data = {}", context.get("data").unwrap());
311 println!(" Variants created for learning rates: [0.001, 0.01, 0.1, 1.0]");
312 println!(" (Each variant computes: data * learning_rate)");
313
314 println!("\n📈 DAG Statistics:");
315 let stats = dag.stats();
316 println!("{}", stats.summary());
317 println!(" ⚡ All {} variants can execute in parallel!", stats.variant_count);
318
319 println!("\n🔍 Mermaid Visualization:");
320 println!("{}", dag.to_mermaid());
321 println!();
322}
323
324fn demo_complex_graph() {
325 println!("─────────────────────────────────────────────────────────");
326 println!("Demo 5: Complex Graph (All Features Combined)");
327 println!("─────────────────────────────────────────────────────────");
328
329 let mut graph = Graph::new();
330
331 // 1. Data ingestion
332 graph.add(
333 |_: &HashMap<String, String>, _| {
334 let mut result = HashMap::new();
335 result.insert("raw_data".to_string(), "1000".to_string());
336 result
337 },
338 Some("Ingest"),
339 None,
340 Some(vec![("raw_data", "data")])
341 );
342
343 // 2. Preprocessing
344 graph.add(
345 |inputs: &HashMap<String, String>, _| {
346 let mut result = HashMap::new();
347 if let Some(val) = inputs.get("raw").and_then(|s| s.parse::<i32>().ok()) {
348 result.insert("cleaned".to_string(), (val / 10).to_string());
349 }
350 result
351 },
352 Some("Preprocess"),
353 Some(vec![("data", "raw")]),
354 Some(vec![("cleaned", "clean_data")])
355 );
356
357 // 3. Branch for different analyses
358 let mut stats_branch = Graph::new();
359 stats_branch.add(
360 |inputs: &HashMap<String, String>, _| {
361 let mut result = HashMap::new();
362 if let Some(val) = inputs.get("data") {
363 result.insert("stats".to_string(), format!("Stats({})", val));
364 }
365 result
366 },
367 Some("Stats"),
368 Some(vec![("clean_data", "data")]),
369 Some(vec![("stats", "statistics")])
370 );
371
372 let mut ml_branch = Graph::new();
373 ml_branch.add(
374 |inputs: &HashMap<String, String>, _| {
375 let mut result = HashMap::new();
376 if let Some(val) = inputs.get("data") {
377 result.insert("prediction".to_string(), format!("Pred({})", val));
378 }
379 result
380 },
381 Some("ML"),
382 Some(vec![("clean_data", "data")]),
383 Some(vec![("prediction", "ml_result")])
384 );
385
386 let stats_id = graph.branch(stats_branch);
387 let ml_id = graph.branch(ml_branch);
388
389 // 4. Merge branches
390 graph.merge(
391 |inputs: &HashMap<String, String>, _| {
392 let mut result = HashMap::new();
393 let stats = inputs.get("stats_in").cloned().unwrap_or_default();
394 let ml = inputs.get("ml_in").cloned().unwrap_or_default();
395 result.insert("report".to_string(), format!("{} & {}", stats, ml));
396 result
397 },
398 Some("Combine"),
399 vec![
400 (stats_id, "statistics", "stats_in"),
401 (ml_id, "ml_result", "ml_in")
402 ],
403 Some(vec![("report", "final_report")])
404 );
405
406 // 5. Final output formatting
407 graph.add(
408 |inputs: &HashMap<String, String>, _| {
409 let mut result = HashMap::new();
410 if let Some(report) = inputs.get("report") {
411 result.insert("formatted".to_string(), format!("[FINAL] {}", report));
412 }
413 result
414 },
415 Some("Format"),
416 Some(vec![("final_report", "report")]),
417 Some(vec![("formatted", "output")])
418 );
419
420 let dag = graph.build();
421 println!("\n📊 Execution:");
422 let context = dag.execute(false, None);
423
424 println!(" Step 1: Ingest → data = {}", context.get("data").unwrap());
425 println!(" Step 2: Preprocess → clean_data = {}", context.get("clean_data").unwrap());
426 println!(" Step 3: Branch A → statistics = {}", context.get("statistics").unwrap());
427 println!(" Branch B → ml_result = {}", context.get("ml_result").unwrap());
428 println!(" Step 4: Merge → final_report = {}", context.get("final_report").unwrap());
429 println!(" Step 5: Format → output = {}", context.get("output").unwrap());
430
431 println!("\n📈 DAG Statistics:");
432 let stats = dag.stats();
433 println!("{}", stats.summary());
434
435 println!("\n📋 Execution Order:");
436 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
437 println!(" Level {}: {} nodes", level_idx, level.len());
438 for &node_id in level {
439 let node = dag.nodes().iter().find(|n| n.id == node_id).unwrap();
440 println!(" - {}", node.display_name());
441 }
442 }
443
444 println!("\n🔍 Mermaid Visualization:");
445 println!("{}", dag.to_mermaid());
446 println!();
447
448 println!("═══════════════════════════════════════════════════════════");
449 println!(" Demo Complete!");
450 println!("═══════════════════════════════════════════════════════════");
451}24fn demo_per_node_access() {
25 println!("─────────────────────────────────────────────────────────");
26 println!("Demo 1: Per-Node Output Access");
27 println!("─────────────────────────────────────────────────────────\n");
28
29 let mut graph = Graph::new();
30
31 // Node 0: Source
32 graph.add(
33 |_: &HashMap<String, String>, _| {
34 let mut result = HashMap::new();
35 result.insert("value".to_string(), "10".to_string());
36 result
37 },
38 Some("Source"),
39 None,
40 Some(vec![("value", "initial_data")])
41 );
42
43 // Node 1: Double
44 graph.add(
45 |inputs: &HashMap<String, String>, _| {
46 let value = inputs.get("in").unwrap().parse::<i32>().unwrap();
47 let mut result = HashMap::new();
48 result.insert("doubled".to_string(), (value * 2).to_string());
49 result
50 },
51 Some("Double"),
52 Some(vec![("initial_data", "in")]),
53 Some(vec![("doubled", "doubled_data")])
54 );
55
56 // Node 2: Add Ten
57 graph.add(
58 |inputs: &HashMap<String, String>, _| {
59 let value = inputs.get("in").unwrap().parse::<i32>().unwrap();
60 let mut result = HashMap::new();
61 result.insert("added".to_string(), (value + 10).to_string());
62 result
63 },
64 Some("AddTen"),
65 Some(vec![("doubled_data", "in")]),
66 Some(vec![("added", "final_result")])
67 );
68
69 // Execute and get detailed results
70 let dag = graph.build();
71 let result = dag.execute_detailed(false, None);
72
73 println!("🌍 Global Context (all variables):");
74 for (key, value) in &result.context {
75 println!(" {} = {}", key, value);
76 }
77
78 println!("\n📦 Per-Node Outputs:");
79 println!("\nNode 0 (Source) outputs:");
80 if let Some(outputs) = result.get_node_outputs(0) {
81 for (key, value) in outputs {
82 println!(" {} = {}", key, value);
83 }
84 }
85
86 println!("\nNode 1 (Double) outputs:");
87 if let Some(outputs) = result.get_node_outputs(1) {
88 for (key, value) in outputs {
89 println!(" {} = {}", key, value);
90 }
91 }
92
93 println!("\nNode 2 (AddTen) outputs:");
94 if let Some(outputs) = result.get_node_outputs(2) {
95 for (key, value) in outputs {
96 println!(" {} = {}", key, value);
97 }
98 }
99
100 println!("\n🎯 Accessing specific node outputs:");
101 if let Some(value) = result.get_from_node(0, "initial_data") {
102 println!(" Node 0 'initial_data': {}", value);
103 }
104 if let Some(value) = result.get_from_node(1, "doubled_data") {
105 println!(" Node 1 'doubled_data': {}", value);
106 }
107 if let Some(value) = result.get_from_node(2, "final_result") {
108 println!(" Node 2 'final_result': {}", value);
109 }
110
111 println!();
112}
113
114fn demo_per_branch_access() {
115 println!("─────────────────────────────────────────────────────────");
116 println!("Demo 2: Per-Branch Output Access");
117 println!("─────────────────────────────────────────────────────────\n");
118
119 let mut graph = Graph::new();
120
121 // Main graph: Source node
122 graph.add(
123 |_: &HashMap<String, String>, _| {
124 let mut result = HashMap::new();
125 result.insert("dataset".to_string(), "100".to_string());
126 result
127 },
128 Some("Source"),
129 None,
130 Some(vec![("dataset", "data")])
131 );
132
133 // Branch A: Statistics
134 let mut branch_a = Graph::new();
135 branch_a.add(
136 |inputs: &HashMap<String, String>, _| {
137 let value = inputs.get("input").unwrap();
138 let mut result = HashMap::new();
139 result.insert("stat_result".to_string(), format!("Mean of {}", value));
140 result
141 },
142 Some("Statistics"),
143 Some(vec![("data", "input")]),
144 Some(vec![("stat_result", "statistics")])
145 );
146
147 // Branch B: Model Training
148 let mut branch_b = Graph::new();
149 branch_b.add(
150 |inputs: &HashMap<String, String>, _| {
151 let value = inputs.get("input").unwrap();
152 let mut result = HashMap::new();
153 result.insert("model_result".to_string(), format!("Model trained on {}", value));
154 result
155 },
156 Some("ModelTraining"),
157 Some(vec![("data", "input")]),
158 Some(vec![("model_result", "trained_model")])
159 );
160
161 // Branch C: Visualization
162 let mut branch_c = Graph::new();
163 branch_c.add(
164 |inputs: &HashMap<String, String>, _| {
165 let value = inputs.get("input").unwrap();
166 let mut result = HashMap::new();
167 result.insert("viz_result".to_string(), format!("Plot of {}", value));
168 result
169 },
170 Some("Visualization"),
171 Some(vec![("data", "input")]),
172 Some(vec![("viz_result", "plot")])
173 );
174
175 let branch_a_id = graph.branch(branch_a);
176 let branch_b_id = graph.branch(branch_b);
177 let branch_c_id = graph.branch(branch_c);
178
179 // Execute and get detailed results
180 let dag = graph.build();
181 let result = dag.execute_detailed(false, None);
182
183 println!("🌍 Global Context:");
184 for (key, value) in &result.context {
185 println!(" {} = {}", key, value);
186 }
187
188 println!("\n🌿 Per-Branch Outputs:");
189
190 println!("\nBranch {} (Statistics) outputs:", branch_a_id);
191 if let Some(outputs) = result.get_branch_outputs(branch_a_id) {
192 for (key, value) in outputs {
193 println!(" {} = {}", key, value);
194 }
195 }
196
197 println!("\nBranch {} (Model Training) outputs:", branch_b_id);
198 if let Some(outputs) = result.get_branch_outputs(branch_b_id) {
199 for (key, value) in outputs {
200 println!(" {} = {}", key, value);
201 }
202 }
203
204 println!("\nBranch {} (Visualization) outputs:", branch_c_id);
205 if let Some(outputs) = result.get_branch_outputs(branch_c_id) {
206 for (key, value) in outputs {
207 println!(" {} = {}", key, value);
208 }
209 }
210
211 println!("\n🎯 Accessing specific branch outputs:");
212 if let Some(value) = result.get_from_branch(branch_a_id, "statistics") {
213 println!(" Branch {} 'statistics': {}", branch_a_id, value);
214 }
215 if let Some(value) = result.get_from_branch(branch_b_id, "trained_model") {
216 println!(" Branch {} 'trained_model': {}", branch_b_id, value);
217 }
218 if let Some(value) = result.get_from_branch(branch_c_id, "plot") {
219 println!(" Branch {} 'plot': {}", branch_c_id, value);
220 }
221
222 println!();
223}
224
225fn demo_variant_per_node_access() {
226 println!("─────────────────────────────────────────────────────────");
227 println!("Demo 3: Variant Outputs with Per-Node Tracking");
228 println!("─────────────────────────────────────────────────────────\n");
229
230 let mut graph = Graph::new();
231
232 // Source node
233 graph.add(
234 |_: &HashMap<String, String>, _| {
235 let mut result = HashMap::new();
236 result.insert("base_value".to_string(), "10".to_string());
237 result
238 },
239 Some("Source"),
240 None,
241 Some(vec![("base_value", "data")])
242 );
243
244 // Variant factory for scaling
245 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
246 move |inputs: &HashMap<String, String>, _| {
247 let value = inputs.get("input_data").unwrap().parse::<f64>().unwrap();
248 let mut result = HashMap::new();
249 result.insert("scaled_value".to_string(), (value * factor).to_string());
250 result
251 }
252 }
253
254 // Create variants with unique output names to preserve all results
255 graph.variant(
256 make_scaler,
257 vec![2.0, 3.0, 5.0],
258 Some("Scale"),
259 Some(vec![("data", "input_data")]),
260 Some(vec![("scaled_value", "result")]) // Note: will overwrite in global context
261 );
262
263 let dag = graph.build();
264 let result = dag.execute_detailed(false, None);
265
266 println!("🌍 Global Context (note: 'result' contains last variant's output):");
267 for (key, value) in &result.context {
268 println!(" {} = {}", key, value);
269 }
270
271 println!("\n📦 Per-Node Outputs (each variant tracked separately):");
272
273 // Node 0 is the source
274 // Nodes 1, 2, 3 are the variant nodes (2x, 3x, 5x scalers)
275 for node_id in 1..=3 {
276 println!("\nNode {} (Variant Scaler) outputs:", node_id);
277 if let Some(outputs) = result.get_node_outputs(node_id) {
278 for (key, value) in outputs {
279 println!(" {} = {}", key, value);
280 }
281 }
282 }
283
284 println!("\n💡 Key Insight:");
285 println!(" - Global context has 'result' = {} (last variant overwrites)", result.get("result").unwrap());
286 println!(" - But per-node outputs preserve ALL variant results:");
287 println!(" Node 1 (2x): result = {}", result.get_from_node(1, "result").unwrap());
288 println!(" Node 2 (3x): result = {}", result.get_from_node(2, "result").unwrap());
289 println!(" Node 3 (5x): result = {}", result.get_from_node(3, "result").unwrap());
290
291 println!();
292}
293
294fn demo_execution_history_tracking() {
295 println!("─────────────────────────────────────────────────────────");
296 println!("Demo 4: Execution History Tracking");
297 println!("─────────────────────────────────────────────────────────\n");
298
299 let mut graph = Graph::new();
300
301 // Multi-stage pipeline
302 graph.add(
303 |_: &HashMap<String, String>, _| {
304 let mut result = HashMap::new();
305 result.insert("raw".to_string(), "5".to_string());
306 result
307 },
308 Some("Load"),
309 None,
310 Some(vec![("raw", "input")])
311 );
312
313 graph.add(
314 |inputs: &HashMap<String, String>, _| {
315 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
316 let mut result = HashMap::new();
317 result.insert("cleaned".to_string(), (value + 1).to_string());
318 result
319 },
320 Some("Clean"),
321 Some(vec![("input", "x")]),
322 Some(vec![("cleaned", "clean_data")])
323 );
324
325 graph.add(
326 |inputs: &HashMap<String, String>, _| {
327 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
328 let mut result = HashMap::new();
329 result.insert("normalized".to_string(), (value * 10).to_string());
330 result
331 },
332 Some("Normalize"),
333 Some(vec![("clean_data", "x")]),
334 Some(vec![("normalized", "norm_data")])
335 );
336
337 graph.add(
338 |inputs: &HashMap<String, String>, _| {
339 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
340 let mut result = HashMap::new();
341 result.insert("transformed".to_string(), format!("FINAL_{}", value));
342 result
343 },
344 Some("Transform"),
345 Some(vec![("norm_data", "x")]),
346 Some(vec![("transformed", "output")])
347 );
348
349 let dag = graph.build();
350 let result = dag.execute_detailed(false, None);
351
352 println!("📊 Execution History (Data Flow Tracking):");
353 println!();
354 println!("Step-by-step transformation:");
355 println!(" 1. Load: input = {}", result.get_from_node(0, "input").unwrap());
356 println!(" 2. Clean: clean_data = {}", result.get_from_node(1, "clean_data").unwrap());
357 println!(" 3. Normalize: norm_data = {}", result.get_from_node(2, "norm_data").unwrap());
358 println!(" 4. Transform: output = {}", result.get_from_node(3, "output").unwrap());
359
360 println!("\n🔍 Debugging: Inspect any intermediate result:");
361 println!(" Need to debug the normalization step?");
362 println!(" Just check Node 2: {}", result.get_from_node(2, "norm_data").unwrap());
363
364 println!("\n✅ Benefits of Per-Node Access:");
365 println!(" ✓ Track data transformations step-by-step");
366 println!(" ✓ Debug issues by inspecting intermediate values");
367 println!(" ✓ Validate each processing stage independently");
368 println!(" ✓ Preserve all variant outputs even with name collisions");
369
370 println!();
371}32fn demo_sequential_vs_parallel() {
33 println!("─────────────────────────────────────────────────────────");
34 println!("Demo 1: Sequential vs Parallel Execution");
35 println!("─────────────────────────────────────────────────────────");
36
37 let mut graph = Graph::new();
38
39 // Source node
40 graph.add(
41 |_: &HashMap<String, String>, _| {
42 let start = Instant::now();
43 let mut result = HashMap::new();
44 result.insert("data".to_string(), "source_data".to_string());
45 println!(" [{}ms] Source completed", start.elapsed().as_millis());
46 result
47 },
48 Some("Source"),
49 None,
50 Some(vec![("data", "data")])
51 );
52
53 // Branch A: 100ms work
54 let mut branch_a = Graph::new();
55 branch_a.add(
56 |inputs: &HashMap<String, String>, _| {
57 let start = Instant::now();
58 let mut result = HashMap::new();
59 if let Some(data) = inputs.get("input") {
60 let processed = simulate_work(100, data);
61 result.insert("result".to_string(), processed);
62 }
63 println!(" [{}ms] Branch A completed (100ms work)", start.elapsed().as_millis());
64 result
65 },
66 Some("BranchA[100ms]"),
67 Some(vec![("data", "input")]),
68 Some(vec![("result", "result_a")])
69 );
70
71 // Branch B: 100ms work
72 let mut branch_b = Graph::new();
73 branch_b.add(
74 |inputs: &HashMap<String, String>, _| {
75 let start = Instant::now();
76 let mut result = HashMap::new();
77 if let Some(data) = inputs.get("input") {
78 let processed = simulate_work(100, data);
79 result.insert("result".to_string(), processed);
80 }
81 println!(" [{}ms] Branch B completed (100ms work)", start.elapsed().as_millis());
82 result
83 },
84 Some("BranchB[100ms]"),
85 Some(vec![("data", "input")]),
86 Some(vec![("result", "result_b")])
87 );
88
89 // Branch C: 100ms work
90 let mut branch_c = Graph::new();
91 branch_c.add(
92 |inputs: &HashMap<String, String>, _| {
93 let start = Instant::now();
94 let mut result = HashMap::new();
95 if let Some(data) = inputs.get("input") {
96 let processed = simulate_work(100, data);
97 result.insert("result".to_string(), processed);
98 }
99 println!(" [{}ms] Branch C completed (100ms work)", start.elapsed().as_millis());
100 result
101 },
102 Some("BranchC[100ms]"),
103 Some(vec![("data", "input")]),
104 Some(vec![("result", "result_c")])
105 );
106
107 graph.branch(branch_a);
108 graph.branch(branch_b);
109 graph.branch(branch_c);
110
111 let dag = graph.build();
112
113 println!("\n📊 Sequential Execution (simulated):");
114 let start = Instant::now();
115 let _ = dag.execute(false, None);
116 let sequential_time = start.elapsed();
117 println!(" Total time: {}ms", sequential_time.as_millis());
118
119 println!("\n⚡ With Parallel Execution:");
120 println!(" Expected time: ~100ms (all branches run simultaneously)");
121 println!(" Speedup: ~3x faster than sequential");
122
123 println!("\n📈 DAG Statistics:");
124 let stats = dag.stats();
125 println!("{}", stats.summary());
126 println!("\n Analysis:");
127 println!(" - Level 0: 1 node (Source)");
128 println!(" - Level 1: 3 nodes (BranchA, BranchB, BranchC) ← Can run in parallel!");
129 println!(" - Max parallelism: {} nodes can execute simultaneously", stats.max_parallelism);
130
131 println!("\n🔍 Mermaid Visualization with Port Mappings:");
132 println!("{}", dag.to_mermaid());
133 println!();
134}
135
136fn demo_complex_dependencies() {
137 println!("─────────────────────────────────────────────────────────");
138 println!("Demo 2: Complex Data Dependencies");
139 println!("─────────────────────────────────────────────────────────");
140
141 let mut graph = Graph::new();
142
143 // Two independent sources
144 graph.add(
145 |_: &HashMap<String, String>, _| {
146 let mut result = HashMap::new();
147 result.insert("source1_data".to_string(), "100".to_string());
148 result
149 },
150 Some("Source1"),
151 None,
152 Some(vec![("source1_data", "data1")])
153 );
154
155 graph.add(
156 |_: &HashMap<String, String>, _| {
157 let mut result = HashMap::new();
158 result.insert("source2_data".to_string(), "200".to_string());
159 result
160 },
161 Some("Source2"),
162 None,
163 Some(vec![("source2_data", "data2")])
164 );
165
166 // Process each source independently (can run in parallel)
167 graph.add(
168 |inputs: &HashMap<String, String>, _| {
169 let mut result = HashMap::new();
170 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
171 thread::sleep(Duration::from_millis(50));
172 result.insert("processed".to_string(), (val * 2).to_string());
173 }
174 result
175 },
176 Some("Process1[50ms]"),
177 Some(vec![("data1", "in")]),
178 Some(vec![("processed", "proc1")])
179 );
180
181 graph.add(
182 |inputs: &HashMap<String, String>, _| {
183 let mut result = HashMap::new();
184 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
185 thread::sleep(Duration::from_millis(50));
186 result.insert("processed".to_string(), (val * 3).to_string());
187 }
188 result
189 },
190 Some("Process2[50ms]"),
191 Some(vec![("data2", "in")]),
192 Some(vec![("processed", "proc2")])
193 );
194
195 // Combine results (depends on both processors)
196 graph.add(
197 |inputs: &HashMap<String, String>, _| {
198 let mut result = HashMap::new();
199 let v1 = inputs.get("p1").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
200 let v2 = inputs.get("p2").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
201 thread::sleep(Duration::from_millis(30));
202 result.insert("combined".to_string(), format!("{}", v1 + v2));
203 result
204 },
205 Some("Combine[30ms]"),
206 Some(vec![("proc1", "p1"), ("proc2", "p2")]),
207 Some(vec![("combined", "final")])
208 );
209
210 let dag = graph.build();
211
212 println!("\n📊 Execution with timing:");
213 let start = Instant::now();
214 let context = dag.execute(false, None);
215 let total_time = start.elapsed();
216
217 println!(" Source1: data1 = {}", context.get("data1").unwrap());
218 println!(" Source2: data2 = {}", context.get("data2").unwrap());
219 println!(" Process1: proc1 = {} (data1 * 2)", context.get("proc1").unwrap());
220 println!(" Process2: proc2 = {} (data2 * 3)", context.get("proc2").unwrap());
221 println!(" Combine: final = {} (proc1 + proc2)", context.get("final").unwrap());
222 println!("\n Total execution time: {}ms", total_time.as_millis());
223
224 println!("\n📈 Execution Levels (showing parallelism):");
225 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
226 print!(" Level {}: ", level_idx);
227 let node_names: Vec<String> = level.iter()
228 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
229 .collect();
230 println!("{}", node_names.join(", "));
231 if level.len() > 1 {
232 println!(" ↑ {} nodes can execute in parallel!", level.len());
233 }
234 }
235
236 println!("\n⚡ Parallel Execution Analysis:");
237 println!(" Sequential time would be: 50+50+30 = 130ms");
238 println!(" With parallelism: Level0→Level1(parallel)→Level2 = ~80ms");
239 println!(" Speedup: 1.6x");
240
241 println!("\n🔍 Mermaid Visualization (shows data dependencies):");
242 println!("{}", dag.to_mermaid());
243 println!();
244}
245
246fn demo_variant_parallelism() {
247 println!("─────────────────────────────────────────────────────────");
248 println!("Demo 3: Variant Parameter Sweep Parallelism");
249 println!("─────────────────────────────────────────────────────────");
250
251 let mut graph = Graph::new();
252
253 // Source
254 graph.add(
255 |_: &HashMap<String, String>, _| {
256 let mut result = HashMap::new();
257 result.insert("value".to_string(), "1000".to_string());
258 result
259 },
260 Some("DataSource"),
261 None,
262 Some(vec![("value", "data")])
263 );
264
265 // Variant factory with different multipliers
266 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
267 move |inputs: &HashMap<String, String>, _| {
268 let start = Instant::now();
269 let mut result = HashMap::new();
270 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
271 // Simulate 100ms of work
272 thread::sleep(Duration::from_millis(100));
273 result.insert("result".to_string(), format!("{:.1}", val * factor));
274 }
275 println!(" [{}ms] Variant (factor={}) completed", start.elapsed().as_millis(), factor);
276 result
277 }
278 }
279
280 // Create 5 variants
281 graph.variant(
282 make_multiplier,
283 vec![0.5, 1.0, 1.5, 2.0, 2.5],
284 Some("Multiply[100ms]"),
285 Some(vec![("data", "input")]),
286 Some(vec![("result", "result")])
287 );
288
289 let dag = graph.build();
290
291 println!("\n📊 Executing 5 variants (each takes 100ms):");
292 let start = Instant::now();
293 let _ = dag.execute(false, None);
294 let total_time = start.elapsed();
295
296 println!("\n Total execution time: {}ms", total_time.as_millis());
297
298 println!("\n⚡ Parallelism Analysis:");
299 println!(" Sequential execution: 100 × 5 = 500ms");
300 println!(" With parallel execution: ~100ms (all run simultaneously)");
301 println!(" Speedup: 5x");
302
303 println!("\n📈 DAG Statistics:");
304 let stats = dag.stats();
305 println!("{}", stats.summary());
306 println!(" ↑ All {} variant nodes can execute in parallel!", stats.variant_count);
307
308 println!("\n🔍 Mermaid Visualization:");
309 println!("{}", dag.to_mermaid());
310 println!();
311}
312
313fn demo_diamond_pattern() {
314 println!("─────────────────────────────────────────────────────────");
315 println!("Demo 4: Diamond Pattern (Fan-Out → Fan-In)");
316 println!("─────────────────────────────────────────────────────────");
317 println!("This pattern shows:");
318 println!(" - One source splits into multiple parallel branches");
319 println!(" - Branches are processed independently");
320 println!(" - Results merge back into single output");
321
322 let mut graph = Graph::new();
323
324 // Top of diamond: Single source
325 graph.add(
326 |_: &HashMap<String, String>, _| {
327 let mut result = HashMap::new();
328 result.insert("raw".to_string(), "input_data".to_string());
329 result
330 },
331 Some("Source"),
332 None,
333 Some(vec![("raw", "data")])
334 );
335
336 // Left branch: Transform A (50ms)
337 let mut branch_a = Graph::new();
338 branch_a.add(
339 |inputs: &HashMap<String, String>, _| {
340 let start = Instant::now();
341 thread::sleep(Duration::from_millis(50));
342 let mut result = HashMap::new();
343 if let Some(data) = inputs.get("in") {
344 result.insert("out".to_string(), format!("{}_transformA", data));
345 }
346 println!(" [{}ms] Transform A completed", start.elapsed().as_millis());
347 result
348 },
349 Some("TransformA[50ms]"),
350 Some(vec![("data", "in")]),
351 Some(vec![("out", "result")])
352 );
353
354 // Right branch: Transform B (50ms)
355 let mut branch_b = Graph::new();
356 branch_b.add(
357 |inputs: &HashMap<String, String>, _| {
358 let start = Instant::now();
359 thread::sleep(Duration::from_millis(50));
360 let mut result = HashMap::new();
361 if let Some(data) = inputs.get("in") {
362 result.insert("out".to_string(), format!("{}_transformB", data));
363 }
364 println!(" [{}ms] Transform B completed", start.elapsed().as_millis());
365 result
366 },
367 Some("TransformB[50ms]"),
368 Some(vec![("data", "in")]),
369 Some(vec![("out", "result")])
370 );
371
372 let branch_a_id = graph.branch(branch_a);
373 let branch_b_id = graph.branch(branch_b);
374
375 // Bottom of diamond: Merge (30ms)
376 graph.merge(
377 |inputs: &HashMap<String, String>, _| {
378 let start = Instant::now();
379 thread::sleep(Duration::from_millis(30));
380 let mut result = HashMap::new();
381 let a = inputs.get("left").cloned().unwrap_or_default();
382 let b = inputs.get("right").cloned().unwrap_or_default();
383 result.insert("merged".to_string(), format!("[{}+{}]", a, b));
384 println!(" [{}ms] Merge completed", start.elapsed().as_millis());
385 result
386 },
387 Some("Merge[30ms]"),
388 vec![
389 (branch_a_id, "result", "left"),
390 (branch_b_id, "result", "right")
391 ],
392 Some(vec![("merged", "final")])
393 );
394
395 let dag = graph.build();
396
397 println!("\n📊 Executing diamond pattern:");
398 let start = Instant::now();
399 let context = dag.execute(false, None);
400 let total_time = start.elapsed();
401
402 println!("\n Result: {}", context.get("final").unwrap());
403 println!(" Total execution time: {}ms", total_time.as_millis());
404
405 println!("\n📈 Execution Levels:");
406 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
407 print!(" Level {}: ", level_idx);
408 let node_names: Vec<String> = level.iter()
409 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
410 .collect();
411 println!("{}", node_names.join(", "));
412 }
413
414 println!("\n⚡ Timing Analysis:");
415 println!(" Sequential: Source(0ms) + TransformA(50ms) + TransformB(50ms) + Merge(30ms) = 130ms");
416 println!(" Parallel: Source(0ms) → [TransformA + TransformB](50ms) → Merge(30ms) = 80ms");
417 println!(" Speedup: 1.6x");
418
419 println!("\n🔍 Mermaid Visualization (Diamond Shape):");
420 println!("{}", dag.to_mermaid());
421
422 println!("\n The visualization shows:");
423 println!(" - Port mappings on edges (data→in, result→left, result→right)");
424 println!(" - Data dependencies between nodes");
425 println!(" - Parallel branches can execute simultaneously");
426
427 println!("\n═══════════════════════════════════════════════════════════");
428 println!(" Parallel Execution Demo Complete!");
429 println!("═══════════════════════════════════════════════════════════");
430}11fn main() {
12 println!("=== Tuple-Based API Demo ===\n");
13
14 let mut graph = Graph::new();
15
16 // Source node - no inputs, produces "dataset" in context
17 fn data_source(_inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
18 let mut outputs = HashMap::new();
19 outputs.insert("raw".to_string(), "Sample Data".to_string());
20 outputs
21 }
22
23 graph.add(
24 data_source,
25 Some("Source"),
26 None, // No inputs
27 Some(vec![("raw", "dataset")]) // Function returns "raw", stored as "dataset" in context
28 );
29
30 println!("✓ Added source node");
31 println!(" Output mapping: function's 'raw' → context's 'dataset'\n");
32
33 // Process node - consumes "dataset" from context as "input_data", produces "processed_data" to context as "result"
34 fn processor(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
35 let default = String::new();
36 let data = inputs.get("input_data").unwrap_or(&default);
37 let mut outputs = HashMap::new();
38 outputs.insert("processed_data".to_string(), format!("Processed: {}", data));
39 outputs
40 }
41
42 graph.add(
43 processor,
44 Some("Process"),
45 Some(vec![("dataset", "input_data")]), // Context's "dataset" → function's "input_data"
46 Some(vec![("processed_data", "result")]) // Function's "processed_data" → context's "result"
47 );
48
49 println!("✓ Added processor node");
50 println!(" Input mapping: context's 'dataset' → function's 'input_data'");
51 println!(" Output mapping: function's 'processed_data' → context's 'result'\n");
52
53 // Create branches that both use the same variable names internally
54 let mut branch_a = Graph::new();
55 fn transform_a(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
56 let default = String::new();
57 let data = inputs.get("x").unwrap_or(&default);
58 let mut outputs = HashMap::new();
59 outputs.insert("y".to_string(), format!("{} [Path A]", data));
60 outputs
61 }
62 branch_a.add(
63 transform_a,
64 Some("Transform A"),
65 Some(vec![("result", "x")]), // Context's "result" → function's "x"
66 Some(vec![("y", "output")]) // Function's "y" → context's "output"
67 );
68
69 let mut branch_b = Graph::new();
70 fn transform_b(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
71 let default = String::new();
72 let data = inputs.get("x").unwrap_or(&default);
73 let mut outputs = HashMap::new();
74 outputs.insert("y".to_string(), format!("{} [Path B]", data));
75 outputs
76 }
77 branch_b.add(
78 transform_b,
79 Some("Transform B"),
80 Some(vec![("result", "x")]), // Same variable names as branch_a
81 Some(vec![("y", "output")]) // Same variable names as branch_a
82 );
83
84 println!("✓ Created two branches");
85 println!(" Both branches use same internal variable names (x, y)");
86 println!(" Both map 'result' → 'x' and 'y' → 'output'\n");
87
88 // Branch from the processor
89 let branch_a_id = graph.branch(branch_a);
90 let branch_b_id = graph.branch(branch_b);
91
92 println!("✓ Added branches to graph");
93 println!(" Branch A ID: {}", branch_a_id);
94 println!(" Branch B ID: {}\n", branch_b_id);
95
96 // Merge branches with branch-specific variable resolution
97 fn combine(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
98 let default = String::new();
99 let a = inputs.get("from_a").unwrap_or(&default);
100 let b = inputs.get("from_b").unwrap_or(&default);
101 let mut outputs = HashMap::new();
102 outputs.insert("merged".to_string(), format!("Combined: {} + {}", a, b));
103 outputs
104 }
105
106 graph.merge(
107 combine,
108 Some("Combine"),
109 vec![
110 (branch_a_id, "output", "from_a"), // Branch A's "output" → merge function's "from_a"
111 (branch_b_id, "output", "from_b") // Branch B's "output" → merge function's "from_b"
112 ],
113 Some(vec![("merged", "final_result")]) // Merge function's "merged" → context's "final_result"
114 );
115
116 println!("✓ Added merge node");
117 println!(" Branch-specific input mapping:");
118 println!(" Branch {} 'output' → merge function's 'from_a'", branch_a_id);
119 println!(" Branch {} 'output' → merge function's 'from_b'", branch_b_id);
120 println!(" Output mapping: merge function's 'merged' → context's 'final_result'\n");
121
122 // Variant example with factory pattern
123 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
124 move |inputs, _variant| {
125 let default = "1.0".to_string();
126 let data = inputs.get("value").unwrap_or(&default);
127 if let Ok(val) = data.parse::<f64>() {
128 let mut outputs = HashMap::new();
129 outputs.insert("scaled".to_string(), (val * factor).to_string());
130 outputs
131 } else {
132 HashMap::new()
133 }
134 }
135 }
136
137 graph.variant(
138 make_multiplier,
139 vec![2.0, 3.0, 5.0],
140 Some("Multiply"),
141 Some(vec![("final_result", "value")]), // Context's "final_result" → function's "value"
142 Some(vec![("scaled", "multiplied")]) // Function's "scaled" → context's "multiplied"
143 );
144
145 println!("✓ Added variant nodes with parameter sweep");
146 println!(" Three variants: factor = 2.0, 3.0, 5.0");
147 println!(" Input mapping: context's 'final_result' → function's 'value'");
148 println!(" Output mapping: function's 'scaled' → context's 'multiplied'\n");
149
150 println!("=== Summary ===");
151 println!("The tuple-based API provides clear separation between:");
152 println!("1. Context variable names (broadcast vars) - shared across the graph");
153 println!("2. Function parameter names (impl vars) - internal to each function");
154 println!("\nThis allows:");
155 println!("- Branches to use consistent internal naming (x, y)");
156 println!("- Merge to distinguish branch outputs using branch IDs");
157 println!("- Clear data flow visualization and debugging");
158}122fn main() {
123 println!("═══════════════════════════════════════════════════════════");
124 println!(" Variant Pattern Demo (sigexec-style)");
125 println!(" Full Actual Syntax Examples");
126 println!("═══════════════════════════════════════════════════════════\n");
127
128 // =========================================================================
129 // Demo 1: Single Variant - Basic Factory Pattern
130 // =========================================================================
131 println!("Demo 1: Single Variant with Factory Function");
132 println!("─────────────────────────────────────────────────────────\n");
133
134 println!("📝 Code:");
135 println!("```rust");
136 println!("fn make_scaler(factor: f64) -> impl Fn(...) -> ... {{");
137 println!(" move |inputs, _| {{");
138 println!(" let value = inputs.get(\"input_data\").unwrap().parse::<f64>().unwrap();");
139 println!(" let scaled = value * factor;");
140 println!(" outputs.insert(\"scaled_value\", scaled.to_string());");
141 println!(" }}");
142 println!("}}");
143 println!();
144 println!("let mut graph = Graph::new();");
145 println!("graph.add(data_source, Some(\"Source\"), None, Some(vec![(\"value\", \"data\")]));");
146 println!("graph.variant(");
147 println!(" make_scaler, // Factory function");
148 println!(" vec![2.0, 3.0, 5.0], // Parameter values to sweep");
149 println!(" Some(\"Scale\"), // Label");
150 println!(" Some(vec![(\"data\", \"input_data\")]), // Input mapping");
151 println!(" Some(vec![(\"scaled_value\", \"result\")]) // Output mapping");
152 println!(");");
153 println!("```\n");
154
155 let mut graph1 = Graph::new();
156 graph1.add(
157 data_source,
158 Some("Source"),
159 None,
160 Some(vec![("value", "data")])
161 );
162 graph1.variant(
163 make_scaler,
164 vec![2.0, 3.0, 5.0],
165 Some("Scale"),
166 Some(vec![("data", "input_data")]),
167 Some(vec![("scaled_value", "result")])
168 );
169
170 let dag1 = graph1.build();
171 println!("🎯 What happens:");
172 println!(" • Factory creates 3 nodes: Scale_2.0, Scale_3.0, Scale_5.0");
173 println!(" • Each node multiplies input by its factor");
174 println!(" • All variants can execute in parallel");
175 println!();
176
177 let stats1 = dag1.stats();
178 println!("📈 DAG Statistics:");
179 println!(" - Total nodes: {}", stats1.node_count);
180 println!(" - Depth: {} levels", stats1.depth);
181 println!(" - Max parallelism: {} nodes can run simultaneously", stats1.max_parallelism);
182 println!();
183
184 println!("🔍 Mermaid Visualization:");
185 println!("{}", dag1.to_mermaid());
186 println!();
187
188 // =========================================================================
189 // Demo 2: Multiple Variants - Cartesian Product
190 // =========================================================================
191 println!("\nDemo 2: Multiple Variants (Cartesian Product)");
192 println!("─────────────────────────────────────────────────────────\n");
193
194 println!("📝 Code:");
195 println!("```rust");
196 println!("graph.add(data_source, Some(\"Generate\"), None, Some(vec![(\"value\", \"data\")]));");
197 println!("graph.variant(make_scaler, vec![2.0, 3.0], Some(\"Scale\"), ...);");
198 println!("graph.variant(make_offsetter, vec![10, 20], Some(\"Offset\"), ...);");
199 println!("graph.add(stats_node, Some(\"Stats\"), Some(vec![(\"result\", \"result\")]), None);");
200 println!("```\n");
201
202 let mut graph2 = Graph::new();
203 graph2.add(
204 data_source,
205 Some("Generate"),
206 None,
207 Some(vec![("value", "data")])
208 );
209 graph2.variant(
210 make_scaler,
211 vec![2.0, 3.0],
212 Some("Scale"),
213 Some(vec![("data", "input_data")]),
214 Some(vec![("scaled_value", "result")])
215 );
216 graph2.variant(
217 make_offsetter,
218 vec![10, 20],
219 Some("Offset"),
220 Some(vec![("result", "number")]),
221 Some(vec![("offset_result", "result")])
222 );
223 graph2.add(
224 stats_node,
225 Some("Stats"),
226 Some(vec![("result", "result")]),
227 Some(vec![("summary", "final")])
228 );
229
230 let dag2 = graph2.build();
231 println!("🎯 What happens:");
232 println!(" • Scale creates 2 variants: x2.0, x3.0");
233 println!(" • Offset creates 2 variants: +10, +20");
234 println!(" • Total combinations: 2 × 2 = 4 execution paths");
235 println!(" • Each path: Generate → Scale[variant] → Offset[variant] → Stats");
236 println!();
237
238 let stats2 = dag2.stats();
239 println!("📈 DAG Statistics:");
240 println!(" - Total nodes: {}", stats2.node_count);
241 println!(" - Depth: {} levels", stats2.depth);
242 println!(" - Execution paths: 4 (2 scales × 2 offsets)");
243 println!();
244
245 println!("🔍 Mermaid Visualization:");
246 println!("{}", dag2.to_mermaid());
247 println!();
248
249 // =========================================================================
250 // Demo 3: Complex Factory - Struct Configuration
251 // =========================================================================
252 println!("\nDemo 3: Complex Factory with Struct Configuration");
253 println!("─────────────────────────────────────────────────────────\n");
254
255 println!("📝 Code:");
256 println!("```rust");
257 println!("#[derive(Clone)]");
258 println!("struct FilterConfig {{");
259 println!(" cutoff: f64,");
260 println!(" mode: String,");
261 println!("}}");
262 println!();
263 println!("fn make_filter(config: FilterConfig) -> impl Fn(...) -> ... {{");
264 println!(" move |inputs, _| {{");
265 println!(" let value = inputs.get(\"data\").unwrap().parse::<f64>().unwrap();");
266 println!(" let filtered = match config.mode.as_str() {{");
267 println!(" \"lowpass\" => value * config.cutoff,");
268 println!(" \"highpass\" => value * (1.0 - config.cutoff),");
269 println!(" _ => value,");
270 println!(" }};");
271 println!(" }}");
272 println!("}}");
273 println!();
274 println!("let configs = vec![");
275 println!(" FilterConfig {{ cutoff: 0.5, mode: \"lowpass\".to_string() }},");
276 println!(" FilterConfig {{ cutoff: 0.3, mode: \"highpass\".to_string() }},");
277 println!(" FilterConfig {{ cutoff: 0.7, mode: \"lowpass\".to_string() }},");
278 println!("];");
279 println!("graph.variant(make_filter, configs, Some(\"Filter\"), ...);");
280 println!("```\n");
281
282 let configs = vec![
283 FilterConfig { cutoff: 0.5, mode: "lowpass".to_string() },
284 FilterConfig { cutoff: 0.3, mode: "highpass".to_string() },
285 FilterConfig { cutoff: 0.7, mode: "lowpass".to_string() },
286 ];
287
288 let mut graph3 = Graph::new();
289 graph3.add(
290 data_source,
291 Some("Source"),
292 None,
293 Some(vec![("value", "data")])
294 );
295 graph3.variant(
296 make_filter,
297 configs,
298 Some("Filter"),
299 Some(vec![("data", "data")]),
300 Some(vec![("filtered", "result")])
301 );
302
303 let dag3 = graph3.build();
304 println!("🎯 What happens:");
305 println!(" • 3 filter variants created with different configurations");
306 println!(" • Each variant uses its own FilterConfig struct");
307 println!(" • Demonstrates passing complex types to factory");
308 println!();
309
310 let stats3 = dag3.stats();
311 println!("📈 DAG Statistics:");
312 println!(" - Total nodes: {}", stats3.node_count);
313 println!(" - Filter variants: 3");
314 println!(" - Max parallelism: {} nodes", stats3.max_parallelism);
315 println!();
316
317 // =========================================================================
318 // Demo 4: String Processing Variants
319 // =========================================================================
320 println!("\nDemo 4: String Processing Variants");
321 println!("─────────────────────────────────────────────────────────\n");
322
323 println!("📝 Code:");
324 println!("```rust");
325 println!("fn make_processor(prefix: &'static str) -> impl Fn(...) -> ... {{");
326 println!(" move |inputs, _| {{");
327 println!(" let text = inputs.get(\"text\").unwrap();");
328 println!(" let processed = format!(\"[{{}}] {{}}\", prefix, text);");
329 println!(" outputs.insert(\"processed_text\", processed);");
330 println!(" }}");
331 println!("}}");
332 println!();
333 println!("graph.variant(");
334 println!(" make_processor,");
335 println!(" vec![\"INFO\", \"WARN\", \"ERROR\"],");
336 println!(" Some(\"LogLevel\"),");
337 println!(" Some(vec![(\"message\", \"text\")]),");
338 println!(" Some(vec![(\"processed_text\", \"log\")])");
339 println!(");");
340 println!("```\n");
341
342 let mut graph4 = Graph::new();
343 graph4.add(
344 text_source,
345 Some("Source"),
346 None,
347 Some(vec![("message", "message")])
348 );
349 graph4.variant(
350 make_processor,
351 vec!["INFO", "WARN", "ERROR"],
352 Some("LogLevel"),
353 Some(vec![("message", "text")]),
354 Some(vec![("processed_text", "log")])
355 );
356
357 let dag4 = graph4.build();
358 println!("🎯 What happens:");
359 println!(" • 3 log level variants: INFO, WARN, ERROR");
360 println!(" • Each prefixes the message with its log level");
361 println!(" • Demonstrates string/static str parameters");
362 println!();
363
364 let stats4 = dag4.stats();
365 println!("📈 DAG Statistics:");
366 println!(" - Total nodes: {}", stats4.node_count);
367 println!(" - Log variants: 3");
368 println!();
369
370 println!("🔍 Mermaid Visualization:");
371 println!("{}", dag4.to_mermaid());
372 println!();
373
374 // =========================================================================
375 // Summary
376 // =========================================================================
377 println!("\n═══════════════════════════════════════════════════════════");
378 println!(" Summary: Key Variant Pattern Features");
379 println!("═══════════════════════════════════════════════════════════\n");
380
381 println!("✅ Factory Function Pattern:");
382 println!(" • Factory takes parameter(s), returns closure");
383 println!(" • Closure captures parameters in its environment");
384 println!(" • Same signature as regular node functions");
385 println!();
386
387 println!("✅ Parameter Flexibility:");
388 println!(" • Primitives: f64, i32, &str");
389 println!(" • Structs: Custom configuration objects");
390 println!(" • Arrays/Vectors: Multiple values at once");
391 println!();
392
393 println!("✅ Cartesian Products:");
394 println!(" • Multiple .variant() calls create all combinations");
395 println!(" • Example: 2 scales × 3 filters = 6 execution paths");
396 println!();
397
398 println!("✅ Port Mapping:");
399 println!(" • Variants use same tuple-based syntax");
400 println!(" • (broadcast_var, impl_var) for inputs");
401 println!(" • (impl_var, broadcast_var) for outputs");
402 println!();
403
404 println!("✅ Parallel Execution:");
405 println!(" • All variants at same level can run in parallel");
406 println!(" • DAG analysis identifies parallelization opportunities");
407 println!();
408}Sourcepub fn add<F>(
&mut self,
function_handle: F,
label: Option<&str>,
inputs: Option<Vec<(&str, &str)>>,
outputs: Option<Vec<(&str, &str)>>,
) -> &mut Self
pub fn add<F>( &mut self, function_handle: F, label: Option<&str>, inputs: Option<Vec<(&str, &str)>>, outputs: Option<Vec<(&str, &str)>>, ) -> &mut Self
Add a node to the graph with implicit connections
§Arguments
function_handle- The function to execute for this nodelabel- Optional label for visualizationinputs- Optional list of (broadcast_var, impl_var) tuples for inputsoutputs- Optional list of (impl_var, broadcast_var) tuples for outputs
§Implicit Connection Behavior
- The first node added has no dependencies
- Subsequent nodes automatically depend on the previous node
- This creates a natural sequential flow unless
.branch()is used
§Function Signature
Functions receive two parameters:
inputs: &HashMap<String, String>- Mapped input variables (impl_var names)variant_params: &HashMap<String, String>- Variant parameter values
Functions return outputs using impl_var names, which get mapped to broadcast_var names.
§Example
// Function sees "input_data", context has "data"
// Function returns "output_value", gets stored as "result" in context
graph.add(
process_fn,
Some("Process"),
Some(vec![("data", "input_data")]), // (broadcast, impl)
Some(vec![("output_value", "result")]) // (impl, broadcast)
);Examples found in repository?
24fn demo_simple_output_access() {
25 println!("─────────────────────────────────────────────────────────");
26 println!("Demo 1: Simple Pipeline Output Access");
27 println!("─────────────────────────────────────────────────────────\n");
28
29 let mut graph = Graph::new();
30
31 // Node 1: Generate initial data
32 graph.add(
33 |_: &HashMap<String, String>, _| {
34 let mut result = HashMap::new();
35 result.insert("initial_value".to_string(), "100".to_string());
36 result
37 },
38 Some("Source"),
39 None,
40 Some(vec![("initial_value", "raw_data")]) // Maps initial_value → raw_data
41 );
42
43 // Node 2: Process the data
44 graph.add(
45 |inputs: &HashMap<String, String>, _| {
46 let value = inputs.get("input").unwrap().parse::<i32>().unwrap();
47 let mut result = HashMap::new();
48 result.insert("processed".to_string(), (value * 2).to_string());
49 result
50 },
51 Some("Process"),
52 Some(vec![("raw_data", "input")]), // Maps raw_data → input
53 Some(vec![("processed", "final_result")]) // Maps processed → final_result
54 );
55
56 // Build and execute
57 let dag = graph.build();
58 let context = dag.execute(false, None);
59
60 println!("📦 Execution Context (all variables):");
61 for (key, value) in &context {
62 println!(" {} = {}", key, value);
63 }
64
65 println!("\n🎯 Accessing specific outputs:");
66
67 // Access by broadcast variable name (what's in the graph context)
68 if let Some(raw_data) = context.get("raw_data") {
69 println!(" raw_data: {}", raw_data);
70 }
71
72 if let Some(final_result) = context.get("final_result") {
73 println!(" final_result: {}", final_result);
74 }
75
76 // Check if a variable exists
77 if context.contains_key("final_result") {
78 println!("\n✅ Final result is available in context");
79 }
80
81 println!();
82}
83
84fn demo_branch_output_access() {
85 println!("─────────────────────────────────────────────────────────");
86 println!("Demo 2: Branch Output Access (Parallel Paths)");
87 println!("─────────────────────────────────────────────────────────\n");
88
89 let mut graph = Graph::new();
90
91 // Source node
92 graph.add(
93 |_: &HashMap<String, String>, _| {
94 let mut result = HashMap::new();
95 result.insert("value".to_string(), "50".to_string());
96 result
97 },
98 Some("Source"),
99 None,
100 Some(vec![("value", "shared_data")])
101 );
102
103 // Branch A: Statistics computation
104 let mut branch_a = Graph::new();
105 branch_a.add(
106 |inputs: &HashMap<String, String>, _| {
107 let val = inputs.get("data").unwrap();
108 let mut result = HashMap::new();
109 result.insert("stats_output".to_string(), format!("Stats of {}", val));
110 result
111 },
112 Some("Stats"),
113 Some(vec![("shared_data", "data")]),
114 Some(vec![("stats_output", "statistics")]) // Branch A produces "statistics"
115 );
116
117 // Branch B: Model training
118 let mut branch_b = Graph::new();
119 branch_b.add(
120 |inputs: &HashMap<String, String>, _| {
121 let val = inputs.get("data").unwrap();
122 let mut result = HashMap::new();
123 result.insert("model_output".to_string(), format!("Model trained on {}", val));
124 result
125 },
126 Some("Train"),
127 Some(vec![("shared_data", "data")]),
128 Some(vec![("model_output", "model")]) // Branch B produces "model"
129 );
130
131 // Branch C: Visualization
132 let mut branch_c = Graph::new();
133 branch_c.add(
134 |inputs: &HashMap<String, String>, _| {
135 let val = inputs.get("data").unwrap();
136 let mut result = HashMap::new();
137 result.insert("viz_output".to_string(), format!("Plot of {}", val));
138 result
139 },
140 Some("Visualize"),
141 Some(vec![("shared_data", "data")]),
142 Some(vec![("viz_output", "visualization")]) // Branch C produces "visualization"
143 );
144
145 // Add branches to main graph
146 graph.branch(branch_a);
147 graph.branch(branch_b);
148 graph.branch(branch_c);
149
150 // Execute
151 let dag = graph.build();
152 let context = dag.execute(false, None);
153
154 println!("📦 All outputs from parallel branches:");
155
156 // Access each branch's output
157 if let Some(stats) = context.get("statistics") {
158 println!(" Branch A (Statistics): {}", stats);
159 }
160
161 if let Some(model) = context.get("model") {
162 println!(" Branch B (Model): {}", model);
163 }
164
165 if let Some(viz) = context.get("visualization") {
166 println!(" Branch C (Visualization): {}", viz);
167 }
168
169 println!("\n🔍 All variables in context:");
170 for (key, value) in &context {
171 println!(" {} = {}", key, value);
172 }
173
174 println!();
175}
176
177fn demo_variant_output_access() {
178 println!("─────────────────────────────────────────────────────────");
179 println!("Demo 3: Variant Output Access (Parameter Sweep)");
180 println!("─────────────────────────────────────────────────────────\n");
181
182 let mut graph = Graph::new();
183
184 // Source node
185 graph.add(
186 |_: &HashMap<String, String>, _| {
187 let mut result = HashMap::new();
188 result.insert("value".to_string(), "10.0".to_string());
189 result
190 },
191 Some("DataSource"),
192 None,
193 Some(vec![("value", "input_data")])
194 );
195
196 // Variant nodes: scale by different factors
197 // Factory function that creates a scaler for each factor
198 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
199 move |inputs: &HashMap<String, String>, _| {
200 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
201 let mut result = HashMap::new();
202 result.insert("scaled".to_string(), (value * factor).to_string());
203 result
204 }
205 }
206
207 graph.variant(
208 make_scaler,
209 vec![2.0, 3.0, 5.0],
210 Some("Scale"),
211 Some(vec![("input_data", "data")]),
212 Some(vec![("scaled", "result")]) // Each variant produces "result"
213 );
214
215 // Execute
216 let dag = graph.build();
217 let context = dag.execute(false, None);
218
219 println!("📦 Variant outputs:");
220 println!(" Note: Variants with same output name overwrite each other");
221 println!(" The last variant (factor=5.0) writes to 'result'\n");
222
223 // Access the result (will be from the last variant)
224 if let Some(result) = context.get("result") {
225 println!(" result = {} (from last variant: 10.0 * 5.0)", result);
226 }
227
228 println!("\n💡 Tip: To preserve all variant outputs, use unique output names:");
229 println!(" Option 1: Map each variant to a different broadcast variable");
230 println!(" Option 2: Collect results in merge node");
231 println!(" Option 3: Use variant_params or closure capture to distinguish");
232
233 // Better approach: unique output names per variant
234 let mut graph2 = Graph::new();
235
236 graph2.add(
237 |_: &HashMap<String, String>, _| {
238 let mut result = HashMap::new();
239 result.insert("value".to_string(), "10.0".to_string());
240 result
241 },
242 Some("DataSource"),
243 None,
244 Some(vec![("value", "input_data")])
245 );
246
247 // Variant 1: 2x
248 fn make_scaler_unique(label: &str, factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> + '_ {
249 move |inputs: &HashMap<String, String>, _| {
250 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
251 let mut result = HashMap::new();
252 result.insert(label.to_string(), (value * factor).to_string());
253 result
254 }
255 }
256
257 graph2.variant(
258 |_x: &str| make_scaler_unique("scaled_2x", 2.0),
259 vec!["2x"],
260 Some("Scale2x"),
261 Some(vec![("input_data", "data")]),
262 Some(vec![("scaled_2x", "result_2x")])
263 );
264
265 graph2.variant(
266 |_x: &str| make_scaler_unique("scaled_3x", 3.0),
267 vec!["3x"],
268 Some("Scale3x"),
269 Some(vec![("input_data", "data")]),
270 Some(vec![("scaled_3x", "result_3x")])
271 );
272
273 let dag2 = graph2.build();
274 let context2 = dag2.execute(false, None);
275
276 println!("\n✅ Better approach - unique output names:");
277 if let Some(result_2x) = context2.get("result_2x") {
278 println!(" result_2x = {}", result_2x);
279 }
280 if let Some(result_3x) = context2.get("result_3x") {
281 println!(" result_3x = {}", result_3x);
282 }
283
284 println!();
285}
286
287fn demo_multiple_outputs() {
288 println!("─────────────────────────────────────────────────────────");
289 println!("Demo 4: Multiple Outputs from Single Node");
290 println!("─────────────────────────────────────────────────────────\n");
291
292 let mut graph = Graph::new();
293
294 // Node that produces multiple outputs
295 graph.add(
296 |_: &HashMap<String, String>, _| {
297 let mut result = HashMap::new();
298 result.insert("mean".to_string(), "50.5".to_string());
299 result.insert("median".to_string(), "48.0".to_string());
300 result.insert("stddev".to_string(), "12.3".to_string());
301 result.insert("count".to_string(), "100".to_string());
302 result
303 },
304 Some("Statistics"),
305 None,
306 Some(vec![
307 ("mean", "stat_mean"),
308 ("median", "stat_median"),
309 ("stddev", "stat_stddev"),
310 ("count", "sample_count")
311 ])
312 );
313
314 // Execute
315 let dag = graph.build();
316 let context = dag.execute(false, None);
317
318 println!("📊 Multiple outputs from single node:");
319
320 // Access each output individually
321 println!(" Mean: {}", context.get("stat_mean").unwrap());
322 println!(" Median: {}", context.get("stat_median").unwrap());
323 println!(" StdDev: {}", context.get("stat_stddev").unwrap());
324 println!(" Count: {}", context.get("sample_count").unwrap());
325
326 println!("\n📋 Complete execution context:");
327 for (key, value) in &context {
328 println!(" {} = {}", key, value);
329 }
330
331 println!("\n💡 Summary:");
332 println!(" ✓ dag.execute(false, None) returns HashMap<String, String>");
333 println!(" ✓ Keys are broadcast variable names (from output mappings)");
334 println!(" ✓ Use context.get(\"variable_name\") to access specific outputs");
335 println!(" ✓ All outputs accumulate in the context throughout execution");
336
337 println!();
338}More examples
28fn demo_simple_pipeline() {
29 println!("─────────────────────────────────────────────────────────");
30 println!("Demo 1: Simple Sequential Pipeline");
31 println!("─────────────────────────────────────────────────────────");
32
33 let mut graph = Graph::new();
34
35 // Data source node
36 graph.add(
37 |_: &HashMap<String, String>, _| {
38 let mut result = HashMap::new();
39 result.insert("value".to_string(), "42".to_string());
40 result
41 },
42 Some("DataSource"),
43 None, // No inputs (source node)
44 Some(vec![("value", "data")]) // (impl_var, broadcast_var)
45 );
46
47 // Processing node: multiply by 2
48 graph.add(
49 |inputs: &HashMap<String, String>, _| {
50 let mut result = HashMap::new();
51 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
52 result.insert("doubled".to_string(), (val * 2).to_string());
53 }
54 result
55 },
56 Some("Multiply"),
57 Some(vec![("data", "x")]), // (broadcast_var, impl_var)
58 Some(vec![("doubled", "result")]) // (impl_var, broadcast_var)
59 );
60
61 // Final processing: add 10
62 graph.add(
63 |inputs: &HashMap<String, String>, _| {
64 let mut result = HashMap::new();
65 if let Some(val) = inputs.get("num").and_then(|s| s.parse::<i32>().ok()) {
66 result.insert("sum".to_string(), (val + 10).to_string());
67 }
68 result
69 },
70 Some("AddTen"),
71 Some(vec![("result", "num")]),
72 Some(vec![("sum", "final")])
73 );
74
75 let dag = graph.build();
76 println!("\n📊 Execution:");
77 let context = dag.execute(false, None);
78
79 println!(" Input: data = {}", context.get("data").unwrap());
80 println!(" Step 1: result = {} (data * 2)", context.get("result").unwrap());
81 println!(" Step 2: final = {} (result + 10)", context.get("final").unwrap());
82
83 println!("\n📈 DAG Statistics:");
84 let stats = dag.stats();
85 println!("{}", stats.summary());
86
87 println!("\n🔍 Mermaid Visualization:");
88 println!("{}", dag.to_mermaid());
89 println!();
90}
91
92fn demo_branching() {
93 println!("─────────────────────────────────────────────────────────");
94 println!("Demo 2: Parallel Branching (Fan-Out)");
95 println!("─────────────────────────────────────────────────────────");
96
97 let mut graph = Graph::new();
98
99 // Source node
100 graph.add(
101 |_: &HashMap<String, String>, _| {
102 let mut result = HashMap::new();
103 result.insert("dataset".to_string(), "100".to_string());
104 result
105 },
106 Some("Source"),
107 None,
108 Some(vec![("dataset", "data")])
109 );
110
111 // Branch A: Compute statistics
112 let mut branch_a = Graph::new();
113 branch_a.add(
114 |inputs: &HashMap<String, String>, _| {
115 let mut result = HashMap::new();
116 if let Some(val) = inputs.get("input") {
117 result.insert("stats".to_string(), format!("Mean: {}", val));
118 }
119 result
120 },
121 Some("Statistics"),
122 Some(vec![("data", "input")]),
123 Some(vec![("stats", "stats_result")])
124 );
125
126 // Branch B: Train model
127 let mut branch_b = Graph::new();
128 branch_b.add(
129 |inputs: &HashMap<String, String>, _| {
130 let mut result = HashMap::new();
131 if let Some(val) = inputs.get("input") {
132 result.insert("model".to_string(), format!("Model trained on {}", val));
133 }
134 result
135 },
136 Some("MLModel"),
137 Some(vec![("data", "input")]),
138 Some(vec![("model", "model_result")])
139 );
140
141 // Branch C: Generate visualization
142 let mut branch_c = Graph::new();
143 branch_c.add(
144 |inputs: &HashMap<String, String>, _| {
145 let mut result = HashMap::new();
146 if let Some(val) = inputs.get("input") {
147 result.insert("plot".to_string(), format!("Plot of {}", val));
148 }
149 result
150 },
151 Some("Visualization"),
152 Some(vec![("data", "input")]),
153 Some(vec![("plot", "viz_result")])
154 );
155
156 graph.branch(branch_a);
157 graph.branch(branch_b);
158 graph.branch(branch_c);
159
160 let dag = graph.build();
161 println!("\n📊 Execution:");
162 let context = dag.execute(false, None);
163
164 println!(" Source: data = {}", context.get("data").unwrap());
165 println!(" Branch A (Stats): {}", context.get("stats_result").unwrap());
166 println!(" Branch B (Model): {}", context.get("model_result").unwrap());
167 println!(" Branch C (Viz): {}", context.get("viz_result").unwrap());
168
169 println!("\n📈 DAG Statistics:");
170 let stats = dag.stats();
171 println!("{}", stats.summary());
172 println!(" ⚡ All 3 branches can execute in parallel!");
173
174 println!("\n🔍 Mermaid Visualization:");
175 println!("{}", dag.to_mermaid());
176 println!();
177}
178
179fn demo_merging() {
180 println!("─────────────────────────────────────────────────────────");
181 println!("Demo 3: Branching + Merging (Fan-Out + Fan-In)");
182 println!("─────────────────────────────────────────────────────────");
183
184 let mut graph = Graph::new();
185
186 // Source
187 graph.add(
188 |_: &HashMap<String, String>, _| {
189 let mut result = HashMap::new();
190 result.insert("value".to_string(), "50".to_string());
191 result
192 },
193 Some("Source"),
194 None,
195 Some(vec![("value", "data")])
196 );
197
198 // Branch A: Add 10
199 let mut branch_a = Graph::new();
200 branch_a.add(
201 |inputs: &HashMap<String, String>, _| {
202 let mut result = HashMap::new();
203 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
204 result.insert("output".to_string(), (val + 10).to_string());
205 }
206 result
207 },
208 Some("PathA (+10)"),
209 Some(vec![("data", "x")]),
210 Some(vec![("output", "result")]) // Both branches use same output name!
211 );
212
213 // Branch B: Add 20
214 let mut branch_b = Graph::new();
215 branch_b.add(
216 |inputs: &HashMap<String, String>, _| {
217 let mut result = HashMap::new();
218 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
219 result.insert("output".to_string(), (val + 20).to_string());
220 }
221 result
222 },
223 Some("PathB (+20)"),
224 Some(vec![("data", "x")]),
225 Some(vec![("output", "result")]) // Both branches use same output name!
226 );
227
228 let branch_a_id = graph.branch(branch_a);
229 let branch_b_id = graph.branch(branch_b);
230
231 // Merge node: Combine results from both branches
232 graph.merge(
233 |inputs: &HashMap<String, String>, _| {
234 let mut result = HashMap::new();
235 let a = inputs.get("from_a").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
236 let b = inputs.get("from_b").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
237 result.insert("combined".to_string(), format!("{} + {} = {}", a, b, a + b));
238 result
239 },
240 Some("Merge"),
241 vec![
242 (branch_a_id, "result", "from_a"), // Map branch A's "result" to merge fn's "from_a"
243 (branch_b_id, "result", "from_b") // Map branch B's "result" to merge fn's "from_b"
244 ],
245 Some(vec![("combined", "final")])
246 );
247
248 let dag = graph.build();
249 println!("\n📊 Execution:");
250 let context = dag.execute(false, None);
251
252 println!(" Source: data = {}", context.get("data").unwrap());
253 println!(" Branch A: 50 + 10 = 60");
254 println!(" Branch B: 50 + 20 = 70");
255 println!(" Merged: {}", context.get("final").unwrap());
256
257 println!("\n📈 DAG Statistics:");
258 let stats = dag.stats();
259 println!("{}", stats.summary());
260
261 println!("\n🔍 Mermaid Visualization:");
262 println!("{}", dag.to_mermaid());
263 println!();
264}
265
266fn demo_variants() {
267 println!("─────────────────────────────────────────────────────────");
268 println!("Demo 4: Parameter Sweep with Variants");
269 println!("─────────────────────────────────────────────────────────");
270
271 let mut graph = Graph::new();
272
273 // Source node
274 graph.add(
275 |_: &HashMap<String, String>, _| {
276 let mut result = HashMap::new();
277 result.insert("base_value".to_string(), "10.0".to_string());
278 result
279 },
280 Some("DataSource"),
281 None,
282 Some(vec![("base_value", "data")])
283 );
284
285 // Variant factory: Scale by different learning rates
286 fn make_scaler(learning_rate: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
287 move |inputs: &HashMap<String, String>, _| {
288 let mut result = HashMap::new();
289 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
290 let scaled = val * learning_rate;
291 result.insert("scaled_value".to_string(), format!("{:.2}", scaled));
292 }
293 result
294 }
295 }
296
297 // Create variants using Linspace for learning rate sweep
298 graph.variant(
299 make_scaler,
300 vec![0.001, 0.01, 0.1, 1.0],
301 Some("ScaleLR"),
302 Some(vec![("data", "input")]),
303 Some(vec![("scaled_value", "result")])
304 );
305
306 let dag = graph.build();
307 println!("\n📊 Execution:");
308 let context = dag.execute(false, None);
309
310 println!(" Source: data = {}", context.get("data").unwrap());
311 println!(" Variants created for learning rates: [0.001, 0.01, 0.1, 1.0]");
312 println!(" (Each variant computes: data * learning_rate)");
313
314 println!("\n📈 DAG Statistics:");
315 let stats = dag.stats();
316 println!("{}", stats.summary());
317 println!(" ⚡ All {} variants can execute in parallel!", stats.variant_count);
318
319 println!("\n🔍 Mermaid Visualization:");
320 println!("{}", dag.to_mermaid());
321 println!();
322}
323
324fn demo_complex_graph() {
325 println!("─────────────────────────────────────────────────────────");
326 println!("Demo 5: Complex Graph (All Features Combined)");
327 println!("─────────────────────────────────────────────────────────");
328
329 let mut graph = Graph::new();
330
331 // 1. Data ingestion
332 graph.add(
333 |_: &HashMap<String, String>, _| {
334 let mut result = HashMap::new();
335 result.insert("raw_data".to_string(), "1000".to_string());
336 result
337 },
338 Some("Ingest"),
339 None,
340 Some(vec![("raw_data", "data")])
341 );
342
343 // 2. Preprocessing
344 graph.add(
345 |inputs: &HashMap<String, String>, _| {
346 let mut result = HashMap::new();
347 if let Some(val) = inputs.get("raw").and_then(|s| s.parse::<i32>().ok()) {
348 result.insert("cleaned".to_string(), (val / 10).to_string());
349 }
350 result
351 },
352 Some("Preprocess"),
353 Some(vec![("data", "raw")]),
354 Some(vec![("cleaned", "clean_data")])
355 );
356
357 // 3. Branch for different analyses
358 let mut stats_branch = Graph::new();
359 stats_branch.add(
360 |inputs: &HashMap<String, String>, _| {
361 let mut result = HashMap::new();
362 if let Some(val) = inputs.get("data") {
363 result.insert("stats".to_string(), format!("Stats({})", val));
364 }
365 result
366 },
367 Some("Stats"),
368 Some(vec![("clean_data", "data")]),
369 Some(vec![("stats", "statistics")])
370 );
371
372 let mut ml_branch = Graph::new();
373 ml_branch.add(
374 |inputs: &HashMap<String, String>, _| {
375 let mut result = HashMap::new();
376 if let Some(val) = inputs.get("data") {
377 result.insert("prediction".to_string(), format!("Pred({})", val));
378 }
379 result
380 },
381 Some("ML"),
382 Some(vec![("clean_data", "data")]),
383 Some(vec![("prediction", "ml_result")])
384 );
385
386 let stats_id = graph.branch(stats_branch);
387 let ml_id = graph.branch(ml_branch);
388
389 // 4. Merge branches
390 graph.merge(
391 |inputs: &HashMap<String, String>, _| {
392 let mut result = HashMap::new();
393 let stats = inputs.get("stats_in").cloned().unwrap_or_default();
394 let ml = inputs.get("ml_in").cloned().unwrap_or_default();
395 result.insert("report".to_string(), format!("{} & {}", stats, ml));
396 result
397 },
398 Some("Combine"),
399 vec![
400 (stats_id, "statistics", "stats_in"),
401 (ml_id, "ml_result", "ml_in")
402 ],
403 Some(vec![("report", "final_report")])
404 );
405
406 // 5. Final output formatting
407 graph.add(
408 |inputs: &HashMap<String, String>, _| {
409 let mut result = HashMap::new();
410 if let Some(report) = inputs.get("report") {
411 result.insert("formatted".to_string(), format!("[FINAL] {}", report));
412 }
413 result
414 },
415 Some("Format"),
416 Some(vec![("final_report", "report")]),
417 Some(vec![("formatted", "output")])
418 );
419
420 let dag = graph.build();
421 println!("\n📊 Execution:");
422 let context = dag.execute(false, None);
423
424 println!(" Step 1: Ingest → data = {}", context.get("data").unwrap());
425 println!(" Step 2: Preprocess → clean_data = {}", context.get("clean_data").unwrap());
426 println!(" Step 3: Branch A → statistics = {}", context.get("statistics").unwrap());
427 println!(" Branch B → ml_result = {}", context.get("ml_result").unwrap());
428 println!(" Step 4: Merge → final_report = {}", context.get("final_report").unwrap());
429 println!(" Step 5: Format → output = {}", context.get("output").unwrap());
430
431 println!("\n📈 DAG Statistics:");
432 let stats = dag.stats();
433 println!("{}", stats.summary());
434
435 println!("\n📋 Execution Order:");
436 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
437 println!(" Level {}: {} nodes", level_idx, level.len());
438 for &node_id in level {
439 let node = dag.nodes().iter().find(|n| n.id == node_id).unwrap();
440 println!(" - {}", node.display_name());
441 }
442 }
443
444 println!("\n🔍 Mermaid Visualization:");
445 println!("{}", dag.to_mermaid());
446 println!();
447
448 println!("═══════════════════════════════════════════════════════════");
449 println!(" Demo Complete!");
450 println!("═══════════════════════════════════════════════════════════");
451}24fn demo_per_node_access() {
25 println!("─────────────────────────────────────────────────────────");
26 println!("Demo 1: Per-Node Output Access");
27 println!("─────────────────────────────────────────────────────────\n");
28
29 let mut graph = Graph::new();
30
31 // Node 0: Source
32 graph.add(
33 |_: &HashMap<String, String>, _| {
34 let mut result = HashMap::new();
35 result.insert("value".to_string(), "10".to_string());
36 result
37 },
38 Some("Source"),
39 None,
40 Some(vec![("value", "initial_data")])
41 );
42
43 // Node 1: Double
44 graph.add(
45 |inputs: &HashMap<String, String>, _| {
46 let value = inputs.get("in").unwrap().parse::<i32>().unwrap();
47 let mut result = HashMap::new();
48 result.insert("doubled".to_string(), (value * 2).to_string());
49 result
50 },
51 Some("Double"),
52 Some(vec![("initial_data", "in")]),
53 Some(vec![("doubled", "doubled_data")])
54 );
55
56 // Node 2: Add Ten
57 graph.add(
58 |inputs: &HashMap<String, String>, _| {
59 let value = inputs.get("in").unwrap().parse::<i32>().unwrap();
60 let mut result = HashMap::new();
61 result.insert("added".to_string(), (value + 10).to_string());
62 result
63 },
64 Some("AddTen"),
65 Some(vec![("doubled_data", "in")]),
66 Some(vec![("added", "final_result")])
67 );
68
69 // Execute and get detailed results
70 let dag = graph.build();
71 let result = dag.execute_detailed(false, None);
72
73 println!("🌍 Global Context (all variables):");
74 for (key, value) in &result.context {
75 println!(" {} = {}", key, value);
76 }
77
78 println!("\n📦 Per-Node Outputs:");
79 println!("\nNode 0 (Source) outputs:");
80 if let Some(outputs) = result.get_node_outputs(0) {
81 for (key, value) in outputs {
82 println!(" {} = {}", key, value);
83 }
84 }
85
86 println!("\nNode 1 (Double) outputs:");
87 if let Some(outputs) = result.get_node_outputs(1) {
88 for (key, value) in outputs {
89 println!(" {} = {}", key, value);
90 }
91 }
92
93 println!("\nNode 2 (AddTen) outputs:");
94 if let Some(outputs) = result.get_node_outputs(2) {
95 for (key, value) in outputs {
96 println!(" {} = {}", key, value);
97 }
98 }
99
100 println!("\n🎯 Accessing specific node outputs:");
101 if let Some(value) = result.get_from_node(0, "initial_data") {
102 println!(" Node 0 'initial_data': {}", value);
103 }
104 if let Some(value) = result.get_from_node(1, "doubled_data") {
105 println!(" Node 1 'doubled_data': {}", value);
106 }
107 if let Some(value) = result.get_from_node(2, "final_result") {
108 println!(" Node 2 'final_result': {}", value);
109 }
110
111 println!();
112}
113
114fn demo_per_branch_access() {
115 println!("─────────────────────────────────────────────────────────");
116 println!("Demo 2: Per-Branch Output Access");
117 println!("─────────────────────────────────────────────────────────\n");
118
119 let mut graph = Graph::new();
120
121 // Main graph: Source node
122 graph.add(
123 |_: &HashMap<String, String>, _| {
124 let mut result = HashMap::new();
125 result.insert("dataset".to_string(), "100".to_string());
126 result
127 },
128 Some("Source"),
129 None,
130 Some(vec![("dataset", "data")])
131 );
132
133 // Branch A: Statistics
134 let mut branch_a = Graph::new();
135 branch_a.add(
136 |inputs: &HashMap<String, String>, _| {
137 let value = inputs.get("input").unwrap();
138 let mut result = HashMap::new();
139 result.insert("stat_result".to_string(), format!("Mean of {}", value));
140 result
141 },
142 Some("Statistics"),
143 Some(vec![("data", "input")]),
144 Some(vec![("stat_result", "statistics")])
145 );
146
147 // Branch B: Model Training
148 let mut branch_b = Graph::new();
149 branch_b.add(
150 |inputs: &HashMap<String, String>, _| {
151 let value = inputs.get("input").unwrap();
152 let mut result = HashMap::new();
153 result.insert("model_result".to_string(), format!("Model trained on {}", value));
154 result
155 },
156 Some("ModelTraining"),
157 Some(vec![("data", "input")]),
158 Some(vec![("model_result", "trained_model")])
159 );
160
161 // Branch C: Visualization
162 let mut branch_c = Graph::new();
163 branch_c.add(
164 |inputs: &HashMap<String, String>, _| {
165 let value = inputs.get("input").unwrap();
166 let mut result = HashMap::new();
167 result.insert("viz_result".to_string(), format!("Plot of {}", value));
168 result
169 },
170 Some("Visualization"),
171 Some(vec![("data", "input")]),
172 Some(vec![("viz_result", "plot")])
173 );
174
175 let branch_a_id = graph.branch(branch_a);
176 let branch_b_id = graph.branch(branch_b);
177 let branch_c_id = graph.branch(branch_c);
178
179 // Execute and get detailed results
180 let dag = graph.build();
181 let result = dag.execute_detailed(false, None);
182
183 println!("🌍 Global Context:");
184 for (key, value) in &result.context {
185 println!(" {} = {}", key, value);
186 }
187
188 println!("\n🌿 Per-Branch Outputs:");
189
190 println!("\nBranch {} (Statistics) outputs:", branch_a_id);
191 if let Some(outputs) = result.get_branch_outputs(branch_a_id) {
192 for (key, value) in outputs {
193 println!(" {} = {}", key, value);
194 }
195 }
196
197 println!("\nBranch {} (Model Training) outputs:", branch_b_id);
198 if let Some(outputs) = result.get_branch_outputs(branch_b_id) {
199 for (key, value) in outputs {
200 println!(" {} = {}", key, value);
201 }
202 }
203
204 println!("\nBranch {} (Visualization) outputs:", branch_c_id);
205 if let Some(outputs) = result.get_branch_outputs(branch_c_id) {
206 for (key, value) in outputs {
207 println!(" {} = {}", key, value);
208 }
209 }
210
211 println!("\n🎯 Accessing specific branch outputs:");
212 if let Some(value) = result.get_from_branch(branch_a_id, "statistics") {
213 println!(" Branch {} 'statistics': {}", branch_a_id, value);
214 }
215 if let Some(value) = result.get_from_branch(branch_b_id, "trained_model") {
216 println!(" Branch {} 'trained_model': {}", branch_b_id, value);
217 }
218 if let Some(value) = result.get_from_branch(branch_c_id, "plot") {
219 println!(" Branch {} 'plot': {}", branch_c_id, value);
220 }
221
222 println!();
223}
224
225fn demo_variant_per_node_access() {
226 println!("─────────────────────────────────────────────────────────");
227 println!("Demo 3: Variant Outputs with Per-Node Tracking");
228 println!("─────────────────────────────────────────────────────────\n");
229
230 let mut graph = Graph::new();
231
232 // Source node
233 graph.add(
234 |_: &HashMap<String, String>, _| {
235 let mut result = HashMap::new();
236 result.insert("base_value".to_string(), "10".to_string());
237 result
238 },
239 Some("Source"),
240 None,
241 Some(vec![("base_value", "data")])
242 );
243
244 // Variant factory for scaling
245 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
246 move |inputs: &HashMap<String, String>, _| {
247 let value = inputs.get("input_data").unwrap().parse::<f64>().unwrap();
248 let mut result = HashMap::new();
249 result.insert("scaled_value".to_string(), (value * factor).to_string());
250 result
251 }
252 }
253
254 // Create variants with unique output names to preserve all results
255 graph.variant(
256 make_scaler,
257 vec![2.0, 3.0, 5.0],
258 Some("Scale"),
259 Some(vec![("data", "input_data")]),
260 Some(vec![("scaled_value", "result")]) // Note: will overwrite in global context
261 );
262
263 let dag = graph.build();
264 let result = dag.execute_detailed(false, None);
265
266 println!("🌍 Global Context (note: 'result' contains last variant's output):");
267 for (key, value) in &result.context {
268 println!(" {} = {}", key, value);
269 }
270
271 println!("\n📦 Per-Node Outputs (each variant tracked separately):");
272
273 // Node 0 is the source
274 // Nodes 1, 2, 3 are the variant nodes (2x, 3x, 5x scalers)
275 for node_id in 1..=3 {
276 println!("\nNode {} (Variant Scaler) outputs:", node_id);
277 if let Some(outputs) = result.get_node_outputs(node_id) {
278 for (key, value) in outputs {
279 println!(" {} = {}", key, value);
280 }
281 }
282 }
283
284 println!("\n💡 Key Insight:");
285 println!(" - Global context has 'result' = {} (last variant overwrites)", result.get("result").unwrap());
286 println!(" - But per-node outputs preserve ALL variant results:");
287 println!(" Node 1 (2x): result = {}", result.get_from_node(1, "result").unwrap());
288 println!(" Node 2 (3x): result = {}", result.get_from_node(2, "result").unwrap());
289 println!(" Node 3 (5x): result = {}", result.get_from_node(3, "result").unwrap());
290
291 println!();
292}
293
294fn demo_execution_history_tracking() {
295 println!("─────────────────────────────────────────────────────────");
296 println!("Demo 4: Execution History Tracking");
297 println!("─────────────────────────────────────────────────────────\n");
298
299 let mut graph = Graph::new();
300
301 // Multi-stage pipeline
302 graph.add(
303 |_: &HashMap<String, String>, _| {
304 let mut result = HashMap::new();
305 result.insert("raw".to_string(), "5".to_string());
306 result
307 },
308 Some("Load"),
309 None,
310 Some(vec![("raw", "input")])
311 );
312
313 graph.add(
314 |inputs: &HashMap<String, String>, _| {
315 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
316 let mut result = HashMap::new();
317 result.insert("cleaned".to_string(), (value + 1).to_string());
318 result
319 },
320 Some("Clean"),
321 Some(vec![("input", "x")]),
322 Some(vec![("cleaned", "clean_data")])
323 );
324
325 graph.add(
326 |inputs: &HashMap<String, String>, _| {
327 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
328 let mut result = HashMap::new();
329 result.insert("normalized".to_string(), (value * 10).to_string());
330 result
331 },
332 Some("Normalize"),
333 Some(vec![("clean_data", "x")]),
334 Some(vec![("normalized", "norm_data")])
335 );
336
337 graph.add(
338 |inputs: &HashMap<String, String>, _| {
339 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
340 let mut result = HashMap::new();
341 result.insert("transformed".to_string(), format!("FINAL_{}", value));
342 result
343 },
344 Some("Transform"),
345 Some(vec![("norm_data", "x")]),
346 Some(vec![("transformed", "output")])
347 );
348
349 let dag = graph.build();
350 let result = dag.execute_detailed(false, None);
351
352 println!("📊 Execution History (Data Flow Tracking):");
353 println!();
354 println!("Step-by-step transformation:");
355 println!(" 1. Load: input = {}", result.get_from_node(0, "input").unwrap());
356 println!(" 2. Clean: clean_data = {}", result.get_from_node(1, "clean_data").unwrap());
357 println!(" 3. Normalize: norm_data = {}", result.get_from_node(2, "norm_data").unwrap());
358 println!(" 4. Transform: output = {}", result.get_from_node(3, "output").unwrap());
359
360 println!("\n🔍 Debugging: Inspect any intermediate result:");
361 println!(" Need to debug the normalization step?");
362 println!(" Just check Node 2: {}", result.get_from_node(2, "norm_data").unwrap());
363
364 println!("\n✅ Benefits of Per-Node Access:");
365 println!(" ✓ Track data transformations step-by-step");
366 println!(" ✓ Debug issues by inspecting intermediate values");
367 println!(" ✓ Validate each processing stage independently");
368 println!(" ✓ Preserve all variant outputs even with name collisions");
369
370 println!();
371}32fn demo_sequential_vs_parallel() {
33 println!("─────────────────────────────────────────────────────────");
34 println!("Demo 1: Sequential vs Parallel Execution");
35 println!("─────────────────────────────────────────────────────────");
36
37 let mut graph = Graph::new();
38
39 // Source node
40 graph.add(
41 |_: &HashMap<String, String>, _| {
42 let start = Instant::now();
43 let mut result = HashMap::new();
44 result.insert("data".to_string(), "source_data".to_string());
45 println!(" [{}ms] Source completed", start.elapsed().as_millis());
46 result
47 },
48 Some("Source"),
49 None,
50 Some(vec![("data", "data")])
51 );
52
53 // Branch A: 100ms work
54 let mut branch_a = Graph::new();
55 branch_a.add(
56 |inputs: &HashMap<String, String>, _| {
57 let start = Instant::now();
58 let mut result = HashMap::new();
59 if let Some(data) = inputs.get("input") {
60 let processed = simulate_work(100, data);
61 result.insert("result".to_string(), processed);
62 }
63 println!(" [{}ms] Branch A completed (100ms work)", start.elapsed().as_millis());
64 result
65 },
66 Some("BranchA[100ms]"),
67 Some(vec![("data", "input")]),
68 Some(vec![("result", "result_a")])
69 );
70
71 // Branch B: 100ms work
72 let mut branch_b = Graph::new();
73 branch_b.add(
74 |inputs: &HashMap<String, String>, _| {
75 let start = Instant::now();
76 let mut result = HashMap::new();
77 if let Some(data) = inputs.get("input") {
78 let processed = simulate_work(100, data);
79 result.insert("result".to_string(), processed);
80 }
81 println!(" [{}ms] Branch B completed (100ms work)", start.elapsed().as_millis());
82 result
83 },
84 Some("BranchB[100ms]"),
85 Some(vec![("data", "input")]),
86 Some(vec![("result", "result_b")])
87 );
88
89 // Branch C: 100ms work
90 let mut branch_c = Graph::new();
91 branch_c.add(
92 |inputs: &HashMap<String, String>, _| {
93 let start = Instant::now();
94 let mut result = HashMap::new();
95 if let Some(data) = inputs.get("input") {
96 let processed = simulate_work(100, data);
97 result.insert("result".to_string(), processed);
98 }
99 println!(" [{}ms] Branch C completed (100ms work)", start.elapsed().as_millis());
100 result
101 },
102 Some("BranchC[100ms]"),
103 Some(vec![("data", "input")]),
104 Some(vec![("result", "result_c")])
105 );
106
107 graph.branch(branch_a);
108 graph.branch(branch_b);
109 graph.branch(branch_c);
110
111 let dag = graph.build();
112
113 println!("\n📊 Sequential Execution (simulated):");
114 let start = Instant::now();
115 let _ = dag.execute(false, None);
116 let sequential_time = start.elapsed();
117 println!(" Total time: {}ms", sequential_time.as_millis());
118
119 println!("\n⚡ With Parallel Execution:");
120 println!(" Expected time: ~100ms (all branches run simultaneously)");
121 println!(" Speedup: ~3x faster than sequential");
122
123 println!("\n📈 DAG Statistics:");
124 let stats = dag.stats();
125 println!("{}", stats.summary());
126 println!("\n Analysis:");
127 println!(" - Level 0: 1 node (Source)");
128 println!(" - Level 1: 3 nodes (BranchA, BranchB, BranchC) ← Can run in parallel!");
129 println!(" - Max parallelism: {} nodes can execute simultaneously", stats.max_parallelism);
130
131 println!("\n🔍 Mermaid Visualization with Port Mappings:");
132 println!("{}", dag.to_mermaid());
133 println!();
134}
135
136fn demo_complex_dependencies() {
137 println!("─────────────────────────────────────────────────────────");
138 println!("Demo 2: Complex Data Dependencies");
139 println!("─────────────────────────────────────────────────────────");
140
141 let mut graph = Graph::new();
142
143 // Two independent sources
144 graph.add(
145 |_: &HashMap<String, String>, _| {
146 let mut result = HashMap::new();
147 result.insert("source1_data".to_string(), "100".to_string());
148 result
149 },
150 Some("Source1"),
151 None,
152 Some(vec![("source1_data", "data1")])
153 );
154
155 graph.add(
156 |_: &HashMap<String, String>, _| {
157 let mut result = HashMap::new();
158 result.insert("source2_data".to_string(), "200".to_string());
159 result
160 },
161 Some("Source2"),
162 None,
163 Some(vec![("source2_data", "data2")])
164 );
165
166 // Process each source independently (can run in parallel)
167 graph.add(
168 |inputs: &HashMap<String, String>, _| {
169 let mut result = HashMap::new();
170 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
171 thread::sleep(Duration::from_millis(50));
172 result.insert("processed".to_string(), (val * 2).to_string());
173 }
174 result
175 },
176 Some("Process1[50ms]"),
177 Some(vec![("data1", "in")]),
178 Some(vec![("processed", "proc1")])
179 );
180
181 graph.add(
182 |inputs: &HashMap<String, String>, _| {
183 let mut result = HashMap::new();
184 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
185 thread::sleep(Duration::from_millis(50));
186 result.insert("processed".to_string(), (val * 3).to_string());
187 }
188 result
189 },
190 Some("Process2[50ms]"),
191 Some(vec![("data2", "in")]),
192 Some(vec![("processed", "proc2")])
193 );
194
195 // Combine results (depends on both processors)
196 graph.add(
197 |inputs: &HashMap<String, String>, _| {
198 let mut result = HashMap::new();
199 let v1 = inputs.get("p1").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
200 let v2 = inputs.get("p2").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
201 thread::sleep(Duration::from_millis(30));
202 result.insert("combined".to_string(), format!("{}", v1 + v2));
203 result
204 },
205 Some("Combine[30ms]"),
206 Some(vec![("proc1", "p1"), ("proc2", "p2")]),
207 Some(vec![("combined", "final")])
208 );
209
210 let dag = graph.build();
211
212 println!("\n📊 Execution with timing:");
213 let start = Instant::now();
214 let context = dag.execute(false, None);
215 let total_time = start.elapsed();
216
217 println!(" Source1: data1 = {}", context.get("data1").unwrap());
218 println!(" Source2: data2 = {}", context.get("data2").unwrap());
219 println!(" Process1: proc1 = {} (data1 * 2)", context.get("proc1").unwrap());
220 println!(" Process2: proc2 = {} (data2 * 3)", context.get("proc2").unwrap());
221 println!(" Combine: final = {} (proc1 + proc2)", context.get("final").unwrap());
222 println!("\n Total execution time: {}ms", total_time.as_millis());
223
224 println!("\n📈 Execution Levels (showing parallelism):");
225 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
226 print!(" Level {}: ", level_idx);
227 let node_names: Vec<String> = level.iter()
228 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
229 .collect();
230 println!("{}", node_names.join(", "));
231 if level.len() > 1 {
232 println!(" ↑ {} nodes can execute in parallel!", level.len());
233 }
234 }
235
236 println!("\n⚡ Parallel Execution Analysis:");
237 println!(" Sequential time would be: 50+50+30 = 130ms");
238 println!(" With parallelism: Level0→Level1(parallel)→Level2 = ~80ms");
239 println!(" Speedup: 1.6x");
240
241 println!("\n🔍 Mermaid Visualization (shows data dependencies):");
242 println!("{}", dag.to_mermaid());
243 println!();
244}
245
246fn demo_variant_parallelism() {
247 println!("─────────────────────────────────────────────────────────");
248 println!("Demo 3: Variant Parameter Sweep Parallelism");
249 println!("─────────────────────────────────────────────────────────");
250
251 let mut graph = Graph::new();
252
253 // Source
254 graph.add(
255 |_: &HashMap<String, String>, _| {
256 let mut result = HashMap::new();
257 result.insert("value".to_string(), "1000".to_string());
258 result
259 },
260 Some("DataSource"),
261 None,
262 Some(vec![("value", "data")])
263 );
264
265 // Variant factory with different multipliers
266 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
267 move |inputs: &HashMap<String, String>, _| {
268 let start = Instant::now();
269 let mut result = HashMap::new();
270 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
271 // Simulate 100ms of work
272 thread::sleep(Duration::from_millis(100));
273 result.insert("result".to_string(), format!("{:.1}", val * factor));
274 }
275 println!(" [{}ms] Variant (factor={}) completed", start.elapsed().as_millis(), factor);
276 result
277 }
278 }
279
280 // Create 5 variants
281 graph.variant(
282 make_multiplier,
283 vec![0.5, 1.0, 1.5, 2.0, 2.5],
284 Some("Multiply[100ms]"),
285 Some(vec![("data", "input")]),
286 Some(vec![("result", "result")])
287 );
288
289 let dag = graph.build();
290
291 println!("\n📊 Executing 5 variants (each takes 100ms):");
292 let start = Instant::now();
293 let _ = dag.execute(false, None);
294 let total_time = start.elapsed();
295
296 println!("\n Total execution time: {}ms", total_time.as_millis());
297
298 println!("\n⚡ Parallelism Analysis:");
299 println!(" Sequential execution: 100 × 5 = 500ms");
300 println!(" With parallel execution: ~100ms (all run simultaneously)");
301 println!(" Speedup: 5x");
302
303 println!("\n📈 DAG Statistics:");
304 let stats = dag.stats();
305 println!("{}", stats.summary());
306 println!(" ↑ All {} variant nodes can execute in parallel!", stats.variant_count);
307
308 println!("\n🔍 Mermaid Visualization:");
309 println!("{}", dag.to_mermaid());
310 println!();
311}
312
313fn demo_diamond_pattern() {
314 println!("─────────────────────────────────────────────────────────");
315 println!("Demo 4: Diamond Pattern (Fan-Out → Fan-In)");
316 println!("─────────────────────────────────────────────────────────");
317 println!("This pattern shows:");
318 println!(" - One source splits into multiple parallel branches");
319 println!(" - Branches are processed independently");
320 println!(" - Results merge back into single output");
321
322 let mut graph = Graph::new();
323
324 // Top of diamond: Single source
325 graph.add(
326 |_: &HashMap<String, String>, _| {
327 let mut result = HashMap::new();
328 result.insert("raw".to_string(), "input_data".to_string());
329 result
330 },
331 Some("Source"),
332 None,
333 Some(vec![("raw", "data")])
334 );
335
336 // Left branch: Transform A (50ms)
337 let mut branch_a = Graph::new();
338 branch_a.add(
339 |inputs: &HashMap<String, String>, _| {
340 let start = Instant::now();
341 thread::sleep(Duration::from_millis(50));
342 let mut result = HashMap::new();
343 if let Some(data) = inputs.get("in") {
344 result.insert("out".to_string(), format!("{}_transformA", data));
345 }
346 println!(" [{}ms] Transform A completed", start.elapsed().as_millis());
347 result
348 },
349 Some("TransformA[50ms]"),
350 Some(vec![("data", "in")]),
351 Some(vec![("out", "result")])
352 );
353
354 // Right branch: Transform B (50ms)
355 let mut branch_b = Graph::new();
356 branch_b.add(
357 |inputs: &HashMap<String, String>, _| {
358 let start = Instant::now();
359 thread::sleep(Duration::from_millis(50));
360 let mut result = HashMap::new();
361 if let Some(data) = inputs.get("in") {
362 result.insert("out".to_string(), format!("{}_transformB", data));
363 }
364 println!(" [{}ms] Transform B completed", start.elapsed().as_millis());
365 result
366 },
367 Some("TransformB[50ms]"),
368 Some(vec![("data", "in")]),
369 Some(vec![("out", "result")])
370 );
371
372 let branch_a_id = graph.branch(branch_a);
373 let branch_b_id = graph.branch(branch_b);
374
375 // Bottom of diamond: Merge (30ms)
376 graph.merge(
377 |inputs: &HashMap<String, String>, _| {
378 let start = Instant::now();
379 thread::sleep(Duration::from_millis(30));
380 let mut result = HashMap::new();
381 let a = inputs.get("left").cloned().unwrap_or_default();
382 let b = inputs.get("right").cloned().unwrap_or_default();
383 result.insert("merged".to_string(), format!("[{}+{}]", a, b));
384 println!(" [{}ms] Merge completed", start.elapsed().as_millis());
385 result
386 },
387 Some("Merge[30ms]"),
388 vec![
389 (branch_a_id, "result", "left"),
390 (branch_b_id, "result", "right")
391 ],
392 Some(vec![("merged", "final")])
393 );
394
395 let dag = graph.build();
396
397 println!("\n📊 Executing diamond pattern:");
398 let start = Instant::now();
399 let context = dag.execute(false, None);
400 let total_time = start.elapsed();
401
402 println!("\n Result: {}", context.get("final").unwrap());
403 println!(" Total execution time: {}ms", total_time.as_millis());
404
405 println!("\n📈 Execution Levels:");
406 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
407 print!(" Level {}: ", level_idx);
408 let node_names: Vec<String> = level.iter()
409 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
410 .collect();
411 println!("{}", node_names.join(", "));
412 }
413
414 println!("\n⚡ Timing Analysis:");
415 println!(" Sequential: Source(0ms) + TransformA(50ms) + TransformB(50ms) + Merge(30ms) = 130ms");
416 println!(" Parallel: Source(0ms) → [TransformA + TransformB](50ms) → Merge(30ms) = 80ms");
417 println!(" Speedup: 1.6x");
418
419 println!("\n🔍 Mermaid Visualization (Diamond Shape):");
420 println!("{}", dag.to_mermaid());
421
422 println!("\n The visualization shows:");
423 println!(" - Port mappings on edges (data→in, result→left, result→right)");
424 println!(" - Data dependencies between nodes");
425 println!(" - Parallel branches can execute simultaneously");
426
427 println!("\n═══════════════════════════════════════════════════════════");
428 println!(" Parallel Execution Demo Complete!");
429 println!("═══════════════════════════════════════════════════════════");
430}11fn main() {
12 println!("=== Tuple-Based API Demo ===\n");
13
14 let mut graph = Graph::new();
15
16 // Source node - no inputs, produces "dataset" in context
17 fn data_source(_inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
18 let mut outputs = HashMap::new();
19 outputs.insert("raw".to_string(), "Sample Data".to_string());
20 outputs
21 }
22
23 graph.add(
24 data_source,
25 Some("Source"),
26 None, // No inputs
27 Some(vec![("raw", "dataset")]) // Function returns "raw", stored as "dataset" in context
28 );
29
30 println!("✓ Added source node");
31 println!(" Output mapping: function's 'raw' → context's 'dataset'\n");
32
33 // Process node - consumes "dataset" from context as "input_data", produces "processed_data" to context as "result"
34 fn processor(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
35 let default = String::new();
36 let data = inputs.get("input_data").unwrap_or(&default);
37 let mut outputs = HashMap::new();
38 outputs.insert("processed_data".to_string(), format!("Processed: {}", data));
39 outputs
40 }
41
42 graph.add(
43 processor,
44 Some("Process"),
45 Some(vec![("dataset", "input_data")]), // Context's "dataset" → function's "input_data"
46 Some(vec![("processed_data", "result")]) // Function's "processed_data" → context's "result"
47 );
48
49 println!("✓ Added processor node");
50 println!(" Input mapping: context's 'dataset' → function's 'input_data'");
51 println!(" Output mapping: function's 'processed_data' → context's 'result'\n");
52
53 // Create branches that both use the same variable names internally
54 let mut branch_a = Graph::new();
55 fn transform_a(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
56 let default = String::new();
57 let data = inputs.get("x").unwrap_or(&default);
58 let mut outputs = HashMap::new();
59 outputs.insert("y".to_string(), format!("{} [Path A]", data));
60 outputs
61 }
62 branch_a.add(
63 transform_a,
64 Some("Transform A"),
65 Some(vec![("result", "x")]), // Context's "result" → function's "x"
66 Some(vec![("y", "output")]) // Function's "y" → context's "output"
67 );
68
69 let mut branch_b = Graph::new();
70 fn transform_b(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
71 let default = String::new();
72 let data = inputs.get("x").unwrap_or(&default);
73 let mut outputs = HashMap::new();
74 outputs.insert("y".to_string(), format!("{} [Path B]", data));
75 outputs
76 }
77 branch_b.add(
78 transform_b,
79 Some("Transform B"),
80 Some(vec![("result", "x")]), // Same variable names as branch_a
81 Some(vec![("y", "output")]) // Same variable names as branch_a
82 );
83
84 println!("✓ Created two branches");
85 println!(" Both branches use same internal variable names (x, y)");
86 println!(" Both map 'result' → 'x' and 'y' → 'output'\n");
87
88 // Branch from the processor
89 let branch_a_id = graph.branch(branch_a);
90 let branch_b_id = graph.branch(branch_b);
91
92 println!("✓ Added branches to graph");
93 println!(" Branch A ID: {}", branch_a_id);
94 println!(" Branch B ID: {}\n", branch_b_id);
95
96 // Merge branches with branch-specific variable resolution
97 fn combine(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
98 let default = String::new();
99 let a = inputs.get("from_a").unwrap_or(&default);
100 let b = inputs.get("from_b").unwrap_or(&default);
101 let mut outputs = HashMap::new();
102 outputs.insert("merged".to_string(), format!("Combined: {} + {}", a, b));
103 outputs
104 }
105
106 graph.merge(
107 combine,
108 Some("Combine"),
109 vec![
110 (branch_a_id, "output", "from_a"), // Branch A's "output" → merge function's "from_a"
111 (branch_b_id, "output", "from_b") // Branch B's "output" → merge function's "from_b"
112 ],
113 Some(vec![("merged", "final_result")]) // Merge function's "merged" → context's "final_result"
114 );
115
116 println!("✓ Added merge node");
117 println!(" Branch-specific input mapping:");
118 println!(" Branch {} 'output' → merge function's 'from_a'", branch_a_id);
119 println!(" Branch {} 'output' → merge function's 'from_b'", branch_b_id);
120 println!(" Output mapping: merge function's 'merged' → context's 'final_result'\n");
121
122 // Variant example with factory pattern
123 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
124 move |inputs, _variant| {
125 let default = "1.0".to_string();
126 let data = inputs.get("value").unwrap_or(&default);
127 if let Ok(val) = data.parse::<f64>() {
128 let mut outputs = HashMap::new();
129 outputs.insert("scaled".to_string(), (val * factor).to_string());
130 outputs
131 } else {
132 HashMap::new()
133 }
134 }
135 }
136
137 graph.variant(
138 make_multiplier,
139 vec![2.0, 3.0, 5.0],
140 Some("Multiply"),
141 Some(vec![("final_result", "value")]), // Context's "final_result" → function's "value"
142 Some(vec![("scaled", "multiplied")]) // Function's "scaled" → context's "multiplied"
143 );
144
145 println!("✓ Added variant nodes with parameter sweep");
146 println!(" Three variants: factor = 2.0, 3.0, 5.0");
147 println!(" Input mapping: context's 'final_result' → function's 'value'");
148 println!(" Output mapping: function's 'scaled' → context's 'multiplied'\n");
149
150 println!("=== Summary ===");
151 println!("The tuple-based API provides clear separation between:");
152 println!("1. Context variable names (broadcast vars) - shared across the graph");
153 println!("2. Function parameter names (impl vars) - internal to each function");
154 println!("\nThis allows:");
155 println!("- Branches to use consistent internal naming (x, y)");
156 println!("- Merge to distinguish branch outputs using branch IDs");
157 println!("- Clear data flow visualization and debugging");
158}122fn main() {
123 println!("═══════════════════════════════════════════════════════════");
124 println!(" Variant Pattern Demo (sigexec-style)");
125 println!(" Full Actual Syntax Examples");
126 println!("═══════════════════════════════════════════════════════════\n");
127
128 // =========================================================================
129 // Demo 1: Single Variant - Basic Factory Pattern
130 // =========================================================================
131 println!("Demo 1: Single Variant with Factory Function");
132 println!("─────────────────────────────────────────────────────────\n");
133
134 println!("📝 Code:");
135 println!("```rust");
136 println!("fn make_scaler(factor: f64) -> impl Fn(...) -> ... {{");
137 println!(" move |inputs, _| {{");
138 println!(" let value = inputs.get(\"input_data\").unwrap().parse::<f64>().unwrap();");
139 println!(" let scaled = value * factor;");
140 println!(" outputs.insert(\"scaled_value\", scaled.to_string());");
141 println!(" }}");
142 println!("}}");
143 println!();
144 println!("let mut graph = Graph::new();");
145 println!("graph.add(data_source, Some(\"Source\"), None, Some(vec![(\"value\", \"data\")]));");
146 println!("graph.variant(");
147 println!(" make_scaler, // Factory function");
148 println!(" vec![2.0, 3.0, 5.0], // Parameter values to sweep");
149 println!(" Some(\"Scale\"), // Label");
150 println!(" Some(vec![(\"data\", \"input_data\")]), // Input mapping");
151 println!(" Some(vec![(\"scaled_value\", \"result\")]) // Output mapping");
152 println!(");");
153 println!("```\n");
154
155 let mut graph1 = Graph::new();
156 graph1.add(
157 data_source,
158 Some("Source"),
159 None,
160 Some(vec![("value", "data")])
161 );
162 graph1.variant(
163 make_scaler,
164 vec![2.0, 3.0, 5.0],
165 Some("Scale"),
166 Some(vec![("data", "input_data")]),
167 Some(vec![("scaled_value", "result")])
168 );
169
170 let dag1 = graph1.build();
171 println!("🎯 What happens:");
172 println!(" • Factory creates 3 nodes: Scale_2.0, Scale_3.0, Scale_5.0");
173 println!(" • Each node multiplies input by its factor");
174 println!(" • All variants can execute in parallel");
175 println!();
176
177 let stats1 = dag1.stats();
178 println!("📈 DAG Statistics:");
179 println!(" - Total nodes: {}", stats1.node_count);
180 println!(" - Depth: {} levels", stats1.depth);
181 println!(" - Max parallelism: {} nodes can run simultaneously", stats1.max_parallelism);
182 println!();
183
184 println!("🔍 Mermaid Visualization:");
185 println!("{}", dag1.to_mermaid());
186 println!();
187
188 // =========================================================================
189 // Demo 2: Multiple Variants - Cartesian Product
190 // =========================================================================
191 println!("\nDemo 2: Multiple Variants (Cartesian Product)");
192 println!("─────────────────────────────────────────────────────────\n");
193
194 println!("📝 Code:");
195 println!("```rust");
196 println!("graph.add(data_source, Some(\"Generate\"), None, Some(vec![(\"value\", \"data\")]));");
197 println!("graph.variant(make_scaler, vec![2.0, 3.0], Some(\"Scale\"), ...);");
198 println!("graph.variant(make_offsetter, vec![10, 20], Some(\"Offset\"), ...);");
199 println!("graph.add(stats_node, Some(\"Stats\"), Some(vec![(\"result\", \"result\")]), None);");
200 println!("```\n");
201
202 let mut graph2 = Graph::new();
203 graph2.add(
204 data_source,
205 Some("Generate"),
206 None,
207 Some(vec![("value", "data")])
208 );
209 graph2.variant(
210 make_scaler,
211 vec![2.0, 3.0],
212 Some("Scale"),
213 Some(vec![("data", "input_data")]),
214 Some(vec![("scaled_value", "result")])
215 );
216 graph2.variant(
217 make_offsetter,
218 vec![10, 20],
219 Some("Offset"),
220 Some(vec![("result", "number")]),
221 Some(vec![("offset_result", "result")])
222 );
223 graph2.add(
224 stats_node,
225 Some("Stats"),
226 Some(vec![("result", "result")]),
227 Some(vec![("summary", "final")])
228 );
229
230 let dag2 = graph2.build();
231 println!("🎯 What happens:");
232 println!(" • Scale creates 2 variants: x2.0, x3.0");
233 println!(" • Offset creates 2 variants: +10, +20");
234 println!(" • Total combinations: 2 × 2 = 4 execution paths");
235 println!(" • Each path: Generate → Scale[variant] → Offset[variant] → Stats");
236 println!();
237
238 let stats2 = dag2.stats();
239 println!("📈 DAG Statistics:");
240 println!(" - Total nodes: {}", stats2.node_count);
241 println!(" - Depth: {} levels", stats2.depth);
242 println!(" - Execution paths: 4 (2 scales × 2 offsets)");
243 println!();
244
245 println!("🔍 Mermaid Visualization:");
246 println!("{}", dag2.to_mermaid());
247 println!();
248
249 // =========================================================================
250 // Demo 3: Complex Factory - Struct Configuration
251 // =========================================================================
252 println!("\nDemo 3: Complex Factory with Struct Configuration");
253 println!("─────────────────────────────────────────────────────────\n");
254
255 println!("📝 Code:");
256 println!("```rust");
257 println!("#[derive(Clone)]");
258 println!("struct FilterConfig {{");
259 println!(" cutoff: f64,");
260 println!(" mode: String,");
261 println!("}}");
262 println!();
263 println!("fn make_filter(config: FilterConfig) -> impl Fn(...) -> ... {{");
264 println!(" move |inputs, _| {{");
265 println!(" let value = inputs.get(\"data\").unwrap().parse::<f64>().unwrap();");
266 println!(" let filtered = match config.mode.as_str() {{");
267 println!(" \"lowpass\" => value * config.cutoff,");
268 println!(" \"highpass\" => value * (1.0 - config.cutoff),");
269 println!(" _ => value,");
270 println!(" }};");
271 println!(" }}");
272 println!("}}");
273 println!();
274 println!("let configs = vec![");
275 println!(" FilterConfig {{ cutoff: 0.5, mode: \"lowpass\".to_string() }},");
276 println!(" FilterConfig {{ cutoff: 0.3, mode: \"highpass\".to_string() }},");
277 println!(" FilterConfig {{ cutoff: 0.7, mode: \"lowpass\".to_string() }},");
278 println!("];");
279 println!("graph.variant(make_filter, configs, Some(\"Filter\"), ...);");
280 println!("```\n");
281
282 let configs = vec![
283 FilterConfig { cutoff: 0.5, mode: "lowpass".to_string() },
284 FilterConfig { cutoff: 0.3, mode: "highpass".to_string() },
285 FilterConfig { cutoff: 0.7, mode: "lowpass".to_string() },
286 ];
287
288 let mut graph3 = Graph::new();
289 graph3.add(
290 data_source,
291 Some("Source"),
292 None,
293 Some(vec![("value", "data")])
294 );
295 graph3.variant(
296 make_filter,
297 configs,
298 Some("Filter"),
299 Some(vec![("data", "data")]),
300 Some(vec![("filtered", "result")])
301 );
302
303 let dag3 = graph3.build();
304 println!("🎯 What happens:");
305 println!(" • 3 filter variants created with different configurations");
306 println!(" • Each variant uses its own FilterConfig struct");
307 println!(" • Demonstrates passing complex types to factory");
308 println!();
309
310 let stats3 = dag3.stats();
311 println!("📈 DAG Statistics:");
312 println!(" - Total nodes: {}", stats3.node_count);
313 println!(" - Filter variants: 3");
314 println!(" - Max parallelism: {} nodes", stats3.max_parallelism);
315 println!();
316
317 // =========================================================================
318 // Demo 4: String Processing Variants
319 // =========================================================================
320 println!("\nDemo 4: String Processing Variants");
321 println!("─────────────────────────────────────────────────────────\n");
322
323 println!("📝 Code:");
324 println!("```rust");
325 println!("fn make_processor(prefix: &'static str) -> impl Fn(...) -> ... {{");
326 println!(" move |inputs, _| {{");
327 println!(" let text = inputs.get(\"text\").unwrap();");
328 println!(" let processed = format!(\"[{{}}] {{}}\", prefix, text);");
329 println!(" outputs.insert(\"processed_text\", processed);");
330 println!(" }}");
331 println!("}}");
332 println!();
333 println!("graph.variant(");
334 println!(" make_processor,");
335 println!(" vec![\"INFO\", \"WARN\", \"ERROR\"],");
336 println!(" Some(\"LogLevel\"),");
337 println!(" Some(vec![(\"message\", \"text\")]),");
338 println!(" Some(vec![(\"processed_text\", \"log\")])");
339 println!(");");
340 println!("```\n");
341
342 let mut graph4 = Graph::new();
343 graph4.add(
344 text_source,
345 Some("Source"),
346 None,
347 Some(vec![("message", "message")])
348 );
349 graph4.variant(
350 make_processor,
351 vec!["INFO", "WARN", "ERROR"],
352 Some("LogLevel"),
353 Some(vec![("message", "text")]),
354 Some(vec![("processed_text", "log")])
355 );
356
357 let dag4 = graph4.build();
358 println!("🎯 What happens:");
359 println!(" • 3 log level variants: INFO, WARN, ERROR");
360 println!(" • Each prefixes the message with its log level");
361 println!(" • Demonstrates string/static str parameters");
362 println!();
363
364 let stats4 = dag4.stats();
365 println!("📈 DAG Statistics:");
366 println!(" - Total nodes: {}", stats4.node_count);
367 println!(" - Log variants: 3");
368 println!();
369
370 println!("🔍 Mermaid Visualization:");
371 println!("{}", dag4.to_mermaid());
372 println!();
373
374 // =========================================================================
375 // Summary
376 // =========================================================================
377 println!("\n═══════════════════════════════════════════════════════════");
378 println!(" Summary: Key Variant Pattern Features");
379 println!("═══════════════════════════════════════════════════════════\n");
380
381 println!("✅ Factory Function Pattern:");
382 println!(" • Factory takes parameter(s), returns closure");
383 println!(" • Closure captures parameters in its environment");
384 println!(" • Same signature as regular node functions");
385 println!();
386
387 println!("✅ Parameter Flexibility:");
388 println!(" • Primitives: f64, i32, &str");
389 println!(" • Structs: Custom configuration objects");
390 println!(" • Arrays/Vectors: Multiple values at once");
391 println!();
392
393 println!("✅ Cartesian Products:");
394 println!(" • Multiple .variant() calls create all combinations");
395 println!(" • Example: 2 scales × 3 filters = 6 execution paths");
396 println!();
397
398 println!("✅ Port Mapping:");
399 println!(" • Variants use same tuple-based syntax");
400 println!(" • (broadcast_var, impl_var) for inputs");
401 println!(" • (impl_var, broadcast_var) for outputs");
402 println!();
403
404 println!("✅ Parallel Execution:");
405 println!(" • All variants at same level can run in parallel");
406 println!(" • DAG analysis identifies parallelization opportunities");
407 println!();
408}Sourcepub fn branch(&mut self, subgraph: Graph) -> usize
pub fn branch(&mut self, subgraph: Graph) -> usize
Insert a branching subgraph
§Implicit Branching Behavior
- Sequential
.branch()calls without.add()between them implicitly branch from the same node - This allows creating multiple parallel execution paths easily
§Arguments
subgraph- A configured Graph representing the branch
§Returns
Returns the branch ID for use in merge operations
Examples found in repository?
92fn demo_branching() {
93 println!("─────────────────────────────────────────────────────────");
94 println!("Demo 2: Parallel Branching (Fan-Out)");
95 println!("─────────────────────────────────────────────────────────");
96
97 let mut graph = Graph::new();
98
99 // Source node
100 graph.add(
101 |_: &HashMap<String, String>, _| {
102 let mut result = HashMap::new();
103 result.insert("dataset".to_string(), "100".to_string());
104 result
105 },
106 Some("Source"),
107 None,
108 Some(vec![("dataset", "data")])
109 );
110
111 // Branch A: Compute statistics
112 let mut branch_a = Graph::new();
113 branch_a.add(
114 |inputs: &HashMap<String, String>, _| {
115 let mut result = HashMap::new();
116 if let Some(val) = inputs.get("input") {
117 result.insert("stats".to_string(), format!("Mean: {}", val));
118 }
119 result
120 },
121 Some("Statistics"),
122 Some(vec![("data", "input")]),
123 Some(vec![("stats", "stats_result")])
124 );
125
126 // Branch B: Train model
127 let mut branch_b = Graph::new();
128 branch_b.add(
129 |inputs: &HashMap<String, String>, _| {
130 let mut result = HashMap::new();
131 if let Some(val) = inputs.get("input") {
132 result.insert("model".to_string(), format!("Model trained on {}", val));
133 }
134 result
135 },
136 Some("MLModel"),
137 Some(vec![("data", "input")]),
138 Some(vec![("model", "model_result")])
139 );
140
141 // Branch C: Generate visualization
142 let mut branch_c = Graph::new();
143 branch_c.add(
144 |inputs: &HashMap<String, String>, _| {
145 let mut result = HashMap::new();
146 if let Some(val) = inputs.get("input") {
147 result.insert("plot".to_string(), format!("Plot of {}", val));
148 }
149 result
150 },
151 Some("Visualization"),
152 Some(vec![("data", "input")]),
153 Some(vec![("plot", "viz_result")])
154 );
155
156 graph.branch(branch_a);
157 graph.branch(branch_b);
158 graph.branch(branch_c);
159
160 let dag = graph.build();
161 println!("\n📊 Execution:");
162 let context = dag.execute(false, None);
163
164 println!(" Source: data = {}", context.get("data").unwrap());
165 println!(" Branch A (Stats): {}", context.get("stats_result").unwrap());
166 println!(" Branch B (Model): {}", context.get("model_result").unwrap());
167 println!(" Branch C (Viz): {}", context.get("viz_result").unwrap());
168
169 println!("\n📈 DAG Statistics:");
170 let stats = dag.stats();
171 println!("{}", stats.summary());
172 println!(" ⚡ All 3 branches can execute in parallel!");
173
174 println!("\n🔍 Mermaid Visualization:");
175 println!("{}", dag.to_mermaid());
176 println!();
177}
178
179fn demo_merging() {
180 println!("─────────────────────────────────────────────────────────");
181 println!("Demo 3: Branching + Merging (Fan-Out + Fan-In)");
182 println!("─────────────────────────────────────────────────────────");
183
184 let mut graph = Graph::new();
185
186 // Source
187 graph.add(
188 |_: &HashMap<String, String>, _| {
189 let mut result = HashMap::new();
190 result.insert("value".to_string(), "50".to_string());
191 result
192 },
193 Some("Source"),
194 None,
195 Some(vec![("value", "data")])
196 );
197
198 // Branch A: Add 10
199 let mut branch_a = Graph::new();
200 branch_a.add(
201 |inputs: &HashMap<String, String>, _| {
202 let mut result = HashMap::new();
203 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
204 result.insert("output".to_string(), (val + 10).to_string());
205 }
206 result
207 },
208 Some("PathA (+10)"),
209 Some(vec![("data", "x")]),
210 Some(vec![("output", "result")]) // Both branches use same output name!
211 );
212
213 // Branch B: Add 20
214 let mut branch_b = Graph::new();
215 branch_b.add(
216 |inputs: &HashMap<String, String>, _| {
217 let mut result = HashMap::new();
218 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
219 result.insert("output".to_string(), (val + 20).to_string());
220 }
221 result
222 },
223 Some("PathB (+20)"),
224 Some(vec![("data", "x")]),
225 Some(vec![("output", "result")]) // Both branches use same output name!
226 );
227
228 let branch_a_id = graph.branch(branch_a);
229 let branch_b_id = graph.branch(branch_b);
230
231 // Merge node: Combine results from both branches
232 graph.merge(
233 |inputs: &HashMap<String, String>, _| {
234 let mut result = HashMap::new();
235 let a = inputs.get("from_a").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
236 let b = inputs.get("from_b").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
237 result.insert("combined".to_string(), format!("{} + {} = {}", a, b, a + b));
238 result
239 },
240 Some("Merge"),
241 vec![
242 (branch_a_id, "result", "from_a"), // Map branch A's "result" to merge fn's "from_a"
243 (branch_b_id, "result", "from_b") // Map branch B's "result" to merge fn's "from_b"
244 ],
245 Some(vec![("combined", "final")])
246 );
247
248 let dag = graph.build();
249 println!("\n📊 Execution:");
250 let context = dag.execute(false, None);
251
252 println!(" Source: data = {}", context.get("data").unwrap());
253 println!(" Branch A: 50 + 10 = 60");
254 println!(" Branch B: 50 + 20 = 70");
255 println!(" Merged: {}", context.get("final").unwrap());
256
257 println!("\n📈 DAG Statistics:");
258 let stats = dag.stats();
259 println!("{}", stats.summary());
260
261 println!("\n🔍 Mermaid Visualization:");
262 println!("{}", dag.to_mermaid());
263 println!();
264}
265
266fn demo_variants() {
267 println!("─────────────────────────────────────────────────────────");
268 println!("Demo 4: Parameter Sweep with Variants");
269 println!("─────────────────────────────────────────────────────────");
270
271 let mut graph = Graph::new();
272
273 // Source node
274 graph.add(
275 |_: &HashMap<String, String>, _| {
276 let mut result = HashMap::new();
277 result.insert("base_value".to_string(), "10.0".to_string());
278 result
279 },
280 Some("DataSource"),
281 None,
282 Some(vec![("base_value", "data")])
283 );
284
285 // Variant factory: Scale by different learning rates
286 fn make_scaler(learning_rate: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
287 move |inputs: &HashMap<String, String>, _| {
288 let mut result = HashMap::new();
289 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
290 let scaled = val * learning_rate;
291 result.insert("scaled_value".to_string(), format!("{:.2}", scaled));
292 }
293 result
294 }
295 }
296
297 // Create variants using Linspace for learning rate sweep
298 graph.variant(
299 make_scaler,
300 vec![0.001, 0.01, 0.1, 1.0],
301 Some("ScaleLR"),
302 Some(vec![("data", "input")]),
303 Some(vec![("scaled_value", "result")])
304 );
305
306 let dag = graph.build();
307 println!("\n📊 Execution:");
308 let context = dag.execute(false, None);
309
310 println!(" Source: data = {}", context.get("data").unwrap());
311 println!(" Variants created for learning rates: [0.001, 0.01, 0.1, 1.0]");
312 println!(" (Each variant computes: data * learning_rate)");
313
314 println!("\n📈 DAG Statistics:");
315 let stats = dag.stats();
316 println!("{}", stats.summary());
317 println!(" ⚡ All {} variants can execute in parallel!", stats.variant_count);
318
319 println!("\n🔍 Mermaid Visualization:");
320 println!("{}", dag.to_mermaid());
321 println!();
322}
323
324fn demo_complex_graph() {
325 println!("─────────────────────────────────────────────────────────");
326 println!("Demo 5: Complex Graph (All Features Combined)");
327 println!("─────────────────────────────────────────────────────────");
328
329 let mut graph = Graph::new();
330
331 // 1. Data ingestion
332 graph.add(
333 |_: &HashMap<String, String>, _| {
334 let mut result = HashMap::new();
335 result.insert("raw_data".to_string(), "1000".to_string());
336 result
337 },
338 Some("Ingest"),
339 None,
340 Some(vec![("raw_data", "data")])
341 );
342
343 // 2. Preprocessing
344 graph.add(
345 |inputs: &HashMap<String, String>, _| {
346 let mut result = HashMap::new();
347 if let Some(val) = inputs.get("raw").and_then(|s| s.parse::<i32>().ok()) {
348 result.insert("cleaned".to_string(), (val / 10).to_string());
349 }
350 result
351 },
352 Some("Preprocess"),
353 Some(vec![("data", "raw")]),
354 Some(vec![("cleaned", "clean_data")])
355 );
356
357 // 3. Branch for different analyses
358 let mut stats_branch = Graph::new();
359 stats_branch.add(
360 |inputs: &HashMap<String, String>, _| {
361 let mut result = HashMap::new();
362 if let Some(val) = inputs.get("data") {
363 result.insert("stats".to_string(), format!("Stats({})", val));
364 }
365 result
366 },
367 Some("Stats"),
368 Some(vec![("clean_data", "data")]),
369 Some(vec![("stats", "statistics")])
370 );
371
372 let mut ml_branch = Graph::new();
373 ml_branch.add(
374 |inputs: &HashMap<String, String>, _| {
375 let mut result = HashMap::new();
376 if let Some(val) = inputs.get("data") {
377 result.insert("prediction".to_string(), format!("Pred({})", val));
378 }
379 result
380 },
381 Some("ML"),
382 Some(vec![("clean_data", "data")]),
383 Some(vec![("prediction", "ml_result")])
384 );
385
386 let stats_id = graph.branch(stats_branch);
387 let ml_id = graph.branch(ml_branch);
388
389 // 4. Merge branches
390 graph.merge(
391 |inputs: &HashMap<String, String>, _| {
392 let mut result = HashMap::new();
393 let stats = inputs.get("stats_in").cloned().unwrap_or_default();
394 let ml = inputs.get("ml_in").cloned().unwrap_or_default();
395 result.insert("report".to_string(), format!("{} & {}", stats, ml));
396 result
397 },
398 Some("Combine"),
399 vec![
400 (stats_id, "statistics", "stats_in"),
401 (ml_id, "ml_result", "ml_in")
402 ],
403 Some(vec![("report", "final_report")])
404 );
405
406 // 5. Final output formatting
407 graph.add(
408 |inputs: &HashMap<String, String>, _| {
409 let mut result = HashMap::new();
410 if let Some(report) = inputs.get("report") {
411 result.insert("formatted".to_string(), format!("[FINAL] {}", report));
412 }
413 result
414 },
415 Some("Format"),
416 Some(vec![("final_report", "report")]),
417 Some(vec![("formatted", "output")])
418 );
419
420 let dag = graph.build();
421 println!("\n📊 Execution:");
422 let context = dag.execute(false, None);
423
424 println!(" Step 1: Ingest → data = {}", context.get("data").unwrap());
425 println!(" Step 2: Preprocess → clean_data = {}", context.get("clean_data").unwrap());
426 println!(" Step 3: Branch A → statistics = {}", context.get("statistics").unwrap());
427 println!(" Branch B → ml_result = {}", context.get("ml_result").unwrap());
428 println!(" Step 4: Merge → final_report = {}", context.get("final_report").unwrap());
429 println!(" Step 5: Format → output = {}", context.get("output").unwrap());
430
431 println!("\n📈 DAG Statistics:");
432 let stats = dag.stats();
433 println!("{}", stats.summary());
434
435 println!("\n📋 Execution Order:");
436 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
437 println!(" Level {}: {} nodes", level_idx, level.len());
438 for &node_id in level {
439 let node = dag.nodes().iter().find(|n| n.id == node_id).unwrap();
440 println!(" - {}", node.display_name());
441 }
442 }
443
444 println!("\n🔍 Mermaid Visualization:");
445 println!("{}", dag.to_mermaid());
446 println!();
447
448 println!("═══════════════════════════════════════════════════════════");
449 println!(" Demo Complete!");
450 println!("═══════════════════════════════════════════════════════════");
451}More examples
84fn demo_branch_output_access() {
85 println!("─────────────────────────────────────────────────────────");
86 println!("Demo 2: Branch Output Access (Parallel Paths)");
87 println!("─────────────────────────────────────────────────────────\n");
88
89 let mut graph = Graph::new();
90
91 // Source node
92 graph.add(
93 |_: &HashMap<String, String>, _| {
94 let mut result = HashMap::new();
95 result.insert("value".to_string(), "50".to_string());
96 result
97 },
98 Some("Source"),
99 None,
100 Some(vec![("value", "shared_data")])
101 );
102
103 // Branch A: Statistics computation
104 let mut branch_a = Graph::new();
105 branch_a.add(
106 |inputs: &HashMap<String, String>, _| {
107 let val = inputs.get("data").unwrap();
108 let mut result = HashMap::new();
109 result.insert("stats_output".to_string(), format!("Stats of {}", val));
110 result
111 },
112 Some("Stats"),
113 Some(vec![("shared_data", "data")]),
114 Some(vec![("stats_output", "statistics")]) // Branch A produces "statistics"
115 );
116
117 // Branch B: Model training
118 let mut branch_b = Graph::new();
119 branch_b.add(
120 |inputs: &HashMap<String, String>, _| {
121 let val = inputs.get("data").unwrap();
122 let mut result = HashMap::new();
123 result.insert("model_output".to_string(), format!("Model trained on {}", val));
124 result
125 },
126 Some("Train"),
127 Some(vec![("shared_data", "data")]),
128 Some(vec![("model_output", "model")]) // Branch B produces "model"
129 );
130
131 // Branch C: Visualization
132 let mut branch_c = Graph::new();
133 branch_c.add(
134 |inputs: &HashMap<String, String>, _| {
135 let val = inputs.get("data").unwrap();
136 let mut result = HashMap::new();
137 result.insert("viz_output".to_string(), format!("Plot of {}", val));
138 result
139 },
140 Some("Visualize"),
141 Some(vec![("shared_data", "data")]),
142 Some(vec![("viz_output", "visualization")]) // Branch C produces "visualization"
143 );
144
145 // Add branches to main graph
146 graph.branch(branch_a);
147 graph.branch(branch_b);
148 graph.branch(branch_c);
149
150 // Execute
151 let dag = graph.build();
152 let context = dag.execute(false, None);
153
154 println!("📦 All outputs from parallel branches:");
155
156 // Access each branch's output
157 if let Some(stats) = context.get("statistics") {
158 println!(" Branch A (Statistics): {}", stats);
159 }
160
161 if let Some(model) = context.get("model") {
162 println!(" Branch B (Model): {}", model);
163 }
164
165 if let Some(viz) = context.get("visualization") {
166 println!(" Branch C (Visualization): {}", viz);
167 }
168
169 println!("\n🔍 All variables in context:");
170 for (key, value) in &context {
171 println!(" {} = {}", key, value);
172 }
173
174 println!();
175}32fn demo_sequential_vs_parallel() {
33 println!("─────────────────────────────────────────────────────────");
34 println!("Demo 1: Sequential vs Parallel Execution");
35 println!("─────────────────────────────────────────────────────────");
36
37 let mut graph = Graph::new();
38
39 // Source node
40 graph.add(
41 |_: &HashMap<String, String>, _| {
42 let start = Instant::now();
43 let mut result = HashMap::new();
44 result.insert("data".to_string(), "source_data".to_string());
45 println!(" [{}ms] Source completed", start.elapsed().as_millis());
46 result
47 },
48 Some("Source"),
49 None,
50 Some(vec![("data", "data")])
51 );
52
53 // Branch A: 100ms work
54 let mut branch_a = Graph::new();
55 branch_a.add(
56 |inputs: &HashMap<String, String>, _| {
57 let start = Instant::now();
58 let mut result = HashMap::new();
59 if let Some(data) = inputs.get("input") {
60 let processed = simulate_work(100, data);
61 result.insert("result".to_string(), processed);
62 }
63 println!(" [{}ms] Branch A completed (100ms work)", start.elapsed().as_millis());
64 result
65 },
66 Some("BranchA[100ms]"),
67 Some(vec![("data", "input")]),
68 Some(vec![("result", "result_a")])
69 );
70
71 // Branch B: 100ms work
72 let mut branch_b = Graph::new();
73 branch_b.add(
74 |inputs: &HashMap<String, String>, _| {
75 let start = Instant::now();
76 let mut result = HashMap::new();
77 if let Some(data) = inputs.get("input") {
78 let processed = simulate_work(100, data);
79 result.insert("result".to_string(), processed);
80 }
81 println!(" [{}ms] Branch B completed (100ms work)", start.elapsed().as_millis());
82 result
83 },
84 Some("BranchB[100ms]"),
85 Some(vec![("data", "input")]),
86 Some(vec![("result", "result_b")])
87 );
88
89 // Branch C: 100ms work
90 let mut branch_c = Graph::new();
91 branch_c.add(
92 |inputs: &HashMap<String, String>, _| {
93 let start = Instant::now();
94 let mut result = HashMap::new();
95 if let Some(data) = inputs.get("input") {
96 let processed = simulate_work(100, data);
97 result.insert("result".to_string(), processed);
98 }
99 println!(" [{}ms] Branch C completed (100ms work)", start.elapsed().as_millis());
100 result
101 },
102 Some("BranchC[100ms]"),
103 Some(vec![("data", "input")]),
104 Some(vec![("result", "result_c")])
105 );
106
107 graph.branch(branch_a);
108 graph.branch(branch_b);
109 graph.branch(branch_c);
110
111 let dag = graph.build();
112
113 println!("\n📊 Sequential Execution (simulated):");
114 let start = Instant::now();
115 let _ = dag.execute(false, None);
116 let sequential_time = start.elapsed();
117 println!(" Total time: {}ms", sequential_time.as_millis());
118
119 println!("\n⚡ With Parallel Execution:");
120 println!(" Expected time: ~100ms (all branches run simultaneously)");
121 println!(" Speedup: ~3x faster than sequential");
122
123 println!("\n📈 DAG Statistics:");
124 let stats = dag.stats();
125 println!("{}", stats.summary());
126 println!("\n Analysis:");
127 println!(" - Level 0: 1 node (Source)");
128 println!(" - Level 1: 3 nodes (BranchA, BranchB, BranchC) ← Can run in parallel!");
129 println!(" - Max parallelism: {} nodes can execute simultaneously", stats.max_parallelism);
130
131 println!("\n🔍 Mermaid Visualization with Port Mappings:");
132 println!("{}", dag.to_mermaid());
133 println!();
134}
135
136fn demo_complex_dependencies() {
137 println!("─────────────────────────────────────────────────────────");
138 println!("Demo 2: Complex Data Dependencies");
139 println!("─────────────────────────────────────────────────────────");
140
141 let mut graph = Graph::new();
142
143 // Two independent sources
144 graph.add(
145 |_: &HashMap<String, String>, _| {
146 let mut result = HashMap::new();
147 result.insert("source1_data".to_string(), "100".to_string());
148 result
149 },
150 Some("Source1"),
151 None,
152 Some(vec![("source1_data", "data1")])
153 );
154
155 graph.add(
156 |_: &HashMap<String, String>, _| {
157 let mut result = HashMap::new();
158 result.insert("source2_data".to_string(), "200".to_string());
159 result
160 },
161 Some("Source2"),
162 None,
163 Some(vec![("source2_data", "data2")])
164 );
165
166 // Process each source independently (can run in parallel)
167 graph.add(
168 |inputs: &HashMap<String, String>, _| {
169 let mut result = HashMap::new();
170 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
171 thread::sleep(Duration::from_millis(50));
172 result.insert("processed".to_string(), (val * 2).to_string());
173 }
174 result
175 },
176 Some("Process1[50ms]"),
177 Some(vec![("data1", "in")]),
178 Some(vec![("processed", "proc1")])
179 );
180
181 graph.add(
182 |inputs: &HashMap<String, String>, _| {
183 let mut result = HashMap::new();
184 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
185 thread::sleep(Duration::from_millis(50));
186 result.insert("processed".to_string(), (val * 3).to_string());
187 }
188 result
189 },
190 Some("Process2[50ms]"),
191 Some(vec![("data2", "in")]),
192 Some(vec![("processed", "proc2")])
193 );
194
195 // Combine results (depends on both processors)
196 graph.add(
197 |inputs: &HashMap<String, String>, _| {
198 let mut result = HashMap::new();
199 let v1 = inputs.get("p1").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
200 let v2 = inputs.get("p2").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
201 thread::sleep(Duration::from_millis(30));
202 result.insert("combined".to_string(), format!("{}", v1 + v2));
203 result
204 },
205 Some("Combine[30ms]"),
206 Some(vec![("proc1", "p1"), ("proc2", "p2")]),
207 Some(vec![("combined", "final")])
208 );
209
210 let dag = graph.build();
211
212 println!("\n📊 Execution with timing:");
213 let start = Instant::now();
214 let context = dag.execute(false, None);
215 let total_time = start.elapsed();
216
217 println!(" Source1: data1 = {}", context.get("data1").unwrap());
218 println!(" Source2: data2 = {}", context.get("data2").unwrap());
219 println!(" Process1: proc1 = {} (data1 * 2)", context.get("proc1").unwrap());
220 println!(" Process2: proc2 = {} (data2 * 3)", context.get("proc2").unwrap());
221 println!(" Combine: final = {} (proc1 + proc2)", context.get("final").unwrap());
222 println!("\n Total execution time: {}ms", total_time.as_millis());
223
224 println!("\n📈 Execution Levels (showing parallelism):");
225 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
226 print!(" Level {}: ", level_idx);
227 let node_names: Vec<String> = level.iter()
228 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
229 .collect();
230 println!("{}", node_names.join(", "));
231 if level.len() > 1 {
232 println!(" ↑ {} nodes can execute in parallel!", level.len());
233 }
234 }
235
236 println!("\n⚡ Parallel Execution Analysis:");
237 println!(" Sequential time would be: 50+50+30 = 130ms");
238 println!(" With parallelism: Level0→Level1(parallel)→Level2 = ~80ms");
239 println!(" Speedup: 1.6x");
240
241 println!("\n🔍 Mermaid Visualization (shows data dependencies):");
242 println!("{}", dag.to_mermaid());
243 println!();
244}
245
246fn demo_variant_parallelism() {
247 println!("─────────────────────────────────────────────────────────");
248 println!("Demo 3: Variant Parameter Sweep Parallelism");
249 println!("─────────────────────────────────────────────────────────");
250
251 let mut graph = Graph::new();
252
253 // Source
254 graph.add(
255 |_: &HashMap<String, String>, _| {
256 let mut result = HashMap::new();
257 result.insert("value".to_string(), "1000".to_string());
258 result
259 },
260 Some("DataSource"),
261 None,
262 Some(vec![("value", "data")])
263 );
264
265 // Variant factory with different multipliers
266 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
267 move |inputs: &HashMap<String, String>, _| {
268 let start = Instant::now();
269 let mut result = HashMap::new();
270 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
271 // Simulate 100ms of work
272 thread::sleep(Duration::from_millis(100));
273 result.insert("result".to_string(), format!("{:.1}", val * factor));
274 }
275 println!(" [{}ms] Variant (factor={}) completed", start.elapsed().as_millis(), factor);
276 result
277 }
278 }
279
280 // Create 5 variants
281 graph.variant(
282 make_multiplier,
283 vec![0.5, 1.0, 1.5, 2.0, 2.5],
284 Some("Multiply[100ms]"),
285 Some(vec![("data", "input")]),
286 Some(vec![("result", "result")])
287 );
288
289 let dag = graph.build();
290
291 println!("\n📊 Executing 5 variants (each takes 100ms):");
292 let start = Instant::now();
293 let _ = dag.execute(false, None);
294 let total_time = start.elapsed();
295
296 println!("\n Total execution time: {}ms", total_time.as_millis());
297
298 println!("\n⚡ Parallelism Analysis:");
299 println!(" Sequential execution: 100 × 5 = 500ms");
300 println!(" With parallel execution: ~100ms (all run simultaneously)");
301 println!(" Speedup: 5x");
302
303 println!("\n📈 DAG Statistics:");
304 let stats = dag.stats();
305 println!("{}", stats.summary());
306 println!(" ↑ All {} variant nodes can execute in parallel!", stats.variant_count);
307
308 println!("\n🔍 Mermaid Visualization:");
309 println!("{}", dag.to_mermaid());
310 println!();
311}
312
313fn demo_diamond_pattern() {
314 println!("─────────────────────────────────────────────────────────");
315 println!("Demo 4: Diamond Pattern (Fan-Out → Fan-In)");
316 println!("─────────────────────────────────────────────────────────");
317 println!("This pattern shows:");
318 println!(" - One source splits into multiple parallel branches");
319 println!(" - Branches are processed independently");
320 println!(" - Results merge back into single output");
321
322 let mut graph = Graph::new();
323
324 // Top of diamond: Single source
325 graph.add(
326 |_: &HashMap<String, String>, _| {
327 let mut result = HashMap::new();
328 result.insert("raw".to_string(), "input_data".to_string());
329 result
330 },
331 Some("Source"),
332 None,
333 Some(vec![("raw", "data")])
334 );
335
336 // Left branch: Transform A (50ms)
337 let mut branch_a = Graph::new();
338 branch_a.add(
339 |inputs: &HashMap<String, String>, _| {
340 let start = Instant::now();
341 thread::sleep(Duration::from_millis(50));
342 let mut result = HashMap::new();
343 if let Some(data) = inputs.get("in") {
344 result.insert("out".to_string(), format!("{}_transformA", data));
345 }
346 println!(" [{}ms] Transform A completed", start.elapsed().as_millis());
347 result
348 },
349 Some("TransformA[50ms]"),
350 Some(vec![("data", "in")]),
351 Some(vec![("out", "result")])
352 );
353
354 // Right branch: Transform B (50ms)
355 let mut branch_b = Graph::new();
356 branch_b.add(
357 |inputs: &HashMap<String, String>, _| {
358 let start = Instant::now();
359 thread::sleep(Duration::from_millis(50));
360 let mut result = HashMap::new();
361 if let Some(data) = inputs.get("in") {
362 result.insert("out".to_string(), format!("{}_transformB", data));
363 }
364 println!(" [{}ms] Transform B completed", start.elapsed().as_millis());
365 result
366 },
367 Some("TransformB[50ms]"),
368 Some(vec![("data", "in")]),
369 Some(vec![("out", "result")])
370 );
371
372 let branch_a_id = graph.branch(branch_a);
373 let branch_b_id = graph.branch(branch_b);
374
375 // Bottom of diamond: Merge (30ms)
376 graph.merge(
377 |inputs: &HashMap<String, String>, _| {
378 let start = Instant::now();
379 thread::sleep(Duration::from_millis(30));
380 let mut result = HashMap::new();
381 let a = inputs.get("left").cloned().unwrap_or_default();
382 let b = inputs.get("right").cloned().unwrap_or_default();
383 result.insert("merged".to_string(), format!("[{}+{}]", a, b));
384 println!(" [{}ms] Merge completed", start.elapsed().as_millis());
385 result
386 },
387 Some("Merge[30ms]"),
388 vec![
389 (branch_a_id, "result", "left"),
390 (branch_b_id, "result", "right")
391 ],
392 Some(vec![("merged", "final")])
393 );
394
395 let dag = graph.build();
396
397 println!("\n📊 Executing diamond pattern:");
398 let start = Instant::now();
399 let context = dag.execute(false, None);
400 let total_time = start.elapsed();
401
402 println!("\n Result: {}", context.get("final").unwrap());
403 println!(" Total execution time: {}ms", total_time.as_millis());
404
405 println!("\n📈 Execution Levels:");
406 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
407 print!(" Level {}: ", level_idx);
408 let node_names: Vec<String> = level.iter()
409 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
410 .collect();
411 println!("{}", node_names.join(", "));
412 }
413
414 println!("\n⚡ Timing Analysis:");
415 println!(" Sequential: Source(0ms) + TransformA(50ms) + TransformB(50ms) + Merge(30ms) = 130ms");
416 println!(" Parallel: Source(0ms) → [TransformA + TransformB](50ms) → Merge(30ms) = 80ms");
417 println!(" Speedup: 1.6x");
418
419 println!("\n🔍 Mermaid Visualization (Diamond Shape):");
420 println!("{}", dag.to_mermaid());
421
422 println!("\n The visualization shows:");
423 println!(" - Port mappings on edges (data→in, result→left, result→right)");
424 println!(" - Data dependencies between nodes");
425 println!(" - Parallel branches can execute simultaneously");
426
427 println!("\n═══════════════════════════════════════════════════════════");
428 println!(" Parallel Execution Demo Complete!");
429 println!("═══════════════════════════════════════════════════════════");
430}114fn demo_per_branch_access() {
115 println!("─────────────────────────────────────────────────────────");
116 println!("Demo 2: Per-Branch Output Access");
117 println!("─────────────────────────────────────────────────────────\n");
118
119 let mut graph = Graph::new();
120
121 // Main graph: Source node
122 graph.add(
123 |_: &HashMap<String, String>, _| {
124 let mut result = HashMap::new();
125 result.insert("dataset".to_string(), "100".to_string());
126 result
127 },
128 Some("Source"),
129 None,
130 Some(vec![("dataset", "data")])
131 );
132
133 // Branch A: Statistics
134 let mut branch_a = Graph::new();
135 branch_a.add(
136 |inputs: &HashMap<String, String>, _| {
137 let value = inputs.get("input").unwrap();
138 let mut result = HashMap::new();
139 result.insert("stat_result".to_string(), format!("Mean of {}", value));
140 result
141 },
142 Some("Statistics"),
143 Some(vec![("data", "input")]),
144 Some(vec![("stat_result", "statistics")])
145 );
146
147 // Branch B: Model Training
148 let mut branch_b = Graph::new();
149 branch_b.add(
150 |inputs: &HashMap<String, String>, _| {
151 let value = inputs.get("input").unwrap();
152 let mut result = HashMap::new();
153 result.insert("model_result".to_string(), format!("Model trained on {}", value));
154 result
155 },
156 Some("ModelTraining"),
157 Some(vec![("data", "input")]),
158 Some(vec![("model_result", "trained_model")])
159 );
160
161 // Branch C: Visualization
162 let mut branch_c = Graph::new();
163 branch_c.add(
164 |inputs: &HashMap<String, String>, _| {
165 let value = inputs.get("input").unwrap();
166 let mut result = HashMap::new();
167 result.insert("viz_result".to_string(), format!("Plot of {}", value));
168 result
169 },
170 Some("Visualization"),
171 Some(vec![("data", "input")]),
172 Some(vec![("viz_result", "plot")])
173 );
174
175 let branch_a_id = graph.branch(branch_a);
176 let branch_b_id = graph.branch(branch_b);
177 let branch_c_id = graph.branch(branch_c);
178
179 // Execute and get detailed results
180 let dag = graph.build();
181 let result = dag.execute_detailed(false, None);
182
183 println!("🌍 Global Context:");
184 for (key, value) in &result.context {
185 println!(" {} = {}", key, value);
186 }
187
188 println!("\n🌿 Per-Branch Outputs:");
189
190 println!("\nBranch {} (Statistics) outputs:", branch_a_id);
191 if let Some(outputs) = result.get_branch_outputs(branch_a_id) {
192 for (key, value) in outputs {
193 println!(" {} = {}", key, value);
194 }
195 }
196
197 println!("\nBranch {} (Model Training) outputs:", branch_b_id);
198 if let Some(outputs) = result.get_branch_outputs(branch_b_id) {
199 for (key, value) in outputs {
200 println!(" {} = {}", key, value);
201 }
202 }
203
204 println!("\nBranch {} (Visualization) outputs:", branch_c_id);
205 if let Some(outputs) = result.get_branch_outputs(branch_c_id) {
206 for (key, value) in outputs {
207 println!(" {} = {}", key, value);
208 }
209 }
210
211 println!("\n🎯 Accessing specific branch outputs:");
212 if let Some(value) = result.get_from_branch(branch_a_id, "statistics") {
213 println!(" Branch {} 'statistics': {}", branch_a_id, value);
214 }
215 if let Some(value) = result.get_from_branch(branch_b_id, "trained_model") {
216 println!(" Branch {} 'trained_model': {}", branch_b_id, value);
217 }
218 if let Some(value) = result.get_from_branch(branch_c_id, "plot") {
219 println!(" Branch {} 'plot': {}", branch_c_id, value);
220 }
221
222 println!();
223}11fn main() {
12 println!("=== Tuple-Based API Demo ===\n");
13
14 let mut graph = Graph::new();
15
16 // Source node - no inputs, produces "dataset" in context
17 fn data_source(_inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
18 let mut outputs = HashMap::new();
19 outputs.insert("raw".to_string(), "Sample Data".to_string());
20 outputs
21 }
22
23 graph.add(
24 data_source,
25 Some("Source"),
26 None, // No inputs
27 Some(vec![("raw", "dataset")]) // Function returns "raw", stored as "dataset" in context
28 );
29
30 println!("✓ Added source node");
31 println!(" Output mapping: function's 'raw' → context's 'dataset'\n");
32
33 // Process node - consumes "dataset" from context as "input_data", produces "processed_data" to context as "result"
34 fn processor(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
35 let default = String::new();
36 let data = inputs.get("input_data").unwrap_or(&default);
37 let mut outputs = HashMap::new();
38 outputs.insert("processed_data".to_string(), format!("Processed: {}", data));
39 outputs
40 }
41
42 graph.add(
43 processor,
44 Some("Process"),
45 Some(vec![("dataset", "input_data")]), // Context's "dataset" → function's "input_data"
46 Some(vec![("processed_data", "result")]) // Function's "processed_data" → context's "result"
47 );
48
49 println!("✓ Added processor node");
50 println!(" Input mapping: context's 'dataset' → function's 'input_data'");
51 println!(" Output mapping: function's 'processed_data' → context's 'result'\n");
52
53 // Create branches that both use the same variable names internally
54 let mut branch_a = Graph::new();
55 fn transform_a(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
56 let default = String::new();
57 let data = inputs.get("x").unwrap_or(&default);
58 let mut outputs = HashMap::new();
59 outputs.insert("y".to_string(), format!("{} [Path A]", data));
60 outputs
61 }
62 branch_a.add(
63 transform_a,
64 Some("Transform A"),
65 Some(vec![("result", "x")]), // Context's "result" → function's "x"
66 Some(vec![("y", "output")]) // Function's "y" → context's "output"
67 );
68
69 let mut branch_b = Graph::new();
70 fn transform_b(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
71 let default = String::new();
72 let data = inputs.get("x").unwrap_or(&default);
73 let mut outputs = HashMap::new();
74 outputs.insert("y".to_string(), format!("{} [Path B]", data));
75 outputs
76 }
77 branch_b.add(
78 transform_b,
79 Some("Transform B"),
80 Some(vec![("result", "x")]), // Same variable names as branch_a
81 Some(vec![("y", "output")]) // Same variable names as branch_a
82 );
83
84 println!("✓ Created two branches");
85 println!(" Both branches use same internal variable names (x, y)");
86 println!(" Both map 'result' → 'x' and 'y' → 'output'\n");
87
88 // Branch from the processor
89 let branch_a_id = graph.branch(branch_a);
90 let branch_b_id = graph.branch(branch_b);
91
92 println!("✓ Added branches to graph");
93 println!(" Branch A ID: {}", branch_a_id);
94 println!(" Branch B ID: {}\n", branch_b_id);
95
96 // Merge branches with branch-specific variable resolution
97 fn combine(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
98 let default = String::new();
99 let a = inputs.get("from_a").unwrap_or(&default);
100 let b = inputs.get("from_b").unwrap_or(&default);
101 let mut outputs = HashMap::new();
102 outputs.insert("merged".to_string(), format!("Combined: {} + {}", a, b));
103 outputs
104 }
105
106 graph.merge(
107 combine,
108 Some("Combine"),
109 vec![
110 (branch_a_id, "output", "from_a"), // Branch A's "output" → merge function's "from_a"
111 (branch_b_id, "output", "from_b") // Branch B's "output" → merge function's "from_b"
112 ],
113 Some(vec![("merged", "final_result")]) // Merge function's "merged" → context's "final_result"
114 );
115
116 println!("✓ Added merge node");
117 println!(" Branch-specific input mapping:");
118 println!(" Branch {} 'output' → merge function's 'from_a'", branch_a_id);
119 println!(" Branch {} 'output' → merge function's 'from_b'", branch_b_id);
120 println!(" Output mapping: merge function's 'merged' → context's 'final_result'\n");
121
122 // Variant example with factory pattern
123 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
124 move |inputs, _variant| {
125 let default = "1.0".to_string();
126 let data = inputs.get("value").unwrap_or(&default);
127 if let Ok(val) = data.parse::<f64>() {
128 let mut outputs = HashMap::new();
129 outputs.insert("scaled".to_string(), (val * factor).to_string());
130 outputs
131 } else {
132 HashMap::new()
133 }
134 }
135 }
136
137 graph.variant(
138 make_multiplier,
139 vec![2.0, 3.0, 5.0],
140 Some("Multiply"),
141 Some(vec![("final_result", "value")]), // Context's "final_result" → function's "value"
142 Some(vec![("scaled", "multiplied")]) // Function's "scaled" → context's "multiplied"
143 );
144
145 println!("✓ Added variant nodes with parameter sweep");
146 println!(" Three variants: factor = 2.0, 3.0, 5.0");
147 println!(" Input mapping: context's 'final_result' → function's 'value'");
148 println!(" Output mapping: function's 'scaled' → context's 'multiplied'\n");
149
150 println!("=== Summary ===");
151 println!("The tuple-based API provides clear separation between:");
152 println!("1. Context variable names (broadcast vars) - shared across the graph");
153 println!("2. Function parameter names (impl vars) - internal to each function");
154 println!("\nThis allows:");
155 println!("- Branches to use consistent internal naming (x, y)");
156 println!("- Merge to distinguish branch outputs using branch IDs");
157 println!("- Clear data flow visualization and debugging");
158}Sourcepub fn variant<F, P, NF>(
&mut self,
factory: F,
param_values: Vec<P>,
label: Option<&str>,
inputs: Option<Vec<(&str, &str)>>,
outputs: Option<Vec<(&str, &str)>>,
) -> &mut Self
pub fn variant<F, P, NF>( &mut self, factory: F, param_values: Vec<P>, label: Option<&str>, inputs: Option<Vec<(&str, &str)>>, outputs: Option<Vec<(&str, &str)>>, ) -> &mut Self
Create configuration sweep variants using a factory function (sigexec-style)
Takes a factory function and an array of parameter values. The factory is called with each parameter value to create a node function for that variant.
§Arguments
factory- Function that takes a parameter value and returns a node functionparam_values- Array of parameter values to sweep overlabel- Optional label for visualization (default: None)inputs- Optional list of (broadcast_var, impl_var) tuples for inputsoutputs- Optional list of (impl_var, broadcast_var) tuples for outputs
§Example
fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
move |inputs, _variant_params| {
let mut outputs = HashMap::new();
if let Some(val) = inputs.get("x").and_then(|s| s.parse::<f64>().ok()) {
outputs.insert("scaled_x".to_string(), (val * factor).to_string());
}
outputs
}
}
graph.variant(
make_scaler,
vec![2.0, 3.0, 5.0],
Some("Scale"),
Some(vec![("data", "x")]), // (broadcast, impl)
Some(vec![("scaled_x", "result")]) // (impl, broadcast)
);§Behavior
- Creates one node per parameter value
- Each node is created by calling factory(param_value)
- Nodes still receive both regular inputs and variant_params
- All variants branch from the same point and can execute in parallel
Examples found in repository?
266fn demo_variants() {
267 println!("─────────────────────────────────────────────────────────");
268 println!("Demo 4: Parameter Sweep with Variants");
269 println!("─────────────────────────────────────────────────────────");
270
271 let mut graph = Graph::new();
272
273 // Source node
274 graph.add(
275 |_: &HashMap<String, String>, _| {
276 let mut result = HashMap::new();
277 result.insert("base_value".to_string(), "10.0".to_string());
278 result
279 },
280 Some("DataSource"),
281 None,
282 Some(vec![("base_value", "data")])
283 );
284
285 // Variant factory: Scale by different learning rates
286 fn make_scaler(learning_rate: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
287 move |inputs: &HashMap<String, String>, _| {
288 let mut result = HashMap::new();
289 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
290 let scaled = val * learning_rate;
291 result.insert("scaled_value".to_string(), format!("{:.2}", scaled));
292 }
293 result
294 }
295 }
296
297 // Create variants using Linspace for learning rate sweep
298 graph.variant(
299 make_scaler,
300 vec![0.001, 0.01, 0.1, 1.0],
301 Some("ScaleLR"),
302 Some(vec![("data", "input")]),
303 Some(vec![("scaled_value", "result")])
304 );
305
306 let dag = graph.build();
307 println!("\n📊 Execution:");
308 let context = dag.execute(false, None);
309
310 println!(" Source: data = {}", context.get("data").unwrap());
311 println!(" Variants created for learning rates: [0.001, 0.01, 0.1, 1.0]");
312 println!(" (Each variant computes: data * learning_rate)");
313
314 println!("\n📈 DAG Statistics:");
315 let stats = dag.stats();
316 println!("{}", stats.summary());
317 println!(" ⚡ All {} variants can execute in parallel!", stats.variant_count);
318
319 println!("\n🔍 Mermaid Visualization:");
320 println!("{}", dag.to_mermaid());
321 println!();
322}More examples
246fn demo_variant_parallelism() {
247 println!("─────────────────────────────────────────────────────────");
248 println!("Demo 3: Variant Parameter Sweep Parallelism");
249 println!("─────────────────────────────────────────────────────────");
250
251 let mut graph = Graph::new();
252
253 // Source
254 graph.add(
255 |_: &HashMap<String, String>, _| {
256 let mut result = HashMap::new();
257 result.insert("value".to_string(), "1000".to_string());
258 result
259 },
260 Some("DataSource"),
261 None,
262 Some(vec![("value", "data")])
263 );
264
265 // Variant factory with different multipliers
266 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
267 move |inputs: &HashMap<String, String>, _| {
268 let start = Instant::now();
269 let mut result = HashMap::new();
270 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
271 // Simulate 100ms of work
272 thread::sleep(Duration::from_millis(100));
273 result.insert("result".to_string(), format!("{:.1}", val * factor));
274 }
275 println!(" [{}ms] Variant (factor={}) completed", start.elapsed().as_millis(), factor);
276 result
277 }
278 }
279
280 // Create 5 variants
281 graph.variant(
282 make_multiplier,
283 vec![0.5, 1.0, 1.5, 2.0, 2.5],
284 Some("Multiply[100ms]"),
285 Some(vec![("data", "input")]),
286 Some(vec![("result", "result")])
287 );
288
289 let dag = graph.build();
290
291 println!("\n📊 Executing 5 variants (each takes 100ms):");
292 let start = Instant::now();
293 let _ = dag.execute(false, None);
294 let total_time = start.elapsed();
295
296 println!("\n Total execution time: {}ms", total_time.as_millis());
297
298 println!("\n⚡ Parallelism Analysis:");
299 println!(" Sequential execution: 100 × 5 = 500ms");
300 println!(" With parallel execution: ~100ms (all run simultaneously)");
301 println!(" Speedup: 5x");
302
303 println!("\n📈 DAG Statistics:");
304 let stats = dag.stats();
305 println!("{}", stats.summary());
306 println!(" ↑ All {} variant nodes can execute in parallel!", stats.variant_count);
307
308 println!("\n🔍 Mermaid Visualization:");
309 println!("{}", dag.to_mermaid());
310 println!();
311}225fn demo_variant_per_node_access() {
226 println!("─────────────────────────────────────────────────────────");
227 println!("Demo 3: Variant Outputs with Per-Node Tracking");
228 println!("─────────────────────────────────────────────────────────\n");
229
230 let mut graph = Graph::new();
231
232 // Source node
233 graph.add(
234 |_: &HashMap<String, String>, _| {
235 let mut result = HashMap::new();
236 result.insert("base_value".to_string(), "10".to_string());
237 result
238 },
239 Some("Source"),
240 None,
241 Some(vec![("base_value", "data")])
242 );
243
244 // Variant factory for scaling
245 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
246 move |inputs: &HashMap<String, String>, _| {
247 let value = inputs.get("input_data").unwrap().parse::<f64>().unwrap();
248 let mut result = HashMap::new();
249 result.insert("scaled_value".to_string(), (value * factor).to_string());
250 result
251 }
252 }
253
254 // Create variants with unique output names to preserve all results
255 graph.variant(
256 make_scaler,
257 vec![2.0, 3.0, 5.0],
258 Some("Scale"),
259 Some(vec![("data", "input_data")]),
260 Some(vec![("scaled_value", "result")]) // Note: will overwrite in global context
261 );
262
263 let dag = graph.build();
264 let result = dag.execute_detailed(false, None);
265
266 println!("🌍 Global Context (note: 'result' contains last variant's output):");
267 for (key, value) in &result.context {
268 println!(" {} = {}", key, value);
269 }
270
271 println!("\n📦 Per-Node Outputs (each variant tracked separately):");
272
273 // Node 0 is the source
274 // Nodes 1, 2, 3 are the variant nodes (2x, 3x, 5x scalers)
275 for node_id in 1..=3 {
276 println!("\nNode {} (Variant Scaler) outputs:", node_id);
277 if let Some(outputs) = result.get_node_outputs(node_id) {
278 for (key, value) in outputs {
279 println!(" {} = {}", key, value);
280 }
281 }
282 }
283
284 println!("\n💡 Key Insight:");
285 println!(" - Global context has 'result' = {} (last variant overwrites)", result.get("result").unwrap());
286 println!(" - But per-node outputs preserve ALL variant results:");
287 println!(" Node 1 (2x): result = {}", result.get_from_node(1, "result").unwrap());
288 println!(" Node 2 (3x): result = {}", result.get_from_node(2, "result").unwrap());
289 println!(" Node 3 (5x): result = {}", result.get_from_node(3, "result").unwrap());
290
291 println!();
292}177fn demo_variant_output_access() {
178 println!("─────────────────────────────────────────────────────────");
179 println!("Demo 3: Variant Output Access (Parameter Sweep)");
180 println!("─────────────────────────────────────────────────────────\n");
181
182 let mut graph = Graph::new();
183
184 // Source node
185 graph.add(
186 |_: &HashMap<String, String>, _| {
187 let mut result = HashMap::new();
188 result.insert("value".to_string(), "10.0".to_string());
189 result
190 },
191 Some("DataSource"),
192 None,
193 Some(vec![("value", "input_data")])
194 );
195
196 // Variant nodes: scale by different factors
197 // Factory function that creates a scaler for each factor
198 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
199 move |inputs: &HashMap<String, String>, _| {
200 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
201 let mut result = HashMap::new();
202 result.insert("scaled".to_string(), (value * factor).to_string());
203 result
204 }
205 }
206
207 graph.variant(
208 make_scaler,
209 vec![2.0, 3.0, 5.0],
210 Some("Scale"),
211 Some(vec![("input_data", "data")]),
212 Some(vec![("scaled", "result")]) // Each variant produces "result"
213 );
214
215 // Execute
216 let dag = graph.build();
217 let context = dag.execute(false, None);
218
219 println!("📦 Variant outputs:");
220 println!(" Note: Variants with same output name overwrite each other");
221 println!(" The last variant (factor=5.0) writes to 'result'\n");
222
223 // Access the result (will be from the last variant)
224 if let Some(result) = context.get("result") {
225 println!(" result = {} (from last variant: 10.0 * 5.0)", result);
226 }
227
228 println!("\n💡 Tip: To preserve all variant outputs, use unique output names:");
229 println!(" Option 1: Map each variant to a different broadcast variable");
230 println!(" Option 2: Collect results in merge node");
231 println!(" Option 3: Use variant_params or closure capture to distinguish");
232
233 // Better approach: unique output names per variant
234 let mut graph2 = Graph::new();
235
236 graph2.add(
237 |_: &HashMap<String, String>, _| {
238 let mut result = HashMap::new();
239 result.insert("value".to_string(), "10.0".to_string());
240 result
241 },
242 Some("DataSource"),
243 None,
244 Some(vec![("value", "input_data")])
245 );
246
247 // Variant 1: 2x
248 fn make_scaler_unique(label: &str, factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> + '_ {
249 move |inputs: &HashMap<String, String>, _| {
250 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
251 let mut result = HashMap::new();
252 result.insert(label.to_string(), (value * factor).to_string());
253 result
254 }
255 }
256
257 graph2.variant(
258 |_x: &str| make_scaler_unique("scaled_2x", 2.0),
259 vec!["2x"],
260 Some("Scale2x"),
261 Some(vec![("input_data", "data")]),
262 Some(vec![("scaled_2x", "result_2x")])
263 );
264
265 graph2.variant(
266 |_x: &str| make_scaler_unique("scaled_3x", 3.0),
267 vec!["3x"],
268 Some("Scale3x"),
269 Some(vec![("input_data", "data")]),
270 Some(vec![("scaled_3x", "result_3x")])
271 );
272
273 let dag2 = graph2.build();
274 let context2 = dag2.execute(false, None);
275
276 println!("\n✅ Better approach - unique output names:");
277 if let Some(result_2x) = context2.get("result_2x") {
278 println!(" result_2x = {}", result_2x);
279 }
280 if let Some(result_3x) = context2.get("result_3x") {
281 println!(" result_3x = {}", result_3x);
282 }
283
284 println!();
285}11fn main() {
12 println!("=== Tuple-Based API Demo ===\n");
13
14 let mut graph = Graph::new();
15
16 // Source node - no inputs, produces "dataset" in context
17 fn data_source(_inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
18 let mut outputs = HashMap::new();
19 outputs.insert("raw".to_string(), "Sample Data".to_string());
20 outputs
21 }
22
23 graph.add(
24 data_source,
25 Some("Source"),
26 None, // No inputs
27 Some(vec![("raw", "dataset")]) // Function returns "raw", stored as "dataset" in context
28 );
29
30 println!("✓ Added source node");
31 println!(" Output mapping: function's 'raw' → context's 'dataset'\n");
32
33 // Process node - consumes "dataset" from context as "input_data", produces "processed_data" to context as "result"
34 fn processor(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
35 let default = String::new();
36 let data = inputs.get("input_data").unwrap_or(&default);
37 let mut outputs = HashMap::new();
38 outputs.insert("processed_data".to_string(), format!("Processed: {}", data));
39 outputs
40 }
41
42 graph.add(
43 processor,
44 Some("Process"),
45 Some(vec![("dataset", "input_data")]), // Context's "dataset" → function's "input_data"
46 Some(vec![("processed_data", "result")]) // Function's "processed_data" → context's "result"
47 );
48
49 println!("✓ Added processor node");
50 println!(" Input mapping: context's 'dataset' → function's 'input_data'");
51 println!(" Output mapping: function's 'processed_data' → context's 'result'\n");
52
53 // Create branches that both use the same variable names internally
54 let mut branch_a = Graph::new();
55 fn transform_a(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
56 let default = String::new();
57 let data = inputs.get("x").unwrap_or(&default);
58 let mut outputs = HashMap::new();
59 outputs.insert("y".to_string(), format!("{} [Path A]", data));
60 outputs
61 }
62 branch_a.add(
63 transform_a,
64 Some("Transform A"),
65 Some(vec![("result", "x")]), // Context's "result" → function's "x"
66 Some(vec![("y", "output")]) // Function's "y" → context's "output"
67 );
68
69 let mut branch_b = Graph::new();
70 fn transform_b(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
71 let default = String::new();
72 let data = inputs.get("x").unwrap_or(&default);
73 let mut outputs = HashMap::new();
74 outputs.insert("y".to_string(), format!("{} [Path B]", data));
75 outputs
76 }
77 branch_b.add(
78 transform_b,
79 Some("Transform B"),
80 Some(vec![("result", "x")]), // Same variable names as branch_a
81 Some(vec![("y", "output")]) // Same variable names as branch_a
82 );
83
84 println!("✓ Created two branches");
85 println!(" Both branches use same internal variable names (x, y)");
86 println!(" Both map 'result' → 'x' and 'y' → 'output'\n");
87
88 // Branch from the processor
89 let branch_a_id = graph.branch(branch_a);
90 let branch_b_id = graph.branch(branch_b);
91
92 println!("✓ Added branches to graph");
93 println!(" Branch A ID: {}", branch_a_id);
94 println!(" Branch B ID: {}\n", branch_b_id);
95
96 // Merge branches with branch-specific variable resolution
97 fn combine(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
98 let default = String::new();
99 let a = inputs.get("from_a").unwrap_or(&default);
100 let b = inputs.get("from_b").unwrap_or(&default);
101 let mut outputs = HashMap::new();
102 outputs.insert("merged".to_string(), format!("Combined: {} + {}", a, b));
103 outputs
104 }
105
106 graph.merge(
107 combine,
108 Some("Combine"),
109 vec![
110 (branch_a_id, "output", "from_a"), // Branch A's "output" → merge function's "from_a"
111 (branch_b_id, "output", "from_b") // Branch B's "output" → merge function's "from_b"
112 ],
113 Some(vec![("merged", "final_result")]) // Merge function's "merged" → context's "final_result"
114 );
115
116 println!("✓ Added merge node");
117 println!(" Branch-specific input mapping:");
118 println!(" Branch {} 'output' → merge function's 'from_a'", branch_a_id);
119 println!(" Branch {} 'output' → merge function's 'from_b'", branch_b_id);
120 println!(" Output mapping: merge function's 'merged' → context's 'final_result'\n");
121
122 // Variant example with factory pattern
123 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
124 move |inputs, _variant| {
125 let default = "1.0".to_string();
126 let data = inputs.get("value").unwrap_or(&default);
127 if let Ok(val) = data.parse::<f64>() {
128 let mut outputs = HashMap::new();
129 outputs.insert("scaled".to_string(), (val * factor).to_string());
130 outputs
131 } else {
132 HashMap::new()
133 }
134 }
135 }
136
137 graph.variant(
138 make_multiplier,
139 vec![2.0, 3.0, 5.0],
140 Some("Multiply"),
141 Some(vec![("final_result", "value")]), // Context's "final_result" → function's "value"
142 Some(vec![("scaled", "multiplied")]) // Function's "scaled" → context's "multiplied"
143 );
144
145 println!("✓ Added variant nodes with parameter sweep");
146 println!(" Three variants: factor = 2.0, 3.0, 5.0");
147 println!(" Input mapping: context's 'final_result' → function's 'value'");
148 println!(" Output mapping: function's 'scaled' → context's 'multiplied'\n");
149
150 println!("=== Summary ===");
151 println!("The tuple-based API provides clear separation between:");
152 println!("1. Context variable names (broadcast vars) - shared across the graph");
153 println!("2. Function parameter names (impl vars) - internal to each function");
154 println!("\nThis allows:");
155 println!("- Branches to use consistent internal naming (x, y)");
156 println!("- Merge to distinguish branch outputs using branch IDs");
157 println!("- Clear data flow visualization and debugging");
158}122fn main() {
123 println!("═══════════════════════════════════════════════════════════");
124 println!(" Variant Pattern Demo (sigexec-style)");
125 println!(" Full Actual Syntax Examples");
126 println!("═══════════════════════════════════════════════════════════\n");
127
128 // =========================================================================
129 // Demo 1: Single Variant - Basic Factory Pattern
130 // =========================================================================
131 println!("Demo 1: Single Variant with Factory Function");
132 println!("─────────────────────────────────────────────────────────\n");
133
134 println!("📝 Code:");
135 println!("```rust");
136 println!("fn make_scaler(factor: f64) -> impl Fn(...) -> ... {{");
137 println!(" move |inputs, _| {{");
138 println!(" let value = inputs.get(\"input_data\").unwrap().parse::<f64>().unwrap();");
139 println!(" let scaled = value * factor;");
140 println!(" outputs.insert(\"scaled_value\", scaled.to_string());");
141 println!(" }}");
142 println!("}}");
143 println!();
144 println!("let mut graph = Graph::new();");
145 println!("graph.add(data_source, Some(\"Source\"), None, Some(vec![(\"value\", \"data\")]));");
146 println!("graph.variant(");
147 println!(" make_scaler, // Factory function");
148 println!(" vec![2.0, 3.0, 5.0], // Parameter values to sweep");
149 println!(" Some(\"Scale\"), // Label");
150 println!(" Some(vec![(\"data\", \"input_data\")]), // Input mapping");
151 println!(" Some(vec![(\"scaled_value\", \"result\")]) // Output mapping");
152 println!(");");
153 println!("```\n");
154
155 let mut graph1 = Graph::new();
156 graph1.add(
157 data_source,
158 Some("Source"),
159 None,
160 Some(vec![("value", "data")])
161 );
162 graph1.variant(
163 make_scaler,
164 vec![2.0, 3.0, 5.0],
165 Some("Scale"),
166 Some(vec![("data", "input_data")]),
167 Some(vec![("scaled_value", "result")])
168 );
169
170 let dag1 = graph1.build();
171 println!("🎯 What happens:");
172 println!(" • Factory creates 3 nodes: Scale_2.0, Scale_3.0, Scale_5.0");
173 println!(" • Each node multiplies input by its factor");
174 println!(" • All variants can execute in parallel");
175 println!();
176
177 let stats1 = dag1.stats();
178 println!("📈 DAG Statistics:");
179 println!(" - Total nodes: {}", stats1.node_count);
180 println!(" - Depth: {} levels", stats1.depth);
181 println!(" - Max parallelism: {} nodes can run simultaneously", stats1.max_parallelism);
182 println!();
183
184 println!("🔍 Mermaid Visualization:");
185 println!("{}", dag1.to_mermaid());
186 println!();
187
188 // =========================================================================
189 // Demo 2: Multiple Variants - Cartesian Product
190 // =========================================================================
191 println!("\nDemo 2: Multiple Variants (Cartesian Product)");
192 println!("─────────────────────────────────────────────────────────\n");
193
194 println!("📝 Code:");
195 println!("```rust");
196 println!("graph.add(data_source, Some(\"Generate\"), None, Some(vec![(\"value\", \"data\")]));");
197 println!("graph.variant(make_scaler, vec![2.0, 3.0], Some(\"Scale\"), ...);");
198 println!("graph.variant(make_offsetter, vec![10, 20], Some(\"Offset\"), ...);");
199 println!("graph.add(stats_node, Some(\"Stats\"), Some(vec![(\"result\", \"result\")]), None);");
200 println!("```\n");
201
202 let mut graph2 = Graph::new();
203 graph2.add(
204 data_source,
205 Some("Generate"),
206 None,
207 Some(vec![("value", "data")])
208 );
209 graph2.variant(
210 make_scaler,
211 vec![2.0, 3.0],
212 Some("Scale"),
213 Some(vec![("data", "input_data")]),
214 Some(vec![("scaled_value", "result")])
215 );
216 graph2.variant(
217 make_offsetter,
218 vec![10, 20],
219 Some("Offset"),
220 Some(vec![("result", "number")]),
221 Some(vec![("offset_result", "result")])
222 );
223 graph2.add(
224 stats_node,
225 Some("Stats"),
226 Some(vec![("result", "result")]),
227 Some(vec![("summary", "final")])
228 );
229
230 let dag2 = graph2.build();
231 println!("🎯 What happens:");
232 println!(" • Scale creates 2 variants: x2.0, x3.0");
233 println!(" • Offset creates 2 variants: +10, +20");
234 println!(" • Total combinations: 2 × 2 = 4 execution paths");
235 println!(" • Each path: Generate → Scale[variant] → Offset[variant] → Stats");
236 println!();
237
238 let stats2 = dag2.stats();
239 println!("📈 DAG Statistics:");
240 println!(" - Total nodes: {}", stats2.node_count);
241 println!(" - Depth: {} levels", stats2.depth);
242 println!(" - Execution paths: 4 (2 scales × 2 offsets)");
243 println!();
244
245 println!("🔍 Mermaid Visualization:");
246 println!("{}", dag2.to_mermaid());
247 println!();
248
249 // =========================================================================
250 // Demo 3: Complex Factory - Struct Configuration
251 // =========================================================================
252 println!("\nDemo 3: Complex Factory with Struct Configuration");
253 println!("─────────────────────────────────────────────────────────\n");
254
255 println!("📝 Code:");
256 println!("```rust");
257 println!("#[derive(Clone)]");
258 println!("struct FilterConfig {{");
259 println!(" cutoff: f64,");
260 println!(" mode: String,");
261 println!("}}");
262 println!();
263 println!("fn make_filter(config: FilterConfig) -> impl Fn(...) -> ... {{");
264 println!(" move |inputs, _| {{");
265 println!(" let value = inputs.get(\"data\").unwrap().parse::<f64>().unwrap();");
266 println!(" let filtered = match config.mode.as_str() {{");
267 println!(" \"lowpass\" => value * config.cutoff,");
268 println!(" \"highpass\" => value * (1.0 - config.cutoff),");
269 println!(" _ => value,");
270 println!(" }};");
271 println!(" }}");
272 println!("}}");
273 println!();
274 println!("let configs = vec![");
275 println!(" FilterConfig {{ cutoff: 0.5, mode: \"lowpass\".to_string() }},");
276 println!(" FilterConfig {{ cutoff: 0.3, mode: \"highpass\".to_string() }},");
277 println!(" FilterConfig {{ cutoff: 0.7, mode: \"lowpass\".to_string() }},");
278 println!("];");
279 println!("graph.variant(make_filter, configs, Some(\"Filter\"), ...);");
280 println!("```\n");
281
282 let configs = vec![
283 FilterConfig { cutoff: 0.5, mode: "lowpass".to_string() },
284 FilterConfig { cutoff: 0.3, mode: "highpass".to_string() },
285 FilterConfig { cutoff: 0.7, mode: "lowpass".to_string() },
286 ];
287
288 let mut graph3 = Graph::new();
289 graph3.add(
290 data_source,
291 Some("Source"),
292 None,
293 Some(vec![("value", "data")])
294 );
295 graph3.variant(
296 make_filter,
297 configs,
298 Some("Filter"),
299 Some(vec![("data", "data")]),
300 Some(vec![("filtered", "result")])
301 );
302
303 let dag3 = graph3.build();
304 println!("🎯 What happens:");
305 println!(" • 3 filter variants created with different configurations");
306 println!(" • Each variant uses its own FilterConfig struct");
307 println!(" • Demonstrates passing complex types to factory");
308 println!();
309
310 let stats3 = dag3.stats();
311 println!("📈 DAG Statistics:");
312 println!(" - Total nodes: {}", stats3.node_count);
313 println!(" - Filter variants: 3");
314 println!(" - Max parallelism: {} nodes", stats3.max_parallelism);
315 println!();
316
317 // =========================================================================
318 // Demo 4: String Processing Variants
319 // =========================================================================
320 println!("\nDemo 4: String Processing Variants");
321 println!("─────────────────────────────────────────────────────────\n");
322
323 println!("📝 Code:");
324 println!("```rust");
325 println!("fn make_processor(prefix: &'static str) -> impl Fn(...) -> ... {{");
326 println!(" move |inputs, _| {{");
327 println!(" let text = inputs.get(\"text\").unwrap();");
328 println!(" let processed = format!(\"[{{}}] {{}}\", prefix, text);");
329 println!(" outputs.insert(\"processed_text\", processed);");
330 println!(" }}");
331 println!("}}");
332 println!();
333 println!("graph.variant(");
334 println!(" make_processor,");
335 println!(" vec![\"INFO\", \"WARN\", \"ERROR\"],");
336 println!(" Some(\"LogLevel\"),");
337 println!(" Some(vec![(\"message\", \"text\")]),");
338 println!(" Some(vec![(\"processed_text\", \"log\")])");
339 println!(");");
340 println!("```\n");
341
342 let mut graph4 = Graph::new();
343 graph4.add(
344 text_source,
345 Some("Source"),
346 None,
347 Some(vec![("message", "message")])
348 );
349 graph4.variant(
350 make_processor,
351 vec!["INFO", "WARN", "ERROR"],
352 Some("LogLevel"),
353 Some(vec![("message", "text")]),
354 Some(vec![("processed_text", "log")])
355 );
356
357 let dag4 = graph4.build();
358 println!("🎯 What happens:");
359 println!(" • 3 log level variants: INFO, WARN, ERROR");
360 println!(" • Each prefixes the message with its log level");
361 println!(" • Demonstrates string/static str parameters");
362 println!();
363
364 let stats4 = dag4.stats();
365 println!("📈 DAG Statistics:");
366 println!(" - Total nodes: {}", stats4.node_count);
367 println!(" - Log variants: 3");
368 println!();
369
370 println!("🔍 Mermaid Visualization:");
371 println!("{}", dag4.to_mermaid());
372 println!();
373
374 // =========================================================================
375 // Summary
376 // =========================================================================
377 println!("\n═══════════════════════════════════════════════════════════");
378 println!(" Summary: Key Variant Pattern Features");
379 println!("═══════════════════════════════════════════════════════════\n");
380
381 println!("✅ Factory Function Pattern:");
382 println!(" • Factory takes parameter(s), returns closure");
383 println!(" • Closure captures parameters in its environment");
384 println!(" • Same signature as regular node functions");
385 println!();
386
387 println!("✅ Parameter Flexibility:");
388 println!(" • Primitives: f64, i32, &str");
389 println!(" • Structs: Custom configuration objects");
390 println!(" • Arrays/Vectors: Multiple values at once");
391 println!();
392
393 println!("✅ Cartesian Products:");
394 println!(" • Multiple .variant() calls create all combinations");
395 println!(" • Example: 2 scales × 3 filters = 6 execution paths");
396 println!();
397
398 println!("✅ Port Mapping:");
399 println!(" • Variants use same tuple-based syntax");
400 println!(" • (broadcast_var, impl_var) for inputs");
401 println!(" • (impl_var, broadcast_var) for outputs");
402 println!();
403
404 println!("✅ Parallel Execution:");
405 println!(" • All variants at same level can run in parallel");
406 println!(" • DAG analysis identifies parallelization opportunities");
407 println!();
408}Sourcepub fn merge<F>(
&mut self,
merge_fn: F,
label: Option<&str>,
inputs: Vec<(usize, &str, &str)>,
outputs: Option<Vec<(&str, &str)>>,
) -> &mut Self
pub fn merge<F>( &mut self, merge_fn: F, label: Option<&str>, inputs: Vec<(usize, &str, &str)>, outputs: Option<Vec<(&str, &str)>>, ) -> &mut Self
Merge multiple branches back together with a merge function
After branching, use .merge() to bring parallel paths back to a single point.
The merge function receives outputs from all specified branches and combines them.
§Arguments
merge_fn- Function that combines outputs from all brancheslabel- Optional label for visualizationinputs- List of (branch_id, broadcast_var, impl_var) tuples specifying which branch outputs to mergeoutputs- Optional list of (impl_var, broadcast_var) tuples for outputs
§Example
graph.add(source_fn, Some("Source"), None, Some(vec![("src_out", "data")]));
let mut branch_a = Graph::new();
branch_a.add(process_a, Some("Process A"), Some(vec![("data", "input")]), Some(vec![("output", "result")]));
let mut branch_b = Graph::new();
branch_b.add(process_b, Some("Process B"), Some(vec![("data", "input")]), Some(vec![("output", "result")]));
let branch_a_id = graph.branch(branch_a);
let branch_b_id = graph.branch(branch_b);
// Merge function combines results from both branches
// Branches can use same output name "result", merge maps them distinctly
graph.merge(
combine_fn,
Some("Combine"),
vec![
(branch_a_id, "result", "a_result"), // (branch, broadcast, impl)
(branch_b_id, "result", "b_result")
],
Some(vec![("combined", "final")]) // (impl, broadcast)
);Examples found in repository?
179fn demo_merging() {
180 println!("─────────────────────────────────────────────────────────");
181 println!("Demo 3: Branching + Merging (Fan-Out + Fan-In)");
182 println!("─────────────────────────────────────────────────────────");
183
184 let mut graph = Graph::new();
185
186 // Source
187 graph.add(
188 |_: &HashMap<String, String>, _| {
189 let mut result = HashMap::new();
190 result.insert("value".to_string(), "50".to_string());
191 result
192 },
193 Some("Source"),
194 None,
195 Some(vec![("value", "data")])
196 );
197
198 // Branch A: Add 10
199 let mut branch_a = Graph::new();
200 branch_a.add(
201 |inputs: &HashMap<String, String>, _| {
202 let mut result = HashMap::new();
203 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
204 result.insert("output".to_string(), (val + 10).to_string());
205 }
206 result
207 },
208 Some("PathA (+10)"),
209 Some(vec![("data", "x")]),
210 Some(vec![("output", "result")]) // Both branches use same output name!
211 );
212
213 // Branch B: Add 20
214 let mut branch_b = Graph::new();
215 branch_b.add(
216 |inputs: &HashMap<String, String>, _| {
217 let mut result = HashMap::new();
218 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
219 result.insert("output".to_string(), (val + 20).to_string());
220 }
221 result
222 },
223 Some("PathB (+20)"),
224 Some(vec![("data", "x")]),
225 Some(vec![("output", "result")]) // Both branches use same output name!
226 );
227
228 let branch_a_id = graph.branch(branch_a);
229 let branch_b_id = graph.branch(branch_b);
230
231 // Merge node: Combine results from both branches
232 graph.merge(
233 |inputs: &HashMap<String, String>, _| {
234 let mut result = HashMap::new();
235 let a = inputs.get("from_a").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
236 let b = inputs.get("from_b").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
237 result.insert("combined".to_string(), format!("{} + {} = {}", a, b, a + b));
238 result
239 },
240 Some("Merge"),
241 vec![
242 (branch_a_id, "result", "from_a"), // Map branch A's "result" to merge fn's "from_a"
243 (branch_b_id, "result", "from_b") // Map branch B's "result" to merge fn's "from_b"
244 ],
245 Some(vec![("combined", "final")])
246 );
247
248 let dag = graph.build();
249 println!("\n📊 Execution:");
250 let context = dag.execute(false, None);
251
252 println!(" Source: data = {}", context.get("data").unwrap());
253 println!(" Branch A: 50 + 10 = 60");
254 println!(" Branch B: 50 + 20 = 70");
255 println!(" Merged: {}", context.get("final").unwrap());
256
257 println!("\n📈 DAG Statistics:");
258 let stats = dag.stats();
259 println!("{}", stats.summary());
260
261 println!("\n🔍 Mermaid Visualization:");
262 println!("{}", dag.to_mermaid());
263 println!();
264}
265
266fn demo_variants() {
267 println!("─────────────────────────────────────────────────────────");
268 println!("Demo 4: Parameter Sweep with Variants");
269 println!("─────────────────────────────────────────────────────────");
270
271 let mut graph = Graph::new();
272
273 // Source node
274 graph.add(
275 |_: &HashMap<String, String>, _| {
276 let mut result = HashMap::new();
277 result.insert("base_value".to_string(), "10.0".to_string());
278 result
279 },
280 Some("DataSource"),
281 None,
282 Some(vec![("base_value", "data")])
283 );
284
285 // Variant factory: Scale by different learning rates
286 fn make_scaler(learning_rate: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
287 move |inputs: &HashMap<String, String>, _| {
288 let mut result = HashMap::new();
289 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
290 let scaled = val * learning_rate;
291 result.insert("scaled_value".to_string(), format!("{:.2}", scaled));
292 }
293 result
294 }
295 }
296
297 // Create variants using Linspace for learning rate sweep
298 graph.variant(
299 make_scaler,
300 vec![0.001, 0.01, 0.1, 1.0],
301 Some("ScaleLR"),
302 Some(vec![("data", "input")]),
303 Some(vec![("scaled_value", "result")])
304 );
305
306 let dag = graph.build();
307 println!("\n📊 Execution:");
308 let context = dag.execute(false, None);
309
310 println!(" Source: data = {}", context.get("data").unwrap());
311 println!(" Variants created for learning rates: [0.001, 0.01, 0.1, 1.0]");
312 println!(" (Each variant computes: data * learning_rate)");
313
314 println!("\n📈 DAG Statistics:");
315 let stats = dag.stats();
316 println!("{}", stats.summary());
317 println!(" ⚡ All {} variants can execute in parallel!", stats.variant_count);
318
319 println!("\n🔍 Mermaid Visualization:");
320 println!("{}", dag.to_mermaid());
321 println!();
322}
323
324fn demo_complex_graph() {
325 println!("─────────────────────────────────────────────────────────");
326 println!("Demo 5: Complex Graph (All Features Combined)");
327 println!("─────────────────────────────────────────────────────────");
328
329 let mut graph = Graph::new();
330
331 // 1. Data ingestion
332 graph.add(
333 |_: &HashMap<String, String>, _| {
334 let mut result = HashMap::new();
335 result.insert("raw_data".to_string(), "1000".to_string());
336 result
337 },
338 Some("Ingest"),
339 None,
340 Some(vec![("raw_data", "data")])
341 );
342
343 // 2. Preprocessing
344 graph.add(
345 |inputs: &HashMap<String, String>, _| {
346 let mut result = HashMap::new();
347 if let Some(val) = inputs.get("raw").and_then(|s| s.parse::<i32>().ok()) {
348 result.insert("cleaned".to_string(), (val / 10).to_string());
349 }
350 result
351 },
352 Some("Preprocess"),
353 Some(vec![("data", "raw")]),
354 Some(vec![("cleaned", "clean_data")])
355 );
356
357 // 3. Branch for different analyses
358 let mut stats_branch = Graph::new();
359 stats_branch.add(
360 |inputs: &HashMap<String, String>, _| {
361 let mut result = HashMap::new();
362 if let Some(val) = inputs.get("data") {
363 result.insert("stats".to_string(), format!("Stats({})", val));
364 }
365 result
366 },
367 Some("Stats"),
368 Some(vec![("clean_data", "data")]),
369 Some(vec![("stats", "statistics")])
370 );
371
372 let mut ml_branch = Graph::new();
373 ml_branch.add(
374 |inputs: &HashMap<String, String>, _| {
375 let mut result = HashMap::new();
376 if let Some(val) = inputs.get("data") {
377 result.insert("prediction".to_string(), format!("Pred({})", val));
378 }
379 result
380 },
381 Some("ML"),
382 Some(vec![("clean_data", "data")]),
383 Some(vec![("prediction", "ml_result")])
384 );
385
386 let stats_id = graph.branch(stats_branch);
387 let ml_id = graph.branch(ml_branch);
388
389 // 4. Merge branches
390 graph.merge(
391 |inputs: &HashMap<String, String>, _| {
392 let mut result = HashMap::new();
393 let stats = inputs.get("stats_in").cloned().unwrap_or_default();
394 let ml = inputs.get("ml_in").cloned().unwrap_or_default();
395 result.insert("report".to_string(), format!("{} & {}", stats, ml));
396 result
397 },
398 Some("Combine"),
399 vec![
400 (stats_id, "statistics", "stats_in"),
401 (ml_id, "ml_result", "ml_in")
402 ],
403 Some(vec![("report", "final_report")])
404 );
405
406 // 5. Final output formatting
407 graph.add(
408 |inputs: &HashMap<String, String>, _| {
409 let mut result = HashMap::new();
410 if let Some(report) = inputs.get("report") {
411 result.insert("formatted".to_string(), format!("[FINAL] {}", report));
412 }
413 result
414 },
415 Some("Format"),
416 Some(vec![("final_report", "report")]),
417 Some(vec![("formatted", "output")])
418 );
419
420 let dag = graph.build();
421 println!("\n📊 Execution:");
422 let context = dag.execute(false, None);
423
424 println!(" Step 1: Ingest → data = {}", context.get("data").unwrap());
425 println!(" Step 2: Preprocess → clean_data = {}", context.get("clean_data").unwrap());
426 println!(" Step 3: Branch A → statistics = {}", context.get("statistics").unwrap());
427 println!(" Branch B → ml_result = {}", context.get("ml_result").unwrap());
428 println!(" Step 4: Merge → final_report = {}", context.get("final_report").unwrap());
429 println!(" Step 5: Format → output = {}", context.get("output").unwrap());
430
431 println!("\n📈 DAG Statistics:");
432 let stats = dag.stats();
433 println!("{}", stats.summary());
434
435 println!("\n📋 Execution Order:");
436 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
437 println!(" Level {}: {} nodes", level_idx, level.len());
438 for &node_id in level {
439 let node = dag.nodes().iter().find(|n| n.id == node_id).unwrap();
440 println!(" - {}", node.display_name());
441 }
442 }
443
444 println!("\n🔍 Mermaid Visualization:");
445 println!("{}", dag.to_mermaid());
446 println!();
447
448 println!("═══════════════════════════════════════════════════════════");
449 println!(" Demo Complete!");
450 println!("═══════════════════════════════════════════════════════════");
451}More examples
313fn demo_diamond_pattern() {
314 println!("─────────────────────────────────────────────────────────");
315 println!("Demo 4: Diamond Pattern (Fan-Out → Fan-In)");
316 println!("─────────────────────────────────────────────────────────");
317 println!("This pattern shows:");
318 println!(" - One source splits into multiple parallel branches");
319 println!(" - Branches are processed independently");
320 println!(" - Results merge back into single output");
321
322 let mut graph = Graph::new();
323
324 // Top of diamond: Single source
325 graph.add(
326 |_: &HashMap<String, String>, _| {
327 let mut result = HashMap::new();
328 result.insert("raw".to_string(), "input_data".to_string());
329 result
330 },
331 Some("Source"),
332 None,
333 Some(vec![("raw", "data")])
334 );
335
336 // Left branch: Transform A (50ms)
337 let mut branch_a = Graph::new();
338 branch_a.add(
339 |inputs: &HashMap<String, String>, _| {
340 let start = Instant::now();
341 thread::sleep(Duration::from_millis(50));
342 let mut result = HashMap::new();
343 if let Some(data) = inputs.get("in") {
344 result.insert("out".to_string(), format!("{}_transformA", data));
345 }
346 println!(" [{}ms] Transform A completed", start.elapsed().as_millis());
347 result
348 },
349 Some("TransformA[50ms]"),
350 Some(vec![("data", "in")]),
351 Some(vec![("out", "result")])
352 );
353
354 // Right branch: Transform B (50ms)
355 let mut branch_b = Graph::new();
356 branch_b.add(
357 |inputs: &HashMap<String, String>, _| {
358 let start = Instant::now();
359 thread::sleep(Duration::from_millis(50));
360 let mut result = HashMap::new();
361 if let Some(data) = inputs.get("in") {
362 result.insert("out".to_string(), format!("{}_transformB", data));
363 }
364 println!(" [{}ms] Transform B completed", start.elapsed().as_millis());
365 result
366 },
367 Some("TransformB[50ms]"),
368 Some(vec![("data", "in")]),
369 Some(vec![("out", "result")])
370 );
371
372 let branch_a_id = graph.branch(branch_a);
373 let branch_b_id = graph.branch(branch_b);
374
375 // Bottom of diamond: Merge (30ms)
376 graph.merge(
377 |inputs: &HashMap<String, String>, _| {
378 let start = Instant::now();
379 thread::sleep(Duration::from_millis(30));
380 let mut result = HashMap::new();
381 let a = inputs.get("left").cloned().unwrap_or_default();
382 let b = inputs.get("right").cloned().unwrap_or_default();
383 result.insert("merged".to_string(), format!("[{}+{}]", a, b));
384 println!(" [{}ms] Merge completed", start.elapsed().as_millis());
385 result
386 },
387 Some("Merge[30ms]"),
388 vec![
389 (branch_a_id, "result", "left"),
390 (branch_b_id, "result", "right")
391 ],
392 Some(vec![("merged", "final")])
393 );
394
395 let dag = graph.build();
396
397 println!("\n📊 Executing diamond pattern:");
398 let start = Instant::now();
399 let context = dag.execute(false, None);
400 let total_time = start.elapsed();
401
402 println!("\n Result: {}", context.get("final").unwrap());
403 println!(" Total execution time: {}ms", total_time.as_millis());
404
405 println!("\n📈 Execution Levels:");
406 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
407 print!(" Level {}: ", level_idx);
408 let node_names: Vec<String> = level.iter()
409 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
410 .collect();
411 println!("{}", node_names.join(", "));
412 }
413
414 println!("\n⚡ Timing Analysis:");
415 println!(" Sequential: Source(0ms) + TransformA(50ms) + TransformB(50ms) + Merge(30ms) = 130ms");
416 println!(" Parallel: Source(0ms) → [TransformA + TransformB](50ms) → Merge(30ms) = 80ms");
417 println!(" Speedup: 1.6x");
418
419 println!("\n🔍 Mermaid Visualization (Diamond Shape):");
420 println!("{}", dag.to_mermaid());
421
422 println!("\n The visualization shows:");
423 println!(" - Port mappings on edges (data→in, result→left, result→right)");
424 println!(" - Data dependencies between nodes");
425 println!(" - Parallel branches can execute simultaneously");
426
427 println!("\n═══════════════════════════════════════════════════════════");
428 println!(" Parallel Execution Demo Complete!");
429 println!("═══════════════════════════════════════════════════════════");
430}11fn main() {
12 println!("=== Tuple-Based API Demo ===\n");
13
14 let mut graph = Graph::new();
15
16 // Source node - no inputs, produces "dataset" in context
17 fn data_source(_inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
18 let mut outputs = HashMap::new();
19 outputs.insert("raw".to_string(), "Sample Data".to_string());
20 outputs
21 }
22
23 graph.add(
24 data_source,
25 Some("Source"),
26 None, // No inputs
27 Some(vec![("raw", "dataset")]) // Function returns "raw", stored as "dataset" in context
28 );
29
30 println!("✓ Added source node");
31 println!(" Output mapping: function's 'raw' → context's 'dataset'\n");
32
33 // Process node - consumes "dataset" from context as "input_data", produces "processed_data" to context as "result"
34 fn processor(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
35 let default = String::new();
36 let data = inputs.get("input_data").unwrap_or(&default);
37 let mut outputs = HashMap::new();
38 outputs.insert("processed_data".to_string(), format!("Processed: {}", data));
39 outputs
40 }
41
42 graph.add(
43 processor,
44 Some("Process"),
45 Some(vec![("dataset", "input_data")]), // Context's "dataset" → function's "input_data"
46 Some(vec![("processed_data", "result")]) // Function's "processed_data" → context's "result"
47 );
48
49 println!("✓ Added processor node");
50 println!(" Input mapping: context's 'dataset' → function's 'input_data'");
51 println!(" Output mapping: function's 'processed_data' → context's 'result'\n");
52
53 // Create branches that both use the same variable names internally
54 let mut branch_a = Graph::new();
55 fn transform_a(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
56 let default = String::new();
57 let data = inputs.get("x").unwrap_or(&default);
58 let mut outputs = HashMap::new();
59 outputs.insert("y".to_string(), format!("{} [Path A]", data));
60 outputs
61 }
62 branch_a.add(
63 transform_a,
64 Some("Transform A"),
65 Some(vec![("result", "x")]), // Context's "result" → function's "x"
66 Some(vec![("y", "output")]) // Function's "y" → context's "output"
67 );
68
69 let mut branch_b = Graph::new();
70 fn transform_b(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
71 let default = String::new();
72 let data = inputs.get("x").unwrap_or(&default);
73 let mut outputs = HashMap::new();
74 outputs.insert("y".to_string(), format!("{} [Path B]", data));
75 outputs
76 }
77 branch_b.add(
78 transform_b,
79 Some("Transform B"),
80 Some(vec![("result", "x")]), // Same variable names as branch_a
81 Some(vec![("y", "output")]) // Same variable names as branch_a
82 );
83
84 println!("✓ Created two branches");
85 println!(" Both branches use same internal variable names (x, y)");
86 println!(" Both map 'result' → 'x' and 'y' → 'output'\n");
87
88 // Branch from the processor
89 let branch_a_id = graph.branch(branch_a);
90 let branch_b_id = graph.branch(branch_b);
91
92 println!("✓ Added branches to graph");
93 println!(" Branch A ID: {}", branch_a_id);
94 println!(" Branch B ID: {}\n", branch_b_id);
95
96 // Merge branches with branch-specific variable resolution
97 fn combine(inputs: &HashMap<String, String>, _variant: &HashMap<String, String>) -> HashMap<String, String> {
98 let default = String::new();
99 let a = inputs.get("from_a").unwrap_or(&default);
100 let b = inputs.get("from_b").unwrap_or(&default);
101 let mut outputs = HashMap::new();
102 outputs.insert("merged".to_string(), format!("Combined: {} + {}", a, b));
103 outputs
104 }
105
106 graph.merge(
107 combine,
108 Some("Combine"),
109 vec![
110 (branch_a_id, "output", "from_a"), // Branch A's "output" → merge function's "from_a"
111 (branch_b_id, "output", "from_b") // Branch B's "output" → merge function's "from_b"
112 ],
113 Some(vec![("merged", "final_result")]) // Merge function's "merged" → context's "final_result"
114 );
115
116 println!("✓ Added merge node");
117 println!(" Branch-specific input mapping:");
118 println!(" Branch {} 'output' → merge function's 'from_a'", branch_a_id);
119 println!(" Branch {} 'output' → merge function's 'from_b'", branch_b_id);
120 println!(" Output mapping: merge function's 'merged' → context's 'final_result'\n");
121
122 // Variant example with factory pattern
123 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
124 move |inputs, _variant| {
125 let default = "1.0".to_string();
126 let data = inputs.get("value").unwrap_or(&default);
127 if let Ok(val) = data.parse::<f64>() {
128 let mut outputs = HashMap::new();
129 outputs.insert("scaled".to_string(), (val * factor).to_string());
130 outputs
131 } else {
132 HashMap::new()
133 }
134 }
135 }
136
137 graph.variant(
138 make_multiplier,
139 vec![2.0, 3.0, 5.0],
140 Some("Multiply"),
141 Some(vec![("final_result", "value")]), // Context's "final_result" → function's "value"
142 Some(vec![("scaled", "multiplied")]) // Function's "scaled" → context's "multiplied"
143 );
144
145 println!("✓ Added variant nodes with parameter sweep");
146 println!(" Three variants: factor = 2.0, 3.0, 5.0");
147 println!(" Input mapping: context's 'final_result' → function's 'value'");
148 println!(" Output mapping: function's 'scaled' → context's 'multiplied'\n");
149
150 println!("=== Summary ===");
151 println!("The tuple-based API provides clear separation between:");
152 println!("1. Context variable names (broadcast vars) - shared across the graph");
153 println!("2. Function parameter names (impl vars) - internal to each function");
154 println!("\nThis allows:");
155 println!("- Branches to use consistent internal naming (x, y)");
156 println!("- Merge to distinguish branch outputs using branch IDs");
157 println!("- Clear data flow visualization and debugging");
158}Sourcepub fn build(self) -> Dag
pub fn build(self) -> Dag
Build the final DAG from the graph builder
This performs the implicit inspection phase:
- Full graph traversal
- Execution path optimization
- Data flow connection determination
- Identification of parallelizable operations
Examples found in repository?
24fn demo_simple_output_access() {
25 println!("─────────────────────────────────────────────────────────");
26 println!("Demo 1: Simple Pipeline Output Access");
27 println!("─────────────────────────────────────────────────────────\n");
28
29 let mut graph = Graph::new();
30
31 // Node 1: Generate initial data
32 graph.add(
33 |_: &HashMap<String, String>, _| {
34 let mut result = HashMap::new();
35 result.insert("initial_value".to_string(), "100".to_string());
36 result
37 },
38 Some("Source"),
39 None,
40 Some(vec![("initial_value", "raw_data")]) // Maps initial_value → raw_data
41 );
42
43 // Node 2: Process the data
44 graph.add(
45 |inputs: &HashMap<String, String>, _| {
46 let value = inputs.get("input").unwrap().parse::<i32>().unwrap();
47 let mut result = HashMap::new();
48 result.insert("processed".to_string(), (value * 2).to_string());
49 result
50 },
51 Some("Process"),
52 Some(vec![("raw_data", "input")]), // Maps raw_data → input
53 Some(vec![("processed", "final_result")]) // Maps processed → final_result
54 );
55
56 // Build and execute
57 let dag = graph.build();
58 let context = dag.execute(false, None);
59
60 println!("📦 Execution Context (all variables):");
61 for (key, value) in &context {
62 println!(" {} = {}", key, value);
63 }
64
65 println!("\n🎯 Accessing specific outputs:");
66
67 // Access by broadcast variable name (what's in the graph context)
68 if let Some(raw_data) = context.get("raw_data") {
69 println!(" raw_data: {}", raw_data);
70 }
71
72 if let Some(final_result) = context.get("final_result") {
73 println!(" final_result: {}", final_result);
74 }
75
76 // Check if a variable exists
77 if context.contains_key("final_result") {
78 println!("\n✅ Final result is available in context");
79 }
80
81 println!();
82}
83
84fn demo_branch_output_access() {
85 println!("─────────────────────────────────────────────────────────");
86 println!("Demo 2: Branch Output Access (Parallel Paths)");
87 println!("─────────────────────────────────────────────────────────\n");
88
89 let mut graph = Graph::new();
90
91 // Source node
92 graph.add(
93 |_: &HashMap<String, String>, _| {
94 let mut result = HashMap::new();
95 result.insert("value".to_string(), "50".to_string());
96 result
97 },
98 Some("Source"),
99 None,
100 Some(vec![("value", "shared_data")])
101 );
102
103 // Branch A: Statistics computation
104 let mut branch_a = Graph::new();
105 branch_a.add(
106 |inputs: &HashMap<String, String>, _| {
107 let val = inputs.get("data").unwrap();
108 let mut result = HashMap::new();
109 result.insert("stats_output".to_string(), format!("Stats of {}", val));
110 result
111 },
112 Some("Stats"),
113 Some(vec![("shared_data", "data")]),
114 Some(vec![("stats_output", "statistics")]) // Branch A produces "statistics"
115 );
116
117 // Branch B: Model training
118 let mut branch_b = Graph::new();
119 branch_b.add(
120 |inputs: &HashMap<String, String>, _| {
121 let val = inputs.get("data").unwrap();
122 let mut result = HashMap::new();
123 result.insert("model_output".to_string(), format!("Model trained on {}", val));
124 result
125 },
126 Some("Train"),
127 Some(vec![("shared_data", "data")]),
128 Some(vec![("model_output", "model")]) // Branch B produces "model"
129 );
130
131 // Branch C: Visualization
132 let mut branch_c = Graph::new();
133 branch_c.add(
134 |inputs: &HashMap<String, String>, _| {
135 let val = inputs.get("data").unwrap();
136 let mut result = HashMap::new();
137 result.insert("viz_output".to_string(), format!("Plot of {}", val));
138 result
139 },
140 Some("Visualize"),
141 Some(vec![("shared_data", "data")]),
142 Some(vec![("viz_output", "visualization")]) // Branch C produces "visualization"
143 );
144
145 // Add branches to main graph
146 graph.branch(branch_a);
147 graph.branch(branch_b);
148 graph.branch(branch_c);
149
150 // Execute
151 let dag = graph.build();
152 let context = dag.execute(false, None);
153
154 println!("📦 All outputs from parallel branches:");
155
156 // Access each branch's output
157 if let Some(stats) = context.get("statistics") {
158 println!(" Branch A (Statistics): {}", stats);
159 }
160
161 if let Some(model) = context.get("model") {
162 println!(" Branch B (Model): {}", model);
163 }
164
165 if let Some(viz) = context.get("visualization") {
166 println!(" Branch C (Visualization): {}", viz);
167 }
168
169 println!("\n🔍 All variables in context:");
170 for (key, value) in &context {
171 println!(" {} = {}", key, value);
172 }
173
174 println!();
175}
176
177fn demo_variant_output_access() {
178 println!("─────────────────────────────────────────────────────────");
179 println!("Demo 3: Variant Output Access (Parameter Sweep)");
180 println!("─────────────────────────────────────────────────────────\n");
181
182 let mut graph = Graph::new();
183
184 // Source node
185 graph.add(
186 |_: &HashMap<String, String>, _| {
187 let mut result = HashMap::new();
188 result.insert("value".to_string(), "10.0".to_string());
189 result
190 },
191 Some("DataSource"),
192 None,
193 Some(vec![("value", "input_data")])
194 );
195
196 // Variant nodes: scale by different factors
197 // Factory function that creates a scaler for each factor
198 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
199 move |inputs: &HashMap<String, String>, _| {
200 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
201 let mut result = HashMap::new();
202 result.insert("scaled".to_string(), (value * factor).to_string());
203 result
204 }
205 }
206
207 graph.variant(
208 make_scaler,
209 vec![2.0, 3.0, 5.0],
210 Some("Scale"),
211 Some(vec![("input_data", "data")]),
212 Some(vec![("scaled", "result")]) // Each variant produces "result"
213 );
214
215 // Execute
216 let dag = graph.build();
217 let context = dag.execute(false, None);
218
219 println!("📦 Variant outputs:");
220 println!(" Note: Variants with same output name overwrite each other");
221 println!(" The last variant (factor=5.0) writes to 'result'\n");
222
223 // Access the result (will be from the last variant)
224 if let Some(result) = context.get("result") {
225 println!(" result = {} (from last variant: 10.0 * 5.0)", result);
226 }
227
228 println!("\n💡 Tip: To preserve all variant outputs, use unique output names:");
229 println!(" Option 1: Map each variant to a different broadcast variable");
230 println!(" Option 2: Collect results in merge node");
231 println!(" Option 3: Use variant_params or closure capture to distinguish");
232
233 // Better approach: unique output names per variant
234 let mut graph2 = Graph::new();
235
236 graph2.add(
237 |_: &HashMap<String, String>, _| {
238 let mut result = HashMap::new();
239 result.insert("value".to_string(), "10.0".to_string());
240 result
241 },
242 Some("DataSource"),
243 None,
244 Some(vec![("value", "input_data")])
245 );
246
247 // Variant 1: 2x
248 fn make_scaler_unique(label: &str, factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> + '_ {
249 move |inputs: &HashMap<String, String>, _| {
250 let value = inputs.get("data").unwrap().parse::<f64>().unwrap();
251 let mut result = HashMap::new();
252 result.insert(label.to_string(), (value * factor).to_string());
253 result
254 }
255 }
256
257 graph2.variant(
258 |_x: &str| make_scaler_unique("scaled_2x", 2.0),
259 vec!["2x"],
260 Some("Scale2x"),
261 Some(vec![("input_data", "data")]),
262 Some(vec![("scaled_2x", "result_2x")])
263 );
264
265 graph2.variant(
266 |_x: &str| make_scaler_unique("scaled_3x", 3.0),
267 vec!["3x"],
268 Some("Scale3x"),
269 Some(vec![("input_data", "data")]),
270 Some(vec![("scaled_3x", "result_3x")])
271 );
272
273 let dag2 = graph2.build();
274 let context2 = dag2.execute(false, None);
275
276 println!("\n✅ Better approach - unique output names:");
277 if let Some(result_2x) = context2.get("result_2x") {
278 println!(" result_2x = {}", result_2x);
279 }
280 if let Some(result_3x) = context2.get("result_3x") {
281 println!(" result_3x = {}", result_3x);
282 }
283
284 println!();
285}
286
287fn demo_multiple_outputs() {
288 println!("─────────────────────────────────────────────────────────");
289 println!("Demo 4: Multiple Outputs from Single Node");
290 println!("─────────────────────────────────────────────────────────\n");
291
292 let mut graph = Graph::new();
293
294 // Node that produces multiple outputs
295 graph.add(
296 |_: &HashMap<String, String>, _| {
297 let mut result = HashMap::new();
298 result.insert("mean".to_string(), "50.5".to_string());
299 result.insert("median".to_string(), "48.0".to_string());
300 result.insert("stddev".to_string(), "12.3".to_string());
301 result.insert("count".to_string(), "100".to_string());
302 result
303 },
304 Some("Statistics"),
305 None,
306 Some(vec![
307 ("mean", "stat_mean"),
308 ("median", "stat_median"),
309 ("stddev", "stat_stddev"),
310 ("count", "sample_count")
311 ])
312 );
313
314 // Execute
315 let dag = graph.build();
316 let context = dag.execute(false, None);
317
318 println!("📊 Multiple outputs from single node:");
319
320 // Access each output individually
321 println!(" Mean: {}", context.get("stat_mean").unwrap());
322 println!(" Median: {}", context.get("stat_median").unwrap());
323 println!(" StdDev: {}", context.get("stat_stddev").unwrap());
324 println!(" Count: {}", context.get("sample_count").unwrap());
325
326 println!("\n📋 Complete execution context:");
327 for (key, value) in &context {
328 println!(" {} = {}", key, value);
329 }
330
331 println!("\n💡 Summary:");
332 println!(" ✓ dag.execute(false, None) returns HashMap<String, String>");
333 println!(" ✓ Keys are broadcast variable names (from output mappings)");
334 println!(" ✓ Use context.get(\"variable_name\") to access specific outputs");
335 println!(" ✓ All outputs accumulate in the context throughout execution");
336
337 println!();
338}More examples
28fn demo_simple_pipeline() {
29 println!("─────────────────────────────────────────────────────────");
30 println!("Demo 1: Simple Sequential Pipeline");
31 println!("─────────────────────────────────────────────────────────");
32
33 let mut graph = Graph::new();
34
35 // Data source node
36 graph.add(
37 |_: &HashMap<String, String>, _| {
38 let mut result = HashMap::new();
39 result.insert("value".to_string(), "42".to_string());
40 result
41 },
42 Some("DataSource"),
43 None, // No inputs (source node)
44 Some(vec![("value", "data")]) // (impl_var, broadcast_var)
45 );
46
47 // Processing node: multiply by 2
48 graph.add(
49 |inputs: &HashMap<String, String>, _| {
50 let mut result = HashMap::new();
51 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
52 result.insert("doubled".to_string(), (val * 2).to_string());
53 }
54 result
55 },
56 Some("Multiply"),
57 Some(vec![("data", "x")]), // (broadcast_var, impl_var)
58 Some(vec![("doubled", "result")]) // (impl_var, broadcast_var)
59 );
60
61 // Final processing: add 10
62 graph.add(
63 |inputs: &HashMap<String, String>, _| {
64 let mut result = HashMap::new();
65 if let Some(val) = inputs.get("num").and_then(|s| s.parse::<i32>().ok()) {
66 result.insert("sum".to_string(), (val + 10).to_string());
67 }
68 result
69 },
70 Some("AddTen"),
71 Some(vec![("result", "num")]),
72 Some(vec![("sum", "final")])
73 );
74
75 let dag = graph.build();
76 println!("\n📊 Execution:");
77 let context = dag.execute(false, None);
78
79 println!(" Input: data = {}", context.get("data").unwrap());
80 println!(" Step 1: result = {} (data * 2)", context.get("result").unwrap());
81 println!(" Step 2: final = {} (result + 10)", context.get("final").unwrap());
82
83 println!("\n📈 DAG Statistics:");
84 let stats = dag.stats();
85 println!("{}", stats.summary());
86
87 println!("\n🔍 Mermaid Visualization:");
88 println!("{}", dag.to_mermaid());
89 println!();
90}
91
92fn demo_branching() {
93 println!("─────────────────────────────────────────────────────────");
94 println!("Demo 2: Parallel Branching (Fan-Out)");
95 println!("─────────────────────────────────────────────────────────");
96
97 let mut graph = Graph::new();
98
99 // Source node
100 graph.add(
101 |_: &HashMap<String, String>, _| {
102 let mut result = HashMap::new();
103 result.insert("dataset".to_string(), "100".to_string());
104 result
105 },
106 Some("Source"),
107 None,
108 Some(vec![("dataset", "data")])
109 );
110
111 // Branch A: Compute statistics
112 let mut branch_a = Graph::new();
113 branch_a.add(
114 |inputs: &HashMap<String, String>, _| {
115 let mut result = HashMap::new();
116 if let Some(val) = inputs.get("input") {
117 result.insert("stats".to_string(), format!("Mean: {}", val));
118 }
119 result
120 },
121 Some("Statistics"),
122 Some(vec![("data", "input")]),
123 Some(vec![("stats", "stats_result")])
124 );
125
126 // Branch B: Train model
127 let mut branch_b = Graph::new();
128 branch_b.add(
129 |inputs: &HashMap<String, String>, _| {
130 let mut result = HashMap::new();
131 if let Some(val) = inputs.get("input") {
132 result.insert("model".to_string(), format!("Model trained on {}", val));
133 }
134 result
135 },
136 Some("MLModel"),
137 Some(vec![("data", "input")]),
138 Some(vec![("model", "model_result")])
139 );
140
141 // Branch C: Generate visualization
142 let mut branch_c = Graph::new();
143 branch_c.add(
144 |inputs: &HashMap<String, String>, _| {
145 let mut result = HashMap::new();
146 if let Some(val) = inputs.get("input") {
147 result.insert("plot".to_string(), format!("Plot of {}", val));
148 }
149 result
150 },
151 Some("Visualization"),
152 Some(vec![("data", "input")]),
153 Some(vec![("plot", "viz_result")])
154 );
155
156 graph.branch(branch_a);
157 graph.branch(branch_b);
158 graph.branch(branch_c);
159
160 let dag = graph.build();
161 println!("\n📊 Execution:");
162 let context = dag.execute(false, None);
163
164 println!(" Source: data = {}", context.get("data").unwrap());
165 println!(" Branch A (Stats): {}", context.get("stats_result").unwrap());
166 println!(" Branch B (Model): {}", context.get("model_result").unwrap());
167 println!(" Branch C (Viz): {}", context.get("viz_result").unwrap());
168
169 println!("\n📈 DAG Statistics:");
170 let stats = dag.stats();
171 println!("{}", stats.summary());
172 println!(" ⚡ All 3 branches can execute in parallel!");
173
174 println!("\n🔍 Mermaid Visualization:");
175 println!("{}", dag.to_mermaid());
176 println!();
177}
178
179fn demo_merging() {
180 println!("─────────────────────────────────────────────────────────");
181 println!("Demo 3: Branching + Merging (Fan-Out + Fan-In)");
182 println!("─────────────────────────────────────────────────────────");
183
184 let mut graph = Graph::new();
185
186 // Source
187 graph.add(
188 |_: &HashMap<String, String>, _| {
189 let mut result = HashMap::new();
190 result.insert("value".to_string(), "50".to_string());
191 result
192 },
193 Some("Source"),
194 None,
195 Some(vec![("value", "data")])
196 );
197
198 // Branch A: Add 10
199 let mut branch_a = Graph::new();
200 branch_a.add(
201 |inputs: &HashMap<String, String>, _| {
202 let mut result = HashMap::new();
203 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
204 result.insert("output".to_string(), (val + 10).to_string());
205 }
206 result
207 },
208 Some("PathA (+10)"),
209 Some(vec![("data", "x")]),
210 Some(vec![("output", "result")]) // Both branches use same output name!
211 );
212
213 // Branch B: Add 20
214 let mut branch_b = Graph::new();
215 branch_b.add(
216 |inputs: &HashMap<String, String>, _| {
217 let mut result = HashMap::new();
218 if let Some(val) = inputs.get("x").and_then(|s| s.parse::<i32>().ok()) {
219 result.insert("output".to_string(), (val + 20).to_string());
220 }
221 result
222 },
223 Some("PathB (+20)"),
224 Some(vec![("data", "x")]),
225 Some(vec![("output", "result")]) // Both branches use same output name!
226 );
227
228 let branch_a_id = graph.branch(branch_a);
229 let branch_b_id = graph.branch(branch_b);
230
231 // Merge node: Combine results from both branches
232 graph.merge(
233 |inputs: &HashMap<String, String>, _| {
234 let mut result = HashMap::new();
235 let a = inputs.get("from_a").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
236 let b = inputs.get("from_b").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
237 result.insert("combined".to_string(), format!("{} + {} = {}", a, b, a + b));
238 result
239 },
240 Some("Merge"),
241 vec![
242 (branch_a_id, "result", "from_a"), // Map branch A's "result" to merge fn's "from_a"
243 (branch_b_id, "result", "from_b") // Map branch B's "result" to merge fn's "from_b"
244 ],
245 Some(vec![("combined", "final")])
246 );
247
248 let dag = graph.build();
249 println!("\n📊 Execution:");
250 let context = dag.execute(false, None);
251
252 println!(" Source: data = {}", context.get("data").unwrap());
253 println!(" Branch A: 50 + 10 = 60");
254 println!(" Branch B: 50 + 20 = 70");
255 println!(" Merged: {}", context.get("final").unwrap());
256
257 println!("\n📈 DAG Statistics:");
258 let stats = dag.stats();
259 println!("{}", stats.summary());
260
261 println!("\n🔍 Mermaid Visualization:");
262 println!("{}", dag.to_mermaid());
263 println!();
264}
265
266fn demo_variants() {
267 println!("─────────────────────────────────────────────────────────");
268 println!("Demo 4: Parameter Sweep with Variants");
269 println!("─────────────────────────────────────────────────────────");
270
271 let mut graph = Graph::new();
272
273 // Source node
274 graph.add(
275 |_: &HashMap<String, String>, _| {
276 let mut result = HashMap::new();
277 result.insert("base_value".to_string(), "10.0".to_string());
278 result
279 },
280 Some("DataSource"),
281 None,
282 Some(vec![("base_value", "data")])
283 );
284
285 // Variant factory: Scale by different learning rates
286 fn make_scaler(learning_rate: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
287 move |inputs: &HashMap<String, String>, _| {
288 let mut result = HashMap::new();
289 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
290 let scaled = val * learning_rate;
291 result.insert("scaled_value".to_string(), format!("{:.2}", scaled));
292 }
293 result
294 }
295 }
296
297 // Create variants using Linspace for learning rate sweep
298 graph.variant(
299 make_scaler,
300 vec![0.001, 0.01, 0.1, 1.0],
301 Some("ScaleLR"),
302 Some(vec![("data", "input")]),
303 Some(vec![("scaled_value", "result")])
304 );
305
306 let dag = graph.build();
307 println!("\n📊 Execution:");
308 let context = dag.execute(false, None);
309
310 println!(" Source: data = {}", context.get("data").unwrap());
311 println!(" Variants created for learning rates: [0.001, 0.01, 0.1, 1.0]");
312 println!(" (Each variant computes: data * learning_rate)");
313
314 println!("\n📈 DAG Statistics:");
315 let stats = dag.stats();
316 println!("{}", stats.summary());
317 println!(" ⚡ All {} variants can execute in parallel!", stats.variant_count);
318
319 println!("\n🔍 Mermaid Visualization:");
320 println!("{}", dag.to_mermaid());
321 println!();
322}
323
324fn demo_complex_graph() {
325 println!("─────────────────────────────────────────────────────────");
326 println!("Demo 5: Complex Graph (All Features Combined)");
327 println!("─────────────────────────────────────────────────────────");
328
329 let mut graph = Graph::new();
330
331 // 1. Data ingestion
332 graph.add(
333 |_: &HashMap<String, String>, _| {
334 let mut result = HashMap::new();
335 result.insert("raw_data".to_string(), "1000".to_string());
336 result
337 },
338 Some("Ingest"),
339 None,
340 Some(vec![("raw_data", "data")])
341 );
342
343 // 2. Preprocessing
344 graph.add(
345 |inputs: &HashMap<String, String>, _| {
346 let mut result = HashMap::new();
347 if let Some(val) = inputs.get("raw").and_then(|s| s.parse::<i32>().ok()) {
348 result.insert("cleaned".to_string(), (val / 10).to_string());
349 }
350 result
351 },
352 Some("Preprocess"),
353 Some(vec![("data", "raw")]),
354 Some(vec![("cleaned", "clean_data")])
355 );
356
357 // 3. Branch for different analyses
358 let mut stats_branch = Graph::new();
359 stats_branch.add(
360 |inputs: &HashMap<String, String>, _| {
361 let mut result = HashMap::new();
362 if let Some(val) = inputs.get("data") {
363 result.insert("stats".to_string(), format!("Stats({})", val));
364 }
365 result
366 },
367 Some("Stats"),
368 Some(vec![("clean_data", "data")]),
369 Some(vec![("stats", "statistics")])
370 );
371
372 let mut ml_branch = Graph::new();
373 ml_branch.add(
374 |inputs: &HashMap<String, String>, _| {
375 let mut result = HashMap::new();
376 if let Some(val) = inputs.get("data") {
377 result.insert("prediction".to_string(), format!("Pred({})", val));
378 }
379 result
380 },
381 Some("ML"),
382 Some(vec![("clean_data", "data")]),
383 Some(vec![("prediction", "ml_result")])
384 );
385
386 let stats_id = graph.branch(stats_branch);
387 let ml_id = graph.branch(ml_branch);
388
389 // 4. Merge branches
390 graph.merge(
391 |inputs: &HashMap<String, String>, _| {
392 let mut result = HashMap::new();
393 let stats = inputs.get("stats_in").cloned().unwrap_or_default();
394 let ml = inputs.get("ml_in").cloned().unwrap_or_default();
395 result.insert("report".to_string(), format!("{} & {}", stats, ml));
396 result
397 },
398 Some("Combine"),
399 vec![
400 (stats_id, "statistics", "stats_in"),
401 (ml_id, "ml_result", "ml_in")
402 ],
403 Some(vec![("report", "final_report")])
404 );
405
406 // 5. Final output formatting
407 graph.add(
408 |inputs: &HashMap<String, String>, _| {
409 let mut result = HashMap::new();
410 if let Some(report) = inputs.get("report") {
411 result.insert("formatted".to_string(), format!("[FINAL] {}", report));
412 }
413 result
414 },
415 Some("Format"),
416 Some(vec![("final_report", "report")]),
417 Some(vec![("formatted", "output")])
418 );
419
420 let dag = graph.build();
421 println!("\n📊 Execution:");
422 let context = dag.execute(false, None);
423
424 println!(" Step 1: Ingest → data = {}", context.get("data").unwrap());
425 println!(" Step 2: Preprocess → clean_data = {}", context.get("clean_data").unwrap());
426 println!(" Step 3: Branch A → statistics = {}", context.get("statistics").unwrap());
427 println!(" Branch B → ml_result = {}", context.get("ml_result").unwrap());
428 println!(" Step 4: Merge → final_report = {}", context.get("final_report").unwrap());
429 println!(" Step 5: Format → output = {}", context.get("output").unwrap());
430
431 println!("\n📈 DAG Statistics:");
432 let stats = dag.stats();
433 println!("{}", stats.summary());
434
435 println!("\n📋 Execution Order:");
436 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
437 println!(" Level {}: {} nodes", level_idx, level.len());
438 for &node_id in level {
439 let node = dag.nodes().iter().find(|n| n.id == node_id).unwrap();
440 println!(" - {}", node.display_name());
441 }
442 }
443
444 println!("\n🔍 Mermaid Visualization:");
445 println!("{}", dag.to_mermaid());
446 println!();
447
448 println!("═══════════════════════════════════════════════════════════");
449 println!(" Demo Complete!");
450 println!("═══════════════════════════════════════════════════════════");
451}24fn demo_per_node_access() {
25 println!("─────────────────────────────────────────────────────────");
26 println!("Demo 1: Per-Node Output Access");
27 println!("─────────────────────────────────────────────────────────\n");
28
29 let mut graph = Graph::new();
30
31 // Node 0: Source
32 graph.add(
33 |_: &HashMap<String, String>, _| {
34 let mut result = HashMap::new();
35 result.insert("value".to_string(), "10".to_string());
36 result
37 },
38 Some("Source"),
39 None,
40 Some(vec![("value", "initial_data")])
41 );
42
43 // Node 1: Double
44 graph.add(
45 |inputs: &HashMap<String, String>, _| {
46 let value = inputs.get("in").unwrap().parse::<i32>().unwrap();
47 let mut result = HashMap::new();
48 result.insert("doubled".to_string(), (value * 2).to_string());
49 result
50 },
51 Some("Double"),
52 Some(vec![("initial_data", "in")]),
53 Some(vec![("doubled", "doubled_data")])
54 );
55
56 // Node 2: Add Ten
57 graph.add(
58 |inputs: &HashMap<String, String>, _| {
59 let value = inputs.get("in").unwrap().parse::<i32>().unwrap();
60 let mut result = HashMap::new();
61 result.insert("added".to_string(), (value + 10).to_string());
62 result
63 },
64 Some("AddTen"),
65 Some(vec![("doubled_data", "in")]),
66 Some(vec![("added", "final_result")])
67 );
68
69 // Execute and get detailed results
70 let dag = graph.build();
71 let result = dag.execute_detailed(false, None);
72
73 println!("🌍 Global Context (all variables):");
74 for (key, value) in &result.context {
75 println!(" {} = {}", key, value);
76 }
77
78 println!("\n📦 Per-Node Outputs:");
79 println!("\nNode 0 (Source) outputs:");
80 if let Some(outputs) = result.get_node_outputs(0) {
81 for (key, value) in outputs {
82 println!(" {} = {}", key, value);
83 }
84 }
85
86 println!("\nNode 1 (Double) outputs:");
87 if let Some(outputs) = result.get_node_outputs(1) {
88 for (key, value) in outputs {
89 println!(" {} = {}", key, value);
90 }
91 }
92
93 println!("\nNode 2 (AddTen) outputs:");
94 if let Some(outputs) = result.get_node_outputs(2) {
95 for (key, value) in outputs {
96 println!(" {} = {}", key, value);
97 }
98 }
99
100 println!("\n🎯 Accessing specific node outputs:");
101 if let Some(value) = result.get_from_node(0, "initial_data") {
102 println!(" Node 0 'initial_data': {}", value);
103 }
104 if let Some(value) = result.get_from_node(1, "doubled_data") {
105 println!(" Node 1 'doubled_data': {}", value);
106 }
107 if let Some(value) = result.get_from_node(2, "final_result") {
108 println!(" Node 2 'final_result': {}", value);
109 }
110
111 println!();
112}
113
114fn demo_per_branch_access() {
115 println!("─────────────────────────────────────────────────────────");
116 println!("Demo 2: Per-Branch Output Access");
117 println!("─────────────────────────────────────────────────────────\n");
118
119 let mut graph = Graph::new();
120
121 // Main graph: Source node
122 graph.add(
123 |_: &HashMap<String, String>, _| {
124 let mut result = HashMap::new();
125 result.insert("dataset".to_string(), "100".to_string());
126 result
127 },
128 Some("Source"),
129 None,
130 Some(vec![("dataset", "data")])
131 );
132
133 // Branch A: Statistics
134 let mut branch_a = Graph::new();
135 branch_a.add(
136 |inputs: &HashMap<String, String>, _| {
137 let value = inputs.get("input").unwrap();
138 let mut result = HashMap::new();
139 result.insert("stat_result".to_string(), format!("Mean of {}", value));
140 result
141 },
142 Some("Statistics"),
143 Some(vec![("data", "input")]),
144 Some(vec![("stat_result", "statistics")])
145 );
146
147 // Branch B: Model Training
148 let mut branch_b = Graph::new();
149 branch_b.add(
150 |inputs: &HashMap<String, String>, _| {
151 let value = inputs.get("input").unwrap();
152 let mut result = HashMap::new();
153 result.insert("model_result".to_string(), format!("Model trained on {}", value));
154 result
155 },
156 Some("ModelTraining"),
157 Some(vec![("data", "input")]),
158 Some(vec![("model_result", "trained_model")])
159 );
160
161 // Branch C: Visualization
162 let mut branch_c = Graph::new();
163 branch_c.add(
164 |inputs: &HashMap<String, String>, _| {
165 let value = inputs.get("input").unwrap();
166 let mut result = HashMap::new();
167 result.insert("viz_result".to_string(), format!("Plot of {}", value));
168 result
169 },
170 Some("Visualization"),
171 Some(vec![("data", "input")]),
172 Some(vec![("viz_result", "plot")])
173 );
174
175 let branch_a_id = graph.branch(branch_a);
176 let branch_b_id = graph.branch(branch_b);
177 let branch_c_id = graph.branch(branch_c);
178
179 // Execute and get detailed results
180 let dag = graph.build();
181 let result = dag.execute_detailed(false, None);
182
183 println!("🌍 Global Context:");
184 for (key, value) in &result.context {
185 println!(" {} = {}", key, value);
186 }
187
188 println!("\n🌿 Per-Branch Outputs:");
189
190 println!("\nBranch {} (Statistics) outputs:", branch_a_id);
191 if let Some(outputs) = result.get_branch_outputs(branch_a_id) {
192 for (key, value) in outputs {
193 println!(" {} = {}", key, value);
194 }
195 }
196
197 println!("\nBranch {} (Model Training) outputs:", branch_b_id);
198 if let Some(outputs) = result.get_branch_outputs(branch_b_id) {
199 for (key, value) in outputs {
200 println!(" {} = {}", key, value);
201 }
202 }
203
204 println!("\nBranch {} (Visualization) outputs:", branch_c_id);
205 if let Some(outputs) = result.get_branch_outputs(branch_c_id) {
206 for (key, value) in outputs {
207 println!(" {} = {}", key, value);
208 }
209 }
210
211 println!("\n🎯 Accessing specific branch outputs:");
212 if let Some(value) = result.get_from_branch(branch_a_id, "statistics") {
213 println!(" Branch {} 'statistics': {}", branch_a_id, value);
214 }
215 if let Some(value) = result.get_from_branch(branch_b_id, "trained_model") {
216 println!(" Branch {} 'trained_model': {}", branch_b_id, value);
217 }
218 if let Some(value) = result.get_from_branch(branch_c_id, "plot") {
219 println!(" Branch {} 'plot': {}", branch_c_id, value);
220 }
221
222 println!();
223}
224
225fn demo_variant_per_node_access() {
226 println!("─────────────────────────────────────────────────────────");
227 println!("Demo 3: Variant Outputs with Per-Node Tracking");
228 println!("─────────────────────────────────────────────────────────\n");
229
230 let mut graph = Graph::new();
231
232 // Source node
233 graph.add(
234 |_: &HashMap<String, String>, _| {
235 let mut result = HashMap::new();
236 result.insert("base_value".to_string(), "10".to_string());
237 result
238 },
239 Some("Source"),
240 None,
241 Some(vec![("base_value", "data")])
242 );
243
244 // Variant factory for scaling
245 fn make_scaler(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
246 move |inputs: &HashMap<String, String>, _| {
247 let value = inputs.get("input_data").unwrap().parse::<f64>().unwrap();
248 let mut result = HashMap::new();
249 result.insert("scaled_value".to_string(), (value * factor).to_string());
250 result
251 }
252 }
253
254 // Create variants with unique output names to preserve all results
255 graph.variant(
256 make_scaler,
257 vec![2.0, 3.0, 5.0],
258 Some("Scale"),
259 Some(vec![("data", "input_data")]),
260 Some(vec![("scaled_value", "result")]) // Note: will overwrite in global context
261 );
262
263 let dag = graph.build();
264 let result = dag.execute_detailed(false, None);
265
266 println!("🌍 Global Context (note: 'result' contains last variant's output):");
267 for (key, value) in &result.context {
268 println!(" {} = {}", key, value);
269 }
270
271 println!("\n📦 Per-Node Outputs (each variant tracked separately):");
272
273 // Node 0 is the source
274 // Nodes 1, 2, 3 are the variant nodes (2x, 3x, 5x scalers)
275 for node_id in 1..=3 {
276 println!("\nNode {} (Variant Scaler) outputs:", node_id);
277 if let Some(outputs) = result.get_node_outputs(node_id) {
278 for (key, value) in outputs {
279 println!(" {} = {}", key, value);
280 }
281 }
282 }
283
284 println!("\n💡 Key Insight:");
285 println!(" - Global context has 'result' = {} (last variant overwrites)", result.get("result").unwrap());
286 println!(" - But per-node outputs preserve ALL variant results:");
287 println!(" Node 1 (2x): result = {}", result.get_from_node(1, "result").unwrap());
288 println!(" Node 2 (3x): result = {}", result.get_from_node(2, "result").unwrap());
289 println!(" Node 3 (5x): result = {}", result.get_from_node(3, "result").unwrap());
290
291 println!();
292}
293
294fn demo_execution_history_tracking() {
295 println!("─────────────────────────────────────────────────────────");
296 println!("Demo 4: Execution History Tracking");
297 println!("─────────────────────────────────────────────────────────\n");
298
299 let mut graph = Graph::new();
300
301 // Multi-stage pipeline
302 graph.add(
303 |_: &HashMap<String, String>, _| {
304 let mut result = HashMap::new();
305 result.insert("raw".to_string(), "5".to_string());
306 result
307 },
308 Some("Load"),
309 None,
310 Some(vec![("raw", "input")])
311 );
312
313 graph.add(
314 |inputs: &HashMap<String, String>, _| {
315 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
316 let mut result = HashMap::new();
317 result.insert("cleaned".to_string(), (value + 1).to_string());
318 result
319 },
320 Some("Clean"),
321 Some(vec![("input", "x")]),
322 Some(vec![("cleaned", "clean_data")])
323 );
324
325 graph.add(
326 |inputs: &HashMap<String, String>, _| {
327 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
328 let mut result = HashMap::new();
329 result.insert("normalized".to_string(), (value * 10).to_string());
330 result
331 },
332 Some("Normalize"),
333 Some(vec![("clean_data", "x")]),
334 Some(vec![("normalized", "norm_data")])
335 );
336
337 graph.add(
338 |inputs: &HashMap<String, String>, _| {
339 let value = inputs.get("x").unwrap().parse::<i32>().unwrap();
340 let mut result = HashMap::new();
341 result.insert("transformed".to_string(), format!("FINAL_{}", value));
342 result
343 },
344 Some("Transform"),
345 Some(vec![("norm_data", "x")]),
346 Some(vec![("transformed", "output")])
347 );
348
349 let dag = graph.build();
350 let result = dag.execute_detailed(false, None);
351
352 println!("📊 Execution History (Data Flow Tracking):");
353 println!();
354 println!("Step-by-step transformation:");
355 println!(" 1. Load: input = {}", result.get_from_node(0, "input").unwrap());
356 println!(" 2. Clean: clean_data = {}", result.get_from_node(1, "clean_data").unwrap());
357 println!(" 3. Normalize: norm_data = {}", result.get_from_node(2, "norm_data").unwrap());
358 println!(" 4. Transform: output = {}", result.get_from_node(3, "output").unwrap());
359
360 println!("\n🔍 Debugging: Inspect any intermediate result:");
361 println!(" Need to debug the normalization step?");
362 println!(" Just check Node 2: {}", result.get_from_node(2, "norm_data").unwrap());
363
364 println!("\n✅ Benefits of Per-Node Access:");
365 println!(" ✓ Track data transformations step-by-step");
366 println!(" ✓ Debug issues by inspecting intermediate values");
367 println!(" ✓ Validate each processing stage independently");
368 println!(" ✓ Preserve all variant outputs even with name collisions");
369
370 println!();
371}32fn demo_sequential_vs_parallel() {
33 println!("─────────────────────────────────────────────────────────");
34 println!("Demo 1: Sequential vs Parallel Execution");
35 println!("─────────────────────────────────────────────────────────");
36
37 let mut graph = Graph::new();
38
39 // Source node
40 graph.add(
41 |_: &HashMap<String, String>, _| {
42 let start = Instant::now();
43 let mut result = HashMap::new();
44 result.insert("data".to_string(), "source_data".to_string());
45 println!(" [{}ms] Source completed", start.elapsed().as_millis());
46 result
47 },
48 Some("Source"),
49 None,
50 Some(vec![("data", "data")])
51 );
52
53 // Branch A: 100ms work
54 let mut branch_a = Graph::new();
55 branch_a.add(
56 |inputs: &HashMap<String, String>, _| {
57 let start = Instant::now();
58 let mut result = HashMap::new();
59 if let Some(data) = inputs.get("input") {
60 let processed = simulate_work(100, data);
61 result.insert("result".to_string(), processed);
62 }
63 println!(" [{}ms] Branch A completed (100ms work)", start.elapsed().as_millis());
64 result
65 },
66 Some("BranchA[100ms]"),
67 Some(vec![("data", "input")]),
68 Some(vec![("result", "result_a")])
69 );
70
71 // Branch B: 100ms work
72 let mut branch_b = Graph::new();
73 branch_b.add(
74 |inputs: &HashMap<String, String>, _| {
75 let start = Instant::now();
76 let mut result = HashMap::new();
77 if let Some(data) = inputs.get("input") {
78 let processed = simulate_work(100, data);
79 result.insert("result".to_string(), processed);
80 }
81 println!(" [{}ms] Branch B completed (100ms work)", start.elapsed().as_millis());
82 result
83 },
84 Some("BranchB[100ms]"),
85 Some(vec![("data", "input")]),
86 Some(vec![("result", "result_b")])
87 );
88
89 // Branch C: 100ms work
90 let mut branch_c = Graph::new();
91 branch_c.add(
92 |inputs: &HashMap<String, String>, _| {
93 let start = Instant::now();
94 let mut result = HashMap::new();
95 if let Some(data) = inputs.get("input") {
96 let processed = simulate_work(100, data);
97 result.insert("result".to_string(), processed);
98 }
99 println!(" [{}ms] Branch C completed (100ms work)", start.elapsed().as_millis());
100 result
101 },
102 Some("BranchC[100ms]"),
103 Some(vec![("data", "input")]),
104 Some(vec![("result", "result_c")])
105 );
106
107 graph.branch(branch_a);
108 graph.branch(branch_b);
109 graph.branch(branch_c);
110
111 let dag = graph.build();
112
113 println!("\n📊 Sequential Execution (simulated):");
114 let start = Instant::now();
115 let _ = dag.execute(false, None);
116 let sequential_time = start.elapsed();
117 println!(" Total time: {}ms", sequential_time.as_millis());
118
119 println!("\n⚡ With Parallel Execution:");
120 println!(" Expected time: ~100ms (all branches run simultaneously)");
121 println!(" Speedup: ~3x faster than sequential");
122
123 println!("\n📈 DAG Statistics:");
124 let stats = dag.stats();
125 println!("{}", stats.summary());
126 println!("\n Analysis:");
127 println!(" - Level 0: 1 node (Source)");
128 println!(" - Level 1: 3 nodes (BranchA, BranchB, BranchC) ← Can run in parallel!");
129 println!(" - Max parallelism: {} nodes can execute simultaneously", stats.max_parallelism);
130
131 println!("\n🔍 Mermaid Visualization with Port Mappings:");
132 println!("{}", dag.to_mermaid());
133 println!();
134}
135
136fn demo_complex_dependencies() {
137 println!("─────────────────────────────────────────────────────────");
138 println!("Demo 2: Complex Data Dependencies");
139 println!("─────────────────────────────────────────────────────────");
140
141 let mut graph = Graph::new();
142
143 // Two independent sources
144 graph.add(
145 |_: &HashMap<String, String>, _| {
146 let mut result = HashMap::new();
147 result.insert("source1_data".to_string(), "100".to_string());
148 result
149 },
150 Some("Source1"),
151 None,
152 Some(vec![("source1_data", "data1")])
153 );
154
155 graph.add(
156 |_: &HashMap<String, String>, _| {
157 let mut result = HashMap::new();
158 result.insert("source2_data".to_string(), "200".to_string());
159 result
160 },
161 Some("Source2"),
162 None,
163 Some(vec![("source2_data", "data2")])
164 );
165
166 // Process each source independently (can run in parallel)
167 graph.add(
168 |inputs: &HashMap<String, String>, _| {
169 let mut result = HashMap::new();
170 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
171 thread::sleep(Duration::from_millis(50));
172 result.insert("processed".to_string(), (val * 2).to_string());
173 }
174 result
175 },
176 Some("Process1[50ms]"),
177 Some(vec![("data1", "in")]),
178 Some(vec![("processed", "proc1")])
179 );
180
181 graph.add(
182 |inputs: &HashMap<String, String>, _| {
183 let mut result = HashMap::new();
184 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
185 thread::sleep(Duration::from_millis(50));
186 result.insert("processed".to_string(), (val * 3).to_string());
187 }
188 result
189 },
190 Some("Process2[50ms]"),
191 Some(vec![("data2", "in")]),
192 Some(vec![("processed", "proc2")])
193 );
194
195 // Combine results (depends on both processors)
196 graph.add(
197 |inputs: &HashMap<String, String>, _| {
198 let mut result = HashMap::new();
199 let v1 = inputs.get("p1").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
200 let v2 = inputs.get("p2").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
201 thread::sleep(Duration::from_millis(30));
202 result.insert("combined".to_string(), format!("{}", v1 + v2));
203 result
204 },
205 Some("Combine[30ms]"),
206 Some(vec![("proc1", "p1"), ("proc2", "p2")]),
207 Some(vec![("combined", "final")])
208 );
209
210 let dag = graph.build();
211
212 println!("\n📊 Execution with timing:");
213 let start = Instant::now();
214 let context = dag.execute(false, None);
215 let total_time = start.elapsed();
216
217 println!(" Source1: data1 = {}", context.get("data1").unwrap());
218 println!(" Source2: data2 = {}", context.get("data2").unwrap());
219 println!(" Process1: proc1 = {} (data1 * 2)", context.get("proc1").unwrap());
220 println!(" Process2: proc2 = {} (data2 * 3)", context.get("proc2").unwrap());
221 println!(" Combine: final = {} (proc1 + proc2)", context.get("final").unwrap());
222 println!("\n Total execution time: {}ms", total_time.as_millis());
223
224 println!("\n📈 Execution Levels (showing parallelism):");
225 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
226 print!(" Level {}: ", level_idx);
227 let node_names: Vec<String> = level.iter()
228 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
229 .collect();
230 println!("{}", node_names.join(", "));
231 if level.len() > 1 {
232 println!(" ↑ {} nodes can execute in parallel!", level.len());
233 }
234 }
235
236 println!("\n⚡ Parallel Execution Analysis:");
237 println!(" Sequential time would be: 50+50+30 = 130ms");
238 println!(" With parallelism: Level0→Level1(parallel)→Level2 = ~80ms");
239 println!(" Speedup: 1.6x");
240
241 println!("\n🔍 Mermaid Visualization (shows data dependencies):");
242 println!("{}", dag.to_mermaid());
243 println!();
244}
245
246fn demo_variant_parallelism() {
247 println!("─────────────────────────────────────────────────────────");
248 println!("Demo 3: Variant Parameter Sweep Parallelism");
249 println!("─────────────────────────────────────────────────────────");
250
251 let mut graph = Graph::new();
252
253 // Source
254 graph.add(
255 |_: &HashMap<String, String>, _| {
256 let mut result = HashMap::new();
257 result.insert("value".to_string(), "1000".to_string());
258 result
259 },
260 Some("DataSource"),
261 None,
262 Some(vec![("value", "data")])
263 );
264
265 // Variant factory with different multipliers
266 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
267 move |inputs: &HashMap<String, String>, _| {
268 let start = Instant::now();
269 let mut result = HashMap::new();
270 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
271 // Simulate 100ms of work
272 thread::sleep(Duration::from_millis(100));
273 result.insert("result".to_string(), format!("{:.1}", val * factor));
274 }
275 println!(" [{}ms] Variant (factor={}) completed", start.elapsed().as_millis(), factor);
276 result
277 }
278 }
279
280 // Create 5 variants
281 graph.variant(
282 make_multiplier,
283 vec![0.5, 1.0, 1.5, 2.0, 2.5],
284 Some("Multiply[100ms]"),
285 Some(vec![("data", "input")]),
286 Some(vec![("result", "result")])
287 );
288
289 let dag = graph.build();
290
291 println!("\n📊 Executing 5 variants (each takes 100ms):");
292 let start = Instant::now();
293 let _ = dag.execute(false, None);
294 let total_time = start.elapsed();
295
296 println!("\n Total execution time: {}ms", total_time.as_millis());
297
298 println!("\n⚡ Parallelism Analysis:");
299 println!(" Sequential execution: 100 × 5 = 500ms");
300 println!(" With parallel execution: ~100ms (all run simultaneously)");
301 println!(" Speedup: 5x");
302
303 println!("\n📈 DAG Statistics:");
304 let stats = dag.stats();
305 println!("{}", stats.summary());
306 println!(" ↑ All {} variant nodes can execute in parallel!", stats.variant_count);
307
308 println!("\n🔍 Mermaid Visualization:");
309 println!("{}", dag.to_mermaid());
310 println!();
311}
312
313fn demo_diamond_pattern() {
314 println!("─────────────────────────────────────────────────────────");
315 println!("Demo 4: Diamond Pattern (Fan-Out → Fan-In)");
316 println!("─────────────────────────────────────────────────────────");
317 println!("This pattern shows:");
318 println!(" - One source splits into multiple parallel branches");
319 println!(" - Branches are processed independently");
320 println!(" - Results merge back into single output");
321
322 let mut graph = Graph::new();
323
324 // Top of diamond: Single source
325 graph.add(
326 |_: &HashMap<String, String>, _| {
327 let mut result = HashMap::new();
328 result.insert("raw".to_string(), "input_data".to_string());
329 result
330 },
331 Some("Source"),
332 None,
333 Some(vec![("raw", "data")])
334 );
335
336 // Left branch: Transform A (50ms)
337 let mut branch_a = Graph::new();
338 branch_a.add(
339 |inputs: &HashMap<String, String>, _| {
340 let start = Instant::now();
341 thread::sleep(Duration::from_millis(50));
342 let mut result = HashMap::new();
343 if let Some(data) = inputs.get("in") {
344 result.insert("out".to_string(), format!("{}_transformA", data));
345 }
346 println!(" [{}ms] Transform A completed", start.elapsed().as_millis());
347 result
348 },
349 Some("TransformA[50ms]"),
350 Some(vec![("data", "in")]),
351 Some(vec![("out", "result")])
352 );
353
354 // Right branch: Transform B (50ms)
355 let mut branch_b = Graph::new();
356 branch_b.add(
357 |inputs: &HashMap<String, String>, _| {
358 let start = Instant::now();
359 thread::sleep(Duration::from_millis(50));
360 let mut result = HashMap::new();
361 if let Some(data) = inputs.get("in") {
362 result.insert("out".to_string(), format!("{}_transformB", data));
363 }
364 println!(" [{}ms] Transform B completed", start.elapsed().as_millis());
365 result
366 },
367 Some("TransformB[50ms]"),
368 Some(vec![("data", "in")]),
369 Some(vec![("out", "result")])
370 );
371
372 let branch_a_id = graph.branch(branch_a);
373 let branch_b_id = graph.branch(branch_b);
374
375 // Bottom of diamond: Merge (30ms)
376 graph.merge(
377 |inputs: &HashMap<String, String>, _| {
378 let start = Instant::now();
379 thread::sleep(Duration::from_millis(30));
380 let mut result = HashMap::new();
381 let a = inputs.get("left").cloned().unwrap_or_default();
382 let b = inputs.get("right").cloned().unwrap_or_default();
383 result.insert("merged".to_string(), format!("[{}+{}]", a, b));
384 println!(" [{}ms] Merge completed", start.elapsed().as_millis());
385 result
386 },
387 Some("Merge[30ms]"),
388 vec![
389 (branch_a_id, "result", "left"),
390 (branch_b_id, "result", "right")
391 ],
392 Some(vec![("merged", "final")])
393 );
394
395 let dag = graph.build();
396
397 println!("\n📊 Executing diamond pattern:");
398 let start = Instant::now();
399 let context = dag.execute(false, None);
400 let total_time = start.elapsed();
401
402 println!("\n Result: {}", context.get("final").unwrap());
403 println!(" Total execution time: {}ms", total_time.as_millis());
404
405 println!("\n📈 Execution Levels:");
406 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
407 print!(" Level {}: ", level_idx);
408 let node_names: Vec<String> = level.iter()
409 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
410 .collect();
411 println!("{}", node_names.join(", "));
412 }
413
414 println!("\n⚡ Timing Analysis:");
415 println!(" Sequential: Source(0ms) + TransformA(50ms) + TransformB(50ms) + Merge(30ms) = 130ms");
416 println!(" Parallel: Source(0ms) → [TransformA + TransformB](50ms) → Merge(30ms) = 80ms");
417 println!(" Speedup: 1.6x");
418
419 println!("\n🔍 Mermaid Visualization (Diamond Shape):");
420 println!("{}", dag.to_mermaid());
421
422 println!("\n The visualization shows:");
423 println!(" - Port mappings on edges (data→in, result→left, result→right)");
424 println!(" - Data dependencies between nodes");
425 println!(" - Parallel branches can execute simultaneously");
426
427 println!("\n═══════════════════════════════════════════════════════════");
428 println!(" Parallel Execution Demo Complete!");
429 println!("═══════════════════════════════════════════════════════════");
430}122fn main() {
123 println!("═══════════════════════════════════════════════════════════");
124 println!(" Variant Pattern Demo (sigexec-style)");
125 println!(" Full Actual Syntax Examples");
126 println!("═══════════════════════════════════════════════════════════\n");
127
128 // =========================================================================
129 // Demo 1: Single Variant - Basic Factory Pattern
130 // =========================================================================
131 println!("Demo 1: Single Variant with Factory Function");
132 println!("─────────────────────────────────────────────────────────\n");
133
134 println!("📝 Code:");
135 println!("```rust");
136 println!("fn make_scaler(factor: f64) -> impl Fn(...) -> ... {{");
137 println!(" move |inputs, _| {{");
138 println!(" let value = inputs.get(\"input_data\").unwrap().parse::<f64>().unwrap();");
139 println!(" let scaled = value * factor;");
140 println!(" outputs.insert(\"scaled_value\", scaled.to_string());");
141 println!(" }}");
142 println!("}}");
143 println!();
144 println!("let mut graph = Graph::new();");
145 println!("graph.add(data_source, Some(\"Source\"), None, Some(vec![(\"value\", \"data\")]));");
146 println!("graph.variant(");
147 println!(" make_scaler, // Factory function");
148 println!(" vec![2.0, 3.0, 5.0], // Parameter values to sweep");
149 println!(" Some(\"Scale\"), // Label");
150 println!(" Some(vec![(\"data\", \"input_data\")]), // Input mapping");
151 println!(" Some(vec![(\"scaled_value\", \"result\")]) // Output mapping");
152 println!(");");
153 println!("```\n");
154
155 let mut graph1 = Graph::new();
156 graph1.add(
157 data_source,
158 Some("Source"),
159 None,
160 Some(vec![("value", "data")])
161 );
162 graph1.variant(
163 make_scaler,
164 vec![2.0, 3.0, 5.0],
165 Some("Scale"),
166 Some(vec![("data", "input_data")]),
167 Some(vec![("scaled_value", "result")])
168 );
169
170 let dag1 = graph1.build();
171 println!("🎯 What happens:");
172 println!(" • Factory creates 3 nodes: Scale_2.0, Scale_3.0, Scale_5.0");
173 println!(" • Each node multiplies input by its factor");
174 println!(" • All variants can execute in parallel");
175 println!();
176
177 let stats1 = dag1.stats();
178 println!("📈 DAG Statistics:");
179 println!(" - Total nodes: {}", stats1.node_count);
180 println!(" - Depth: {} levels", stats1.depth);
181 println!(" - Max parallelism: {} nodes can run simultaneously", stats1.max_parallelism);
182 println!();
183
184 println!("🔍 Mermaid Visualization:");
185 println!("{}", dag1.to_mermaid());
186 println!();
187
188 // =========================================================================
189 // Demo 2: Multiple Variants - Cartesian Product
190 // =========================================================================
191 println!("\nDemo 2: Multiple Variants (Cartesian Product)");
192 println!("─────────────────────────────────────────────────────────\n");
193
194 println!("📝 Code:");
195 println!("```rust");
196 println!("graph.add(data_source, Some(\"Generate\"), None, Some(vec![(\"value\", \"data\")]));");
197 println!("graph.variant(make_scaler, vec![2.0, 3.0], Some(\"Scale\"), ...);");
198 println!("graph.variant(make_offsetter, vec![10, 20], Some(\"Offset\"), ...);");
199 println!("graph.add(stats_node, Some(\"Stats\"), Some(vec![(\"result\", \"result\")]), None);");
200 println!("```\n");
201
202 let mut graph2 = Graph::new();
203 graph2.add(
204 data_source,
205 Some("Generate"),
206 None,
207 Some(vec![("value", "data")])
208 );
209 graph2.variant(
210 make_scaler,
211 vec![2.0, 3.0],
212 Some("Scale"),
213 Some(vec![("data", "input_data")]),
214 Some(vec![("scaled_value", "result")])
215 );
216 graph2.variant(
217 make_offsetter,
218 vec![10, 20],
219 Some("Offset"),
220 Some(vec![("result", "number")]),
221 Some(vec![("offset_result", "result")])
222 );
223 graph2.add(
224 stats_node,
225 Some("Stats"),
226 Some(vec![("result", "result")]),
227 Some(vec![("summary", "final")])
228 );
229
230 let dag2 = graph2.build();
231 println!("🎯 What happens:");
232 println!(" • Scale creates 2 variants: x2.0, x3.0");
233 println!(" • Offset creates 2 variants: +10, +20");
234 println!(" • Total combinations: 2 × 2 = 4 execution paths");
235 println!(" • Each path: Generate → Scale[variant] → Offset[variant] → Stats");
236 println!();
237
238 let stats2 = dag2.stats();
239 println!("📈 DAG Statistics:");
240 println!(" - Total nodes: {}", stats2.node_count);
241 println!(" - Depth: {} levels", stats2.depth);
242 println!(" - Execution paths: 4 (2 scales × 2 offsets)");
243 println!();
244
245 println!("🔍 Mermaid Visualization:");
246 println!("{}", dag2.to_mermaid());
247 println!();
248
249 // =========================================================================
250 // Demo 3: Complex Factory - Struct Configuration
251 // =========================================================================
252 println!("\nDemo 3: Complex Factory with Struct Configuration");
253 println!("─────────────────────────────────────────────────────────\n");
254
255 println!("📝 Code:");
256 println!("```rust");
257 println!("#[derive(Clone)]");
258 println!("struct FilterConfig {{");
259 println!(" cutoff: f64,");
260 println!(" mode: String,");
261 println!("}}");
262 println!();
263 println!("fn make_filter(config: FilterConfig) -> impl Fn(...) -> ... {{");
264 println!(" move |inputs, _| {{");
265 println!(" let value = inputs.get(\"data\").unwrap().parse::<f64>().unwrap();");
266 println!(" let filtered = match config.mode.as_str() {{");
267 println!(" \"lowpass\" => value * config.cutoff,");
268 println!(" \"highpass\" => value * (1.0 - config.cutoff),");
269 println!(" _ => value,");
270 println!(" }};");
271 println!(" }}");
272 println!("}}");
273 println!();
274 println!("let configs = vec![");
275 println!(" FilterConfig {{ cutoff: 0.5, mode: \"lowpass\".to_string() }},");
276 println!(" FilterConfig {{ cutoff: 0.3, mode: \"highpass\".to_string() }},");
277 println!(" FilterConfig {{ cutoff: 0.7, mode: \"lowpass\".to_string() }},");
278 println!("];");
279 println!("graph.variant(make_filter, configs, Some(\"Filter\"), ...);");
280 println!("```\n");
281
282 let configs = vec![
283 FilterConfig { cutoff: 0.5, mode: "lowpass".to_string() },
284 FilterConfig { cutoff: 0.3, mode: "highpass".to_string() },
285 FilterConfig { cutoff: 0.7, mode: "lowpass".to_string() },
286 ];
287
288 let mut graph3 = Graph::new();
289 graph3.add(
290 data_source,
291 Some("Source"),
292 None,
293 Some(vec![("value", "data")])
294 );
295 graph3.variant(
296 make_filter,
297 configs,
298 Some("Filter"),
299 Some(vec![("data", "data")]),
300 Some(vec![("filtered", "result")])
301 );
302
303 let dag3 = graph3.build();
304 println!("🎯 What happens:");
305 println!(" • 3 filter variants created with different configurations");
306 println!(" • Each variant uses its own FilterConfig struct");
307 println!(" • Demonstrates passing complex types to factory");
308 println!();
309
310 let stats3 = dag3.stats();
311 println!("📈 DAG Statistics:");
312 println!(" - Total nodes: {}", stats3.node_count);
313 println!(" - Filter variants: 3");
314 println!(" - Max parallelism: {} nodes", stats3.max_parallelism);
315 println!();
316
317 // =========================================================================
318 // Demo 4: String Processing Variants
319 // =========================================================================
320 println!("\nDemo 4: String Processing Variants");
321 println!("─────────────────────────────────────────────────────────\n");
322
323 println!("📝 Code:");
324 println!("```rust");
325 println!("fn make_processor(prefix: &'static str) -> impl Fn(...) -> ... {{");
326 println!(" move |inputs, _| {{");
327 println!(" let text = inputs.get(\"text\").unwrap();");
328 println!(" let processed = format!(\"[{{}}] {{}}\", prefix, text);");
329 println!(" outputs.insert(\"processed_text\", processed);");
330 println!(" }}");
331 println!("}}");
332 println!();
333 println!("graph.variant(");
334 println!(" make_processor,");
335 println!(" vec![\"INFO\", \"WARN\", \"ERROR\"],");
336 println!(" Some(\"LogLevel\"),");
337 println!(" Some(vec![(\"message\", \"text\")]),");
338 println!(" Some(vec![(\"processed_text\", \"log\")])");
339 println!(");");
340 println!("```\n");
341
342 let mut graph4 = Graph::new();
343 graph4.add(
344 text_source,
345 Some("Source"),
346 None,
347 Some(vec![("message", "message")])
348 );
349 graph4.variant(
350 make_processor,
351 vec!["INFO", "WARN", "ERROR"],
352 Some("LogLevel"),
353 Some(vec![("message", "text")]),
354 Some(vec![("processed_text", "log")])
355 );
356
357 let dag4 = graph4.build();
358 println!("🎯 What happens:");
359 println!(" • 3 log level variants: INFO, WARN, ERROR");
360 println!(" • Each prefixes the message with its log level");
361 println!(" • Demonstrates string/static str parameters");
362 println!();
363
364 let stats4 = dag4.stats();
365 println!("📈 DAG Statistics:");
366 println!(" - Total nodes: {}", stats4.node_count);
367 println!(" - Log variants: 3");
368 println!();
369
370 println!("🔍 Mermaid Visualization:");
371 println!("{}", dag4.to_mermaid());
372 println!();
373
374 // =========================================================================
375 // Summary
376 // =========================================================================
377 println!("\n═══════════════════════════════════════════════════════════");
378 println!(" Summary: Key Variant Pattern Features");
379 println!("═══════════════════════════════════════════════════════════\n");
380
381 println!("✅ Factory Function Pattern:");
382 println!(" • Factory takes parameter(s), returns closure");
383 println!(" • Closure captures parameters in its environment");
384 println!(" • Same signature as regular node functions");
385 println!();
386
387 println!("✅ Parameter Flexibility:");
388 println!(" • Primitives: f64, i32, &str");
389 println!(" • Structs: Custom configuration objects");
390 println!(" • Arrays/Vectors: Multiple values at once");
391 println!();
392
393 println!("✅ Cartesian Products:");
394 println!(" • Multiple .variant() calls create all combinations");
395 println!(" • Example: 2 scales × 3 filters = 6 execution paths");
396 println!();
397
398 println!("✅ Port Mapping:");
399 println!(" • Variants use same tuple-based syntax");
400 println!(" • (broadcast_var, impl_var) for inputs");
401 println!(" • (impl_var, broadcast_var) for outputs");
402 println!();
403
404 println!("✅ Parallel Execution:");
405 println!(" • All variants at same level can run in parallel");
406 println!(" • DAG analysis identifies parallelization opportunities");
407 println!();
408}