parallel_execution_demo/
parallel_execution_demo.rs

1//! Demonstration of parallel execution capabilities
2//!
3//! This example shows:
4//! - Simulated parallel execution with runtime measurements
5//! - Data dependency analysis
6//! - Maximum parallelism detection
7//! - Port mapping visualization in Mermaid diagrams
8
9use graph_sp::Graph;
10use std::collections::HashMap;
11use std::thread;
12use std::time::{Duration, Instant};
13
14fn main() {
15    println!("═══════════════════════════════════════════════════════════");
16    println!("  Parallel Execution Demonstration");
17    println!("  Showing runtime benefits of parallelization");
18    println!("═══════════════════════════════════════════════════════════\n");
19    
20    demo_sequential_vs_parallel();
21    demo_complex_dependencies();
22    demo_variant_parallelism();
23    demo_diamond_pattern();
24}
25
26/// Simulates a CPU-intensive operation
27fn simulate_work(ms: u64, input: &str) -> String {
28    thread::sleep(Duration::from_millis(ms));
29    format!("{}_processed", input)
30}
31
32fn demo_sequential_vs_parallel() {
33    println!("─────────────────────────────────────────────────────────");
34    println!("Demo 1: Sequential vs Parallel Execution");
35    println!("─────────────────────────────────────────────────────────");
36    
37    let mut graph = Graph::new();
38    
39    // Source node
40    graph.add(
41        |_: &HashMap<String, String>, _| {
42            let start = Instant::now();
43            let mut result = HashMap::new();
44            result.insert("data".to_string(), "source_data".to_string());
45            println!("    [{}ms] Source completed", start.elapsed().as_millis());
46            result
47        },
48        Some("Source"),
49        None,
50        Some(vec![("data", "data")])
51    );
52    
53    // Branch A: 100ms work
54    let mut branch_a = Graph::new();
55    branch_a.add(
56        |inputs: &HashMap<String, String>, _| {
57            let start = Instant::now();
58            let mut result = HashMap::new();
59            if let Some(data) = inputs.get("input") {
60                let processed = simulate_work(100, data);
61                result.insert("result".to_string(), processed);
62            }
63            println!("    [{}ms] Branch A completed (100ms work)", start.elapsed().as_millis());
64            result
65        },
66        Some("BranchA[100ms]"),
67        Some(vec![("data", "input")]),
68        Some(vec![("result", "result_a")])
69    );
70    
71    // Branch B: 100ms work
72    let mut branch_b = Graph::new();
73    branch_b.add(
74        |inputs: &HashMap<String, String>, _| {
75            let start = Instant::now();
76            let mut result = HashMap::new();
77            if let Some(data) = inputs.get("input") {
78                let processed = simulate_work(100, data);
79                result.insert("result".to_string(), processed);
80            }
81            println!("    [{}ms] Branch B completed (100ms work)", start.elapsed().as_millis());
82            result
83        },
84        Some("BranchB[100ms]"),
85        Some(vec![("data", "input")]),
86        Some(vec![("result", "result_b")])
87    );
88    
89    // Branch C: 100ms work
90    let mut branch_c = Graph::new();
91    branch_c.add(
92        |inputs: &HashMap<String, String>, _| {
93            let start = Instant::now();
94            let mut result = HashMap::new();
95            if let Some(data) = inputs.get("input") {
96                let processed = simulate_work(100, data);
97                result.insert("result".to_string(), processed);
98            }
99            println!("    [{}ms] Branch C completed (100ms work)", start.elapsed().as_millis());
100            result
101        },
102        Some("BranchC[100ms]"),
103        Some(vec![("data", "input")]),
104        Some(vec![("result", "result_c")])
105    );
106    
107    graph.branch(branch_a);
108    graph.branch(branch_b);
109    graph.branch(branch_c);
110    
111    let dag = graph.build();
112    
113    println!("\n📊 Sequential Execution (simulated):");
114    let start = Instant::now();
115    let _ = dag.execute(false, None);
116    let sequential_time = start.elapsed();
117    println!("  Total time: {}ms", sequential_time.as_millis());
118    
119    println!("\n⚡ With Parallel Execution:");
120    println!("  Expected time: ~100ms (all branches run simultaneously)");
121    println!("  Speedup: ~3x faster than sequential");
122    
123    println!("\n📈 DAG Statistics:");
124    let stats = dag.stats();
125    println!("{}", stats.summary());
126    println!("\n  Analysis:");
127    println!("  - Level 0: 1 node  (Source)");
128    println!("  - Level 1: 3 nodes (BranchA, BranchB, BranchC) ← Can run in parallel!");
129    println!("  - Max parallelism: {} nodes can execute simultaneously", stats.max_parallelism);
130    
131    println!("\n🔍 Mermaid Visualization with Port Mappings:");
132    println!("{}", dag.to_mermaid());
133    println!();
134}
135
136fn demo_complex_dependencies() {
137    println!("─────────────────────────────────────────────────────────");
138    println!("Demo 2: Complex Data Dependencies");
139    println!("─────────────────────────────────────────────────────────");
140    
141    let mut graph = Graph::new();
142    
143    // Two independent sources
144    graph.add(
145        |_: &HashMap<String, String>, _| {
146            let mut result = HashMap::new();
147            result.insert("source1_data".to_string(), "100".to_string());
148            result
149        },
150        Some("Source1"),
151        None,
152        Some(vec![("source1_data", "data1")])
153    );
154    
155    graph.add(
156        |_: &HashMap<String, String>, _| {
157            let mut result = HashMap::new();
158            result.insert("source2_data".to_string(), "200".to_string());
159            result
160        },
161        Some("Source2"),
162        None,
163        Some(vec![("source2_data", "data2")])
164    );
165    
166    // Process each source independently (can run in parallel)
167    graph.add(
168        |inputs: &HashMap<String, String>, _| {
169            let mut result = HashMap::new();
170            if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
171                thread::sleep(Duration::from_millis(50));
172                result.insert("processed".to_string(), (val * 2).to_string());
173            }
174            result
175        },
176        Some("Process1[50ms]"),
177        Some(vec![("data1", "in")]),
178        Some(vec![("processed", "proc1")])
179    );
180    
181    graph.add(
182        |inputs: &HashMap<String, String>, _| {
183            let mut result = HashMap::new();
184            if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
185                thread::sleep(Duration::from_millis(50));
186                result.insert("processed".to_string(), (val * 3).to_string());
187            }
188            result
189        },
190        Some("Process2[50ms]"),
191        Some(vec![("data2", "in")]),
192        Some(vec![("processed", "proc2")])
193    );
194    
195    // Combine results (depends on both processors)
196    graph.add(
197        |inputs: &HashMap<String, String>, _| {
198            let mut result = HashMap::new();
199            let v1 = inputs.get("p1").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
200            let v2 = inputs.get("p2").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
201            thread::sleep(Duration::from_millis(30));
202            result.insert("combined".to_string(), format!("{}", v1 + v2));
203            result
204        },
205        Some("Combine[30ms]"),
206        Some(vec![("proc1", "p1"), ("proc2", "p2")]),
207        Some(vec![("combined", "final")])
208    );
209    
210    let dag = graph.build();
211    
212    println!("\n📊 Execution with timing:");
213    let start = Instant::now();
214    let context = dag.execute(false, None);
215    let total_time = start.elapsed();
216    
217    println!("  Source1: data1 = {}", context.get("data1").unwrap());
218    println!("  Source2: data2 = {}", context.get("data2").unwrap());
219    println!("  Process1: proc1 = {} (data1 * 2)", context.get("proc1").unwrap());
220    println!("  Process2: proc2 = {} (data2 * 3)", context.get("proc2").unwrap());
221    println!("  Combine: final = {} (proc1 + proc2)", context.get("final").unwrap());
222    println!("\n  Total execution time: {}ms", total_time.as_millis());
223    
224    println!("\n📈 Execution Levels (showing parallelism):");
225    for (level_idx, level) in dag.execution_levels().iter().enumerate() {
226        print!("  Level {}: ", level_idx);
227        let node_names: Vec<String> = level.iter()
228            .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
229            .collect();
230        println!("{}", node_names.join(", "));
231        if level.len() > 1 {
232            println!("           ↑ {} nodes can execute in parallel!", level.len());
233        }
234    }
235    
236    println!("\n⚡ Parallel Execution Analysis:");
237    println!("  Sequential time would be: 50+50+30 = 130ms");
238    println!("  With parallelism: Level0→Level1(parallel)→Level2 = ~80ms");
239    println!("  Speedup: 1.6x");
240    
241    println!("\n🔍 Mermaid Visualization (shows data dependencies):");
242    println!("{}", dag.to_mermaid());
243    println!();
244}
245
246fn demo_variant_parallelism() {
247    println!("─────────────────────────────────────────────────────────");
248    println!("Demo 3: Variant Parameter Sweep Parallelism");
249    println!("─────────────────────────────────────────────────────────");
250    
251    let mut graph = Graph::new();
252    
253    // Source
254    graph.add(
255        |_: &HashMap<String, String>, _| {
256            let mut result = HashMap::new();
257            result.insert("value".to_string(), "1000".to_string());
258            result
259        },
260        Some("DataSource"),
261        None,
262        Some(vec![("value", "data")])
263    );
264    
265    // Variant factory with different multipliers
266    fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
267        move |inputs: &HashMap<String, String>, _| {
268            let start = Instant::now();
269            let mut result = HashMap::new();
270            if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
271                // Simulate 100ms of work
272                thread::sleep(Duration::from_millis(100));
273                result.insert("result".to_string(), format!("{:.1}", val * factor));
274            }
275            println!("    [{}ms] Variant (factor={}) completed", start.elapsed().as_millis(), factor);
276            result
277        }
278    }
279    
280    // Create 5 variants
281    graph.variant(
282        make_multiplier,
283        vec![0.5, 1.0, 1.5, 2.0, 2.5],
284        Some("Multiply[100ms]"),
285        Some(vec![("data", "input")]),
286        Some(vec![("result", "result")])
287    );
288    
289    let dag = graph.build();
290    
291    println!("\n📊 Executing 5 variants (each takes 100ms):");
292    let start = Instant::now();
293    let _ = dag.execute(false, None);
294    let total_time = start.elapsed();
295    
296    println!("\n  Total execution time: {}ms", total_time.as_millis());
297    
298    println!("\n⚡ Parallelism Analysis:");
299    println!("  Sequential execution: 100 × 5 = 500ms");
300    println!("  With parallel execution: ~100ms (all run simultaneously)");
301    println!("  Speedup: 5x");
302    
303    println!("\n📈 DAG Statistics:");
304    let stats = dag.stats();
305    println!("{}", stats.summary());
306    println!("  ↑ All {} variant nodes can execute in parallel!", stats.variant_count);
307    
308    println!("\n🔍 Mermaid Visualization:");
309    println!("{}", dag.to_mermaid());
310    println!();
311}
312
313fn demo_diamond_pattern() {
314    println!("─────────────────────────────────────────────────────────");
315    println!("Demo 4: Diamond Pattern (Fan-Out → Fan-In)");
316    println!("─────────────────────────────────────────────────────────");
317    println!("This pattern shows:");
318    println!("  - One source splits into multiple parallel branches");
319    println!("  - Branches are processed independently");
320    println!("  - Results merge back into single output");
321    
322    let mut graph = Graph::new();
323    
324    // Top of diamond: Single source
325    graph.add(
326        |_: &HashMap<String, String>, _| {
327            let mut result = HashMap::new();
328            result.insert("raw".to_string(), "input_data".to_string());
329            result
330        },
331        Some("Source"),
332        None,
333        Some(vec![("raw", "data")])
334    );
335    
336    // Left branch: Transform A (50ms)
337    let mut branch_a = Graph::new();
338    branch_a.add(
339        |inputs: &HashMap<String, String>, _| {
340            let start = Instant::now();
341            thread::sleep(Duration::from_millis(50));
342            let mut result = HashMap::new();
343            if let Some(data) = inputs.get("in") {
344                result.insert("out".to_string(), format!("{}_transformA", data));
345            }
346            println!("    [{}ms] Transform A completed", start.elapsed().as_millis());
347            result
348        },
349        Some("TransformA[50ms]"),
350        Some(vec![("data", "in")]),
351        Some(vec![("out", "result")])
352    );
353    
354    // Right branch: Transform B (50ms)
355    let mut branch_b = Graph::new();
356    branch_b.add(
357        |inputs: &HashMap<String, String>, _| {
358            let start = Instant::now();
359            thread::sleep(Duration::from_millis(50));
360            let mut result = HashMap::new();
361            if let Some(data) = inputs.get("in") {
362                result.insert("out".to_string(), format!("{}_transformB", data));
363            }
364            println!("    [{}ms] Transform B completed", start.elapsed().as_millis());
365            result
366        },
367        Some("TransformB[50ms]"),
368        Some(vec![("data", "in")]),
369        Some(vec![("out", "result")])
370    );
371    
372    let branch_a_id = graph.branch(branch_a);
373    let branch_b_id = graph.branch(branch_b);
374    
375    // Bottom of diamond: Merge (30ms)
376    graph.merge(
377        |inputs: &HashMap<String, String>, _| {
378            let start = Instant::now();
379            thread::sleep(Duration::from_millis(30));
380            let mut result = HashMap::new();
381            let a = inputs.get("left").cloned().unwrap_or_default();
382            let b = inputs.get("right").cloned().unwrap_or_default();
383            result.insert("merged".to_string(), format!("[{}+{}]", a, b));
384            println!("    [{}ms] Merge completed", start.elapsed().as_millis());
385            result
386        },
387        Some("Merge[30ms]"),
388        vec![
389            (branch_a_id, "result", "left"),
390            (branch_b_id, "result", "right")
391        ],
392        Some(vec![("merged", "final")])
393    );
394    
395    let dag = graph.build();
396    
397    println!("\n📊 Executing diamond pattern:");
398    let start = Instant::now();
399    let context = dag.execute(false, None);
400    let total_time = start.elapsed();
401    
402    println!("\n  Result: {}", context.get("final").unwrap());
403    println!("  Total execution time: {}ms", total_time.as_millis());
404    
405    println!("\n📈 Execution Levels:");
406    for (level_idx, level) in dag.execution_levels().iter().enumerate() {
407        print!("  Level {}: ", level_idx);
408        let node_names: Vec<String> = level.iter()
409            .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
410            .collect();
411        println!("{}", node_names.join(", "));
412    }
413    
414    println!("\n⚡ Timing Analysis:");
415    println!("  Sequential: Source(0ms) + TransformA(50ms) + TransformB(50ms) + Merge(30ms) = 130ms");
416    println!("  Parallel:   Source(0ms) → [TransformA + TransformB](50ms) → Merge(30ms) = 80ms");
417    println!("  Speedup: 1.6x");
418    
419    println!("\n🔍 Mermaid Visualization (Diamond Shape):");
420    println!("{}", dag.to_mermaid());
421    
422    println!("\n  The visualization shows:");
423    println!("  - Port mappings on edges (data→in, result→left, result→right)");
424    println!("  - Data dependencies between nodes");
425    println!("  - Parallel branches can execute simultaneously");
426    
427    println!("\n═══════════════════════════════════════════════════════════");
428    println!("  Parallel Execution Demo Complete!");
429    println!("═══════════════════════════════════════════════════════════");
430}