use circular_buffer::CircularBuffer;
use cu29::prelude::*;
/// A fixed-capacity circular buffer of stamped messages that can be queried and
/// purged by their time of validity (`tov`).
///
/// * `S` - capacity of the underlying circular buffer, in messages.
/// * `P` - payload type carried by each message.
/// * `M` - metadata type attached to each message.
pub struct TimeboundCircularBuffer<const S: usize, P: CuMsgPayload, M: Metadata> {
    /// Backing storage. It is public, so callers may push to or inspect it directly.
    pub inner: CircularBuffer<S, CuStampedData<P, M>>,
}
/// Returns the earliest instant described by `tov`.
///
/// A single time is returned as is, a range yields its start, and a `Tov::None`
/// yields `None`.
// Currently unused in this file; kept as the counterpart of `extract_tov_time_right`.
#[allow(dead_code)]
fn extract_tov_time_left(tov: &Tov) -> Option<CuTime> {
    match tov {
        Tov::None => None,
        Tov::Time(instant) => Some(*instant),
        Tov::Range(span) => Some(span.start),
    }
}
/// Returns the latest instant described by `tov`.
///
/// A single time is returned as is, a range yields its end, and a `Tov::None`
/// yields `None`.
fn extract_tov_time_right(tov: &Tov) -> Option<CuTime> {
    match tov {
        Tov::None => None,
        Tov::Time(instant) => Some(*instant),
        Tov::Range(span) => Some(span.end),
    }
}
impl<const S: usize, P: CuMsgPayload> Default for TimeboundCircularBuffer<S, P, CuMsgMetadata> {
    /// Builds an empty buffer by delegating to [`TimeboundCircularBuffer::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<const S: usize, P: CuMsgPayload> TimeboundCircularBuffer<S, P, CuMsgMetadata> {
    /// Creates an empty buffer.
    pub fn new() -> Self {
        Self {
            inner: CircularBuffer::new(),
        }
    }

    /// Iterates, in buffer order, over the messages whose time of validity lies
    /// entirely within `[start_time, end_time]` (both bounds inclusive).
    ///
    /// A `Tov::Time` message matches when its instant is inside the window. A
    /// `Tov::Range` message matches only when both its start and its end are
    /// inside the window. Messages carrying any other `Tov` are skipped.
    pub fn iter_window(
        &self,
        start_time: CuTime,
        end_time: CuTime,
    ) -> impl Iterator<Item = &CuStampedData<P, CuMsgMetadata>> {
        self.inner.iter().filter(move |msg| {
            // Reduce both timed variants to an interval so a single check covers them.
            let (first, last) = match msg.tov {
                Tov::Time(instant) => (instant, instant),
                Tov::Range(span) => (span.start, span.end),
                _ => return false,
            };
            first >= start_time && last <= end_time
        })
    }

    /// Removes the leading messages that are older than `time_horizon`.
    ///
    /// Scanning starts at the front of the buffer and stops at the first message
    /// whose time (or range end) is at or after `time_horizon`; everything before
    /// that message is dropped. Messages without a usable time never stop the
    /// scan, and if no message stops it the whole buffer is emptied.
    pub fn purge(&mut self, time_horizon: CuTime) {
        // Number of leading messages to drop: everything before the first fresh one.
        let stale_count = self
            .inner
            .iter()
            .take_while(|msg| match msg.tov {
                Tov::Time(instant) => instant < time_horizon,
                Tov::Range(span) => span.end < time_horizon,
                _ => true,
            })
            .count();
        self.inner.drain(..stale_count);
    }

    /// Returns the latest time found across all buffered messages, using the end
    /// of the range for `Tov::Range` messages.
    ///
    /// Returns `Ok(None)` when the buffer is empty.
    ///
    /// # Errors
    ///
    /// Fails as soon as a message without time information is encountered.
    pub fn most_recent_time(&self) -> CuResult<Option<CuTime>> {
        self.inner.iter().try_fold(
            None,
            |latest: Option<CuTime>, msg| -> CuResult<Option<CuTime>> {
                let stamp = extract_tov_time_right(&msg.tov).ok_or_else(|| {
                    CuError::from("Trying to align temporal data with no time information")
                })?;
                Ok(Some(match latest {
                    Some(best) => best.max(stamp),
                    None => stamp,
                }))
            },
        )
    }

