autoschematic-core 0.9.0

Core shared functionality for Autoschematic: workflow engine, state management, and Git integrations
Documentation
use std::{
    collections::HashMap,
    io,
    path::{Path, PathBuf},
    sync::{Arc, Mutex},
    thread::JoinHandle,
};

use crate::{
    bundle::UnbundleResponseElement,
    config::Spec,
    connector::{
        Connector, ConnectorOutbox, DocIdent, FilterResponse, GetDocResponse, GetResourceResponse, OpExecResponse,
        PlanResponseElement, SkeletonResponse, VirtToPhyResponse,
        handle::{ConnectorHandle, ConnectorHandleStatus},
        spawn::{random_error_dump_path, random_socket_path},
    },
    diag::DiagnosticResponse,
    grpc_bridge,
    keystore::KeyStore,
    tarpc_bridge::{self},
    util::passthrough_secrets_from_env,
};
use anyhow::bail;
use async_trait::async_trait;

use libc::killpg;
use once_cell::sync::Lazy;
use process_wrap::tokio::*;
use sysinfo::{Pid, ProcessRefreshKind};

/// This module handles unsandboxed execution of connector instances.
pub struct UnsandboxConnectorHandle {
    client: Arc<dyn Connector>,
    socket: PathBuf,
    error_dump: PathBuf,
    read_thread: Option<JoinHandle<()>>,
    child: Arc<Mutex<Box<dyn process_wrap::tokio::ChildWrapper>>>,
}

#[async_trait]
impl Connector for UnsandboxConnectorHandle {
    async fn new(_name: &str, _prefix: &Path, _outbox: ConnectorOutbox) -> Result<Arc<dyn Connector>, anyhow::Error> {
        bail!("Connector::new() for UnsandboxConnectorHandle is a stub!")
        // <TarpcConnectorClient as Connector>::new(name, prefix, outbox).await
    }
    async fn init(&self) -> Result<(), anyhow::Error> {
        Connector::init(&self.client).await
    }

    async fn version(&self) -> Result<String, anyhow::Error> {
        Connector::version(&self.client).await
    }

    async fn filter(&self, addr: &Path) -> Result<FilterResponse, anyhow::Error> {
        Connector::filter(&self.client, addr).await
    }

    async fn list(&self, subpath: &Path) -> anyhow::Result<Vec<PathBuf>> {
        Connector::list(&self.client, subpath).await
    }

    async fn subpaths(&self) -> anyhow::Result<Vec<PathBuf>> {
        Connector::subpaths(&self.client).await
    }

    async fn get(&self, addr: &Path) -> Result<Option<GetResourceResponse>, anyhow::Error> {
        Connector::get(&self.client, addr).await
    }

    async fn plan(
        &self,
        addr: &Path,
        current: Option<Vec<u8>>,
        desired: Option<Vec<u8>>,
    ) -> Result<Vec<PlanResponseElement>, anyhow::Error> {
        Connector::plan(&self.client, addr, current, desired).await
    }

    async fn op_exec(&self, addr: &Path, op: &str) -> Result<OpExecResponse, anyhow::Error> {
        Connector::op_exec(&self.client, addr, op).await
    }

    async fn addr_virt_to_phy(&self, addr: &Path) -> Result<VirtToPhyResponse, anyhow::Error> {
        Connector::addr_virt_to_phy(&self.client, addr).await
    }

    async fn addr_phy_to_virt(&self, addr: &Path) -> Result<Option<PathBuf>, anyhow::Error> {
        Connector::addr_phy_to_virt(&self.client, addr).await
    }

    async fn get_skeletons(&self) -> Result<Vec<SkeletonResponse>, anyhow::Error> {
        Connector::get_skeletons(&self.client).await
    }

    async fn get_docstring(&self, addr: &Path, ident: DocIdent) -> Result<Option<GetDocResponse>, anyhow::Error> {
        Connector::get_docstring(&self.client, addr, ident).await
    }

    async fn eq(&self, addr: &Path, a: &[u8], b: &[u8]) -> Result<bool, anyhow::Error> {
        Connector::eq(&self.client, addr, a, b).await
    }

    async fn diag(&self, addr: &Path, a: &[u8]) -> Result<Option<DiagnosticResponse>, anyhow::Error> {
        Connector::diag(&self.client, addr, a).await
    }

