hydro2_basic_operators/
stream_test_op.rs

1// ---------------- [ File: src/stream_test_op.rs ]
2crate::ix!();
3
4// --------------------------------------
5// StreamyOperator
6// --------------------------------------
7#[derive(NamedItem, Operator, Debug)]
8#[operator(
9    execute="run_stream",
10    opcode="BasicOpCode::StreamTestOp",
11    output0="T",
12    output1="T",
13    output2="T",
14    output3="T"
15)]
16pub struct StreamyOperator<T> 
17where T: PartialEq + Eq + Send + Sync + Debug + Copy + Zero,
18{
19    name: String,
20    outs: [T; 4],
21}
22
23impl<T> StreamyOperator<T> 
24where T: PartialEq + Eq + Send + Sync + Debug + Copy + Zero,
25{
26    pub fn new(label: &str) -> Self {
27        Self {
28            name: label.to_string(),
29            outs: [T::zero(),T::zero(),T::zero(),T::zero()],
30        }
31    }
32    pub fn new_with(label: &str, outs: [T;4]) -> Self {
33        Self {
34            name: label.to_string(),
35            outs
36        }
37    }
38
39    async fn run_stream(&self) -> NetResult<(T,T,T,T)> {
40        info!("StreamyOperator '{}' => run_stream", self.name);
41        Ok((self.outs[0], self.outs[1], self.outs[2], self.outs[3]))
42    }
43}
44
45#[cfg(test)]
46mod streamy_operator_tests {
47    use super::*;
48
49    #[tokio::test]
50    async fn test_streamy_operator_default_val() -> Result<(), NetworkError> {
51        let s = StreamyOperator::<i32>::new_with("tester",[7,0,0,0]);
52        assert_eq!(s.opcode().val(), BasicOpCode::StreamTestOp.val());
53        assert_eq!(s.name(), "tester");
54
55        let dummy_input = [None, None, None, None];
56        let mut out = [None,None,None,None];
57        s.execute(dummy_input,&mut out).await?;
58        // Because the operator’s output is [7,0,0,0]
59        // we must compare with Some(StreamyOperatorIO::Output0(7)), etc.
60        assert_eq!(out[0], Some(StreamyOperatorIO::Output0(7)));
61        assert_eq!(out[1], Some(StreamyOperatorIO::Output1(0)));
62        assert_eq!(out[2], Some(StreamyOperatorIO::Output2(0)));
63        assert_eq!(out[3], Some(StreamyOperatorIO::Output3(0)));
64        Ok(())
65    }
66
67    #[tokio::test]
68    async fn test_streamy_operator_custom_val() -> Result<(), NetworkError> {
69        let mut s = StreamyOperator::<i32>::new("my-stream");
70        s.outs = [123, 0, 0, 0];
71        let dummy_input = [None, None, None, None];
72        let mut out = [None,None,None,None];
73        s.execute(dummy_input, &mut out).await?;
74        assert_eq!(out[0], Some(StreamyOperatorIO::Output0(123)));
75        assert_eq!(out[1], Some(StreamyOperatorIO::Output1(0)));
76        assert_eq!(out[2], Some(StreamyOperatorIO::Output2(0)));
77        assert_eq!(out[3], Some(StreamyOperatorIO::Output3(0)));
78        Ok(())
79    }
80
81    #[tokio::test]
82    async fn test_streamy_operator_basic() -> Result<(), NetworkError> {
83        let op = StreamyOperator::<i32>::new_with("tester", [10,20,30,40]);
84        let input: [Option<&StreamyOperatorIO<i32>>;4] = [None,None,None,None];
85        let mut out: [Option<StreamyOperatorIO<i32>>;4] = [None,None,None,None];
86        op.execute(input, &mut out).await?;
87
88        // Expect out0=10, out1=20, out2=30, out3=40
89        assert_eq!(out[0], Some(StreamyOperatorIO::Output0(10)));
90        assert_eq!(out[1], Some(StreamyOperatorIO::Output1(20)));
91        assert_eq!(out[2], Some(StreamyOperatorIO::Output2(30)));
92        assert_eq!(out[3], Some(StreamyOperatorIO::Output3(40)));
93        Ok(())
94    }
95}