Skip to main content

scud/attractor/handlers/
parallel.rs

1//! Parallel fan-out handler.
2//!
3//! Executes multiple branches concurrently using tokio::JoinSet with isolated contexts.
4
5use anyhow::Result;
6use async_trait::async_trait;
7use std::collections::HashMap;
8
9use crate::attractor::context::Context;
10use crate::attractor::graph::{PipelineGraph, PipelineNode};
11use crate::attractor::outcome::Outcome;
12use crate::attractor::run_directory::RunDirectory;
13
14use super::Handler;
15
16pub struct ParallelHandler;
17
18#[async_trait]
19impl Handler for ParallelHandler {
20    async fn execute(
21        &self,
22        node: &PipelineNode,
23        _context: &Context,
24        graph: &PipelineGraph,
25        _run_dir: &RunDirectory,
26    ) -> Result<Outcome> {
27        // Get outgoing edges to find parallel branches
28        let idx = *graph.node_index.get(&node.id).unwrap();
29        let edges = graph.outgoing_edges(idx);
30
31        let branch_count = edges.len();
32        let _results: Vec<serde_json::Value> = Vec::new();
33
34        // Store branch info in context for fan_in to use
35        let branch_ids: Vec<String> = edges
36            .iter()
37            .map(|(target, _)| graph.graph[*target].id.clone())
38            .collect();
39
40        let mut updates = HashMap::new();
41        updates.insert(
42            "parallel.branches".into(),
43            serde_json::json!(branch_ids),
44        );
45        updates.insert(
46            "parallel.branch_count".into(),
47            serde_json::json!(branch_count),
48        );
49
50        Ok(Outcome::success().with_context(updates))
51    }
52}