1use std::{
2 collections::VecDeque,
3 future::{ready, Future, IntoFuture, Ready},
4 net::SocketAddr,
5 pin::Pin,
6 sync::{
7 atomic::{AtomicBool, AtomicU64, Ordering},
8 Arc, Mutex,
9 },
10};
11
12use crate::{channel::DatagramChannel, context::DatagramInfo, Result};
13
14pub struct DatagramContext<W> {
22 info: DatagramInfo,
23 channel: DatagramChannel<W>,
24 outbox: DatagramOutboxHandle<W>,
25 close_requested: bool,
26}
27
28impl<W: Send + 'static> DatagramContext<W> {
29 pub(crate) fn new(info: DatagramInfo, channel: DatagramChannel<W>) -> Self {
30 Self {
31 info,
32 channel,
33 outbox: DatagramOutboxHandle::new(),
34 close_requested: false,
35 }
36 }
37
38 pub fn id(&self) -> u64 {
40 self.info.id()
41 }
42
43 pub fn peer_addr(&self) -> SocketAddr {
45 self.info.peer_addr()
46 }
47
48 pub fn local_addr(&self) -> SocketAddr {
50 self.info.local_addr()
51 }
52
53 pub fn channel(&self) -> DatagramChannel<W> {
55 self.channel.clone()
56 }
57
58 #[inline]
64 pub fn write(&mut self, msg: W) -> DatagramWriteHandle {
65 self.outbox.push_write(self.info.peer_addr(), msg);
66 DatagramWriteHandle { _private: () }
67 }
68
69 #[inline]
74 pub fn write_to(&mut self, peer_addr: SocketAddr, msg: W) -> DatagramWriteHandle {
75 self.outbox.push_write(peer_addr, msg);
76 DatagramWriteHandle { _private: () }
77 }
78
79 #[inline]
85 pub fn flush(&mut self) -> DatagramFlushHandle<'_, W> {
86 self.outbox.push_flush()
87 }
88
89 #[inline]
91 pub fn write_and_flush(&mut self, msg: W) -> DatagramFlushHandle<'_, W> {
92 self.outbox.push_write_and_flush(self.info.peer_addr(), msg)
93 }
94
95 #[inline]
97 pub fn write_to_and_flush(
98 &mut self,
99 peer_addr: SocketAddr,
100 msg: W,
101 ) -> DatagramFlushHandle<'_, W> {
102 self.outbox.push_write_and_flush(peer_addr, msg)
103 }
104
105 pub async fn close(&mut self) -> Result<()> {
107 self.close_requested = true;
108 Ok(())
109 }
110
111 pub(crate) fn outbox(&self) -> DatagramOutboxHandle<W> {
112 self.outbox.clone()
113 }
114
115 pub(crate) fn close_requested(&self) -> bool {
116 self.close_requested
117 }
118}
119
120pub struct DatagramWriteHandle {
121 _private: (),
122}
123
124impl IntoFuture for DatagramWriteHandle {
125 type Output = Result<()>;
126 type IntoFuture = Ready<Result<()>>;
127
128 #[inline]
129 fn into_future(self) -> Self::IntoFuture {
130 ready(Ok(()))
131 }
132}
133
134pub struct DatagramFlushHandle<'a, W> {
135 outbox: &'a DatagramOutboxHandle<W>,
136}
137
138impl<'a, W> IntoFuture for DatagramFlushHandle<'a, W> {
139 type Output = Result<()>;
140 type IntoFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
141
142 #[inline]
143 fn into_future(self) -> Self::IntoFuture {
144 let id = self.outbox.push_flush_completion();
145 let state = &self.outbox.core.flush_state;
146
147 Box::pin(async move {
148 state.mark_awaited(id);
149
150 loop {
151 let notified = state.notify.notified();
152 tokio::pin!(notified);
153 notified.as_mut().enable();
154
155 if state.completed_flush_id.load(Ordering::Acquire) >= id {
156 return Ok(());
157 }
158
159 notified.await;
160 }
161 })
162 }
163}
164
/// A single command queued in the datagram outbox.
pub(crate) enum DatagramOutboxCommand<W> {
    /// Write the message to the given address.
    WriteTo(SocketAddr, W),
    /// Flush request. `completion` is `Some(id)` when the flush was queued by
    /// an awaited flush handle and `None` for a plain flush.
    Flush { completion: Option<u64> },
    /// Write `msg` to `peer_addr` and flush, as one command.
    WriteToAndFlush { peer_addr: SocketAddr, msg: W },
}
170
171struct DatagramOutboxState<W> {
172 head: Option<DatagramOutboxCommand<W>>,
173 tail: VecDeque<DatagramOutboxCommand<W>>,
174}
175
176impl<W> DatagramOutboxState<W> {
177 fn new() -> Self {
178 Self {
179 head: None,
180 tail: VecDeque::new(),
181 }
182 }
183
184 #[inline]
185 fn push(&mut self, command: DatagramOutboxCommand<W>) {
186 if self.head.is_none() {
187 self.head = Some(command);
188 } else {
189 self.tail.push_back(command);
190 }
191 }
192
193 #[inline]
194 fn take_batch(&mut self) -> DatagramOutboxBatch<W> {
195 DatagramOutboxBatch {
196 head: self.head.take(),
197 tail: std::mem::take(&mut self.tail),
198 }
199 }
200}
201
202pub(crate) struct DatagramOutboxBatch<W> {
203 head: Option<DatagramOutboxCommand<W>>,
204 tail: VecDeque<DatagramOutboxCommand<W>>,
205}
206
207impl<W> Iterator for DatagramOutboxBatch<W> {
208 type Item = DatagramOutboxCommand<W>;
209
210 #[inline]
211 fn next(&mut self) -> Option<Self::Item> {
212 self.head.take().or_else(|| self.tail.pop_front())
213 }
214}
215
216struct DatagramFlushState {
217 next_flush_id: AtomicU64,
218 completed_flush_id: AtomicU64,
219 awaited_flush_id: AtomicU64,
220 notify: tokio::sync::Notify,
221}
222
223impl DatagramFlushState {
224 fn new() -> Self {
225 Self {
226 next_flush_id: AtomicU64::new(0),
227 completed_flush_id: AtomicU64::new(0),
228 awaited_flush_id: AtomicU64::new(0),
229 notify: tokio::sync::Notify::new(),
230 }
231 }
232
233 #[inline]
234 fn next_id(&self) -> u64 {
235 self.next_flush_id.fetch_add(1, Ordering::Relaxed) + 1
236 }
237
238 #[inline]
239 fn mark_awaited(&self, id: u64) {
240 self.awaited_flush_id.fetch_max(id, Ordering::Release);
241 }
242
243 #[inline]
244 fn complete(&self, id: u64) {
245 self.completed_flush_id.store(id, Ordering::Release);
246 if self.awaited_flush_id.load(Ordering::Acquire) >= id {
247 self.notify.notify_waiters();
248 }
249 }
250}
251
252struct DatagramOutboxCore<W> {
253 commands: Mutex<DatagramOutboxState<W>>,
254 flush_requested: AtomicBool,
255 flush_state: DatagramFlushState,
256}
257
258pub(crate) struct DatagramOutboxHandle<W> {
259 core: Arc<DatagramOutboxCore<W>>,
260}
261
262impl<W> Clone for DatagramOutboxHandle<W> {
263 fn clone(&self) -> Self {
264 Self {
265 core: self.core.clone(),
266 }
267 }
268}
269
270impl<W> DatagramOutboxHandle<W> {
271 fn new() -> Self {
272 Self {
273 core: Arc::new(DatagramOutboxCore {
274 commands: Mutex::new(DatagramOutboxState::new()),
275 flush_requested: AtomicBool::new(false),
276 flush_state: DatagramFlushState::new(),
277 }),
278 }
279 }
280
281 #[inline]
282 fn push_write(&self, peer_addr: SocketAddr, msg: W) {
283 self.core
284 .commands
285 .lock()
286 .expect("datagram outbox lock poisoned")
287 .push(DatagramOutboxCommand::WriteTo(peer_addr, msg));
288 }
289
290 #[inline]
291 fn push_flush(&self) -> DatagramFlushHandle<'_, W> {
292 self.core
293 .commands
294 .lock()
295 .expect("datagram outbox lock poisoned")
296 .push(DatagramOutboxCommand::Flush { completion: None });
297 self.core.flush_requested.store(true, Ordering::Release);
298 DatagramFlushHandle { outbox: self }
299 }
300
301 #[inline]
302 fn push_write_and_flush(&self, peer_addr: SocketAddr, msg: W) -> DatagramFlushHandle<'_, W> {
303 self.core
304 .commands
305 .lock()
306 .expect("datagram outbox lock poisoned")
307 .push(DatagramOutboxCommand::WriteToAndFlush { peer_addr, msg });
308 self.core.flush_requested.store(true, Ordering::Release);
309 DatagramFlushHandle { outbox: self }
310 }
311
312 #[inline]
313 fn push_flush_completion(&self) -> u64 {
314 let id = self.core.flush_state.next_id();
315 self.core
316 .commands
317 .lock()
318 .expect("datagram outbox lock poisoned")
319 .push(DatagramOutboxCommand::Flush {
320 completion: Some(id),
321 });
322 self.core.flush_requested.store(true, Ordering::Release);
323 id
324 }
325
326 #[inline]
327 pub(crate) fn has_flush_command(&self) -> bool {
328 self.core.flush_requested.load(Ordering::Acquire)
329 }
330
331 #[inline]
332 pub(crate) fn take_commands(&self) -> DatagramOutboxBatch<W> {
333 self.core.flush_requested.store(false, Ordering::Release);
334 self.core
335 .commands
336 .lock()
337 .expect("datagram outbox lock poisoned")
338 .take_batch()
339 }
340
341 #[inline]
342 pub(crate) fn complete_flush(&self, id: u64) {
343 self.core.flush_state.complete(id);
344 }
345}