    async fn unbundle(&self, addr: &Path, resource: &[u8]) -> Result<Vec<UnbundleResponseElement>, anyhow::Error> {
        Connector::unbundle(&self.client, addr, resource).await
    }
}

pub async fn launch_server_binary(
    spec: &Spec,
    shortname: &str,
    prefix: &Path,
    env: &HashMap<String, String>,
    _outbox: ConnectorOutbox,
    keystore: Option<Arc<dyn KeyStore>>,
) -> anyhow::Result<UnsandboxConnectorHandle> {
    let mut env = env.clone();

    let socket = random_socket_path();
    let error_dump = random_error_dump_path();

    if let Some(keystore) = keystore {
        env = keystore.unseal_env_map(&env)?;
    } else {
        env = passthrough_secrets_from_env(&env)?;
    }

    if let Some(spec_pre_command) = spec.pre_command()? {
        let mut command = CommandWrap::with_new(spec_pre_command.binary, |command| {
            command.args(spec_pre_command.args);
            command.stderr(io::stderr());
            command.stdout(io::stderr());
            command.kill_on_drop(true);
        });

        #[cfg(unix)]
        {
            command.wrap(ProcessGroup::leader());
        }
        #[cfg(windows)]
        {
            command.wrap(JobObject);
        }

        let status = command.spawn()?.wait().await?;

        if !status.success() {
            bail!(
                "launch_server_binary: pre_command failed: \n {:?} \n {:?}",
                command.command(),
                status.code()
            )
        }
    }

    let spec_command = spec.command()?;

    let mut command = CommandWrap::with_new(spec_command.binary, |command| {
        command.args(spec_command.args);
        command.args([shortname.into(), prefix.into(), socket.clone(), error_dump.clone()]);
        command.stdout(io::stderr());
        command.stderr(io::stderr());
        command.kill_on_drop(true);

        for (key, val) in env {
            command.env(key, val);
        }
    });

    #[cfg(unix)]
    {
        command.wrap(ProcessGroup::leader());
    }
    #[cfg(windows)]
    {
        command.wrap(JobObject);
    }

    let child = command.spawn()?;

    tracing::info!("Launching client at {:?}", socket);

    let client = match spec.protocol() {
        crate::config::Protocol::Tarpc => tarpc_bridge::launch_client(&socket).await?,
        crate::config::Protocol::Grpc => grpc_bridge::launch_client(&socket).await?,
    };

    tracing::info!("Launched client.");

    Ok(UnsandboxConnectorHandle {
        client: Arc::new(client),
        socket,
        error_dump,
        read_thread: None,
        child: Arc::new(child.into()),
    })
}

impl Drop for UnsandboxConnectorHandle {
    fn drop(&mut self) {
        let pid = self.child.lock().unwrap().inner().id().unwrap();

        unsafe {
            killpg(pid.try_into().unwrap(), 9);
        }

        // self.child.lock().unwrap().start_kill().unwrap();
        // self.child.lock().unwrap().try_wait().unwrap();

        match std::fs::remove_file(&self.socket) {
            Ok(_) => {}
            Err(e) => tracing::warn!("Couldn't remove socket {:?}: {}", self.socket, e),
        }

        match std::fs::remove_file(&self.error_dump) {
            Ok(_) => {}
            Err(e) => tracing::warn!("Couldn't remove error_dump {:?}: {}", self.error_dump, e),
        }

        if self.read_thread.is_some() {}
    }
}

pub static SYSINFO: Lazy<Arc<tokio::sync::Mutex<sysinfo::System>>> =
    Lazy::new(|| Arc::new(tokio::sync::Mutex::new(sysinfo::System::new())));

#[async_trait]
impl ConnectorHandle for UnsandboxConnectorHandle {
    async fn status(&self) -> ConnectorHandleStatus {
        let Some(pid) = self.child.lock().unwrap().inner().id() else {
            return ConnectorHandleStatus::Dead;
        };

        let pid = Pid::from_u32(pid);

        SYSINFO.lock().await.refresh_processes_specifics(
            sysinfo::ProcessesToUpdate::Some(&[pid]),
            false,
            ProcessRefreshKind::nothing().with_cpu().with_memory(),
        );

        if let Some(p) = SYSINFO.lock().await.process(pid) {
            return ConnectorHandleStatus::Alive {
                memory: p.memory(),
                cpu_usage: p.cpu_usage(),
            };
        } else {
            return ConnectorHandleStatus::Dead;
        }
    }
}