1#![doc = include_str!("../README.md")]
2
3pub mod buffers;
4
5#[macro_export]
8macro_rules! define_task {
9 ($name:ident, $($index:tt => { $mis:expr, $mos:expr, $p:ty }),+) => {
10
11 paste::paste! {
12 $crate::buffers::alignment_buffers!(
13 AlignmentBuffers,
14 $(
15 [<buffer $index>]: TimeboundCircularBuffer<$mis, CuStampedData<$p, CuMsgMetadata>>
16 ),*
17 );
18 }
19
20 pub struct $name {
21 aligner: AlignmentBuffers,
22 }
23
24 impl Freezable for $name {}
25
26 impl CuTask for $name {
27 type Input<'m> = input_msg!('m, $($p),*);
28 type Output<'m> = output_msg!(($(
29 cu29::payload::CuArray<$p, { $mos }>
30 ),*));
31
32 fn new(config: Option<&ComponentConfig>) -> CuResult<Self>
33 where
34 Self: Sized,
35 {
36 let config = config.ok_or_else(|| cu29::CuError::from("Config Missing"))?;
37 let target_alignment_window: u64 =
38 config.get::<u32>("target_alignment_window_ms").ok_or_else(|| cu29::CuError::from("Missing target_alignment_window"))?.into();
39 let stale_data_horizon: u64 =
40 config.get::<u32>("stale_data_horizon_ms").ok_or_else(|| cu29::CuError::from("Missing stale_data_horizon"))?.into();
41
42 Ok(Self {
43 aligner: AlignmentBuffers::new(cu29_clock::CuDuration(target_alignment_window as u64 * 1_000_000),cu29_clock::CuDuration(stale_data_horizon as u64 * 1_000_000)),
44 })
45 }
46
47 fn preprocess(&mut self, clock: &cu29_clock::RobotClock) -> CuResult<()> {
48 self.aligner.purge(clock.now());
49 Ok(())
50 }
51
52 fn process(
53 &mut self,
54 _clock: &cu29::clock::RobotClock,
55 input: &Self::Input<'_>,
56 output: &mut Self::Output<'_>,
57 ) -> CuResult<()> {
58 paste::paste! {
61 $(
62 self.aligner.[<buffer $index>].push(input.$index.clone());
63 )*
64 }
65
66
67 let tuple_of_iters = self.aligner.get_latest_aligned_data();
68 if tuple_of_iters.is_none() {
69 return Ok(());
70 }
71
72 let tuple_of_iters = tuple_of_iters.unwrap();
74
75 $(
77 output.payload_mut().as_mut().unwrap().$index.fill_from_iter(tuple_of_iters.$index.map(|msg| *msg.payload().unwrap()));
78 )*
79 Ok(())
80 }
81 }
82 };
83}
84
85#[cfg(test)]
86mod tests {
87 use cu29::config::ComponentConfig;
88 use cu29::cutask::CuMsg;
89 use cu29::cutask::CuTask;
90 use cu29::cutask::Freezable;
91 use cu29::cutask::{CuMsgMetadata, CuStampedData};
92 use cu29::input_msg;
93 use cu29::output_msg;
94 use cu29::payload::CuArray;
95 use cu29::CuResult;
96
97 define_task!(AlignerTask, 0 => { 10, 5, f32 }, 1 => { 5, 10, i32 });
98
99 #[test]
100 fn test_aligner_smoketest() {
101 let mut config = ComponentConfig::default();
102 config.set("target_alignment_window_ms", 1000);
103 config.set("stale_data_horizon_ms", 2000);
104 let mut aligner = AlignerTask::new(Some(&config)).unwrap();
105 let m1 = CuStampedData::<f32, CuMsgMetadata>::default();
106 let m2 = CuStampedData::<i32, CuMsgMetadata>::default();
107 let input: <AlignerTask as CuTask>::Input<'_> = (&m1, &m2);
108 let m3 = CuStampedData::<(CuArray<f32, 5>, CuArray<i32, 10>), CuMsgMetadata>::default();
109 let mut output: <AlignerTask as CuTask>::Output<'_> = m3;
110
111 let clock = cu29::clock::RobotClock::new();
112 let result = aligner.process(&clock, &input, &mut output);
113 assert!(result.is_ok());
114 }
115}