ringbuf_basedrop/consumer.rs
1use basedrop::Shared;
2use core::{
3 cmp::{self, min},
4 mem::{self, MaybeUninit},
5 ops::Range,
6 ptr::copy_nonoverlapping,
7 slice,
8 sync::atomic,
9};
10#[cfg(feature = "std")]
11use std::io::{self, Read, Write};
12
13use crate::{producer::Producer, ring_buffer::*};
14
15/// Consumer part of ring buffer.
16pub struct Consumer<T: Send + Sized + 'static> {
17 pub(crate) rb: Shared<RingBuffer<T>>,
18}
19
20impl<T: Send + Sized + 'static> Consumer<T> {
21 /// Returns capacity of the ring buffer.
22 ///
23 /// The capacity of the buffer is constant.
24 pub fn capacity(&self) -> usize {
25 self.rb.capacity()
26 }
27
28 /// Checks if the ring buffer is empty.
29 ///
30 /// *The result may become irrelevant at any time because of concurring activity of the producer.*
31 pub fn is_empty(&self) -> bool {
32 self.rb.is_empty()
33 }
34
35 /// Checks if the ring buffer is full.
36 ///
37 /// The result is relevant until you remove items from the consumer.
38 pub fn is_full(&self) -> bool {
39 self.rb.is_full()
40 }
41
42 /// The length of the data stored in the buffer
43 ///
44 /// Actual length may be equal to or greater than the returned value.
45 pub fn len(&self) -> usize {
46 self.rb.len()
47 }
48
49 /// The remaining space in the buffer.
50 ///
51 /// Actual remaining space may be equal to or less than the returning value.
52 pub fn remaining(&self) -> usize {
53 self.rb.remaining()
54 }
55
56 fn get_ranges(&self) -> (Range<usize>, Range<usize>) {
57 let head = self.rb.head.load(atomic::Ordering::Acquire);
58 let tail = self.rb.tail.load(atomic::Ordering::Acquire);
59 let len = self.rb.data.len();
60
61 match head.cmp(&tail) {
62 cmp::Ordering::Less => (head..tail, 0..0),
63 cmp::Ordering::Greater => (head..len, 0..tail),
64 cmp::Ordering::Equal => (0..0, 0..0),
65 }
66 }
67
68 /// Returns a pair of slices which contain, in order, the contents of the `RingBuffer`.
69 ///
70 /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.*
71 pub fn as_slices(&self) -> (&[T], &[T]) {
72 let ranges = self.get_ranges();
73
74 unsafe {
75 let ptr = self.rb.data.get_ref().as_ptr();
76
77 let left = slice::from_raw_parts(ptr.add(ranges.0.start), ranges.0.len());
78 let right = slice::from_raw_parts(ptr.add(ranges.1.start), ranges.1.len());
79
80 (
81 &*(left as *const [MaybeUninit<T>] as *const [T]),
82 &*(right as *const [MaybeUninit<T>] as *const [T]),
83 )
84 }
85 }
86
87 /// Returns a pair of slices which contain, in order, the contents of the `RingBuffer`.
88 ///
89 /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.*
90 pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
91 let ranges = self.get_ranges();
92
93 unsafe {
94 let ptr = self.rb.data.get_mut().as_mut_ptr();
95
96 let left = slice::from_raw_parts_mut(ptr.add(ranges.0.start), ranges.0.len());
97 let right = slice::from_raw_parts_mut(ptr.add(ranges.1.start), ranges.1.len());
98
99 (
100 &mut *(left as *mut [MaybeUninit<T>] as *mut [T]),
101 &mut *(right as *mut [MaybeUninit<T>] as *mut [T]),
102 )
103 }
104 }
105
106 /// Gives immutable access to the elements contained by the ring buffer without removing them.
107 ///
108 /// The method takes a function `f` as argument.
109 /// `f` takes two slices of ring buffer contents (the second one or both of them may be empty).
110 /// First slice contains older elements.
111 ///
112 /// *The slices may not include elements pushed to the buffer by concurring producer after the method call.*
113 ///
114 /// *Marked deprecated in favor of `as_slices`.*
115 #[deprecated(since = "0.2.7", note = "please use `as_slices` instead")]
116 pub fn access<F: FnOnce(&[T], &[T])>(&self, f: F) {
117 let (left, right) = self.as_slices();
118 f(left, right);
119 }
120
121 /// Gives mutable access to the elements contained by the ring buffer without removing them.
122 ///
123 /// The method takes a function `f` as argument.
124 /// `f` takes two slices of ring buffer contents (the second one or both of them may be empty).
125 /// First slice contains older elements.
126 ///
127 /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
128 ///
129 /// *Marked deprecated in favor of `as_mut_slices`.*
130 #[deprecated(since = "0.2.7", note = "please use `as_mut_slices` instead")]
131 pub fn access_mut<F: FnOnce(&mut [T], &mut [T])>(&mut self, f: F) {
132 let (left, right) = self.as_mut_slices();
133 f(left, right);
134 }
135
136 /// Allows to read from ring buffer memory directly.
137 ///
138 /// *This function is unsafe because it gives access to possibly uninitialized memory*
139 ///
140 /// The method takes a function `f` as argument.
141 /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
142 /// First slice contains older elements.
143 ///
144 /// `f` should return number of elements been read.
145 /// *There is no checks for returned number - it remains on the developer's conscience.*
146 ///
147 /// The method **always** calls `f` even if ring buffer is empty.
148 ///
149 /// The method returns number returned from `f`.
150 ///
151 /// # Safety
152 ///
153 /// The method gives access to ring buffer underlying memory which may be uninitialized.
154 ///
155 /// *It's up to you to copy or drop appropriate elements if you use this function.*
156 ///
157 pub unsafe fn pop_access<F>(&mut self, f: F) -> usize
158 where
159 F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
160 {
161 let head = self.rb.head.load(atomic::Ordering::Acquire);
162 let tail = self.rb.tail.load(atomic::Ordering::Acquire);
163 let len = self.rb.data.len();
164
165 let ranges = match head.cmp(&tail) {
166 cmp::Ordering::Less => (head..tail, 0..0),
167 cmp::Ordering::Greater => (head..len, 0..tail),
168 cmp::Ordering::Equal => (0..0, 0..0),
169 };
170
171 let ptr = self.rb.data.get_mut().as_mut_ptr();
172
173 let slices = (
174 slice::from_raw_parts_mut(ptr.wrapping_add(ranges.0.start), ranges.0.len()),
175 slice::from_raw_parts_mut(ptr.wrapping_add(ranges.1.start), ranges.1.len()),
176 );
177
178 let n = f(slices.0, slices.1);
179
180 if n > 0 {
181 let new_head = (head + n) % len;
182 self.rb.head.store(new_head, atomic::Ordering::Release);
183 }
184 n
185 }
186
187 /// Copies data from the ring buffer to the slice in byte-to-byte manner.
188 ///
189 /// The `elems` slice should contain **un-initialized** data before the method call.
190 /// After the call the copied part of data in `elems` should be interpreted as **initialized**.
191 /// The remaining part is still **un-initialized**.
192 ///
193 /// Returns the number of items been copied.
194 ///
195 /// # Safety
196 ///
197 /// The method copies raw data from the ring buffer.
198 ///
199 /// *You should manage copied elements after call, otherwise you may get a memory leak.*
200 ///
201 pub unsafe fn pop_copy(&mut self, elems: &mut [MaybeUninit<T>]) -> usize {
202 self.pop_access(|left, right| {
203 if elems.len() < left.len() {
204 copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), elems.len());
205 elems.len()
206 } else {
207 copy_nonoverlapping(left.as_ptr(), elems.as_mut_ptr(), left.len());
208 if elems.len() < left.len() + right.len() {
209 copy_nonoverlapping(
210 right.as_ptr(),
211 elems.as_mut_ptr().add(left.len()),
212 elems.len() - left.len(),
213 );
214 elems.len()
215 } else {
216 copy_nonoverlapping(
217 right.as_ptr(),
218 elems.as_mut_ptr().add(left.len()),
219 right.len(),
220 );
221 left.len() + right.len()
222 }
223 }
224 })
225 }
226
227 /// Removes latest element from the ring buffer and returns it.
228 /// Returns `None` if the ring buffer is empty.
229 pub fn pop(&mut self) -> Option<T> {
230 let mut elem_mu = MaybeUninit::uninit();
231 let n = unsafe {
232 self.pop_access(|slice, _| {
233 if !slice.is_empty() {
234 mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
235 1
236 } else {
237 0
238 }
239 })
240 };
241 match n {
242 0 => None,
243 1 => Some(unsafe { elem_mu.assume_init() }),
244 _ => unreachable!(),
245 }
246 }
247
248 /// Repeatedly calls the closure `f` passing elements removed from the ring buffer to it.
249 ///
250 /// The closure is called until it returns `false` or the ring buffer is empty.
251 ///
252 /// The method returns number of elements been removed from the buffer.
253 pub fn pop_each<F: FnMut(T) -> bool>(&mut self, mut f: F, count: Option<usize>) -> usize {
254 unsafe {
255 self.pop_access(|left, right| {
256 let lb = match count {
257 Some(n) => min(n, left.len()),
258 None => left.len(),
259 };
260 for (i, dst) in left[0..lb].iter_mut().enumerate() {
261 if !f(dst.as_ptr().read()) {
262 return i + 1;
263 }
264 }
265 if lb < left.len() {
266 return lb;
267 }
268
269 let rb = match count {
270 Some(n) => min(n - lb, right.len()),
271 None => right.len(),
272 };
273 for (i, dst) in right[0..rb].iter_mut().enumerate() {
274 if !f(dst.as_ptr().read()) {
275 return lb + i + 1;
276 }
277 }
278 lb + rb
279 })
280 }
281 }
282
283 /// Iterate immutably over the elements contained by the ring buffer without removing them.
284 ///
285 /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
286 ///
287 /// *Marked deprecated in favor of `iter`.*
288 #[deprecated(since = "0.2.7", note = "please use `iter` instead")]
289 pub fn for_each<F: FnMut(&T)>(&self, mut f: F) {
290 let (left, right) = self.as_slices();
291
292 for c in left.iter() {
293 f(c);
294 }
295 for c in right.iter() {
296 f(c);
297 }
298 }
299
300 /// Returns a front-to-back iterator.
301 pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
302 let (left, right) = self.as_slices();
303
304 left.iter().chain(right.iter())
305 }
306
307 /// Iterate mutably over the elements contained by the ring buffer without removing them.
308 ///
309 /// *The iteration may not include elements pushed to the buffer by concurring producer after the method call.*
310 ///
311 /// *Marked deprecated in favor of `iter_mut`.*
312 #[deprecated(since = "0.2.7", note = "please use `iter_mut` instead")]
313 pub fn for_each_mut<F: FnMut(&mut T)>(&mut self, mut f: F) {
314 let (left, right) = self.as_mut_slices();
315
316 for c in left.iter_mut() {
317 f(c);
318 }
319 for c in right.iter_mut() {
320 f(c);
321 }
322 }
323
324 /// Returns a front-to-back iterator that returns mutable references.
325 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> + '_ {
326 let (left, right) = self.as_mut_slices();
327
328 left.iter_mut().chain(right.iter_mut())
329 }
330
331 /// Removes at most `n` and at least `min(n, Consumer::len())` items from the buffer and safely drops them.
332 ///
333 /// If there is no concurring producer activity then exactly `min(n, Consumer::len())` items are removed.
334 ///
335 /// Returns the number of deleted items.
336 ///
337 ///
338 /// ```rust
339 /// # extern crate ringbuf_basedrop;
340 /// # use ringbuf_basedrop::RingBuffer;
341 /// # use basedrop::Collector;
342 /// # fn main() {
343 /// let collector = Collector::new();
344 ///
345 /// let rb = RingBuffer::<i32>::new(8);
346 /// let (mut prod, mut cons) = rb.split(&collector.handle());
347 ///
348 /// assert_eq!(prod.push_iter(&mut (0..8)), 8);
349 ///
350 /// assert_eq!(cons.discard(4), 4);
351 /// assert_eq!(cons.discard(8), 4);
352 /// assert_eq!(cons.discard(8), 0);
353 /// # }
354 /// ```
355 pub fn discard(&mut self, n: usize) -> usize {
356 unsafe {
357 self.pop_access(|left, right| {
358 let (mut cnt, mut rem) = (0, n);
359 let left_elems = if rem <= left.len() {
360 cnt += rem;
361 left.get_unchecked_mut(0..rem)
362 } else {
363 cnt += left.len();
364 left
365 };
366 rem = n - cnt;
367
368 let right_elems = if rem <= right.len() {
369 cnt += rem;
370 right.get_unchecked_mut(0..rem)
371 } else {
372 cnt += right.len();
373 right
374 };
375
376 for e in left_elems.iter_mut().chain(right_elems.iter_mut()) {
377 e.as_mut_ptr().drop_in_place();
378 }
379
380 cnt
381 })
382 }
383 }
384
385 /// Removes at most `count` elements from the consumer and appends them to the producer.
386 /// If `count` is `None` then as much as possible elements will be moved.
387 /// The producer and consumer parts may be of different buffers as well as of the same one.
388 ///
389 /// On success returns count of elements been moved.
390 pub fn move_to(&mut self, other: &mut Producer<T>, count: Option<usize>) -> usize {
391 move_items(self, other, count)
392 }
393}
394
395impl<T: Send + Sized + 'static> Iterator for Consumer<T> {
396 type Item = T;
397
398 fn next(&mut self) -> Option<T> {
399 self.pop()
400 }
401}
402
403impl<T: Copy + Send + Sized + 'static> Consumer<T> {
404 /// Removes first elements from the ring buffer and writes them into a slice.
405 /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
406 ///
407 /// On success returns count of elements been removed from the ring buffer.
408 pub fn pop_slice(&mut self, elems: &mut [T]) -> usize {
409 unsafe { self.pop_copy(&mut *(elems as *mut [T] as *mut [MaybeUninit<T>])) }
410 }
411}
412
#[cfg(feature = "std")]
impl Consumer<u8> {
    /// Removes at most the first `count` bytes from the ring buffer and writes them into
    /// a [`Write`](https://doc.rust-lang.org/std/io/trait.Write.html) instance.
    /// If `count` is `None` then as many bytes as possible are written.
    ///
    /// Only the first (older) contiguous region of the buffer is offered to `writer`, and
    /// `write` is called exactly once.
    ///
    /// Returns `Ok(n)` if `write` succeeded, where `n` is the number of bytes written.
    /// `n == 0` means that either `write` returned zero or the ring buffer is empty.
    ///
    /// If `write` fails, or reports more bytes than it was given, an error is returned and
    /// nothing is removed from the buffer.
    pub fn write_into(
        &mut self,
        writer: &mut dyn Write,
        count: Option<usize>,
    ) -> io::Result<usize> {
        let mut failure = None;
        let written = unsafe {
            self.pop_access(|older, _| -> usize {
                // Cap the region at `count` bytes when a limit was given.
                let quota = count.map_or(older.len(), |c| min(c, older.len()));
                let chunk = &older[..quota];
                let bytes = &*(chunk as *const [MaybeUninit<u8>] as *const [u8]);

                match writer.write(bytes) {
                    Ok(n) if n <= bytes.len() => n,
                    Ok(_) => {
                        failure = Some(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            "Write operation returned an invalid number",
                        ));
                        0
                    }
                    Err(e) => {
                        failure = Some(e);
                        0
                    }
                }
            })
        };
        failure.map_or(Ok(written), Err)
    }
}
467
#[cfg(feature = "std")]
impl Read for Consumer<u8> {
    /// Pops bytes from the ring buffer into `buffer`.
    ///
    /// Returns the number of bytes popped. When nothing could be popped although `buffer`
    /// is non-empty, an error of kind `WouldBlock` is returned instead of `Ok(0)`.
    fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
        match self.pop_slice(buffer) {
            0 if !buffer.is_empty() => Err(io::Error::from(io::ErrorKind::WouldBlock)),
            n => Ok(n),
        }
    }
}
478}