rtrb_basedrop/
chunks.rs

1//! Writing and reading multiple items at once into and from a [`RingBuffer`].
2//!
3//! Multiple items at once can be moved from an iterator into the ring buffer by using
4//! [`Producer::write_chunk_uninit()`] followed by [`WriteChunkUninit::fill_from_iter()`].
5//! Alternatively, mutable access to the (uninitialized) slots of the chunk can be obtained with
6//! [`WriteChunkUninit::as_mut_slices()`], which requires writing some `unsafe` code.
7//! To avoid that, [`Producer::write_chunk()`] can be used,
8//! which initializes all slots with their [`Default`] value
9//! and provides mutable access by means of [`WriteChunk::as_mut_slices()`].
10//!
11//! Multiple items at once can be moved out of the ring buffer by using
12//! [`Consumer::read_chunk()`] and iterating over the returned [`ReadChunk`]
13//! (or by explicitly calling [`ReadChunk::into_iter()`]).
14//! Immutable access to the slots of the chunk can be obtained with [`ReadChunk::as_slices()`].
15//!
16//! # Examples
17//!
18//! This example uses a single thread for simplicity, but in a real application,
19//! `producer` and `consumer` would of course live on different threads:
20//!
21//! ```
22//! use rtrb_basedrop::RingBuffer;
23//! use basedrop::Collector;
24//!
25//! let collector = Collector::new();
26//!
27//! let (mut producer, mut consumer) = RingBuffer::new(5, &collector.handle());
28//!
29//! if let Ok(chunk) = producer.write_chunk_uninit(4) {
30//!     chunk.fill_from_iter([10, 11, 12]);
31//!     // Note that we requested 4 slots but we've only written to 3 of them!
32//! } else {
33//!     unreachable!();
34//! }
35//!
36//! assert_eq!(producer.slots(), 2);
37//! assert_eq!(consumer.slots(), 3);
38//!
39//! if let Ok(chunk) = consumer.read_chunk(2) {
40//!     assert_eq!(chunk.into_iter().collect::<Vec<_>>(), [10, 11]);
41//! } else {
42//!     unreachable!();
43//! }
44//!
45//! // One element is still in the queue:
46//! assert_eq!(consumer.peek(), Ok(&12));
47//!
48//! let data = vec![20, 21, 22, 23];
49//! // NB: write_chunk_uninit() could be used for possibly better performance:
50//! if let Ok(mut chunk) = producer.write_chunk(4) {
51//!     let (first, second) = chunk.as_mut_slices();
52//!     let mid = first.len();
53//!     first.copy_from_slice(&data[..mid]);
54//!     second.copy_from_slice(&data[mid..]);
55//!     chunk.commit_all();
56//! } else {
57//!     unreachable!();
58//! }
59//!
60//! assert!(producer.is_full());
61//! assert_eq!(consumer.slots(), 5);
62//!
63//! let mut v = Vec::<i32>::with_capacity(5);
64//! if let Ok(chunk) = consumer.read_chunk(5) {
65//!     let (first, second) = chunk.as_slices();
66//!     v.extend(first);
67//!     v.extend(second);
68//!     chunk.commit_all();
69//! } else {
70//!     unreachable!();
71//! }
72//! assert_eq!(v, [12, 20, 21, 22, 23]);
73//! assert!(consumer.is_empty());
74//! ```
75//!
76//! The iterator API can be used to move items from one ring buffer to another:
77//!
78//! ```
79//! use rtrb_basedrop::{Consumer, Producer};
80//!
81//! fn move_items<T: Send + 'static>(src: &mut Consumer<T>, dst: &mut Producer<T>) -> usize {
82//!     let n = src.slots().min(dst.slots());
83//!     dst.write_chunk_uninit(n).unwrap().fill_from_iter(src.read_chunk(n).unwrap())
84//! }
85//! ```
86//!
87//! ## Common Access Patterns
88//!
89//! The following examples show the [`Producer`] side;
90//! similar patterns can of course be used with [`Consumer::read_chunk()`] as well.
91//! Furthermore, the examples use [`Producer::write_chunk_uninit()`],
92//! along with a bit of `unsafe` code.
93//! To avoid this, you can use [`Producer::write_chunk()`] instead,
94//! which requires the trait bound `T: Default` and will lead to a small runtime overhead.
95//!
96//! Copy a whole slice of items into the ring buffer, but only if space permits
97//! (if not, the entire input slice is returned as an error):
98//!
99//! ```
100//! use rtrb_basedrop::{Producer, CopyToUninit};
101//!
102//! fn push_entire_slice<'a, T: Send + 'static>(queue: &mut Producer<T>, slice: &'a [T]) -> Result<(), &'a [T]>
103//! where
104//!     T: Copy,
105//! {
106//!     if let Ok(mut chunk) = queue.write_chunk_uninit(slice.len()) {
107//!         let (first, second) = chunk.as_mut_slices();
108//!         let mid = first.len();
109//!         slice[..mid].copy_to_uninit(first);
110//!         slice[mid..].copy_to_uninit(second);
111//!         // SAFETY: All slots have been initialized
112//!         unsafe {
113//!             chunk.commit_all();
114//!         }
115//!         Ok(())
116//!     } else {
117//!         Err(slice)
118//!     }
119//! }
120//! ```
121//!
122//! Copy as many items as possible from a given slice, returning the number of copied items:
123//!
124//! ```
125//! use rtrb_basedrop::{Producer, CopyToUninit, chunks::ChunkError::TooFewSlots};
126//!
127//! fn push_partial_slice<T: Send + 'static>(queue: &mut Producer<T>, slice: &[T]) -> usize
128//! where
129//!     T: Copy,
130//! {
131//!     let mut chunk = match queue.write_chunk_uninit(slice.len()) {
132//!         Ok(chunk) => chunk,
133//!         // Remaining slots are returned, this will always succeed:
134//!         Err(TooFewSlots(n)) => queue.write_chunk_uninit(n).unwrap(),
135//!     };
136//!     let end = chunk.len();
137//!     let (first, second) = chunk.as_mut_slices();
138//!     let mid = first.len();
139//!     slice[..mid].copy_to_uninit(first);
140//!     slice[mid..end].copy_to_uninit(second);
141//!     // SAFETY: All slots have been initialized
142//!     unsafe {
143//!         chunk.commit_all();
144//!     }
145//!     end
146//! }
147//! ```
148//!
149//! Write as many slots as possible, given an iterator
150//! (and return the number of written slots):
151//!
152//! ```
153//! use rtrb_basedrop::{Producer, chunks::ChunkError::TooFewSlots};
154//!
155//! fn push_from_iter<T: Send + 'static, I>(queue: &mut Producer<T>, iter: I) -> usize
156//! where
157//!     T: Default,
158//!     I: IntoIterator<Item = T>,
159//! {
160//!     let iter = iter.into_iter();
161//!     let n = match iter.size_hint() {
162//!         (_, None) => queue.slots(),
163//!         (_, Some(n)) => n,
164//!     };
165//!     let chunk = match queue.write_chunk_uninit(n) {
166//!         Ok(chunk) => chunk,
167//!         // Remaining slots are returned, this will always succeed:
168//!         Err(TooFewSlots(n)) => queue.write_chunk_uninit(n).unwrap(),
169//!     };
170//!     chunk.fill_from_iter(iter)
171//! }
172//! ```
173
174use core::fmt;
175use core::marker::PhantomData;
176use core::mem::MaybeUninit;
177use core::sync::atomic::Ordering;
178
179use crate::{Consumer, CopyToUninit, Producer};
180
181// This is used in the documentation.
182#[allow(unused_imports)]
183use crate::RingBuffer;
184
impl<T: Send + 'static> Producer<T> {
    /// Returns `n` slots (initially containing their [`Default`] value) for writing.
    ///
    /// [`WriteChunk::as_mut_slices()`] provides mutable access to the slots.
    /// After writing to those slots, they explicitly have to be made available
    /// to be read by the [`Consumer`] by calling [`WriteChunk::commit()`]
    /// or [`WriteChunk::commit_all()`].
    ///
    /// For an alternative that does not require the trait bound [`Default`],
    /// see [`Producer::write_chunk_uninit()`].
    ///
    /// If items are supposed to be moved from an iterator into the ring buffer,
    /// [`Producer::write_chunk_uninit()`] followed by [`WriteChunkUninit::fill_from_iter()`]
    /// can be used.
    ///
    /// # Errors
    ///
    /// If not enough slots are available, an error
    /// (containing the number of available slots) is returned.
    /// Use [`Producer::slots()`] to obtain the number of available slots beforehand.
    ///
    /// # Examples
    ///
    /// See the documentation of the [`chunks`](crate::chunks#examples) module.
    pub fn write_chunk(&mut self, n: usize) -> Result<WriteChunk<'_, T>, ChunkError>
    where
        T: Default,
    {
        // Delegate to the uninitialized variant; the From conversion
        // default-initializes all slots of the returned chunk.
        self.write_chunk_uninit(n).map(WriteChunk::from)
    }

    /// Returns `n` (uninitialized) slots for writing.
    ///
    /// [`WriteChunkUninit::as_mut_slices()`] provides mutable access
    /// to the uninitialized slots.
    /// After writing to those slots, they explicitly have to be made available
    /// to be read by the [`Consumer`] by calling [`WriteChunkUninit::commit()`]
    /// or [`WriteChunkUninit::commit_all()`].
    ///
    /// Alternatively, [`WriteChunkUninit::fill_from_iter()`] can be used
    /// to move items from an iterator into the available slots.
    /// All moved items are automatically made available to be read by the [`Consumer`].
    ///
    /// # Errors
    ///
    /// If not enough slots are available, an error
    /// (containing the number of available slots) is returned.
    /// Use [`Producer::slots()`] to obtain the number of available slots beforehand.
    ///
    /// # Safety
    ///
    /// This function itself is safe, as is [`WriteChunkUninit::fill_from_iter()`].
    /// However, when using [`WriteChunkUninit::as_mut_slices()`],
    /// the user has to make sure that the relevant slots have been initialized
    /// before calling [`WriteChunkUninit::commit()`] or [`WriteChunkUninit::commit_all()`].
    ///
    /// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
    /// see [`Producer::write_chunk()`].
    pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
        // The tail is only ever advanced by this producer (see commit_unchecked()),
        // so the cached copy is always current and no atomic load is needed here.
        let tail = self.cached_tail.get();

        // Check if the queue has *possibly* not enough slots.
        if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
            // Refresh the head ...
            let head = self.buffer.head.load(Ordering::Acquire);
            self.cached_head.set(head);

            // ... and check if there *really* are not enough slots.
            let slots = self.buffer.capacity - self.buffer.distance(head, tail);
            if slots < n {
                return Err(ChunkError::TooFewSlots(slots));
            }
        }
        // Map the abstract position to an actual index into the buffer ...
        let tail = self.buffer.collapse_position(tail);
        // ... and split the chunk at the physical end of the buffer:
        // `first_len` slots up to the end, the remainder wraps around to the start.
        let first_len = n.min(self.buffer.capacity - tail);
        Ok(WriteChunkUninit {
            first_ptr: unsafe { self.buffer.data_ptr.add(tail) },
            first_len,
            second_ptr: self.buffer.data_ptr,
            second_len: n - first_len,
            producer: self,
        })
    }
}
269
impl<T: Send + 'static> Consumer<T> {
    /// Returns `n` slots for reading.
    ///
    /// [`ReadChunk::as_slices()`] provides immutable access to the slots.
    /// After reading from those slots, they explicitly have to be made available
    /// to be written again by the [`Producer`] by calling [`ReadChunk::commit()`]
    /// or [`ReadChunk::commit_all()`].
    ///
    /// Alternatively, items can be moved out of the [`ReadChunk`] using iteration
    /// because it implements [`IntoIterator`]
    /// ([`ReadChunk::into_iter()`] can be used to explicitly turn it into an [`Iterator`]).
    /// All moved items are automatically made available to be written again by the [`Producer`].
    ///
    /// # Errors
    ///
    /// If not enough slots are available, an error
    /// (containing the number of available slots) is returned.
    /// Use [`Consumer::slots()`] to obtain the number of available slots beforehand.
    ///
    /// # Examples
    ///
    /// See the documentation of the [`chunks`](crate::chunks#examples) module.
    pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
        // The head is only ever advanced by this consumer (see commit_unchecked()),
        // so the cached copy is always current and no atomic load is needed here.
        let head = self.cached_head.get();

        // Check if the queue has *possibly* not enough slots.
        if self.buffer.distance(head, self.cached_tail.get()) < n {
            // Refresh the tail ...
            let tail = self.buffer.tail.load(Ordering::Acquire);
            self.cached_tail.set(tail);

            // ... and check if there *really* are not enough slots.
            let slots = self.buffer.distance(head, tail);
            if slots < n {
                return Err(ChunkError::TooFewSlots(slots));
            }
        }

        // Map the abstract position to an actual index into the buffer and
        // split the chunk at the physical end of the buffer (wrap-around).
        let head = self.buffer.collapse_position(head);
        let first_len = n.min(self.buffer.capacity - head);
        Ok(ReadChunk {
            first_ptr: unsafe { self.buffer.data_ptr.add(head) },
            first_len,
            second_ptr: self.buffer.data_ptr,
            second_len: n - first_len,
            consumer: self,
        })
    }
}
319
/// Structure for writing into multiple ([`Default`]-initialized) slots in one go.
///
/// This is returned from [`Producer::write_chunk()`].
///
/// To obtain uninitialized slots, use [`Producer::write_chunk_uninit()`] instead,
/// which also allows moving items from an iterator into the ring buffer
/// by means of [`WriteChunkUninit::fill_from_iter()`].
#[derive(Debug)]
// The inner Option is Some(...) while the chunk is active and is set to None
// by commit()/commit_all(), which disarms the Drop impl below.
// NOTE(review): PhantomData<T> presumably signals ownership of T for
// drop-check purposes — confirm, since the pointers live in WriteChunkUninit.
pub struct WriteChunk<'a, T: Send + 'static>(Option<WriteChunkUninit<'a, T>>, PhantomData<T>);
329
impl<T: Send + 'static> Drop for WriteChunk<'_, T> {
    /// Drops all slots of an uncommitted chunk.
    ///
    /// This prevents leaking the `Default`-initialized items when the chunk
    /// is dropped without `commit()`/`commit_all()` having been called.
    fn drop(&mut self) {
        // NB: If `commit()` or `commit_all()` has been called, `self.0` is `None`.
        if let Some(mut chunk) = self.0.take() {
            // Safety: All slots have been initialized in From::from().
            unsafe {
                // No part of the chunk has been committed, all slots are dropped.
                chunk.drop_suffix(0);
            }
        }
    }
}
342
343impl<'a, T> From<WriteChunkUninit<'a, T>> for WriteChunk<'a, T>
344where
345    T: Default + Send + 'static,
346{
347    /// Fills all slots with the [`Default`] value.
348    fn from(chunk: WriteChunkUninit<'a, T>) -> Self {
349        for i in 0..chunk.first_len {
350            unsafe {
351                chunk.first_ptr.add(i).write(Default::default());
352            }
353        }
354        for i in 0..chunk.second_len {
355            unsafe {
356                chunk.second_ptr.add(i).write(Default::default());
357            }
358        }
359        WriteChunk(Some(chunk), PhantomData)
360    }
361}
362
impl<T> WriteChunk<'_, T>
where
    T: Default + Send + 'static,
{
    /// Returns two slices for writing to the requested slots.
    ///
    /// All slots are initially filled with their [`Default`] value.
    ///
    /// The first slice can only be empty if `0` slots have been requested.
    /// If the first slice contains all requested slots, the second one is empty.
    ///
    /// After writing to the slots, they are *not* automatically made available
    /// to be read by the [`Consumer`].
    /// This has to be explicitly done by calling [`commit()`](WriteChunk::commit)
    /// or [`commit_all()`](WriteChunk::commit_all).
    /// If items are written but *not* committed afterwards,
    /// they will *not* become available for reading and
    /// they will be leaked (which is only relevant if `T` implements [`Drop`]).
    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
        // self.0 is always Some(chunk).
        let chunk = self.0.as_ref().unwrap();
        // Safety: All slots have been initialized in From::from().
        unsafe {
            (
                core::slice::from_raw_parts_mut(chunk.first_ptr, chunk.first_len),
                core::slice::from_raw_parts_mut(chunk.second_ptr, chunk.second_len),
            )
        }
    }

    /// Makes the first `n` slots of the chunk available for reading.
    ///
    /// The rest of the chunk is dropped.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the number of slots in the chunk.
    pub fn commit(mut self, n: usize) {
        // self.0 is always Some(chunk); take() disarms this WriteChunk's Drop impl.
        let mut chunk = self.0.take().unwrap();
        // Safety: All slots have been initialized in From::from().
        unsafe {
            // Slots at index `n` and higher are dropped ...
            chunk.drop_suffix(n);
            // ... everything below `n` is committed.
            chunk.commit(n);
        }
        // `self` is dropped here, with `self.0` being set to `None`.
    }

    /// Makes the whole chunk available for reading.
    pub fn commit_all(mut self) {
        // self.0 is always Some(chunk); take() disarms this WriteChunk's Drop impl.
        let chunk = self.0.take().unwrap();
        // Safety: All slots have been initialized in From::from().
        unsafe {
            chunk.commit_all();
        }
        // `self` is dropped here, with `self.0` being set to `None`.
    }

    /// Returns the number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        // self.0 is always Some(chunk).
        self.0.as_ref().unwrap().len()
    }

    /// Returns `true` if the chunk contains no slots.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        // self.0 is always Some(chunk).
        self.0.as_ref().unwrap().is_empty()
    }
}
438
/// Structure for writing into multiple (uninitialized) slots in one go.
///
/// This is returned from [`Producer::write_chunk_uninit()`].
#[derive(Debug)]
pub struct WriteChunkUninit<'a, T: Send + 'static> {
    // Pointer to the first slot of the chunk (somewhere inside the buffer).
    first_ptr: *mut T,
    // Number of slots up to the physical end of the buffer.
    first_len: usize,
    // Start of the buffer, for the wrapped-around part of the chunk.
    second_ptr: *mut T,
    // Number of wrapped-around slots (0 if the chunk is contiguous).
    second_len: usize,
    // Keeps the producer (exclusively) borrowed while the chunk exists.
    producer: &'a Producer<T>,
}
450
// WriteChunkUninit only exists while a unique reference to the Producer is held
// (write_chunk_uninit() takes &mut self), so no other thread can write through
// these raw pointers concurrently. It is therefore safe to move it to another thread.
unsafe impl<T: Send + 'static> Send for WriteChunkUninit<'_, T> {}
454
impl<T: Send + 'static> WriteChunkUninit<'_, T> {
    /// Returns two slices for writing to the requested slots.
    ///
    /// The first slice can only be empty if `0` slots have been requested.
    /// If the first slice contains all requested slots, the second one is empty.
    ///
    /// The extension trait [`CopyToUninit`] can be used to safely copy data into those slices.
    ///
    /// After writing to the slots, they are *not* automatically made available
    /// to be read by the [`Consumer`].
    /// This has to be explicitly done by calling [`commit()`](WriteChunkUninit::commit)
    /// or [`commit_all()`](WriteChunkUninit::commit_all).
    /// If items are written but *not* committed afterwards,
    /// they will *not* become available for reading and
    /// they will be leaked (which is only relevant if `T` implements [`Drop`]).
    pub fn as_mut_slices(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
        // Safety: the chunk has exclusive access to these slots; exposing them
        // as MaybeUninit<T> makes no claim about their initialization state.
        unsafe {
            (
                core::slice::from_raw_parts_mut(self.first_ptr as *mut _, self.first_len),
                core::slice::from_raw_parts_mut(self.second_ptr as *mut _, self.second_len),
            )
        }
    }

