use crate::session::{A2aRouteAuth, ServerState, WsChannel};
use car_proto::*;
use car_verify;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_tungstenite::{accept_async, tungstenite::Message};
use tracing::{info, instrument};
/// An incoming JSON-RPC frame, loose enough to hold either a request or a
/// response.
///
/// Every field is `#[serde(default)]`, so a frame that omits a member still
/// deserializes; the dispatcher decides what kind of frame it is by checking
/// which members are present.
#[derive(Debug, Deserialize)]
// NOTE(review): presumably allowed because not every field (e.g. `jsonrpc`)
// is read after deserialization — confirm before removing.
#[allow(dead_code)]
pub struct JsonRpcMessage {
/// Protocol version string as sent by the peer; empty when omitted.
#[serde(default)]
pub jsonrpc: String,
/// Method name for requests; `None` when the frame carries no method.
#[serde(default)]
pub method: Option<String>,
/// Request parameters; `Value::Null` when omitted.
#[serde(default)]
pub params: Value,
/// Request/response correlation id; `Value::Null` when omitted.
#[serde(default)]
pub id: Value,
/// Result payload of a response frame, if any.
#[serde(default)]
pub result: Option<Value>,
/// Error payload of a response frame, if any.
#[serde(default)]
pub error: Option<Value>,
}
/// An outgoing JSON-RPC response.
///
/// `result` and `error` are skipped during serialization when they are `None`,
/// so a frame built by the constructors carries exactly one of them.
#[derive(Debug, Serialize)]
pub struct JsonRpcResponse {
/// Protocol version tag written into every response.
pub jsonrpc: &'static str,
/// Success payload; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
/// Failure payload; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<JsonRpcError>,
/// Id of the request this response answers.
pub id: Value,
}
/// The `error` member of a [`JsonRpcResponse`].
#[derive(Debug, Serialize)]
pub struct JsonRpcError {
/// Numeric error code (for example `-32700` for a parse error).
pub code: i32,
/// Human-readable description of the failure.
pub message: String,
}
impl JsonRpcResponse {
    /// Version tag written into every response frame.
    const VERSION: &'static str = "2.0";

    /// Builds a success response carrying `result` for the request `id`.
    pub fn success(id: Value, result: Value) -> Self {
        let result = Some(result);
        Self {
            jsonrpc: Self::VERSION,
            result,
            error: None,
            id,
        }
    }

