Skip to main content

smolvm_network/
queues.rs

1//! Shared queues and wake notifications for the virtio-net backend.
2//!
3//! Context
4//! =======
5//!
6//! The host-side virtio runtime has several independently blocked workers:
7//! - the Unix-stream reader thread
8//! - the Unix-stream writer thread
9//! - the smoltcp poll loop
10//! - TCP relay threads
11//!
12//! They need two kinds of coordination:
13//! 1. lock-free frame handoff between threads
14//! 2. a way to wake a thread that is blocked in `poll(2)` or waiting for work
15//!
16//! This module provides both:
17//! - `ArrayQueue<Vec<u8>>` for frame ownership transfer
18//! - `WakePipe` as a tiny readiness primitive built from `pipe(2)` + `poll(2)`
19//!
20//! Data flow:
21//!
22//! ```text
23//! guest_to_host queue : reader thread  -> smoltcp poll loop
24//! host_to_guest queue : smoltcp runtime -> writer thread
25//!
26//! guest_wake: reader thread / shutdown -> smoltcp poll loop
27//! host_wake : smoltcp runtime / shutdown -> Unix-stream writer
28//! relay_wake: TCP relay threads / shutdown -> smoltcp poll loop
29//! ```
30//!
31//! Thread interaction view:
32//!
33//! ```text
34//! FrameStream reader thread
35//!   -> guest_to_host.push(frame)
36//!   -> guest_wake.wake()
37//!
38//! smolvm-net-poll thread
39//!   -> guest_to_host.pop()
40//!   -> host_to_guest.push(frame)
41//!   -> host_wake.wake()
42//!   -> relay_wake.wait()/drain()
43//!
44//! FrameStream writer thread
45//!   -> host_wake.wait()
46//!   -> host_to_guest.pop()
47//!
48//! TCP relay thread
49//!   -> to_smoltcp.send(payload)
50//!   -> relay_wake.wake()
51//! ```
52
53use crossbeam_queue::ArrayQueue;
54use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
55use std::sync::atomic::{AtomicBool, Ordering};
56use std::sync::Arc;
57use std::time::Duration;
58
/// Default number of ethernet frames a guest/host frame queue can hold.
///
/// Nothing in this file reads the constant; presumably callers pass it as the
/// `capacity` argument of [`NetworkFrameQueues::shared`] — confirm at call sites.
pub const DEFAULT_FRAME_QUEUE_CAPACITY: usize = 1024;
61
62/// Shared queues and wake handles for the host-side virtio-net runtime.
63///
64/// One `NetworkFrameQueues` is shared across all helper threads for a single
65/// guest NIC.
66///
67/// A useful mental model is:
68///
69/// ```text
70/// queues  = ownership transfer for frame bytes
71/// wakes   = "go look at the queue now"
72/// shutdown= sticky flag + wake all blocked waiters
73/// ```
74pub struct NetworkFrameQueues {
75    /// Raw ethernet frames emitted by the guest and waiting for smoltcp.
76    pub guest_to_host: ArrayQueue<Vec<u8>>,
77    /// Raw ethernet frames emitted by smoltcp and waiting for libkrun.
78    pub host_to_guest: ArrayQueue<Vec<u8>>,
79    /// Wake the smoltcp poll loop when a guest frame arrives.
80    pub guest_wake: WakePipe,
81    /// Wake the libkrun writer thread when a host frame is ready.
82    pub host_wake: WakePipe,
83    /// Wake the smoltcp poll loop when a TCP relay thread has new data.
84    pub relay_wake: WakePipe,
85    /// Signals that the helper process should shut down.
86    shutting_down: AtomicBool,
87}
88
89impl NetworkFrameQueues {
90    /// Create a new shared queue set wrapped in `Arc`.
91    pub fn shared(capacity: usize) -> Arc<Self> {
92        Arc::new(Self {
93            guest_to_host: ArrayQueue::new(capacity),
94            host_to_guest: ArrayQueue::new(capacity),
95            guest_wake: WakePipe::new(),
96            host_wake: WakePipe::new(),
97            relay_wake: WakePipe::new(),
98            shutting_down: AtomicBool::new(false),
99        })
100    }
101
102    /// Mark the runtime as shutting down and wake all waiters.
103    ///
104    /// The wakes are part of shutdown correctness. Without them, a thread
105    /// blocked in `poll(2)` could sleep indefinitely even though the shutdown
106    /// flag was already set.
107    pub fn begin_shutdown(&self) {
108        self.shutting_down.store(true, Ordering::SeqCst);
109        self.guest_wake.wake();
110        self.host_wake.wake();
111        self.relay_wake.wake();
112    }
113
114    /// Whether shutdown has been requested.
115    pub fn is_shutting_down(&self) -> bool {
116        self.shutting_down.load(Ordering::SeqCst)
117    }
118}
119
/// A wake-up signal carried over a `pipe(2)`.
///
/// Usage pattern:
/// - the waiting thread blocks in `poll(2)` on the read end
/// - a signalling thread writes a single byte to the write end, meaning
///   "there is work"
/// - before sleeping again, the waiter drains whatever bytes are pending
///
/// Reasons for choosing a pipe:
/// - the read end is an ordinary file descriptor, so it plugs into `poll(2)`
/// - pipes exist on the Unix platforms smolvm targets
/// - it avoids hand-rolling a condvar + timeout scheme around the smoltcp
///   loop and the Unix-stream writer
#[derive(Debug)]
pub struct WakePipe {
    /// Read end: polled by the waiter and emptied when draining.
    read_fd: OwnedFd,
    /// Write end: receives one byte per wake.
    write_fd: OwnedFd,
}
137
138impl WakePipe {
139    /// Create a non-blocking wake pipe.
140    ///
141    /// Low-level steps:
142    ///
143    /// ```text
144    /// pipe()               -> create read/write fds
145    /// fcntl(F_SETFL)       -> add O_NONBLOCK
146    /// fcntl(F_SETFD)       -> add FD_CLOEXEC
147    /// wrap in OwnedFd      -> move fd lifetime into Rust ownership
148    /// ```
149    pub fn new() -> Self {
150        let mut fds = [0i32; 2];
151
152        // SAFETY: `pipe` initializes both file descriptors on success.
153        let result = unsafe { libc::pipe(fds.as_mut_ptr()) };
154        assert_eq!(
155            result,
156            0,
157            "pipe() failed: {}",
158            std::io::Error::last_os_error()
159        );
160
161        // SAFETY: both descriptors are valid after a successful `pipe`.
162        unsafe {
163            set_nonblock_cloexec(fds[0]);
164            set_nonblock_cloexec(fds[1]);
165        }
166
167        Self {
168            // SAFETY: ownership of the raw file descriptors transfers here.
169            read_fd: unsafe { OwnedFd::from_raw_fd(fds[0]) },
170            write_fd: unsafe { OwnedFd::from_raw_fd(fds[1]) },
171        }
172    }
173
174    /// Signal the waiting side.
175    ///
176    /// Writing one byte is enough. The byte value itself does not matter; only
177    /// readability of the pipe matters. Multiple writes coalesce naturally into
178    /// "there is pending wake state".
179    pub fn wake(&self) {
180        let byte = [1u8; 1];
181        // SAFETY: the write end is valid and non-blocking.
182        unsafe {
183            libc::write(self.write_fd.as_raw_fd(), byte.as_ptr().cast(), byte.len());
184        }
185    }
186
187    /// Drain all pending wake bytes.
188    ///
189    /// This resets the readiness state after a wake. Because the pipe is
190    /// non-blocking, `read <= 0` means "nothing more to drain right now".
191    pub fn drain(&self) {
192        let mut buf = [0u8; 256];
193        loop {
194            // SAFETY: the read end is valid and non-blocking.
195            let read =
196                unsafe { libc::read(self.read_fd.as_raw_fd(), buf.as_mut_ptr().cast(), buf.len()) };
197            if read <= 0 {
198                break;
199            }
200        }
201    }
202
203    /// Wait until the pipe is readable or the timeout elapses.
204    ///
205    /// This is the low-level equivalent of "sleep until another thread signals
206    /// me or the timeout expires", but implemented in file-descriptor space so
207    /// it composes with other polling logic.
208    pub fn wait(&self, timeout: Option<Duration>) -> std::io::Result<bool> {
209        let timeout_ms = timeout
210            .map(|duration| duration.as_millis().min(i32::MAX as u128) as i32)
211            .unwrap_or(-1);
212        let mut pollfd = libc::pollfd {
213            fd: self.read_fd.as_raw_fd(),
214            events: libc::POLLIN,
215            revents: 0,
216        };
217
218        // SAFETY: `pollfd` points to a valid descriptor and struct.
219        let result = unsafe { libc::poll(&mut pollfd, 1, timeout_ms) };
220        if result < 0 {
221            return Err(std::io::Error::last_os_error());
222        }
223
224        Ok(result > 0 && pollfd.revents & libc::POLLIN != 0)
225    }
226
227    /// File descriptor for `poll(2)`.
228    ///
229    /// Callers should treat this as a borrowed readiness handle, not as an fd
230    /// they own or may close.
231    pub fn as_raw_fd(&self) -> RawFd {
232        self.read_fd.as_raw_fd()
233    }
234}
235
236impl Clone for WakePipe {
237    /// Clone by duplicating both file descriptors.
238    ///
239    /// Each clone refers to the same underlying pipe objects, so waking or
240    /// draining from any clone affects the shared readiness state.
241    fn clone(&self) -> Self {
242        let read_fd = self
243            .read_fd
244            .try_clone()
245            .expect("wake pipe read fd should be clonable");
246        let write_fd = self
247            .write_fd
248            .try_clone()
249            .expect("wake pipe write fd should be clonable");
250        Self { read_fd, write_fd }
251    }
252}
253
254impl Default for WakePipe {
255    fn default() -> Self {
256        Self::new()
257    }
258}
259
260/// Set `O_NONBLOCK` and `FD_CLOEXEC` on a file descriptor.
261///
262/// # Safety
263///
264/// `fd` must be a valid open file descriptor.
265///
266/// Why these flags matter:
267/// - `O_NONBLOCK`: wake helpers should never hang the runtime on a read/write
268///   path that is supposed to be just a signal
269/// - `FD_CLOEXEC`: if smolvm later `exec`s another process, these internal
270///   coordination fds should not leak into that child
271unsafe fn set_nonblock_cloexec(fd: RawFd) {
272    // SAFETY: caller guarantees `fd` is valid.
273    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
274    assert!(
275        flags >= 0,
276        "fcntl(F_GETFL) failed: {}",
277        std::io::Error::last_os_error()
278    );
279    // SAFETY: caller guarantees `fd` is valid.
280    let result = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) };
281    assert!(
282        result >= 0,
283        "fcntl(F_SETFL) failed: {}",
284        std::io::Error::last_os_error()
285    );
286
287    // SAFETY: caller guarantees `fd` is valid.
288    let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) };
289    assert!(
290        flags >= 0,
291        "fcntl(F_GETFD) failed: {}",
292        std::io::Error::last_os_error()
293    );
294    // SAFETY: caller guarantees `fd` is valid.
295    let result = unsafe { libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
296    assert!(
297        result >= 0,
298        "fcntl(F_SETFD) failed: {}",
299        std::io::Error::last_os_error()
300    );
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// A wake makes the pipe readable; draining makes it unreadable again.
    #[test]
    fn wake_pipe_round_trip() {
        let wake_pipe = WakePipe::new();

        wake_pipe.wake();
        let readable_after_wake = wake_pipe.wait(Some(Duration::from_millis(10))).unwrap();
        assert!(readable_after_wake);

        wake_pipe.drain();
        let readable_after_drain = wake_pipe.wait(Some(Duration::from_millis(1))).unwrap();
        assert!(!readable_after_drain);
    }

    /// Frames come out of a queue in the order they were pushed.
    #[test]
    fn queues_are_fifo() {
        let shared = NetworkFrameQueues::shared(4);
        let first: Vec<u8> = vec![1, 2, 3];
        let second: Vec<u8> = vec![4, 5, 6];

        shared.guest_to_host.push(first.clone()).unwrap();
        shared.guest_to_host.push(second.clone()).unwrap();

        assert_eq!(shared.guest_to_host.pop(), Some(first));
        assert_eq!(shared.guest_to_host.pop(), Some(second));
        assert_eq!(shared.guest_to_host.pop(), None);
    }
}