cu_aligner/
buffers.rs

1use circular_buffer::CircularBuffer;
2use cu29::clock::{CuTime, Tov};
3use cu29::cutask::{CuMsg, CuMsgPayload};
4use cu29::{CuError, CuResult};
5
6/// An augmented circular buffer that allows for time-based operations.
7pub struct TimeboundCircularBuffer<const S: usize, P>
8where
9    P: CuMsgPayload,
10{
11    pub inner: CircularBuffer<S, CuMsg<P>>,
12}
13
14#[allow(dead_code)]
15fn extract_tov_time_left(tov: &Tov) -> Option<CuTime> {
16    match tov {
17        Tov::Time(time) => Some(*time),
18        Tov::Range(range) => Some(range.start), // Use the start of the range for alignment
19        Tov::None => None,
20    }
21}
22
23fn extract_tov_time_right(tov: &Tov) -> Option<CuTime> {
24    match tov {
25        Tov::Time(time) => Some(*time),
26        Tov::Range(range) => Some(range.end), // Use the end of the range for alignment
27        Tov::None => None,
28    }
29}
30
31impl<const S: usize, P> Default for TimeboundCircularBuffer<S, P>
32where
33    P: CuMsgPayload,
34{
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl<const S: usize, P> TimeboundCircularBuffer<S, P>
41where
42    P: CuMsgPayload,
43{
44    pub fn new() -> Self {
45        TimeboundCircularBuffer {
46            // It is assumed to be sorted by time with non overlapping ranges if they are Tov::Range
47            inner: CircularBuffer::<S, CuMsg<P>>::new(),
48        }
49    }
50
51    /// Gets a slice of messages that fall within the given time range.
52    /// In case of a Tov::Range, the message is included if its start and end time fall within the range.
53    pub fn iter_window(
54        &self,
55        start_time: CuTime,
56        end_time: CuTime,
57    ) -> impl Iterator<Item = &CuMsg<P>> {
58        self.inner.iter().filter(move |msg| match msg.metadata.tov {
59            Tov::Time(time) => time >= start_time && time <= end_time,
60            Tov::Range(range) => range.start >= start_time && range.end <= end_time,
61            _ => false,
62        })
63    }
64
65    /// Remove all the messages that are older than the given time horizon.
66    pub fn purge(&mut self, time_horizon: CuTime) {
67        // Find the index of the first element that should be retained
68        let drain_end = self
69            .inner
70            .iter()
71            .position(|msg| match msg.metadata.tov {
72                Tov::Time(time) => time >= time_horizon,
73                Tov::Range(range) => range.end >= time_horizon,
74                _ => false,
75            })
76            .unwrap_or(self.inner.len()); // If none match, drain the entire buffer
77
78        // Drain all elements before the `drain_end` index
79        self.inner.drain(..drain_end);
80    }
81
82    /// Get the most recent time of the messages in the buffer.
83    pub fn most_recent_time(&self) -> CuResult<Option<CuTime>> {
84        self.inner
85            .iter()
86            .map(|msg| extract_tov_time_right(&msg.metadata.tov))
87            .try_fold(None, |acc, time| {
88                let time = time.ok_or_else(|| {
89                    CuError::from("Trying to align temporal data with no time information")
90                })?;
91                Ok(Some(
92                    acc.map_or(time, |current_max: CuTime| current_max.max(time)),
93                ))
94            })
95    }
96
97    /// Push a message into the buffer.
98    pub fn push(&mut self, msg: CuMsg<P>) {
99        self.inner.push_back(msg);
100    }
101}
102
/// Generates a struct bundling several `TimeboundCircularBuffer`s together with the
/// logic to purge stale data and to extract the latest time-aligned window from them.
///
/// Usage: the struct name, then one `field: TimeboundCircularBuffer<SIZE, CuMsg<Payload>>`
/// entry per buffer.
///
/// ```ignore
/// alignment_buffers!(
///     AlignmentBuffers,
///     buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
///     buffer2: TimeboundCircularBuffer<12, CuMsg<u64>>
/// );
/// ```
#[macro_export]
macro_rules! alignment_buffers {
    ($struct_name:ident, $($name:ident: TimeboundCircularBuffer<$size:expr, CuMsg<$payload:ty>>),*) => {
        struct $struct_name {
            target_alignment_window: cu29::clock::CuDuration, // size of the most recent data window to align
            stale_data_horizon: cu29::clock::CuDuration,  // time horizon for purging stale data
            $(pub $name: $crate::buffers::TimeboundCircularBuffer<$size, $payload>),*
        }

        impl $struct_name {
            /// Creates the set of (empty) buffers.
            ///
            /// `target_alignment_window` is the length of the window returned by
            /// `get_latest_aligned_data`; `stale_data_horizon` is how far in the past
            /// `purge` keeps data.
            pub fn new(target_alignment_window: cu29::clock::CuDuration, stale_data_horizon: cu29::clock::CuDuration) -> Self {
                Self {
                    target_alignment_window,
                    stale_data_horizon,
                    $($name: $crate::buffers::TimeboundCircularBuffer::<$size, $payload>::new()),*
                }
            }

            /// Discards, in every buffer, the data older than `now - stale_data_horizon`.
            ///
            /// While `now` is still smaller than the horizon the cut-off is clamped to
            /// time zero, so nothing timed is considered stale yet.
            #[allow(dead_code)]
            pub fn purge(&mut self, now: cu29::clock::CuTime) {
                // Saturating subtraction written with `max`: a plain `now - horizon`
                // would underflow when the clock is younger than the horizon
                // (NOTE(review): assumes CuTime arithmetic is unsigned -- confirm).
                let horizon_time = now.max(self.stale_data_horizon) - self.stale_data_horizon;
                // purge all the stale data from the TimeboundCircularBuffers
                $(self.$name.purge(horizon_time);)*
            }

            /// Get the most recent set of aligned data from all the buffers matching the constraints set at construction.
            ///
            /// The reference time is the smallest of the per-buffer most recent times.
            /// Buffers that are empty, or for which `most_recent_time` fails, are
            /// ignored when computing it. Each returned iterator yields the messages of
            /// one buffer lying within
            /// `[reference - target_alignment_window, reference]`, the lower bound being
            /// clamped to time zero.
            ///
            /// Returns `None` when no buffer provides a most recent time.
            #[allow(dead_code)]
            pub fn get_latest_aligned_data(
                &mut self,
            ) -> Option<($(impl Iterator<Item = &cu29::cutask::CuMsg<$payload>>),*)> {
                // Min of the per-buffer max: the most recent time already reached by
                // every buffer that has timed data. `?` returns early when there is none.
                let most_recent_time = [
                    $(self.$name.most_recent_time().unwrap_or(None)),*
                ]
                .iter()
                .flatten()
                .copied()
                .min()?;

                // Saturating subtraction, same reasoning as in `purge`: data younger
                // than the window must not underflow the lower bound.
                let time_to_get_complete_window =
                    most_recent_time.max(self.target_alignment_window) - self.target_alignment_window;
                Some(($(self.$name.iter_window(time_to_get_complete_window, most_recent_time)),*))
            }
        }
    };
}
154
155pub use alignment_buffers;
156
#[cfg(test)]
mod tests {
    use cu29::clock::Tov;
    use cu29::cutask::CuMsg;
    use std::time::Duration;

    /// Builds a `u32` message carrying `value` whose time of validity is the instant
    /// `secs` seconds.
    fn timed_msg(value: u32, secs: u64) -> CuMsg<u32> {
        let mut msg = CuMsg::new(Some(value));
        msg.metadata.tov = Tov::Time(Duration::from_secs(secs).into());
        msg
    }

    /// The generated struct exposes one buffer per entry, each with the requested capacity.
    #[test]
    fn simple_init_test() {
        alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>, buffer2: TimeboundCircularBuffer<12, CuMsg<u64>>);

        let buffers =
            AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());
        assert_eq!(buffers.buffer1.inner.capacity(), 10);
        assert_eq!(buffers.buffer2.inner.capacity(), 12);
    }

    /// Data is kept while it is within the stale-data horizon and dropped afterwards.
    #[test]
    fn purge_test() {
        alignment_buffers!(AlignmentBuffers, buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>, buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>);

        let mut buffers =
            AlignmentBuffers::new(Duration::from_secs(1).into(), Duration::from_secs(2).into());

        let msg1 = timed_msg(1, 1);
        buffers.buffer1.inner.push_back(msg1.clone());
        buffers.buffer2.inner.push_back(msg1);
        // now = 2 s, horizon = 2 s: the cut-off is 0 s, so the 1 s message is kept
        buffers.purge(Duration::from_secs(2).into());
        assert_eq!(buffers.buffer1.inner.len(), 1);
        assert_eq!(buffers.buffer2.inner.len(), 1);
        // now = 5 s, horizon = 2 s: the cut-off is 3 s, so the 1 s message is dropped
        buffers.purge(Duration::from_secs(5).into());
        assert_eq!(buffers.buffer1.inner.len(), 0);
        assert_eq!(buffers.buffer2.inner.len(), 0);
    }

    /// Without any data there is nothing to align.
    #[test]
    fn empty_buffers_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
            buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>
        );

        let mut buffers = AlignmentBuffers::new(
            Duration::from_secs(2).into(), // 2-second alignment window
            Duration::from_secs(5).into(), // 5-second stale data horizon
        );

        // Nothing was ever pushed, so no aligned data can be produced
        assert!(buffers.get_latest_aligned_data().is_none());
    }

    /// Purging followed by alignment only returns the data inside the latest window.
    #[test]
    fn horizon_and_window_alignment_test() {
        alignment_buffers!(
            AlignmentBuffers,
            buffer1: TimeboundCircularBuffer<10, CuMsg<u32>>,
            buffer2: TimeboundCircularBuffer<12, CuMsg<u32>>
        );

        let mut buffers = AlignmentBuffers::new(
            Duration::from_secs(2).into(), // 2-second alignment window
            Duration::from_secs(5).into(), // 5-second stale data horizon
        );

        // Insert messages with timestamps: buffer1 gets 1 s and 4 s,
        // buffer2 gets 1 s, 3 s and 4 s
        let msg1 = timed_msg(1, 1);
        buffers.buffer1.inner.push_back(msg1.clone());
        buffers.buffer2.inner.push_back(msg1);

        buffers.buffer2.inner.push_back(timed_msg(3, 3));

        let msg3 = timed_msg(4, 4);
        buffers.buffer1.inner.push_back(msg3.clone());
        buffers.buffer2.inner.push_back(msg3);

        // Advance time to 7 seconds; the cut-off is 7 - 5 = 2 s, everything at 2 s or later stays
        let now = Duration::from_secs(7).into();
        // Emulate a normal workflow here.
        buffers.purge(now);
        if let Some((iter1, iter2)) = buffers.get_latest_aligned_data() {
            let collected1: Vec<_> = iter1.collect();
            let collected2: Vec<_> = iter2.collect();

            // The most recent time common to both buffers is 4 s, so the 2-second
            // alignment window is [2 s, 4 s]
            assert_eq!(collected1.len(), 1);
            assert_eq!(collected2.len(), 2);

            assert_eq!(collected1[0].payload(), Some(&4));
            assert_eq!(collected2[0].payload(), Some(&3));
            assert_eq!(collected2[1].payload(), Some(&4));
        } else {
            panic!("Expected aligned data, but got None");
        }

        // The 1 s messages are older than the 2 s cut-off and must have been purged
        assert_eq!(buffers.buffer1.inner.len(), 1);
        assert_eq!(buffers.buffer2.inner.len(), 2);
    }
}