clone_stream/
ring_queue.rs

1use std::collections::BTreeMap;
2
/// A ring buffer queue that wraps around at a maximum capacity.
///
/// Items live in slots numbered `0..capacity`. Each [`push`](Self::push) writes to the slot
/// that follows `newest` (modulo `capacity`), replacing whatever is stored there. Slots can
/// also be vacated individually through [`remove`](Self::remove), so the occupied slots are
/// not necessarily contiguous.
#[derive(Debug)]
pub(crate) struct RingQueue<T>
where
    T: Clone,
{
    /// Occupied slots, keyed by slot index.
    pub(crate) items: BTreeMap<usize, T>,
    /// Slot index of the oldest stored item, or `None` when the queue is empty.
    pub(crate) oldest: Option<usize>,
    /// Slot index of the most recently pushed item, or `None` when the queue is empty.
    pub(crate) newest: Option<usize>,
    /// Number of slots; slot indices wrap around at this value.
    capacity: usize,
}

impl<T> RingQueue<T>
where
    T: Clone,
{
    /// Creates an empty queue with `capacity` slots.
    ///
    /// A queue with `capacity == 0` silently discards every pushed item.
    pub fn new(capacity: usize) -> Self {
        Self {
            items: BTreeMap::new(),
            oldest: None,
            newest: None,
            capacity,
        }
    }

    /// Stores `item` in the slot following `newest`, wrapping around at `capacity`.
    ///
    /// If that slot is already occupied its value is dropped; when the dropped value was the
    /// oldest one, `oldest` advances to the next occupied slot in ring order.
    pub fn push(&mut self, item: T) {
        if self.capacity == 0 {
            return;
        }
        match self.newest {
            None => {
                self.items.insert(0, item);
                self.oldest = Some(0);
                self.newest = Some(0);
            }
            Some(newest) => {
                let slot = (newest + 1) % self.capacity;
                let evicted = self.items.insert(slot, item).is_some();
                if evicted && self.oldest == Some(slot) {
                    // When no other slot is occupied (e.g. `capacity == 1`) the value just
                    // written is both the newest and the oldest item.
                    self.oldest = self.next_ring_index(slot).or(Some(slot));
                }
                self.newest = Some(slot);
            }
        }
    }

    /// Removes and returns the item stored at `index`, if any.
    ///
    /// When `index` was the oldest or newest slot, the corresponding marker moves to the
    /// neighbouring occupied slot in ring order, or becomes `None` if the queue is now empty.
    pub(crate) fn remove(&mut self, index: usize) -> Option<T> {
        if self.capacity == 0 {
            return None;
        }
        let removed = self.items.remove(&index);
        if self.oldest == Some(index) {
            self.oldest = self.next_ring_index(index);
        }
        if self.newest == Some(index) {
            // Yields `None` on its own once the last item is gone.
            self.newest = self.prev_ring_index(index);
        }
        removed
    }

    /// Removes and returns the oldest item, or `None` if the queue is empty.
    pub fn pop_oldest(&mut self) -> Option<T> {
        if self.capacity == 0 {
            return None;
        }
        let oldest = self.oldest?;
        let item = self.items.remove(&oldest)?;
        if self.items.is_empty() {
            self.oldest = None;
            self.newest = None;
        } else {
            self.oldest = self.next_ring_index(oldest);
        }
        Some(item)
    }

    /// Returns `true` when no items are stored.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Drops every stored item and resets the oldest/newest markers.
    pub(crate) fn clear(&mut self) {
        self.items.clear();
        self.oldest = None;
        self.newest = None;
    }

    /// Returns a reference to the item stored at `index`, if that slot is occupied.
    pub fn get(&self, index: usize) -> Option<&T> {
        self.items.get(&index)
    }

    /// Checks whether `index` lies within the `oldest..=newest` span of the ring buffer,
    /// taking wraparound into account. The slot itself does not have to be occupied.
    fn is_valid_index(&self, index: usize) -> bool {
        match (self.oldest, self.newest) {
            (Some(oldest), Some(newest)) if oldest <= newest => {
                oldest <= index && index <= newest
            }
            // Wrapped span: `oldest..` followed by `..=newest`.
            (Some(oldest), Some(newest)) => index >= oldest || index <= newest,
            _ => false,
        }
    }

    /// Calculates the logical distance from one index to another in ring buffer order.
    ///
    /// Returns `None` when either index is outside the valid span or when `to` is logically
    /// older than `from`.
    fn ring_distance(&self, from: usize, to: usize) -> Option<usize> {
        if !(self.is_valid_index(from) && self.is_valid_index(to)) {
            return None;
        }
        let (oldest, newest) = (self.oldest?, self.newest?);

        if oldest <= newest {
            to.checked_sub(from)
        } else {
            // Wraparound case: raw slot numbers no longer reflect age, so compare how far
            // each slot sits past `oldest` instead. A plain modular difference would report
            // a positive distance in both directions.
            let age = |index: usize| (index + self.capacity - oldest) % self.capacity;
            age(to).checked_sub(age(from))
        }
    }

    /// Returns the occupied slot that follows `from` in ring order: the smallest occupied
    /// index above `from`, otherwise the smallest occupied index below it.
    fn next_ring_index(&self, from: usize) -> Option<usize> {
        self.items
            .range((from + 1)..)
            .next()
            .or_else(|| self.items.range(..from).next())
            .map(|(&key, _)| key)
    }

    /// Returns the occupied slot that precedes `from` in ring order: the largest occupied
    /// index below `from`, otherwise the largest occupied index above it.
    fn prev_ring_index(&self, from: usize) -> Option<usize> {
        self.items
            .range(..from)
            .next_back()
            .or_else(|| self.items.range((from + 1)..).next_back())
            .map(|(&key, _)| key)
    }

    /// Returns `true` when `maybe_newer` is strictly newer than `current` in ring order.
    ///
    /// Returns `false` if either index is outside the valid span.
    pub(crate) fn is_newer_than(&self, maybe_newer: usize, current: usize) -> bool {
        self.ring_distance(current, maybe_newer)
            .is_some_and(|distance| distance > 0)
    }

    /// Returns the first valid index newer than `current_index`, or None if no such index exists.
    pub(crate) fn find_next_newer_index(&self, current_index: usize) -> Option<usize> {
        let (oldest, newest) = (self.oldest?, self.newest?);

        // Fast path: the slot directly after `current_index`.
        let next_consecutive = (current_index + 1) % self.capacity;
        if self.items.contains_key(&next_consecutive)
            && self.is_newer_than(next_consecutive, current_index)
        {
            return Some(next_consecutive);
        }

        // Otherwise walk the occupied slots from oldest to newest and take the first one
        // that is newer; `newest` itself is checked last.
        self.ring_indices_from(oldest)
            .take_while(|&idx| idx != newest)
            .find(|&idx| self.is_newer_than(idx, current_index))
            .or_else(|| self.is_newer_than(newest, current_index).then_some(newest))
    }

    /// Generate an iterator of occupied indices starting from a given index in ring order.
    fn ring_indices_from(&self, start: usize) -> impl Iterator<Item = usize> + '_ {
        (0..self.capacity)
            .map(move |offset| (start + offset) % self.capacity)
            .filter(move |idx| self.items.contains_key(idx))
    }
}
175
176pub struct RingQueueIter<'a, T>
177where
178    T: Clone,
179{
180    queue: &'a RingQueue<T>,
181    current_index: Option<usize>,
182    remaining_items: usize,
183}
184
185impl<'a, T> RingQueueIter<'a, T>
186where
187    T: Clone,
188{
189    fn new(queue: &'a RingQueue<T>) -> Self {
190        Self {
191            queue,
192            current_index: queue.oldest,
193            remaining_items: queue.items.len(),
194        }
195    }
196}
197
198impl<'a, T> Iterator for RingQueueIter<'a, T>
199where
200    T: Clone,
201{
202    type Item = (usize, &'a T);
203
204    fn next(&mut self) -> Option<Self::Item> {
205        if self.remaining_items == 0 {
206            return None;
207        }
208
209        if let Some(index) = self.current_index
210            && let Some(item) = self.queue.items.get(&index)
211        {
212            self.remaining_items -= 1;
213
214            self.current_index = if self.remaining_items > 0 {
215                self.queue.next_ring_index(index)
216            } else {
217                None
218            };
219
220            return Some((index, item));
221        }
222
223        None
224    }
225}
226
227impl<'a, T> IntoIterator for &'a RingQueue<T>
228where
229    T: Clone,
230{
231    type Item = (usize, &'a T);
232    type IntoIter = RingQueueIter<'a, T>;
233
234    fn into_iter(self) -> Self::IntoIter {
235        RingQueueIter::new(self)
236    }
237}