    /// Appends `msg` at the back of the buffer.
    // NOTE(review): what happens when the buffer is already full is decided by
    // `CircularBuffer::push_back` (presumably the oldest element is overwritten) — confirm.
    pub fn push(&mut self, msg: CuStampedData<P, CuMsgMetadata>) {
        self.inner.push_back(msg);
    }
}
/// Declares a struct named `$struct_name` that groups several
/// `TimeboundCircularBuffer`s and knows how to purge and time-align them.
///
/// Each buffer is declared as
/// `name: TimeboundCircularBuffer<SIZE, CuStampedData<Payload, CuMsgMetadata>>`;
/// a trailing comma after the last buffer is accepted. The generated struct
/// exposes one public field per buffer plus `new`, `purge` and
/// `get_latest_aligned_data`.
///
/// The expansion refers to the metadata type by its full path, so the call site
/// does not need to import `CuMsgMetadata` itself.
#[macro_export]
macro_rules! alignment_buffers {
    (
        $struct_name:ident,
        $($name:ident: TimeboundCircularBuffer<$size:expr, CuStampedData<$payload:ty, CuMsgMetadata>>),*
        $(,)?
    ) => {
        struct $struct_name {
            // Width of the time window returned by `get_latest_aligned_data`.
            target_alignment_window: cu29::clock::CuDuration,
            // Age beyond which `purge` discards buffered messages.
            stale_data_horizon: cu29::clock::CuDuration,
            $(pub $name: $crate::buffers::TimeboundCircularBuffer<$size, $payload, cu29::cutask::CuMsgMetadata>),*
        }

        impl $struct_name {
            /// Creates the group with every buffer empty.
            pub fn new(
                target_alignment_window: cu29::clock::CuDuration,
                stale_data_horizon: cu29::clock::CuDuration,
            ) -> Self {
                Self {
                    target_alignment_window,
                    stale_data_horizon,
                    $($name: $crate::buffers::TimeboundCircularBuffer::<$size, $payload, cu29::cutask::CuMsgMetadata>::new()),*
                }
            }

            /// Purges, in every buffer, the messages older than
            /// `now - stale_data_horizon` (saturating at zero).
            #[allow(dead_code)]
            pub fn purge(&mut self, now: cu29::clock::CuTime) {
                use cu29::prelude::SaturatingSub;
                let horizon_time = now.saturating_sub(self.stale_data_horizon);
                $(self.$name.purge(horizon_time);)*
            }

            /// Returns, for each buffer, an iterator over the messages that fall
            /// in the most recent window covered by the buffers.
            ///
            /// The window ends at the smallest of the buffers' most recent times
            /// and starts `target_alignment_window` earlier (saturating at zero).
            /// Buffers that are empty, or whose most recent time cannot be
            /// computed, are left out when choosing the window end. `None` is
            /// returned when no buffer provides a time at all.
            #[allow(dead_code)]
            pub fn get_latest_aligned_data(
                &mut self,
            ) -> Option<($(impl Iterator<Item = &cu29::cutask::CuStampedData<$payload, cu29::cutask::CuMsgMetadata>>),*)> {
                use cu29::prelude::SaturatingSub;

                // Errors from `most_recent_time` are deliberately folded into
                // `None`, so such a buffer simply does not constrain the window.
                let most_recent_time = [
                    $(self.$name.most_recent_time().unwrap_or(None)),*
                ]
                .into_iter()
                .flatten()
                .min()?;

                let time_to_get_complete_window =
                    most_recent_time.saturating_sub(self.target_alignment_window);
                Some(($(self.$name.iter_window(time_to_get_complete_window, most_recent_time)),*))
            }
        }
    };
}
pub use alignment_buffers;
#[cfg(test)]
mod tests {
    use cu29::clock::Tov;
    use cu29::cutask::*;
    use std::time::Duration;

