smolvm_network/queues.rs
1//! Shared queues and wake notifications for the virtio-net backend.
2//!
3//! Context
4//! =======
5//!
6//! The host-side virtio runtime has several independently blocked workers:
7//! - the Unix-stream reader thread
8//! - the Unix-stream writer thread
9//! - the smoltcp poll loop
10//! - TCP relay threads
11//!
12//! They need two kinds of coordination:
13//! 1. lock-free frame handoff between threads
14//! 2. a way to wake a thread that is blocked in `poll(2)` or waiting for work
15//!
16//! This module provides both:
17//! - `ArrayQueue<Vec<u8>>` for frame ownership transfer
18//! - `WakePipe` as a tiny readiness primitive built from `pipe(2)` + `poll(2)`
19//!
20//! Data flow:
21//!
22//! ```text
23//! guest_to_host queue : reader thread -> smoltcp poll loop
24//! host_to_guest queue : smoltcp runtime -> writer thread
25//!
26//! guest_wake: reader thread / shutdown -> smoltcp poll loop
27//! host_wake : smoltcp runtime / shutdown -> Unix-stream writer
28//! relay_wake: TCP relay threads / shutdown -> smoltcp poll loop
29//! ```
30//!
31//! Thread interaction view:
32//!
33//! ```text
34//! FrameStream reader thread
35//! -> guest_to_host.push(frame)
36//! -> guest_wake.wake()
37//!
38//! smolvm-net-poll thread
39//! -> guest_to_host.pop()
40//! -> host_to_guest.push(frame)
41//! -> host_wake.wake()
42//! -> relay_wake.wait()/drain()
43//!
44//! FrameStream writer thread
45//! -> host_wake.wait()
46//! -> host_to_guest.pop()
47//!
48//! TCP relay thread
49//! -> to_smoltcp.send(payload)
50//! -> relay_wake.wake()
51//! ```
52
53use crossbeam_queue::ArrayQueue;
54use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
55use std::sync::atomic::{AtomicBool, Ordering};
56use std::sync::Arc;
57use std::time::Duration;
58
/// Default capacity, in frames, for each guest/host ethernet frame queue.
///
/// This is a suggested value for the `capacity` argument of
/// `NetworkFrameQueues::shared`; that constructor applies the same capacity
/// to both the `guest_to_host` and `host_to_guest` queues.
pub const DEFAULT_FRAME_QUEUE_CAPACITY: usize = 1024;
61
/// Shared queues and wake handles for the host-side virtio-net runtime.
///
/// One `NetworkFrameQueues` is shared (behind an `Arc`, see
/// `NetworkFrameQueues::shared`) across all helper threads that serve a
/// single guest NIC.
///
/// A useful mental model is:
///
/// ```text
/// queues   = ownership transfer for frame bytes
/// wakes    = "go look at the queue now"
/// shutdown = sticky flag + wake all blocked waiters
/// ```
///
/// The queues and wake pipes are public so each worker can push/pop and
/// wake/wait directly. The shutdown flag is private and is only reachable
/// through `begin_shutdown` / `is_shutting_down`.
pub struct NetworkFrameQueues {
    /// Raw ethernet frames emitted by the guest and waiting for smoltcp
    /// (reader thread -> smoltcp poll loop).
    pub guest_to_host: ArrayQueue<Vec<u8>>,
    /// Raw ethernet frames emitted by smoltcp and waiting to be written back
    /// to the guest (smoltcp runtime -> writer thread).
    pub host_to_guest: ArrayQueue<Vec<u8>>,
    /// Wakes the smoltcp poll loop when a guest frame arrives.
    pub guest_wake: WakePipe,
    /// Wakes the writer thread when a host frame is ready.
    pub host_wake: WakePipe,
    /// Wakes the smoltcp poll loop when a TCP relay thread has new data.
    pub relay_wake: WakePipe,
    /// Sticky flag: set once by `begin_shutdown` and never cleared.
    shutting_down: AtomicBool,
}
88
89impl NetworkFrameQueues {
90 /// Create a new shared queue set wrapped in `Arc`.
91 pub fn shared(capacity: usize) -> Arc<Self> {
92 Arc::new(Self {
93 guest_to_host: ArrayQueue::new(capacity),
94 host_to_guest: ArrayQueue::new(capacity),
95 guest_wake: WakePipe::new(),
96 host_wake: WakePipe::new(),
97 relay_wake: WakePipe::new(),
98 shutting_down: AtomicBool::new(false),
99 })
100 }
101
102 /// Mark the runtime as shutting down and wake all waiters.
103 ///
104 /// The wakes are part of shutdown correctness. Without them, a thread
105 /// blocked in `poll(2)` could sleep indefinitely even though the shutdown
106 /// flag was already set.
107 pub fn begin_shutdown(&self) {
108 self.shutting_down.store(true, Ordering::SeqCst);
109 self.guest_wake.wake();
110 self.host_wake.wake();
111 self.relay_wake.wake();
112 }
113
114 /// Whether shutdown has been requested.
115 pub fn is_shutting_down(&self) -> bool {
116 self.shutting_down.load(Ordering::SeqCst)
117 }
118}
119
/// Wake notification built on `pipe(2)`.
///
/// The pattern is:
/// - one thread blocks on the read end with `poll(2)`
/// - another thread writes one byte to the write end to signal "work exists"
/// - the waiter drains pending bytes before going back to sleep
///
/// Why use a pipe here:
/// - it gives us a real file descriptor that integrates with `poll(2)`
/// - it works on the Unix platforms smolvm targets
/// - it is simpler than building a custom condvar + timeout scheme around the
///   smoltcp loop and Unix-stream writer
///
/// Both ends are held as `OwnedFd`, so they are closed when the `WakePipe`
/// is dropped.
#[derive(Debug)]
pub struct WakePipe {
    /// Read end: polled by `wait`, emptied by `drain`.
    read_fd: OwnedFd,
    /// Write end: written by `wake`.
    write_fd: OwnedFd,
}
137
138impl WakePipe {
139 /// Create a non-blocking wake pipe.
140 ///
141 /// Low-level steps:
142 ///
143 /// ```text
144 /// pipe() -> create read/write fds
145 /// fcntl(F_SETFL) -> add O_NONBLOCK
146 /// fcntl(F_SETFD) -> add FD_CLOEXEC
147 /// wrap in OwnedFd -> move fd lifetime into Rust ownership
148 /// ```
149 pub fn new() -> Self {
150 let mut fds = [0i32; 2];
151
152 // SAFETY: `pipe` initializes both file descriptors on success.
153 let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
154 assert_eq!(
155 result,
156 0,
157 "pipe() failed: {}",
158 std::io::Error::last_os_error()
159 );
160
161 // SAFETY: both descriptors are valid after a successful `pipe`.
162 unsafe {
163 set_nonblock_cloexec(fds[0]);
164 set_nonblock_cloexec(fds[1]);
165 }
166
167 Self {
168 // SAFETY: ownership of the raw file descriptors transfers here.
169 read_fd: unsafe { OwnedFd::from_raw_fd(fds[0]) },
170 write_fd: unsafe { OwnedFd::from_raw_fd(fds[1]) },
171 }
172 }
173
174 /// Signal the waiting side.
175 ///
176 /// Writing one byte is enough. The byte value itself does not matter; only
177 /// readability of the pipe matters. Multiple writes coalesce naturally into
178 /// "there is pending wake state".
179 pub fn wake(&self) {
180 let byte = [1u8; 1];
181 // SAFETY: the write end is valid and non-blocking.
182 unsafe {
183 libc::write(self.write_fd.as_raw_fd(), byte.as_ptr().cast(), byte.len());
184 }
185 }
186
187 /// Drain all pending wake bytes.
188 ///
189 /// This resets the readiness state after a wake. Because the pipe is
190 /// non-blocking, `read <= 0` means "nothing more to drain right now".
191 pub fn drain(&self) {
192 let mut buf = [0u8; 256];
193 loop {
194 // SAFETY: the read end is valid and non-blocking.
195 let read =
196 unsafe { libc::read(self.read_fd.as_raw_fd(), buf.as_mut_ptr().cast(), buf.len()) };
197 if read <= 0 {
198 break;
199 }
200 }
201 }
202
203 /// Wait until the pipe is readable or the timeout elapses.
204 ///
205 /// This is the low-level equivalent of "sleep until another thread signals
206 /// me or the timeout expires", but implemented in file-descriptor space so
207 /// it composes with other polling logic.
208 pub fn wait(&self, timeout: Option<Duration>) -> std::io::Result<bool> {
209 let timeout_ms = timeout
210 .map(|duration| duration.as_millis().min(i32::MAX as u128) as i32)
211 .unwrap_or(-1);
212 let mut pollfd = libc::pollfd {
213 fd: self.read_fd.as_raw_fd(),
214 events: libc::POLLIN,
215 revents: 0,
216 };
217
218 // SAFETY: `pollfd` points to a valid descriptor and struct.
219 let result = unsafe { libc::poll(&mut pollfd, 1, timeout_ms) };
220 if result < 0 {
221 return Err(std::io::Error::last_os_error());
222 }
223
224 Ok(result > 0 && pollfd.revents & libc::POLLIN != 0)
225 }
226
227 /// File descriptor for `poll(2)`.
228 ///
229 /// Callers should treat this as a borrowed readiness handle, not as an fd
230 /// they own or may close.
231 pub fn as_raw_fd(&self) -> RawFd {
232 self.read_fd.as_raw_fd()
233 }
234}
235
236impl Clone for WakePipe {
237 /// Clone by duplicating both file descriptors.
238 ///
239 /// Each clone refers to the same underlying pipe objects, so waking or
240 /// draining from any clone affects the shared readiness state.
241 fn clone(&self) -> Self {
242 let read_fd = self
243 .read_fd
244 .try_clone()
245 .expect("wake pipe read fd should be clonable");
246 let write_fd = self
247 .write_fd
248 .try_clone()
249 .expect("wake pipe write fd should be clonable");
250 Self { read_fd, write_fd }
251 }
252}
253
254impl Default for WakePipe {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
260/// Set `O_NONBLOCK` and `FD_CLOEXEC` on a file descriptor.
261///
262/// # Safety
263///
264/// `fd` must be a valid open file descriptor.
265///
266/// Why these flags matter:
267/// - `O_NONBLOCK`: wake helpers should never hang the runtime on a read/write
268/// path that is supposed to be just a signal
269/// - `FD_CLOEXEC`: if smolvm later `exec`s another process, these internal
270/// coordination fds should not leak into that child
271unsafe fn set_nonblock_cloexec(fd: RawFd) {
272 // SAFETY: caller guarantees `fd` is valid.
273 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
274 assert!(
275 flags >= 0,
276 "fcntl(F_GETFL) failed: {}",
277 std::io::Error::last_os_error()
278 );
279 // SAFETY: caller guarantees `fd` is valid.
280 let result = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) };
281 assert!(
282 result >= 0,
283 "fcntl(F_SETFL) failed: {}",
284 std::io::Error::last_os_error()
285 );
286
287 // SAFETY: caller guarantees `fd` is valid.
288 let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) };
289 assert!(
290 flags >= 0,
291 "fcntl(F_GETFD) failed: {}",
292 std::io::Error::last_os_error()
293 );
294 // SAFETY: caller guarantees `fd` is valid.
295 let result = unsafe { libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
296 assert!(
297 result >= 0,
298 "fcntl(F_SETFD) failed: {}",
299 std::io::Error::last_os_error()
300 );
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// A wake makes the pipe readable; draining makes it quiet again.
    #[test]
    fn wake_pipe_round_trip() {
        let wake_pipe = WakePipe::new();

        wake_pipe.wake();
        let signalled = wake_pipe.wait(Some(Duration::from_millis(10))).unwrap();
        assert!(signalled);

        wake_pipe.drain();
        let still_signalled = wake_pipe.wait(Some(Duration::from_millis(1))).unwrap();
        assert!(!still_signalled);
    }

    /// Frames come out of a queue in the order they were pushed.
    #[test]
    fn queues_are_fifo() {
        let queues = NetworkFrameQueues::shared(4);
        let frames = [vec![1u8, 2, 3], vec![4, 5, 6]];

        for frame in frames.iter().cloned() {
            queues.guest_to_host.push(frame).unwrap();
        }

        for expected in frames {
            assert_eq!(queues.guest_to_host.pop(), Some(expected));
        }
        assert_eq!(queues.guest_to_host.pop(), None);
    }
}