car-server-core 0.8.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! FFI wrappers for the `car-a2a` server lifecycle.
//!
//! Each binding (NAPI, PyO3, server JSON-RPC) calls these from its
//! own thread. The bound listener and join handle live in
//! process-global state so a later `stop_a2a` / `a2a_status` reaches
//! the right server.
//!
//! ## Wire shapes
//!
//! `startA2aServer(paramsJson)`:
//! ```jsonc
//! {
//!   "bind": "127.0.0.1:8731",        // required
//!   "public_url": "https://...",     // optional; defaults to http://<bound>
//!   "agent_name": "Common Agent Runtime",          // optional
//!   "agent_description": "Deterministic ...",      // optional
//!   "organization": "Parslee",       // optional
//!   "organization_url": "https://parslee.ai"       // optional
//! }
//! ```
//! Returns `{ "bound": "127.0.0.1:8731" }` on success. Errors if a
//! server is already running or the bind fails.
//!
//! `stopA2aServer()`:
//! Returns `{ "stopped": true }`. Errors if no server is running.
//!
//! `a2aServerStatus()`:
//! Returns `{ "running": true, "bound": "...", "uptime_secs": N }`
//! when running, or `{ "running": false }` otherwise.
//!
//! ## Scope (v1)
//!
//! The spawned `Runtime` is fresh — engine builtins via
//! `register_agent_basics()`, no shared state with the embedder's
//! `CarRuntime`. Sharing state with the caller's runtime is future
//! work and matches the v0.5.0 a2a drop's "out of scope" note for
//! per-WS-session runtimes.

use car_a2a::{
    build_default_agent_card, build_router, A2aDispatcher, AgentCard, AgentCardConfig,
    AgentCardSource, AgentProvider, InMemoryTaskStore,
};
use car_engine::Runtime;
use serde::Deserialize;
use serde_json::json;
use std::sync::{Arc, Mutex};
use std::time::Instant;

struct A2aRunning {
    bound: std::net::SocketAddr,
    started_at: Instant,
    join_handle: tokio::task::JoinHandle<()>,
}

static A2A: Mutex<Option<A2aRunning>> = Mutex::new(None);

#[derive(Deserialize)]
struct StartParams {
    bind: String,
    #[serde(default)]
    public_url: Option<String>,
    #[serde(default)]
    agent_name: Option<String>,
    #[serde(default)]
    agent_description: Option<String>,
    #[serde(default)]
    organization: Option<String>,
    #[serde(default)]
    organization_url: Option<String>,
}

/// Start an A2A listener. Errors if one is already running, the
/// `bind` address is malformed, or `axum::serve` returns immediately.
pub async fn start_a2a(params_json: &str) -> Result<String, String> {
    let params: StartParams = serde_json::from_str(params_json)
        .map_err(|e| format!("invalid start params: {e}"))?;

    {
        let guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
        if guard.is_some() {
            return Err("a2a server already running; call stop_a2a first".into());
        }
    }

    let listener = tokio::net::TcpListener::bind(&params.bind)
        .await
        .map_err(|e| format!("bind {}: {e}", params.bind))?;
    let bound = listener
        .local_addr()
        .map_err(|e| format!("local_addr: {e}"))?;

    let card_url = params
        .public_url
        .unwrap_or_else(|| format!("http://{}", bound));

    let runtime = Arc::new(Runtime::new());
    runtime.register_agent_basics().await;

    let store = Arc::new(InMemoryTaskStore::new());
    let initial_card = build_default_agent_card(
        &runtime,
        AgentCardConfig::minimal(
            params
                .agent_name
                .as_deref()
                .unwrap_or("Common Agent Runtime"),
            params
                .agent_description
                .as_deref()
                .unwrap_or("Deterministic execution layer for AI agents."),
            card_url,
            AgentProvider {
                organization: params
                    .organization
                    .unwrap_or_else(|| "Unknown".into()),
                url: params.organization_url,
            },
        ),
    )
    .await;
    let card_factory: Arc<AgentCardSource> = {
        let card: AgentCard = initial_card;
        Arc::new(move || card.clone())
    };

    let dispatcher = A2aDispatcher::new(runtime, store, card_factory);
    let app = build_router(dispatcher);
    let join_handle = tokio::spawn(async move {
        if let Err(e) = axum::serve(listener, app).await {
            tracing::warn!("a2a HTTP server exited: {}", e);
        }
    });

    let mut guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    *guard = Some(A2aRunning {
        bound,
        started_at: Instant::now(),
        join_handle,
    });

    Ok(json!({ "bound": bound.to_string() }).to_string())
}

/// Stop the running A2A listener. Aborts the spawned task — Axum
/// has no graceful-shutdown hook on the listener, so the abort is
/// the available seam, same as `tasks/cancel` in the bridge itself.
/// Errors if no server is running.
pub fn stop_a2a() -> Result<String, String> {
    let mut guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    let running = guard
        .take()
        .ok_or_else(|| "a2a server not running".to_string())?;
    running.join_handle.abort();
    Ok(json!({ "stopped": true }).to_string())
}

/// Report whether the A2A listener is up, the bound address, and
/// uptime in whole seconds.
pub fn a2a_status() -> Result<String, String> {
    let guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    Ok(match guard.as_ref() {
        Some(r) => json!({
            "running": true,
            "bound": r.bound.to_string(),
            "uptime_secs": r.started_at.elapsed().as_secs(),
        })
        .to_string(),
        None => json!({ "running": false }).to_string(),
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Cycle the global state with start/stop/status so the lifecycle
    /// works end-to-end without a real peer connecting. Uses port 0
    /// to ask the kernel for an ephemeral port.
    #[tokio::test]
    async fn start_stop_status_cycle() {
        // status: not running yet
        let s = a2a_status().unwrap();
        assert!(s.contains("\"running\":false"), "expected not-running, got: {s}");

        // start
        let started = start_a2a(r#"{"bind":"127.0.0.1:0"}"#).await.unwrap();
        assert!(started.contains("\"bound\""), "expected bound addr, got: {started}");

        // status: running
        let s = a2a_status().unwrap();
        assert!(s.contains("\"running\":true"), "expected running, got: {s}");
        assert!(s.contains("\"bound\""));
        assert!(s.contains("\"uptime_secs\""));

        // start again — should error (already running)
        let dup = start_a2a(r#"{"bind":"127.0.0.1:0"}"#).await;
        assert!(dup.is_err(), "second start should fail while one is running");

        // stop
        let stopped = stop_a2a().unwrap();
        assert!(stopped.contains("\"stopped\":true"));

        // stop again — should error (not running)
        let dup = stop_a2a();
        assert!(dup.is_err());

        // status: not running again
        let s = a2a_status().unwrap();
        assert!(s.contains("\"running\":false"));
    }

    #[tokio::test]
    async fn start_rejects_malformed_params() {
        let r = start_a2a("not json").await;
        assert!(r.is_err());
    }
}