futures_udp/lib.rs
1#![cfg_attr(unstable_bool_to_result, feature(bool_to_result))]
2#![cfg_attr(unstable_never_type, feature(never_type))]
3
4//! Runtime agnostic, non-blocking, non-exclusive async UDP networking.
5//!
6//! `futures-udp` provides two key structs:
7//! - [UdpStream] for reading data from a UDP Socket
8//! - [UdpSink] for sending data via a UDP Socket
9//!
10//! These structs implement the `futures-rs` traits [Stream] & [Sink] respectively but are tested
11//! and known to work with both `tokio` & `futures-rs` runtimes. (tokio tests performed in a
12//! downstream crate, I'll add them here soon so to make sure this never breaks)
13//!
14//! ## Why?
15//! - I usually don't want to be forced to bring `tokio` into my dependency tree unless I want
16//! to use it as my runtime. I think the runtime choice should be left to the final binary.
17//! - `futures-rs` is a lot lighter weight and provided by rust-lang, so I chose that for the base
18//! traits. They are cross-compatible with `tokio`.
19//! - Working with a bare `UdpSocket` is "a bit hard", doing it async is "a bit more hard".
20//! Adding `Stream` & `Sink` semantics makes it "nice".
21//! - Despite the docs [futures_net::UdpSocket] creates a blocking socket, which is locked
22//! for exclusive use. (Opening a ticket TBD)
23//!
24//! ## Stability & MSRV
25//!
26//! I've chosen to rely on two experimental features, while this crate is in v0.x.y, as I feel they
27//! add significant value to the API. I also believe in supporting language development and
28//! generating feedback to features as they near stabilisation.
29//!
30//! This crate will not move to v1.x.y until both features are stabilised, or I decide to stop using
31//! them. Realistically, however, they will be stable while I allow this API to go through a
32//! "settling-in" phase before fixing it at v1.0.0
33//!
34//! > 🔬 **Experimental Features**
35//! >
36//! > This crate makes use of the following experimental features:
37//! >
38//! > - [`#![feature(never_type)]`](https://github.com/rust-lang/rust/issues/35121) [final stages of stabilisation]
39//! > - [`#![feature(bool_to_result)]`](https://github.com/rust-lang/rust/issues/142748) [in FCP as of 2026-04-25]
40//! >
41//! > This list includes any unstable features used by direct & transitive dependencies (currently, none).
42//! >
43//! > Both are so close to being part of stable rust that I chose to use them here.
44//!
45//! You do not need to enable these in your own code, the list is for information only. But currently
46//! you do need to use nightly to take advantage of this crate.
47//!
48//! ### Stability guarantees
49//!
50//! We run automated tests **every month** to ensure no fundamental changes affect this crate and
51//! test every PR against the current nightly, as well as the current equivalent beta & stable.
52//! If you find an issue before we do, please
53//! [raise an issue on github](https://github.com/MusicalNinjaDad/splurt/issues).
54//!
55//! ### MSRV
56//!
57//! For those of you working with a pinned nightly (etc.) this crate supports the equivalent of
58//! 1.90.0 onwards. We use [autocfg](https://crates.io/crates/autocfg/) to seamlessly handle
59//! features which have been stabilised since then.
60//!
61//! ### Dependencies
62//!
63//! We deliberately keep the dependency list short and pay attention to any transitive dependencies
64//! we bring in.
65//!
66//! - `futures-rs` (for the Stream & Sink traits)
67//! - `futures-net` (for the underlying UdpSocket)
68//! - `socket2` (to set the socket to non-blocking, non-exclusive)
69
70use std::{
71 io,
72 net::{SocketAddr, ToSocketAddrs},
73 pin::Pin,
74 task::{Context, Poll},
75};
76
77use futures::{Stream, sink::Sink};
78use futures_net::driver::{
79 PollEvented,
80 sys::{self},
81};
82use socket2::{Domain, Type};
83
84//TODO: Open a ticket with futures_net re non-blocking UdpSocket
85#[derive(Debug)]
86/// A non-blocking async UdpSocket with ability to `recv_from` via `next` and `send_to` via `push`.
87///
88/// #### BUF_SIZE
89/// Messages received via [UdpStream::next] will be provided as an array of bytes of length
90/// `BUF_SIZE`. This is a generic const to allow avoid us having to allocate a 65k buffer on each
91/// call to next in order to cover the max possible UDP datagram size.
92///
93/// It is your responsibility to ensure that `BUF_SIZE` is large enough to hold the largest UDP
94/// datagram your protocol expects; if it is smaller than the incoming datagram size, the datagram
95/// will be truncated in the output from `next`. You cannot rely on the returned `bytes_read` value
96/// to indicate truncation as this will also be set to the buffer length, not the full size of the
97/// truncated message (this is the underlying behaviour of the libc call `recv_from`).
98///
99/// #### Note
100/// - This does NOT have exclusive access to the bound port. If you want to guarantee that
101/// no other processes bind to the same socket vote thumbs up on issue #22 TODO: implement
102/// `bind_exclusive` etc.)
103pub struct UdpStream<const BUF_SIZE: usize> {
104 /// The underlying, evented Socket.
105 ///
106 /// #### Note
107 /// - [`futures_net::UdpSocket`] does NOT implement [futures_net::driver::sys::event::Evented]
108 /// and is NOT the same type as stored here.
109 /// - [`futures_net::driver::sys::net::UdpSocket`] is not actually non-blocking, despite the
110 /// documentation.
111 /// - Neither [std::sys::net::UdpSocket], nor [net2::UdpBuilder] expose `set_nonblocking()` so
112 /// we need use [socket2::Socket] while building the listener but are unable to change
113 /// blocking or exclusivity after construction.
114 io: PollEvented<sys::net::UdpSocket>,
115}
116
117/// Basic functions on a struct wrapping a `PollEvented<sys::net::UdpSocket>`
118///
119/// Right now this is lazy for my own use, so makes assumptions about internal structure.
120///
121/// #### Note
122/// - TODO #26 handle cases with multiple fields which need to be provided during construction
123pub trait EventedUdpSocket
124where
125 Self: Sized,
126{
127 /// Create a new `Self` from a `PollEvented<sys::net::UdpSocket>`
128 fn from_evented_socket(evented_socket: PollEvented<sys::net::UdpSocket>) -> io::Result<Self>;
129
130 /// Create a new `Self` by binding it to a given [SocketAddr].
131 ///
132 /// In the default implementation, the listener is guaranteed to be constructed to be
133 /// non-blocking and have non-exclusive access to the bound address; if either of these system
134 /// calls fails to take effect an [io::ErrorKind::Unsupported] will be returned.
135 ///
136 /// #### Note
137 /// - It is not possible to validate the non-blocking status has been correctly set on Windows
138 /// or wasm32-wasip1 so this check is skipped in those cases and success is assumed.
139 fn bind(addr: SocketAddr) -> io::Result<Self> {
140 let s2 = socket2::Socket::new(Domain::IPV4, Type::DGRAM, None)?;
141 let addr = addr.into();
142 s2.set_nonblocking(true)?;
143 #[cfg(any(unix, all(target_os = "wasi", not(target_env = "p1"))))]
144 s2.nonblocking()?
145 .ok_or(io::Error::from(io::ErrorKind::Unsupported))?;
146
147 // NOTE for consideration if/when implementing conversion to a UdpConnectedStream
148 // ==============================================================================
149 // This would stop another process from re-binding to the same address *& port* if
150 // converted to a UdpConnectedStream which actively begins listening on this address,
151 // thereby claiming exclusive interest in all received data.
152 // see https://man7.org/linux/man-pages/man7/socket.7.html#:~:text=SO_REUSEADDR
153 s2.set_reuse_address(true)?;
154 s2.reuse_address()?
155 .ok_or(io::Error::from(io::ErrorKind::Unsupported))?;
156
157 s2.bind(&addr)?;
158 let sstd: std::net::UdpSocket = s2.into();
159 let evented_socket = PollEvented::new(sys::net::UdpSocket::from_socket(sstd)?);
160 Self::from_evented_socket(evented_socket)
161 }
162
163 // // TODO: #22 Add bind_exclusive constructor & update struct docs for UdpStream
164 // pub fn bind_exclusive(addr: SocketAddr) -> io::Result<Self>
165 // pub fn is_exclusive(&self) -> Option<SocketAddr>
166 // pub fn check_exclusive(&self) -> io::Result<SocketAddr>
167 // pub fn is_non_exclusive(&self) -> Option<SocketAddr>
168 // pub fn check_non_exclusive(&self) -> io::Result<SocketAddr>
169
170 /// Get the local address of the underlying Socket
171 fn local_addr(&self) -> io::Result<SocketAddr> {
172 self.as_socket().local_addr()
173 }
174
175 /// Provides access to the underlying Socket.
176 ///
177 /// #### Note
178 /// `futures_net::UdpSocket` is NOT the same type as returned here.
179 fn as_socket(&self) -> &sys::net::UdpSocket;
180
181 /// Provides mutable access to the underlying Socket.
182 ///
183 /// #### Note
184 /// `futures_net::UdpSocket` is NOT the same type as returned here.
185 fn as_socket_mut(&mut self) -> &mut sys::net::UdpSocket;
186
187 /// Converts a pinned `&mut Self` to a pinned &mut of the underlying pollevented socket
188 /// allowing for calls to traits and functions implemented by [PollEvented]
189 fn as_evented_socket_pin(self: Pin<&mut Self>) -> Pin<&mut PollEvented<sys::net::UdpSocket>>;
190
191 /// Clear the readiness state of the underlying socket.
192 ///
193 /// **This MUST be called after any failed readiness poll.**
194 ///
195 /// Implementations should attempt to clear the relevant readiness marker of the underlying
196 /// socket and then return:
197 /// - `Poll::Pending` if successful
198 /// - `Poll::Ready(error)` on error, to avoid repeated polling without handling the error
199 ///
200 /// #### Note
201 /// This returns a `Poll<Result<!>>` which will not currently automatically coerce into a
202 /// `Poll<Result<T>>`. Work around this by calling `.map_ok(|x| x)` as a no-op to force the
203 /// compiler to notice that everything is fine.
204 fn clear_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<!>>;
205 // TODO: #30 Should an error during clear_ready be clearly fatal?
206 // One option would be to return a `Poll<Option<!>>`, thus differentiating it
207 // from unblock processing a non-blocking error. This would lead to a Stream
208 // delivering `None` and thus signalling it is dead. But it would lose the
209 // details of the error which occurred.
210
211 /// Checks whether `error` will block the underlying Socket and either:
212 /// - calls [Self::clear_ready] for blocking errors
213 /// - returns `Poll::Ready(error)` for non-blocking errors
214 ///
215 /// #### Note
216 /// This returns a `Poll<Result<!>>` which will not currently automatically coerce into a
217 /// `Poll<Result<T>>`. Work around this by calling `.map_ok(|x| x)` as a no-op to force the
218 /// compiler to notice that everything is fine.
219 fn unblock(
220 self: Pin<&mut Self>,
221 error: io::Result<!>,
222 cx: &mut Context<'_>,
223 ) -> Poll<io::Result<!>> {
224 let Err(error) = error;
225 match error.kind() {
226 io::ErrorKind::WouldBlock => self.clear_ready(cx),
227 _ => Poll::Ready(Err(error)),
228 }
229 }
230}
231
232impl<const _BS: usize> EventedUdpSocket for UdpStream<_BS> {
233 fn from_evented_socket(evented_socket: PollEvented<sys::net::UdpSocket>) -> io::Result<Self> {
234 Ok(Self { io: evented_socket })
235 }
236
237 fn as_socket(&self) -> &sys::net::UdpSocket {
238 let io = &self.io;
239 io.get_ref()
240 }
241
242 fn as_socket_mut(&mut self) -> &mut sys::net::UdpSocket {
243 let io = &mut self.io;
244 io.get_mut()
245 }
246
247 fn as_evented_socket_pin(self: Pin<&mut Self>) -> Pin<&mut PollEvented<sys::net::UdpSocket>> {
248 let this = self.get_mut();
249 let io = &mut this.io;
250 Pin::new(&mut *io)
251 }
252
253 fn clear_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<!>> {
254 match self.as_evented_socket_pin().clear_read_ready(cx) {
255 Ok(_) => Poll::Pending,
256 Err(e) => Poll::Ready(Err(e)),
257 }
258 }
259}
260
261impl<const BUF_SIZE: usize> Stream for UdpStream<BUF_SIZE> {
262 type Item = io::Result<([u8; BUF_SIZE], usize, SocketAddr)>;
263
264 /// Receives data from the IO interface once `await`ed.
265 ///
266 /// Awaiting returns an array of bytes containing the message received, the message length
267 /// and the target from whence the data came as an
268 /// `Option<io::Result<([u8; BUF_SIZE], usize, SocketAddr)>>`
269 ///
270 /// #### Note
271 ///
272 /// - Messages received via [UdpStream::next] will be provided as an array of bytes of length
273 /// `BUF_SIZE`. This is a generic const to allow avoid us having to allocate a 65k buffer on each
274 /// call to next in order to cover the max possible UDP datagram size.
275 /// - It is your responsibility to ensure that `BUF_SIZE` is large enough to hold the largest UDP
276 /// datagram your protocol expects; if it is smaller than the incoming datagram size, the datagram
277 /// will be truncated in the output from `next`. You cannot rely on the returned `bytes_read` value
278 /// to indicate truncation as this will also be set to the buffer length, not the full size of the
279 /// truncated message (this is the underlying behaviour of the libc call `recv_from`).
280 /// - All bytes after the actual message will be NULL so it can be directly converted to a String,
281 /// for example, without first slicing. Other data manipulation should take into account the actual length.
282 /// - There are no clear situations which could lead to this returning `None`. Wrapping the
283 /// returned data in an `Option` is done purely to maintain a consistent API with expectations
284 /// on an Iterator / Stream
285 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
286 let evented_socket = self.as_mut().as_evented_socket_pin();
287 match evented_socket.poll_read_ready(cx) {
288 Poll::Ready(is_ready) => match is_ready {
289 Ok(readiness) => match readiness.is_readable() {
290 true => {
291 let mut buf: [u8; BUF_SIZE] = [b'\x00'; BUF_SIZE];
292 let recv = self
293 .as_socket()
294 .recv_from(&mut buf)
295 .map(|(len, addr)| (buf, len, addr));
296 match recv {
297 Ok(_) => Poll::Ready(Some(recv)),
298 Err(e) => self.unblock(Err(e), cx).map_ok(|x| x).map(Some),
299 }
300 }
301 false => self.clear_ready(cx).map_ok(|x| x).map(Some),
302 },
303 Err(e) => self.unblock(Err(e), cx).map_ok(|x| x).map(Some),
304 },
305 Poll::Pending => Poll::Pending,
306 }
307 }
308}
309
310#[derive(Debug)]
311/// A non-blocking async UdpSocket with ability to `send_to` via `send` and make use of all the
312/// niceties that come with [`futures::sink::Sink`] and [`futures::sink::SinkExt`].
313///
314/// #### Note
315/// - This does NOT have exclusive access to the bound port. If you want to guarantee that
316/// no other processes bind to the same socket vote thumbs up on issue #22 TODO: implement
317/// `bind_exclusive` etc.
318pub struct UdpSink {
319 /// The underlying, evented Socket.
320 ///
321 /// #### Note
322 /// - [`futures_net::UdpSocket`] does NOT implement [futures_net::driver::sys::event::Evented]
323 /// and is NOT the same type as stored here.
324 /// - [`futures_net::driver::sys::net::UdpSocket`] is not actually non-blocking, despite the
325 /// documentation.
326 /// - Neither [std::sys::net::UdpSocket], nor [net2::UdpBuilder] expose `set_nonblocking()` so
327 /// we need use [socket2::Socket] while building the listener but are unable to change
328 /// blocking or exclusivity after construction.
329 io: PollEvented<sys::net::UdpSocket>,
330}
331
332impl EventedUdpSocket for UdpSink {
333 fn from_evented_socket(evented_socket: PollEvented<sys::net::UdpSocket>) -> io::Result<Self> {
334 Ok(Self { io: evented_socket })
335 }
336
337 fn as_socket(&self) -> &sys::net::UdpSocket {
338 let io = &self.io;
339 io.get_ref()
340 }
341
342 fn as_socket_mut(&mut self) -> &mut sys::net::UdpSocket {
343 let io = &mut self.io;
344 io.get_mut()
345 }
346
347 fn as_evented_socket_pin(self: Pin<&mut Self>) -> Pin<&mut PollEvented<sys::net::UdpSocket>> {
348 let this = self.get_mut();
349 let io = &mut this.io;
350 Pin::new(&mut *io)
351 }
352
353 fn clear_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<!>> {
354 match self.as_evented_socket_pin().clear_write_ready(cx) {
355 Ok(_) => Poll::Pending,
356 Err(e) => Poll::Ready(Err(e)),
357 }
358 }
359}
360
361impl<A: ToSocketAddrs> Sink<(&[u8], &A)> for UdpSink {
362 type Error = io::Error;
363
364 /// Attempts to prepare the Sink to receive a value.
365 ///
366 /// This method must be called and return Poll::Ready(Ok(())) prior to each call to start_send.
367 ///
368 /// This method returns Poll::Ready once the underlying sink is ready to receive data.
369 /// If this method returns Poll::Pending, the current task is registered to be notified
370 /// (via cx.waker().wake_by_ref()) when poll_ready should be called again.
371 ///
372 /// #### Note
373 ///
374 /// - If the attempt to poll readiness fails this method **will properly handle
375 /// it** by calling [Self::clear_ready]/[Self::unblock] to ensure the underlying socket
376 /// does not remain blocked.
377 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
378 let evented_socket = self.as_mut().as_evented_socket_pin();
379 match evented_socket.poll_write_ready(cx) {
380 Poll::Ready(is_ready) => match is_ready {
381 Ok(readiness) => match readiness.is_writable() {
382 true => Poll::Ready(Ok(())),
383 false => self.clear_ready(cx).map_ok(|x| x),
384 },
385 Err(e) => self.unblock(Err(e), cx).map_ok(|x| x),
386 },
387 Poll::Pending => Poll::Pending,
388 }
389 }
390
391 /// #### Note
392 /// - While this function will accept multiple addresses, currently data is only sent to the
393 /// first one (TODO)
394 /// - If an empty list of addresses the error will be of kind `io::ErrorKind::InvalidInput`
395 fn start_send(self: Pin<&mut Self>, item: (&[u8], &A)) -> Result<(), Self::Error> {
396 let socket = self.as_socket();
397 let (msg, addr) = item;
398 let addr = addr
399 .to_socket_addrs()?
400 .next()
401 .ok_or(io::Error::from(io::ErrorKind::InvalidInput))?;
402 socket.send_to(msg, &addr).and_then(|l| {
403 if l != msg.len() {
404 Err(io::Error::other(format!(
405 "{} bytes sent but message was {} bytes",
406 l,
407 msg.len()
408 )))
409 } else {
410 Ok(())
411 }
412 })
413 }
414
415 /// Await write readiness indicating that all pending messages have been sent, then return
416 /// as a no-op (`UdpSockets` do not have an inherent `flush` method).
417 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
418 <Self as futures::Sink<(&[u8], &A)>>::poll_ready(self, cx)
419 }
420
421 // TODO: it would be nice to be able to annote these situations with as eg `Poll<!>`
422 // is this worth a sub-issue to the tracking issue for `never_type`?
423 /// #### Note
424 /// This only flushes but does not close as no-one exposes the libc `close()`
425 /// call on a `UdpSocket`
426 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
427 <Self as futures::Sink<(&[u8], &A)>>::poll_flush(self, cx)
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 use std::net::{Ipv4Addr, SocketAddrV4};
434
435 use super::*;
436 use futures::{SinkExt, StreamExt};
437 use futures_net::runtime::Runtime;
438
439 #[futures_net::test]
440 async fn non_blocking() {
441 let loopback = Ipv4Addr::new(127, 0, 0, 1);
442 let addr: SocketAddr = SocketAddrV4::new(loopback, 0).into();
443 let first = UdpStream::<32>::bind(addr).expect("first connection");
444 let addr = first.local_addr().expect("bound port");
445 let _second = UdpStream::<32>::bind(addr).expect("second connection");
446 }
447
448 #[futures_net::test]
449 async fn truncated_next() {
450 let loopback = Ipv4Addr::new(127, 0, 0, 1);
451 let addr: SocketAddr = SocketAddrV4::new(loopback, 0).into();
452 let mut receiver = UdpStream::<8>::bind(addr).expect("receiver");
453 let rec_addr = receiver.local_addr().expect("bound port");
454
455 let mut sender = UdpSink::bind(addr).expect("sender");
456 let original_msg = b"udp loopback test";
457
458 let send = async move {
459 sender
460 .send((original_msg, &rec_addr))
461 .await
462 .expect("send msg");
463 };
464
465 let rec = async {
466 let (msg, len, _sent_by) = receiver
467 .next()
468 .await
469 .expect("a message")
470 .expect("a valid message");
471 // bytes read is limited to buf size - as per libc call recv_from
472 assert_eq!(len, 8);
473 assert_eq!(msg, original_msg[..8]);
474 };
475
476 futures::join!(rec, send);
477 }
478}