// virtual_socket/lib.rs
1//! Virtual UDP sockets that share a single physical [`tokio::net::UdpSocket`].
2//!
3//! Pattern: a single physical UDP socket is owned by a top-level
4//! "router" task that reads from it. The router demultiplexes each
5//! inbound datagram to one of several [`VirtualUdpSocket`]s — by peer
6//! address, by some identifier inside the payload (e.g. a QUIC
7//! Connection ID), by listener kind, or any other rule the router
8//! chooses. Each virtual socket has its own bounded inbound queue;
9//! consumers drain that queue via [`VirtualUdpSocket::poll_dequeue`]
10//! or [`VirtualUdpSocket::try_dequeue`].
11//!
12//! Outbound is mux: every virtual socket forwards
13//! [`VirtualUdpSocket::try_send_to`] / [`VirtualUdpSocket::poll_send_ready`]
14//! to the shared physical socket, so multiple consumers can write
15//! through the same OS endpoint without contention beyond what the OS
16//! itself imposes.
17//!
18//! This crate is transport-policy free: it does not parse datagrams,
19//! does not own a routing table, and does not implement any
20//! application protocol. Pair it with whatever demultiplex strategy
21//! the calling system needs (e.g. peer-address fan-in, QUIC CID
22//! demux, DNS query-ID dispatch).
23//!
24//! For an adapter that exposes [`VirtualUdpSocket`] as a
25//! [`quinn::AsyncUdpSocket`](https://docs.rs/quinn), see the
26//! `quinn-shared-socket` crate.
27
28use std::collections::VecDeque;
29use std::io;
30use std::net::SocketAddr;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::task::{Context, Poll, Waker};
34
35use bytes::Bytes;
36use parking_lot::Mutex;
37use tokio::net::UdpSocket;
38
/// Default inbound queue capacity. Matches a single high-throughput
/// QUIC endpoint's burst window without letting a single misbehaving
/// session starve siblings sharing the physical socket.
///
/// Used by [`VirtualUdpSocket::new`]; pick a different bound with
/// [`VirtualUdpSocket::new_with_capacity`].
pub const DEFAULT_INBOUND_CAPACITY: usize = 256;
43
/// One virtual UDP socket sharing the physical socket given at
/// construction time. Inbound datagrams arrive only via
/// [`VirtualUdpSocket::enqueue_inbound`] (called by the router that
/// owns the physical socket); outbound datagrams pass through to the
/// physical socket unchanged.
///
/// Cloning the `Arc<VirtualUdpSocket>` shares state — both clones see
/// the same inbound queue, the same physical socket, and the same
/// closed flag. This is intended: typical use installs one clone in
/// the router's dispatch table and hands another to the consumer
/// task.
pub struct VirtualUdpSocket {
	// Shared OS endpoint. All outbound sends (`try_send_to`,
	// `poll_send_ready`) are forwarded here unchanged.
	physical: Arc<UdpSocket>,
	// Bounded inbound datagram queue plus the consumer's parked waker;
	// see `Inbound`. Guarded by a `parking_lot::Mutex`.
	inbound: Mutex<Inbound>,
	// Set once by `close()` and never cleared; read by the enqueue and
	// dequeue paths to drop new input and signal end-of-stream.
	closed: AtomicBool,
}
60
/// Mutex-protected inbound state: the bounded datagram queue and the
/// waker of the consumer currently parked in `poll_dequeue` (if any).
struct Inbound {
	// FIFO of (source address, payload) pairs awaiting dequeue.
	queue: VecDeque<(SocketAddr, Bytes)>,
	// Waker registered by the most recent `poll_dequeue` that returned
	// `Pending`; taken and woken on enqueue and on close.
	waker: Option<Waker>,
	// Maximum queue length; enqueues beyond this bound are dropped.
	capacity: usize,
}
66
67impl std::fmt::Debug for VirtualUdpSocket {
68	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69		f.debug_struct("VirtualUdpSocket")
70			.field("closed", &self.closed.load(Ordering::Relaxed))
71			.finish_non_exhaustive()
72	}
73}
74
75impl VirtualUdpSocket {
76	/// Build a virtual socket against `physical` with
77	/// [`DEFAULT_INBOUND_CAPACITY`].
78	#[must_use]
79	pub fn new(physical: Arc<UdpSocket>) -> Arc<Self> {
80		Self::new_with_capacity(physical, DEFAULT_INBOUND_CAPACITY)
81	}
82
83	/// Build a virtual socket against `physical` with a custom
84	/// inbound queue capacity. `capacity` of zero is allowed but
85	/// drops every enqueue.
86	#[must_use]
87	pub fn new_with_capacity(physical: Arc<UdpSocket>, capacity: usize) -> Arc<Self> {
88		Arc::new(Self {
89			physical,
90			inbound: Mutex::new(Inbound { queue: VecDeque::new(), waker: None, capacity }),
91			closed: AtomicBool::new(false),
92		})
93	}
94
95	/// Local address of the underlying physical socket.
96	///
97	/// # Errors
98	///
99	/// Surfaces the OS error from [`tokio::net::UdpSocket::local_addr`].
100	pub fn local_addr(&self) -> io::Result<SocketAddr> {
101		self.physical.local_addr()
102	}
103
104	/// Push one datagram onto this virtual socket's inbound queue.
105	/// Called by the router that owns the physical socket. Drops the
106	/// datagram silently when the queue is full or the socket has
107	/// been closed — UDP is lossy by design and stalling the router
108	/// would block every other virtual socket sharing the physical
109	/// endpoint. A `tracing::warn!` records each drop.
110	pub fn enqueue_inbound(&self, peer: SocketAddr, datagram: Bytes) {
111		if self.closed.load(Ordering::Relaxed) {
112			tracing::warn!(?peer, "virtual udp socket closed; dropping inbound datagram");
113			return;
114		}
115		let mut inbound = self.inbound.lock();
116		if inbound.queue.len() >= inbound.capacity {
117			tracing::warn!(?peer, "virtual udp socket inbound queue full; dropping datagram");
118			return;
119		}
120		inbound.queue.push_back((peer, datagram));
121		if let Some(w) = inbound.waker.take() {
122			w.wake();
123		}
124	}
125
126	/// Pop the head of the inbound queue without blocking. Returns
127	/// `None` when the queue is empty (regardless of closed state).
128	pub fn try_dequeue(&self) -> Option<(SocketAddr, Bytes)> {
129		self.inbound.lock().queue.pop_front()
130	}
131
132	/// Poll for the next inbound datagram.
133	///
134	/// - `Poll::Ready(Some((peer, datagram)))` when a datagram is
135	///   available.
136	/// - `Poll::Ready(None)` when [`Self::close`] has been called and
137	///   the queue has been fully drained — the caller can treat
138	///   this as a clean end-of-stream.
139	/// - `Poll::Pending` otherwise; the waker from `cx` is registered
140	///   and woken on the next [`Self::enqueue_inbound`] /
141	///   [`Self::close`].
142	pub fn poll_dequeue(&self, cx: &mut Context<'_>) -> Poll<Option<(SocketAddr, Bytes)>> {
143		let mut inbound = self.inbound.lock();
144		if let Some(item) = inbound.queue.pop_front() {
145			return Poll::Ready(Some(item));
146		}
147		if self.closed.load(Ordering::Relaxed) {
148			return Poll::Ready(None);
149		}
150		inbound.waker = Some(cx.waker().clone());
151		Poll::Pending
152	}
153
154	/// Forward an outbound datagram to the physical socket without
155	/// blocking. Surfaces `WouldBlock` to the caller — wait via
156	/// [`Self::poll_send_ready`] before retrying.
157	///
158	/// # Errors
159	///
160	/// Surfaces the OS error from
161	/// [`tokio::net::UdpSocket::try_send_to`].
162	pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
163		self.physical.try_send_to(buf, target)
164	}
165
166	/// Wait until the physical socket is writable. Proxies to
167	/// [`tokio::net::UdpSocket::poll_send_ready`].
168	///
169	/// # Errors
170	///
171	/// Surfaces the OS error from `poll_send_ready`.
172	pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
173		self.physical.poll_send_ready(cx)
174	}
175
176	/// Borrow the underlying physical socket. Useful when callers
177	/// need to invoke socket-level methods (e.g. setting buffer
178	/// sizes) that virtual-socket does not surface.
179	#[must_use]
180	pub fn physical(&self) -> &Arc<UdpSocket> {
181		&self.physical
182	}
183
184	/// Mark this virtual socket closed. New
185	/// [`Self::enqueue_inbound`] calls are dropped; existing queued
186	/// datagrams remain drainable. Once the queue is empty,
187	/// [`Self::poll_dequeue`] returns `Poll::Ready(None)` so consumer
188	/// tasks can exit cleanly. Idempotent.
189	pub fn close(&self) {
190		let already = self.closed.swap(true, Ordering::Relaxed);
191		if !already {
192			// Wake the consumer so it observes the close on the next
193			// poll, even if no datagrams arrive after this point.
194			if let Some(w) = self.inbound.lock().waker.take() {
195				w.wake();
196			}
197		}
198	}
199
200	/// Whether [`Self::close`] has been called.
201	#[must_use]
202	pub fn is_closed(&self) -> bool {
203		self.closed.load(Ordering::Relaxed)
204	}
205
206	/// Inbound queue length. Useful for metrics / diagnostics.
207	#[must_use]
208	pub fn inbound_len(&self) -> usize {
209		self.inbound.lock().queue.len()
210	}
211}
212
#[cfg(test)]
mod tests {
	use std::future::poll_fn;
	use std::net::Ipv4Addr;

