parallel_execution/
parallel_execution.rs

1//! Parallel Execution Example
2//!
3//! This example demonstrates parallel node execution with:
4//! - Automatic layer detection
5//! - Concurrent execution of independent nodes
6//! - Parallelism analysis and statistics
7//! - Performance comparison with sequential execution
8//!
9//! To run: cargo run --example parallel_execution
10
11use rust_logic_graph::{Graph, GraphDef, Node, NodeType};
12use rust_logic_graph::parallel::{ParallelExecutor, ParallelConfig};
13use rust_logic_graph::node::RuleNode;
14use std::collections::HashMap;
15use std::time::Instant;
16
17#[tokio::main]
18async fn main() -> anyhow::Result<()> {
19    tracing_subscriber::fmt::init();
20
21    println!("šŸš€ Parallel Execution Example\n");
22
23    // Example 1: Diamond Graph (Classic Parallel Pattern)
24    println!("=== Example 1: Diamond Graph ===\n");
25    println!("Graph structure:");
26    println!("      A");
27    println!("     / \\");
28    println!("    B   C   <- Can run in parallel");
29    println!("     \\ /");
30    println!("      D\n");
31
32    let diamond_graph = create_diamond_graph();
33    analyze_and_execute(diamond_graph).await?;
34
35    // Example 2: Wide Graph (Maximum Parallelism)
36    println!("\n=== Example 2: Wide Graph (Maximum Parallelism) ===\n");
37    println!("Graph structure:");
38    println!("    A");
39    println!("  / | | \\ ");
40    println!(" B  C D  E  <- All run in parallel");
41    println!("  \\ | | /");
42    println!("    F\n");
43
44    let wide_graph = create_wide_graph();
45    analyze_and_execute(wide_graph).await?;
46
47    // Example 3: Linear Graph (No Parallelism)
48    println!("\n=== Example 3: Linear Graph (No Parallelism) ===\n");
49    println!("Graph structure: A -> B -> C -> D -> E\n");
50
51    let linear_graph = create_linear_graph();
52    analyze_and_execute(linear_graph).await?;
53
54    // Example 4: Complex Graph (Mixed Parallelism)
55    println!("\n=== Example 4: Complex Graph (Mixed Parallelism) ===\n");
56    println!("Graph structure:");
57    println!("      A");
58    println!("     / \\");
59    println!("    B   C");
60    println!("   /|   |\\");
61    println!("  D E   F G  <- Multiple parallel layers");
62    println!("   \\|   |/");
63    println!("    H   I");
64    println!("     \\ /");
65    println!("      J\n");
66
67    let complex_graph = create_complex_graph();
68    analyze_and_execute(complex_graph).await?;
69
70    // Example 5: Performance Comparison
71    println!("\n=== Example 5: Performance Comparison ===\n");
72    compare_performance().await?;
73
74    println!("\n=== Benefits of Parallel Execution ===");
75    println!("  • Reduced execution time for independent nodes");
76    println!("  • Better resource utilization");
77    println!("  • Scalable to large graphs");
78    println!("  • Automatic parallelism detection");
79    println!("  • No manual scheduling required");
80    println!("\nšŸŽ‰ Example completed!");
81
82    Ok(())
83}
84
85/// Create a diamond-shaped graph
86fn create_diamond_graph() -> GraphDef {
87    let mut def = GraphDef {
88        nodes: HashMap::new(),
89        edges: Vec::new(),
90    };
91
92    def.nodes.insert("A".to_string(), NodeType::RuleNode);
93    def.nodes.insert("B".to_string(), NodeType::RuleNode);
94    def.nodes.insert("C".to_string(), NodeType::RuleNode);
95    def.nodes.insert("D".to_string(), NodeType::RuleNode);
96
97    def.edges.push(rust_logic_graph::Edge {
98        from: "A".to_string(),
99        to: "B".to_string(),
100        rule: None,
101    });
102    def.edges.push(rust_logic_graph::Edge {
103        from: "A".to_string(),
104        to: "C".to_string(),
105        rule: None,
106    });
107    def.edges.push(rust_logic_graph::Edge {
108        from: "B".to_string(),
109        to: "D".to_string(),
110        rule: None,
111    });
112    def.edges.push(rust_logic_graph::Edge {
113        from: "C".to_string(),
114        to: "D".to_string(),
115        rule: None,
116    });
117
118    def
119}
120
121/// Create a wide graph with maximum parallelism in the middle
122fn create_wide_graph() -> GraphDef {
123    let mut def = GraphDef {
124        nodes: HashMap::new(),
125        edges: Vec::new(),
126    };
127
128    def.nodes.insert("A".to_string(), NodeType::RuleNode);
129    def.nodes.insert("B".to_string(), NodeType::RuleNode);
130    def.nodes.insert("C".to_string(), NodeType::RuleNode);
131    def.nodes.insert("D".to_string(), NodeType::RuleNode);
132    def.nodes.insert("E".to_string(), NodeType::RuleNode);
133    def.nodes.insert("F".to_string(), NodeType::RuleNode);
134
135    // A -> B, C, D, E
136    def.edges.push(rust_logic_graph::Edge {
137        from: "A".to_string(),
138        to: "B".to_string(),
139        rule: None,
140    });
141    def.edges.push(rust_logic_graph::Edge {
142        from: "A".to_string(),
143        to: "C".to_string(),
144        rule: None,
145    });
146    def.edges.push(rust_logic_graph::Edge {
147        from: "A".to_string(),
148        to: "D".to_string(),
149        rule: None,
150    });
151    def.edges.push(rust_logic_graph::Edge {
152        from: "A".to_string(),
153        to: "E".to_string(),
154        rule: None,
155    });
156
157    // B, C, D, E -> F
158    def.edges.push(rust_logic_graph::Edge {
159        from: "B".to_string(),
160        to: "F".to_string(),
161        rule: None,
162    });
163    def.edges.push(rust_logic_graph::Edge {
164        from: "C".to_string(),
165        to: "F".to_string(),
166        rule: None,
167    });
168    def.edges.push(rust_logic_graph::Edge {
169        from: "D".to_string(),
170        to: "F".to_string(),
171        rule: None,
172    });
173    def.edges.push(rust_logic_graph::Edge {
174        from: "E".to_string(),
175        to: "F".to_string(),
176        rule: None,
177    });
178
179    def
180}
181
182/// Create a linear graph with no parallelism
183fn create_linear_graph() -> GraphDef {
184    let mut def = GraphDef {
185        nodes: HashMap::new(),
186        edges: Vec::new(),
187    };
188
189    let nodes = vec!["A", "B", "C", "D", "E"];
190    for node in &nodes {
191        def.nodes.insert(node.to_string(), NodeType::RuleNode);
192    }
193
194    for i in 0..nodes.len() - 1 {
195        def.edges.push(rust_logic_graph::Edge {
196            from: nodes[i].to_string(),
197            to: nodes[i + 1].to_string(),
198            rule: None,
199        });
200    }
201
202    def
203}
204
205/// Create a complex graph with mixed parallelism
206fn create_complex_graph() -> GraphDef {
207    let mut def = GraphDef {
208        nodes: HashMap::new(),
209        edges: Vec::new(),
210    };
211
212    let nodes = vec!["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"];
213    for node in &nodes {
214        def.nodes.insert(node.to_string(), NodeType::RuleNode);
215    }
216
217    // Layer 1: A
218    // Layer 2: B, C
219    def.edges.push(rust_logic_graph::Edge {
220        from: "A".to_string(),
221        to: "B".to_string(),
222        rule: None,
223    });
224    def.edges.push(rust_logic_graph::Edge {
225        from: "A".to_string(),
226        to: "C".to_string(),
227        rule: None,
228    });
229
230    // Layer 3: D, E, F, G
231    def.edges.push(rust_logic_graph::Edge {
232        from: "B".to_string(),
233        to: "D".to_string(),
234        rule: None,
235    });
236    def.edges.push(rust_logic_graph::Edge {
237        from: "B".to_string(),
238        to: "E".to_string(),
239        rule: None,
240    });
241    def.edges.push(rust_logic_graph::Edge {
242        from: "C".to_string(),
243        to: "F".to_string(),
244        rule: None,
245    });
246    def.edges.push(rust_logic_graph::Edge {
247        from: "C".to_string(),
248        to: "G".to_string(),
249        rule: None,
250    });
251
252    // Layer 4: H, I
253    def.edges.push(rust_logic_graph::Edge {
254        from: "D".to_string(),
255        to: "H".to_string(),
256        rule: None,
257    });
258    def.edges.push(rust_logic_graph::Edge {
259        from: "E".to_string(),
260        to: "H".to_string(),
261        rule: None,
262    });
263    def.edges.push(rust_logic_graph::Edge {
264        from: "F".to_string(),
265        to: "I".to_string(),
266        rule: None,
267    });
268    def.edges.push(rust_logic_graph::Edge {
269        from: "G".to_string(),
270        to: "I".to_string(),
271        rule: None,
272    });
273
274    // Layer 5: J
275    def.edges.push(rust_logic_graph::Edge {
276        from: "H".to_string(),
277        to: "J".to_string(),
278        rule: None,
279    });
280    def.edges.push(rust_logic_graph::Edge {
281        from: "I".to_string(),
282        to: "J".to_string(),
283        rule: None,
284    });
285
286    def
287}
288
289/// Analyze parallelism and execute the graph
290async fn analyze_and_execute(def: GraphDef) -> anyhow::Result<()> {
291    // Create parallel executor
292    let mut executor = ParallelExecutor::new(ParallelConfig {
293        max_concurrent: 10,
294        verbose: true,
295    });
296
297    // Register nodes
298    for node_id in def.nodes.keys() {
299        let node: Box<dyn Node> = Box::new(RuleNode::new(node_id, "true"));
300        executor.register_node(node);
301    }
302
303    // Analyze parallelism
304    let stats = executor.get_parallelism_stats(&def)?;
305    stats.print_summary();
306
307    // Execute the graph
308    let mut graph = Graph::new(def);
309    let start = Instant::now();
310    executor.execute(&mut graph).await?;
311    let duration = start.elapsed();
312
313    println!("Execution completed in {:?}", duration);
314
315    Ok(())
316}
317
318/// Compare sequential vs parallel execution performance
319async fn compare_performance() -> anyhow::Result<()> {
320    println!("Comparing sequential vs parallel execution...\n");
321
322    // Create a graph with good parallelism
323    let def = create_wide_graph();
324
325    // Sequential execution simulation
326    let sequential_time = def.nodes.len(); // Assume 1 unit per node
327    println!("Sequential execution time (simulated): {} units", sequential_time);
328
329    // Parallel execution analysis
330    let executor = ParallelExecutor::default();
331    let stats = executor.get_parallelism_stats(&def)?;
332    let parallel_time = stats.num_layers; // Each layer is 1 unit
333
334    println!("Parallel execution time (simulated): {} units", parallel_time);
335    println!("Speedup: {:.2}x", sequential_time as f64 / parallel_time as f64);
336    println!("Efficiency: {:.1}%", (stats.theoretical_speedup / def.nodes.len() as f64) * 100.0);
337
338    Ok(())
339}