    /// Builds an error response with the given `code` and `message` for the
    /// request `id`. The message is copied into the response.
    pub fn error(id: Value, code: i32, message: &str) -> Self {
        let failure = JsonRpcError {
            code,
            message: message.to_owned(),
        };
        Self {
            jsonrpc: Self::VERSION,
            result: None,
            error: Some(failure),
            id,
        }
    }
}
/// Upgrades an accepted TCP stream to a WebSocket and runs the JSON-RPC
/// dispatch loop on it.
///
/// # Errors
/// Fails if the WebSocket handshake fails, or with whatever [`run_dispatch`]
/// returns.
#[instrument(
    name = "ws.connection",
    skip_all,
    fields(peer = %peer),
)]
pub async fn handle_connection(
    stream: TcpStream,
    peer: SocketAddr,
    state: Arc<ServerState>,
) -> Result<(), Box<dyn std::error::Error>> {
    let peer_label = peer.to_string();
    // Handshake first; the split halves feed the shared dispatch loop.
    let (sink, source) = accept_async(stream).await?.split();
    run_dispatch(source, Box::pin(sink), peer_label, state).await
}
/// Unix-domain-socket counterpart of [`handle_connection`]: performs the
/// WebSocket handshake on `stream` and runs the JSON-RPC dispatch loop.
///
/// `peer` is a caller-supplied label used for tracing and logging.
///
/// # Errors
/// Fails if the WebSocket handshake fails, or with whatever [`run_dispatch`]
/// returns.
#[cfg(unix)]
#[instrument(
    name = "ws.connection",
    skip_all,
    fields(peer = %peer),
)]
pub async fn handle_connection_unix(
    stream: tokio::net::UnixStream,
    peer: String,
    state: Arc<ServerState>,
) -> Result<(), Box<dyn std::error::Error>> {
    let handshake = tokio_tungstenite::accept_async(stream);
    let (sink, source) = handshake.await?.split();
    run_dispatch(source, Box::pin(sink), peer, state).await
}
/// Runs the JSON-RPC dispatch loop for one client connection.
///
/// Reads WebSocket frames from `read` until the stream ends, a read error
/// occurs, the client sends a close frame, or an unauthenticated client is
/// rejected. For every text frame it:
///
/// 1. parses the frame as a [`JsonRpcMessage`] (replying `-32700` on failure),
/// 2. routes response frames to the matching waiter in the channel's
///    `pending` map,
/// 3. offers the frame to `try_forward_agent_chat_event`,
/// 4. enforces the auth gate (`-32001`, then closes) and the approval gate
///    (`-32003`, connection stays open),
/// 5. spawns the method handler on a per-connection `JoinSet` and sends its
///    result (or a `-32603` error) back on the channel.
///
/// On exit all in-flight handler tasks are aborted and the session is
/// unregistered. The visible body contains no early error return, so the
/// function currently always finishes with `Ok(())`.
#[instrument(
name = "ws.dispatch",
skip_all,
fields(client_id = tracing::field::Empty, peer = %peer),
)]
pub async fn run_dispatch<R>(
mut read: R,
write: crate::session::WsSink,
peer: String,
state: Arc<ServerState>,
) -> Result<(), Box<dyn std::error::Error>>
where
R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
+ Unpin
+ Send,
{
// Connection id: the first 12 characters of a fresh UUID v4 in "simple"
// (hyphen-free) form. Recorded on the span declared `Empty` above.
let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
tracing::Span::current().record("client_id", &client_id.as_str());
info!("New connection from {}", peer);
let channel = Arc::new(WsChannel {
write: Mutex::new(write),
pending: Mutex::new(HashMap::new()),
next_id: AtomicU64::new(1),
});
let session = state.create_session(&client_id, channel.clone()).await;
// Handlers run as tasks in this set so the read loop keeps draining frames
// while a handler is still in progress.
let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
while let Some(msg) = read.next().await {
// Reap handler tasks that have already finished so their results do not
// pile up inside the JoinSet.
while conn_tasks.try_join_next().is_some() {}
let msg = match msg {
Ok(m) => m,
Err(e) => {
info!("read error from {}: {}; closing", client_id, e);
break;
}
};
// Only text and close frames are acted on; any other frame type falls
// through both branches below and is ignored.
if msg.is_text() {
let text = match msg.to_text() {
Ok(t) => t,
Err(e) => {
info!("non-text frame from {}: {}; closing", client_id, e);
break;
}
};
let parsed: JsonRpcMessage = match serde_json::from_str(text) {
Ok(m) => m,
Err(e) => {
// Unparseable JSON: reply with a parse error (null id, since no id
// could be read) and keep the connection open. A failed send is
// deliberately ignored.
send_response(
&session.channel,
JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
)
.await
.ok();
continue;
}
};
// A frame with no `method` but with `result` or `error` is a response from
// the client to a request this side sent. Deliver it to the waiter stored
// in `pending` under the same string id.
if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
if let Some(id_str) = parsed.id.as_str() {
let mut pending = session.channel.pending.lock().await;
if let Some(tx) = pending.remove(id_str) {
let tool_resp = if let Some(result) = parsed.result {
ToolExecuteResponse {
action_id: id_str.to_string(),
output: Some(result),
error: None,
}
} else {
// Error response: use its `message` string, or a fixed
// placeholder when that member is missing or not a string.
let err_msg = parsed
.error
.as_ref()
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
.unwrap_or("unknown error")
.to_string();
ToolExecuteResponse {
action_id: id_str.to_string(),
output: None,
error: Some(err_msg),
}
};
// The send result is ignored on purpose.
let _ = tx.send(tool_resp);
continue;
}
}
// No matching waiter (or a non-string id): fall through to the checks
// below rather than dropping the frame here.
}
// `try_forward_agent_chat_event` is defined outside this view; a `true`
// return is treated as "frame consumed".
// NOTE(review): this call and the response routing above both run before
// the auth gate below — confirm that is intended for unauthenticated
// connections.
if try_forward_agent_chat_event(&parsed, &state).await {
continue;
}
if let Some(method) = &parsed.method {
info!(method = %method, "dispatching JSON-RPC method");
// Auth gate: when a token is configured and this session has not yet
// authenticated, only `session.auth` is accepted. Anything else gets a
// -32001 error and the connection is closed.
// NOTE(review): `session.auth` itself runs on a spawned task (below), so
// the `authenticated` flag is presumably set asynchronously; a client that
// pipelines a second request without waiting for the auth reply may be
// rejected here — confirm against `handle_session_auth`.
if state.auth_token.get().is_some()
&& !session
.authenticated
.load(std::sync::atomic::Ordering::Acquire)
&& method != "session.auth"
{
let resp = JsonRpcResponse::error(
parsed.id.clone(),
-32001,
"auth required: send `session.auth` with the per-launch token \
from ~/Library/Application Support/ai.parslee.car/auth-token \
(macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
as the first frame on this connection",
);
let _ = send_response(&session.channel, resp).await;
info!(client = %client_id, method = %method,
"rejecting non-auth method on unauthenticated session; closing");
break;
}
// Approval gate: methods flagged by `approval_gate` must pass
// `gate_high_risk_method` first. A refusal is reported as -32003 and the
// loop continues, so the connection stays open.
if state.approval_gate.requires_approval(method.as_str()) {
match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
Ok(()) => {}
Err(reason) => {
let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
let _ = send_response(&session.channel, resp).await;
info!(
client = %client_id,
method = %method,
reason = %reason,
"approval gate blocked dispatch"
);
continue;
}
}
}
// Move owned copies/handles into the spawned task; `parsed` itself is
// moved, so it is not used again in this loop iteration.
let session_task = session.clone();
let state_task = state.clone();
let method_owned = method.clone();
let parsed_task = parsed;
conn_tasks.spawn(async move {
let session = session_task;
let state = state_task;
let parsed = parsed_task;
// Method table: each arm yields `Result<Value, String>`.
let result = match method_owned.as_str() {
"session.auth" => handle_session_auth(&parsed, &session, &state).await,
"parslee.auth" => handle_parslee_auth().await,
"auth.start" => handle_auth_start(&parsed).await,
"auth.complete" => handle_auth_complete(&parsed).await,
"auth.status" => handle_auth_status().await,
"auth.logout" => handle_auth_logout().await,
"session.init" => handle_session_init(&parsed, &session).await,
"host.subscribe" => handle_host_subscribe(&session, &state).await,
"host.agents" => handle_host_agents(&session).await,
"host.events" => handle_host_events(&parsed, &session).await,
"host.approvals" => handle_host_approvals(&session).await,
"host.register_agent" => {
handle_host_register_agent(&parsed, &session).await
}
"host.unregister_agent" => {
handle_host_unregister_agent(&parsed, &session).await
}
"host.set_status" => handle_host_set_status(&parsed, &session).await,
"host.notify" => handle_host_notify(&parsed, &session).await,
"host.request_approval" => {
handle_host_request_approval(&parsed, &session).await
}
"host.resolve_approval" => {
handle_host_resolve_approval(&parsed, &session).await
}
"tools.register" => handle_tools_register(&parsed, &session).await,
"proposal.submit" => handle_proposal_submit(&parsed, &session).await,
"policy.register" => handle_policy_register(&parsed, &session).await,
"session.policy.open" => handle_session_policy_open(&session).await,
"session.policy.close" => {
handle_session_policy_close(&parsed, &session).await
}
"verify" => handle_verify(&parsed, &session).await,
"state.get" => handle_state_get(&parsed, &session).await,
"state.set" => handle_state_set(&parsed, &session).await,
"state.exists" => handle_state_exists(&parsed, &session).await,
"state.keys" => handle_state_keys(&parsed, &session).await,
"state.snapshot" => handle_state_snapshot(&parsed, &session).await,
"memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
"memory.query" => handle_memory_query(&parsed, &session).await,
"memory.build_context" => {
handle_memory_build_context(&parsed, &session).await
}
"memory.build_context_fast" => {
handle_memory_build_context_fast(&parsed, &session).await
}
"memory.consolidate" => handle_memory_consolidate(&session).await,
"memory.fact_count" => handle_memory_fact_count(&session).await,
"memory.persist" => handle_memory_persist(&parsed, &session).await,
"memory.load" => handle_memory_load(&parsed, &session).await,
"skill.ingest" => handle_skill_ingest(&parsed, &session).await,
"skill.find" => handle_skill_find(&parsed, &session).await,
"skill.report" => handle_skill_report(&parsed, &session).await,
"skill.repair" => handle_skill_repair(&parsed, &session).await,
"skills.ingest_distilled" => {
handle_skills_ingest_distilled(&parsed, &session).await
}
"skills.evolve" => handle_skills_evolve(&parsed, &session).await,
"skills.domains_needing_evolution" => {
handle_skills_domains_needing_evolution(&parsed, &session).await
}
"multi.swarm" => handle_multi_swarm(&parsed, &session).await,
"multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
"multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
"multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
"multi.vote" => handle_multi_vote(&parsed, &session).await,
"scheduler.create" => handle_scheduler_create(&parsed),
"scheduler.run" => handle_scheduler_run(&parsed, &session).await,
"scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
"infer" => handle_infer(&parsed, &state, &session).await,
"image.generate" => handle_image_generate(&parsed, &state).await,
"video.generate" => handle_video_generate(&parsed, &state).await,
"embed" => handle_embed(&parsed, &state).await,
"classify" => handle_classify(&parsed, &state).await,
"tokenize" => handle_tokenize(&parsed, &state).await,
"detokenize" => handle_detokenize(&parsed, &state).await,
"rerank" => handle_rerank(&parsed, &state).await,
"transcribe" => handle_transcribe(&parsed, &state).await,
"synthesize" => handle_synthesize(&parsed, &state).await,
"infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
"speech.prepare" => handle_speech_prepare(&state).await,
"models.route" => handle_models_route(&parsed, &state).await,
"models.stats" => handle_models_stats(&state).await,
"outcomes.resolve_pending" => {
handle_outcomes_resolve_pending(&parsed, &state).await
}
"events.count" => handle_events_count(&session).await,
"events.stats" => handle_events_stats(&session).await,
"events.truncate" => handle_events_truncate(&parsed, &session).await,
"events.clear" => handle_events_clear(&session).await,
"replan.set_config" => handle_replan_set_config(&parsed, &session).await,
"models.list" => handle_models_list(&state),
"models.register" => handle_models_register(&parsed, &state).await,
"models.unregister" => handle_models_unregister(&parsed, &state).await,
"models.list_unified" => handle_models_list_unified(&state),
"models.search" => handle_models_search(&parsed, &state),
"models.upgrades" => handle_models_upgrades(&state),
"models.pull" => handle_models_pull(&parsed, &state).await,
// `models.install` is served by the same handler as `models.pull`.
"models.install" => handle_models_pull(&parsed, &state).await,
"skills.distill" => handle_skills_distill(&parsed, &state).await,
"skills.list" => handle_skills_list(&parsed, &session).await,
"browser.run" => handle_browser_run(&parsed, &session).await,
"browser.close" => handle_browser_close(&session).await,
"secret.put" => handle_secret_put(&parsed),
"secret.get" => handle_secret_get(&parsed),
"secret.delete" => handle_secret_delete(&parsed),
"secret.status" => handle_secret_status(&parsed),
"secret.available" => Ok(car_ffi_common::secrets::is_available()),
"permissions.status" => handle_perm_status(&parsed),
"permissions.request" => handle_perm_request(&parsed),
"permissions.explain" => handle_perm_explain(&parsed),
"permissions.domains" => Ok(car_ffi_common::permissions::domains()),
"accounts.list" => car_ffi_common::accounts::list(),
"accounts.open" => {
// Params are optional here: anything that fails to deserialize is
// treated as "no account id".
#[derive(serde::Deserialize, Default)]
struct OpenParams {
#[serde(default)]
account_id: Option<String>,
}
let p: OpenParams =
serde_json::from_value(parsed.params.clone()).unwrap_or_default();
car_ffi_common::accounts::open_settings(p.account_id.as_deref())
}
"calendar.list" => car_ffi_common::integrations::calendar_list(),
"calendar.events" => handle_calendar_events(&parsed),
"contacts.containers" => {
car_ffi_common::integrations::contacts_containers()
}
"contacts.find" => handle_contacts_find(&parsed),
"mail.accounts" => car_ffi_common::integrations::mail_accounts(),
"mail.inbox" => handle_mail_inbox(&parsed),
"mail.send" => handle_mail_send(&parsed),
"messages.services" => car_ffi_common::integrations::messages_services(),
"messages.chats" => handle_messages_chats(&parsed),
"messages.send" => handle_messages_send(&parsed),
"notes.accounts" => car_ffi_common::integrations::notes_accounts(),
"notes.find" => handle_notes_find(&parsed),
"reminders.lists" => car_ffi_common::integrations::reminders_lists(),
"reminders.items" => handle_reminders_items(&parsed),
"photos.albums" => car_ffi_common::integrations::photos_albums(),
"bookmarks.list" => handle_bookmarks_list(&parsed),
"files.locations" => car_ffi_common::integrations::files_locations(),
"keychain.status" => car_ffi_common::integrations::keychain_status(),
"health.status" => car_ffi_common::health::status(),
"health.sleep" => handle_health_sleep(&parsed),
"health.workouts" => handle_health_workouts(&parsed),
"health.activity" => handle_health_activity(&parsed),
"voice.transcribe_stream.start" => {
handle_voice_transcribe_stream_start(&parsed, &state, &session).await
}
"voice.transcribe_stream.stop" => {
handle_voice_transcribe_stream_stop(&parsed, &state).await
}
"voice.transcribe_stream.push" => {
handle_voice_transcribe_stream_push(&parsed, &state).await
}
"voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
"voice.dispatch_turn" => {
handle_voice_dispatch_turn(&parsed, &state, &session).await
}
"voice.cancel_turn" => handle_voice_cancel_turn().await,
"voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
"inference.register_runner" => {
handle_inference_register_runner(&session).await
}
"inference.runner.event" => handle_inference_runner_event(&parsed).await,
"inference.runner.complete" => {
handle_inference_runner_complete(&parsed).await
}
"inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
// The next few voice helpers hand back JSON text, which is re-parsed
// into a `Value` before being returned.
"voice.providers.list" => {
serde_json::from_str::<serde_json::Value>(
&car_voice::list_voice_providers_json(),
)
.map_err(|e| e.to_string())
}
"voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
.await
.and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
"voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
.await
.and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
"voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
"voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
.and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
"voice.remove_enrollment" => handle_remove_enrollment(&parsed),
"workflow.run" => handle_workflow_run(&parsed, &session).await,
"workflow.verify" => handle_workflow_verify(&parsed),
"meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
"meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
"meeting.list" => handle_meeting_list(&parsed),
"meeting.get" => handle_meeting_get(&parsed),
"registry.register" => handle_registry_register(&parsed),
"registry.heartbeat" => handle_registry_heartbeat(&parsed),
"registry.unregister" => handle_registry_unregister(&parsed),
"registry.list" => handle_registry_list(&parsed),
"registry.reap" => handle_registry_reap(&parsed),
"admission.status" => handle_admission_status(&state),
"a2a.start" => handle_a2a_start(&parsed, &session).await,
"a2a.stop" => handle_a2a_stop(),
"a2a.status" => handle_a2a_status(),
"a2a.send" => handle_a2a_send(&parsed, &state).await,
"a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
"a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
"a2ui.capabilities" => handle_a2ui_capabilities(&state),
"a2ui.reap" => handle_a2ui_reap(&state).await,
"a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
"a2ui.get" => handle_a2ui_get(&parsed, &state).await,
"a2ui.action" => handle_a2ui_action(&parsed, &state).await,
"a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
"a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
"a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
"a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
"automation.run_applescript" => handle_run_applescript(&parsed).await,
"automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
"automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
"notifications.local" => handle_local_notification(&parsed).await,
"vision.ocr" => handle_vision_ocr(&parsed).await,
"agents.list" => handle_agents_list(&state).await,
"agents.health" => handle_agents_health(&state).await,
"agents.upsert" => handle_agents_upsert(&parsed, &state).await,
"agents.install" => handle_agents_install(&parsed, &state).await,
"agents.remove" => handle_agents_remove(&parsed, &state).await,
"agents.start" => handle_agents_start(&parsed, &state).await,
"agents.stop" => handle_agents_stop(&parsed, &state).await,
"agents.restart" => handle_agents_restart(&parsed, &state).await,
"agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
"agents.list_external" => handle_agents_list_external(&parsed).await,
"agents.detect_external" => handle_agents_detect_external(&parsed).await,
"agents.health_external" => handle_agents_health_external(&parsed).await,
"agents.invoke_external" => {
handle_agents_invoke_external(&parsed, &state, &session).await
}
"agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
"agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
// Both spellings of each A2A method name are accepted and forwarded,
// with the original method string, to the shared A2A dispatcher.
"message/send"
| "SendMessage"
| "message/stream"
| "SendStreamingMessage"
| "tasks/get"
| "GetTask"
| "tasks/list"
| "ListTasks"
| "tasks/cancel"
| "CancelTask"
| "tasks/resubscribe"
| "SubscribeToTask"
| "tasks/pushNotificationConfig/set"
| "CreateTaskPushNotificationConfig"
| "tasks/pushNotificationConfig/get"
| "GetTaskPushNotificationConfig"
| "tasks/pushNotificationConfig/list"
| "ListTaskPushNotificationConfigs"
| "tasks/pushNotificationConfig/delete"
| "DeleteTaskPushNotificationConfig"
| "agent/getAuthenticatedExtendedCard"
| "GetExtendedAgentCard" => {
handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
}
_ => Err(format!("unknown method: {}", method_owned)),
};
// Every handler failure (including an unknown method) is reported with
// code -32603. A failed send is ignored.
let resp = match result {
Ok(value) => JsonRpcResponse::success(parsed.id, value),
Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
};
let _ = send_response(&session.channel, resp).await;
});
}
} else if msg.is_close() {
info!("Client {} disconnected", client_id);
break;
}
}
// Teardown: cancel handlers still running for this connection, then remove
// every registration made under this client id.
conn_tasks.abort_all();
session.host.unsubscribe(&client_id).await;
session.host.reap_session_approvals(&client_id).await;
state.a2ui_subscribers.lock().await.remove(&client_id);
let _removed = state.remove_session(&client_id).await;
{
// Drop any reply waiters that never received a response.
let mut pending = session.channel.pending.lock().await;
pending.clear();
}
Ok(())
}
/// Serializes `resp` to JSON and writes it to the channel as a text frame.
///
/// The channel's write lock is held for the duration of the send.
///
/// # Errors
/// Fails if serialization fails or the underlying sink rejects the frame.
async fn send_response(
    channel: &WsChannel,
    resp: JsonRpcResponse,
) -> Result<(), Box<dyn std::error::Error>> {
    use futures::SinkExt;

    let payload = serde_json::to_string(&resp)?;
    let frame = Message::Text(payload.into());
    let mut sink = channel.write.lock().await;
    sink.send(frame).await?;
    Ok(())
}
/// `host.subscribe`: registers this session's channel with the host and
/// returns a snapshot of the current host state.
///
/// The snapshot holds the agents, the approvals, the 50 most recent events as
/// returned by `host.events(50)`, and this daemon's identity.
async fn handle_host_subscribe(
    session: &crate::session::ClientSession,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let host = &session.host;
    host.subscribe(&session.client_id, session.channel.clone())
        .await;

    // Gather the snapshot parts in the same order they appear in the struct.
    let agents = host.agents().await;
    let approvals = host.approvals().await;
    let events = host.events(50).await;
    let identity = Some(daemon_identity(state));

    let snapshot = HostSnapshot {
        subscribed: true,
        agents,
        approvals,
        events,
        identity,
    };
    serde_json::to_value(snapshot).map_err(|err| err.to_string())
}
fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
(
Some(p.to_string_lossy().into_owned()),
car_proto::HostManifestRole::Observer,
)
} else if let Some(s) = state.supervisor_if_installed() {
(
Some(s.manifest_path().to_string_lossy().into_owned()),
car_proto::HostManifestRole::Owner,
)
} else {
(None, car_proto::HostManifestRole::None)
};
car_proto::HostIdentity {
version: env!("CARGO_PKG_VERSION").to_string(),
pid: std::process::id(),
manifest_path,
manifest_role,
parslee: state
.parslee_session
.get()
.map(|session| session.identity.clone()),
}
}
/// `parslee.auth`: loads (or refreshes) the stored Parslee session and returns
/// its bearer token together with the account identity.
///
/// # Errors
/// Propagates the loader's error, or reports that no account is authenticated
/// when the loader yields no session.
async fn handle_parslee_auth() -> Result<Value, String> {
    let Some(session) = crate::parslee_auth::load_or_refresh().await? else {
        return Err("Parslee account not authenticated; run `car auth login`".to_string());
    };
    let authorization_header = format!("Bearer {}", session.access_token);
    Ok(serde_json::json!({
        "authenticated": true,
        "token_type": "Bearer",
        "access_token": session.access_token,
        "authorization_header": authorization_header,
        "identity": session.identity,
    }))
}
/// `auth.start`: begins a PKCE authorization flow and returns the URL the
/// caller should open, plus the `state` and `verifier` values generated here.
///
/// Params: `redirect_uri` (required), `api_base`, `client_id` (defaults to
/// `"parslee-car"`) and `provider` (all optional strings).
async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
    // Indexing a `Value` yields `Null` for a missing key or a non-object, so
    // `.as_str()` is `None` in exactly the cases a keyed lookup would miss.
    let params = &req.params;
    let api_base = car_auth::api_base(params["api_base"].as_str());
    let client_id = params["client_id"].as_str().unwrap_or("parslee-car");
    let Some(redirect_uri) = params["redirect_uri"].as_str() else {
        return Err("redirect_uri is required".to_string());
    };
    let provider = params["provider"].as_str();

    let state = car_auth::new_state();
    let verifier = car_auth::pkce_verifier();
    let challenge = car_auth::pkce_challenge(&verifier);
    let authorize_url = car_auth::authorize_url(
        &api_base,
        client_id,
        redirect_uri,
        &state,
        &challenge,
        provider,
    )?;

    Ok(serde_json::json!({
        "authorize_url": authorize_url,
        "state": state,
        "verifier": verifier,
    }))
}
/// `auth.complete`: exchanges an authorization `code` (with its PKCE
/// `verifier`) for tokens and stores them.
///
/// Params: `redirect_uri`, `code` and `verifier` are required strings;
/// `api_base` and `client_id` (defaults to `"parslee-car"`) are optional.
async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
    /// Reads a required string parameter, or reports which one is missing.
    fn required<'a>(params: &'a Value, key: &str) -> Result<&'a str, String> {
        params[key]
            .as_str()
            .ok_or_else(|| format!("{} is required", key))
    }

    let params = &req.params;
    let api_base = car_auth::api_base(params["api_base"].as_str());
    let client_id = params["client_id"].as_str().unwrap_or("parslee-car");
    // Validation order matters: the first missing field decides the error.
    let redirect_uri = required(params, "redirect_uri")?;
    let code = required(params, "code")?;
    let verifier = required(params, "verifier")?;

    let tokens =
        car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
    car_auth::store_tokens(&api_base, &tokens)?;
    Ok(serde_json::json!({ "ok": true }))
}
/// `auth.status`: reports whether a session exists and, if so, includes it.
///
/// The session arrives as JSON text; if that text does not parse, the
/// `session` member is `null` while `authenticated` is still `true`.
async fn handle_auth_status() -> Result<Value, String> {
    let Some(raw_session) = car_auth::fetch_status(None).await? else {
        return Ok(serde_json::json!({ "authenticated": false }));
    };
    let session = serde_json::from_str::<Value>(&raw_session).unwrap_or(Value::Null);
    Ok(serde_json::json!({ "authenticated": true, "session": session }))
}
/// `auth.logout`: clears the stored tokens and acknowledges with
/// `{ "ok": true }`.
async fn handle_auth_logout() -> Result<Value, String> {
    car_auth::clear_tokens()?;
    let ack = serde_json::json!({ "ok": true });
    Ok(ack)
}
/// `host.agents`: returns the host's current agent list as JSON.
async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
    let agents = session.host.agents().await;
    serde_json::to_value(agents).map_err(|err| err.to_string())
}
async fn handle_host_events(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let limit = req
.params
.get("limit")
.and_then(|v| v.as_u64())
.unwrap_or(100) as usize;
serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
}
/// `host.approvals`: returns the host's current approvals as JSON.
async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
    let approvals = session.host.approvals().await;
    serde_json::to_value(approvals).map_err(|err| err.to_string())
}
/// `a2ui.apply`: applies one A2UI envelope with no owner and no route auth.
///
/// Accepted parameter shapes, tried in this order:
/// 1. a bare envelope, detected by one of the top-level operation keys below;
/// 2. a wrapper object with an `envelope` or `message` member;
/// 3. anything else is given one last try as a bare envelope.
///
/// NOTE(review): only four operation keys are sniffed here; confirm whether
/// other envelope operations should be detected as bare envelopes too.
async fn handle_a2ui_apply(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    /// Wrapper form: `{ "envelope": … }` or `{ "message": … }`.
    #[derive(Deserialize)]
    struct Wrapper {
        #[serde(default)]
        envelope: Option<car_a2ui::A2uiEnvelope>,
        #[serde(default)]
        message: Option<car_a2ui::A2uiEnvelope>,
    }

    /// Top-level keys that mark the params as a bare envelope.
    const BARE_ENVELOPE_KEYS: [&str; 4] = [
        "createSurface",
        "updateComponents",
        "updateDataModel",
        "deleteSurface",
    ];

    /// Deserializes the whole params value as an envelope.
    fn parse_bare(params: &Value) -> Result<car_a2ui::A2uiEnvelope, String> {
        serde_json::from_value(params.clone()).map_err(|e| e.to_string())
    }

    let params = &req.params;
    let looks_bare = BARE_ENVELOPE_KEYS
        .iter()
        .any(|key| params.get(*key).is_some());

    let envelope = if looks_bare {
        parse_bare(params)?
    } else if let Ok(wrapper) = serde_json::from_value::<Wrapper>(params.clone()) {
        wrapper
            .envelope
            .or(wrapper.message)
            .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?
    } else {
        parse_bare(params)?
    };

    apply_a2ui_envelope(state, envelope, None, None).await
}
/// `a2ui.ingest`: extracts every A2UI envelope from a payload and applies
/// them in order, returning `{ "applied": [...] }`.
///
/// The payload is `params.payload` when present, otherwise the params
/// themselves. Routing options (`endpoint` / `a2aEndpoint`, `owner`,
/// `routeAuth`, `allowUntrustedEndpoint`) are read from the params; if they
/// fail to deserialize, all of them fall back to their empty defaults.
///
/// # Errors
/// Fails if the payload does not validate, contains no envelopes, or any
/// envelope fails to apply (later envelopes are then not attempted).
async fn handle_a2ui_ingest(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    #[derive(Default, Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct Params {
        #[serde(default)]
        endpoint: Option<String>,
        #[serde(default)]
        a2a_endpoint: Option<String>,
        #[serde(default)]
        owner: Option<car_a2ui::A2uiSurfaceOwner>,
        #[serde(default)]
        route_auth: Option<A2aRouteAuth>,
        #[serde(default)]
        allow_untrusted_endpoint: bool,
    }

    let Params {
        endpoint,
        a2a_endpoint,
        owner,
        route_auth,
        allow_untrusted_endpoint,
    } = serde_json::from_value(req.params.clone()).unwrap_or_default();

    let payload = match req.params.get("payload") {
        Some(inner) => inner,
        None => &req.params,
    };
    state
        .a2ui
        .validate_payload(payload)
        .map_err(|e| e.to_string())?;
    let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
    if envelopes.is_empty() {
        return Err("no A2UI envelopes found in payload".into());
    }

    // `endpoint` wins over `a2aEndpoint`; the result is then filtered through
    // the trust check.
    let endpoint = trusted_route_endpoint(endpoint.or(a2a_endpoint), allow_untrusted_endpoint);
    // An explicit owner wins over one embedded in the payload; either way a
    // surviving endpoint is stamped onto it.
    let owner = owner
        .or_else(|| car_a2ui::owner_from_value(payload))
        .map(|found| {
            if endpoint.is_some() {
                found.with_endpoint(endpoint.clone())
            } else {
                found
            }
        });

    let mut applied = Vec::new();
    for envelope in envelopes {
        let outcome =
            apply_a2ui_envelope(state, envelope, owner.clone(), route_auth.clone()).await?;
        applied.push(outcome);
    }
    Ok(serde_json::json!({ "applied": applied }))
}
/// Applies one envelope to the A2UI store and fans the outcome out.
///
/// After a successful apply this updates the per-surface route-auth table,
/// records a host event (`a2ui.surface_deleted` or `a2ui.surface_updated`),
/// broadcasts the same payload to A2UI subscribers, and returns the apply
/// result as JSON.
async fn apply_a2ui_envelope(
    state: &Arc<ServerState>,
    envelope: car_a2ui::A2uiEnvelope,
    owner: Option<car_a2ui::A2uiSurfaceOwner>,
    route_auth: Option<A2aRouteAuth>,
) -> Result<Value, String> {
    let outcome = state
        .a2ui
        .apply_with_owner(envelope, owner)
        .await
        .map_err(|e| e.to_string())?;
    update_a2ui_route_auth(state, &outcome, route_auth).await;

    let (kind, message) = if outcome.deleted {
        (
            "a2ui.surface_deleted",
            format!("A2UI surface {} deleted", outcome.surface_id),
        )
    } else {
        (
            "a2ui.surface_updated",
            format!("A2UI surface {} updated", outcome.surface_id),
        )
    };

    let payload = serde_json::to_value(&outcome).map_err(|e| e.to_string())?;
    state
        .host
        .record_event(kind, None, message, payload.clone())
        .await;
    broadcast_a2ui_event(state, kind, &payload).await;

    serde_json::to_value(outcome).map_err(|e| e.to_string())
}
/// Sends an `a2ui.event` notification to every A2UI subscriber.
///
/// The subscriber list is copied out first so the subscriber-map lock is not
/// held while writing to sockets. Delivery is best effort: a serialization
/// failure abandons the broadcast and individual send failures are ignored.
async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
    use futures::SinkExt;
    use tokio_tungstenite::tungstenite::Message;

    let targets: Vec<Arc<crate::session::WsChannel>> = {
        let registry = state.a2ui_subscribers.lock().await;
        registry.values().cloned().collect()
    };
    if targets.is_empty() {
        return;
    }

    let notification = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "a2ui.event",
        "params": {
            "kind": kind,
            "result": result,
        },
    });
    let frame = match serde_json::to_string(&notification) {
        Ok(text) => Message::Text(text.into()),
        Err(_) => return,
    };

    // Subscribers are written to one after another, each under its own
    // channel write lock.
    for target in &targets {
        let mut sink = target.write.lock().await;
        let _ = sink.send(frame.clone()).await;
    }
}
/// Keeps the per-surface route-auth table in step with an apply result.
///
/// An entry is stored only when the surface still exists, its owner has an
/// endpoint, and `route_auth` was supplied. In every other case (including a
/// deleted surface) any existing entry for the surface is removed.
async fn update_a2ui_route_auth(
    state: &Arc<ServerState>,
    result: &car_a2ui::A2uiApplyResult,
    route_auth: Option<A2aRouteAuth>,
) {
    let mut table = state.a2ui_route_auth.lock().await;
    let surface_id = &result.surface_id;

    let credentials = if result.deleted {
        None
    } else {
        let owner_has_endpoint = result
            .surface
            .as_ref()
            .and_then(|surface| surface.owner.as_ref())
            .and_then(|owner| owner.endpoint.as_ref())
            .is_some();
        route_auth.filter(|_| owner_has_endpoint)
    };

    match credentials {
        Some(credentials) => {
            table.insert(surface_id.clone(), credentials);
        }
        None => {
            table.remove(surface_id);
        }
    }
}
/// `a2ui.capabilities`: returns the A2UI store's capabilities as JSON.
fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
    let capabilities = state.a2ui.capabilities();
    serde_json::to_value(capabilities).map_err(|err| err.to_string())
}
/// `a2ui.reap`: removes surfaces that have expired as of now, drops their
/// route-auth entries, and returns `{ "removed": [...] }`.
async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
    let now = chrono::Utc::now();
    let expired = state.a2ui.reap_expired(now).await;

    // Only take the route-auth lock when there is something to remove.
    if !expired.is_empty() {
        let mut route_auth = state.a2ui_route_auth.lock().await;
        for expired_id in &expired {
            route_auth.remove(expired_id);
        }
    }

    Ok(serde_json::json!({ "removed": expired }))
}
/// `a2ui.surfaces`: returns the A2UI store's surface listing as JSON.
async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
    let surfaces = state.a2ui.list().await;
    serde_json::to_value(surfaces).map_err(|err| err.to_string())
}
/// `a2ui.get`: looks up one surface by id and returns the lookup result as
/// JSON.
///
/// The id is read from `surface_id`, or from `surfaceId` when `surface_id` is
/// absent. The chosen member must be a string.
async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
    let params = &req.params;
    // The camelCase key is consulted only when the snake_case key is missing.
    let id_field = match params.get("surface_id") {
        Some(field) => Some(field),
        None => params.get("surfaceId"),
    };
    let Some(surface_id) = id_field.and_then(Value::as_str) else {
        return Err("`a2ui.get` requires surface_id".to_string());
    };

    let surface = state.a2ui.get(surface_id).await;
    serde_json::to_value(surface).map_err(|err| err.to_string())
}
/// `a2ui/subscribe`: registers this session's channel for A2UI event
/// broadcasts, replacing any earlier registration under the same client id.
async fn handle_a2ui_subscribe(
    session: &crate::session::ClientSession,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let client_id = session.client_id.clone();
    let channel = session.channel.clone();
    {
        let mut subscribers = state.a2ui_subscribers.lock().await;
        subscribers.insert(client_id, channel);
    }
    Ok(serde_json::json!({ "subscribed": true }))
}
/// `a2ui/unsubscribe`: removes this session from the A2UI broadcast list.
/// Succeeds whether or not the session was subscribed.
async fn handle_a2ui_unsubscribe(
    session: &crate::session::ClientSession,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    {
        let mut subscribers = state.a2ui_subscribers.lock().await;
        subscribers.remove(&session.client_id);
    }
    Ok(serde_json::json!({ "subscribed": false }))
}
/// `a2ui/replay`: returns the current stored state of one surface as JSON.
///
/// The id is read from `surface_id`, or from `surfaceId` when `surface_id` is
/// absent. The chosen member must be a string.
async fn handle_a2ui_replay(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let params = &req.params;
    // The camelCase key is consulted only when the snake_case key is missing.
    let id_field = match params.get("surface_id") {
        Some(field) => Some(field),
        None => params.get("surfaceId"),
    };
    let Some(surface_id) = id_field.and_then(Value::as_str) else {
        return Err("`a2ui/replay` requires surface_id".to_string());
    };

    let surface = state.a2ui.get(surface_id).await;
    serde_json::to_value(surface).map_err(|err| err.to_string())
}
/// `a2ui.action`: routes a client UI action toward the surface's owner and
/// records it as a host event.
///
/// Returns `{ "event": ..., "route": ... }`, where `route` is whatever
/// `route_a2ui_action` reported for the delivery attempt.
async fn handle_a2ui_action(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let action = serde_json::from_value::<car_a2ui::ClientAction>(req.params.clone())
        .map_err(|e| e.to_string())?;

    let owner = state.a2ui.owner(&action.surface_id).await;
    let route = route_a2ui_action(state, &action, owner.clone()).await;

    let event_payload = serde_json::json!({
        "action": action,
        "owner": owner,
        "route": route,
    });
    let summary = format!(
        "A2UI action {} from {}",
        action.name, action.source_component_id
    );
    let event = state
        .host
        .record_event("a2ui.action", None, summary, event_payload)
        .await;

    Ok(serde_json::json!({
        "event": event,
        "route": route,
    }))
}
/// `a2ui.render_report`: records a client render report and gives the UI
/// agent one chance to react to it.
///
/// Steps:
/// 1. Deserialize the report, record it as a host event and broadcast it to
///    A2UI subscribers.
/// 2. If the surface still exists, consume one unit of its UI-agent iteration
///    budget (skipping the agent when none is left) and ask the agent for a
///    decision.
/// 3. On `Patch`, suppress it if the oscillation check rejects it; otherwise
///    apply it through `apply_a2ui_envelope`.
///
/// The budget unit stays consumed only when a patch was applied successfully;
/// every other outcome after consumption refunds it.
///
/// Returns `{ "event": ... }` on every successful path. Errors only when the
/// params fail to deserialize or the report fails to serialize.
async fn handle_a2ui_render_report(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let report: car_a2ui::RenderReport =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
let kind = "a2ui.render_report";
let message = format!("A2UI render report for surface {}", report.surface_id);
// The report is always recorded and broadcast, whether or not the agent
// ends up acting on it.
let event = state
.host
.record_event(kind, None, message, payload.clone())
.await;
broadcast_a2ui_event(state, kind, &payload).await;
if let Some(surface) = state.a2ui.get(&report.surface_id).await {
// Budget gate: a `false` return means no iterations are left for this
// surface, so the agent is not consulted.
if !state.ui_agent_budget.try_consume(&report.surface_id) {
tracing::warn!(
surface_id = %report.surface_id,
count = state.ui_agent_budget.count(&report.surface_id),
max = state.ui_agent_budget.max(),
"ui-agent iteration budget exhausted; skipping agent invocation"
);
return Ok(serde_json::json!({ "event": event }));
}
match state.ui_agent.on_render_report(&report, &surface) {
car_ui_agent::Decision::Patch {
envelope,
strategy_id,
patch_hash,
elapsed_ns,
} => {
// Oscillation gate: a `false` return suppresses the patch and the
// budget unit consumed above is handed back.
if !state
.ui_agent_oscillation
.check_and_record(&report.surface_id, patch_hash)
{
tracing::warn!(
surface_id = %report.surface_id,
strategy = %strategy_id,
patch_hash,
"ui-agent oscillation detected; suppressing patch"
);
state.ui_agent_budget.refund(&report.surface_id);
return Ok(serde_json::json!({ "event": event }));
}
// Wrap the agent's patch in an envelope and apply it with no owner and
// no route auth.
let a2ui_envelope = car_a2ui::A2uiEnvelope {
patch_components: Some(envelope),
..Default::default()
};
if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
// A failed apply is logged, not returned: the caller still gets the
// recorded event, and the budget unit is refunded.
tracing::warn!(
surface_id = %report.surface_id,
strategy = %strategy_id,
patch_hash,
elapsed_ns,
error = %e,
"ui-agent patch apply failed",
);
state.ui_agent_budget.refund(&report.surface_id);
} else {
tracing::debug!(
surface_id = %report.surface_id,
strategy = %strategy_id,
patch_hash,
elapsed_ns,
iteration = state.ui_agent_budget.count(&report.surface_id),
"ui-agent patch applied",
);
// When a shared memgine is configured, note the applied strategy in
// it from a detached task so this handler does not wait on its lock.
if let Some(memgine) = state.shared_memgine.clone() {
let speaker = format!("ui-agent/{}", report.surface_id);
let text = format!("strategy applied: {}", strategy_id);
tokio::spawn(async move {
let mut guard = memgine.lock().await;
guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
});
}
}
}
// No patch produced: hand the budget unit back.
car_ui_agent::Decision::StableNoChange => {
state.ui_agent_budget.refund(&report.surface_id);
}
// The agent refused to continue: refund and log at error level.
car_ui_agent::Decision::HardStop { reason } => {
state.ui_agent_budget.refund(&report.surface_id);
tracing::error!(
surface_id = %report.surface_id,
reason = %reason,
"ui-agent hard-stopped improvement loop",
);
}
}
} else {
tracing::debug!(
surface_id = %report.surface_id,
"ui-agent skipped — surface not found in store",
);
}
Ok(serde_json::json!({ "event": event }))
}
async fn route_a2ui_action(
state: &Arc<ServerState>,
action: &car_a2ui::ClientAction,
owner: Option<car_a2ui::A2uiSurfaceOwner>,
) -> Value {
let Some(owner) = owner else {
return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
};
if owner.kind != "a2a" {
return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
}
let Some(endpoint) = owner.endpoint.clone() else {
return serde_json::json!({
"delivered": false,
"reason": "surface owner has no endpoint",
"owner": owner
});
};
let message = car_a2a::Message {
message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
role: car_a2a::MessageRole::User,
parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
data: serde_json::json!({
"a2uiAction": action,
}),
metadata: Default::default(),
})],
task_id: owner.task_id.clone(),
context_id: owner.context_id.clone(),
metadata: Default::default(),
};
let auth = state
.a2ui_route_auth
.lock()
.await
.get(&action.surface_id)
.cloned()
.map(client_auth_from_route_auth)
.unwrap_or(car_a2a::ClientAuth::None);
match car_a2a::A2aClient::new(endpoint.clone())
.with_auth(auth)
.send_message(message, false)
.await
{
Ok(result) => serde_json::json!({
"delivered": true,
"owner": owner,
"endpoint": endpoint,
"result": result,
}),
Err(error) => serde_json::json!({
"delivered": false,
"owner": owner,
"endpoint": endpoint,
"error": error.to_string(),
}),
}
}
fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
match auth {
A2aRouteAuth::None => car_a2a::ClientAuth::None,
A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
}
}
/// Returns `endpoint` only if it may be used as a route target: either
/// `allow_untrusted` is set or the endpoint is a loopback HTTP URL.
/// A missing endpoint stays `None`.
fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
    endpoint.filter(|url| allow_untrusted || is_loopback_http_endpoint(url))
}
/// Reports whether `endpoint` is a plain-HTTP URL whose host is a loopback
/// name (`localhost`, `127.0.0.1` or `[::1]`).
///
/// The host must be followed by the end of the string, a `/`, or `:<port>`
/// where the port consists only of ASCII digits (an empty port is tolerated).
/// Requiring a numeric port is what rejects userinfo smuggling such as
/// `http://localhost:80@evil.example/`, where a URL parser would treat
/// `localhost:80` as credentials and `evil.example` as the real host; a bare
/// prefix test on `http://localhost:` accepts that string.
///
/// Matching is case-sensitive and only the `http` scheme is accepted.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };
    LOOPBACK_HOSTS.iter().any(|host| {
        let Some(tail) = rest.strip_prefix(host) else {
            return false;
        };
        if tail.is_empty() || tail.starts_with('/') {
            return true;
        }
        let Some(after_colon) = tail.strip_prefix(':') else {
            // Something other than ':' or '/' follows, e.g. `localhost.evil.example`.
            return false;
        };
        // The port runs up to the start of the path, query or fragment.
        let port = after_colon
            .split(|c: char| matches!(c, '/' | '?' | '#'))
            .next()
            .unwrap_or("");
        port.bytes().all(|b| b.is_ascii_digit())
    })
}
/// Registers a host agent on behalf of the calling client.
///
/// `req.params` must deserialize into a `RegisterHostAgentRequest`; the
/// host's answer is returned as JSON. Parse, host and serialization failures
/// all come back as `Err(String)`.
async fn handle_host_register_agent(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let request: RegisterHostAgentRequest =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    let registered = session
        .host
        .register_agent(&session.client_id, request)
        .await?;
    serde_json::to_value(registered).map_err(|e| e.to_string())
}
async fn handle_host_unregister_agent(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let agent_id = req
.params
.get("agent_id")
.and_then(|v| v.as_str())
.ok_or("missing agent_id")?;
session
.host
.unregister_agent(&session.client_id, agent_id)
.await?;
Ok(serde_json::json!({"ok": true}))
}
/// Applies a `SetHostAgentStatusRequest` (parsed from `req.params`) for the
/// calling client and returns the host's response as JSON.
async fn handle_host_set_status(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let request: SetHostAgentStatusRequest =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    let updated = session.host.set_status(&session.client_id, request).await?;
    serde_json::to_value(updated).map_err(|e| e.to_string())
}
/// Records a client-supplied notification as a host event.
///
/// All parameters are optional: `kind` defaults to `host.notification`,
/// `message` to the empty string, `payload` to JSON null, and `agent_id` to
/// none. The recorded event is returned as JSON.
async fn handle_host_notify(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let params = &req.params;
    let payload = params.get("payload").cloned().unwrap_or(Value::Null);
    let message = params.get("message").and_then(Value::as_str).unwrap_or("");
    let agent_id = params
        .get("agent_id")
        .and_then(Value::as_str)
        .map(str::to_string);
    let kind = params
        .get("kind")
        .and_then(Value::as_str)
        .unwrap_or("host.notification");

    let event = session
        .host
        .record_event(kind, agent_id, message, payload)
        .await;
    serde_json::to_value(event).map_err(|e| e.to_string())
}
/// Creates a host approval request from `req.params`.
///
/// When the request names an agent, that agent is first moved to
/// `WaitingForApproval`; a failure of that status update is ignored.
/// Requests flagged `system_level` are created without an owning client;
/// all others are owned by the calling client.
async fn handle_host_request_approval(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let request: CreateHostApprovalRequest =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;

    if let Some(agent_id) = request.agent_id.as_ref() {
        let waiting = SetHostAgentStatusRequest {
            agent_id: agent_id.clone(),
            status: HostAgentStatus::WaitingForApproval,
            current_task: None,
            message: Some("Waiting for approval".to_string()),
            payload: Value::Null,
        };
        // Best-effort: the approval is still created if this update fails.
        let _ = session.host.set_status(&session.client_id, waiting).await;
    }

    let owner_client_id = (!request.system_level).then_some(session.client_id.as_str());
    let approval = session
        .host
        .create_approval(owner_client_id, request)
        .await?;
    serde_json::to_value(approval).map_err(|e| e.to_string())
}
/// Resolves a pending host approval using a `ResolveHostApprovalRequest`
/// parsed from `req.params`, acting as the calling client, and returns the
/// host's response as JSON.
async fn handle_host_resolve_approval(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let request: ResolveHostApprovalRequest =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    let resolved = session
        .host
        .resolve_approval(&session.client_id, request)
        .await?;
    serde_json::to_value(resolved).map_err(|e| e.to_string())
}
/// Authenticates the current session.
///
/// Parameters: `token` (required string) and `agent_id` (optional string).
///
/// * With `agent_id`: the supervisor must accept the `(agent_id, token)` pair,
///   and the agent must not already be attached to a different client. On
///   success the agent's memgine is bound to the session and the session is
///   marked authenticated.
/// * Without `agent_id`: if the server has no auth token configured the
///   session is marked authenticated and `auth_enabled: false` is reported;
///   otherwise `token` must equal the configured token.
///
/// Returns a JSON object with `ok: true` on success, or `Err` with an
/// `auth failed: ...` / parameter message.
async fn handle_session_auth(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let supplied = req
.params
.get("token")
.and_then(Value::as_str)
.ok_or_else(|| "session.auth requires { token: string }".to_string())?;
let agent_id = req
.params
.get("agent_id")
.and_then(Value::as_str)
.map(str::to_string);
// Agent-scoped path: the token is checked by the supervisor, not against
// the server-wide auth token.
if let Some(id) = agent_id {
let supervisor = state.supervisor()?;
if !supervisor.validate_agent_token(&id, supplied).await {
return Err(format!(
"auth failed: agent_id `{id}` is not supervised, or token mismatch"
));
}
{
// Scope the lock so it is released before the memgine is loaded.
let mut attached = state.attached_agents.lock().await;
if let Some(prior) = attached.get(&id) {
// Re-auth from the same client is allowed; a different client is rejected.
if prior != &session.client_id {
return Err(format!(
"auth failed: agent_id `{id}` is already attached on \
another connection (client_id={prior})"
));
}
}
attached.insert(id.clone(), session.client_id.clone());
}
// NOTE(review): if this load fails, the `attached_agents` entry inserted
// above is left in place — confirm that is intended.
let agent_eng = get_or_load_agent_memgine(state, &id).await?;
*session.bound_memgine.lock().await = Some(agent_eng);
*session.agent_id.lock().await = Some(id.clone());
session
.authenticated
.store(true, std::sync::atomic::Ordering::Release);
return Ok(serde_json::json!({
"ok": true,
"auth_enabled": true,
"agent_id": id,
}));
}
// Server-wide path: with no configured token, any supplied token is accepted.
let expected = match state.auth_token.get() {
Some(t) => t,
None => {
session
.authenticated
.store(true, std::sync::atomic::Ordering::Release);
return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
}
};
if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
return Err("auth failed: token mismatch".to_string());
}
session
.authenticated
.store(true, std::sync::atomic::Ordering::Release);
Ok(serde_json::json!({
"ok": true,
"auth_enabled": true,
"parslee": state.parslee_session.get().map(|session| session.identity.clone()),
}))
}
/// Compares two byte slices without stopping at the first differing byte.
///
/// Slices of different length compare unequal immediately; for equal
/// lengths every byte pair is XOR-ed into an accumulator, and the slices are
/// equal only if no bit was ever set.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let accumulated = a
        .iter()
        .zip(b)
        .fold(0u8, |bits, (left, right)| bits | (left ^ right));
    accumulated == 0
}
/// Blocks until a system-level approval for calling `method` is resolved.
///
/// Builds an approval request labelled `ws.method:<method>` whose details
/// carry the method name and a preview of `params` (capped via
/// `preview_params`), then waits for the host to resolve it, using the
/// timeout from `state.approval_gate`.
///
/// Returns `Ok(())` only when the approval is granted; denial, timeout and
/// gate errors are each mapped to a descriptive `Err(String)`.
async fn gate_high_risk_method(
method: &str,
params: &Value,
state: &Arc<ServerState>,
) -> Result<(), String> {
let timeout = state.approval_gate.timeout;
let req = CreateHostApprovalRequest {
// Not tied to any agent; `system_level` marks it as a server-wide approval.
agent_id: None,
action: format!("ws.method:{method}"),
details: serde_json::json!({
"method": method,
"params_preview": preview_params(params, 2_000),
}),
options: vec!["approve".to_string(), "deny".to_string()],
system_level: true,
};
// "approve" is passed as the option name that counts as approval.
match state
.host
.request_and_wait_approval(req, "approve", timeout)
.await
{
Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
"{method} denied by user (approval gate, audit 2026-05). \
To call this method without an interactive prompt, start \
car-server with --no-approvals on a trusted machine."
)),
Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
"{method} approval timed out after {}s with no resolution. \
The approval is still visible in `host.approvals` for \
forensics; resubmit the request to retry.",
timeout.as_secs()
)),
Err(e) => Err(format!("approval gate error: {e}")),
}
}
/// Produces a size-capped preview of `value`.
///
/// If the compact JSON rendering of `value` is at most `max_chars` **bytes**
/// long, a clone of `value` is returned unchanged. Otherwise the result is a
/// JSON string holding the rendering cut to at most `max_chars` bytes,
/// followed by `… (truncated)`.
///
/// The cut position is moved back to the nearest UTF-8 character boundary.
/// Slicing at the raw byte offset would panic whenever `max_chars` falls
/// inside a multi-byte character, which is reachable because non-ASCII text
/// is rendered unescaped.
fn preview_params(value: &Value, max_chars: usize) -> Value {
    let rendered = value.to_string();
    if rendered.len() <= max_chars {
        return value.clone();
    }
    // `rendered.len() > max_chars` here, so `cut` is in range, and offset 0 is
    // always a boundary, so the loop terminates.
    let mut cut = max_chars;
    while !rendered.is_char_boundary(cut) {
        cut -= 1;
    }
    Value::String(format!("{}… (truncated)", &rendered[..cut]))
}
/// Initializes a session from a `SessionInitRequest`.
///
/// Every tool definition is registered with the session runtime, and every
/// policy definition whose rule `build_policy_check` recognises is
/// registered under its name. The response reports the session id, the
/// number of tools supplied and the number of policies actually registered.
async fn handle_session_init(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let init: SessionInitRequest =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;

    for tool in init.tools.iter() {
        register_from_definition(&session.runtime, tool).await;
    }

    let policies_registered = {
        // The write lock is held only while registering policies.
        let mut registry = session.runtime.policies.write().await;
        let mut registered = 0;
        for definition in init.policies.iter() {
            let Some(check) = build_policy_check(definition) else {
                // Unknown rule kinds are skipped rather than rejected.
                continue;
            };
            registry.register(&definition.name, check, "");
            registered += 1;
        }
        registered
    };

    let response = SessionInitResponse {
        session_id: session.client_id.clone(),
        tools_registered: init.tools.len(),
        policies_registered,
    };
    serde_json::to_value(response).map_err(|e| e.to_string())
}
/// Builds the policy closure described by `def`, or `None` when `def.rule`
/// is not one of the supported kinds.
///
/// Supported rules (each closure returns `Some(reason)` to deny, `None` to
/// allow):
/// * `deny_tool` — denies any action whose tool equals `def.target`.
/// * `require_state` — denies unless the state entry `def.key` equals
///   `def.value`.
/// * `deny_tool_param` — for actions on tool `def.target`, denies when the
///   parameter named `def.key` contains `def.pattern` as a substring. String
///   parameters are matched on their content; other JSON values on their
///   JSON rendering.
fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
    match def.rule.as_str() {
        "deny_tool" => {
            let target = def.target.clone();
            Some(Box::new(
                move |action: &car_ir::Action, _: &car_state::StateStore| {
                    if action.tool.as_deref() == Some(&target) {
                        Some(format!("tool '{}' denied", target))
                    } else {
                        None
                    }
                },
            ))
        }
        "require_state" => {
            let key = def.key.clone();
            let value = def.value.clone();
            Some(Box::new(
                move |_: &car_ir::Action, state: &car_state::StateStore| {
                    if state.get(&key).as_ref() != Some(&value) {
                        Some(format!("state['{}'] must be {:?}", key, value))
                    } else {
                        None
                    }
                },
            ))
        }
        "deny_tool_param" => {
            let target = def.target.clone();
            let param = def.key.clone();
            let pattern = def.pattern.clone();
            Some(Box::new(
                move |action: &car_ir::Action, _: &car_state::StateStore| {
                    if action.tool.as_deref() != Some(&target) {
                        return None;
                    }
                    let val = action.parameters.get(&param)?;
                    // Only render the value to JSON text when it is not already a
                    // string, instead of always allocating the rendering and a copy.
                    let matched = match val.as_str() {
                        Some(text) => text.contains(&pattern),
                        None => val.to_string().contains(&pattern),
                    };
                    if matched {
                        Some(format!("param '{}' matches '{}'", param, pattern))
                    } else {
                        None
                    }
                },
            ))
        }
        _ => None,
    }
}
/// Registers a batch of tools with the session runtime.
///
/// `req.params` must be a JSON array of `ToolDefinition`s; the number of
/// definitions received is returned.
async fn handle_tools_register(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let definitions: Vec<ToolDefinition> =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    let received = definitions.len();
    for definition in definitions.iter() {
        register_from_definition(&session.runtime, definition).await;
    }
    Ok(Value::from(received))
}
/// Copies a wire-level `ToolDefinition` into a `car_ir::ToolSchema` and
/// registers it with `runtime`.
async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
    let rate_limit = def
        .rate_limit
        .as_ref()
        .map(|limit| car_ir::ToolRateLimit {
            max_calls: limit.max_calls,
            interval_secs: limit.interval_secs,
        });
    let schema = car_ir::ToolSchema {
        name: def.name.clone(),
        description: def.description.clone(),
        parameters: def.parameters.clone(),
        returns: def.returns.clone(),
        idempotent: def.idempotent,
        cache_ttl_secs: def.cache_ttl_secs,
        rate_limit,
    };
    runtime.register_tool_schema(schema).await;
}
async fn handle_proposal_submit(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let submit: ProposalSubmitRequest =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let session_id = req
.params
.get("session_id")
.and_then(|v| v.as_str())
.map(str::to_string);
let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
Some(v) if !v.is_null() => {
Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
}
_ => None,
};
let result = match (session_id, scope) {
(Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
(Some(sid), None) => {
session
.runtime
.execute_with_session(&submit.proposal, &sid)
.await
}
(None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
(None, None) => session.runtime.execute(&submit.proposal).await,
};
serde_json::to_value(result).map_err(|e| e.to_string())
}
/// Opens a new policy session on the runtime and returns its id as
/// `{ "session_id": ... }`.
async fn handle_session_policy_open(
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let opened = session.runtime.open_session().await;
    let reply = serde_json::json!({ "session_id": opened });
    Ok(reply)
}
async fn handle_session_policy_close(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let sid = req
.params
.get("session_id")
.and_then(|v| v.as_str())
.ok_or("missing 'session_id'")?;
let closed = session.runtime.close_session(sid).await;
Ok(serde_json::json!({ "closed": closed }))
}
async fn handle_policy_register(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let def: PolicyDefinition = serde_json::from_value(req.params.clone())
.map_err(|e| format!("invalid policy params: {e}"))?;
let session_id = req
.params
.get("session_id")
.and_then(|v| v.as_str())
.map(str::to_string);
let check = build_policy_check(&def)
.ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
match session_id {
Some(sid) => session
.runtime
.register_policy_in_session(&sid, &def.name, check, "")
.await
.map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
None => {
let mut policies = session.runtime.policies.write().await;
policies.register(&def.name, check, "");
Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
}
}
}
/// Verifies a proposal without executing it.
///
/// The proposal and initial state from the `VerifyRequest` are checked by
/// `car_verify::verify` against the names of the tools currently registered
/// on the session runtime. Validity, the reported issues and the simulated
/// state are returned as a `VerifyResponse`.
async fn handle_verify(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let vr: VerifyRequest =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    let known_tools: std::collections::HashSet<String> =
        session.runtime.tools.read().await.keys().cloned().collect();

    let outcome = car_verify::verify(
        &vr.proposal,
        Some(&vr.initial_state),
        Some(&known_tools),
        30,
    );
    let issues = outcome
        .issues
        .iter()
        .map(|issue| VerifyIssueProto {
            action_id: issue.action_id.clone(),
            severity: issue.severity.clone(),
            message: issue.message.clone(),
        })
        .collect();
    let response = VerifyResponse {
        valid: outcome.valid,
        issues,
        simulated_state: outcome.simulated_state,
    };
    serde_json::to_value(response).map_err(|e| e.to_string())
}
/// Extracts the optional `tenant_id` string parameter.
///
/// Returns `None` when the parameter is missing, is not a string, or is the
/// empty string.
fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
    let tenant = req.params.get("tenant_id")?.as_str()?;
    if tenant.is_empty() {
        None
    } else {
        Some(tenant.to_string())
    }
}
/// Reads the state entry `key` from the (optionally tenant-scoped) state
/// store, returning JSON null when the key has no value.
///
/// Fails with `missing 'key'` when the parameter is absent or not a string.
async fn handle_state_get(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let Some(key) = req.params.get("key").and_then(Value::as_str) else {
        return Err("missing 'key'".to_string());
    };
    let tenant = tenant_from_params(req);
    let view = session.runtime.state.scoped(tenant.as_deref());
    let found = view.get(key);
    Ok(found.unwrap_or(Value::Null))
}
/// Writes `value` (JSON null when omitted) under `key` in the (optionally
/// tenant-scoped) state store, tagging the write with the source `client`,
/// and answers `"ok"`.
///
/// Fails with `missing 'key'` when the parameter is absent or not a string.
async fn handle_state_set(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let Some(key) = req.params.get("key").and_then(Value::as_str) else {
        return Err("missing 'key'".to_string());
    };
    let new_value = req.params.get("value").cloned().unwrap_or(Value::Null);
    let tenant = tenant_from_params(req);
    let view = session.runtime.state.scoped(tenant.as_deref());
    view.set(key, new_value, "client");
    Ok(Value::from("ok"))
}
/// Reports, as a JSON boolean, whether `key` exists in the (optionally
/// tenant-scoped) state store.
///
/// Fails with `missing 'key'` when the parameter is absent or not a string.
async fn handle_state_exists(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let Some(key) = req.params.get("key").and_then(Value::as_str) else {
        return Err("missing 'key'".to_string());
    };
    let tenant = tenant_from_params(req);
    let present = session.runtime.state.scoped(tenant.as_deref()).exists(key);
    Ok(Value::Bool(present))
}
/// Lists the keys of the (optionally tenant-scoped) state store as a JSON
/// array of strings.
async fn handle_state_keys(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let tenant = tenant_from_params(req);
    let view = session.runtime.state.scoped(tenant.as_deref());
    let names: Vec<Value> = view.keys().into_iter().map(Value::String).collect();
    Ok(Value::Array(names))
}
/// Returns the (optionally tenant-scoped) state store as one JSON object.
///
/// Keys are listed first and then read one by one; a key that no longer has
/// a value by the time it is read is left out.
async fn handle_state_snapshot(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let tenant = tenant_from_params(req);
    let view = session.runtime.state.scoped(tenant.as_deref());
    let snapshot: serde_json::Map<String, Value> = view
        .keys()
        .into_iter()
        .filter_map(|key| {
            let value = view.get(&key)?;
            Some((key, value))
        })
        .collect();
    Ok(Value::Object(snapshot))
}
/// Returns the snapshot file path for `agent_id`:
/// `<memory base>/agents/<agent_id>.json`, creating the `agents` directory
/// if needed.
///
/// NOTE(review): `agent_id` is interpolated into the file name as-is;
/// presumably callers only pass supervisor-validated ids — confirm it can
/// never contain path separators.
fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
    let agents_dir = car_ffi_common::memory_path::ensure_base()
        .map_err(|e| format!("memory base unavailable: {e}"))?
        .join("agents");
    if let Err(e) = std::fs::create_dir_all(&agents_dir) {
        return Err(format!("create agents dir: {e}"));
    }
    let file_name = format!("{agent_id}.json");
    Ok(agents_dir.join(file_name))
}
async fn get_or_load_agent_memgine(
state: &Arc<ServerState>,
agent_id: &str,
) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
{
let map = state.agent_memgines.lock().await;
if let Some(eng) = map.get(agent_id) {
return Ok(eng.clone());
}
}
let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
None,
)));
let path = agent_memgine_snapshot_path(agent_id)?;
if path.exists() {
let content = std::fs::read_to_string(&path)
.map_err(|e| format!("read {}: {}", path.display(), e))?;
let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
let mut g = engine.lock().await;
let mut loaded: u32 = 0;
for fact in &facts {
let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
let kind = fact
.get("kind")
.and_then(|v| v.as_str())
.unwrap_or("pattern");
let fid = format!("loaded-{loaded}");
g.ingest_fact(
&fid,
subject,
body,
"user",
"peer",
chrono::Utc::now(),
"global",
None,
vec![],
kind == "constraint",
);
loaded += 1;
}
}
let mut map = state.agent_memgines.lock().await;
let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
Ok(stored)
}
/// Writes the agent's memgine to its snapshot file as a JSON array.
///
/// Only valid nodes are exported, and identity and environment nodes are
/// left out. Each exported node becomes an object with `subject`, `body`,
/// `kind` (`constraint`/`pattern` for facts, `outcome` for conversations,
/// `pattern` otherwise), a fixed `confidence` of 0.5 and its content-type
/// label.
async fn persist_agent_memgine(
    agent_id: &str,
    engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
) -> Result<(), String> {
    let path = agent_memgine_snapshot_path(agent_id)?;
    let guard = engine.lock().await;
    let graph = &guard.graph.inner;
    let facts: Vec<Value> = graph
        .node_indices()
        .filter_map(|nix| graph.node_weight(nix))
        .filter(|node| node.is_valid())
        .filter(|node| {
            node.kind != car_memgine::MemKind::Identity
                && node.kind != car_memgine::MemKind::Environment
        })
        .map(|node| {
            let kind_label = match node.kind {
                car_memgine::MemKind::Fact => {
                    if node.is_constraint {
                        "constraint"
                    } else {
                        "pattern"
                    }
                }
                car_memgine::MemKind::Conversation => "outcome",
                _ => "pattern",
            };
            serde_json::json!({
                "subject": node.key,
                "body": node.value,
                "kind": kind_label,
                "confidence": 0.5,
                "content_type": node.content_type.as_label(),
            })
        })
        .collect();
    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
    std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
    Ok(())
}
/// Returns the number of valid facts in the session's effective memgine.
async fn handle_memory_fact_count(
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let memgine = session.effective_memgine().await;
    let total = memgine.lock().await.valid_fact_count();
    Ok(Value::from(total))
}
/// Adds one fact to the session's effective memgine and returns the new
/// valid-fact count.
///
/// `subject` and `body` are required strings; `kind` is optional and only
/// the value `constraint` changes behaviour (it marks the fact as a
/// constraint). When the session is bound to an agent, the memgine is then
/// persisted to the agent's snapshot; a persist failure is logged, not
/// returned.
async fn handle_memory_add_fact(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let params = &req.params;
    let subject = params
        .get("subject")
        .and_then(Value::as_str)
        .ok_or("missing subject")?;
    let body = params
        .get("body")
        .and_then(Value::as_str)
        .ok_or("missing body")?;
    let is_constraint = params
        .get("kind")
        .and_then(Value::as_str)
        .unwrap_or("pattern")
        == "constraint";

    let engine_arc = session.effective_memgine().await;
    let count = {
        // Inner scope: the engine lock must be released before persisting,
        // which locks the same engine again.
        let mut engine = engine_arc.lock().await;
        let fid = format!("ws-{}", engine.valid_fact_count());
        engine.ingest_fact(
            &fid,
            subject,
            body,
            "user",
            "peer",
            chrono::Utc::now(),
            "global",
            None,
            vec![],
            is_constraint,
        );
        engine.valid_fact_count()
    };

    if let Some(id) = session.agent_id.lock().await.clone() {
        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
            tracing::warn!(agent_id = %id, error = %e,
                "agent memgine persist failed; in-memory state is canonical");
        }
    }
    Ok(Value::from(count))
}
async fn handle_memory_query(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let query = req
.params
.get("query")
.and_then(|v| v.as_str())
.ok_or("missing query")?;
let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
let engine_arc = session.effective_memgine().await;
let engine = engine_arc.lock().await;
let seeds = engine.graph.find_seeds(query, 5);
let hits = if !seeds.is_empty() {
engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
} else {
vec![]
};
let results: Vec<Value> = hits
.iter()
.filter_map(|hit| {
let node = engine.graph.inner.node_weight(hit.node_ix)?;
Some(serde_json::json!({
"subject": node.key,
"body": node.value,
"kind": format!("{:?}", node.kind).to_lowercase(),
"confidence": hit.activation,
}))
})
.collect();
serde_json::to_value(results).map_err(|e| e.to_string())
}
/// Builds a model context for `query` (default empty), optionally sized for
/// `model_context_window`, and returns it as a JSON value.
///
/// NOTE(review): this locks `session.memgine` directly, while the fact
/// count/add/query handlers go through `session.effective_memgine()` —
/// confirm the difference is intended for agent-bound sessions.
async fn handle_memory_build_context(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let params = &req.params;
    let query = params.get("query").and_then(Value::as_str).unwrap_or("");
    let model_context_window = params
        .get("model_context_window")
        .and_then(Value::as_u64)
        .map(|window| window as usize);

    let mut engine = session.memgine.lock().await;
    let context = engine.build_context_for_model(query, model_context_window);
    Ok(Value::from(context))
}
/// Same inputs as the regular context builder, but requests
/// `car_memgine::ContextMode::Fast` from the engine.
///
/// NOTE(review): like the regular builder, this uses `session.memgine`
/// rather than `session.effective_memgine()` — confirm that is intended.
async fn handle_memory_build_context_fast(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let params = &req.params;
    let query = params.get("query").and_then(Value::as_str).unwrap_or("");
    let model_context_window = params
        .get("model_context_window")
        .and_then(Value::as_u64)
        .map(|window| window as usize);

    let mut engine = session.memgine.lock().await;
    let context = engine.build_context_with_options(
        query,
        model_context_window,
        car_memgine::ContextMode::Fast,
        None,
    );
    Ok(Value::from(context))
}
/// Exports the session memgine to the file named by the `path` parameter and
/// returns the number of facts written.
///
/// The path must be accepted by `car_ffi_common::memory_path::resolve`.
/// Only valid nodes are exported, and identity and environment nodes are
/// left out. Each exported node becomes an object with `subject`, `body`,
/// `kind`, a fixed `confidence` of 0.5 and its content-type label.
async fn handle_memory_persist(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let path = req
        .params
        .get("path")
        .and_then(Value::as_str)
        .ok_or("missing path")?;
    let resolved = car_ffi_common::memory_path::resolve(path)
        .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;

    let engine = session.memgine.lock().await;
    let graph = &engine.graph.inner;
    let facts: Vec<Value> = graph
        .node_indices()
        .filter_map(|nix| graph.node_weight(nix))
        .filter(|node| node.is_valid())
        .filter(|node| {
            node.kind != car_memgine::MemKind::Identity
                && node.kind != car_memgine::MemKind::Environment
        })
        .map(|node| {
            let kind_label = match node.kind {
                car_memgine::MemKind::Fact => {
                    if node.is_constraint {
                        "constraint"
                    } else {
                        "pattern"
                    }
                }
                car_memgine::MemKind::Conversation => "outcome",
                _ => "pattern",
            };
            serde_json::json!({
                "subject": node.key,
                "body": node.value,
                "kind": kind_label,
                "confidence": 0.5,
                "content_type": node.content_type.as_label(),
            })
        })
        .collect();

    let written = facts.len();
    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
    std::fs::write(&resolved, json)
        .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
    Ok(Value::from(written as u64))
}
/// Replaces the session memgine's contents with the facts stored in the file
/// named by the `path` parameter and returns how many were ingested.
///
/// The path must be accepted by `car_ffi_common::memory_path::resolve` and
/// the file must hold a JSON array. The engine is reset only after the file
/// has been read and parsed. Facts get ids `loaded-<n>`; missing `subject`
/// and `body` fields default to empty strings, and `kind == "constraint"`
/// marks a constraint.
async fn handle_memory_load(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    /// Reads the string field `key` of `fact`, or `default` if absent or not a string.
    fn text_field<'a>(fact: &'a Value, key: &str, default: &'a str) -> &'a str {
        fact.get(key).and_then(Value::as_str).unwrap_or(default)
    }

    let path = req
        .params
        .get("path")
        .and_then(Value::as_str)
        .ok_or("missing path")?;
    let resolved = car_ffi_common::memory_path::resolve(path)
        .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
    let content = std::fs::read_to_string(&resolved)
        .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
    let facts: Vec<Value> =
        serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;

    let mut engine = session.memgine.lock().await;
    engine.reset();
    let mut ingested: u32 = 0;
    for fact in facts.iter() {
        let is_constraint = text_field(fact, "kind", "pattern") == "constraint";
        let fid = format!("loaded-{}", ingested);
        engine.ingest_fact(
            &fid,
            text_field(fact, "subject", ""),
            text_field(fact, "body", ""),
            "user",
            "peer",
            chrono::Utc::now(),
            "global",
            None,
            vec![],
            is_constraint,
        );
        ingested += 1;
    }
    Ok(Value::from(ingested))
}
/// Stores a skill in the session memgine and returns the index of the node
/// created for it.
///
/// `name` and `code` are required strings. `platform` defaults to
/// `unknown`; `persona`, `url_pattern` and `description` default to the
/// empty string; `supersedes` is optional; `task_keywords` is an optional
/// array from which non-string entries are dropped.
async fn handle_skill_ingest(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    /// Looks up `key` in `params` as a string, if present.
    fn text_param<'a>(params: &'a Value, key: &str) -> Option<&'a str> {
        params.get(key).and_then(Value::as_str)
    }

    let params = &req.params;
    let name = text_param(params, "name").ok_or("missing name")?;
    let code = text_param(params, "code").ok_or("missing code")?;
    let platform = text_param(params, "platform").unwrap_or("unknown");
    let persona = text_param(params, "persona").unwrap_or("");
    let url_pattern = text_param(params, "url_pattern").unwrap_or("");
    let description = text_param(params, "description").unwrap_or("");
    let supersedes = text_param(params, "supersedes");
    let keywords: Vec<String> = match params.get("task_keywords").and_then(Value::as_array) {
        Some(entries) => entries
            .iter()
            .filter_map(Value::as_str)
            .map(String::from)
            .collect(),
        None => Vec::new(),
    };

    let trigger = car_memgine::SkillTrigger {
        persona: persona.into(),
        url_pattern: url_pattern.into(),
        task_keywords: keywords,
        structured: None,
    };
    let mut engine = session.memgine.lock().await;
    let node = engine.ingest_skill(
        name,
        code,
        platform,
        trigger,
        description,
        supersedes,
        vec![],
        vec![],
    );
    Ok(Value::from(node.index() as u64))
}
async fn handle_skill_find(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let persona = req
.params
.get("persona")
.and_then(|v| v.as_str())
.unwrap_or("");
let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
let task = req
.params
.get("task")
.and_then(|v| v.as_str())
.unwrap_or("");
let max = req
.params
.get("max_results")
.and_then(|v| v.as_u64())
.unwrap_or(1) as usize;
let engine = session.memgine.lock().await;
let results = engine.find_skill(persona, url, task, max);
let json: Vec<Value> = results
.iter()
.map(|(m, s)| {
serde_json::json!({
"name": m.name, "code": m.code, "platform": m.platform,
"description": m.description, "stats": m.stats, "match_score": s,
})
})
.collect();
serde_json::to_value(json).map_err(|e| e.to_string())
}
async fn handle_skill_report(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let name = req
.params
.get("skill_name")
.and_then(|v| v.as_str())
.ok_or("missing skill_name")?;
let outcome_str = req
.params
.get("outcome")
.and_then(|v| v.as_str())
.ok_or("missing outcome")?;
let outcome = match outcome_str {
"success" => car_memgine::SkillOutcome::Success,
_ => car_memgine::SkillOutcome::Fail,
};
let mut engine = session.memgine.lock().await;
let stats = engine
.report_outcome(name, outcome)
.ok_or(format!("skill '{}' not found", name))?;
serde_json::to_value(stats).map_err(|e| e.to_string())
}
/// Agent runner that executes each agent by sending a `multi.run_agent`
/// JSON-RPC request back to the connected client and waiting for its reply.
struct WsAgentRunner {
/// Connection to the client: supplies request ids, the outbound writer and
/// the table of pending callbacks.
channel: Arc<WsChannel>,
/// Host state used to register each run as an agent and to publish its status.
host: Arc<crate::host::HostState>,
/// Id of the client on whose behalf agents are registered and updated.
client_id: String,
}
#[async_trait::async_trait]
impl car_multi::AgentRunner for WsAgentRunner {
async fn run(
&self,
spec: &car_multi::AgentSpec,
task: &str,
_runtime: &car_engine::Runtime,
_mailbox: &car_multi::Mailbox,
) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
use futures::SinkExt;
let request_id = self.channel.next_request_id();
let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
let agent = self
.host
.register_agent(
&self.client_id,
RegisterHostAgentRequest {
id: Some(agent_id.clone()),
name: spec.name.clone(),
kind: "callback".to_string(),
capabilities: spec.tools.clone(),
project: spec
.metadata
.get("project")
.and_then(|v| v.as_str())
.map(str::to_string),
pid: None,
display: serde_json::from_value(
spec.metadata
.get("display")
.cloned()
.unwrap_or(serde_json::Value::Null),
)
.unwrap_or_default(),
metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
},
)
.await
.map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
let _ = self
.host
.set_status(
&self.client_id,
SetHostAgentStatusRequest {
agent_id: agent.id.clone(),
status: HostAgentStatus::Running,
current_task: Some(task.to_string()),
message: Some(format!("{} started", spec.name)),
payload: serde_json::json!({ "task": task }),
},
)
.await;
let rpc_request = serde_json::json!({
"jsonrpc": "2.0",
"method": "multi.run_agent",
"params": {
"spec": spec,
"task": task,
},
"id": request_id,
});
let (tx, rx) = tokio::sync::oneshot::channel();
self.channel
.pending
.lock()
.await
.insert(request_id.clone(), tx);
let msg = Message::Text(
serde_json::to_string(&rpc_request)
.map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
.into(),
);
if let Err(e) = self.channel.write.lock().await.send(msg).await {
let _ = self
.host
.set_status(
&self.client_id,
SetHostAgentStatusRequest {
agent_id: agent_id.clone(),
status: HostAgentStatus::Errored,
current_task: None,
message: Some(format!("{} failed to start", spec.name)),
payload: serde_json::json!({ "error": e.to_string() }),
},
)
.await;
return Err(car_multi::MultiError::AgentFailed(
spec.name.clone(),
format!("ws send error: {}", e),
));
}
let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
Ok(Ok(response)) => response,
Ok(Err(_)) => {
let _ = self
.host
.set_status(
&self.client_id,
SetHostAgentStatusRequest {
agent_id: agent_id.clone(),
status: HostAgentStatus::Errored,
current_task: None,
message: Some(format!("{} callback channel closed", spec.name)),
payload: Value::Null,
},
)
.await;
return Err(car_multi::MultiError::AgentFailed(
spec.name.clone(),
"agent callback channel closed".into(),
));
}
Err(_) => {
let _ = self
.host
.set_status(
&self.client_id,
SetHostAgentStatusRequest {
agent_id: agent_id.clone(),
status: HostAgentStatus::Errored,
current_task: None,
message: Some(format!("{} timed out", spec.name)),
payload: Value::Null,
},
)
.await;
return Err(car_multi::MultiError::AgentFailed(
spec.name.clone(),
"agent callback timed out (300s)".into(),
));
}
};
if let Some(err) = response.error {
let _ = self
.host
.set_status(
&self.client_id,
SetHostAgentStatusRequest {
agent_id: agent_id.clone(),
status: HostAgentStatus::Errored,
current_task: None,
message: Some(format!("{} errored", spec.name)),
payload: serde_json::json!({ "error": err }),
},
)
.await;
return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
}
let output_value = response.output.unwrap_or(Value::Null);
let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
car_multi::MultiError::AgentFailed(
spec.name.clone(),
format!("invalid AgentOutput: {}", e),
)
})?;
let status = if output.error.is_some() {
HostAgentStatus::Errored
} else {
HostAgentStatus::Completed
};
let message = if output.error.is_some() {
format!("{} errored", spec.name)
} else {
format!("{} completed", spec.name)
};
let _ = self
.host
.set_status(
&self.client_id,
SetHostAgentStatusRequest {
agent_id,
status,
current_task: None,
message: Some(message),
payload: serde_json::to_value(&output).unwrap_or(Value::Null),
},
)
.await;
Ok(output)
}
}
/// Builds the agent id for one run as `client_id:name:request_id`.
///
/// Every character of `name` that is not an ASCII letter, ASCII digit, `-`
/// or `_` is replaced with `-`. `client_id` and `request_id` are inserted
/// verbatim.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '-' });
    }
    [client_id, sanitized.as_str(), request_id].join(":")
}
async fn handle_multi_swarm(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let mode_str = req
.params
.get("mode")
.and_then(|v| v.as_str())
.ok_or("missing 'mode'")?;
let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
let task = req
.params
.get("task")
.and_then(|v| v.as_str())
.ok_or("missing 'task'")?;
let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
.map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
let agent_specs: Vec<car_multi::AgentSpec> =
serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
let synth: Option<car_multi::AgentSpec> = req
.params
.get("synthesizer")
.map(|v| serde_json::from_value(v.clone()))
.transpose()
.map_err(|e| format!("invalid synthesizer: {}", e))?;
let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
channel: session.channel.clone(),
host: session.host.clone(),
client_id: session.client_id.clone(),
});
let infra = car_multi::SharedInfra::new();
let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
if let Some(s) = synth {
swarm = swarm.with_synthesizer(s);
}
let result = swarm
.run(task, &runner, &infra)
.await
.map_err(|e| format!("swarm error: {}", e))?;
serde_json::to_value(result).map_err(|e| e.to_string())
}
async fn handle_multi_pipeline(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
let task = req
.params
.get("task")
.and_then(|v| v.as_str())
.ok_or("missing 'task'")?;
let stage_specs: Vec<car_multi::AgentSpec> =
serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
channel: session.channel.clone(),
host: session.host.clone(),
client_id: session.client_id.clone(),
});
let infra = car_multi::SharedInfra::new();
let result = car_multi::Pipeline::new(stage_specs)
.run(task, &runner, &infra)
.await
.map_err(|e| format!("pipeline error: {}", e))?;
serde_json::to_value(result).map_err(|e| e.to_string())
}
/// Runs a `car_multi::Supervisor` coordinating `workers` under `supervisor`.
///
/// Reads from `req.params`:
/// - `workers`: list of `car_multi::AgentSpec`.
/// - `supervisor`: a single `car_multi::AgentSpec`.
/// - `task` (string): the task text.
/// - `max_rounds` (optional unsigned integer): defaults to 3 when absent or
///   not an unsigned integer; values above `u32::MAX` saturate to `u32::MAX`.
///
/// Returns the serialized supervisor result or an error string.
async fn handle_multi_supervisor(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
    let task = req
        .params
        .get("task")
        .and_then(|v| v.as_str())
        .ok_or("missing 'task'")?;
    // Saturate rather than `as u32`: a plain cast silently wraps, so e.g.
    // 2^32 + 1 would have been treated as a single round.
    let max_rounds = req
        .params
        .get("max_rounds")
        .and_then(|v| v.as_u64())
        .map_or(3, |n| u32::try_from(n).unwrap_or(u32::MAX));
    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
        .map_err(|e| format!("invalid workers: {}", e))?;
    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
        .map_err(|e| format!("invalid supervisor: {}", e))?;
    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
        channel: session.channel.clone(),
        host: session.host.clone(),
        client_id: session.client_id.clone(),
    });
    let infra = car_multi::SharedInfra::new();
    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
        .with_max_rounds(max_rounds)
        .run(task, &runner, &infra)
        .await
        .map_err(|e| format!("supervisor error: {}", e))?;
    serde_json::to_value(result).map_err(|e| e.to_string())
}
async fn handle_multi_map_reduce(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
let task = req
.params
.get("task")
.and_then(|v| v.as_str())
.ok_or("missing 'task'")?;
let items_val = req.params.get("items").ok_or("missing 'items'")?;
let mapper_spec: car_multi::AgentSpec =
serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
.map_err(|e| format!("invalid reducer: {}", e))?;
let items: Vec<String> =
serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
channel: session.channel.clone(),
host: session.host.clone(),
client_id: session.client_id.clone(),
});
let infra = car_multi::SharedInfra::new();
let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
.run(task, &items, &runner, &infra)
.await
.map_err(|e| format!("map_reduce error: {}", e))?;
serde_json::to_value(result).map_err(|e| e.to_string())
}
async fn handle_multi_vote(
req: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
let task = req
.params
.get("task")
.and_then(|v| v.as_str())
.ok_or("missing 'task'")?;
let agent_specs: Vec<car_multi::AgentSpec> =
serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
let synth: Option<car_multi::AgentSpec> = req
.params
.get("synthesizer")
.map(|v| serde_json::from_value(v.clone()))
.transpose()
.map_err(|e| format!("invalid synthesizer: {}", e))?;
let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
channel: session.channel.clone(),
host: session.host.clone(),
client_id: session.client_id.clone(),
});
let infra = car_multi::SharedInfra::new();
let mut vote = car_multi::Vote::new(agent_specs);
if let Some(s) = synth {
vote = vote.with_synthesizer(s);
}
let result = vote
.run(task, &runner, &infra)
.await
.map_err(|e| format!("vote error: {}", e))?;
serde_json::to_value(result).map_err(|e| e.to_string())
}
fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
let name = req
.params
.get("name")
.and_then(|v| v.as_str())
.ok_or("scheduler.create requires 'name'")?;
let prompt = req
.params
.get("prompt")
.and_then(|v| v.as_str())
.ok_or("scheduler.create requires 'prompt'")?;
let mut task = car_scheduler::Task::new(name, prompt);
if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
let trigger = match t {
"once" => car_scheduler::TaskTrigger::Once,
"cron" => car_scheduler::TaskTrigger::Cron,
"interval" => car_scheduler::TaskTrigger::Interval,
"file_watch" => car_scheduler::TaskTrigger::FileWatch,
_ => car_scheduler::TaskTrigger::Manual,
};
let schedule = req
.params
.get("schedule")
.and_then(|v| v.as_str())
.unwrap_or("");
task = task.with_trigger(trigger, schedule);
}
if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
task = task.with_system_prompt(sp);
}
serde_json::to_value(&task).map_err(|e| e.to_string())
}
/// Executes the scheduler task given in `params.task` once and returns the
/// serialized execution record.
///
/// Fails if `task` is missing or does not deserialize into
/// `car_scheduler::Task`.
async fn handle_scheduler_run(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let raw_task = match req.params.get("task") {
        Some(raw) => raw,
        None => return Err("scheduler.run requires 'task'".into()),
    };
    let mut task = serde_json::from_value::<car_scheduler::Task>(raw_task.clone())
        .map_err(|e| format!("invalid task: {}", e))?;

    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
        channel: session.channel.clone(),
        host: session.host.clone(),
        client_id: session.client_id.clone(),
    });
    let execution = car_scheduler::Executor::new(runner)
        .run_once(&mut task)
        .await;
    serde_json::to_value(&execution).map_err(|e| e.to_string())
}
/// Runs the scheduler task in `params.task` through `Executor::run_loop` and
/// returns the serialized executions.
///
/// `max_iterations` is optional; when it is an unsigned integer it is passed
/// on as a `u32`, with values above `u32::MAX` saturating to `u32::MAX`.
/// Fails if `task` is missing or does not deserialize into
/// `car_scheduler::Task`.
async fn handle_scheduler_run_loop(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let task_val = req
        .params
        .get("task")
        .ok_or("scheduler.run_loop requires 'task'")?;
    let mut task: car_scheduler::Task =
        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
    // Saturate rather than `as u32`: a plain cast wraps, so 2^32 would have
    // silently become a limit of 0 iterations.
    let max_iterations = req
        .params
        .get("max_iterations")
        .and_then(|v| v.as_u64())
        .map(|v| u32::try_from(v).unwrap_or(u32::MAX));
    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
        channel: session.channel.clone(),
        host: session.host.clone(),
        client_id: session.client_id.clone(),
    });
    let executor = car_scheduler::Executor::new(runner);
    // The sender is kept alive for the whole loop but never signalled, so this
    // handler itself never requests cancellation.
    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
    let executions = executor
        .run_loop(&mut task, max_iterations, cancel_rx)
        .await;
    serde_json::to_value(&executions).map_err(|e| e.to_string())
}
/// Returns the server's shared inference engine, creating it with the
/// default `InferenceConfig` on first use via `state.inference.get_or_init`.
fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
    let build_default = || {
        let config = car_inference::InferenceConfig::default();
        Arc::new(car_inference::InferenceEngine::new(config))
    };
    state.inference.get_or_init(build_default)
}
async fn handle_infer(
msg: &JsonRpcMessage,
state: &ServerState,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let engine = get_inference_engine(state);
let mut req: car_inference::GenerateRequest =
serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
let mut memgine = session.memgine.lock().await;
let ctx = memgine.build_context(cq);
if !ctx.is_empty() {
req.context = Some(ctx);
}
}
let _permit = state.admission.acquire().await;
let result = engine
.generate_tracked(req)
.await
.map_err(|e| e.to_string())?;
serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
}
/// Deserializes `msg.params` into a `GenerateImageRequest`, runs it on the
/// shared inference engine under an admission permit, and returns the
/// serialized result.
async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    let engine = get_inference_engine(state);
    let request =
        match serde_json::from_value::<car_inference::GenerateImageRequest>(msg.params.clone()) {
            Ok(parsed) => parsed,
            Err(e) => return Err(format!("invalid params: {}", e)),
        };
    let _permit = state.admission.acquire().await;
    match engine.generate_image(request).await {
        Ok(image) => serde_json::to_value(&image).map_err(|e| format!("serialize result: {}", e)),
        Err(e) => Err(e.to_string()),
    }
}
/// Deserializes `msg.params` into a `GenerateVideoRequest`, runs it on the
/// shared inference engine under an admission permit, and returns the
/// serialized result.
async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    let engine = get_inference_engine(state);
    let request =
        match serde_json::from_value::<car_inference::GenerateVideoRequest>(msg.params.clone()) {
            Ok(parsed) => parsed,
            Err(e) => return Err(format!("invalid params: {}", e)),
        };
    let _permit = state.admission.acquire().await;
    match engine.generate_video(request).await {
        Ok(video) => serde_json::to_value(&video).map_err(|e| format!("serialize result: {}", e)),
        Err(e) => Err(e.to_string()),
    }
}
/// Runs a streaming generation and forwards each stream event to the client
/// as an `inference.stream.event` JSON-RPC notification.
///
/// The request is parsed from `msg.params`; a string `context_query` param
/// makes the session memgine build a context that is attached when non-empty.
/// Each notification carries the originating request id (`msg.id`) and an
/// event payload. `Done` events are accumulated but not forwarded. Failures
/// to serialize or send a notification are ignored, so the stream is drained
/// regardless. The final response contains the accumulated `text`,
/// `tool_calls` and `usage`.
async fn handle_infer_stream(
    msg: &JsonRpcMessage,
    session: &crate::session::ClientSession,
    state: &ServerState,
) -> Result<Value, String> {
    use futures::SinkExt;
    use tokio_tungstenite::tungstenite::Message;
    let engine = get_inference_engine(state);
    let mut req: car_inference::GenerateRequest =
        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
        let mut memgine = session.memgine.lock().await;
        let ctx = memgine.build_context(cq);
        if !ctx.is_empty() {
            req.context = Some(ctx);
        }
    }
    // Held until the function returns, i.e. for the whole stream.
    let _permit = state.admission.acquire().await;
    let mut rx = engine
        .generate_tracked_stream(req)
        .await
        .map_err(|e| e.to_string())?;
    let mut accumulator = car_inference::StreamAccumulator::default();
    let request_id = msg.id.clone();
    while let Some(event) = rx.recv().await {
        let event_payload = match &event {
            car_inference::StreamEvent::TextDelta(text) => {
                serde_json::json!({"type": "text", "data": text})
            }
            car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
                serde_json::json!({"type": "tool_start", "name": name, "index": index})
            }
            car_inference::StreamEvent::ToolCallDelta {
                index,
                arguments_delta,
            } => serde_json::json!({
                "type": "tool_delta",
                "index": index,
                "data": arguments_delta,
            }),
            car_inference::StreamEvent::Usage {
                input_tokens,
                output_tokens,
            } => serde_json::json!({
                "type": "usage",
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            }),
            // Terminal event: record it, but send nothing to the client.
            car_inference::StreamEvent::Done { .. } => {
                accumulator.push(&event);
                continue;
            }
        };
        let notif = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "inference.stream.event",
            "params": {
                "request_id": request_id,
                "event": event_payload,
            },
        });
        // Best-effort delivery: a failed send does not abort the generation.
        if let Ok(text) = serde_json::to_string(&notif) {
            let _ = session
                .channel
                .write
                .lock()
                .await
                .send(Message::Text(text.into()))
                .await;
        }
        accumulator.push(&event);
    }
    let (text, tool_calls, usage) = accumulator.finish_with_usage();
    Ok(serde_json::json!({
        "text": text,
        "tool_calls": tool_calls,
        "usage": usage,
    }))
}
/// Deserializes `msg.params` into an `EmbedRequest`, runs it under an
/// admission permit, and returns `{"embeddings": ...}`.
async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    let engine = get_inference_engine(state);
    let request = match serde_json::from_value::<car_inference::EmbedRequest>(msg.params.clone()) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid params: {}", e)),
    };
    let _permit = state.admission.acquire().await;
    match engine.embed(request).await {
        Ok(embeddings) => Ok(serde_json::json!({"embeddings": embeddings})),
        Err(e) => Err(e.to_string()),
    }
}
/// Deserializes `msg.params` into a `ClassifyRequest`, runs it under an
/// admission permit, and returns `{"classifications": ...}`.
async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    let engine = get_inference_engine(state);
    let request =
        match serde_json::from_value::<car_inference::ClassifyRequest>(msg.params.clone()) {
            Ok(parsed) => parsed,
            Err(e) => return Err(format!("invalid params: {}", e)),
        };
    let _permit = state.admission.acquire().await;
    match engine.classify(request).await {
        Ok(classifications) => Ok(serde_json::json!({"classifications": classifications})),
        Err(e) => Err(e.to_string()),
    }
}
/// Reports admission-control permit counts: total, available, and in use
/// (total minus available, saturating at zero), plus the value of
/// `crate::admission::ENV_MAX_CONCURRENT` under `env_override`.
fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
    let admission = &state.admission;
    let permits_total = admission.permits();
    let permits_available = admission.permits_available();
    Ok(serde_json::json!({
        "permits_total": permits_total,
        "permits_available": permits_available,
        "permits_in_use": permits_total.saturating_sub(permits_available),
        "env_override": crate::admission::ENV_MAX_CONCURRENT,
    }))
}
async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
let model = msg
.params
.get("model")
.and_then(|v| v.as_str())
.ok_or("missing 'model' parameter")?;
let text = msg
.params
.get("text")
.and_then(|v| v.as_str())
.ok_or("missing 'text' parameter")?;
let engine = get_inference_engine(state);
let ids = engine
.tokenize(model, text)
.await
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({"tokens": ids}))
}
async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
let model = msg
.params
.get("model")
.and_then(|v| v.as_str())
.ok_or("missing 'model' parameter")?;
let tokens: Vec<u32> = msg
.params
.get("tokens")
.and_then(|v| v.as_array())
.ok_or("missing 'tokens' parameter")?
.iter()
.map(|t| {
t.as_u64()
.and_then(|n| u32::try_from(n).ok())
.ok_or_else(|| "tokens[] must be u32 values".to_string())
})
.collect::<Result<Vec<_>, _>>()?;
let engine = get_inference_engine(state);
let text = engine
.detokenize(model, &tokens)
.await
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({"text": text}))
}
/// Persists a `ModelSchema` into `<home>/.car/models.json`, replacing any
/// existing entry with the same id.
///
/// The schema is read from `params.schema` when that key exists, otherwise
/// from the whole `params` value. `<home>` is `$HOME`, falling back to
/// `$USERPROFILE`. The updated list is written to `models.json.tmp` and then
/// renamed over `models.json`. The response echoes the id and path along
/// with a note that a daemon restart is required.
async fn handle_models_register(
    req: &JsonRpcMessage,
    _state: &Arc<ServerState>,
) -> Result<Value, String> {
    let schema_value = req
        .params
        .get("schema")
        .cloned()
        .unwrap_or_else(|| req.params.clone());
    let schema = serde_json::from_value::<car_inference::ModelSchema>(schema_value)
        .map_err(|e| format!("invalid ModelSchema: {e}"))?;
    let id = schema.id.clone();

    let home = match std::env::var_os("HOME") {
        Some(dir) => dir,
        None => std::env::var_os("USERPROFILE")
            .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?,
    };
    let car_dir = std::path::PathBuf::from(home).join(".car");
    if let Err(e) = std::fs::create_dir_all(&car_dir) {
        return Err(format!("create {}: {e}", car_dir.display()));
    }
    let path = car_dir.join("models.json");

    // A missing or whitespace-only file is treated as an empty registry.
    let existing_text = if path.exists() {
        let text = std::fs::read_to_string(&path)
            .map_err(|e| format!("read {}: {e}", path.display()))?;
        Some(text)
    } else {
        None
    };
    let mut models: Vec<car_inference::ModelSchema> = match existing_text {
        Some(text) if !text.trim().is_empty() => {
            serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
        }
        _ => Vec::new(),
    };

    // Upsert by id.
    match models.iter().position(|m| m.id == id) {
        Some(index) => models[index] = schema,
        None => models.push(schema),
    }

    let json = serde_json::to_string_pretty(&models)
        .map_err(|e| format!("serialize models.json: {e}"))?;
    // Write to a sibling temp file first, then rename it into place.
    let tmp = path.with_extension("json.tmp");
    if let Err(e) = std::fs::write(&tmp, json) {
        return Err(format!("write {}: {e}", tmp.display()));
    }
    if let Err(e) = std::fs::rename(&tmp, &path) {
        return Err(format!(
            "rename {} -> {}: {e}",
            tmp.display(),
            path.display()
        ));
    }

    Ok(serde_json::json!({
        "id": id,
        "registered": true,
        "path": path.to_string_lossy(),
        "note": "Daemon restart required for live UnifiedRegistry visibility \
                 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
                 next car-server boot loads it via UnifiedRegistry::load_user_config.",
    }))
}
/// Removes the model with the given id from `<home>/.car/models.json`.
///
/// The id is taken from `params.id` (which must be a string) or, when that
/// key is absent, from `params` itself if it is a bare string. Fails when the
/// file does not exist or contains no model with that id. The updated list is
/// written to `models.json.tmp` and renamed over `models.json`.
async fn handle_models_unregister(
    req: &JsonRpcMessage,
    _state: &Arc<ServerState>,
) -> Result<Value, String> {
    let id = if let Some(raw) = req.params.get("id") {
        raw.as_str()
            .ok_or_else(|| "`id` must be a string".to_string())?
            .to_string()
    } else if let Some(bare) = req.params.as_str() {
        bare.to_string()
    } else {
        return Err("missing `id` parameter".to_string());
    };

    let home = match std::env::var_os("HOME") {
        Some(dir) => dir,
        None => std::env::var_os("USERPROFILE")
            .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?,
    };
    let path = std::path::PathBuf::from(home)
        .join(".car")
        .join("models.json");
    if !path.exists() {
        return Err(format!(
            "no models.json at {} — nothing to unregister",
            path.display()
        ));
    }

    let text = match std::fs::read_to_string(&path) {
        Ok(contents) => contents,
        Err(e) => return Err(format!("read {}: {e}", path.display())),
    };
    // A whitespace-only file is treated as an empty registry.
    let mut models: Vec<car_inference::ModelSchema> = match text.trim().is_empty() {
        true => Vec::new(),
        false => {
            serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
        }
    };

    let count_before = models.len();
    models.retain(|m| m.id != id);
    let nothing_removed = models.len() == count_before;
    if nothing_removed {
        return Err(format!("model {} not found in {}", id, path.display()));
    }

    let json = serde_json::to_string_pretty(&models)
        .map_err(|e| format!("serialize models.json: {e}"))?;
    // Write to a sibling temp file first, then rename it into place.
    let tmp = path.with_extension("json.tmp");
    if let Err(e) = std::fs::write(&tmp, json) {
        return Err(format!("write {}: {e}", tmp.display()));
    }
    if let Err(e) = std::fs::rename(&tmp, &path) {
        return Err(format!(
            "rename {} -> {}: {e}",
            tmp.display(),
            path.display()
        ));
    }

    Ok(serde_json::json!({
        "id": id,
        "unregistered": true,
        "path": path.to_string_lossy(),
        "note": "Daemon restart required for live UnifiedRegistry visibility \
                 (phase 1, matching models.register).",
    }))
}
/// Returns the inference engine's `list_models()` output serialized as JSON.
fn handle_models_list(state: &ServerState) -> Result<Value, String> {
    let listed = get_inference_engine(state).list_models();
    match serde_json::to_value(&listed) {
        Ok(json) => Ok(json),
        Err(e) => Err(e.to_string()),
    }
}
/// Returns the inference engine's `list_models_unified()` output serialized
/// as JSON.
fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
    let listed = get_inference_engine(state).list_models_unified();
    match serde_json::to_value(&listed) {
        Ok(json) => Ok(json),
        Err(e) => Err(e.to_string()),
    }
}
/// Optional filters accepted by `handle_models_search`.
///
/// Field names are deserialized in camelCase (e.g. `localOnly`), and every
/// field falls back to its default when absent from the params.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchParams {
// Free-text filter; trimmed and lowercased before matching, ignored when blank.
#[serde(default)]
query: Option<String>,
// Keep only schemas for which `has_capability` returns true.
#[serde(default)]
capability: Option<car_inference::ModelCapability>,
// Provider name filter; compared ASCII-case-insensitively after trimming.
#[serde(default)]
provider: Option<String>,
// When true, keep only schemas where `is_local()` holds.
#[serde(default)]
local_only: bool,
// When true, keep only schemas whose `available` flag is set.
#[serde(default)]
available_only: bool,
// Maximum number of entries returned after sorting.
#[serde(default)]
limit: Option<usize>,
}
/// One model in a `handle_models_search` response.
///
/// Serialized with camelCase keys; the `info` fields are flattened into the
/// same JSON object as the extra fields below.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchEntry {
// Built with `ModelInfo::from(&schema)`.
#[serde(flatten)]
info: car_inference::ModelInfo,
// Copied from the schema.
family: String,
version: String,
tags: Vec<String>,
// True when the schema is not available and its source is `Local` or `Mlx`.
pullable: bool,
// Upgrade whose `from_id` equals this schema's id, if any.
upgrade: Option<car_inference::ModelUpgrade>,
}
/// Response body produced by `handle_models_search`, serialized with
/// camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchResponse {
// Matching entries, after sorting and the optional `limit` truncation.
models: Vec<ModelSearchEntry>,
// All upgrades reported by the engine (not filtered by the search).
upgrades: Vec<car_inference::ModelUpgrade>,
// Counts below are computed over `models` as returned (post-truncation).
total: usize,
available: usize,
local: usize,
// `total` minus `local`.
remote: usize,
}
/// Searches the engine's model schemas and returns a `ModelSearchResponse`.
///
/// Params that fail to deserialize are treated as "no filters". Schemas are
/// filtered by capability, provider, locality, availability and free-text
/// query (matched as a lowercase substring of id, name, provider, family,
/// tags and capability names). Results are ordered available-first, then
/// local-first, then by name, and finally truncated to `limit`. The counts
/// in the response describe the returned (post-truncation) entries.
fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    let params: ModelSearchParams =
        serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
            query: None,
            capability: None,
            provider: None,
            local_only: false,
            available_only: false,
            limit: None,
        });
    let engine = get_inference_engine(state);
    let upgrades = engine.available_model_upgrades();

    // Index upgrades by the model id they upgrade from.
    let mut upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = HashMap::new();
    for upgrade in upgrades.iter() {
        upgrades_by_from.insert(upgrade.from_id.clone(), upgrade.clone());
    }

    // Trim + lowercase a text filter; blank filters are dropped.
    let normalize = |raw: &Option<String>| -> Option<String> {
        let trimmed = raw.as_deref()?.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_ascii_lowercase())
        }
    };
    let query = normalize(&params.query);
    let provider = normalize(&params.provider);

    let mut entries: Vec<ModelSearchEntry> = Vec::new();
    for schema in engine.list_schemas() {
        if let Some(capability) = params.capability {
            if !schema.has_capability(capability) {
                continue;
            }
        }
        if let Some(wanted) = provider.as_deref() {
            if schema.provider.to_ascii_lowercase() != wanted {
                continue;
            }
        }
        if params.local_only && !schema.is_local() {
            continue;
        }
        if params.available_only && !schema.available {
            continue;
        }
        if let Some(needle) = query.as_deref() {
            let capability_text = schema
                .capabilities
                .iter()
                .map(|cap| format!("{cap:?}").to_ascii_lowercase())
                .collect::<Vec<_>>()
                .join(" ");
            let haystack = format!(
                "{} {} {} {} {} {}",
                schema.id,
                schema.name,
                schema.provider,
                schema.family,
                schema.tags.join(" "),
                capability_text
            )
            .to_ascii_lowercase();
            if !haystack.contains(needle) {
                continue;
            }
        }

        // Only not-yet-available local/MLX models can be pulled.
        let has_pullable_source = matches!(
            schema.source,
            car_inference::ModelSource::Local { .. } | car_inference::ModelSource::Mlx { .. }
        );
        let pullable = !schema.available && has_pullable_source;
        let info = car_inference::ModelInfo::from(&schema);
        let upgrade = upgrades_by_from.get(&schema.id).cloned();
        entries.push(ModelSearchEntry {
            info,
            family: schema.family,
            version: schema.version,
            tags: schema.tags,
            pullable,
            upgrade,
        });
    }

    // Available first, then local first, then ascending by name.
    entries.sort_by(|a, b| {
        let by_available = b.info.available.cmp(&a.info.available);
        let by_local = b.info.is_local.cmp(&a.info.is_local);
        let by_name = a.info.name.cmp(&b.info.name);
        by_available.then(by_local).then(by_name)
    });
    if let Some(limit) = params.limit {
        entries.truncate(limit);
    }

    let total = entries.len();
    let mut available = 0usize;
    let mut local = 0usize;
    for entry in entries.iter() {
        if entry.info.available {
            available += 1;
        }
        if entry.info.is_local {
            local += 1;
        }
    }
    let response = ModelSearchResponse {
        models: entries,
        upgrades,
        total,
        available,
        local,
        remote: total.saturating_sub(local),
    };
    serde_json::to_value(response).map_err(|e| e.to_string())
}
/// Returns `{"upgrades": ...}` with the engine's available model upgrades.
fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
    let engine = get_inference_engine(state);
    let body = serde_json::json!({
        "upgrades": engine.available_model_upgrades()
    });
    match serde_json::to_value(body) {
        Ok(json) => Ok(json),
        Err(e) => Err(e.to_string()),
    }
}
async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
let name = msg
.params
.get("name")
.or_else(|| msg.params.get("id"))
.or_else(|| msg.params.get("model"))
.and_then(|v| v.as_str())
.ok_or("missing 'name' parameter")?;
let engine = get_inference_engine(state);
let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
Ok(serde_json::json!({"path": path.display().to_string()}))
}
/// Distills skills from trace events using a fresh `MemgineEngine` wired to
/// the shared inference engine, and returns them serialized.
///
/// Events are read from `params.events` when that key exists; otherwise the
/// whole `params` value is interpreted as the event list.
async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    // `unwrap_or_else` so the full params value is only cloned when the
    // `events` key is actually absent.
    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
        msg.params
            .get("events")
            .cloned()
            .unwrap_or_else(|| msg.params.clone()),
    )
    .map_err(|e| format!("invalid events: {}", e))?;
    let inference = get_inference_engine(state).clone();
    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
    let skills = engine.distill_skills(&events).await;
    serde_json::to_value(&skills).map_err(|e| e.to_string())
}
/// Consolidates the session's effective memgine and returns the serialized
/// report.
///
/// The memgine lock is released before persistence is attempted. When the
/// session has an agent id, the memgine is persisted for that agent; a
/// persistence failure is only logged as a warning.
async fn handle_memory_consolidate(
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let memgine = session.effective_memgine().await;
    let report = {
        let mut guard = memgine.lock().await;
        guard.consolidate().await
    };
    if let Some(id) = session.agent_id.lock().await.clone() {
        match persist_agent_memgine(&id, &memgine).await {
            Ok(_) => {}
            Err(e) => {
                tracing::warn!(agent_id = %id, error = %e,
                    "agent memgine persist after consolidate failed");
            }
        }
    }
    serde_json::to_value(&report).map_err(|e| e.to_string())
}
/// Asks the session memgine to repair the skill named by `skill_name`.
///
/// Returns `{"code": ...}` when the repair yields code, otherwise JSON null.
async fn handle_skill_repair(
    msg: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let skill_name = match msg.params.get("skill_name").and_then(Value::as_str) {
        Some(text) => text,
        None => return Err("missing 'skill_name' parameter".into()),
    };
    let mut memgine = session.memgine.lock().await;
    let repaired = memgine.repair_skill(skill_name).await;
    let body = if let Some(code) = repaired {
        serde_json::json!({ "code": code })
    } else {
        Value::Null
    };
    Ok(body)
}
/// Ingests already-distilled skills into the session memgine and returns
/// `{"ingested": <number of nodes reported by the engine>}`.
///
/// Skills are read from `params.skills` when that key exists; otherwise the
/// whole `params` value is interpreted as the skill list.
async fn handle_skills_ingest_distilled(
    msg: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    // `unwrap_or_else` so the full params value is only cloned when the
    // `skills` key is actually absent.
    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
        msg.params
            .get("skills")
            .cloned()
            .unwrap_or_else(|| msg.params.clone()),
    )
    .map_err(|e| format!("invalid skills: {}", e))?;
    let mut engine = session.memgine.lock().await;
    let nodes = engine.ingest_distilled_skills(&skills);
    Ok(serde_json::json!({ "ingested": nodes.len() }))
}
/// Evolves skills for `domain` on the session memgine using the supplied
/// trace events, returning the serialized skills.
///
/// `domain` is a required string; `events` defaults to an empty list when
/// the key is absent.
async fn handle_skills_evolve(
    msg: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let domain: String = match msg.params.get("domain").and_then(Value::as_str) {
        Some(text) => text.to_string(),
        None => return Err("missing 'domain' parameter".into()),
    };
    let raw_events = match msg.params.get("events") {
        Some(raw) => raw.clone(),
        None => Value::Array(Vec::new()),
    };
    let events = serde_json::from_value::<Vec<car_memgine::TraceEvent>>(raw_events)
        .map_err(|e| format!("invalid events: {}", e))?;

    let mut memgine = session.memgine.lock().await;
    let evolved = memgine.evolve_skills(&events, &domain).await;
    serde_json::to_value(&evolved).map_err(|e| e.to_string())
}
async fn handle_skills_domains_needing_evolution(
msg: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let threshold = msg
.params
.get("threshold")
.and_then(|v| v.as_f64())
.unwrap_or(0.6);
let engine = session.memgine.lock().await;
let domains = engine.domains_needing_evolution(threshold);
serde_json::to_value(&domains).map_err(|e| e.to_string())
}
/// Deserializes `msg.params` into a `RerankRequest`, runs it under an
/// admission permit, and returns the serialized result.
async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    let engine = get_inference_engine(state);
    let request = match serde_json::from_value::<car_inference::RerankRequest>(msg.params.clone())
    {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid params: {}", e)),
    };
    let _permit = state.admission.acquire().await;
    match engine.rerank(request).await {
        Ok(ranked) => serde_json::to_value(&ranked).map_err(|e| e.to_string()),
        Err(e) => Err(e.to_string()),
    }
}
/// Transcribes audio via the inference engine and returns the serialized
/// result.
///
/// When params carry a string `audio_b64`, it is base64-decoded into a
/// temporary file whose path is injected as `audio_path` before the params
/// are parsed into a `TranscribeRequest`. The `audio_b64` key is always
/// removed from the params first.
async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    use base64::Engine as _;
    let engine = get_inference_engine(state);
    let mut params = msg.params.clone();

    let audio_b64: Option<String> = match params.as_object_mut() {
        Some(map) => map
            .remove("audio_b64")
            .and_then(|v| v.as_str().map(str::to_string)),
        None => None,
    };

    // The temp file handle is bound for the rest of the function so the file
    // still exists while the engine reads it.
    let _tmp_audio = match audio_b64 {
        None => None,
        Some(b64) => {
            let bytes = base64::engine::general_purpose::STANDARD
                .decode(b64.as_bytes())
                .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
            let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
            std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
            let audio_path = tmp.path().to_string_lossy().into_owned();
            if let Some(map) = params.as_object_mut() {
                map.insert("audio_path".to_string(), Value::String(audio_path));
            }
            Some(tmp)
        }
    };

    let request = match serde_json::from_value::<car_inference::TranscribeRequest>(params) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid params: {}", e)),
    };
    let _permit = state.admission.acquire().await;
    match engine.transcribe(request).await {
        Ok(transcript) => serde_json::to_value(&transcript).map_err(|e| e.to_string()),
        Err(e) => Err(e.to_string()),
    }
}
/// Synthesizes speech via the inference engine and returns the serialized
/// result.
///
/// The rendered audio file is additionally read back and attached as
/// base64 under `audio_b64` when the caller set `return_b64: true` or did
/// not supply an `output_path`. The `return_b64` key is removed from the
/// params before they are parsed into a `SynthesizeRequest`.
async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
    use base64::Engine as _;
    let engine = get_inference_engine(state);
    let mut params = msg.params.clone();

    let return_b64 = match params.as_object_mut() {
        Some(map) => map
            .remove("return_b64")
            .and_then(|v| v.as_bool())
            .unwrap_or(false),
        None => false,
    };
    let no_output_path = match params.as_object() {
        Some(map) => !map.contains_key("output_path"),
        None => true,
    };

    let request = match serde_json::from_value::<car_inference::SynthesizeRequest>(params) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid params: {}", e)),
    };
    let _permit = state.admission.acquire().await;
    let result = engine.synthesize(request).await.map_err(|e| e.to_string())?;
    let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;

    let inline_audio = return_b64 || no_output_path;
    if !inline_audio {
        return Ok(value);
    }

    let bytes = match std::fs::read(&result.audio_path) {
        Ok(data) => data,
        Err(e) => {
            return Err(format!(
                "synthesize: failed to read rendered audio at {}: {e}",
                result.audio_path
            ))
        }
    };
    let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
    if let Some(map) = value.as_object_mut() {
        map.insert("audio_b64".to_string(), Value::String(encoded));
    }
    Ok(value)
}
/// Calls the engine's `prepare_speech_runtime()` and returns its status
/// serialized as JSON.
async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
    let engine = get_inference_engine(state);
    match engine.prepare_speech_runtime().await {
        Ok(status) => serde_json::to_value(&status).map_err(|e| e.to_string()),
        Err(e) => Err(e.to_string()),
    }
}
async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
let prompt = msg
.params
.get("prompt")
.and_then(|v| v.as_str())
.ok_or("missing 'prompt' parameter")?;
let engine = get_inference_engine(state);
let decision = engine.route_adaptive(prompt).await;
serde_json::to_value(&decision).map_err(|e| e.to_string())
}
/// Returns the engine's exported profiles (`export_profiles()`) serialized
/// as JSON.
async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
    let exported = get_inference_engine(state).export_profiles().await;
    match serde_json::to_value(&exported) {
        Ok(json) => Ok(json),
        Err(e) => Err(e.to_string()),
    }
}
/// Params for `handle_outcomes_resolve_pending`.
///
/// Deserialized with camelCase keys, so the field arrives as `actionResults`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct OutcomesResolvePendingParams {
// Required list of 4-tuples handed as-is to the outcome tracker's
// `infer_outcomes_from_action_sequence`.
// NOTE(review): the meaning of each tuple element is defined by that
// method and is not visible here — confirm against car_inference.
action_results: Vec<(String, bool, f64, String)>,
}
/// Feeds a sequence of action results to the engine's outcome tracker.
///
/// The tracker first infers outcomes from the action sequence, then resolves
/// its pending entries from those inferred signals. The response reports the
/// number of action results submitted as `recorded`.
async fn handle_outcomes_resolve_pending(
    req: &JsonRpcMessage,
    state: &ServerState,
) -> Result<Value, String> {
    let params: OutcomesResolvePendingParams =
        serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
    let engine = get_inference_engine(state);
    // Write lock is held across both tracker calls.
    let mut tracker = engine.outcome_tracker.write().await;
    let inferred = tracker.infer_outcomes_from_action_sequence(&params.action_results);
    tracker.resolve_pending_from_signals(inferred);
    Ok(serde_json::json!({ "recorded": params.action_results.len() }))
}
/// Returns the length of the session runtime's event log as a JSON number.
async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
    let count = {
        let log = session.runtime.log.lock().await;
        log.len()
    };
    Ok(Value::from(count as u64))
}
/// Returns the session runtime event log's `stats()` serialized as JSON.
async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
    let snapshot = {
        let log = session.runtime.log.lock().await;
        log.stats()
    };
    match serde_json::to_value(snapshot) {
        Ok(json) => Ok(json),
        Err(e) => Err(e.to_string()),
    }
}
/// Params for `handle_events_truncate`.
///
/// Deserialized with camelCase keys (`maxEvents`, `maxSpans`); both are
/// optional and default to `None`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
// Passed to `truncate_events_keep_last` when present.
#[serde(default)]
max_events: Option<usize>,
// Passed to `truncate_spans_keep_last` when present.
#[serde(default)]
max_spans: Option<usize>,
}
/// Truncates the session runtime's event log and reports what was removed.
///
/// Params that fail to deserialize are treated as "no limits". Events are
/// truncated first (when `maxEvents` is given), then spans (when `maxSpans`
/// is given). The response holds `removedEvents`, `removedSpans` and the
/// log's `stats` after truncation.
async fn handle_events_truncate(
    msg: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let params = match serde_json::from_value::<EventsTruncateParams>(msg.params.clone()) {
        Ok(parsed) => parsed,
        Err(_) => EventsTruncateParams {
            max_events: None,
            max_spans: None,
        },
    };
    let mut log = session.runtime.log.lock().await;
    let removed_events = match params.max_events {
        Some(keep) => log.truncate_events_keep_last(keep),
        None => 0,
    };
    let removed_spans = match params.max_spans {
        Some(keep) => log.truncate_spans_keep_last(keep),
        None => 0,
    };
    let stats = log.stats();
    Ok(serde_json::json!({
        "removedEvents": removed_events,
        "removedSpans": removed_spans,
        "stats": stats,
    }))
}
/// Clears the session runtime's event log, returning the value reported by
/// `clear()` as `removed` together with the log's `stats` afterwards.
async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
    let mut log = session.runtime.log.lock().await;
    let removed = log.clear();
    let stats = log.stats();
    Ok(serde_json::json!({ "removed": removed, "stats": stats }))
}
/// Applies a `car_engine::ReplanConfig` built from params to the session
/// runtime and returns JSON null.
///
/// Params (all optional):
/// - `max_replans`: unsigned integer, default 0; values above `u32::MAX`
///   saturate to `u32::MAX`.
/// - `delay_ms`: unsigned integer, default 0.
/// - `verify_before_execute`: bool, default true.
async fn handle_replan_set_config(
    msg: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    // Saturate rather than `as u32`: a plain cast wraps, so 2^32 would have
    // silently become 0 replans.
    let max_replans = msg
        .params
        .get("max_replans")
        .and_then(|v| v.as_u64())
        .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX));
    let delay_ms = msg
        .params
        .get("delay_ms")
        .and_then(|v| v.as_u64())
        .unwrap_or(0);
    let verify_before_execute = msg
        .params
        .get("verify_before_execute")
        .and_then(|v| v.as_bool())
        .unwrap_or(true);
    let cfg = car_engine::ReplanConfig {
        max_replans,
        delay_ms,
        verify_before_execute,
    };
    session.runtime.set_replan_config(cfg).await;
    Ok(Value::Null)
}
async fn handle_skills_list(
msg: &JsonRpcMessage,
session: &crate::session::ClientSession,
) -> Result<Value, String> {
let domain = msg.params.get("domain").and_then(|v| v.as_str());
let engine = session.memgine.lock().await;
let skills: Vec<serde_json::Value> = engine
.graph
.inner
.node_indices()
.filter_map(|nix| {
let node = engine.graph.inner.node_weight(nix)?;
if node.kind != car_memgine::MemKind::Skill {
return None;
}
let meta = car_memgine::SkillMeta::from_node(node)?;
if let Some(d) = domain {
match &meta.scope {
car_memgine::SkillScope::Global => {}
car_memgine::SkillScope::Domain(sd) if sd == d => {}
_ => return None,
}
}
Some(serde_json::to_value(&meta).unwrap_or_default())
})
.collect();
serde_json::to_value(&skills).map_err(|e| e.to_string())
}
/// Parameters shared by the secret put/get/delete/status handlers.
#[derive(serde::Deserialize)]
struct SecretParams {
/// Optional service name; handlers pass it on as `Option<&str>`.
#[serde(default)]
service: Option<String>,
/// Required key identifying the secret.
key: String,
/// Secret value; only `handle_secret_put` reads it and rejects a missing one.
#[serde(default)]
value: Option<String>,
}
fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
}
/// Looks up a secret through `car_ffi_common::secrets::get`.
///
/// # Errors
/// Fails when params do not deserialize into `SecretParams`; backend errors
/// are passed through.
fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
    let lookup = serde_json::from_value::<SecretParams>(req.params.clone())
        .map_err(|err| err.to_string())?;
    let service = lookup.service.as_deref();
    car_ffi_common::secrets::get(service, &lookup.key)
}
/// Deletes a secret through `car_ffi_common::secrets::delete`.
///
/// # Errors
/// Fails when params do not deserialize into `SecretParams`; backend errors
/// are passed through.
fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
    let target = serde_json::from_value::<SecretParams>(req.params.clone())
        .map_err(|err| err.to_string())?;
    let service = target.service.as_deref();
    car_ffi_common::secrets::delete(service, &target.key)
}
/// Queries a secret's status through `car_ffi_common::secrets::status`.
///
/// # Errors
/// Fails when params do not deserialize into `SecretParams`; backend errors
/// are passed through.
fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
    let query = serde_json::from_value::<SecretParams>(req.params.clone())
        .map_err(|err| err.to_string())?;
    let service = query.service.as_deref();
    car_ffi_common::secrets::status(service, &query.key)
}
/// Parameters shared by the permission status/request/explain handlers.
#[derive(serde::Deserialize)]
struct PermParams {
/// Required permission domain, forwarded verbatim to `car_ffi_common::permissions`.
domain: String,
/// Optional target bundle id; handlers pass it on as `Option<&str>`.
#[serde(default)]
target_bundle_id: Option<String>,
}
fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
}
fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
}
fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
}
/// Fetches calendar events between two RFC 3339 timestamps.
///
/// `start` and `end` are required and are converted to UTC before being
/// handed to `car_ffi_common::integrations::calendar_events`; `calendar_ids`
/// defaults to an empty list.
///
/// # Errors
/// Fails on undeserializable params or on a timestamp that is not valid
/// RFC 3339 (`"parse start: …"` / `"parse end: …"`).
fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
    #[derive(serde::Deserialize)]
    struct CalendarEventsParams {
        start: String,
        end: String,
        #[serde(default)]
        calendar_ids: Vec<String>,
    }

    let args: CalendarEventsParams =
        serde_json::from_value(req.params.clone()).map_err(|err| err.to_string())?;
    let window_start = chrono::DateTime::parse_from_rfc3339(&args.start)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .map_err(|e| format!("parse start: {}", e))?;
    let window_end = chrono::DateTime::parse_from_rfc3339(&args.end)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .map_err(|e| format!("parse end: {}", e))?;
    car_ffi_common::integrations::calendar_events(window_start, window_end, &args.calendar_ids)
}
/// Searches contacts via `car_ffi_common::integrations::contacts_list`.
///
/// `query` is required; `limit` defaults to 50 and `container_ids` to an
/// empty list.
///
/// # Errors
/// Fails when params cannot be deserialized; backend errors are passed through.
fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
    fn default_limit() -> usize {
        50
    }

    #[derive(serde::Deserialize)]
    struct ContactsFindParams {
        query: String,
        #[serde(default = "default_limit")]
        limit: usize,
        #[serde(default)]
        container_ids: Vec<String>,
    }

    let ContactsFindParams {
        query,
        limit,
        container_ids,
    } = serde_json::from_value(req.params.clone()).map_err(|err| err.to_string())?;
    car_ffi_common::integrations::contacts_list(&query, &container_ids, limit)
}
/// Lists inbox mail via `car_ffi_common::integrations::mail_inbox`.
///
/// `account_ids` is optional. Params that fail to deserialize fall back to
/// the default (an empty id list) instead of producing an error.
fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
    #[derive(serde::Deserialize, Default)]
    struct MailInboxParams {
        #[serde(default)]
        account_ids: Vec<String>,
    }

    let filter = match serde_json::from_value::<MailInboxParams>(req.params.clone()) {
        Ok(parsed) => parsed,
        Err(_) => MailInboxParams::default(),
    };
    car_ffi_common::integrations::mail_inbox(&filter.account_ids)
}
/// Sends mail by passing the raw params, rendered as a JSON string, to
/// `car_ffi_common::integrations::mail_send`. No validation happens here.
fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
    let payload: String = req.params.to_string();
    car_ffi_common::integrations::mail_send(payload.as_str())
}
/// Lists message chats via `car_ffi_common::integrations::messages_chats`.
///
/// `limit` defaults to 50, both when the field is omitted and when the params
/// cannot be deserialized at all.
fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
    fn default_limit() -> usize {
        50
    }

    #[derive(serde::Deserialize)]
    struct ChatsParams {
        #[serde(default = "default_limit")]
        limit: usize,
    }

    let limit = match serde_json::from_value::<ChatsParams>(req.params.clone()) {
        Ok(parsed) => parsed.limit,
        Err(_) => 50,
    };
    car_ffi_common::integrations::messages_chats(limit)
}
/// Sends a message by passing the raw params, rendered as a JSON string, to
/// `car_ffi_common::integrations::messages_send`. No validation happens here.
fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
    let payload: String = req.params.to_string();
    car_ffi_common::integrations::messages_send(payload.as_str())
}
/// Searches notes via `car_ffi_common::integrations::notes_find`.
///
/// `query` is required; `limit` defaults to 50.
///
/// # Errors
/// Fails when params cannot be deserialized; backend errors are passed through.
fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
    fn default_limit() -> usize {
        50
    }

    #[derive(serde::Deserialize)]
    struct NotesFindParams {
        query: String,
        #[serde(default = "default_limit")]
        limit: usize,
    }

    let NotesFindParams { query, limit } =
        serde_json::from_value(req.params.clone()).map_err(|err| err.to_string())?;
    car_ffi_common::integrations::notes_find(&query, limit)
}
/// Lists reminders via `car_ffi_common::integrations::reminders_items`.
///
/// `limit` defaults to 50, both when the field is omitted and when the params
/// cannot be deserialized at all.
fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
    fn default_limit() -> usize {
        50
    }

    #[derive(serde::Deserialize)]
    struct RemindersParams {
        #[serde(default = "default_limit")]
        limit: usize,
    }

    let limit = match serde_json::from_value::<RemindersParams>(req.params.clone()) {
        Ok(parsed) => parsed.limit,
        Err(_) => 50,
    };
    car_ffi_common::integrations::reminders_items(limit)
}
/// Lists bookmarks via `car_ffi_common::integrations::bookmarks_list`.
///
/// `limit` defaults to 100, both when the field is omitted and when the
/// params cannot be deserialized at all.
fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
    fn default_limit() -> usize {
        100
    }

    #[derive(serde::Deserialize)]
    struct BookmarksParams {
        #[serde(default = "default_limit")]
        limit: usize,
    }

    let limit = match serde_json::from_value::<BookmarksParams>(req.params.clone()) {
        Ok(parsed) => parsed.limit,
        Err(_) => 100,
    };
    car_ffi_common::integrations::bookmarks_list(limit)
}
/// Fetches sleep windows between two RFC 3339 timestamps.
///
/// Both `start` and `end` are required and converted to UTC before calling
/// `car_ffi_common::health::sleep_windows`.
///
/// # Errors
/// Fails on undeserializable params or an invalid timestamp
/// (`"parse start: …"` / `"parse end: …"`).
fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
    #[derive(serde::Deserialize)]
    struct SleepRange {
        start: String,
        end: String,
    }

    let range: SleepRange =
        serde_json::from_value(req.params.clone()).map_err(|err| err.to_string())?;
    let from = chrono::DateTime::parse_from_rfc3339(&range.start)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .map_err(|e| format!("parse start: {}", e))?;
    let until = chrono::DateTime::parse_from_rfc3339(&range.end)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .map_err(|e| format!("parse end: {}", e))?;
    car_ffi_common::health::sleep_windows(from, until)
}
/// Fetches workouts between two RFC 3339 timestamps.
///
/// Both `start` and `end` are required and converted to UTC before calling
/// `car_ffi_common::health::workouts`.
///
/// # Errors
/// Fails on undeserializable params or an invalid timestamp
/// (`"parse start: …"` / `"parse end: …"`).
fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
    #[derive(serde::Deserialize)]
    struct WorkoutRange {
        start: String,
        end: String,
    }

    let range: WorkoutRange =
        serde_json::from_value(req.params.clone()).map_err(|err| err.to_string())?;
    let from = chrono::DateTime::parse_from_rfc3339(&range.start)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .map_err(|e| format!("parse start: {}", e))?;
    let until = chrono::DateTime::parse_from_rfc3339(&range.end)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .map_err(|e| format!("parse end: {}", e))?;
    car_ffi_common::health::workouts(from, until)
}
/// Fetches activity data between two calendar dates.
///
/// Unlike the sleep/workout handlers, `start` and `end` are plain dates in
/// `%Y-%m-%d` form and are passed to `car_ffi_common::health::activity` as
/// `NaiveDate`s.
///
/// # Errors
/// Fails on undeserializable params or a date that does not match the format
/// (`"parse start: …"` / `"parse end: …"`).
fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
    #[derive(serde::Deserialize)]
    struct ActivityRange {
        start: String,
        end: String,
    }

    let range: ActivityRange =
        serde_json::from_value(req.params.clone()).map_err(|err| err.to_string())?;
    let first_day = match chrono::NaiveDate::parse_from_str(&range.start, "%Y-%m-%d") {
        Ok(day) => day,
        Err(e) => return Err(format!("parse start: {}", e)),
    };
    let last_day = match chrono::NaiveDate::parse_from_str(&range.end, "%Y-%m-%d") {
        Ok(day) => day,
        Err(e) => return Err(format!("parse end: {}", e)),
    };
    car_ffi_common::health::activity(first_day, last_day)
}
/// Closes the session's browser and reports the value returned by `close()`
/// under the `closed` key.
async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
    let was_closed = session.browser.close().await?;
    Ok(serde_json::json!({"closed": was_closed}))
}
/// Runs a browser script in the session's browser.
///
/// `script` is required: a JSON string is used as-is, any other JSON value is
/// rendered to its JSON text. Launch options default to a 1280x720 headless
/// window with no extra arguments. The trace string returned by the browser
/// session is parsed back into JSON for the response.
///
/// # Errors
/// Fails on undeserializable params, on launch/run errors, or when the
/// returned trace is not valid JSON.
async fn handle_browser_run(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    #[derive(serde::Deserialize)]
    struct BrowserRunParams {
        script: Value,
        #[serde(default)]
        width: Option<u32>,
        #[serde(default)]
        height: Option<u32>,
        #[serde(default)]
        headed: Option<bool>,
        #[serde(default)]
        extra_args: Option<Vec<String>>,
    }

    let BrowserRunParams {
        script,
        width,
        height,
        headed,
        extra_args,
    } = serde_json::from_value(req.params.clone()).map_err(|err| err.to_string())?;

    let script_json = if let Value::String(text) = script {
        text
    } else {
        script.to_string()
    };

    let launch_options = car_ffi_common::browser::BrowserLaunchOptions {
        width: width.unwrap_or(1280),
        height: height.unwrap_or(720),
        headless: !headed.unwrap_or(false),
        extra_args: extra_args.unwrap_or_default(),
    };
    let browser_session = session.browser.get_or_launch(launch_options).await?;
    let trace_json = browser_session.run(&script_json).await?;
    serde_json::from_str(&trace_json).map_err(|err| err.to_string())
}
/// Parameters for `handle_voice_transcribe_stream_start`.
#[derive(Deserialize)]
struct VoiceStartParams {
/// Identifier of the voice session to start.
session_id: String,
/// Opaque audio-source description; re-serialized to JSON text by the handler.
audio_source: Value,
/// Optional opaque options; re-serialized to JSON text when present.
#[serde(default)]
options: Option<Value>,
}
/// Starts a streaming transcription session.
///
/// Re-serializes `audio_source` (and `options`, if given) to JSON text, wraps
/// the client's WebSocket channel in a `WsVoiceEventSink`, and delegates to
/// `car_ffi_common::voice::transcribe_stream_start`. The JSON string it
/// returns is parsed into the response value.
///
/// # Errors
/// Fails on undeserializable params, serialization errors, backend errors, or
/// a backend response that is not valid JSON.
async fn handle_voice_transcribe_stream_start(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
    session: &Arc<crate::session::ClientSession>,
) -> Result<Value, String> {
    let params = serde_json::from_value::<VoiceStartParams>(req.params.clone())
        .map_err(|err| err.to_string())?;

    let audio_source_json =
        serde_json::to_string(&params.audio_source).map_err(|err| err.to_string())?;
    let options_json = match params.options.as_ref() {
        Some(opts) => Some(serde_json::to_string(opts).map_err(|err| err.to_string())?),
        None => None,
    };

    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
        channel: session.channel.clone(),
    });
    let response = car_ffi_common::voice::transcribe_stream_start(
        &params.session_id,
        &audio_source_json,
        options_json.as_deref(),
        state.voice_sessions.clone(),
        sink,
    )
    .await?;
    serde_json::from_str(&response).map_err(|err| err.to_string())
}
/// Parameters for `handle_voice_transcribe_stream_stop`.
#[derive(Deserialize)]
struct VoiceStopParams {
/// Identifier of the voice session to stop.
session_id: String,
}
async fn handle_voice_transcribe_stream_stop(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
let params: VoiceStopParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let json = car_ffi_common::voice::transcribe_stream_stop(
¶ms.session_id,
state.voice_sessions.clone(),
)
.await?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Parameters for `handle_voice_transcribe_stream_push`.
#[derive(Deserialize)]
struct VoicePushParams {
/// Identifier of the voice session receiving the audio.
session_id: String,
/// Audio bytes encoded with the standard base64 alphabet (decoded by the handler).
pcm_b64: String,
}
async fn handle_voice_transcribe_stream_push(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
) -> Result<Value, String> {
use base64::Engine;
let params: VoicePushParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let pcm = base64::engine::general_purpose::STANDARD
.decode(¶ms.pcm_b64)
.map_err(|e| format!("invalid pcm_b64: {}", e))?;
let json = car_ffi_common::voice::transcribe_stream_push(
¶ms.session_id,
&pcm,
state.voice_sessions.clone(),
)
.await?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Returns the voice-session listing produced by
/// `car_ffi_common::voice::list_voice_sessions`, parsed as JSON.
///
/// Yields `Value::Null` if the listing string is not valid JSON.
fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
    let sessions = state.voice_sessions.clone();
    let listing = car_ffi_common::voice::list_voice_sessions(sessions);
    match serde_json::from_str(&listing) {
        Ok(parsed) => parsed,
        Err(_) => Value::Null,
    }
}
/// Dispatches a voice turn.
///
/// Deserializes params into `DispatchVoiceTurnRequest`, obtains the inference
/// engine from server state, and calls `crate::voice_turn::dispatch` with a
/// sink that writes to the client's WebSocket channel.
///
/// # Errors
/// Fails on undeserializable params, a dispatch error, or a response that
/// cannot be serialized.
async fn handle_voice_dispatch_turn(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
    session: &Arc<crate::session::ClientSession>,
) -> Result<Value, String> {
    let request =
        serde_json::from_value::<crate::voice_turn::DispatchVoiceTurnRequest>(req.params.clone())
            .map_err(|err| err.to_string())?;
    let engine = get_inference_engine(state).clone();
    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
        channel: session.channel.clone(),
    });
    let response = crate::voice_turn::dispatch(engine, request, sink).await?;
    serde_json::to_value(response).map_err(|err| err.to_string())
}
/// Calls `crate::voice_turn::cancel` and acknowledges with `{"cancelled": true}`.
async fn handle_voice_cancel_turn() -> Result<Value, String> {
    crate::voice_turn::cancel().await;
    let ack = serde_json::json!({"cancelled": true});
    Ok(ack)
}
/// Calls `crate::voice_turn::prewarm` with the server's inference engine and
/// acknowledges with `{"prewarmed": true}`.
async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
    crate::voice_turn::prewarm(get_inference_engine(state).clone()).await;
    let ack = serde_json::json!({"prewarmed": true});
    Ok(ack)
}
/// Process-wide slot holding the WebSocket channel of the client registered
/// as inference runner, if any. Lazily initialised to `None` on first access.
fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
    type RunnerSlot = std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>;
    static SLOT: std::sync::OnceLock<RunnerSlot> = std::sync::OnceLock::new();
    SLOT.get_or_init(|| RunnerSlot::new(None))
}
/// Process-wide map from call id to the event emitter of an in-flight runner
/// call. Lazily initialised to an empty map on first access.
fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
    type CallMap = dashmap::DashMap<String, car_inference::EventEmitter>;
    static MAP: std::sync::OnceLock<CallMap> = std::sync::OnceLock::new();
    MAP.get_or_init(CallMap::new)
}
/// Process-wide map from call id to the oneshot sender that resolves an
/// in-flight runner call. Lazily initialised to an empty map on first access.
fn ws_runner_completions() -> &'static dashmap::DashMap<
    String,
    tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
