clone_stream/
ring_queue.rs

1use std::collections::BTreeMap;
2
3use log::trace;
4
5/// A ring buffer queue that wraps around at a maximum capacity.
6#[derive(Debug)]
7pub(crate) struct RingQueue<T>
8where
9    T: Clone,
10{
11    pub(crate) items: BTreeMap<usize, T>,
12    pub(crate) oldest: Option<usize>,
13    pub(crate) newest: Option<usize>,
14    capacity: usize,
15}
16
17impl<T> RingQueue<T>
18where
19    T: Clone,
20{
21    pub fn new(capacity: usize) -> Self {
22        Self {
23            items: BTreeMap::new(),
24            oldest: None,
25            newest: None,
26            capacity,
27        }
28    }
29
30    pub fn push(&mut self, item: T) {
31        if self.capacity == 0 {
32            return;
33        }
34
35        // If queue is at capacity, remove oldest item first
36        if self.items.len() >= self.capacity
37            && let Some(oldest) = self.oldest
38        {
39            self.items.remove(&oldest);
40            self.oldest = self.next_ring_index(oldest);
41        }
42
43        if let Some(newest) = self.newest {
44            let next_index = (newest + 1) % self.capacity;
45            self.items.insert(next_index, item);
46            self.newest = Some(next_index);
47
48            // Update oldest if this is the first item after being empty
49            if self.oldest.is_none() {
50                self.oldest = Some(next_index);
51            }
52        } else {
53            // First item
54            self.newest = Some(0);
55            self.oldest = Some(0);
56            self.items.insert(0, item);
57        }
58    }
59
60    pub(crate) fn remove(&mut self, index: usize) -> Option<T> {
61        if self.capacity == 0 {
62            return None;
63        }
64        let removed = self.items.remove(&index);
65        if Some(index) == self.oldest {
66            self.oldest = self.next_ring_index(index);
67        }
68        if Some(index) == self.newest {
69            if self.oldest == Some(index) {
70                self.newest = None;
71            } else {
72                self.newest = self.prev_ring_index(index);
73            }
74        }
75        removed
76    }
77
78    pub fn pop_oldest(&mut self) -> Option<T> {
79        if self.capacity == 0 {
80            return None;
81        }
82        if let Some(oldest) = self.oldest
83            && let Some(item) = self.items.remove(&oldest)
84        {
85            if self.items.is_empty() {
86                self.oldest = None;
87                self.newest = None;
88            } else {
89                self.oldest = self.next_ring_index(oldest);
90            }
91            return Some(item);
92        }
93        None
94    }
95
96    pub fn is_empty(&self) -> bool {
97        self.items.is_empty()
98    }
99
100    pub fn oldest_index(&self) -> Option<usize> {
101        if self.is_empty() { None } else { self.oldest }
102    }
103
104    pub(crate) fn clear(&mut self) {
105        self.items.clear();
106        self.oldest = None;
107        self.newest = None;
108    }
109
110    pub fn get(&self, index: usize) -> Option<&T> {
111        self.items.get(&index)
112    }
113
114    /// Checks if an index is within the valid range of the ring
115    /// buffer.boundary.
116    fn is_valid_index(&self, index: usize) -> bool {
117        if let (Some(oldest), Some(newest)) = (self.oldest, self.newest) {
118            (oldest <= newest && index >= oldest && index <= newest)
119                || (oldest > newest && (index >= oldest || index <= newest))
120        } else {
121            false
122        }
123    }
124
125    /// Calculates the logical distance from one index to another in ring buffer
126    /// order.
127    fn ring_distance(&self, from: usize, to: usize) -> Option<usize> {
128        if self.is_valid_index(from) && self.is_valid_index(to) {
129            let (oldest, newest) = (self.oldest?, self.newest?);
130
131            if oldest <= newest {
132                if to >= from { Some(to - from) } else { None }
133            } else {
134                // Wraparound case
135                let distance = (to + self.capacity - from) % self.capacity;
136                Some(distance)
137            }
138        } else {
139            None
140        }
141    }
142
143    fn next_ring_index(&self, from: usize) -> Option<usize> {
144        self.items
145            .range((from + 1)..)
146            .chain(self.items.range(..from))
147            .next()
148            .map(|(k, _)| *k)
149    }
150
151    fn prev_ring_index(&self, from: usize) -> Option<usize> {
152        self.items
153            .range(..from)
154            .chain(self.items.range((from + 1)..))
155            .next_back()
156            .map(|(k, _)| *k)
157    }
158
159    pub(crate) fn is_newer_than(&self, maybe_newer: usize, current: usize) -> bool {
160        self.ring_distance(current, maybe_newer)
161            .is_some_and(|distance| distance > 0)
162    }
163
164    /// Returns the first valid index newer than `current_index`, or None if no
165    /// such index exists.
166    pub(crate) fn find_next_newer_index(&self, current_index: usize) -> Option<usize> {
167        let (oldest, newest) = (self.oldest?, self.newest?);
168        trace!("Finding next newer index after {current_index}, oldest={oldest}, newest={newest}");
169        trace!("Current queue has length {:?}", self.items.len());
170        // Check consecutive index first
171        let next_consecutive = (current_index + 1) % self.capacity;
172
173        trace!("Next consecutive index is {next_consecutive}");
174        if self.items.contains_key(&next_consecutive)
175            && self.is_newer_than(next_consecutive, current_index)
176        {
177            return Some(next_consecutive);
178        }
179
180        self.ring_indices_from(oldest)
181            .take_while(|&idx| idx != newest)
182            .find(|&idx| self.is_newer_than(idx, current_index))
183            .or_else(|| {
184                // Check newest index last
185                self.is_newer_than(newest, current_index).then_some(newest)
186            })
187    }
188
189    /// Generate an iterator of valid indices starting from a given index in
190    /// ring order
191    fn ring_indices_from(&self, start: usize) -> impl Iterator<Item = usize> + '_ {
192        (0..self.capacity)
193            .map(move |offset| (start + offset) % self.capacity)
194            .filter(|&idx| self.items.contains_key(&idx))
195    }
196}
197
198pub struct RingQueueIter<'a, T>
199where
200    T: Clone,
201{
202    queue: &'a RingQueue<T>,
203    current_index: Option<usize>,
204    remaining_items: usize,
205}
206
207impl<'a, T> RingQueueIter<'a, T>
208where
209    T: Clone,
210{
211    fn new(queue: &'a RingQueue<T>) -> Self {
212        Self {
213            queue,
214            current_index: queue.oldest,
215            remaining_items: queue.items.len(),
216        }
217    }
218}
219
220impl<'a, T> Iterator for RingQueueIter<'a, T>
221where
222    T: Clone,
223{
224    type Item = (usize, &'a T);
225
226    fn next(&mut self) -> Option<Self::Item> {
227        if self.remaining_items == 0 {
228            return None;
229        }
230
231        if let Some(index) = self.current_index
232            && let Some(item) = self.queue.items.get(&index)
233        {
234            self.remaining_items -= 1;
235
236            self.current_index = if self.remaining_items > 0 {
237                self.queue.next_ring_index(index)
238            } else {
239                None
240            };
241
242            return Some((index, item));
243        }
244
245        None
246    }
247}
248
249impl<'a, T> IntoIterator for &'a RingQueue<T>
250where
251    T: Clone,
252{
253    type Item = (usize, &'a T);
254    type IntoIter = RingQueueIter<'a, T>;
255
256    fn into_iter(self) -> Self::IntoIter {
257        RingQueueIter::new(self)
258    }
259}
260
261impl<T> Extend<T> for RingQueue<T>
262where
263    T: Clone,
264{
265    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
266        for item in iter {
267            self.push(item);
268        }
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queue with `capacity` slots and pushes `items` in order.
    fn filled(capacity: usize, items: &[&'static str]) -> RingQueue<&'static str> {
        let mut queue = RingQueue::new(capacity);
        queue.extend(items.iter().copied());
        queue
    }

    /// Pushing past capacity evicts the oldest item and reuses slot 0.
    #[test]
    fn test_wraparound_eviction() {
        let queue = filled(3, &["a", "b", "c", "d"]);

        assert_eq!(queue.oldest, Some(1), "Oldest should advance after eviction");
        assert_eq!(queue.newest, Some(0), "Newest should wrap to index 0");
        assert_eq!(queue.get(0), Some(&"d"), "New item at wrapped index");
    }

    /// Iteration follows insertion order even when the slots have wrapped.
    #[test]
    fn test_ring_iteration_order() {
        let queue = filled(3, &["a", "b", "c", "d"]);

        let items: Vec<&str> = (&queue).into_iter().map(|(_, item)| *item).collect();
        assert_eq!(items, vec!["b", "c", "d"], "Should iterate from oldest to newest");
    }

    /// The next newer index is found both inside the ring and across the wrap.
    #[test]
    fn test_find_next_newer_index() {
        let queue = filled(4, &["a", "b", "c", "d", "e"]);

        assert_eq!(queue.find_next_newer_index(1), Some(2), "Should find next newer after oldest");
        assert_eq!(queue.find_next_newer_index(2), Some(3), "Should find next in sequence");
        assert_eq!(queue.find_next_newer_index(3), Some(0), "Should wrap to newest");
    }

    /// Recency comparisons hold across the wrap point.
    #[test]
    fn test_is_newer_than_with_wraparound() {
        let queue = filled(4, &["a", "b", "c", "d", "e"]);

        assert!(queue.is_newer_than(0, 3), "Wrapped newest should be newer than previous");
        assert!(queue.is_newer_than(2, 1), "Index 2 should be newer than oldest index 1");
        assert!(queue.is_newer_than(3, 2), "Index 3 should be newer than index 2");
    }

    /// Distances are measured in ring order, modulo the capacity once wrapped.
    #[test]
    fn test_ring_distance() {
        let queue = filled(4, &["a", "b", "c", "d", "e"]);

        assert_eq!(queue.ring_distance(1, 2), Some(1), "Adjacent distance");
        assert_eq!(queue.ring_distance(3, 0), Some(1), "Wraparound distance");
        assert_eq!(queue.ring_distance(0, 1), Some(1), "Full circle distance");
    }
}