// supermachine 0.7.62
//
// Run any OCI/Docker image as a hardware-isolated microVM on macOS HVF (Linux KVM and Windows WHP in progress). Single library API, zero flags for the common case, sub-100 ms cold-restore from snapshot.
//
// Status: minimal — bounded RX-intent queue. Producers (the muxer +
// proxies) push RxPackets here; the vsock device drains them into the
// virtq RX descriptors when the guest provides them.
//
// The queue is shared across ALL of a guest's connections. Per-conn
// vsock credit (ProxyConnState) normally bounds in-flight host→guest
// data far below the cap, so the cap is a backstop. When it IS reached
// (a guest advertising a very large buf_alloc, or high connection
// fan-out summing past the shared cap), producers apply BACK-PRESSURE
// rather than drop: `push_rxq_and_kick` hands the packet back to the
// caller, which stashes it in `ProxyConnState::pending_rx` and pauses
// the connection's host-socket reads. handle_rw / handle_credit re-push
// the stash and resume once the guest drains the queue. Zero byte loss.

use super::packet::RxPacket;
use std::collections::VecDeque;

/// Default upper bound on the number of `RxPacket`s a [`MuxerRxQ`] may
/// hold at once: 16 Ki (16 × 1024) entries. The queue, and therefore
/// this cap, is shared by every connection of a guest.
pub const MUXER_RXQ_SIZE: usize = 16 << 10;

/// Bounded FIFO of `RxPacket`s waiting to be handed to the guest.
///
/// The bound is enforced by the push methods rather than by the
/// underlying `VecDeque`, which on its own would grow without limit.
pub struct MuxerRxQ {
    /// Pending packets; the oldest entry sits at the front.
    queue: VecDeque<RxPacket>,
    /// Maximum number of packets `queue` may hold. Pushes that would
    /// exceed it are refused.
    cap: usize,
}

impl MuxerRxQ {
    /// Create a queue capped at [`MUXER_RXQ_SIZE`] packets.
    pub fn new() -> Self {
        Self::with_cap(MUXER_RXQ_SIZE)
    }

    /// Construct with an explicit cap. Used by unit tests to force the
    /// backpressure path deterministically; production uses `new`.
    ///
    /// A `cap` of 0 is raised to 1 so the queue can always hold at least
    /// one packet. The up-front allocation is limited to
    /// [`MUXER_RXQ_SIZE`] slots regardless of `cap`; a larger cap is
    /// still honoured, the backing buffer simply grows on demand.
    pub fn with_cap(cap: usize) -> Self {
        let cap = cap.max(1);
        Self {
            queue: VecDeque::with_capacity(cap.min(MUXER_RXQ_SIZE)),
            cap,
        }
    }

    /// Append `pkt` if there is room; returns `true` when it was queued.
    ///
    /// On a full queue this returns `false` and `pkt` is consumed (and
    /// therefore dropped). Callers that must not lose the packet should
    /// use [`push_was_empty`](Self::push_was_empty), which hands it back.
    pub fn push(&mut self, pkt: RxPacket) -> bool {
        // Delegate so the cap check lives in exactly one place.
        self.push_was_empty(pkt).is_ok()
    }

    /// Push and report whether the queue was empty *before* the push.
    /// Callers that route this into the device-kick path only need to
    /// raise the IRQ on the empty → non-empty transition: a follow-up
    /// push during the in-flight drain is already visible to the vCPU's
    /// RX loop. Saves one `hv_gic_set_spi` per coalesced packet.
    ///
    /// Returns `Err(pkt)` — handing the packet back, NOT dropping it —
    /// when the queue is at its cap, so the caller can stash it and
    /// back-pressure the connection, preserving every byte.
    /// `Ok(was_empty)` on success.
    pub fn push_was_empty(&mut self, pkt: RxPacket) -> Result<bool, RxPacket> {
        if self.queue.len() >= self.cap {
            return Err(pkt);
        }
        // Sample emptiness before the push; afterwards it is always false.
        let was_empty = self.queue.is_empty();
        self.queue.push_back(pkt);
        Ok(was_empty)
    }

