#![doc = include_str!("../README.md")]
pub mod buffers;
/// Generates a Copper task type named `$name` that buffers several input
/// streams and emits, per input, an array of the messages that the alignment
/// buffers currently report as aligned.
///
/// Each `index => { mis, mos, payload }` entry declares one input:
/// * `index`   - position of the input in the task's input tuple (`0`, `1`, ...);
///   it is also used to name the generated buffer field (`buffer0`, `buffer1`, ...).
/// * `mis`     - first generic argument of that input's `TimeboundCircularBuffer`
///   (presumably its capacity - confirm against `buffers`).
/// * `mos`     - length parameter of the `CuArray` emitted for that input.
/// * `payload` - payload type carried by that input; it must be clonable because
///   payloads are cloned into the output arrays.
///
/// The generated task reads two mandatory `u32` configuration keys, both in
/// milliseconds: `target_alignment_window_ms` and `stale_data_horizon_ms`.
///
/// The expansion refers to `paste`, `cu29` and `cu29_clock` by their plain
/// crate names, so those must be resolvable from the crate invoking the macro.
/// It also defines a type called `AlignmentBuffers` at the invocation site, so
/// invoke the macro at most once per module.
///
/// ```ignore
/// define_task!(AlignerTask, 0 => { 10, 5, f32 }, 1 => { 5, 10, i32 });
/// ```
#[macro_export]
macro_rules! define_task {
    ($name:ident, $($index:tt => { $mis:expr, $mos:expr, $p:ty }),+) => {
        paste::paste! {
            #[allow(unused_imports)]
            use cu29::prelude::*;

            // One time-bounded buffer per declared input, grouped in a single struct.
            $crate::buffers::alignment_buffers!(
                AlignmentBuffers,
                $(
                    [<buffer $index>]: TimeboundCircularBuffer<$mis, CuStampedData<$p, CuMsgMetadata>>
                ),*
            );
        }

        #[derive(Reflect)]
        #[reflect(from_reflect = false)]
        pub struct $name {
            #[reflect(ignore)]
            aligner: AlignmentBuffers,
        }

        impl Freezable for $name {}

        impl CuTask for $name {
            type Resources<'r> = ();
            type Input<'m> = input_msg!('m, $($p),*);
            type Output<'m> = output_msg!(($(
                CuArray<$p, { $mos }>
            ),*));

            // Builds the task from its configuration.
            // Fails if the configuration is absent or if either
            // `target_alignment_window_ms` or `stale_data_horizon_ms` is missing.
            fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
            where
                Self: Sized,
            {
                let config = config.ok_or_else(|| CuError::from("Config Missing"))?;

                // The error messages quote the exact key that was looked up so the
                // user can fix the configuration without reading this source.
                let target_alignment_window_ms: u64 = config
                    .get::<u32>("target_alignment_window_ms")?
                    .ok_or_else(|| CuError::from("Missing target_alignment_window_ms"))?
                    .into();
                let stale_data_horizon_ms: u64 = config
                    .get::<u32>("stale_data_horizon_ms")?
                    .ok_or_else(|| CuError::from("Missing stale_data_horizon_ms"))?
                    .into();

                // Milliseconds -> nanoseconds. The inputs originate from `u32`,
                // so the product always fits in a `u64`.
                let target_alignment_window =
                    cu29_clock::CuDuration(target_alignment_window_ms * 1_000_000);
                let stale_data_horizon =
                    cu29_clock::CuDuration(stale_data_horizon_ms * 1_000_000);

                Ok(Self {
                    aligner: AlignmentBuffers::new(target_alignment_window, stale_data_horizon),
                })
            }

            // Purges the buffers relative to the current time before each processing step.
            fn preprocess(&mut self, ctx: &CuContext) -> CuResult<()> {
                self.aligner.purge(ctx.now());
                Ok(())
            }

            // Buffers the incoming messages, then fills the output arrays with the
            // payloads of the latest aligned data, if there is any.
            fn process(
                &mut self,
                _ctx: &CuContext,
                input: &Self::Input<'_>,
                output: &mut Self::Output<'_>,
            ) -> CuResult<()> {
                // Store a copy of every incoming message in its matching buffer.
                paste::paste! {
                    $(
                        self.aligner.[<buffer $index>].push(input.$index.clone());
                    )*
                }

                // Nothing aligned yet: leave the output untouched.
                let Some(tuple_of_iters) = self.aligner.get_latest_aligned_data() else {
                    return Ok(());
                };

                let output_payload = output.payload_mut().get_or_insert_with(Default::default);
                $(
                    // Messages are buffered whether or not they carry a payload, so
                    // payload-less ones are skipped here instead of being unwrapped,
                    // which would panic in the middle of the processing step.
                    output_payload.$index.fill_from_iter(
                        tuple_of_iters.$index.filter_map(|msg| msg.payload().cloned())
                    );
                )*
                Ok(())
            }
        }
    };
}
#[cfg(test)]
mod tests {
    use cu29::prelude::*;

