unix_udp_sock/framed.rs
1use tokio_util::codec::Decoder;
2
3use futures_core::Stream;
4
5use bytes::{BufMut, BytesMut};
6use futures_core::ready;
7use std::borrow::Borrow;
8use std::task::{Context, Poll};
9use std::{io::IoSliceMut, pin::Pin};
10
11use crate::{RecvMeta, UdpSocket};
12
13/// A unified [`Stream`] and [`Sink`] interface to an underlying `UdpSocket`, using
14/// the `Encoder` and `Decoder` traits to encode and decode frames.
15///
16/// Raw UDP sockets work with datagrams, but higher-level code usually wants to
17/// batch these into meaningful chunks, called "frames". This method layers
18/// framing on top of this socket by using the `Encoder` and `Decoder` traits to
19/// handle encoding and decoding of messages frames. Note that the incoming and
20/// outgoing frame types may be distinct.
21///
22/// This function returns a *single* object that is both [`Stream`] and [`Sink`];
23/// grouping this into a single object is often useful for layering things which
24/// require both read and write access to the underlying object.
25///
26/// If you want to work more directly with the streams and sink, consider
27/// calling [`split`] on the `UdpFramed` returned by this method, which will break
28/// them into separate objects, allowing them to interact more easily.
29///
30/// [`Stream`]: futures_core::Stream
31/// [`Sink`]: futures_sink::Sink
32/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
33#[must_use = "sinks do nothing unless polled"]
34#[derive(Debug)]
35pub struct UdpFramed<C, T = UdpSocket> {
36 socket: T,
37 codec: C,
38 rd: BytesMut,
39 // wr: BytesMut,
40 // out_addr: SocketAddr,
41 // flushed: bool,
42 is_readable: bool,
43 cur_meta: Option<RecvMeta>,
44}
45
/// Capacity, in bytes, reserved for the read buffer (64 KiB).
///
/// Used both when the buffer is first allocated and as the amount of spare
/// room requested before each receive.
const INITIAL_RD_CAPACITY: usize = 1 << 16;
47// const INITIAL_WR_CAPACITY: usize = 8 * 1024;
48
49impl<C, T> Unpin for UdpFramed<C, T> {}
50
51impl<C, T> Stream for UdpFramed<C, T>
52where
53 T: Borrow<UdpSocket>,
54 C: Decoder,
55{
56 type Item = Result<(C::Item, RecvMeta), C::Error>;
57
58 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59 let pin = self.get_mut();
60
61 pin.rd.reserve(INITIAL_RD_CAPACITY);
62
63 loop {
64 // Are there still bytes left in the read buffer to decode?
65 if pin.is_readable {
66 if let Some(frame) = pin.codec.decode_eof(&mut pin.rd)? {
67 let current_meta = pin
68 .cur_meta
69 .expect("will always be set before this line is called");
70
71 return Poll::Ready(Some(Ok((frame, current_meta))));
72 }
73
74 // if this line has been reached then decode has returned `None`.
75 pin.is_readable = false;
76 pin.rd.clear();
77 }
78
79 // We're out of data. Try and fetch more data to decode
80 let meta = {
81 // Safety: `chunk_mut()` returns a `&mut UninitSlice`, and `UninitSlice` is a
82 // transparent wrapper around `[MaybeUninit<u8>]`.
83 let buf = unsafe { &mut *(pin.rd.chunk_mut() as *mut _ as *mut [u8]) };
84 let mut iov = IoSliceMut::new(buf);
85 let meta = ready!(pin.socket.borrow().poll_recv_msg(cx, &mut iov))?;
86
87 unsafe { pin.rd.advance_mut(meta.len) };
88
89 meta
90 };
91
92 pin.cur_meta = Some(meta);
93 pin.is_readable = true;
94 }
95 }
96}
97
98// impl<I, C, T> Sink<(I, SocketAddr)> for UdpFramed<C, T>
99// where
100// T: Borrow<UdpSocket>,
101// C: Encoder<I>,
102// {
103// type Error = C::Error;
104
105// fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106// if !self.flushed {
107// match self.poll_flush(cx)? {
108// Poll::Ready(()) => {}
109// Poll::Pending => return Poll::Pending,
110// }
111// }
112
113// Poll::Ready(Ok(()))
114// }
115
116// fn start_send(self: Pin<&mut Self>, item: (I, SocketAddr)) -> Result<(), Self::Error> {
117// let (frame, out_addr) = item;
118
119// let pin = self.get_mut();
120
121// pin.codec.encode(frame, &mut pin.wr)?;
122// pin.out_addr = out_addr;
123// pin.flushed = false;
124
125// Ok(())
126// }
127
128// fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129// if self.flushed {
130// return Poll::Ready(Ok(()));
131// }
132
133// let Self {
134// ref socket,
135// ref mut out_addr,
136// ref mut wr,
137// ..
138// } = *self;
139
140// let n = ready!(socket.borrow().poll_send_to(cx, wr, *out_addr))?;
141
142// let wrote_all = n == self.wr.len();
143// self.wr.clear();
144// self.flushed = true;
145
146// let res = if wrote_all {
147// Ok(())
148// } else {
149// Err(io::Error::new(
150// io::ErrorKind::Other,
151// "failed to write entire datagram to socket",
152// )
153// .into())
154// };
155
156// Poll::Ready(res)
157// }
158
159// fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
160// ready!(self.poll_flush(cx))?;
161// Poll::Ready(Ok(()))
162// }
163// }
164
165impl<C, T> UdpFramed<C, T>
166where
167 T: Borrow<UdpSocket>,
168{
169 /// Create a new `UdpFramed` backed by the given socket and codec.
170 ///
171 /// See struct level documentation for more details.
172 pub fn new(socket: T, codec: C) -> UdpFramed<C, T> {
173 Self {
174 socket,
175 codec,
176 // out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
177 rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
178 // wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
179 // flushed: true,
180 is_readable: false,
181 cur_meta: None,
182 }
183 }
184
185 /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
186 ///
187 /// # Note
188 ///
189 /// Care should be taken to not tamper with the underlying stream of data
190 /// coming in as it may corrupt the stream of frames otherwise being worked
191 /// with.
192 pub fn get_ref(&self) -> &T {
193 &self.socket
194 }
195
196 /// Returns a mutable reference to the underlying I/O stream wrapped by `Framed`.
197 ///
198 /// # Note
199 ///
200 /// Care should be taken to not tamper with the underlying stream of data
201 /// coming in as it may corrupt the stream of frames otherwise being worked
202 /// with.
203 pub fn get_mut(&mut self) -> &mut T {
204 &mut self.socket
205 }
206
207 /// Returns a reference to the underlying codec wrapped by
208 /// `Framed`.
209 ///
210 /// Note that care should be taken to not tamper with the underlying codec
211 /// as it may corrupt the stream of frames otherwise being worked with.
212 pub fn codec(&self) -> &C {
213 &self.codec
214 }
215
216 /// Returns a mutable reference to the underlying codec wrapped by
217 /// `UdpFramed`.
218 ///
219 /// Note that care should be taken to not tamper with the underlying codec
220 /// as it may corrupt the stream of frames otherwise being worked with.
221 pub fn codec_mut(&mut self) -> &mut C {
222 &mut self.codec
223 }
224
225 /// Returns a reference to the read buffer.
226 pub fn read_buffer(&self) -> &BytesMut {
227 &self.rd
228 }
229
230 /// Returns a mutable reference to the read buffer.
231 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
232 &mut self.rd
233 }
234
235 /// Consumes the `Framed`, returning its underlying I/O stream.
236 pub fn into_inner(self) -> T {
237 self.socket
238 }
239}