xerv_nodes/flow/merge.rs
1//! Merge node (N→1 barrier).
2//!
3//! The merge node waits for all expected inputs before continuing.
4//! It collects data from multiple upstream nodes and merges them into
5//! a single output.
6
7use std::collections::HashMap;
8use xerv_core::traits::{Context, Node, NodeFuture, NodeInfo, NodeOutput, Port, PortDirection};
9use xerv_core::types::RelPtr;
10
/// Configuration for merge behavior.
///
/// Selects how many inputs a merge node wants to see before it proceeds.
/// The default strategy is [`MergeStrategy::WaitAll`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum MergeStrategy {
    /// Wait for all inputs (barrier semantics).
    #[default]
    WaitAll,
    /// Proceed when any input arrives (race semantics).
    FirstArrival,
    /// Wait for a specific number of inputs.
    WaitN(usize),
}
27
28/// Merge node - N→1 barrier.
29///
30/// Waits for multiple inputs to arrive before proceeding.
31/// The output contains all merged inputs.
32///
33/// # Ports
34/// - Input: Multiple named inputs (e.g., "in_0", "in_1", "in_2")
35/// - Output: "out" - Merged data containing all inputs
36///
37/// # Example Configuration
38/// ```yaml
39/// nodes:
40/// merge_results:
41/// type: std::merge
42/// config:
43/// strategy: wait_all # or first_arrival, wait_n
44/// wait_count: 3 # for wait_n strategy
45/// inputs:
46/// - from: branch_a.out -> in_0
47/// - from: branch_b.out -> in_1
48/// - from: branch_c.out -> in_2
49/// ```
50#[derive(Debug)]
51pub struct MergeNode {
52 /// Number of expected inputs.
53 expected_inputs: usize,
54 /// Merge strategy.
55 strategy: MergeStrategy,
56}
57
58impl MergeNode {
59 /// Create a new merge node expecting the given number of inputs.
60 pub fn new(expected_inputs: usize) -> Self {
61 Self {
62 expected_inputs,
63 strategy: MergeStrategy::WaitAll,
64 }
65 }
66
67 /// Create a merge node with custom strategy.
68 pub fn with_strategy(expected_inputs: usize, strategy: MergeStrategy) -> Self {
69 Self {
70 expected_inputs,
71 strategy,
72 }
73 }
74}
75
76impl Node for MergeNode {
77 fn info(&self) -> NodeInfo {
78 let mut inputs = Vec::with_capacity(self.expected_inputs);
79 for i in 0..self.expected_inputs {
80 inputs.push(Port::named(
81 format!("in_{}", i),
82 PortDirection::Input,
83 "Any",
84 ));
85 }
86
87 NodeInfo::new("std", "merge")
88 .with_description("N→1 barrier that waits for all inputs before continuing")
89 .with_inputs(inputs)
90 .with_outputs(vec![Port::output("Any"), Port::error()])
91 }
92
93 fn execute<'a>(&'a self, _ctx: Context, inputs: HashMap<String, RelPtr<()>>) -> NodeFuture<'a> {
94 Box::pin(async move {
95 let received = inputs.len();
96
97 match &self.strategy {
98 MergeStrategy::WaitAll => {
99 if received < self.expected_inputs {
100 tracing::debug!(
101 expected = self.expected_inputs,
102 received = received,
103 "Merge waiting for more inputs"
104 );
105 // In a real implementation, this would block until all inputs arrive.
106 // For now, we proceed with what we have.
107 }
108 }
109 MergeStrategy::FirstArrival => {
110 // Proceed as soon as any input arrives
111 }
112 MergeStrategy::WaitN(n) => {
113 if received < *n {
114 tracing::debug!(
115 expected = n,
116 received = received,
117 "Merge waiting for more inputs (wait_n)"
118 );
119 }
120 }
121 }
122
123 // For now, return the first input as the merged output.
124 // A proper implementation would combine all inputs into a single structure.
125 if let Some((_, ptr)) = inputs.into_iter().next() {
126 Ok(NodeOutput::out(ptr))
127 } else {
128 Ok(NodeOutput::out(RelPtr::<()>::null()))
129 }
130 })
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// A three-input merge node reports the `std::merge` name and exposes
    /// the ports `in_0`, `in_1`, `in_2` in that order.
    #[test]
    fn merge_node_info() {
        let info = MergeNode::new(3).info();
        let expected_ports = ["in_0", "in_1", "in_2"];

        assert_eq!(info.name, "std::merge");
        assert_eq!(info.inputs.len(), 3);
        for (port, expected) in info.inputs.iter().zip(expected_ports.iter().copied()) {
            assert_eq!(port.name, expected);
        }
    }

    /// The default strategy is the barrier strategy.
    #[test]
    fn merge_strategy_default() {
        assert!(matches!(
            MergeStrategy::default(),
            MergeStrategy::WaitAll
        ));
    }
}
155}