ringbuf_basedrop/
consumer.rs

1use basedrop::Shared;
2use core::{
3    cmp::{self, min},
4    mem::{self, MaybeUninit},
5    ops::Range,
6    ptr::copy_nonoverlapping,
7    slice,
8    sync::atomic,
9};
10#[cfg(feature = "std")]
11use std::io::{self, Read, Write};
12
13use crate::{producer::Producer, ring_buffer::*};
14
15/// Consumer part of ring buffer.
16pub struct Consumer<T: Send + Sized + 'static> {
17    pub(crate) rb: Shared<RingBuffer<T>>,
18}
19
20impl<T: Send + Sized + 'static> Consumer<T> {
21    /// Returns capacity of the ring buffer.
22    ///
23    /// The capacity of the buffer is constant.
24    pub fn capacity(&self) -> usize {
25        self.rb.capacity()
26    }
27
28    /// Checks if the ring buffer is empty.
29    ///
30    /// *The result may become irrelevant at any time because of concurring activity of the producer.*
31    pub fn is_empty(&self) -> bool {
32        self.rb.is_empty()
33    }
34
35    /// Checks if the ring buffer is full.
36    ///
37    /// The result is relevant until you remove items from the consumer.
38    pub fn is_full(&self) -> bool {
39        self.rb.is_full()
40    }
41
42    /// The length of the data stored in the buffer
43    ///
44    /// Actual length may be equal to or greater than the returned value.
45    pub fn len(&self) -> usize {
46        self.rb.len()
47    }
48
49    /// The remaining space in the buffer.
50    ///
51    /// Actual remaining space may be equal to or less than the returning value.
52    pub fn remaining(&self) -> usize {
53        self.rb.remaining()
54    }
55
56    fn get_ranges(&self) -> (Range<usize>, Range<usize>) {
57        let head = self.rb.head.load(atomic::Ordering::Acquire);
58        let tail = self.rb.tail.load(atomic::Ordering::Acquire);
59        let len = self.rb.data.len();
60
61        match head.cmp(&tail) {
62            cmp::Ordering::Less => (head..tail, 0..0),
63            cmp::Ordering::Greater => (head..len, 0..tail),
64            cmp::Ordering::Equal => (0..0, 0..0),
65        }
66    }
67
68    /// Returns a pair of slices which contain, in order, the contents of the `RingBuffer`.
69    ///
70    /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.*
71    pub fn as_slices(&self) -> (&[T], &[T]) {
72        let ranges = self.get_ranges();
73
74        unsafe {
75            let ptr = self.rb.data.get_ref().as_ptr();
76
77            let left = slice::from_raw_parts(ptr.add(ranges.0.start), ranges.0.len());
78            let right = slice::from_raw_parts(ptr.add(ranges.1.start), ranges.1.len());
79
80            (
81                &*(left as *const [MaybeUninit<T>] as *const [T]),
82                &*(right as *const [MaybeUninit<T>] as *const [T]),
83            )
84        }
85    }
86
87    /// Returns a pair of slices which contain, in order, the contents of the `RingBuffer`.
88    ///
89    /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.*
90    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
91        let ranges = self.get_ranges();
92
93        unsafe {
94            let ptr = self.rb.data.get_mut().as_mut_ptr();
95
96            let left = slice::from_raw_parts_mut(ptr.add(ranges.0.start), ranges.0.len());
97            let right = slice::from_raw_parts_mut(ptr.add(ranges.1.start), ranges.1.len());
98
99            (
100                &mut *(left as *mut [MaybeUninit<T>] as *mut [T]),
101                &mut *(right as *mut [MaybeUninit<T>] as *mut [T]),
102            )
103        }
104    }
105
106    /// Gives immutable access to the elements contained by the ring buffer without removing them.
107    ///
108    /// The method takes a function `f` as argument.
109    /// `f` takes two slices of ring buffer contents (the second one or both of them may be empty).
110    /// First slice contains older elements.
111    ///
112    /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.*
113    ///
114    /// *Marked deprecated in favor of `as_slices`.*
115    #[deprecated(since = "0.2.7", note = "please use `as_slices` instead")]
116    pub fn access<F: FnOnce(&[T], &[T])>(&self, f: F) {
117        let (left, right) = self.as_slices();
118        f(left, right);
119    }
120
121    /// Gives mutable access to the elements contained by the ring buffer without removing them.
122    ///
123    /// The method takes a function `f` as argument.
124    /// `f` takes two slices of ring buffer contents (the second one or both of them may be empty).
125    /// First slice contains older elements.
126    ///
127    /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
128    ///
129    /// *Marked deprecated in favor of `as_mut_slices`.*
130    #[deprecated(since = "0.2.7", note = "please use `as_mut_slices` instead")]
131    pub fn access_mut<F: FnOnce(&mut [T], &mut [T])>(&mut self, f: F) {
132        let (left, right) = self.as_mut_slices();
133        f(left, right);
134    }
135
136    /// Allows to read from ring buffer memory directly.
137    ///
138    /// *This function is unsafe because it gives access to possibly uninitialized memory*
139    ///
140    /// The method takes a function `f` as argument.
141    /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
142    /// First slice contains older elements.
143    ///
144    /// `f` should return number of elements been read.
145    /// *There is no checks for returned number - it remains on the developer's conscience.*
146    ///
147    /// The method **always** calls `f` even if ring buffer is empty.
148    ///
149    /// The method returns number returned from `f`.
150    ///
151    /// # Safety
152    ///
153    /// The method gives access to ring buffer underlying memory which may be uninitialized.
154    ///
155    /// *It's up to you to copy or drop appropriate elements if you use this function.*
156    ///
157    pub unsafe fn pop_access<F>(&mut self, f: F) -> usize
158    where
159        F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
160    {
161        let head = self.rb.head.load(atomic::Ordering::Acquire);
162        let tail = self.rb.tail.load(atomic::Ordering::Acquire);
163        let len = self.rb.data.len();
164
165        let ranges = match head.cmp(&tail) {
166            cmp::Ordering::Less => (head..tail, 0..0),
167            cmp::Ordering::Greater => (head..len, 0..tail),
168            cmp::Ordering::Equal => (0..0, 0..0),
169        };
170
171        let ptr = self.rb.data.get_mut().as_mut_ptr();
172
173        let slices = (
174            slice::from_raw_parts_mut(ptr.wrapping_add(ranges.0.start), ranges.0.len()),
175            slice::from_raw_parts_mut(ptr.wrapping_add(ranges.1.start), ranges.1.len()),
176        );
177
178        let n = f(slices.0, slices.1);
179
180        if n > 0 {
181            let new_head = (head + n) % len;
182            self.rb.head.store(new_head, atomic::Ordering::Release);
183        }
184        n
185    }
186
187    /// Copies data from the ring buffer to the slice in byte-to-byte manner.
188    ///
189    /// The `elems` slice should contain **un-initialized** data before the method call.
190    /// After the call the copied part of data in `elems` should be interpreted as **initialized**.
191    /// The remaining part is still **un-initialized**.
192    ///
193    /// Returns the number of items been copied.
194    ///
195    /// # Safety
196    ///
197    /// The method copies raw data from the ring buffer.
198    ///
199    /// *You should manage copied elements after call, otherwise you may get a memory leak.*
200    ///
201    pub unsafe fn pop_copy(&mut self, elems: &mut [MaybeUninit<T>]) -> usize {
202        self.pop_access(|left, right| {
203            if elems.len() < left.len() {
204                copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), elems.len());
205                elems.len()
206            } else {
207                copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), left.len());
208                if elems.len() < left.len() + right.len() {
209                    copy_nonoverlapping(
210                        right.as_ptr(),
211                        elems.as_mut_ptr().add(left.len()),
212                        elems.len() - left.len(),
213                    );
214                    elems.len()
215                } else {
216                    copy_nonoverlapping(
217                        right.as_ptr(),
218                        elems.as_mut_ptr().add(left.len()),
219                        right.len(),
220                    );
221                    left.len() + right.len()
222                }
223            }
224        })
225    }
226
227    /// Removes latest element from the ring buffer and returns it.
228    /// Returns `None` if the ring buffer is empty.
229    pub fn pop(&mut self) -> Option<T> {
230        let mut elem_mu = MaybeUninit::uninit();
231        let n = unsafe {
232            self.pop_access(|slice, _| {
233                if !slice.is_empty() {
234                    mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
235                    1
236                } else {
237                    0
238                }
239            })
240        };
241        match n {
242            0 => None,
243            1 => Some(unsafe { elem_mu.assume_init() }),
244            _ => unreachable!(),
245        }
246    }
247
248    /// Repeatedly calls the closure `f` passing elements removed from the ring buffer to it.
249    ///
250    /// The closure is called until it returns `false` or the ring buffer is empty.
251    ///
252    /// The method returns number of elements been removed from the buffer.
253    pub fn pop_each<F: FnMut(T) -> bool>(&mut self, mut f: F, count: Option<usize>) -> usize {
254        unsafe {
255            self.pop_access(|left, right| {
256                let lb = match count {
257                    Some(n) => min(n, left.len()),
258                    None => left.len(),
259                };
260                for (i, dst) in left[0..lb].iter_mut().enumerate() {
261                    if !f(dst.as_ptr().read()) {
262                        return i + 1;
263                    }
264                }
265                if lb < left.len() {
266                    return lb;
267                }
268
269                let rb = match count {
270                    Some(n) => min(n - lb, right.len()),
271                    None => right.len(),
272                };
273                for (i, dst) in right[0..rb].iter_mut().enumerate() {
274                    if !f(dst.as_ptr().read()) {
275                        return lb + i + 1;
276                    }
277                }
278                lb + rb
279            })
280        }
281    }
282
283    /// Iterate immutably over the elements contained by the ring buffer without removing them.
284    ///
285    /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
286    ///
287    /// *Marked deprecated in favor of `iter`.*
288    #[deprecated(since = "0.2.7", note = "please use `iter` instead")]
289    pub fn for_each<F: FnMut(&T)>(&self, mut f: F) {
290        let (left, right) = self.as_slices();
291
292        for c in left.iter() {
293            f(c);
294        }
295        for c in right.iter() {
296            f(c);
297        }
298    }
299
300    /// Returns a front-to-back iterator.
301    pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
302        let (left, right) = self.as_slices();
303
304        left.iter().chain(right.iter())
305    }
306
307    /// Iterate mutably over the elements contained by the ring buffer without removing them.
308    ///
309    /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
310    ///
311    /// *Marked deprecated in favor of `iter_mut`.*
312    #[deprecated(since = "0.2.7", note = "please use `iter_mut` instead")]
313    pub fn for_each_mut<F: FnMut(&mut T)>(&mut self, mut f: F) {
314        let (left, right) = self.as_mut_slices();
315
316        for c in left.iter_mut() {
317            f(c);
318        }
319        for c in right.iter_mut() {
320            f(c);
321        }
322    }
323
324    /// Returns a front-to-back iterator that returns mutable references.
325    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> + '_ {
326        let (left, right) = self.as_mut_slices();
327
328        left.iter_mut().chain(right.iter_mut())
329    }
330
331    /// Removes at most `n` and at least `min(n, Consumer::len())` items from the buffer and safely drops them.
332    ///
333    /// If there is no concurring producer activity then exactly `min(n, Consumer::len())` items are removed.
334    ///
335    /// Returns the number of deleted items.
336    ///
337    ///
338    /// ```rust
339    /// # extern crate ringbuf_basedrop;
340    /// # use ringbuf_basedrop::RingBuffer;
341    /// # use basedrop::Collector;
342    /// # fn main() {
343    /// let collector = Collector::new();
344    ///
345    /// let rb = RingBuffer::<i32>::new(8);
346    /// let (mut prod, mut cons) = rb.split(&collector.handle());
347    ///
348    /// assert_eq!(prod.push_iter(&mut (0..8)), 8);
349    ///
350    /// assert_eq!(cons.discard(4), 4);
351    /// assert_eq!(cons.discard(8), 4);
352    /// assert_eq!(cons.discard(8), 0);
353    /// # }
354    /// ```
355    pub fn discard(&mut self, n: usize) -> usize {
356        unsafe {
357            self.pop_access(|left, right| {
358                let (mut cnt, mut rem) = (0, n);
359                let left_elems = if rem <= left.len() {
360                    cnt += rem;
361                    left.get_unchecked_mut(0..rem)
362                } else {
363                    cnt += left.len();
364                    left
365                };
366                rem = n - cnt;
367
368                let right_elems = if rem <= right.len() {
369                    cnt += rem;
370                    right.get_unchecked_mut(0..rem)
371                } else {
372                    cnt += right.len();
373                    right
374                };
375
376                for e in left_elems.iter_mut().chain(right_elems.iter_mut()) {
377                    e.as_mut_ptr().drop_in_place();
378                }
379
380                cnt
381            })
382        }
383    }
384
385    /// Removes at most `count` elements from the consumer and appends them to the producer.
386    /// If `count` is `None` then as much as possible elements will be moved.
387    /// The producer and consumer parts may be of different buffers as well as of the same one.
388    ///
389    /// On success returns count of elements been moved.
390    pub fn move_to(&mut self, other: &mut Producer<T>, count: Option<usize>) -> usize {
391        move_items(self, other, count)
392    }
393}
394
395impl<T: Send + Sized + 'static> Iterator for Consumer<T> {
396    type Item = T;
397
398    fn next(&mut self) -> Option<T> {
399        self.pop()
400    }
401}
402
403impl<T: Copy + Send + Sized + 'static> Consumer<T> {
404    /// Removes first elements from the ring buffer and writes them into a slice.
405    /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
406    ///
407    /// On success returns count of elements been removed from the ring buffer.
408    pub fn pop_slice(&mut self, elems: &mut [T]) -> usize {
409        unsafe { self.pop_copy(&mut *(elems as *mut [T] as *mut [MaybeUninit<T>])) }
410    }
411}
412
413#[cfg(feature = "std")]
414impl Consumer<u8> {
415    /// Removes at most first `count` bytes from the ring buffer and writes them into
416    /// a [`Write`](https://doc.rust-lang.org/std/io/trait.Write.html) instance.
417    /// If `count` is `None` then as much as possible bytes will be written.
418    ///
419    /// Returns `Ok(n)` if `write` succeeded. `n` is number of bytes been written.
420    /// `n == 0` means that either `write` returned zero or ring buffer is empty.
421    ///
422    /// If `write` is failed or returned an invalid number then error is returned.
423    pub fn write_into(
424        &mut self,
425        writer: &mut dyn Write,
426        count: Option<usize>,
427    ) -> io::Result<usize> {
428        let mut err = None;
429        let n = unsafe {
430            self.pop_access(|left, _| -> usize {
431                let left = match count {
432                    Some(c) => {
433                        if c < left.len() {
434                            &mut left[0..c]
435                        } else {
436                            left
437                        }
438                    }
439                    None => left,
440                };
441                match writer
442                    .write(&*(left as *const [MaybeUninit<u8>] as *const [u8]))
443                    .and_then(|n| {
444                        if n <= left.len() {
445                            Ok(n)
446                        } else {
447                            Err(io::Error::new(
448                                io::ErrorKind::InvalidInput,
449                                "Write operation returned an invalid number",
450                            ))
451                        }
452                    }) {
453                    Ok(n) => n,
454                    Err(e) => {
455                        err = Some(e);
456                        0
457                    }
458                }
459            })
460        };
461        match err {
462            Some(e) => Err(e),
463            None => Ok(n),
464        }
465    }
466}
467
468#[cfg(feature = "std")]
469impl Read for Consumer<u8> {
470    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
471        let n = self.pop_slice(buffer);
472        if n == 0 && !buffer.is_empty() {
473            Err(io::ErrorKind::WouldBlock.into())
474        } else {
475            Ok(n)
476        }
477    }
478}