xerv_nodes/flow/
merge.rs

1//! Merge node (N→1 barrier).
2//!
3//! The merge node waits for all expected inputs before continuing.
4//! It collects data from multiple upstream nodes and merges them into
5//! a single output.
6
7use std::collections::HashMap;
8use xerv_core::traits::{Context, Node, NodeFuture, NodeInfo, NodeOutput, Port, PortDirection};
9use xerv_core::types::RelPtr;
10
/// Policy describing how many inputs a merge node wants before it proceeds.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can check a
/// configured strategy with `==` / `assert_eq!` instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MergeStrategy {
    /// Barrier semantics: every expected input should be present.
    WaitAll,
    /// Race semantics: a single arriving input is enough to proceed.
    FirstArrival,
    /// Proceed once the given number of inputs is present.
    WaitN(usize),
}
21
22impl Default for MergeStrategy {
23    fn default() -> Self {
24        Self::WaitAll
25    }
26}
27
28/// Merge node - N→1 barrier.
29///
30/// Waits for multiple inputs to arrive before proceeding.
31/// The output contains all merged inputs.
32///
33/// # Ports
34/// - Input: Multiple named inputs (e.g., "in_0", "in_1", "in_2")
35/// - Output: "out" - Merged data containing all inputs
36///
37/// # Example Configuration
38/// ```yaml
39/// nodes:
40///   merge_results:
41///     type: std::merge
42///     config:
43///       strategy: wait_all  # or first_arrival, wait_n
44///       wait_count: 3       # for wait_n strategy
45///     inputs:
46///       - from: branch_a.out -> in_0
47///       - from: branch_b.out -> in_1
48///       - from: branch_c.out -> in_2
49/// ```
50#[derive(Debug)]
51pub struct MergeNode {
52    /// Number of expected inputs.
53    expected_inputs: usize,
54    /// Merge strategy.
55    strategy: MergeStrategy,
56}
57
58impl MergeNode {
59    /// Create a new merge node expecting the given number of inputs.
60    pub fn new(expected_inputs: usize) -> Self {
61        Self {
62            expected_inputs,
63            strategy: MergeStrategy::WaitAll,
64        }
65    }
66
67    /// Create a merge node with custom strategy.
68    pub fn with_strategy(expected_inputs: usize, strategy: MergeStrategy) -> Self {
69        Self {
70            expected_inputs,
71            strategy,
72        }
73    }
74}
75
76impl Node for MergeNode {
77    fn info(&self) -> NodeInfo {
78        let mut inputs = Vec::with_capacity(self.expected_inputs);
79        for i in 0..self.expected_inputs {
80            inputs.push(Port::named(
81                format!("in_{}", i),
82                PortDirection::Input,
83                "Any",
84            ));
85        }
86
87        NodeInfo::new("std", "merge")
88            .with_description("N→1 barrier that waits for all inputs before continuing")
89            .with_inputs(inputs)
90            .with_outputs(vec![Port::output("Any"), Port::error()])
91    }
92
93    fn execute<'a>(&'a self, _ctx: Context, inputs: HashMap<String, RelPtr<()>>) -> NodeFuture<'a> {
94        Box::pin(async move {
95            let received = inputs.len();
96
97            match &self.strategy {
98                MergeStrategy::WaitAll => {
99                    if received < self.expected_inputs {
100                        tracing::debug!(
101                            expected = self.expected_inputs,
102                            received = received,
103                            "Merge waiting for more inputs"
104                        );
105                        // In a real implementation, this would block until all inputs arrive.
106                        // For now, we proceed with what we have.
107                    }
108                }
109                MergeStrategy::FirstArrival => {
110                    // Proceed as soon as any input arrives
111                }
112                MergeStrategy::WaitN(n) => {
113                    if received < *n {
114                        tracing::debug!(
115                            expected = n,
116                            received = received,
117                            "Merge waiting for more inputs (wait_n)"
118                        );
119                    }
120                }
121            }
122
123            // For now, return the first input as the merged output.
124            // A proper implementation would combine all inputs into a single structure.
125            if let Some((_, ptr)) = inputs.into_iter().next() {
126                Ok(NodeOutput::out(ptr))
127            } else {
128                Ok(NodeOutput::out(RelPtr::<()>::null()))
129            }
130        })
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// `info()` reports the qualified node name and one `in_<i>` port per
    /// expected input, in index order.
    #[test]
    fn merge_node_info() {
        let info = MergeNode::new(3).info();
        let expected_ports = ["in_0", "in_1", "in_2"];

        assert_eq!(info.name, "std::merge");
        assert_eq!(info.inputs.len(), 3);
        for (port, expected) in info.inputs.iter().zip(expected_ports.iter()) {
            assert_eq!(port.name, *expected);
        }
    }

    /// The default strategy is the all-inputs barrier.
    #[test]
    fn merge_strategy_default() {
        assert!(matches!(MergeStrategy::default(), MergeStrategy::WaitAll));
    }
}
155}