Skip to main content

futures_udp/
lib.rs

1#![cfg_attr(unstable_bool_to_result, feature(bool_to_result))]
2#![cfg_attr(unstable_never_type, feature(never_type))]
3
4//! Runtime agnostic, non-blocking, non-exclusive async UDP networking.
5//!
6//! `futures-udp` provides two key structs:
7//! - [UdpStream] for reading data from a UDP Socket
8//! - [UdpSink] for sending data via a UDP Socket
9//!
10//! These structs implement the `futures-rs` traits [Stream] & [Sink] respectively but are tested
11//! and known to work with both `tokio` & `futures-rs` runtimes. (tokio tests performed in a
12//! downstream crate, I'll add them here soon so to make sure this never breaks)
13//!
14//! ## Why?
15//! - I usually don't want to be forced to bring `tokio` into my dependency tree unless I want
16//!   to use it as my runtime. I think the runtime choice should be left to the final binary.
17//! - `futures-rs` is a lot lighter weight and provided by rust-lang, so I chose that for the base
18//!   traits. They are cross-compatible with `tokio`.
19//! - Working with a bare `UdpSocket` is "a bit hard", doing it async is "a bit more hard".
20//!   Adding `Stream` & `Sink` semantics makes it "nice".
21//! - Despite the docs [futures_net::UdpSocket] creates a blocking socket, which is locked
22//!   for exclusive use. (Opening a ticket TBD)
23//!
24//! ## Stability & MSRV
25//!
26//! I've chosen to rely on two experimental features, while this crate is in v0.x.y, as I feel they
27//! add significant value to the API. I also believe in supporting language development and
28//! generating feedback to features as they near stabilisation.
29//!
30//! This crate will not move to v1.x.y until both features are stabilised, or I decide to stop using
31//! them. Realistically, however, they will be stable while I allow this API to go through a
32//! "settling-in" phase before fixing it at v1.0.0
33//!
34//! > 🔬 **Experimental Features**
35//! >
36//! > This crate makes use of the following experimental features:
37//! >
38//! > - [`#![feature(never_type)]`](https://github.com/rust-lang/rust/issues/35121) [final stages of stabilisation]
39//! > - [`#![feature(bool_to_result)]`](https://github.com/rust-lang/rust/issues/142748) [in FCP as of 2026-04-25]
40//! >
41//! > This list includes any unstable features used by direct & transitive dependencies (currently, none).
42//! >
43//! > Both are so close to being part of stable rust that I chose to use them here.
44//!
45//! You do not need to enable these in your own code, the list is for information only. But currently
46//! you do need to use nightly to take advantage of this crate.
47//!
48//! ### Stability guarantees
49//!
50//! We run automated tests **every month** to ensure no fundamental changes affect this crate and
51//! test every PR against the current nightly, as well as the current equivalent beta & stable.
52//! If you find an issue before we do, please
53//! [raise an issue on github](https://github.com/MusicalNinjaDad/splurt/issues).
54//!
55//! ### MSRV
56//!
57//! For those of you working with a pinned nightly (etc.) this crate supports the equivalent of
58//! 1.90.0 onwards. We use [autocfg](https://crates.io/crates/autocfg/) to seamlessly handle
59//! features which have been stabilised since then.
60//!
61//! ### Dependencies
62//!
63//! We deliberately keep the dependency list short and pay attention to any transitive dependencies
64//! we bring in.
65//!
66//! - `futures-rs` (for the Stream & Sink traits)
67//! - `futures-net` (for the underlying UdpSocket)
68//! - `socket2` (to set the socket to non-blocking, non-exclusive)
69
70use std::{
71    io,
72    net::{SocketAddr, ToSocketAddrs},
73    pin::Pin,
74    task::{Context, Poll},
75};
76
77use futures::{Stream, sink::Sink};
78use futures_net::driver::{
79    PollEvented,
80    sys::{self},
81};
82use socket2::{Domain, Type};
83
84//TODO: Open a ticket with futures_net re non-blocking UdpSocket
85#[derive(Debug)]
86/// A non-blocking async UdpSocket with ability to `recv_from` via `next` and `send_to` via `push`.
87///
88/// #### BUF_SIZE
89/// Messages received via [UdpStream::next] will be provided as an array of bytes of length
90/// `BUF_SIZE`. This is a generic const to allow avoid us having to allocate a 65k buffer on each
91/// call to next in order to cover the max possible UDP datagram size.
92///
93/// It is your responsibility to ensure that `BUF_SIZE` is large enough to hold the largest UDP
94/// datagram your protocol expects; if it is smaller than the incoming datagram size, the datagram
95/// will be truncated in the output from `next`. You cannot rely on the returned `bytes_read` value
96/// to indicate truncation as this will also be set to the buffer length, not the full size of the
97/// truncated message (this is the underlying behaviour of the libc call `recv_from`).
98///
99/// #### Note
100/// - This does NOT have exclusive access to the bound port. If you want to guarantee that
101///   no other processes bind to the same socket vote thumbs up on issue #22 TODO: implement
102///   `bind_exclusive` etc.)
103pub struct UdpStream<const BUF_SIZE: usize> {
104    /// The underlying, evented Socket.
105    ///
106    /// #### Note
107    /// - [`futures_net::UdpSocket`] does NOT implement [futures_net::driver::sys::event::Evented]
108    ///   and is NOT the same type as stored here.
109    /// - [`futures_net::driver::sys::net::UdpSocket`] is not actually non-blocking, despite the
110    ///   documentation.
111    /// - Neither [std::sys::net::UdpSocket], nor [net2::UdpBuilder] expose `set_nonblocking()` so
112    ///   we need use [socket2::Socket] while building the listener but are unable to change
113    ///   blocking or exclusivity after construction.
114    io: PollEvented<sys::net::UdpSocket>,
115}
116
117/// Basic functions on a struct wrapping a `PollEvented<sys::net::UdpSocket>`
118///
119/// Right now this is lazy for my own use, so makes assumptions about internal structure.
120///
121/// #### Note
122/// - TODO #26 handle cases with multiple fields which need to be provided during construction
123pub trait EventedUdpSocket
124where
125    Self: Sized,
126{
127    /// Create a new `Self` from a `PollEvented<sys::net::UdpSocket>`
128    fn from_evented_socket(evented_socket: PollEvented<sys::net::UdpSocket>) -> io::Result<Self>;
129
130    /// Create a new `Self` by binding it to a given [SocketAddr].
131    ///
132    /// In the default implementation, the listener is guaranteed to be constructed to be
133    /// non-blocking and have non-exclusive access to the bound address; if either of these system
134    /// calls fails to take effect an [io::ErrorKind::Unsupported] will be returned.
135    ///
136    /// #### Note
137    /// - It is not possible to validate the non-blocking status has been correctly set on Windows
138    ///   or wasm32-wasip1 so this check is skipped in those cases and success is assumed.
139    fn bind(addr: SocketAddr) -> io::Result<Self> {
140        let s2 = socket2::Socket::new(Domain::IPV4, Type::DGRAM, None)?;
141        let addr = addr.into();
142        s2.set_nonblocking(true)?;
143        #[cfg(any(unix, all(target_os = "wasi", not(target_env = "p1"))))]
144        s2.nonblocking()?
145            .ok_or(io::Error::from(io::ErrorKind::Unsupported))?;
146
147        // NOTE for consideration if/when implementing conversion to a UdpConnectedStream
148        // ==============================================================================
149        // This would stop another process from re-binding to the same address *& port* if
150        // converted to a UdpConnectedStream which actively begins listening on this address,
151        // thereby claiming exclusive interest in all received data.
152        // see https://man7.org/linux/man-pages/man7/socket.7.html#:~:text=SO_REUSEADDR
153        s2.set_reuse_address(true)?;
154        s2.reuse_address()?
155            .ok_or(io::Error::from(io::ErrorKind::Unsupported))?;
156
157        s2.bind(&addr)?;
158        let sstd: std::net::UdpSocket = s2.into();
159        let evented_socket = PollEvented::new(sys::net::UdpSocket::from_socket(sstd)?);
160        Self::from_evented_socket(evented_socket)
161    }
162
163    // // TODO: #22 Add bind_exclusive constructor & update struct docs for UdpStream
164    // pub fn bind_exclusive(addr: SocketAddr) -> io::Result<Self>
165    // pub fn is_exclusive(&self) -> Option<SocketAddr>
166    // pub fn check_exclusive(&self) -> io::Result<SocketAddr>
167    // pub fn is_non_exclusive(&self) -> Option<SocketAddr>
168    // pub fn check_non_exclusive(&self) -> io::Result<SocketAddr>
169
170    /// Get the local address of the underlying Socket
171    fn local_addr(&self) -> io::Result<SocketAddr> {
172        self.as_socket().local_addr()
173    }
174
175    /// Provides access to the underlying Socket.
176    ///
177    /// #### Note
178    /// `futures_net::UdpSocket` is NOT the same type as returned here.
179    fn as_socket(&self) -> &sys::net::UdpSocket;
180
181    /// Provides mutable access to the underlying Socket.
182    ///
183    /// #### Note
184    /// `futures_net::UdpSocket` is NOT the same type as returned here.
185    fn as_socket_mut(&mut self) -> &mut sys::net::UdpSocket;
186
187    /// Converts a pinned `&mut Self` to a pinned &mut of the underlying pollevented socket
188    /// allowing for calls to traits and functions implemented by [PollEvented]
189    fn as_evented_socket_pin(self: Pin<&mut Self>) -> Pin<&mut PollEvented<sys::net::UdpSocket>>;
190
191    /// Clear the readiness state of the underlying socket.
192    ///
193    /// **This MUST be called after any failed readiness poll.**
194    ///
195    /// Implementations should attempt to clear the relevant readiness marker of the underlying
196    /// socket and then return:
197    /// - `Poll::Pending` if successful
198    /// - `Poll::Ready(error)` on error, to avoid repeated polling without handling the error
199    ///
200    /// #### Note
201    /// This returns a `Poll<Result<!>>` which will not currently automatically coerce into a
202    /// `Poll<Result<T>>`. Work around this by calling `.map_ok(|x| x)` as a no-op to force the
203    /// compiler to notice that everything is fine.
204    fn clear_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<!>>;
205    // TODO: #30 Should an error during clear_ready be clearly fatal?
206    //       One option would be to return a `Poll<Option<!>>`, thus differentiating it
207    //       from unblock processing a non-blocking error. This would lead to a Stream
208    //       delivering `None` and thus signalling it is dead. But it would lose the
209    //       details of the error which occurred.
210
211    /// Checks whether `error` will block the underlying Socket and either:
212    /// - calls [Self::clear_ready] for blocking errors
213    /// - returns `Poll::Ready(error)` for non-blocking errors
214    ///
215    /// #### Note
216    /// This returns a `Poll<Result<!>>` which will not currently automatically coerce into a
217    /// `Poll<Result<T>>`. Work around this by calling `.map_ok(|x| x)` as a no-op to force the
218    /// compiler to notice that everything is fine.
219    fn unblock(
220        self: Pin<&mut Self>,
221        error: io::Result<!>,
222        cx: &mut Context<'_>,
223    ) -> Poll<io::Result<!>> {
224        let Err(error) = error;
225        match error.kind() {
226            io::ErrorKind::WouldBlock => self.clear_ready(cx),
227            _ => Poll::Ready(Err(error)),
228        }
229    }
230}
231
232impl<const _BS: usize> EventedUdpSocket for UdpStream<_BS> {
233    fn from_evented_socket(evented_socket: PollEvented<sys::net::UdpSocket>) -> io::Result<Self> {
234        Ok(Self { io: evented_socket })
235    }
236
237    fn as_socket(&self) -> &sys::net::UdpSocket {
238        let io = &self.io;
239        io.get_ref()
240    }
241
242    fn as_socket_mut(&mut self) -> &mut sys::net::UdpSocket {
243        let io = &mut self.io;
244        io.get_mut()
245    }
246
247    fn as_evented_socket_pin(self: Pin<&mut Self>) -> Pin<&mut PollEvented<sys::net::UdpSocket>> {
248        let this = self.get_mut();
249        let io = &mut this.io;
250        Pin::new(&mut *io)
251    }
252
253    fn clear_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<!>> {
254        match self.as_evented_socket_pin().clear_read_ready(cx) {
255            Ok(_) => Poll::Pending,
256            Err(e) => Poll::Ready(Err(e)),
257        }
258    }
259}
260
261impl<const BUF_SIZE: usize> Stream for UdpStream<BUF_SIZE> {
262    type Item = io::Result<([u8; BUF_SIZE], usize, SocketAddr)>;
263
264    /// Receives data from the IO interface once `await`ed.
265    ///
266    /// Awaiting returns an array of bytes containing the message received, the message length
267    /// and the target from whence the data came as an
268    /// `Option<io::Result<([u8; BUF_SIZE], usize, SocketAddr)>>`
269    ///
270    /// #### Note
271    ///
272    /// - Messages received via [UdpStream::next] will be provided as an array of bytes of length
273    ///   `BUF_SIZE`. This is a generic const to allow avoid us having to allocate a 65k buffer on each
274    ///   call to next in order to cover the max possible UDP datagram size.
275    /// - It is your responsibility to ensure that `BUF_SIZE` is large enough to hold the largest UDP
276    ///   datagram your protocol expects; if it is smaller than the incoming datagram size, the datagram
277    ///   will be truncated in the output from `next`. You cannot rely on the returned `bytes_read` value
278    ///   to indicate truncation as this will also be set to the buffer length, not the full size of the
279    ///   truncated message (this is the underlying behaviour of the libc call `recv_from`).
280    /// - All bytes after the actual message will be NULL so it can be directly converted to a String,
281    ///   for example, without first slicing. Other data manipulation should take into account the actual length.
282    /// - There are no clear situations which could lead to this returning `None`. Wrapping the
283    ///   returned data in an `Option` is done purely to maintain a consistent API with expectations
284    ///   on an Iterator / Stream
285    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
286        let evented_socket = self.as_mut().as_evented_socket_pin();
287        match evented_socket.poll_read_ready(cx) {
288            Poll::Ready(is_ready) => match is_ready {
289                Ok(readiness) => match readiness.is_readable() {
290                    true => {
291                        let mut buf: [u8; BUF_SIZE] = [b'\x00'; BUF_SIZE];
292                        let recv = self
293                            .as_socket()
294                            .recv_from(&mut buf)
295                            .map(|(len, addr)| (buf, len, addr));
296                        match recv {
297                            Ok(_) => Poll::Ready(Some(recv)),
298                            Err(e) => self.unblock(Err(e), cx).map_ok(|x| x).map(Some),
299                        }
300                    }
301                    false => self.clear_ready(cx).map_ok(|x| x).map(Some),
302                },
303                Err(e) => self.unblock(Err(e), cx).map_ok(|x| x).map(Some),
304            },
305            Poll::Pending => Poll::Pending,
306        }
307    }
308}
309
310#[derive(Debug)]
311/// A non-blocking async UdpSocket with ability to `send_to` via `send` and make use of all the
312/// niceties that come with [`futures::sink::Sink`] and [`futures::sink::SinkExt`].
313///
314/// #### Note
315/// - This does NOT have exclusive access to the bound port. If you want to guarantee that
316///   no other processes bind to the same socket vote thumbs up on issue #22 TODO: implement
317///   `bind_exclusive` etc.
318pub struct UdpSink {
319    /// The underlying, evented Socket.
320    ///
321    /// #### Note
322    /// - [`futures_net::UdpSocket`] does NOT implement [futures_net::driver::sys::event::Evented]
323    ///   and is NOT the same type as stored here.
324    /// - [`futures_net::driver::sys::net::UdpSocket`] is not actually non-blocking, despite the
325    ///   documentation.
326    /// - Neither [std::sys::net::UdpSocket], nor [net2::UdpBuilder] expose `set_nonblocking()` so
327    ///   we need use [socket2::Socket] while building the listener but are unable to change
328    ///   blocking or exclusivity after construction.
329    io: PollEvented<sys::net::UdpSocket>,
330}
331
332impl EventedUdpSocket for UdpSink {
333    fn from_evented_socket(evented_socket: PollEvented<sys::net::UdpSocket>) -> io::Result<Self> {
334        Ok(Self { io: evented_socket })
335    }
336
337    fn as_socket(&self) -> &sys::net::UdpSocket {
338        let io = &self.io;
339        io.get_ref()
340    }
341
342    fn as_socket_mut(&mut self) -> &mut sys::net::UdpSocket {
343        let io = &mut self.io;
344        io.get_mut()
345    }
346
347    fn as_evented_socket_pin(self: Pin<&mut Self>) -> Pin<&mut PollEvented<sys::net::UdpSocket>> {
348        let this = self.get_mut();
349        let io = &mut this.io;
350        Pin::new(&mut *io)
351    }
352
353    fn clear_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<!>> {
354        match self.as_evented_socket_pin().clear_write_ready(cx) {
355            Ok(_) => Poll::Pending,
356            Err(e) => Poll::Ready(Err(e)),
357        }
358    }
359}
360
361impl<A: ToSocketAddrs> Sink<(&[u8], &A)> for UdpSink {
362    type Error = io::Error;
363
364    /// Attempts to prepare the Sink to receive a value.
365    ///
366    /// This method must be called and return Poll::Ready(Ok(())) prior to each call to start_send.
367    ///
368    /// This method returns Poll::Ready once the underlying sink is ready to receive data.
369    /// If this method returns Poll::Pending, the current task is registered to be notified
370    /// (via cx.waker().wake_by_ref()) when poll_ready should be called again.
371    ///
372    /// #### Note
373    ///
374    /// - If the attempt to poll readiness fails this method **will properly handle
375    ///   it** by calling [Self::clear_ready]/[Self::unblock] to ensure the underlying socket
376    ///   does not remain blocked.
377    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
378        let evented_socket = self.as_mut().as_evented_socket_pin();
379        match evented_socket.poll_write_ready(cx) {
380            Poll::Ready(is_ready) => match is_ready {
381                Ok(readiness) => match readiness.is_writable() {
382                    true => Poll::Ready(Ok(())),
383                    false => self.clear_ready(cx).map_ok(|x| x),
384                },
385                Err(e) => self.unblock(Err(e), cx).map_ok(|x| x),
386            },
387            Poll::Pending => Poll::Pending,
388        }
389    }
390
391    /// #### Note
392    /// - While this function will accept multiple addresses, currently data is only sent to the
393    ///   first one (TODO)
394    /// - If an empty list of addresses the error will be of kind `io::ErrorKind::InvalidInput`
395    fn start_send(self: Pin<&mut Self>, item: (&[u8], &A)) -> Result<(), Self::Error> {
396        let socket = self.as_socket();
397        let (msg, addr) = item;
398        let addr = addr
399            .to_socket_addrs()?
400            .next()
401            .ok_or(io::Error::from(io::ErrorKind::InvalidInput))?;
402        socket.send_to(msg, &addr).and_then(|l| {
403            if l != msg.len() {
404                Err(io::Error::other(format!(
405                    "{} bytes sent but message was {} bytes",
406                    l,
407                    msg.len()
408                )))
409            } else {
410                Ok(())
411            }
412        })
413    }
414
415    /// Await write readiness indicating that all pending messages have been sent, then return
416    /// as a no-op (`UdpSockets` do not have an inherent `flush` method).
417    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
418        <Self as futures::Sink<(&[u8], &A)>>::poll_ready(self, cx)
419    }
420
421    // TODO: it would be nice to be able to annote these situations with as eg `Poll<!>`
422    //       is this worth a sub-issue to the tracking issue for `never_type`?
423    /// #### Note
424    /// This only flushes but does not close as no-one exposes the libc `close()`
425    /// call on a `UdpSocket`
426    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
427        <Self as futures::Sink<(&[u8], &A)>>::poll_flush(self, cx)
428    }
429}
430
431#[cfg(test)]
432mod tests {
433    use std::net::{Ipv4Addr, SocketAddrV4};
434
435    use super::*;
436    use futures::{SinkExt, StreamExt};
437    use futures_net::runtime::Runtime;
438
439    #[futures_net::test]
440    async fn non_blocking() {
441        let loopback = Ipv4Addr::new(127, 0, 0, 1);
442        let addr: SocketAddr = SocketAddrV4::new(loopback, 0).into();
443        let first = UdpStream::<32>::bind(addr).expect("first connection");
444        let addr = first.local_addr().expect("bound port");
445        let _second = UdpStream::<32>::bind(addr).expect("second connection");
446    }
447
448    #[futures_net::test]
449    async fn truncated_next() {
450        let loopback = Ipv4Addr::new(127, 0, 0, 1);
451        let addr: SocketAddr = SocketAddrV4::new(loopback, 0).into();
452        let mut receiver = UdpStream::<8>::bind(addr).expect("receiver");
453        let rec_addr = receiver.local_addr().expect("bound port");
454
455        let mut sender = UdpSink::bind(addr).expect("sender");
456        let original_msg = b"udp loopback test";
457
458        let send = async move {
459            sender
460                .send((original_msg, &rec_addr))
461                .await
462                .expect("send msg");
463        };
464
465        let rec = async {
466            let (msg, len, _sent_by) = receiver
467                .next()
468                .await
469                .expect("a message")
470                .expect("a valid message");
471            // bytes read is limited to buf size - as per libc call recv_from
472            assert_eq!(len, 8);
473            assert_eq!(msg, original_msg[..8]);
474        };
475
476        futures::join!(rec, send);
477    }
478}