hydro2_network/
wire_up.rs

// ---------------- [ File: src/wire_up.rs ]
2crate::ix!();
3
4/// Wires up all outputs and inputs across the network, checking port‐range, type‐string compatibility,
5/// required‐connection fulfillment, and fan‐in/fan‐out rules. Returns an error if any violation occurs.
6pub fn wire_up_network<NetworkItem>(
7    net: &mut Network<NetworkItem>
8) -> NetResult<()>
9where
10    NetworkItem: Debug + Send + Sync + Default,
11{
12    // 1) Pre-allocate the outputs for each node based on operator.output_count().
13    //    We support up to 4 physical outputs. If operator claims >4, fail.
14    for (node_idx, node) in net.nodes_mut().iter_mut().enumerate() {
15        let out_count = node.operator().output_count();
16        if out_count > 4 {
17            return Err(NetworkError::InvalidConfiguration {
18                details: format!(
19                    "Node #{} operator output_count={} exceeds the max of 4 ports",
20                    node_idx, out_count
21                ),
22            });
23        }
24        // Allocate (Arc<AsyncRwLock<NetworkItem>>) for each real port:
25        for port_idx in 0..out_count {
26            node.outputs_mut()[port_idx] = Some(Arc::new(AsyncRwLock::new(NetworkItem::default())));
27        }
28        // Ensure leftover "physical" slots up to 4 are None:
29        for port_idx in out_count..4 {
30            node.outputs_mut()[port_idx] = None;
31        }
32    }
33
34    // 2) For each node, we track how many inputs are actually used in total
35    //    (kept for historical exact-match checks) plus per-port usage.
36    let mut used_input_count: Vec<usize> = vec![0; net.nodes().len()];
37    // Also track usage counts to validate required connections:
38    let mut input_usage = vec![[0usize; 4]; net.nodes().len()];   // node_idx -> [port0_count, port1_count, ...]
39    let mut output_usage = vec![[0usize; 4]; net.nodes().len()];  // node_idx -> [port0_count, port1_count, ...]
40
41    // We'll clone edges since we might do multiple passes
42    let edges = net.edges().clone();
43
44    // 3) Connect each edge => set the downstream node’s input array slot
45    //    to the same arc used by the upstream node’s output array slot.
46    for (edge_idx, edge) in edges.iter().enumerate() {
47        let src = edge.source_index();
48        let so  = edge.source_output_idx();
49        let dst = edge.dest_index();
50        let di  = edge.dest_input_idx();
51
52        // Basic node‐index range checks:
53        if *src >= net.nodes().len() || *dst >= net.nodes().len() {
54            return Err(NetworkError::InvalidConfiguration {
55                details: format!(
56                    "Edge #{} references invalid node index (src={}, dst={}, node_count={})",
57                    edge_idx, src, dst, net.nodes().len()
58                ),
59            });
60        }
61        let src_op = &net.nodes()[*src].operator();
62        let dst_op = &net.nodes()[*dst].operator();
63
64        // Upstream operator’s output_count => so must be < that count
65        if *so >= src_op.output_count() {
66            return Err(NetworkError::InvalidConfiguration {
67                details: format!(
68                    "Edge #{} references source node {} output port {}, but operator only has {} outputs",
69                    edge_idx, src, so, src_op.output_count()
70                ),
71            });
72        }
73        // Downstream operator’s input_count => di must be < that count
74        if *di >= dst_op.input_count() {
75            return Err(NetworkError::InvalidConfiguration {
76                details: format!(
77                    "Edge #{} references dest node {} input port {}, but operator only has {} inputs",
78                    edge_idx, dst, di, dst_op.input_count()
79                ),
80            });
81        }
82
83        // Check physical limit of 4:
84        if *so >= 4 || *di >= 4 {
85            return Err(NetworkError::InvalidConfiguration {
86                details: format!(
87                    "Edge #{} references port so={}, di={} >= 4, out of range for 4 ports",
88                    edge_idx, so, di
89                ),
90            });
91        }
92
93        // 3a) Check type‐string matching: out_str == in_str
94        // If either operator reports None for that port, treat as a mismatch.
95        let out_str = match src_op.output_port_type_str(*so) {
96            Some(s) => s,
97            None => {
98                return Err(NetworkError::InvalidConfiguration {
99                    details: format!(
100                        "Edge #{} source node {} output port {} has no declared type string",
101                        edge_idx, src, so
102                    ),
103                });
104            }
105        };
106        let in_str = match dst_op.input_port_type_str(*di) {
107            Some(s) => s,
108            None => {
109                return Err(NetworkError::InvalidConfiguration {
110                    details: format!(
111                        "Edge #{} dest node {} input port {} has no declared type string",
112                        edge_idx, dst, di
113                    ),
114                });
115            }
116        };
117        if out_str != in_str {
118            return Err(NetworkError::InvalidConfiguration {
119                details: format!(
120                    "Edge #{} type mismatch: source node {} output port {} => {:?} != dest node {} input port {} => {:?}",
121                    edge_idx, src, so, out_str, dst, di, in_str
122                ),
123            });
124        }
125
126        // 3b) Retrieve Arc from node[src].outputs[so] and assign to node[dst].inputs[di]
127        let arc_opt = net.nodes()[*src].outputs()[*so].clone();
128        if arc_opt.is_none() {
129            return Err(NetworkError::InvalidConfiguration {
130                details: format!(
131                    "Edge #{} references source node {} output port {}, but that port is None",
132                    edge_idx, src, so
133                ),
134            });
135        }
136        net.nodes_mut()[*dst].inputs_mut()[*di] = arc_opt;
137
138        // 3c) Keep track of usage:
139        used_input_count[*dst] += 1;
140        input_usage[*dst][*di] += 1;
141        output_usage[*src][*so] += 1;
142    }
143
144    // 4) Post-check: 
145    //    a) Each node’s total used inputs must match node.operator().input_count() => historical exact‐match rule
146    //    b) Ensure no input port is double‐fed (input_usage>1)
147    //    c) Enforce required connections for input ports
148    //    d) Enforce required connections for output ports
149    for (node_idx, node) in net.nodes().iter().enumerate() {
150        let op = node.operator();
151        let in_count = op.input_count();
152        let out_count = op.output_count();
153        let used = used_input_count[node_idx];
154
155        // (a) Historical exact‐match check
156        if used != in_count {
157            return Err(NetworkError::InvalidConfiguration {
158                details: format!(
159                    "Node #{} operator expects {} inputs, but wired edges used {}",
160                    node_idx, in_count, used
161                ),
162            });
163        }
164
165        // (b, c) For each input port in [0..in_count], check no double‐feeding & required connections
166        for i in 0..in_count {
167            if input_usage[node_idx][i] > 1 {
168                return Err(NetworkError::InvalidConfiguration {
169                    details: format!(
170                        "Node #{} input port {} is fed by multiple edges ({}), which is not allowed",
171                        node_idx, i, input_usage[node_idx][i]
172                    ),
173                });
174            }
175            if op.input_port_connection_required(i) && input_usage[node_idx][i] == 0 {
176                return Err(NetworkError::InvalidConfiguration {
177                    details: format!(
178                        "Node #{} input port {} is required but has no incoming edge",
179                        node_idx, i
180                    ),
181                });
182            }
183        }
184
185        // (d) For each output port in [0..out_count], if it’s required => must have at least one consumer
186        //     Note that fan‐out is allowed, so we only check output_usage[node_idx][port] > 0
187        for o in 0..out_count {
188            if op.output_port_connection_required(o) && output_usage[node_idx][o] == 0 {
189                return Err(NetworkError::InvalidConfiguration {
190                    details: format!(
191                        "Node #{} output port {} is required but has no downstream edge",
192                        node_idx, o
193                    ),
194                });
195            }
196        }
197    }
198
199    Ok(())
200}
201
// =====================
// wire_up_network_tests.rs
// =====================
#[cfg(test)]
mod wire_up_network_tests {

