Skip to main content

car_server_core/
handler.rs

1//! WebSocket connection handler — bidirectional JSON-RPC.
2//!
3//! Tool callback flow:
4//! 1. Client submits proposal via proposal.submit
5//! 2. Runtime encounters a ToolCall action
6//! 3. WsToolExecutor sends tools.execute request to client via shared write half
7//! 4. WsToolExecutor awaits response on a oneshot channel
8//! 5. Client executes tool locally, sends JSON-RPC response back
9//! 6. Handler receives the response, resolves the oneshot
10//! 7. Runtime continues execution with the tool result
11
12use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30    #[serde(default)]
31    pub jsonrpc: String,
32    #[serde(default)]
33    pub method: Option<String>,
34    #[serde(default)]
35    pub params: Value,
36    #[serde(default)]
37    pub id: Value,
38    // Response fields
39    #[serde(default)]
40    pub result: Option<Value>,
41    #[serde(default)]
42    pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47    pub jsonrpc: &'static str,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub result: Option<Value>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub error: Option<JsonRpcError>,
52    pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57    pub code: i32,
58    pub message: String,
59}
60
61impl JsonRpcResponse {
62    pub fn success(id: Value, result: Value) -> Self {
63        Self {
64            jsonrpc: "2.0",
65            result: Some(result),
66            error: None,
67            id,
68        }
69    }
70    pub fn error(id: Value, code: i32, message: &str) -> Self {
71        Self {
72            jsonrpc: "2.0",
73            result: None,
74            error: Some(JsonRpcError {
75                code,
76                message: message.to_string(),
77            }),
78            id,
79        }
80    }
81}
82
83/// Convenience wrapper for the standalone `car-server` binary: accepts
84/// the WebSocket handshake on a raw [`TcpStream`] then delegates to
85/// [`run_dispatch`]. Embedders that already have a handshake-completed
86/// `WebSocketStream` skip this and call `run_dispatch` directly.
87#[instrument(
88    name = "ws.connection",
89    skip_all,
90    fields(peer = %peer),
91)]
92pub async fn handle_connection(
93    stream: TcpStream,
94    peer: SocketAddr,
95    state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97    let ws_stream = accept_async(stream).await?;
98    let (write, read) = ws_stream.split();
99    run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102/// Convenience wrapper for the daemon-as-default Unix-socket
103/// listener. Same shape as [`handle_connection`] but accepts a
104/// `UnixStream` — used by the per-user UDS listener in
105/// `car-server::main` (default transport for FFI thin clients,
106/// since UDS is faster + permission-scoped vs localhost TCP).
107///
108/// Unix-only — `tokio::net::UnixStream` is gated on
109/// `cfg(all(unix, feature = "net"))`. On Windows the daemon binds
110/// only the TCP listener (loopback) and this entry point is
111/// compiled out; consumers must use `handle_connection` instead.
112#[cfg(unix)]
113#[instrument(
114    name = "ws.connection",
115    skip_all,
116    fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119    stream: tokio::net::UnixStream,
120    peer: String,
121    state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123    let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124    let (write, read) = ws_stream.split();
125    run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128/// Transport-neutral entry point: drives the JSON-RPC dispatch loop
129/// against an already-handshake-completed split WebSocket. Generic
130/// over the read half (any `Stream<Item = Result<Message, WsError>>`)
131/// and the write half (a [`WsSink`] — type-erased so this function
132/// doesn't templatize every downstream consumer of `WsChannel`).
133///
134/// `peer` is a free-form string ("127.0.0.1:1234" for TCP,
135/// "uds:/path/sock" for UDS, "axum:..." for embedders) — used only
136/// for tracing fields, never for dispatch logic.
137#[instrument(
138    name = "ws.dispatch",
139    skip_all,
140    fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143    mut read: R,
144    write: crate::session::WsSink,
145    peer: String,
146    state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149    R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150        + Unpin
151        + Send,
152{
153    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154    tracing::Span::current().record("client_id", &client_id.as_str());
155
156    info!("New connection from {}", peer);
157
158    let channel = Arc::new(WsChannel {
159        write: Mutex::new(write),
160        pending: Mutex::new(HashMap::new()),
161        next_id: AtomicU64::new(1),
162    });
163
164    let session = state.create_session(&client_id, channel.clone()).await;
165
166    // car#209: per-request handlers are spawned detached (below) and
167    // each clones `Arc<ClientSession>` → `Arc<WsChannel>`. A bare
168    // `tokio::spawn` outlives the connection, so a slow/hung handler
169    // pins the split sink and the inbound socket lingers in CLOSED
170    // until the daemon hits EMFILE. Own them in a per-connection
171    // `JoinSet` so every in-flight handler is aborted the instant the
172    // WS drops (`abort_all` in the cleanup block; `JoinSet`'s Drop
173    // also aborts on any early return), releasing the FD immediately.
174    let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
175
176    while let Some(msg) = read.next().await {
177        // Reap finished handlers so a long-lived connection doesn't
178        // retain their `JoinHandle`s unbounded (memory, not FDs).
179        while conn_tasks.try_join_next().is_some() {}
180
181        // car#209: on an abrupt transport error (peer reset / network
182        // drop — the trader crash-loop case) `read.next()` yields
183        // `Some(Err(_))`. The old `let msg = msg?;` propagated that
184        // out of the function, *skipping the cleanup block below*, so
185        // `state.sessions` (+ the host/a2ui/chat registries) kept the
186        // session + channel — and thus the socket FD — forever. Break
187        // instead: every disconnect, clean or not, runs the single
188        // cleanup path.
189        let msg = match msg {
190            Ok(m) => m,
191            Err(e) => {
192                info!("read error from {}: {}; closing", client_id, e);
193                break;
194            }
195        };
196        if msg.is_text() {
197            let text = match msg.to_text() {
198                Ok(t) => t,
199                Err(e) => {
200                    info!("non-text frame from {}: {}; closing", client_id, e);
201                    break;
202                }
203            };
204            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
205                Ok(m) => m,
206                Err(e) => {
207                    send_response(
208                        &session.channel,
209                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
210                    )
211                    .await
212                    .ok();
213                    continue;
214                }
215            };
216
217            // Is this a response to a pending tool callback?
218            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
219                if let Some(id_str) = parsed.id.as_str() {
220                    let mut pending = session.channel.pending.lock().await;
221                    if let Some(tx) = pending.remove(id_str) {
222                        let tool_resp = if let Some(result) = parsed.result {
223                            ToolExecuteResponse {
224                                action_id: id_str.to_string(),
225                                output: Some(result),
226                                error: None,
227                            }
228                        } else {
229                            let err_msg = parsed
230                                .error
231                                .as_ref()
232                                .and_then(|e| e.get("message"))
233                                .and_then(|m| m.as_str())
234                                .unwrap_or("unknown error")
235                                .to_string();
236                            ToolExecuteResponse {
237                                action_id: id_str.to_string(),
238                                output: None,
239                                error: Some(err_msg),
240                            }
241                        };
242                        let _ = tx.send(tool_resp);
243                        continue;
244                    }
245                }
246            }
247
248            // Agent → host chat-event interceptor. `agent.chat.event`
249            // notifications coming up the WS from a connected agent
250            // are forwarded to the originating host's channel as
251            // `agents.chat.event`. Lives ahead of the regular method
252            // dispatch so the dispatcher doesn't reply with
253            // "method-not-found" on what is a fire-and-forget
254            // notification (no id). See
255            // `docs/proposals/agent-chat-surface.md`.
256            if try_forward_agent_chat_event(&parsed, &state).await {
257                continue;
258            }
259
260            // Otherwise it's a client request
261            if let Some(method) = &parsed.method {
262                info!(method = %method, "dispatching JSON-RPC method");
263
264                // Auth gate (Parslee-ai/car-releases#32). When the
265                // server has an auth token installed, every method
266                // other than `session.auth` is rejected on
267                // unauthenticated sessions and the connection is
268                // closed after the error response goes out. When no
269                // token is installed (default), this branch never
270                // fires — preserves pre-#32 behaviour.
271                if state.auth_token.get().is_some()
272                    && !session
273                        .authenticated
274                        .load(std::sync::atomic::Ordering::Acquire)
275                    && method != "session.auth"
276                {
277                    let resp = JsonRpcResponse::error(
278                        parsed.id.clone(),
279                        -32001,
280                        "auth required: send `session.auth` with the per-launch token \
281                         from ~/Library/Application Support/ai.parslee.car/auth-token \
282                         (macOS), $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux), \
283                         or %LOCALAPPDATA%\\ai.parslee.car\\auth-token (Windows) \
284                         as the first frame on this connection",
285                    );
286                    let _ = send_response(&session.channel, resp).await;
287                    info!(client = %client_id, method = %method,
288                        "rejecting non-auth method on unauthenticated session; closing");
289                    break;
290                }
291
292                // Approval gate (audit 2026-05). High-risk methods —
293                // anything that drives macOS automation or sends
294                // messages on the user's behalf — must be acked by
295                // the user via `host.resolve_approval` before they
296                // dispatch. The gate raises an `approval.requested`
297                // host event the local UI can render approve/deny on,
298                // then parks until resolved or the configured
299                // timeout fires. Returns a JSON-RPC error and
300                // continues the dispatch loop on deny / timeout —
301                // no connection close, since the caller may want to
302                // retry with revised parameters.
303                if state.approval_gate.requires_approval(method.as_str()) {
304                    match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
305                        Ok(()) => {}
306                        Err(reason) => {
307                            let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
308                            let _ = send_response(&session.channel, resp).await;
309                            info!(
310                                client = %client_id,
311                                method = %method,
312                                reason = %reason,
313                                "approval gate blocked dispatch"
314                            );
315                            continue;
316                        }
317                    }
318                }
319
320                // Spawn the per-method dispatch in a task so the read
321                // loop keeps reading frames. Without this, methods
322                // that trigger server-initiated `tools.execute`
323                // callbacks (`proposal.submit`, `workflow.run`,
324                // `multi.*` paths that fire a registered tool)
325                // deadlock the connection: the handler awaits the
326                // callback response on a oneshot, but the response is
327                // another frame on this same read half — which the
328                // synchronous `.await` here would prevent the loop
329                // from ever picking up. Surfaced by the
330                // `executeProposal: echo tool via JS callback` smoke
331                // (#173). Response ordering becomes id-keyed (the
332                // JSON-RPC demuxing contract) rather than
333                // arrival-ordered.
334                let session_task = session.clone();
335                let state_task = state.clone();
336                let method_owned = method.clone();
337                let parsed_task = parsed;
338                // car#209: owned by the per-connection JoinSet so it's
339                // aborted on disconnect instead of leaking the channel.
340                conn_tasks.spawn(async move {
341                    let session = session_task;
342                    let state = state_task;
343                    let parsed = parsed_task;
344                    let result = match method_owned.as_str() {
345                        "session.auth" => handle_session_auth(&parsed, &session, &state).await,
346                        "parslee.auth" => handle_parslee_auth().await,
347                        "auth.start" => handle_auth_start(&parsed).await,
348                        "auth.complete" => handle_auth_complete(&parsed).await,
349                        "auth.status" => handle_auth_status().await,
350                        "auth.logout" => handle_auth_logout().await,
351                        "session.init" => handle_session_init(&parsed, &session).await,
352                        "host.subscribe" => handle_host_subscribe(&session, &state).await,
353                        "host.agents" => handle_host_agents(&session).await,
354                        "host.events" => handle_host_events(&parsed, &session).await,
355                        "host.approvals" => handle_host_approvals(&session).await,
356                        "host.register_agent" => {
357                            handle_host_register_agent(&parsed, &session).await
358                        }
359                        "host.unregister_agent" => {
360                            handle_host_unregister_agent(&parsed, &session).await
361                        }
362                        "host.set_status" => handle_host_set_status(&parsed, &session).await,
363                        "host.notify" => handle_host_notify(&parsed, &session).await,
364                        "host.request_approval" => {
365                            handle_host_request_approval(&parsed, &session).await
366                        }
367                        "host.resolve_approval" => {
368                            handle_host_resolve_approval(&parsed, &session).await
369                        }
370                        "tools.register" => handle_tools_register(&parsed, &session).await,
371                        "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
372                        "policy.register" => handle_policy_register(&parsed, &session).await,
373                        "session.policy.open" => handle_session_policy_open(&session).await,
374                        "session.policy.close" => {
375                            handle_session_policy_close(&parsed, &session).await
376                        }
377                        "verify" => handle_verify(&parsed, &session).await,
378                        "state.get" => handle_state_get(&parsed, &session).await,
379                        "state.set" => handle_state_set(&parsed, &session).await,
380                        "state.exists" => handle_state_exists(&parsed, &session).await,
381                        "state.keys" => handle_state_keys(&parsed, &session).await,
382                        "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
383                        "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
384                        "memory.query" => handle_memory_query(&parsed, &session).await,
385                        "memory.build_context" => {
386                            handle_memory_build_context(&parsed, &session).await
387                        }
388                        "memory.build_context_fast" => {
389                            handle_memory_build_context_fast(&parsed, &session).await
390                        }
391                        "memory.consolidate" => handle_memory_consolidate(&session).await,
392                        "memory.fact_count" => handle_memory_fact_count(&session).await,
393                        "memory.persist" => handle_memory_persist(&parsed, &session).await,
394                        "memory.load" => handle_memory_load(&parsed, &session).await,
395                        "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
396                        "skill.find" => handle_skill_find(&parsed, &session).await,
397                        "skill.report" => handle_skill_report(&parsed, &session).await,
398                        "skill.repair" => handle_skill_repair(&parsed, &session).await,
399                        "skills.ingest_distilled" => {
400                            handle_skills_ingest_distilled(&parsed, &session).await
401                        }
402                        "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
403                        "skills.domains_needing_evolution" => {
404                            handle_skills_domains_needing_evolution(&parsed, &session).await
405                        }
406                        "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
407                        "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
408                        "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
409                        "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
410                        "multi.vote" => handle_multi_vote(&parsed, &session).await,
411                        "scheduler.create" => handle_scheduler_create(&parsed),
412                        "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
413                        "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
414                        "infer" => handle_infer(&parsed, &state, &session).await,
415                        "image.generate" => handle_image_generate(&parsed, &state).await,
416                        "video.generate" => handle_video_generate(&parsed, &state).await,
417                        "embed" => handle_embed(&parsed, &state).await,
418                        "classify" => handle_classify(&parsed, &state).await,
419                        "tokenize" => handle_tokenize(&parsed, &state).await,
420                        "detokenize" => handle_detokenize(&parsed, &state).await,
421                        "rerank" => handle_rerank(&parsed, &state).await,
422                        "transcribe" => handle_transcribe(&parsed, &state).await,
423                        "synthesize" => handle_synthesize(&parsed, &state).await,
424                        "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
425                        "speech.prepare" => handle_speech_prepare(&state).await,
426                        "models.route" => handle_models_route(&parsed, &state).await,
427                        "models.stats" => handle_models_stats(&state).await,
428                        "outcomes.resolve_pending" => {
429                            handle_outcomes_resolve_pending(&parsed, &state).await
430                        }
431                        "events.count" => handle_events_count(&session).await,
432                        "events.stats" => handle_events_stats(&session).await,
433                        "events.truncate" => handle_events_truncate(&parsed, &session).await,
434                        "events.clear" => handle_events_clear(&session).await,
435                        "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
436                        "models.list" => handle_models_list(&state),
437                        "models.register" => handle_models_register(&parsed, &state).await,
438                        "models.unregister" => handle_models_unregister(&parsed, &state).await,
439                        "models.list_unified" => handle_models_list_unified(&state),
440                        "models.search" => handle_models_search(&parsed, &state),
441                        "models.upgrades" => handle_models_upgrades(&state),
442                        "models.pull" => handle_models_pull(&parsed, &state).await,
443                        "models.install" => handle_models_pull(&parsed, &state).await,
444                        "skills.distill" => handle_skills_distill(&parsed, &state).await,
445                        "skills.list" => handle_skills_list(&parsed, &session).await,
446                        "browser.run" => handle_browser_run(&parsed, &session).await,
447                        "browser.close" => handle_browser_close(&session).await,
448                        "secret.put" => handle_secret_put(&parsed),
449                        "secret.get" => handle_secret_get(&parsed),
450                        "secret.delete" => handle_secret_delete(&parsed),
451                        "secret.status" => handle_secret_status(&parsed),
452                        "secret.available" => Ok(car_ffi_common::secrets::is_available()),
453                        "permissions.status" => handle_perm_status(&parsed),
454                        "permissions.request" => handle_perm_request(&parsed),
455                        "permissions.explain" => handle_perm_explain(&parsed),
456                        "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
457                        "accounts.list" => car_ffi_common::accounts::list(),
458                        "accounts.open" => {
459                            #[derive(serde::Deserialize, Default)]
460                            struct OpenParams {
461                                #[serde(default)]
462                                account_id: Option<String>,
463                            }
464                            let p: OpenParams =
465                                serde_json::from_value(parsed.params.clone()).unwrap_or_default();
466                            car_ffi_common::accounts::open_settings(p.account_id.as_deref())
467                        }
468                        "calendar.list" => car_ffi_common::integrations::calendar_list(),
469                        "calendar.events" => handle_calendar_events(&parsed),
470                        "contacts.containers" => {
471                            car_ffi_common::integrations::contacts_containers()
472                        }
473                        "contacts.find" => handle_contacts_find(&parsed),
474                        "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
475                        "mail.inbox" => handle_mail_inbox(&parsed),
476                        "mail.send" => handle_mail_send(&parsed),
477                        "messages.services" => car_ffi_common::integrations::messages_services(),
478                        "messages.chats" => handle_messages_chats(&parsed),
479                        "messages.send" => handle_messages_send(&parsed),
480                        "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
481                        "notes.find" => handle_notes_find(&parsed),
482                        "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
483                        "reminders.items" => handle_reminders_items(&parsed),
484                        "photos.albums" => car_ffi_common::integrations::photos_albums(),
485                        "bookmarks.list" => handle_bookmarks_list(&parsed),
486                        "files.locations" => car_ffi_common::integrations::files_locations(),
487                        "keychain.status" => car_ffi_common::integrations::keychain_status(),
488                        "health.status" => car_ffi_common::health::status(),
489                        "health.sleep" => handle_health_sleep(&parsed),
490                        "health.workouts" => handle_health_workouts(&parsed),
491                        "health.activity" => handle_health_activity(&parsed),
492                        "voice.transcribe_stream.start" => {
493                            handle_voice_transcribe_stream_start(&parsed, &state, &session).await
494                        }
495                        "voice.transcribe_stream.stop" => {
496                            handle_voice_transcribe_stream_stop(&parsed, &state).await
497                        }
498                        "voice.transcribe_stream.push" => {
499                            handle_voice_transcribe_stream_push(&parsed, &state).await
500                        }
501                        "voice.tts_stream.start" => {
502                            handle_voice_tts_stream_start(&parsed, &session).await
503                        }
504                        "voice.tts_stream.cancel" => handle_voice_tts_stream_cancel(&parsed).await,
505                        "voice.tts_stream.list" => Ok(handle_voice_tts_stream_list()),
506                        "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
507                        "voice.dispatch_turn" => {
508                            handle_voice_dispatch_turn(&parsed, &state, &session).await
509                        }
510                        "voice.cancel_turn" => handle_voice_cancel_turn().await,
511                        "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
512                        "inference.register_runner" => {
513                            handle_inference_register_runner(&session).await
514                        }
515                        "inference.runner.event" => handle_inference_runner_event(&parsed).await,
516                        "inference.runner.complete" => {
517                            handle_inference_runner_complete(&parsed).await
518                        }
519                        "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
520                        "voice.providers.list" => {
521                            // Stateless: enumerates STT/TTS providers compiled into
522                            // this build. Runtime readiness (API key, permission,
523                            // model download) is reported via per-provider errors.
524                            serde_json::from_str::<serde_json::Value>(
525                                &car_voice::list_voice_providers_json(),
526                            )
527                            .map_err(|e| e.to_string())
528                        }
529                        "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
530                            .await
531                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
532                        "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
533                            .await
534                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
535                        "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
536                        "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
537                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
538                        "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
539                        "workflow.run" => handle_workflow_run(&parsed, &session).await,
540                        "workflow.verify" => handle_workflow_verify(&parsed),
541                        "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
542                        "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
543                        "meeting.list" => handle_meeting_list(&parsed),
544                        "meeting.get" => handle_meeting_get(&parsed),
545                        "registry.register" => handle_registry_register(&parsed),
546                        "registry.heartbeat" => handle_registry_heartbeat(&parsed),
547                        "registry.unregister" => handle_registry_unregister(&parsed),
548                        "registry.list" => handle_registry_list(&parsed),
549                        "registry.reap" => handle_registry_reap(&parsed),
550                        "admission.status" => handle_admission_status(&state),
551                        "a2a.start" => handle_a2a_start(&parsed, &session).await,
552                        "a2a.stop" => handle_a2a_stop(),
553                        "a2a.status" => handle_a2a_status(),
554                        "a2a.send" => handle_a2a_send(&parsed, &state).await,
555                        "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
556                        "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
557                        "a2ui.capabilities" => handle_a2ui_capabilities(&state),
558                        "a2ui.reap" => handle_a2ui_reap(&state).await,
559                        "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
560                        "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
561                        "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
562                        "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
563                        "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
564                        "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
565                        "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
566                        "automation.run_applescript" => handle_run_applescript(&parsed).await,
567                        "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
568                        "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
569                        "notifications.local" => handle_local_notification(&parsed).await,
570                        "vision.ocr" => handle_vision_ocr(&parsed).await,
571                        "agents.list" => handle_agents_list(&state).await,
572                        "agents.health" => handle_agents_health(&state).await,
573                        "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
574                        "agents.install" => handle_agents_install(&parsed, &state).await,
575                        "agents.remove" => handle_agents_remove(&parsed, &state).await,
576                        "agents.start" => handle_agents_start(&parsed, &state).await,
577                        "agents.stop" => handle_agents_stop(&parsed, &state).await,
578                        "agents.restart" => handle_agents_restart(&parsed, &state).await,
579                        "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
580                        "agents.list_external" => handle_agents_list_external(&parsed).await,
581                        "agents.detect_external" => handle_agents_detect_external(&parsed).await,
582                        "agents.health_external" => handle_agents_health_external(&parsed).await,
583                        "agents.invoke_external" => {
584                            handle_agents_invoke_external(&parsed, &state, &session).await
585                        }
586                        "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
587                        "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
588                        // A2A v1.0 (PascalCase) + v0.3 (slash form) — both
589                        // alias to the same in-core dispatcher per
590                        // Parslee-ai/car-releases#28. Embedders that need a
591                        // custom AgentCardSource / TaskStore plug them in
592                        // via ServerStateConfig::with_a2a_card_source /
593                        // with_a2a_store before any handler runs.
594                        "message/send"
595                        | "SendMessage"
596                        | "message/stream"
597                        | "SendStreamingMessage"
598                        | "tasks/get"
599                        | "GetTask"
600                        | "tasks/list"
601                        | "ListTasks"
602                        | "tasks/cancel"
603                        | "CancelTask"
604                        | "tasks/resubscribe"
605                        | "SubscribeToTask"
606                        | "tasks/pushNotificationConfig/set"
607                        | "CreateTaskPushNotificationConfig"
608                        | "tasks/pushNotificationConfig/get"
609                        | "GetTaskPushNotificationConfig"
610                        | "tasks/pushNotificationConfig/list"
611                        | "ListTaskPushNotificationConfigs"
612                        | "tasks/pushNotificationConfig/delete"
613                        | "DeleteTaskPushNotificationConfig"
614                        | "agent/getAuthenticatedExtendedCard"
615                        | "GetExtendedAgentCard" => {
616                            handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
617                        }
618                        _ => Err(format!("unknown method: {}", method_owned)),
619                    };
620
621                    let resp = match result {
622                        Ok(value) => JsonRpcResponse::success(parsed.id, value),
623                        Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
624                    };
625                    let _ = send_response(&session.channel, resp).await;
626                });
627            }
628        } else if msg.is_binary() {
629            // CAR binary frame transport — see `car_ffi_common::voice::binary`
630            // for the canonical header definition. 26-byte fixed header
631            // followed by an opaque payload. Inbound type 0x01 carries
632            // 16-bit signed LE PCM into a `pcm_push` session; other
633            // types (0x02 TTS chunk, 0x03 final marker, 0x04 error)
634            // are server-emitted and rejected here.
635            let bytes = msg.into_data();
636            let parsed = match car_ffi_common::voice::binary::parse_frame(&bytes) {
637                Ok(p) => p,
638                Err(e) => {
639                    tracing::warn!("binary frame from {} rejected: {}", client_id, e);
640                    continue;
641                }
642            };
643            match parsed.frame_type {
644                car_ffi_common::voice::binary::FRAME_TYPE_INBOUND_PCM => {
645                    let registry = state.voice_sessions.clone();
646                    let payload_owned = parsed.payload.to_vec();
647                    let session_id_owned = parsed.session_id_hex.clone();
648                    conn_tasks.spawn(async move {
649                        if let Err(e) = car_ffi_common::voice::transcribe_stream_push(
650                            &session_id_owned,
651                            &payload_owned,
652                            registry,
653                        )
654                        .await
655                        {
656                            tracing::warn!(
657                                "binary PCM push to session {} failed: {}",
658                                session_id_owned,
659                                e
660                            );
661                        }
662                    });
663                }
664                other => {
665                    tracing::debug!(
666                        "binary frame type {:#04x} from {} not accepted server-side",
667                        other,
668                        client_id
669                    );
670                }
671            }
672        } else if msg.is_close() {
673            info!("Client {} disconnected", client_id);
674            break;
675        }
676    }
677
678    // car#209: abort every in-flight handler for this connection
679    // *first* — they each hold an `Arc<ClientSession>` → `Arc<WsChannel>`
680    // clone; until they're gone the split sink (and the inbound socket
681    // FD) can't drop, even after the registries below are cleared.
682    conn_tasks.abort_all();
683
684    session.host.unsubscribe(&client_id).await;
685    // Auto-cancel this session's pending approvals so the queue stays
686    // in sync with what's actually decidable — covers graceful
687    // unregister+close, hard crash (TCP reset), and ping timeout in
688    // one place. System-level gate approvals (client_id None) are not
689    // touched. car-releases#48.
690    session.host.reap_session_approvals(&client_id).await;
691    state.a2ui_subscribers.lock().await.remove(&client_id);
692
693    // Fix for MULTI-4 / WS-3: drop the session from the registry and
694    // drain any pending tool callbacks. Without this, every connection
695    // we ever accepted keeps an `Arc<ClientSession>` alive in
696    // `state.sessions`, and outstanding `oneshot::Sender`s in
697    // `session.channel.pending` outlive the closed connection until
698    // their per-call timeout (60s). Dropping the senders here causes any
699    // awaiting `recv()` in `WsToolExecutor::execute` to return
700    // `RecvError` immediately, which the existing error-handler path
701    // already maps to "callback channel closed" — same shape as the
702    // timeout path, just faster.
703    let _removed = state.remove_session(&client_id).await;
704    {
705        let mut pending = session.channel.pending.lock().await;
706        pending.clear();
707    }
708
709    Ok(())
710}
711
712async fn send_response(
713    channel: &WsChannel,
714    resp: JsonRpcResponse,
715) -> Result<(), Box<dyn std::error::Error>> {
716    use futures::SinkExt;
717    let json = serde_json::to_string(&resp)?;
718    channel
719        .write
720        .lock()
721        .await
722        .send(Message::Text(json.into()))
723        .await?;
724    Ok(())
725}
726
727// --- Request handlers ---
728
729async fn handle_host_subscribe(
730    session: &crate::session::ClientSession,
731    state: &Arc<ServerState>,
732) -> Result<Value, String> {
733    session
734        .host
735        .subscribe(&session.client_id, session.channel.clone())
736        .await;
737    serde_json::to_value(HostSnapshot {
738        subscribed: true,
739        agents: session.host.agents().await,
740        approvals: session.host.approvals().await,
741        events: session.host.events(50).await,
742        identity: Some(daemon_identity(state)),
743    })
744    .map_err(|e| e.to_string())
745}
746
747/// Snapshot the daemon-identity facts for a fresh subscriber.
748/// Cheap: non-acquiring reads on `OnceLock`s + a single
749/// `to_string_lossy` on the manifest path. Critically uses
750/// [`ServerState::supervisor_if_installed`] — not the lazy-init
751/// `supervisor()` — so a Heisenberg subscribe can't *cause* the
752/// daemon to acquire the manifest lock just by asking whether it
753/// owns one.
754fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
755    // Observer takes precedence: when both supervisor and observer
756    // markers are set (currently unreachable through the standalone
757    // binary, but an embedder could install both racily), the
758    // observer marker is the authoritative role since the
759    // supervisor handle is only installed when this daemon owns
760    // the lock.
761    let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
762        (
763            Some(p.to_string_lossy().into_owned()),
764            car_proto::HostManifestRole::Observer,
765        )
766    } else if let Some(s) = state.supervisor_if_installed() {
767        (
768            Some(s.manifest_path().to_string_lossy().into_owned()),
769            car_proto::HostManifestRole::Owner,
770        )
771    } else {
772        (None, car_proto::HostManifestRole::None)
773    };
774    car_proto::HostIdentity {
775        version: env!("CARGO_PKG_VERSION").to_string(),
776        pid: std::process::id(),
777        manifest_path,
778        manifest_role,
779        parslee: state
780            .parslee_session
781            .get()
782            .map(|session| session.identity.clone()),
783    }
784}
785
786/// Return the Parslee cloud credential for an authenticated CAR
787/// connection. Managed agents already receive `CAR_AUTH_TOKEN` and
788/// authenticate to the local daemon with `session.auth`; this method is
789/// their supported bridge from local CAR auth to the user's Parslee
790/// backend auth.
791///
792/// The bearer token is intentionally not injected into every managed
793/// child process environment. Agents ask for it only when they need it,
794/// through the same local auth gate that protects the rest of the
795/// daemon.
796async fn handle_parslee_auth() -> Result<Value, String> {
797    let session = crate::parslee_auth::load_or_refresh()
798        .await?
799        .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
800    Ok(serde_json::json!({
801        "authenticated": true,
802        "token_type": "Bearer",
803        "access_token": session.access_token,
804        "authorization_header": format!("Bearer {}", session.access_token),
805        "identity": session.identity,
806    }))
807}
808
809// --- auth.* : GUI-driven Parslee sign-in (CAR Host.app).
810// Shares one implementation with `car auth login` via the car-auth
811// crate. Stateless: the trusted in-process GUI holds the PKCE
812// verifier + state between auth.start and auth.complete (same trust
813// boundary as the keychain it ultimately writes).
814async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
815    let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
816    let client_id = req
817        .params
818        .get("client_id")
819        .and_then(|v| v.as_str())
820        .unwrap_or("parslee-car");
821    let redirect_uri = req
822        .params
823        .get("redirect_uri")
824        .and_then(|v| v.as_str())
825        .ok_or_else(|| "redirect_uri is required".to_string())?;
826    let provider = req.params.get("provider").and_then(|v| v.as_str());
827    let state = car_auth::new_state();
828    let verifier = car_auth::pkce_verifier();
829    let challenge = car_auth::pkce_challenge(&verifier);
830    let url =
831        car_auth::authorize_url(&api_base, client_id, redirect_uri, &state, &challenge, provider)?;
832    Ok(serde_json::json!({
833        "authorize_url": url,
834        "state": state,
835        "verifier": verifier,
836    }))
837}
838
839async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
840    let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
841    let client_id = req
842        .params
843        .get("client_id")
844        .and_then(|v| v.as_str())
845        .unwrap_or("parslee-car");
846    let redirect_uri = req
847        .params
848        .get("redirect_uri")
849        .and_then(|v| v.as_str())
850        .ok_or_else(|| "redirect_uri is required".to_string())?;
851    let code = req
852        .params
853        .get("code")
854        .and_then(|v| v.as_str())
855        .ok_or_else(|| "code is required".to_string())?;
856    let verifier = req
857        .params
858        .get("verifier")
859        .and_then(|v| v.as_str())
860        .ok_or_else(|| "verifier is required".to_string())?;
861    let token =
862        car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
863    car_auth::store_tokens(&api_base, &token)?;
864    Ok(serde_json::json!({ "ok": true }))
865}
866
867async fn handle_auth_status() -> Result<Value, String> {
868    match car_auth::fetch_status(None).await? {
869        Some(session_json) => {
870            let session: Value = serde_json::from_str(&session_json).unwrap_or(Value::Null);
871            Ok(serde_json::json!({ "authenticated": true, "session": session }))
872        }
873        None => Ok(serde_json::json!({ "authenticated": false })),
874    }
875}
876
877async fn handle_auth_logout() -> Result<Value, String> {
878    car_auth::clear_tokens()?;
879    Ok(serde_json::json!({ "ok": true }))
880}
881
882async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
883    serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
884}
885
886async fn handle_host_events(
887    req: &JsonRpcMessage,
888    session: &crate::session::ClientSession,
889) -> Result<Value, String> {
890    let limit = req
891        .params
892        .get("limit")
893        .and_then(|v| v.as_u64())
894        .unwrap_or(100) as usize;
895    serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
896}
897
898async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
899    serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
900}
901
902async fn handle_a2ui_apply(
903    req: &JsonRpcMessage,
904    state: &Arc<ServerState>,
905) -> Result<Value, String> {
906    #[derive(Deserialize)]
907    struct Params {
908        #[serde(default)]
909        envelope: Option<car_a2ui::A2uiEnvelope>,
910        #[serde(default)]
911        message: Option<car_a2ui::A2uiEnvelope>,
912    }
913
914    let envelope = if req.params.get("createSurface").is_some()
915        || req.params.get("updateComponents").is_some()
916        || req.params.get("updateDataModel").is_some()
917        || req.params.get("deleteSurface").is_some()
918    {
919        serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
920            .map_err(|e| e.to_string())?
921    } else {
922        match serde_json::from_value::<Params>(req.params.clone()) {
923            Ok(params) => params
924                .envelope
925                .or(params.message)
926                .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
927            Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
928                .map_err(|e| e.to_string())?,
929        }
930    };
931
932    apply_a2ui_envelope(state, envelope, None, None).await
933}
934
935async fn handle_a2ui_ingest(
936    req: &JsonRpcMessage,
937    state: &Arc<ServerState>,
938) -> Result<Value, String> {
939    #[derive(Deserialize)]
940    #[serde(rename_all = "camelCase")]
941    struct Params {
942        #[serde(default)]
943        endpoint: Option<String>,
944        #[serde(default)]
945        a2a_endpoint: Option<String>,
946        #[serde(default)]
947        owner: Option<car_a2ui::A2uiSurfaceOwner>,
948        #[serde(default)]
949        route_auth: Option<A2aRouteAuth>,
950        #[serde(default)]
951        allow_untrusted_endpoint: bool,
952    }
953
954    let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
955        endpoint: None,
956        a2a_endpoint: None,
957        owner: None,
958        route_auth: None,
959        allow_untrusted_endpoint: false,
960    });
961    let payload = req.params.get("payload").unwrap_or(&req.params);
962    state
963        .a2ui
964        .validate_payload(payload)
965        .map_err(|e| e.to_string())?;
966    let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
967    if envelopes.is_empty() {
968        return Err("no A2UI envelopes found in payload".into());
969    }
970    let endpoint = params.endpoint.or(params.a2a_endpoint);
971    let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
972    let owner = params
973        .owner
974        .or_else(|| car_a2ui::owner_from_value(payload))
975        .map(|owner| match endpoint.clone() {
976            Some(endpoint) => owner.with_endpoint(Some(endpoint)),
977            None => owner,
978        });
979
980    let mut results = Vec::new();
981    for envelope in envelopes {
982        let value =
983            apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
984        results.push(value);
985    }
986    Ok(serde_json::json!({ "applied": results }))
987}
988
989async fn apply_a2ui_envelope(
990    state: &Arc<ServerState>,
991    envelope: car_a2ui::A2uiEnvelope,
992    owner: Option<car_a2ui::A2uiSurfaceOwner>,
993    route_auth: Option<A2aRouteAuth>,
994) -> Result<Value, String> {
995    let result = state
996        .a2ui
997        .apply_with_owner(envelope, owner)
998        .await
999        .map_err(|e| e.to_string())?;
1000    update_a2ui_route_auth(state, &result, route_auth).await;
1001    let kind = if result.deleted {
1002        "a2ui.surface_deleted"
1003    } else {
1004        "a2ui.surface_updated"
1005    };
1006    let message = if result.deleted {
1007        format!("A2UI surface {} deleted", result.surface_id)
1008    } else {
1009        format!("A2UI surface {} updated", result.surface_id)
1010    };
1011    let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
1012    state
1013        .host
1014        .record_event(kind, None, message, payload.clone())
1015        .await;
1016    // Push the envelope result to every WS subscriber as an
1017    // `a2ui.event` notification — Parslee-ai/car-releases#29. Late
1018    // joiners catch up via `a2ui/replay` (or `a2ui.surfaces`).
1019    broadcast_a2ui_event(state, kind, &payload).await;
1020    serde_json::to_value(result).map_err(|e| e.to_string())
1021}
1022
1023async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
1024    use futures::SinkExt;
1025    use tokio_tungstenite::tungstenite::Message;
1026    let subscribers: Vec<Arc<crate::session::WsChannel>> = state
1027        .a2ui_subscribers
1028        .lock()
1029        .await
1030        .values()
1031        .cloned()
1032        .collect();
1033    if subscribers.is_empty() {
1034        return;
1035    }
1036    let Ok(json) = serde_json::to_string(&serde_json::json!({
1037        "jsonrpc": "2.0",
1038        "method": "a2ui.event",
1039        "params": {
1040            "kind": kind,
1041            "result": result,
1042        },
1043    })) else {
1044        return;
1045    };
1046    for channel in subscribers {
1047        let _ = channel
1048            .write
1049            .lock()
1050            .await
1051            .send(Message::Text(json.clone().into()))
1052            .await;
1053    }
1054}
1055
1056async fn update_a2ui_route_auth(
1057    state: &Arc<ServerState>,
1058    result: &car_a2ui::A2uiApplyResult,
1059    route_auth: Option<A2aRouteAuth>,
1060) {
1061    let mut auth = state.a2ui_route_auth.lock().await;
1062    if result.deleted {
1063        auth.remove(&result.surface_id);
1064        return;
1065    }
1066
1067    let has_route_endpoint = result
1068        .surface
1069        .as_ref()
1070        .and_then(|surface| surface.owner.as_ref())
1071        .and_then(|owner| owner.endpoint.as_ref())
1072        .is_some();
1073    match (has_route_endpoint, route_auth) {
1074        (true, Some(route_auth)) => {
1075            auth.insert(result.surface_id.clone(), route_auth);
1076        }
1077        _ => {
1078            auth.remove(&result.surface_id);
1079        }
1080    }
1081}
1082
1083fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
1084    serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
1085}
1086
1087async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
1088    let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
1089    if !removed.is_empty() {
1090        let mut auth = state.a2ui_route_auth.lock().await;
1091        for surface_id in &removed {
1092            auth.remove(surface_id);
1093        }
1094    }
1095    Ok(serde_json::json!({ "removed": removed }))
1096}
1097
1098async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
1099    serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
1100}
1101
1102async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
1103    let surface_id = req
1104        .params
1105        .get("surface_id")
1106        .or_else(|| req.params.get("surfaceId"))
1107        .and_then(Value::as_str)
1108        .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
1109    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1110}
1111
1112/// `a2ui/subscribe` — opt this WS connection into `a2ui.event`
1113/// notifications. Subscribers receive every `apply_a2ui_envelope`
1114/// result for as long as they're connected; the cleanup hook in
1115/// `run_dispatch` removes them on disconnect. Closes
1116/// Parslee-ai/car-releases#29.
1117async fn handle_a2ui_subscribe(
1118    session: &crate::session::ClientSession,
1119    state: &Arc<ServerState>,
1120) -> Result<Value, String> {
1121    state
1122        .a2ui_subscribers
1123        .lock()
1124        .await
1125        .insert(session.client_id.clone(), session.channel.clone());
1126    Ok(serde_json::json!({ "subscribed": true }))
1127}
1128
1129/// `a2ui/unsubscribe` — opt out of `a2ui.event` notifications.
1130/// Idempotent: returns `{ subscribed: false }` regardless of prior
1131/// state.
1132async fn handle_a2ui_unsubscribe(
1133    session: &crate::session::ClientSession,
1134    state: &Arc<ServerState>,
1135) -> Result<Value, String> {
1136    state
1137        .a2ui_subscribers
1138        .lock()
1139        .await
1140        .remove(&session.client_id);
1141    Ok(serde_json::json!({ "subscribed": false }))
1142}
1143
1144/// `a2ui/replay` — fetch the current state of one surface. Intended
1145/// for late joiners and reconnect: a client calls `subscribe`, then
1146/// `replay` once per surface it's tracking, and from then on
1147/// notifications keep it in sync. Equivalent to `a2ui.get` on the
1148/// surface store; lives in the subscribe namespace for
1149/// discoverability.
1150async fn handle_a2ui_replay(
1151    req: &JsonRpcMessage,
1152    state: &Arc<ServerState>,
1153) -> Result<Value, String> {
1154    let surface_id = req
1155        .params
1156        .get("surface_id")
1157        .or_else(|| req.params.get("surfaceId"))
1158        .and_then(Value::as_str)
1159        .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
1160    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1161}
1162
1163async fn handle_a2ui_action(
1164    req: &JsonRpcMessage,
1165    state: &Arc<ServerState>,
1166) -> Result<Value, String> {
1167    let action: car_a2ui::ClientAction =
1168        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1169    let owner = state.a2ui.owner(&action.surface_id).await;
1170    let route = route_a2ui_action(state, &action, owner.clone()).await;
1171    let payload = serde_json::json!({
1172        "action": action,
1173        "owner": owner,
1174        "route": route,
1175    });
1176    let event = state
1177        .host
1178        .record_event(
1179            "a2ui.action",
1180            None,
1181            format!(
1182                "A2UI action {} from {}",
1183                action.name, action.source_component_id
1184            ),
1185            payload,
1186        )
1187        .await;
1188    Ok(serde_json::json!({
1189        "event": event,
1190        "route": route,
1191    }))
1192}
1193
1194/// `a2ui.render_report` — renderer-emitted telemetry envelope
1195/// (Parslee-ai/car#180). Fire-and-forget; we record an event in
1196/// the host log (so dev tools and the conversation log see it) and
1197/// broadcast as `a2ui.event { kind: "a2ui.render_report", result }`
1198/// to every WS subscriber. The improvement agent reads the report
1199/// and decides whether to issue a follow-up `patchComponents`.
1200async fn handle_a2ui_render_report(
1201    req: &JsonRpcMessage,
1202    state: &Arc<ServerState>,
1203) -> Result<Value, String> {
1204    // Parse into the typed struct to enforce the schema; we
1205    // re-serialize for the event/broadcast payload so downstream
1206    // consumers don't have to defensively re-validate.
1207    let report: car_a2ui::RenderReport =
1208        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1209    let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
1210    let kind = "a2ui.render_report";
1211    let message = format!("A2UI render report for surface {}", report.surface_id);
1212    let event = state
1213        .host
1214        .record_event(kind, None, message, payload.clone())
1215        .await;
1216    broadcast_a2ui_event(state, kind, &payload).await;
1217
1218    // Hand the report to the in-process UI-improvement agent. The
1219    // agent is sync but cheap; we await the surface lookup before
1220    // calling it so the strategies see the same surface state the
1221    // renderer saw at report-emit time. Best-effort: surface lookup
1222    // misses (the surface might have been deleted between
1223    // report-emit and report-receive) are logged and skipped, never
1224    // surfaced as JSON-RPC errors — render_report is fire-and-forget.
1225    if let Some(surface) = state.a2ui.get(&report.surface_id).await {
1226        // Iteration budget — runaway-loop backstop. try_consume
1227        // claims the slot atomically; if the surface is already at
1228        // the cap, we short-circuit before consulting the agent.
1229        // The slot stays consumed only if the patch actually
1230        // applies — failure paths refund.
1231        if !state.ui_agent_budget.try_consume(&report.surface_id) {
1232            tracing::warn!(
1233                surface_id = %report.surface_id,
1234                count = state.ui_agent_budget.count(&report.surface_id),
1235                max = state.ui_agent_budget.max(),
1236                "ui-agent iteration budget exhausted; skipping agent invocation"
1237            );
1238            return Ok(serde_json::json!({ "event": event }));
1239        }
1240        // From here on, every non-applied branch must `refund` the
1241        // slot we just claimed. Only the successful-apply branch
1242        // keeps it consumed.
1243        match state.ui_agent.on_render_report(&report, &surface) {
1244            car_ui_agent::Decision::Patch {
1245                envelope,
1246                strategy_id,
1247                patch_hash,
1248                elapsed_ns,
1249            } => {
1250                // Runtime-side convergence monitor — neo's deferred
1251                // ask. The agent's no-double-patch guard catches
1252                // same-sequence repeats; this catches A→B→A across
1253                // sequences by tracking recent patch hashes per
1254                // surface. When a proposal would repeat one in the
1255                // window, drop it; the loop waits for the next
1256                // signature change.
1257                if !state
1258                    .ui_agent_oscillation
1259                    .check_and_record(&report.surface_id, patch_hash)
1260                {
1261                    tracing::warn!(
1262                        surface_id = %report.surface_id,
1263                        strategy = %strategy_id,
1264                        patch_hash,
1265                        "ui-agent oscillation detected; suppressing patch"
1266                    );
1267                    // Suppressed patch never applied → release the
1268                    // budget slot we claimed above.
1269                    state.ui_agent_budget.refund(&report.surface_id);
1270                    return Ok(serde_json::json!({ "event": event }));
1271                }
1272                let a2ui_envelope = car_a2ui::A2uiEnvelope {
1273                    patch_components: Some(envelope),
1274                    ..Default::default()
1275                };
1276                if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
1277                    tracing::warn!(
1278                        surface_id = %report.surface_id,
1279                        strategy = %strategy_id,
1280                        patch_hash,
1281                        elapsed_ns,
1282                        error = %e,
1283                        "ui-agent patch apply failed",
1284                    );
1285                    // Apply failed → release the budget slot.
1286                    state.ui_agent_budget.refund(&report.surface_id);
1287                } else {
1288                    tracing::debug!(
1289                        surface_id = %report.surface_id,
1290                        strategy = %strategy_id,
1291                        patch_hash,
1292                        elapsed_ns,
1293                        iteration = state.ui_agent_budget.count(&report.surface_id),
1294                        "ui-agent patch applied",
1295                    );
1296                    // Memgine trace: one Conversation node per
1297                    // successful patch, tagged "ui-agent/<surface>".
1298                    // Spreading activation can surface these on
1299                    // future renders of related surfaces. Spawned
1300                    // off the render-report hot path — ingest walks
1301                    // the graph for spreading activation and can
1302                    // take real time; we don't want two surfaces
1303                    // churning patches to serialize through the
1304                    // memgine mutex behind the WS handler.
1305                    if let Some(memgine) = state.shared_memgine.clone() {
1306                        let speaker = format!("ui-agent/{}", report.surface_id);
1307                        let text = format!("strategy applied: {}", strategy_id);
1308                        tokio::spawn(async move {
1309                            let mut guard = memgine.lock().await;
1310                            guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1311                        });
1312                    }
1313                }
1314            }
1315            car_ui_agent::Decision::StableNoChange => {
1316                // No patch this round → release the slot.
1317                state.ui_agent_budget.refund(&report.surface_id);
1318            }
1319            car_ui_agent::Decision::HardStop { reason } => {
1320                state.ui_agent_budget.refund(&report.surface_id);
1321                // Renderer painted unknown components — contract
1322                // violation between server and renderer. Per
1323                // types.rs docs: "loud failure" + "MUST pause."
1324                // `error!` not `warn!` so it surfaces in production
1325                // logs at the right severity.
1326                tracing::error!(
1327                    surface_id = %report.surface_id,
1328                    reason = %reason,
1329                    "ui-agent hard-stopped improvement loop",
1330                );
1331            }
1332        }
1333    } else {
1334        tracing::debug!(
1335            surface_id = %report.surface_id,
1336            "ui-agent skipped — surface not found in store",
1337        );
1338    }
1339
1340    Ok(serde_json::json!({ "event": event }))
1341}
1342
1343async fn route_a2ui_action(
1344    state: &Arc<ServerState>,
1345    action: &car_a2ui::ClientAction,
1346    owner: Option<car_a2ui::A2uiSurfaceOwner>,
1347) -> Value {
1348    let Some(owner) = owner else {
1349        return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1350    };
1351    if owner.kind != "a2a" {
1352        return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1353    }
1354    let Some(endpoint) = owner.endpoint.clone() else {
1355        return serde_json::json!({
1356            "delivered": false,
1357            "reason": "surface owner has no endpoint",
1358            "owner": owner
1359        });
1360    };
1361
1362    let message = car_a2a::Message {
1363        message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1364        role: car_a2a::MessageRole::User,
1365        parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1366            data: serde_json::json!({
1367                "a2uiAction": action,
1368            }),
1369            metadata: Default::default(),
1370        })],
1371        task_id: owner.task_id.clone(),
1372        context_id: owner.context_id.clone(),
1373        metadata: Default::default(),
1374    };
1375
1376    let auth = state
1377        .a2ui_route_auth
1378        .lock()
1379        .await
1380        .get(&action.surface_id)
1381        .cloned()
1382        .map(client_auth_from_route_auth)
1383        .unwrap_or(car_a2a::ClientAuth::None);
1384
1385    match car_a2a::A2aClient::new(endpoint.clone())
1386        .with_auth(auth)
1387        .send_message(message, false)
1388        .await
1389    {
1390        Ok(result) => serde_json::json!({
1391            "delivered": true,
1392            "owner": owner,
1393            "endpoint": endpoint,
1394            "result": result,
1395        }),
1396        Err(error) => serde_json::json!({
1397            "delivered": false,
1398            "owner": owner,
1399            "endpoint": endpoint,
1400            "error": error.to_string(),
1401        }),
1402    }
1403}
1404
1405fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1406    match auth {
1407        A2aRouteAuth::None => car_a2a::ClientAuth::None,
1408        A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1409        A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1410    }
1411}
1412
1413fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1414    let endpoint = endpoint?;
1415    if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1416        Some(endpoint)
1417    } else {
1418        None
1419    }
1420}
1421
/// Returns `true` when `endpoint` is a plain-HTTP URL whose host is one of
/// the loopback names `localhost`, `127.0.0.1` or `[::1]`.
///
/// The host must be followed by the end of the string, a `/`, or a `:port`
/// where the port consists solely of ASCII digits (possibly empty) and is
/// itself followed by the end of the string, `/`, `?` or `#`.
///
/// The strict port check matters for trust decisions: a bare prefix test on
/// `http://localhost:` also matches `http://localhost:80@evil.example/`,
/// where `localhost:80` is URL *userinfo* and the real host is
/// `evil.example`. Such URLs are rejected here.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    // Scheme match is case-sensitive, exactly like the original prefix tests.
    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };

    LOOPBACK_HOSTS.iter().any(|&host| {
        let Some(tail) = rest.strip_prefix(host) else {
            return false;
        };
        match tail.strip_prefix(':') {
            // "host:port…" — whatever follows the digits decides whether the
            // colon really introduced a port (and not userinfo or garbage).
            Some(after_colon) => {
                let port_len = after_colon
                    .bytes()
                    .take_while(u8::is_ascii_digit)
                    .count();
                matches!(
                    after_colon.as_bytes().get(port_len).copied(),
                    None | Some(b'/' | b'?' | b'#')
                )
            }
            // No port: the host must end the URL or be followed by a path.
            None => tail.is_empty() || tail.starts_with('/'),
        }
    })
}
1433
1434async fn handle_host_register_agent(
1435    req: &JsonRpcMessage,
1436    session: &crate::session::ClientSession,
1437) -> Result<Value, String> {
1438    let request: RegisterHostAgentRequest =
1439        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1440    serde_json::to_value(
1441        session
1442            .host
1443            .register_agent(&session.client_id, request)
1444            .await?,
1445    )
1446    .map_err(|e| e.to_string())
1447}
1448
1449async fn handle_host_unregister_agent(
1450    req: &JsonRpcMessage,
1451    session: &crate::session::ClientSession,
1452) -> Result<Value, String> {
1453    let agent_id = req
1454        .params
1455        .get("agent_id")
1456        .and_then(|v| v.as_str())
1457        .ok_or("missing agent_id")?;
1458    session
1459        .host
1460        .unregister_agent(&session.client_id, agent_id)
1461        .await?;
1462    Ok(serde_json::json!({"ok": true}))
1463}
1464
1465async fn handle_host_set_status(
1466    req: &JsonRpcMessage,
1467    session: &crate::session::ClientSession,
1468) -> Result<Value, String> {
1469    let request: SetHostAgentStatusRequest =
1470        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1471    serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1472        .map_err(|e| e.to_string())
1473}
1474
1475async fn handle_host_notify(
1476    req: &JsonRpcMessage,
1477    session: &crate::session::ClientSession,
1478) -> Result<Value, String> {
1479    let kind = req
1480        .params
1481        .get("kind")
1482        .and_then(|v| v.as_str())
1483        .unwrap_or("host.notification");
1484    let agent_id = req
1485        .params
1486        .get("agent_id")
1487        .and_then(|v| v.as_str())
1488        .map(str::to_string);
1489    let message = req
1490        .params
1491        .get("message")
1492        .and_then(|v| v.as_str())
1493        .unwrap_or("");
1494    let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1495    serde_json::to_value(
1496        session
1497            .host
1498            .record_event(kind, agent_id, message, payload)
1499            .await,
1500    )
1501    .map_err(|e| e.to_string())
1502}
1503
1504async fn handle_host_request_approval(
1505    req: &JsonRpcMessage,
1506    session: &crate::session::ClientSession,
1507) -> Result<Value, String> {
1508    let request: CreateHostApprovalRequest =
1509        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1510    if let Some(agent_id) = &request.agent_id {
1511        // Best-effort. If the caller doesn't own this agent the
1512        // ACL added 2026-05 will refuse the status update — that
1513        // is the correct semantics; we still want the approval row
1514        // itself to land so the UI can render the request.
1515        let _ = session
1516            .host
1517            .set_status(
1518                &session.client_id,
1519                SetHostAgentStatusRequest {
1520                    agent_id: agent_id.clone(),
1521                    status: HostAgentStatus::WaitingForApproval,
1522                    current_task: None,
1523                    message: Some("Waiting for approval".to_string()),
1524                    payload: Value::Null,
1525                },
1526            )
1527            .await;
1528    }
1529    // `system_level: true` opts the approval out of per-session
1530    // ownership. The host-side ACL then allows any authenticated
1531    // session (typically CarHost or `car-host approve`) to resolve.
1532    // Agents requesting user approval should always set this — the
1533    // session-owned mode is only correct when the requesting session
1534    // is also the resolving session, which approval-via-UI never is.
1535    let owner_client_id = if request.system_level {
1536        None
1537    } else {
1538        Some(session.client_id.as_str())
1539    };
1540    serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1541        .map_err(|e| e.to_string())
1542}
1543
1544async fn handle_host_resolve_approval(
1545    req: &JsonRpcMessage,
1546    session: &crate::session::ClientSession,
1547) -> Result<Value, String> {
1548    let request: ResolveHostApprovalRequest =
1549        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1550    serde_json::to_value(
1551        session
1552            .host
1553            .resolve_approval(&session.client_id, request)
1554            .await?,
1555    )
1556    .map_err(|e| e.to_string())
1557}
1558
1559/// `session.auth` — present the per-launch token to unlock the
1560/// connection. When `state.auth_token` is unset, this method is a
1561/// no-op success (auth is disabled). When set, the supplied token
1562/// must equal it (constant-time comparison) — a successful auth
1563/// flips `session.authenticated` to `true` so subsequent methods
1564/// pass the gate. Wrong token returns an error AND leaves the
1565/// session unauthenticated; the dispatcher loop's gate then closes
1566/// the connection on the next non-auth method.
1567///
1568/// Closes Parslee-ai/car-releases#32.
1569async fn handle_session_auth(
1570    req: &JsonRpcMessage,
1571    session: &crate::session::ClientSession,
1572    state: &Arc<ServerState>,
1573) -> Result<Value, String> {
1574    let supplied = req
1575        .params
1576        .get("token")
1577        .and_then(Value::as_str)
1578        .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1579    // #169: optional `agent_id` binds the WS connection to a
1580    // supervised lifecycle agent. When present, the supplied token
1581    // must equal the per-agent token the supervisor minted at upsert
1582    // (NOT the daemon-wide auth token). When absent, fall back to
1583    // the daemon-wide token — preserves the legacy unbound-token
1584    // path for browser/host/CLI clients.
1585    let agent_id = req
1586        .params
1587        .get("agent_id")
1588        .and_then(Value::as_str)
1589        .map(str::to_string);
1590
1591    if let Some(id) = agent_id {
1592        let supervisor = state.supervisor()?;
1593        if !supervisor.validate_agent_token(&id, supplied).await {
1594            return Err(format!(
1595                "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1596            ));
1597        }
1598        // Single-claim: only one connection at a time per
1599        // agent_id. A second claim is rejected so the daemon-side
1600        // per-agent state stays unambiguous.
1601        {
1602            let mut attached = state.attached_agents.lock().await;
1603            if let Some(prior) = attached.get(&id) {
1604                if prior != &session.client_id {
1605                    return Err(format!(
1606                        "auth failed: agent_id `{id}` is already attached on \
1607                         another connection (client_id={prior})"
1608                    ));
1609                }
1610            }
1611            attached.insert(id.clone(), session.client_id.clone());
1612        }
1613        // #170: attach the daemon-owned persistent memgine for
1614        // this agent. Lazy-loaded on first connection per id from
1615        // `~/.car/memory/agents/<id>.jsonl`; retained across
1616        // disconnect so the next session sees the same state.
1617        let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1618        *session.bound_memgine.lock().await = Some(agent_eng);
1619        *session.agent_id.lock().await = Some(id.clone());
1620        session
1621            .authenticated
1622            .store(true, std::sync::atomic::Ordering::Release);
1623        return Ok(serde_json::json!({
1624            "ok": true,
1625            "auth_enabled": true,
1626            "agent_id": id,
1627        }));
1628    }
1629
1630    let expected = match state.auth_token.get() {
1631        Some(t) => t,
1632        None => {
1633            // Auth disabled — accept any token politely so callers
1634            // that always include a session.auth handshake (e.g. the
1635            // FFI proxy) don't fail when the daemon happens to be
1636            // unauthed. Mark the session authenticated anyway so the
1637            // gate is a no-op below.
1638            session
1639                .authenticated
1640                .store(true, std::sync::atomic::Ordering::Release);
1641            return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1642        }
1643    };
1644    if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1645        return Err("auth failed: token mismatch".to_string());
1646    }
1647    session
1648        .authenticated
1649        .store(true, std::sync::atomic::Ordering::Release);
1650    Ok(serde_json::json!({
1651        "ok": true,
1652        "auth_enabled": true,
1653        "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
1654    }))
1655}
1656
/// Byte-slice equality that inspects every byte pair instead of stopping at
/// the first mismatch. Slices of different length compare unequal
/// immediately, so the length is the only thing the timing can reveal.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    // OR together the XOR of each pair: the accumulator stays zero only if
    // every pair matched.
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs))
            == 0
}
1670
1671/// Block dispatch of `method` until the user resolves the approval
1672/// raised on [`HostState`].
1673///
1674/// Called from the dispatcher loop only when
1675/// [`crate::session::ApprovalGate::requires_approval`] returns true.
1676/// `Ok(())` means the user picked "approve"; `Err(reason)` is sent
1677/// to the caller as JSON-RPC error code `-32003` with the supplied
1678/// reason. On timeout, the approval row stays in `Pending` so the
1679/// UI keeps a record of the unanswered request.
1680async fn gate_high_risk_method(
1681    method: &str,
1682    params: &Value,
1683    state: &Arc<ServerState>,
1684) -> Result<(), String> {
1685    let timeout = state.approval_gate.timeout;
1686    let req = CreateHostApprovalRequest {
1687        agent_id: None,
1688        action: format!("ws.method:{method}"),
1689        details: serde_json::json!({
1690            "method": method,
1691            // Truncate params for the UI — full payload is recoverable
1692            // via the request-time host event log if needed. The cap
1693            // keeps a malicious caller from drowning the UI in JSON.
1694            "params_preview": preview_params(params, 2_000),
1695        }),
1696        options: vec!["approve".to_string(), "deny".to_string()],
1697        // The high-risk-method gate is already system-level (it
1698        // passes None as the owner via request_and_wait_approval's
1699        // internal call). This field is informational here.
1700        system_level: true,
1701    };
1702    match state
1703        .host
1704        .request_and_wait_approval(req, "approve", timeout)
1705        .await
1706    {
1707        Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1708        Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1709            "{method} denied by user (approval gate, audit 2026-05). \
1710             To call this method without an interactive prompt, start \
1711             car-server with --no-approvals on a trusted machine."
1712        )),
1713        Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1714            "{method} approval timed out after {}s with no resolution. \
1715             The approval is still visible in `host.approvals` for \
1716             forensics; resubmit the request to retry.",
1717            timeout.as_secs()
1718        )),
1719        Err(e) => Err(format!("approval gate error: {e}")),
1720    }
1721}
1722
1723fn preview_params(value: &Value, max_chars: usize) -> Value {
1724    let s = value.to_string();
1725    if s.len() <= max_chars {
1726        value.clone()
1727    } else {
1728        Value::String(format!("{}… (truncated)", &s[..max_chars]))
1729    }
1730}
1731
1732async fn handle_session_init(
1733    req: &JsonRpcMessage,
1734    session: &crate::session::ClientSession,
1735) -> Result<Value, String> {
1736    let init: SessionInitRequest =
1737        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1738
1739    for tool in &init.tools {
1740        register_from_definition(&session.runtime, tool).await;
1741    }
1742
1743    let mut policy_count = 0;
1744    {
1745        let mut policies = session.runtime.policies.write().await;
1746        for policy_def in &init.policies {
1747            if let Some(check) = build_policy_check(policy_def) {
1748                policies.register(&policy_def.name, check, "");
1749                policy_count += 1;
1750            }
1751        }
1752    }
1753
1754    serde_json::to_value(SessionInitResponse {
1755        session_id: session.client_id.clone(),
1756        tools_registered: init.tools.len(),
1757        policies_registered: policy_count,
1758    })
1759    .map_err(|e| e.to_string())
1760}
1761
1762fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1763    match def.rule.as_str() {
1764        "deny_tool" => {
1765            let target = def.target.clone();
1766            Some(Box::new(
1767                move |action: &car_ir::Action, _: &car_state::StateStore| {
1768                    if action.tool.as_deref() == Some(&target) {
1769                        Some(format!("tool '{}' denied", target))
1770                    } else {
1771                        None
1772                    }
1773                },
1774            ))
1775        }
1776        "require_state" => {
1777            let key = def.key.clone();
1778            let value = def.value.clone();
1779            Some(Box::new(
1780                move |_: &car_ir::Action, state: &car_state::StateStore| {
1781                    if state.get(&key).as_ref() != Some(&value) {
1782                        Some(format!("state['{}'] must be {:?}", key, value))
1783                    } else {
1784                        None
1785                    }
1786                },
1787            ))
1788        }
1789        "deny_tool_param" => {
1790            let target = def.target.clone();
1791            let param = def.key.clone();
1792            let pattern = def.pattern.clone();
1793            Some(Box::new(
1794                move |action: &car_ir::Action, _: &car_state::StateStore| {
1795                    if action.tool.as_deref() != Some(&target) {
1796                        return None;
1797                    }
1798                    if let Some(val) = action.parameters.get(&param) {
1799                        let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1800                        if s.contains(&pattern) {
1801                            return Some(format!("param '{}' matches '{}'", param, pattern));
1802                        }
1803                    }
1804                    None
1805                },
1806            ))
1807        }
1808        _ => None,
1809    }
1810}
1811
1812async fn handle_tools_register(
1813    req: &JsonRpcMessage,
1814    session: &crate::session::ClientSession,
1815) -> Result<Value, String> {
1816    let tools: Vec<ToolDefinition> =
1817        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1818    for tool in &tools {
1819        register_from_definition(&session.runtime, tool).await;
1820    }
1821    Ok(Value::from(tools.len()))
1822}
1823
1824/// Bridge a wire-protocol `ToolDefinition` to the engine's
1825/// schema-aware registration. Carries the full ToolSchema shape
1826/// (description, parameters, returns, idempotency, caching, rate
1827/// limit) through to the validator. An empty `parameters` object is
1828/// the legacy schemaless registration — the validator no-ops for
1829/// those, so pre-v0.5.x callers see no change.
1830async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1831    runtime
1832        .register_tool_schema(car_ir::ToolSchema {
1833            name: def.name.clone(),
1834            description: def.description.clone(),
1835            parameters: def.parameters.clone(),
1836            returns: def.returns.clone(),
1837            idempotent: def.idempotent,
1838            cache_ttl_secs: def.cache_ttl_secs,
1839            rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1840                max_calls: rl.max_calls,
1841                interval_secs: rl.interval_secs,
1842            }),
1843        })
1844        .await;
1845}
1846
1847async fn handle_proposal_submit(
1848    req: &JsonRpcMessage,
1849    session: &crate::session::ClientSession,
1850) -> Result<Value, String> {
1851    let submit: ProposalSubmitRequest =
1852        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1853    // `session_id` is sibling to `proposal` in the params object —
1854    // not part of `ProposalSubmitRequest` (kept proto-compatible). When
1855    // present, executes the proposal under the named session so any
1856    // session-scoped policies layer on top of global ones.
1857    // See docs/proposals/per-session-policy-scoping.md.
1858    let session_id = req
1859        .params
1860        .get("session_id")
1861        .and_then(|v| v.as_str())
1862        .map(str::to_string);
1863
1864    // `scope` is the optional per-execution caller / tenant surface
1865    // added in Parslee-ai/car#187 phase 3. Shape mirrors
1866    // `car_engine::RuntimeScope` ({ callerId, tenantId, claims });
1867    // serde's camelCase rename keeps the wire form consistent with
1868    // the rest of the JSON-RPC surface. When present, the proposal
1869    // routes through `execute_scoped*` so the runtime records the
1870    // identity on the event log and state R/W ops apply the tenant
1871    // prefix (#187 phase 3-B).
1872    let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1873        Some(v) if !v.is_null() => {
1874            Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1875        }
1876        _ => None,
1877    };
1878
1879    let result = match (session_id, scope) {
1880        // session_id + scope: no single combined entry point on
1881        // Runtime today. Scope takes precedence because it's the
1882        // tenant-isolation surface; per-session policies layer in
1883        // a follow-up when there's a real caller asking for both.
1884        (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1885        (Some(sid), None) => {
1886            session
1887                .runtime
1888                .execute_with_session(&submit.proposal, &sid)
1889                .await
1890        }
1891        (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1892        (None, None) => session.runtime.execute(&submit.proposal).await,
1893    };
1894    serde_json::to_value(result).map_err(|e| e.to_string())
1895}
1896
1897async fn handle_session_policy_open(
1898    session: &crate::session::ClientSession,
1899) -> Result<Value, String> {
1900    let id = session.runtime.open_session().await;
1901    Ok(serde_json::json!({ "session_id": id }))
1902}
1903
1904async fn handle_session_policy_close(
1905    req: &JsonRpcMessage,
1906    session: &crate::session::ClientSession,
1907) -> Result<Value, String> {
1908    let sid = req
1909        .params
1910        .get("session_id")
1911        .and_then(|v| v.as_str())
1912        .ok_or("missing 'session_id'")?;
1913    let closed = session.runtime.close_session(sid).await;
1914    Ok(serde_json::json!({ "closed": closed }))
1915}
1916
1917/// `policy.register` — register one policy against this WebSocket
1918/// session's runtime. Mirrors the `PolicyDefinition` shape used by
1919/// `session.init`. When `session_id` is present, the policy is scoped
1920/// to the named in-runtime session opened via `session.policy.open`;
1921/// otherwise it is global.
1922async fn handle_policy_register(
1923    req: &JsonRpcMessage,
1924    session: &crate::session::ClientSession,
1925) -> Result<Value, String> {
1926    let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1927        .map_err(|e| format!("invalid policy params: {e}"))?;
1928    let session_id = req
1929        .params
1930        .get("session_id")
1931        .and_then(|v| v.as_str())
1932        .map(str::to_string);
1933    let check = build_policy_check(&def)
1934        .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1935    match session_id {
1936        Some(sid) => session
1937            .runtime
1938            .register_policy_in_session(&sid, &def.name, check, "")
1939            .await
1940            .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1941        None => {
1942            let mut policies = session.runtime.policies.write().await;
1943            policies.register(&def.name, check, "");
1944            Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1945        }
1946    }
1947}
1948
1949async fn handle_verify(
1950    req: &JsonRpcMessage,
1951    session: &crate::session::ClientSession,
1952) -> Result<Value, String> {
1953    let vr: VerifyRequest =
1954        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1955    let tools: std::collections::HashSet<String> =
1956        session.runtime.tools.read().await.keys().cloned().collect();
1957    let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1958    serde_json::to_value(VerifyResponse {
1959        valid: result.valid,
1960        issues: result
1961            .issues
1962            .iter()
1963            .map(|i| VerifyIssueProto {
1964                action_id: i.action_id.clone(),
1965                severity: i.severity.clone(),
1966                message: i.message.clone(),
1967            })
1968            .collect(),
1969        simulated_state: result.simulated_state,
1970    })
1971    .map_err(|e| e.to_string())
1972}
1973
1974/// Parse the optional `tenant_id` sibling field from JSON-RPC
1975/// params (Parslee-ai/car#187 phase 3-E). When set and non-empty,
1976/// state R/W routes through `StateStore::scoped(tenant_id)` so
1977/// distinct tenants can't see each other's keys over the WS
1978/// surface — symmetric to the proposal.submit scope plumbing.
1979/// When absent / empty, the legacy unscoped namespace applies.
1980fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1981    req.params
1982        .get("tenant_id")
1983        .and_then(|v| v.as_str())
1984        .filter(|s| !s.is_empty())
1985        .map(str::to_string)
1986}
1987
1988async fn handle_state_get(
1989    req: &JsonRpcMessage,
1990    session: &crate::session::ClientSession,
1991) -> Result<Value, String> {
1992    let key = req
1993        .params
1994        .get("key")
1995        .and_then(|v| v.as_str())
1996        .ok_or("missing 'key'")?;
1997    let tenant = tenant_from_params(req);
1998    Ok(session
1999        .runtime
2000        .state
2001        .scoped(tenant.as_deref())
2002        .get(key)
2003        .unwrap_or(Value::Null))
2004}
2005
2006async fn handle_state_set(
2007    req: &JsonRpcMessage,
2008    session: &crate::session::ClientSession,
2009) -> Result<Value, String> {
2010    let key = req
2011        .params
2012        .get("key")
2013        .and_then(|v| v.as_str())
2014        .ok_or("missing 'key'")?;
2015    let value = req.params.get("value").cloned().unwrap_or(Value::Null);
2016    let tenant = tenant_from_params(req);
2017    session
2018        .runtime
2019        .state
2020        .scoped(tenant.as_deref())
2021        .set(key, value, "client");
2022    Ok(Value::from("ok"))
2023}
2024
2025/// `state.exists` — true if the key is set in this session's state
2026/// store, false otherwise. Cheaper than `state.get` + null-check on
2027/// the client side because it doesn't serialize the value.
2028async fn handle_state_exists(
2029    req: &JsonRpcMessage,
2030    session: &crate::session::ClientSession,
2031) -> Result<Value, String> {
2032    let key = req
2033        .params
2034        .get("key")
2035        .and_then(|v| v.as_str())
2036        .ok_or("missing 'key'")?;
2037    let tenant = tenant_from_params(req);
2038    Ok(Value::Bool(
2039        session.runtime.state.scoped(tenant.as_deref()).exists(key),
2040    ))
2041}
2042
2043/// `state.keys` — list every key currently set in this session's
2044/// state store. Returns a JSON array of strings.
2045async fn handle_state_keys(
2046    req: &JsonRpcMessage,
2047    session: &crate::session::ClientSession,
2048) -> Result<Value, String> {
2049    let tenant = tenant_from_params(req);
2050    Ok(Value::Array(
2051        session
2052            .runtime
2053            .state
2054            .scoped(tenant.as_deref())
2055            .keys()
2056            .into_iter()
2057            .map(Value::String)
2058            .collect(),
2059    ))
2060}
2061
2062/// `state.snapshot` — return the entire session state store as a
2063/// JSON object (`{ key: value, ... }`). Equivalent to iterating
2064/// `state.keys` + `state.get` but in a single round-trip; for
2065/// inspectors/dashboards.
2066///
2067/// Tenant-scoped variant: when `tenant_id` is set, only that
2068/// tenant's keys are returned (prefix stripped on the way out).
2069/// `state.snapshot` with no `tenant_id` returns only unscoped
2070/// keys; consistent with `state.keys`'s filter behaviour and the
2071/// strict-isolation contract from phase 3-B.
2072async fn handle_state_snapshot(
2073    req: &JsonRpcMessage,
2074    session: &crate::session::ClientSession,
2075) -> Result<Value, String> {
2076    let tenant = tenant_from_params(req);
2077    let view = session.runtime.state.scoped(tenant.as_deref());
2078    let mut map = serde_json::Map::new();
2079    for key in view.keys() {
2080        if let Some(value) = view.get(&key) {
2081            map.insert(key, value);
2082        }
2083    }
2084    Ok(Value::Object(map))
2085}
2086
2087// --- Per-agent persistent memgine (#170) ---
2088
2089/// `~/.car/memory/agents/<id>.json` — the per-agent snapshot file.
2090/// Mirrors the existing `memory.persist` shape (flat JSON array of
2091/// fact objects) so the same loader path works.
2092fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
2093    let base = car_ffi_common::memory_path::ensure_base()
2094        .map_err(|e| format!("memory base unavailable: {e}"))?;
2095    let dir = base.join("agents");
2096    std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
2097    Ok(dir.join(format!("{agent_id}.json")))
2098}
2099
2100/// Acquire (or lazy-create + load from disk) the daemon-owned
2101/// persistent memgine for `agent_id`. First call per id reads
2102/// `~/.car/memory/agents/<id>.json` if it exists; subsequent calls
2103/// share the in-memory engine across sessions. Caller stores the
2104/// returned `Arc` on `ClientSession.bound_memgine` so memory.*
2105/// handlers route through it via [`ClientSession::effective_memgine`].
2106async fn get_or_load_agent_memgine(
2107    state: &Arc<ServerState>,
2108    agent_id: &str,
2109) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
2110    {
2111        let map = state.agent_memgines.lock().await;
2112        if let Some(eng) = map.get(agent_id) {
2113            return Ok(eng.clone());
2114        }
2115    }
2116    // Build a fresh engine and try to load from disk.
2117    let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
2118        None,
2119    )));
2120    let path = agent_memgine_snapshot_path(agent_id)?;
2121    if path.exists() {
2122        let content = std::fs::read_to_string(&path)
2123            .map_err(|e| format!("read {}: {}", path.display(), e))?;
2124        let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
2125        let mut g = engine.lock().await;
2126        let mut loaded: u32 = 0;
2127        for fact in &facts {
2128            let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2129            let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2130            let kind = fact
2131                .get("kind")
2132                .and_then(|v| v.as_str())
2133                .unwrap_or("pattern");
2134            let fid = format!("loaded-{loaded}");
2135            g.ingest_fact(
2136                &fid,
2137                subject,
2138                body,
2139                "user",
2140                "peer",
2141                chrono::Utc::now(),
2142                "global",
2143                None,
2144                vec![],
2145                kind == "constraint",
2146            );
2147            loaded += 1;
2148        }
2149    }
2150    let mut map = state.agent_memgines.lock().await;
2151    let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
2152    Ok(stored)
2153}
2154
2155/// Snapshot the agent's memgine to its disk file. Same on-wire shape
2156/// as `memory.persist` so manual snapshots and the daemon-owned
2157/// persistence stay interoperable.
2158async fn persist_agent_memgine(
2159    agent_id: &str,
2160    engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
2161) -> Result<(), String> {
2162    let path = agent_memgine_snapshot_path(agent_id)?;
2163    let g = engine.lock().await;
2164    let facts: Vec<Value> = g
2165        .graph
2166        .inner
2167        .node_indices()
2168        .filter_map(|nix| {
2169            let node = g.graph.inner.node_weight(nix)?;
2170            if !node.is_valid() {
2171                return None;
2172            }
2173            if node.kind == car_memgine::MemKind::Identity
2174                || node.kind == car_memgine::MemKind::Environment
2175            {
2176                return None;
2177            }
2178            Some(serde_json::json!({
2179                "subject": node.key,
2180                "body": node.value,
2181                "kind": match node.kind {
2182                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2183                    car_memgine::MemKind::Conversation => "outcome",
2184                    _ => "pattern",
2185                },
2186                "confidence": 0.5,
2187                "content_type": node.content_type.as_label(),
2188            }))
2189        })
2190        .collect();
2191    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2192    std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2193    Ok(())
2194}
2195
2196// --- Memory handlers ---
2197
2198/// `memory.fact_count` — return `valid_fact_count()` of the
2199/// session's memgine. Used by FFI bindings to mirror their
2200/// embedded `fact_count()` accessor without round-tripping a full
2201/// query. No params.
2202async fn handle_memory_fact_count(
2203    session: &crate::session::ClientSession,
2204) -> Result<Value, String> {
2205    let engine_arc = session.effective_memgine().await;
2206    let engine = engine_arc.lock().await;
2207    Ok(Value::from(engine.valid_fact_count()))
2208}
2209
2210async fn handle_memory_add_fact(
2211    req: &JsonRpcMessage,
2212    session: &crate::session::ClientSession,
2213) -> Result<Value, String> {
2214    let subject = req
2215        .params
2216        .get("subject")
2217        .and_then(|v| v.as_str())
2218        .ok_or("missing subject")?;
2219    let body = req
2220        .params
2221        .get("body")
2222        .and_then(|v| v.as_str())
2223        .ok_or("missing body")?;
2224    let kind = req
2225        .params
2226        .get("kind")
2227        .and_then(|v| v.as_str())
2228        .unwrap_or("pattern");
2229    // Route through `effective_memgine` so connections bound to a
2230    // lifecycle agent (#169) write into the daemon-owned per-agent
2231    // memgine instead of the per-WS ephemeral one (#170).
2232    let engine_arc = session.effective_memgine().await;
2233    let count = {
2234        let mut engine = engine_arc.lock().await;
2235        let fid = format!("ws-{}", engine.valid_fact_count());
2236        engine.ingest_fact(
2237            &fid,
2238            subject,
2239            body,
2240            "user",
2241            "peer",
2242            chrono::Utc::now(),
2243            "global",
2244            None,
2245            vec![],
2246            kind == "constraint",
2247        );
2248        engine.valid_fact_count()
2249    };
2250    // Persist after every add when the session is bound to a
2251    // supervised agent. Synchronous write — small JSON snapshot.
2252    if let Some(id) = session.agent_id.lock().await.clone() {
2253        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2254            tracing::warn!(agent_id = %id, error = %e,
2255                "agent memgine persist failed; in-memory state is canonical");
2256        }
2257    }
2258    Ok(Value::from(count))
2259}
2260
2261async fn handle_memory_query(
2262    req: &JsonRpcMessage,
2263    session: &crate::session::ClientSession,
2264) -> Result<Value, String> {
2265    let query = req
2266        .params
2267        .get("query")
2268        .and_then(|v| v.as_str())
2269        .ok_or("missing query")?;
2270    let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2271    let engine_arc = session.effective_memgine().await;
2272    let engine = engine_arc.lock().await;
2273    let seeds = engine.graph.find_seeds(query, 5);
2274    // FFI parity with NAPI `query_facts` (car-ffi-napi/src/lib.rs:577) —
2275    // both use Personalized PageRank so transport choice doesn't shift
2276    // ranking semantics. Result shape (subject/body/kind/confidence)
2277    // also mirrors NAPI for the same reason.
2278    let hits = if !seeds.is_empty() {
2279        engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2280    } else {
2281        vec![]
2282    };
2283    let results: Vec<Value> = hits
2284        .iter()
2285        .filter_map(|hit| {
2286            let node = engine.graph.inner.node_weight(hit.node_ix)?;
2287            Some(serde_json::json!({
2288                "subject": node.key,
2289                "body": node.value,
2290                "kind": format!("{:?}", node.kind).to_lowercase(),
2291                "confidence": hit.activation,
2292            }))
2293        })
2294        .collect();
2295    serde_json::to_value(results).map_err(|e| e.to_string())
2296}
2297
2298async fn handle_memory_build_context(
2299    req: &JsonRpcMessage,
2300    session: &crate::session::ClientSession,
2301) -> Result<Value, String> {
2302    let query = req
2303        .params
2304        .get("query")
2305        .and_then(|v| v.as_str())
2306        .unwrap_or("");
2307    // FFI parity with NAPI `build_context(query, model_context_window)`.
2308    // When supplied, sizes the assembly budget against the model's window
2309    // instead of the fixed 8K default.
2310    let model_context_window = req
2311        .params
2312        .get("model_context_window")
2313        .and_then(|v| v.as_u64())
2314        .map(|w| w as usize);
2315    let mut engine = session.memgine.lock().await;
2316    Ok(Value::from(
2317        engine.build_context_for_model(query, model_context_window),
2318    ))
2319}
2320
2321/// `memory.build_context_fast` — Fast-mode context assembly for
2322/// latency-sensitive paths (voice, real-time). Skips embedding flush,
2323/// skill lookup, PPR-based scoring, inline repairs, known-unknowns
2324/// extraction. Keeps identity, constraints, facts (creation order),
2325/// conversation, environment.
2326async fn handle_memory_build_context_fast(
2327    req: &JsonRpcMessage,
2328    session: &crate::session::ClientSession,
2329) -> Result<Value, String> {
2330    let query = req
2331        .params
2332        .get("query")
2333        .and_then(|v| v.as_str())
2334        .unwrap_or("");
2335    let model_context_window = req
2336        .params
2337        .get("model_context_window")
2338        .and_then(|v| v.as_u64())
2339        .map(|w| w as usize);
2340    let mut engine = session.memgine.lock().await;
2341    Ok(Value::from(engine.build_context_with_options(
2342        query,
2343        model_context_window,
2344        car_memgine::ContextMode::Fast,
2345        None,
2346    )))
2347}
2348
2349/// `memory.persist` — write the session's memgine to a JSON file
2350/// at `path`. Mirrors NAPI `persist_memory` (car-ffi-napi/src/lib.rs:797)
2351/// so daemon-mode clients can drive checkpoint/restore symmetrically
2352/// with embedded mode. Returns the number of facts written.
2353///
2354/// Filesystem caveat: `path` is interpreted on the daemon's filesystem,
2355/// not the caller's. Since the 2026-05 audit, `path` is also
2356/// sandboxed under `~/.car/memory/` via
2357/// [`car_ffi_common::memory_path::resolve`] — relative paths land
2358/// under the base, absolute paths must already be under the base,
2359/// `..` segments are rejected, symlinks pointing out are rejected.
2360/// Pre-2026-05 the path was passed straight to `std::fs::write` and
2361/// became an arbitrary file-write primitive. The base64-blob escape
2362/// hatch tracked in `Parslee-ai/car-releases#31` will plug into the
2363/// same resolver when it lands.
2364async fn handle_memory_persist(
2365    req: &JsonRpcMessage,
2366    session: &crate::session::ClientSession,
2367) -> Result<Value, String> {
2368    let path = req
2369        .params
2370        .get("path")
2371        .and_then(|v| v.as_str())
2372        .ok_or("missing path")?;
2373    let resolved = car_ffi_common::memory_path::resolve(path)
2374        .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2375    let engine = session.memgine.lock().await;
2376    let facts: Vec<Value> = engine
2377        .graph
2378        .inner
2379        .node_indices()
2380        .filter_map(|nix| {
2381            let node = engine.graph.inner.node_weight(nix)?;
2382            if !node.is_valid() {
2383                return None;
2384            }
2385            if node.kind == car_memgine::MemKind::Identity
2386                || node.kind == car_memgine::MemKind::Environment
2387            {
2388                return None;
2389            }
2390            Some(serde_json::json!({
2391                "subject": node.key,
2392                "body": node.value,
2393                "kind": match node.kind {
2394                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2395                    car_memgine::MemKind::Conversation => "outcome",
2396                    _ => "pattern",
2397                },
2398                "confidence": 0.5,
2399                "content_type": node.content_type.as_label(),
2400            }))
2401        })
2402        .collect();
2403    let count = facts.len();
2404    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2405    std::fs::write(&resolved, json)
2406        .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2407    Ok(Value::from(count as u64))
2408}
2409
2410/// `memory.load` — replace the session's memgine with facts from the
2411/// JSON file at `path`. Mirrors NAPI `load_memory`
2412/// (car-ffi-napi/src/lib.rs:121). Same `~/.car/memory/` sandboxing
2413/// as `memory.persist` since the 2026-05 audit — relative paths
2414/// land under the base, anything that escapes is rejected.
2415async fn handle_memory_load(
2416    req: &JsonRpcMessage,
2417    session: &crate::session::ClientSession,
2418) -> Result<Value, String> {
2419    let path = req
2420        .params
2421        .get("path")
2422        .and_then(|v| v.as_str())
2423        .ok_or("missing path")?;
2424    let resolved = car_ffi_common::memory_path::resolve(path)
2425        .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2426    let content = std::fs::read_to_string(&resolved)
2427        .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2428    let facts: Vec<Value> =
2429        serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2430    let mut engine = session.memgine.lock().await;
2431    engine.reset();
2432    let mut count: u32 = 0;
2433    for fact in &facts {
2434        let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2435        let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2436        let kind = fact
2437            .get("kind")
2438            .and_then(|v| v.as_str())
2439            .unwrap_or("pattern");
2440        let fid = format!("loaded-{}", count);
2441        engine.ingest_fact(
2442            &fid,
2443            subject,
2444            body,
2445            "user",
2446            "peer",
2447            chrono::Utc::now(),
2448            "global",
2449            None,
2450            vec![],
2451            kind == "constraint",
2452        );
2453        count += 1;
2454    }
2455    Ok(Value::from(count))
2456}
2457
2458// --- Skill handlers ---
2459
2460async fn handle_skill_ingest(
2461    req: &JsonRpcMessage,
2462    session: &crate::session::ClientSession,
2463) -> Result<Value, String> {
2464    let name = req
2465        .params
2466        .get("name")
2467        .and_then(|v| v.as_str())
2468        .ok_or("missing name")?;
2469    let code = req
2470        .params
2471        .get("code")
2472        .and_then(|v| v.as_str())
2473        .ok_or("missing code")?;
2474    let platform = req
2475        .params
2476        .get("platform")
2477        .and_then(|v| v.as_str())
2478        .unwrap_or("unknown");
2479    let persona = req
2480        .params
2481        .get("persona")
2482        .and_then(|v| v.as_str())
2483        .unwrap_or("");
2484    let url_pattern = req
2485        .params
2486        .get("url_pattern")
2487        .and_then(|v| v.as_str())
2488        .unwrap_or("");
2489    let description = req
2490        .params
2491        .get("description")
2492        .and_then(|v| v.as_str())
2493        .unwrap_or("");
2494    let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2495    let keywords: Vec<String> = req
2496        .params
2497        .get("task_keywords")
2498        .and_then(|v| v.as_array())
2499        .map(|arr| {
2500            arr.iter()
2501                .filter_map(|v| v.as_str().map(String::from))
2502                .collect()
2503        })
2504        .unwrap_or_default();
2505
2506    let trigger = car_memgine::SkillTrigger {
2507        persona: persona.into(),
2508        url_pattern: url_pattern.into(),
2509        task_keywords: keywords,
2510        structured: None,
2511    };
2512    let mut engine = session.memgine.lock().await;
2513    let node = engine.ingest_skill(
2514        name,
2515        code,
2516        platform,
2517        trigger,
2518        description,
2519        supersedes,
2520        vec![],
2521        vec![],
2522    );
2523    Ok(Value::from(node.index() as u64))
2524}
2525
2526async fn handle_skill_find(
2527    req: &JsonRpcMessage,
2528    session: &crate::session::ClientSession,
2529) -> Result<Value, String> {
2530    let persona = req
2531        .params
2532        .get("persona")
2533        .and_then(|v| v.as_str())
2534        .unwrap_or("");
2535    let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2536    let task = req
2537        .params
2538        .get("task")
2539        .and_then(|v| v.as_str())
2540        .unwrap_or("");
2541    let max = req
2542        .params
2543        .get("max_results")
2544        .and_then(|v| v.as_u64())
2545        .unwrap_or(1) as usize;
2546    let engine = session.memgine.lock().await;
2547    let results = engine.find_skill(persona, url, task, max);
2548    let json: Vec<Value> = results
2549        .iter()
2550        .map(|(m, s)| {
2551            serde_json::json!({
2552                "name": m.name, "code": m.code, "platform": m.platform,
2553                "description": m.description, "stats": m.stats, "match_score": s,
2554            })
2555        })
2556        .collect();
2557    serde_json::to_value(json).map_err(|e| e.to_string())
2558}
2559
2560async fn handle_skill_report(
2561    req: &JsonRpcMessage,
2562    session: &crate::session::ClientSession,
2563) -> Result<Value, String> {
2564    let name = req
2565        .params
2566        .get("skill_name")
2567        .and_then(|v| v.as_str())
2568        .ok_or("missing skill_name")?;
2569    let outcome_str = req
2570        .params
2571        .get("outcome")
2572        .and_then(|v| v.as_str())
2573        .ok_or("missing outcome")?;
2574    let outcome = match outcome_str {
2575        "success" => car_memgine::SkillOutcome::Success,
2576        _ => car_memgine::SkillOutcome::Fail,
2577    };
2578    let mut engine = session.memgine.lock().await;
2579    let stats = engine
2580        .report_outcome(name, outcome)
2581        .ok_or(format!("skill '{}' not found", name))?;
2582    serde_json::to_value(stats).map_err(|e| e.to_string())
2583}
2584
2585// ---------------------------------------------------------------------------
2586// Multi-agent coordination handlers
2587//
2588// The WsAgentRunner sends a `multi.run_agent` JSON-RPC request to the client.
2589// The client runs the model loop and responds with AgentOutput JSON.
2590// ---------------------------------------------------------------------------
2591
2592/// AgentRunner backed by WebSocket callback to the client.
2593struct WsAgentRunner {
2594    channel: Arc<WsChannel>,
2595    host: Arc<crate::host::HostState>,
2596    client_id: String,
2597}
2598
2599#[async_trait::async_trait]
2600impl car_multi::AgentRunner for WsAgentRunner {
2601    async fn run(
2602        &self,
2603        spec: &car_multi::AgentSpec,
2604        task: &str,
2605        _runtime: &car_engine::Runtime,
2606        _mailbox: &car_multi::Mailbox,
2607    ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2608        use futures::SinkExt;
2609
2610        let request_id = self.channel.next_request_id();
2611        let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2612        let agent = self
2613            .host
2614            .register_agent(
2615                &self.client_id,
2616                RegisterHostAgentRequest {
2617                    id: Some(agent_id.clone()),
2618                    name: spec.name.clone(),
2619                    kind: "callback".to_string(),
2620                    capabilities: spec.tools.clone(),
2621                    project: spec
2622                        .metadata
2623                        .get("project")
2624                        .and_then(|v| v.as_str())
2625                        .map(str::to_string),
2626                    pid: None,
2627                    display: serde_json::from_value(
2628                        spec.metadata
2629                            .get("display")
2630                            .cloned()
2631                            .unwrap_or(serde_json::Value::Null),
2632                    )
2633                    .unwrap_or_default(),
2634                    metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2635                },
2636            )
2637            .await
2638            .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2639        let _ = self
2640            .host
2641            .set_status(
2642                &self.client_id,
2643                SetHostAgentStatusRequest {
2644                    agent_id: agent.id.clone(),
2645                    status: HostAgentStatus::Running,
2646                    current_task: Some(task.to_string()),
2647                    message: Some(format!("{} started", spec.name)),
2648                    payload: serde_json::json!({ "task": task }),
2649                },
2650            )
2651            .await;
2652
2653        let rpc_request = serde_json::json!({
2654            "jsonrpc": "2.0",
2655            "method": "multi.run_agent",
2656            "params": {
2657                "spec": spec,
2658                "task": task,
2659            },
2660            "id": request_id,
2661        });
2662
2663        // Create oneshot channel for the response
2664        let (tx, rx) = tokio::sync::oneshot::channel();
2665        self.channel
2666            .pending
2667            .lock()
2668            .await
2669            .insert(request_id.clone(), tx);
2670
2671        let msg = Message::Text(
2672            serde_json::to_string(&rpc_request)
2673                .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2674                .into(),
2675        );
2676        if let Err(e) = self.channel.write.lock().await.send(msg).await {
2677            let _ = self
2678                .host
2679                .set_status(
2680                    &self.client_id,
2681                    SetHostAgentStatusRequest {
2682                        agent_id: agent_id.clone(),
2683                        status: HostAgentStatus::Errored,
2684                        current_task: None,
2685                        message: Some(format!("{} failed to start", spec.name)),
2686                        payload: serde_json::json!({ "error": e.to_string() }),
2687                    },
2688                )
2689                .await;
2690            return Err(car_multi::MultiError::AgentFailed(
2691                spec.name.clone(),
2692                format!("ws send error: {}", e),
2693            ));
2694        }
2695
2696        // Wait for client response (5 min timeout for model loops)
2697        let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2698            Ok(Ok(response)) => response,
2699            Ok(Err(_)) => {
2700                let _ = self
2701                    .host
2702                    .set_status(
2703                        &self.client_id,
2704                        SetHostAgentStatusRequest {
2705                            agent_id: agent_id.clone(),
2706                            status: HostAgentStatus::Errored,
2707                            current_task: None,
2708                            message: Some(format!("{} callback channel closed", spec.name)),
2709                            payload: Value::Null,
2710                        },
2711                    )
2712                    .await;
2713                return Err(car_multi::MultiError::AgentFailed(
2714                    spec.name.clone(),
2715                    "agent callback channel closed".into(),
2716                ));
2717            }
2718            Err(_) => {
2719                let _ = self
2720                    .host
2721                    .set_status(
2722                        &self.client_id,
2723                        SetHostAgentStatusRequest {
2724                            agent_id: agent_id.clone(),
2725                            status: HostAgentStatus::Errored,
2726                            current_task: None,
2727                            message: Some(format!("{} timed out", spec.name)),
2728                            payload: Value::Null,
2729                        },
2730                    )
2731                    .await;
2732                return Err(car_multi::MultiError::AgentFailed(
2733                    spec.name.clone(),
2734                    "agent callback timed out (300s)".into(),
2735                ));
2736            }
2737        };
2738
2739        if let Some(err) = response.error {
2740            let _ = self
2741                .host
2742                .set_status(
2743                    &self.client_id,
2744                    SetHostAgentStatusRequest {
2745                        agent_id: agent_id.clone(),
2746                        status: HostAgentStatus::Errored,
2747                        current_task: None,
2748                        message: Some(format!("{} errored", spec.name)),
2749                        payload: serde_json::json!({ "error": err }),
2750                    },
2751                )
2752                .await;
2753            return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2754        }
2755
2756        let output_value = response.output.unwrap_or(Value::Null);
2757        let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2758            car_multi::MultiError::AgentFailed(
2759                spec.name.clone(),
2760                format!("invalid AgentOutput: {}", e),
2761            )
2762        })?;
2763        let status = if output.error.is_some() {
2764            HostAgentStatus::Errored
2765        } else {
2766            HostAgentStatus::Completed
2767        };
2768        let message = if output.error.is_some() {
2769            format!("{} errored", spec.name)
2770        } else {
2771            format!("{} completed", spec.name)
2772        };
2773        let _ = self
2774            .host
2775            .set_status(
2776                &self.client_id,
2777                SetHostAgentStatusRequest {
2778                    agent_id,
2779                    status,
2780                    current_task: None,
2781                    message: Some(message),
2782                    payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2783                },
2784            )
2785            .await;
2786
2787        Ok(output)
2788    }
2789}
2790
/// Build the host agent id for one run: `<client_id>:<name>:<request_id>`.
///
/// Only `name` is sanitized: every character that is not an ASCII
/// letter, ASCII digit, `-` or `_` is replaced by `-`. `client_id` and
/// `request_id` are inserted unchanged.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '-' });
    }
    [client_id, sanitized.as_str(), request_id].join(":")
}
2804
2805async fn handle_multi_swarm(
2806    req: &JsonRpcMessage,
2807    session: &crate::session::ClientSession,
2808) -> Result<Value, String> {
2809    let mode_str = req
2810        .params
2811        .get("mode")
2812        .and_then(|v| v.as_str())
2813        .ok_or("missing 'mode'")?;
2814    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2815    let task = req
2816        .params
2817        .get("task")
2818        .and_then(|v| v.as_str())
2819        .ok_or("missing 'task'")?;
2820
2821    let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2822        .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2823    let agent_specs: Vec<car_multi::AgentSpec> =
2824        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2825    let synth: Option<car_multi::AgentSpec> = req
2826        .params
2827        .get("synthesizer")
2828        .map(|v| serde_json::from_value(v.clone()))
2829        .transpose()
2830        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2831
2832    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2833        channel: session.channel.clone(),
2834        host: session.host.clone(),
2835        client_id: session.client_id.clone(),
2836    });
2837    let infra = car_multi::SharedInfra::new();
2838
2839    let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2840    if let Some(s) = synth {
2841        swarm = swarm.with_synthesizer(s);
2842    }
2843
2844    let result = swarm
2845        .run(task, &runner, &infra)
2846        .await
2847        .map_err(|e| format!("swarm error: {}", e))?;
2848    serde_json::to_value(result).map_err(|e| e.to_string())
2849}
2850
2851async fn handle_multi_pipeline(
2852    req: &JsonRpcMessage,
2853    session: &crate::session::ClientSession,
2854) -> Result<Value, String> {
2855    let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2856    let task = req
2857        .params
2858        .get("task")
2859        .and_then(|v| v.as_str())
2860        .ok_or("missing 'task'")?;
2861
2862    let stage_specs: Vec<car_multi::AgentSpec> =
2863        serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2864
2865    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2866        channel: session.channel.clone(),
2867        host: session.host.clone(),
2868        client_id: session.client_id.clone(),
2869    });
2870    let infra = car_multi::SharedInfra::new();
2871
2872    let result = car_multi::Pipeline::new(stage_specs)
2873        .run(task, &runner, &infra)
2874        .await
2875        .map_err(|e| format!("pipeline error: {}", e))?;
2876    serde_json::to_value(result).map_err(|e| e.to_string())
2877}
2878
2879async fn handle_multi_supervisor(
2880    req: &JsonRpcMessage,
2881    session: &crate::session::ClientSession,
2882) -> Result<Value, String> {
2883    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2884    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2885    let task = req
2886        .params
2887        .get("task")
2888        .and_then(|v| v.as_str())
2889        .ok_or("missing 'task'")?;
2890    let max_rounds = req
2891        .params
2892        .get("max_rounds")
2893        .and_then(|v| v.as_u64())
2894        .unwrap_or(3) as u32;
2895
2896    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2897        .map_err(|e| format!("invalid workers: {}", e))?;
2898    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2899        .map_err(|e| format!("invalid supervisor: {}", e))?;
2900
2901    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2902        channel: session.channel.clone(),
2903        host: session.host.clone(),
2904        client_id: session.client_id.clone(),
2905    });
2906    let infra = car_multi::SharedInfra::new();
2907
2908    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2909        .with_max_rounds(max_rounds)
2910        .run(task, &runner, &infra)
2911        .await
2912        .map_err(|e| format!("supervisor error: {}", e))?;
2913    serde_json::to_value(result).map_err(|e| e.to_string())
2914}
2915
2916async fn handle_multi_map_reduce(
2917    req: &JsonRpcMessage,
2918    session: &crate::session::ClientSession,
2919) -> Result<Value, String> {
2920    let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2921    let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2922    let task = req
2923        .params
2924        .get("task")
2925        .and_then(|v| v.as_str())
2926        .ok_or("missing 'task'")?;
2927    let items_val = req.params.get("items").ok_or("missing 'items'")?;
2928
2929    let mapper_spec: car_multi::AgentSpec =
2930        serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2931    let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2932        .map_err(|e| format!("invalid reducer: {}", e))?;
2933    let items: Vec<String> =
2934        serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2935
2936    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2937        channel: session.channel.clone(),
2938        host: session.host.clone(),
2939        client_id: session.client_id.clone(),
2940    });
2941    let infra = car_multi::SharedInfra::new();
2942
2943    let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2944        .run(task, &items, &runner, &infra)
2945        .await
2946        .map_err(|e| format!("map_reduce error: {}", e))?;
2947    serde_json::to_value(result).map_err(|e| e.to_string())
2948}
2949
2950async fn handle_multi_vote(
2951    req: &JsonRpcMessage,
2952    session: &crate::session::ClientSession,
2953) -> Result<Value, String> {
2954    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2955    let task = req
2956        .params
2957        .get("task")
2958        .and_then(|v| v.as_str())
2959        .ok_or("missing 'task'")?;
2960
2961    let agent_specs: Vec<car_multi::AgentSpec> =
2962        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2963    let synth: Option<car_multi::AgentSpec> = req
2964        .params
2965        .get("synthesizer")
2966        .map(|v| serde_json::from_value(v.clone()))
2967        .transpose()
2968        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2969
2970    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2971        channel: session.channel.clone(),
2972        host: session.host.clone(),
2973        client_id: session.client_id.clone(),
2974    });
2975    let infra = car_multi::SharedInfra::new();
2976
2977    let mut vote = car_multi::Vote::new(agent_specs);
2978    if let Some(s) = synth {
2979        vote = vote.with_synthesizer(s);
2980    }
2981
2982    let result = vote
2983        .run(task, &runner, &infra)
2984        .await
2985        .map_err(|e| format!("vote error: {}", e))?;
2986    serde_json::to_value(result).map_err(|e| e.to_string())
2987}
2988
2989// ---------------------------------------------------------------------------
2990// Scheduler handlers
2991// ---------------------------------------------------------------------------
2992
2993fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2994    let name = req
2995        .params
2996        .get("name")
2997        .and_then(|v| v.as_str())
2998        .ok_or("scheduler.create requires 'name'")?;
2999    let prompt = req
3000        .params
3001        .get("prompt")
3002        .and_then(|v| v.as_str())
3003        .ok_or("scheduler.create requires 'prompt'")?;
3004
3005    let mut task = car_scheduler::Task::new(name, prompt);
3006
3007    if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
3008        let trigger = match t {
3009            "once" => car_scheduler::TaskTrigger::Once,
3010            "cron" => car_scheduler::TaskTrigger::Cron,
3011            "interval" => car_scheduler::TaskTrigger::Interval,
3012            "file_watch" => car_scheduler::TaskTrigger::FileWatch,
3013            _ => car_scheduler::TaskTrigger::Manual,
3014        };
3015        let schedule = req
3016            .params
3017            .get("schedule")
3018            .and_then(|v| v.as_str())
3019            .unwrap_or("");
3020        task = task.with_trigger(trigger, schedule);
3021    }
3022
3023    if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
3024        task = task.with_system_prompt(sp);
3025    }
3026
3027    serde_json::to_value(&task).map_err(|e| e.to_string())
3028}
3029
3030async fn handle_scheduler_run(
3031    req: &JsonRpcMessage,
3032    session: &crate::session::ClientSession,
3033) -> Result<Value, String> {
3034    let task_val = req
3035        .params
3036        .get("task")
3037        .ok_or("scheduler.run requires 'task'")?;
3038    let mut task: car_scheduler::Task =
3039        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3040
3041    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3042        channel: session.channel.clone(),
3043        host: session.host.clone(),
3044        client_id: session.client_id.clone(),
3045    });
3046    let executor = car_scheduler::Executor::new(runner);
3047    let execution = executor.run_once(&mut task).await;
3048
3049    serde_json::to_value(&execution).map_err(|e| e.to_string())
3050}
3051
3052async fn handle_scheduler_run_loop(
3053    req: &JsonRpcMessage,
3054    session: &crate::session::ClientSession,
3055) -> Result<Value, String> {
3056    let task_val = req
3057        .params
3058        .get("task")
3059        .ok_or("scheduler.run_loop requires 'task'")?;
3060    let mut task: car_scheduler::Task =
3061        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3062    let max_iterations = req
3063        .params
3064        .get("max_iterations")
3065        .and_then(|v| v.as_u64())
3066        .map(|v| v as u32);
3067
3068    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3069        channel: session.channel.clone(),
3070        host: session.host.clone(),
3071        client_id: session.client_id.clone(),
3072    });
3073    let executor = car_scheduler::Executor::new(runner);
3074    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
3075    let executions = executor
3076        .run_loop(&mut task, max_iterations, cancel_rx)
3077        .await;
3078
3079    serde_json::to_value(&executions).map_err(|e| e.to_string())
3080}
3081
3082// ---------------------------------------------------------------------------
3083// Inference handlers
3084// ---------------------------------------------------------------------------
3085
3086fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
3087    state.inference.get_or_init(|| {
3088        Arc::new(car_inference::InferenceEngine::new(
3089            car_inference::InferenceConfig::default(),
3090        ))
3091    })
3092}
3093
3094async fn handle_infer(
3095    msg: &JsonRpcMessage,
3096    state: &ServerState,
3097    session: &crate::session::ClientSession,
3098) -> Result<Value, String> {
3099    let engine = get_inference_engine(state);
3100    let mut req: car_inference::GenerateRequest =
3101        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3102
3103    // If context_query is provided, build context from memgine and inject it
3104    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3105        let mut memgine = session.memgine.lock().await;
3106        let ctx = memgine.build_context(cq);
3107        if !ctx.is_empty() {
3108            req.context = Some(ctx);
3109        }
3110    }
3111
3112    // Process-wide admission gate. Held for the duration of the
3113    // generation so a burst of concurrent infer RPCs can't multiply
3114    // KV-cache + activation memory and take the host out. The
3115    // `_permit` binding is intentional — its `Drop` releases the slot
3116    // when this future returns.
3117    let _permit = state.admission.acquire().await;
3118
3119    // Use generate_tracked() so tool_calls, usage, model_used, trace_id, and
3120    // latency_ms are preserved in the response. Plain `generate()` discards
3121    // everything except `.text`, which silently breaks tool-use over the
3122    // WebSocket protocol (issue #43).
3123    //
3124    // NOTE: This directly serializes `InferenceResult`. Any field added to
3125    // that struct in `car-inference` becomes part of the public WebSocket
3126    // protocol. The shape is locked by `inference_result_serializes_*` tests
3127    // in car-inference; updating those tests is part of intentionally
3128    // changing the wire contract.
3129    let result = engine
3130        .generate_tracked(req)
3131        .await
3132        .map_err(|e| e.to_string())?;
3133    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3134}
3135
3136/// Streaming inference — mirrors NAPI `inferStream`. Closes
3137/// Parslee-ai/car-releases#30. Same `GenerateRequest` shape as
3138/// `infer`; emits `inference.stream.event` JSON-RPC notifications
3139/// during the run, and returns the final `InferenceResult` as the
3140/// JSON-RPC response when the stream completes.
3141///
3142/// Notification shape (server → client):
3143/// ```jsonc
3144/// {
3145///   "jsonrpc": "2.0",
3146///   "method": "inference.stream.event",
3147///   "params": {
3148///     "request_id": "<original RPC id>",
3149///     "event": { "type": "text" | "tool_start" | "tool_delta" | "usage", ... }
3150///   }
3151/// }
3152/// ```
3153///
3154/// The final `done` event is not pushed as a notification — it's
3155/// the JSON-RPC response with the accumulated `InferenceResult`.
3156/// `video.generate` — daemon-side wrapper for
3157/// `InferenceEngine::generate_video`. Mirrors `handle_infer`'s
3158/// admission gate + JSON request shape (Parslee-ai/car#185).
3159///
3160/// Previously the CLI's `cmd_video` constructed an in-process
3161/// engine and called `generate_video` directly — a v0.7 holdover
3162/// that bypassed the daemon. With this handler the CLI proxies
3163/// here, so the engine-level audio_passthrough gate fires
3164/// inside the daemon process where all FFI surfaces converge.
3165async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3166    let engine = get_inference_engine(state);
3167    let req: car_inference::GenerateImageRequest =
3168        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3169    // Share the same admission gate as text/video generation — a burst
3170    // of image requests shouldn't smuggle around the concurrency cap.
3171    let _permit = state.admission.acquire().await;
3172    let result = engine
3173        .generate_image(req)
3174        .await
3175        .map_err(|e| e.to_string())?;
3176    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3177}
3178
3179async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3180    let engine = get_inference_engine(state);
3181    let req: car_inference::GenerateVideoRequest =
3182        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3183    let _permit = state.admission.acquire().await;
3184    let result = engine
3185        .generate_video(req)
3186        .await
3187        .map_err(|e| e.to_string())?;
3188    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3189}
3190
3191async fn handle_infer_stream(
3192    msg: &JsonRpcMessage,
3193    session: &crate::session::ClientSession,
3194    state: &ServerState,
3195) -> Result<Value, String> {
3196    use futures::SinkExt;
3197    use tokio_tungstenite::tungstenite::Message;
3198
3199    let engine = get_inference_engine(state);
3200    let mut req: car_inference::GenerateRequest =
3201        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3202
3203    // Same context-injection convenience as non-streaming `infer` so
3204    // the two methods have parity on the call shape.
3205    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3206        let mut memgine = session.memgine.lock().await;
3207        let ctx = memgine.build_context(cq);
3208        if !ctx.is_empty() {
3209            req.context = Some(ctx);
3210        }
3211    }
3212
3213    let _permit = state.admission.acquire().await;
3214    let mut rx = engine
3215        .generate_tracked_stream(req)
3216        .await
3217        .map_err(|e| e.to_string())?;
3218
3219    let mut accumulator = car_inference::StreamAccumulator::default();
3220    let request_id = msg.id.clone();
3221
3222    while let Some(event) = rx.recv().await {
3223        let event_payload = match &event {
3224            car_inference::StreamEvent::TextDelta(text) => {
3225                serde_json::json!({"type": "text", "data": text})
3226            }
3227            car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3228                serde_json::json!({"type": "tool_start", "name": name, "index": index})
3229            }
3230            car_inference::StreamEvent::ToolCallDelta {
3231                index,
3232                arguments_delta,
3233            } => serde_json::json!({
3234                "type": "tool_delta",
3235                "index": index,
3236                "data": arguments_delta,
3237            }),
3238            car_inference::StreamEvent::Usage {
3239                input_tokens,
3240                output_tokens,
3241            } => serde_json::json!({
3242                "type": "usage",
3243                "input_tokens": input_tokens,
3244                "output_tokens": output_tokens,
3245            }),
3246            // Done is delivered as the JSON-RPC response, not a
3247            // notification — matches the NAPI contract where the
3248            // standalone function's return value is the accumulated
3249            // result and the callback only sees in-progress events.
3250            car_inference::StreamEvent::Done { .. } => {
3251                accumulator.push(&event);
3252                continue;
3253            }
3254        };
3255
3256        let notif = serde_json::json!({
3257            "jsonrpc": "2.0",
3258            "method": "inference.stream.event",
3259            "params": {
3260                "request_id": request_id,
3261                "event": event_payload,
3262            },
3263        });
3264        if let Ok(text) = serde_json::to_string(&notif) {
3265            let _ = session
3266                .channel
3267                .write
3268                .lock()
3269                .await
3270                .send(Message::Text(text.into()))
3271                .await;
3272        }
3273        accumulator.push(&event);
3274    }
3275
3276    let (text, tool_calls, usage) = accumulator.finish_with_usage();
3277    Ok(serde_json::json!({
3278        "text": text,
3279        "tool_calls": tool_calls,
3280        "usage": usage,
3281    }))
3282}
3283
3284async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3285    let engine = get_inference_engine(state);
3286    let req: car_inference::EmbedRequest =
3287        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3288    // Embeds load their own model weights; share the same admission
3289    // gate as generations so a burst of embed requests can't smuggle
3290    // around the concurrency cap.
3291    let _permit = state.admission.acquire().await;
3292    let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3293    Ok(serde_json::json!({"embeddings": result}))
3294}
3295
3296async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3297    let engine = get_inference_engine(state);
3298    let req: car_inference::ClassifyRequest =
3299        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3300    let _permit = state.admission.acquire().await;
3301    let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3302    Ok(serde_json::json!({"classifications": result}))
3303}
3304
3305/// Surface the current admission state so the menubar tray and
3306/// `car daemon status` can show "queued: N" / "permits: P/T". Read-only
3307/// snapshot — racy by definition but correct enough for status panels.
3308fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3309    let total = state.admission.permits();
3310    let available = state.admission.permits_available();
3311    let in_use = total.saturating_sub(available);
3312    Ok(serde_json::json!({
3313        "permits_total": total,
3314        "permits_available": available,
3315        "permits_in_use": in_use,
3316        "env_override": crate::admission::ENV_MAX_CONCURRENT,
3317    }))
3318}
3319
3320async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3321    let model = msg
3322        .params
3323        .get("model")
3324        .and_then(|v| v.as_str())
3325        .ok_or("missing 'model' parameter")?;
3326    let text = msg
3327        .params
3328        .get("text")
3329        .and_then(|v| v.as_str())
3330        .ok_or("missing 'text' parameter")?;
3331    let engine = get_inference_engine(state);
3332    let ids = engine
3333        .tokenize(model, text)
3334        .await
3335        .map_err(|e| e.to_string())?;
3336    Ok(serde_json::json!({"tokens": ids}))
3337}
3338
3339async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3340    let model = msg
3341        .params
3342        .get("model")
3343        .and_then(|v| v.as_str())
3344        .ok_or("missing 'model' parameter")?;
3345    let tokens: Vec<u32> = msg
3346        .params
3347        .get("tokens")
3348        .and_then(|v| v.as_array())
3349        .ok_or("missing 'tokens' parameter")?
3350        .iter()
3351        .map(|t| {
3352            t.as_u64()
3353                .and_then(|n| u32::try_from(n).ok())
3354                .ok_or_else(|| "tokens[] must be u32 values".to_string())
3355        })
3356        .collect::<Result<Vec<_>, _>>()?;
3357    let engine = get_inference_engine(state);
3358    let text = engine
3359        .detokenize(model, &tokens)
3360        .await
3361        .map_err(|e| e.to_string())?;
3362    Ok(serde_json::json!({"text": text}))
3363}
3364
3365/// `models.register` — persist a user-supplied `ModelSchema` to
3366/// `~/.car/models.json` (Parslee-ai/car-releases#39). Replaces any
3367/// existing entry with the same `id`. Returns `{id, registered}`.
3368///
3369/// **Phase 1 limitation**: the daemon's live `UnifiedRegistry` is
3370/// not updated in-process — the new model becomes visible to
3371/// `models.list`, `infer`, `infer_stream` on the **next daemon
3372/// boot** when `load_user_config` re-reads the file. This is
3373/// enough to unblock opencode's setup flow (register ahead of
3374/// time, then start the daemon). Hot-update requires either an
3375/// `RwLock<InferenceEngine>` on `ServerState` or an
3376/// interior-mutable `UnifiedRegistry`; both touch 20+ call sites
3377/// and are tracked as a follow-up.
3378///
3379/// Until hot-update lands, callers SHOULD register their models
3380/// before issuing `infer` calls against them, and operators
3381/// SHOULD restart the daemon after batches of model
3382/// registrations.
3383async fn handle_models_register(
3384    req: &JsonRpcMessage,
3385    _state: &Arc<ServerState>,
3386) -> Result<Value, String> {
3387    // The params shape mirrors v0.7's FFI `rt.registerModel(schemaJson)`:
3388    // either the bare `ModelSchema` value, OR `{ schema: ModelSchema }`.
3389    // Honor both so existing in-process callers don't have to reshape.
3390    let schema_value = match req.params.get("schema") {
3391        Some(v) => v.clone(),
3392        None => req.params.clone(),
3393    };
3394    let schema: car_inference::ModelSchema =
3395        serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3396    let id = schema.id.clone();
3397
3398    // Resolve the models.json path the same way UnifiedRegistry does:
3399    // `<models_dir>/../models.json` where models_dir defaults to
3400    // `~/.car/models/`. We read whatever's there, swap in the new
3401    // entry, and write back atomically.
3402    let home = std::env::var_os("HOME")
3403        .or_else(|| std::env::var_os("USERPROFILE"))
3404        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3405    let car_dir = std::path::PathBuf::from(home).join(".car");
3406    std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3407    let path = car_dir.join("models.json");
3408
3409    let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3410        let text =
3411            std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3412        if text.trim().is_empty() {
3413            Vec::new()
3414        } else {
3415            serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3416        }
3417    } else {
3418        Vec::new()
3419    };
3420    // Replace existing entry with the same id, else append.
3421    if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3422        *slot = schema;
3423    } else {
3424        models.push(schema);
3425    }
3426    let json =
3427        serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3428    let tmp = path.with_extension("json.tmp");
3429    std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3430    std::fs::rename(&tmp, &path)
3431        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3432    Ok(serde_json::json!({
3433        "id": id,
3434        "registered": true,
3435        "path": path.to_string_lossy(),
3436        "note": "Daemon restart required for live UnifiedRegistry visibility \
3437                 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3438                 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3439    }))
3440}
3441
3442/// `models.unregister` — remove an entry from `~/.car/models.json`
3443/// by id (Parslee-ai/car#186 — symmetric to `models.register`).
3444/// Returns `{ id, unregistered, path }` on success. Returns an error
3445/// when the model isn't present.
3446///
3447/// **Phase 1 limitation** (same as `models.register`): the daemon's
3448/// live `UnifiedRegistry` is not rebuilt — the removal takes effect
3449/// on the next daemon boot. Callers SHOULD restart the daemon after
3450/// a batch of unregistrations if they expect `models.list_unified`
3451/// to reflect the change immediately.
3452async fn handle_models_unregister(
3453    req: &JsonRpcMessage,
3454    _state: &Arc<ServerState>,
3455) -> Result<Value, String> {
3456    // Params shape mirrors the CLI flag: `{ id: string }`. Bare-string
3457    // params are honored for symmetry with the register handler's
3458    // tolerant shape (`{schema: ...}` OR bare schema).
3459    let id = match req.params.get("id") {
3460        Some(v) => v
3461            .as_str()
3462            .ok_or_else(|| "`id` must be a string".to_string())?
3463            .to_string(),
3464        None => match req.params.as_str() {
3465            Some(s) => s.to_string(),
3466            None => return Err("missing `id` parameter".to_string()),
3467        },
3468    };
3469
3470    let home = std::env::var_os("HOME")
3471        .or_else(|| std::env::var_os("USERPROFILE"))
3472        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3473    let car_dir = std::path::PathBuf::from(home).join(".car");
3474    let path = car_dir.join("models.json");
3475
3476    if !path.exists() {
3477        return Err(format!(
3478            "no models.json at {} — nothing to unregister",
3479            path.display()
3480        ));
3481    }
3482    let text =
3483        std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3484    let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3485        Vec::new()
3486    } else {
3487        serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3488    };
3489    let before = models.len();
3490    models.retain(|m| m.id != id);
3491    if models.len() == before {
3492        return Err(format!("model {} not found in {}", id, path.display()));
3493    }
3494    let json =
3495        serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3496    let tmp = path.with_extension("json.tmp");
3497    std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3498    std::fs::rename(&tmp, &path)
3499        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3500    Ok(serde_json::json!({
3501        "id": id,
3502        "unregistered": true,
3503        "path": path.to_string_lossy(),
3504        "note": "Daemon restart required for live UnifiedRegistry visibility \
3505                 (phase 1, matching models.register).",
3506    }))
3507}
3508
3509fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3510    let engine = get_inference_engine(state);
3511    let models = engine.list_models();
3512    serde_json::to_value(&models).map_err(|e| e.to_string())
3513}
3514
3515fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3516    let engine = get_inference_engine(state);
3517    let models = engine.list_models_unified();
3518    serde_json::to_value(&models).map_err(|e| e.to_string())
3519}
3520
3521#[derive(Debug, Deserialize)]
3522#[serde(rename_all = "camelCase")]
3523struct ModelSearchParams {
3524    #[serde(default)]
3525    query: Option<String>,
3526    #[serde(default)]
3527    capability: Option<car_inference::ModelCapability>,
3528    #[serde(default)]
3529    provider: Option<String>,
3530    #[serde(default)]
3531    local_only: bool,
3532    #[serde(default)]
3533    available_only: bool,
3534    #[serde(default)]
3535    limit: Option<usize>,
3536}
3537
3538#[derive(Debug, Serialize)]
3539#[serde(rename_all = "camelCase")]
3540struct ModelSearchEntry {
3541    #[serde(flatten)]
3542    info: car_inference::ModelInfo,
3543    family: String,
3544    version: String,
3545    tags: Vec<String>,
3546    pullable: bool,
3547    upgrade: Option<car_inference::ModelUpgrade>,
3548}
3549
3550#[derive(Debug, Serialize)]
3551#[serde(rename_all = "camelCase")]
3552struct ModelSearchResponse {
3553    models: Vec<ModelSearchEntry>,
3554    upgrades: Vec<car_inference::ModelUpgrade>,
3555    total: usize,
3556    available: usize,
3557    local: usize,
3558    remote: usize,
3559}
3560
3561fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3562    let params: ModelSearchParams =
3563        serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3564            query: None,
3565            capability: None,
3566            provider: None,
3567            local_only: false,
3568            available_only: false,
3569            limit: None,
3570        });
3571    let engine = get_inference_engine(state);
3572    let upgrades = engine.available_model_upgrades();
3573    let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3574        .iter()
3575        .cloned()
3576        .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3577        .collect();
3578    let query = params
3579        .query
3580        .as_deref()
3581        .map(str::trim)
3582        .filter(|q| !q.is_empty())
3583        .map(|q| q.to_ascii_lowercase());
3584    let provider = params
3585        .provider
3586        .as_deref()
3587        .map(str::trim)
3588        .filter(|p| !p.is_empty())
3589        .map(|p| p.to_ascii_lowercase());
3590
3591    let mut entries: Vec<ModelSearchEntry> = engine
3592        .list_schemas()
3593        .into_iter()
3594        .filter(|schema| {
3595            if let Some(capability) = params.capability {
3596                if !schema.has_capability(capability) {
3597                    return false;
3598                }
3599            }
3600            if let Some(provider) = provider.as_deref() {
3601                if schema.provider.to_ascii_lowercase() != provider {
3602                    return false;
3603                }
3604            }
3605            if params.local_only && !schema.is_local() {
3606                return false;
3607            }
3608            if params.available_only && !schema.available {
3609                return false;
3610            }
3611            if let Some(query) = query.as_deref() {
3612                let capability_text = schema
3613                    .capabilities
3614                    .iter()
3615                    .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3616                    .collect::<Vec<_>>()
3617                    .join(" ");
3618                let haystack = format!(
3619                    "{} {} {} {} {} {}",
3620                    schema.id,
3621                    schema.name,
3622                    schema.provider,
3623                    schema.family,
3624                    schema.tags.join(" "),
3625                    capability_text
3626                )
3627                .to_ascii_lowercase();
3628                if !haystack.contains(query) {
3629                    return false;
3630                }
3631            }
3632            true
3633        })
3634        .map(|schema| {
3635            let pullable = !schema.available
3636                && matches!(
3637                    schema.source,
3638                    car_inference::ModelSource::Local { .. }
3639                        | car_inference::ModelSource::Mlx { .. }
3640                );
3641            let info = car_inference::ModelInfo::from(&schema);
3642            let upgrade = upgrades_by_from.get(&schema.id).cloned();
3643            ModelSearchEntry {
3644                info,
3645                family: schema.family,
3646                version: schema.version,
3647                tags: schema.tags,
3648                pullable,
3649                upgrade,
3650            }
3651        })
3652        .collect();
3653    entries.sort_by(|a, b| {
3654        b.info
3655            .available
3656            .cmp(&a.info.available)
3657            .then(b.info.is_local.cmp(&a.info.is_local))
3658            .then(a.info.name.cmp(&b.info.name))
3659    });
3660    if let Some(limit) = params.limit {
3661        entries.truncate(limit);
3662    }
3663
3664    let total = entries.len();
3665    let available = entries.iter().filter(|entry| entry.info.available).count();
3666    let local = entries.iter().filter(|entry| entry.info.is_local).count();
3667    let response = ModelSearchResponse {
3668        models: entries,
3669        upgrades,
3670        total,
3671        available,
3672        local,
3673        remote: total.saturating_sub(local),
3674    };
3675    serde_json::to_value(response).map_err(|e| e.to_string())
3676}
3677
3678fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3679    let engine = get_inference_engine(state);
3680    serde_json::to_value(serde_json::json!({
3681        "upgrades": engine.available_model_upgrades()
3682    }))
3683    .map_err(|e| e.to_string())
3684}
3685
3686async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3687    let name = msg
3688        .params
3689        .get("name")
3690        .or_else(|| msg.params.get("id"))
3691        .or_else(|| msg.params.get("model"))
3692        .and_then(|v| v.as_str())
3693        .ok_or("missing 'name' parameter")?;
3694    let engine = get_inference_engine(state);
3695    let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3696    Ok(serde_json::json!({"path": path.display().to_string()}))
3697}
3698
3699async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3700    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3701        msg.params
3702            .get("events")
3703            .cloned()
3704            .unwrap_or(msg.params.clone()),
3705    )
3706    .map_err(|e| format!("invalid events: {}", e))?;
3707
3708    let inference = get_inference_engine(state).clone();
3709    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3710
3711    let skills = engine.distill_skills(&events).await;
3712    serde_json::to_value(&skills).map_err(|e| e.to_string())
3713}
3714
3715/// Run memory consolidation against this client's session memgine
3716/// (or the daemon-owned per-agent memgine when bound — #170).
3717/// Returns the JSON `ConsolidationReport`.
3718async fn handle_memory_consolidate(
3719    session: &crate::session::ClientSession,
3720) -> Result<Value, String> {
3721    let engine_arc = session.effective_memgine().await;
3722    let report = {
3723        let mut engine = engine_arc.lock().await;
3724        engine.consolidate().await
3725    };
3726    if let Some(id) = session.agent_id.lock().await.clone() {
3727        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3728            tracing::warn!(agent_id = %id, error = %e,
3729                "agent memgine persist after consolidate failed");
3730        }
3731    }
3732    serde_json::to_value(&report).map_err(|e| e.to_string())
3733}
3734
3735/// Repair a degraded skill on this client's session memgine.
3736/// Returns `{ code: "..." }` on success, `null` if the skill
3737/// isn't broken or repair failed.
3738async fn handle_skill_repair(
3739    msg: &JsonRpcMessage,
3740    session: &crate::session::ClientSession,
3741) -> Result<Value, String> {
3742    let name = msg
3743        .params
3744        .get("skill_name")
3745        .and_then(|v| v.as_str())
3746        .ok_or("missing 'skill_name' parameter")?;
3747    let mut engine = session.memgine.lock().await;
3748    let code = engine.repair_skill(name).await;
3749    Ok(match code {
3750        Some(c) => serde_json::json!({ "code": c }),
3751        None => Value::Null,
3752    })
3753}
3754
3755/// Ingest distilled skills into this client's session memgine.
3756/// Returns the number of nodes inserted.
3757async fn handle_skills_ingest_distilled(
3758    msg: &JsonRpcMessage,
3759    session: &crate::session::ClientSession,
3760) -> Result<Value, String> {
3761    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3762        msg.params
3763            .get("skills")
3764            .cloned()
3765            .unwrap_or(msg.params.clone()),
3766    )
3767    .map_err(|e| format!("invalid skills: {}", e))?;
3768    let mut engine = session.memgine.lock().await;
3769    let nodes = engine.ingest_distilled_skills(&skills);
3770    Ok(serde_json::json!({ "ingested": nodes.len() }))
3771}
3772
3773/// Run skill evolution against this session's memgine for a
3774/// specified domain.  Returns the resulting `DistilledSkill` array.
3775async fn handle_skills_evolve(
3776    msg: &JsonRpcMessage,
3777    session: &crate::session::ClientSession,
3778) -> Result<Value, String> {
3779    let domain = msg
3780        .params
3781        .get("domain")
3782        .and_then(|v| v.as_str())
3783        .ok_or("missing 'domain' parameter")?
3784        .to_string();
3785    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3786        msg.params
3787            .get("events")
3788            .cloned()
3789            .unwrap_or(Value::Array(vec![])),
3790    )
3791    .map_err(|e| format!("invalid events: {}", e))?;
3792    let mut engine = session.memgine.lock().await;
3793    let skills = engine.evolve_skills(&events, &domain).await;
3794    serde_json::to_value(&skills).map_err(|e| e.to_string())
3795}
3796
3797/// List domains whose skills are underperforming on this session.
3798async fn handle_skills_domains_needing_evolution(
3799    msg: &JsonRpcMessage,
3800    session: &crate::session::ClientSession,
3801) -> Result<Value, String> {
3802    let threshold = msg
3803        .params
3804        .get("threshold")
3805        .and_then(|v| v.as_f64())
3806        .unwrap_or(0.6);
3807    let engine = session.memgine.lock().await;
3808    let domains = engine.domains_needing_evolution(threshold);
3809    serde_json::to_value(&domains).map_err(|e| e.to_string())
3810}
3811
3812/// Rerank documents against a query using a cross-encoder model.
3813async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3814    let engine = get_inference_engine(state);
3815    let req: car_inference::RerankRequest =
3816        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3817    let _permit = state.admission.acquire().await;
3818    let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3819    serde_json::to_value(&result).map_err(|e| e.to_string())
3820}
3821
3822/// Transcribe audio at the given path. The path is interpreted on
3823/// the daemon's filesystem, not the FFI caller's — Daemon-mode
3824/// callers must pass a path the daemon can read (typically a
3825/// shared `~/.car/...` location or stdin push via the streaming
3826/// API).
3827async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3828    use base64::Engine as _;
3829    let engine = get_inference_engine(state);
3830
3831    // Sandbox-crossing escape hatch (Parslee-ai/car-releases#31): when
3832    // the caller can't share a filesystem view with the daemon (e.g.
3833    // unsandboxed Milo talking to a sandboxed car-host), they pass
3834    // `audio_b64` instead of `audio_path`. We decode to a tempfile,
3835    // run transcribe against the path the engine expects, and clean up
3836    // on drop. Accepts either form; `audio_b64` wins if both are set.
3837    let mut params = msg.params.clone();
3838    let audio_b64 = params
3839        .as_object_mut()
3840        .and_then(|m| m.remove("audio_b64"))
3841        .and_then(|v| v.as_str().map(str::to_string));
3842    let _tmp_audio = if let Some(b64) = audio_b64 {
3843        let bytes = base64::engine::general_purpose::STANDARD
3844            .decode(b64.as_bytes())
3845            .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3846        let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3847        std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3848        let path = tmp.path().to_string_lossy().into_owned();
3849        if let Some(obj) = params.as_object_mut() {
3850            obj.insert("audio_path".to_string(), Value::String(path));
3851        }
3852        Some(tmp)
3853    } else {
3854        None
3855    };
3856
3857    let req: car_inference::TranscribeRequest =
3858        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3859    let _permit = state.admission.acquire().await;
3860    let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3861    serde_json::to_value(&result).map_err(|e| e.to_string())
3862}
3863
3864/// Synthesize speech. By default writes to `output_path` on the
3865/// daemon's filesystem; when `return_b64: true` (or no `output_path`
3866/// was supplied) the result also includes an `audio_b64` field with
3867/// the rendered bytes inline so cross-sandbox callers can avoid
3868/// filesystem coordination. Closes Parslee-ai/car-releases#31.
3869async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3870    use base64::Engine as _;
3871    let engine = get_inference_engine(state);
3872
3873    let mut params = msg.params.clone();
3874    let return_b64 = params
3875        .as_object_mut()
3876        .and_then(|m| m.remove("return_b64"))
3877        .and_then(|v| v.as_bool())
3878        .unwrap_or(false);
3879    let no_output_path = params
3880        .as_object()
3881        .map(|m| !m.contains_key("output_path"))
3882        .unwrap_or(true);
3883
3884    let req: car_inference::SynthesizeRequest =
3885        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3886    let _permit = state.admission.acquire().await;
3887    let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3888    let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3889
3890    // Inline the bytes when the caller asked for them OR when no
3891    // output_path was specified (typical sandbox-crossing case —
3892    // they didn't pick a path because they have no shared one).
3893    if return_b64 || no_output_path {
3894        let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3895            format!(
3896                "synthesize: failed to read rendered audio at {}: {e}",
3897                result.audio_path
3898            )
3899        })?;
3900        let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3901        if let Some(obj) = value.as_object_mut() {
3902            obj.insert("audio_b64".to_string(), Value::String(encoded));
3903        }
3904    }
3905    Ok(value)
3906}
3907
3908/// Prepare the speech runtime (downloads / warmup). Returns a
3909/// JSON status string, mirroring the embedded
3910/// `prepare_speech_runtime` shape.
3911async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3912    let engine = get_inference_engine(state);
3913    let status = engine
3914        .prepare_speech_runtime()
3915        .await
3916        .map_err(|e| e.to_string())?;
3917    serde_json::to_value(&status).map_err(|e| e.to_string())
3918}
3919
3920/// Adaptive route decision for a prompt — returns the routing
3921/// JSON the FFI's `route_model` returns.
3922async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3923    let prompt = msg
3924        .params
3925        .get("prompt")
3926        .and_then(|v| v.as_str())
3927        .ok_or("missing 'prompt' parameter")?;
3928    let engine = get_inference_engine(state);
3929    let decision = engine.route_adaptive(prompt).await;
3930    serde_json::to_value(&decision).map_err(|e| e.to_string())
3931}
3932
3933/// Model performance profiles snapshot.
3934async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3935    let engine = get_inference_engine(state);
3936    let profiles = engine.export_profiles().await;
3937    serde_json::to_value(&profiles).map_err(|e| e.to_string())
3938}
3939
/// Parameters for `outcomes.resolve_pending`.
///
/// Deserialized with camelCase field names, so the wire key for the
/// single field is `actionResults`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct OutcomesResolvePendingParams {
    /// Flat `(trace_id, success, confidence, output)` tuples from the
    /// caller. Same shape `car-reason`'s session produces from its
    /// `ActionOutcome` vector. Daemon side runs the inference rules
    /// and writes resolved outcomes back to the shared tracker.
    action_results: Vec<(String, bool, f64, String)>,
}
3949
3950/// `outcomes.resolve_pending` — write inferred outcomes back to the
3951/// shared engine's `OutcomeTracker` (Parslee-ai/car#189 follow-up).
3952///
3953/// Symmetric to the in-process path
3954/// `ReasoningInferenceHandle::record_inferred_outcomes` on
3955/// `InferenceEngine`: takes the per-action result tuples the
3956/// reasoning session produces, runs
3957/// `OutcomeTracker::infer_outcomes_from_action_sequence` to convert
3958/// them into `InferredOutcome` records, and calls
3959/// `resolve_pending_from_signals` under the tracker write lock. The
3960/// learning loop that adjusts routing decisions therefore survives
3961/// daemon-routed reasoning runs (previously a best-effort no-op on
3962/// the daemon side).
3963///
3964/// Returns `{ recorded: N }` where N is the number of action results
3965/// the caller passed. The tracker doesn't surface how many of those
3966/// actually had pending entries to resolve; that count would require
3967/// expanding the tracker API and isn't load-bearing for any caller
3968/// yet.
3969async fn handle_outcomes_resolve_pending(
3970    req: &JsonRpcMessage,
3971    state: &ServerState,
3972) -> Result<Value, String> {
3973    let params: OutcomesResolvePendingParams =
3974        serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
3975    let engine = get_inference_engine(state);
3976    let mut tracker = engine.outcome_tracker.write().await;
3977    let inferred = tracker.infer_outcomes_from_action_sequence(&params.action_results);
3978    tracker.resolve_pending_from_signals(inferred);
3979    Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3980}
3981
3982/// Per-session event log size.
3983async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3984    let n = session.runtime.log.lock().await.len();
3985    Ok(Value::from(n as u64))
3986}
3987
3988async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3989    let stats = session.runtime.log.lock().await.stats();
3990    serde_json::to_value(stats).map_err(|e| e.to_string())
3991}
3992
/// Parameters for the events-truncate handler.
///
/// Deserialized with camelCase field names, so the wire keys are
/// `maxEvents` and `maxSpans`. Both are optional.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
    /// Passed to `truncate_events_keep_last` when set — presumably the
    /// number of most recent events to retain; confirm against the log API.
    #[serde(default)]
    max_events: Option<usize>,
    /// Passed to `truncate_spans_keep_last` when set — presumably the
    /// number of most recent spans to retain; confirm against the log API.
    #[serde(default)]
    max_spans: Option<usize>,
}
4001
4002async fn handle_events_truncate(
4003    msg: &JsonRpcMessage,
4004    session: &crate::session::ClientSession,
4005) -> Result<Value, String> {
4006    let params: EventsTruncateParams =
4007        serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
4008            max_events: None,
4009            max_spans: None,
4010        });
4011    let mut log = session.runtime.log.lock().await;
4012    let removed_events = params
4013        .max_events
4014        .map(|max| log.truncate_events_keep_last(max))
4015        .unwrap_or(0);
4016    let removed_spans = params
4017        .max_spans
4018        .map(|max| log.truncate_spans_keep_last(max))
4019        .unwrap_or(0);
4020    let stats = log.stats();
4021    Ok(serde_json::json!({
4022        "removedEvents": removed_events,
4023        "removedSpans": removed_spans,
4024        "stats": stats,
4025    }))
4026}
4027
4028async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
4029    let mut log = session.runtime.log.lock().await;
4030    let removed = log.clear();
4031    Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
4032}
4033
4034/// Update the per-session replan config. Wire shape mirrors the
4035/// FFI's positional `set_replan_config` arguments — the engine
4036/// crate's `ReplanConfig` struct doesn't derive Serialize, so we
4037/// reconstruct it from a flat object here.
4038async fn handle_replan_set_config(
4039    msg: &JsonRpcMessage,
4040    session: &crate::session::ClientSession,
4041) -> Result<Value, String> {
4042    let max_replans = msg
4043        .params
4044        .get("max_replans")
4045        .and_then(|v| v.as_u64())
4046        .unwrap_or(0) as u32;
4047    let delay_ms = msg
4048        .params
4049        .get("delay_ms")
4050        .and_then(|v| v.as_u64())
4051        .unwrap_or(0);
4052    let verify_before_execute = msg
4053        .params
4054        .get("verify_before_execute")
4055        .and_then(|v| v.as_bool())
4056        .unwrap_or(true);
4057    let cfg = car_engine::ReplanConfig {
4058        max_replans,
4059        delay_ms,
4060        verify_before_execute,
4061    };
4062    session.runtime.set_replan_config(cfg).await;
4063    Ok(Value::Null)
4064}
4065
4066async fn handle_skills_list(
4067    msg: &JsonRpcMessage,
4068    session: &crate::session::ClientSession,
4069) -> Result<Value, String> {
4070    let domain = msg.params.get("domain").and_then(|v| v.as_str());
4071    let engine = session.memgine.lock().await;
4072    let skills: Vec<serde_json::Value> = engine
4073        .graph
4074        .inner
4075        .node_indices()
4076        .filter_map(|nix| {
4077            let node = engine.graph.inner.node_weight(nix)?;
4078            if node.kind != car_memgine::MemKind::Skill {
4079                return None;
4080            }
4081            let meta = car_memgine::SkillMeta::from_node(node)?;
4082            if let Some(d) = domain {
4083                match &meta.scope {
4084                    car_memgine::SkillScope::Global => {}
4085                    car_memgine::SkillScope::Domain(sd) if sd == d => {}
4086                    _ => return None,
4087                }
4088            }
4089            Some(serde_json::to_value(&meta).unwrap_or_default())
4090        })
4091        .collect();
4092    serde_json::to_value(&skills).map_err(|e| e.to_string())
4093}
4094
/// Parameters shared by the secret put / get / delete / status handlers.
#[derive(serde::Deserialize)]
struct SecretParams {
    /// Optional service name; forwarded to `car_ffi_common::secrets` as
    /// `Option<&str>`.
    #[serde(default)]
    service: Option<String>,
    /// Key of the secret. Required.
    key: String,
    /// Secret value. Only `handle_secret_put` reads it, and it rejects
    /// the request when this is absent.
    #[serde(default)]
    value: Option<String>,
}
4103
4104fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
4105    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4106    let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
4107    car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
4108}
4109
4110fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
4111    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4112    car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
4113}
4114
4115fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
4116    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4117    car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
4118}
4119
4120fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
4121    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4122    car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
4123}
4124
/// Parameters shared by the permission status / request / explain
/// handlers.
#[derive(serde::Deserialize)]
struct PermParams {
    /// Permission domain; forwarded as-is to `car_ffi_common::permissions`.
    domain: String,
    /// Optional target bundle id; forwarded as `Option<&str>`.
    #[serde(default)]
    target_bundle_id: Option<String>,
}
4131
4132fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
4133    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4134    car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
4135}
4136
4137fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
4138    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4139    car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
4140}
4141
4142fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
4143    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4144    car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
4145}
4146
4147fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
4148    #[derive(serde::Deserialize)]
4149    struct P {
4150        start: String,
4151        end: String,
4152        #[serde(default)]
4153        calendar_ids: Vec<String>,
4154    }
4155    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4156    let start = chrono::DateTime::parse_from_rfc3339(&p.start)
4157        .map_err(|e| format!("parse start: {}", e))?
4158        .with_timezone(&chrono::Utc);
4159    let end = chrono::DateTime::parse_from_rfc3339(&p.end)
4160        .map_err(|e| format!("parse end: {}", e))?
4161        .with_timezone(&chrono::Utc);
4162    car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
4163}
4164
4165fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
4166    #[derive(serde::Deserialize)]
4167    struct P {
4168        query: String,
4169        #[serde(default = "default_limit")]
4170        limit: usize,
4171        #[serde(default)]
4172        container_ids: Vec<String>,
4173    }
4174    fn default_limit() -> usize {
4175        50
4176    }
4177    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4178    car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
4179}
4180
4181fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
4182    #[derive(serde::Deserialize, Default)]
4183    struct P {
4184        #[serde(default)]
4185        account_ids: Vec<String>,
4186    }
4187    let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
4188    car_ffi_common::integrations::mail_inbox(&p.account_ids)
4189}
4190
4191fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
4192    let raw = req.params.to_string();
4193    car_ffi_common::integrations::mail_send(&raw)
4194}
4195
4196fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
4197    #[derive(serde::Deserialize)]
4198    struct P {
4199        #[serde(default = "default_limit")]
4200        limit: usize,
4201    }
4202    fn default_limit() -> usize {
4203        50
4204    }
4205    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4206    car_ffi_common::integrations::messages_chats(p.limit)
4207}
4208
4209fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
4210    let raw = req.params.to_string();
4211    car_ffi_common::integrations::messages_send(&raw)
4212}
4213
4214fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
4215    #[derive(serde::Deserialize)]
4216    struct P {
4217        query: String,
4218        #[serde(default = "default_limit")]
4219        limit: usize,
4220    }
4221    fn default_limit() -> usize {
4222        50
4223    }
4224    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4225    car_ffi_common::integrations::notes_find(&p.query, p.limit)
4226}
4227
4228fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
4229    #[derive(serde::Deserialize)]
4230    struct P {
4231        #[serde(default = "default_limit")]
4232        limit: usize,
4233    }
4234    fn default_limit() -> usize {
4235        50
4236    }
4237    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4238    car_ffi_common::integrations::reminders_items(p.limit)
4239}
4240
4241fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
4242    #[derive(serde::Deserialize)]
4243    struct P {
4244        #[serde(default = "default_limit")]
4245        limit: usize,
4246    }
4247    fn default_limit() -> usize {
4248        100
4249    }
4250    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
4251    car_ffi_common::integrations::bookmarks_list(p.limit)
4252}
4253
4254fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
4255    #[derive(serde::Deserialize)]
4256    struct P {
4257        start: String,
4258        end: String,
4259    }
4260    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4261    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4262        .map_err(|e| format!("parse start: {}", e))?
4263        .with_timezone(&chrono::Utc);
4264    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4265        .map_err(|e| format!("parse end: {}", e))?
4266        .with_timezone(&chrono::Utc);
4267    car_ffi_common::health::sleep_windows(s, e)
4268}
4269
4270fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4271    #[derive(serde::Deserialize)]
4272    struct P {
4273        start: String,
4274        end: String,
4275    }
4276    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4277    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4278        .map_err(|e| format!("parse start: {}", e))?
4279        .with_timezone(&chrono::Utc);
4280    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4281        .map_err(|e| format!("parse end: {}", e))?
4282        .with_timezone(&chrono::Utc);
4283    car_ffi_common::health::workouts(s, e)
4284}
4285
4286fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4287    #[derive(serde::Deserialize)]
4288    struct P {
4289        start: String,
4290        end: String,
4291    }
4292    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4293    let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4294        .map_err(|e| format!("parse start: {}", e))?;
4295    let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4296        .map_err(|e| format!("parse end: {}", e))?;
4297    car_ffi_common::health::activity(s, e)
4298}
4299
4300async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4301    let closed = session.browser.close().await?;
4302    Ok(serde_json::json!({"closed": closed}))
4303}
4304
4305async fn handle_browser_run(
4306    req: &JsonRpcMessage,
4307    session: &crate::session::ClientSession,
4308) -> Result<Value, String> {
4309    #[derive(serde::Deserialize)]
4310    struct BrowserRunParams {
4311        /// Inline JSON string (CLI-compatible), OR the structured object.
4312        script: Value,
4313        #[serde(default)]
4314        width: Option<u32>,
4315        #[serde(default)]
4316        height: Option<u32>,
4317        /// When true, launches a visible Chromium window for interactive
4318        /// flows (first-time auth, 2FA, supervised runs). Only honored on
4319        /// the call that first launches the browser session — subsequent
4320        /// calls reuse the existing browser regardless.
4321        #[serde(default)]
4322        headed: Option<bool>,
4323        /// Extra Chromium command-line flags appended verbatim at
4324        /// launch (#112). Honoured only on the launch call.
4325        #[serde(default)]
4326        extra_args: Option<Vec<String>>,
4327    }
4328    let params: BrowserRunParams =
4329        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4330
4331    // Accept either a JSON string OR a structured object under `script`.
4332    let script_json = match params.script {
4333        Value::String(s) => s,
4334        other => other.to_string(),
4335    };
4336
4337    let browser_session = session
4338        .browser
4339        .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4340            width: params.width.unwrap_or(1280),
4341            height: params.height.unwrap_or(720),
4342            headless: !params.headed.unwrap_or(false),
4343            extra_args: params.extra_args.unwrap_or_default(),
4344        })
4345        .await?;
4346
4347    let trace_json = browser_session.run(&script_json).await?;
4348    serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4349}
4350
4351// ---------------------------------------------------------------------------
4352// Voice streaming JSON-RPC methods
4353//
4354// Events are pushed back to the originating client as JSON-RPC notifications:
4355//   { "jsonrpc": "2.0", "method": "voice.event",
4356//     "params": { "session_id": "...", "event": {...} } }
4357//
4358// The session registry is process-wide (ServerState.voice_sessions); per-call
4359// WsVoiceEventSink instances bind each session to its originating WS so a
4360// client only ever sees events for sessions it started.
4361// ---------------------------------------------------------------------------
4362
/// Parameters for starting a voice transcription stream.
#[derive(Deserialize)]
struct VoiceStartParams {
    /// Identifier of the voice session to start.
    session_id: String,
    /// Audio source description; re-serialized to a JSON string and
    /// forwarded without interpretation.
    audio_source: Value,
    /// Optional stream options; re-serialized to a JSON string and
    /// forwarded when present.
    #[serde(default)]
    options: Option<Value>,
}
4370
4371async fn handle_voice_transcribe_stream_start(
4372    req: &JsonRpcMessage,
4373    state: &Arc<ServerState>,
4374    session: &Arc<crate::session::ClientSession>,
4375) -> Result<Value, String> {
4376    let params: VoiceStartParams =
4377        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4378    let audio_source_json =
4379        serde_json::to_string(&params.audio_source).map_err(|e| e.to_string())?;
4380    let options_json = params
4381        .options
4382        .as_ref()
4383        .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4384        .transpose()?;
4385    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4386        channel: session.channel.clone(),
4387    });
4388    let json = car_ffi_common::voice::transcribe_stream_start(
4389        &params.session_id,
4390        &audio_source_json,
4391        options_json.as_deref(),
4392        state.voice_sessions.clone(),
4393        sink,
4394    )
4395    .await?;
4396    serde_json::from_str(&json).map_err(|e| e.to_string())
4397}
4398
/// Parameters for stopping a voice transcription stream.
#[derive(Deserialize)]
struct VoiceStopParams {
    /// Identifier of the voice session to stop.
    session_id: String,
}
4403
4404async fn handle_voice_transcribe_stream_stop(
4405    req: &JsonRpcMessage,
4406    state: &Arc<ServerState>,
4407) -> Result<Value, String> {
4408    let params: VoiceStopParams =
4409        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4410    let json = car_ffi_common::voice::transcribe_stream_stop(
4411        &params.session_id,
4412        state.voice_sessions.clone(),
4413    )
4414    .await?;
4415    serde_json::from_str(&json).map_err(|e| e.to_string())
4416}
4417
/// Parameters for pushing one audio frame into a running voice
/// transcription stream.
#[derive(Deserialize)]
struct VoicePushParams {
    /// Identifier of the voice session receiving the frame.
    session_id: String,
    /// Base64-encoded 16-bit signed PCM frame. JSON-RPC is text, so binary
    /// audio frames have to be encoded; clients in WS-binary contexts that
    /// want to skip the round trip can call the FFI directly.
    pcm_b64: String,
}
4426
4427async fn handle_voice_transcribe_stream_push(
4428    req: &JsonRpcMessage,
4429    state: &Arc<ServerState>,
4430) -> Result<Value, String> {
4431    use base64::Engine;
4432    let params: VoicePushParams =
4433        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4434    let pcm = base64::engine::general_purpose::STANDARD
4435        .decode(&params.pcm_b64)
4436        .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4437    let json = car_ffi_common::voice::transcribe_stream_push(
4438        &params.session_id,
4439        &pcm,
4440        state.voice_sessions.clone(),
4441    )
4442    .await?;
4443    serde_json::from_str(&json).map_err(|e| e.to_string())
4444}
4445
4446fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4447    let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4448    serde_json::from_str(&json).unwrap_or(Value::Null)
4449}
4450
4451#[derive(Deserialize)]
4452struct VoiceTtsStreamStartParams {
4453    /// Caller-chosen opaque id for this stream. Used both as the
4454    /// session_id wrapped onto each `voice.event` notification AND as
4455    /// the key for `voice.tts_stream.cancel`.
4456    stream_id: String,
4457    /// Text to synthesize. Splitting into multiple synth calls (for
4458    /// long-form narration) is the caller's responsibility.
4459    text: String,
4460    /// Optional [`car_ffi_common::voice::TtsStreamOptions`] as a raw
4461    /// JSON value (provider, voice_id, binary_frames).
4462    #[serde(default)]
4463    options: Option<Value>,
4464}
4465
4466async fn handle_voice_tts_stream_start(
4467    req: &JsonRpcMessage,
4468    session: &Arc<crate::session::ClientSession>,
4469) -> Result<Value, String> {
4470    let params: VoiceTtsStreamStartParams =
4471        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4472    let opts_str = params
4473        .options
4474        .as_ref()
4475        .map(|v| v.to_string())
4476        .filter(|s| !s.is_empty());
4477    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4478        channel: session.channel.clone(),
4479    });
4480    let json = car_ffi_common::voice::tts_stream_start(
4481        &params.stream_id,
4482        &params.text,
4483        opts_str.as_deref(),
4484        sink,
4485    )
4486    .await?;
4487    serde_json::from_str(&json).map_err(|e| e.to_string())
4488}
4489
4490#[derive(Deserialize)]
4491struct VoiceTtsStreamCancelParams {
4492    stream_id: String,
4493}
4494
4495async fn handle_voice_tts_stream_cancel(req: &JsonRpcMessage) -> Result<Value, String> {
4496    let params: VoiceTtsStreamCancelParams =
4497        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4498    let json = car_ffi_common::voice::tts_stream_cancel(&params.stream_id).await?;
4499    serde_json::from_str(&json).map_err(|e| e.to_string())
4500}
4501
4502fn handle_voice_tts_stream_list() -> Value {
4503    let json = car_ffi_common::voice::list_tts_streams();
4504    serde_json::from_str(&json).unwrap_or(Value::Null)
4505}
4506
4507async fn handle_voice_dispatch_turn(
4508    req: &JsonRpcMessage,
4509    state: &Arc<ServerState>,
4510    session: &Arc<crate::session::ClientSession>,
4511) -> Result<Value, String> {
4512    let req_value = req.params.clone();
4513    let request: crate::voice_turn::DispatchVoiceTurnRequest =
4514        serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4515    let engine = get_inference_engine(state).clone();
4516    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4517        channel: session.channel.clone(),
4518    });
4519    let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4520    serde_json::to_value(resp).map_err(|e| e.to_string())
4521}
4522
4523async fn handle_voice_cancel_turn() -> Result<Value, String> {
4524    crate::voice_turn::cancel().await;
4525    Ok(serde_json::json!({"cancelled": true}))
4526}
4527
4528async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4529    let engine = get_inference_engine(state).clone();
4530    crate::voice_turn::prewarm(engine).await;
4531    Ok(serde_json::json!({"prewarmed": true}))
4532}
4533
4534// ---------------------------------------------------------------------------
4535// Inference runner over WebSocket — closes Parslee-ai/car-releases#24
4536//
4537// Bidirectional protocol shape:
4538//   1. Client → server: `inference.register_runner` (no params). The
4539//      session that calls this becomes the host for delegated models.
4540//   2. Server → client: `inference.runner.invoke` notification with
4541//      {call_id, request} when CAR needs to dispatch a delegated turn.
4542//   3. Client → server: `inference.runner.event` with {call_id, event}
4543//      for each chunk; `inference.runner.complete` with {call_id, result}
4544//      on success; `inference.runner.fail` with {call_id, error} on
4545//      failure.
4546//
4547// The server-side data is process-wide because only one inference
4548// runner can be registered at a time (matches the FFI bindings'
4549// constraint). The per-call mailboxes live in dedicated DashMaps.
4550// ---------------------------------------------------------------------------
4551
4552fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4553    static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4554        std::sync::OnceLock::new();
4555    SLOT.get_or_init(|| std::sync::RwLock::new(None))
4556}
4557
4558fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4559    static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4560        std::sync::OnceLock::new();
4561    MAP.get_or_init(dashmap::DashMap::new)
4562}
4563
4564fn ws_runner_completions() -> &'static dashmap::DashMap<
4565    String,
4566    tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4567> {
4568    static MAP: std::sync::OnceLock<
4569        dashmap::DashMap<
4570            String,
4571            tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4572        >,
4573    > = std::sync::OnceLock::new();
4574    MAP.get_or_init(dashmap::DashMap::new)
4575}
4576
4577struct WsInferenceRunner;
4578
4579#[async_trait::async_trait]
4580impl car_inference::InferenceRunner for WsInferenceRunner {
4581    async fn run(
4582        &self,
4583        request: car_inference::tasks::generate::GenerateRequest,
4584        emitter: car_inference::EventEmitter,
4585    ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4586        let channel = ws_runner_session()
4587            .read()
4588            .map_err(|e| {
4589                car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4590            })?
4591            .clone()
4592            .ok_or_else(|| {
4593                car_inference::RunnerError::Declined(
4594                    "no WebSocket inference runner registered — call inference.register_runner first"
4595                        .into(),
4596                )
4597            })?;
4598
4599        let call_id = uuid::Uuid::new_v4().to_string();
4600        let request_json = serde_json::to_value(&request)
4601            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4602        let (tx, rx) = tokio::sync::oneshot::channel();
4603        ws_runner_calls().insert(call_id.clone(), emitter);
4604        ws_runner_completions().insert(call_id.clone(), tx);
4605
4606        // Fire the invoke notification.
4607        use futures::SinkExt;
4608        let notification = serde_json::json!({
4609            "jsonrpc": "2.0",
4610            "method": "inference.runner.invoke",
4611            "params": {
4612                "call_id": call_id,
4613                "request": request_json,
4614            },
4615        });
4616        let text = serde_json::to_string(&notification)
4617            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4618        let _ = channel
4619            .write
4620            .lock()
4621            .await
4622            .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4623            .await;
4624
4625        let result = rx.await.map_err(|_| {
4626            car_inference::RunnerError::Failed("runner completion channel dropped".into())
4627        })?;
4628        ws_runner_calls().remove(&call_id);
4629        result.map_err(car_inference::RunnerError::Failed)
4630    }
4631}
4632
4633async fn handle_inference_register_runner(
4634    session: &Arc<crate::session::ClientSession>,
4635) -> Result<Value, String> {
4636    let mut guard = ws_runner_session()
4637        .write()
4638        .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4639    *guard = Some(session.channel.clone());
4640    drop(guard);
4641    car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4642    Ok(serde_json::json!({"registered": true}))
4643}
4644
4645#[derive(serde::Deserialize)]
4646struct InferenceRunnerEventParams {
4647    call_id: String,
4648    event: Value,
4649}
4650
4651async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4652    let params: InferenceRunnerEventParams =
4653        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4654    let stream_event = match parse_runner_event_value(&params.event) {
4655        Some(e) => e,
4656        None => return Err("unrecognised runner event shape".into()),
4657    };
4658    if let Some(entry) = ws_runner_calls().get(&params.call_id) {
4659        let emitter = entry.value().clone();
4660        tokio::spawn(async move { emitter.emit(stream_event).await });
4661    }
4662    Ok(serde_json::json!({"emitted": true}))
4663}
4664
4665#[derive(serde::Deserialize)]
4666struct InferenceRunnerCompleteParams {
4667    call_id: String,
4668    result: Value,
4669}
4670
4671async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4672    let params: InferenceRunnerCompleteParams =
4673        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4674    let result: std::result::Result<car_inference::RunnerResult, String> =
4675        serde_json::from_value(params.result)
4676            .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4677    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4678        let _ = tx.send(result);
4679    }
4680    Ok(serde_json::json!({"completed": true}))
4681}
4682
4683#[derive(serde::Deserialize)]
4684struct InferenceRunnerFailParams {
4685    call_id: String,
4686    error: String,
4687}
4688
4689async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4690    let params: InferenceRunnerFailParams =
4691        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4692    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4693        let _ = tx.send(Err(params.error));
4694    }
4695    Ok(serde_json::json!({"failed": true}))
4696}
4697
4698fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4699    let ty = v.get("type").and_then(|t| t.as_str())?;
4700    match ty {
4701        "text" => Some(car_inference::StreamEvent::TextDelta(
4702            v.get("data")?.as_str()?.to_string(),
4703        )),
4704        "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4705            name: v.get("name")?.as_str()?.to_string(),
4706            index: v.get("index")?.as_u64()? as usize,
4707            id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4708        }),
4709        "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4710            index: v.get("index")?.as_u64()? as usize,
4711            arguments_delta: v.get("data")?.as_str()?.to_string(),
4712        }),
4713        "usage" => Some(car_inference::StreamEvent::Usage {
4714            input_tokens: v.get("input_tokens")?.as_u64()?,
4715            output_tokens: v.get("output_tokens")?.as_u64()?,
4716        }),
4717        "done" => Some(car_inference::StreamEvent::Done {
4718            text: v.get("text")?.as_str()?.to_string(),
4719            tool_calls: v
4720                .get("tool_calls")
4721                .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4722                .unwrap_or_default(),
4723        }),
4724        _ => None,
4725    }
4726}
4727
4728#[derive(Deserialize)]
4729struct EnrollSpeakerParams {
4730    label: String,
4731    audio: Value,
4732}
4733
4734async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4735    let params: EnrollSpeakerParams =
4736        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4737    let audio_json = serde_json::to_string(&params.audio).map_err(|e| e.to_string())?;
4738    let json = car_ffi_common::voice::enroll_speaker(&params.label, &audio_json).await?;
4739    serde_json::from_str(&json).map_err(|e| e.to_string())
4740}
4741
4742#[derive(Deserialize)]
4743struct RemoveEnrollmentParams {
4744    label: String,
4745}
4746
4747fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4748    let params: RemoveEnrollmentParams =
4749        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4750    let json = car_ffi_common::voice::remove_enrollment(&params.label)?;
4751    serde_json::from_str(&json).map_err(|e| e.to_string())
4752}
4753
4754#[derive(Deserialize)]
4755struct WorkflowRunParams {
4756    workflow: Value,
4757}
4758
4759async fn handle_workflow_run(
4760    req: &JsonRpcMessage,
4761    session: &Arc<crate::session::ClientSession>,
4762) -> Result<Value, String> {
4763    let params: WorkflowRunParams =
4764        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4765    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4766    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4767        channel: session.channel.clone(),
4768        host: session.host.clone(),
4769        client_id: session.client_id.clone(),
4770    });
4771    let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4772    serde_json::from_str(&json).map_err(|e| e.to_string())
4773}
4774
4775#[derive(Deserialize)]
4776struct WorkflowVerifyParams {
4777    workflow: Value,
4778}
4779
4780fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4781    let params: WorkflowVerifyParams =
4782        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4783    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4784    let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4785    serde_json::from_str(&json).map_err(|e| e.to_string())
4786}
4787
4788// ---------------------------------------------------------------------------
4789// Meeting JSON-RPC methods
4790// ---------------------------------------------------------------------------
4791
4792async fn handle_meeting_start(
4793    req: &JsonRpcMessage,
4794    state: &Arc<ServerState>,
4795    session: &Arc<crate::session::ClientSession>,
4796) -> Result<Value, String> {
4797    // We need the meeting id BEFORE handing the upstream sink to
4798    // start_meeting so the WsMemgineIngestSink stamps transcripts with
4799    // the correct `meeting/<id>/<source>` speaker. Parse the request
4800    // here, mint an id if none was provided, and pass the same id
4801    // through to start_meeting via the request JSON.
4802    let mut req_value = req.params.clone();
4803    let meeting_id = req_value
4804        .get("id")
4805        .and_then(|v| v.as_str())
4806        .map(str::to_string)
4807        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4808    if let Some(map) = req_value.as_object_mut() {
4809        map.insert("id".into(), Value::String(meeting_id.clone()));
4810    }
4811    let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4812
4813    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4814        Arc::new(crate::session::WsVoiceEventSink {
4815            channel: session.channel.clone(),
4816        });
4817
4818    // Wrap the WS upstream with a memgine-ingest fanout that uses the
4819    // tokio::sync::Mutex-wrapped session memgine. We pass `None` for
4820    // the FFI-common `start_meeting` memgine arg to avoid the
4821    // sync-mutex contract there — ingest happens here instead.
4822    let upstream: Arc<dyn car_voice::VoiceEventSink> =
4823        Arc::new(crate::session::WsMemgineIngestSink {
4824            meeting_id,
4825            engine: session.memgine.clone(),
4826            upstream: ws_upstream,
4827        });
4828
4829    let cwd = std::env::current_dir().ok();
4830    let json = crate::meeting::start_meeting(
4831        &request_json,
4832        state.meetings.clone(),
4833        state.voice_sessions.clone(),
4834        upstream,
4835        None,
4836        cwd,
4837    )
4838    .await?;
4839    serde_json::from_str(&json).map_err(|e| e.to_string())
4840}
4841
4842#[derive(Deserialize)]
4843struct MeetingStopParams {
4844    meeting_id: String,
4845    #[serde(default = "default_summarize")]
4846    summarize: bool,
4847}
4848
4849fn default_summarize() -> bool {
4850    true
4851}
4852
4853async fn handle_meeting_stop(
4854    req: &JsonRpcMessage,
4855    state: &Arc<ServerState>,
4856    _session: &Arc<crate::session::ClientSession>,
4857) -> Result<Value, String> {
4858    let params: MeetingStopParams =
4859        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4860    let inference = if params.summarize {
4861        Some(state.inference.get().cloned()).flatten()
4862    } else {
4863        None
4864    };
4865    let json = crate::meeting::stop_meeting(
4866        &params.meeting_id,
4867        params.summarize,
4868        state.meetings.clone(),
4869        state.voice_sessions.clone(),
4870        inference,
4871    )
4872    .await?;
4873    serde_json::from_str(&json).map_err(|e| e.to_string())
4874}
4875
4876#[derive(Deserialize, Default)]
4877struct MeetingListParams {
4878    #[serde(default)]
4879    root: Option<std::path::PathBuf>,
4880}
4881
4882fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4883    let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4884    let cwd = std::env::current_dir().ok();
4885    let json = crate::meeting::list_meetings(params.root, cwd)?;
4886    serde_json::from_str(&json).map_err(|e| e.to_string())
4887}
4888
4889#[derive(Deserialize)]
4890struct MeetingGetParams {
4891    meeting_id: String,
4892    #[serde(default)]
4893    root: Option<std::path::PathBuf>,
4894}
4895
4896fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4897    let params: MeetingGetParams =
4898        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4899    let cwd = std::env::current_dir().ok();
4900    let json = crate::meeting::get_meeting(&params.meeting_id, params.root, cwd)?;
4901    serde_json::from_str(&json).map_err(|e| e.to_string())
4902}
4903
4904// ---------------------------------------------------------------------------
4905// Agent registry — file-based cross-process discovery (#111)
4906// ---------------------------------------------------------------------------
4907
4908#[derive(Deserialize, Default)]
4909struct RegistryRegisterParams {
4910    /// Caller serializes their AgentEntry as a JSON value; we
4911    /// re-serialize it so the ffi-common helper can validate the
4912    /// shape with the same parser used by the bindings.
4913    entry: Value,
4914    #[serde(default)]
4915    registry_path: Option<std::path::PathBuf>,
4916}
4917
4918fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4919    let params: RegistryRegisterParams =
4920        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4921    let entry_json = serde_json::to_string(&params.entry).map_err(|e| e.to_string())?;
4922    car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4923    Ok(Value::Null)
4924}
4925
4926#[derive(Deserialize, Default)]
4927struct RegistryNameParams {
4928    name: String,
4929    #[serde(default)]
4930    registry_path: Option<std::path::PathBuf>,
4931}
4932
4933fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4934    let params: RegistryNameParams =
4935        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4936    let json = car_ffi_common::registry::agent_heartbeat(&params.name, params.registry_path)?;
4937    serde_json::from_str(&json).map_err(|e| e.to_string())
4938}
4939
4940fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4941    let params: RegistryNameParams =
4942        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4943    car_ffi_common::registry::unregister_agent(&params.name, params.registry_path)?;
4944    Ok(Value::Null)
4945}
4946
4947#[derive(Deserialize, Default)]
4948struct RegistryListParams {
4949    #[serde(default)]
4950    registry_path: Option<std::path::PathBuf>,
4951}
4952
4953fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4954    let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4955    let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4956    serde_json::from_str(&json).map_err(|e| e.to_string())
4957}
4958
4959#[derive(Deserialize, Default)]
4960struct RegistryReapParams {
4961    /// Heartbeats older than this many seconds are reaped. Default
4962    /// 60 — two missed 20s heartbeats trigger removal.
4963    #[serde(default = "default_reap_age")]
4964    max_age_secs: u64,
4965    #[serde(default)]
4966    registry_path: Option<std::path::PathBuf>,
4967}
4968
4969fn default_reap_age() -> u64 {
4970    60
4971}
4972
4973fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4974    let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4975    let json =
4976        car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4977    serde_json::from_str(&json).map_err(|e| e.to_string())
4978}
4979
4980// ---------------------------------------------------------------------------
4981// car-a2a server lifecycle (mirrors NAPI startA2aServer / stopA2aServer /
4982// a2aServerStatus and PyO3 start_a2a_server / stop_a2a_server /
4983// a2a_server_status — closes the binding gap noted in #126).
4984// ---------------------------------------------------------------------------
4985
4986async fn handle_a2a_start(
4987    req: &JsonRpcMessage,
4988    session: &crate::session::ClientSession,
4989) -> Result<Value, String> {
4990    let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4991    // Always hand the session's runtime through. start_a2a uses it
4992    // only when `share_session_runtime: true` is set in params;
4993    // otherwise it falls back to the legacy fresh-Runtime + agent_basics
4994    // path. Passing it unconditionally keeps the FFI layer ignorant of
4995    // the flag's plumbing.
4996    let json = crate::a2a::start_a2a(&params_json, Some(session.runtime.clone())).await?;
4997    serde_json::from_str(&json).map_err(|e| e.to_string())
4998}
4999
5000fn handle_a2a_stop() -> Result<Value, String> {
5001    let json = crate::a2a::stop_a2a()?;
5002    serde_json::from_str(&json).map_err(|e| e.to_string())
5003}
5004
5005fn handle_a2a_status() -> Result<Value, String> {
5006    let json = crate::a2a::a2a_status()?;
5007    serde_json::from_str(&json).map_err(|e| e.to_string())
5008}
5009
5010#[derive(Deserialize)]
5011#[serde(rename_all = "camelCase")]
5012struct A2aSendParams {
5013    endpoint: String,
5014    message: car_a2a::Message,
5015    #[serde(default)]
5016    blocking: bool,
5017    #[serde(default = "default_true")]
5018    ingest_a2ui: bool,
5019    #[serde(default)]
5020    route_auth: Option<A2aRouteAuth>,
5021    #[serde(default)]
5022    allow_untrusted_endpoint: bool,
5023}
5024
5025fn default_true() -> bool {
5026    true
5027}
5028
5029/// In-core A2A dispatcher entry point. Forwards the JSON-RPC method
5030/// + params to the lazy-initialized [`car_a2a::A2aDispatcher`] held
5031/// on `ServerState`. Closes Parslee-ai/car-releases#28.
5032///
5033/// Streaming methods (`message/stream`, `tasks/resubscribe` and their
5034/// PascalCase aliases) return `MethodNotFound` from the dispatcher's
5035/// transport-neutral surface — the standalone `start_a2a_listener`
5036/// HTTP path serves SSE for those, but the in-core WS surface is
5037/// JSON-RPC only. Same trade as the dispatcher itself.
5038async fn handle_a2a_dispatch(
5039    method: &str,
5040    req: &JsonRpcMessage,
5041    state: &Arc<ServerState>,
5042) -> Result<Value, String> {
5043    let dispatcher = state.a2a_dispatcher().await;
5044    dispatcher
5045        .dispatch(method, req.params.clone())
5046        .await
5047        .map_err(|e| e.to_string())
5048}
5049
5050async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
5051    let params: A2aSendParams =
5052        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5053    let endpoint = trusted_route_endpoint(
5054        Some(params.endpoint.clone()),
5055        params.allow_untrusted_endpoint,
5056    )
5057    .ok_or_else(|| {
5058        "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
5059    })?;
5060    let client = match params.route_auth.clone() {
5061        Some(auth) => {
5062            car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
5063        }
5064        None => car_a2a::A2aClient::new(endpoint.clone()),
5065    };
5066    let result = client
5067        .send_message(params.message, params.blocking)
5068        .await
5069        .map_err(|e| e.to_string())?;
5070    let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
5071    let mut applied = Vec::new();
5072    if params.ingest_a2ui {
5073        state
5074            .a2ui
5075            .validate_payload(&result_value)
5076            .map_err(|e| e.to_string())?;
5077        let routed_endpoint = Some(endpoint.clone());
5078        for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
5079            let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
5080                if owner.endpoint.is_none() {
5081                    owner.with_endpoint(routed_endpoint.clone())
5082                } else {
5083                    owner
5084                }
5085            });
5086            applied.push(
5087                apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
5088            );
5089        }
5090    }
5091    Ok(serde_json::json!({
5092        "result": result,
5093        "a2ui": {
5094            "applied": applied,
5095        }
5096    }))
5097}
5098
5099// ---------------------------------------------------------------------------
5100// macOS automation — AppleScript + Shortcuts (car-automation), Vision OCR
5101// (car-vision). Mirrors NAPI runApplescript / listShortcuts / runShortcut /
5102// visionOcr and PyO3 run_applescript / list_shortcuts / run_shortcut /
5103// vision_ocr.
5104// ---------------------------------------------------------------------------
5105
5106async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
5107    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5108    let json = car_ffi_common::automation::run_applescript(&args_json).await?;
5109    serde_json::from_str(&json).map_err(|e| e.to_string())
5110}
5111
5112async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
5113    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5114    let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
5115    serde_json::from_str(&json).map_err(|e| e.to_string())
5116}
5117
5118async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
5119    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5120    let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
5121    serde_json::from_str(&json).map_err(|e| e.to_string())
5122}
5123
5124async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
5125    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5126    let json = car_ffi_common::notifications::local(&args_json).await?;
5127    serde_json::from_str(&json).map_err(|e| e.to_string())
5128}
5129
5130async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
5131    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5132    let json = car_ffi_common::vision::ocr(&args_json).await?;
5133    serde_json::from_str(&json).map_err(|e| e.to_string())
5134}
5135
5136// ---------------------------------------------------------------------------
5137// Lifecycle-managed agents (car_registry::supervisor) — Parslee-ai/car-releases#27
5138// ---------------------------------------------------------------------------
5139
5140async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
5141    // Observe-only mode (Parslee-ai/car-releases#44): a second
5142    // car-server on the host can't take the supervisor lock, so it
5143    // can't drive `Supervisor::list` — but it can still answer
5144    // `agents.list` by reading the on-disk manifest directly. The
5145    // `attached` decoration is local to whichever daemon the caller
5146    // is talking to, so observer-mode entries return `attached:
5147    // false` (this daemon hasn't received `session.auth` from those
5148    // children; the primary one has).
5149    let agents = match state.observer_manifest_path() {
5150        Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
5151            .map_err(|e| e.to_string())?,
5152        None => {
5153            let supervisor = state.supervisor()?;
5154            supervisor.list().await
5155        }
5156    };
5157    // Decorate each entry with `attached` + `session_id` so operators
5158    // see whether the supervised process has actually called
5159    // `session.auth { agent_id }` and bound a WS connection (#169) —
5160    // the lifecycle status (`Running`, etc.) only reports the
5161    // process-level view, which can't tell "alive but never
5162    // attached" from "alive and attached".
5163    let attached = state.attached_agents.lock().await.clone();
5164    let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
5165    for a in agents {
5166        let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
5167        let session_id = attached.get(&a.spec.id).cloned();
5168        if let Some(map) = v.as_object_mut() {
5169            map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
5170            if let Some(sid) = session_id {
5171                map.insert("session_id".to_string(), Value::String(sid));
5172            }
5173        }
5174        decorated.push(v);
5175    }
5176    Ok(Value::Array(decorated))
5177}
5178
5179async fn handle_agents_upsert(
5180    req: &JsonRpcMessage,
5181    state: &Arc<ServerState>,
5182) -> Result<Value, String> {
5183    let mut params = req.params.clone();
5184    // Optional `interpreter` sugar (#171). When present, the
5185    // supervisor resolves the bare program name (`"node"`,
5186    // `"python"`, …) against `$PATH` and writes the absolute path
5187    // into `command` *before* validation. This keeps the strict
5188    // no-PATH-lookup rule at upsert time while letting callers
5189    // stop hand-coding `/opt/homebrew/bin/node` into every
5190    // agents.json entry. Resolution happens once; subsequent PATH
5191    // changes do not silently rewire the binding.
5192    if let Some(name) = params
5193        .get("interpreter")
5194        .and_then(|v| v.as_str())
5195        .map(str::to_string)
5196    {
5197        let resolved =
5198            car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
5199        params["command"] = Value::String(resolved.to_string_lossy().into_owned());
5200    }
5201    let spec: car_registry::supervisor::AgentSpec =
5202        serde_json::from_value(params).map_err(|e| e.to_string())?;
5203    let supervisor = state.supervisor()?;
5204    let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
5205    serde_json::to_value(agent).map_err(|e| e.to_string())
5206}
5207
5208/// `agents.install` — install a contributed-agent manifest
5209/// (Parslee-ai/car#182 phase 3). Caller passes the parsed
5210/// `AgentManifest` JSON; the daemon runs install-time validation
5211/// (`car_min_version`, capability negotiation against the daemon's
5212/// own advertisement) and adopts the manifest. Returns
5213/// `{ report, agent? }` where `agent` is the spawnable
5214/// `ManagedAgent` for `external_process` transports and absent for
5215/// `pure_data` / health_url-only manifests.
5216///
5217/// The host capability advertisement comes from
5218/// `HostCapabilities::daemon_default(car_version)` — operators that
5219/// want a tighter advertisement go through a future config phase;
5220/// this MVP uses the runtime's natural surface.
5221async fn handle_agents_install(
5222    req: &JsonRpcMessage,
5223    state: &Arc<ServerState>,
5224) -> Result<Value, String> {
5225    let manifest: car_registry::manifest::AgentManifest =
5226        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5227    let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
5228    let supervisor = state.supervisor()?;
5229    let (report, managed) = supervisor
5230        .install_manifest(manifest, &host)
5231        .await
5232        .map_err(|e| e.to_string())?;
5233    Ok(serde_json::json!({
5234        "report": {
5235            "missingOptional": report
5236                .missing_optional
5237                .iter()
5238                .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
5239                .collect::<Vec<_>>(),
5240        },
5241        "agent": managed,
5242    }))
5243}
5244
5245async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
5246    // Observe-only mode (Parslee-ai/car-releases#44) — see
5247    // `handle_agents_list` for the rationale. The health view is a
5248    // pure function of each entry's `command` plus the on-disk
5249    // sandbox rules, so reading from the manifest is equivalent to
5250    // calling the live supervisor's `health()`.
5251    let entries = match state.observer_manifest_path() {
5252        Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
5253            .map_err(|e| e.to_string())?,
5254        None => {
5255            let supervisor = state.supervisor()?;
5256            supervisor.health().await
5257        }
5258    };
5259    serde_json::to_value(entries).map_err(|e| e.to_string())
5260}
5261
5262fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
5263    req.params
5264        .get("id")
5265        .and_then(Value::as_str)
5266        .map(str::to_string)
5267        .ok_or_else(|| "missing required `id` parameter".to_string())
5268}
5269
5270async fn handle_agents_remove(
5271    req: &JsonRpcMessage,
5272    state: &Arc<ServerState>,
5273) -> Result<Value, String> {
5274    let id = extract_agent_id(req)?;
5275    let supervisor = state.supervisor()?;
5276    let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
5277    Ok(serde_json::json!({ "removed": removed }))
5278}
5279
5280async fn handle_agents_start(
5281    req: &JsonRpcMessage,
5282    state: &Arc<ServerState>,
5283) -> Result<Value, String> {
5284    let id = extract_agent_id(req)?;
5285    let supervisor = state.supervisor()?;
5286    let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
5287    serde_json::to_value(agent).map_err(|e| e.to_string())
5288}
5289
5290async fn handle_agents_stop(
5291    req: &JsonRpcMessage,
5292    state: &Arc<ServerState>,
5293) -> Result<Value, String> {
5294    let id = extract_agent_id(req)?;
5295    let signal: car_registry::supervisor::StopSignal = req
5296        .params
5297        .get("signal")
5298        .map(|v| serde_json::from_value(v.clone()))
5299        .transpose()
5300        .map_err(|e| e.to_string())?
5301        .unwrap_or_default();
5302    let supervisor = state.supervisor()?;
5303    let agent = supervisor
5304        .stop(&id, signal)
5305        .await
5306        .map_err(|e| e.to_string())?;
5307    serde_json::to_value(agent).map_err(|e| e.to_string())
5308}
5309
5310async fn handle_agents_restart(
5311    req: &JsonRpcMessage,
5312    state: &Arc<ServerState>,
5313) -> Result<Value, String> {
5314    let id = extract_agent_id(req)?;
5315    let supervisor = state.supervisor()?;
5316    let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
5317    serde_json::to_value(agent).map_err(|e| e.to_string())
5318}
5319
5320async fn handle_agents_tail_log(
5321    req: &JsonRpcMessage,
5322    state: &Arc<ServerState>,
5323) -> Result<Value, String> {
5324    let id = extract_agent_id(req)?;
5325    let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
5326    let supervisor = state.supervisor()?;
5327    let lines = supervisor
5328        .tail_log(&id, n)
5329        .await
5330        .map_err(|e| e.to_string())?;
5331    Ok(serde_json::json!({ "lines": lines }))
5332}
5333
// ---------------------------------------------------------------------------
// External-agent detection (Phase 1 of docs/proposals/external-agent-detection.md)
//
// Discovery surface for agentic CLIs the user has already installed and
// authenticated (Claude Code, Codex, Gemini). The list / detect / health
// handlers are read-only; the Phase 2 per-task invocation path
// (agents.invoke_external) is implemented further down in
// `handle_agents_invoke_external`. The cache lives in
// car_ffi_common::external_agents so the in-process FFI singletons share
// the same snapshot.
// ---------------------------------------------------------------------------
5343
5344async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5345    let include_health = req
5346        .params
5347        .get("include_health")
5348        .and_then(Value::as_bool)
5349        .unwrap_or(false);
5350    let json = car_ffi_common::external_agents::list(include_health).await?;
5351    serde_json::from_str(&json).map_err(|e| e.to_string())
5352}
5353
5354async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5355    let include_health = req
5356        .params
5357        .get("include_health")
5358        .and_then(Value::as_bool)
5359        .unwrap_or(false);
5360    let json = car_ffi_common::external_agents::detect(include_health).await?;
5361    serde_json::from_str(&json).map_err(|e| e.to_string())
5362}
5363
5364/// Per-task invocation of an external CLI agent. Required params:
5365/// `id` (adapter, e.g. `"claude-code"`) and `task` (the prompt).
5366/// Optional: `cwd`, `allowed_tools`, `max_turns`, `timeout_secs`.
5367///
5368/// Phase 2 stage 3 ships with `claude-code` only. Other adapter
5369/// ids return `is_error: true` with a structured `error` so hosts
5370/// can surface the gap without a separate error code.
5371///
5372/// Phase 2 stage 4a (governance): every invocation appends a
5373/// structured audit record to `~/.car/external-agents.jsonl`. The
5374/// record captures id, task, options, result, and the full
5375/// `tool_uses` list the assistant emitted — so even though the
5376/// agent executes its built-in tools in-process (which we can't
5377/// gate via stream-json), there's a complete after-the-fact audit
5378/// trail. Full policy gating (proposing each tool_use to CAR's
5379/// validator + getting a yes/no) requires the MCP server route in
5380/// stage 4b.
5381async fn handle_agents_invoke_external(
5382    req: &JsonRpcMessage,
5383    state: &Arc<ServerState>,
5384    host_session: &Arc<crate::session::ClientSession>,
5385) -> Result<Value, String> {
5386    let id = req
5387        .params
5388        .get("id")
5389        .and_then(Value::as_str)
5390        .ok_or_else(|| "missing required `id` parameter".to_string())?
5391        .to_string();
5392    let task = req
5393        .params
5394        .get("task")
5395        .and_then(Value::as_str)
5396        .ok_or_else(|| "missing required `task` parameter".to_string())?
5397        .to_string();
5398    let stream = req
5399        .params
5400        .get("stream")
5401        .and_then(Value::as_bool)
5402        .unwrap_or(false);
5403    let session_id = req
5404        .params
5405        .get("session_id")
5406        .and_then(Value::as_str)
5407        .map(str::to_string)
5408        .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
5409
5410    // Build the options sub-object directly from req.params so
5411    // hosts can pass `cwd` / `allowed_tools` / `max_turns` /
5412    // `timeout_secs` as siblings of `id`/`task`. Strip the
5413    // dispatch + streaming fields so they don't pollute the
5414    // options serde.
5415    let mut options_value = req.params.clone();
5416    if let Some(obj) = options_value.as_object_mut() {
5417        obj.remove("id");
5418        obj.remove("task");
5419        obj.remove("stream");
5420        obj.remove("session_id");
5421        // Auto-fill `mcp_endpoint` from the bound MCP URL when the
5422        // caller didn't supply one. This is the load-bearing
5423        // wiring of MCP-4: external agents get CAR's tools (memory,
5424        // skills, verify) routed through the daemon's policy +
5425        // shared memgine without any per-call host configuration.
5426        // Callers who want to opt out can pass `"mcp_endpoint": ""`
5427        // (empty string) — the runner skips the temp-file write
5428        // when the value isn't a non-empty URL.
5429        let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5430        if !has_explicit_mcp {
5431            if let Some(url) = state.mcp_url.get() {
5432                obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5433            }
5434        }
5435    }
5436
5437    if !stream {
5438        // Legacy one-shot path. Unchanged shape for FFI consumers
5439        // and any caller that hasn't opted into streaming.
5440        let options_json = options_value.to_string();
5441        let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
5442        let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5443        append_external_agent_audit(&id, &task, &options_value, &result);
5444        return Ok(result);
5445    }
5446
5447    // Streaming path. Returns an ack ({accepted, session_id})
5448    // immediately and streams `agents.chat.event` notifications
5449    // to the host's WS as the runner emits StreamEvents. Reuses
5450    // the chat_sessions routing infrastructure supervised agents
5451    // use — host UIs render both kinds through the same path.
5452    let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
5453        .map_err(|e| format!("invalid options: {e}"))?;
5454
5455    // Register the chat session BEFORE spawning so the host's
5456    // subscriber is correctly bound to this session_id by the
5457    // time the first event arrives. Reusing the supervised
5458    // agent chat infrastructure means `agents.chat.cancel`
5459    // routes to chat_sessions[session_id] and can find this
5460    // entry — though we don't currently honor cancel for
5461    // external invocations (the child process is killed only
5462    // on timeout or task drop; a future iteration can plumb a
5463    // CancellationToken through invoke_with_emitter).
5464    {
5465        // If a chat session is already registered for this id (the
5466        // typical proxy shape: host → agents.chat → supervised agent
5467        // → agents.invoke_external with the same session_id), DO NOT
5468        // overwrite it. The existing entry owns the routing to the
5469        // original host; clobbering it with our caller's client_id
5470        // would send streaming events to the proxying agent instead
5471        // of back to the host that issued agents.chat. Only register
5472        // a fresh entry when the slot is empty (a direct
5473        // host-to-invoke_external call without a prior agents.chat).
5474        let mut chats = state.chat_sessions.lock().await;
5475        chats.entry(session_id.clone()).or_insert_with(|| {
5476            let created_at = std::time::SystemTime::now()
5477                .duration_since(std::time::UNIX_EPOCH)
5478                .map(|d| d.as_secs())
5479                .unwrap_or(0);
5480            crate::session::ChatSession {
5481                agent_id: id.clone(),
5482                host_client_id: host_session.client_id.clone(),
5483                created_at,
5484            }
5485        });
5486    }
5487
5488    // Single drain task pulls StreamEvents off an unbounded
5489    // channel and serializes WS sends to the host. Per-event
5490    // tokio::spawn would let sends race (which token arrives
5491    // first depends on lock acquisition order). The channel
5492    // is unbounded because claude's event volume is bounded
5493    // by user turn count — typically <50 events per invocation.
5494    use tokio::sync::mpsc;
5495    let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
5496
5497    let drain_state = state.clone();
5498    let drain_session_id = session_id.clone();
5499    let drain_agent_id = id.clone();
5500    tokio::spawn(async move {
5501        while let Some(event) = rx.recv().await {
5502            emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
5503        }
5504    });
5505
5506    let emitter_tx = tx.clone();
5507    let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
5508        // Send failure (rx dropped) means the drain task has
5509        // exited — usually because the host disconnected. The
5510        // runner will keep going; let it finish so the audit
5511        // log captures the full result.
5512        let _ = emitter_tx.send(event);
5513    });
5514
5515    // Run the invocation in a separate task so this handler
5516    // can return the ack right away. The runner's child-process
5517    // future owns the spawn lifetime; if the host disconnects
5518    // mid-stream, the runner still completes (its events fall
5519    // on the floor at the drain layer) so the audit log lands.
5520    let spawn_state = state.clone();
5521    let spawn_session_id = session_id.clone();
5522    let spawn_id = id.clone();
5523    let spawn_task = task.clone();
5524    let spawn_options = options_value.clone();
5525    tokio::spawn(async move {
5526        let outcome =
5527            car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
5528                .await;
5529        drop(tx); // signal drain task to exit after queue empties
5530
5531        // Synthesize a terminal agents.chat.event so the host's
5532        // bubble finalizes. The runner doesn't emit a "done" event
5533        // itself — the result is the aggregate InvokeResult. We
5534        // translate it here.
5535        let terminal_params: Value;
5536        let result_value: Value;
5537        match outcome {
5538            Ok(res) => {
5539                // Pack the metadata into `finish_reason` as a
5540                // human-readable summary so the host's existing
5541                // ChatEvent decoder surfaces it without a schema
5542                // change. Hosts that want structured data can
5543                // re-issue `agents.invoke_external` with
5544                // `stream: false` and read the InvokeResult.
5545                let mut parts: Vec<String> = Vec::new();
5546                if res.turns > 0 {
5547                    parts.push(format!(
5548                        "{} turn{}",
5549                        res.turns,
5550                        if res.turns == 1 { "" } else { "s" }
5551                    ));
5552                }
5553                if res.tool_calls > 0 {
5554                    parts.push(format!(
5555                        "{} tool{}",
5556                        res.tool_calls,
5557                        if res.tool_calls == 1 { "" } else { "s" }
5558                    ));
5559                }
5560                if res.duration_ms > 0 {
5561                    parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
5562                }
5563                let summary = if parts.is_empty() {
5564                    "stop".to_string()
5565                } else {
5566                    parts.join(" · ")
5567                };
5568                if res.is_error {
5569                    terminal_params = serde_json::json!({
5570                        "session_id": spawn_session_id,
5571                        "agent_id": spawn_id,
5572                        "kind": "error",
5573                        "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
5574                    });
5575                } else {
5576                    terminal_params = serde_json::json!({
5577                        "session_id": spawn_session_id,
5578                        "agent_id": spawn_id,
5579                        "kind": "done",
5580                        "finish_reason": summary,
5581                    });
5582                }
5583                result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
5584            }
5585            Err(e) => {
5586                let message = format!("{e}");
5587                terminal_params = serde_json::json!({
5588                    "session_id": spawn_session_id,
5589                    "agent_id": spawn_id,
5590                    "kind": "error",
5591                    "error": message.clone(),
5592                });
5593                result_value = serde_json::json!({ "is_error": true, "error": message });
5594            }
5595        }
5596        send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
5597        spawn_state
5598            .chat_sessions
5599            .lock()
5600            .await
5601            .remove(&spawn_session_id);
5602        append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
5603    });
5604
5605    Ok(serde_json::json!({
5606        "accepted": true,
5607        "session_id": session_id,
5608    }))
5609}
5610
5611/// Translate one [`StreamEvent`] from the running external CLI
5612/// into an `agents.chat.event` notification on the originating
5613/// host's WS. Same wire shape supervised agents emit, so host
5614/// UIs render both kinds with one decoder.
5615///
5616/// Mapping:
5617/// - `Assistant` events with `text` content blocks → `kind: "token"`
5618///   per text block. Each block carries the full text the
5619///   assistant emitted in that turn (claude doesn't expose
5620///   word-level deltas via stream-json — it emits per-turn or
5621///   per-content-block chunks).
5622/// - `Assistant` events with `tool_use` blocks → `kind: "tool_call"`
5623///   per block (tool name in `detail`).
5624/// - `System` / `User` / `Result` / others → dropped (Result's
5625///   metadata is folded into the terminal `done` event the
5626///   outer task emits when the invocation finishes).
5627async fn emit_external_chat_event(
5628    state: &Arc<ServerState>,
5629    session_id: &str,
5630    agent_id: &str,
5631    event: car_external_agents::StreamEvent,
5632) {
5633    use car_external_agents::StreamEvent;
5634    match event {
5635        StreamEvent::Assistant(a) => {
5636            if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
5637                for block in content {
5638                    let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
5639                    match block_type {
5640                        "text" => {
5641                            if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
5642                                if !text.is_empty() {
5643                                    let params = serde_json::json!({
5644                                        "session_id": session_id,
5645                                        "agent_id": agent_id,
5646                                        "kind": "token",
5647                                        "delta": text,
5648                                    });
5649                                    send_external_chat_frame(state, session_id, params).await;
5650                                }
5651                            }
5652                        }
5653                        "tool_use" => {
5654                            let name = block
5655                                .get("name")
5656                                .and_then(|v| v.as_str())
5657                                .unwrap_or("(unknown tool)");
5658                            let params = serde_json::json!({
5659                                "session_id": session_id,
5660                                "agent_id": agent_id,
5661                                "kind": "tool_call",
5662                                "detail": name,
5663                            });
5664                            send_external_chat_frame(state, session_id, params).await;
5665                        }
5666                        _ => {}
5667                    }
5668                }
5669            }
5670        }
5671        _ => {
5672            // System (session id init), User (tool result echo),
5673            // Result (final aggregate — folded into terminal
5674            // `done` event by the outer task), RateLimitEvent,
5675            // Other: not surfaced to host.
5676        }
5677    }
5678}
5679
5680/// Send a single `agents.chat.event` notification to the host
5681/// session bound by `session_id`. Best-effort: a missing route
5682/// or a closed WS is silently dropped, the runner continues so
5683/// the audit log lands.
5684async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
5685    use futures::SinkExt;
5686    use tokio_tungstenite::tungstenite::Message;
5687
5688    let host_client_id = state
5689        .chat_sessions
5690        .lock()
5691        .await
5692        .get(session_id)
5693        .map(|s| s.host_client_id.clone());
5694    let Some(host_client_id) = host_client_id else {
5695        return;
5696    };
5697    let host_channel = {
5698        let sessions = state.sessions.lock().await;
5699        sessions.get(&host_client_id).map(|s| s.channel.clone())
5700    };
5701    let Some(channel) = host_channel else {
5702        return;
5703    };
5704    let frame = serde_json::json!({
5705        "jsonrpc": "2.0",
5706        "method": "agents.chat.event",
5707        "params": params,
5708    });
5709    if let Ok(text) = serde_json::to_string(&frame) {
5710        let _ = channel
5711            .write
5712            .lock()
5713            .await
5714            .send(Message::Text(text.into()))
5715            .await;
5716    }
5717}
5718
5719/// Append one JSONL audit record to `~/.car/external-agents.jsonl`.
5720/// Best-effort: a failure to open the journal must NOT fail the
5721/// invocation; the in-memory result already returned is the
5722/// authoritative answer. Logs at warn level when the write fails so
5723/// operators notice repeated failures.
5724fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5725    use std::io::Write;
5726    let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5727        Some(home) => home.join(".car"),
5728        None => return,
5729    };
5730    if std::fs::create_dir_all(&car_dir).is_err() {
5731        return;
5732    }
5733    let path = car_dir.join("external-agents.jsonl");
5734    let record = serde_json::json!({
5735        "ts": chrono::Utc::now().to_rfc3339(),
5736        "adapter_id": id,
5737        "task": task,
5738        "options": options,
5739        "result": result,
5740    });
5741    let line = match serde_json::to_string(&record) {
5742        Ok(s) => s,
5743        Err(_) => return,
5744    };
5745    if let Ok(mut f) = std::fs::OpenOptions::new()
5746        .create(true)
5747        .append(true)
5748        .open(&path)
5749    {
5750        let _ = writeln!(f, "{}", line);
5751    } else {
5752        tracing::warn!(
5753            path = %path.display(),
5754            "failed to append external-agent audit record"
5755        );
5756    }
5757}
5758
5759/// Ground-truth health check. Optional `id` param picks one tool;
5760/// without it, every detected adapter is checked. `force: true`
5761/// bypasses the 30s per-tool TTL cache. Replaces the Phase 1
5762/// credential-file shape heuristic as the load-bearing signal for
5763/// "is this tool ready to invoke."
5764async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5765    let force = req
5766        .params
5767        .get("force")
5768        .and_then(Value::as_bool)
5769        .unwrap_or(false);
5770    if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5771        let json = car_ffi_common::external_agents::health_one(id, force).await?;
5772        serde_json::from_str(&json).map_err(|e| e.to_string())
5773    } else {
5774        let json = car_ffi_common::external_agents::health(force).await?;
5775        serde_json::from_str(&json).map_err(|e| e.to_string())
5776    }
5777}
5778
5779// ---------------------------------------------------------------------------
5780// agents.chat — unified chat surface (docs/proposals/agent-chat-surface.md)
5781// ---------------------------------------------------------------------------
5782//
5783// Host calls `agents.chat { agent_id, prompt, session_id?, stream? }`.
5784// The server looks up the target agent's attached WS connection,
5785// reverse-calls `agent.chat { session_id, prompt, context }` on it
5786// (same pattern as `tools.execute`), and returns once the agent acks.
5787// The agent then streams `agent.chat.event` notifications back, which
5788// the dispatcher intercepts (see `try_forward_agent_chat_event`) and
5789// rewrites as `agents.chat.event` notifications on the originating
5790// host's channel.
5791
/// Timeout, in seconds, that the server waits for the agent to ack
/// `agent.chat`; `handle_agents_chat` wraps its wait for the ack in a
/// `tokio::time::timeout` of this length. The streamed tokens come later
/// as separate notifications and have no bearing on this — this is just
/// "did the agent receive the prompt and accept it." Five seconds is
/// generous for a local IPC ack.
const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
5797
5798/// `agents.chat` — host issues a chat turn to a named agent. Returns
5799/// `{ accepted: true, session_id }` once the agent acks; streamed
5800/// tokens arrive on the host's channel as `agents.chat.event`
5801/// notifications keyed by the same `session_id`.
5802async fn handle_agents_chat(
5803    req: &JsonRpcMessage,
5804    state: &Arc<ServerState>,
5805    host_session: &Arc<crate::session::ClientSession>,
5806) -> Result<Value, String> {
5807    use futures::SinkExt;
5808    use tokio::sync::oneshot;
5809    use tokio_tungstenite::tungstenite::Message;
5810
5811    let agent_id = req
5812        .params
5813        .get("agent_id")
5814        .and_then(Value::as_str)
5815        .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
5816        .to_string();
5817    let prompt = req
5818        .params
5819        .get("prompt")
5820        .and_then(Value::as_str)
5821        .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
5822        .to_string();
5823    let session_id = req
5824        .params
5825        .get("session_id")
5826        .and_then(Value::as_str)
5827        .map(str::to_string)
5828        .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
5829    let stream = req
5830        .params
5831        .get("stream")
5832        .and_then(Value::as_bool)
5833        .unwrap_or(true);
5834    let voice_input = req
5835        .params
5836        .get("voice_input")
5837        .and_then(Value::as_bool)
5838        .unwrap_or(false);
5839
5840    // Resolve the agent's attached WS channel via `attached_agents` →
5841    // `sessions` → `channel`. Both lookups must hit; a missing entry on
5842    // either side means the agent is registered in `agents.json` but
5843    // hasn't `session.auth`'d (or has disconnected), so refuse with a
5844    // structured error rather than silently parking the chat.
5845    let agent_client_id = state
5846        .attached_agents
5847        .lock()
5848        .await
5849        .get(&agent_id)
5850        .cloned()
5851        .ok_or_else(|| {
5852            format!(
5853                "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
5854                agent_id
5855            )
5856        })?;
5857    let agent_channel = {
5858        let sessions = state.sessions.lock().await;
5859        sessions
5860            .get(&agent_client_id)
5861            .map(|s| s.channel.clone())
5862            .ok_or_else(|| {
5863                format!(
5864                    "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
5865                    agent_id, agent_client_id
5866                )
5867            })?
5868    };
5869
5870    // Register the chat session BEFORE sending the reverse call so any
5871    // `agent.chat.event` notifications the agent sends as part of
5872    // accepting the chat (e.g. an immediate `token` delta) route
5873    // correctly. Indexed by session_id so the notification interceptor
5874    // can locate the originating host without scanning.
5875    {
5876        let created_at = std::time::SystemTime::now()
5877            .duration_since(std::time::UNIX_EPOCH)
5878            .map(|d| d.as_secs())
5879            .unwrap_or(0);
5880        state.chat_sessions.lock().await.insert(
5881            session_id.clone(),
5882            crate::session::ChatSession {
5883                agent_id: agent_id.clone(),
5884                host_client_id: host_session.client_id.clone(),
5885                created_at,
5886            },
5887        );
5888    }
5889
5890    // Reverse-callback: register a oneshot for the ack, send the
5891    // `agent.chat` JSON-RPC request on the agent's channel, await up
5892    // to AGENT_CHAT_ACK_TIMEOUT_SECS. Uses the same `pending` map the
5893    // tool-callback path uses (`WsToolExecutor`) — the dispatcher's
5894    // response demuxer at the top of `run_dispatch` already routes
5895    // `result` / `error` frames keyed by request id back through it.
5896    let request_id = agent_channel.next_request_id();
5897    let (tx, rx) = oneshot::channel();
5898    agent_channel
5899        .pending
5900        .lock()
5901        .await
5902        .insert(request_id.clone(), tx);
5903
5904    let rpc_request = serde_json::json!({
5905        "jsonrpc": "2.0",
5906        "method": "agent.chat",
5907        "params": {
5908            "session_id": session_id,
5909            "prompt": prompt,
5910            "stream": stream,
5911            "context": {
5912                "host_client_id": host_session.client_id,
5913                "voice_input": voice_input,
5914            },
5915        },
5916        "id": request_id,
5917    });
5918    let msg = Message::Text(
5919        serde_json::to_string(&rpc_request)
5920            .map_err(|e| e.to_string())?
5921            .into(),
5922    );
5923    if let Err(e) = agent_channel.write.lock().await.send(msg).await {
5924        // Send failed — drop the pending waiter and the chat session
5925        // entry so a retry can take a fresh session_id without
5926        // colliding.
5927        agent_channel.pending.lock().await.remove(&request_id);
5928        state.chat_sessions.lock().await.remove(&session_id);
5929        return Err(format!(
5930            "failed to deliver agent.chat to `{}`: {}",
5931            agent_id, e
5932        ));
5933    }
5934
5935    // Await the agent's ack. The dispatcher's response demuxer routes
5936    // the result/error back via the oneshot. Timeout means the agent
5937    // is alive but unresponsive — clean up routing state and surface a
5938    // structured error so the host UI doesn't hang.
5939    let ack = match tokio::time::timeout(
5940        std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
5941        rx,
5942    )
5943    .await
5944    {
5945        Ok(Ok(resp)) => resp,
5946        Ok(Err(_)) => {
5947            // Channel closed — agent disconnected mid-call.
5948            state.chat_sessions.lock().await.remove(&session_id);
5949            return Err(format!(
5950                "agent `{}` disconnected before acking agents.chat",
5951                agent_id
5952            ));
5953        }
5954        Err(_) => {
5955            // Timeout — agent didn't respond in time. Don't keep the
5956            // chat session around: any later events from the agent
5957            // would route to a host that already returned an error.
5958            agent_channel.pending.lock().await.remove(&request_id);
5959            state.chat_sessions.lock().await.remove(&session_id);
5960            return Err(format!(
5961                "agent `{}` did not ack agents.chat within {}s",
5962                agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
5963            ));
5964        }
5965    };
5966
5967    if let Some(err) = ack.error {
5968        // Agent explicitly rejected — drop the session and propagate.
5969        state.chat_sessions.lock().await.remove(&session_id);
5970        return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
5971    }
5972
5973    Ok(serde_json::json!({
5974        "accepted": true,
5975        "session_id": session_id,
5976    }))
5977}
5978
5979/// `agents.chat.cancel` — host aborts an in-flight chat. Forwards
5980/// `agent.chat.cancel` to the bound agent so the agent can short-
5981/// circuit its inference stream + free upstream resources
5982/// (`inference.stream.cancel`). The chat session is dropped from
5983/// routing state immediately whether or not the agent acks the cancel
5984/// — further `agent.chat.event` notifications for this session_id
5985/// fall on the floor by design.
5986async fn handle_agents_chat_cancel(
5987    req: &JsonRpcMessage,
5988    state: &Arc<ServerState>,
5989) -> Result<Value, String> {
5990    use futures::SinkExt;
5991    use tokio_tungstenite::tungstenite::Message;
5992
5993    let session_id = req
5994        .params
5995        .get("session_id")
5996        .and_then(Value::as_str)
5997        .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
5998        .to_string();
5999
6000    let chat = state.chat_sessions.lock().await.remove(&session_id);
6001    let chat = match chat {
6002        Some(c) => c,
6003        None => {
6004            // Already cancelled or never existed — idempotent.
6005            return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
6006        }
6007    };
6008
6009    // Best-effort fire-and-forget to the agent. We've already removed
6010    // the routing entry, so no need to await any agent response.
6011    let agent_client_id = state
6012        .attached_agents
6013        .lock()
6014        .await
6015        .get(&chat.agent_id)
6016        .cloned();
6017    if let Some(client_id) = agent_client_id {
6018        let channel_opt = {
6019            let sessions = state.sessions.lock().await;
6020            sessions.get(&client_id).map(|s| s.channel.clone())
6021        };
6022        if let Some(channel) = channel_opt {
6023            let notification = serde_json::json!({
6024                "jsonrpc": "2.0",
6025                "method": "agent.chat.cancel",
6026                "params": { "session_id": session_id },
6027            });
6028            if let Ok(text) = serde_json::to_string(&notification) {
6029                let _ = channel
6030                    .write
6031                    .lock()
6032                    .await
6033                    .send(Message::Text(text.into()))
6034                    .await;
6035            }
6036        }
6037    }
6038
6039    Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
6040}
6041
6042/// Forward an `agent.chat.event` notification from an agent's
6043/// connection to the originating host's connection, rewritten as an
6044/// `agents.chat.event` notification. Returns `true` if the inbound
6045/// frame was a chat-event we routed (so the dispatcher can `continue`
6046/// past the normal method dispatch and skip the wasted "unknown
6047/// method" response), `false` otherwise.
6048///
6049/// Terminal events (`kind: "done"` / `"error"`) also drop the routing
6050/// entry from `state.chat_sessions` so a later stray notification can
6051/// be rejected as orphaned without leaking memory.
6052pub(crate) async fn try_forward_agent_chat_event(
6053    parsed: &JsonRpcMessage,
6054    state: &Arc<ServerState>,
6055) -> bool {
6056    use futures::SinkExt;
6057    use tokio_tungstenite::tungstenite::Message;
6058
6059    // Notification predicate: method is `agent.chat.event`, id is
6060    // missing/null (per JSON-RPC, notifications have no id), and
6061    // params carry a session_id.
6062    let Some(method) = parsed.method.as_deref() else {
6063        return false;
6064    };
6065    if method != "agent.chat.event" {
6066        return false;
6067    }
6068    if !parsed.id.is_null() {
6069        // Has an id → it's a request, not a notification. Let the
6070        // normal dispatcher handle it (and reply with method-not-found).
6071        return false;
6072    }
6073    let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
6074        return false;
6075    };
6076    let session_id = session_id.to_string();
6077
6078    // Look up the routing entry. If gone (cancelled, agent dropped,
6079    // disconnect cleanup), drop the event silently — late frames from
6080    // a respawned agent for a stale session are not the host's
6081    // problem.
6082    let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
6083    let Some(chat) = chat else {
6084        return true; // recognized the method, but routing has dropped — consumed.
6085    };
6086
6087    // Pull the kind early so terminal-event cleanup runs even if the
6088    // host's send fails. Agents may omit `kind` and signal terminal
6089    // state via `finish_reason` / `error` instead (car#222) — derive
6090    // it from the frame shape so both the cleanup below AND the host
6091    // see a correct, host-protocol-compliant kind. The old code
6092    // defaulted a `finish_reason`-only "done" frame to "token", so
6093    // terminal cleanup never ran and the host (which requires `kind`)
6094    // dropped every frame silently.
6095    let kind = parsed
6096        .params
6097        .get("kind")
6098        .and_then(Value::as_str)
6099        .map(str::to_string)
6100        .unwrap_or_else(|| {
6101            if parsed.params.get("error").is_some() {
6102                "error".to_string()
6103            } else if parsed.params.get("finish_reason").is_some() {
6104                "done".to_string()
6105            } else {
6106                "token".to_string()
6107            }
6108        });
6109
6110    // Forward to the host. Rewrites the method name to the host-facing
6111    // form and attaches `agent_id` so the host doesn't have to remember
6112    // which agent owns each session.
6113    let host_channel = {
6114        let sessions = state.sessions.lock().await;
6115        sessions
6116            .get(&chat.host_client_id)
6117            .map(|s| s.channel.clone())
6118    };
6119    if let Some(channel) = host_channel {
6120        let mut params = parsed.params.clone();
6121        if let Some(obj) = params.as_object_mut() {
6122            obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
6123            // host-protocol.md requires a top-level `kind` on every
6124            // agents.chat.event. Agents that omit it (signalling via
6125            // finish_reason/error) were dropped wholesale by the host
6126            // decoder — normalize here so the contract holds. car#222.
6127            obj.entry("kind")
6128                .or_insert_with(|| Value::String(kind.clone()));
6129        }
6130        let forward = serde_json::json!({
6131            "jsonrpc": "2.0",
6132            "method": "agents.chat.event",
6133            "params": params,
6134        });
6135        if let Ok(text) = serde_json::to_string(&forward) {
6136            let send_result = channel
6137                .write
6138                .lock()
6139                .await
6140                .send(Message::Text(text.into()))
6141                .await;
6142            if let Err(e) = send_result {
6143                tracing::warn!(
6144                    session_id = %session_id,
6145                    agent_id = %chat.agent_id,
6146                    host_client_id = %chat.host_client_id,
6147                    kind = %kind,
6148                    error = %e,
6149                    "agent.chat.event forward to host failed at the WS send step"
6150                );
6151            }
6152        }
6153    } else {
6154        // Host disconnected mid-stream — chat_sessions still holds
6155        // the routing entry but the originating client_id no longer
6156        // resolves to a session. Pre-#233 this was silent and the
6157        // operator had no way to tell whether the event was dropped
6158        // here or never emitted by the agent. Log + drop the
6159        // routing entry so subsequent stray events are no-ops.
6160        tracing::warn!(
6161            session_id = %session_id,
6162            agent_id = %chat.agent_id,
6163            host_client_id = %chat.host_client_id,
6164            kind = %kind,
6165            "agent.chat.event from supervised agent had no host channel \
6166             (host disconnected since `agents.chat`); dropping routing entry"
6167        );
6168        state.chat_sessions.lock().await.remove(&session_id);
6169        return true;
6170    }
6171
6172    // Terminal-kind cleanup. The host_channel branch above already
6173    // forwarded the terminal event; we just remove the routing entry
6174    // here so subsequent stray frames are no-ops.
6175    if matches!(kind.as_str(), "done" | "error") {
6176        state.chat_sessions.lock().await.remove(&session_id);
6177    }
6178
6179    true
6180}
6181
#[cfg(test)]
mod fd_leak_regression {
    //! Regression coverage for car#209: connection cleanup has to run
    //! even when the transport fails abruptly. The original bug was a
    //! `let msg = msg?;` that propagated the read error straight out of
    //! `run_dispatch`, skipping `remove_session`. The entry in
    //! `state.sessions` (`Arc<ClientSession>` -> `Arc<WsChannel>` ->
    //! socket FD) then leaked on every peer reset / crash-loop
    //! disconnect.
    use super::run_dispatch;
    use futures::SinkExt;
    use std::sync::Arc;
    use tokio_tungstenite::tungstenite::{Error as WsError, Message};

