cu_aligner/
lib.rs

1#![doc = include_str!("../README.md")]
2
3pub mod buffers;
4
/// Define a task that aligns incoming messages based on their timestamps.
/// See module doc for use.
///
/// # Arguments
///
/// * `$name` - identifier of the generated task struct.
/// * `$index => { $mis, $mos, $p }` - one entry per input, comma separated:
///   * `$index` - tuple position of the input; it also names the internal
///     `buffer<index>` field and selects the matching output slot.
///   * `$mis` - forwarded as the first generic argument of the input's
///     `TimeboundCircularBuffer` (presumably its capacity; confirm in `buffers`).
///   * `$mos` - length of the `CuArray` emitted for this input.
///   * `$p` - payload type of this input.
///
/// # Configuration
///
/// `new` requires a config carrying two `u32` entries, both multiplied by
/// `1_000_000` before being wrapped in a `CuDuration`:
/// `target_alignment_window_ms` and `stale_data_horizon_ms`.
///
/// # Errors
///
/// `new` returns a `CuError` when the config or either of the two keys is missing.
///
/// # Call-site requirements
///
/// The expansion refers to `Freezable`, `CuTask`, `ComponentConfig`, `CuResult`,
/// `CuStampedData`, `CuMsgMetadata`, `input_msg!` and `output_msg!` unqualified,
/// and to the `cu29` and `paste` crates by path, so all of them must resolve
/// where the macro is invoked. The helper buffers type is always declared as
/// `AlignmentBuffers`, so use at most one invocation per module.
#[macro_export]
macro_rules! define_task {
    ($name:ident, $($index:tt => { $mis:expr, $mos:expr, $p:ty }),+) => {

        paste::paste! {
            $crate::buffers::alignment_buffers!(
                AlignmentBuffers,
                $(
                    [<buffer $index>]: TimeboundCircularBuffer<$mis, CuStampedData<$p, CuMsgMetadata>>
                ),*
            );
        }

        pub struct $name {
            aligner: AlignmentBuffers,
        }

        impl Freezable for $name {}

        impl CuTask for $name {
            type Resources<'r> = ();
            type Input<'m> = input_msg!('m, $($p),*);
            type Output<'m> = output_msg!(($(
                cu29::payload::CuArray<$p, { $mos }>
            ),*));

            fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
            where
                Self: Sized,
            {
                let config = config.ok_or_else(|| cu29::CuError::from("Config Missing"))?;

                // Both settings are read as u32 and widened to u64 so the
                // scaling below cannot overflow.
                let target_alignment_window_ms: u64 = config
                    .get::<u32>("target_alignment_window_ms")
                    .ok_or_else(|| cu29::CuError::from("Missing target_alignment_window_ms"))?
                    .into();
                let stale_data_horizon_ms: u64 = config
                    .get::<u32>("stale_data_horizon_ms")
                    .ok_or_else(|| cu29::CuError::from("Missing stale_data_horizon_ms"))?
                    .into();

                Ok(Self {
                    aligner: AlignmentBuffers::new(
                        cu29::clock::CuDuration(target_alignment_window_ms * 1_000_000),
                        cu29::clock::CuDuration(stale_data_horizon_ms * 1_000_000),
                    ),
                })
            }

            fn preprocess(&mut self, clock: &cu29::clock::RobotClock) -> CuResult<()> {
                // Let the buffers discard entries relative to the current time.
                self.aligner.purge(clock.now());
                Ok(())
            }

            fn process(
                &mut self,
                _clock: &cu29::clock::RobotClock,
                input: &Self::Input<'_>,
                output: &mut Self::Output<'_>,
            ) -> CuResult<()> {
                // Add the incoming data into the buffers.
                // `input` is a tuple holding one message reference per declared input.
                paste::paste! {
                    $(
                        self.aligner.[<buffer $index>].push(input.$index.clone());
                    )*
                }

                // Nothing aligned yet: leave the output untouched.
                let Some(aligned) = self.aligner.get_latest_aligned_data() else {
                    return Ok(());
                };

                // Populate the CuArray fields in the output message.
                // Messages without a payload are skipped instead of panicking.
                let output_payload = output.payload_mut().get_or_insert_with(Default::default);
                $(
                    output_payload.$index.fill_from_iter(
                        aligned.$index.filter_map(|msg| msg.payload().cloned()),
                    );
                )*
                Ok(())
            }
        }
    };
}
86
#[cfg(test)]
mod tests {
    // These names are referenced unqualified by the `define_task!` expansion
    // (and by the macros it calls), so they must stay in scope here.
    use cu29::config::ComponentConfig;
    use cu29::cutask::{CuMsg, CuMsgMetadata, CuStampedData, CuTask, Freezable};
    use cu29::payload::CuArray;
    use cu29::{input_msg, output_msg, CuResult};