    /// Remove and return the oldest queued packet, or `None` if empty.
    pub fn pop(&mut self) -> Option<RxPacket> {
        self.queue.pop_front()
    }

    /// Borrow the oldest queued packet without removing it.
    pub fn peek(&self) -> Option<&RxPacket> {
        self.queue.front()
    }

    /// `true` when no packets are queued.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Number of packets currently queued.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Discard every queued packet. Unlike `VecDeque::drain`, nothing is
    /// yielded to the caller; the packets are simply dropped.
    pub fn drain(&mut self) {
        self.queue.clear();
    }
}

impl Default for MuxerRxQ {
    /// Equivalent to [`MuxerRxQ::new`].
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::super::packet::Header;
    use super::*;

    /// Build a packet whose header is all zeroes apart from `len`, with a
    /// zero-filled payload of `len` bytes. The payload length doubles as
    /// a cheap identity tag in the tests below.
    fn pkt(len: usize) -> RxPacket {
        RxPacket {
            hdr: Header {
                src_cid: 0,
                dst_cid: 0,
                src_port: 0,
                dst_port: 0,
                // Test payloads are a handful of bytes, so this cast
                // cannot truncate.
                len: len as u32,
                type_: 0,
                op: 0,
                flags: 0,
                buf_alloc: 0,
                fwd_cnt: 0,
            },
            data: vec![0u8; len],
        }
    }

    #[test]
    fn push_signals_full_at_cap_without_growing() {
        let mut q = MuxerRxQ::with_cap(4);
        for _ in 0..4 {
            assert!(q.push(pkt(1)));
        }
        // At cap, both push paths signal "full" and the queue does NOT
        // grow. `push` consumes the rejected packet; `push_was_empty`
        // hands it back (Err) so the caller can back-pressure instead of
        // dropping.
        assert!(!q.push(pkt(1)));
        assert!(q.push_was_empty(pkt(1)).is_err());
        assert_eq!(q.len(), 4);
        // Draining one frees a slot again (the device consuming → resume).
        assert!(q.pop().is_some());
        assert!(matches!(q.push_was_empty(pkt(1)), Ok(false)));
        assert_eq!(q.len(), 4);
    }

    #[test]
    fn push_was_empty_reports_only_the_empty_transition() {
        let mut q = MuxerRxQ::with_cap(4);
        // First push sees an empty queue; follow-ups do not.
        assert!(matches!(q.push_was_empty(pkt(1)), Ok(true)));
        assert!(matches!(q.push_was_empty(pkt(1)), Ok(false)));
        // Once fully drained, the next push is a fresh transition.
        assert!(q.pop().is_some());
        assert!(q.pop().is_some());
        assert!(q.is_empty());
        assert!(matches!(q.push_was_empty(pkt(1)), Ok(true)));
    }

    #[test]
    fn full_queue_hands_back_the_same_packet() {
        let mut q = MuxerRxQ::with_cap(1);
        assert!(q.push(pkt(1)));
        match q.push_was_empty(pkt(7)) {
            Err(returned) => assert_eq!(returned.data.len(), 7),
            Ok(_) => panic!("push into a full queue must be refused"),
        }
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn pops_in_fifo_order_and_drain_empties() {
        let mut q = MuxerRxQ::with_cap(4);
        for len in 1..=3 {
            assert!(q.push(pkt(len)));
        }
        // `peek` shows the oldest entry without consuming it.
        assert_eq!(q.peek().map(|p| p.data.len()), Some(1));
        assert_eq!(q.len(), 3);
        assert_eq!(q.pop().map(|p| p.data.len()), Some(1));
        assert_eq!(q.pop().map(|p| p.data.len()), Some(2));
        q.drain();
        assert!(q.is_empty());
        assert!(q.peek().is_none());
        assert!(q.pop().is_none());
    }

    #[test]
    fn zero_cap_is_clamped_to_one() {
        let mut q = MuxerRxQ::with_cap(0);
        assert!(q.push(pkt(1)));
        assert!(!q.push(pkt(1)));
        assert_eq!(q.len(), 1);
    }
}