rtrb/chunks.rs
//! Writing and reading multiple items at once into and from a [`RingBuffer`].
//!
//! Multiple items at once can be moved from an iterator into the ring buffer by using
//! [`Producer::write_chunk_uninit()`] followed by [`WriteChunkUninit::fill_from_iter()`].
//! Alternatively, mutable access to the (uninitialized) slots of the chunk can be obtained with
//! [`WriteChunkUninit::as_mut_slices()`], which requires writing some `unsafe` code.
//! To avoid that, [`Producer::write_chunk()`] can be used,
//! which initializes all slots with their [`Default`] value
//! and provides mutable access by means of [`WriteChunk::as_mut_slices()`].
//!
//! Multiple items at once can be moved out of the ring buffer by using
//! [`Consumer::read_chunk()`] and iterating over the returned [`ReadChunk`]
//! (or by explicitly calling [`ReadChunk::into_iter()`]).
//! Immutable access to the slots of the chunk can be obtained with [`ReadChunk::as_slices()`].
//!
//! # Examples
//!
//! This example uses a single thread for simplicity, but in a real application,
//! `producer` and `consumer` would of course live on different threads:
//!
//! ```
//! use rtrb::RingBuffer;
//!
//! let (mut producer, mut consumer) = RingBuffer::new(5);
//!
//! if let Ok(chunk) = producer.write_chunk_uninit(4) {
//!     chunk.fill_from_iter([10, 11, 12]);
//!     // Note that we requested 4 slots but we've only written to 3 of them!
//! } else {
//!     unreachable!();
//! }
//!
//! assert_eq!(producer.slots(), 2);
//! assert_eq!(consumer.slots(), 3);
//!
//! if let Ok(chunk) = consumer.read_chunk(2) {
//!     assert_eq!(chunk.into_iter().collect::<Vec<_>>(), [10, 11]);
//! } else {
//!     unreachable!();
//! }
//!
//! // One element is still in the queue:
//! assert_eq!(consumer.peek(), Ok(&12));
//!
//! let data = vec![20, 21, 22, 23];
//! // NB: write_chunk_uninit() could be used for possibly better performance:
//! if let Ok(mut chunk) = producer.write_chunk(4) {
//!     let (first, second) = chunk.as_mut_slices();
//!     let mid = first.len();
//!     first.copy_from_slice(&data[..mid]);
//!     second.copy_from_slice(&data[mid..]);
//!     chunk.commit_all();
//! } else {
//!     unreachable!();
//! }
//!
//! assert!(producer.is_full());
//! assert_eq!(consumer.slots(), 5);
//!
//! let mut v = Vec::<i32>::with_capacity(5);
//! if let Ok(chunk) = consumer.read_chunk(5) {
//!     let (first, second) = chunk.as_slices();
//!     v.extend(first);
//!     v.extend(second);
//!     chunk.commit_all();
//! } else {
//!     unreachable!();
//! }
//! assert_eq!(v, [12, 20, 21, 22, 23]);
//! assert!(consumer.is_empty());
//! ```
//!
//! The iterator API can be used to move items from one ring buffer to another:
//!
//! ```
//! use rtrb::{Consumer, Producer};
//!
//! fn move_items<T>(src: &mut Consumer<T>, dst: &mut Producer<T>) -> usize {
//!     let n = src.slots().min(dst.slots());
//!     dst.write_chunk_uninit(n).unwrap().fill_from_iter(src.read_chunk(n).unwrap())
//! }
//! ```
//!
//! ## Common Access Patterns
//!
//! The following examples show the [`Producer`] side;
//! similar patterns can of course be used with [`Consumer::read_chunk()`] as well.
//! Furthermore, the examples use [`Producer::write_chunk_uninit()`],
//! along with a bit of `unsafe` code.
//! To avoid this, you can use [`Producer::write_chunk()`] instead,
//! which requires the trait bound `T: Default` and will lead to a small runtime overhead.
//!
//! Copy a whole slice of items into the ring buffer, but only if space permits
//! (if not, the entire input slice is returned as an error):
//!
//! ```
//! use rtrb::{Producer, CopyToUninit};
//!
//! fn push_entire_slice<'a, T>(queue: &mut Producer<T>, slice: &'a [T]) -> Result<(), &'a [T]>
//! where
//!     T: Copy,
//! {
//!     if let Ok(mut chunk) = queue.write_chunk_uninit(slice.len()) {
//!         let (first, second) = chunk.as_mut_slices();
//!         let mid = first.len();
//!         slice[..mid].copy_to_uninit(first);
//!         slice[mid..].copy_to_uninit(second);
//!         // SAFETY: All slots have been initialized
//!         unsafe { chunk.commit_all() };
//!         Ok(())
//!     } else {
//!         Err(slice)
//!     }
//! }
//! ```
//!
//! Copy as many items as possible from a given slice, returning the number of copied items:
//!
//! ```
//! use rtrb::{Producer, CopyToUninit, chunks::ChunkError::TooFewSlots};
//!
//! fn push_partial_slice<T>(queue: &mut Producer<T>, slice: &[T]) -> usize
//! where
//!     T: Copy,
//! {
//!     let mut chunk = match queue.write_chunk_uninit(slice.len()) {
//!         Ok(chunk) => chunk,
//!         // Remaining slots are returned, this will always succeed:
//!         Err(TooFewSlots(n)) => queue.write_chunk_uninit(n).unwrap(),
//!     };
//!     let end = chunk.len();
//!     let (first, second) = chunk.as_mut_slices();
//!     let mid = first.len();
//!     slice[..mid].copy_to_uninit(first);
//!     slice[mid..end].copy_to_uninit(second);
//!     // SAFETY: All slots have been initialized
//!     unsafe { chunk.commit_all() };
//!     end
//! }
//! ```
//!
//! Write as many slots as possible, given an iterator
//! (and return the number of written slots):
//!
//! ```
//! use rtrb::{Producer, chunks::ChunkError::TooFewSlots};
//!
//! fn push_from_iter<T, I>(queue: &mut Producer<T>, iter: I) -> usize
//! where
//!     T: Default,
//!     I: IntoIterator<Item = T>,
//! {
//!     let iter = iter.into_iter();
//!     let n = match iter.size_hint() {
//!         (_, None) => queue.slots(),
//!         (_, Some(n)) => n,
//!     };
//!     let chunk = match queue.write_chunk_uninit(n) {
//!         Ok(chunk) => chunk,
//!         // Remaining slots are returned, this will always succeed:
//!         Err(TooFewSlots(n)) => queue.write_chunk_uninit(n).unwrap(),
//!     };
//!     chunk.fill_from_iter(iter)
//! }
//! ```
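//!
//! And as a consumer-side counterpart (a sketch of the same pattern, using only safe code),
//! copy as many items as are available into a given slice,
//! returning the number of copied items:
//!
//! ```
//! use rtrb::{Consumer, chunks::ChunkError::TooFewSlots};
//!
//! fn pop_partial_slice<T>(queue: &mut Consumer<T>, slice: &mut [T]) -> usize
//! where
//!     T: Copy,
//! {
//!     let chunk = match queue.read_chunk(slice.len()) {
//!         Ok(chunk) => chunk,
//!         // Remaining slots are returned, this will always succeed:
//!         Err(TooFewSlots(n)) => queue.read_chunk(n).unwrap(),
//!     };
//!     let (first, second) = chunk.as_slices();
//!     let mid = first.len();
//!     let end = chunk.len();
//!     slice[..mid].copy_from_slice(first);
//!     slice[mid..end].copy_from_slice(second);
//!     chunk.commit_all();
//!     end
//! }
//! ```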

use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::sync::atomic::Ordering;

use crate::{Consumer, CopyToUninit, Producer};

// This is used in the documentation.
#[allow(unused_imports)]
use crate::RingBuffer;

impl<T> Producer<T> {
    /// Returns `n` slots (initially containing their [`Default`] value) for writing.
    ///
    /// [`WriteChunk::as_mut_slices()`] provides mutable access to the slots.
    /// After writing to those slots, they explicitly have to be made available
    /// to be read by the [`Consumer`] by calling [`WriteChunk::commit()`]
    /// or [`WriteChunk::commit_all()`].
    ///
    /// For an alternative that does not require the trait bound [`Default`],
    /// see [`Producer::write_chunk_uninit()`].
    ///
    /// If items are supposed to be moved from an iterator into the ring buffer,
    /// [`Producer::write_chunk_uninit()`] followed by [`WriteChunkUninit::fill_from_iter()`]
    /// can be used.
    ///
    /// # Errors
    ///
    /// If not enough slots are available, an error
    /// (containing the number of available slots) is returned.
    /// Use [`Producer::slots()`] to obtain the number of available slots beforehand.
    ///
    /// # Examples
    ///
    /// See the documentation of the [`chunks`](crate::chunks#examples) module.
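    ///
    /// In addition, a minimal sketch of the basic flow
    /// (with a freshly created ring buffer, the first slice holds the whole chunk):
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (mut p, mut c) = RingBuffer::new(3);
    ///
    /// if let Ok(mut chunk) = p.write_chunk(2) {
    ///     let (first, _second) = chunk.as_mut_slices();
    ///     first[0] = 10;
    ///     first[1] = 20;
    ///     chunk.commit_all();
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Ok(20));
    /// ```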
    pub fn write_chunk(&mut self, n: usize) -> Result<WriteChunk<'_, T>, ChunkError>
    where
        T: Default,
    {
        self.write_chunk_uninit(n).map(WriteChunk::from)
    }

    /// Returns `n` (uninitialized) slots for writing.
    ///
    /// [`WriteChunkUninit::as_mut_slices()`] provides mutable access
    /// to the uninitialized slots.
    /// After writing to those slots, they explicitly have to be made available
    /// to be read by the [`Consumer`] by calling [`WriteChunkUninit::commit()`]
    /// or [`WriteChunkUninit::commit_all()`].
    ///
    /// Alternatively, [`WriteChunkUninit::fill_from_iter()`] can be used
    /// to move items from an iterator into the available slots.
    /// All moved items are automatically made available to be read by the [`Consumer`].
    ///
    /// # Errors
    ///
    /// If not enough slots are available, an error
    /// (containing the number of available slots) is returned.
    /// Use [`Producer::slots()`] to obtain the number of available slots beforehand.
    ///
    /// # Safety
    ///
    /// This function itself is safe, as is [`WriteChunkUninit::fill_from_iter()`].
    /// However, when using [`WriteChunkUninit::as_mut_slices()`],
    /// the user has to make sure that the relevant slots have been initialized
    /// before calling [`WriteChunkUninit::commit()`] or [`WriteChunkUninit::commit_all()`].
    ///
    /// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
    /// see [`Producer::write_chunk()`].
    pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
        let tail = self.cached_tail.get();

        // Check if the queue has *possibly* not enough slots.
        if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
            // Refresh the head ...
            let head = self.buffer.head.load(Ordering::Acquire);
            self.cached_head.set(head);

            // ... and check if there *really* are not enough slots.
            let slots = self.buffer.capacity - self.buffer.distance(head, tail);
            if slots < n {
                return Err(ChunkError::TooFewSlots(slots));
            }
        }
        let tail = self.buffer.collapse_position(tail);
        let first_len = n.min(self.buffer.capacity - tail);
        Ok(WriteChunkUninit {
            // SAFETY: tail has been updated to a valid position.
            first_ptr: unsafe { self.buffer.data_ptr.add(tail) },
            first_len,
            second_ptr: self.buffer.data_ptr,
            second_len: n - first_len,
            producer: self,
        })
    }
}

impl<T> Consumer<T> {
    /// Returns `n` slots for reading.
    ///
    /// [`ReadChunk::as_slices()`] provides immutable access to the slots.
    /// After reading from those slots, they explicitly have to be made available
    /// to be written again by the [`Producer`] by calling [`ReadChunk::commit()`]
    /// or [`ReadChunk::commit_all()`].
    ///
    /// Alternatively, items can be moved out of the [`ReadChunk`] using iteration
    /// because it implements [`IntoIterator`]
    /// ([`ReadChunk::into_iter()`] can be used to explicitly turn it into an [`Iterator`]).
    /// All moved items are automatically made available to be written again by the [`Producer`].
    ///
    /// # Errors
    ///
    /// If not enough slots are available, an error
    /// (containing the number of available slots) is returned.
    /// Use [`Consumer::slots()`] to obtain the number of available slots beforehand.
    ///
    /// # Examples
    ///
    /// See the documentation of the [`chunks`](crate::chunks#examples) module.
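    ///
    /// In addition, a minimal sketch that commits only part of a chunk
    /// (with a freshly created ring buffer, the first slice holds all items):
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (mut p, mut c) = RingBuffer::new(4);
    /// assert!(p.push(10).is_ok());
    /// assert!(p.push(20).is_ok());
    ///
    /// if let Ok(chunk) = c.read_chunk(2) {
    ///     assert_eq!(chunk.as_slices().0, [10, 20]);
    ///     // Only the first item is made available for writing again:
    ///     chunk.commit(1);
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(c.pop(), Ok(20));
    /// ```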
    pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
        let head = self.cached_head.get();

        // Check if the queue has *possibly* not enough slots.
        if self.buffer.distance(head, self.cached_tail.get()) < n {
            // Refresh the tail ...
            let tail = self.buffer.tail.load(Ordering::Acquire);
            self.cached_tail.set(tail);

            // ... and check if there *really* are not enough slots.
            let slots = self.buffer.distance(head, tail);
            if slots < n {
                return Err(ChunkError::TooFewSlots(slots));
            }
        }

        let head = self.buffer.collapse_position(head);
        let first_len = n.min(self.buffer.capacity - head);
        Ok(ReadChunk {
            // SAFETY: head has been updated to a valid position.
            first_ptr: unsafe { self.buffer.data_ptr.add(head) },
            first_len,
            second_ptr: self.buffer.data_ptr,
            second_len: n - first_len,
            consumer: self,
        })
    }
}

/// Structure for writing into multiple ([`Default`]-initialized) slots in one go.
///
/// This is returned from [`Producer::write_chunk()`].
///
/// To obtain uninitialized slots, use [`Producer::write_chunk_uninit()`] instead,
/// which also allows moving items from an iterator into the ring buffer
/// by means of [`WriteChunkUninit::fill_from_iter()`].
#[derive(Debug, PartialEq, Eq)]
pub struct WriteChunk<'a, T>(Option<WriteChunkUninit<'a, T>>, PhantomData<T>);

impl<T> Drop for WriteChunk<'_, T> {
    fn drop(&mut self) {
        // NB: If `commit()` or `commit_all()` has been called, `self.0` is `None`.
        if let Some(mut chunk) = self.0.take() {
            // No part of the chunk has been committed, all slots are dropped.
            // SAFETY: All slots have been initialized in From::from().
            unsafe { chunk.drop_suffix(0) };
        }
    }
}

impl<'a, T> From<WriteChunkUninit<'a, T>> for WriteChunk<'a, T>
where
    T: Default,
{
    /// Fills all slots with the [`Default`] value.
    fn from(chunk: WriteChunkUninit<'a, T>) -> Self {
        for i in 0..chunk.first_len {
            // SAFETY: i is in a valid range.
            unsafe { chunk.first_ptr.add(i).write(Default::default()) };
        }
        for i in 0..chunk.second_len {
            // SAFETY: i is in a valid range.
            unsafe { chunk.second_ptr.add(i).write(Default::default()) };
        }
        WriteChunk(Some(chunk), PhantomData)
    }
}

impl<T> WriteChunk<'_, T>
where
    T: Default,
{
    /// Returns two slices for writing to the requested slots.
    ///
    /// All slots are initially filled with their [`Default`] value.
    ///
    /// The first slice can only be empty if `0` slots have been requested.
    /// If the first slice contains all requested slots, the second one is empty.
    ///
    /// After writing to the slots, they are *not* automatically made available
    /// to be read by the [`Consumer`].
    /// This has to be explicitly done by calling [`commit()`](WriteChunk::commit)
    /// or [`commit_all()`](WriteChunk::commit_all).
    /// If items are written but *not* committed afterwards,
    /// they will *not* become available for reading and
    /// they will be leaked (which is only relevant if `T` implements [`Drop`]).
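    ///
    /// # Examples
    ///
    /// A small sketch of writing via the slices and then committing only part of the chunk
    /// (with a freshly created ring buffer, the first slice holds the whole chunk):
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (mut p, mut c) = RingBuffer::new(4);
    ///
    /// if let Ok(mut chunk) = p.write_chunk(3) {
    ///     let (first, _second) = chunk.as_mut_slices();
    ///     first[0] = 10;
    ///     first[1] = 20;
    ///     // The third slot keeps its `Default` value and is dropped here:
    ///     chunk.commit(2);
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Ok(20));
    /// assert!(c.pop().is_err());
    /// ```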
    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
        // self.0 is always Some(chunk).
        let chunk = self.0.as_ref().unwrap();
        // SAFETY: The pointers and lengths have been computed correctly in write_chunk_uninit()
        // and all slots have been initialized in From::from().
        unsafe {
            (
                core::slice::from_raw_parts_mut(chunk.first_ptr, chunk.first_len),
                core::slice::from_raw_parts_mut(chunk.second_ptr, chunk.second_len),
            )
        }
    }

    /// Makes the first `n` slots of the chunk available for reading.
    ///
    /// The rest of the chunk is dropped.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the number of slots in the chunk.
    pub fn commit(mut self, n: usize) {
        // self.0 is always Some(chunk).
        let mut chunk = self.0.take().unwrap();
        // SAFETY: All slots have been initialized in From::from().
        unsafe {
            // Slots at index `n` and higher are dropped ...
            chunk.drop_suffix(n);
            // ... everything below `n` is committed.
            chunk.commit(n);
        }
        // `self` is dropped here, with `self.0` being set to `None`.
    }

    /// Makes the whole chunk available for reading.
    pub fn commit_all(mut self) {
        // self.0 is always Some(chunk).
        let chunk = self.0.take().unwrap();
        // SAFETY: All slots have been initialized in From::from().
        unsafe { chunk.commit_all() };
        // `self` is dropped here, with `self.0` being set to `None`.
    }

    /// Returns the number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        // self.0 is always Some(chunk).
        self.0.as_ref().unwrap().len()
    }

    /// Returns `true` if the chunk contains no slots.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        // self.0 is always Some(chunk).
        self.0.as_ref().unwrap().is_empty()
    }
}

/// Structure for writing into multiple (uninitialized) slots in one go.
///
/// This is returned from [`Producer::write_chunk_uninit()`].
#[derive(Debug, PartialEq, Eq)]
pub struct WriteChunkUninit<'a, T> {
    first_ptr: *mut T,
    first_len: usize,
    second_ptr: *mut T,
    second_len: usize,
    producer: &'a Producer<T>,
}

// SAFETY: WriteChunkUninit only exists while a unique reference to the Producer is held.
// It is therefore safe to move it to another thread.
unsafe impl<T: Send> Send for WriteChunkUninit<'_, T> {}

impl<T> WriteChunkUninit<'_, T> {
    /// Returns two slices for writing to the requested slots.
    ///
    /// The first slice can only be empty if `0` slots have been requested.
    /// If the first slice contains all requested slots, the second one is empty.
    ///
    /// The extension trait [`CopyToUninit`] can be used to safely copy data into those slices.
    ///
    /// After writing to the slots, they are *not* automatically made available
    /// to be read by the [`Consumer`].
    /// This has to be explicitly done by calling [`commit()`](WriteChunkUninit::commit)
    /// or [`commit_all()`](WriteChunkUninit::commit_all).
    /// If items are written but *not* committed afterwards,
    /// they will *not* become available for reading and
    /// they will be leaked (which is only relevant if `T` implements [`Drop`]).
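    ///
    /// # Examples
    ///
    /// A sketch using [`CopyToUninit`] to initialize the slots before committing:
    ///
    /// ```
    /// use rtrb::{RingBuffer, CopyToUninit};
    ///
    /// let (mut p, mut c) = RingBuffer::new(4);
    /// let data = [10, 20];
    ///
    /// if let Ok(mut chunk) = p.write_chunk_uninit(2) {
    ///     let (first, second) = chunk.as_mut_slices();
    ///     let mid = first.len();
    ///     data[..mid].copy_to_uninit(first);
    ///     data[mid..].copy_to_uninit(second);
    ///     // SAFETY: All slots have been initialized
    ///     unsafe { chunk.commit_all() };
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Ok(20));
    /// ```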
    pub fn as_mut_slices(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
        // SAFETY: The pointers and lengths have been computed correctly in write_chunk_uninit().
        unsafe {
            (
                core::slice::from_raw_parts_mut(self.first_ptr.cast(), self.first_len),
                core::slice::from_raw_parts_mut(self.second_ptr.cast(), self.second_len),
            )
        }
    }

    /// Makes the first `n` slots of the chunk available for reading.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the number of slots in the chunk.
    ///
    /// # Safety
    ///
    /// The caller must make sure that the first `n` elements have been initialized.
    pub unsafe fn commit(self, n: usize) {
        assert!(n <= self.len(), "cannot commit more than chunk size");
        // SAFETY: Delegated to the caller.
        unsafe { self.commit_unchecked(n) };
    }

    /// Makes the whole chunk available for reading.
    ///
    /// # Safety
    ///
    /// The caller must make sure that all elements have been initialized.
    pub unsafe fn commit_all(self) {
        let slots = self.len();
        // SAFETY: Delegated to the caller.
        unsafe { self.commit_unchecked(slots) };
    }

    unsafe fn commit_unchecked(self, n: usize) -> usize {
        let p = self.producer;
        let tail = p.buffer.increment(p.cached_tail.get(), n);
        p.buffer.tail.store(tail, Ordering::Release);
        p.cached_tail.set(tail);
        n
    }

    /// Moves items from an iterator into the (uninitialized) slots of the chunk.
    ///
    /// The number of moved items is returned.
    ///
    /// All moved items are automatically made available to be read by the [`Consumer`].
    ///
    /// # Examples
    ///
    /// If the iterator contains too few items, only a part of the chunk
    /// is made available for reading:
    ///
    /// ```
    /// use rtrb::{RingBuffer, PopError};
    ///
    /// let (mut p, mut c) = RingBuffer::new(4);
    ///
    /// if let Ok(chunk) = p.write_chunk_uninit(3) {
    ///     assert_eq!(chunk.fill_from_iter([10, 20]), 2);
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(p.slots(), 2);
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Ok(20));
    /// assert_eq!(c.pop(), Err(PopError::Empty));
    /// ```
    ///
    /// If the chunk size is too small, some items may remain in the iterator.
    /// To be able to keep using the iterator after the call,
    /// `&mut` (or [`Iterator::by_ref()`]) can be used.
    ///
    /// ```
    /// use rtrb::{RingBuffer, PopError};
    ///
    /// let (mut p, mut c) = RingBuffer::new(4);
    ///
    /// let mut it = vec![10, 20, 30].into_iter();
    /// if let Ok(chunk) = p.write_chunk_uninit(2) {
    ///     assert_eq!(chunk.fill_from_iter(&mut it), 2);
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Ok(20));
    /// assert_eq!(c.pop(), Err(PopError::Empty));
    /// assert_eq!(it.next(), Some(30));
    /// ```
    pub fn fill_from_iter<I>(self, iter: I) -> usize
    where
        I: IntoIterator<Item = T>,
    {
        let mut iter = iter.into_iter();
        let mut iterated = 0;
        'outer: for &(ptr, len) in &[
            (self.first_ptr, self.first_len),
            (self.second_ptr, self.second_len),
        ] {
            for i in 0..len {
                match iter.next() {
                    Some(item) => {
                        // SAFETY: It is allowed to write to this memory slot
                        unsafe { ptr.add(i).write(item) };
                        iterated += 1;
                    }
                    None => break 'outer,
                }
            }
        }
        // SAFETY: iterated slots have been initialized above
        unsafe { self.commit_unchecked(iterated) }
    }

    /// Returns the number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        self.first_len + self.second_len
    }

    /// Returns `true` if the chunk contains no slots.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.first_len == 0
    }

    /// Drops all elements starting from index `n`.
    ///
    /// All of those slots must be initialized.
    unsafe fn drop_suffix(&mut self, n: usize) {
        // NB: If n >= self.len(), the loops are not entered.
        for i in n..self.first_len {
            // SAFETY: The caller must make sure that all slots are initialized.
            unsafe { self.first_ptr.add(i).drop_in_place() };
        }
        for i in n.saturating_sub(self.first_len)..self.second_len {
            // SAFETY: The caller must make sure that all slots are initialized.
            unsafe { self.second_ptr.add(i).drop_in_place() };
        }
    }
}

/// Structure for reading from multiple slots in one go.
///
/// This is returned from [`Consumer::read_chunk()`].
#[derive(Debug, PartialEq, Eq)]
pub struct ReadChunk<'a, T> {
    // Must be "mut" for drop_in_place()
    first_ptr: *mut T,
    first_len: usize,
    // Must be "mut" for drop_in_place()
    second_ptr: *mut T,
    second_len: usize,
    consumer: &'a Consumer<T>,
}

// SAFETY: ReadChunk only exists while a unique reference to the Consumer is held.
// It is therefore safe to move it to another thread.
unsafe impl<T: Send> Send for ReadChunk<'_, T> {}

impl<T> ReadChunk<'_, T> {
    /// Returns two slices for reading from the requested slots.
    ///
    /// The first slice can only be empty if `0` slots have been requested.
    /// If the first slice contains all requested slots, the second one is empty.
    ///
    /// The provided slots are *not* automatically made available
    /// to be written again by the [`Producer`].
    /// This has to be explicitly done by calling [`commit()`](ReadChunk::commit)
    /// or [`commit_all()`](ReadChunk::commit_all).
    /// Note that this runs the destructor of the committed items (if `T` implements [`Drop`]).
    /// You can "peek" at the contained values by simply not calling any of the "commit" methods.
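    ///
    /// # Examples
    ///
    /// A sketch of such a "peek": the chunk is dropped without committing,
    /// so all items stay in the ring buffer
    /// (with a freshly created ring buffer, the first slice holds all items):
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// let (mut p, mut c) = RingBuffer::new(4);
    /// assert!(p.push(10).is_ok());
    /// assert!(p.push(20).is_ok());
    ///
    /// if let Ok(chunk) = c.read_chunk(2) {
    ///     let (first, second) = chunk.as_slices();
    ///     assert_eq!(first, [10, 20]);
    ///     assert!(second.is_empty());
    ///     // No commit: nothing is removed from the ring buffer.
    /// } else {
    ///     unreachable!();
    /// }
    /// assert_eq!(c.slots(), 2);
    /// assert_eq!(c.pop(), Ok(10));
    /// ```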
    #[must_use]
    pub fn as_slices(&self) -> (&[T], &[T]) {
        // SAFETY: The pointers and lengths have been computed correctly in read_chunk().
        unsafe {
            (
                core::slice::from_raw_parts(self.first_ptr, self.first_len),
                core::slice::from_raw_parts(self.second_ptr, self.second_len),
            )
        }
    }

    /// Returns two mutable slices for reading from the requested slots.
    ///
    /// This has the same semantics as [`as_slices()`](ReadChunk::as_slices),
    /// except that it returns mutable slices and requires a mutable reference
    /// to the chunk.
    ///
    /// In the vast majority of cases, mutable access is not required when
    /// reading data and the immutable version should be preferred. However,
    /// there are some scenarios where it might be desirable to perform
    /// operations on the data in-place without copying it to a separate buffer
    /// (e.g. streaming decryption), in which case this version can be used.
    #[must_use]
    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
        // SAFETY: The pointers and lengths have been computed correctly in read_chunk().
        unsafe {
            (
                core::slice::from_raw_parts_mut(self.first_ptr, self.first_len),
                core::slice::from_raw_parts_mut(self.second_ptr, self.second_len),
            )
        }
    }

    /// Drops the first `n` slots of the chunk, making the space available for writing again.
    ///
    /// # Panics
    ///
    /// Panics if `n` is greater than the number of slots in the chunk.
    ///
    /// # Examples
    ///
    /// The following example shows that items are dropped when "committed"
    /// (which is only relevant if `T` implements [`Drop`]).
    ///
    /// ```
    /// use rtrb::RingBuffer;
    ///
    /// // Static variable to count all drop() invocations
    /// static mut DROP_COUNT: i32 = 0;
    /// #[derive(Debug)]
    /// struct Thing;
    /// impl Drop for Thing {
    ///     fn drop(&mut self) { unsafe { DROP_COUNT += 1; } }
    /// }
    ///
    /// // Scope to limit lifetime of ring buffer
    /// {
    ///     let (mut p, mut c) = RingBuffer::new(2);
    ///
    ///     assert!(p.push(Thing).is_ok()); // 1
    ///     assert!(p.push(Thing).is_ok()); // 2
    ///     if let Ok(thing) = c.pop() {
    ///         // "thing" has been *moved* out of the queue but not yet dropped
    ///         assert_eq!(unsafe { DROP_COUNT }, 0);
    ///     } else {
    ///         unreachable!();
    ///     }
    ///     // First Thing has been dropped when "thing" went out of scope:
    ///     assert_eq!(unsafe { DROP_COUNT }, 1);
    ///     assert!(p.push(Thing).is_ok()); // 3
    ///
    ///     if let Ok(chunk) = c.read_chunk(2) {
    ///         assert_eq!(chunk.len(), 2);
    ///         assert_eq!(unsafe { DROP_COUNT }, 1);
    ///         chunk.commit(1); // Drops only one of the two Things
    ///         assert_eq!(unsafe { DROP_COUNT }, 2);
    ///     } else {
    ///         unreachable!();
    ///     }
    ///     // The last Thing is still in the queue ...
    ///     assert_eq!(unsafe { DROP_COUNT }, 2);
    /// }
    /// // ... and it is dropped when the ring buffer goes out of scope:
    /// assert_eq!(unsafe { DROP_COUNT }, 3);
    /// ```
    pub fn commit(self, n: usize) {
        assert!(n <= self.len(), "cannot commit more than chunk size");
        // SAFETY: self.len() initialized elements have been obtained in read_chunk().
        unsafe { self.commit_unchecked(n) };
    }

    /// Drops all slots of the chunk, making the space available for writing again.
    pub fn commit_all(self) {
        let slots = self.len();
        // SAFETY: self.len() initialized elements have been obtained in read_chunk().
        unsafe { self.commit_unchecked(slots) };
    }

    unsafe fn commit_unchecked(self, n: usize) -> usize {
        let first_len = self.first_len.min(n);
        for i in 0..first_len {
            // SAFETY: The caller must make sure that there are n initialized elements.
            unsafe { self.first_ptr.add(i).drop_in_place() };
        }
        let second_len = self.second_len.min(n - first_len);
        for i in 0..second_len {
            // SAFETY: The caller must make sure that there are n initialized elements.
            unsafe { self.second_ptr.add(i).drop_in_place() };
        }
        let c = self.consumer;
        let head = c.buffer.increment(c.cached_head.get(), n);
        c.buffer.head.store(head, Ordering::Release);
        c.cached_head.set(head);
        n
    }

    /// Returns the number of slots in the chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        self.first_len + self.second_len
    }

    /// Returns `true` if the chunk contains no slots.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.first_len == 0
    }
}

impl<'a, T> IntoIterator for ReadChunk<'a, T> {
    type Item = T;
    type IntoIter = ReadChunkIntoIter<'a, T>;

    /// Turns a [`ReadChunk`] into an iterator.
    ///
    /// When the iterator is dropped, all iterated slots are made available for writing again.
    /// Non-iterated items remain in the ring buffer.
    fn into_iter(self) -> Self::IntoIter {
        Self::IntoIter {
            chunk: self,
            iterated: 0,
        }
    }
}

/// An iterator that moves out of a [`ReadChunk`].
///
/// This `struct` is created by the [`into_iter()`](ReadChunk::into_iter) method
/// on [`ReadChunk`] (provided by the [`IntoIterator`] trait).
///
/// When this `struct` is dropped, the iterated slots are made available for writing again.
/// Non-iterated items remain in the ring buffer.
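///
/// # Examples
///
/// A sketch of partial iteration, where the non-iterated item stays in the ring buffer:
///
/// ```
/// use rtrb::RingBuffer;
///
/// let (mut p, mut c) = RingBuffer::new(4);
/// assert!(p.push(10).is_ok());
/// assert!(p.push(20).is_ok());
/// assert!(p.push(30).is_ok());
///
/// if let Ok(chunk) = c.read_chunk(3) {
///     let mut iter = chunk.into_iter();
///     assert_eq!(iter.next(), Some(10));
///     assert_eq!(iter.next(), Some(20));
///     // The iterator is dropped here, making the two iterated slots
///     // available for writing again.
/// } else {
///     unreachable!();
/// }
/// assert_eq!(c.pop(), Ok(30));
/// ```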
#[derive(Debug)]
pub struct ReadChunkIntoIter<'a, T> {
    chunk: ReadChunk<'a, T>,
    iterated: usize,
}

impl<T> Drop for ReadChunkIntoIter<'_, T> {
    /// Makes all iterated slots available for writing again.
    ///
    /// Non-iterated items remain in the ring buffer and are *not* dropped.
    fn drop(&mut self) {
        let c = &self.chunk.consumer;
        let head = c.buffer.increment(c.cached_head.get(), self.iterated);
        c.buffer.head.store(head, Ordering::Release);
        c.cached_head.set(head);
    }
}

impl<T> Iterator for ReadChunkIntoIter<'_, T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let ptr = if self.iterated < self.chunk.first_len {
            // SAFETY: first_len is valid.
            unsafe { self.chunk.first_ptr.add(self.iterated) }
        } else if self.iterated < self.chunk.first_len + self.chunk.second_len {
            // SAFETY: first_len and second_len are valid.
            unsafe {
                self.chunk
                    .second_ptr
                    .add(self.iterated - self.chunk.first_len)
            }
        } else {
            return None;
        };
        self.iterated += 1;
        // SAFETY: ptr points to an initialized slot.
        Some(unsafe { ptr.read() })
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.chunk.first_len + self.chunk.second_len - self.iterated;
        (remaining, Some(remaining))
    }
}

impl<T> ExactSizeIterator for ReadChunkIntoIter<'_, T> {}

impl<T> core::iter::FusedIterator for ReadChunkIntoIter<'_, T> {}

#[cfg(feature = "std")]
impl std::io::Write for Producer<u8> {
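    /// Writes as many bytes as the ring buffer can currently accept;
    /// a sketch of the behavior (if no byte can be written, `WouldBlock` is returned):
    ///
    /// ```
    /// use std::io::Write;
    ///
    /// use rtrb::RingBuffer;
    ///
    /// let (mut p, mut c) = RingBuffer::<u8>::new(4);
    ///
    /// // Only 4 of the 5 bytes fit:
    /// assert_eq!(p.write(b"hello").unwrap(), 4);
    /// assert_eq!(c.pop(), Ok(b'h'));
    /// ```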
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        use ChunkError::TooFewSlots;
        let mut chunk = match self.write_chunk_uninit(buf.len()) {
            Ok(chunk) => chunk,
            Err(TooFewSlots(0)) => return Err(std::io::ErrorKind::WouldBlock.into()),
            Err(TooFewSlots(n)) => self.write_chunk_uninit(n).unwrap(),
        };
        let end = chunk.len();
        let (first, second) = chunk.as_mut_slices();
        let mid = first.len();
        // NB: If buf.is_empty(), chunk will be empty as well and the following are no-ops:
        buf[..mid].copy_to_uninit(first);
        buf[mid..end].copy_to_uninit(second);
        // SAFETY: All slots have been initialized
        unsafe { chunk.commit_all() };
        Ok(end)
    }

    #[inline]
    fn flush(&mut self) -> std::io::Result<()> {
        // Nothing to do here.
        Ok(())
    }
}

#[cfg(feature = "std")]
impl std::io::Read for Consumer<u8> {
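    /// Reads as many bytes as are currently available;
    /// a sketch of the behavior (if no byte can be read, `WouldBlock` is returned):
    ///
    /// ```
    /// use std::io::Read;
    ///
    /// use rtrb::RingBuffer;
    ///
    /// let (mut p, mut c) = RingBuffer::<u8>::new(4);
    /// assert!(p.push(10).is_ok());
    /// assert!(p.push(20).is_ok());
    ///
    /// let mut buf = [0u8; 4];
    /// // Only 2 bytes are available:
    /// assert_eq!(c.read(&mut buf).unwrap(), 2);
    /// assert_eq!(buf, [10u8, 20, 0, 0]);
    /// ```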
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        use ChunkError::TooFewSlots;
        let chunk = match self.read_chunk(buf.len()) {
            Ok(chunk) => chunk,
            Err(TooFewSlots(0)) => return Err(std::io::ErrorKind::WouldBlock.into()),
            Err(TooFewSlots(n)) => self.read_chunk(n).unwrap(),
        };
        let (first, second) = chunk.as_slices();
        let mid = first.len();
        let end = chunk.len();
        // NB: If buf.is_empty(), chunk will be empty as well and the following are no-ops:
        buf[..mid].copy_from_slice(first);
        buf[mid..end].copy_from_slice(second);
        chunk.commit_all();
        Ok(end)
    }
}

/// Error type for [`Consumer::read_chunk()`], [`Producer::write_chunk()`]
/// and [`Producer::write_chunk_uninit()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ChunkError {
    /// Fewer than the requested number of slots were available.
    ///
    /// Contains the number of slots that were available.
    TooFewSlots(usize),
}

#[cfg(feature = "std")]
impl std::error::Error for ChunkError {}

impl fmt::Display for ChunkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChunkError::TooFewSlots(n) => {
                alloc::format!("only {} slots available in ring buffer", n).fmt(f)
            }
        }
    }
}