    /// Makes the first `n` slots of the chunk available for reading.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the number of slots in the chunk.
    ///
    /// # Safety
    ///
    /// The user must make sure that the first `n` elements have been initialized.
    pub unsafe fn commit(self, n: usize) {
        assert!(n <= self.len(), "cannot commit more than chunk size");
        self.commit_unchecked(n);
    }

    /// Makes the whole chunk available for reading.
    ///
    /// # Safety
    ///
    /// The user must make sure that all elements have been initialized.
    pub unsafe fn commit_all(self) {
        let slots = self.len();
        self.commit_unchecked(slots);
    }

    // Advances the tail by `n` slots and publishes it to the consumer.
    // Returns `n` so callers like fill_from_iter() can pass it through.
    unsafe fn commit_unchecked(self, n: usize) -> usize {
        let p = self.producer;
        let tail = p.buffer.increment(p.cached_tail.get(), n);
        // The Release store makes the written items visible to the consumer's
        // Acquire load of the tail.
        p.buffer.tail.store(tail, Ordering::Release);
        p.cached_tail.set(tail);
        n
    }

    /// Moves items from an iterator into the (uninitialized) slots of the chunk.
    ///
    /// The number of moved items is returned.
    ///
    /// All moved items are automatically made available to be read by the [`Consumer`].
    ///
    /// # Examples
    ///
    /// If the iterator contains too few items, only a part of the chunk
    /// is made available for reading:
    ///
    /// ```
    /// use rtrb_basedrop::{RingBuffer, PopError};
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, mut c) = RingBuffer::new(4, &collector.handle());
    ///
    /// if let Ok(chunk) = p.write_chunk_uninit(3) {
    ///     assert_eq!(chunk.fill_from_iter([10, 20]), 2);
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(p.slots(), 2);
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Ok(20));
    /// assert_eq!(c.pop(), Err(PopError::Empty));
    /// ```
    ///
    /// If the chunk size is too small, some items may remain in the iterator.
    /// To be able to keep using the iterator after the call,
    /// `&mut` (or [`Iterator::by_ref()`]) can be used.
    ///
    /// ```
    /// use rtrb_basedrop::{RingBuffer, PopError};
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, mut c) = RingBuffer::new(4, &collector.handle());
    ///
    /// let mut it = vec![10, 20, 30].into_iter();
    /// if let Ok(chunk) = p.write_chunk_uninit(2) {
    ///     assert_eq!(chunk.fill_from_iter(&mut it), 2);
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Ok(20));
    /// assert_eq!(c.pop(), Err(PopError::Empty));
    /// assert_eq!(it.next(), Some(30));
    /// ```
    pub fn fill_from_iter<I>(self, iter: I) -> usize
    where
        I: IntoIterator<Item = T>,
    {
        let mut iter = iter.into_iter();
        let mut iterated = 0;
        // Fill the contiguous part first, then the wrapped-around part,
        // stopping early if the iterator runs out of items.
        'outer: for &(ptr, len) in &[
            (self.first_ptr, self.first_len),
            (self.second_ptr, self.second_len),
        ] {
            for i in 0..len {
                match iter.next() {
                    Some(item) => {
                        // Safety: It is allowed to write to this memory slot
                        unsafe {
                            ptr.add(i).write(item);
                        }
                        iterated += 1;
                    }
                    None => break 'outer,
                }
            }
        }
        // Safety: iterated slots have been initialized above
        unsafe { self.commit_unchecked(iterated) }
    }

    /// Returns the number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        self.first_len + self.second_len
    }

    /// Returns `true` if the chunk contains no slots.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        // Invariant (see write_chunk_uninit()): the first part is only empty
        // if 0 slots were requested, in which case the second part is empty, too.
        self.first_len == 0
    }

    /// Drops all elements starting from index `n`.
    ///
    /// All of those slots must be initialized.
    unsafe fn drop_suffix(&mut self, n: usize) {
        // NB: If n >= self.len(), the loops are not entered.
        for i in n..self.first_len {
            self.first_ptr.add(i).drop_in_place();
        }
        // For the second part, indices are relative to second_ptr; if `n` falls
        // within the first part, the whole second part is dropped (start at 0).
        for i in n.saturating_sub(self.first_len)..self.second_len {
            self.second_ptr.add(i).drop_in_place();
        }
    }
}
616
/// Structure for reading from multiple slots in one go.
///
/// This is returned from [`Consumer::read_chunk()`].
#[derive(Debug)]
pub struct ReadChunk<'a, T: Send + 'static> {
    // Pointer to the first slot of the chunk (somewhere inside the buffer).
    // Must be "mut" for drop_in_place()
    first_ptr: *mut T,
    // Number of slots up to the physical end of the buffer.
    first_len: usize,
    // Start of the buffer, for the wrapped-around part of the chunk.
    // Must be "mut" for drop_in_place()
    second_ptr: *mut T,
    // Number of wrapped-around slots (0 if the chunk is contiguous).
    second_len: usize,
    // Keeps the consumer (exclusively) borrowed while the chunk exists.
    consumer: &'a Consumer<T>,
}
630
// ReadChunk only exists while a unique reference to the Consumer is held
// (read_chunk() takes &mut self), so no other thread can read through these
// raw pointers concurrently. It is therefore safe to move it to another thread.
unsafe impl<T: Send + 'static> Send for ReadChunk<'_, T> {}
634
impl<T: Send + 'static> ReadChunk<'_, T> {
    /// Returns two slices for reading from the requested slots.
    ///
    /// The first slice can only be empty if `0` slots have been requested.
    /// If the first slice contains all requested slots, the second one is empty.
    ///
    /// The provided slots are *not* automatically made available
    /// to be written again by the [`Producer`].
    /// This has to be explicitly done by calling [`commit()`](ReadChunk::commit)
    /// or [`commit_all()`](ReadChunk::commit_all).
    /// Note that this runs the destructor of the committed items (if `T` implements [`Drop`]).
    /// You can "peek" at the contained values by simply not calling any of the "commit" methods.
    #[must_use]
    pub fn as_slices(&self) -> (&[T], &[T]) {
        (
            // Safety: the chunk has exclusive access to these initialized slots.
            unsafe { core::slice::from_raw_parts(self.first_ptr, self.first_len) },
            unsafe { core::slice::from_raw_parts(self.second_ptr, self.second_len) },
        )
    }

