use car_a2a::{
build_default_agent_card, build_router, A2aDispatcher, AgentCard, AgentCardConfig,
AgentCardSource, AgentProvider, InMemoryTaskStore,
};
use car_engine::Runtime;
use serde::Deserialize;
use serde_json::json;
use std::sync::{Arc, Mutex};
use std::time::Instant;
/// Bookkeeping for the A2A HTTP server started by `start_a2a`.
struct A2aRunning {
/// Local address reported by the listener after binding.
bound: std::net::SocketAddr,
/// Instant captured when the server was recorded; `a2a_status` derives uptime from it.
started_at: Instant,
/// Handle of the spawned task that runs the HTTP server; `stop_a2a` aborts it.
join_handle: tokio::task::JoinHandle<()>,
}
/// Module-wide slot for the A2A server record. `None` means no server is
/// recorded; `start_a2a` fills it, `stop_a2a` takes it, `a2a_status` reads it.
static A2A: Mutex<Option<A2aRunning>> = Mutex::new(None);
/// Parameters accepted by `start_a2a`, deserialized from its JSON argument.
/// Only `bind` is required; every other field falls back to its default when absent.
#[derive(Deserialize)]
struct StartParams {
/// Address handed to `TcpListener::bind` (the tests use `"127.0.0.1:0"`).
bind: String,
/// URL placed in the agent card; when absent, `http://<bound address>` is used.
#[serde(default)]
public_url: Option<String>,
/// Agent name for the card; `start_a2a` substitutes "Common Agent Runtime" when absent.
#[serde(default)]
agent_name: Option<String>,
/// Agent description for the card; `start_a2a` substitutes a stock description when absent.
#[serde(default)]
agent_description: Option<String>,
/// Provider organization for the card; `start_a2a` substitutes "Unknown" when absent.
#[serde(default)]
organization: Option<String>,
/// Provider URL for the card; passed through unchanged (may be `None`).
#[serde(default)]
organization_url: Option<String>,
/// When true, the server uses the caller-supplied session runtime instead of
/// creating a fresh one; `start_a2a` errors if no session runtime was supplied.
#[serde(default)]
share_session_runtime: bool,
}
/// Starts the A2A HTTP server and records it in the module-wide `A2A` slot.
///
/// `params_json` is a JSON object matching `StartParams` (only `bind` is
/// required). `session_runtime` is the caller's runtime, used only when the
/// parameters set `share_session_runtime`; otherwise a fresh `Runtime` is
/// created and `register_agent_basics` is called on it.
///
/// On success returns a JSON string `{"bound": "<addr>"}` holding the address
/// the listener actually bound to.
///
/// # Errors
///
/// Returns a human-readable message when:
/// - `params_json` does not deserialize into `StartParams`;
/// - a server is already recorded as running;
/// - binding the listener or reading its local address fails;
/// - `share_session_runtime` is set but `session_runtime` is `None`;
/// - the `A2A` mutex is poisoned.
pub async fn start_a2a(
    params_json: &str,
    session_runtime: Option<Arc<Runtime>>,
) -> Result<String, String> {
    let params: StartParams =
        serde_json::from_str(params_json).map_err(|e| format!("invalid start params: {e}"))?;

    // Cheap early rejection so an obviously redundant call never binds a
    // socket. The guard is scoped so it is released before the first `.await`.
    {
        let guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
        if guard.is_some() {
            return Err("a2a server already running; call stop_a2a first".into());
        }
    }

    let listener = tokio::net::TcpListener::bind(&params.bind)
        .await
        .map_err(|e| format!("bind {}: {e}", params.bind))?;
    let bound = listener
        .local_addr()
        .map_err(|e| format!("local_addr: {e}"))?;

    // Advertise the caller-provided public URL, else the address we bound to.
    let card_url = params
        .public_url
        .unwrap_or_else(|| format!("http://{}", bound));

    let runtime = match (params.share_session_runtime, session_runtime) {
        (true, Some(rt)) => rt,
        (true, None) => {
            return Err(
                "share_session_runtime requested but no calling session runtime is \
                 available — invoke a2a.start over an authenticated WS session, not \
                 from the CLI / embedded path"
                    .to_string(),
            );
        }
        (false, _) => {
            let rt = Arc::new(Runtime::new());
            rt.register_agent_basics().await;
            rt
        }
    };

    let store = Arc::new(InMemoryTaskStore::new());
    let initial_card = build_default_agent_card(
        &runtime,
        AgentCardConfig::minimal(
            params
                .agent_name
                .as_deref()
                .unwrap_or("Common Agent Runtime"),
            params
                .agent_description
                .as_deref()
                .unwrap_or("Deterministic execution layer for AI agents."),
            card_url,
            AgentProvider {
                organization: params.organization.unwrap_or_else(|| "Unknown".into()),
                url: params.organization_url,
            },
        ),
    )
    .await;

    // The card is built once; the factory hands out clones of that snapshot.
    let card_factory: Arc<AgentCardSource> = {
        let card: AgentCard = initial_card;
        Arc::new(move || card.clone())
    };
    let dispatcher = A2aDispatcher::new(runtime, store, card_factory);
    let app = build_router(dispatcher);

    // The early check released the lock before the awaits above, so another
    // caller may have started a server in the meantime. Re-check under the
    // lock and spawn while still holding it: `tokio::spawn` does not await, so
    // the guard is never held across an `.await`. Without this, a concurrent
    // start would overwrite the slot and leave the earlier server task
    // detached, beyond the reach of `stop_a2a`.
    let mut guard = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    if guard.is_some() {
        // Returning here drops `listener`, releasing the socket we just bound.
        return Err("a2a server already running; call stop_a2a first".into());
    }
    let join_handle = tokio::spawn(async move {
        if let Err(e) = axum::serve(listener, app).await {
            tracing::warn!("a2a HTTP server exited: {}", e);
        }
    });
    *guard = Some(A2aRunning {
        bound,
        started_at: Instant::now(),
        join_handle,
    });
    drop(guard);

    Ok(json!({ "bound": bound.to_string() }).to_string())
}
/// Stops the recorded A2A server by aborting its serving task and clearing
/// the `A2A` slot.
///
/// Returns the JSON string `{"stopped": true}` on success.
///
/// # Errors
///
/// Returns a message when the `A2A` mutex is poisoned or when no server is
/// currently recorded.
pub fn stop_a2a() -> Result<String, String> {
    let mut slot = match A2A.lock() {
        Ok(slot) => slot,
        Err(poison) => return Err(format!("lock poisoned: {poison}")),
    };
    match slot.take() {
        Some(server) => {
            // Aborting requests cancellation of the spawned serving task.
            server.join_handle.abort();
            Ok(json!({ "stopped": true }).to_string())
        }
        None => Err("a2a server not running".to_string()),
    }
}
/// Reports whether an A2A server is recorded as running.
///
/// Returns a JSON string: `{"running": false}` when the slot is empty,
/// otherwise `running: true` together with the `bound` address and
/// `uptime_secs` (whole seconds since the server was recorded).
///
/// # Errors
///
/// Returns a message when the `A2A` mutex is poisoned.
pub fn a2a_status() -> Result<String, String> {
    let slot = A2A.lock().map_err(|e| format!("lock poisoned: {e}"))?;
    let report = if let Some(server) = slot.as_ref() {
        json!({
            "running": true,
            "bound": server.bound.to_string(),
            "uptime_secs": server.started_at.elapsed().as_secs(),
        })
    } else {
        json!({ "running": false })
    };
    Ok(report.to_string())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the tests in this module: they all go through the single
    /// module-wide `A2A` slot, so they must not interleave.
    static LIFECYCLE_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires the serialization lock, recovering it if an earlier test
    /// panicked while holding it. The lock protects no data of its own, so a
    /// poisoned state carries no meaning here; recovering keeps one failing
    /// test from turning every later test into a poison-`unwrap` failure.
    fn lifecycle_guard() -> std::sync::MutexGuard<'static, ()> {
        LIFECYCLE_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Walks the full lifecycle: not running -> start -> status -> duplicate
    /// start rejected -> stop -> duplicate stop rejected -> not running.
    #[tokio::test]
    async fn start_stop_status_cycle() {
        let _guard = lifecycle_guard();
        // Clear anything a previous (possibly failed) test left behind.
        let _ = stop_a2a();
        let s = a2a_status().unwrap();
        assert!(
            s.contains("\"running\":false"),
            "expected not-running, got: {s}"
        );
        let started = start_a2a(r#"{"bind":"127.0.0.1:0"}"#, None).await.unwrap();
        assert!(
            started.contains("\"bound\""),
            "expected bound addr, got: {started}"
        );
        let s = a2a_status().unwrap();
        assert!(s.contains("\"running\":true"), "expected running, got: {s}");
        assert!(s.contains("\"bound\""));
        assert!(s.contains("\"uptime_secs\""));
        let dup = start_a2a(r#"{"bind":"127.0.0.1:0"}"#, None).await;
        assert!(
            dup.is_err(),
            "second start should fail while one is running"
        );
        let stopped = stop_a2a().unwrap();
        assert!(stopped.contains("\"stopped\":true"));
        let dup = stop_a2a();
        assert!(dup.is_err());
        let s = a2a_status().unwrap();
        assert!(s.contains("\"running\":false"));
    }

    /// Input that is not valid JSON must be rejected.
    #[tokio::test]
    async fn start_rejects_malformed_params() {
        let _guard = lifecycle_guard();
        let r = start_a2a("not json", None).await;
        assert!(r.is_err());
    }

    /// Requesting the shared runtime without supplying one must fail with an
    /// error that names the flag.
    #[tokio::test]
    async fn share_flag_without_session_runtime_errors() {
        let _guard = lifecycle_guard();
        // Without this, leftover state from a failed test would surface as an
        // "already running" error and mask the message asserted below.
        let _ = stop_a2a();
        let r = start_a2a(
            r#"{"bind":"127.0.0.1:0","share_session_runtime":true}"#,
            None,
        )
        .await;
        assert!(r.is_err(), "expected error, got: {r:?}");
        let msg = r.unwrap_err();
        assert!(
            msg.contains("share_session_runtime"),
            "error should name the offending flag, got: {msg}"
        );
    }

    /// With the share flag set and a runtime supplied, the server starts.
    #[tokio::test]
    async fn share_flag_uses_supplied_runtime() {
        let _guard = lifecycle_guard();
        let _ = stop_a2a();
        let rt = Arc::new(Runtime::new());
        rt.register_tool_schema(car_ir::ToolSchema {
            name: "demo.echo".into(),
            description: "demo".into(),
            parameters: serde_json::json!({}),
            returns: None,
            idempotent: true,
            cache_ttl_secs: None,
            rate_limit: None,
        })
        .await;
        let started = start_a2a(
            r#"{"bind":"127.0.0.1:0","share_session_runtime":true}"#,
            Some(rt),
        )
        .await
        .expect("start_a2a should succeed with shared runtime");
        assert!(started.contains("\"bound\""));
        let _ = stop_a2a();
    }
}