1#![doc = include_str!("../README.md")]
2
3pub mod buffers;
4
5#[macro_export]
8macro_rules! define_task {
9 ($name:ident, $($index:tt => { $mis:expr, $mos:expr, $p:ty }),+) => {
10
11 paste::paste! {
12 #[allow(unused_imports)]
13 use cu29::prelude::*;
14
15 $crate::buffers::alignment_buffers!(
16 AlignmentBuffers,
17 $(
18 [<buffer $index>]: TimeboundCircularBuffer<$mis, CuStampedData<$p, CuMsgMetadata>>
19 ),*
20 );
21 }
22
23 #[derive(Reflect)]
24 #[reflect(from_reflect = false)]
25 pub struct $name {
26 #[reflect(ignore)]
27 aligner: AlignmentBuffers,
28 }
29
30 impl Freezable for $name {}
31
32 impl CuTask for $name {
33 type Resources<'r> = ();
34 type Input<'m> = input_msg!('m, $($p),*);
35 type Output<'m> = output_msg!(($(
36 CuArray<$p, { $mos }>
37 ),*));
38
39 fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
40 where
41 Self: Sized,
42 {
43 let config = config.ok_or_else(|| CuError::from("Config Missing"))?;
44 let target_alignment_window_ms: u64 = config
45 .get::<u32>("target_alignment_window_ms")?
46 .ok_or_else(|| CuError::from("Missing target_alignment_window"))?
47 .into();
48 let stale_data_horizon_ms: u64 = config
49 .get::<u32>("stale_data_horizon_ms")?
50 .ok_or_else(|| CuError::from("Missing stale_data_horizon"))?
51 .into();
52
53 let target_alignment_window =
54 cu29_clock::CuDuration(target_alignment_window_ms * 1_000_000);
55 let stale_data_horizon =
56 cu29_clock::CuDuration(stale_data_horizon_ms * 1_000_000);
57
58 Ok(Self {
59 aligner: AlignmentBuffers::new(target_alignment_window, stale_data_horizon),
60 })
61 }
62
63 fn preprocess(&mut self, ctx: &CuContext) -> CuResult<()> {
64 self.aligner.purge(ctx.now());
65 Ok(())
66 }
67
68 fn process(
69 &mut self,
70 _ctx: &CuContext,
71 input: &Self::Input<'_>,
72 output: &mut Self::Output<'_>,
73 ) -> CuResult<()> {
74 paste::paste! {
77 $(
78 self.aligner.[<buffer $index>].push(input.$index.clone());
79 )*
80 }
81
82
83 let Some(tuple_of_iters) = self.aligner.get_latest_aligned_data() else {
85 return Ok(());
86 };
87
88 let output_payload = output.payload_mut().get_or_insert_with(Default::default);
90 $(
91 output_payload.$index.fill_from_iter(tuple_of_iters.$index.map(|msg| msg.payload().unwrap().clone()));
92 )*
93 Ok(())
94 }
95 }
96 };
97}
98
99#[cfg(test)]
100mod tests {
101 use cu29::prelude::*;
102
103 define_task!(AlignerTask, 0 => { 10, 5, f32 }, 1 => { 5, 10, i32 });
104 #[test]
105 fn test_aligner_smoketest() {
106 let mut config = ComponentConfig::default();
107 config.set("target_alignment_window_ms", 1000);
108 config.set("stale_data_horizon_ms", 2000);
109 let mut aligner = AlignerTask::new(Some(&config), ()).unwrap();
110 let m1 = CuStampedData::<f32, CuMsgMetadata>::default();
111 let m2 = CuStampedData::<i32, CuMsgMetadata>::default();
112 let input: <AlignerTask as CuTask>::Input<'_> = (&m1, &m2);
113 let m3 = CuStampedData::<(CuArray<f32, 5>, CuArray<i32, 10>), CuMsgMetadata>::default();
114 let mut output: <AlignerTask as CuTask>::Output<'_> = m3;
115
116 let ctx = CuContext::new_with_clock();
117 let result = aligner.process(&ctx, &input, &mut output);
118 assert!(result.is_ok());
119 }
120 mod string_payload {
121 use super::*;
122
123 define_task!(StringAlignerTask, 0 => { 4, 4, String }, 1 => { 4, 4, String });
124
125 #[test]
126 fn test_aligner_string_payload() {
127 let mut config = ComponentConfig::default();
128 config.set("target_alignment_window_ms", 10);
129 config.set("stale_data_horizon_ms", 1000);
130 let mut aligner = StringAlignerTask::new(Some(&config), ()).unwrap();
131
132 let mut left = CuStampedData::<String, CuMsgMetadata>::new(Some("left".to_string()));
133 let mut right = CuStampedData::<String, CuMsgMetadata>::new(Some("right".to_string()));
134 let tov_time = CuDuration::from_millis(100);
135 left.tov = Tov::Time(tov_time);
136 right.tov = Tov::Time(tov_time);
137
138 let input: <StringAlignerTask as CuTask>::Input<'_> = (&left, &right);
139 let mut output =
140 CuStampedData::<(CuArray<String, 4>, CuArray<String, 4>), CuMsgMetadata>::default();
141
142 let ctx = CuContext::new_with_clock();
143 aligner.process(&ctx, &input, &mut output).unwrap();
144
145 let payload = output.payload().unwrap();
146 assert_eq!(payload.0.len(), 1);
147 assert_eq!(payload.1.len(), 1);
148 assert_eq!(payload.0.as_slice()[0], "left");
149 assert_eq!(payload.1.as_slice()[0], "right");
150 }
151 }
152}