//! supermachine 0.7.2
//!
//! Run any OCI/Docker image as a hardware-isolated microVM on macOS
//! via HVF (Linux KVM and Windows WHP support in progress). Single
//! library API, zero flags for the common case, sub-100 ms cold
//! restore from snapshot.
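//!
//! A minimal wiring sketch for this vsock device (the MMIO/PCI
//! transport, not shown here, supplies the queues and later calls
//! `activate`; the IRQ closure is transport-specific):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // CID 3 is the conventional first guest CID.
//! let vsock = Vsock::new(3)?;
//! vsock.set_irq_raise(Arc::new(|| { /* raise the device IRQ */ }));
//! ```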
// Status: functional — drains TX packets and routes them through the
// muxer (which owns the connection table and TSI control state);
// unmatched REQUESTs are answered with RST, so the guest kernel
// observes a clean refusal instead of timing out. Responses come
// back via the RX queue and raise the IRQ.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use super::super::queue::Queue;
use super::super::{VirtioDevice, VIRTIO_ID_VSOCK};
use super::muxer::VsockMuxer;
use super::muxer_thread;
use super::packet::{Header, RxPacket, VSOCK_PKT_HDR_SIZE};

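// Queue indices follow the virtio-vsock device layout:
// 0 = RX (device→driver), 1 = TX (driver→device), 2 = event.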
pub const RXQ_INDEX: usize = 0;
pub const TXQ_INDEX: usize = 1;
pub const EVQ_INDEX: usize = 2;

pub struct Vsock {
    cid: u64,
    queues: Mutex<Vec<Queue>>,
    activated: AtomicBool,
    /// Queue of RX packets waiting to land in guest descriptors.
    /// Drained on RXQ notify or after TX dispatch.
    pending_rx: Mutex<VecDeque<RxPacket>>,
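    /// IRQ callback installed by the transport via `set_irq_raise`;
    /// invoked after RX descriptors have been filled.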
    irq_raise: Mutex<Option<Arc<dyn Fn() + Send + Sync>>>,
    /// Muxer that routes incoming TX → outgoing RX. Owns the
    /// connection table + TSI control state. Arc so accept-thread
    /// closures can hold references back to it.
    muxer: Arc<VsockMuxer>,
}

impl Vsock {
    pub fn new(cid: u64) -> Result<Self, muxer_thread::StartError> {
        Self::with_tsi_token(cid, None)
    }

    /// Like [`Vsock::new`] but pre-arms the muxer with a TSI
    /// control-channel auth token. Mismatched / unprefixed control
    /// DGRAMs are silently dropped. See [`super::muxer`].
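    ///
    /// A usage sketch (the token bytes are placeholders):
    ///
    /// ```ignore
    /// let token = [0u8; super::muxer::TSI_TOKEN_LEN];
    /// let vsock = Vsock::with_tsi_token(3, Some(token))?;
    /// ```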
    pub fn with_tsi_token(
        cid: u64,
        token: Option<[u8; super::muxer::TSI_TOKEN_LEN]>,
    ) -> Result<Self, muxer_thread::StartError> {
        Ok(Self {
            cid,
            queues: Mutex::new(Vec::new()),
            activated: AtomicBool::new(false),
            pending_rx: Mutex::new(VecDeque::new()),
            irq_raise: Mutex::new(None),
            muxer: Arc::new(VsockMuxer::with_tsi_token(cid, token)?),
        })
    }

    pub fn muxer(&self) -> &Arc<VsockMuxer> {
        &self.muxer
    }

    /// External wake: drain any pending RX (from device or muxer)
    /// into the guest's RX virtq. Called by the muxer's accept
    /// thread after pushing a REQUEST.
    pub fn kick(&self) {
        self.try_drain_rx();
    }

    pub fn set_irq_raise(&self, f: Arc<dyn Fn() + Send + Sync>) {
        *self.irq_raise.lock().unwrap() = Some(f);
    }

    pub fn cid(&self) -> u64 {
        self.cid
    }

    /// Clear the per-dispatch RX backlog, discarding any queued
    /// packets. Pool-worker mode calls this in tandem with
    /// `VsockMuxer::reset` between RESTOREs so the next dispatch
    /// starts with no leftover packets.
    pub fn reset_pending_rx(&self) {
        self.pending_rx.lock().unwrap().clear();
    }

    /// True when the vsock dataplane has no active per-request state.
    pub fn is_transport_idle(&self) -> bool {
        self.pending_rx.lock().unwrap().is_empty() && self.muxer.is_transport_idle()
    }

    /// Queue an RX packet for the guest and attempt an opportunistic
    /// drain; anything that does not fit stays queued. Future muxer
    /// callers use this to deliver inbound TCP→vsock data.
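    ///
    /// A delivery sketch (assumes `hdr` was built elsewhere and that
    /// `RxPacket`'s fields are visible to the caller):
    ///
    /// ```ignore
    /// vsock.push_rx(RxPacket { hdr, data: b"hello".to_vec() });
    /// ```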
    pub fn push_rx(&self, pkt: RxPacket) {
        self.pending_rx.lock().unwrap().push_back(pkt);
        self.try_drain_rx();
    }

    /// Try to land queued RX packets into guest RX descriptors.
    fn try_drain_rx(&self) {
        if !self.activated.load(Ordering::Acquire) {
            return;
        }
        // First, pull any muxer-pushed RxPackets into our pending
        // (they came from accept threads etc.).
        {
            let mut mq = self.muxer.rxq.lock().unwrap();
            let mut p = self.pending_rx.lock().unwrap();
            while let Some(pkt) = mq.pop() {
                p.push_back(pkt);
            }
        }
        let mut qs = self.queues.lock().unwrap();
        let q = match qs.get_mut(RXQ_INDEX) {
            Some(q) => q,
            None => return,
        };
        if !q.ready {
            return;
        }
        let mut pending = self.pending_rx.lock().unwrap();
        let n_pending = pending.len();
        let mut any = false;
        while let Some(pkt) = pending.front() {
            // Pop a descriptor chain head from RX avail.
            let (head, chain) = match q.pop_chain() {
                Some(p) => p,
                None => break, // no descriptors free; leave queued
            };
            // Skip malformed chains that are empty or whose first
            // descriptor cannot hold the header (mirrors the TX-side
            // check); consume the chain and retry this packet.
            let first = match chain.first().copied() {
                Some(d) if (d.len as usize) >= VSOCK_PKT_HDR_SIZE => d,
                _ => {
                    q.add_used(head, 0);
                    continue;
                }
            };
            // Write the header into the first descriptor (the guest
            // provides a writable buffer on the RX queue).
            let mut hdr = pkt.hdr;
            hdr.len = pkt.data.len() as u32;
            let hdr_bytes = hdr.encode();
            q.mem.write_slice(first.addr, &hdr_bytes);
            let mut written = VSOCK_PKT_HDR_SIZE as u32;
            if !pkt.data.is_empty() {
                // Linux 6.2+: payload may follow header in the SAME
                // descriptor; older kernels split. Walk the chain
                // until we've placed the whole payload OR run out
                // of writable descriptors.
                let mut payload_off = 0usize;
                // First descriptor may have leftover bytes after
                // the header.
                let first_leftover = (first.len as usize).saturating_sub(VSOCK_PKT_HDR_SIZE);
                if first_leftover > 0 {
                    let take = first_leftover.min(pkt.data.len());
                    q.mem
                        .write_slice(first.addr + VSOCK_PKT_HDR_SIZE as u64, &pkt.data[..take]);
                    written += take as u32;
                    payload_off += take;
                }
                // Remaining descriptors hold payload only.
                for d in chain.iter().skip(1) {
                    if payload_off >= pkt.data.len() {
                        break;
                    }
                    let take = (d.len as usize).min(pkt.data.len() - payload_off);
                    q.mem
                        .write_slice(d.addr, &pkt.data[payload_off..payload_off + take]);
                    written += take as u32;
                    payload_off += take;
                }
                debug_assert_eq!(
                    payload_off,
                    pkt.data.len(),
                    "RX descriptor chain too small for {} byte packet",
                    pkt.data.len()
                );
            }
            q.add_used(head, written);
            pending.pop_front();
            any = true;
        }
        if any {
            let drained = n_pending - pending.len();
            let f_opt = self.irq_raise.lock().unwrap().clone();
            drop(pending);
            drop(qs);
            if crate::devices::virtio::vsock::muxer::vsock_trace_enabled() {
                eprintln!(
                    "[vsock] drained {}/{} pending RX, raising IRQ",
                    drained, n_pending
                );
            }
            if let Some(f) = f_opt {
                f();
            }
        }
    }

    /// Drain the TX queue: parse each packet and route it through the
    /// muxer, which generates any responses (an RST for unmatched
    /// REQUESTs). After TX drain, try to deliver any pending RX.
    fn drain_tx(&self) {
        // Snapshot (header, payload) for each TX chain. Read
        // payload into a Vec — we'll route it through the muxer
        // (which may need it for TSI control DGRAMs and RW data).
        let parsed: Vec<(Header, Vec<u8>)> = {
            let mut qs = self.queues.lock().unwrap();
            let q = match qs.get_mut(TXQ_INDEX) {
                Some(q) => q,
                None => return,
            };
            if !q.ready {
                return;
            }
            let mut out = Vec::new();
            while let Some((head, chain)) = q.pop_chain() {
                if chain.is_empty() {
                    q.add_used(head, 0);
                    continue;
                }
                let first = chain[0];
                if (first.len as usize) < VSOCK_PKT_HDR_SIZE {
                    q.add_used(head, 0);
                    continue;
                }
                let mut hdrbuf = [0u8; VSOCK_PKT_HDR_SIZE];
                q.mem.read_slice(first.addr, &mut hdrbuf);
                let h = Header::parse(&hdrbuf);
                // Payload either continues at first.addr +
                // VSOCK_PKT_HDR_SIZE (Linux 6.2+ single-descriptor
                // TX) or in chain[1].
                let mut payload = Vec::with_capacity(h.len as usize);
                if h.len > 0 {
                    payload.resize(h.len as usize, 0);
                    let mut off = 0usize;
                    let first_leftover = (first.len as usize).saturating_sub(VSOCK_PKT_HDR_SIZE);
                    if first_leftover > 0 {
                        let take = first_leftover.min(payload.len());
                        q.mem.read_slice(
                            first.addr + VSOCK_PKT_HDR_SIZE as u64,
                            &mut payload[..take],
                        );
                        off += take;
                    }
                    for d in chain.iter().skip(1) {
                        if off >= payload.len() {
                            break;
                        }
                        let take = (d.len as usize).min(payload.len() - off);
                        q.mem.read_slice(d.addr, &mut payload[off..off + take]);
                        off += take;
                    }
                    payload.truncate(off);
                }
                if crate::devices::virtio::vsock::muxer::vsock_trace_enabled() {
                    eprintln!(
                        "[vsock] TX op={} type={} src=({},{}) dst=({},{}) len={}",
                        h.op, h.type_, h.src_cid, h.src_port, h.dst_cid, h.dst_port, h.len
                    );
                }
                out.push((h, payload));
                q.add_used(head, 0);
            }
            out
        };
        // Route through muxer (no queue lock held).
        for (h, payload) in parsed {
            for rx in self.muxer.handle_tx(&h, &payload) {
                self.pending_rx.lock().unwrap().push_back(rx);
            }
        }
        // Drain any RX intents into RX descriptors + raise IRQ.
        self.try_drain_rx();
    }
}

impl VirtioDevice for Vsock {
    fn device_id(&self) -> u32 {
        VIRTIO_ID_VSOCK
    }
    fn num_queues(&self) -> usize {
        3
    }
    fn config(&self) -> Vec<u8> {
        self.cid.to_le_bytes().to_vec()
    }
    fn features(&self) -> u64 {
        // VIRTIO_F_VERSION_1 (bit 32) + VIRTIO_VSOCK_F_DGRAM (bit 3).
        // The DGRAM feature bit is what the kernel TSI patches
        // gate on — without it advertised by the host, the kernel's
        // TSI listen() returns EINVAL because it can't send the
        // PROXY_CREATE / LISTEN control DGRAMs.
        (1u64 << 32) | (1u64 << 3)
    }
    fn notify(&self, q: u16) {
        match q as usize {
            TXQ_INDEX => self.drain_tx(),
            RXQ_INDEX => self.try_drain_rx(),
            _ => {}
        }
    }
    fn activate(&self, queues: Vec<Queue>) {
        *self.queues.lock().unwrap() = queues;
        self.activated.store(true, Ordering::Release);
        if crate::devices::virtio::vsock::muxer::vsock_trace_enabled() {
            eprintln!("[vsock] activated cid={}", self.cid);
        }
    }
    fn snapshot_queues(&self) -> Vec<Queue> {
        self.queues.lock().unwrap().clone()
    }
}