	use bytes::Bytes;
	use tokio::net::UdpSocket;

	use super::*;

	// Bind an ephemeral localhost UDP socket and return it with its
	// resolved address.
	async fn bound() -> (Arc<UdpSocket>, SocketAddr) {
		let s = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).await.expect("bind");
		let a = s.local_addr().expect("local_addr");
		(Arc::new(s), a)
	}

	// An empty, open socket yields None from the non-blocking dequeue.
	#[tokio::test]
	async fn try_dequeue_returns_none_when_empty() {
		let (phys, _) = bound().await;
		let v = VirtualUdpSocket::new(phys);
		assert!(v.try_dequeue().is_none());
	}

	// Enqueued datagrams come back in FIFO order with their peer address.
	#[tokio::test]
	async fn enqueue_then_dequeue_roundtrip() {
		let (phys, _) = bound().await;
		let v = VirtualUdpSocket::new(phys);
		let peer: SocketAddr = "192.0.2.1:443".parse().unwrap();
		v.enqueue_inbound(peer, Bytes::from_static(b"hello"));
		v.enqueue_inbound(peer, Bytes::from_static(b"world"));
		let (p1, d1) = v.try_dequeue().unwrap();
		assert_eq!(p1, peer);
		assert_eq!(&*d1, b"hello");
		let (_, d2) = v.try_dequeue().unwrap();
		assert_eq!(&*d2, b"world");
		assert!(v.try_dequeue().is_none());
	}

	// A consumer parked in poll_dequeue is woken by a later enqueue and
	// receives that datagram.
	#[tokio::test]
	async fn poll_dequeue_pending_then_woken_on_enqueue() {
		let (phys, _) = bound().await;
		let v = VirtualUdpSocket::new(phys);
		let peer: SocketAddr = "192.0.2.2:443".parse().unwrap();
		let v_for_task = Arc::clone(&v);
		let waker_task = tokio::spawn(async move { poll_fn(|cx| v_for_task.poll_dequeue(cx)).await });
		// Yield so the task registers its waker.
		tokio::task::yield_now().await;
		v.enqueue_inbound(peer, Bytes::from_static(b"X"));
		let (got_peer, got_data) = waker_task.await.unwrap().expect("dequeue ok");
		assert_eq!(got_peer, peer);
		assert_eq!(&*got_data, b"X");
	}

	// Enqueues past the configured capacity are silently dropped; the
	// first `capacity` datagrams are preserved in order.
	#[tokio::test]
	async fn full_queue_drops_overflow() {
		let (phys, _) = bound().await;
		let v = VirtualUdpSocket::new_with_capacity(phys, 2);
		let peer: SocketAddr = "192.0.2.3:443".parse().unwrap();
		v.enqueue_inbound(peer, Bytes::from_static(&[1]));
		v.enqueue_inbound(peer, Bytes::from_static(&[2]));
		// This one is dropped (capacity is 2).
		v.enqueue_inbound(peer, Bytes::from_static(&[3]));
		assert_eq!(v.inbound_len(), 2);
		assert_eq!(&*v.try_dequeue().unwrap().1, &[1]);
		assert_eq!(&*v.try_dequeue().unwrap().1, &[2]);
		assert!(v.try_dequeue().is_none());
	}

	// close() is drain-then-stop: queued datagrams survive, new ones are
	// dropped, and poll_dequeue signals end-of-stream once empty.
	#[tokio::test]
	async fn close_drops_subsequent_enqueues_and_yields_none_after_drain() {
		let (phys, _) = bound().await;
		let v = VirtualUdpSocket::new(phys);
		let peer: SocketAddr = "192.0.2.4:443".parse().unwrap();
		v.enqueue_inbound(peer, Bytes::from_static(b"A"));
		v.close();
		// Existing items still drain.
		assert_eq!(&*v.try_dequeue().unwrap().1, b"A");
		// New enqueues are dropped.
		v.enqueue_inbound(peer, Bytes::from_static(b"B"));
		assert!(v.try_dequeue().is_none());
		// poll_dequeue returns Ready(None) after drain + close.
		let r = poll_fn(|cx| v.poll_dequeue(cx)).await;
		assert!(r.is_none());
	}

	// Outbound sends bypass the virtual layer entirely: a real datagram
	// leaves the physical socket and carries its source address.
	#[tokio::test]
	async fn try_send_to_proxies_physical() {
		let (phys_a, addr_a) = bound().await;
		let (phys_b, addr_b) = bound().await;
		let v = VirtualUdpSocket::new(phys_a);
		// Outbound from v reaches phys_b.
		// poll_send_ready on the physical socket so we don't race the OS buffer state.
		poll_fn(|cx| v.poll_send_ready(cx)).await.expect("send_ready");
		let n = v.try_send_to(b"PING", addr_b).expect("send");
		assert_eq!(n, 4);
		let mut buf = [0u8; 16];
		let (got, from) = phys_b.recv_from(&mut buf).await.expect("recv");
		assert_eq!(&buf[..got], b"PING");
		assert_eq!(from, addr_a);
	}

	// local_addr() is a pure proxy for the physical socket's address.
	#[tokio::test]
	async fn local_addr_matches_physical() {
		let (phys, addr) = bound().await;
		let v = VirtualUdpSocket::new(phys);
		assert_eq!(v.local_addr().unwrap(), addr);
	}
}