#![cfg_attr(not(feature = "std"), no_std)]

extern crate alloc;

#[cfg(any(test, doctest))]
mod tests;
pub mod waiter;

use alloc::alloc::Layout;
#[cfg(not(loom))]
use alloc::sync::Arc;
#[cfg(not(loom))]
use core::sync::atomic::{
    AtomicUsize,
    Ordering,
};
use core::{
    mem::MaybeUninit,
    ops::Range,
    ptr::NonNull,
};

#[cfg(loom)]
use loom::sync::{
    atomic::{
        AtomicUsize,
        Ordering,
    },
    Arc,
};
use waiter::OptionalWaiter;

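/// Creates a waitless single-producer single-consumer ring buffer holding up
/// to `capacity` elements. Neither endpoint can block or `await`; callers
/// must poll `push`/`pop` themselves.
///
/// A minimal usage sketch (marked `ignore` because the crate name used below
/// is an assumption, not confirmed by this file):
///
/// ```ignore
/// let (mut producer, mut consumer) = ringbuf::new_waitless::<u32>(4);
/// producer.push(1).unwrap();
/// assert_eq!(consumer.pop().unwrap(), Some(1));
/// assert_eq!(consumer.pop().unwrap(), None); // empty, but still open
/// ```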
pub fn new_waitless<T>(
    capacity: usize,
) -> (RingBufferProducer<T, (), ()>, RingBufferConsumer<T, (), ()>) {
    with_waiters(capacity, (), ())
}

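/// Creates a ring buffer whose endpoints can block the current thread via
/// [`waiter::BlockingWaiter`] (see [`RingBufferProducer::wait`] and
/// [`RingBufferConsumer::wait`]). Requires the `std` feature.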
#[cfg(feature = "std")]
pub fn new_blocking<T>(
    capacity: usize,
) -> (
    RingBufferProducer<T, waiter::BlockingWaiter, waiter::BlockingWaiter>,
    RingBufferConsumer<T, waiter::BlockingWaiter, waiter::BlockingWaiter>,
) {
    with_waiters(
        capacity,
        waiter::BlockingWaiter::new(),
        waiter::BlockingWaiter::new(),
    )
}

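/// Creates a ring buffer whose endpoints can `await` readiness via
/// [`waiter::AsyncWaiter`]. Requires the `async` feature.
///
/// Sketch of the intended flow (`ignore`d; the crate name is assumed):
///
/// ```ignore
/// let (mut producer, mut consumer) = ringbuf::new_async::<u32>(16);
/// // In an async context, wait for data or for the producer to drop:
/// // consumer.wait().await;
/// ```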
#[cfg(feature = "async")]
pub fn new_async<T>(
    capacity: usize,
) -> (
    RingBufferProducer<T, waiter::AsyncWaiter, waiter::AsyncWaiter>,
    RingBufferConsumer<T, waiter::AsyncWaiter, waiter::AsyncWaiter>,
) {
    with_waiters(
        capacity,
        waiter::AsyncWaiter::new(),
        waiter::AsyncWaiter::new(),
    )
}

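/// Creates a ring buffer with explicitly chosen waiters for each endpoint.
///
/// One extra slot is allocated internally to distinguish "full" from
/// "empty", so the buffer stores at most `capacity` elements while using
/// `capacity + 1` slots of backing storage.
///
/// # Panics
///
/// Panics if `capacity` is zero, or if the backing allocation's size would
/// overflow (via [`Layout::array`]).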
pub fn with_waiters<T, PW: OptionalWaiter, CW: OptionalWaiter>(
    mut capacity: usize,
    producer_waiter: PW,
    consumer_waiter: CW,
) -> (RingBufferProducer<T, PW, CW>, RingBufferConsumer<T, PW, CW>) {
    assert!(capacity != 0, "Cannot create a 0 capacity ring buffer");
    // Reserve one extra slot so that `head == tail` unambiguously means
    // "empty" and the slot before `head` marks "full".
    capacity = capacity.saturating_add(1);
    let layout = Layout::array::<T>(capacity).unwrap();
    let buf = if core::mem::size_of::<T>() == 0 {
        // Zero-sized types need no backing storage; a dangling pointer is
        // valid for them.
        NonNull::dangling()
    } else {
        unsafe {
            NonNull::new(alloc::alloc::alloc(layout) as *mut T)
                .unwrap_or_else(|| alloc::alloc::handle_alloc_error(layout))
        }
    };

    let shared = Arc::new(RingBufferShared::<T, PW, CW> {
        buf,
        cap: capacity,
        head: AtomicUsize::new(0),
        tail: AtomicUsize::new(0),
        closed_count: AtomicUsize::new(0),
        producer_waiter,
        consumer_waiter,
        _ph: core::marker::PhantomData,
    });

    let producer = RingBufferProducer {
        shared: shared.clone(),
    };
    let consumer = RingBufferConsumer { shared };
    (producer, consumer)
}

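/// Like [`with_waiters`], but constructs both waiters via [`Default`].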
pub fn with_default_waiters<T, PW: OptionalWaiter + Default, CW: OptionalWaiter + Default>(
    capacity: usize,
) -> (RingBufferProducer<T, PW, CW>, RingBufferConsumer<T, PW, CW>) {
    with_waiters(capacity, Default::default(), Default::default())
}

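/// State shared between the producer and consumer halves.
///
/// Invariants: `head` and `tail` are always in `0..cap`; `head == tail`
/// means empty, and the slot just before `head` is never written, so at
/// most `cap - 1` elements are live at any time.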
struct RingBufferShared<T, PW: OptionalWaiter, CW: OptionalWaiter> {
    /// Pointer to the backing storage (`cap` slots; dangling for ZSTs).
    buf: NonNull<T>,
    /// Number of slots in `buf`; one more than the user-visible capacity.
    cap: usize,
    /// Index of the next slot to read. Only the consumer advances it.
    head: AtomicUsize,
    /// Index of the next slot to write. Only the producer advances it.
    tail: AtomicUsize,
    /// Incremented once by each half on drop; non-zero means "closed".
    closed_count: AtomicUsize,
    /// Notified when the consumer frees space.
    producer_waiter: PW,
    /// Notified when the producer publishes data or drops.
    consumer_waiter: CW,
    _ph: core::marker::PhantomData<T>,
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferShared<T, PW, CW> {
    fn head_tail(&self) -> (usize, usize) {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        debug_assert!(head < self.cap);
        debug_assert!(tail < self.cap);
        (head, tail)
    }

    /// Advances `head` by up to `amnt`, optionally dropping the consumed
    /// elements in place.
    ///
    /// # Safety
    ///
    /// `head` and `tail` must be the current indices, and the consumed slots
    /// must contain initialized values that are not read again.
    unsafe fn consume(&self, head: usize, tail: usize, amnt: usize, drop: bool) {
        let (new_head, [drop1, drop2], _) = self.incr_head(head, tail, amnt);
        debug_assert!(new_head < self.cap);
        // Publish the new head (and notify the producer) even if a drop
        // below panics, so the buffer is never left in a torn state.
        let set_on_drop = SetOnDrop(&self.head, new_head, &self.producer_waiter);

        if drop && core::mem::needs_drop::<T>() {
            for i in drop1 {
                self.buf.as_ptr().add(i).drop_in_place();
            }
            for i in drop2 {
                self.buf.as_ptr().add(i).drop_in_place();
            }
        }

        core::mem::drop(set_on_drop);
    }

    /// Computes the effect of advancing `head` by up to `amnt` slots.
    ///
    /// Returns `(new_head, consumed, remaining)`, where `consumed` is the
    /// (possibly wrapping, hence two) range(s) of slots that were consumed
    /// and `remaining` is the range(s) still readable afterwards.
    fn incr_head(
        &self,
        head: usize,
        tail: usize,
        amnt: usize,
    ) -> (usize, [Range<usize>; 2], [Range<usize>; 2]) {
        debug_assert!(head < self.cap);
        debug_assert!(tail < self.cap);

        // Contiguous case: readable data is `head..tail`.
        if head <= tail {
            let new_head = head.saturating_add(amnt).min(tail);
            return (new_head, [head..new_head, 0..0], [new_head..tail, 0..0]);
        }

        // Wrapping data, but the advance stays within `head..cap`.
        if head.saturating_add(amnt) < self.cap {
            let new_head = head + amnt;
            return (
                new_head,
                [head..new_head, 0..0],
                [new_head..self.cap, 0..tail],
            );
        }

        // Wrapping data where the advance itself wraps past the end.
        let new_head = tail.min(amnt - (self.cap - head));
        (
            new_head,
            [head..self.cap, 0..new_head],
            [new_head..tail, 0..0],
        )
    }

    /// Computes the effect of advancing `tail` by up to `amnt` slots.
    ///
    /// Returns `(new_tail, claimed, remaining)`, where `claimed` is the
    /// range(s) of slots newly claimed for writing and `remaining` is the
    /// range(s) of free space left afterwards. One slot before `head` is
    /// always kept free to distinguish full from empty.
    fn incr_tail(
        &self,
        head: usize,
        tail: usize,
        amnt: usize,
    ) -> (usize, [Range<usize>; 2], [Range<usize>; 2]) {
        debug_assert!(head < self.cap);
        debug_assert!(tail < self.cap);

        // `head == 0`: the reserved slot is `cap - 1`, so free space is
        // contiguous and ends there.
        if head == 0 {
            let new_tail = tail.saturating_add(amnt).min(self.cap - 1);
            return (
                new_tail,
                [tail..new_tail, 0..0],
                [new_tail..self.cap - 1, 0..0],
            );
        }

        // Free space wraps and the advance wraps with it.
        if head <= tail && tail.saturating_add(amnt) >= self.cap {
            let new_tail = (amnt - (self.cap - tail)).min(head - 1);
            return (
                new_tail,
                [tail..self.cap, 0..new_tail],
                [new_tail..head - 1, 0..0],
            );
        }

        // Free space wraps but the advance stays within `tail..cap`.
        if head <= tail {
            return (
                tail + amnt,
                [tail..tail + amnt, 0..0],
                [tail + amnt..self.cap, 0..head - 1],
            );
        }

        // Contiguous free space: `tail..head - 1`.
        let new_tail = (tail + amnt).min(head - 1);
        (new_tail, [tail..new_tail, 0..0], [new_tail..head - 1, 0..0])
    }
}
// SAFETY: the producer and consumer halves never alias the same slots;
// `T: Send` is required because elements cross threads.
unsafe impl<T: Send, PW: OptionalWaiter, CW: OptionalWaiter> Send for RingBufferShared<T, PW, CW> {}
unsafe impl<T: Send, PW: OptionalWaiter, CW: OptionalWaiter> Sync for RingBufferShared<T, PW, CW> {}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Drop for RingBufferShared<T, PW, CW> {
    fn drop(&mut self) {
        if core::mem::needs_drop::<T>() {
            #[cfg(loom)]
            let (head, tail) = self.head_tail();
            #[cfg(not(loom))]
            let (head, tail) = (*self.head.get_mut(), *self.tail.get_mut());
            // Drop every element still in the buffer.
            unsafe {
                self.consume(head, tail, usize::MAX, true);
            }
        }

        if core::mem::size_of::<T>() != 0 {
            let layout = Layout::array::<T>(self.cap).unwrap();
            unsafe {
                alloc::alloc::dealloc(self.buf.as_ptr() as *mut _, layout);
            }
        }
    }
}

impl<T, PW: OptionalWaiter, CW: OptionalWaiter> core::fmt::Debug for RingBufferShared<T, PW, CW> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("RingBufferShared")
            .field("buf", &self.buf)
            .field("cap", &self.cap)
            .field("head", &self.head)
            .field("tail", &self.tail)
            .finish()
    }
}

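/// The producing (writing) half of the ring buffer. Dropping it marks the
/// buffer as closed and wakes a waiting consumer.
///
/// A usage sketch for the slice-based API (`ignore`d doctest: the crate name
/// below is an assumption):
///
/// ```ignore
/// let (mut producer, mut consumer) = ringbuf::new_waitless::<u8>(8);
/// let written = producer.push_slice(b"hello").unwrap();
/// assert_eq!(written, 5);
/// ```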
pub struct RingBufferProducer<T, PW: OptionalWaiter, CW: OptionalWaiter> {
    shared: Arc<RingBufferShared<T, PW, CW>>,
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Drop for RingBufferProducer<T, PW, CW> {
    fn drop(&mut self) {
        // Mark the buffer closed and wake a consumer that may be waiting.
        let v = self.shared.closed_count.fetch_add(1, Ordering::Release);
        debug_assert!(v <= 1);
        self.shared.consumer_waiter.notify();
    }
}

impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferProducer<T, PW, CW> {
    /// Pushes a single value, returning it inside the error if the buffer is
    /// full or the consumer has been dropped.
    pub fn push(&mut self, v: T) -> Result<(), PushError<T>> {
        self.extend(core::iter::once(v))
            .map_err(|ClosedError(mut it)| PushError::Closed(it.next().unwrap()))
            .and_then(|mut it| it.next().map(|v| Err(PushError::Full(v))).unwrap_or(Ok(())))
    }

    /// Pushes items from `it` until the buffer is full or the iterator is
    /// exhausted, returning the iterator (with any unpushed items still in
    /// it), or [`ClosedError`] if the consumer has been dropped.
    pub fn extend<I: Iterator<Item = T>>(&mut self, mut it: I) -> Result<I, ClosedError<I>> {
        if self.shared.closed_count.load(Ordering::Acquire) > 0 {
            return Err(ClosedError(it));
        }

        let (head, mut tail) = self.shared.head_tail();
        // The slot just before `head` is kept free; reaching it means full.
        let limit = head.checked_sub(1).unwrap_or(self.shared.cap - 1);
        if tail == limit {
            return Ok(it);
        }

        while tail != limit {
            let v = match it.next() {
                Some(v) => v,
                None => break,
            };

            unsafe {
                self.shared.buf.as_ptr().add(tail).write(v);
            }
            tail = (tail + 1) % self.shared.cap;
        }

        debug_assert!(tail < self.shared.cap);
        self.shared.tail.store(tail, Ordering::Release);
        self.shared.consumer_waiter.notify();

        Ok(it)
    }

    /// Returns a view over the currently free slots as uninitialized memory,
    /// or [`ClosedError`] if the consumer has been dropped. Nothing is
    /// published until [`UninitWriteView::produce`] is called.
    pub fn uninitialized_view(&mut self) -> Result<UninitWriteView<T, PW, CW>, ClosedError<()>> {
        if self.shared.closed_count.load(Ordering::Acquire) > 0 {
            return Err(ClosedError(()));
        }

        let (head, tail) = self.shared.head_tail();

        // With `amnt == 0`, the "remaining" ranges are exactly the free space.
        let (_, _, [slice1, slice2]) = self.shared.incr_tail(head, tail, 0);
        let slices = unsafe {
            (
                core::slice::from_raw_parts_mut(
                    (self.shared.buf.as_ptr() as *mut MaybeUninit<T>).add(slice1.start),
                    slice1.end - slice1.start,
                ),
                core::slice::from_raw_parts_mut(
                    (self.shared.buf.as_ptr() as *mut MaybeUninit<T>).add(slice2.start),
                    slice2.end - slice2.start,
                ),
            )
        };

        Ok(UninitWriteView {
            slices,
            shared: &*self.shared,
            head,
            tail,
        })
    }

    /// Returns the number of elements that can currently be pushed.
    pub fn available_len(&self) -> usize {
        let (head, tail) = self.shared.head_tail();
        let (_, _, [slice1, slice2]) = self.shared.incr_tail(head, tail, 0);
        (slice1.end - slice1.start) + (slice2.end - slice2.start)
    }

    /// Returns the maximum number of elements the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.shared.cap - 1
    }
}
impl<T: Default, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferProducer<T, PW, CW> {
    /// Returns a mutable view over all currently free slots, pre-filled with
    /// `T::default()`. See [`sized_view`](Self::sized_view).
    pub fn view(&mut self) -> Result<WriteView<T, PW, CW>, ClosedError<()>> {
        self.sized_view(usize::MAX)
    }

    /// Returns a mutable view over up to `size` free slots, pre-filled with
    /// `T::default()`, or [`ClosedError`] if the consumer has been dropped.
    /// Slots not committed via [`WriteView::produce`] are dropped again.
    pub fn sized_view(&mut self, size: usize) -> Result<WriteView<T, PW, CW>, ClosedError<()>> {
        if self.shared.closed_count.load(Ordering::Acquire) > 0 {
            return Err(ClosedError(()));
        }

        let (head, tail) = self.shared.head_tail();

        let (_, [slice1, slice2], _) = self.shared.incr_tail(head, tail, size);

        // Initialize the claimed slots so the view can hand out `&mut T`.
        unsafe {
            for i in slice1.clone() {
                self.shared.buf.as_ptr().add(i).write(Default::default());
            }
            for i in slice2.clone() {
                self.shared.buf.as_ptr().add(i).write(Default::default());
            }
        }

        let slices = unsafe {
            (
                core::slice::from_raw_parts_mut(
                    self.shared.buf.as_ptr().add(slice1.start),
                    slice1.end - slice1.start,
                ),
                core::slice::from_raw_parts_mut(
                    self.shared.buf.as_ptr().add(slice2.start),
                    slice2.end - slice2.start,
                ),
            )
        };

        Ok(WriteView {
            slices,
            shared: &*self.shared,
            head,
            tail,
        })
    }
}
impl<T: Copy, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferProducer<T, PW, CW> {
    /// Copies as many values as fit from `values` into the buffer, returning
    /// how many were written, or [`ClosedError`] if the consumer has been
    /// dropped.
    pub fn push_slice(&mut self, values: &[T]) -> Result<usize, ClosedError<()>> {
        if self.shared.closed_count.load(Ordering::Acquire) > 0 {
            return Err(ClosedError(()));
        }
        let (head, tail) = self.shared.head_tail();

        let (new_tail, [range1, range2], _) = self.shared.incr_tail(head, tail, values.len());
        debug_assert!((range1.end - range1.start) + (range2.end - range2.start) <= values.len());
        unsafe {
            self.shared
                .buf
                .as_ptr()
                .add(range1.start)
                .copy_from_nonoverlapping(values.as_ptr(), range1.end - range1.start);
            self.shared
                .buf
                .as_ptr()
                .add(range2.start)
                .copy_from_nonoverlapping(
                    values[range1.end - range1.start..].as_ptr(),
                    range2.end - range2.start,
                );
        }

        debug_assert!(new_tail < self.shared.cap);
        self.shared.tail.store(new_tail, Ordering::Release);
        // Wake a waiting consumer, mirroring `extend`.
        self.shared.consumer_waiter.notify();
        Ok((range1.end - range1.start) + (range2.end - range2.start))
    }
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferProducer<T, PW, CW>
where
    PW: for<'a> waiter::Waiter<'a>,
{
    /// Waits until space becomes available or the consumer is dropped.
    pub fn wait<'a>(&'a mut self) -> <PW as waiter::Waiter<'a>>::Future {
        let checker = waiter::WakeCheck {
            head: &self.shared.head,
            tail: &self.shared.tail,
            closed_count: &self.shared.closed_count,
            cap: self.shared.cap,
            is_producer: true,
        };
        self.shared.producer_waiter.wait(checker)
    }

    /// Returns a reference to the producer-side waiter.
    pub fn waiter(&mut self) -> &PW {
        &self.shared.producer_waiter
    }
}

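/// The consuming (reading) half of the ring buffer. Dropping it marks the
/// buffer as closed and wakes a waiting producer.
///
/// A usage sketch (`ignore`d doctest: the crate name below is an
/// assumption):
///
/// ```ignore
/// let (mut producer, mut consumer) = ringbuf::new_waitless::<u8>(8);
/// producer.push_slice(b"abc").unwrap();
/// let mut buf = [0u8; 8];
/// let n = consumer.pop_to_slice(&mut buf).unwrap();
/// assert_eq!(&buf[..n], b"abc");
/// ```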
pub struct RingBufferConsumer<T, PW: OptionalWaiter, CW: OptionalWaiter> {
    shared: Arc<RingBufferShared<T, PW, CW>>,
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Drop for RingBufferConsumer<T, PW, CW> {
    fn drop(&mut self) {
        // Mark the buffer closed and wake a producer that may be waiting.
        let v = self.shared.closed_count.fetch_add(1, Ordering::Release);
        debug_assert!(v <= 1);
        self.shared.producer_waiter.notify();
    }
}

impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferConsumer<T, PW, CW> {
    /// Pops the next value. Returns `Ok(None)` if the buffer is empty but
    /// still open, and [`ClosedError`] if it is empty and the producer has
    /// been dropped.
    pub fn pop(&mut self) -> Result<Option<T>, ClosedError<()>> {
        let (head, tail) = self.shared.head_tail();
        if head == tail {
            if self.shared.closed_count.load(Ordering::Acquire) > 0 {
                return Err(ClosedError(()));
            }
            return Ok(None);
        }
        let v = unsafe { self.shared.buf.as_ptr().add(head).read() };
        unsafe {
            // `drop = false`: the value was moved out by `read` above.
            self.shared.consume(head, tail, 1, false);
        }
        Ok(Some(v))
    }

    /// Returns a view over the readable elements, or [`ClosedError`] if the
    /// buffer is empty and the producer has been dropped. Elements are only
    /// removed when [`ReadView::consume`] is called.
    pub fn view(&mut self) -> Result<ReadView<T, PW, CW>, ClosedError<()>> {
        let (head, tail) = self.shared.head_tail();

        let (_, _, [slice1, slice2]) = self.shared.incr_head(head, tail, 0);

        if slice1.end - slice1.start == 0 {
            // If the first slice is empty, the second must be too.
            debug_assert_eq!(slice2.end - slice2.start, 0);
            if self.shared.closed_count.load(Ordering::Acquire) > 0 {
                return Err(ClosedError(()));
            }
        }

        let slices = unsafe {
            (
                core::slice::from_raw_parts_mut(
                    self.shared.buf.as_ptr().add(slice1.start),
                    slice1.end - slice1.start,
                ),
                core::slice::from_raw_parts_mut(
                    self.shared.buf.as_ptr().add(slice2.start),
                    slice2.end - slice2.start,
                ),
            )
        };

        Ok(ReadView {
            slices,
            shared: &*self.shared,
            head,
            tail,
        })
    }

    /// Returns the number of elements currently available to read.
    pub fn available_len(&self) -> usize {
        let (head, tail) = self.shared.head_tail();
        let (_, _, [slice1, slice2]) = self.shared.incr_head(head, tail, 0);
        (slice1.end - slice1.start) + (slice2.end - slice2.start)
    }

    /// Returns the maximum number of elements the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.shared.cap - 1
    }
}
impl<T, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferConsumer<T, PW, CW>
where
    CW: for<'a> waiter::Waiter<'a>,
{
    /// Waits until data becomes available or the producer is dropped.
    pub fn wait<'a>(&'a mut self) -> <CW as waiter::Waiter<'a>>::Future {
        let checker = waiter::WakeCheck {
            head: &self.shared.head,
            tail: &self.shared.tail,
            closed_count: &self.shared.closed_count,
            cap: self.shared.cap,
            is_producer: false,
        };
        self.shared.consumer_waiter.wait(checker)
    }

    /// Returns a reference to the consumer-side waiter.
    pub fn waiter(&mut self) -> &CW {
        &self.shared.consumer_waiter
    }
}
impl<T: Copy, PW: OptionalWaiter, CW: OptionalWaiter> RingBufferConsumer<T, PW, CW> {
    /// Copies as many elements as fit into `dest`, consumes them, and
    /// returns how many were copied.
    pub fn pop_to_slice(&mut self, dest: &mut [T]) -> Result<usize, ClosedError<()>> {
        let v = self.view()?;
        let s1amnt = dest.len().min(v.0.len());
        let s2amnt = v.1.len().min(dest.len() - s1amnt);
        dest[..s1amnt].copy_from_slice(&v.0[..s1amnt]);
        dest[s1amnt..s1amnt + s2amnt].copy_from_slice(&v.1[..s2amnt]);
        v.consume(s1amnt + s2amnt);
        Ok(s1amnt + s2amnt)
    }
}

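/// A zero-copy view over the readable portion of the buffer, obtained from
/// [`RingBufferConsumer::view`]. Because the data may wrap around the end of
/// the allocation, it is exposed as two slices; the second is empty unless
/// the data wraps. Dropping the view without calling
/// [`consume`](Self::consume) leaves all elements in place.
///
/// A usage sketch (`ignore`d: continues the hypothetical examples above):
///
/// ```ignore
/// let view = consumer.view().unwrap();
/// let available = view.len();
/// // ... read from view.slices.0 and view.slices.1 ...
/// view.consume(available);
/// ```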
pub struct ReadView<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> {
    /// The readable elements; the second slice is non-empty only when the
    /// data wraps around the end of the buffer.
    pub slices: (&'a mut [T], &'a mut [T]),
    shared: &'a RingBufferShared<T, PW, CW>,
    head: usize,
    tail: usize,
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Deref for ReadView<'a, T, PW, CW> {
    type Target = (&'a mut [T], &'a mut [T]);

    fn deref(&self) -> &Self::Target {
        &self.slices
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::DerefMut
    for ReadView<'a, T, PW, CW>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.slices
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> ReadView<'a, T, PW, CW> {
    /// Consumes (and drops) the first `amnt` viewed elements, freeing their
    /// slots for the producer.
    pub fn consume(self, amnt: usize) {
        let Self {
            slices: _,
            shared,
            head,
            tail,
        } = self;
        unsafe { shared.consume(head, tail, amnt, true) }
    }

    /// Consumes every viewed element.
    pub fn consume_all(self) {
        self.consume(usize::MAX);
    }

    /// Iterates over both slices in order.
    pub fn iter(&mut self) -> core::iter::Chain<core::slice::IterMut<T>, core::slice::IterMut<T>> {
        let i1 = self.slices.0.iter_mut();
        let i2 = self.slices.1.iter_mut();
        i1.chain(i2)
    }

    pub fn is_empty(&self) -> bool {
        // The second slice can only be non-empty if the first one is.
        self.slices.0.is_empty()
    }

    pub fn len(&self) -> usize {
        self.slices.0.len() + self.slices.1.len()
    }

    pub fn get(&self, i: usize) -> Option<&T> {
        if i >= self.len() {
            None
        } else if i >= self.slices.0.len() {
            Some(&self.slices.1[i - self.0.len()])
        } else {
            Some(&self.slices.0[i])
        }
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Index<usize>
    for ReadView<'a, T, PW, CW>
{
    type Output = T;

    fn index(&self, i: usize) -> &Self::Output {
        if i >= self.slices.0.len() {
            &self.slices.1[i - self.0.len()]
        } else {
            &self.slices.0[i]
        }
    }
}

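/// A view over writable slots pre-filled with `T::default()`, obtained from
/// [`RingBufferProducer::view`] or [`RingBufferProducer::sized_view`]. Write
/// through the two slices, then call [`produce`](Self::produce) to publish a
/// prefix; anything not produced (including the whole view, if it is simply
/// dropped) is dropped again.
///
/// A usage sketch (`ignore`d: continues the hypothetical examples above):
///
/// ```ignore
/// let mut view = producer.view().unwrap();
/// for (i, slot) in view.iter().enumerate() {
///     *slot = i as u8;
/// }
/// let n = view.len();
/// view.produce(n);
/// ```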
pub struct WriteView<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> {
    /// The writable slots, pre-filled with `T::default()`; the second slice
    /// is non-empty only when the free space wraps around the buffer end.
    pub slices: (&'a mut [T], &'a mut [T]),
    shared: &'a RingBufferShared<T, PW, CW>,
    head: usize,
    tail: usize,
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Deref for WriteView<'a, T, PW, CW> {
    type Target = (&'a mut [T], &'a mut [T]);

    fn deref(&self) -> &Self::Target {
        &self.slices
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::DerefMut
    for WriteView<'a, T, PW, CW>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.slices
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> WriteView<'a, T, PW, CW> {
    /// Publishes the first `amnt` slots to the consumer and drops the
    /// remaining (unproduced) default values.
    pub fn produce(self, amnt: usize) {
        let len = self.len();
        let amnt = amnt.min(len);

        let shared = self.shared;
        let head = self.head;
        let tail = self.tail;
        // Skip `Drop`, which would drop *all* viewed slots.
        core::mem::forget(self);

        let (new_tail, _, _) = shared.incr_tail(head, tail, amnt);
        debug_assert!(new_tail < shared.cap);
        shared.tail.store(new_tail, Ordering::Release);
        shared.consumer_waiter.notify();

        // Drop the slots that were claimed but not produced.
        let (_, [drop1, drop2], _) = shared.incr_tail(head, new_tail, len - amnt);
        debug_assert!(new_tail < shared.cap);
        if core::mem::needs_drop::<T>() {
            unsafe {
                for i in drop1 {
                    shared.buf.as_ptr().add(i).drop_in_place();
                }
                for i in drop2 {
                    shared.buf.as_ptr().add(i).drop_in_place();
                }
            }
        }
    }

    /// Iterates over both slices in order.
    pub fn iter(&mut self) -> core::iter::Chain<core::slice::IterMut<T>, core::slice::IterMut<T>> {
        let i1 = self.slices.0.iter_mut();
        let i2 = self.slices.1.iter_mut();
        i1.chain(i2)
    }

    pub fn is_empty(&self) -> bool {
        // The second slice can only be non-empty if the first one is.
        self.slices.0.is_empty()
    }

    pub fn len(&self) -> usize {
        self.0.len() + self.1.len()
    }

    pub fn get(&mut self, i: usize) -> Option<&mut T> {
        if i >= self.len() {
            None
        } else if i >= self.slices.0.len() {
            Some(&mut self.slices.1[i - self.0.len()])
        } else {
            Some(&mut self.slices.0[i])
        }
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Drop for WriteView<'a, T, PW, CW> {
    fn drop(&mut self) {
        // Nothing was produced: drop every default value the view created.
        let len = self.len();
        let (_, [drop1, drop2], _) = self.shared.incr_tail(self.head, self.tail, len);
        if core::mem::needs_drop::<T>() {
            unsafe {
                for i in drop1 {
                    self.shared.buf.as_ptr().add(i).drop_in_place();
                }
                for i in drop2 {
                    self.shared.buf.as_ptr().add(i).drop_in_place();
                }
            }
        }
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Index<usize>
    for WriteView<'a, T, PW, CW>
{
    type Output = T;

    fn index(&self, i: usize) -> &Self::Output {
        if i >= self.slices.0.len() {
            &self.slices.1[i - self.0.len()]
        } else {
            &self.slices.0[i]
        }
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::IndexMut<usize>
    for WriteView<'a, T, PW, CW>
{
    fn index_mut(&mut self, i: usize) -> &mut T {
        if i >= self.slices.0.len() {
            &mut self.slices.1[i - self.0.len()]
        } else {
            &mut self.slices.0[i]
        }
    }
}

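/// A view over the free slots as uninitialized memory, obtained from
/// [`RingBufferProducer::uninitialized_view`]. Unlike [`WriteView`], nothing
/// is pre-initialized, so publishing is `unsafe`.
///
/// A usage sketch (`ignore`d: continues the hypothetical examples above):
///
/// ```ignore
/// let mut view = producer.uninitialized_view().unwrap();
/// let mut n = 0;
/// for slot in view.slices.0.iter_mut() {
///     slot.write(0u8);
///     n += 1;
/// }
/// // SAFETY: the first `n` slots were initialized above.
/// unsafe { view.produce(n) };
/// ```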
pub struct UninitWriteView<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> {
    /// The writable (uninitialized) slots; the second slice is non-empty
    /// only when the free space wraps around the buffer end.
    pub slices: (&'a mut [MaybeUninit<T>], &'a mut [MaybeUninit<T>]),
    shared: &'a RingBufferShared<T, PW, CW>,
    head: usize,
    tail: usize,
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Deref
    for UninitWriteView<'a, T, PW, CW>
{
    type Target = (&'a mut [MaybeUninit<T>], &'a mut [MaybeUninit<T>]);

    fn deref(&self) -> &Self::Target {
        &self.slices
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::DerefMut
    for UninitWriteView<'a, T, PW, CW>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.slices
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> UninitWriteView<'a, T, PW, CW> {
    /// Publishes the first `amnt` slots to the consumer.
    ///
    /// # Safety
    ///
    /// The first `amnt` viewed slots must have been initialized.
    pub unsafe fn produce(self, amnt: usize) {
        let Self {
            slices: _,
            shared,
            head,
            tail,
        } = self;

        let (new_tail, _, _) = shared.incr_tail(head, tail, amnt);

        debug_assert!(new_tail < shared.cap);
        shared.tail.store(new_tail, Ordering::Release);
        // Wake a waiting consumer, mirroring `WriteView::produce`.
        shared.consumer_waiter.notify();
    }


    /// Iterates over both slices in order.
    pub fn iter(
        &mut self,
    ) -> core::iter::Chain<core::slice::IterMut<MaybeUninit<T>>, core::slice::IterMut<MaybeUninit<T>>>
    {
        let i1 = self.slices.0.iter_mut();
        let i2 = self.slices.1.iter_mut();
        i1.chain(i2)
    }

    pub fn is_empty(&self) -> bool {
        // The second slice can only be non-empty if the first one is.
        self.slices.0.is_empty()
    }

    pub fn len(&self) -> usize {
        self.0.len() + self.1.len()
    }

    pub fn get(&mut self, i: usize) -> Option<&mut MaybeUninit<T>> {
        if i >= self.len() {
            None
        } else if i >= self.slices.0.len() {
            Some(&mut self.slices.1[i - self.0.len()])
        } else {
            Some(&mut self.slices.0[i])
        }
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::Index<usize>
    for UninitWriteView<'a, T, PW, CW>
{
    type Output = MaybeUninit<T>;

    fn index(&self, i: usize) -> &Self::Output {
        if i >= self.slices.0.len() {
            &self.slices.1[i - self.0.len()]
        } else {
            &self.slices.0[i]
        }
    }
}
impl<'a, T, PW: OptionalWaiter, CW: OptionalWaiter> core::ops::IndexMut<usize>
    for UninitWriteView<'a, T, PW, CW>
{
    fn index_mut(&mut self, i: usize) -> &mut MaybeUninit<T> {
        if i >= self.slices.0.len() {
            &mut self.slices.1[i - self.0.len()]
        } else {
            &mut self.slices.0[i]
        }
    }
}

/// Stores a value into an atomic and notifies a waiter when dropped, so the
/// store happens even if the surrounding code panics.
struct SetOnDrop<'a, W: OptionalWaiter>(&'a AtomicUsize, usize, &'a W);
impl<'a, W: OptionalWaiter> core::ops::Drop for SetOnDrop<'a, W> {
    fn drop(&mut self) {
        self.0.store(self.1, Ordering::Release);
        self.2.notify()
    }
}

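/// Error returned when the other half of the buffer has been dropped. The
/// payload carries back whatever the failed operation could not deliver
/// (for example, the unconsumed iterator from
/// [`RingBufferProducer::extend`]).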
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClosedError<T>(pub T);
impl<T> core::fmt::Display for ClosedError<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("Other side of buffer dropped")
    }
}

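/// Error returned by [`RingBufferProducer::push`]; both variants return the
/// value that could not be pushed.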
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushError<T> {
    /// The buffer is full.
    Full(T),
    /// The consumer has been dropped.
    Closed(T),
}
impl<T> core::fmt::Display for PushError<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Full(_) => f.write_str("Buffer full"),
            Self::Closed(_) => f.write_str("Other side of buffer dropped"),
        }
    }
}
impl<T> From<ClosedError<T>> for PushError<T> {
    fn from(v: ClosedError<T>) -> Self {
        Self::Closed(v.0)
    }
}