    #[tokio::test]
    async fn abrupt_read_error_still_runs_session_cleanup() {
        // Standalone server state rooted in a throwaway directory.
        let scratch = tempfile::TempDir::new().unwrap();
        let data_dir = scratch.path().to_path_buf();
        let state = Arc::new(crate::session::ServerState::standalone(data_dir));

        // Inbound half: a single transport error followed by end of
        // stream, which is what an ungraceful client disconnect looks
        // like from the reader's side.
        let frames: Vec<Result<Message, WsError>> = vec![Err(WsError::ConnectionClosed)];
        let inbound = futures::stream::iter(frames);

        // Outbound half: a sink that discards everything written to it.
        let outbound: crate::session::WsSink = Box::pin(
            futures::sink::drain::<Message>().sink_map_err(|_| WsError::ConnectionClosed),
        );

        let peer = "test-peer".to_string();
        let result = run_dispatch(inbound, outbound, peer, Arc::clone(&state)).await;
        assert!(
            result.is_ok(),
            "run_dispatch must return Ok after cleanup, got {result:?}"
        );

        // Cleanup must have removed the session (and with it the
        // channel/FD) despite the abrupt error.
        let sessions_empty = state.sessions.lock().await.is_empty();
        assert!(
            sessions_empty,
            "state.sessions must be empty after an abrupt disconnect (car#209)"
        );
    }
}