> {
    type Completion =
        tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>;
    type CompletionMap = dashmap::DashMap<String, Completion>;
    static MAP: std::sync::OnceLock<CompletionMap> = std::sync::OnceLock::new();
    MAP.get_or_init(CompletionMap::new)
}
/// Inference runner that forwards generate requests to the WebSocket client
/// stored in `ws_runner_session()`.
struct WsInferenceRunner;

#[async_trait::async_trait]
impl car_inference::InferenceRunner for WsInferenceRunner {
    /// Sends an `inference.runner.invoke` notification to the registered
    /// client and waits for the matching completion.
    ///
    /// The call is tracked under a fresh UUID in `ws_runner_calls()` (for
    /// streamed events) and `ws_runner_completions()` (for the final result).
    /// Both entries are removed on every exit path once registered.
    ///
    /// # Errors
    /// - `Declined` when no runner channel is registered.
    /// - `Failed` when the slot lock is poisoned, serialization fails, the
    ///   notification cannot be written to the socket, the completion sender
    ///   is dropped, or the client reports a failure.
    async fn run(
        &self,
        request: car_inference::tasks::generate::GenerateRequest,
        emitter: car_inference::EventEmitter,
    ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
        use futures::SinkExt;

        let channel = ws_runner_session()
            .read()
            .map_err(|e| {
                car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
            })?
            .clone()
            .ok_or_else(|| {
                car_inference::RunnerError::Declined(
                    "no WebSocket inference runner registered — call inference.register_runner first"
                        .into(),
                )
            })?;

        // Build the wire payload before registering the call so a
        // serialization failure cannot leave stale map entries behind.
        let call_id = uuid::Uuid::new_v4().to_string();
        let request_json = serde_json::to_value(&request)
            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
        let notification = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "inference.runner.invoke",
            "params": {
                "call_id": call_id,
                "request": request_json,
            },
        });
        let text = serde_json::to_string(&notification)
            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;

        let (tx, rx) = tokio::sync::oneshot::channel();
        ws_runner_calls().insert(call_id.clone(), emitter);
        ws_runner_completions().insert(call_id.clone(), tx);

        let sent = channel
            .write
            .lock()
            .await
            .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
            .await;
        if sent.is_err() {
            // The client never saw the request, so nothing will ever resolve
            // `rx`; bail out instead of waiting forever.
            ws_runner_calls().remove(&call_id);
            ws_runner_completions().remove(&call_id);
            return Err(car_inference::RunnerError::Failed(
                "failed to send inference.runner.invoke to runner".into(),
            ));
        }

        let outcome = rx.await;
        // Clean up regardless of how the wait ended.
        ws_runner_calls().remove(&call_id);
        ws_runner_completions().remove(&call_id);

        let result = outcome.map_err(|_| {
            car_inference::RunnerError::Failed("runner completion channel dropped".into())
        })?;
        result.map_err(car_inference::RunnerError::Failed)
    }
}
/// Registers the calling client as the process-wide inference runner.
///
/// Stores the client's channel in `ws_runner_session()`, then installs a
/// `WsInferenceRunner` through `car_inference::set_inference_runner`.
///
/// # Errors
/// Fails if the runner slot lock is poisoned.
async fn handle_inference_register_runner(
    session: &Arc<crate::session::ClientSession>,
) -> Result<Value, String> {
    {
        // Scope the write guard so it is released before installing the runner.
        let mut slot = ws_runner_session()
            .write()
            .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
        *slot = Some(session.channel.clone());
    }
    let runner = Arc::new(WsInferenceRunner);
    car_inference::set_inference_runner(Some(runner));
    Ok(serde_json::json!({"registered": true}))
}
/// Parameters for `handle_inference_runner_event`.
#[derive(serde::Deserialize)]
struct InferenceRunnerEventParams {
/// Id of the in-flight runner call the event belongs to.
call_id: String,
/// Raw event object; interpreted by `parse_runner_event_value`.
event: Value,
}
async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
let params: InferenceRunnerEventParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let stream_event = match parse_runner_event_value(¶ms.event) {
Some(e) => e,
None => return Err("unrecognised runner event shape".into()),
};
if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
let emitter = entry.value().clone();
tokio::spawn(async move { emitter.emit(stream_event).await });
}
Ok(serde_json::json!({"emitted": true}))
}
/// Parameters for `handle_inference_runner_complete`.
#[derive(serde::Deserialize)]
struct InferenceRunnerCompleteParams {
/// Id of the in-flight runner call being completed.
call_id: String,
/// Raw result; the handler attempts to decode it as `car_inference::RunnerResult`.
result: Value,
}
async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
let params: InferenceRunnerCompleteParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let result: std::result::Result<car_inference::RunnerResult, String> =
serde_json::from_value(params.result)
.map_err(|e| format!("invalid RunnerResult JSON: {e}"));
if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
let _ = tx.send(result);
}
Ok(serde_json::json!({"completed": true}))
}
/// Parameters for `handle_inference_runner_fail`.
#[derive(serde::Deserialize)]
struct InferenceRunnerFailParams {
/// Id of the in-flight runner call being failed.
call_id: String,
/// Error message delivered to the waiting caller.
error: String,
}
async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
let params: InferenceRunnerFailParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
let _ = tx.send(Err(params.error));
}
Ok(serde_json::json!({"failed": true}))
}
fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
let ty = v.get("type").and_then(|t| t.as_str())?;
match ty {
"text" => Some(car_inference::StreamEvent::TextDelta(
v.get("data")?.as_str()?.to_string(),
)),
"tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
name: v.get("name")?.as_str()?.to_string(),
index: v.get("index")?.as_u64()? as usize,
id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
}),
"tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
index: v.get("index")?.as_u64()? as usize,
arguments_delta: v.get("data")?.as_str()?.to_string(),
}),
"usage" => Some(car_inference::StreamEvent::Usage {
input_tokens: v.get("input_tokens")?.as_u64()?,
output_tokens: v.get("output_tokens")?.as_u64()?,
}),
"done" => Some(car_inference::StreamEvent::Done {
text: v.get("text")?.as_str()?.to_string(),
tool_calls: v
.get("tool_calls")
.and_then(|tc| serde_json::from_value(tc.clone()).ok())
.unwrap_or_default(),
}),
_ => None,
}
}
/// Parameters for `handle_enroll_speaker`.
#[derive(Deserialize)]
struct EnrollSpeakerParams {
/// Label to enroll the speaker under.
label: String,
/// Opaque audio description; re-serialized to JSON text by the handler.
audio: Value,
}
async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
let params: EnrollSpeakerParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Parameters for `handle_remove_enrollment`.
#[derive(Deserialize)]
struct RemoveEnrollmentParams {
/// Label of the enrollment to remove.
label: String,
}
fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
let params: RemoveEnrollmentParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Parameters for `handle_workflow_run`.
#[derive(Deserialize)]
struct WorkflowRunParams {
/// Opaque workflow definition; re-serialized to JSON text by the handler.
workflow: Value,
}
async fn handle_workflow_run(
req: &JsonRpcMessage,
session: &Arc<crate::session::ClientSession>,
) -> Result<Value, String> {
let params: WorkflowRunParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
channel: session.channel.clone(),
host: session.host.clone(),
client_id: session.client_id.clone(),
});
let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Parameters for `handle_workflow_verify`.
#[derive(Deserialize)]
struct WorkflowVerifyParams {
/// Opaque workflow definition; re-serialized to JSON text by the handler.
workflow: Value,
}
fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
let params: WorkflowVerifyParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Starts a meeting via `crate::meeting::start_meeting`.
///
/// Uses the string `id` param as meeting id, or generates a UUID (simple
/// form) when absent. If the params are a JSON object the id is written back
/// into it before the request is serialized. Voice events go through a
/// `WsMemgineIngestSink` wrapping a `WsVoiceEventSink` on the client channel.
/// The process's current directory (if obtainable) is passed along.
///
/// # Errors
/// Fails on serialization errors, backend errors, or a backend response that
/// is not valid JSON.
async fn handle_meeting_start(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
    session: &Arc<crate::session::ClientSession>,
) -> Result<Value, String> {
    let mut request = req.params.clone();
    let meeting_id = match request.get("id").and_then(Value::as_str) {
        Some(id) => id.to_string(),
        None => uuid::Uuid::new_v4().simple().to_string(),
    };
    if let Some(fields) = request.as_object_mut() {
        fields.insert("id".into(), Value::String(meeting_id.clone()));
    }
    let request_json = serde_json::to_string(&request).map_err(|err| err.to_string())?;

    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
        Arc::new(crate::session::WsVoiceEventSink {
            channel: session.channel.clone(),
        });
    let upstream: Arc<dyn car_voice::VoiceEventSink> =
        Arc::new(crate::session::WsMemgineIngestSink {
            meeting_id,
            engine: session.memgine.clone(),
            upstream: ws_upstream,
        });

    let cwd = std::env::current_dir().ok();
    let response = crate::meeting::start_meeting(
        &request_json,
        state.meetings.clone(),
        state.voice_sessions.clone(),
        upstream,
        None,
        cwd,
    )
    .await?;
    serde_json::from_str(&response).map_err(|err| err.to_string())
}
/// Parameters for `handle_meeting_stop`.
#[derive(Deserialize)]
struct MeetingStopParams {
/// Id of the meeting to stop.
meeting_id: String,
/// Whether to summarize on stop; `default_summarize` supplies the value when omitted.
#[serde(default = "default_summarize")]
summarize: bool,
}
/// Serde default for `MeetingStopParams::summarize`: summarization is on
/// unless the caller opts out.
fn default_summarize() -> bool {
    const SUMMARIZE_BY_DEFAULT: bool = true;
    SUMMARIZE_BY_DEFAULT
}
async fn handle_meeting_stop(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
_session: &Arc<crate::session::ClientSession>,
) -> Result<Value, String> {
let params: MeetingStopParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let inference = if params.summarize {
Some(state.inference.get().cloned()).flatten()
} else {
None
};
let json = crate::meeting::stop_meeting(
¶ms.meeting_id,
params.summarize,
state.meetings.clone(),
state.voice_sessions.clone(),
inference,
)
.await?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Parameters for `handle_meeting_list`; the derived default has no `root`.
#[derive(Deserialize, Default)]
struct MeetingListParams {
/// Optional root path forwarded to `crate::meeting::list_meetings`.
#[serde(default)]
root: Option<std::path::PathBuf>,
}
fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
let cwd = std::env::current_dir().ok();
let json = crate::meeting::list_meetings(params.root, cwd)?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Parameters for `handle_meeting_get`.
#[derive(Deserialize)]
struct MeetingGetParams {
/// Id of the meeting to fetch.
meeting_id: String,
/// Optional root path forwarded to `crate::meeting::get_meeting`.
#[serde(default)]
root: Option<std::path::PathBuf>,
}
fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
let params: MeetingGetParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let cwd = std::env::current_dir().ok();
let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Parameters for `handle_registry_register`.
#[derive(Deserialize, Default)]
struct RegistryRegisterParams {
/// Opaque registry entry; re-serialized to JSON text by the handler.
entry: Value,
/// Optional registry file path forwarded to `car_ffi_common::registry`.
#[serde(default)]
registry_path: Option<std::path::PathBuf>,
}
fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
let params: RegistryRegisterParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
Ok(Value::Null)
}
/// Parameters shared by the registry heartbeat and unregister handlers.
#[derive(Deserialize, Default)]
struct RegistryNameParams {
/// Name of the registered agent.
name: String,
/// Optional registry file path forwarded to `car_ffi_common::registry`.
#[serde(default)]
registry_path: Option<std::path::PathBuf>,
}
fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
let params: RegistryNameParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
let params: RegistryNameParams =
serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
Ok(Value::Null)
}
/// Parameters for `handle_registry_list`; the derived default has no path.
#[derive(Deserialize, Default)]
struct RegistryListParams {
/// Optional registry file path forwarded to `car_ffi_common::registry::list_agents`.
#[serde(default)]
registry_path: Option<std::path::PathBuf>,
}
fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
let json = car_ffi_common::registry::list_agents(params.registry_path)?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
#[derive(Deserialize, Default)]
struct RegistryReapParams {
#[serde(default = "default_reap_age")]
max_age_secs: u64,
#[serde(default)]
registry_path: Option<std::path::PathBuf>,
}
/// Serde default for `RegistryReapParams::max_age_secs`, in seconds.
fn default_reap_age() -> u64 {
    const DEFAULT_REAP_AGE_SECS: u64 = 60;
    DEFAULT_REAP_AGE_SECS
}
fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
let json =
car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
/// Starts the A2A service via `crate::a2a::start_a2a`, passing the raw params
/// as JSON text together with a clone of the session's runtime.
///
/// # Errors
/// Fails on a serialization error, backend errors, or a backend response
/// that is not valid JSON.
async fn handle_a2a_start(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let params_json = serde_json::to_string(&req.params).map_err(|err| err.to_string())?;
    let runtime = Some(session.runtime.clone());
    let response = crate::a2a::start_a2a(&params_json, runtime).await?;
    serde_json::from_str(&response).map_err(|err| err.to_string())
}
/// Stops the A2A service via `crate::a2a::stop_a2a` and returns its JSON reply.
///
/// # Errors
/// Fails on backend errors or a backend response that is not valid JSON.
fn handle_a2a_stop() -> Result<Value, String> {
    let response = crate::a2a::stop_a2a()?;
    match serde_json::from_str(&response) {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(err.to_string()),
    }
}
/// Returns the A2A service status from `crate::a2a::a2a_status`, parsed as JSON.
///
/// # Errors
/// Fails on backend errors or a backend response that is not valid JSON.
fn handle_a2a_status() -> Result<Value, String> {
    let response = crate::a2a::a2a_status()?;
    match serde_json::from_str(&response) {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(err.to_string()),
    }
}
/// Parameters for `handle_a2a_send`. Field names are camelCase on the wire
/// (for example `allowUntrustedEndpoint`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct A2aSendParams {
/// Endpoint of the remote agent; vetted by `trusted_route_endpoint` in the handler.
endpoint: String,
/// Message to send.
message: car_a2a::Message,
/// Forwarded to `send_message`; defaults to `false`.
#[serde(default)]
blocking: bool,
/// Whether A2UI envelopes found in the reply are applied; `default_true` supplies the default.
#[serde(default = "default_true")]
ingest_a2ui: bool,
/// Optional auth attached to the client and passed on when applying envelopes.
#[serde(default)]
route_auth: Option<A2aRouteAuth>,
/// Passed to `trusted_route_endpoint` alongside the endpoint; defaults to `false`.
#[serde(default)]
allow_untrusted_endpoint: bool,
}
/// Serde default helper for boolean fields that are enabled unless the
/// caller says otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
/// Routes `method` and the raw params to the server's A2A dispatcher.
///
/// # Errors
/// Returns the dispatcher's error rendered as a string.
async fn handle_a2a_dispatch(
    method: &str,
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let dispatcher = state.a2a_dispatcher().await;
    let payload = req.params.clone();
    match dispatcher.dispatch(method, payload).await {
        Ok(reply) => Ok(reply),
        Err(err) => Err(err.to_string()),
    }
}
/// Sends a message to a remote A2A agent and optionally applies any A2UI
/// envelopes found in the reply.
///
/// Steps:
/// 1. The endpoint must pass `trusted_route_endpoint` (see the error text
///    for the rule it enforces).
/// 2. A client is built for the endpoint, with auth when `routeAuth` is set.
/// 3. The message is sent; the reply is serialized to JSON.
/// 4. When `ingest_a2ui` is on, the reply is validated and each envelope is
///    applied. An owner without an endpoint is given the routed endpoint.
///
/// The response contains the raw `result` and the list of applied envelopes
/// under `a2ui.applied`.
///
/// # Errors
/// Fails on undeserializable params, an untrusted endpoint, send errors,
/// serialization errors, payload validation errors, or envelope application
/// errors.
async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
    let params = serde_json::from_value::<A2aSendParams>(req.params.clone())
        .map_err(|err| err.to_string())?;

    let vetted = trusted_route_endpoint(
        Some(params.endpoint.clone()),
        params.allow_untrusted_endpoint,
    );
    let Some(endpoint) = vetted else {
        return Err(
            "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true"
                .to_string(),
        );
    };

    let base_client = car_a2a::A2aClient::new(endpoint.clone());
    let client = if let Some(auth) = params.route_auth.clone() {
        base_client.with_auth(client_auth_from_route_auth(auth))
    } else {
        base_client
    };

    let result = client
        .send_message(params.message, params.blocking)
        .await
        .map_err(|err| err.to_string())?;
    let result_value = serde_json::to_value(&result).map_err(|err| err.to_string())?;

    let mut applied = Vec::new();
    if params.ingest_a2ui {
        state
            .a2ui
            .validate_payload(&result_value)
            .map_err(|err| err.to_string())?;

        let routed_endpoint = Some(endpoint.clone());
        let envelopes =
            car_a2ui::envelopes_from_value(&result_value).map_err(|err| err.to_string())?;
        for envelope in envelopes {
            // Fill in the routed endpoint only when the owner does not name one.
            let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
                if owner.endpoint.is_none() {
                    owner.with_endpoint(routed_endpoint.clone())
                } else {
                    owner
                }
            });
            let outcome =
                apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?;
            applied.push(outcome);
        }
    }

    Ok(serde_json::json!({
        "result": result,
        "a2ui": {
            "applied": applied,
        }
    }))
}
/// Passes the raw params as JSON text to
/// `car_ffi_common::automation::run_applescript` and returns its JSON reply.
///
/// # Errors
/// Fails on a serialization error, backend errors, or a backend response
/// that is not valid JSON.
async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
    let payload = serde_json::to_string(&req.params).map_err(|err| err.to_string())?;
    let response = car_ffi_common::automation::run_applescript(&payload).await?;
    match serde_json::from_str(&response) {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(err.to_string()),
    }
}
/// Passes the raw params as JSON text to
/// `car_ffi_common::automation::list_shortcuts` and returns its JSON reply.
///
/// # Errors
/// Fails on a serialization error, backend errors, or a backend response
/// that is not valid JSON.
async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
    let payload = serde_json::to_string(&req.params).map_err(|err| err.to_string())?;
    let response = car_ffi_common::automation::list_shortcuts(&payload).await?;
    match serde_json::from_str(&response) {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(err.to_string()),
    }
}
/// Passes the raw params as JSON text to
/// `car_ffi_common::automation::run_shortcut` and returns its JSON reply.
///
/// # Errors
/// Fails on a serialization error, backend errors, or a backend response
/// that is not valid JSON.
async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
    let payload = serde_json::to_string(&req.params).map_err(|err| err.to_string())?;
    let response = car_ffi_common::automation::run_shortcut(&payload).await?;
    match serde_json::from_str(&response) {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(err.to_string()),
    }
}
/// Passes the raw params as JSON text to
/// `car_ffi_common::notifications::local` and returns its JSON reply.
///
/// # Errors
/// Fails on a serialization error, backend errors, or a backend response
/// that is not valid JSON.
async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
    let payload = serde_json::to_string(&req.params).map_err(|err| err.to_string())?;
    let response = car_ffi_common::notifications::local(&payload).await?;
    match serde_json::from_str(&response) {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(err.to_string()),
    }
}
/// Passes the raw params as JSON text to `car_ffi_common::vision::ocr` and
/// returns its JSON reply.
///
/// # Errors
/// Fails on a serialization error, backend errors, or a backend response
/// that is not valid JSON.
async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
    let payload = serde_json::to_string(&req.params).map_err(|err| err.to_string())?;
    let response = car_ffi_common::vision::ocr(&payload).await?;
    match serde_json::from_str(&response) {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(err.to_string()),
    }
}
/// Lists managed agents, annotated with attachment info.
///
/// Agents come from the observer manifest when `observer_manifest_path()`
/// yields a path, otherwise from the live supervisor. Each serialized agent
/// object gains an `attached` flag and, when attached, the `session_id`
/// recorded in `state.attached_agents`.
///
/// # Errors
/// Fails when the manifest cannot be read, the supervisor is unavailable, or
/// an agent cannot be serialized.
async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
    let agents = if let Some(manifest) = state.observer_manifest_path() {
        car_registry::supervisor::Supervisor::list_from_manifest(manifest)
            .map_err(|err| err.to_string())?
    } else {
        let supervisor = state.supervisor()?;
        supervisor.list().await
    };

    // Snapshot the attachment table so the lock is not held while decorating.
    let attached = state.attached_agents.lock().await.clone();

    let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
    for agent in agents {
        let mut entry = serde_json::to_value(&agent).map_err(|err| err.to_string())?;
        let session_id = attached.get(&agent.spec.id).cloned();
        if let Some(fields) = entry.as_object_mut() {
            fields.insert("attached".to_string(), Value::Bool(session_id.is_some()));
            if let Some(sid) = session_id {
                fields.insert("session_id".to_string(), Value::String(sid));
            }
        }
        decorated.push(entry);
    }
    Ok(Value::Array(decorated))
}
/// Creates or updates a managed agent from an `AgentSpec` in the params.
///
/// When a string `interpreter` param is present it is resolved through
/// `resolve_interpreter` and the resulting path overwrites `command` before
/// the spec is deserialized.
///
/// # Errors
/// Fails when the interpreter cannot be resolved, the spec does not
/// deserialize, the supervisor is unavailable, or the upsert fails.
async fn handle_agents_upsert(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let mut params = req.params.clone();

    let interpreter = params
        .get("interpreter")
        .and_then(Value::as_str)
        .map(str::to_string);
    if let Some(name) = interpreter {
        let resolved =
            car_registry::supervisor::resolve_interpreter(&name).map_err(|err| err.to_string())?;
        params["command"] = Value::String(resolved.to_string_lossy().into_owned());
    }

    let spec = serde_json::from_value::<car_registry::supervisor::AgentSpec>(params)
        .map_err(|err| err.to_string())?;
    let supervisor = state.supervisor()?;
    let agent = supervisor
        .upsert(spec)
        .await
        .map_err(|err| err.to_string())?;
    serde_json::to_value(agent).map_err(|err| err.to_string())
}
/// Installs an agent from an `AgentManifest` in the params.
///
/// Host capabilities are `HostCapabilities::daemon_default` for this crate's
/// version. The response carries the managed agent plus a report listing each
/// missing optional `(namespace, feature)` pair under `report.missingOptional`.
///
/// # Errors
/// Fails when the manifest does not deserialize, the supervisor is
/// unavailable, or installation fails.
async fn handle_agents_install(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let manifest = serde_json::from_value::<car_registry::manifest::AgentManifest>(
        req.params.clone(),
    )
    .map_err(|err| err.to_string())?;
    let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));

    let supervisor = state.supervisor()?;
    let (report, managed) = supervisor
        .install_manifest(manifest, &host)
        .await
        .map_err(|err| err.to_string())?;

    let mut missing_optional = Vec::new();
    for (ns, feat) in report.missing_optional.iter() {
        missing_optional.push(serde_json::json!({ "namespace": ns, "feature": feat }));
    }

    Ok(serde_json::json!({
        "report": {
            "missingOptional": missing_optional,
        },
        "agent": managed,
    }))
}
/// Returns health entries for managed agents.
///
/// Entries come from the observer manifest when `observer_manifest_path()`
/// yields a path, otherwise from the live supervisor.
///
/// # Errors
/// Fails when the manifest cannot be read, the supervisor is unavailable, or
/// the entries cannot be serialized.
async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
    let entries = if let Some(manifest) = state.observer_manifest_path() {
        car_registry::supervisor::Supervisor::health_from_manifest(manifest)
            .map_err(|err| err.to_string())?
    } else {
        let supervisor = state.supervisor()?;
        supervisor.health().await
    };
    serde_json::to_value(entries).map_err(|err| err.to_string())
}
/// Extracts the required string `id` param.
///
/// # Errors
/// Returns ``missing required `id` parameter`` when `id` is absent or not a
/// JSON string.
fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
    match req.params.get("id").and_then(Value::as_str) {
        Some(id) => Ok(id.to_string()),
        None => Err("missing required `id` parameter".to_string()),
    }
}
/// Removes the agent named by the `id` param and reports the supervisor's
/// return value under `removed`.
///
/// # Errors
/// Fails when `id` is missing, the supervisor is unavailable, or removal fails.
async fn handle_agents_remove(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let agent_id = extract_agent_id(req)?;
    let supervisor = state.supervisor()?;
    match supervisor.remove(&agent_id).await {
        Ok(removed) => Ok(serde_json::json!({ "removed": removed })),
        Err(err) => Err(err.to_string()),
    }
}
/// Starts the agent named by the `id` param and returns it serialized.
///
/// # Errors
/// Fails when `id` is missing, the supervisor is unavailable, the start
/// fails, or the agent cannot be serialized.
async fn handle_agents_start(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let agent_id = extract_agent_id(req)?;
    let supervisor = state.supervisor()?;
    let started = supervisor
        .start(&agent_id)
        .await
        .map_err(|err| err.to_string())?;
    serde_json::to_value(started).map_err(|err| err.to_string())
}
/// Stops the agent named by the `id` param.
///
/// An optional `signal` param is decoded as a
/// `car_registry::supervisor::StopSignal`; when the key is absent the type's
/// `Default` value is used.
///
/// # Errors
/// Returns an error string when `id` is missing, `signal` is present but not
/// a valid stop signal, no supervisor is available, the stop fails, or the
/// resulting agent cannot be serialized.
async fn handle_agents_stop(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let target = extract_agent_id(req)?;

    let signal = match req.params.get("signal") {
        Some(raw) => serde_json::from_value::<car_registry::supervisor::StopSignal>(raw.clone())
            .map_err(|err| err.to_string())?,
        None => car_registry::supervisor::StopSignal::default(),
    };

    let sup = state.supervisor()?;
    match sup.stop(&target, signal).await {
        Ok(stopped) => serde_json::to_value(stopped).map_err(|err| err.to_string()),
        Err(err) => Err(err.to_string()),
    }
}
/// Restarts the agent named by the `id` param and returns it serialized as
/// JSON.
///
/// # Errors
/// Returns an error string when `id` is missing, no supervisor is available,
/// the restart fails, or the resulting agent cannot be serialized.
async fn handle_agents_restart(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let target = extract_agent_id(req)?;
    let sup = state.supervisor()?;
    match sup.restart(&target).await {
        Ok(restarted) => serde_json::to_value(restarted).map_err(|err| err.to_string()),
        Err(err) => Err(err.to_string()),
    }
}
/// Returns the last `n` log lines of the agent named by the `id` param.
///
/// `n` defaults to 100 when the param is absent or is not an unsigned
/// integer. The requested count is converted to `usize` with saturation, so a
/// value larger than the platform's `usize` clamps to `usize::MAX` instead of
/// wrapping to a small number.
///
/// # Returns
/// `{ "lines": <value returned by the supervisor> }`.
///
/// # Errors
/// Returns an error string when `id` is missing, no supervisor is available,
/// or the supervisor fails to read the log.
async fn handle_agents_tail_log(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    // Explicit import keeps this compiling on editions whose prelude lacks `TryFrom`.
    use std::convert::TryFrom;

    let id = extract_agent_id(req)?;
    let n = req
        .params
        .get("n")
        .and_then(Value::as_u64)
        // A plain `as usize` would silently truncate on 32-bit targets.
        .map_or(100, |requested| {
            usize::try_from(requested).unwrap_or(usize::MAX)
        });
    let supervisor = state.supervisor()?;
    let lines = supervisor
        .tail_log(&id, n)
        .await
        .map_err(|e| e.to_string())?;
    Ok(serde_json::json!({ "lines": lines }))
}
/// Lists external agents.
///
/// The optional boolean `include_health` param (default `false`) is passed
/// through to `car_ffi_common::external_agents::list`, whose JSON text result
/// is parsed and returned.
///
/// # Errors
/// Propagates the listing error, or returns the parse error as a string when
/// the returned text is not valid JSON.
async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
    // Only an explicit JSON `true` enables health; anything else means `false`.
    let include_health = matches!(req.params.get("include_health"), Some(Value::Bool(true)));
    let listing = car_ffi_common::external_agents::list(include_health).await?;
    serde_json::from_str(&listing).map_err(|err| err.to_string())
}
/// Runs external-agent detection.
///
/// The optional boolean `include_health` param (default `false`) is passed
/// through to `car_ffi_common::external_agents::detect`, whose JSON text
/// result is parsed and returned.
///
/// # Errors
/// Propagates the detection error, or returns the parse error as a string
/// when the returned text is not valid JSON.
async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
    // Only an explicit JSON `true` enables health; anything else means `false`.
    let include_health = matches!(req.params.get("include_health"), Some(Value::Bool(true)));
    let detected = car_ffi_common::external_agents::detect(include_health).await?;
    serde_json::from_str(&detected).map_err(|err| err.to_string())
}
/// Invokes an external agent adapter, either synchronously or as a streamed
/// chat session.
///
/// Params read from the request:
/// * `id` (string, required) — adapter to invoke.
/// * `task` (string, required) — task text handed to the adapter.
/// * `stream` (bool, default `false`) — selects the streaming path.
/// * `session_id` (string, optional) — defaults to `ext-<uuid>`.
///
/// Every other param is forwarded to the adapter as its options object. When
/// the caller did not supply `mcp_endpoint` and `state.mcp_url` is set, that
/// URL is injected into the options.
///
/// Non-streaming: awaits the invocation, appends an audit record, and returns
/// the adapter's parsed JSON result.
///
/// Streaming: registers a chat session (unless one already exists under this
/// `session_id`), spawns background tasks that forward stream events and the
/// terminal `done`/`error` frame to the host, and returns
/// `{ "accepted": true, "session_id": ... }` immediately.
///
/// # Errors
/// Returns an error string when `id` or `task` is missing, when the
/// non-streaming invocation or its JSON parse fails, or when the options do
/// not deserialize into `car_external_agents::InvokeOptions` on the streaming
/// path.
async fn handle_agents_invoke_external(
req: &JsonRpcMessage,
state: &Arc<ServerState>,
host_session: &Arc<crate::session::ClientSession>,
) -> Result<Value, String> {
let id = req
.params
.get("id")
.and_then(Value::as_str)
.ok_or_else(|| "missing required `id` parameter".to_string())?
.to_string();
let task = req
.params
.get("task")
.and_then(Value::as_str)
.ok_or_else(|| "missing required `task` parameter".to_string())?
.to_string();
let stream = req
.params
.get("stream")
.and_then(Value::as_bool)
.unwrap_or(false);
let session_id = req
.params
.get("session_id")
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
// Whatever remains after stripping the routing keys is the adapter's options.
let mut options_value = req.params.clone();
if let Some(obj) = options_value.as_object_mut() {
obj.remove("id");
obj.remove("task");
obj.remove("stream");
obj.remove("session_id");
// An explicit `mcp_endpoint` from the caller wins over the daemon's own URL.
let has_explicit_mcp = obj.contains_key("mcp_endpoint");
if !has_explicit_mcp {
if let Some(url) = state.mcp_url.get() {
obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
}
}
}
// Non-streaming path: run to completion, audit, and return the result inline.
if !stream {
let options_json = options_value.to_string();
let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
append_external_agent_audit(&id, &task, &options_value, &result);
return Ok(result);
}
// Streaming path: validate options up front so a bad request fails before
// any session state or background task is created.
let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
.map_err(|e| format!("invalid options: {e}"))?;
{
// `or_insert_with` leaves an existing entry for this session_id untouched.
let mut chats = state.chat_sessions.lock().await;
chats.entry(session_id.clone()).or_insert_with(|| {
let created_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
crate::session::ChatSession {
agent_id: id.clone(),
host_client_id: host_session.client_id.clone(),
created_at,
}
});
}
use tokio::sync::mpsc;
// The emitter callback is synchronous, so events are queued on an unbounded
// channel and forwarded to the host by a separate async drain task.
let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
let drain_state = state.clone();
let drain_session_id = session_id.clone();
let drain_agent_id = id.clone();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
}
});
let emitter_tx = tx.clone();
let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
// A send error means the drain task is gone; events are dropped silently.
let _ = emitter_tx.send(event);
});
let spawn_state = state.clone();
let spawn_session_id = session_id.clone();
let spawn_id = id.clone();
let spawn_task = task.clone();
let spawn_options = options_value.clone();
tokio::spawn(async move {
let outcome =
car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
.await;
// Drop the original sender; the emitter closure owns its own clone (`emitter_tx`).
drop(tx);
let terminal_params: Value;
let result_value: Value;
match outcome {
Ok(res) => {
// Build a summary such as "3 turns · 2 tools · 1.5s", omitting zero-valued
// parts and falling back to "stop" when nothing is reported.
let mut parts: Vec<String> = Vec::new();
if res.turns > 0 {
parts.push(format!(
"{} turn{}",
res.turns,
if res.turns == 1 { "" } else { "s" }
));
}
if res.tool_calls > 0 {
parts.push(format!(
"{} tool{}",
res.tool_calls,
if res.tool_calls == 1 { "" } else { "s" }
));
}
if res.duration_ms > 0 {
parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
}
let summary = if parts.is_empty() {
"stop".to_string()
} else {
parts.join(" · ")
};
// A successful call can still carry an adapter-reported error.
if res.is_error {
terminal_params = serde_json::json!({
"session_id": spawn_session_id,
"agent_id": spawn_id,
"kind": "error",
"error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
});
} else {
terminal_params = serde_json::json!({
"session_id": spawn_session_id,
"agent_id": spawn_id,
"kind": "done",
"finish_reason": summary,
});
}
result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
}
Err(e) => {
let message = format!("{e}");
terminal_params = serde_json::json!({
"session_id": spawn_session_id,
"agent_id": spawn_id,
"kind": "error",
"error": message.clone(),
});
result_value = serde_json::json!({ "is_error": true, "error": message });
}
}
// NOTE(review): this task does not wait for the drain task above, so the
// terminal frame may be sent (and the session removed) while events are
// still queued; `send_external_chat_frame` returns early once the session
// is gone, which would drop those events — confirm whether ordering matters.
send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
spawn_state
.chat_sessions
.lock()
.await
.remove(&spawn_session_id);
append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
});
// Acknowledge immediately; results arrive later as chat event frames.
Ok(serde_json::json!({
"accepted": true,
"session_id": session_id,
}))
}
/// Translates one external-agent stream event into host-facing chat frames.
///
/// Only `StreamEvent::Assistant` events are handled; every other variant is
/// ignored. For an assistant event, each entry of the message's `content`
/// array is inspected by its `type`:
/// * `"text"` with a non-empty `text` → a `kind: "token"` frame whose `delta`
///   is that text.
/// * `"tool_use"` → a `kind: "tool_call"` frame whose `detail` is the block's
///   `name`, or `"(unknown tool)"` when the name is missing.
/// * anything else → skipped.
///
/// Frames are delivered through `send_external_chat_frame`, in content order.
async fn emit_external_chat_event(
    state: &Arc<ServerState>,
    session_id: &str,
    agent_id: &str,
    event: car_external_agents::StreamEvent,
) {
    use car_external_agents::StreamEvent;

    let StreamEvent::Assistant(assistant) = event else {
        return;
    };
    let Some(blocks) = assistant.message.get("content").and_then(|v| v.as_array()) else {
        return;
    };

    for block in blocks {
        let params = match block.get("type").and_then(|v| v.as_str()) {
            Some("text") => {
                let text = match block.get("text").and_then(|v| v.as_str()) {
                    Some(t) if !t.is_empty() => t,
                    // Missing, non-string, or empty text produces no frame.
                    _ => continue,
                };
                serde_json::json!({
                    "session_id": session_id,
                    "agent_id": agent_id,
                    "kind": "token",
                    "delta": text,
                })
            }
            Some("tool_use") => {
                let tool_name = block
                    .get("name")
                    .and_then(|v| v.as_str())
                    .unwrap_or("(unknown tool)");
                serde_json::json!({
                    "session_id": session_id,
                    "agent_id": agent_id,
                    "kind": "tool_call",
                    "detail": tool_name,
                })
            }
            _ => continue,
        };
        send_external_chat_frame(state, session_id, params).await;
    }
}
/// Sends an `agents.chat.event` JSON-RPC notification carrying `params` to the
/// host client that owns `session_id`.
///
/// Delivery is best-effort: the function returns silently when the chat
/// session is unknown, when the host's client session is no longer
/// registered, when the frame fails to serialize, or when the socket write
/// fails.
async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
    use futures::SinkExt;
    use tokio_tungstenite::tungstenite::Message;

    // Resolve session -> owning host client id, releasing the lock right away.
    let owner_id = {
        let chats = state.chat_sessions.lock().await;
        match chats.get(session_id) {
            Some(chat) => chat.host_client_id.clone(),
            None => return,
        }
    };

    // Resolve host client id -> its outbound channel.
    let channel = {
        let sessions = state.sessions.lock().await;
        match sessions.get(&owner_id) {
            Some(session) => session.channel.clone(),
            None => return,
        }
    };

    let frame = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "agents.chat.event",
        "params": params,
    });
    let Ok(text) = serde_json::to_string(&frame) else {
        return;
    };

    let mut sink = channel.write.lock().await;
    let _ = sink.send(Message::Text(text.into())).await;
}
/// Appends one JSON line describing an external-agent invocation to
/// `$HOME/.car/external-agents.jsonl`.
///
/// The record contains an RFC 3339 UTC timestamp (`ts`), the adapter id
/// (`adapter_id`), the `task`, the `options` that were passed, and the
/// `result`.
///
/// Auditing is best-effort and never fails the caller: when `HOME` is unset
/// the record is skipped silently, and every I/O or serialization failure is
/// logged with `tracing::warn!` rather than propagated.
fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
    use std::io::Write;

    let Some(home) = std::env::var_os("HOME") else {
        return;
    };
    let car_dir = std::path::PathBuf::from(home).join(".car");
    if let Err(err) = std::fs::create_dir_all(&car_dir) {
        tracing::warn!(
            path = %car_dir.display(),
            error = %err,
            "failed to create external-agent audit directory"
        );
        return;
    }
    let path = car_dir.join("external-agents.jsonl");

    let record = serde_json::json!({
        "ts": chrono::Utc::now().to_rfc3339(),
        "adapter_id": id,
        "task": task,
        "options": options,
        "result": result,
    });
    let mut line = match serde_json::to_string(&record) {
        Ok(s) => s,
        Err(err) => {
            tracing::warn!(
                error = %err,
                "failed to serialize external-agent audit record"
            );
            return;
        }
    };
    // Emit record and newline in a single write: `writeln!` on an unbuffered
    // file may split them, letting concurrent appenders interleave lines.
    line.push('\n');

    let appended = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&path)
        .and_then(|mut file| file.write_all(line.as_bytes()));
    if let Err(err) = appended {
        tracing::warn!(
            path = %path.display(),
            error = %err,
            "failed to append external-agent audit record"
        );
    }
}
/// Reports health for external agents.
///
/// With a string `id` param, only that adapter is checked (`health_one`);
/// otherwise all adapters are checked (`health`). The optional boolean
/// `force` param (default `false`) is passed through to either call.
///
/// # Errors
/// Propagates the health-check error, or returns the parse error as a string
/// when the returned text is not valid JSON.
async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
    // Only an explicit JSON `true` enables `force`.
    let force = matches!(req.params.get("force"), Some(Value::Bool(true)));

    match req.params.get("id").and_then(Value::as_str) {
        Some(adapter_id) => {
            let report = car_ffi_common::external_agents::health_one(adapter_id, force).await?;
            serde_json::from_str(&report).map_err(|err| err.to_string())
        }
        None => {
            let report = car_ffi_common::external_agents::health(force).await?;
            serde_json::from_str(&report).map_err(|err| err.to_string())
        }
    }
}
/// Seconds `handle_agents_chat` waits for the agent to acknowledge an
/// `agent.chat` request before reporting a timeout.
const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;