    use super::*; // bring in wire_up_network, Network, operator types, etc.

    /// 1) Single node => operator=ConstantOp => input_count=0 => output_count=1 => no edges => OK.
    ///    Basic check that we allocate 1 output slot and do not require any input edges.
    #[test]
    fn test_wire_up_single_constantop_ok() -> Result<(), NetworkError> {
        let n0: NetworkNode<TestWireIO<i32>> = node!(0 => ConstantOp::new(100));

        let mut net = NetworkBuilder::<TestWireIO<i32>>::default()
            .nodes(vec![n0])
            .edges(vec![])
            .build()
            .unwrap();

        // wire_up => ensures we have an Arc for output port=0
        wire_up_network(&mut net)?;

        assert!(net.nodes()[0].outputs()[0].is_some());
        assert!(net.nodes()[0].outputs()[1].is_none());

        // input_count=0 and no edges => every input slot stays None
        for i in 0..4 {
            assert!(net.nodes()[0].inputs()[i].is_none());
        }

        Ok(())
    }

    /// 2) Single node => operator=AddOp => input_count=1 => output_count=1 => no edges => mismatch => error.
    #[test]
    fn test_wire_up_single_addop_mismatch() {
        let n0: NetworkNode<TestWireIO<i32>> = node!(0 => AddOp::new(10));
        let mut net = NetworkBuilder::<TestWireIO<i32>>::default()
            .nodes(vec![n0])
            .edges(vec![])
            .build()
            .unwrap();

        // Match on the exact variant so that an Ok or an unrelated error
        // variant fails the test instead of silently skipping the assertion.
        match wire_up_network(&mut net) {
            Err(NetworkError::InvalidConfiguration { details }) => assert!(
                details.contains("expects 1 inputs, but wired edges used 0"),
                "Expected an input_count mismatch error, got: {}",
                details
            ),
            other => panic!(
                "Expected NetworkError::InvalidConfiguration, got: {:?}",
                other
            ),
        }
    }

