ringbuf/traits/
consumer.rs

1use super::{
2    observer::{DelegateObserver, Observer},
3    utils::modulus,
4};
5use crate::utils::{move_uninit_slice, slice_as_uninit_mut, slice_assume_init_mut, slice_assume_init_ref};
6use core::{iter::Chain, mem::MaybeUninit, ptr, slice};
7#[cfg(feature = "std")]
8use std::io::{self, Write};
9
10/// Consumer part of ring buffer.
11pub trait Consumer: Observer {
12    /// Set read index.
13    ///
14    /// # Safety
15    ///
16    /// Index must go only forward, never backward. It is recommended to use [`Self::advance_read_index`] instead.
17    ///
18    /// All slots with index less than `value` must be uninitialized until write index, all slots with index equal or greater - must be initialized.
19    unsafe fn set_read_index(&self, value: usize);
20
21    /// Moves `read` pointer by `count` places forward.
22    ///
23    /// # Safety
24    ///
25    /// First `count` items in occupied memory must be moved out or dropped.
26    ///
27    /// Must not be called concurrently.
28    unsafe fn advance_read_index(&self, count: usize) {
29        unsafe { self.set_read_index((self.read_index() + count) % modulus(self)) };
30    }
31
32    /// Provides a direct access to the ring buffer occupied memory.
33    /// The difference from [`Self::as_slices`] is that this method provides slices of [`MaybeUninit`], so items may be moved out of slices.
34    ///
35    /// Returns a pair of slices of stored items, the second one may be empty.
36    /// Elements with lower indices in slice are older. First slice contains older items that second one.
37    ///
38    /// # Safety
39    ///
40    /// All items are initialized. Elements must be removed starting from the beginning of first slice.
41    /// When all items are removed from the first slice then items must be removed from the beginning of the second slice.
42    ///
43    /// *This method must be followed by [`Self::advance_read_index`] call with the number of items being removed previously as argument.*
44    /// *No other mutating calls allowed before that.*
45    fn occupied_slices(&self) -> (&[MaybeUninit<Self::Item>], &[MaybeUninit<Self::Item>]) {
46        unsafe { self.unsafe_slices(self.read_index(), self.write_index()) }
47    }
48
49    /// Provides a direct mutable access to the ring buffer occupied memory.
50    ///
51    /// Same as [`Self::occupied_slices`].
52    ///
53    /// # Safety
54    ///
55    /// When some item is replaced with uninitialized value then it must not be read anymore.
56    unsafe fn occupied_slices_mut(&mut self) -> (&mut [MaybeUninit<Self::Item>], &mut [MaybeUninit<Self::Item>]) {
57        unsafe { self.unsafe_slices_mut(self.read_index(), self.write_index()) }
58    }
59
60    /// Returns a pair of slices which contain, in order, the contents of the ring buffer.
61    #[inline]
62    fn as_slices(&self) -> (&[Self::Item], &[Self::Item]) {
63        unsafe {
64            let (left, right) = self.occupied_slices();
65            (slice_assume_init_ref(left), slice_assume_init_ref(right))
66        }
67    }
68
69    /// Returns a pair of mutable slices which contain, in order, the contents of the ring buffer.
70    #[inline]
71    fn as_mut_slices(&mut self) -> (&mut [Self::Item], &mut [Self::Item]) {
72        unsafe {
73            let (left, right) = self.occupied_slices_mut();
74            (slice_assume_init_mut(left), slice_assume_init_mut(right))
75        }
76    }
77
78    /// Returns a reference to the eldest item in the ring buffer, if exists.
79    #[inline]
80    fn first(&self) -> Option<&Self::Item> {
81        self.as_slices().0.first()
82    }
83    /// Returns a mutable reference to the eldest item in the ring buffer, if exists.
84    #[inline]
85    fn first_mut(&mut self) -> Option<&mut Self::Item> {
86        self.as_mut_slices().0.first_mut()
87    }
88    /// Returns a reference to the most recent item in the ring buffer, if exists.
89    ///
90    /// *Returned item may not be actually the most recent if there is a concurrent producer activity.*
91    fn last(&self) -> Option<&Self::Item> {
92        let (first, second) = self.as_slices();
93        if second.is_empty() { first.last() } else { second.last() }
94    }
95    /// Returns a mutable reference to the most recent item in the ring buffer, if exists.
96    ///
97    /// *Returned item may not be actually the most recent if there is a concurrent producer activity.*
98    fn last_mut(&mut self) -> Option<&mut Self::Item> {
99        let (first, second) = self.as_mut_slices();
100        if second.is_empty() { first.last_mut() } else { second.last_mut() }
101    }
102
103    /// Removes the eldest item from the ring buffer and returns it.
104    ///
105    /// Returns `None` if the ring buffer is empty.
106    fn try_pop(&mut self) -> Option<Self::Item> {
107        if !self.is_empty() {
108            let elem = unsafe { self.occupied_slices().0.get_unchecked(0).assume_init_read() };
109            unsafe { self.advance_read_index(1) };
110            Some(elem)
111        } else {
112            None
113        }
114    }
115
116    /// Returns the reference to the eldest item without removing it from the buffer.
117    ///
118    /// Returns `None` if the ring buffer is empty.
119    fn try_peek(&self) -> Option<&Self::Item> {
120        if !self.is_empty() {
121            Some(unsafe { self.occupied_slices().0.get_unchecked(0).assume_init_ref() })
122        } else {
123            None
124        }
125    }
126
127    /// Copies items from the ring buffer to an uninit slice without removing them from the ring buffer.
128    ///
129    /// Returns a number of items being copied.
130    fn peek_slice_uninit(&self, elems: &mut [MaybeUninit<Self::Item>]) -> usize {
131        let (left, right) = self.occupied_slices();
132        if elems.len() < left.len() {
133            move_uninit_slice(elems, unsafe { left.get_unchecked(..elems.len()) });
134            elems.len()
135        } else {
136            let (left_elems, elems) = elems.split_at_mut(left.len());
137            move_uninit_slice(left_elems, left);
138            left.len()
139                + if elems.len() < right.len() {
140                    move_uninit_slice(elems, unsafe { right.get_unchecked(..elems.len()) });
141                    elems.len()
142                } else {
143                    move_uninit_slice(unsafe { elems.get_unchecked_mut(..right.len()) }, right);
144                    right.len()
145                }
146        }
147    }
148
149    /// Copies items from the ring buffer to a slice without removing them from the ring buffer.
150    ///
151    /// Returns a number of items being copied.
152    fn peek_slice(&self, elems: &mut [Self::Item]) -> usize
153    where
154        Self::Item: Copy,
155    {
156        self.peek_slice_uninit(unsafe { slice_as_uninit_mut(elems) })
157    }
158
159    /// Removes items from the ring buffer and writes them into an uninit slice.
160    ///
161    /// Returns count of items been removed.
162    fn pop_slice_uninit(&mut self, elems: &mut [MaybeUninit<Self::Item>]) -> usize {
163        let count = self.peek_slice_uninit(elems);
164        unsafe { self.advance_read_index(count) };
165        count
166    }
167
168    /// Removes items from the ring buffer and writes them into a slice.
169    ///
170    /// Returns count of items been removed.
171    fn pop_slice(&mut self, elems: &mut [Self::Item]) -> usize
172    where
173        Self::Item: Copy,
174    {
175        self.pop_slice_uninit(unsafe { slice_as_uninit_mut(elems) })
176    }
177
178    /// Returns an iterator that removes items one by one from the ring buffer.
179    fn pop_iter(&mut self) -> PopIter<'_, Self> {
180        PopIter::new(self)
181    }
182
183    /// Returns a front-to-back iterator containing references to items in the ring buffer.
184    ///
185    /// This iterator does not remove items out of the ring buffer.
186    fn iter(&self) -> Iter<'_, Self> {
187        let (left, right) = self.as_slices();
188        left.iter().chain(right.iter())
189    }
190
191    /// Returns a front-to-back iterator that returns mutable references to items in the ring buffer.
192    ///
193    /// This iterator does not remove items out of the ring buffer.
194    fn iter_mut(&mut self) -> IterMut<'_, Self> {
195        let (left, right) = self.as_mut_slices();
196        left.iter_mut().chain(right.iter_mut())
197    }
198
199    /// Removes at most `count` and at least `min(count, Self::len())` items from the buffer and safely drops them.
200    ///
201    /// If there is no concurring producer activity then exactly `min(count, Self::len())` items are removed.
202    ///
203    /// Returns the number of deleted items.
204    ///
205    /// ```
206    /// # extern crate ringbuf;
207    /// # use ringbuf::{LocalRb, storage::Array, traits::*};
208    /// # fn main() {
209    /// let mut rb = LocalRb::<Array<i32, 8>>::default();
210    ///
211    /// assert_eq!(rb.push_iter(0..8), 8);
212    ///
213    /// assert_eq!(rb.skip(4), 4);
214    /// assert_eq!(rb.skip(8), 4);
215    /// assert_eq!(rb.skip(4), 0);
216    /// # }
217    /// ```
218    fn skip(&mut self, count: usize) -> usize {
219        unsafe {
220            let (left, right) = self.occupied_slices_mut();
221            for elem in left.iter_mut().chain(right.iter_mut()).take(count) {
222                ptr::drop_in_place(elem.as_mut_ptr());
223            }
224            let actual_count = usize::min(count, left.len() + right.len());
225            self.advance_read_index(actual_count);
226            actual_count
227        }
228    }
229
230    /// Removes all items from the buffer and safely drops them.
231    ///
232    /// Returns the number of deleted items.
233    fn clear(&mut self) -> usize {
234        unsafe {
235            let (left, right) = self.occupied_slices_mut();
236            for elem in left.iter_mut().chain(right.iter_mut()) {
237                ptr::drop_in_place(elem.as_mut_ptr());
238            }
239            let count = left.len() + right.len();
240            self.advance_read_index(count);
241            count
242        }
243    }
244
245    #[cfg(feature = "std")]
246    /// Removes at most first `count` bytes from the ring buffer and writes them into a [`Write`] instance.
247    /// If `count` is `None` then as much as possible bytes will be written.
248    ///
249    /// Returns:
250    ///
251    /// + `None`: ring buffer is empty or `count` is `0`. In this case `write` isn't called at all.
252    /// + `Some(Ok(n))`: `write` succeeded. `n` is number of bytes been written. `n == 0` means that `write` also returned `0`.
253    /// + `Some(Err(e))`: `write` is failed and `e` is original error. In this case it is guaranteed that no items was written to the writer.
254    ///   To achieve this we write only one contiguous slice at once. So this call may write less than `occupied_len` items even if the writer is ready to get more.
255    fn write_into<S: Write>(&mut self, writer: &mut S, count: Option<usize>) -> Option<io::Result<usize>>
256    where
257        Self: Consumer<Item = u8>,
258    {
259        let (left, _) = self.occupied_slices();
260        let count = usize::min(count.unwrap_or(left.len()), left.len());
261        if count == 0 {
262            return None;
263        }
264        let left_init = unsafe { slice_assume_init_ref(&left[..count]) };
265
266        let write_count = match writer.write(left_init) {
267            Ok(n) => n,
268            Err(e) => return Some(Err(e)),
269        };
270        assert!(write_count <= count);
271        unsafe { self.advance_read_index(write_count) };
272        Some(Ok(write_count))
273    }
274}
275
276/// Owning ring buffer iterator.
277pub struct IntoIter<C: Consumer + ?Sized> {
278    inner: C,
279}
280
281impl<C: Consumer> IntoIter<C> {
282    pub fn new(inner: C) -> Self {
283        Self { inner }
284    }
285    pub fn into_inner(self) -> C {
286        self.inner
287    }
288}
289
290impl<C: Consumer> Iterator for IntoIter<C> {
291    type Item = C::Item;
292
293    #[inline]
294    fn next(&mut self) -> Option<Self::Item> {
295        self.inner.try_pop()
296    }
297    #[inline]
298    fn size_hint(&self) -> (usize, Option<usize>) {
299        (self.inner.occupied_len(), None)
300    }
301}
302
303/// An iterator that removes items from the ring buffer.
304///
305/// Producer will see removed items only when iterator is dropped or [`PopIter::commit`] is called.
306pub struct PopIter<'a, C: Consumer + ?Sized> {
307    inner: &'a C,
308    iter: Chain<slice::Iter<'a, MaybeUninit<C::Item>>, slice::Iter<'a, MaybeUninit<C::Item>>>,
309    count: usize,
310    len: usize,
311}
312
313impl<C: Consumer + ?Sized> Drop for PopIter<'_, C> {
314    fn drop(&mut self) {
315        self.commit();
316    }
317}
318
319impl<'a, C: Consumer + ?Sized> PopIter<'a, C> {
320    /// Create an iterator.
321    pub fn new(inner: &'a mut C) -> Self {
322        let (len, iter) = {
323            let (left, right) = inner.occupied_slices();
324            (left.len() + right.len(), left.iter().chain(right))
325        };
326        Self {
327            inner,
328            iter,
329            count: 0,
330            len,
331        }
332    }
333
334    /// Send information about removed items to the ring buffer.
335    pub fn commit(&mut self) {
336        unsafe { self.inner.advance_read_index(self.count) };
337        self.count = 0;
338    }
339}
340
341impl<C: Consumer> Iterator for PopIter<'_, C> {
342    type Item = C::Item;
343
344    #[inline]
345    fn next(&mut self) -> Option<Self::Item> {
346        self.iter.next().map(|item| {
347            self.count += 1;
348            unsafe { item.assume_init_read() }
349        })
350    }
351    #[inline]
352    fn size_hint(&self) -> (usize, Option<usize>) {
353        let remain = self.len - self.count;
354        (remain, Some(remain))
355    }
356}
357
358impl<C: Consumer> ExactSizeIterator for PopIter<'_, C> {}
359
360/// Iterator over ring buffer contents.
361///
362/// *Please do not rely on actual type, it may change in future.*
363#[allow(type_alias_bounds)]
364pub type Iter<'a, C: Consumer> = Chain<slice::Iter<'a, C::Item>, slice::Iter<'a, C::Item>>;
365
366/// Mutable iterator over ring buffer contents.
367///
368/// *Please do not rely on actual type, it may change in future.*
369#[allow(type_alias_bounds)]
370pub type IterMut<'a, C: Consumer> = Chain<slice::IterMut<'a, C::Item>, slice::IterMut<'a, C::Item>>;
371
372/// Trait used for delegating producer methods.
373pub trait DelegateConsumer: DelegateObserver
374where
375    Self::Base: Consumer,
376{
377}
378impl<D: DelegateConsumer> Consumer for D
379where
380    D::Base: Consumer,
381{
382    #[inline]
383    unsafe fn set_read_index(&self, value: usize) {
384        unsafe { self.base().set_read_index(value) }
385    }
386    #[inline]
387    unsafe fn advance_read_index(&self, count: usize) {
388        unsafe { self.base().advance_read_index(count) }
389    }
390
391    #[inline]
392    fn occupied_slices(&self) -> (&[core::mem::MaybeUninit<Self::Item>], &[core::mem::MaybeUninit<Self::Item>]) {
393        self.base().occupied_slices()
394    }
395
396    #[inline]
397    unsafe fn occupied_slices_mut(&mut self) -> (&mut [core::mem::MaybeUninit<Self::Item>], &mut [core::mem::MaybeUninit<Self::Item>]) {
398        unsafe { self.base_mut().occupied_slices_mut() }
399    }
400
401    #[inline]
402    fn as_slices(&self) -> (&[Self::Item], &[Self::Item]) {
403        self.base().as_slices()
404    }
405
406    #[inline]
407    fn as_mut_slices(&mut self) -> (&mut [Self::Item], &mut [Self::Item]) {
408        self.base_mut().as_mut_slices()
409    }
410
411    #[inline]
412    fn try_pop(&mut self) -> Option<Self::Item> {
413        self.base_mut().try_pop()
414    }
415
416    #[inline]
417    fn pop_slice(&mut self, elems: &mut [Self::Item]) -> usize
418    where
419        Self::Item: Copy,
420    {
421        self.base_mut().pop_slice(elems)
422    }
423
424    #[inline]
425    fn iter(&self) -> Iter<'_, Self> {
426        self.base().iter()
427    }
428
429    #[inline]
430    fn iter_mut(&mut self) -> IterMut<'_, Self> {
431        self.base_mut().iter_mut()
432    }
433
434    #[inline]
435    fn skip(&mut self, count: usize) -> usize {
436        self.base_mut().skip(count)
437    }
438
439    #[inline]
440    fn clear(&mut self) -> usize {
441        self.base_mut().clear()
442    }
443}
444
445macro_rules! impl_consumer_traits {
446    ($type:ident $(< $( $param:tt $( : $first_bound:tt $(+ $next_bound:tt )* )? ),+ >)?) => {
447        impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? core::iter::IntoIterator for $type $(< $( $param ),+ >)? where Self: Sized {
448            type Item = <Self as $crate::traits::Observer>::Item;
449            type IntoIter = $crate::traits::consumer::IntoIter<Self>;
450            fn into_iter(self) -> Self::IntoIter {
451                $crate::traits::consumer::IntoIter::new(self)
452            }
453        }
454
455        #[cfg(feature = "std")]
456        impl $(< $( $param $( : $first_bound $(+ $next_bound )* )? ),+ >)? std::io::Read for $type $(< $( $param ),+ >)?
457        where
458            Self: $crate::traits::Consumer<Item = u8>,
459        {
460            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
461                let n = self.pop_slice(buf);
462                if n == 0 {
463                    Err(std::io::ErrorKind::WouldBlock.into())
464                } else {
465                    Ok(n)
466                }
467            }
468        }
469    };
470}
471pub(crate) use impl_consumer_traits;