    /// Builds a `u32` message carrying `value` whose time of validity is the
    /// single instant `tov_secs` seconds.
    fn stamped(value: u32, tov_secs: u64) -> CuStampedData<u32, CuMsgMetadata> {
        let mut msg = CuStampedData::new(Some(value));
        msg.tov = Tov::Time(Duration::from_secs(tov_secs).into());
        msg
    }

    /// Each generated buffer gets the capacity requested in the macro call.
    #[test]
    fn simple_init_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
            buffer2: TimeboundCircularBuffer<12, CuStampedData<u64, CuMsgMetadata>>
        );

        let window = Duration::from_secs(1).into();
        let horizon = Duration::from_secs(2).into();
        let buffers = AlignmentBuffers::new(window, horizon);

        assert_eq!(buffers.buffer1.inner.capacity(), 10);
        assert_eq!(buffers.buffer2.inner.capacity(), 12);
    }

    /// Messages survive a purge while they are within the stale horizon and are
    /// dropped once they fall behind it.
    #[test]
    fn purge_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
            buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>
        );

        let window = Duration::from_secs(1).into();
        let horizon = Duration::from_secs(2).into();
        let mut buffers = AlignmentBuffers::new(window, horizon);

        let first = stamped(1, 1);
        buffers.buffer1.inner.push_back(first.clone());
        buffers.buffer2.inner.push_back(first);

        // Horizon is at 0s: the 1s message is still fresh.
        buffers.purge(Duration::from_secs(2).into());
        assert_eq!(buffers.buffer1.inner.len(), 1);
        assert_eq!(buffers.buffer2.inner.len(), 1);

        // Horizon is at 3s: the 1s message is now stale.
        buffers.purge(Duration::from_secs(5).into());
        assert_eq!(buffers.buffer1.inner.len(), 0);
        assert_eq!(buffers.buffer2.inner.len(), 0);
    }

    /// With nothing buffered there is no aligned data to return.
    #[test]
    fn empty_buffers_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
            buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>
        );

        let window = Duration::from_secs(2).into();
        let horizon = Duration::from_secs(5).into();
        let mut buffers = AlignmentBuffers::new(window, horizon);

        assert!(buffers.get_latest_aligned_data().is_none());
    }

    /// Purging removes stale messages, and alignment returns only the messages
    /// inside the target window ending at the common most recent time.
    #[test]
    fn horizon_and_window_alignment_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
            buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>
        );

        let window = Duration::from_secs(2).into();
        let horizon = Duration::from_secs(5).into();
        let mut buffers = AlignmentBuffers::new(window, horizon);

        // buffer1 holds messages at 1s and 4s.
        buffers.buffer1.inner.push_back(stamped(1, 1));
        buffers.buffer1.inner.push_back(stamped(4, 4));

        // buffer2 holds messages at 1s, 3s and 4s.
        buffers.buffer2.inner.push_back(stamped(1, 1));
        buffers.buffer2.inner.push_back(stamped(3, 3));
        buffers.buffer2.inner.push_back(stamped(4, 4));

        // Horizon is at 2s, so the 1s messages are dropped from both buffers.
        buffers.purge(Duration::from_secs(7).into());

        {
            let (iter1, iter2) = buffers
                .get_latest_aligned_data()
                .expect("Expected aligned data, but got None");
            let collected1: Vec<_> = iter1.collect();
            let collected2: Vec<_> = iter2.collect();

            assert_eq!(collected1.len(), 1);
            assert_eq!(collected2.len(), 2);
            assert_eq!(collected1[0].payload(), Some(&4));
            assert_eq!(collected2[0].payload(), Some(&3));
            assert_eq!(collected2[1].payload(), Some(&4));
        }

        assert_eq!(buffers.buffer1.inner.len(), 1);
        assert_eq!(buffers.buffer2.inner.len(), 2);
    }
}