circular_buffer/
drain.rs

1// Copyright © 2023-2025 Andrea Corbellini and contributors
2// SPDX-License-Identifier: BSD-3-Clause
3
4use crate::add_mod;
5use crate::iter::translate_range_bounds;
6use crate::iter::Iter;
7use crate::CircularBuffer;
8use core::fmt;
9use core::iter::FusedIterator;
10use core::marker::PhantomData;
11#[cfg(not(feature = "unstable"))]
12use core::mem::MaybeUninit;
13use core::ops::Range;
14use core::ops::RangeBounds;
15use core::ptr;
16use core::ptr::NonNull;
17
18/// A draining [iterator](core::iter::Iterator) that removes and returns elements from a
19/// `CircularBuffer`.
20///
21/// This struct is created by [`CircularBuffer::drain()`]. See its documentation for more details.
22pub struct Drain<'a, const N: usize, T> {
23    /// This is a pointer and not a reference (`&'a mut CircularBuffer`) because using a reference
24    /// would make `Drain` an invariant over `CircularBuffer`, but instead we want `Drain` to be
25    /// covariant over `CircularBuffer`.
26    ///
27    /// The reason why `Drain` needs to be covariant is that, semantically,
28    /// `CircularBuffer::drain()` should be equivalent to popping all the drained elements from the
29    /// buffer, storing them into a vector, and returning an iterable over the vector.
30    /// Equivalently, `Drain` owns the drained elements, so it would be unnecessarily restrictive
31    /// to make this type invariant over `CircularBuffer`.
32    buf: NonNull<CircularBuffer<N, T>>,
33    /// A backup of the size of the buffer. Necessary because `buf.size` is set to 0 during the
34    /// lifetime of the `Drain` and is restored only during drop.
35    buf_size: usize,
36    /// The range that was requested to drain. Necessary to properly rearrange the buffer memory
37    /// during drop.
38    range: Range<usize>,
39    /// An iterator over the indexes of the elements to return from the draining iterator.
40    /// Initially, `range` and `iter` are set to the same `Range`, but as the draining iterator is
41    /// used (via `Iterator::next()`, or similar), `iter` is mutated, while `range` is preserved.
42    iter: Range<usize>,
43    /// Necessary to bind the lifetime of `CircularBuffer` to `Drain`. Note that this is an `&`
44    /// reference, and not a mutable `&mut` reference: this is to make `Drain` covariant over
45    /// `CircularBuffer`.
46    phantom: PhantomData<&'a T>,
47}
48
49impl<'a, const N: usize, T> Drain<'a, N, T> {
50    pub(crate) fn over_range<R>(buf: &'a mut CircularBuffer<N, T>, range: R) -> Self
51    where
52        R: RangeBounds<usize>,
53    {
54        let (start, end) = translate_range_bounds(buf, range);
55
56        // Iterating over a `Drain` returns items from the buffer, but does actually remove the
57        // item from the buffer right away. Because of that, forgetting a `Drain` (via
58        // `mem::forget`) can potentially leave the `CircularBuffer` in an unsafe state: the same
59        // item may have been returned from the `Drain` iterator, and be part of the
60        // `CircularBuffer` at the same time, which would be unsafe for non-`Copy` types.
61        //
62        // To avoid getting into this unsafe state, the size of the buffer is set to 0 while the
63        // `Drain` is alive, and it's restored when the `Drain` is dropped. Forgetting a `Drain`
64        // will therefore forget all the items in the buffer (even the ones that were not drained).
65        // This ensures maximum safety while keeping the implementation simple and performant
66        // enough.
67        let buf_size = buf.size;
68        buf.size = 0;
69
70        let buf = NonNull::from(buf);
71
72        Self {
73            buf,
74            buf_size,
75            range: start..end,
76            iter: start..end,
77            phantom: PhantomData,
78        }
79    }
80
81    /// Reads an element from the `CircularBuffer`.
82    ///
83    /// # Safety
84    ///
85    /// The `index` must point to an initialized element of the buffer. After this method is used,
86    /// the element at `index` must be considered as uninitialized memory and therefore the `index`
87    /// must not be reused.
88    unsafe fn read(&self, index: usize) -> T {
89        debug_assert!(
90            index < N && index < self.buf_size,
91            "index out-of-bounds for buffer"
92        );
93        debug_assert!(
94            index >= self.range.start && index < self.range.end,
95            "index out-of-bounds for drain range"
96        );
97        debug_assert!(
98            index < self.iter.start || index >= self.iter.end,
99            "attempt to read an item that may be returned by the iterator"
100        );
101        let buf = unsafe { self.buf.as_ref() };
102        let index = add_mod(buf.start, index, N);
103        unsafe { ptr::read(buf.items[index].assume_init_ref()) }
104    }
105
106    fn as_slices(&self) -> (&[T], &[T]) {
107        if N == 0 || self.buf_size == 0 || self.iter.is_empty() {
108            return (&[][..], &[][..]);
109        }
110
111        let buf = unsafe { self.buf.as_ref() };
112
113        debug_assert!(buf.start < N, "start out-of-bounds");
114        debug_assert!(self.buf_size <= N, "size out-of-bounds");
115
116        let start = add_mod(buf.start, self.iter.start, N);
117        let end = add_mod(buf.start, self.iter.end, N);
118
119        let (right, left) = if start < end {
120            (&buf.items[start..end], &[][..])
121        } else {
122            let (left, right) = buf.items.split_at(end);
123            let right = &right[start - end..];
124            (right, left)
125        };
126
127        // SAFETY: The elements in these slices are guaranteed to be initialized
128        #[cfg(feature = "unstable")]
129        unsafe {
130            (right.assume_init_ref(), left.assume_init_ref())
131        }
132        #[cfg(not(feature = "unstable"))]
133        unsafe {
134            (
135                &*(right as *const [MaybeUninit<T>] as *const [T]),
136                &*(left as *const [MaybeUninit<T>] as *const [T]),
137            )
138        }
139    }
140
141    fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
142        if N == 0 || self.buf_size == 0 || self.iter.is_empty() {
143            return (&mut [][..], &mut [][..]);
144        }
145
146        let buf = unsafe { self.buf.as_mut() };
147
148        debug_assert!(buf.start < N, "start out-of-bounds");
149        debug_assert!(self.buf_size <= N, "size out-of-bounds");
150
151        let start = add_mod(buf.start, self.iter.start, N);
152        let end = add_mod(buf.start, self.iter.end, N);
153
154        let (right, left) = if start < end {
155            (&mut buf.items[start..end], &mut [][..])
156        } else {
157            let (left, right) = buf.items.split_at_mut(end);
158            let right = &mut right[start - end..];
159            (right, left)
160        };
161
162        // SAFETY: The elements in these slices are guaranteed to be initialized
163        #[cfg(feature = "unstable")]
164        unsafe {
165            (right.assume_init_mut(), left.assume_init_mut())
166        }
167        #[cfg(not(feature = "unstable"))]
168        unsafe {
169            (
170                &mut *(right as *mut [MaybeUninit<T>] as *mut [T]),
171                &mut *(left as *mut [MaybeUninit<T>] as *mut [T]),
172            )
173        }
174    }
175}
176
177impl<const N: usize, T> Iterator for Drain<'_, N, T> {
178    type Item = T;
179
180    #[inline]
181    fn next(&mut self) -> Option<Self::Item> {
182        // SAFETY: the element at the index is guaranteed to be initialized
183        self.iter.next().map(|index| unsafe { self.read(index) })
184    }
185
186    #[inline]
187    fn size_hint(&self) -> (usize, Option<usize>) {
188        self.iter.size_hint()
189    }
190}
191
192impl<const N: usize, T> ExactSizeIterator for Drain<'_, N, T> {
193    #[inline]
194    fn len(&self) -> usize {
195        self.iter.len()
196    }
197}
198
199impl<const N: usize, T> FusedIterator for Drain<'_, N, T> {}
200
201impl<const N: usize, T> DoubleEndedIterator for Drain<'_, N, T> {
202    fn next_back(&mut self) -> Option<Self::Item> {
203        // SAFETY: the element at the index is guaranteed to be initialized
204        self.iter
205            .next_back()
206            .map(|index| unsafe { self.read(index) })
207    }
208}
209
210impl<const N: usize, T> Drop for Drain<'_, N, T> {
211    fn drop(&mut self) {
212        // Drop the items that were not consumed
213        struct Dropper<'a, T>(&'a mut [T]);
214
215        impl<T> Drop for Dropper<'_, T> {
216            fn drop(&mut self) {
217                // SAFETY: the slice is guaranteed to be valid for read and writes as the `Drain`
218                // holds a mutable reference to the `CircularBuffer` that contains the data
219                // referenced by the slices.
220                unsafe {
221                    ptr::drop_in_place(self.0);
222                }
223            }
224        }
225
226        let (right, left) = self.as_mut_slices();
227
228        let right = Dropper(right);
229        let left = Dropper(left);
230
231        drop(right);
232        drop(left);
233
234        // The drain has left a "hole" of items in the `CircularBuffer` that either got moved out
235        // during iteration, or got dropped earlier. There are 3 possible scenarios for the state
236        // of the `CircularBuffer` at this point:
237        //
238        // 1. The "hole" is at the front of the buffer:
239        //    | hole | remaining items |
240        //
241        // 2. The "hole" is at the back of the buffer:
242        //    | remaining items | hole |
243        //
244        // 3. The "hole" is in the middle of the buffer:
245        //    | remaining items | hole | remaining items |
246        //
247        // Scenario #1 and #2 can be handled by adjusting the start offset and length of the
248        // buffer. Scenario #3 requires moving the remaining items into the "hole" to fill the gap.
249        //
250        // Filling the hole for scenario #3 requires at most a 3-steps. The worst case looks like
251        // this:
252        //
253        //     | back items [part 2/2] | front items | hole | back items [part 1/2] |
254        //                             ^
255        //                             ` start
256        //
257        // The first step to do is to move `back items [part 1/2]` into `hole`, so that the
258        // `CircularBuffer` looks like this:
259        //
260        //     | back items [part 2/2] | front items | back items [part 1/2] | hole |
261        //                             ^
262        //                             ` start
263        //
264        // Then a portion of `back items [part 2/2]` can be copied into the new `hole`. Note that
265        // `back items [part 2/2]` may not fit into `hole`, and so it may be necessary to split it
266        // in two chunks:
267        //
268        //     | hole | back items [part 3/3] | front items | back items [part 1/3] | back items [part 2/3] |
269        //                                    ^
270        //                                    ` start
271        //
272        // Finally the last chunk `back items [part 3/3]` can be moved into that `hole`:
273        //
274        //     | back items [part 3/3] | hole | front items | back items [part 1/3] | back items [part 2/3] |
275        //                                    ^
276        //                                    ` start
277        //
278        // A similar strategy could be applied to move the front items into the hole instead of the
279        // back items. Ideally the implementation should decide whether to move the front items or
280        // the back items depending on which one results in fewer data to be moved; however for now
281        // only the implementation always moves the back items.
282
283        // TODO: optimize for the case where the hole is in the front or the back
284        // TODO: optimize for the case where there are fewer items to move from the front
285
286        // SAFETY: `buf` is a valid pointer because `Drain` holds a mutable reference to it.
287        let buf = unsafe { self.buf.as_mut() };
288        let mut remaining = self.buf_size - self.range.end;
289
290        let items = CircularSlicePtr::new(&mut buf.items).add(buf.start);
291        let mut hole = items.add(self.range.start);
292        let mut backfill = items.add(self.range.end);
293
294        // This loop should run at most 3 times as explained above
295        while remaining > 0 {
296            let copy_len = hole
297                .available_len()
298                .min(backfill.available_len())
299                .min(remaining);
300            // SAFETY: both pointers are properly aligned, and are valid for read and writes.
301            unsafe { ptr::copy(backfill.as_ptr(), hole.as_mut_ptr(), copy_len) };
302
303            hole = hole.add(copy_len);
304            backfill = backfill.add(copy_len);
305            remaining -= copy_len;
306        }
307
308        // Now that the buffer memory contains valid items, the size can be restored
309        buf.size = self.buf_size - self.range.len();
310    }
311}
312
313impl<const N: usize, T> fmt::Debug for Drain<'_, N, T>
314where
315    T: fmt::Debug,
316{
317    #[inline]
318    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
319        let (right, left) = self.as_slices();
320        let it = Iter { right, left };
321        it.fmt(f)
322    }
323}
324
/// A cursor into a slice whose position is advanced modulo the slice length (see
/// `CircularSlicePtr::add`). The `Drop` implementation of `Drain` uses it to move items around
/// inside the circular storage of the buffer.
#[derive(Debug)]
struct CircularSlicePtr<'a, T> {
    /// Pointer to the first element of the underlying slice.
    slice_start: *mut T,
    /// Number of elements in the underlying slice.
    slice_len: usize,
    /// Current position of the cursor, relative to `slice_start`.
    offset: usize,
    /// Ties the cursor to the lifetime of the slice it was created from.
    phantom: PhantomData<&'a T>,
}
332
333impl<'a, T> CircularSlicePtr<'a, T> {
334    fn new(slice: &'a mut [T]) -> Self {
335        Self {
336            slice_start: slice as *mut [T] as *mut T,
337            slice_len: slice.len(),
338            offset: 0,
339            phantom: PhantomData,
340        }
341    }
342
343    fn as_ptr(&self) -> *const T {
344        debug_assert!(self.offset < self.slice_len);
345        // SAFETY: `slice_start` is a valid pointer because it was obtained from a reference that
346        // is still alive; `offset` is within the bounds of the slice, so the resulting pointer is
347        // also valid.
348        unsafe { self.slice_start.add(self.offset) }
349    }
350
351    fn as_mut_ptr(&self) -> *mut T {
352        debug_assert!(self.offset < self.slice_len);
353        // SAFETY: `slice_start` is a valid pointer because it was obtained from a reference that
354        // is still alive; `offset` is within the bounds of the slice, so the resulting pointer is
355        // also valid.
356        unsafe { self.slice_start.add(self.offset) }
357    }
358
359    fn available_len(&self) -> usize {
360        debug_assert!(self.offset < self.slice_len);
361        self.slice_len - self.offset
362    }
363
364    fn add(mut self, increment: usize) -> Self {
365        debug_assert!(self.offset < self.slice_len);
366        debug_assert!(increment <= self.slice_len);
367        self.offset = add_mod(self.offset, increment, self.slice_len);
368        self
369    }
370}
371
372// Need to manually implement `Copy` because `#[derive(Copy)]` requires `T` to implement `Copy`.
373// Also need to manually implement `Clone` because `Copy` requires `Clone`.
374
375impl<T> Copy for CircularSlicePtr<'_, T> {}
376
377impl<T> Clone for CircularSlicePtr<'_, T> {
378    fn clone(&self) -> Self {
379        *self
380    }
381}