hydro2_network/lib.rs
1// ---------------- [ File: src/lib.rs ]
2#[macro_use] mod imports; use imports::*;
3
4x!{wire_up}
5x!{test_wire}
6x!{edge}
7x!{node}
8x!{network}
9x!{validate}
10
11#[cfg(test)]
12mod large_network_integration_tests {
13 use super::*;
14 use hydro2_network_wire_derive::*;
15 use futures::executor::block_on; // or some async runtime approach
16
17 #[traced_test]
18 fn test_large_network_flow() -> Result<(), NetworkError> {
19
20 let net: Network<TestWireIO<i32>> = network!{
21 // We create 8 nodes in a chain/fan-out style:
22 // n0(Constant(10)) => n1(AddOp(5)) => n2(MultiplyOp(3)) => n3(NoOp)
23 // also from n2 => n4(SplitAndDoubleOp) => out => n5(AddOp(100)) => ...
24 // etc. We can get creative.
25 vec![
26 // no input, output=10
27 node!(0 => ConstantOp::new(10)),
28
29 // input=10 => output=15
30 node!(1 => AddOp::new(5)),
31
32 // input=15 => output=45
33 node!(2 => MultiplyOp::new(3)),
34
35 node!(3 => SingleChannelPassthroughOperator::<i32>::with_name("NoOp3")),
36
37 node!(4 => SplitAndDoubleOp::default()),
38
39 // feed from n4:0 or n4:1
40 node!(5 => AddOp::new(100)),
41
42 // optional => invert something
43 node!(6 => MultiplyOp::new(-1)),
44
45 node!(7 => SingleValOp::default()),
46 ],
47 // Edges:
48 // n0:0 -> n1:0 -> n2:0 -> n3:0 -> n4:0 => fan out => n5, n6, n7 or something
49 vec![
50 edge!(0:0 -> 1:0),
51 edge!(1:0 -> 2:0),
52 edge!(2:0 -> 3:0),
53 edge!(3:0 -> 4:0), // input_count=1 => no problem
54
55 // n4 => output_count=2 => we can feed n5, n6 from the two outputs
56 edge!(4:0 -> 5:0), // out0 => input0 => n5 => yields something
57 edge!(4:1 -> 6:0), // out1 => input0 => n6 => yields something
58
59 // n7 stands alone
60 ]
61 };
62
63 eprintln!("net: {:#?}", net);
64
65 // Now we want to actually “execute” the final pipeline.
66 // Because we have no scheduling system here, let's do a manual BFS style:
67 // - find a node with no inputs => run => find any node that is now complete => run => etc.
68 // Or we can just do a topological order and call node[i].execute().await in that order.
69
70 // Topological order: [0,1,2,3,4,5,6,7]
71 // We'll do an async block_on approach:
72 block_on(async {
73 for i in 0..8 {
74 net.nodes()[i].execute().await?;
75 }
76 Ok::<(), NetworkError>(())
77 })?;
78
79 eprintln!("net, post exec: {:#?}", net);
80
81 // Now let's see what happened:
82 // Node0 => output=10
83 // Node1 => input=10 => output=15
84 // Node2 => input=15 => output=45
85 // Node3 => input=45 => output=None => (NoOp)
86 // Node4 => input=45 => out0=45, out1=90
87 // Node5 => input=45 => output=145
88 // Node6 => input=90 => output=-90
89 // Node7 => input=145 => output=777 (SingleValOp doesn’t use input)
90
91 // Let's verify that manually:
92 // We can read back each node’s output arcs if we want:
93 let node7_output_arc_opt = net.nodes()[7].outputs()[0].clone();
94 assert!(node7_output_arc_opt.is_some());
95 let node7_arc = node7_output_arc_opt.unwrap();
96 let node7_val = block_on(async { node7_arc.read().await.clone() });
97 // singleValOp => forced=777
98 assert_eq!(node7_val, TestWireIO::SingleValOpIO(SingleValOpIO::Output0(777)));
99
100 // Also we can confirm node6 => -90
101 let node6_arc = net.nodes()[6].outputs()[0].clone().unwrap();
102 let node6_val = block_on(async { node6_arc.read().await.clone() });
103 assert_eq!(node6_val, TestWireIO::MultiplyOpIO(MultiplyOpIO::Output0(-90)));
104
105 // Node5 => out=145
106 let node5_arc = net.nodes()[5].outputs()[0].clone().unwrap();
107 let node5_val = block_on(async { node5_arc.read().await.clone() });
108 assert_eq!(node5_val, TestWireIO::AddOpIO(AddOpIO::Output0(145)));
109
110 // Node4 => out0=45, out1=90
111 let node4_arc0 = net.nodes()[4].outputs()[0].clone().unwrap();
112 let node4_val0 = block_on(async { node4_arc0.read().await.clone() });
113 assert_eq!(node4_val0, TestWireIO::SplitAndDoubleOpIO(SplitAndDoubleOpIO::Output0(45)));
114 let node4_arc1 = net.nodes()[4].outputs()[1].clone().unwrap();
115 let node4_val1 = block_on(async { node4_arc1.read().await.clone() });
116 assert_eq!(node4_val1, TestWireIO::SplitAndDoubleOpIO(SplitAndDoubleOpIO::Output1(90)));
117
118 Ok(())
119 }
120}