Skip to main content

scud/attractor/handlers/
fan_in.rs

1//! Fan-in / consolidation handler.
2//!
3//! Reads parallel.results from context, ranks by status, selects best.
4
5use anyhow::Result;
6use async_trait::async_trait;
7use std::collections::HashMap;
8
9use crate::attractor::context::Context;
10use crate::attractor::graph::{PipelineGraph, PipelineNode};
11use crate::attractor::outcome::Outcome;
12use crate::attractor::run_directory::RunDirectory;
13
14use super::Handler;
15
16pub struct FanInHandler;
17
18#[async_trait]
19impl Handler for FanInHandler {
20    async fn execute(
21        &self,
22        _node: &PipelineNode,
23        context: &Context,
24        _graph: &PipelineGraph,
25        _run_dir: &RunDirectory,
26    ) -> Result<Outcome> {
27        // Read parallel results from context
28        let results = context.get("parallel.results").await;
29
30        let mut updates = HashMap::new();
31
32        if let Some(results) = results {
33            if let Some(arr) = results.as_array() {
34                // Select the first successful result
35                let best = arr.iter().find(|r| {
36                    r.get("status")
37                        .and_then(|s| s.as_str())
38                        .map(|s| s == "success")
39                        .unwrap_or(false)
40                });
41
42                if let Some(best) = best {
43                    updates.insert("parallel.best_result".into(), best.clone());
44                }
45            }
46        }
47
48        Ok(Outcome::success().with_context(updates))
49    }
50}