cu_aligner/
lib.rs

1#![doc = include_str!("../README.md")]
2
3pub mod buffers;
4
/// Defines a task that aligns incoming messages based on their timestamps.
///
/// See the module documentation for usage.
///
/// # Syntax
///
/// `define_task!(Name, 0 => { MIS, MOS, Payload }, 1 => { MIS, MOS, Payload }, ...)`
///
/// For each input:
/// - the index (`0`, `1`, ...) is the input's position in the task's input and output tuples,
/// - `MIS` is the const parameter handed to that input's `TimeboundCircularBuffer`,
/// - `MOS` is the const parameter of the `CuArray` emitted for that input,
/// - `Payload` is the payload type of the input; it is copied out of the buffered messages.
///
/// # Configuration
///
/// The generated `new` requires a config carrying two `u32` entries,
/// `target_alignment_window_ms` and `stale_data_horizon_ms`, and returns an error if the
/// config or either entry is missing.
///
/// # Names expected at the expansion site
///
/// The expansion refers to `CuTask`, `Freezable`, `ComponentConfig`, `CuResult`,
/// `CuStampedData`, `CuMsgMetadata`, `input_msg!` and `output_msg!` without a path, so the
/// invoking module must have them in scope. The `cu29` and `paste` crates must be
/// resolvable by name.
#[macro_export]
macro_rules! define_task {
    ($name:ident, $($index:tt => { $mis:expr, $mos:expr, $p:ty }),+) => {

        // One buffer field per input, named `buffer<index>`.
        paste::paste! {
            $crate::buffers::alignment_buffers!(
                AlignmentBuffers,
                $(
                    [<buffer $index>]: TimeboundCircularBuffer<$mis, CuStampedData<$p, CuMsgMetadata>>
                ),*
            );
        }

        pub struct $name {
            aligner: AlignmentBuffers,
        }

        impl Freezable for $name {}

        impl CuTask for $name {
            type Input<'m> = input_msg!('m, $($p),*);
            type Output<'m> = output_msg!(($(
                cu29::payload::CuArray<$p, { $mos }>
            ),*));

            /// Builds the task from its configuration.
            ///
            /// Fails when `config` is `None` or when either `target_alignment_window_ms`
            /// or `stale_data_horizon_ms` is absent.
            fn new(config: Option<&ComponentConfig>) -> CuResult<Self>
            where
                Self: Sized,
            {
                let config = config.ok_or_else(|| cu29::CuError::from("Config Missing"))?;
                let target_alignment_window_ms: u64 = config
                    .get::<u32>("target_alignment_window_ms")
                    .ok_or_else(|| cu29::CuError::from("Missing target_alignment_window_ms"))?
                    .into();
                let stale_data_horizon_ms: u64 = config
                    .get::<u32>("stale_data_horizon_ms")
                    .ok_or_else(|| cu29::CuError::from("Missing stale_data_horizon_ms"))?
                    .into();

                // Scale the millisecond settings by 1_000_000 (presumably to the
                // nanosecond resolution of `CuDuration` -- confirm). The widened
                // `u32` values cannot overflow a `u64` here.
                Ok(Self {
                    aligner: AlignmentBuffers::new(
                        cu29::clock::CuDuration(target_alignment_window_ms * 1_000_000),
                        cu29::clock::CuDuration(stale_data_horizon_ms * 1_000_000),
                    ),
                })
            }

            /// Purges the buffers against the current clock time before each `process`.
            fn preprocess(&mut self, clock: &cu29::clock::RobotClock) -> CuResult<()> {
                self.aligner.purge(clock.now());
                Ok(())
            }

            /// Buffers the incoming messages and, when aligned data is available,
            /// fills each output `CuArray` with the aligned payloads of its input.
            ///
            /// Returns `Ok(())` without touching `output` when nothing is aligned yet,
            /// and an error when aligned data exists but `output` carries no payload.
            fn process(
                &mut self,
                _clock: &cu29::clock::RobotClock,
                input: &Self::Input<'_>,
                output: &mut Self::Output<'_>,
            ) -> CuResult<()> {
                // `input` is a tuple with one message reference per declared input;
                // store a copy of each message in its matching buffer.
                paste::paste! {
                    $(
                        self.aligner.[<buffer $index>].push(input.$index.clone());
                    )*
                }

                // A tuple holding one iterator of buffered messages per input.
                let Some(tuple_of_iters) = self.aligner.get_latest_aligned_data() else {
                    return Ok(());
                };

                // Report a missing output payload as an error rather than panicking.
                let aligned_output = output
                    .payload_mut()
                    .as_mut()
                    .ok_or_else(|| cu29::CuError::from("Output payload missing"))?;

                // Populate the CuArray fields in the output message.
                // NOTE(review): the inner `unwrap` panics if a buffered message has no
                // payload -- confirm whether empty messages can reach the buffers.
                $(
                    aligned_output.$index.fill_from_iter(tuple_of_iters.$index.map(|msg| *msg.payload().unwrap()));
                )*
                Ok(())
            }
        }
    };
}
84
#[cfg(test)]
mod tests {
    // The macro expansion resolves these names at the invocation site, so they
    // are all kept in scope even where the test body does not name them.
    use cu29::config::ComponentConfig;
    use cu29::cutask::{CuMsg, CuMsgMetadata, CuStampedData, CuTask, Freezable};
    use cu29::payload::CuArray;
    use cu29::{input_msg, output_msg, CuResult};

    // Two inputs: an `f32` stream and an `i32` stream.
    define_task!(AlignerTask, 0 => { 10, 5, f32 }, 1 => { 5, 10, i32 });

    /// Builds the task from a valid config and checks that a single `process`
    /// call on default messages succeeds.
    #[test]
    fn test_aligner_smoketest() {
        let mut config = ComponentConfig::default();
        config.set("target_alignment_window_ms", 1000);
        config.set("stale_data_horizon_ms", 2000);
        let mut task = AlignerTask::new(Some(&config)).unwrap();

        let float_msg = CuStampedData::<f32, CuMsgMetadata>::default();
        let int_msg = CuStampedData::<i32, CuMsgMetadata>::default();
        let input: <AlignerTask as CuTask>::Input<'_> = (&float_msg, &int_msg);

        let mut output: <AlignerTask as CuTask>::Output<'_> =
            CuStampedData::<(CuArray<f32, 5>, CuArray<i32, 10>), CuMsgMetadata>::default();

        let clock = cu29::clock::RobotClock::new();
        assert!(task.process(&clock, &input, &mut output).is_ok());
    }
}