1use std::{
2 marker::PhantomPinned,
3 pin::Pin,
4 sync::Arc,
5 task::{Context, Poll, Wake, Waker},
6};
7
8use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
9use futures_util::{Stream, task::AtomicWaker};
10use pin_project::pin_project;
11use ruchei_collections::as_linked_slab::{AsLinkedSlab, SlabKey};
12
13#[derive(Debug, Default)]
14#[must_use]
15struct SlabWaker {
16 waker: AtomicWaker,
17}
18
19impl Wake for SlabWaker {
20 fn wake(self: Arc<Self>) {
21 self.wake_by_ref();
22 }
23
24 fn wake_by_ref(self: &Arc<Self>) {
25 self.waker.wake();
26 }
27}
28
29#[must_use]
30#[pin_project]
31#[derive(Debug)]
32pub struct Ready {
33 send: UnboundedSender<SlabKey>,
34 #[pin]
35 recv: UnboundedReceiver<SlabKey>,
36 inner: Arc<SlabWaker>,
37 waker: Waker,
38 _pinned: PhantomPinned,
39}
40
41impl Default for Ready {
42 fn default() -> Self {
43 let (send, recv) = unbounded();
44 let inner = Arc::<SlabWaker>::default();
45 let waker = inner.clone().into();
46 Self {
47 send,
48 recv,
49 inner,
50 waker,
51 _pinned: PhantomPinned,
52 }
53 }
54}
55
56#[must_use]
57#[derive(Debug, Default)]
58pub struct ReadyWeak(Option<UnboundedSender<SlabKey>>);
59
60impl Ready {
61 pub fn downgrade(&self) -> ReadyWeak {
62 ReadyWeak(Some(self.send.clone()))
63 }
64
65 pub fn wake(&self) {
66 self.inner.wake_by_ref();
67 }
68
69 pub fn compact<const M: usize>(self: Pin<&mut Self>, slab: &mut impl AsLinkedSlab) {
70 let mut this = self.project();
71 while let Poll::Ready(Some(key)) = this
72 .recv
73 .as_mut()
74 .poll_next(&mut Context::from_waker(this.waker))
75 {
76 slab.link_push_back::<M>(key);
77 }
78 }
79
80 #[must_use]
81 pub fn next<const M: usize>(
82 self: Pin<&mut Self>,
83 slab: &mut impl AsLinkedSlab,
84 ) -> Option<SlabKey> {
85 self.compact::<M>(slab);
86 slab.link_pop_front::<M>()
87 }
88
89 pub fn register(&self, cx: &mut Context<'_>) {
90 self.inner.waker.register(cx.waker());
91 }
92}
93
94impl ReadyWeak {
95 pub fn insert(&self, key: SlabKey) {
96 if let Some(sender) = self.0.as_ref() {
97 let _ = sender.unbounded_send(key);
98 }
99 }
100}
101
102impl Extend<SlabKey> for ReadyWeak {
103 fn extend<T: IntoIterator<Item = SlabKey>>(&mut self, iter: T) {
104 for key in iter {
105 self.insert(key);
106 }
107 }
108}
109
110#[must_use]
111#[derive(Debug)]
112pub struct ConnectionWaker {
113 waker: AtomicWaker,
114 ready: ReadyWeak,
115 key: SlabKey,
116}
117
118impl ConnectionWaker {
119 #[must_use]
120 pub fn new(key: SlabKey, ready: ReadyWeak) -> Arc<Self> {
121 Arc::new(Self {
122 waker: Default::default(),
123 ready,
124 key,
125 })
126 }
127
128 pub fn wake(&self) {
129 self.ready.insert(self.key);
130 self.waker.wake();
131 }
132}
133
134impl Wake for ConnectionWaker {
135 fn wake(self: Arc<Self>) {
136 (*self).wake();
137 }
138
139 fn wake_by_ref(self: &Arc<Self>) {
140 (**self).wake();
141 }
142}
143
144impl ConnectionWaker {
145 pub fn poll<T>(
146 self: &Arc<Self>,
147 cx: &mut Context<'_>,
148 f: impl FnOnce(&mut Context<'_>) -> T,
149 ) -> T {
150 self.waker.register(cx.waker());
151 self.poll_detached(f)
152 }
153
154 pub fn poll_detached<T>(self: &Arc<Self>, f: impl FnOnce(&mut Context<'_>) -> T) -> T {
155 f(&mut Context::from_waker(&Waker::from(self.clone())))
156 }
157}
158
159#[must_use]
160#[derive(Debug)]
161pub struct ConnectionWaker2 {
162 waker0: AtomicWaker,
163 waker1: AtomicWaker,
164 ready: ReadyWeak,
165 key: SlabKey,
166}
167
168impl ConnectionWaker2 {
169 #[must_use]
170 pub fn new(key: SlabKey, ready: ReadyWeak) -> Arc<Self> {
171 Arc::new(Self {
172 waker0: Default::default(),
173 waker1: Default::default(),
174 ready,
175 key,
176 })
177 }
178
179 pub fn wake(&self) {
180 self.ready.insert(self.key);
181 self.waker0.wake();
182 self.waker1.wake();
183 }
184}
185
186impl Wake for ConnectionWaker2 {
187 fn wake(self: Arc<Self>) {
188 (*self).wake();
189 }
190
191 fn wake_by_ref(self: &Arc<Self>) {
192 (**self).wake();
193 }
194}
195
196impl ConnectionWaker2 {
197 pub fn poll0<T>(
198 self: &Arc<Self>,
199 cx: &mut Context<'_>,
200 f: impl FnOnce(&mut Context<'_>) -> T,
201 ) -> T {
202 self.waker0.register(cx.waker());
203 self.poll_detached(f)
204 }
205
206 pub fn poll1<T>(
207 self: &Arc<Self>,
208 cx: &mut Context<'_>,
209 f: impl FnOnce(&mut Context<'_>) -> T,
210 ) -> T {
211 self.waker1.register(cx.waker());
212 self.poll_detached(f)
213 }
214
215 pub fn poll_detached<T>(self: &Arc<Self>, f: impl FnOnce(&mut Context<'_>) -> T) -> T {
216 f(&mut Context::from_waker(&Waker::from(self.clone())))
217 }
218}
219
220#[must_use]
221#[derive(Debug)]
222pub struct Connection<S> {
223 pub stream: S,
224 pub next: Arc<ConnectionWaker>,
225 pub ready: Arc<ConnectionWaker>,
226 pub flush: Arc<ConnectionWaker>,
227 pub close: Arc<ConnectionWaker>,
228}
229
230#[must_use]
231#[derive(Debug)]
232pub struct Connection2<S> {
233 pub stream: S,
234 pub next: Arc<ConnectionWaker>,
235 pub ready: Arc<ConnectionWaker2>,
236 pub flush: Arc<ConnectionWaker2>,
237 pub close: Arc<ConnectionWaker>,
238}