supermachine 0.3.7

Run any OCI/Docker image as a hardware-isolated microVM on macOS via HVF (Linux KVM and Windows WHP support in progress). Single library API, zero flags for the common case, and sub-100 ms cold restore from snapshot.
// Status: minimal — host TCP listener bound on TSI_LISTEN, accept
// loop on its own thread, per-conn pump threads bridging accepted
// TCP ↔ vsock RW. The single-epoll-thread fold-in lives in
// muxer_thread.rs; this module remains the bind/accept side.
//
// API:
//   TsiListener::bind(...) → Result<(host_addr, TsiListener)>
//   TsiListener::start(callbacks) — spawns the accept thread
//   on_accept callback fires for each new conn

#![allow(dead_code)]

use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;

/// Created on TSI_LISTEN. Owns a host TcpListener + an accept
/// thread.
pub struct TsiListener {
    /// (cid, vm_port) pair from the LISTEN — used to address
    /// inbound vsock REQUEST/RW packets back to the guest.
    pub cid: u64,
    pub vm_port: u32,
    /// Local address of the host-side listener (port is ephemeral);
    /// also the target of the wake-up self-connect in `Drop`.
    pub host_addr: SocketAddr,
    /// Join handle for the accept loop; taken and joined in `Drop`.
    accept_thread: Option<std::thread::JoinHandle<()>>,
    /// Set by `Drop` (or `shutdown`) to ask the accept thread to
    /// exit cleanly. Pool-worker mode uses this between RESTOREs
    /// so we don't leak one accept thread per dispatch.
    shutdown: Arc<std::sync::atomic::AtomicBool>,
}

impl Drop for TsiListener {
    fn drop(&mut self) {
        self.shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Self-connect to wake the blocking accept() — the thread
        // immediately checks `shutdown`, sees true, exits.
        let _ = std::net::TcpStream::connect_timeout(
            &self.host_addr,
            std::time::Duration::from_millis(50),
        );
        if let Some(h) = self.accept_thread.take() {
            let _ = h.join();
        }
    }
}

/// Ephemeral host_src_port allocator. Used to give each accepted
/// TCP stream a unique vsock src_port for routing back to the
/// right TsiConn. Top-of-range to avoid clashing with guest's
/// chosen ports (which are typically below 0xC0000000).
static NEXT_HOST_SRC_PORT: std::sync::atomic::AtomicU32 =
    std::sync::atomic::AtomicU32::new(0xC000_0000);

/// Allocate the next host-side vsock source port.
///
/// The returned value is confined to the top quarter of the u32
/// range, `[0xC000_0000, 0xFFFF_FFFF]`: even if the raw counter
/// wraps past `u32::MAX`, the result never dips below 0xC000_0000
/// (the previous code wrapped to 0 and could collide with
/// guest-chosen ports).
pub fn alloc_host_src_port() -> u32 {
    // Relaxed is sufficient: we only need a unique value, not any
    // ordering with respect to other memory operations.
    let raw = NEXT_HOST_SRC_PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    0xC000_0000 | (raw & 0x3FFF_FFFF)
}

/// Bind a dual-stack IPv4+IPv6 listener on `[::]:0` (wildcard
/// address, ephemeral port). Uses the raw libc socket() path —
/// rather than `TcpListener::bind` — so we can
/// `setsockopt(IPV6_V6ONLY, 0)` *before* bind; std offers no hook
/// at that point in the socket lifecycle.
///
/// NOTE(review): this binds the wildcard `::`, not loopback `::1`
/// as previously documented — the listener is reachable from other
/// hosts, not just locally. Confirm that exposure is intended.
fn bind_dual_stack() -> std::io::Result<TcpListener> {
    use std::os::fd::FromRawFd;
    // SAFETY: standard libc usage with RAII via FromRawFd at the end.
    unsafe {
        let fd = libc::socket(libc::AF_INET6, libc::SOCK_STREAM, 0);
        if fd < 0 {
            return Err(std::io::Error::last_os_error());
        }
        // V6ONLY=0 → the IPv6 socket also accepts IPv4 clients
        // (as v4-mapped addresses). This must happen before bind().
        let zero: libc::c_int = 0;
        if libc::setsockopt(
            fd,
            libc::IPPROTO_IPV6,
            libc::IPV6_V6ONLY,
            &zero as *const _ as *const _,
            std::mem::size_of::<libc::c_int>() as _,
        ) < 0
        {
            // Close the raw fd on every early-exit path — FromRawFd
            // hasn't taken ownership yet.
            let e = std::io::Error::last_os_error();
            libc::close(fd);
            return Err(e);
        }
        // Best-effort SO_REUSEADDR (failure ignored): mostly moot for
        // an ephemeral port, but harmless.
        let one: libc::c_int = 1;
        let _ = libc::setsockopt(
            fd,
            libc::SOL_SOCKET,
            libc::SO_REUSEADDR,
            &one as *const _ as *const _,
            std::mem::size_of::<libc::c_int>() as _,
        );
        // sockaddr_in6 zeroed: port 0 → kernel picks an ephemeral
        // port; addr `::` → wildcard (all interfaces).
        let mut addr: libc::sockaddr_in6 = std::mem::zeroed();
        addr.sin6_family = libc::AF_INET6 as _;
        addr.sin6_port = 0;
        addr.sin6_addr = libc::in6_addr { s6_addr: [0u8; 16] }; // ::
        if libc::bind(
            fd,
            &addr as *const _ as *const _,
            std::mem::size_of::<libc::sockaddr_in6>() as _,
        ) < 0
        {
            let e = std::io::Error::last_os_error();
            libc::close(fd);
            return Err(e);
        }
        if libc::listen(fd, 1024) < 0 {
            let e = std::io::Error::last_os_error();
            libc::close(fd);
            return Err(e);
        }
        // From here on the TcpListener owns the fd and closes it on
        // drop.
        Ok(TcpListener::from_raw_fd(fd))
    }
}