/// Starts a chat turn with an attached agent on behalf of a host client.
///
/// Params read from the request:
/// * `agent_id` (string, required) — must be present in `state.attached_agents`.
/// * `prompt` (string, required).
/// * `session_id` (string, optional) — defaults to `chat-<uuid>`.
/// * `stream` (bool, default `true`).
/// * `voice_input` (bool, default `false`).
///
/// The handler records a chat session mapping `session_id` to the calling
/// host, sends an `agent.chat` JSON-RPC request over the agent's channel, and
/// waits up to [`AGENT_CHAT_ACK_TIMEOUT_SECS`] for the agent's response.
///
/// # Returns
/// `{ "accepted": true, "session_id": ... }` once the agent has acked without
/// an error.
///
/// # Errors
/// Returns an error string when a required param is missing, the agent is not
/// attached or its session has vanished, the request cannot be serialized or
/// delivered, the agent disconnects or times out before acking, or the ack
/// carries an error. On every failure after registration the chat session
/// (and, where still owned here, the pending-ack slot) is removed again.
async fn handle_agents_chat(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
    host_session: &Arc<crate::session::ClientSession>,
) -> Result<Value, String> {
    use futures::SinkExt;
    use tokio::sync::oneshot;
    use tokio_tungstenite::tungstenite::Message;

    let agent_id = req
        .params
        .get("agent_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
        .to_string();
    let prompt = req
        .params
        .get("prompt")
        .and_then(Value::as_str)
        .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
        .to_string();
    let session_id = req
        .params
        .get("session_id")
        .and_then(Value::as_str)
        .map(str::to_string)
        .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
    let stream = req
        .params
        .get("stream")
        .and_then(Value::as_bool)
        .unwrap_or(true);
    let voice_input = req
        .params
        .get("voice_input")
        .and_then(Value::as_bool)
        .unwrap_or(false);

    // agent_id -> client_id of the connection the agent attached with.
    let agent_client_id = state
        .attached_agents
        .lock()
        .await
        .get(&agent_id)
        .cloned()
        .ok_or_else(|| {
            format!(
                "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
                agent_id
            )
        })?;
    let agent_channel = {
        let sessions = state.sessions.lock().await;
        sessions
            .get(&agent_client_id)
            .map(|s| s.channel.clone())
            .ok_or_else(|| {
                format!(
                    "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
                    agent_id, agent_client_id
                )
            })?
    };

    // Build and serialize the request BEFORE registering any state, so a
    // serialization failure cannot leave a stale chat session or pending slot.
    let request_id = agent_channel.next_request_id();
    let rpc_request = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "agent.chat",
        "params": {
            "session_id": session_id,
            "prompt": prompt,
            "stream": stream,
            "context": {
                "host_client_id": host_session.client_id,
                "voice_input": voice_input,
            },
        },
        "id": request_id,
    });
    let payload = serde_json::to_string(&rpc_request).map_err(|e| e.to_string())?;

    // Register the session first so events the agent emits right after
    // receiving the request can already be routed back to the host.
    {
        let created_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        state.chat_sessions.lock().await.insert(
            session_id.clone(),
            crate::session::ChatSession {
                agent_id: agent_id.clone(),
                host_client_id: host_session.client_id.clone(),
                created_at,
            },
        );
    }
    let (tx, rx) = oneshot::channel();
    agent_channel
        .pending
        .lock()
        .await
        .insert(request_id.clone(), tx);

    // Bind the send result so the write-lock guard is released before the
    // cleanup below takes other locks.
    let delivery = agent_channel
        .write
        .lock()
        .await
        .send(Message::Text(payload.into()))
        .await;
    if let Err(e) = delivery {
        agent_channel.pending.lock().await.remove(&request_id);
        state.chat_sessions.lock().await.remove(&session_id);
        return Err(format!(
            "failed to deliver agent.chat to `{}`: {}",
            agent_id, e
        ));
    }

    let ack = match tokio::time::timeout(
        std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
        rx,
    )
    .await
    {
        Ok(Ok(resp)) => resp,
        // The sender half was dropped without a reply.
        Ok(Err(_)) => {
            state.chat_sessions.lock().await.remove(&session_id);
            return Err(format!(
                "agent `{}` disconnected before acking agents.chat",
                agent_id
            ));
        }
        // Timed out: the pending slot is still ours to clean up.
        Err(_) => {
            agent_channel.pending.lock().await.remove(&request_id);
            state.chat_sessions.lock().await.remove(&session_id);
            return Err(format!(
                "agent `{}` did not ack agents.chat within {}s",
                agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
            ));
        }
    };
    if let Some(err) = ack.error {
        state.chat_sessions.lock().await.remove(&session_id);
        return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
    }
    Ok(serde_json::json!({
        "accepted": true,
        "session_id": session_id,
    }))
}
/// Cancels an in-flight chat session.
///
/// Removes the session named by the required `session_id` param from
/// `state.chat_sessions`. If the session existed and its agent is still
/// attached with a live client session, an `agent.chat.cancel` notification
/// is sent to the agent on a best-effort basis (serialization and write
/// errors are ignored).
///
/// # Returns
/// * `{ "cancelled": false, "reason": "unknown session_id" }` when no such
///   session is registered.
/// * `{ "cancelled": true, "session_id": ... }` otherwise, whether or not the
///   agent could be notified.
///
/// # Errors
/// Returns an error string when `session_id` is missing or not a string.
async fn handle_agents_chat_cancel(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    use futures::SinkExt;
    use tokio_tungstenite::tungstenite::Message;

    let session_id = req
        .params
        .get("session_id")
        .and_then(Value::as_str)
        .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
        .to_string();

    // Removing the session first stops further events from being forwarded.
    let chat = state.chat_sessions.lock().await.remove(&session_id);
    let Some(chat) = chat else {
        return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
    };

    let agent_client_id = state
        .attached_agents
        .lock()
        .await
        .get(&chat.agent_id)
        .cloned();
    if let Some(client_id) = agent_client_id {
        let channel_opt = {
            let sessions = state.sessions.lock().await;
            sessions.get(&client_id).map(|s| s.channel.clone())
        };
        if let Some(channel) = channel_opt {
            // Sent as a notification (no `id`), so no reply is awaited.
            let notification = serde_json::json!({
                "jsonrpc": "2.0",
                "method": "agent.chat.cancel",
                "params": { "session_id": session_id },
            });
            if let Ok(text) = serde_json::to_string(&notification) {
                let _ = channel
                    .write
                    .lock()
                    .await
                    .send(Message::Text(text.into()))
                    .await;
            }
        }
    }
    Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
}
/// Intercepts `agent.chat.event` notifications from an agent and relays them
/// to the host client that owns the chat session.
///
/// # Returns
/// * `false` when the message is not an `agent.chat.event` notification
///   (different or missing method, non-null `id`, or no string `session_id`),
///   so the caller can handle it some other way.
/// * `true` when the message was consumed — including the case where the
///   session is unknown and the event is dropped.
///
/// The forwarded frame uses method `agents.chat.event` and the original
/// params with `agent_id` added (when params is an object). A `kind` of
/// `"done"` or `"error"` ends the session, which is then removed from
/// `state.chat_sessions`. Write failures toward the host are ignored.
pub(crate) async fn try_forward_agent_chat_event(
    parsed: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> bool {
    use futures::SinkExt;
    use tokio_tungstenite::tungstenite::Message;

    // Only id-less `agent.chat.event` notifications are handled here.
    if parsed.method.as_deref() != Some("agent.chat.event") || !parsed.id.is_null() {
        return false;
    }
    let session_id = match parsed.params.get("session_id").and_then(Value::as_str) {
        Some(sid) => sid.to_string(),
        None => return false,
    };

    let chat = {
        let chats = state.chat_sessions.lock().await;
        match chats.get(&session_id) {
            Some(entry) => entry.clone(),
            // Unknown session: swallow the event.
            None => return true,
        }
    };

    // A missing `kind` is treated as a non-terminal event.
    let is_terminal = matches!(
        parsed.params.get("kind").and_then(Value::as_str),
        Some("done" | "error")
    );

    let host_channel = {
        let sessions = state.sessions.lock().await;
        match sessions.get(&chat.host_client_id) {
            Some(session) => Some(session.channel.clone()),
            None => None,
        }
    };
    if let Some(channel) = host_channel {
        let mut forwarded_params = parsed.params.clone();
        if let Some(fields) = forwarded_params.as_object_mut() {
            fields.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
        }
        let frame = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "agents.chat.event",
            "params": forwarded_params,
        });
        if let Ok(text) = serde_json::to_string(&frame) {
            let mut sink = channel.write.lock().await;
            let _ = sink.send(Message::Text(text.into())).await;
        }
    }

    if is_terminal {
        state.chat_sessions.lock().await.remove(&session_id);
    }
    true
}
/// Regression coverage for session cleanup after an abrupt disconnect (the
/// assertion message references car#209).
#[cfg(test)]
mod fd_leak_regression {
    use super::run_dispatch;
    use futures::SinkExt;
    use std::sync::Arc;
    use tokio_tungstenite::tungstenite::{Error as WsError, Message};