    /// Drops the first `n` slots of the chunk, making the space available for writing again.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the number of slots in the chunk.
    ///
    /// # Examples
    ///
    /// The following example shows that items are dropped when "committed"
    /// (which is only relevant if `T` implements [`Drop`]).
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let mut collector = Collector::new();
    ///
    /// // Static variable to count all drop() invocations
    /// static mut DROP_COUNT: i32 = 0;
    /// #[derive(Debug)]
    /// struct Thing;
    /// impl Drop for Thing {
    ///     fn drop(&mut self) { unsafe { DROP_COUNT += 1; } }
    /// }
    ///
    /// // Scope to limit lifetime of ring buffer
    /// {
    ///     let (mut p, mut c) = RingBuffer::new(2, &collector.handle());
    ///
    ///     assert!(p.push(Thing).is_ok()); // 1
    ///     assert!(p.push(Thing).is_ok()); // 2
    ///     if let Ok(thing) = c.pop() {
    ///         // "thing" has been *moved* out of the queue but not yet dropped
    ///         collector.collect();
    ///         assert_eq!(unsafe { DROP_COUNT }, 0);
    ///     } else {
    ///         unreachable!();
    ///     }
    ///     // First Thing has been dropped when "thing" went out of scope:
    ///     collector.collect();
    ///     assert_eq!(unsafe { DROP_COUNT }, 1);
    ///     assert!(p.push(Thing).is_ok()); // 3
    ///
    ///     if let Ok(chunk) = c.read_chunk(2) {
    ///         assert_eq!(chunk.len(), 2);
    ///         collector.collect();
    ///         assert_eq!(unsafe { DROP_COUNT }, 1);
    ///         chunk.commit(1); // Drops only one of the two Things
    ///         collector.collect();
    ///         assert_eq!(unsafe { DROP_COUNT }, 2);
    ///     } else {
    ///         unreachable!();
    ///     }
    ///     // The last Thing is still in the queue ...
    ///     collector.collect();
    ///     assert_eq!(unsafe { DROP_COUNT }, 2);
    /// }
    /// // ... and it is dropped when the ring buffer goes out of scope:
    /// collector.collect();
    /// assert_eq!(unsafe { DROP_COUNT }, 3);
    /// ```
    pub fn commit(self, n: usize) {
        assert!(n <= self.len(), "cannot commit more than chunk size");
        // Safety: `n` has been checked against the chunk size above.
        unsafe { self.commit_unchecked(n) };
    }