    /// 3) Two nodes => Node0=ConstantOp => Node1=AddOp => single edge => OK.
    ///    We specifically test that Node1 sees input_count=1 used => no mismatch.
    #[test]
    fn test_wire_up_two_nodes_ok() -> Result<(), NetworkError> {
        let n0: NetworkNode<TestWireIO<i32>> = node!(0 => ConstantOp::new(42));
        let n1: NetworkNode<TestWireIO<i32>> = node!(1 => AddOp::new(5));
        let e = edge!(0:0 -> 1:0);

        let mut net = NetworkBuilder::<TestWireIO<i32>>::default()
            .nodes(vec![n0, n1])
            .edges(vec![e])
            .build()
            .unwrap();

        wire_up_network(&mut net)?;

        // Node0 => out_count=1 => outputs[0].is_some
        assert!(net.nodes()[0].outputs()[0].is_some());
        // Node1 => in_count=1 => inputs[0].is_some
        assert!(net.nodes()[1].inputs()[0].is_some());

        Ok(())
    }

    /// 4) Fan-out: the single output of Node0 (ConstantOp) feeds both Node1 (AddOp)
    ///    and Node2 (MultiplyOp). Fan-out from one output port must not fail, and
    ///    each consumer sees exactly one wired input => no mismatch.
    #[test]
    fn test_wire_up_fanout_ok() -> Result<(), NetworkError> {
        let n0: NetworkNode<TestWireIO<i32>> = node!(0 => ConstantOp::new(10));
        let n1: NetworkNode<TestWireIO<i32>> = node!(1 => AddOp::new(100));
        let n2: NetworkNode<TestWireIO<i32>> = node!(2 => MultiplyOp::new(2));

        let e1 = edge!(0:0 -> 1:0);
        let e2 = edge!(0:0 -> 2:0);

        let mut net = NetworkBuilder::<TestWireIO<i32>>::default()
            .nodes(vec![n0, n1, n2])
            .edges(vec![e1, e2])
            .build()
            .unwrap();

        wire_up_network(&mut net)?;
        // Node0 => outputs[0].some => used by both Node1, Node2
        assert!(net.nodes()[0].outputs()[0].is_some());
        // Node1 => inputs[0].some
        assert!(net.nodes()[1].inputs()[0].is_some());
        // Node2 => inputs[0].some
        assert!(net.nodes()[2].inputs()[0].is_some());
        Ok(())
    }

