use circular_buffer::CircularBuffer;
use cu29::clock::{CuTime, Tov};
use cu29::cutask::{CuMsg, CuMsgPayload};
use cu29::{CuError, CuResult};
pub struct TimeboundCircularBuffer<const S: usize, P>
where
P: CuMsgPayload,
{
pub inner: CircularBuffer<S, CuMsg<P>>,
}
#[allow(dead_code)]
fn extract_tov_time_left(tov: &Tov) -> Option<CuTime> {
match tov {
Tov::Time(time) => Some(*time),
Tov::Range(range) => Some(range.start), Tov::None => None,
}
}
fn extract_tov_time_right(tov: &Tov) -> Option<CuTime> {
match tov {
Tov::Time(time) => Some(*time),
Tov::Range(range) => Some(range.end), Tov::None => None,
}
}
impl<const S: usize, P> TimeboundCircularBuffer<S, P>
where
P: CuMsgPayload,
{
pub fn new() -> Self {
TimeboundCircularBuffer {
inner: CircularBuffer::<S, CuMsg<P>>::new(),
}
}
pub fn iter_window(
&self,
start_time: CuTime,
end_time: CuTime,
) -> impl Iterator<Item = &CuMsg<P>> {
self.inner.iter().filter(move |msg| match msg.metadata.tov {
Tov::Time(time) => time >= start_time && time <= end_time,
Tov::Range(range) => range.start >= start_time && range.end <= end_time,
_ => false,
})
}
pub fn purge(&mut self, time_horizon: CuTime) {
let drain_end = self
.inner
.iter()
.position(|msg| match msg.metadata.tov {
Tov::Time(time) => time >= time_horizon,
Tov::Range(range) => range.end >= time_horizon,
_ => false,
})
.unwrap_or(self.inner.len()); self.inner.drain(..drain_end);
}
pub fn most_recent_time(&self) -> CuResult<Option<CuTime>> {
self.inner
.iter()
.map(|msg| extract_tov_time_right(&msg.metadata.tov))
.try_fold(None, |acc, time| {
let time = time.ok_or_else(|| {
CuError::from("Trying to align temporal data with no time information")
})?;
Ok(Some(
acc.map_or(time, |current_max: CuTime| current_max.max(time)),
))
})
}
pub fn push(&mut self, msg: CuMsg<P>) {
self.inner.push_back(msg);
}
}
#[macro_export]
macro_rules! alignment_buffers {
($struct_name:ident, $($name:ident: TimeboundCircularBuffer<$size:expr, CuMsg<$payload:ty>>),*) => {
struct $struct_name {
target_alignment_window: cu29::clock::CuDuration, stale_data_horizon: cu29::clock::CuDuration, $(pub $name: crate::buffers::TimeboundCircularBuffer<$size, $payload>),*
}
impl $struct_name {
pub fn new(target_alignment_window: cu29::clock::CuDuration, stale_data_horizon: cu29::clock::CuDuration) -> Self {
Self {
target_alignment_window,
stale_data_horizon,
$($name: crate::buffers::TimeboundCircularBuffer::<$size, $payload>::new()),*
}
}
#[allow(dead_code)]
pub fn purge(&mut self, now: cu29::clock::CuTime) {
let horizon_time = now - self.stale_data_horizon;
$(self.$name.purge(horizon_time);)*
}
#[allow(dead_code)]
pub fn get_latest_aligned_data(
&mut self,
) -> Option<($(impl Iterator<Item = &cu29::cutask::CuMsg<$payload>>),*)> {
let most_recent_time = [
$(self.$name.most_recent_time().unwrap_or(None)),*
]
.iter()
.filter_map(|&time| time)
.min();
if most_recent_time.is_none() {
return None;
}
let most_recent_time = most_recent_time.unwrap();
let time_to_get_complete_window = most_recent_time - self.target_alignment_window;
Some(($(self.$name.iter_window(time_to_get_complete_window, most_recent_time)),*))
}
}
};
}
pub use alignment_buffers;
#[cfg(test)]
mod tests {
use cu29::clock::Tov;
use cu29::cutask::CuMsg;
use std::time::Duration;
#[test]
fn simple_init_test() {
alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>, buffer2: TimeboundCircularBuffer<12, CuMsg<u64>>);
let buffers =
AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());
assert_eq!(buffers.buffer1.inner.capacity(), 10);
assert_eq!(buffers.buffer2.inner.capacity(), 12);
}
#[test]
fn purge_test() {
alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>, buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>);
let mut buffers =
AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());
let mut msg1 = CuMsg::new(Some(1));
msg1.metadata.tov = Tov::Time(Duration::from_secs(1).into());
buffers.buffer1.inner.push_back(msg1.clone());
buffers.buffer2.inner.push_back(msg1);
let _ = buffers.purge(Duration::from_secs(2).into());
assert_eq!(buffers.buffer1.inner.len(), 1);
assert_eq!(buffers.buffer2.inner.len(), 1);
let _ = buffers.purge(Duration::from_secs(5).into());
assert_eq!(buffers.buffer1.inner.len(), 0);
assert_eq!(buffers.buffer2.inner.len(), 0);
}
#[test]
fn empty_buffers_test() {
alignment_buffers!(
AlignmentBuffers,
buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>
);
let mut buffers = AlignmentBuffers::new(
Duration::from_secs(2).into(), Duration::from_secs(5).into(), );
assert!(buffers.get_latest_aligned_data().is_none());
}
#[test]
fn horizon_and_window_alignment_test() {
alignment_buffers!(
AlignmentBuffers,
buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>
);
let mut buffers = AlignmentBuffers::new(
Duration::from_secs(2).into(), Duration::from_secs(5).into(), );
let mut msg1 = CuMsg::new(Some(1));
msg1.metadata.tov = Tov::Time(Duration::from_secs(1).into());
buffers.buffer1.inner.push_back(msg1.clone());
buffers.buffer2.inner.push_back(msg1);
let mut msg2 = CuMsg::new(Some(3));
msg2.metadata.tov = Tov::Time(Duration::from_secs(3).into());
buffers.buffer2.inner.push_back(msg2);
let mut msg3 = CuMsg::new(Some(4));
msg3.metadata.tov = Tov::Time(Duration::from_secs(4).into());
buffers.buffer1.inner.push_back(msg3.clone());
buffers.buffer2.inner.push_back(msg3);
let now = Duration::from_secs(7).into();
buffers.purge(now);
if let Some((iter1, iter2)) = buffers.get_latest_aligned_data() {
let collected1: Vec<_> = iter1.collect();
let collected2: Vec<_> = iter2.collect();
assert_eq!(collected1.len(), 1);
assert_eq!(collected2.len(), 2);
assert_eq!(collected1[0].payload(), Some(&4));
assert_eq!(collected2[0].payload(), Some(&3));
assert_eq!(collected2[1].payload(), Some(&4));
} else {
panic!("Expected aligned data, but got None");
}
assert_eq!(buffers.buffer1.inner.len(), 1);
assert_eq!(buffers.buffer2.inner.len(), 2);
}
}