    /// Drops all slots of the chunk, making the space available for writing again.
    pub fn commit_all(self) {
        let slots = self.len();
        // Safety: the whole chunk is committed, which is always in bounds.
        unsafe { self.commit_unchecked(slots) };
    }

    // Drops the first `n` items in place and advances the head accordingly.
    // Returns `n` for the convenience of callers.
    unsafe fn commit_unchecked(self, n: usize) -> usize {
        // Drop up to `first_len` items from the contiguous part ...
        let first_len = self.first_len.min(n);
        for i in 0..first_len {
            self.first_ptr.add(i).drop_in_place();
        }
        // ... and the remainder from the wrapped-around part.
        let second_len = self.second_len.min(n - first_len);
        for i in 0..second_len {
            self.second_ptr.add(i).drop_in_place();
        }
        // The Release store publishes the freed slots to the producer's
        // Acquire load of the head.
        let c = self.consumer;
        let head = c.buffer.increment(c.cached_head.get(), n);
        c.buffer.head.store(head, Ordering::Release);
        c.cached_head.set(head);
        n
    }

    /// Returns the number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        self.first_len + self.second_len
    }

    /// Returns `true` if the chunk contains no slots.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        // Invariant (see read_chunk()): the first part is only empty if 0 slots
        // were requested, in which case the second part is empty, too.
        self.first_len == 0
    }
}
755
756impl<'a, T: Send + 'static> IntoIterator for ReadChunk<'a, T> {
757    type Item = T;
758    type IntoIter = ReadChunkIntoIter<'a, T>;
759
760    /// Turns a [`ReadChunk`] into an iterator.
761    ///
762    /// When the iterator is dropped, all iterated slots are made available for writing again.
763    /// Non-iterated items remain in the ring buffer.
764    fn into_iter(self) -> Self::IntoIter {
765        Self::IntoIter {
766            chunk: self,
767            iterated: 0,
768        }
769    }
770}
771
/// An iterator that moves out of a [`ReadChunk`].
///
/// This `struct` is created by the [`into_iter()`](ReadChunk::into_iter) method
/// on [`ReadChunk`] (provided by the [`IntoIterator`] trait).
///
/// When this `struct` is dropped, the iterated slots are made available for writing again.
/// Non-iterated items remain in the ring buffer.
#[derive(Debug)]
pub struct ReadChunkIntoIter<'a, T: Send + 'static> {
    // The chunk being consumed.
    chunk: ReadChunk<'a, T>,
    // Number of items moved out so far; only these slots are committed on drop.
    iterated: usize,
}
784
impl<'a, T: Send + 'static> Drop for ReadChunkIntoIter<'a, T> {
    /// Makes all iterated slots available for writing again.
    ///
    /// Non-iterated items remain in the ring buffer and are *not* dropped.
    fn drop(&mut self) {
        // NB: the iterated items have already been moved out by next(),
        // so only the head has to be advanced here — no drop_in_place().
        let c = &self.chunk.consumer;
        let head = c.buffer.increment(c.cached_head.get(), self.iterated);
        // The Release store publishes the freed slots to the producer.
        c.buffer.head.store(head, Ordering::Release);
        c.cached_head.set(head);
    }
}
796
impl<'a, T: Send + 'static> Iterator for ReadChunkIntoIter<'a, T> {
    type Item = T;

    /// Moves the next item out of the chunk.
    fn next(&mut self) -> Option<Self::Item> {
        // Select the slot for the current position: first the contiguous part,
        // then the wrapped-around part, then the chunk is exhausted.
        let ptr = if self.iterated < self.chunk.first_len {
            unsafe { self.chunk.first_ptr.add(self.iterated) }
        } else if self.iterated < self.chunk.first_len + self.chunk.second_len {
            unsafe {
                self.chunk
                    .second_ptr
                    .add(self.iterated - self.chunk.first_len)
            }
        } else {
            return None;
        };
        self.iterated += 1;
        // Safety: each slot is read at most once (iterated only increases),
        // so the item is moved out exactly once.
        Some(unsafe { ptr.read() })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // The exact number of remaining items is known.
        let remaining = self.chunk.first_len + self.chunk.second_len - self.iterated;
        (remaining, Some(remaining))
    }
}
821
// size_hint() above is exact, so ExactSizeIterator can be provided.
impl<'a, T: Send + 'static> ExactSizeIterator for ReadChunkIntoIter<'a, T> {}

