rtrb_basedrop/chunks.rs
1//! Writing and reading multiple items at once into and from a [`RingBuffer`].
2//!
3//! Multiple items at once can be moved from an iterator into the ring buffer by using
4//! [`Producer::write_chunk_uninit()`] followed by [`WriteChunkUninit::fill_from_iter()`].
5//! Alternatively, mutable access to the (uninitialized) slots of the chunk can be obtained with
6//! [`WriteChunkUninit::as_mut_slices()`], which requires writing some `unsafe` code.
7//! To avoid that, [`Producer::write_chunk()`] can be used,
8//! which initializes all slots with their [`Default`] value
9//! and provides mutable access by means of [`WriteChunk::as_mut_slices()`].
10//!
11//! Multiple items at once can be moved out of the ring buffer by using
12//! [`Consumer::read_chunk()`] and iterating over the returned [`ReadChunk`]
13//! (or by explicitly calling [`ReadChunk::into_iter()`]).
14//! Immutable access to the slots of the chunk can be obtained with [`ReadChunk::as_slices()`].
15//!
16//! # Examples
17//!
18//! This example uses a single thread for simplicity, but in a real application,
19//! `producer` and `consumer` would of course live on different threads:
20//!
21//! ```
22//! use rtrb_basedrop::RingBuffer;
23//! use basedrop::Collector;
24//!
25//! let collector = Collector::new();
26//!
27//! let (mut producer, mut consumer) = RingBuffer::new(5, &collector.handle());
28//!
29//! if let Ok(chunk) = producer.write_chunk_uninit(4) {
30//! chunk.fill_from_iter([10, 11, 12]);
31//! // Note that we requested 4 slots but we've only written to 3 of them!
32//! } else {
33//! unreachable!();
34//! }
35//!
36//! assert_eq!(producer.slots(), 2);
37//! assert_eq!(consumer.slots(), 3);
38//!
39//! if let Ok(chunk) = consumer.read_chunk(2) {
40//! assert_eq!(chunk.into_iter().collect::<Vec<_>>(), [10, 11]);
41//! } else {
42//! unreachable!();
43//! }
44//!
45//! // One element is still in the queue:
46//! assert_eq!(consumer.peek(), Ok(&12));
47//!
48//! let data = vec![20, 21, 22, 23];
49//! // NB: write_chunk_uninit() could be used for possibly better performance:
50//! if let Ok(mut chunk) = producer.write_chunk(4) {
51//! let (first, second) = chunk.as_mut_slices();
52//! let mid = first.len();
53//! first.copy_from_slice(&data[..mid]);
54//! second.copy_from_slice(&data[mid..]);
55//! chunk.commit_all();
56//! } else {
57//! unreachable!();
58//! }
59//!
60//! assert!(producer.is_full());
61//! assert_eq!(consumer.slots(), 5);
62//!
63//! let mut v = Vec::<i32>::with_capacity(5);
64//! if let Ok(chunk) = consumer.read_chunk(5) {
65//! let (first, second) = chunk.as_slices();
66//! v.extend(first);
67//! v.extend(second);
68//! chunk.commit_all();
69//! } else {
70//! unreachable!();
71//! }
72//! assert_eq!(v, [12, 20, 21, 22, 23]);
73//! assert!(consumer.is_empty());
74//! ```
75//!
76//! The iterator API can be used to move items from one ring buffer to another:
77//!
78//! ```
79//! use rtrb_basedrop::{Consumer, Producer};
80//!
81//! fn move_items<T: Send + 'static>(src: &mut Consumer<T>, dst: &mut Producer<T>) -> usize {
82//! let n = src.slots().min(dst.slots());
83//! dst.write_chunk_uninit(n).unwrap().fill_from_iter(src.read_chunk(n).unwrap())
84//! }
85//! ```
86//!
87//! ## Common Access Patterns
88//!
89//! The following examples show the [`Producer`] side;
90//! similar patterns can of course be used with [`Consumer::read_chunk()`] as well.
91//! Furthermore, the examples use [`Producer::write_chunk_uninit()`],
92//! along with a bit of `unsafe` code.
93//! To avoid this, you can use [`Producer::write_chunk()`] instead,
94//! which requires the trait bound `T: Default` and will lead to a small runtime overhead.
95//!
96//! Copy a whole slice of items into the ring buffer, but only if space permits
97//! (if not, the entire input slice is returned as an error):
98//!
99//! ```
100//! use rtrb_basedrop::{Producer, CopyToUninit};
101//!
102//! fn push_entire_slice<'a, T: Send + 'static>(queue: &mut Producer<T>, slice: &'a [T]) -> Result<(), &'a [T]>
103//! where
104//! T: Copy,
105//! {
106//! if let Ok(mut chunk) = queue.write_chunk_uninit(slice.len()) {
107//! let (first, second) = chunk.as_mut_slices();
108//! let mid = first.len();
109//! slice[..mid].copy_to_uninit(first);
110//! slice[mid..].copy_to_uninit(second);
111//! // SAFETY: All slots have been initialized
112//! unsafe {
113//! chunk.commit_all();
114//! }
115//! Ok(())
116//! } else {
117//! Err(slice)
118//! }
119//! }
120//! ```
121//!
122//! Copy as many items as possible from a given slice, returning the number of copied items:
123//!
124//! ```
125//! use rtrb_basedrop::{Producer, CopyToUninit, chunks::ChunkError::TooFewSlots};
126//!
127//! fn push_partial_slice<T: Send + 'static>(queue: &mut Producer<T>, slice: &[T]) -> usize
128//! where
129//! T: Copy,
130//! {
131//! let mut chunk = match queue.write_chunk_uninit(slice.len()) {
132//! Ok(chunk) => chunk,
133//! // Remaining slots are returned, this will always succeed:
134//! Err(TooFewSlots(n)) => queue.write_chunk_uninit(n).unwrap(),
135//! };
136//! let end = chunk.len();
137//! let (first, second) = chunk.as_mut_slices();
138//! let mid = first.len();
139//! slice[..mid].copy_to_uninit(first);
140//! slice[mid..end].copy_to_uninit(second);
141//! // SAFETY: All slots have been initialized
142//! unsafe {
143//! chunk.commit_all();
144//! }
145//! end
146//! }
147//! ```
148//!
149//! Write as many slots as possible, given an iterator
150//! (and return the number of written slots):
151//!
152//! ```
153//! use rtrb_basedrop::{Producer, chunks::ChunkError::TooFewSlots};
154//!
155//! fn push_from_iter<T: Send + 'static, I>(queue: &mut Producer<T>, iter: I) -> usize
156//! where
157//! T: Default,
158//! I: IntoIterator<Item = T>,
159//! {
160//! let iter = iter.into_iter();
161//! let n = match iter.size_hint() {
162//! (_, None) => queue.slots(),
163//! (_, Some(n)) => n,
164//! };
165//! let chunk = match queue.write_chunk_uninit(n) {
166//! Ok(chunk) => chunk,
167//! // Remaining slots are returned, this will always succeed:
168//! Err(TooFewSlots(n)) => queue.write_chunk_uninit(n).unwrap(),
169//! };
170//! chunk.fill_from_iter(iter)
171//! }
172//! ```
173
174use core::fmt;
175use core::marker::PhantomData;
176use core::mem::MaybeUninit;
177use core::sync::atomic::Ordering;
178
179use crate::{Consumer, CopyToUninit, Producer};
180
181// This is used in the documentation.
182#[allow(unused_imports)]
183use crate::RingBuffer;
184
185impl<T: Send + 'static> Producer<T> {
186 /// Returns `n` slots (initially containing their [`Default`] value) for writing.
187 ///
188 /// [`WriteChunk::as_mut_slices()`] provides mutable access to the slots.
189 /// After writing to those slots, they explicitly have to be made available
190 /// to be read by the [`Consumer`] by calling [`WriteChunk::commit()`]
191 /// or [`WriteChunk::commit_all()`].
192 ///
193 /// For an alternative that does not require the trait bound [`Default`],
194 /// see [`Producer::write_chunk_uninit()`].
195 ///
196 /// If items are supposed to be moved from an iterator into the ring buffer,
197 /// [`Producer::write_chunk_uninit()`] followed by [`WriteChunkUninit::fill_from_iter()`]
198 /// can be used.
199 ///
200 /// # Errors
201 ///
202 /// If not enough slots are available, an error
203 /// (containing the number of available slots) is returned.
204 /// Use [`Producer::slots()`] to obtain the number of available slots beforehand.
205 ///
206 /// # Examples
207 ///
208 /// See the documentation of the [`chunks`](crate::chunks#examples) module.
209 pub fn write_chunk(&mut self, n: usize) -> Result<WriteChunk<'_, T>, ChunkError>
210 where
211 T: Default,
212 {
213 self.write_chunk_uninit(n).map(WriteChunk::from)
214 }
215
216 /// Returns `n` (uninitialized) slots for writing.
217 ///
218 /// [`WriteChunkUninit::as_mut_slices()`] provides mutable access
219 /// to the uninitialized slots.
220 /// After writing to those slots, they explicitly have to be made available
221 /// to be read by the [`Consumer`] by calling [`WriteChunkUninit::commit()`]
222 /// or [`WriteChunkUninit::commit_all()`].
223 ///
224 /// Alternatively, [`WriteChunkUninit::fill_from_iter()`] can be used
225 /// to move items from an iterator into the available slots.
226 /// All moved items are automatically made available to be read by the [`Consumer`].
227 ///
228 /// # Errors
229 ///
230 /// If not enough slots are available, an error
231 /// (containing the number of available slots) is returned.
232 /// Use [`Producer::slots()`] to obtain the number of available slots beforehand.
233 ///
234 /// # Safety
235 ///
236 /// This function itself is safe, as is [`WriteChunkUninit::fill_from_iter()`].
237 /// However, when using [`WriteChunkUninit::as_mut_slices()`],
238 /// the user has to make sure that the relevant slots have been initialized
239 /// before calling [`WriteChunkUninit::commit()`] or [`WriteChunkUninit::commit_all()`].
240 ///
241 /// For a safe alternative that provides mutable slices of [`Default`]-initialized slots,
242 /// see [`Producer::write_chunk()`].
243 pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
244 let tail = self.cached_tail.get();
245
246 // Check if the queue has *possibly* not enough slots.
247 if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
248 // Refresh the head ...
249 let head = self.buffer.head.load(Ordering::Acquire);
250 self.cached_head.set(head);
251
252 // ... and check if there *really* are not enough slots.
253 let slots = self.buffer.capacity - self.buffer.distance(head, tail);
254 if slots < n {
255 return Err(ChunkError::TooFewSlots(slots));
256 }
257 }
258 let tail = self.buffer.collapse_position(tail);
259 let first_len = n.min(self.buffer.capacity - tail);
260 Ok(WriteChunkUninit {
261 first_ptr: unsafe { self.buffer.data_ptr.add(tail) },
262 first_len,
263 second_ptr: self.buffer.data_ptr,
264 second_len: n - first_len,
265 producer: self,
266 })
267 }
268}
269
270impl<T: Send + 'static> Consumer<T> {
271 /// Returns `n` slots for reading.
272 ///
273 /// [`ReadChunk::as_slices()`] provides immutable access to the slots.
274 /// After reading from those slots, they explicitly have to be made available
275 /// to be written again by the [`Producer`] by calling [`ReadChunk::commit()`]
276 /// or [`ReadChunk::commit_all()`].
277 ///
278 /// Alternatively, items can be moved out of the [`ReadChunk`] using iteration
279 /// because it implements [`IntoIterator`]
280 /// ([`ReadChunk::into_iter()`] can be used to explicitly turn it into an [`Iterator`]).
281 /// All moved items are automatically made available to be written again by the [`Producer`].
282 ///
283 /// # Errors
284 ///
285 /// If not enough slots are available, an error
286 /// (containing the number of available slots) is returned.
287 /// Use [`Consumer::slots()`] to obtain the number of available slots beforehand.
288 ///
289 /// # Examples
290 ///
291 /// See the documentation of the [`chunks`](crate::chunks#examples) module.
292 pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
293 let head = self.cached_head.get();
294
295 // Check if the queue has *possibly* not enough slots.
296 if self.buffer.distance(head, self.cached_tail.get()) < n {
297 // Refresh the tail ...
298 let tail = self.buffer.tail.load(Ordering::Acquire);
299 self.cached_tail.set(tail);
300
301 // ... and check if there *really* are not enough slots.
302 let slots = self.buffer.distance(head, tail);
303 if slots < n {
304 return Err(ChunkError::TooFewSlots(slots));
305 }
306 }
307
308 let head = self.buffer.collapse_position(head);
309 let first_len = n.min(self.buffer.capacity - head);
310 Ok(ReadChunk {
311 first_ptr: unsafe { self.buffer.data_ptr.add(head) },
312 first_len,
313 second_ptr: self.buffer.data_ptr,
314 second_len: n - first_len,
315 consumer: self,
316 })
317 }
318}
319
320/// Structure for writing into multiple ([`Default`]-initialized) slots in one go.
321///
322/// This is returned from [`Producer::write_chunk()`].
323///
324/// To obtain uninitialized slots, use [`Producer::write_chunk_uninit()`] instead,
325/// which also allows moving items from an iterator into the ring buffer
326/// by means of [`WriteChunkUninit::fill_from_iter()`].
327#[derive(Debug)]
328pub struct WriteChunk<'a, T: Send + 'static>(Option<WriteChunkUninit<'a, T>>, PhantomData<T>);
329
330impl<T: Send + 'static> Drop for WriteChunk<'_, T> {
331 fn drop(&mut self) {
332 // NB: If `commit()` or `commit_all()` has been called, `self.0` is `None`.
333 if let Some(mut chunk) = self.0.take() {
334 // Safety: All slots have been initialized in From::from().
335 unsafe {
336 // No part of the chunk has been committed, all slots are dropped.
337 chunk.drop_suffix(0);
338 }
339 }
340 }
341}
342
343impl<'a, T> From<WriteChunkUninit<'a, T>> for WriteChunk<'a, T>
344where
345 T: Default + Send + 'static,
346{
347 /// Fills all slots with the [`Default`] value.
348 fn from(chunk: WriteChunkUninit<'a, T>) -> Self {
349 for i in 0..chunk.first_len {
350 unsafe {
351 chunk.first_ptr.add(i).write(Default::default());
352 }
353 }
354 for i in 0..chunk.second_len {
355 unsafe {
356 chunk.second_ptr.add(i).write(Default::default());
357 }
358 }
359 WriteChunk(Some(chunk), PhantomData)
360 }
361}
362
363impl<T> WriteChunk<'_, T>
364where
365 T: Default + Send + 'static,
366{
367 /// Returns two slices for writing to the requested slots.
368 ///
369 /// All slots are initially filled with their [`Default`] value.
370 ///
371 /// The first slice can only be empty if `0` slots have been requested.
372 /// If the first slice contains all requested slots, the second one is empty.
373 ///
374 /// After writing to the slots, they are *not* automatically made available
375 /// to be read by the [`Consumer`].
376 /// This has to be explicitly done by calling [`commit()`](WriteChunk::commit)
377 /// or [`commit_all()`](WriteChunk::commit_all).
378 /// If items are written but *not* committed afterwards,
379 /// they will *not* become available for reading and
380 /// they will be leaked (which is only relevant if `T` implements [`Drop`]).
381 pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
382 // self.0 is always Some(chunk).
383 let chunk = self.0.as_ref().unwrap();
384 // Safety: All slots have been initialized in From::from().
385 unsafe {
386 (
387 core::slice::from_raw_parts_mut(chunk.first_ptr, chunk.first_len),
388 core::slice::from_raw_parts_mut(chunk.second_ptr, chunk.second_len),
389 )
390 }
391 }
392
393 /// Makes the first `n` slots of the chunk available for reading.
394 ///
395 /// The rest of the chunk is dropped.
396 ///
397 /// # Panics
398 ///
399 /// Panics if `n` is greater than the number of slots in the chunk.
400 pub fn commit(mut self, n: usize) {
401 // self.0 is always Some(chunk).
402 let mut chunk = self.0.take().unwrap();
403 // Safety: All slots have been initialized in From::from().
404 unsafe {
405 // Slots at index `n` and higher are dropped ...
406 chunk.drop_suffix(n);
407 // ... everything below `n` is committed.
408 chunk.commit(n);
409 }
410 // `self` is dropped here, with `self.0` being set to `None`.
411 }
412
413 /// Makes the whole chunk available for reading.
414 pub fn commit_all(mut self) {
415 // self.0 is always Some(chunk).
416 let chunk = self.0.take().unwrap();
417 // Safety: All slots have been initialized in From::from().
418 unsafe {
419 chunk.commit_all();
420 }
421 // `self` is dropped here, with `self.0` being set to `None`.
422 }
423
424 /// Returns the number of slots in the chunk.
425 #[must_use]
426 pub fn len(&self) -> usize {
427 // self.0 is always Some(chunk).
428 self.0.as_ref().unwrap().len()
429 }
430
431 /// Returns `true` if the chunk contains no slots.
432 #[must_use]
433 pub fn is_empty(&self) -> bool {
434 // self.0 is always Some(chunk).
435 self.0.as_ref().unwrap().is_empty()
436 }
437}
438
439/// Structure for writing into multiple (uninitialized) slots in one go.
440///
441/// This is returned from [`Producer::write_chunk_uninit()`].
442#[derive(Debug)]
443pub struct WriteChunkUninit<'a, T: Send + 'static> {
444 first_ptr: *mut T,
445 first_len: usize,
446 second_ptr: *mut T,
447 second_len: usize,
448 producer: &'a Producer<T>,
449}
450
451// WriteChunkUninit only exists while a unique reference to the Producer is held.
452// It is therefore safe to move it to another thread.
453unsafe impl<T: Send + 'static> Send for WriteChunkUninit<'_, T> {}
454
455impl<T: Send + 'static> WriteChunkUninit<'_, T> {
456 /// Returns two slices for writing to the requested slots.
457 ///
458 /// The first slice can only be empty if `0` slots have been requested.
459 /// If the first slice contains all requested slots, the second one is empty.
460 ///
461 /// The extension trait [`CopyToUninit`] can be used to safely copy data into those slices.
462 ///
463 /// After writing to the slots, they are *not* automatically made available
464 /// to be read by the [`Consumer`].
465 /// This has to be explicitly done by calling [`commit()`](WriteChunkUninit::commit)
466 /// or [`commit_all()`](WriteChunkUninit::commit_all).
467 /// If items are written but *not* committed afterwards,
468 /// they will *not* become available for reading and
469 /// they will be leaked (which is only relevant if `T` implements [`Drop`]).
470 pub fn as_mut_slices(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
471 unsafe {
472 (
473 core::slice::from_raw_parts_mut(self.first_ptr as *mut _, self.first_len),
474 core::slice::from_raw_parts_mut(self.second_ptr as *mut _, self.second_len),
475 )
476 }
477 }
478
479 /// Makes the first `n` slots of the chunk available for reading.
480 ///
481 /// # Panics
482 ///
483 /// Panics if `n` is greater than the number of slots in the chunk.
484 ///
485 /// # Safety
486 ///
487 /// The user must make sure that the first `n` elements have been initialized.
488 pub unsafe fn commit(self, n: usize) {
489 assert!(n <= self.len(), "cannot commit more than chunk size");
490 self.commit_unchecked(n);
491 }
492
493 /// Makes the whole chunk available for reading.
494 ///
495 /// # Safety
496 ///
497 /// The user must make sure that all elements have been initialized.
498 pub unsafe fn commit_all(self) {
499 let slots = self.len();
500 self.commit_unchecked(slots);
501 }
502
503 unsafe fn commit_unchecked(self, n: usize) -> usize {
504 let p = self.producer;
505 let tail = p.buffer.increment(p.cached_tail.get(), n);
506 p.buffer.tail.store(tail, Ordering::Release);
507 p.cached_tail.set(tail);
508 n
509 }
510
511 /// Moves items from an iterator into the (uninitialized) slots of the chunk.
512 ///
513 /// The number of moved items is returned.
514 ///
515 /// All moved items are automatically made availabe to be read by the [`Consumer`].
516 ///
517 /// # Examples
518 ///
519 /// If the iterator contains too few items, only a part of the chunk
520 /// is made available for reading:
521 ///
522 /// ```
523 /// use rtrb_basedrop::{RingBuffer, PopError};
524 /// use basedrop::Collector;
525 ///
526 /// let collector = Collector::new();
527 ///
528 /// let (mut p, mut c) = RingBuffer::new(4, &collector.handle());
529 ///
530 /// if let Ok(chunk) = p.write_chunk_uninit(3) {
531 /// assert_eq!(chunk.fill_from_iter([10, 20]), 2);
532 /// } else {
533 /// unreachable!();
534 /// }
535 /// assert_eq!(p.slots(), 2);
536 /// assert_eq!(c.pop(), Ok(10));
537 /// assert_eq!(c.pop(), Ok(20));
538 /// assert_eq!(c.pop(), Err(PopError::Empty));
539 /// ```
540 ///
541 /// If the chunk size is too small, some items may remain in the iterator.
542 /// To be able to keep using the iterator after the call,
543 /// `&mut` (or [`Iterator::by_ref()`]) can be used.
544 ///
545 /// ```
546 /// use rtrb_basedrop::{RingBuffer, PopError};
547 /// use basedrop::Collector;
548 ///
549 /// let collector = Collector::new();
550 ///
551 /// let (mut p, mut c) = RingBuffer::new(4, &collector.handle());
552 ///
553 /// let mut it = vec![10, 20, 30].into_iter();
554 /// if let Ok(chunk) = p.write_chunk_uninit(2) {
555 /// assert_eq!(chunk.fill_from_iter(&mut it), 2);
556 /// } else {
557 /// unreachable!();
558 /// }
559 /// assert_eq!(c.pop(), Ok(10));
560 /// assert_eq!(c.pop(), Ok(20));
561 /// assert_eq!(c.pop(), Err(PopError::Empty));
562 /// assert_eq!(it.next(), Some(30));
563 /// ```
564 pub fn fill_from_iter<I>(self, iter: I) -> usize
565 where
566 I: IntoIterator<Item = T>,
567 {
568 let mut iter = iter.into_iter();
569 let mut iterated = 0;
570 'outer: for &(ptr, len) in &[
571 (self.first_ptr, self.first_len),
572 (self.second_ptr, self.second_len),
573 ] {
574 for i in 0..len {
575 match iter.next() {
576 Some(item) => {
577 // Safety: It is allowed to write to this memory slot
578 unsafe {
579 ptr.add(i).write(item);
580 }
581 iterated += 1;
582 }
583 None => break 'outer,
584 }
585 }
586 }
587 // Safety: iterated slots have been initialized above
588 unsafe { self.commit_unchecked(iterated) }
589 }
590
591 /// Returns the number of slots in the chunk.
592 #[must_use]
593 pub fn len(&self) -> usize {
594 self.first_len + self.second_len
595 }
596
597 /// Returns `true` if the chunk contains no slots.
598 #[must_use]
599 pub fn is_empty(&self) -> bool {
600 self.first_len == 0
601 }
602
603 /// Drops all elements starting from index `n`.
604 ///
605 /// All of those slots must be initialized.
606 unsafe fn drop_suffix(&mut self, n: usize) {
607 // NB: If n >= self.len(), the loops are not entered.
608 for i in n..self.first_len {
609 self.first_ptr.add(i).drop_in_place();
610 }
611 for i in n.saturating_sub(self.first_len)..self.second_len {
612 self.second_ptr.add(i).drop_in_place();
613 }
614 }
615}
616
617/// Structure for reading from multiple slots in one go.
618///
619/// This is returned from [`Consumer::read_chunk()`].
620#[derive(Debug)]
621pub struct ReadChunk<'a, T: Send + 'static> {
622 // Must be "mut" for drop_in_place()
623 first_ptr: *mut T,
624 first_len: usize,
625 // Must be "mut" for drop_in_place()
626 second_ptr: *mut T,
627 second_len: usize,
628 consumer: &'a Consumer<T>,
629}
630
631// ReadChunk only exists while a unique reference to the Consumer is held.
632// It is therefore safe to move it to another thread.
633unsafe impl<T: Send + 'static> Send for ReadChunk<'_, T> {}
634
635impl<T: Send + 'static> ReadChunk<'_, T> {
636 /// Returns two slices for reading from the requested slots.
637 ///
638 /// The first slice can only be empty if `0` slots have been requested.
639 /// If the first slice contains all requested slots, the second one is empty.
640 ///
641 /// The provided slots are *not* automatically made available
642 /// to be written again by the [`Producer`].
643 /// This has to be explicitly done by calling [`commit()`](ReadChunk::commit)
644 /// or [`commit_all()`](ReadChunk::commit_all).
645 /// Note that this runs the destructor of the committed items (if `T` implements [`Drop`]).
646 /// You can "peek" at the contained values by simply not calling any of the "commit" methods.
647 #[must_use]
648 pub fn as_slices(&self) -> (&[T], &[T]) {
649 (
650 unsafe { core::slice::from_raw_parts(self.first_ptr, self.first_len) },
651 unsafe { core::slice::from_raw_parts(self.second_ptr, self.second_len) },
652 )
653 }
654
655 /// Drops the first `n` slots of the chunk, making the space available for writing again.
656 ///
657 /// # Panics
658 ///
659 /// Panics if `n` is greater than the number of slots in the chunk.
660 ///
661 /// # Examples
662 ///
663 /// The following example shows that items are dropped when "committed"
664 /// (which is only relevant if `T` implements [`Drop`]).
665 ///
666 /// ```
667 /// use rtrb_basedrop::RingBuffer;
668 /// use basedrop::Collector;
669 ///
670 /// let mut collector = Collector::new();
671 ///
672 /// // Static variable to count all drop() invocations
673 /// static mut DROP_COUNT: i32 = 0;
674 /// #[derive(Debug)]
675 /// struct Thing;
676 /// impl Drop for Thing {
677 /// fn drop(&mut self) { unsafe { DROP_COUNT += 1; } }
678 /// }
679 ///
680 /// // Scope to limit lifetime of ring buffer
681 /// {
682 /// let (mut p, mut c) = RingBuffer::new(2, &collector.handle());
683 ///
684 /// assert!(p.push(Thing).is_ok()); // 1
685 /// assert!(p.push(Thing).is_ok()); // 2
686 /// if let Ok(thing) = c.pop() {
687 /// // "thing" has been *moved* out of the queue but not yet dropped
688 /// collector.collect();
689 /// assert_eq!(unsafe { DROP_COUNT }, 0);
690 /// } else {
691 /// unreachable!();
692 /// }
693 /// // First Thing has been dropped when "thing" went out of scope:
694 /// collector.collect();
695 /// assert_eq!(unsafe { DROP_COUNT }, 1);
696 /// assert!(p.push(Thing).is_ok()); // 3
697 ///
698 /// if let Ok(chunk) = c.read_chunk(2) {
699 /// assert_eq!(chunk.len(), 2);
700 /// collector.collect();
701 /// assert_eq!(unsafe { DROP_COUNT }, 1);
702 /// chunk.commit(1); // Drops only one of the two Things
703 /// collector.collect();
704 /// assert_eq!(unsafe { DROP_COUNT }, 2);
705 /// } else {
706 /// unreachable!();
707 /// }
708 /// // The last Thing is still in the queue ...
709 /// collector.collect();
710 /// assert_eq!(unsafe { DROP_COUNT }, 2);
711 /// }
712 /// // ... and it is dropped when the ring buffer goes out of scope:
713 /// collector.collect();
714 /// assert_eq!(unsafe { DROP_COUNT }, 3);
715 /// ```
716 pub fn commit(self, n: usize) {
717 assert!(n <= self.len(), "cannot commit more than chunk size");
718 unsafe { self.commit_unchecked(n) };
719 }
720
721 /// Drops all slots of the chunk, making the space available for writing again.
722 pub fn commit_all(self) {
723 let slots = self.len();
724 unsafe { self.commit_unchecked(slots) };
725 }
726
727 unsafe fn commit_unchecked(self, n: usize) -> usize {
728 let first_len = self.first_len.min(n);
729 for i in 0..first_len {
730 self.first_ptr.add(i).drop_in_place();
731 }
732 let second_len = self.second_len.min(n - first_len);
733 for i in 0..second_len {
734 self.second_ptr.add(i).drop_in_place();
735 }
736 let c = self.consumer;
737 let head = c.buffer.increment(c.cached_head.get(), n);
738 c.buffer.head.store(head, Ordering::Release);
739 c.cached_head.set(head);
740 n
741 }
742
743 /// Returns the number of slots in the chunk.
744 #[must_use]
745 pub fn len(&self) -> usize {
746 self.first_len + self.second_len
747 }
748
749 /// Returns `true` if the chunk contains no slots.
750 #[must_use]
751 pub fn is_empty(&self) -> bool {
752 self.first_len == 0
753 }
754}
755
756impl<'a, T: Send + 'static> IntoIterator for ReadChunk<'a, T> {
757 type Item = T;
758 type IntoIter = ReadChunkIntoIter<'a, T>;
759
760 /// Turns a [`ReadChunk`] into an iterator.
761 ///
762 /// When the iterator is dropped, all iterated slots are made available for writing again.
763 /// Non-iterated items remain in the ring buffer.
764 fn into_iter(self) -> Self::IntoIter {
765 Self::IntoIter {
766 chunk: self,
767 iterated: 0,
768 }
769 }
770}
771
772/// An iterator that moves out of a [`ReadChunk`].
773///
774/// This `struct` is created by the [`into_iter()`](ReadChunk::into_iter) method
775/// on [`ReadChunk`] (provided by the [`IntoIterator`] trait).
776///
777/// When this `struct` is dropped, the iterated slots are made available for writing again.
778/// Non-iterated items remain in the ring buffer.
779#[derive(Debug)]
780pub struct ReadChunkIntoIter<'a, T: Send + 'static> {
781 chunk: ReadChunk<'a, T>,
782 iterated: usize,
783}
784
785impl<'a, T: Send + 'static> Drop for ReadChunkIntoIter<'a, T> {
786 /// Makes all iterated slots available for writing again.
787 ///
788 /// Non-iterated items remain in the ring buffer and are *not* dropped.
789 fn drop(&mut self) {
790 let c = &self.chunk.consumer;
791 let head = c.buffer.increment(c.cached_head.get(), self.iterated);
792 c.buffer.head.store(head, Ordering::Release);
793 c.cached_head.set(head);
794 }
795}
796
797impl<'a, T: Send + 'static> Iterator for ReadChunkIntoIter<'a, T> {
798 type Item = T;
799
800 fn next(&mut self) -> Option<Self::Item> {
801 let ptr = if self.iterated < self.chunk.first_len {
802 unsafe { self.chunk.first_ptr.add(self.iterated) }
803 } else if self.iterated < self.chunk.first_len + self.chunk.second_len {
804 unsafe {
805 self.chunk
806 .second_ptr
807 .add(self.iterated - self.chunk.first_len)
808 }
809 } else {
810 return None;
811 };
812 self.iterated += 1;
813 Some(unsafe { ptr.read() })
814 }
815
816 fn size_hint(&self) -> (usize, Option<usize>) {
817 let remaining = self.chunk.first_len + self.chunk.second_len - self.iterated;
818 (remaining, Some(remaining))
819 }
820}
821
822impl<'a, T: Send + 'static> ExactSizeIterator for ReadChunkIntoIter<'a, T> {}
823
824impl<'a, T: Send + 'static> core::iter::FusedIterator for ReadChunkIntoIter<'a, T> {}
825
#[cfg(feature = "std")]
impl std::io::Write for Producer<u8> {
    /// Copies as many bytes from `buf` into the ring buffer as there are free slots.
    ///
    /// Returns the number of copied bytes, or an error of kind
    /// [`WouldBlock`](std::io::ErrorKind::WouldBlock) if `buf` is not empty
    /// but no slot is available.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut chunk = match self.write_chunk_uninit(buf.len()) {
            Ok(chunk) => chunk,
            Err(ChunkError::TooFewSlots(0)) => {
                return Err(std::io::Error::from(std::io::ErrorKind::WouldBlock));
            }
            // The number of available slots is reported, so this request succeeds.
            Err(ChunkError::TooFewSlots(available)) => {
                self.write_chunk_uninit(available).unwrap()
            }
        };
        let total = chunk.len();
        let (first, second) = chunk.as_mut_slices();
        // Split the input where the chunk's first run of slots ends.
        // NB: If buf.is_empty(), chunk will be empty as well and the following are no-ops:
        let (head, rest) = buf[..total].split_at(first.len());
        head.copy_to_uninit(first);
        rest.copy_to_uninit(second);
        // Safety: All slots have been initialized
        unsafe {
            chunk.commit_all();
        }
        Ok(total)
    }

    /// Does nothing and always succeeds.
    #[inline]
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
855
#[cfg(feature = "std")]
impl std::io::Read for Consumer<u8> {
    /// Moves as many bytes from the ring buffer into `buf` as are available (and fit).
    ///
    /// Returns the number of moved bytes, or an error of kind
    /// [`WouldBlock`](std::io::ErrorKind::WouldBlock) if `buf` is not empty
    /// but the ring buffer holds no data.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let chunk = match self.read_chunk(buf.len()) {
            Ok(chunk) => chunk,
            Err(ChunkError::TooFewSlots(0)) => {
                return Err(std::io::Error::from(std::io::ErrorKind::WouldBlock));
            }
            // The number of available slots is reported, so this request succeeds.
            Err(ChunkError::TooFewSlots(available)) => self.read_chunk(available).unwrap(),
        };
        let total = chunk.len();
        let (first, second) = chunk.as_slices();
        // Split the output where the chunk's first run of slots ends.
        // NB: If buf.is_empty(), chunk will be empty as well and the following are no-ops:
        let (head, rest) = buf[..total].split_at_mut(first.len());
        head.copy_from_slice(first);
        rest.copy_from_slice(second);
        chunk.commit_all();
        Ok(total)
    }
}
876
/// Error type for [`Consumer::read_chunk()`], [`Producer::write_chunk()`]
/// and [`Producer::write_chunk_uninit()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ChunkError {
    /// Fewer than the requested number of slots were available.
    ///
    /// Contains the number of slots that were available.
    TooFewSlots(usize),
}

#[cfg(feature = "std")]
impl std::error::Error for ChunkError {}

impl fmt::Display for ChunkError {
    /// Writes a human-readable message containing the number of available slots.
    ///
    /// The message is written directly into the formatter; no intermediate
    /// `String` is allocated. Width and fill flags are not applied to the message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChunkError::TooFewSlots(n) => {
                write!(f, "only {} slots available in ring buffer", n)
            }
        }
    }
}