ya_ring_buf/
lib.rs

//! Yet Another Ring Buffer.
//!
//! SPSC channel backed by a ring buffer.
//!
//! Features include:
//!
//! * Lock-free, non-blocking push and pop.
//! * Blocking, futures-based, or custom waiting can be integrated if desired. If not, no-op
//!   waiters add no overhead.
//! * `view` functions for safely accessing the buffer directly, e.g. for doing IO directly from
//!   or to the buffer.
//! * `no_std` compatible (currently requires `alloc`).
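//!
//! # Example
//!
//! A minimal non-blocking round trip (the doctest assumes the crate is built as `ya_ring_buf`):
//!
//! ```
//! let (mut producer, mut consumer) = ya_ring_buf::new_waitless::<u32>(4);
//!
//! producer.push(1).unwrap();
//! producer.push(2).unwrap();
//!
//! assert_eq!(consumer.pop().unwrap(), Some(1));
//! assert_eq!(consumer.pop().unwrap(), Some(2));
//! // Empty, but the producer is still alive:
//! assert_eq!(consumer.pop().unwrap(), None);
//! ```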
#![cfg_attr(not(feature = "std"), no_std)]

extern crate alloc;

#[cfg(any(test, doctest))]
mod tests;
pub mod waiter;

use alloc::alloc::Layout;
#[cfg(not(loom))]
use alloc::sync::Arc;
#[cfg(not(loom))]
use core::sync::atomic::{
	AtomicUsize,
	Ordering,
};
use core::{
	mem::MaybeUninit,
	ops::Range,
	ptr::NonNull,
};

#[cfg(loom)]
use loom::sync::{
	atomic::{
		AtomicUsize,
		Ordering,
	},
	Arc,
};
use waiter::OptionalWaiter;

/// Creates a new ring buffer with the specified capacity and no waiters.
///
/// Returns the producer and consumer ends. The buffer will not have any capability to wait
/// for the other end - i.e. non-blocking operations only.
///
/// See [`with_waiters`] for more documentation.
///
/// # Panics
///
/// If capacity is zero.
pub fn new_waitless<T>(
	capacity: usize,
) -> (RingBufferProducer<T, (), ()>, RingBufferConsumer<T, (), ()>) {
	with_waiters(capacity, (), ())
}

/// Creates a new ring buffer with the specified capacity and blocking waiters.
///
/// Returns the producer and consumer ends. The wait methods will block until items or
/// slots are available.
///
/// See [`with_waiters`] for more documentation.
///
/// Requires the 'std' feature.
///
/// # Panics
///
/// If capacity is zero.
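///
/// # Example
///
/// A sketch of a cross-thread pipeline. It assumes `wait` blocks inside the call and that
/// its return value can be ignored:
///
/// ```no_run
/// let (mut producer, mut consumer) = ya_ring_buf::new_blocking::<u64>(16);
///
/// let handle = std::thread::spawn(move || {
///     for mut item in 0..100u64 {
///         loop {
///             match producer.push(item) {
///                 Ok(()) => break,
///                 Err(ya_ring_buf::PushError::Full(v)) => {
///                     item = v;
///                     let _ = producer.wait(); // block until the consumer frees a slot
///                 }
///                 Err(ya_ring_buf::PushError::Closed(_)) => return,
///             }
///         }
///     }
/// });
///
/// let mut total = 0u64;
/// loop {
///     match consumer.pop() {
///         Ok(Some(v)) => total += v,
///         Ok(None) => {
///             let _ = consumer.wait(); // block until an item arrives
///         }
///         Err(_closed) => break, // producer dropped and buffer drained
///     }
/// }
/// assert_eq!(total, (0..100u64).sum());
/// handle.join().unwrap();
/// ```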
#[cfg(feature = "std")]
pub fn new_blocking<T>(
	capacity: usize,
) -> (
	RingBufferProducer<T, waiter::BlockingWaiter, waiter::BlockingWaiter>,
	RingBufferConsumer<T, waiter::BlockingWaiter, waiter::BlockingWaiter>,
) {
	with_waiters(
		capacity,
		waiter::BlockingWaiter::new(),
		waiter::BlockingWaiter::new(),
	)
}

/// Creates a new ring buffer with the specified capacity and asynchronous waiters.
///
/// Returns the producer and consumer ends. The wait methods will return a future that
/// will be fulfilled when there are items or slots available.
///
/// See [`with_waiters`] for more documentation.
///
/// Requires the 'async' feature.
///
/// # Panics
///
/// If capacity is zero.
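///
/// # Example
///
/// A sketch of the producer side from an async task (running the future is left to the
/// caller's executor; assumes the waiter's future resolves once space frees up):
///
/// ```no_run
/// use ya_ring_buf::{waiter::AsyncWaiter, RingBufferProducer};
///
/// async fn send_all(
///     producer: &mut RingBufferProducer<u8, AsyncWaiter, AsyncWaiter>,
///     mut data: &[u8],
/// ) {
///     while !data.is_empty() {
///         match producer.push_slice(data) {
///             Ok(n) => data = &data[n..],
///             Err(_closed) => return, // consumer gone, nothing left to do
///         }
///         if !data.is_empty() {
///             // Buffer full: yield to the executor until slots open up.
///             let _ = producer.wait().await;
///         }
///     }
/// }
/// ```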
#[cfg(feature = "async")]
pub fn new_async<T>(
	capacity: usize,
) -> (
	RingBufferProducer<T, waiter::AsyncWaiter, waiter::AsyncWaiter>,
	RingBufferConsumer<T, waiter::AsyncWaiter, waiter::AsyncWaiter>,
) {
	with_waiters(
		capacity,
		waiter::AsyncWaiter::new(),
		waiter::AsyncWaiter::new(),
	)
}

/// Creates a new ring buffer with the specified capacity and specified waiters.
///
/// Returns the producer and consumer ends. The ends may be sent to other threads,
/// but because most methods take `&mut self` for safety, they cannot be shared
/// between threads.
///
/// If one side of the buffer is dropped, the other side's operations will eventually
/// fail with a [`ClosedError`] (immediately for producers, and after the existing items
/// have been drained for consumers).
///
/// Each end has a waiter, which abstracts how each buffer end should notify the
/// other end that items or slots are available and it may resume activity.
/// This crate includes two such waiters: [`BlockingWaiter`](waiter::BlockingWaiter),
/// which blocks the current thread, and [`AsyncWaiter`](waiter::AsyncWaiter), which
/// returns a future. Custom implementations may do other things - for example, signal
/// a `mio` waker. If no waiter is needed, the unit type `()` can be used, which removes
/// the `wait` methods and adds no overhead to the buffer.
///
/// # Panics
///
/// If capacity is zero.
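///
/// # Example
///
/// With unit waiters, this behaves exactly like [`new_waitless`]:
///
/// ```
/// let (mut producer, mut consumer) = ya_ring_buf::with_waiters::<&str, (), ()>(2, (), ());
///
/// producer.push("a").unwrap();
/// assert_eq!(consumer.pop().unwrap(), Some("a"));
/// ```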
pub fn with_waiters<T, PW: OptionalWaiter, CW: OptionalWaiter>(
	mut capacity: usize,
	producer_waiter: PW,
	consumer_waiter: CW,
) -> (RingBufferProducer<T, PW, CW>, RingBufferConsumer<T, PW, CW>) {
	assert!(capacity != 0, "Cannot create a 0 capacity ring buffer");
	// The ring buffer can't fill the last slot, so add 1 to the capacity.
	capacity = capacity.saturating_add(1);
	let layout = Layout::array::<T>(capacity).unwrap();
	let buf = if core::mem::size_of::<T>() == 0 {
		NonNull::dangling()
	} else {
		unsafe {
			NonNull::new(alloc::alloc::alloc(layout) as *mut T)
				.unwrap_or_else(|| alloc::alloc::handle_alloc_error(layout))
		}
	};

	let shared = Arc::new(RingBufferShared::<T, PW, CW> {
		buf,
		cap: capacity,
		head: AtomicUsize::new(0),
		tail: AtomicUsize::new(0),
		closed_count: AtomicUsize::new(0),
		producer_waiter,
		consumer_waiter,
		_ph: core::marker::PhantomData,
	});

	let producer = RingBufferProducer {
		shared: shared.clone(),
	};
	let consumer = RingBufferConsumer { shared };
	(producer, consumer)
}

/// Creates a new ring buffer with the specified capacity and `Default::default()` for waiters.
///
/// This is equivalent to `with_waiters(capacity, Default::default(), Default::default())`.
///
/// Returns the producer and consumer ends.
///
/// See [`with_waiters`] for more documentation.
///
/// # Panics
///
/// If capacity is zero.
pub fn with_default_waiters<T, PW: OptionalWaiter + Default, CW: OptionalWaiter + Default>(
	capacity: usize,
) -> (RingBufferProducer<T, PW, CW>, RingBufferConsumer<T, PW, CW>) {
	with_waiters(capacity, Default::default(), Default::default())
}

/// Shared state
struct RingBufferShared<T, PW: OptionalWaiter, CW: OptionalWaiter> {
	buf: NonNull<T>,
	cap: usize,
	/// Next slot to read
	head: AtomicUsize,
	/// Next slot to write
	tail: AtomicUsize,
	// Non-zero if one of the sides has been dropped.
	// TODO: Maybe see if we can remove this? Can't just use Arc::strong_count,
	// because we need to notify the waiter after we drop the count, which may race
	// with another thread dropping.
	closed_count: AtomicUsize,
	producer_waiter: PW,
	consumer_waiter: CW,
	_ph: core::marker::PhantomData<T>,
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferShared<T, PW, CW> {
	fn head_tail(&self) -> (usize, usize) {
		let head = self.head.load(Ordering::Acquire);
		let tail = self.tail.load(Ordering::Acquire);
		debug_assert!(head < self.cap);
		debug_assert!(tail < self.cap);
		(head, tail)
	}

	/// Advances the head pointer by `amnt` (clamped), optionally dropping the
	/// consumed items. `SetOnDrop` ensures the new head is published and the
	/// producer notified even if a destructor panics.
	unsafe fn consume(&self, head: usize, tail: usize, amnt: usize, drop: bool) {
		let (new_head, [drop1, drop2], _) = self.incr_head(head, tail, amnt);
		debug_assert!(new_head < self.cap);
		let set_on_drop = SetOnDrop(&self.head, new_head, &self.producer_waiter);

		if drop && core::mem::needs_drop::<T>() {
			for i in drop1 {
				self.buf.as_ptr().add(i).drop_in_place();
			}
			for i in drop2 {
				self.buf.as_ptr().add(i).drop_in_place();
			}
		}

		core::mem::drop(set_on_drop);
	}

	/// Increment the head pointer by an amount, clamped to the available capacity.
	///
	/// Returns:
	///
	/// 1. The new head pointer.
	/// 2. Ranges of the indices for slots between the old and new head pointers, i.e. slots requested for reading.
	/// 3. Ranges of the indices for slots after the new head pointer but before the tail pointer, i.e. remaining
	///    slots to read.
	fn incr_head(
		&self,
		head: usize,
		tail: usize,
		amnt: usize,
	) -> (usize, [Range<usize>; 2], [Range<usize>; 2]) {
		debug_assert!(head < self.cap);
		debug_assert!(tail < self.cap);

		if head <= tail {
			// No wrapping, limited by tail
			let new_head = head.saturating_add(amnt).min(tail);
			return (new_head, [head..new_head, 0..0], [new_head..tail, 0..0]);
		}

		if head.saturating_add(amnt) < self.cap {
			// Not enough to wrap
			let new_head = head + amnt;
			return (
				new_head,
				[head..new_head, 0..0],
				[new_head..self.cap, 0..tail],
			);
		}

		// Wrap
		let new_head = tail.min(amnt - (self.cap - head));
		(
			new_head,
			[head..self.cap, 0..new_head],
			[new_head..tail, 0..0],
		)
	}

	/// Increment the tail pointer by an amount, clamped to the available capacity.
	///
	/// Returns:
	///
	/// 1. The new tail pointer.
	/// 2. Ranges of the indices for slots before the new tail pointer, i.e. slots that would be made available.
	/// 3. Ranges of the indices for slots after the new tail pointer but before the head pointer, i.e.
	///    slots that could still be written.
	fn incr_tail(
		&self,
		head: usize,
		tail: usize,
		amnt: usize,
	) -> (usize, [Range<usize>; 2], [Range<usize>; 2]) {
		debug_assert!(head < self.cap);
		debug_assert!(tail < self.cap);

		if head == 0 {
			// Head at zero, keep last slot open and don't wrap.
			let new_tail = tail.saturating_add(amnt).min(self.cap - 1);
			return (
				new_tail,
				[tail..new_tail, 0..0],
				[new_tail..self.cap - 1, 0..0],
			);
		}

		if head <= tail && tail.saturating_add(amnt) >= self.cap {
			// Wrap around. Head != 0.
			let new_tail = (amnt - (self.cap - tail)).min(head - 1);
			return (
				new_tail,
				[tail..self.cap, 0..new_tail],
				[new_tail..head - 1, 0..0],
			);
		}

		if head <= tail {
			// No wrapping, limited by cap
			return (
				tail + amnt,
				[tail..tail + amnt, 0..0],
				[tail + amnt..self.cap, 0..head - 1],
			);
		}

		// head > tail, limited by head
		let new_tail = (tail + amnt).min(head - 1);
		(new_tail, [tail..new_tail, 0..0], [new_tail..head - 1, 0..0])
	}
}
unsafe impl<T: Send, PW: OptionalWaiter, CW: OptionalWaiter> Send for RingBufferShared<T, PW, CW> {}
unsafe impl<T: Send, PW: OptionalWaiter, CW: OptionalWaiter> Sync for RingBufferShared<T, PW, CW> {}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Drop for RingBufferShared<T, PW, CW> {
	fn drop(&mut self) {
		if core::mem::needs_drop::<T>() {
			#[cfg(loom)]
			let (head, tail) = self.head_tail();
			#[cfg(not(loom))]
			let (head, tail) = (*self.head.get_mut(), *self.tail.get_mut());
			unsafe {
				self.consume(head, tail, usize::MAX, true);
			}
		}

		if core::mem::size_of::<T>() != 0 {
			let layout = Layout::array::<T>(self.cap).unwrap();
			unsafe {
				alloc::alloc::dealloc(self.buf.as_ptr() as *mut _, layout);
			}
		}
	}
}

impl<T, PW: OptionalWaiter, CW: OptionalWaiter> core::fmt::Debug for RingBufferShared<T, PW, CW> {
	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
		f.debug_struct("RingBufferShared")
			.field("buf", &self.buf)
			.field("cap", &self.cap)
			.field("head", &self.head)
			.field("tail", &self.tail)
			.finish()
	}
}

/// Producer side of the ring buffer.
pub struct RingBufferProducer<T, PW: OptionalWaiter, CW: OptionalWaiter> {
	shared: Arc<RingBufferShared<T, PW, CW>>,
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Drop for RingBufferProducer<T, PW, CW> {
	fn drop(&mut self) {
		let v = self.shared.closed_count.fetch_add(1, Ordering::Release);
		debug_assert!(v <= 1);
		self.shared.consumer_waiter.notify();
	}
}

impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferProducer<T, PW, CW> {
	/// Pushes a single item into the ring buffer.
	///
	/// If there was enough space, returns `Ok(())`.
	///
	/// If the buffer is full, returns `PushError::Full(v)`. If the receiver
	/// had been dropped, returns `PushError::Closed(v)`. The passed-in value
	/// is returned so that pushing may be attempted later.
	pub fn push(&mut self, v: T) -> Result<(), PushError<T>> {
		self.extend(core::iter::once(v))
			.map_err(|ClosedError(mut it)| PushError::Closed(it.next().unwrap()))
			.and_then(|mut it| it.next().map(|v| Err(PushError::Full(v))).unwrap_or(Ok(())))
	}

	/// Pushes a sequence of items into the ring buffer.
	///
	/// Will stop when the iterator runs out of items or the ring buffer
	/// runs out of space, whichever comes first. Returns the iterator
	/// with the remaining items. It's recommended to use a `FusedIterator`.
	///
	/// If the iterator panics, already-pushed items may not become visible
	/// to the consumer, and may leak instead.
	///
	/// This can be slightly more efficient than pushing individually, as the
	/// atomic operations for updating the counters will not update until all
	/// the items are pushed. If `T` is `Copy`, consider using [`RingBufferProducer::push_slice`]
	/// instead.
	///
	/// Returns `Err(ClosedError(it))` if the receiver had been dropped,
	/// returning the iterator without touching it.
	pub fn extend<I: Iterator<Item = T>>(&mut self, mut it: I) -> Result<I, ClosedError<I>> {
		if self.shared.closed_count.load(Ordering::Acquire) > 0 {
			return Err(ClosedError(it));
		}

		let (head, mut tail) = self.shared.head_tail();
		let limit = head.checked_sub(1).unwrap_or(self.shared.cap - 1);
		if tail == limit {
			return Ok(it);
		}

		// TODO: if next panics, objects that did get added will leak. Should drop them.
		while tail != limit {
			let v = match it.next() {
				Some(v) => v,
				None => break,
			};

			unsafe {
				self.shared.buf.as_ptr().add(tail).write(v);
			}
			tail = (tail + 1) % self.shared.cap;
		}

		debug_assert!(tail < self.shared.cap);
		self.shared.tail.store(tail, Ordering::Release);
		self.shared.consumer_waiter.notify();

		Ok(it)
	}

	/// Gets a view into the currently writable portion of this ring buffer.
	///
	/// This returns a wrapper over two slices, which, when concatenated, form the
	/// available space in order. The user should write items to the slices in order,
	/// then call [`UninitWriteView::produce`] to make them available to the consumer.
	///
	/// As opposed to [`RingBufferProducer::view`], this does not do any initialization
	/// of the available slots, and so the returned slices have elements of `MaybeUninit<T>`.
	/// It's up to the user to write to the slots via `MaybeUninit::write` and call
	/// `UninitWriteView::produce` with the correct amount.
	///
	/// This can be used, for example, to read bytes directly into the ring buffer, via
	/// [`std::io::ReadBuf`], without having to use another buffer and paying for the
	/// overhead of initializing the values.
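	///
	/// # Example
	///
	/// A minimal sketch of manual initialization; the count given to `produce` must be
	/// accurate:
	///
	/// ```
	/// use core::mem::MaybeUninit;
	///
	/// let (mut producer, mut consumer) = ya_ring_buf::new_waitless::<u8>(8);
	///
	/// let mut view = producer.uninitialized_view().unwrap();
	/// let n = view.len().min(3);
	/// for i in 0..n {
	///     view[i] = MaybeUninit::new(i as u8);
	/// }
	/// // SAFETY: exactly `n` slots were initialized above.
	/// unsafe { view.produce(n) };
	///
	/// assert_eq!(consumer.pop().unwrap(), Some(0));
	/// ```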
	pub fn uninitialized_view(&mut self) -> Result<UninitWriteView<T, PW, CW>, ClosedError<()>> {
		if self.shared.closed_count.load(Ordering::Acquire) > 0 {
			return Err(ClosedError(()));
		}

		let (head, tail) = self.shared.head_tail();

		let (_, _, [slice1, slice2]) = self.shared.incr_tail(head, tail, 0);
		let slices = unsafe {
			(
				core::slice::from_raw_parts_mut(
					(self.shared.buf.as_ptr() as *mut MaybeUninit<T>).add(slice1.start),
					slice1.end - slice1.start,
				),
				core::slice::from_raw_parts_mut(
					(self.shared.buf.as_ptr() as *mut MaybeUninit<T>).add(slice2.start),
					slice2.end - slice2.start,
				),
			)
		};

		Ok(UninitWriteView {
			slices,
			shared: &*self.shared,
			head,
			tail,
		})
	}

	/// Gets how many items can be written.
	///
	/// If the consumer side is being used on another thread, the actual amount
	/// may increase after this function has returned.
	pub fn available_len(&self) -> usize {
		let (head, tail) = self.shared.head_tail();
		let (_, _, [slice1, slice2]) = self.shared.incr_tail(head, tail, 0);
		(slice1.end - slice1.start) + (slice2.end - slice2.start)
	}

	/// Returns the maximum number of elements that the buffer can hold.
	pub fn capacity(&self) -> usize {
		self.shared.cap - 1
	}
}
impl<T: Default, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferProducer<T, PW, CW> {
	/// Gets a view into the currently writable portion of this ring buffer.
	///
	/// This returns a wrapper over two slices, which, when concatenated, form the
	/// available space in order. The user should write items to the slices in order,
	/// then call [`WriteView::produce`] to make them available to the consumer.
	///
	/// As opposed to [`RingBufferProducer::uninitialized_view`], this initializes
	/// each element with `Default::default`, meaning there will be no uninitialized
	/// values in the buffer and the interface can be used in safe code.
	///
	/// This can be used, for example, to read bytes directly into the ring buffer, via
	/// [`std::io::Read::read`], without having to use another buffer.
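	///
	/// # Example
	///
	/// A minimal sketch with default-initialized slots:
	///
	/// ```
	/// let (mut producer, mut consumer) = ya_ring_buf::new_waitless::<u8>(8);
	///
	/// let mut view = producer.view().unwrap();
	/// for (i, slot) in view.iter().take(2).enumerate() {
	///     *slot = 10 + i as u8;
	/// }
	/// view.produce(2); // remaining default-initialized slots are dropped
	///
	/// assert_eq!(consumer.pop().unwrap(), Some(10));
	/// assert_eq!(consumer.pop().unwrap(), Some(11));
	/// ```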
	pub fn view(&mut self) -> Result<WriteView<T, PW, CW>, ClosedError<()>> {
		self.sized_view(usize::MAX)
	}

	/// Gets a view into the currently writable portion of this ring buffer, limited by a size.
	///
	/// Works like [`RingBufferProducer::view`], but caps the number of elements in the view
	/// to the specified number. This can be used to avoid default-initializing an excessive number of
	/// elements.
	pub fn sized_view(&mut self, size: usize) -> Result<WriteView<T, PW, CW>, ClosedError<()>> {
		if self.shared.closed_count.load(Ordering::Acquire) > 0 {
			return Err(ClosedError(()));
		}

		let (head, tail) = self.shared.head_tail();

		let (_, [slice1, slice2], _) = self.shared.incr_tail(head, tail, size);

		unsafe {
			for i in slice1.clone() {
				self.shared.buf.as_ptr().add(i).write(Default::default());
			}
			for i in slice2.clone() {
				self.shared.buf.as_ptr().add(i).write(Default::default());
			}
		}

		let slices = unsafe {
			(
				core::slice::from_raw_parts_mut(
					self.shared.buf.as_ptr().add(slice1.start),
					slice1.end - slice1.start,
				),
				core::slice::from_raw_parts_mut(
					self.shared.buf.as_ptr().add(slice2.start),
					slice2.end - slice2.start,
				),
			)
		};

		Ok(WriteView {
			slices,
			shared: &*self.shared,
			head,
			tail,
		})
	}
}
impl<T: Copy, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferProducer<T, PW, CW> {
	/// Pushes a slice of copyable values.
	///
	/// This memcpy's the slice into the buffer, so it should be
	/// more efficient than [`RingBufferProducer::push`] or [`RingBufferProducer::extend`].
	///
	/// Returns the number of items pushed, which may be less than the length
	/// of the slice if the buffer fills up.
	///
	/// If the receiver had been dropped, returns `Err(ClosedError(()))` without
	/// touching the values slice.
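	///
	/// # Example
	///
	/// Copying more than fits simply truncates:
	///
	/// ```
	/// let (mut producer, mut consumer) = ya_ring_buf::new_waitless::<u8>(4);
	///
	/// let pushed = producer.push_slice(&[1, 2, 3, 4, 5]).unwrap();
	/// assert_eq!(pushed, 4); // capacity is 4, so the 5 did not fit
	/// assert_eq!(consumer.pop().unwrap(), Some(1));
	/// ```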
	pub fn push_slice(&mut self, values: &[T]) -> Result<usize, ClosedError<()>> {
		if self.shared.closed_count.load(Ordering::Acquire) > 0 {
			return Err(ClosedError(()));
		}
		let (head, tail) = self.shared.head_tail();

		let (new_tail, [range1, range2], _) = self.shared.incr_tail(head, tail, values.len());
		debug_assert!((range1.end - range1.start) + (range2.end - range2.start) <= values.len());
		unsafe {
			self.shared
				.buf
				.as_ptr()
				.add(range1.start)
				.copy_from_nonoverlapping(values.as_ptr(), range1.end - range1.start);
			self.shared
				.buf
				.as_ptr()
				.add(range2.start)
				.copy_from_nonoverlapping(
					values[range1.end - range1.start..].as_ptr(),
					range2.end - range2.start,
				);
		}

		debug_assert!(new_tail < self.shared.cap);
		self.shared.tail.store(new_tail, Ordering::Release);
		self.shared.consumer_waiter.notify();
		Ok((range1.end - range1.start) + (range2.end - range2.start))
	}
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferProducer<T, PW, CW>
where
	PW: for<'a> waiter::Waiter<'a>,
{
	/// Waits for space to become available in the buffer, or for the consumer to be closed.
	///
	/// This method is only available if the buffer was created with a producer waiter.
	///
	/// How the wait happens is up to the waiter - it may block, return a future, etc.
	/// However, it should return immediately if space is currently available.
	pub fn wait<'a>(&'a mut self) -> <PW as waiter::Waiter<'a>>::Future {
		let checker = waiter::WakeCheck {
			head: &self.shared.head,
			tail: &self.shared.tail,
			closed_count: &self.shared.closed_count,
			cap: self.shared.cap,
			is_producer: true,
		};
		self.shared.producer_waiter.wait(checker)
	}

	/// Gets the waiter.
	///
	/// Can be used to alter settings of the waiter, for example, by setting a timeout.
	pub fn waiter(&mut self) -> &PW {
		&self.shared.producer_waiter
	}
}

/// Consumer side of the ring buffer.
pub struct RingBufferConsumer<T, PW: OptionalWaiter, CW: OptionalWaiter> {
	shared: Arc<RingBufferShared<T, PW, CW>>,
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Drop for RingBufferConsumer<T, PW, CW> {
	fn drop(&mut self) {
		let v = self.shared.closed_count.fetch_add(1, Ordering::Release);
		debug_assert!(v <= 1);
		self.shared.producer_waiter.notify();
	}
}

impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferConsumer<T, PW, CW> {
	/// Tries to pop an item from the buffer.
	///
	/// Returns `Ok(Some(_))` if the buffer had an item, `Ok(None)` if
	/// the buffer was empty but the sender is still available, or `Err(ClosedError(()))`
	/// if the buffer is empty and the sender had been dropped.
	pub fn pop(&mut self) -> Result<Option<T>, ClosedError<()>> {
		let (head, tail) = self.shared.head_tail();
		if head == tail {
			if self.shared.closed_count.load(Ordering::Acquire) > 0 {
				return Err(ClosedError(()));
			}
			return Ok(None);
		}
		let v = unsafe { self.shared.buf.as_ptr().add(head).read() };
		unsafe {
			self.shared.consume(head, tail, 1, false);
		}
		Ok(Some(v))
	}

	/// Gets a view into the currently readable portion of this ring buffer.
	///
	/// This returns a wrapper over two slices, which, when concatenated, form the
	/// available contents in order. The user should read the items, then call
	/// [`ReadView::consume`] or [`ReadView::consume_all`] to remove the items
	/// from the buffer.
	///
	/// If the buffer is empty and the sender had been dropped, returns
	/// `Err(ClosedError(()))`.
	///
	/// This can be used, for example, to write bytes directly from the ring buffer, via
	/// [`std::io::Write::write_all`], without having to use another buffer.
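	///
	/// # Example
	///
	/// A sketch of inspecting items in place and then releasing some of them:
	///
	/// ```
	/// let (mut producer, mut consumer) = ya_ring_buf::new_waitless::<u8>(4);
	/// producer.push_slice(&[7, 8, 9]).unwrap();
	///
	/// let view = consumer.view().unwrap();
	/// assert_eq!(view.len(), 3);
	/// assert_eq!(view[0], 7);
	/// view.consume(2); // drop 7 and 8; 9 stays in the buffer
	///
	/// assert_eq!(consumer.pop().unwrap(), Some(9));
	/// ```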
	pub fn view(&mut self) -> Result<ReadView<T, PW, CW>, ClosedError<()>> {
		let (head, tail) = self.shared.head_tail();

		let (_, _, [slice1, slice2]) = self.shared.incr_head(head, tail, 0);

		if slice1.end - slice1.start == 0 {
			// If the first slice is empty, the second must be as well.
			debug_assert_eq!(slice2.end - slice2.start, 0);
			if self.shared.closed_count.load(Ordering::Acquire) > 0 {
				return Err(ClosedError(()));
			}
		}

		let slices = unsafe {
			(
				core::slice::from_raw_parts_mut(
					self.shared.buf.as_ptr().add(slice1.start),
					slice1.end - slice1.start,
				),
				core::slice::from_raw_parts_mut(
					self.shared.buf.as_ptr().add(slice2.start),
					slice2.end - slice2.start,
				),
			)
		};

		Ok(ReadView {
			slices,
			shared: &*self.shared,
			head,
			tail,
		})
	}

	/// Gets how many items can be read.
	///
	/// If the producer side is being used on another thread, the actual amount
	/// may increase after this function has returned.
	pub fn available_len(&self) -> usize {
		let (head, tail) = self.shared.head_tail();
		let (_, _, [slice1, slice2]) = self.shared.incr_head(head, tail, 0);
		(slice1.end - slice1.start) + (slice2.end - slice2.start)
	}

	/// Returns the maximum number of elements that the buffer can hold.
	pub fn capacity(&self) -> usize {
		self.shared.cap - 1
	}
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferConsumer<T, PW, CW>
where
	CW: for<'a> waiter::Waiter<'a>,
{
	/// Waits for an item to be produced into the buffer, or for the producer to be closed.
	///
	/// This method is only available if the buffer was created with a consumer waiter.
	///
	/// How the wait happens is up to the waiter - it may block, return a future, etc.
	/// Ideally, it should return immediately if items are already available.
	pub fn wait<'a>(&'a mut self) -> <CW as waiter::Waiter<'a>>::Future {
		let checker = waiter::WakeCheck {
			head: &self.shared.head,
			tail: &self.shared.tail,
			closed_count: &self.shared.closed_count,
			cap: self.shared.cap,
			is_producer: false,
		};
		self.shared.consumer_waiter.wait(checker)
	}

	/// Gets the waiter.
	///
	/// Can be used to alter settings of the waiter, for example, by setting a timeout.
	pub fn waiter(&mut self) -> &CW {
		&self.shared.consumer_waiter
	}
}
impl<T: Copy, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferConsumer<T, PW, CW> {
	/// Pops as many items as possible, copying them into a destination slice.
	///
	/// Returns the number of items copied, which may be zero.
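	///
	/// # Example
	///
	/// Draining into a stack buffer:
	///
	/// ```
	/// let (mut producer, mut consumer) = ya_ring_buf::new_waitless::<u8>(4);
	/// producer.push_slice(&[1, 2, 3]).unwrap();
	///
	/// let mut dest = [0u8; 8];
	/// let n = consumer.pop_to_slice(&mut dest).unwrap();
	/// assert_eq!(&dest[..n], &[1, 2, 3]);
	/// ```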
	pub fn pop_to_slice(&mut self, dest: &mut [T]) -> Result<usize, ClosedError<()>> {
		let v = self.view()?;
		let s1amnt = dest.len().min(v.0.len());
		let s2amnt = v.1.len().min(dest.len() - s1amnt);
		dest[..s1amnt].copy_from_slice(&v.0[..s1amnt]);
		dest[s1amnt..s1amnt + s2amnt].copy_from_slice(&v.1[..s2amnt]);
		v.consume(s1amnt + s2amnt);
		Ok(s1amnt + s2amnt)
	}
}

/// Ring buffer available contents, as two slices.
///
/// The two slices work like `VecDeque::as_slices`: when concatenated, they
/// contain the available contents of the buffer at the time of calling.
///
/// After the contents have been examined, you may call `consume` or `consume_all` to remove the items
/// from the buffer and free up their slots for the producer.
pub struct ReadView<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> {
	pub slices: (&'a mut [T], &'a mut [T]),
	shared: &'a RingBufferShared<T, PW, CW>,
	head: usize,
	tail: usize,
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Deref for ReadView<'a, T, PW, CW> {
	type Target = (&'a mut [T], &'a mut [T]);

	fn deref(&self) -> &Self::Target {
		&self.slices
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::DerefMut
	for ReadView<'a, T, PW, CW>
{
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.slices
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> ReadView<'a, T, PW, CW> {
	/// Consumes the first `amnt` items from the buffer, dropping them and
	/// freeing the slots for the producer to fill.
	///
	/// `amnt` is clamped to the number of items in this view.
	pub fn consume(self, amnt: usize) {
		let Self {
			slices: _,
			shared,
			head,
			tail,
		} = self;
		unsafe { shared.consume(head, tail, amnt, true) }
	}

	/// Consumes all items in this view from the buffer.
	pub fn consume_all(self) {
		self.consume(usize::MAX);
	}

	/// Iterates through each available element, in order.
	///
	/// Equivalent to chaining the two slices' iterators.
	pub fn iter(&mut self) -> core::iter::Chain<core::slice::IterMut<T>, core::slice::IterMut<T>> {
		let i1 = self.slices.0.iter_mut();
		let i2 = self.slices.1.iter_mut();
		i1.chain(i2)
	}

	/// Checks if the view is empty.
	///
	/// Equivalent to testing if the first slice is empty. If the first slice is empty,
	/// the second one will be as well.
	pub fn is_empty(&self) -> bool {
		self.slices.0.is_empty()
	}

	/// Gets how many items are in the view.
	///
	/// This is the sum of the lengths of the two slices.
	pub fn len(&self) -> usize {
		self.slices.0.len() + self.slices.1.len()
	}

	/// Gets an item from the view, at the `i`'th place, or `None` if out of bounds.
	///
	/// Mimics `slice::get`.
	pub fn get(&self, i: usize) -> Option<&T> {
		if i >= self.len() {
			None
		} else if i >= self.slices.0.len() {
			Some(&self.slices.1[i - self.0.len()])
		} else {
			Some(&self.slices.0[i])
		}
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Index<usize>
	for ReadView<'a, T, PW, CW>
{
	type Output = T;

	fn index(&self, i: usize) -> &Self::Output {
		if i >= self.slices.0.len() {
			&self.slices.1[i - self.0.len()]
		} else {
			&self.slices.0[i]
		}
	}
}

/// View into the writable portion of the buffer.
pub struct WriteView<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> {
	pub slices: (&'a mut [T], &'a mut [T]),
	shared: &'a RingBufferShared<T, PW, CW>,
	head: usize,
	tail: usize,
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Deref for WriteView<'a, T, PW, CW> {
	type Target = (&'a mut [T], &'a mut [T]);

	fn deref(&self) -> &Self::Target {
		&self.slices
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::DerefMut
	for WriteView<'a, T, PW, CW>
{
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.slices
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> WriteView<'a, T, PW, CW> {
	/// Produces the specified number of items, making them available to the receiver.
	///
	/// The remaining elements are dropped.
	///
	/// `amnt` is clamped to the number of items in this view.
	pub fn produce(self, amnt: usize) {
		let len = self.len();
		let amnt = amnt.min(len);

		let shared = self.shared;
		let head = self.head;
		let tail = self.tail;
		core::mem::forget(self);

		let (new_tail, _, _) = shared.incr_tail(head, tail, amnt);
		debug_assert!(new_tail < shared.cap);
		shared.tail.store(new_tail, Ordering::Release);
		shared.consumer_waiter.notify();

		// Drop the initialized-but-unproduced elements after the new tail.
		let (_, [drop1, drop2], _) = shared.incr_tail(head, new_tail, len - amnt);
		if core::mem::needs_drop::<T>() {
			unsafe {
				for i in drop1 {
					shared.buf.as_ptr().add(i).drop_in_place();
				}
				for i in drop2 {
					shared.buf.as_ptr().add(i).drop_in_place();
				}
			}
		}
	}

	/// Iterates through each available slot, in order.
	///
	/// Equivalent to chaining the two slices' iterators.
	pub fn iter(&mut self) -> core::iter::Chain<core::slice::IterMut<T>, core::slice::IterMut<T>> {
		let i1 = self.slices.0.iter_mut();
		let i2 = self.slices.1.iter_mut();
		i1.chain(i2)
	}

	/// Checks if the view is empty.
	///
	/// Equivalent to testing if the first slice is empty. If the first slice is empty,
	/// the second one will be as well.
	pub fn is_empty(&self) -> bool {
		self.slices.0.is_empty()
	}

	/// Gets the length of the view.
	///
	/// This is the sum of the lengths of both slices.
	pub fn len(&self) -> usize {
		self.0.len() + self.1.len()
	}

	/// Gets an item from the view, at the `i`'th place, or `None` if out of bounds.
	///
	/// Mimics `slice::get_mut`.
	pub fn get(&mut self, i: usize) -> Option<&mut T> {
		if i >= self.len() {
			None
		} else if i >= self.slices.0.len() {
			Some(&mut self.slices.1[i - self.0.len()])
		} else {
			Some(&mut self.slices.0[i])
		}
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Drop for WriteView<'a, T, PW, CW> {
	fn drop(&mut self) {
		// Nothing was produced: drop all the default-initialized elements.
		let len = self.len();
		let (_, [drop1, drop2], _) = self.shared.incr_tail(self.head, self.tail, len);
		if core::mem::needs_drop::<T>() {
			unsafe {
				for i in drop1 {
					self.shared.buf.as_ptr().add(i).drop_in_place();
				}
				for i in drop2 {
					self.shared.buf.as_ptr().add(i).drop_in_place();
				}
			}
		}
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Index<usize>
	for WriteView<'a, T, PW, CW>
{
	type Output = T;

	fn index(&self, i: usize) -> &Self::Output {
		if i >= self.slices.0.len() {
			&self.slices.1[i - self.0.len()]
		} else {
			&self.slices.0[i]
		}
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::IndexMut<usize>
	for WriteView<'a, T, PW, CW>
{
	fn index_mut(&mut self, i: usize) -> &mut T {
		if i >= self.slices.0.len() {
			&mut self.slices.1[i - self.0.len()]
		} else {
			&mut self.slices.0[i]
		}
	}
}

/// View into the writable portion of the buffer, with uninitialized elements.
///
/// Using this is unsafe, as an accurate number of items produced must be given,
/// otherwise this will hand uninitialized values to the consumer.
pub struct UninitWriteView<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> {
	pub slices: (&'a mut [MaybeUninit<T>], &'a mut [MaybeUninit<T>]),
	shared: &'a RingBufferShared<T, PW, CW>,
	head: usize,
	tail: usize,
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Deref
	for UninitWriteView<'a, T, PW, CW>
{
	type Target = (&'a mut [MaybeUninit<T>], &'a mut [MaybeUninit<T>]);

	fn deref(&self) -> &Self::Target {
		&self.slices
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::DerefMut
	for UninitWriteView<'a, T, PW, CW>
{
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.slices
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> UninitWriteView<'a, T, PW, CW> {
	/// Produces the specified number of items, making them available to the receiver.
	///
	/// The remaining elements are assumed to be uninitialized and will not be accessed.
	///
	/// `amnt` is clamped to the number of items in this view.
	///
	/// # Safety
	///
	/// The elements being produced must have been initialized.
	pub unsafe fn produce(self, amnt: usize) {
		let Self {
			slices: _,
			shared,
			head,
			tail,
		} = self;

		let (new_tail, _, _) = shared.incr_tail(head, tail, amnt);

		debug_assert!(new_tail < shared.cap);
		shared.tail.store(new_tail, Ordering::Release);
		shared.consumer_waiter.notify();
	}

	/// Iterates through each available slot, in order.
	///
	/// Equivalent to chaining the two slices' iterators.
	pub fn iter(
		&mut self,
	) -> core::iter::Chain<core::slice::IterMut<MaybeUninit<T>>, core::slice::IterMut<MaybeUninit<T>>>
	{
		let i1 = self.slices.0.iter_mut();
		let i2 = self.slices.1.iter_mut();
		i1.chain(i2)
	}

	/// Checks if the view is empty.
	///
	/// Equivalent to testing if the first slice is empty. If the first slice is empty,
	/// the second one will be as well.
	pub fn is_empty(&self) -> bool {
		self.slices.0.is_empty()
	}

	/// Gets the length of the view.
	///
	/// This is the sum of the lengths of both slices.
	pub fn len(&self) -> usize {
		self.0.len() + self.1.len()
	}

	/// Gets an item from the view, at the `i`'th place, or `None` if out of bounds.
	///
	/// Mimics `slice::get_mut`.
	pub fn get(&mut self, i: usize) -> Option<&mut MaybeUninit<T>> {
		if i >= self.len() {
			None
		} else if i >= self.slices.0.len() {
			Some(&mut self.slices.1[i - self.0.len()])
		} else {
			Some(&mut self.slices.0[i])
		}
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Index<usize>
	for UninitWriteView<'a, T, PW, CW>
{
	type Output = MaybeUninit<T>;

	fn index(&self, i: usize) -> &Self::Output {
		if i >= self.slices.0.len() {
			&self.slices.1[i - self.0.len()]
		} else {
			&self.slices.0[i]
		}
	}
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::IndexMut<usize>
	for UninitWriteView<'a, T, PW, CW>
{
	fn index_mut(&mut self, i: usize) -> &mut MaybeUninit<T> {
		if i >= self.slices.0.len() {
			&mut self.slices.1[i - self.0.len()]
		} else {
			&mut self.slices.0[i]
		}
	}
}

/// Struct for ensuring an AtomicUsize is always set and a waiter is notified,
/// even if panicking.
struct SetOnDrop<'a, W: OptionalWaiter>(&'a AtomicUsize, usize, &'a W);
impl<'a, W: OptionalWaiter> core::ops::Drop for SetOnDrop<'a, W> {
	fn drop(&mut self) {
		self.0.store(self.1, Ordering::Release);
		self.2.notify()
	}
}

/// Error produced when trying to do an operation but the other side is closed.
///
/// For [`RingBufferProducer`], most functions will start returning this error
/// as soon as the receiver has been dropped. The error may contain the value
/// that was being pushed, so that the sender can do something with it.
///
/// For [`RingBufferConsumer`], most functions will start returning this error
/// only after all the data in the buffer has been consumed and it is now empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClosedError<T>(pub T);
impl<T> core::fmt::Display for ClosedError<T> {
	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
		f.write_str("Other side of buffer dropped")
	}
}

/// Error produced while trying to push a single value to the buffer.
///
/// The error contains the value that was being pushed, so that the sender
/// can try again later.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushError<T> {
	/// Buffer is full.
	Full(T),
	/// Receiver was dropped. Equivalent to [`ClosedError`].
	Closed(T),
}
impl<T> core::fmt::Display for PushError<T> {
	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
		match self {
			Self::Full(_) => f.write_str("Buffer full"),
			Self::Closed(_) => f.write_str("Other side of buffer dropped"),
		}
	}
}
impl<T> From<ClosedError<T>> for PushError<T> {
	fn from(v: ClosedError<T>) -> Self {
		Self::Closed(v.0)
	}
}