1use circular_buffer::CircularBuffer;
2use cu29::clock::{CuTime, Tov};
3use cu29::cutask::{CuMsg, CuMsgPayload};
4use cu29::{CuError, CuResult};
5
6pub struct TimeboundCircularBuffer<const S: usize, P>
8where
9 P: CuMsgPayload,
10{
11 pub inner: CircularBuffer<S, CuMsg<P>>,
12}
13
14#[allow(dead_code)]
15fn extract_tov_time_left(tov: &Tov) -> Option<CuTime> {
16 match tov {
17 Tov::Time(time) => Some(*time),
18 Tov::Range(range) => Some(range.start), Tov::None => None,
20 }
21}
22
23fn extract_tov_time_right(tov: &Tov) -> Option<CuTime> {
24 match tov {
25 Tov::Time(time) => Some(*time),
26 Tov::Range(range) => Some(range.end), Tov::None => None,
28 }
29}
30
31impl<const S: usize, P> Default for TimeboundCircularBuffer<S, P>
32where
33 P: CuMsgPayload,
34{
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl<const S: usize, P> TimeboundCircularBuffer<S, P>
41where
42 P: CuMsgPayload,
43{
44 pub fn new() -> Self {
45 TimeboundCircularBuffer {
46 inner: CircularBuffer::<S, CuMsg<P>>::new(),
48 }
49 }
50
51 pub fn iter_window(
54 &self,
55 start_time: CuTime,
56 end_time: CuTime,
57 ) -> impl Iterator<Item = &CuMsg<P>> {
58 self.inner.iter().filter(move |msg| match msg.metadata.tov {
59 Tov::Time(time) => time >= start_time && time <= end_time,
60 Tov::Range(range) => range.start >= start_time && range.end <= end_time,
61 _ => false,
62 })
63 }
64
65 pub fn purge(&mut self, time_horizon: CuTime) {
67 let drain_end = self
69 .inner
70 .iter()
71 .position(|msg| match msg.metadata.tov {
72 Tov::Time(time) => time >= time_horizon,
73 Tov::Range(range) => range.end >= time_horizon,
74 _ => false,
75 })
76 .unwrap_or(self.inner.len()); self.inner.drain(..drain_end);
80 }
81
82 pub fn most_recent_time(&self) -> CuResult<Option<CuTime>> {
84 self.inner
85 .iter()
86 .map(|msg| extract_tov_time_right(&msg.metadata.tov))
87 .try_fold(None, |acc, time| {
88 let time = time.ok_or_else(|| {
89 CuError::from("Trying to align temporal data with no time information")
90 })?;
91 Ok(Some(
92 acc.map_or(time, |current_max: CuTime| current_max.max(time)),
93 ))
94 })
95 }
96
97 pub fn push(&mut self, msg: CuMsg<P>) {
99 self.inner.push_back(msg);
100 }
101}
102
103#[macro_export]
104macro_rules! alignment_buffers {
105 ($struct_name:ident, $($name:ident: TimeboundCircularBuffer<$size:expr, CuMsg<$payload:ty>>),*) => {
106 struct $struct_name {
107 target_alignment_window: cu29::clock::CuDuration, stale_data_horizon: cu29::clock::CuDuration, $(pub $name: $crate::buffers::TimeboundCircularBuffer<$size, $payload>),*
110 }
111
112 impl $struct_name {
113 pub fn new(target_alignment_window: cu29::clock::CuDuration, stale_data_horizon: cu29::clock::CuDuration) -> Self {
114 Self {
115 target_alignment_window,
116 stale_data_horizon,
117 $($name: $crate::buffers::TimeboundCircularBuffer::<$size, $payload>::new()),*
118 }
119 }
120
121 #[allow(dead_code)]
123 pub fn purge(&mut self, now: cu29::clock::CuTime) {
124 let horizon_time = now - self.stale_data_horizon;
125 $(self.$name.purge(horizon_time);)*
127 }
128
129 #[allow(dead_code)]
131 pub fn get_latest_aligned_data(
132 &mut self,
133 ) -> Option<($(impl Iterator<Item = &cu29::cutask::CuMsg<$payload>>),*)> {
134 let most_recent_time = [
137 $(self.$name.most_recent_time().unwrap_or(None)),*
138 ]
139 .iter()
140 .filter_map(|&time| time)
141 .min();
142
143 most_recent_time?;
145
146 let most_recent_time = most_recent_time.unwrap();
147
148 let time_to_get_complete_window = most_recent_time - self.target_alignment_window;
149 Some(($(self.$name.iter_window(time_to_get_complete_window, most_recent_time)),*))
150 }
151 }
152 };
153}
154
155pub use alignment_buffers;
156
157#[cfg(test)]
158mod tests {
159 use cu29::clock::Tov;
160 use cu29::cutask::CuMsg;
161 use std::time::Duration;
162
163 #[test]
164 fn simple_init_test() {
165 alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>, buffer2: TimeboundCircularBuffer<12, CuMsg<u64>>);
166
167 let buffers =
168 AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());
169 assert_eq!(buffers.buffer1.inner.capacity(), 10);
170 assert_eq!(buffers.buffer2.inner.capacity(), 12);
171 }
172
173 #[test]
174 fn purge_test() {
175 alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>, buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>);
176
177 let mut buffers =
178 AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());
179
180 let mut msg1 = CuMsg::new(Some(1));
181 msg1.metadata.tov = Tov::Time(Duration::from_secs(1).into());
182 buffers.buffer1.inner.push_back(msg1.clone());
183 buffers.buffer2.inner.push_back(msg1);
184 buffers.purge(Duration::from_secs(2).into());
186 assert_eq!(buffers.buffer1.inner.len(), 1);
187 assert_eq!(buffers.buffer2.inner.len(), 1);
188 buffers.purge(Duration::from_secs(5).into());
190 assert_eq!(buffers.buffer1.inner.len(), 0);
191 assert_eq!(buffers.buffer2.inner.len(), 0);
192 }
193
194 #[test]
195 fn empty_buffers_test() {
196 alignment_buffers!(
197 AlignmentBuffers,
198 buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
199 buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>
200 );
201
202 let mut buffers = AlignmentBuffers::new(
203 Duration::from_secs(2).into(), Duration::from_secs(5).into(), );
206
207 assert!(buffers.get_latest_aligned_data().is_none());
209 }
210
211 #[test]
212 fn horizon_and_window_alignment_test() {
213 alignment_buffers!(
214 AlignmentBuffers,
215 buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
216 buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>
217 );
218
219 let mut buffers = AlignmentBuffers::new(
220 Duration::from_secs(2).into(), Duration::from_secs(5).into(), );
223
224 let mut msg1 = CuMsg::new(Some(1));
226 msg1.metadata.tov = Tov::Time(Duration::from_secs(1).into());
227 buffers.buffer1.inner.push_back(msg1.clone());
228 buffers.buffer2.inner.push_back(msg1);
229
230 let mut msg2 = CuMsg::new(Some(3));
231 msg2.metadata.tov = Tov::Time(Duration::from_secs(3).into());
232 buffers.buffer2.inner.push_back(msg2);
233
234 let mut msg3 = CuMsg::new(Some(4));
235 msg3.metadata.tov = Tov::Time(Duration::from_secs(4).into());
236 buffers.buffer1.inner.push_back(msg3.clone());
237 buffers.buffer2.inner.push_back(msg3);
238
239 let now = Duration::from_secs(7).into();
241 buffers.purge(now);
243 if let Some((iter1, iter2)) = buffers.get_latest_aligned_data() {
244 let collected1: Vec<_> = iter1.collect();
245 let collected2: Vec<_> = iter2.collect();
246
247 assert_eq!(collected1.len(), 1);
249 assert_eq!(collected2.len(), 2);
250
251 assert_eq!(collected1[0].payload(), Some(&4));
252 assert_eq!(collected2[0].payload(), Some(&3));
253 assert_eq!(collected2[1].payload(), Some(&4));
254 } else {
255 panic!("Expected aligned data, but got None");
256 }
257
258 assert_eq!(buffers.buffer1.inner.len(), 1);
260 assert_eq!(buffers.buffer2.inner.len(), 2);
261 }
262}