hydro2_network/
node.rs

1// ---------------- [ File: src/node.rs ]
2crate::ix!();
3
4/// Represents a single node in the network. Each node has
5/// an associated operator and references to its input and output buffers.
6///
7/// TODO: make sure we cant push operators with the same ID
8#[derive(Builder,MutGetters,Setters,Getters,Debug,Clone)]
9#[builder(setter(into))]
10#[getset(get="pub",set = "pub", get_mut = "pub")]
11pub struct NetworkNode<NetworkItem> 
12where NetworkItem: Debug + Send + Sync
13{
14    /// Index of this node in the network.
15    index: usize,
16
17    /// The operator that this node executes.
18    operator: Arc<dyn Operator<NetworkItem>>,
19
20    /// All input buffers required by this node.
21    inputs:  NetworkNodeIoChannelArray<NetworkItem>,
22
23    /// All output buffers that this node will populate.
24    outputs: NetworkNodeIoChannelArray<NetworkItem>,
25}
26
27impl<NetworkItem> NetworkNode<NetworkItem> 
28where NetworkItem: Debug + Send + Sync
29{
30    /// Acquire read locks on inputs, then call the operator, storing its
31    /// results into a local buffer. Finally, acquire write locks and copy
32    /// the data into the real outputs via `finish`.
33    pub async fn execute(&self) -> NetResult<()> {
34
35        // 1) Acquire read locks
36        let mut read_guards: NetworkNodeIoChannelReadGuardArray<'_, NetworkItem> 
37            = [None, None, None, None];
38
39        for i in 0..4 {
40            if let Some(arc) = &self.inputs[i] {
41                read_guards[i] = Some(arc.read().await);
42            }
43        }
44
45        // Convert to a [Option<&NetworkItem>; 4]
46        let inputs: [Option<&NetworkItem>; 4] = [
47            read_guards[0].as_ref().map(|g| &**g),
48            read_guards[1].as_ref().map(|g| &**g),
49            read_guards[2].as_ref().map(|g| &**g),
50            read_guards[3].as_ref().map(|g| &**g),
51        ];
52
53        // 2) Prepare a local buffer for operator to fill
54        let mut output_buffer: NetworkNodeIoChannelValues<NetworkItem> =
55            [None, None, None, None];
56
57        // 3) Call the operator asynchronously, passing references
58        self.operator.execute(inputs, &mut output_buffer).await?;
59
60        // 4) Acquire write locks
61        let mut write_guards: [Option<tokio::sync::RwLockWriteGuard<'_, NetworkItem>>; 4]
62            = [None, None, None, None];
63
64        for i in 0..4 {
65            if let Some(arc) = &self.outputs[i] {
66                write_guards[i] = Some(arc.write().await);
67            }
68        }
69
70        // 5) Write results from local buffer into outputs
71        Self::finish_execution(write_guards, output_buffer)?;
72
73        Ok(())
74    }
75
76    fn finish_execution(mut output: NetworkNodeIoChannelWriteGuardArray<'_,NetworkItem>, mut values: NetworkNodeIoChannelValues<NetworkItem>) -> NetResult<()> 
77    where NetworkItem: Debug + Send + Sync
78    {
79        if let Some(o0) = &mut output[0] { if let Some(v) = values[0].take() { **o0 = v; } }
80        if let Some(o1) = &mut output[1] { if let Some(v) = values[1].take() { **o1 = v; } }
81        if let Some(o2) = &mut output[2] { if let Some(v) = values[2].take() { **o2 = v; } }
82        if let Some(o3) = &mut output[3] { if let Some(v) = values[3].take() { **o3 = v; } }
83        Ok(())
84    }
85}
86
87#[macro_export]
88macro_rules! node {
89    ($idx:expr => $op:expr) => {
90        NetworkNodeBuilder::default()
91            .index($idx as usize)
92            .operator($op.into_arc_operator())
93            .inputs([None, None, None, None])
94            .outputs([None, None, None, None])
95            .build()
96            .unwrap()
97    };
98}
99
100#[cfg(test)]
101mod node_macro_tests {
102    use super::*;
103
104    #[test]
105    fn test_node_macro_single_noop() {
106        // We use node!(0 => NoOpOperator::default())
107        let n0: NetworkNode<TestWireIO<i32>> = node!(0 => NoOpOperator::default());
108        assert_eq!(*n0.index(), 0);
109        assert_eq!(n0.operator().name(), "default");
110        // By default, node! sets inputs/outputs to [None;4]
111        for i in 0..4 {
112            assert!(n0.inputs()[i].is_none(), "Expected no inputs yet");
113            assert!(n0.outputs()[i].is_none(), "Expected no outputs yet");
114        }
115    }
116
117    #[test]
118    fn test_node_macro_with_custom_operator() {
119        // Suppose we have AddOp(7).
120        let n7: NetworkNode<TestWireIO<i32>> = node!(7 => AddOp::new(7));
121        assert_eq!(*n7.index(), 7);
122        assert_eq!(n7.operator().name(), "AddOp(+7)");
123        // Still no inputs/outputs at creation time
124        for i in 0..4 {
125            assert!(n7.inputs()[i].is_none());
126            assert!(n7.outputs()[i].is_none());
127        }
128    }
129}