//! [`AsyncRb`]: a [`SharedRb`] paired with two [`AtomicWaker`]s, together with
//! the ring-buffer trait implementations that forward to the inner buffer and
//! fire the matching waker whenever an index or hold flag is updated.
use crate::wrap::{AsyncCons, AsyncProd};
#[cfg(feature = "alloc")]
use alloc::sync::Arc;
use core::{mem::MaybeUninit, num::NonZeroUsize};
use futures::task::AtomicWaker;
#[cfg(feature = "alloc")]
use ringbuf::traits::Split;
use ringbuf::{
    rb::traits::RbRef,
    storage::Storage,
    traits::{Consumer, Observer, Producer, RingBuffer, SplitRef},
    SharedRb,
};

/// A reference-like handle ([`RbRef`]) whose target is an [`AsyncRb`].
///
/// The associated `Storage` type names the storage of the ring buffer the
/// handle points to, so generic code can write `R: AsyncRbRef` and refer to
/// `R::Storage` instead of carrying a separate storage type parameter.
pub trait AsyncRbRef: RbRef<Target = AsyncRb<Self::Storage>> {
    /// Storage type of the [`AsyncRb`] this reference targets.
    type Storage: Storage;
}
/// Blanket implementation: every [`RbRef`] whose target is an `AsyncRb<S>`
/// is an [`AsyncRbRef`], with `Storage` resolved to `S`.
impl<S, R> AsyncRbRef for R
where
    S: Storage,
    R: RbRef<Target = AsyncRb<S>>,
{
    type Storage = S;
}

/// Ring buffer made of a [`SharedRb`] plus two [`AtomicWaker`]s.
///
/// The trait implementations in this file forward every operation to `base`
/// and additionally call `wake()` on one of the wakers after an index or hold
/// flag has been updated.
pub struct AsyncRb<S: Storage> {
    /// Underlying ring buffer that all trait methods delegate to.
    base: SharedRb<S>,
    /// Woken after the read index is set and after the read-hold flag is set.
    // NOTE(review): presumably the producer side registers its waker here to
    // learn about freed space — confirm in `crate::wrap`.
    pub(crate) read: AtomicWaker,
    /// Woken after the write index is set and after the write-hold flag is set.
    // NOTE(review): presumably the consumer side registers its waker here to
    // learn about new items — confirm in `crate::wrap`.
    pub(crate) write: AtomicWaker,
}

impl<S> AsyncRb<S>
where
    S: Storage,
{
    /// Wraps an existing [`SharedRb`] into an [`AsyncRb`].
    ///
    /// The buffer is taken over as-is; both wakers are freshly created.
    pub fn from(base: SharedRb<S>) -> Self {
        let read = AtomicWaker::new();
        let write = AtomicWaker::new();
        Self { base, read, write }
    }
}

// `AsyncRb` is declared `Unpin` for every storage type, independent of
// whether `S` itself is `Unpin`.
impl<S> Unpin for AsyncRb<S> where S: Storage {}

/// Read-only view of the buffer state.
///
/// Every method forwards to the inner [`SharedRb`]; no waker is touched.
impl<S> Observer for AsyncRb<S>
where
    S: Storage,
{
    type Item = S::Item;

    /// Capacity reported by the inner buffer.
    #[inline]
    fn capacity(&self) -> NonZeroUsize {
        self.base.capacity()
    }

    // --- read side ---

    /// Current read index of the inner buffer.
    #[inline]
    fn read_index(&self) -> usize {
        self.base.read_index()
    }

    /// Whether the read end of the inner buffer is currently held.
    #[inline]
    fn read_is_held(&self) -> bool {
        self.base.read_is_held()
    }

    // --- write side ---

    /// Current write index of the inner buffer.
    #[inline]
    fn write_index(&self) -> usize {
        self.base.write_index()
    }

    /// Whether the write end of the inner buffer is currently held.
    #[inline]
    fn write_is_held(&self) -> bool {
        self.base.write_is_held()
    }

    /// Returns the pair of slices covering `start..end` in the inner buffer.
    ///
    /// # Safety
    ///
    /// The call is forwarded unchanged, so the caller must uphold the
    /// contract of the inner buffer's `unsafe_slices`.
    unsafe fn unsafe_slices(
        &self,
        start: usize,
        end: usize,
    ) -> (&mut [MaybeUninit<Self::Item>], &mut [MaybeUninit<Self::Item>]) {
        self.base.unsafe_slices(start, end)
    }
}

