Skip to main content

cu_aligner/
lib.rs

1#![doc = include_str!("../README.md")]
2
3pub mod buffers;
4
/// Define a task that aligns incoming messages based on their timestamps.
/// See module doc for use.
///
/// # Arguments
///
/// * `$name` - identifier of the generated task struct.
/// * `$index => { $mis, $mos, $p }` - one entry per input (at least one is required):
///   * `$index` - position of the input in the input tuple; also used to name the
///     matching `buffer<index>` field of the generated `AlignmentBuffers`.
///   * `$mis` - const parameter handed to that input's `TimeboundCircularBuffer`
///     (presumably the buffer capacity; confirm in `buffers`).
///   * `$mos` - length parameter of the `CuArray` emitted for that input.
///   * `$p` - payload type of that input.
///
/// # Configuration
///
/// The generated `new` requires a config holding two `u32` values, both expressed
/// in milliseconds: `target_alignment_window_ms` and `stale_data_horizon_ms`.
/// A missing config or a missing key is reported as a `CuError`.
///
/// # Call-site requirements
///
/// The expansion names `paste`, `cu29` and `cu29_clock` directly, so these crates
/// must be resolvable where the macro is invoked. The generated buffer type is
/// always called `AlignmentBuffers`, so invoke the macro at most once per module.
#[macro_export]
macro_rules! define_task {
    ($name:ident, $($index:tt => { $mis:expr, $mos:expr, $p:ty }),+) => {

        paste::paste! {
            #[allow(unused_imports)]
            use cu29::prelude::*;

            // One time-bound buffer per declared input, named `buffer<index>`.
            $crate::buffers::alignment_buffers!(
                AlignmentBuffers,
                $(
                    [<buffer $index>]: TimeboundCircularBuffer<$mis, CuStampedData<$p, CuMsgMetadata>>
                ),*
            );
        }

        // The task itself only owns the alignment buffers; they are excluded from reflection.
        #[derive(Reflect)]
        #[reflect(from_reflect = false)]
        pub struct $name {
            #[reflect(ignore)]
            aligner: AlignmentBuffers,
        }

        impl Freezable for $name {}

        impl CuTask for $name {
            type Resources<'r> = ();
            // One input message per declared payload type.
            type Input<'m> = input_msg!('m, $($p),*);
            // A single output message carrying one `CuArray` per input.
            type Output<'m> = output_msg!(($(
                CuArray<$p, { $mos }>
            ),*));

            // Builds the task from its configuration; fails when the config or one of
            // the two required keys is absent.
            fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
            where
                Self: Sized,
            {
                let config = config.ok_or_else(|| CuError::from("Config Missing"))?;
                let target_alignment_window_ms: u64 = config
                    .get::<u32>("target_alignment_window_ms")?
                    .ok_or_else(|| CuError::from("Missing target_alignment_window_ms"))?
                    .into();
                let stale_data_horizon_ms: u64 = config
                    .get::<u32>("stale_data_horizon_ms")?
                    .ok_or_else(|| CuError::from("Missing stale_data_horizon_ms"))?
                    .into();

                // Scale milliseconds by 1_000_000 (ms -> ns, assuming CuDuration counts
                // nanoseconds). The values originate from u32, so the u64 product cannot overflow.
                let target_alignment_window =
                    cu29_clock::CuDuration(target_alignment_window_ms * 1_000_000);
                let stale_data_horizon =
                    cu29_clock::CuDuration(stale_data_horizon_ms * 1_000_000);

                Ok(Self {
                    aligner: AlignmentBuffers::new(target_alignment_window, stale_data_horizon),
                })
            }

            // Lets the buffers discard old data relative to the current time before each cycle.
            fn preprocess(&mut self, ctx: &CuContext) -> CuResult<()> {
                self.aligner.purge(ctx.now());
                Ok(())
            }

            // Buffers the new inputs and, when aligned data is available, writes it to the output.
            fn process(
                &mut self,
                _ctx: &CuContext,
                input: &Self::Input<'_>,
                output: &mut Self::Output<'_>,
            ) -> CuResult<()> {
                // add the incoming data into the buffers
                // input is a tuple of &CuMsg<T> for each T in the input
                paste::paste! {
                    $(
                        self.aligner.[<buffer $index>].push(input.$index.clone());
                    )*
                }

                // this is a tuple of iterators of CuStampedDataSet
                let Some(tuple_of_iters) = self.aligner.get_latest_aligned_data() else {
                    // Nothing aligned yet: leave the output untouched.
                    return Ok(());
                };

                // Populate the CuArray fields in the output message
                let output_payload = output.payload_mut().get_or_insert_with(Default::default);
                $(
                    // Buffered messages may carry no payload; skip those instead of
                    // panicking on them.
                    output_payload.$index.fill_from_iter(
                        tuple_of_iters.$index.filter_map(|msg| msg.payload().cloned()),
                    );
                )*
                Ok(())
            }
        }
    };
}
98
#[cfg(test)]
mod tests {
    use cu29::prelude::*;

