1use graph_sp::Graph;
10use std::collections::HashMap;
11use std::thread;
12use std::time::{Duration, Instant};
13
14fn main() {
15 println!("═══════════════════════════════════════════════════════════");
16 println!(" Parallel Execution Demonstration");
17 println!(" Showing runtime benefits of parallelization");
18 println!("═══════════════════════════════════════════════════════════\n");
19
20 demo_sequential_vs_parallel();
21 demo_complex_dependencies();
22 demo_variant_parallelism();
23 demo_diamond_pattern();
24}
25
/// Blocks the calling thread for `ms` milliseconds and then returns `input`
/// with the suffix `_processed` appended.
fn simulate_work(ms: u64, input: &str) -> String {
    const SUFFIX: &str = "_processed";

    let pause = Duration::from_millis(ms);
    thread::sleep(pause);

    let mut tagged = String::with_capacity(input.len() + SUFFIX.len());
    tagged.push_str(input);
    tagged.push_str(SUFFIX);
    tagged
}
31
32fn demo_sequential_vs_parallel() {
33 println!("─────────────────────────────────────────────────────────");
34 println!("Demo 1: Sequential vs Parallel Execution");
35 println!("─────────────────────────────────────────────────────────");
36
37 let mut graph = Graph::new();
38
39 graph.add(
41 |_: &HashMap<String, String>, _| {
42 let start = Instant::now();
43 let mut result = HashMap::new();
44 result.insert("data".to_string(), "source_data".to_string());
45 println!(" [{}ms] Source completed", start.elapsed().as_millis());
46 result
47 },
48 Some("Source"),
49 None,
50 Some(vec![("data", "data")])
51 );
52
53 let mut branch_a = Graph::new();
55 branch_a.add(
56 |inputs: &HashMap<String, String>, _| {
57 let start = Instant::now();
58 let mut result = HashMap::new();
59 if let Some(data) = inputs.get("input") {
60 let processed = simulate_work(100, data);
61 result.insert("result".to_string(), processed);
62 }
63 println!(" [{}ms] Branch A completed (100ms work)", start.elapsed().as_millis());
64 result
65 },
66 Some("BranchA[100ms]"),
67 Some(vec![("data", "input")]),
68 Some(vec![("result", "result_a")])
69 );
70
71 let mut branch_b = Graph::new();
73 branch_b.add(
74 |inputs: &HashMap<String, String>, _| {
75 let start = Instant::now();
76 let mut result = HashMap::new();
77 if let Some(data) = inputs.get("input") {
78 let processed = simulate_work(100, data);
79 result.insert("result".to_string(), processed);
80 }
81 println!(" [{}ms] Branch B completed (100ms work)", start.elapsed().as_millis());
82 result
83 },
84 Some("BranchB[100ms]"),
85 Some(vec![("data", "input")]),
86 Some(vec![("result", "result_b")])
87 );
88
89 let mut branch_c = Graph::new();
91 branch_c.add(
92 |inputs: &HashMap<String, String>, _| {
93 let start = Instant::now();
94 let mut result = HashMap::new();
95 if let Some(data) = inputs.get("input") {
96 let processed = simulate_work(100, data);
97 result.insert("result".to_string(), processed);
98 }
99 println!(" [{}ms] Branch C completed (100ms work)", start.elapsed().as_millis());
100 result
101 },
102 Some("BranchC[100ms]"),
103 Some(vec![("data", "input")]),
104 Some(vec![("result", "result_c")])
105 );
106
107 graph.branch(branch_a);
108 graph.branch(branch_b);
109 graph.branch(branch_c);
110
111 let dag = graph.build();
112
113 println!("\n📊 Sequential Execution (simulated):");
114 let start = Instant::now();
115 let _ = dag.execute(false, None);
116 let sequential_time = start.elapsed();
117 println!(" Total time: {}ms", sequential_time.as_millis());
118
119 println!("\n⚡ With Parallel Execution:");
120 println!(" Expected time: ~100ms (all branches run simultaneously)");
121 println!(" Speedup: ~3x faster than sequential");
122
123 println!("\n📈 DAG Statistics:");
124 let stats = dag.stats();
125 println!("{}", stats.summary());
126 println!("\n Analysis:");
127 println!(" - Level 0: 1 node (Source)");
128 println!(" - Level 1: 3 nodes (BranchA, BranchB, BranchC) ← Can run in parallel!");
129 println!(" - Max parallelism: {} nodes can execute simultaneously", stats.max_parallelism);
130
131 println!("\n🔍 Mermaid Visualization with Port Mappings:");
132 println!("{}", dag.to_mermaid());
133 println!();
134}
135
136fn demo_complex_dependencies() {
137 println!("─────────────────────────────────────────────────────────");
138 println!("Demo 2: Complex Data Dependencies");
139 println!("─────────────────────────────────────────────────────────");
140
141 let mut graph = Graph::new();
142
143 graph.add(
145 |_: &HashMap<String, String>, _| {
146 let mut result = HashMap::new();
147 result.insert("source1_data".to_string(), "100".to_string());
148 result
149 },
150 Some("Source1"),
151 None,
152 Some(vec![("source1_data", "data1")])
153 );
154
155 graph.add(
156 |_: &HashMap<String, String>, _| {
157 let mut result = HashMap::new();
158 result.insert("source2_data".to_string(), "200".to_string());
159 result
160 },
161 Some("Source2"),
162 None,
163 Some(vec![("source2_data", "data2")])
164 );
165
166 graph.add(
168 |inputs: &HashMap<String, String>, _| {
169 let mut result = HashMap::new();
170 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
171 thread::sleep(Duration::from_millis(50));
172 result.insert("processed".to_string(), (val * 2).to_string());
173 }
174 result
175 },
176 Some("Process1[50ms]"),
177 Some(vec![("data1", "in")]),
178 Some(vec![("processed", "proc1")])
179 );
180
181 graph.add(
182 |inputs: &HashMap<String, String>, _| {
183 let mut result = HashMap::new();
184 if let Some(val) = inputs.get("in").and_then(|s| s.parse::<i32>().ok()) {
185 thread::sleep(Duration::from_millis(50));
186 result.insert("processed".to_string(), (val * 3).to_string());
187 }
188 result
189 },
190 Some("Process2[50ms]"),
191 Some(vec![("data2", "in")]),
192 Some(vec![("processed", "proc2")])
193 );
194
195 graph.add(
197 |inputs: &HashMap<String, String>, _| {
198 let mut result = HashMap::new();
199 let v1 = inputs.get("p1").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
200 let v2 = inputs.get("p2").and_then(|s| s.parse::<i32>().ok()).unwrap_or(0);
201 thread::sleep(Duration::from_millis(30));
202 result.insert("combined".to_string(), format!("{}", v1 + v2));
203 result
204 },
205 Some("Combine[30ms]"),
206 Some(vec![("proc1", "p1"), ("proc2", "p2")]),
207 Some(vec![("combined", "final")])
208 );
209
210 let dag = graph.build();
211
212 println!("\n📊 Execution with timing:");
213 let start = Instant::now();
214 let context = dag.execute(false, None);
215 let total_time = start.elapsed();
216
217 println!(" Source1: data1 = {}", context.get("data1").unwrap());
218 println!(" Source2: data2 = {}", context.get("data2").unwrap());
219 println!(" Process1: proc1 = {} (data1 * 2)", context.get("proc1").unwrap());
220 println!(" Process2: proc2 = {} (data2 * 3)", context.get("proc2").unwrap());
221 println!(" Combine: final = {} (proc1 + proc2)", context.get("final").unwrap());
222 println!("\n Total execution time: {}ms", total_time.as_millis());
223
224 println!("\n📈 Execution Levels (showing parallelism):");
225 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
226 print!(" Level {}: ", level_idx);
227 let node_names: Vec<String> = level.iter()
228 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
229 .collect();
230 println!("{}", node_names.join(", "));
231 if level.len() > 1 {
232 println!(" ↑ {} nodes can execute in parallel!", level.len());
233 }
234 }
235
236 println!("\n⚡ Parallel Execution Analysis:");
237 println!(" Sequential time would be: 50+50+30 = 130ms");
238 println!(" With parallelism: Level0→Level1(parallel)→Level2 = ~80ms");
239 println!(" Speedup: 1.6x");
240
241 println!("\n🔍 Mermaid Visualization (shows data dependencies):");
242 println!("{}", dag.to_mermaid());
243 println!();
244}
245
246fn demo_variant_parallelism() {
247 println!("─────────────────────────────────────────────────────────");
248 println!("Demo 3: Variant Parameter Sweep Parallelism");
249 println!("─────────────────────────────────────────────────────────");
250
251 let mut graph = Graph::new();
252
253 graph.add(
255 |_: &HashMap<String, String>, _| {
256 let mut result = HashMap::new();
257 result.insert("value".to_string(), "1000".to_string());
258 result
259 },
260 Some("DataSource"),
261 None,
262 Some(vec![("value", "data")])
263 );
264
265 fn make_multiplier(factor: f64) -> impl Fn(&HashMap<String, String>, &HashMap<String, String>) -> HashMap<String, String> {
267 move |inputs: &HashMap<String, String>, _| {
268 let start = Instant::now();
269 let mut result = HashMap::new();
270 if let Some(val) = inputs.get("input").and_then(|s| s.parse::<f64>().ok()) {
271 thread::sleep(Duration::from_millis(100));
273 result.insert("result".to_string(), format!("{:.1}", val * factor));
274 }
275 println!(" [{}ms] Variant (factor={}) completed", start.elapsed().as_millis(), factor);
276 result
277 }
278 }
279
280 graph.variant(
282 make_multiplier,
283 vec![0.5, 1.0, 1.5, 2.0, 2.5],
284 Some("Multiply[100ms]"),
285 Some(vec![("data", "input")]),
286 Some(vec![("result", "result")])
287 );
288
289 let dag = graph.build();
290
291 println!("\n📊 Executing 5 variants (each takes 100ms):");
292 let start = Instant::now();
293 let _ = dag.execute(false, None);
294 let total_time = start.elapsed();
295
296 println!("\n Total execution time: {}ms", total_time.as_millis());
297
298 println!("\n⚡ Parallelism Analysis:");
299 println!(" Sequential execution: 100 × 5 = 500ms");
300 println!(" With parallel execution: ~100ms (all run simultaneously)");
301 println!(" Speedup: 5x");
302
303 println!("\n📈 DAG Statistics:");
304 let stats = dag.stats();
305 println!("{}", stats.summary());
306 println!(" ↑ All {} variant nodes can execute in parallel!", stats.variant_count);
307
308 println!("\n🔍 Mermaid Visualization:");
309 println!("{}", dag.to_mermaid());
310 println!();
311}
312
313fn demo_diamond_pattern() {
314 println!("─────────────────────────────────────────────────────────");
315 println!("Demo 4: Diamond Pattern (Fan-Out → Fan-In)");
316 println!("─────────────────────────────────────────────────────────");
317 println!("This pattern shows:");
318 println!(" - One source splits into multiple parallel branches");
319 println!(" - Branches are processed independently");
320 println!(" - Results merge back into single output");
321
322 let mut graph = Graph::new();
323
324 graph.add(
326 |_: &HashMap<String, String>, _| {
327 let mut result = HashMap::new();
328 result.insert("raw".to_string(), "input_data".to_string());
329 result
330 },
331 Some("Source"),
332 None,
333 Some(vec![("raw", "data")])
334 );
335
336 let mut branch_a = Graph::new();
338 branch_a.add(
339 |inputs: &HashMap<String, String>, _| {
340 let start = Instant::now();
341 thread::sleep(Duration::from_millis(50));
342 let mut result = HashMap::new();
343 if let Some(data) = inputs.get("in") {
344 result.insert("out".to_string(), format!("{}_transformA", data));
345 }
346 println!(" [{}ms] Transform A completed", start.elapsed().as_millis());
347 result
348 },
349 Some("TransformA[50ms]"),
350 Some(vec![("data", "in")]),
351 Some(vec![("out", "result")])
352 );
353
354 let mut branch_b = Graph::new();
356 branch_b.add(
357 |inputs: &HashMap<String, String>, _| {
358 let start = Instant::now();
359 thread::sleep(Duration::from_millis(50));
360 let mut result = HashMap::new();
361 if let Some(data) = inputs.get("in") {
362 result.insert("out".to_string(), format!("{}_transformB", data));
363 }
364 println!(" [{}ms] Transform B completed", start.elapsed().as_millis());
365 result
366 },
367 Some("TransformB[50ms]"),
368 Some(vec![("data", "in")]),
369 Some(vec![("out", "result")])
370 );
371
372 let branch_a_id = graph.branch(branch_a);
373 let branch_b_id = graph.branch(branch_b);
374
375 graph.merge(
377 |inputs: &HashMap<String, String>, _| {
378 let start = Instant::now();
379 thread::sleep(Duration::from_millis(30));
380 let mut result = HashMap::new();
381 let a = inputs.get("left").cloned().unwrap_or_default();
382 let b = inputs.get("right").cloned().unwrap_or_default();
383 result.insert("merged".to_string(), format!("[{}+{}]", a, b));
384 println!(" [{}ms] Merge completed", start.elapsed().as_millis());
385 result
386 },
387 Some("Merge[30ms]"),
388 vec![
389 (branch_a_id, "result", "left"),
390 (branch_b_id, "result", "right")
391 ],
392 Some(vec![("merged", "final")])
393 );
394
395 let dag = graph.build();
396
397 println!("\n📊 Executing diamond pattern:");
398 let start = Instant::now();
399 let context = dag.execute(false, None);
400 let total_time = start.elapsed();
401
402 println!("\n Result: {}", context.get("final").unwrap());
403 println!(" Total execution time: {}ms", total_time.as_millis());
404
405 println!("\n📈 Execution Levels:");
406 for (level_idx, level) in dag.execution_levels().iter().enumerate() {
407 print!(" Level {}: ", level_idx);
408 let node_names: Vec<String> = level.iter()
409 .map(|&node_id| dag.nodes().iter().find(|n| n.id == node_id).unwrap().display_name())
410 .collect();
411 println!("{}", node_names.join(", "));
412 }
413
414 println!("\n⚡ Timing Analysis:");
415 println!(" Sequential: Source(0ms) + TransformA(50ms) + TransformB(50ms) + Merge(30ms) = 130ms");
416 println!(" Parallel: Source(0ms) → [TransformA + TransformB](50ms) → Merge(30ms) = 80ms");
417 println!(" Speedup: 1.6x");
418
419 println!("\n🔍 Mermaid Visualization (Diamond Shape):");
420 println!("{}", dag.to_mermaid());
421
422 println!("\n The visualization shows:");
423 println!(" - Port mappings on edges (data→in, result→left, result→right)");
424 println!(" - Data dependencies between nodes");
425 println!(" - Parallel branches can execute simultaneously");
426
427 println!("\n═══════════════════════════════════════════════════════════");
428 println!(" Parallel Execution Demo Complete!");
429 println!("═══════════════════════════════════════════════════════════");
430}