rtrb/
chunks.rs

1//! Writing and reading multiple items at once into and from a [`RingBuffer`].
2//!
3//! Multiple items at once can be moved from an iterator into the ring buffer by using
4//! [`Producer::write_chunk_uninit()`] followed by [`WriteChunkUninit::fill_from_iter()`].
5//! Alternatively, mutable access to the (uninitialized) slots of the chunk can be obtained with
6//! [`WriteChunkUninit::as_mut_slices()`], which requires writing some `unsafe` code.
7//! To avoid that, [`Producer::write_chunk()`] can be used,
8//! which initializes all slots with their [`Default`] value
9//! and provides mutable access by means of [`WriteChunk::as_mut_slices()`].
10//!
11//! Multiple items at once can be moved out of the ring buffer by using
12//! [`Consumer::read_chunk()`] and iterating over the returned [`ReadChunk`]
13//! (or by explicitly calling [`ReadChunk::into_iter()`]).
14//! Immutable access to the slots of the chunk can be obtained with [`ReadChunk::as_slices()`].
15//!
16//! # Examples
17//!
18//! This example uses a single thread for simplicity, but in a real application,
19//! `producer` and `consumer` would of course live on different threads:
20//!
21//! ```
22//! use rtrb::RingBuffer;
23//!
24//! let (mut producer, mut consumer) = RingBuffer::new(5);
25//!
26//! if let Ok(chunk) = producer.write_chunk_uninit(4) {
27//!     chunk.fill_from_iter([10, 11, 12]);
28//!     // Note that we requested 4 slots but we've only written to 3 of them!
29//! } else {
30//!     unreachable!();
31//! }
32//!
33//! assert_eq!(producer.slots(), 2);
34//! assert_eq!(consumer.slots(), 3);
35//!
36//! if let Ok(chunk) = consumer.read_chunk(2) {
37//!     assert_eq!(chunk.into_iter().collect::<Vec<_>>(), [10, 11]);
38//! } else {
39//!     unreachable!();
40//! }
41//!
42//! // One element is still in the queue:
43//! assert_eq!(consumer.peek(), Ok(&12));
44//!
45//! let data = vec![20, 21, 22, 23];
46//! // NB: write_chunk_uninit() could be used for possibly better performance:
47//! if let Ok(mut chunk) = producer.write_chunk(4) {
48//!     let (first, second) = chunk.as_mut_slices();
49//!     let mid = first.len();
50//!     first.copy_from_slice(&data[..mid]);
51//!     second.copy_from_slice(&data[mid..]);
52//!     chunk.commit_all();
53//! } else {
54//!     unreachable!();
55//! }
56//!
57//! assert!(producer.is_full());
58//! assert_eq!(consumer.slots(), 5);
59//!
60//! let mut v = Vec::<i32>::with_capacity(5);
61//! if let Ok(chunk) = consumer.read_chunk(5) {
62//!     let (first, second) = chunk.as_slices();
63//!     v.extend(first);
64//!     v.extend(second);
65//!     chunk.commit_all();
66//! } else {
67//!     unreachable!();
68//! }
69//! assert_eq!(v, [12, 20, 21, 22, 23]);
70//! assert!(consumer.is_empty());
71//! ```
72//!
73//! The iterator API can be used to move items from one ring buffer to another:
74//!
75//! ```
76//! use rtrb::{Consumer, Producer};
77//!
78//! fn move_items<T>(src: &mut Consumer<T>, dst: &mut Producer<T>) -> usize {
79//!     let n = src.slots().min(dst.slots());
80//!     dst.write_chunk_uninit(n).unwrap().fill_from_iter(src.read_chunk(n).unwrap())
81//! }
82//! ```
83//!
84//! ## Common Access Patterns
85//!
86//! The following examples show the [`Producer`] side;
87//! similar patterns can of course be used with [`Consumer::read_chunk()`] as well.
88//! Furthermore, the examples use [`Producer::write_chunk_uninit()`],
89//! along with a bit of `unsafe` code.
90//! To avoid this, you can use [`Producer::write_chunk()`] instead,
91//! which requires the trait bound `T: Default` and will lead to a small runtime overhead.
92//!
93//! Copy a whole slice of items into the ring buffer, but only if space permits
94//! (if not, the entire input slice is returned as an error):
95//!
96//! ```
97//! use rtrb::{Producer, CopyToUninit};
98//!
99//! fn push_entire_slice<'a, T>(queue: &mut Producer<T>, slice: &'a [T]) -> Result<(), &'a [T]>
100//! where
101//!     T: Copy,
102//! {
103//!     if let Ok(mut chunk) = queue.write_chunk_uninit(slice.len()) {
104//!         let (first, second) = chunk.as_mut_slices();
105//!         let mid = first.len();
106//!         slice[..mid].copy_to_uninit(first);
107//!         slice[mid..].copy_to_uninit(second);
108//!         // SAFETY: All slots have been initialized
109//!         unsafe { chunk.commit_all() };
110//!         Ok(())
111//!     } else {
112//!         Err(slice)
113//!     }
114//! }
115//! ```
116//!
117//! Copy as many items as possible from a given slice, returning the number of copied items:
118//!
119//! ```
120//! use rtrb::{Producer, CopyToUninit, chunks::ChunkError::TooFewSlots};
121//!
122//! fn push_partial_slice<T>(queue: &mut Producer<T>, slice: &[T]) -> usize
123//! where
124//!     T: Copy,
125//! {
126//!     let mut chunk = match queue.write_chunk_uninit(slice.len()) {
127//!         Ok(chunk) => chunk,
128//!         // Remaining slots are returned, this will always succeed:
129//!         Err(TooFewSlots(n)) => queue.write_chunk_uninit(n).unwrap(),
130//!     };
131//!     let end = chunk.len();
132//!     let (first, second) = chunk.as_mut_slices();
133//!     let mid = first.len();
134//!     slice[..mid].copy_to_uninit(first);
135//!     slice[mid..end].copy_to_uninit(second);
136//!     // SAFETY: All slots have been initialized
137//!     unsafe { chunk.commit_all() };
138//!     end
139//! }
140//! ```
141//!
142//! Write as many slots as possible, given an iterator
143//! (and return the number of written slots):
144//!
145//! ```
146//! use rtrb::{Producer, chunks::ChunkError::TooFewSlots};
147//!
148//! fn push_from_iter<T, I>(queue: &mut Producer<T>, iter: I) -> usize
149//! where
150//!     T: Default,
151//!     I: IntoIterator<Item = T>,
152//! {
153//!     let iter = iter.into_iter();
154//!     let n = match iter.size_hint() {
155//!         (_, None) => queue.slots(),
156//!         (_, Some(n)) => n,
157//!     };
158//!     let chunk = match queue.write_chunk_uninit(n) {
159//!         Ok(chunk) => chunk,
160//!         // Remaining slots are returned, this will always succeed:
161//!         Err(TooFewSlots(n)) => queue.write_chunk_uninit(n).unwrap(),
162//!     };
163//!     chunk.fill_from_iter(iter)
164//! }
165//! ```
166
167use core::fmt;
168use core::marker::PhantomData;
169use core::mem::MaybeUninit;
170use core::sync::atomic::Ordering;
171
172use crate::{Consumer, CopyToUninit, Producer};
173
174// This is used in the documentation.
175#[allow(unused_imports)]
176use crate::RingBuffer;
177
178impl<T> Producer<T> {
179    /// Returns `n` slots (initially containing their [`Default`] value) for writing.
180    ///
181    /// [`WriteChunk::as_mut_slices()`] provides mutable access to the slots.
182    /// After writing to those slots, they explicitly have to be made available
183    /// to be read by the [`Consumer`] by calling [`WriteChunk::commit()`]
184    /// or [`WriteChunk::commit_all()`].
185    ///
186    /// For an alternative that does not require the trait bound [`Default`],
187    /// see [`Producer::write_chunk_uninit()`].
188    ///
189    /// If items are supposed to be moved from an iterator into the ring buffer,
190    /// [`Producer::write_chunk_uninit()`] followed by [`WriteChunkUninit::fill_from_iter()`]
191    /// can be used.
192    ///
193    /// # Errors
194    ///
195    /// If not enough slots are available, an error
196    /// (containing the number of available slots) is returned.
197    /// Use [`Producer::slots()`] to obtain the number of available slots beforehand.
198    ///
199    /// # Examples
200    ///
201    /// See the documentation of the [`chunks`](crate::chunks#examples) module.
202    pub fn write_chunk(&mut self, n: usize) -> Result<WriteChunk<'_, T>, ChunkError>
203    where
204        T: Default,
205    {
206        self.write_chunk_uninit(n).map(WriteChunk::from)
207    }
208
209    /// Returns `n` (uninitialized) slots for writing.
210    ///
211    /// [`WriteChunkUninit::as_mut_slices()`] provides mutable access
212    /// to the uninitialized slots.
213    /// After writing to those slots, they explicitly have to be made available
214    /// to be read by the [`Consumer`] by calling [`WriteChunkUninit::commit()`]
215    /// or [`WriteChunkUninit::commit_all()`].
216    ///
217    /// Alternatively, [`WriteChunkUninit::fill_from_iter()`] can be used
218    /// to move items from an iterator into the available slots.
219    /// All moved items are automatically made available to be read by the [`Consumer`].
220    ///
221    /// # Errors
222    ///
223    /// If not enough slots are available, an error
224    /// (containing the number of available slots) is returned.
225    /// Use [`Producer::slots()`] to obtain the number of available slots beforehand.
226    ///
227    /// # Safety
228    ///
229    /// This function itself is safe, as is [`WriteChunkUninit::fill_from_iter()`].
230    /// However, when using [`WriteChunkUninit::as_mut_slices()`],
231    /// the user has to make sure that the relevant slots have been initialized
232    /// before calling [`WriteChunkUninit::commit()`] or [`WriteChunkUninit::commit_all()`].
233    ///
234    /// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
235    /// see [`Producer::write_chunk()`].
236    pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
237        let tail = self.cached_tail.get();
238
239        // Check if the queue has *possibly* not enough slots.
240        if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
241            // Refresh the head ...
242            let head = self.buffer.head.load(Ordering::Acquire);
243            self.cached_head.set(head);
244
245            // ... and check if there *really* are not enough slots.
246            let slots = self.buffer.capacity - self.buffer.distance(head, tail);
247            if slots < n {
248                return Err(ChunkError::TooFewSlots(slots));
249            }
250        }
251        let tail = self.buffer.collapse_position(tail);
252        let first_len = n.min(self.buffer.capacity - tail);
253        Ok(WriteChunkUninit {
254            // SAFETY: tail has been updated to a valid position.
255            first_ptr: unsafe { self.buffer.data_ptr.add(tail) },
256            first_len,
257            second_ptr: self.buffer.data_ptr,
258            second_len: n - first_len,
259            producer: self,
260        })
261    }
262}
263
264impl<T> Consumer<T> {
265    /// Returns `n` slots for reading.
266    ///
267    /// [`ReadChunk::as_slices()`] provides immutable access to the slots.
268    /// After reading from those slots, they explicitly have to be made available
269    /// to be written again by the [`Producer`] by calling [`ReadChunk::commit()`]
270    /// or [`ReadChunk::commit_all()`].
271    ///
272    /// Alternatively, items can be moved out of the [`ReadChunk`] using iteration
273    /// because it implements [`IntoIterator`]
274    /// ([`ReadChunk::into_iter()`] can be used to explicitly turn it into an [`Iterator`]).
275    /// All moved items are automatically made available to be written again by the [`Producer`].
276    ///
277    /// # Errors
278    ///
279    /// If not enough slots are available, an error
280    /// (containing the number of available slots) is returned.
281    /// Use [`Consumer::slots()`] to obtain the number of available slots beforehand.
282    ///
283    /// # Examples
284    ///
285    /// See the documentation of the [`chunks`](crate::chunks#examples) module.
286    pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
287        let head = self.cached_head.get();
288
289        // Check if the queue has *possibly* not enough slots.
290        if self.buffer.distance(head, self.cached_tail.get()) < n {
291            // Refresh the tail ...
292            let tail = self.buffer.tail.load(Ordering::Acquire);
293            self.cached_tail.set(tail);
294
295            // ... and check if there *really* are not enough slots.
296            let slots = self.buffer.distance(head, tail);
297            if slots < n {
298                return Err(ChunkError::TooFewSlots(slots));
299            }
300        }
301
302        let head = self.buffer.collapse_position(head);
303        let first_len = n.min(self.buffer.capacity - head);
304        Ok(ReadChunk {
305            // SAFETY: head has been updated to a valid position.
306            first_ptr: unsafe { self.buffer.data_ptr.add(head) },
307            first_len,
308            second_ptr: self.buffer.data_ptr,
309            second_len: n - first_len,
310            consumer: self,
311        })
312    }
313}
314
/// Structure for writing into multiple ([`Default`]-initialized) slots in one go.
///
/// This is returned from [`Producer::write_chunk()`].
///
/// To obtain uninitialized slots, use [`Producer::write_chunk_uninit()`] instead,
/// which also allows moving items from an iterator into the ring buffer
/// by means of [`WriteChunkUninit::fill_from_iter()`].
///
/// If the chunk is dropped without being committed, all of its slots are dropped.
#[derive(Debug, PartialEq, Eq)]
// Field 0 is the underlying chunk. It is `Some` from construction until `commit()` or
// `commit_all()` take it out, which is how the `Drop` impl tells whether the
// slots still have to be dropped.
pub struct WriteChunk<'a, T>(Option<WriteChunkUninit<'a, T>>, PhantomData<T>);
324
325impl<T> Drop for WriteChunk<'_, T> {
326    fn drop(&mut self) {
327        // NB: If `commit()` or `commit_all()` has been called, `self.0` is `None`.
328        if let Some(mut chunk) = self.0.take() {
329            // No part of the chunk has been committed, all slots are dropped.
330            // SAFETY: All slots have been initialized in From::from().
331            unsafe { chunk.drop_suffix(0) };
332        }
333    }
334}
335
336impl<'a, T> From<WriteChunkUninit<'a, T>> for WriteChunk<'a, T>
337where
338    T: Default,
339{
340    /// Fills all slots with the [`Default`] value.
341    fn from(chunk: WriteChunkUninit<'a, T>) -> Self {
342        for i in 0..chunk.first_len {
343            // SAFETY: i is in a valid range.
344            unsafe { chunk.first_ptr.add(i).write(Default::default()) };
345        }
346        for i in 0..chunk.second_len {
347            // SAFETY: i is in a valid range.
348            unsafe { chunk.second_ptr.add(i).write(Default::default()) };
349        }
350        WriteChunk(Some(chunk), PhantomData)
351    }
352}
353
354impl<T> WriteChunk<'_, T>
355where
356    T: Default,
357{
358    /// Returns two slices for writing to the requested slots.
359    ///
360    /// All slots are initially filled with their [`Default`] value.
361    ///
362    /// The first slice can only be empty if `0` slots have been requested.
363    /// If the first slice contains all requested slots, the second one is empty.
364    ///
365    /// After writing to the slots, they are *not* automatically made available
366    /// to be read by the [`Consumer`].
367    /// This has to be explicitly done by calling [`commit()`](WriteChunk::commit)
368    /// or [`commit_all()`](WriteChunk::commit_all).
369    /// If items are written but *not* committed afterwards,
370    /// they will *not* become available for reading and
371    /// they will be leaked (which is only relevant if `T` implements [`Drop`]).
372    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
373        // self.0 is always Some(chunk).
374        let chunk = self.0.as_ref().unwrap();
375        // SAFETY: The pointers and lengths have been computed correctly in write_chunk_uninit()
376        // and all slots have been initialized in From::from().
377        unsafe {
378            (
379                core::slice::from_raw_parts_mut(chunk.first_ptr, chunk.first_len),
380                core::slice::from_raw_parts_mut(chunk.second_ptr, chunk.second_len),
381            )
382        }
383    }
384
385    /// Makes the first `n` slots of the chunk available for reading.
386    ///
387    /// The rest of the chunk is dropped.
388    ///
389    /// # Panics
390    ///
391    /// Panics if `n` is greater than the number of slots in the chunk.
392    pub fn commit(mut self, n: usize) {
393        // self.0 is always Some(chunk).
394        let mut chunk = self.0.take().unwrap();
395        // SAFETY: All slots have been initialized in From::from().
396        unsafe {
397            // Slots at index `n` and higher are dropped ...
398            chunk.drop_suffix(n);
399            // ... everything below `n` is committed.
400            chunk.commit(n);
401        }
402        // `self` is dropped here, with `self.0` being set to `None`.
403    }
404
405    /// Makes the whole chunk available for reading.
406    pub fn commit_all(mut self) {
407        // self.0 is always Some(chunk).
408        let chunk = self.0.take().unwrap();
409        // SAFETY: All slots have been initialized in From::from().
410        unsafe { chunk.commit_all() };
411        // `self` is dropped here, with `self.0` being set to `None`.
412    }
413
414    /// Returns the number of slots in the chunk.
415    #[must_use]
416    pub fn len(&self) -> usize {
417        // self.0 is always Some(chunk).
418        self.0.as_ref().unwrap().len()
419    }
420
421    /// Returns `true` if the chunk contains no slots.
422    #[must_use]
423    pub fn is_empty(&self) -> bool {
424        // self.0 is always Some(chunk).
425        self.0.as_ref().unwrap().is_empty()
426    }
427}
428
/// Structure for writing into multiple (uninitialized) slots in one go.
///
/// This is returned from [`Producer::write_chunk_uninit()`].
///
/// The chunk consists of up to two contiguous parts of the ring buffer's storage.
#[derive(Debug, PartialEq, Eq)]
pub struct WriteChunkUninit<'a, T> {
    // Start of the first part (the collapsed tail position inside the storage).
    first_ptr: *mut T,
    // Number of slots in the first part.
    first_len: usize,
    // Start of the second part (always the beginning of the storage).
    second_ptr: *mut T,
    // Number of slots in the second part, `0` if the first part holds the whole chunk.
    second_len: usize,
    // The producer this chunk was obtained from; its tail is advanced on commit.
    producer: &'a Producer<T>,
}
440
// The raw pointer fields prevent `Send` from being implemented automatically.
// SAFETY: WriteChunkUninit only exists while a unique reference to the Producer is held.
// It is therefore safe to move it to another thread.
unsafe impl<T: Send> Send for WriteChunkUninit<'_, T> {}
444
445impl<T> WriteChunkUninit<'_, T> {
446    /// Returns two slices for writing to the requested slots.
447    ///
448    /// The first slice can only be empty if `0` slots have been requested.
449    /// If the first slice contains all requested slots, the second one is empty.
450    ///
451    /// The extension trait [`CopyToUninit`] can be used to safely copy data into those slices.
452    ///
453    /// After writing to the slots, they are *not* automatically made available
454    /// to be read by the [`Consumer`].
455    /// This has to be explicitly done by calling [`commit()`](WriteChunkUninit::commit)
456    /// or [`commit_all()`](WriteChunkUninit::commit_all).
457    /// If items are written but *not* committed afterwards,
458    /// they will *not* become available for reading and
459    /// they will be leaked (which is only relevant if `T` implements [`Drop`]).
460    pub fn as_mut_slices(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
461        // SAFETY: The pointers and lengths have been computed correctly in write_chunk_uninit().
462        unsafe {
463            (
464                core::slice::from_raw_parts_mut(self.first_ptr.cast(), self.first_len),
465                core::slice::from_raw_parts_mut(self.second_ptr.cast(), self.second_len),
466            )
467        }
468    }
469
470    /// Makes the first `n` slots of the chunk available for reading.
471    ///
472    /// # Panics
473    ///
474    /// Panics if `n` is greater than the number of slots in the chunk.
475    ///
476    /// # Safety
477    ///
478    /// The caller must make sure that the first `n` elements have been initialized.
479    pub unsafe fn commit(self, n: usize) {
480        assert!(n <= self.len(), "cannot commit more than chunk size");
481        // SAFETY: Delegated to the caller.
482        unsafe { self.commit_unchecked(n) };
483    }
484
485    /// Makes the whole chunk available for reading.
486    ///
487    /// # Safety
488    ///
489    /// The caller must make sure that all elements have been initialized.
490    pub unsafe fn commit_all(self) {
491        let slots = self.len();
492        // SAFETY: Delegated to the caller.
493        unsafe { self.commit_unchecked(slots) };
494    }
495
496    unsafe fn commit_unchecked(self, n: usize) -> usize {
497        let p = self.producer;
498        let tail = p.buffer.increment(p.cached_tail.get(), n);
499        p.buffer.tail.store(tail, Ordering::Release);
500        p.cached_tail.set(tail);
501        n
502    }
503
504    /// Moves items from an iterator into the (uninitialized) slots of the chunk.
505    ///
506    /// The number of moved items is returned.
507    ///
508    /// All moved items are automatically made availabe to be read by the [`Consumer`].
509    ///
510    /// # Examples
511    ///
512    /// If the iterator contains too few items, only a part of the chunk
513    /// is made available for reading:
514    ///
515    /// ```
516    /// use rtrb::{RingBuffer, PopError};
517    ///
518    /// let (mut p, mut c) = RingBuffer::new(4);
519    ///
520    /// if let Ok(chunk) = p.write_chunk_uninit(3) {
521    ///     assert_eq!(chunk.fill_from_iter([10, 20]), 2);
522    /// } else {
523    ///     unreachable!();
524    /// }
525    /// assert_eq!(p.slots(), 2);
526    /// assert_eq!(c.pop(), Ok(10));
527    /// assert_eq!(c.pop(), Ok(20));
528    /// assert_eq!(c.pop(), Err(PopError::Empty));
529    /// ```
530    ///
531    /// If the chunk size is too small, some items may remain in the iterator.
532    /// To be able to keep using the iterator after the call,
533    /// `&mut` (or [`Iterator::by_ref()`]) can be used.
534    ///
535    /// ```
536    /// use rtrb::{RingBuffer, PopError};
537    ///
538    /// let (mut p, mut c) = RingBuffer::new(4);
539    ///
540    /// let mut it = vec![10, 20, 30].into_iter();
541    /// if let Ok(chunk) = p.write_chunk_uninit(2) {
542    ///     assert_eq!(chunk.fill_from_iter(&mut it), 2);
543    /// } else {
544    ///     unreachable!();
545    /// }
546    /// assert_eq!(c.pop(), Ok(10));
547    /// assert_eq!(c.pop(), Ok(20));
548    /// assert_eq!(c.pop(), Err(PopError::Empty));
549    /// assert_eq!(it.next(), Some(30));
550    /// ```
551    pub fn fill_from_iter<I>(self, iter: I) -> usize
552    where
553        I: IntoIterator<Item = T>,
554    {
555        let mut iter = iter.into_iter();
556        let mut iterated = 0;
557        'outer: for &(ptr, len) in &[
558            (self.first_ptr, self.first_len),
559            (self.second_ptr, self.second_len),
560        ] {
561            for i in 0..len {
562                match iter.next() {
563                    Some(item) => {
564                        // SAFETY: It is allowed to write to this memory slot
565                        unsafe { ptr.add(i).write(item) };
566                        iterated += 1;
567                    }
568                    None => break 'outer,
569                }
570            }
571        }
572        // SAFETY: iterated slots have been initialized above
573        unsafe { self.commit_unchecked(iterated) }
574    }
575
576    /// Returns the number of slots in the chunk.
577    #[must_use]
578    pub fn len(&self) -> usize {
579        self.first_len + self.second_len
580    }
581
582    /// Returns `true` if the chunk contains no slots.
583    #[must_use]
584    pub fn is_empty(&self) -> bool {
585        self.first_len == 0
586    }
587
588    /// Drops all elements starting from index `n`.
589    ///
590    /// All of those slots must be initialized.
591    unsafe fn drop_suffix(&mut self, n: usize) {
592        // NB: If n >= self.len(), the loops are not entered.
593        for i in n..self.first_len {
594            // SAFETY: The caller must make sure that all slots are initialized.
595            unsafe { self.first_ptr.add(i).drop_in_place() };
596        }
597        for i in n.saturating_sub(self.first_len)..self.second_len {
598            // SAFETY: The caller must make sure that all slots are initialized.
599            unsafe { self.second_ptr.add(i).drop_in_place() };
600        }
601    }
602}
603
/// Structure for reading from multiple slots in one go.
///
/// This is returned from [`Consumer::read_chunk()`].
///
/// The chunk consists of up to two contiguous parts of the ring buffer's storage.
#[derive(Debug, PartialEq, Eq)]
pub struct ReadChunk<'a, T> {
    // Start of the first part (the collapsed head position inside the storage).
    // Must be "mut" for drop_in_place()
    first_ptr: *mut T,
    // Number of slots in the first part.
    first_len: usize,
    // Start of the second part (always the beginning of the storage).
    // Must be "mut" for drop_in_place()
    second_ptr: *mut T,
    // Number of slots in the second part, `0` if the first part holds the whole chunk.
    second_len: usize,
    // The consumer this chunk was obtained from; its head is advanced on commit.
    consumer: &'a Consumer<T>,
}
617
// The raw pointer fields prevent `Send` from being implemented automatically.
// SAFETY: ReadChunk only exists while a unique reference to the Consumer is held.
// It is therefore safe to move it to another thread.
unsafe impl<T: Send> Send for ReadChunk<'_, T> {}
621
622impl<T> ReadChunk<'_, T> {
623    /// Returns two slices for reading from the requested slots.
624    ///
625    /// The first slice can only be empty if `0` slots have been requested.
626    /// If the first slice contains all requested slots, the second one is empty.
627    ///
628    /// The provided slots are *not* automatically made available
629    /// to be written again by the [`Producer`].
630    /// This has to be explicitly done by calling [`commit()`](ReadChunk::commit)
631    /// or [`commit_all()`](ReadChunk::commit_all).
632    /// Note that this runs the destructor of the committed items (if `T` implements [`Drop`]).
633    /// You can "peek" at the contained values by simply not calling any of the "commit" methods.
634    #[must_use]
635    pub fn as_slices(&self) -> (&[T], &[T]) {
636        // SAFETY: The pointers and lengths have been computed correctly in read_chunk().
637        unsafe {
638            (
639                core::slice::from_raw_parts(self.first_ptr, self.first_len),
640                core::slice::from_raw_parts(self.second_ptr, self.second_len),
641            )
642        }
643    }
644
645    /// Returns two mutable slices for reading from the requested slots.
646    ///
647    /// This has the same semantics as [`as_slices()`](ReadChunk::as_slices),
648    /// except that it returns mutable slices and requires a mutable reference
649    /// to the chunk.
650    ///
651    /// In the vast majority of cases, mutable access is not required when
652    /// reading data and the immutable version should be preferred. However,
653    /// there are some scenarios where it might be desirable to perform
654    /// operations on the data in-place without copying it to a separate buffer
655    /// (e.g. streaming decryption), in which case this version can be used.
656    #[must_use]
657    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
658        // SAFETY: The pointers and lengths have been computed correctly in read_chunk().
659        unsafe {
660            (
661                core::slice::from_raw_parts_mut(self.first_ptr, self.first_len),
662                core::slice::from_raw_parts_mut(self.second_ptr, self.second_len),
663            )
664        }
665    }
666
667    /// Drops the first `n` slots of the chunk, making the space available for writing again.
668    ///
669    /// # Panics
670    ///
671    /// Panics if `n` is greater than the number of slots in the chunk.
672    ///
673    /// # Examples
674    ///
675    /// The following example shows that items are dropped when "committed"
676    /// (which is only relevant if `T` implements [`Drop`]).
677    ///
678    /// ```
679    /// use rtrb::RingBuffer;
680    ///
681    /// // Static variable to count all drop() invocations
682    /// static mut DROP_COUNT: i32 = 0;
683    /// #[derive(Debug)]
684    /// struct Thing;
685    /// impl Drop for Thing {
686    ///     fn drop(&mut self) { unsafe { DROP_COUNT += 1; } }
687    /// }
688    ///
689    /// // Scope to limit lifetime of ring buffer
690    /// {
691    ///     let (mut p, mut c) = RingBuffer::new(2);
692    ///
693    ///     assert!(p.push(Thing).is_ok()); // 1
694    ///     assert!(p.push(Thing).is_ok()); // 2
695    ///     if let Ok(thing) = c.pop() {
696    ///         // "thing" has been *moved* out of the queue but not yet dropped
697    ///         assert_eq!(unsafe { DROP_COUNT }, 0);
698    ///     } else {
699    ///         unreachable!();
700    ///     }
701    ///     // First Thing has been dropped when "thing" went out of scope:
702    ///     assert_eq!(unsafe { DROP_COUNT }, 1);
703    ///     assert!(p.push(Thing).is_ok()); // 3
704    ///
705    ///     if let Ok(chunk) = c.read_chunk(2) {
706    ///         assert_eq!(chunk.len(), 2);
707    ///         assert_eq!(unsafe { DROP_COUNT }, 1);
708    ///         chunk.commit(1); // Drops only one of the two Things
709    ///         assert_eq!(unsafe { DROP_COUNT }, 2);
710    ///     } else {
711    ///         unreachable!();
712    ///     }
713    ///     // The last Thing is still in the queue ...
714    ///     assert_eq!(unsafe { DROP_COUNT }, 2);
715    /// }
716    /// // ... and it is dropped when the ring buffer goes out of scope:
717    /// assert_eq!(unsafe { DROP_COUNT }, 3);
718    /// ```
719    pub fn commit(self, n: usize) {
720        assert!(n <= self.len(), "cannot commit more than chunk size");
721        // SAFETY: self.len() initialized elements have been obtained in read_chunk().
722        unsafe { self.commit_unchecked(n) };
723    }
724
725    /// Drops all slots of the chunk, making the space available for writing again.
726    pub fn commit_all(self) {
727        let slots = self.len();
728        // SAFETY: self.len() initialized elements have been obtained in read_chunk().
729        unsafe { self.commit_unchecked(slots) };
730    }
731
732    unsafe fn commit_unchecked(self, n: usize) -> usize {
733        let first_len = self.first_len.min(n);
734        for i in 0..first_len {
735            // SAFETY: The caller must make sure that there are n initialized elements.
736            unsafe { self.first_ptr.add(i).drop_in_place() };
737        }
738        let second_len = self.second_len.min(n - first_len);
739        for i in 0..second_len {
740            // SAFETY: The caller must make sure that there are n initialized elements.
741            unsafe { self.second_ptr.add(i).drop_in_place() };
742        }
743        let c = self.consumer;
744        let head = c.buffer.increment(c.cached_head.get(), n);
745        c.buffer.head.store(head, Ordering::Release);
746        c.cached_head.set(head);
747        n
748    }
749
750    /// Returns the number of slots in the chunk.
751    #[must_use]
752    pub fn len(&self) -> usize {
753        self.first_len + self.second_len
754    }
755
756    /// Returns `true` if the chunk contains no slots.
757    #[must_use]
758    pub fn is_empty(&self) -> bool {
759        self.first_len == 0
760    }
761}
762
763impl<'a, T> IntoIterator for ReadChunk<'a, T> {
764    type Item = T;
765    type IntoIter = ReadChunkIntoIter<'a, T>;
766
767    /// Turns a [`ReadChunk`] into an iterator.
768    ///
769    /// When the iterator is dropped, all iterated slots are made available for writing again.
770    /// Non-iterated items remain in the ring buffer.
771    fn into_iter(self) -> Self::IntoIter {
772        Self::IntoIter {
773            chunk: self,
774            iterated: 0,
775        }
776    }
777}
778
/// An iterator that moves out of a [`ReadChunk`].
///
/// This `struct` is created by the [`into_iter()`](ReadChunk::into_iter) method
/// on [`ReadChunk`] (provided by the [`IntoIterator`] trait).
///
/// When this `struct` is dropped, the iterated slots are made available for writing again.
/// Non-iterated items remain in the ring buffer.
#[derive(Debug)]
pub struct ReadChunkIntoIter<'a, T> {
    // The chunk whose items are moved out.
    chunk: ReadChunk<'a, T>,
    // Number of items that have been moved out so far;
    // the head is advanced by this amount when the iterator is dropped.
    iterated: usize,
}
791
792impl<T> Drop for ReadChunkIntoIter<'_, T> {
793    /// Makes all iterated slots available for writing again.
794    ///
795    /// Non-iterated items remain in the ring buffer and are *not* dropped.
796    fn drop(&mut self) {
797        let c = &self.chunk.consumer;
798        let head = c.buffer.increment(c.cached_head.get(), self.iterated);
799        c.buffer.head.store(head, Ordering::Release);
800        c.cached_head.set(head);
801    }
802}
803
804impl<T> Iterator for ReadChunkIntoIter<'_, T> {
805    type Item = T;
806
807    #[inline]
808    fn next(&mut self) -> Option<Self::Item> {
809        let ptr = if self.iterated < self.chunk.first_len {
810            // SAFETY: first_len is valid.
811            unsafe { self.chunk.first_ptr.add(self.iterated) }
812        } else if self.iterated < self.chunk.first_len + self.chunk.second_len {
813            // SAFETY: first_len and second_len are valid.
814            unsafe {
815                self.chunk
816                    .second_ptr
817                    .add(self.iterated - self.chunk.first_len)
818            }
819        } else {
820            return None;
821        };
822        self.iterated += 1;
823        // SAFETY: ptr points to an initialized slot.
824        Some(unsafe { ptr.read() })
825    }
826
827    #[inline]
828    fn size_hint(&self) -> (usize, Option<usize>) {
829        let remaining = self.chunk.first_len + self.chunk.second_len - self.iterated;
830        (remaining, Some(remaining))
831    }
832}
833
// The iterator's `size_hint()` returns the number of remaining items as both its lower
// and its upper bound, which is what `ExactSizeIterator` relies on.
impl<T> ExactSizeIterator for ReadChunkIntoIter<'_, T> {}
835
// Once all slots of the chunk have been iterated, `next()` returns `None` without
// changing the iterator's state, so every further call returns `None` as well.
impl<T> core::iter::FusedIterator for ReadChunkIntoIter<'_, T> {}
837
#[cfg(feature = "std")]
impl std::io::Write for Producer<u8> {
    /// Copies bytes from `buf` into the ring buffer.
    ///
    /// If fewer than `buf.len()` slots are available, only as many bytes as there are
    /// available slots are written.  Returns the number of bytes written.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`WouldBlock`](std::io::ErrorKind::WouldBlock)
    /// if the chunk request fails because zero slots are available.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut chunk = match self.write_chunk_uninit(buf.len()) {
            Ok(chunk) => chunk,
            Err(ChunkError::TooFewSlots(available)) => {
                if available == 0 {
                    return Err(std::io::ErrorKind::WouldBlock.into());
                }
                // Fall back to a partial write that uses all slots reported as available.
                self.write_chunk_uninit(available).unwrap()
            }
        };
        let written = chunk.len();
        let (first, second) = chunk.as_mut_slices();
        // NB: If buf.is_empty(), chunk will be empty as well and the copies below are no-ops.
        let (head, tail) = buf[..written].split_at(first.len());
        head.copy_to_uninit(first);
        tail.copy_to_uninit(second);
        // SAFETY: All slots have been initialized
        unsafe { chunk.commit_all() };
        Ok(written)
    }

    /// Does nothing and always succeeds.
    #[inline]
    fn flush(&mut self) -> std::io::Result<()> {
        // Nothing to do here.
        Ok(())
    }
}
865
#[cfg(feature = "std")]
impl std::io::Read for Consumer<u8> {
    /// Moves bytes from the ring buffer into `buf`.
    ///
    /// If fewer than `buf.len()` slots are available, only as many bytes as there are
    /// available slots are read.  Returns the number of bytes read.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`WouldBlock`](std::io::ErrorKind::WouldBlock)
    /// if the chunk request fails because zero slots are available.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let chunk = match self.read_chunk(buf.len()) {
            Ok(chunk) => chunk,
            Err(ChunkError::TooFewSlots(available)) => {
                if available == 0 {
                    return Err(std::io::ErrorKind::WouldBlock.into());
                }
                // Fall back to a partial read of all slots reported as available.
                self.read_chunk(available).unwrap()
            }
        };
        let count = chunk.len();
        let (first, second) = chunk.as_slices();
        // NB: If buf.is_empty(), chunk will be empty as well and the copies below are no-ops.
        let (head, tail) = buf[..count].split_at_mut(first.len());
        head.copy_from_slice(first);
        tail.copy_from_slice(second);
        chunk.commit_all();
        Ok(count)
    }
}
886
/// Error type for [`Consumer::read_chunk()`], [`Producer::write_chunk()`]
/// and [`Producer::write_chunk_uninit()`].
///
/// The error carries the number of slots that were available,
/// which can be used to request a smaller chunk instead.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ChunkError {
    /// Fewer than the requested number of slots were available.
    ///
    /// Contains the number of slots that were available.
    TooFewSlots(usize),
}
896
// Only available with the "std" feature.  No methods are overridden,
// so the default implementations of the `Error` trait are used.
#[cfg(feature = "std")]
impl std::error::Error for ChunkError {}
899
900impl fmt::Display for ChunkError {
901    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
902        match self {
903            ChunkError::TooFewSlots(n) => {
904                alloc::format!("only {} slots available in ring buffer", n).fmt(f)
905            }
906        }
907    }
908}