car-ffi-common 0.24.1

Shared logic for FFI bindings (NAPI, PyO3) — JSON wrappers for verify, multi-agent, scheduler
//! In-process lifecycle-managed agents for FFI bindings.
//!
//! Wraps [`car_registry::supervisor::Supervisor`] behind a
//! process-singleton so NAPI/PyO3 consumers share one supervisor
//! instance per process.
//! The car-server daemon manages its own supervisor via
//! [`car_server_core::ServerState`] — these helpers exist for hosts
//! that want to drive lifecycle from JS/Python without a daemon.
//!
//! Closes Parslee-ai/car-releases#27 on the embedded side; the WS
//! `agents.*` namespace handles the daemon side.
//!
//! ## Wire shapes
//!
//! All functions take/return JSON strings — the same shapes the
//! `agents.*` JSON-RPC methods accept and emit, so a host can swap
//! between transports without reshaping payloads.
//!
//! - `list()` → `[ManagedAgent]`
//! - `upsert(spec_json)` → `ManagedAgent`
//! - `remove(id)` → `{ "removed": bool }`
//! - `start(id)` → `ManagedAgent`
//! - `stop(id, signal?)` → `ManagedAgent`
//! - `restart(id)` → `ManagedAgent`
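//! - `tail_log(id, n?, stream?, offset?)` → `LogTail` JSON
//!   (`{ lines, stdout, stderr, stdout_total, stderr_total,
//!   stdout_path, stderr_path, more }`)
//! - `health()` → the same JSON the `agents.health` method emits
//! - `install(manifest_json)` →
//!   `{ report: { missingOptional: [...] }, agent: ManagedAgent? }`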

use std::sync::{Arc, OnceLock};

use car_registry::supervisor::{AgentSpec, LogStream, StopSignal, Supervisor, SupervisorError};

use crate::proxy::DaemonClient;

/// Shared WebSocket client used for the daemon-fallback path.
///
/// The client is built by [`DaemonClient::new`] the first time this
/// function is called and the same `Arc` is handed out afterwards.
///
/// Background (car-releases#54): when CarHost.app holds the supervisor
/// lock on `~/.car/agents.json`, in-process mutations (`upsert`,
/// `start`, etc.) used to surface a terminal `AlreadyRunning` error.
/// They are now routed to the running daemon's `agents.*` JSON-RPC
/// namespace, which the lock-holder is already serving.
fn daemon_singleton() -> Arc<DaemonClient> {
    static CLIENT: OnceLock<Arc<DaemonClient>> = OnceLock::new();
    let shared = CLIENT.get_or_init(DaemonClient::new);
    Arc::clone(shared)
}

/// Invoke an `agents.*` JSON-RPC method on the running daemon and
/// return its reply re-serialised as a JSON string.
///
/// This is the fallback used when [`Supervisor::user_default`] reports
/// `AlreadyRunning`: the daemon already supervises this manifest, so
/// the request is forwarded to it rather than re-spawning a duplicate.
///
/// # Errors
///
/// Propagates the error from the daemon call, or the stringified
/// serialisation error if the reply cannot be rendered as JSON.
async fn ws_call(method: &str, params: serde_json::Value) -> Result<String, String> {
    let client = daemon_singleton();
    let reply = client.call(method, params).await?;
    match serde_json::to_string(&reply) {
        Ok(text) => Ok(text),
        Err(err) => Err(err.to_string()),
    }
}

/// Returns the process-wide in-process [`Supervisor`], constructing it
/// with [`Supervisor::user_default`] on first use.
///
/// The underlying [`SupervisorError`] is returned unconverted so callers
/// can match on [`SupervisorError::AlreadyRunning`] and route the
/// request to the running daemon instead (see [`ws_call`]); the
/// read-only ops (`list`, `health`) additionally degrade to a
/// manifest-only fallback. Silently building a second supervisor is
/// never an option — that would re-introduce the double-spawn class the
/// singleton lock prevents (#44).
///
/// # Errors
///
/// Whatever [`Supervisor::user_default`] returns. A failed construction
/// is not cached, so the next call retries.
fn handle_or_locked() -> Result<&'static Supervisor, SupervisorError> {
    static SUP: OnceLock<Supervisor> = OnceLock::new();
    // Serialises first-time construction. Without it, two threads that
    // both observe an empty `SUP` would each call
    // `Supervisor::user_default`; the loser's instance would be thrown
    // away by `SUP.set`, or — presumably, since the constructor takes
    // the manifest lock — fail with a spurious `AlreadyRunning` even
    // though this very process owns the supervisor.
    static INIT: std::sync::Mutex<()> = std::sync::Mutex::new(());

    // Fast path: already initialised, no locking needed.
    if let Some(existing) = SUP.get() {
        return Ok(existing);
    }

    // The mutex guards no data, so a poisoned lock is safe to reuse.
    let _guard = INIT
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // Re-check: another thread may have finished init while we waited.
    if let Some(existing) = SUP.get() {
        return Ok(existing);
    }

    let fresh = Supervisor::user_default()?;
    let _ = SUP.set(fresh);
    SUP.get().ok_or_else(|| {
        SupervisorError::Other("supervisor singleton not set after init".to_string())
    })
}

/// List the managed agents as a JSON string.
///
/// Resolution order:
/// 1. the in-process supervisor, when [`handle_or_locked`] yields one;
/// 2. on `AlreadyRunning`, the daemon's `agents.list` method over WS, so
///    callers see live runtime state (status, pid, restart_count)
///    instead of the manifest defaults;
/// 3. if that WS call fails (no daemon listening, auth rejection,
///    etc.), the manifest read via [`Supervisor::list_from_manifest`]
///    with default runtime fields — the pre-#54 read-only degraded
///    behaviour.
///
/// # Errors
///
/// Any other supervisor error, a manifest read failure in step 3, or a
/// serialisation failure, each rendered as a string.
pub async fn list() -> Result<String, String> {
    let sup = match handle_or_locked() {
        Ok(sup) => sup,
        Err(SupervisorError::AlreadyRunning(_)) => {
            // Daemon-owned supervisor: prefer its live view.
            if let Ok(live) = ws_call("agents.list", serde_json::json!({})).await {
                return Ok(live);
            }
            // WS unavailable — degrade to the on-disk manifest.
            let manifest =
                Supervisor::user_default_manifest_path().map_err(|e| e.to_string())?;
            let agents = Supervisor::list_from_manifest(&manifest).map_err(|e| e.to_string())?;
            return serde_json::to_string(&agents).map_err(|e| e.to_string());
        }
        Err(other) => return Err(other.to_string()),
    };

    let agents = sup.list().await;
    serde_json::to_string(&agents).map_err(|e| e.to_string())
}

/// Create or update an agent from an `AgentSpec` JSON document and
/// return the resulting agent as JSON.
///
/// If the document carries a string `interpreter` field (#171), it is
/// resolved once through `car_registry::supervisor::resolve_interpreter`
/// and the resolved path is written into `command` before the spec is
/// used. This mirrors `handle_agents_upsert` on the daemon — keep the
/// two paths bit-for-bit identical so in-process and over-WS consumers
/// see the same manifest shape.
///
/// When the daemon owns the manifest lock (`AlreadyRunning`), the
/// already-rewritten document is forwarded to `agents.upsert` instead
/// of being refused (#54).
///
/// # Errors
///
/// Invalid JSON / invalid spec (`invalid AgentSpec JSON: …`),
/// interpreter resolution failure, or any supervisor, daemon or
/// serialisation error, each as a string.
pub async fn upsert(spec_json: &str) -> Result<String, String> {
    let mut payload: serde_json::Value = match serde_json::from_str(spec_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid AgentSpec JSON: {e}")),
    };

    // Copy the interpreter name out first so `payload` can be mutated.
    let interpreter = payload
        .get("interpreter")
        .and_then(serde_json::Value::as_str)
        .map(str::to_owned);
    if let Some(name) = interpreter {
        let command =
            car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
        payload["command"] = serde_json::Value::String(command.to_string_lossy().into_owned());
    }

    let sup = match handle_or_locked() {
        Ok(sup) => sup,
        Err(SupervisorError::AlreadyRunning(_)) => {
            return ws_call("agents.upsert", payload).await;
        }
        Err(other) => return Err(other.to_string()),
    };

    let spec: AgentSpec =
        serde_json::from_value(payload).map_err(|e| format!("invalid AgentSpec JSON: {e}"))?;
    let agent = sup.upsert(spec).await.map_err(|e| e.to_string())?;
    serde_json::to_string(&agent).map_err(|e| e.to_string())
}

/// Report supervisor health as a JSON string.
///
/// Uses the in-process supervisor when available. On `AlreadyRunning`
/// the daemon's `agents.health` method is tried first; if that call
/// fails, the result of [`Supervisor::health_from_manifest`] on the
/// default manifest path is returned instead.
///
/// # Errors
///
/// Any other supervisor error, a manifest read failure in the fallback,
/// or a serialisation failure, each rendered as a string.
pub async fn health() -> Result<String, String> {
    let outcome = handle_or_locked();
    match outcome {
        Err(SupervisorError::AlreadyRunning(_)) => {
            let via_daemon = ws_call("agents.health", serde_json::json!({})).await;
            if let Ok(live) = via_daemon {
                return Ok(live);
            }
            // Daemon unreachable — fall back to the manifest on disk.
            let manifest =
                Supervisor::user_default_manifest_path().map_err(|e| e.to_string())?;
            let report = Supervisor::health_from_manifest(&manifest).map_err(|e| e.to_string())?;
            serde_json::to_string(&report).map_err(|e| e.to_string())
        }
        Err(other) => Err(other.to_string()),
        Ok(sup) => {
            let report = sup.health().await;
            serde_json::to_string(&report).map_err(|e| e.to_string())
        }
    }
}

/// In-process counterpart of `agents.install`.
///
/// The manifest is parsed, validated at install time against the
/// daemon's default host capability advertisement
/// (Parslee-ai/car#182 phase 3), and adopted on success. The reply is
/// `{ report: { missingOptional: [...] }, agent: ManagedAgent? }`,
/// matching the WS handler shape.
///
/// The manifest must parse as an `AgentManifest` even when the request
/// ends up being forwarded: on `AlreadyRunning` the original JSON is
/// re-parsed as a generic value and sent to the daemon's
/// `agents.install` method.
///
/// # Errors
///
/// `invalid AgentManifest JSON: …` on parse failure, otherwise the
/// stringified supervisor, install or daemon error.
pub async fn install(manifest_json: &str) -> Result<String, String> {
    let manifest: car_registry::manifest::AgentManifest = serde_json::from_str(manifest_json)
        .map_err(|e| format!("invalid AgentManifest JSON: {e}"))?;
    let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));

    match handle_or_locked() {
        Err(SupervisorError::AlreadyRunning(_)) => {
            // Daemon owns the manifest — route the install to it.
            let params: serde_json::Value = serde_json::from_str(manifest_json)
                .map_err(|e| format!("invalid AgentManifest JSON: {e}"))?;
            ws_call("agents.install", params).await
        }
        Err(other) => Err(other.to_string()),
        Ok(sup) => {
            let (report, managed) = sup
                .install_manifest(manifest, &host)
                .await
                .map_err(|e| e.to_string())?;
            let missing_optional = report
                .missing_optional
                .iter()
                .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
                .collect::<Vec<_>>();
            let reply = serde_json::json!({
                "report": {
                    "missingOptional": missing_optional,
                },
                "agent": managed,
            });
            Ok(reply.to_string())
        }
    }
}

/// Remove the agent identified by `id`.
///
/// In-process, the reply is `{ "removed": bool }` built from the
/// supervisor's result. On `AlreadyRunning` the request is forwarded to
/// the daemon's `agents.remove` method and its reply is returned as-is.
///
/// # Errors
///
/// The stringified supervisor, daemon or serialisation error.
pub async fn remove(id: &str) -> Result<String, String> {
    let sup = match handle_or_locked() {
        Ok(sup) => sup,
        Err(SupervisorError::AlreadyRunning(_)) => {
            let params = serde_json::json!({ "id": id });
            return ws_call("agents.remove", params).await;
        }
        Err(other) => return Err(other.to_string()),
    };

    let removed = sup.remove(id).await.map_err(|e| e.to_string())?;
    let reply = serde_json::json!({ "removed": removed });
    serde_json::to_string(&reply).map_err(|e| e.to_string())
}

/// Start the agent identified by `id` and return it as JSON.
///
/// On `AlreadyRunning` the request is forwarded to the daemon's
/// `agents.start` method.
///
/// # Errors
///
/// The stringified supervisor, daemon or serialisation error.
pub async fn start(id: &str) -> Result<String, String> {
    match handle_or_locked() {
        Err(SupervisorError::AlreadyRunning(_)) => {
            let params = serde_json::json!({ "id": id });
            ws_call("agents.start", params).await
        }
        Err(other) => Err(other.to_string()),
        Ok(sup) => match sup.start(id).await {
            Ok(agent) => serde_json::to_string(&agent).map_err(|e| e.to_string()),
            Err(e) => Err(e.to_string()),
        },
    }
}

pub async fn stop(id: &str, signal: Option<&str>) -> Result<String, String> {
    match handle_or_locked() {
        Ok(s) => {
            let signal = signal
                .map(|raw| {
                    serde_json::from_value::<StopSignal>(serde_json::Value::String(raw.to_string()))
                        .map_err(|e| format!("invalid signal `{raw}`: {e}"))
                })
                .transpose()?
                .unwrap_or_default();
            let agent = s.stop(id, signal).await.map_err(|e| e.to_string())?;
            serde_json::to_string(&agent).map_err(|e| e.to_string())
        }
        Err(SupervisorError::AlreadyRunning(_)) => {
            let mut params = serde_json::json!({ "id": id });
            if let Some(raw) = signal {
                params["signal"] = serde_json::Value::String(raw.to_string());
            }
            ws_call("agents.stop", params).await
        }
        Err(e) => Err(e.to_string()),
    }
}

/// Restart the agent identified by `id` and return it as JSON.
///
/// On `AlreadyRunning` the request is forwarded to the daemon's
/// `agents.restart` method.
///
/// # Errors
///
/// The stringified supervisor, daemon or serialisation error.
pub async fn restart(id: &str) -> Result<String, String> {
    let sup = match handle_or_locked() {
        Ok(sup) => sup,
        Err(SupervisorError::AlreadyRunning(_)) => {
            return ws_call("agents.restart", serde_json::json!({ "id": id })).await;
        }
        Err(other) => return Err(other.to_string()),
    };

    let restarted = sup.restart(id).await;
    let agent = restarted.map_err(|e| e.to_string())?;
    serde_json::to_string(&agent).map_err(|e| e.to_string())
}

/// Read part of an agent's log and return it as JSON.
///
/// In-process, `stream` is interpreted by [`LogStream::from_wire`], `n`
/// defaults to 100 and `offset` defaults to 0 before being handed to the
/// supervisor's `read_log`. On `AlreadyRunning` the request goes to the
/// daemon's `agents.tail_log` method; only the arguments that were
/// actually supplied are included, so the daemon applies its own
/// defaults for the rest.
///
/// # Errors
///
/// The stringified supervisor, daemon or serialisation error.
pub async fn tail_log(
    id: &str,
    n: Option<usize>,
    stream: Option<&str>,
    offset: Option<usize>,
) -> Result<String, String> {
    let sup = match handle_or_locked() {
        Ok(sup) => sup,
        Err(SupervisorError::AlreadyRunning(_)) => {
            // Same key order as the in-process argument list: id, n,
            // stream, offset.
            let mut params = serde_json::Map::new();
            params.insert("id".to_string(), serde_json::Value::from(id));
            if let Some(count) = n {
                params.insert("n".to_string(), serde_json::Value::from(count));
            }
            if let Some(which) = stream {
                params.insert("stream".to_string(), serde_json::Value::from(which));
            }
            if let Some(start_at) = offset {
                params.insert("offset".to_string(), serde_json::Value::from(start_at));
            }
            return ws_call("agents.tail_log", serde_json::Value::Object(params)).await;
        }
        Err(other) => return Err(other.to_string()),
    };

    let which = LogStream::from_wire(stream);
    let count = n.unwrap_or(100);
    let start_at = offset.unwrap_or(0);
    let tail = sup
        .read_log(id, which, count, start_at)
        .await
        .map_err(|e| e.to_string())?;
    serde_json::to_string(&tail).map_err(|e| e.to_string())
}