cu_aligner/
buffers.rs

1use circular_buffer::CircularBuffer;
2use cu29::prelude::*;
3
4/// An augmented circular buffer that allows for time-based operations.
5pub struct TimeboundCircularBuffer<const S: usize, P, M>
6where
7    P: CuMsgPayload,
8    M: Metadata,
9{
10    pub inner: CircularBuffer<S, CuStampedData<P, M>>,
11}
12
13#[allow(dead_code)]
14fn extract_tov_time_left(tov: &Tov) -> Option<CuTime> {
15    match tov {
16        Tov::Time(time) => Some(*time),
17        Tov::Range(range) => Some(range.start), // Use the start of the range for alignment
18        Tov::None => None,
19    }
20}
21
22fn extract_tov_time_right(tov: &Tov) -> Option<CuTime> {
23    match tov {
24        Tov::Time(time) => Some(*time),
25        Tov::Range(range) => Some(range.end), // Use the end of the range for alignment
26        Tov::None => None,
27    }
28}
29
30impl<const S: usize, P> Default for TimeboundCircularBuffer<S, P, CuMsgMetadata>
31where
32    P: CuMsgPayload,
33{
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl<const S: usize, P> TimeboundCircularBuffer<S, P, CuMsgMetadata>
40where
41    P: CuMsgPayload,
42{
43    pub fn new() -> Self {
44        TimeboundCircularBuffer {
45            // It is assumed to be sorted by time with non overlapping ranges if they are Tov::Range
46            inner: CircularBuffer::<S, CuStampedData<P, CuMsgMetadata>>::new(),
47        }
48    }
49
50    /// Gets a slice of messages that fall within the given time range.
51    /// In case of a Tov::Range, the message is included if its start and end time fall within the range.
52    pub fn iter_window(
53        &self,
54        start_time: CuTime,
55        end_time: CuTime,
56    ) -> impl Iterator<Item = &CuStampedData<P, CuMsgMetadata>> {
57        self.inner.iter().filter(move |msg| match msg.tov {
58            Tov::Time(time) => time >= start_time && time <= end_time,
59            Tov::Range(range) => range.start >= start_time && range.end <= end_time,
60            _ => false,
61        })
62    }
63
64    /// Remove all the messages that are older than the given time horizon.
65    pub fn purge(&mut self, time_horizon: CuTime) {
66        // Find the index of the first element that should be retained
67        let drain_end = self
68            .inner
69            .iter()
70            .position(|msg| match msg.tov {
71                Tov::Time(time) => time >= time_horizon,
72                Tov::Range(range) => range.end >= time_horizon,
73                _ => false,
74            })
75            .unwrap_or(self.inner.len()); // If none match, drain the entire buffer
76
77        // Drain all elements before the `drain_end` index
78        self.inner.drain(..drain_end);
79    }
80
81    /// Get the most recent time of the messages in the buffer.
82    pub fn most_recent_time(&self) -> CuResult<Option<CuTime>> {
83        self.inner
84            .iter()
85            .map(|msg| extract_tov_time_right(&msg.tov))
86            .try_fold(None, |acc, time| {
87                let time = time.ok_or_else(|| {
88                    CuError::from("Trying to align temporal data with no time information")
89                })?;
90                Ok(Some(
91                    acc.map_or(time, |current_max: CuTime| current_max.max(time)),
92                ))
93            })
94    }
95
96    /// Push a message into the buffer.
97    pub fn push(&mut self, msg: CuStampedData<P, CuMsgMetadata>) {
98        self.inner.push_back(msg);
99    }
100}
101
102#[macro_export]
103macro_rules! alignment_buffers {
104    ($struct_name:ident, $($name:ident: TimeboundCircularBuffer<$size:expr, CuStampedData<$payload:ty, CuMsgMetadata>>),*) => {
105        struct $struct_name {
106            target_alignment_window: cu29::clock::CuDuration, // size of the most recent data window to align
107            stale_data_horizon: cu29::clock::CuDuration,  // time horizon for purging stale data
108            $(pub $name: $crate::buffers::TimeboundCircularBuffer<$size, $payload, CuMsgMetadata>),*
109        }
110
111        impl $struct_name {
112            pub fn new(target_alignment_window: cu29::clock::CuDuration, stale_data_horizon: cu29::clock::CuDuration) -> Self {
113                Self {
114                    target_alignment_window,
115                    stale_data_horizon,
116                    $($name: $crate::buffers::TimeboundCircularBuffer::<$size, $payload, CuMsgMetadata>::new()),*
117                }
118            }
119
120            /// Call this to be sure we discard the old/ non relevant data
121            #[allow(dead_code)]
122            pub fn purge(&mut self, now: cu29::clock::CuTime) {
123                use cu29::prelude::SaturatingSub;
124                let horizon_time = now.saturating_sub(self.stale_data_horizon);
125                // purge all the stale data from the TimeboundCircularBuffers first
126                $(self.$name.purge(horizon_time);)*
127            }
128
129            /// Get the most recent set of aligned data from all the buffers matching the constraints set at construction.
130            #[allow(dead_code)]
131            pub fn get_latest_aligned_data(
132                &mut self,
133            ) -> Option<($(impl Iterator<Item = &cu29::cutask::CuStampedData<$payload, CuMsgMetadata>>),*)> {
134                use cu29::prelude::SaturatingSub;
135                // Now find the min of the max of the last time for all buffers
136                // meaning the most recent time at which all buffers have data
137                let most_recent_time = [
138                    $(self.$name.most_recent_time().unwrap_or(None)),*
139                ]
140                .iter()
141                .filter_map(|&time| time)
142                .min();
143
144                // If there is no data in any of the buffers, return early
145                most_recent_time?;
146
147                let most_recent_time = most_recent_time.unwrap();
148
149                let time_to_get_complete_window =
150                    most_recent_time.saturating_sub(self.target_alignment_window);
151                Some(($(self.$name.iter_window(time_to_get_complete_window, most_recent_time)),*))
152            }
153        }
154    };
155}
156
157pub use alignment_buffers;
158
159#[cfg(test)]
160mod tests {
161    use cu29::clock::Tov;
162    use cu29::cutask::*;
163    use std::time::Duration;
164
165    #[test]
166    fn simple_init_test() {
167        alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>, buffer2: TimeboundCircularBuffer<12, CuStampedData<u64, CuMsgMetadata>>);
168
169        let buffers =
170            AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());
171        assert_eq!(buffers.buffer1.inner.capacity(), 10);
172        assert_eq!(buffers.buffer2.inner.capacity(), 12);
173    }
174
175    #[test]
176    fn purge_test() {
177        alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>, buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>);
178
179        let mut buffers =
180            AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());
181
182        let mut msg1 = CuStampedData::new(Some(1));
183        msg1.tov = Tov::Time(Duration::from_secs(1).into());
184        buffers.buffer1.inner.push_back(msg1.clone());
185        buffers.buffer2.inner.push_back(msg1);
186        // within the horizon
187        buffers.purge(Duration::from_secs(2).into());
188        assert_eq!(buffers.buffer1.inner.len(), 1);
189        assert_eq!(buffers.buffer2.inner.len(), 1);
190        // outside the horizon
191        buffers.purge(Duration::from_secs(5).into());
192        assert_eq!(buffers.buffer1.inner.len(), 0);
193        assert_eq!(buffers.buffer2.inner.len(), 0);
194    }
195
196    #[test]
197    fn empty_buffers_test() {
198        alignment_buffers!(
199            AlignmentBuffers,
200            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
201            buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>
202        );
203
204        let mut buffers = AlignmentBuffers::new(
205            Duration::from_secs(2).into(), // 2-second alignment window
206            Duration::from_secs(5).into(), // 5-second stale data horizon
207        );
208
209        // Advance time to 10 seconds
210        assert!(buffers.get_latest_aligned_data().is_none());
211    }
212
213    #[test]
214    fn horizon_and_window_alignment_test() {
215        alignment_buffers!(
216            AlignmentBuffers,
217            buffer1: TimeboundCircularBuffer<10, CuStampedData<u32, CuMsgMetadata>>,
218            buffer2: TimeboundCircularBuffer<12, CuStampedData<u32, CuMsgMetadata>>
219        );
220
221        let mut buffers = AlignmentBuffers::new(
222            Duration::from_secs(2).into(), // 2-second alignment window
223            Duration::from_secs(5).into(), // 5-second stale data horizon
224        );
225
226        // Insert messages with timestamps
227        let mut msg1 = CuStampedData::new(Some(1));
228        msg1.tov = Tov::Time(Duration::from_secs(1).into());
229        buffers.buffer1.inner.push_back(msg1.clone());
230        buffers.buffer2.inner.push_back(msg1);
231
232        let mut msg2 = CuStampedData::new(Some(3));
233        msg2.tov = Tov::Time(Duration::from_secs(3).into());
234        buffers.buffer2.inner.push_back(msg2);
235
236        let mut msg3 = CuStampedData::new(Some(4));
237        msg3.tov = Tov::Time(Duration::from_secs(4).into());
238        buffers.buffer1.inner.push_back(msg3.clone());
239        buffers.buffer2.inner.push_back(msg3);
240
241        // Advance time to 7 seconds; horizon is 7 - 5 = everything 2+ should stay
242        let now = Duration::from_secs(7).into();
243        // Emulate a normal workflow here.
244        buffers.purge(now);
245        if let Some((iter1, iter2)) = buffers.get_latest_aligned_data() {
246            let collected1: Vec<_> = iter1.collect();
247            let collected2: Vec<_> = iter2.collect();
248
249            // Verify only messages within the alignment window [5, 7] are returned
250            assert_eq!(collected1.len(), 1);
251            assert_eq!(collected2.len(), 2);
252
253            assert_eq!(collected1[0].payload(), Some(&4));
254            assert_eq!(collected2[0].payload(), Some(&3));
255            assert_eq!(collected2[1].payload(), Some(&4));
256        } else {
257            panic!("Expected aligned data, but got None");
258        }
259
260        // Ensure older messages outside the horizon [>2 seconds] are purged
261        assert_eq!(buffers.buffer1.inner.len(), 1);
262        assert_eq!(buffers.buffer2.inner.len(), 2);
263    }
264}