car-server-core 0.12.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! FFI wrappers for the `car-a2a` server lifecycle.
//!
//! Each binding (NAPI, PyO3, server JSON-RPC) calls these from its
//! own thread. The bound listener and join handle live in
//! process-global state so a later `stop_a2a` / `a2a_status` reaches
//! the right server.
//!
//! ## Wire shapes
//!
//! `startA2aServer(paramsJson)`:
//! ```jsonc
//! {
//!   "bind": "127.0.0.1:8731",        // required
//!   "public_url": "https://...",     // optional; defaults to http://<bound>
//!   "agent_name": "Common Agent Runtime",          // optional
//!   "agent_description": "Deterministic ...",      // optional
//!   "organization": "Parslee",       // optional
//!   "organization_url": "https://parslee.ai"       // optional
//! }
//! ```
//! Returns `{ "bound": "127.0.0.1:8731" }` on success. Errors if a
//! server is already running or the bind fails.
//!
//! `stopA2aServer()`:
//! Returns `{ "stopped": true }`. Errors if no server is running.
//!
//! `a2aServerStatus()`:
//! Returns `{ "running": true, "bound": "...", "uptime_secs": N }`
//! when running, or `{ "running": false }` otherwise.
//!
//! ## Scope (v2)
//!
//! By default the spawned `Runtime` is still fresh — engine builtins
//! via `register_agent_basics()`, no shared state with the calling
//! FFI session. Pass `share_session_runtime: true` in StartParams to
//! re-use the calling WS session's runtime instead. That runtime
//! already has [`WsToolExecutor`] wired to the session's WS channel,
//! so every tool dispatch reaches the FFI client's
//! `register_tool_handler` callback. The Agent Card's `skills` list
//! is built from `runtime.tool_schemas()` and therefore reflects
//! whatever the FFI client has registered via `tools.register` /
//! `register_tool_schema`. This is the path host-language agents
//! (Python `neo serve`, JS embedders) use to project themselves over
//! A2A without bespoke wiring.
//!
//! `share_session_runtime` is per-call opt-in so the default
//! (self-contained Rust runtime with built-in tools) continues to
//! work for the v0.5.x flow — no regressions for existing consumers.

use car_a2a::{
    build_default_agent_card, build_router, A2aDispatcher, AgentCard, AgentCardConfig,
    AgentCardSource, AgentProvider, InMemoryTaskStore,
};
use car_engine::Runtime;
use serde::Deserialize;
use serde_json::json;
use std::sync::{Arc, Mutex};
use std::time::Instant;

struct A2aRunning {
    bound: std::net::SocketAddr,
    started_at: Instant,
    join_handle: tokio::task::JoinHandle<()>,
}

static A2A: Mutex<Option<A2aRunning>> = Mutex::new(None);

#[derive(Deserialize)]
struct StartParams {
    bind: String,
    #[serde(default)]
    public_url: Option<String>,
    #[serde(default)]
    agent_name: Option<String>,
    #[serde(default)]
    agent_description: Option<String>,
    #[serde(default)]
    organization: Option<String>,
    #[serde(default)]
    organization_url: Option<String>,
    /// Use the calling WS session's runtime for A2A dispatch instead
    /// of spawning a fresh one. When true, tools registered by the
    /// FFI client via `tools.register` show up on the Agent Card and
    /// `message/send` for those tools routes back to the client's
    /// registered `tools.execute` handler. When false (default),
    /// preserves the legacy v0.5.x behaviour: fresh Runtime with only
    /// `register_agent_basics()`. Closes the gap noted in the
    /// previous `Scope (v1)` doc-block.
    #[serde(default)]
    share_session_runtime: bool,
}