    /// 5) The same input port is fed by two edges => not allowed => error.
    ///    Node1 (AddOp) declares one input but receives two edges on port 0, so
    ///    the total-count check reports "expects 1 inputs, but wired edges used 2"
    ///    (that check runs before the per-port double-feed check).
    #[test]
    fn test_wire_up_double_feed_same_input_port() {
        let n0: NetworkNode<TestWireIO<i32>> = node!(0 => ConstantOp::new(42));
        let n1: NetworkNode<TestWireIO<i32>> = node!(1 => AddOp::new(5));
        let n2: NetworkNode<TestWireIO<i32>> = node!(2 => ConstantOp::new(123));

        // Node0:0 -> Node1:0 and Node2:0 -> Node1:0 both target the same port.
        let e0 = edge!(0:0 -> 1:0);
        let e1 = edge!(2:0 -> 1:0);

        let mut net = NetworkBuilder::<TestWireIO<i32>>::default()
            .nodes(vec![n0, n1, n2])
            .edges(vec![e0, e1])
            .build()
            .unwrap();

        // Match on the exact variant so that an Ok or an unrelated error
        // variant fails the test instead of silently skipping the assertion.
        match wire_up_network(&mut net) {
            Err(NetworkError::InvalidConfiguration { details }) => assert!(
                details.contains("expects 1 inputs, but wired edges used 2"),
                "Expected mismatch for node=1 with 2 edges into the same port. Got: {}",
                details
            ),
            other => panic!(
                "Expected NetworkError::InvalidConfiguration, got: {:?}",
                other
            ),
        }
    }

    /// 6) An operator with two outputs where only the first is consumed => allowed,
    ///    as long as its own input is wired.
    ///
    ///    Layout: node0=ConstantOp feeds node1=DoubleOutOp, whose output 0 feeds
    ///    node2=MultiplyOp. Output 1 of node1 is allocated but left unconnected.
    #[test]
    fn test_wire_up_operator_with_2_outputs_but_only_1_used() -> Result<(), NetworkError> {
        let n0const: NetworkNode<TestWireIO<i32>> = node!(0 => ConstantOp::new(77));
        let n1dbl:   NetworkNode<TestWireIO<i32>> = node!(1 => DoubleOutOp::default());
        let n2mul:   NetworkNode<TestWireIO<i32>> = node!(2 => MultiplyOp::new(2));
        let e_a = edge!(0:0 -> 1:0); // feed the double-out
        let e_b = edge!(1:0 -> 2:0); // use only the first output

        let mut net = NetworkBuilder::<TestWireIO<i32>>::default()
            .nodes(vec![n0const, n1dbl, n2mul])
            .edges(vec![e_a, e_b])
            .build()
            .unwrap();

        wire_up_network(&mut net)?;
        // Node1 => both declared outputs are allocated, connected or not
        assert!(net.nodes()[1].outputs()[0].is_some());
        assert!(net.nodes()[1].outputs()[1].is_some());
        // Node2 => in_count=1 => inputs[0].some => from node1:0
        assert!(net.nodes()[2].inputs()[0].is_some());
        // The unused second output node1:1 does not cause an error
        Ok(())
    }
}