supermachine 0.4.7

Run any OCI/Docker image as a hardware-isolated microVM on macOS HVF (Linux KVM and Windows WHP in progress). Single library API, zero flags for the common case, sub-100 ms cold-restore from snapshot.
//! Async wrappers for the sync supermachine API. Enabled with the
//! `tokio` feature — pulls in `tokio` as a dep and exposes
//! [`AsyncImage`] / [`AsyncVm`] / [`AsyncPooledVm`] which mirror
//! the sync types and `await`-return.
//!
//! The implementation is `tokio::task::spawn_blocking` over the
//! sync surface — no fundamentally async I/O yet (the underlying
//! vsock + exec channel is sync). For the meta-harness use case
//! (acquire from many tokio tasks, run short execs in each) this
//! gives the right ergonomics: each `await` parks the task while
//! the blocking work runs on a tokio blocking-pool thread, the
//! task wakes when the work returns. Concurrency comes from
//! tokio's blocking pool size (default 512).
//!
//! For pure-sync embedders that don't want tokio, leave the
//! feature off — the types in this module won't exist.

use std::sync::Arc;
use std::time::Duration;

use crate::{exec::ExecOutcome, Error, Image, PooledVm, Vm, VmConfig};

/// Async wrapper around [`Image`]. `Arc`-internally so cheap to
/// clone and `Send` across tokio tasks.
///
/// Cloning an `AsyncImage` only bumps the reference count; every
/// clone refers to the same underlying [`Image`].
#[derive(Clone)]
pub struct AsyncImage {
    /// Shared handle to the sync image. Each async method clones
    /// this `Arc` into the closure it hands to
    /// `tokio::task::spawn_blocking`.
    inner: Arc<Image>,
}

impl AsyncImage {
    /// Async equivalent of [`Image::from_snapshot`].
    pub async fn from_snapshot(
        path: impl Into<std::path::PathBuf> + Send + 'static,
    ) -> Result<Self, Error> {
        let inner = tokio::task::spawn_blocking(move || Image::from_snapshot(path))
            .await
            .map_err(|e| Error::vm_msg(format!("tokio join: {e}")))??;
        Ok(Self { inner: Arc::new(inner) })
    }

    /// Async equivalent of [`Image::start`]. Spawns a one-shot
    /// VM via the blocking pool. Returns an [`AsyncVm`].
    pub async fn start(&self, config: VmConfig) -> Result<AsyncVm, Error> {
        let img = Arc::clone(&self.inner);
        let vm = tokio::task::spawn_blocking(move || img.start(&config))
            .await
            .map_err(|e| Error::vm_msg(format!("tokio join: {e}")))??;
        Ok(AsyncVm { inner: Arc::new(vm) })
    }

    /// Async equivalent of [`Image::acquire`]. Acquires a worker
    /// from the hidden subprocess pool. Returns an
    /// [`AsyncPooledVm`] that holds a clone of the image's Arc
    /// for the lifetime of the borrow (so the pool can't go away
    /// while the PooledVm is alive).
    pub async fn acquire(&self) -> Result<AsyncPooledVm, Error> {
        self.acquire_with(VmConfig::new()).await
    }

    /// Async equivalent of [`Image::acquire_with`].
    pub async fn acquire_with(&self, config: VmConfig) -> Result<AsyncPooledVm, Error> {
        let img = Arc::clone(&self.inner);
        let pooled = tokio::task::spawn_blocking({
            let img = Arc::clone(&img);
            move || -> Result<PooledVm<'static>, Error> {
                // SAFETY: the returned PooledVm references `img:
                // Arc<Image>` which we keep alive in the
                // AsyncPooledVm wrapper for the entire lifetime
                // of the PooledVm. The 'static bound lets the
                // value cross the `.await` boundary.
                let pooled: PooledVm<'_> = img.acquire_with(&config)?;
                Ok(unsafe { std::mem::transmute::<PooledVm<'_>, PooledVm<'static>>(pooled) })
            }
        })
        .await
        .map_err(|e| Error::vm_msg(format!("tokio join: {e}")))??;
        Ok(AsyncPooledVm {
            inner: Some(pooled),
            _image_keepalive: img,
        })
    }
}

/// Async wrapper around [`Vm`].
///
/// Cloning is a reference-count bump: all clones drive the same
/// underlying [`Vm`].
#[derive(Clone)]
pub struct AsyncVm {
    /// Shared handle to the sync VM; cloned into each blocking
    /// closure so the closure can be `'static`.
    inner: Arc<Vm>,
}

impl AsyncVm {
    /// Writes `bytes` to `path` inside the guest by running
    /// [`Vm::write_file`] on tokio's blocking pool.
    ///
    /// If the blocking task cannot be joined, the join error is
    /// wrapped in an `ErrorKind::Other` I/O error.
    pub async fn write_file(&self, path: String, bytes: Vec<u8>) -> std::io::Result<()> {
        let guest = Arc::clone(&self.inner);
        let joined =
            tokio::task::spawn_blocking(move || guest.write_file(&path, &bytes)).await;
        match joined {
            Ok(outcome) => outcome,
            Err(join_err) => Err(std::io::Error::new(std::io::ErrorKind::Other, join_err)),
        }
    }

