unix_udp_sock/
framed.rs

1use tokio_util::codec::Decoder;
2
3use futures_core::Stream;
4
5use bytes::{BufMut, BytesMut};
6use futures_core::ready;
7use std::borrow::Borrow;
8use std::task::{Context, Poll};
9use std::{io::IoSliceMut, pin::Pin};
10
11use crate::{RecvMeta, UdpSocket};
12
13/// A unified [`Stream`] and [`Sink`] interface to an underlying `UdpSocket`, using
14/// the `Encoder` and `Decoder` traits to encode and decode frames.
15///
16/// Raw UDP sockets work with datagrams, but higher-level code usually wants to
17/// batch these into meaningful chunks, called "frames". This method layers
18/// framing on top of this socket by using the `Encoder` and `Decoder` traits to
19/// handle encoding and decoding of messages frames. Note that the incoming and
20/// outgoing frame types may be distinct.
21///
22/// This function returns a *single* object that is both [`Stream`] and [`Sink`];
23/// grouping this into a single object is often useful for layering things which
24/// require both read and write access to the underlying object.
25///
26/// If you want to work more directly with the streams and sink, consider
27/// calling [`split`] on the `UdpFramed` returned by this method, which will break
28/// them into separate objects, allowing them to interact more easily.
29///
30/// [`Stream`]: futures_core::Stream
31/// [`Sink`]: futures_sink::Sink
32/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
33#[must_use = "sinks do nothing unless polled"]
34#[derive(Debug)]
35pub struct UdpFramed<C, T = UdpSocket> {
36    socket: T,
37    codec: C,
38    rd: BytesMut,
39    // wr: BytesMut,
40    // out_addr: SocketAddr,
41    // flushed: bool,
42    is_readable: bool,
43    cur_meta: Option<RecvMeta>,
44}
45
46const INITIAL_RD_CAPACITY: usize = 64 * 1024;
47// const INITIAL_WR_CAPACITY: usize = 8 * 1024;
48
49impl<C, T> Unpin for UdpFramed<C, T> {}
50
51impl<C, T> Stream for UdpFramed<C, T>
52where
53    T: Borrow<UdpSocket>,
54    C: Decoder,
55{
56    type Item = Result<(C::Item, RecvMeta), C::Error>;
57
58    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59        let pin = self.get_mut();
60
61        pin.rd.reserve(INITIAL_RD_CAPACITY);
62
63        loop {
64            // Are there still bytes left in the read buffer to decode?
65            if pin.is_readable {
66                if let Some(frame) = pin.codec.decode_eof(&mut pin.rd)? {
67                    let current_meta = pin
68                        .cur_meta
69                        .expect("will always be set before this line is called");
70
71                    return Poll::Ready(Some(Ok((frame, current_meta))));
72                }
73
74                // if this line has been reached then decode has returned `None`.
75                pin.is_readable = false;
76                pin.rd.clear();
77            }
78
79            // We're out of data. Try and fetch more data to decode
80            let meta = {
81                // Safety: `chunk_mut()` returns a `&mut UninitSlice`, and `UninitSlice` is a
82                // transparent wrapper around `[MaybeUninit<u8>]`.
83                let buf = unsafe { &mut *(pin.rd.chunk_mut() as *mut _ as *mut [u8]) };
84                let mut iov = IoSliceMut::new(buf);
85                let meta = ready!(pin.socket.borrow().poll_recv_msg(cx, &mut iov))?;
86
87                unsafe { pin.rd.advance_mut(meta.len) };
88
89                meta
90            };
91
92            pin.cur_meta = Some(meta);
93            pin.is_readable = true;
94        }
95    }
96}
97
98// impl<I, C, T> Sink<(I, SocketAddr)> for UdpFramed<C, T>
99// where
100//     T: Borrow<UdpSocket>,
101//     C: Encoder<I>,
102// {
103//     type Error = C::Error;
104
105//     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106//         if !self.flushed {
107//             match self.poll_flush(cx)? {
108//                 Poll::Ready(()) => {}
109//                 Poll::Pending => return Poll::Pending,
110//             }
111//         }
112
113//         Poll::Ready(Ok(()))
114//     }
115
116//     fn start_send(self: Pin<&mut Self>, item: (I, SocketAddr)) -> Result<(), Self::Error> {
117//         let (frame, out_addr) = item;
118
119//         let pin = self.get_mut();
120
121//         pin.codec.encode(frame, &mut pin.wr)?;
122//         pin.out_addr = out_addr;
123//         pin.flushed = false;
124
125//         Ok(())
126//     }
127
128//     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129//         if self.flushed {
130//             return Poll::Ready(Ok(()));
131//         }
132
133//         let Self {
134//             ref socket,
135//             ref mut out_addr,
136//             ref mut wr,
137//             ..
138//         } = *self;
139
140//         let n = ready!(socket.borrow().poll_send_to(cx, wr, *out_addr))?;
141
142//         let wrote_all = n == self.wr.len();
143//         self.wr.clear();
144//         self.flushed = true;
145
146//         let res = if wrote_all {
147//             Ok(())
148//         } else {
149//             Err(io::Error::new(
150//                 io::ErrorKind::Other,
151//                 "failed to write entire datagram to socket",
152//             )
153//             .into())
154//         };
155
156//         Poll::Ready(res)
157//     }
158
159//     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
160//         ready!(self.poll_flush(cx))?;
161//         Poll::Ready(Ok(()))
162//     }
163// }
164
165impl<C, T> UdpFramed<C, T>
166where
167    T: Borrow<UdpSocket>,
168{
169    /// Create a new `UdpFramed` backed by the given socket and codec.
170    ///
171    /// See struct level documentation for more details.
172    pub fn new(socket: T, codec: C) -> UdpFramed<C, T> {
173        Self {
174            socket,
175            codec,
176            // out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
177            rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
178            // wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
179            // flushed: true,
180            is_readable: false,
181            cur_meta: None,
182        }
183    }
184
185    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
186    ///
187    /// # Note
188    ///
189    /// Care should be taken to not tamper with the underlying stream of data
190    /// coming in as it may corrupt the stream of frames otherwise being worked
191    /// with.
192    pub fn get_ref(&self) -> &T {
193        &self.socket
194    }
195
196    /// Returns a mutable reference to the underlying I/O stream wrapped by `Framed`.
197    ///
198    /// # Note
199    ///
200    /// Care should be taken to not tamper with the underlying stream of data
201    /// coming in as it may corrupt the stream of frames otherwise being worked
202    /// with.
203    pub fn get_mut(&mut self) -> &mut T {
204        &mut self.socket
205    }
206
207    /// Returns a reference to the underlying codec wrapped by
208    /// `Framed`.
209    ///
210    /// Note that care should be taken to not tamper with the underlying codec
211    /// as it may corrupt the stream of frames otherwise being worked with.
212    pub fn codec(&self) -> &C {
213        &self.codec
214    }
215
216    /// Returns a mutable reference to the underlying codec wrapped by
217    /// `UdpFramed`.
218    ///
219    /// Note that care should be taken to not tamper with the underlying codec
220    /// as it may corrupt the stream of frames otherwise being worked with.
221    pub fn codec_mut(&mut self) -> &mut C {
222        &mut self.codec
223    }
224
225    /// Returns a reference to the read buffer.
226    pub fn read_buffer(&self) -> &BytesMut {
227        &self.rd
228    }
229
230    /// Returns a mutable reference to the read buffer.
231    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
232        &mut self.rd
233    }
234
235    /// Consumes the `Framed`, returning its underlying I/O stream.
236    pub fn into_inner(self) -> T {
237        self.socket
238    }
239}