/// Start an A2A listener. Errors if one is already running, the
/// `bind` address is malformed, or `axum::serve` returns immediately.
pub async fn start_a2a(
    params_json: &str,
    session_runtime: Option<Arc<Runtime>>,
) -> Result<String, String> {
    let params: StartParams =
        serde_json::from_str(params_json).map_err(|e| format!("invalid start params: {e}"))?;

    {
        let guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
        if guard.is_some() {
            return Err("a2a server already running; call stop_a2a first".into());
        }
    }

    let listener = tokio::net::TcpListener::bind(&params.bind)
        .await
        .map_err(|e| format!("bind {}: {e}", params.bind))?;
    let bound = listener
        .local_addr()
        .map_err(|e| format!("local_addr: {e}"))?;

    let card_url = params
        .public_url
        .unwrap_or_else(|| format!("http://{}", bound));

    // share_session_runtime: when set AND the dispatch chain handed us
    // the calling session's runtime, use it. That runtime already has
    // WsToolExecutor wired to the session's WS channel, so A2A tool
    // dispatch reaches the FFI client's register_tool_handler callback,
    // and its tool registry is what appears in the Agent Card's skills.
    //
    // The asymmetric error is on purpose: if the caller asks to share
    // but we have no session runtime (e.g. invoked from a non-WS path),
    // fail loudly rather than silently regress to the fresh-runtime
    // shape — the caller's expectation about tool routing would be
    // wrong and the diagnostic is otherwise invisible.
    let runtime = match (params.share_session_runtime, session_runtime) {
        (true, Some(rt)) => rt,
        (true, None) => {
            return Err(
                "share_session_runtime requested but no calling session runtime is \
                 available — invoke a2a.start over an authenticated WS session, not \
                 from the CLI / embedded path"
                    .to_string(),
            );
        }
        (false, _) => {
            let rt = Arc::new(Runtime::new());
            rt.register_agent_basics().await;
            rt
        }
    };

    let store = Arc::new(InMemoryTaskStore::new());
    let initial_card = build_default_agent_card(
        &runtime,
        AgentCardConfig::minimal(
            params
                .agent_name
                .as_deref()
                .unwrap_or("Common Agent Runtime"),
            params
                .agent_description
                .as_deref()
                .unwrap_or("Deterministic execution layer for AI agents."),
            card_url,
            AgentProvider {
                organization: params.organization.unwrap_or_else(|| "Unknown".into()),
                url: params.organization_url,
            },
        ),
    )
    .await;
    let card_factory: Arc<AgentCardSource> = {
        let card: AgentCard = initial_card;
        Arc::new(move || card.clone())
    };

    let dispatcher = A2aDispatcher::new(runtime, store, card_factory);
    let app = build_router(dispatcher);
    let join_handle = tokio::spawn(async move {
        if let Err(e) = axum::serve(listener, app).await {
            tracing::warn!("a2a HTTP server exited: {}", e);
        }
    });

    let mut guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    *guard = Some(A2aRunning {
        bound,
        started_at: Instant::now(),
        join_handle,
    });

    Ok(json!({ "bound": bound.to_string() }).to_string())
}

/// Stop the running A2A listener. Aborts the spawned task — Axum
/// has no graceful-shutdown hook on the listener, so the abort is
/// the available seam, same as `tasks/cancel` in the bridge itself.
/// Errors if no server is running.
pub fn stop_a2a() -> Result<String, String> {
    let mut guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    let running = guard
        .take()
        .ok_or_else(|| "a2a server not running".to_string())?;
    running.join_handle.abort();
    Ok(json!({ "stopped": true }).to_string())
}

