arcbox-core 0.4.9

Core orchestration layer for ArcBox
use super::ContainerBackend;
use crate::config::ContainerRuntimeConfig;
use crate::error::{CoreError, Result};
use crate::machine::MachineManager;
use crate::vm_lifecycle::VmLifecycleManager;
use async_trait::async_trait;
use std::os::fd::FromRawFd;
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Guest Docker backend (dockerd/containerd/runc inside VM).
///
/// The host talks to the guest Docker daemon over a vsock port
/// (`ContainerRuntimeConfig::guest_docker_vsock_port`). Readiness is first
/// negotiated with the guest agent, then confirmed by a probe connection.
pub struct GuestDockerBackend {
    /// Brings the VM up before the guest runtime is probed.
    vm_lifecycle: Arc<VmLifecycleManager>,
    /// Opens guest-agent connections and raw vsock connections to the machine.
    machine_manager: Arc<MachineManager>,
    /// Name of the machine hosting the guest Docker runtime.
    machine_name: &'static str,
    /// Runtime settings (guest vsock port, startup timeout).
    config: ContainerRuntimeConfig,
}

impl GuestDockerBackend {
    /// Creates a backend bound to `machine_name`.
    ///
    /// No I/O happens here. The VM and guest runtime are only touched when
    /// `ensure_ready` is called.
    #[must_use]
    pub const fn new(
        vm_lifecycle: Arc<VmLifecycleManager>,
        machine_manager: Arc<MachineManager>,
        machine_name: &'static str,
        config: ContainerRuntimeConfig,
    ) -> Self {
        Self {
            vm_lifecycle,
            machine_manager,
            machine_name,
            config,
        }
    }

    /// Waits until the guest Docker endpoint accepts vsock connections.
    ///
    /// Each attempt:
    /// 1. asks the guest agent to ensure the runtime (`ensure_runtime(true)`),
    ///    falling back to `get_runtime_status` if that did not report ready;
    /// 2. once the guest reports ready, opens a probe connection to the
    ///    configured vsock port and closes it immediately.
    ///
    /// Failed attempts are retried with a backoff that starts at 120ms, grows
    /// by 1.5x per attempt and is capped at 1200ms. A sleep never extends past
    /// the deadline, so the last attempt happens at `startup_timeout_ms`.
    ///
    /// # Errors
    ///
    /// - `CoreError::Machine` immediately (no retry) if the guest reports an
    ///   endpoint that is not `vsock:<port>` or whose port differs from the
    ///   configured one.
    /// - `CoreError::Machine` if the endpoint is not reachable within
    ///   `startup_timeout_ms`. The message carries the connect error and/or the
    ///   last status detail reported by the guest.
    async fn wait_guest_endpoint_ready(&self) -> Result<()> {
        const INITIAL_DELAY_MS: u64 = 120;
        const MAX_DELAY_MS: u64 = 1200;

        let port = self.config.guest_docker_vsock_port;
        let timeout_ms = self.config.startup_timeout_ms;
        let deadline = Instant::now() + Duration::from_millis(timeout_ms);
        let mut delay_ms = INITIAL_DELAY_MS;
        let mut last_status_detail: Option<String> = None;

        let timed_out = |detail: String| {
            CoreError::Machine(format!(
                "guest docker endpoint on vsock port {port} not ready within {timeout_ms}ms: {detail}"
            ))
        };

        loop {
            let mut docker_ready = false;

            match self.machine_manager.connect_agent(self.machine_name) {
                Ok(mut agent) => {
                    match agent.ensure_runtime(true).await {
                        Ok(resp) => {
                            last_status_detail = Some(resp.message.clone());
                            // Log before validating so a rejected endpoint is still visible.
                            tracing::debug!(
                                ready = resp.ready,
                                endpoint = resp.endpoint,
                                message = resp.message,
                                status = resp.status,
                                "requested guest runtime ensure"
                            );
                            if resp.ready {
                                // A mismatched endpoint is a configuration error; retrying cannot fix it.
                                validate_reported_vsock_endpoint(&resp.endpoint, port)?;
                                docker_ready = true;
                            }
                        }
                        Err(e) => {
                            tracing::trace!("failed to request guest runtime ensure: {}", e);
                        }
                    }

                    if !docker_ready {
                        match agent.get_runtime_status().await {
                            Ok(status) => {
                                last_status_detail = Some(status.detail.clone());
                                if status.docker_ready {
                                    validate_reported_vsock_endpoint(&status.endpoint, port)?;
                                    docker_ready = true;
                                }
                            }
                            Err(e) => {
                                tracing::trace!("failed to get guest runtime status: {}", e);
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::trace!("failed to connect to guest agent: {}", e);
                }
            }

            if docker_ready {
                match self
                    .machine_manager
                    .connect_vsock_port(self.machine_name, port)
                {
                    Ok(fd) => {
                        // SAFETY: assumes `connect_vsock_port` transfers ownership of a newly
                        // opened descriptor to the caller and nothing else will close it.
                        // Wrapping it in `OwnedFd` and dropping it closes the probe connection.
                        drop(unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) });
                        tracing::debug!(port, "guest docker endpoint is ready");
                        return Ok(());
                    }
                    Err(e) => {
                        if Instant::now() >= deadline {
                            // Report the actual connect failure. The guest status detail alone
                            // hides why the endpoint was unreachable.
                            let detail = match last_status_detail.as_deref() {
                                Some(status) => {
                                    format!("{e} (last guest runtime status: {status})")
                                }
                                None => e.to_string(),
                            };
                            return Err(timed_out(detail));
                        }
                        tracing::trace!(
                            port,
                            retry_delay_ms = delay_ms,
                            "guest docker endpoint not reachable yet: {}",
                            e
                        );
                    }
                }
            } else if Instant::now() >= deadline {
                return Err(timed_out(
                    last_status_detail
                        .unwrap_or_else(|| "runtime status unavailable".to_string()),
                ));
            } else {
                tracing::trace!(
                    port,
                    retry_delay_ms = delay_ms,
                    "guest runtime not ready yet"
                );
            }

            // Clamp the sleep to the remaining budget. The next iteration then makes one final
            // attempt at the deadline and reports the timeout, instead of overshooting it.
            let remaining = deadline.saturating_duration_since(Instant::now());
            tokio::time::sleep(Duration::from_millis(delay_ms).min(remaining)).await;
            delay_ms = (delay_ms * 3 / 2).min(MAX_DELAY_MS);
        }
    }
}

/// Extracts the port number from an endpoint string of the form `vsock:<port>`.
///
/// Returns `None` if the `vsock:` prefix is absent or the remainder does not
/// parse as a `u32`.
fn parse_vsock_endpoint_port(endpoint: &str) -> Option<u32> {
    endpoint
        .strip_prefix("vsock:")
        .and_then(|port_text| port_text.parse().ok())
}

/// Checks that the endpoint reported by the guest is `vsock:<expected_port>`.
///
/// # Errors
///
/// Returns `CoreError::Machine` when the endpoint is not in `vsock:<port>`
/// form, or when its port differs from `expected_port`.
fn validate_reported_vsock_endpoint(endpoint: &str, expected_port: u32) -> Result<()> {
    match parse_vsock_endpoint_port(endpoint) {
        None => Err(CoreError::Machine(format!(
            "guest runtime endpoint format invalid: '{endpoint}'; expected 'vsock:<port>'"
        ))),
        Some(reported) if reported == expected_port => Ok(()),
        Some(reported) => Err(CoreError::Machine(format!(
            "guest runtime endpoint mismatch: guest reports vsock:{reported} but host is configured for vsock:{expected_port}"
        ))),
    }
}

#[async_trait]
impl ContainerBackend for GuestDockerBackend {
    /// Identifier of this backend.
    fn name(&self) -> &'static str {
        "guest_docker"
    }