impl<S> Producer for AsyncRb<S>
where
    S: Storage,
{
    /// Stores the new write index in the inner buffer, then wakes `write`.
    ///
    /// # Safety
    ///
    /// The index is forwarded unchanged, so the caller must uphold the
    /// contract of the inner buffer's `set_write_index`.
    unsafe fn set_write_index(&self, value: usize) {
        let Self { base, write, .. } = self;
        base.set_write_index(value);
        // Notify only after the index has been updated.
        write.wake();
    }
}
impl<S> Consumer for AsyncRb<S>
where
    S: Storage,
{
    /// Stores the new read index in the inner buffer, then wakes `read`.
    ///
    /// # Safety
    ///
    /// The index is forwarded unchanged, so the caller must uphold the
    /// contract of the inner buffer's `set_read_index`.
    unsafe fn set_read_index(&self, value: usize) {
        let Self { base, read, .. } = self;
        base.set_read_index(value);
        // Notify only after the index has been updated.
        read.wake();
    }
}
impl<S> RingBuffer for AsyncRb<S>
where
    S: Storage,
{
    /// Sets the read-hold flag on the inner buffer, then wakes `read`.
    ///
    /// # Safety
    ///
    /// Forwarded unchanged; the caller must uphold the contract of the inner
    /// buffer's `hold_read`.
    #[inline]
    unsafe fn hold_read(&self, flag: bool) {
        let Self { base, read, .. } = self;
        base.hold_read(flag);
        // The wake happens for either value of `flag`.
        read.wake();
    }

    /// Sets the write-hold flag on the inner buffer, then wakes `write`.
    ///
    /// # Safety
    ///
    /// Forwarded unchanged; the caller must uphold the contract of the inner
    /// buffer's `hold_write`.
    #[inline]
    unsafe fn hold_write(&self, flag: bool) {
        let Self { base, write, .. } = self;
        base.hold_write(flag);
        // The wake happens for either value of `flag`.
        write.wake();
    }
}

impl<S> SplitRef for AsyncRb<S>
where
    S: Storage,
{
    type RefProd<'a> = AsyncProd<&'a Self> where Self: 'a;
    type RefCons<'a> = AsyncCons<&'a Self> where Self: 'a;

    /// Splits the buffer into a producer and a consumer that both borrow it.
    ///
    /// Taking `&mut self` means the buffer stays exclusively borrowed for as
    /// long as either half is alive.
    fn split_ref(&mut self) -> (Self::RefProd<'_>, Self::RefCons<'_>) {
        // Downgrade the exclusive borrow to a shared one that both halves copy.
        let shared: &Self = self;
        // SAFETY: NOTE(review) — presumably `new` requires at most one
        // producer and one consumer per buffer; exactly one of each is built
        // here from an exclusive borrow. Confirm against the constructors'
        // documented contract.
        unsafe { (AsyncProd::new(shared), AsyncCons::new(shared)) }
    }
}
#[cfg(feature = "alloc")]
impl<S> Split for AsyncRb<S>
where
    S: Storage,
{
    type Prod = AsyncProd<Arc<Self>>;
    type Cons = AsyncCons<Arc<Self>>;

    /// Consumes the buffer and returns a producer/consumer pair that share
    /// ownership of it through an [`Arc`].
    fn split(self) -> (Self::Prod, Self::Cons) {
        let cons_handle = Arc::new(self);
        let prod_handle = Arc::clone(&cons_handle);
        // SAFETY: NOTE(review) — presumably `new` requires at most one
        // producer and one consumer per buffer; the buffer was just moved
        // into a fresh `Arc`, and exactly one of each is built here. Confirm
        // against the constructors' documented contract.
        unsafe { (AsyncProd::new(prod_handle), AsyncCons::new(cons_handle)) }
    }
}