/// Report whether the A2A listener is up, the bound address, and
/// uptime in whole seconds.
pub fn a2a_status() -> Result<String, String> {
    let guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    Ok(match guard.as_ref() {
        Some(r) => json!({
            "running": true,
            "bound": r.bound.to_string(),
            "uptime_secs": r.started_at.elapsed().as_secs(),
        })
        .to_string(),
        None => json!({ "running": false }).to_string(),
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Tests in this module mutate the process-global `A2A` singleton
    /// via `start_a2a` / `stop_a2a`. Cargo's default test runner
    /// parallelises across cores; without a module-level lock, two
    /// tests racing on `start_a2a` would let one win and fail the
    /// other with "already running". `serial_test` isn't on the
    /// workspace; a plain Mutex achieves the same.
    fn lifecycle_lock() -> &'static std::sync::Mutex<()> {
        static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
        LOCK.get_or_init(|| std::sync::Mutex::new(()))
    }

    /// Cycle the global state with start/stop/status so the lifecycle
    /// works end-to-end without a real peer connecting. Uses port 0
    /// to ask the kernel for an ephemeral port.
    #[tokio::test]
    async fn start_stop_status_cycle() {
        let _guard = lifecycle_lock().lock().unwrap();
        let _ = stop_a2a();

        // status: not running yet
        let s = a2a_status().unwrap();
        assert!(
            s.contains("\"running\":false"),
            "expected not-running, got: {s}"
        );

        // start — legacy path (no session runtime, no share flag);
        // start_a2a falls back to fresh Runtime + agent_basics.
        let started = start_a2a(r#"{"bind":"127.0.0.1:0"}"#, None).await.unwrap();
        assert!(
            started.contains("\"bound\""),
            "expected bound addr, got: {started}"
        );

        // status: running
        let s = a2a_status().unwrap();
        assert!(s.contains("\"running\":true"), "expected running, got: {s}");
        assert!(s.contains("\"bound\""));
        assert!(s.contains("\"uptime_secs\""));

        // start again — should error (already running)
        let dup = start_a2a(r#"{"bind":"127.0.0.1:0"}"#, None).await;
        assert!(
            dup.is_err(),
            "second start should fail while one is running"
        );

        // stop
        let stopped = stop_a2a().unwrap();
        assert!(stopped.contains("\"stopped\":true"));

        // stop again — should error (not running)
        let dup = stop_a2a();
        assert!(dup.is_err());

        // status: not running again
        let s = a2a_status().unwrap();
        assert!(s.contains("\"running\":false"));
    }

    #[tokio::test]
    async fn start_rejects_malformed_params() {
        let _guard = lifecycle_lock().lock().unwrap();
        let r = start_a2a("not json", None).await;
        assert!(r.is_err());
    }

    /// `share_session_runtime: true` with no session runtime supplied
    /// is a configuration error — fail fast rather than silently
    /// regress to the fresh-runtime shape (the caller's expectation
    /// about tool routing would otherwise be invisible-wrong).
    #[tokio::test]
    async fn share_flag_without_session_runtime_errors() {
        let _guard = lifecycle_lock().lock().unwrap();
        let r = start_a2a(
            r#"{"bind":"127.0.0.1:0","share_session_runtime":true}"#,
            None,
        )
        .await;
        assert!(r.is_err(), "expected error, got: {r:?}");
        let msg = r.unwrap_err();
        assert!(
            msg.contains("share_session_runtime"),
            "error should name the offending flag, got: {msg}"
        );
    }

    /// `share_session_runtime: true` with a session runtime supplied
    /// uses that runtime — the Agent Card surface and tool dispatch
    /// both pick it up.
    #[tokio::test]
    async fn share_flag_uses_supplied_runtime() {
        let _guard = lifecycle_lock().lock().unwrap();
        let _ = stop_a2a();

        let rt = Arc::new(Runtime::new());
        // Pretend the FFI client registered a tool via tools.register.
        rt.register_tool_schema(car_ir::ToolSchema {
            name: "demo.echo".into(),
            description: "demo".into(),
            parameters: serde_json::json!({}),
            returns: None,
            idempotent: true,
            cache_ttl_secs: None,
            rate_limit: None,
        })
        .await;

        let started = start_a2a(
            r#"{"bind":"127.0.0.1:0","share_session_runtime":true}"#,
            Some(rt),
        )
        .await
        .expect("start_a2a should succeed with shared runtime");
        assert!(started.contains("\"bound\""));
        let _ = stop_a2a();
    }
}