    /// Brings the VM up, then waits for the guest Docker endpoint.
    ///
    /// Returns the value produced by `VmLifecycleManager::ensure_ready`
    /// (bound here as the guest CID).
    async fn ensure_ready(&self) -> Result<u32> {
        let guest_cid = self.vm_lifecycle.ensure_ready().await?;
        self.wait_guest_endpoint_ready()
            .await
            .map(|()| guest_cid)
    }
}

#[cfg(test)]
mod tests {
    use super::{parse_vsock_endpoint_port, validate_reported_vsock_endpoint};

    #[test]
    fn test_parse_vsock_endpoint_port_ok() {
        assert_eq!(parse_vsock_endpoint_port("vsock:2375"), Some(2375));
        assert_eq!(parse_vsock_endpoint_port("vsock:4294967295"), Some(u32::MAX));
    }

    #[test]
    fn test_parse_vsock_endpoint_port_invalid() {
        assert_eq!(
            parse_vsock_endpoint_port("unix:///var/run/docker.sock"),
            None
        );
        assert_eq!(parse_vsock_endpoint_port("vsock:not-a-number"), None);
        assert_eq!(parse_vsock_endpoint_port("2375"), None);
    }

    #[test]
    fn test_parse_vsock_endpoint_port_edge_cases() {
        // Empty port, out-of-range, negative, and a prefix with the wrong case.
        assert_eq!(parse_vsock_endpoint_port("vsock:"), None);
        assert_eq!(parse_vsock_endpoint_port("vsock:4294967296"), None);
        assert_eq!(parse_vsock_endpoint_port("vsock:-1"), None);
        assert_eq!(parse_vsock_endpoint_port("VSOCK:2375"), None);
        assert_eq!(parse_vsock_endpoint_port(""), None);
    }

    #[test]
    fn test_validate_reported_vsock_endpoint_ok() {
        assert!(validate_reported_vsock_endpoint("vsock:2375", 2375).is_ok());
    }

    #[test]
    fn test_validate_reported_vsock_endpoint_mismatch() {
        let err = validate_reported_vsock_endpoint("vsock:2375", 1234).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("endpoint mismatch"));
        // Both sides of the mismatch should be reported to aid debugging.
        assert!(msg.contains("vsock:2375"));
        assert!(msg.contains("vsock:1234"));
    }

    #[test]
    fn test_validate_reported_vsock_endpoint_invalid_format() {
        let err =
            validate_reported_vsock_endpoint("unix:///var/run/docker.sock", 2375).unwrap_err();
        assert!(err.to_string().contains("format invalid"));
    }

    #[test]
    fn test_validate_reported_vsock_endpoint_empty_port() {
        let err = validate_reported_vsock_endpoint("vsock:", 2375).unwrap_err();
        assert!(err.to_string().contains("format invalid"));
    }
}