    // Two inputs: an f32 stream ({ 10, 5 }) and an i32 stream ({ 5, 10 }).
    define_task!(AlignerTask, 0 => { 10, 5, f32 }, 1 => { 5, 10, i32 });

    /// A task built from a valid config accepts one `process` call on default messages.
    #[test]
    fn test_aligner_smoketest() {
        let mut cfg = ComponentConfig::default();
        cfg.set("target_alignment_window_ms", 1000);
        cfg.set("stale_data_horizon_ms", 2000);
        let mut task = AlignerTask::new(Some(&cfg), ()).unwrap();

        let float_msg = CuStampedData::<f32, CuMsgMetadata>::default();
        let int_msg = CuStampedData::<i32, CuMsgMetadata>::default();
        let input: <AlignerTask as CuTask>::Input<'_> = (&float_msg, &int_msg);
        let mut output: <AlignerTask as CuTask>::Output<'_> =
            CuStampedData::<(CuArray<f32, 5>, CuArray<i32, 10>), CuMsgMetadata>::default();

        let ctx = CuContext::new_with_clock();
        assert!(task.process(&ctx, &input, &mut output).is_ok());
    }

    // Kept in its own module because every `define_task!` expansion declares a type
    // named `AlignmentBuffers`.
    mod string_payload {
        use super::*;

        define_task!(StringAlignerTask, 0 => { 4, 4, String }, 1 => { 4, 4, String });

        /// Two `String` messages sharing the same time of validity come out as one
        /// element in each output array.
        #[test]
        fn test_aligner_string_payload() {
            let mut cfg = ComponentConfig::default();
            cfg.set("target_alignment_window_ms", 10);
            cfg.set("stale_data_horizon_ms", 1000);
            let mut task = StringAlignerTask::new(Some(&cfg), ()).unwrap();

            // Both messages are stamped with the same time of validity.
            let tov_time = CuDuration::from_millis(100);
            let stamped = |text: &str| {
                let mut msg = CuStampedData::<String, CuMsgMetadata>::new(Some(text.to_string()));
                msg.tov = Tov::Time(tov_time);
                msg
            };
            let left = stamped("left");
            let right = stamped("right");

            let input: <StringAlignerTask as CuTask>::Input<'_> = (&left, &right);
            let mut output =
                CuStampedData::<(CuArray<String, 4>, CuArray<String, 4>), CuMsgMetadata>::default();

            let ctx = CuContext::new_with_clock();
            task.process(&ctx, &input, &mut output).unwrap();

            let aligned = output.payload().unwrap();
            assert_eq!(aligned.0.len(), 1);
            assert_eq!(aligned.1.len(), 1);
            assert_eq!(aligned.0.as_slice()[0], "left");
            assert_eq!(aligned.1.as_slice()[0], "right");
        }
    }
}