ruchei_connection/
lib.rs

1use std::{
2    marker::PhantomPinned,
3    pin::Pin,
4    sync::Arc,
5    task::{Context, Poll, Wake, Waker},
6};
7
8use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
9use futures_util::{Stream, task::AtomicWaker};
10use pin_project::pin_project;
11use ruchei_collections::as_linked_slab::{AsLinkedSlab, SlabKey};
12
13#[derive(Debug, Default)]
14#[must_use]
15struct SlabWaker {
16    waker: AtomicWaker,
17}
18
19impl Wake for SlabWaker {
20    fn wake(self: Arc<Self>) {
21        self.wake_by_ref();
22    }
23
24    fn wake_by_ref(self: &Arc<Self>) {
25        self.waker.wake();
26    }
27}
28
29#[must_use]
30#[pin_project]
31#[derive(Debug)]
32pub struct Ready {
33    send: UnboundedSender<SlabKey>,
34    #[pin]
35    recv: UnboundedReceiver<SlabKey>,
36    inner: Arc<SlabWaker>,
37    waker: Waker,
38    _pinned: PhantomPinned,
39}
40
41impl Default for Ready {
42    fn default() -> Self {
43        let (send, recv) = unbounded();
44        let inner = Arc::<SlabWaker>::default();
45        let waker = inner.clone().into();
46        Self {
47            send,
48            recv,
49            inner,
50            waker,
51            _pinned: PhantomPinned,
52        }
53    }
54}
55
56#[must_use]
57#[derive(Debug, Default)]
58pub struct ReadyWeak(Option<UnboundedSender<SlabKey>>);
59
60impl Ready {
61    pub fn downgrade(&self) -> ReadyWeak {
62        ReadyWeak(Some(self.send.clone()))
63    }
64
65    pub fn wake(&self) {
66        self.inner.wake_by_ref();
67    }
68
69    pub fn compact<const M: usize>(self: Pin<&mut Self>, slab: &mut impl AsLinkedSlab) {
70        let mut this = self.project();
71        while let Poll::Ready(Some(key)) = this
72            .recv
73            .as_mut()
74            .poll_next(&mut Context::from_waker(this.waker))
75        {
76            slab.link_push_back::<M>(key);
77        }
78    }
79
80    #[must_use]
81    pub fn next<const M: usize>(
82        self: Pin<&mut Self>,
83        slab: &mut impl AsLinkedSlab,
84    ) -> Option<SlabKey> {
85        self.compact::<M>(slab);
86        slab.link_pop_front::<M>()
87    }
88
89    pub fn register(&self, cx: &mut Context<'_>) {
90        self.inner.waker.register(cx.waker());
91    }
92}
93
94impl ReadyWeak {
95    pub fn insert(&self, key: SlabKey) {
96        if let Some(sender) = self.0.as_ref() {
97            let _ = sender.unbounded_send(key);
98        }
99    }
100}
101
102impl Extend<SlabKey> for ReadyWeak {
103    fn extend<T: IntoIterator<Item = SlabKey>>(&mut self, iter: T) {
104        for key in iter {
105            self.insert(key);
106        }
107    }
108}
109
110#[must_use]
111#[derive(Debug)]
112pub struct ConnectionWaker {
113    waker: AtomicWaker,
114    ready: ReadyWeak,
115    key: SlabKey,
116}
117
118impl ConnectionWaker {
119    #[must_use]
120    pub fn new(key: SlabKey, ready: ReadyWeak) -> Arc<Self> {
121        Arc::new(Self {
122            waker: Default::default(),
123            ready,
124            key,
125        })
126    }
127
128    pub fn wake(&self) {
129        self.ready.insert(self.key);
130        self.waker.wake();
131    }
132}
133
134impl Wake for ConnectionWaker {
135    fn wake(self: Arc<Self>) {
136        (*self).wake();
137    }
138
139    fn wake_by_ref(self: &Arc<Self>) {
140        (**self).wake();
141    }
142}
143
144impl ConnectionWaker {
145    pub fn poll<T>(
146        self: &Arc<Self>,
147        cx: &mut Context<'_>,
148        f: impl FnOnce(&mut Context<'_>) -> T,
149    ) -> T {
150        self.waker.register(cx.waker());
151        self.poll_detached(f)
152    }
153
154    pub fn poll_detached<T>(self: &Arc<Self>, f: impl FnOnce(&mut Context<'_>) -> T) -> T {
155        f(&mut Context::from_waker(&Waker::from(self.clone())))
156    }
157}
158
159#[must_use]
160#[derive(Debug)]
161pub struct ConnectionWaker2 {
162    waker0: AtomicWaker,
163    waker1: AtomicWaker,
164    ready: ReadyWeak,
165    key: SlabKey,
166}
167
168impl ConnectionWaker2 {
169    #[must_use]
170    pub fn new(key: SlabKey, ready: ReadyWeak) -> Arc<Self> {
171        Arc::new(Self {
172            waker0: Default::default(),
173            waker1: Default::default(),
174            ready,
175            key,
176        })
177    }
178
179    pub fn wake(&self) {
180        self.ready.insert(self.key);
181        self.waker0.wake();
182        self.waker1.wake();
183    }
184}
185
186impl Wake for ConnectionWaker2 {
187    fn wake(self: Arc<Self>) {
188        (*self).wake();
189    }
190
191    fn wake_by_ref(self: &Arc<Self>) {
192        (**self).wake();
193    }
194}
195
196impl ConnectionWaker2 {
197    pub fn poll0<T>(
198        self: &Arc<Self>,
199        cx: &mut Context<'_>,
200        f: impl FnOnce(&mut Context<'_>) -> T,
201    ) -> T {
202        self.waker0.register(cx.waker());
203        self.poll_detached(f)
204    }
205
206    pub fn poll1<T>(
207        self: &Arc<Self>,
208        cx: &mut Context<'_>,
209        f: impl FnOnce(&mut Context<'_>) -> T,
210    ) -> T {
211        self.waker1.register(cx.waker());
212        self.poll_detached(f)
213    }
214
215    pub fn poll_detached<T>(self: &Arc<Self>, f: impl FnOnce(&mut Context<'_>) -> T) -> T {
216        f(&mut Context::from_waker(&Waker::from(self.clone())))
217    }
218}
219
220#[must_use]
221#[derive(Debug)]
222pub struct Connection<S> {
223    pub stream: S,
224    pub next: Arc<ConnectionWaker>,
225    pub ready: Arc<ConnectionWaker>,
226    pub flush: Arc<ConnectionWaker>,
227    pub close: Arc<ConnectionWaker>,
228}
229
230#[must_use]
231#[derive(Debug)]
232pub struct Connection2<S> {
233    pub stream: S,
234    pub next: Arc<ConnectionWaker>,
235    pub ready: Arc<ConnectionWaker2>,
236    pub flush: Arc<ConnectionWaker2>,
237    pub close: Arc<ConnectionWaker>,
238}