impl TsiListener {
    /// Bind a host listener and spawn the accept thread.
    ///
    /// The listener is dual-stack `[::]:0` when IPv6 is available
    /// (see `bind_dual_stack`), falling back to `127.0.0.1:0` —
    /// not unconditionally `127.0.0.1` as previously documented.
    /// Each accepted TCP stream is handed to
    /// `on_accept(cid, host_src_port, vm_port, tcp)` — the muxer's
    /// accept handler.
    ///
    /// # Errors
    /// Propagates any I/O error from binding the listener, reading
    /// its local address, or spawning the accept thread.
    pub fn bind(
        cid: u64,
        vm_port: u32,
        on_accept: Arc<dyn Fn(u64, u32, u32, TcpStream) + Send + Sync>,
    ) -> std::io::Result<Self> {
        // Dual-stack: bind on IPv6 with V6ONLY=0 so the listener
        // accepts BOTH IPv4 and IPv6 client connections. Necessary
        // because guest workloads that listen on `::` (Python's
        // http.server default, many Node servers, etc.) end up here
        // even when external clients only do IPv4. Falls back to
        // pure IPv4 if IPv6 isn't available.
        let listener = bind_dual_stack().or_else(|_| TcpListener::bind("127.0.0.1:0"))?;
        let host_addr = listener.local_addr()?;
        let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let shutdown_th = shutdown.clone();
        // Sample the trace flag once at bind time; the accept loop
        // reuses this snapshot everywhere (previously the per-accept
        // log line re-queried the flag while the enter/exit lines used
        // the snapshot — inconsistent).
        let trace = crate::devices::virtio::vsock::muxer::vsock_trace_enabled();
        let h = std::thread::Builder::new()
            .name(format!("tsi-accept-{vm_port}"))
            .spawn(move || {
                if trace {
                    eprintln!(
                        "[tsi_stream] accept loop for cid={cid} vm_port={vm_port} on {host_addr}"
                    );
                }
                for stream in listener.incoming() {
                    // `Drop` wakes us with a throwaway self-connect;
                    // check the flag before touching the stream so the
                    // wake-up connection is never dispatched.
                    if shutdown_th.load(std::sync::atomic::Ordering::SeqCst) {
                        break;
                    }
                    let stream = match stream {
                        Ok(s) => s,
                        Err(e) => {
                            eprintln!("[tsi_stream] accept err: {e}");
                            continue;
                        }
                    };
                    // Latency matters more than batching for the
                    // bridged streams; disable Nagle (best-effort).
                    let _ = stream.set_nodelay(true);
                    let host_src_port = alloc_host_src_port();
                    if trace {
                        eprintln!(
                            "[tsi_stream] cid={cid} vm_port={vm_port} accept from {:?} -> src_port={host_src_port}",
                            stream.peer_addr().ok());
                    }
                    on_accept(cid, host_src_port, vm_port, stream);
                }
                if trace {
                    eprintln!("[tsi_stream] accept loop EXIT cid={cid} vm_port={vm_port}");
                }
            })?;
        Ok(Self {
            cid,
            vm_port,
            host_addr,
            accept_thread: Some(h),
            shutdown,
        })
    }
}

// (TsiConn removed — replaced by muxer_thread.rs single-epoll
// architecture.)