    /// Feeds `run_dispatch` a read stream whose only item is a
    /// `ConnectionClosed` error and a write sink that discards everything,
    /// then checks that dispatch returns `Ok` and that no session is left in
    /// `state.sessions`.
    #[tokio::test]
    async fn abrupt_read_error_still_runs_session_cleanup() {
        let scratch = tempfile::TempDir::new().unwrap();
        let server_root = scratch.path().to_path_buf();
        let state = Arc::new(crate::session::ServerState::standalone(server_root));

        // The very first read fails, simulating a peer that vanished.
        let failing_read = futures::stream::iter(vec![Err::<Message, WsError>(
            WsError::ConnectionClosed,
        )]);
        let discarding_write: crate::session::WsSink = Box::pin(
            futures::sink::drain().sink_map_err(|_| WsError::ConnectionClosed),
        );

        let result = run_dispatch(
            failing_read,
            discarding_write,
            "test-peer".to_string(),
            Arc::clone(&state),
        )
        .await;
        assert!(
            result.is_ok(),
            "run_dispatch must return Ok after cleanup, got {result:?}"
        );

        let leftover_sessions = state.sessions.lock().await;
        assert!(
            leftover_sessions.is_empty(),
            "state.sessions must be empty after an abrupt disconnect (car#209)"
        );
    }
}