// next() keeps returning None once the chunk is exhausted, so the iterator is fused.
impl<'a, T: Send + 'static> core::iter::FusedIterator for ReadChunkIntoIter<'a, T> {}
825
#[cfg(feature = "std")]
impl std::io::Write for Producer<u8> {
    /// Writes as many bytes as the ring buffer can currently hold.
    ///
    /// Returns `WouldBlock` if the ring buffer is full (non-blocking semantics).
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        use ChunkError::TooFewSlots;
        // Try to reserve space for the whole buffer; if that fails,
        // fall back to however many slots are currently available.
        let mut chunk = match self.write_chunk_uninit(buf.len()) {
            Ok(chunk) => chunk,
            Err(TooFewSlots(0)) => return Err(std::io::ErrorKind::WouldBlock.into()),
            Err(TooFewSlots(n)) => self.write_chunk_uninit(n).unwrap(),
        };
        let end = chunk.len();
        let (first, second) = chunk.as_mut_slices();
        let mid = first.len();
        // NB: If buf.is_empty(), chunk will be empty as well and the following are no-ops:
        buf[..mid].copy_to_uninit(first);
        buf[mid..end].copy_to_uninit(second);
        // Safety: All slots have been initialized
        unsafe {
            chunk.commit_all();
        }
        Ok(end)
    }

    #[inline]
    fn flush(&mut self) -> std::io::Result<()> {
        // Nothing to do here: committed items are already visible to the consumer.
        Ok(())
    }
}
855
#[cfg(feature = "std")]
impl std::io::Read for Consumer<u8> {
    /// Reads as many bytes as are currently available in the ring buffer.
    ///
    /// Returns `WouldBlock` if the ring buffer is empty (non-blocking semantics).
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        use ChunkError::TooFewSlots;
        // Try to fill the whole output buffer; if that fails,
        // fall back to however many slots are currently readable.
        let chunk = match self.read_chunk(buf.len()) {
            Ok(chunk) => chunk,
            Err(TooFewSlots(0)) => return Err(std::io::ErrorKind::WouldBlock.into()),
            Err(TooFewSlots(n)) => self.read_chunk(n).unwrap(),
        };
        let (first, second) = chunk.as_slices();
        let mid = first.len();
        let end = chunk.len();
        // NB: If buf.is_empty(), chunk will be empty as well and the following are no-ops:
        buf[..mid].copy_from_slice(first);
        buf[mid..end].copy_from_slice(second);
        chunk.commit_all();
        Ok(end)
    }
}
876
/// Error type for [`Consumer::read_chunk()`], [`Producer::write_chunk()`]
/// and [`Producer::write_chunk_uninit()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ChunkError {
    /// Fewer than the requested number of slots were available.
    ///
    /// Contains the number of slots that were available.
    TooFewSlots(usize),
}

#[cfg(feature = "std")]
impl std::error::Error for ChunkError {}

impl fmt::Display for ChunkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChunkError::TooFewSlots(n) => {
                // Write directly into the formatter instead of allocating an
                // intermediate String with alloc::format!().
                write!(f, "only {} slots available in ring buffer", n)
            }
        }
    }
}