    /// Reads the guest file at `path` by running
    /// [`Vm::read_file`] on tokio's blocking pool and returns its
    /// contents.
    ///
    /// If the blocking task cannot be joined, the join error is
    /// wrapped in an `ErrorKind::Other` I/O error.
    pub async fn read_file(&self, path: String) -> std::io::Result<Vec<u8>> {
        let guest = Arc::clone(&self.inner);
        let joined = tokio::task::spawn_blocking(move || guest.read_file(&path)).await;
        match joined {
            Ok(outcome) => outcome,
            Err(join_err) => Err(std::io::Error::new(std::io::ErrorKind::Other, join_err)),
        }
    }

    /// Runs `argv` in the guest and collects its output; the
    /// async counterpart of [`crate::exec::ExecBuilder::output`].
    ///
    /// Every `(key, value)` pair in `env` is applied to the
    /// builder in order, and `timeout` is applied only when it is
    /// `Some`. The whole builder chain executes on the blocking
    /// pool.
    pub async fn exec_output(
        &self,
        argv: Vec<String>,
        env: Vec<(String, String)>,
        timeout: Option<Duration>,
    ) -> std::io::Result<ExecOutcome> {
        let guest = Arc::clone(&self.inner);
        let run = move || {
            // Thread the builder through each env pair instead of
            // re-assigning a mutable local.
            let builder = env
                .into_iter()
                .fold(guest.exec_builder().argv(argv), |b, (k, v)| b.env(k, v));
            match timeout {
                Some(limit) => builder.timeout(limit).output(),
                None => builder.output(),
            }
        };
        match tokio::task::spawn_blocking(run).await {
            Ok(outcome) => outcome,
            Err(join_err) => Err(std::io::Error::new(std::io::ErrorKind::Other, join_err)),
        }
    }
}

/// Async wrapper around [`PooledVm`]. Pooled VM stays checked
/// out for the lifetime of this handle; on Drop, returned to the
/// pool exactly like the sync version.
pub struct AsyncPooledVm {
    /// The checked-out VM. Wrapped in `Option` so the `Drop` impl
    /// can `take()` it and release it before the keepalive below
    /// is dropped; it is `Some` for the whole usable life of the
    /// handle. Declared first, so it would also be dropped first
    /// under Rust's declaration-order field drop.
    inner: Option<PooledVm<'static>>,
    /// Holds the Image alive until the PooledVm is dropped, since
    /// the PooledVm's `'static` lifetime is a transmute lie that
    /// only stays sound while this Arc is alive.
    _image_keepalive: Arc<Image>,
}

impl AsyncPooledVm {
    /// Same async-exec convenience as [`AsyncVm::exec_output`],
    /// dispatched against the pooled VM.
    pub async fn exec_output(
        &self,
        argv: Vec<String>,
        env: Vec<(String, String)>,
        timeout: Option<Duration>,
    ) -> std::io::Result<ExecOutcome> {
        let mut b = self
            .inner
            .as_ref()
            .expect("AsyncPooledVm used after drop")
            .exec_builder()
            .argv(argv);
        for (k, v) in env {
            b = b.env(k, v);
        }
        if let Some(t) = timeout {
            b = b.timeout(t);
        }
        tokio::task::spawn_blocking(move || b.output())
            .await
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
    }

    /// Async write_file via pooled VM.
    pub async fn write_file(&self, path: String, bytes: Vec<u8>) -> std::io::Result<()> {
        let exec_path = self
            .inner
            .as_ref()
            .expect("AsyncPooledVm used after drop")
            .exec_path()
            .to_path_buf();
        tokio::task::spawn_blocking(move || {
            let body = serde_json::json!({
                "action": "write_file",
                "path": path,
                "data_b64": crate::api::b64_encode(&bytes),
            });
            crate::exec::send_control(&exec_path, &body)
        })
        .await
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
    }

    /// Async read_file via pooled VM.
    pub async fn read_file(&self, path: String) -> std::io::Result<Vec<u8>> {
        let exec_path = self
            .inner
            .as_ref()
            .expect("AsyncPooledVm used after drop")
            .exec_path()
            .to_path_buf();
        tokio::task::spawn_blocking(move || {
            let body = serde_json::json!({
                "action": "read_file",
                "path": path,
            });
            let ack = crate::exec::send_control_with_ack(
                &exec_path,
                &body,
                Some(Duration::from_secs(30)),
            )?;
            let s = ack
                .get("data_b64")
                .and_then(|v| v.as_str())
                .ok_or_else(|| {
                    std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        "read_file: agent ack missing data_b64",
                    )
                })?;
            crate::api::b64_decode(s)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
        })
        .await
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
    }
}

impl Drop for AsyncPooledVm {
    /// Releases the pooled VM before the struct's remaining
    /// fields (the image keepalive) are dropped.
    fn drop(&mut self) {
        // Dropping the PooledVm kills the worker and signals the
        // pool to replenish. That work is synchronous but quick
        // (kill+wait is sub-ms), so it is done inline here.
        drop(self.inner.take());
    }
}