    // Two-input task: an `f32` stream declared as `{ 10, 5 }` and an `i32`
    // stream declared as `{ 5, 10 }`.
    define_task!(AlignerTask, 0 => { 10, 5, f32 }, 1 => { 5, 10, i32 });

    /// Running one processing step on default messages must not report an error.
    #[test]
    fn test_aligner_smoketest() {
        let mut cfg = ComponentConfig::default();
        cfg.set("target_alignment_window_ms", 1000);
        cfg.set("stale_data_horizon_ms", 2000);
        let mut task = AlignerTask::new(Some(&cfg), ()).unwrap();

        let float_msg = CuStampedData::<f32, CuMsgMetadata>::default();
        let int_msg = CuStampedData::<i32, CuMsgMetadata>::default();
        let input: <AlignerTask as CuTask>::Input<'_> = (&float_msg, &int_msg);
        let mut output: <AlignerTask as CuTask>::Output<'_> =
            CuStampedData::<(CuArray<f32, 5>, CuArray<i32, 10>), CuMsgMetadata>::default();

        let ctx = CuContext::new_with_clock();
        assert!(task.process(&ctx, &input, &mut output).is_ok());
    }

    mod string_payload {
        use super::*;

        // Same shape as above, but with a payload type that is not `Copy`.
        define_task!(StringAlignerTask, 0 => { 4, 4, String }, 1 => { 4, 4, String });

        /// Two messages stamped with the same time must each show up once in the output.
        #[test]
        fn test_aligner_string_payload() {
            let mut cfg = ComponentConfig::default();
            cfg.set("target_alignment_window_ms", 10);
            cfg.set("stale_data_horizon_ms", 1000);
            let mut task = StringAlignerTask::new(Some(&cfg), ()).unwrap();

            let mut left_msg = CuStampedData::<String, CuMsgMetadata>::new(Some(String::from("left")));
            let mut right_msg =
                CuStampedData::<String, CuMsgMetadata>::new(Some(String::from("right")));

            // Give both messages the identical time of validity.
            let stamp = CuDuration::from_millis(100);
            for msg in [&mut left_msg, &mut right_msg] {
                msg.tov = Tov::Time(stamp);
            }

            let input: <StringAlignerTask as CuTask>::Input<'_> = (&left_msg, &right_msg);
            let mut output =
                CuStampedData::<(CuArray<String, 4>, CuArray<String, 4>), CuMsgMetadata>::default();

            let ctx = CuContext::new_with_clock();
            task.process(&ctx, &input, &mut output).unwrap();

            let (lefts, rights) = output.payload().unwrap();
            assert_eq!(lefts.len(), 1);
            assert_eq!(rights.len(), 1);
            assert_eq!(lefts.as_slice()[0], "left");
            assert_eq!(rights.as_slice()[0], "right");
        }
    }
}