    define_task!(AlignerTask, 0 => { 10, 5, f32 }, 1 => { 5, 10, i32 });

    /// Feeds two default-constructed messages through the task and only
    /// checks that `process` reports success.
    #[test]
    fn test_aligner_smoketest() {
        let mut cfg = ComponentConfig::default();
        cfg.set("target_alignment_window_ms", 1000);
        cfg.set("stale_data_horizon_ms", 2000);
        let mut task = AlignerTask::new(Some(&cfg), ()).unwrap();

        let float_msg = CuStampedData::<f32, CuMsgMetadata>::default();
        let int_msg = CuStampedData::<i32, CuMsgMetadata>::default();
        let input: <AlignerTask as CuTask>::Input<'_> = (&float_msg, &int_msg);
        let mut output: <AlignerTask as CuTask>::Output<'_> =
            CuStampedData::<(CuArray<f32, 5>, CuArray<i32, 10>), CuMsgMetadata>::default();

        let clock = cu29::clock::RobotClock::new();
        assert!(task.process(&clock, &input, &mut output).is_ok());
    }

    // A nested module gives the second `define_task!` expansion its own scope.
    mod string_payload {
        use super::*;
        use cu29::clock::{CuDuration, Tov};

        define_task!(StringAlignerTask, 0 => { 4, 4, String }, 1 => { 4, 4, String });

        /// Two `String` messages stamped with the same time of validity must
        /// each land as the single element of their output array.
        #[test]
        fn test_aligner_string_payload() {
            let mut cfg = ComponentConfig::default();
            cfg.set("target_alignment_window_ms", 10);
            cfg.set("stale_data_horizon_ms", 1000);
            let mut task = StringAlignerTask::new(Some(&cfg), ()).unwrap();

            // Both inputs share one timestamp.
            let stamp = CuDuration::from_millis(100);
            let mut left_msg = CuStampedData::<String, CuMsgMetadata>::new(Some("left".to_string()));
            left_msg.tov = Tov::Time(stamp);
            let mut right_msg =
                CuStampedData::<String, CuMsgMetadata>::new(Some("right".to_string()));
            right_msg.tov = Tov::Time(stamp);

            let input: <StringAlignerTask as CuTask>::Input<'_> = (&left_msg, &right_msg);
            let mut output =
                CuStampedData::<(CuArray<String, 4>, CuArray<String, 4>), CuMsgMetadata>::default();

            let clock = cu29::clock::RobotClock::new();
            aligner_process(&mut task, &clock, &input, &mut output);

            let (lefts, rights) = output.payload().unwrap();
            assert_eq!(lefts.len(), 1);
            assert_eq!(rights.len(), 1);
            assert_eq!(lefts.as_slice()[0], "left");
            assert_eq!(rights.as_slice()[0], "right");
        }

        /// Runs one `process` cycle and panics if the task reports an error.
        fn aligner_process(
            task: &mut StringAlignerTask,
            clock: &cu29::clock::RobotClock,
            input: &<StringAlignerTask as CuTask>::Input<'_>,
            output: &mut <StringAlignerTask as CuTask>::Output<'_>,
        ) {
            task.process(clock, input, output).unwrap();
        }
    }
}