Skip to main content

rs_netty/context/
datagram.rs

1use std::{
2    collections::VecDeque,
3    future::{ready, Future, IntoFuture, Ready},
4    net::SocketAddr,
5    pin::Pin,
6    sync::{
7        atomic::{AtomicBool, AtomicU64, Ordering},
8        Arc, Mutex,
9    },
10};
11
12use crate::{channel::DatagramChannel, context::DatagramInfo, Result};
13
14/// Context passed to a UDP [`crate::DatagramHandler`].
15///
16/// Writes through this context are staged in a handler-local outbox. They are
17/// encoded into the socket's pending datagram queue when the handler returns.
18/// They are sent only when [`Self::flush`] or a `*_and_flush` method requests
19/// an immediate flush. Dropping a flush handle is fire-and-forget; awaiting it
20/// waits until the local `send_to` calls complete.
21pub struct DatagramContext<W> {
22    info: DatagramInfo,
23    channel: DatagramChannel<W>,
24    outbox: DatagramOutboxHandle<W>,
25    close_requested: bool,
26}
27
28impl<W: Send + 'static> DatagramContext<W> {
29    pub(crate) fn new(info: DatagramInfo, channel: DatagramChannel<W>) -> Self {
30        Self {
31            info,
32            channel,
33            outbox: DatagramOutboxHandle::new(),
34            close_requested: false,
35        }
36    }
37
38    /// Socket id assigned by the UDP runtime.
39    pub fn id(&self) -> u64 {
40        self.info.id()
41    }
42
43    /// Peer address for the current datagram.
44    pub fn peer_addr(&self) -> SocketAddr {
45        self.info.peer_addr()
46    }
47
48    /// Local socket address.
49    pub fn local_addr(&self) -> SocketAddr {
50        self.info.local_addr()
51    }
52
53    /// Returns a cloneable channel for writing from outside the current handler.
54    pub fn channel(&self) -> DatagramChannel<W> {
55        self.channel.clone()
56    }
57
58    /// Stages a response to the current datagram peer.
59    ///
60    /// The message is stored in the handler-local outbox and later encoded into
61    /// the socket's pending datagram queue. It is not sent until the outbox or
62    /// channel is explicitly flushed.
63    #[inline]
64    pub fn write(&mut self, msg: W) -> DatagramWriteHandle {
65        self.outbox.push_write(self.info.peer_addr(), msg);
66        DatagramWriteHandle { _private: () }
67    }
68
69    /// Stages a datagram for an explicit peer.
70    ///
71    /// Use this when a handler needs to reply somewhere other than the sender
72    /// of the current datagram.
73    #[inline]
74    pub fn write_to(&mut self, peer_addr: SocketAddr, msg: W) -> DatagramWriteHandle {
75        self.outbox.push_write(peer_addr, msg);
76        DatagramWriteHandle { _private: () }
77    }
78
79    /// Sends messages staged by this handler so far.
80    ///
81    /// Dropping the returned handle is fire-and-forget. Awaiting it waits until
82    /// the socket task completes `send_to` for all staged messages before this
83    /// flush boundary.
84    #[inline]
85    pub fn flush(&mut self) -> DatagramFlushHandle<'_, W> {
86        self.outbox.push_flush()
87    }
88
89    /// Stages a response to the current peer and requests a flush.
90    #[inline]
91    pub fn write_and_flush(&mut self, msg: W) -> DatagramFlushHandle<'_, W> {
92        self.outbox.push_write_and_flush(self.info.peer_addr(), msg)
93    }
94
95    /// Stages a datagram for an explicit peer and requests a flush.
96    #[inline]
97    pub fn write_to_and_flush(
98        &mut self,
99        peer_addr: SocketAddr,
100        msg: W,
101    ) -> DatagramFlushHandle<'_, W> {
102        self.outbox.push_write_and_flush(peer_addr, msg)
103    }
104
105    /// Requests that the socket task close after the current handler returns.
106    pub async fn close(&mut self) -> Result<()> {
107        self.close_requested = true;
108        Ok(())
109    }
110
111    pub(crate) fn outbox(&self) -> DatagramOutboxHandle<W> {
112        self.outbox.clone()
113    }
114
115    pub(crate) fn close_requested(&self) -> bool {
116        self.close_requested
117    }
118}
119
/// Token returned by the staging-only write methods.
///
/// It carries no data. Awaiting it (via its `IntoFuture` impl) resolves
/// immediately with `Ok(())`.
pub struct DatagramWriteHandle {
    // Private unit field: keeps construction inside this module.
    _private: (),
}
123
124impl IntoFuture for DatagramWriteHandle {
125    type Output = Result<()>;
126    type IntoFuture = Ready<Result<()>>;
127
128    #[inline]
129    fn into_future(self) -> Self::IntoFuture {
130        ready(Ok(()))
131    }
132}
133
134pub struct DatagramFlushHandle<'a, W> {
135    outbox: &'a DatagramOutboxHandle<W>,
136}
137
138impl<'a, W> IntoFuture for DatagramFlushHandle<'a, W> {
139    type Output = Result<()>;
140    type IntoFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
141
142    #[inline]
143    fn into_future(self) -> Self::IntoFuture {
144        let id = self.outbox.push_flush_completion();
145        let state = &self.outbox.core.flush_state;
146
147        Box::pin(async move {
148            state.mark_awaited(id);
149
150            loop {
151                let notified = state.notify.notified();
152                tokio::pin!(notified);
153                notified.as_mut().enable();
154
155                if state.completed_flush_id.load(Ordering::Acquire) >= id {
156                    return Ok(());
157                }
158
159                notified.await;
160            }
161        })
162    }
163}
164
/// One entry in a handler's outbox, in the order the handler issued it.
pub(crate) enum DatagramOutboxCommand<W> {
    /// A message staged for the given peer address.
    WriteTo(SocketAddr, W),
    /// A flush request. `completion` is `Some(id)` when the request was
    /// enqueued by awaiting a flush handle, and `None` for a plain flush.
    Flush { completion: Option<u64> },
    /// A message staged for `peer_addr` together with a flush request.
    WriteToAndFlush { peer_addr: SocketAddr, msg: W },
}
170
171struct DatagramOutboxState<W> {
172    head: Option<DatagramOutboxCommand<W>>,
173    tail: VecDeque<DatagramOutboxCommand<W>>,
174}
175
176impl<W> DatagramOutboxState<W> {
177    fn new() -> Self {
178        Self {
179            head: None,
180            tail: VecDeque::new(),
181        }
182    }
183
184    #[inline]
185    fn push(&mut self, command: DatagramOutboxCommand<W>) {
186        if self.head.is_none() {
187            self.head = Some(command);
188        } else {
189            self.tail.push_back(command);
190        }
191    }
192
193    #[inline]
194    fn take_batch(&mut self) -> DatagramOutboxBatch<W> {
195        DatagramOutboxBatch {
196            head: self.head.take(),
197            tail: std::mem::take(&mut self.tail),
198        }
199    }
200}
201
202pub(crate) struct DatagramOutboxBatch<W> {
203    head: Option<DatagramOutboxCommand<W>>,
204    tail: VecDeque<DatagramOutboxCommand<W>>,
205}
206
207impl<W> Iterator for DatagramOutboxBatch<W> {
208    type Item = DatagramOutboxCommand<W>;
209
210    #[inline]
211    fn next(&mut self) -> Option<Self::Item> {
212        self.head.take().or_else(|| self.tail.pop_front())
213    }
214}
215
216struct DatagramFlushState {
217    next_flush_id: AtomicU64,
218    completed_flush_id: AtomicU64,
219    awaited_flush_id: AtomicU64,
220    notify: tokio::sync::Notify,
221}
222
223impl DatagramFlushState {
224    fn new() -> Self {
225        Self {
226            next_flush_id: AtomicU64::new(0),
227            completed_flush_id: AtomicU64::new(0),
228            awaited_flush_id: AtomicU64::new(0),
229            notify: tokio::sync::Notify::new(),
230        }
231    }
232
233    #[inline]
234    fn next_id(&self) -> u64 {
235        self.next_flush_id.fetch_add(1, Ordering::Relaxed) + 1
236    }
237
238    #[inline]
239    fn mark_awaited(&self, id: u64) {
240        self.awaited_flush_id.fetch_max(id, Ordering::Release);
241    }
242
243    #[inline]
244    fn complete(&self, id: u64) {
245        self.completed_flush_id.store(id, Ordering::Release);
246        if self.awaited_flush_id.load(Ordering::Acquire) >= id {
247            self.notify.notify_waiters();
248        }
249    }
250}
251
252struct DatagramOutboxCore<W> {
253    commands: Mutex<DatagramOutboxState<W>>,
254    flush_requested: AtomicBool,
255    flush_state: DatagramFlushState,
256}
257
258pub(crate) struct DatagramOutboxHandle<W> {
259    core: Arc<DatagramOutboxCore<W>>,
260}
261
262impl<W> Clone for DatagramOutboxHandle<W> {
263    fn clone(&self) -> Self {
264        Self {
265            core: self.core.clone(),
266        }
267    }
268}
269
270impl<W> DatagramOutboxHandle<W> {
271    fn new() -> Self {
272        Self {
273            core: Arc::new(DatagramOutboxCore {
274                commands: Mutex::new(DatagramOutboxState::new()),
275                flush_requested: AtomicBool::new(false),
276                flush_state: DatagramFlushState::new(),
277            }),
278        }
279    }
280
281    #[inline]
282    fn push_write(&self, peer_addr: SocketAddr, msg: W) {
283        self.core
284            .commands
285            .lock()
286            .expect("datagram outbox lock poisoned")
287            .push(DatagramOutboxCommand::WriteTo(peer_addr, msg));
288    }
289
290    #[inline]
291    fn push_flush(&self) -> DatagramFlushHandle<'_, W> {
292        self.core
293            .commands
294            .lock()
295            .expect("datagram outbox lock poisoned")
296            .push(DatagramOutboxCommand::Flush { completion: None });
297        self.core.flush_requested.store(true, Ordering::Release);
298        DatagramFlushHandle { outbox: self }
299    }
300
301    #[inline]
302    fn push_write_and_flush(&self, peer_addr: SocketAddr, msg: W) -> DatagramFlushHandle<'_, W> {
303        self.core
304            .commands
305            .lock()
306            .expect("datagram outbox lock poisoned")
307            .push(DatagramOutboxCommand::WriteToAndFlush { peer_addr, msg });
308        self.core.flush_requested.store(true, Ordering::Release);
309        DatagramFlushHandle { outbox: self }
310    }
311
312    #[inline]
313    fn push_flush_completion(&self) -> u64 {
314        let id = self.core.flush_state.next_id();
315        self.core
316            .commands
317            .lock()
318            .expect("datagram outbox lock poisoned")
319            .push(DatagramOutboxCommand::Flush {
320                completion: Some(id),
321            });
322        self.core.flush_requested.store(true, Ordering::Release);
323        id
324    }
325
326    #[inline]
327    pub(crate) fn has_flush_command(&self) -> bool {
328        self.core.flush_requested.load(Ordering::Acquire)
329    }
330
331    #[inline]
332    pub(crate) fn take_commands(&self) -> DatagramOutboxBatch<W> {
333        self.core.flush_requested.store(false, Ordering::Release);
334        self.core
335            .commands
336            .lock()
337            .expect("datagram outbox lock poisoned")
338            .take_batch()
339    }
340
341    #[inline]
342    pub(crate) fn complete_flush(&self, id: u64) {
343        self.core.flush_state.complete(id);
344    }
345}