pub struct ParallelExecutor { /* private fields */ }
Expand description
Parallel executor that identifies independent nodes and executes them concurrently
Implementations§
Source§impl ParallelExecutor
impl ParallelExecutor
Source pub fn new(config: ParallelConfig) -> Self
pub fn new(config: ParallelConfig) -> Self
Create a new parallel executor
Examples found in repository?
examples/parallel_execution.rs (lines 292-295)
290async fn analyze_and_execute(def: GraphDef) -> anyhow::Result<()> {
291 // Create parallel executor
292 let mut executor = ParallelExecutor::new(ParallelConfig {
293 max_concurrent: 10,
294 verbose: true,
295 });
296
297 // Register nodes
298 for node_id in def.nodes.keys() {
299 let node: Box<dyn Node> = Box::new(RuleNode::new(node_id, "true"));
300 executor.register_node(node);
301 }
302
303 // Analyze parallelism
304 let stats = executor.get_parallelism_stats(&def)?;
305 stats.print_summary();
306
307 // Execute the graph
308 let mut graph = Graph::new(def);
309 let start = Instant::now();
310 executor.execute(&mut graph).await?;
311 let duration = start.elapsed();
312
313 println!("Execution completed in {:?}", duration);
314
315 Ok(())
316}
Source pub fn register_node(&mut self, node: Box<dyn Node>)
pub fn register_node(&mut self, node: Box<dyn Node>)
Register a node with the executor
Examples found in repository?
examples/parallel_execution.rs (line 300)
290async fn analyze_and_execute(def: GraphDef) -> anyhow::Result<()> {
291 // Create parallel executor
292 let mut executor = ParallelExecutor::new(ParallelConfig {
293 max_concurrent: 10,
294 verbose: true,
295 });
296
297 // Register nodes
298 for node_id in def.nodes.keys() {
299 let node: Box<dyn Node> = Box::new(RuleNode::new(node_id, "true"));
300 executor.register_node(node);
301 }
302
303 // Analyze parallelism
304 let stats = executor.get_parallelism_stats(&def)?;
305 stats.print_summary();
306
307 // Execute the graph
308 let mut graph = Graph::new(def);
309 let start = Instant::now();
310 executor.execute(&mut graph).await?;
311 let duration = start.elapsed();
312
313 println!("Execution completed in {:?}", duration);
314
315 Ok(())
316}
Source pub fn identify_layers(&self, def: &GraphDef) -> Result<Vec<ExecutionLayer>>
pub fn identify_layers(&self, def: &GraphDef) -> Result<Vec<ExecutionLayer>>
Analyze the graph and identify execution layers. Nodes in the same layer have no dependencies on each other and can run in parallel.
Source pub async fn execute(&self, graph: &mut Graph) -> Result<()>
pub async fn execute(&self, graph: &mut Graph) -> Result<()>
Execute the entire graph with parallel execution per layer
Examples found in repository?
examples/parallel_execution.rs (line 310)
290async fn analyze_and_execute(def: GraphDef) -> anyhow::Result<()> {
291 // Create parallel executor
292 let mut executor = ParallelExecutor::new(ParallelConfig {
293 max_concurrent: 10,
294 verbose: true,
295 });
296
297 // Register nodes
298 for node_id in def.nodes.keys() {
299 let node: Box<dyn Node> = Box::new(RuleNode::new(node_id, "true"));
300 executor.register_node(node);
301 }
302
303 // Analyze parallelism
304 let stats = executor.get_parallelism_stats(&def)?;
305 stats.print_summary();
306
307 // Execute the graph
308 let mut graph = Graph::new(def);
309 let start = Instant::now();
310 executor.execute(&mut graph).await?;
311 let duration = start.elapsed();
312
313 println!("Execution completed in {:?}", duration);
314
315 Ok(())
316}
Source pub fn get_parallelism_stats(&self, def: &GraphDef) -> Result<ParallelismStats>
pub fn get_parallelism_stats(&self, def: &GraphDef) -> Result<ParallelismStats>
Get parallel execution statistics
Examples found in repository?
examples/parallel_execution.rs (line 304)
290async fn analyze_and_execute(def: GraphDef) -> anyhow::Result<()> {
291 // Create parallel executor
292 let mut executor = ParallelExecutor::new(ParallelConfig {
293 max_concurrent: 10,
294 verbose: true,
295 });
296
297 // Register nodes
298 for node_id in def.nodes.keys() {
299 let node: Box<dyn Node> = Box::new(RuleNode::new(node_id, "true"));
300 executor.register_node(node);
301 }
302
303 // Analyze parallelism
304 let stats = executor.get_parallelism_stats(&def)?;
305 stats.print_summary();
306
307 // Execute the graph
308 let mut graph = Graph::new(def);
309 let start = Instant::now();
310 executor.execute(&mut graph).await?;
311 let duration = start.elapsed();
312
313 println!("Execution completed in {:?}", duration);
314
315 Ok(())
316}
317
318/// Compare sequential vs parallel execution performance
319async fn compare_performance() -> anyhow::Result<()> {
320 println!("Comparing sequential vs parallel execution...\n");
321
322 // Create a graph with good parallelism
323 let def = create_wide_graph();
324
325 // Sequential execution simulation
326 let sequential_time = def.nodes.len(); // Assume 1 unit per node
327 println!("Sequential execution time (simulated): {} units", sequential_time);
328
329 // Parallel execution analysis
330 let executor = ParallelExecutor::default();
331 let stats = executor.get_parallelism_stats(&def)?;
332 let parallel_time = stats.num_layers; // Each layer is 1 unit
333
334 println!("Parallel execution time (simulated): {} units", parallel_time);
335 println!("Speedup: {:.2}x", sequential_time as f64 / parallel_time as f64);
336 println!("Efficiency: {:.1}%", (stats.theoretical_speedup / def.nodes.len() as f64) * 100.0);
337
338 Ok(())
339}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for ParallelExecutor
impl !RefUnwindSafe for ParallelExecutor
impl Send for ParallelExecutor
impl Sync for ParallelExecutor
impl Unpin for ParallelExecutor
impl !UnwindSafe for ParallelExecutor
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more