hydro2_network/
lib.rs

1// ---------------- [ File: src/lib.rs ]
2#[macro_use] mod imports; use imports::*;
3
4x!{wire_up}
5x!{test_wire}
6x!{edge}
7x!{node}
8x!{network}
9x!{validate}
10
#[cfg(test)]
mod large_network_integration_tests {
    use super::*;
    use hydro2_network_wire_derive::*;
    use futures::executor::block_on; // drives the async node calls without a full runtime

    // End-to-end check of a hand-wired eight-node network: build it with
    // `network!`, execute every node once in index order, then read the
    // output arcs of nodes 4..=7 and compare them with the expected values.
    //
    // Returns `Err(NetworkError)` if any node's `execute()` fails; panics if
    // an inspected output port holds no arc or a value differs.
    #[traced_test]
    fn test_large_network_flow() -> Result<(), NetworkError> {

        let net: Network<TestWireIO<i32>> = network!{
            // Eight nodes: a linear chain followed by a two-way fan-out, plus
            // one unconnected node:
            //   n0(Constant(10)) => n1(AddOp(5)) => n2(MultiplyOp(3)) => n3(passthrough)
            //   n3 => n4(SplitAndDoubleOp) => out0 => n5(AddOp(100))
            //                              => out1 => n6(MultiplyOp(-1))
            //   n7(SingleValOp) has no edges.
            vec![
                // no input, output=10
                node!(0 => ConstantOp::new(10)),

                // input=10 => output=15
                node!(1 => AddOp::new(5)),

                // input=15 => output=45
                node!(2 => MultiplyOp::new(3)),

                // forwards its single input unchanged (45)
                node!(3 => SingleChannelPassthroughOperator::<i32>::with_name("NoOp3")),

                // one input, two outputs (expected below: out0=45, out1=90)
                node!(4 => SplitAndDoubleOp::default()),

                // fed from n4:0 => expected 45 + 100 = 145
                node!(5 => AddOp::new(100)),

                // fed from n4:1 => expected 90 * -1 = -90
                node!(6 => MultiplyOp::new(-1)),

                // unconnected; expected to emit 777 regardless
                node!(7 => SingleValOp::default()),
            ],
            // Edges:
            // n0:0 -> n1:0 -> n2:0 -> n3:0 -> n4:0, then n4 fans out to n5 and n6.
            vec![
                edge!(0:0 -> 1:0),
                edge!(1:0 -> 2:0),
                edge!(2:0 -> 3:0),
                edge!(3:0 -> 4:0), // input_count=1 => no problem

                // n4 => output_count=2 => its two outputs feed n5 and n6
                edge!(4:0 -> 5:0), // out0 => n5 input0
                edge!(4:1 -> 6:0), // out1 => n6 input0

                // n7 stands alone
            ]
        };

        eprintln!("net: {:#?}", net);

        // There is no scheduler in this test, so the nodes are executed by
        // hand. Index order [0,1,2,3,4,5,6,7] is already a topological order
        // for the edges above, so every node runs after its producer.
        block_on(async {
            for node in net.nodes().iter() {
                node.execute().await?;
            }
            Ok::<(), NetworkError>(())
        })?;

        eprintln!("net, post exec: {:#?}", net);

        // Expected data flow:
        // Node0 => output=10
        // Node1 => input=10 => output=15
        // Node2 => input=15 => output=45
        // Node3 => input=45 => output=45 (passthrough)
        // Node4 => input=45 => out0=45, out1=90
        // Node5 => input=45 => output=145
        // Node6 => input=90 => output=-90
        // Node7 => no inbound edge => output=777

        // Clones the arc stored at `port` of node `node_idx`, then blocks on a
        // read of it and returns a clone of the stored value. Panics if the
        // port holds no arc.
        let read_output = |node_idx: usize, port: usize| {
            let arc = net.nodes()[node_idx].outputs()[port]
                .clone()
                .expect("inspected output port holds no arc");
            block_on(async { arc.read().await.clone() })
        };

        // SingleValOp => forced=777
        assert_eq!(read_output(7, 0), TestWireIO::SingleValOpIO(SingleValOpIO::Output0(777)));

        // Node6 => -90
        assert_eq!(read_output(6, 0), TestWireIO::MultiplyOpIO(MultiplyOpIO::Output0(-90)));

        // Node5 => out=145
        assert_eq!(read_output(5, 0), TestWireIO::AddOpIO(AddOpIO::Output0(145)));

        // Node4 => out0=45, out1=90
        assert_eq!(read_output(4, 0), TestWireIO::SplitAndDoubleOpIO(SplitAndDoubleOpIO::Output0(45)));
        assert_eq!(read_output(4, 1), TestWireIO::SplitAndDoubleOpIO(SplitAndDoubleOpIO::Output1(90)));

        Ok(())
    }
}