1use circular_buffer::CircularBuffer;
2use cu29::prelude::*;
3
4pub struct TimeboundCircularBuffer<const S: usize, P, M>
6where
7 P: CuMsgPayload,
8 M: Metadata,
9{
10 pub inner: CircularBuffer<S, CuStampedData<P, M>>,
11}
12
13#[allow(dead_code)]
14fn extract_tov_time_left(tov: &Tov) -> Option<CuTime> {
15 match tov {
16 Tov::Time(time) => Some(*time),
17 Tov::Range(range) => Some(range.start), Tov::None => None,
19 }
20}
21
22fn extract_tov_time_right(tov: &Tov) -> Option<CuTime> {
23 match tov {
24 Tov::Time(time) => Some(*time),
25 Tov::Range(range) => Some(range.end), Tov::None => None,
27 }
28}
29
30impl<const S: usize, P> Default for TimeboundCircularBuffer<S, P, CuMsgMetadata>
31where
32 P: CuMsgPayload,
33{
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl<const S: usize, P> TimeboundCircularBuffer<S, P, CuMsgMetadata>
40where
41 P: CuMsgPayload,
42{
43 pub fn new() -> Self {
44 TimeboundCircularBuffer {
45 inner: CircularBuffer::<S, CuStampedData<P, CuMsgMetadata>>::new(),
47 }
48 }
49
50 pub fn iter_window(
53 &self,
54 start_time: CuTime,
55 end_time: CuTime,
56 ) -> impl Iterator<Item = &CuStampedData<P, CuMsgMetadata>> {
57 self.inner.iter().filter(move |msg| match msg.tov {
58 Tov::Time(time) => time >= start_time && time <= end_time,
59 Tov::Range(range) => range.start >= start_time && range.end <= end_time,
60 _ => false,
61 })
62 }
63
64 pub fn purge(&mut self, time_horizon: CuTime) {
66 let drain_end = self
68 .inner
69 .iter()
70 .position(|msg| match msg.tov {
71 Tov::Time(time) => time >= time_horizon,
72 Tov::Range(range) => range.end >= time_horizon,
73 _ => false,
74 })
75 .unwrap_or(self.inner.len()); self.inner.drain(..drain_end);
79 }
80
81 pub fn most_recent_time(&self) -> CuResult<Option<CuTime>> {
83 self.inner
84 .iter()
85 .map(|msg| extract_tov_time_right(&msg.tov))
86 .try_fold(None, |acc, time| {
87 let time = time.ok_or_else(|| {
88 CuError::from("Trying to align temporal data with no time information")
89 })?;
90 Ok(Some(
91 acc.map_or(time, |current_max: CuTime| current_max.max(time)),
92 ))
93 })
94 }
95
96 pub fn push(&mut self, msg: CuStampedData<P, CuMsgMetadata>) {
98 self.inner.push_back(msg);
99 }
100}
101
/// Generates a struct that bundles several [`TimeboundCircularBuffer`]s and
/// aligns their content in time.
///
/// Invocation shape:
///
/// ```ignore
/// alignment_buffers!(
///     MyBuffers,
///     left: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
///     right: TimeboundCircularBuffer<12, CuStampedData<u64, CuMsgMetadata>>
/// );
/// ```
///
/// The generated struct has one public field per listed buffer plus:
///
/// * `new(target_alignment_window, stale_data_horizon)`,
/// * `purge(now)`, which purges every buffer with the horizon
///   `now - stale_data_horizon` (saturating),
/// * `get_latest_aligned_data()`, which returns one iterator per buffer over
///   a common time window, or `None` when no buffer reports a most recent time.
///
/// The expansion refers to cu29 items by full path, so the call site does not
/// need to import `CuMsgMetadata` itself.
#[macro_export]
macro_rules! alignment_buffers {
    ($struct_name:ident, $($name:ident: TimeboundCircularBuffer<$size:expr, CuStampedData<$payload:ty, CuMsgMetadata>>),*) => {
        struct $struct_name {
            target_alignment_window: cu29::clock::CuDuration,
            stale_data_horizon: cu29::clock::CuDuration,
            $(pub $name: $crate::buffers::TimeboundCircularBuffer<$size, $payload, cu29::cutask::CuMsgMetadata>),*
        }

        impl $struct_name {
            /// Creates the set of empty buffers with the given alignment
            /// window and stale data horizon.
            pub fn new(target_alignment_window: cu29::clock::CuDuration, stale_data_horizon: cu29::clock::CuDuration) -> Self {
                Self {
                    target_alignment_window,
                    stale_data_horizon,
                    $($name: $crate::buffers::TimeboundCircularBuffer::<$size, $payload, cu29::cutask::CuMsgMetadata>::new()),*
                }
            }

            /// Purges every buffer using the horizon `now - stale_data_horizon`.
            // Not every user of the generated struct calls every method.
            #[allow(dead_code)]
            pub fn purge(&mut self, now: cu29::clock::CuTime) {
                use cu29::prelude::SaturatingSub;
                let horizon_time = now.saturating_sub(self.stale_data_horizon);
                $(self.$name.purge(horizon_time);)*
            }

            /// Returns, for each buffer, an iterator over the messages that
            /// fall inside the latest common window.
            ///
            /// The window ends at the earliest of the buffers' most recent
            /// times and starts `target_alignment_window` before that
            /// (saturating). Returns `None` when no buffer reports a time.
            // Not every user of the generated struct calls every method.
            #[allow(dead_code)]
            pub fn get_latest_aligned_data(
                &mut self,
            ) -> Option<($(impl Iterator<Item = &cu29::cutask::CuStampedData<$payload, cu29::cutask::CuMsgMetadata>>),*)> {
                use cu29::prelude::SaturatingSub;
                // A buffer that is empty, or whose most recent time cannot be
                // computed, contributes nothing to the minimum.
                let most_recent_time = [
                    $(self.$name.most_recent_time().unwrap_or(None)),*
                ]
                .iter()
                .flatten()
                .copied()
                .min()?;

                let time_to_get_complete_window =
                    most_recent_time.saturating_sub(self.target_alignment_window);
                Some(($(self.$name.iter_window(time_to_get_complete_window, most_recent_time)),*))
            }
        }
    };
}
156
157pub use alignment_buffers;
158
#[cfg(test)]
mod tests {
    use cu29::clock::Tov;
    use cu29::cutask::*;
    use std::time::Duration;

    /// Converts whole seconds into whichever time type the call site expects.
    fn secs<T>(seconds: u64) -> T
    where
        Duration: Into<T>,
    {
        Duration::from_secs(seconds).into()
    }

    /// Builds a message carrying `payload`, valid at `at_secs` seconds.
    fn stamped(payload: u32, at_secs: u64) -> CuStampedData<u32, CuMsgMetadata> {
        let mut msg = CuStampedData::new(Some(payload));
        msg.tov = Tov::Time(secs(at_secs));
        msg
    }

    /// The generated buffers get the capacities requested in the macro call.
    #[test]
    fn simple_init_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
            buffer2: TimeboundCircularBuffer<12, CuStampedData<u64, CuMsgMetadata>>
        );

        let buffers = AlignmentBuffers::new(secs(1), secs(2));
        assert_eq!(buffers.buffer1.inner.capacity(), 10);
        assert_eq!(buffers.buffer2.inner.capacity(), 12);
    }

    /// Messages survive a purge while inside the horizon and are dropped after.
    #[test]
    fn purge_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
            buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>
        );

        let mut buffers = AlignmentBuffers::new(secs(1), secs(2));

        buffers.buffer1.push(stamped(1, 1));
        buffers.buffer2.push(stamped(1, 1));

        // Horizon is 2s - 2s = 0s: the message at 1s is still fresh.
        buffers.purge(secs(2));
        assert_eq!(buffers.buffer1.inner.len(), 1);
        assert_eq!(buffers.buffer2.inner.len(), 1);

        // Horizon is 5s - 2s = 3s: the message at 1s is now stale.
        buffers.purge(secs(5));
        assert_eq!(buffers.buffer1.inner.len(), 0);
        assert_eq!(buffers.buffer2.inner.len(), 0);
    }

    /// With nothing buffered there is no aligned data to return.
    #[test]
    fn empty_buffers_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
            buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>
        );

        let mut buffers = AlignmentBuffers::new(secs(2), secs(5));

        assert!(buffers.get_latest_aligned_data().is_none());
    }

    /// Purging and windowing combine to expose only the recent, aligned data.
    #[test]
    fn horizon_and_window_alignment_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
            buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>
        );

        // 2s alignment window, 5s stale data horizon.
        let mut buffers = AlignmentBuffers::new(secs(2), secs(5));

        // buffer1 holds messages at 1s and 4s; buffer2 at 1s, 3s and 4s.
        buffers.buffer1.push(stamped(1, 1));
        buffers.buffer2.push(stamped(1, 1));
        buffers.buffer2.push(stamped(3, 3));
        buffers.buffer1.push(stamped(4, 4));
        buffers.buffer2.push(stamped(4, 4));

        // Horizon is 7s - 5s = 2s, so the messages at 1s are dropped.
        buffers.purge(secs(7));

        {
            let (iter1, iter2) = buffers
                .get_latest_aligned_data()
                .expect("Expected aligned data, but got None");
            let collected1: Vec<_> = iter1.collect();
            let collected2: Vec<_> = iter2.collect();

            assert_eq!(collected1.len(), 1);
            assert_eq!(collected2.len(), 2);

            assert_eq!(collected1[0].payload(), Some(&4));
            assert_eq!(collected2[0].payload(), Some(&3));
            assert_eq!(collected2[1].payload(), Some(&4));
        }

        // Reading the aligned data does not consume the buffers.
        assert_eq!(buffers.buffer1.inner.len(), 1);
        assert_eq!(buffers.buffer2.inner.len(), 2);
    }
}