Skip to main content

car_server_core/
handler.rs

1//! WebSocket connection handler — bidirectional JSON-RPC.
2//!
3//! Tool callback flow:
4//! 1. Client submits proposal via proposal.submit
5//! 2. Runtime encounters a ToolCall action
6//! 3. WsToolExecutor sends tools.execute request to client via shared write half
7//! 4. WsToolExecutor awaits response on a oneshot channel
8//! 5. Client executes tool locally, sends JSON-RPC response back
9//! 6. Handler receives the response, resolves the oneshot
10//! 7. Runtime continues execution with the tool result
11
12use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30    #[serde(default)]
31    pub jsonrpc: String,
32    #[serde(default)]
33    pub method: Option<String>,
34    #[serde(default)]
35    pub params: Value,
36    #[serde(default)]
37    pub id: Value,
38    // Response fields
39    #[serde(default)]
40    pub result: Option<Value>,
41    #[serde(default)]
42    pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47    pub jsonrpc: &'static str,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub result: Option<Value>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub error: Option<JsonRpcError>,
52    pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57    pub code: i32,
58    pub message: String,
59}
60
61impl JsonRpcResponse {
62    pub fn success(id: Value, result: Value) -> Self {
63        Self {
64            jsonrpc: "2.0",
65            result: Some(result),
66            error: None,
67            id,
68        }
69    }
70    pub fn error(id: Value, code: i32, message: &str) -> Self {
71        Self {
72            jsonrpc: "2.0",
73            result: None,
74            error: Some(JsonRpcError {
75                code,
76                message: message.to_string(),
77            }),
78            id,
79        }
80    }
81}
82
83/// Convenience wrapper for the standalone `car-server` binary: accepts
84/// the WebSocket handshake on a raw [`TcpStream`] then delegates to
85/// [`run_dispatch`]. Embedders that already have a handshake-completed
86/// `WebSocketStream` skip this and call `run_dispatch` directly.
87#[instrument(
88    name = "ws.connection",
89    skip_all,
90    fields(peer = %peer),
91)]
92pub async fn handle_connection(
93    stream: TcpStream,
94    peer: SocketAddr,
95    state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97    let ws_stream = accept_async(stream).await?;
98    let (write, read) = ws_stream.split();
99    run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102/// Convenience wrapper for the daemon-as-default Unix-socket
103/// listener. Same shape as [`handle_connection`] but accepts a
104/// `UnixStream` — used by the per-user UDS listener in
105/// `car-server::main` (default transport for FFI thin clients,
106/// since UDS is faster + permission-scoped vs localhost TCP).
107///
108/// Unix-only — `tokio::net::UnixStream` is gated on
109/// `cfg(all(unix, feature = "net"))`. On Windows the daemon binds
110/// only the TCP listener (loopback) and this entry point is
111/// compiled out; consumers must use `handle_connection` instead.
112#[cfg(unix)]
113#[instrument(
114    name = "ws.connection",
115    skip_all,
116    fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119    stream: tokio::net::UnixStream,
120    peer: String,
121    state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123    let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124    let (write, read) = ws_stream.split();
125    run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128/// Transport-neutral entry point: drives the JSON-RPC dispatch loop
129/// against an already-handshake-completed split WebSocket. Generic
130/// over the read half (any `Stream<Item = Result<Message, WsError>>`)
131/// and the write half (a [`WsSink`] — type-erased so this function
132/// doesn't templatize every downstream consumer of `WsChannel`).
133///
134/// `peer` is a free-form string ("127.0.0.1:1234" for TCP,
135/// "uds:/path/sock" for UDS, "axum:..." for embedders) — used only
136/// for tracing fields, never for dispatch logic.
137#[instrument(
138    name = "ws.dispatch",
139    skip_all,
140    fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143    mut read: R,
144    write: crate::session::WsSink,
145    peer: String,
146    state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149    R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150        + Unpin
151        + Send,
152{
153    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154    tracing::Span::current().record("client_id", &client_id.as_str());
155
156    info!("New connection from {}", peer);
157
158    let channel = Arc::new(WsChannel {
159        write: Mutex::new(write),
160        pending: Mutex::new(HashMap::new()),
161        next_id: AtomicU64::new(1),
162    });
163
164    let session = state.create_session(&client_id, channel.clone()).await;
165
166    // car#209: per-request handlers are spawned detached (below) and
167    // each clones `Arc<ClientSession>` → `Arc<WsChannel>`. A bare
168    // `tokio::spawn` outlives the connection, so a slow/hung handler
169    // pins the split sink and the inbound socket lingers in CLOSED
170    // until the daemon hits EMFILE. Own them in a per-connection
171    // `JoinSet` so every in-flight handler is aborted the instant the
172    // WS drops (`abort_all` in the cleanup block; `JoinSet`'s Drop
173    // also aborts on any early return), releasing the FD immediately.
174    let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
175
176    while let Some(msg) = read.next().await {
177        // Reap finished handlers so a long-lived connection doesn't
178        // retain their `JoinHandle`s unbounded (memory, not FDs).
179        while conn_tasks.try_join_next().is_some() {}
180
181        // car#209: on an abrupt transport error (peer reset / network
182        // drop — the trader crash-loop case) `read.next()` yields
183        // `Some(Err(_))`. The old `let msg = msg?;` propagated that
184        // out of the function, *skipping the cleanup block below*, so
185        // `state.sessions` (+ the host/a2ui/chat registries) kept the
186        // session + channel — and thus the socket FD — forever. Break
187        // instead: every disconnect, clean or not, runs the single
188        // cleanup path.
189        let msg = match msg {
190            Ok(m) => m,
191            Err(e) => {
192                info!("read error from {}: {}; closing", client_id, e);
193                break;
194            }
195        };
196        if msg.is_text() {
197            let text = match msg.to_text() {
198                Ok(t) => t,
199                Err(e) => {
200                    info!("non-text frame from {}: {}; closing", client_id, e);
201                    break;
202                }
203            };
204            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
205                Ok(m) => m,
206                Err(e) => {
207                    send_response(
208                        &session.channel,
209                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
210                    )
211                    .await
212                    .ok();
213                    continue;
214                }
215            };
216
217            // Is this a response to a pending tool callback?
218            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
219                if let Some(id_str) = parsed.id.as_str() {
220                    let mut pending = session.channel.pending.lock().await;
221                    if let Some(tx) = pending.remove(id_str) {
222                        let tool_resp = if let Some(result) = parsed.result {
223                            ToolExecuteResponse {
224                                action_id: id_str.to_string(),
225                                output: Some(result),
226                                error: None,
227                            }
228                        } else {
229                            let err_msg = parsed
230                                .error
231                                .as_ref()
232                                .and_then(|e| e.get("message"))
233                                .and_then(|m| m.as_str())
234                                .unwrap_or("unknown error")
235                                .to_string();
236                            ToolExecuteResponse {
237                                action_id: id_str.to_string(),
238                                output: None,
239                                error: Some(err_msg),
240                            }
241                        };
242                        let _ = tx.send(tool_resp);
243                        continue;
244                    }
245                }
246            }
247
248            // Agent → host chat-event interceptor. `agent.chat.event`
249            // notifications coming up the WS from a connected agent
250            // are forwarded to the originating host's channel as
251            // `agents.chat.event`. Lives ahead of the regular method
252            // dispatch so the dispatcher doesn't reply with
253            // "method-not-found" on what is a fire-and-forget
254            // notification (no id). See
255            // `docs/proposals/agent-chat-surface.md`.
256            if try_forward_agent_chat_event(&parsed, &state).await {
257                continue;
258            }
259
260            // Otherwise it's a client request
261            if let Some(method) = &parsed.method {
262                info!(method = %method, "dispatching JSON-RPC method");
263
264                // Auth gate (Parslee-ai/car-releases#32). When the
265                // server has an auth token installed, every method
266                // other than `session.auth` is rejected on
267                // unauthenticated sessions and the connection is
268                // closed after the error response goes out. When no
269                // token is installed (default), this branch never
270                // fires — preserves pre-#32 behaviour.
271                if state.auth_token.get().is_some()
272                    && !session
273                        .authenticated
274                        .load(std::sync::atomic::Ordering::Acquire)
275                    && method != "session.auth"
276                {
277                    let resp = JsonRpcResponse::error(
278                        parsed.id.clone(),
279                        -32001,
280                        "auth required: send `session.auth` with the per-launch token \
281                         from ~/Library/Application Support/ai.parslee.car/auth-token \
282                         (macOS), $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux), \
283                         or %LOCALAPPDATA%\\ai.parslee.car\\auth-token (Windows) \
284                         as the first frame on this connection",
285                    );
286                    let _ = send_response(&session.channel, resp).await;
287                    info!(client = %client_id, method = %method,
288                        "rejecting non-auth method on unauthenticated session; closing");
289                    break;
290                }
291
292                // Approval gate (audit 2026-05). High-risk methods —
293                // anything that drives macOS automation or sends
294                // messages on the user's behalf — must be acked by
295                // the user via `host.resolve_approval` before they
296                // dispatch. The gate raises an `approval.requested`
297                // host event the local UI can render approve/deny on,
298                // then parks until resolved or the configured
299                // timeout fires. Returns a JSON-RPC error and
300                // continues the dispatch loop on deny / timeout —
301                // no connection close, since the caller may want to
302                // retry with revised parameters.
303                if state.approval_gate.requires_approval(method.as_str()) {
304                    match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
305                        Ok(()) => {}
306                        Err(reason) => {
307                            let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
308                            let _ = send_response(&session.channel, resp).await;
309                            info!(
310                                client = %client_id,
311                                method = %method,
312                                reason = %reason,
313                                "approval gate blocked dispatch"
314                            );
315                            continue;
316                        }
317                    }
318                }
319
320                // Spawn the per-method dispatch in a task so the read
321                // loop keeps reading frames. Without this, methods
322                // that trigger server-initiated `tools.execute`
323                // callbacks (`proposal.submit`, `workflow.run`,
324                // `multi.*` paths that fire a registered tool)
325                // deadlock the connection: the handler awaits the
326                // callback response on a oneshot, but the response is
327                // another frame on this same read half — which the
328                // synchronous `.await` here would prevent the loop
329                // from ever picking up. Surfaced by the
330                // `executeProposal: echo tool via JS callback` smoke
331                // (#173). Response ordering becomes id-keyed (the
332                // JSON-RPC demuxing contract) rather than
333                // arrival-ordered.
334                let session_task = session.clone();
335                let state_task = state.clone();
336                let method_owned = method.clone();
337                let parsed_task = parsed;
338                // car#209: owned by the per-connection JoinSet so it's
339                // aborted on disconnect instead of leaking the channel.
340                conn_tasks.spawn(async move {
341                    let session = session_task;
342                    let state = state_task;
343                    let parsed = parsed_task;
344                    let result = match method_owned.as_str() {
345                        "session.auth" => handle_session_auth(&parsed, &session, &state).await,
346                        "parslee.auth" => handle_parslee_auth().await,
347                        "auth.start" => handle_auth_start(&parsed).await,
348                        "auth.complete" => handle_auth_complete(&parsed).await,
349                        "auth.status" => handle_auth_status().await,
350                        "auth.logout" => handle_auth_logout().await,
351                        "session.init" => handle_session_init(&parsed, &session).await,
352                        "host.subscribe" => handle_host_subscribe(&session, &state).await,
353                        "host.agents" => handle_host_agents(&session).await,
354                        "host.events" => handle_host_events(&parsed, &session).await,
355                        "host.approvals" => handle_host_approvals(&session).await,
356                        "host.register_agent" => {
357                            handle_host_register_agent(&parsed, &session).await
358                        }
359                        "host.unregister_agent" => {
360                            handle_host_unregister_agent(&parsed, &session).await
361                        }
362                        "host.set_status" => handle_host_set_status(&parsed, &session).await,
363                        "host.notify" => handle_host_notify(&parsed, &session).await,
364                        "host.request_approval" => {
365                            handle_host_request_approval(&parsed, &session).await
366                        }
367                        "host.resolve_approval" => {
368                            handle_host_resolve_approval(&parsed, &session).await
369                        }
370                        "tools.register" => handle_tools_register(&parsed, &session).await,
371                        "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
372                        "policy.register" => handle_policy_register(&parsed, &session).await,
373                        "session.policy.open" => handle_session_policy_open(&session).await,
374                        "session.policy.close" => {
375                            handle_session_policy_close(&parsed, &session).await
376                        }
377                        "verify" => handle_verify(&parsed, &session).await,
378                        "state.get" => handle_state_get(&parsed, &session).await,
379                        "state.set" => handle_state_set(&parsed, &session).await,
380                        "state.exists" => handle_state_exists(&parsed, &session).await,
381                        "state.keys" => handle_state_keys(&parsed, &session).await,
382                        "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
383                        "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
384                        "memory.query" => handle_memory_query(&parsed, &session).await,
385                        "memory.build_context" => {
386                            handle_memory_build_context(&parsed, &session).await
387                        }
388                        "memory.build_context_fast" => {
389                            handle_memory_build_context_fast(&parsed, &session).await
390                        }
391                        "memory.consolidate" => handle_memory_consolidate(&session).await,
392                        "memory.fact_count" => handle_memory_fact_count(&session).await,
393                        "memory.persist" => handle_memory_persist(&parsed, &session).await,
394                        "memory.load" => handle_memory_load(&parsed, &session).await,
395                        "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
396                        "skill.find" => handle_skill_find(&parsed, &session).await,
397                        "skill.report" => handle_skill_report(&parsed, &session).await,
398                        "skill.repair" => handle_skill_repair(&parsed, &session).await,
399                        "skills.ingest_distilled" => {
400                            handle_skills_ingest_distilled(&parsed, &session).await
401                        }
402                        "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
403                        "skills.domains_needing_evolution" => {
404                            handle_skills_domains_needing_evolution(&parsed, &session).await
405                        }
406                        "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
407                        "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
408                        "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
409                        "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
410                        "multi.vote" => handle_multi_vote(&parsed, &session).await,
411                        "scheduler.create" => handle_scheduler_create(&parsed),
412                        "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
413                        "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
414                        "infer" => handle_infer(&parsed, &state, &session).await,
415                        "image.generate" => handle_image_generate(&parsed, &state).await,
416                        "video.generate" => handle_video_generate(&parsed, &state).await,
417                        "embed" => handle_embed(&parsed, &state).await,
418                        "classify" => handle_classify(&parsed, &state).await,
419                        "tokenize" => handle_tokenize(&parsed, &state).await,
420                        "detokenize" => handle_detokenize(&parsed, &state).await,
421                        "rerank" => handle_rerank(&parsed, &state).await,
422                        "transcribe" => handle_transcribe(&parsed, &state).await,
423                        "synthesize" => handle_synthesize(&parsed, &state).await,
424                        "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
425                        "speech.prepare" => handle_speech_prepare(&state).await,
426                        "models.route" => handle_models_route(&parsed, &state).await,
427                        "models.stats" => handle_models_stats(&state).await,
428                        "outcomes.resolve_pending" => {
429                            handle_outcomes_resolve_pending(&parsed, &state).await
430                        }
431                        "events.count" => handle_events_count(&session).await,
432                        "events.stats" => handle_events_stats(&session).await,
433                        "events.truncate" => handle_events_truncate(&parsed, &session).await,
434                        "events.clear" => handle_events_clear(&session).await,
435                        "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
436                        "models.list" => handle_models_list(&state),
437                        "models.register" => handle_models_register(&parsed, &state).await,
438                        "models.unregister" => handle_models_unregister(&parsed, &state).await,
439                        "models.list_unified" => handle_models_list_unified(&state),
440                        "models.search" => handle_models_search(&parsed, &state),
441                        "models.upgrades" => handle_models_upgrades(&state),
442                        "models.pull" => handle_models_pull(&parsed, &state).await,
443                        "models.install" => handle_models_pull(&parsed, &state).await,
444                        "skills.distill" => handle_skills_distill(&parsed, &state).await,
445                        "skills.list" => handle_skills_list(&parsed, &session).await,
446                        "browser.run" => handle_browser_run(&parsed, &session).await,
447                        "browser.close" => handle_browser_close(&session).await,
448                        "secret.put" => handle_secret_put(&parsed),
449                        "secret.get" => handle_secret_get(&parsed),
450                        "secret.delete" => handle_secret_delete(&parsed),
451                        "secret.status" => handle_secret_status(&parsed),
452                        "secret.available" => Ok(car_ffi_common::secrets::is_available()),
453                        "permissions.status" => handle_perm_status(&parsed),
454                        "permissions.request" => handle_perm_request(&parsed),
455                        "permissions.explain" => handle_perm_explain(&parsed),
456                        "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
457                        "accounts.list" => car_ffi_common::accounts::list(),
458                        "accounts.open" => {
459                            #[derive(serde::Deserialize, Default)]
460                            struct OpenParams {
461                                #[serde(default)]
462                                account_id: Option<String>,
463                            }
464                            let p: OpenParams =
465                                serde_json::from_value(parsed.params.clone()).unwrap_or_default();
466                            car_ffi_common::accounts::open_settings(p.account_id.as_deref())
467                        }
468                        "calendar.list" => car_ffi_common::integrations::calendar_list(),
469                        "calendar.events" => handle_calendar_events(&parsed),
470                        "contacts.containers" => {
471                            car_ffi_common::integrations::contacts_containers()
472                        }
473                        "contacts.find" => handle_contacts_find(&parsed),
474                        "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
475                        "mail.inbox" => handle_mail_inbox(&parsed),
476                        "mail.send" => handle_mail_send(&parsed),
477                        "messages.services" => car_ffi_common::integrations::messages_services(),
478                        "messages.chats" => handle_messages_chats(&parsed),
479                        "messages.send" => handle_messages_send(&parsed),
480                        "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
481                        "notes.find" => handle_notes_find(&parsed),
482                        "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
483                        "reminders.items" => handle_reminders_items(&parsed),
484                        "photos.albums" => car_ffi_common::integrations::photos_albums(),
485                        "bookmarks.list" => handle_bookmarks_list(&parsed),
486                        "files.locations" => car_ffi_common::integrations::files_locations(),
487                        "keychain.status" => car_ffi_common::integrations::keychain_status(),
488                        "health.status" => car_ffi_common::health::status(),
489                        "health.sleep" => handle_health_sleep(&parsed),
490                        "health.workouts" => handle_health_workouts(&parsed),
491                        "health.activity" => handle_health_activity(&parsed),
492                        "voice.transcribe_stream.start" => {
493                            handle_voice_transcribe_stream_start(&parsed, &state, &session).await
494                        }
495                        "voice.transcribe_stream.stop" => {
496                            handle_voice_transcribe_stream_stop(&parsed, &state).await
497                        }
498                        "voice.transcribe_stream.push" => {
499                            handle_voice_transcribe_stream_push(&parsed, &state).await
500                        }
501                        "voice.tts_stream.start" => {
502                            handle_voice_tts_stream_start(&parsed, &session).await
503                        }
504                        "voice.tts_stream.cancel" => handle_voice_tts_stream_cancel(&parsed).await,
505                        "voice.tts_stream.list" => Ok(handle_voice_tts_stream_list()),
506                        "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
507                        "voice.dispatch_turn" => {
508                            handle_voice_dispatch_turn(&parsed, &state, &session).await
509                        }
510                        "voice.cancel_turn" => handle_voice_cancel_turn().await,
511                        "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
512                        "inference.register_runner" => {
513                            handle_inference_register_runner(&session).await
514                        }
515                        "inference.runner.event" => handle_inference_runner_event(&parsed).await,
516                        "inference.runner.complete" => {
517                            handle_inference_runner_complete(&parsed).await
518                        }
519                        "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
520                        "voice.providers.list" => {
521                            // Stateless: enumerates STT/TTS providers compiled into
522                            // this build. Runtime readiness (API key, permission,
523                            // model download) is reported via per-provider errors.
524                            serde_json::from_str::<serde_json::Value>(
525                                &car_voice::list_voice_providers_json(),
526                            )
527                            .map_err(|e| e.to_string())
528                        }
529                        "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
530                            .await
531                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
532                        "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
533                            .await
534                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
535                        "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
536                        "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
537                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
538                        "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
539                        "workflow.run" => handle_workflow_run(&parsed, &session).await,
540                        "workflow.verify" => handle_workflow_verify(&parsed),
541                        "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
542                        "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
543                        "meeting.list" => handle_meeting_list(&parsed),
544                        "meeting.get" => handle_meeting_get(&parsed),
545                        "registry.register" => handle_registry_register(&parsed),
546                        "registry.heartbeat" => handle_registry_heartbeat(&parsed),
547                        "registry.unregister" => handle_registry_unregister(&parsed),
548                        "registry.list" => handle_registry_list(&parsed),
549                        "registry.reap" => handle_registry_reap(&parsed),
550                        "admission.status" => handle_admission_status(&state),
551                        "a2a.start" => handle_a2a_start(&parsed, &session).await,
552                        "a2a.stop" => handle_a2a_stop(),
553                        "a2a.status" => handle_a2a_status(),
554                        "a2a.send" => handle_a2a_send(&parsed, &state).await,
555                        "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
556                        "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
557                        "a2ui.capabilities" => handle_a2ui_capabilities(&state),
558                        "a2ui.reap" => handle_a2ui_reap(&state).await,
559                        "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
560                        "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
561                        "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
562                        "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
563                        "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
564                        "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
565                        "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
566                        "automation.run_applescript" => handle_run_applescript(&parsed).await,
567                        "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
568                        "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
569                        "notifications.local" => handle_local_notification(&parsed).await,
570                        "vision.ocr" => handle_vision_ocr(&parsed).await,
571                        "agents.list" => handle_agents_list(&state).await,
572                        "agents.health" => handle_agents_health(&state).await,
573                        "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
574                        "agents.install" => handle_agents_install(&parsed, &state).await,
575                        "agents.remove" => handle_agents_remove(&parsed, &state).await,
576                        "agents.start" => handle_agents_start(&parsed, &state).await,
577                        "agents.stop" => handle_agents_stop(&parsed, &state).await,
578                        "agents.restart" => handle_agents_restart(&parsed, &state).await,
579                        "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
580                        "agents.list_external" => handle_agents_list_external(&parsed).await,
581                        "agents.detect_external" => handle_agents_detect_external(&parsed).await,
582                        "agents.health_external" => handle_agents_health_external(&parsed).await,
583                        "agents.invoke_external" => {
584                            handle_agents_invoke_external(&parsed, &state, &session).await
585                        }
586                        "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
587                        "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
588                        // A2A v1.0 (PascalCase) + v0.3 (slash form) — both
589                        // alias to the same in-core dispatcher per
590                        // Parslee-ai/car-releases#28. Embedders that need a
591                        // custom AgentCardSource / TaskStore plug them in
592                        // via ServerStateConfig::with_a2a_card_source /
593                        // with_a2a_store before any handler runs.
594                        "message/send"
595                        | "SendMessage"
596                        | "message/stream"
597                        | "SendStreamingMessage"
598                        | "tasks/get"
599                        | "GetTask"
600                        | "tasks/list"
601                        | "ListTasks"
602                        | "tasks/cancel"
603                        | "CancelTask"
604                        | "tasks/resubscribe"
605                        | "SubscribeToTask"
606                        | "tasks/pushNotificationConfig/set"
607                        | "CreateTaskPushNotificationConfig"
608                        | "tasks/pushNotificationConfig/get"
609                        | "GetTaskPushNotificationConfig"
610                        | "tasks/pushNotificationConfig/list"
611                        | "ListTaskPushNotificationConfigs"
612                        | "tasks/pushNotificationConfig/delete"
613                        | "DeleteTaskPushNotificationConfig"
614                        | "agent/getAuthenticatedExtendedCard"
615                        | "GetExtendedAgentCard" => {
616                            handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
617                        }
618                        _ => Err(format!("unknown method: {}", method_owned)),
619                    };
620
621                    let resp = match result {
622                        Ok(value) => JsonRpcResponse::success(parsed.id, value),
623                        Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
624                    };
625                    let _ = send_response(&session.channel, resp).await;
626                });
627            }
628        } else if msg.is_binary() {
629            // CAR binary frame transport — see `car_ffi_common::voice::binary`
630            // for the canonical header definition. 26-byte fixed header
631            // followed by an opaque payload. Inbound type 0x01 carries
632            // 16-bit signed LE PCM into a `pcm_push` session; other
633            // types (0x02 TTS chunk, 0x03 final marker, 0x04 error)
634            // are server-emitted and rejected here.
635            let bytes = msg.into_data();
636            let parsed = match car_ffi_common::voice::binary::parse_frame(&bytes) {
637                Ok(p) => p,
638                Err(e) => {
639                    tracing::warn!("binary frame from {} rejected: {}", client_id, e);
640                    continue;
641                }
642            };
643            match parsed.frame_type {
644                car_ffi_common::voice::binary::FRAME_TYPE_INBOUND_PCM => {
645                    let registry = state.voice_sessions.clone();
646                    let payload_owned = parsed.payload.to_vec();
647                    let session_id_owned = parsed.session_id_hex.clone();
648                    conn_tasks.spawn(async move {
649                        if let Err(e) = car_ffi_common::voice::transcribe_stream_push(
650                            &session_id_owned,
651                            &payload_owned,
652                            registry,
653                        )
654                        .await
655                        {
656                            tracing::warn!(
657                                "binary PCM push to session {} failed: {}",
658                                session_id_owned,
659                                e
660                            );
661                        }
662                    });
663                }
664                other => {
665                    tracing::debug!(
666                        "binary frame type {:#04x} from {} not accepted server-side",
667                        other,
668                        client_id
669                    );
670                }
671            }
672        } else if msg.is_close() {
673            info!("Client {} disconnected", client_id);
674            break;
675        }
676    }
677
678    // car#209: abort every in-flight handler for this connection
679    // *first* — they each hold an `Arc<ClientSession>` → `Arc<WsChannel>`
680    // clone; until they're gone the split sink (and the inbound socket
681    // FD) can't drop, even after the registries below are cleared.
682    conn_tasks.abort_all();
683
684    session.host.unsubscribe(&client_id).await;
685    // Auto-cancel this session's pending approvals so the queue stays
686    // in sync with what's actually decidable — covers graceful
687    // unregister+close, hard crash (TCP reset), and ping timeout in
688    // one place. System-level gate approvals (client_id None) are not
689    // touched. car-releases#48.
690    session.host.reap_session_approvals(&client_id).await;
691    state.a2ui_subscribers.lock().await.remove(&client_id);
692
693    // Fix for MULTI-4 / WS-3: drop the session from the registry and
694    // drain any pending tool callbacks. Without this, every connection
695    // we ever accepted keeps an `Arc<ClientSession>` alive in
696    // `state.sessions`, and outstanding `oneshot::Sender`s in
697    // `session.channel.pending` outlive the closed connection until
698    // their per-call timeout (60s). Dropping the senders here causes any
699    // awaiting `recv()` in `WsToolExecutor::execute` to return
700    // `RecvError` immediately, which the existing error-handler path
701    // already maps to "callback channel closed" — same shape as the
702    // timeout path, just faster.
703    let _removed = state.remove_session(&client_id).await;
704    {
705        let mut pending = session.channel.pending.lock().await;
706        pending.clear();
707    }
708
709    Ok(())
710}
711
712async fn send_response(
713    channel: &WsChannel,
714    resp: JsonRpcResponse,
715) -> Result<(), Box<dyn std::error::Error>> {
716    use futures::SinkExt;
717    let json = serde_json::to_string(&resp)?;
718    channel
719        .write
720        .lock()
721        .await
722        .send(Message::Text(json.into()))
723        .await?;
724    Ok(())
725}
726
727// --- Request handlers ---
728
729async fn handle_host_subscribe(
730    session: &crate::session::ClientSession,
731    state: &Arc<ServerState>,
732) -> Result<Value, String> {
733    session
734        .host
735        .subscribe(&session.client_id, session.channel.clone())
736        .await;
737    serde_json::to_value(HostSnapshot {
738        subscribed: true,
739        agents: session.host.agents().await,
740        approvals: session.host.approvals().await,
741        events: session.host.events(50).await,
742        identity: Some(daemon_identity(state)),
743    })
744    .map_err(|e| e.to_string())
745}
746
747/// Snapshot the daemon-identity facts for a fresh subscriber.
748/// Cheap: non-acquiring reads on `OnceLock`s + a single
749/// `to_string_lossy` on the manifest path. Critically uses
750/// [`ServerState::supervisor_if_installed`] — not the lazy-init
751/// `supervisor()` — so a Heisenberg subscribe can't *cause* the
752/// daemon to acquire the manifest lock just by asking whether it
753/// owns one.
754fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
755    // Observer takes precedence: when both supervisor and observer
756    // markers are set (currently unreachable through the standalone
757    // binary, but an embedder could install both racily), the
758    // observer marker is the authoritative role since the
759    // supervisor handle is only installed when this daemon owns
760    // the lock.
761    let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
762        (
763            Some(p.to_string_lossy().into_owned()),
764            car_proto::HostManifestRole::Observer,
765        )
766    } else if let Some(s) = state.supervisor_if_installed() {
767        (
768            Some(s.manifest_path().to_string_lossy().into_owned()),
769            car_proto::HostManifestRole::Owner,
770        )
771    } else {
772        (None, car_proto::HostManifestRole::None)
773    };
774    car_proto::HostIdentity {
775        version: env!("CARGO_PKG_VERSION").to_string(),
776        pid: std::process::id(),
777        manifest_path,
778        manifest_role,
779        parslee: state
780            .parslee_session
781            .get()
782            .map(|session| session.identity.clone()),
783    }
784}
785
786/// Return the Parslee cloud credential for an authenticated CAR
787/// connection. Managed agents already receive `CAR_AUTH_TOKEN` and
788/// authenticate to the local daemon with `session.auth`; this method is
789/// their supported bridge from local CAR auth to the user's Parslee
790/// backend auth.
791///
792/// The bearer token is intentionally not injected into every managed
793/// child process environment. Agents ask for it only when they need it,
794/// through the same local auth gate that protects the rest of the
795/// daemon.
796async fn handle_parslee_auth() -> Result<Value, String> {
797    let session = crate::parslee_auth::load_or_refresh()
798        .await?
799        .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
800    Ok(serde_json::json!({
801        "authenticated": true,
802        "token_type": "Bearer",
803        "access_token": session.access_token,
804        "authorization_header": format!("Bearer {}", session.access_token),
805        "identity": session.identity,
806    }))
807}
808
809// --- auth.* : GUI-driven Parslee sign-in (CAR Host.app).
810// Shares one implementation with `car auth login` via the car-auth
811// crate. Stateless: the trusted in-process GUI holds the PKCE
812// verifier + state between auth.start and auth.complete (same trust
813// boundary as the keychain it ultimately writes).
814async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
815    let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
816    let client_id = req
817        .params
818        .get("client_id")
819        .and_then(|v| v.as_str())
820        .unwrap_or("parslee-car");
821    let redirect_uri = req
822        .params
823        .get("redirect_uri")
824        .and_then(|v| v.as_str())
825        .ok_or_else(|| "redirect_uri is required".to_string())?;
826    let provider = req.params.get("provider").and_then(|v| v.as_str());
827    let state = car_auth::new_state();
828    let verifier = car_auth::pkce_verifier();
829    let challenge = car_auth::pkce_challenge(&verifier);
830    let url =
831        car_auth::authorize_url(&api_base, client_id, redirect_uri, &state, &challenge, provider)?;
832    Ok(serde_json::json!({
833        "authorize_url": url,
834        "state": state,
835        "verifier": verifier,
836    }))
837}
838
839async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
840    let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
841    let client_id = req
842        .params
843        .get("client_id")
844        .and_then(|v| v.as_str())
845        .unwrap_or("parslee-car");
846    let redirect_uri = req
847        .params
848        .get("redirect_uri")
849        .and_then(|v| v.as_str())
850        .ok_or_else(|| "redirect_uri is required".to_string())?;
851    let code = req
852        .params
853        .get("code")
854        .and_then(|v| v.as_str())
855        .ok_or_else(|| "code is required".to_string())?;
856    let verifier = req
857        .params
858        .get("verifier")
859        .and_then(|v| v.as_str())
860        .ok_or_else(|| "verifier is required".to_string())?;
861    let token =
862        car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
863    car_auth::store_tokens(&api_base, &token)?;
864    Ok(serde_json::json!({ "ok": true }))
865}
866
867async fn handle_auth_status() -> Result<Value, String> {
868    match car_auth::fetch_status(None).await? {
869        Some(session_json) => {
870            let session: Value = serde_json::from_str(&session_json).unwrap_or(Value::Null);
871            Ok(serde_json::json!({ "authenticated": true, "session": session }))
872        }
873        None => Ok(serde_json::json!({ "authenticated": false })),
874    }
875}
876
877async fn handle_auth_logout() -> Result<Value, String> {
878    car_auth::clear_tokens()?;
879    Ok(serde_json::json!({ "ok": true }))
880}
881
882async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
883    serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
884}
885
886async fn handle_host_events(
887    req: &JsonRpcMessage,
888    session: &crate::session::ClientSession,
889) -> Result<Value, String> {
890    let limit = req
891        .params
892        .get("limit")
893        .and_then(|v| v.as_u64())
894        .unwrap_or(100) as usize;
895    serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
896}
897
898async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
899    serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
900}
901
902async fn handle_a2ui_apply(
903    req: &JsonRpcMessage,
904    state: &Arc<ServerState>,
905) -> Result<Value, String> {
906    #[derive(Deserialize)]
907    struct Params {
908        #[serde(default)]
909        envelope: Option<car_a2ui::A2uiEnvelope>,
910        #[serde(default)]
911        message: Option<car_a2ui::A2uiEnvelope>,
912    }
913
914    let envelope = if req.params.get("createSurface").is_some()
915        || req.params.get("updateComponents").is_some()
916        || req.params.get("updateDataModel").is_some()
917        || req.params.get("deleteSurface").is_some()
918    {
919        serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
920            .map_err(|e| e.to_string())?
921    } else {
922        match serde_json::from_value::<Params>(req.params.clone()) {
923            Ok(params) => params
924                .envelope
925                .or(params.message)
926                .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
927            Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
928                .map_err(|e| e.to_string())?,
929        }
930    };
931
932    apply_a2ui_envelope(state, envelope, None, None).await
933}
934
935async fn handle_a2ui_ingest(
936    req: &JsonRpcMessage,
937    state: &Arc<ServerState>,
938) -> Result<Value, String> {
939    #[derive(Deserialize)]
940    #[serde(rename_all = "camelCase")]
941    struct Params {
942        #[serde(default)]
943        endpoint: Option<String>,
944        #[serde(default)]
945        a2a_endpoint: Option<String>,
946        #[serde(default)]
947        owner: Option<car_a2ui::A2uiSurfaceOwner>,
948        #[serde(default)]
949        route_auth: Option<A2aRouteAuth>,
950        #[serde(default)]
951        allow_untrusted_endpoint: bool,
952    }
953
954    let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
955        endpoint: None,
956        a2a_endpoint: None,
957        owner: None,
958        route_auth: None,
959        allow_untrusted_endpoint: false,
960    });
961    let payload = req.params.get("payload").unwrap_or(&req.params);
962    state
963        .a2ui
964        .validate_payload(payload)
965        .map_err(|e| e.to_string())?;
966    let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
967    if envelopes.is_empty() {
968        return Err("no A2UI envelopes found in payload".into());
969    }
970    let endpoint = params.endpoint.or(params.a2a_endpoint);
971    let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
972    let owner = params
973        .owner
974        .or_else(|| car_a2ui::owner_from_value(payload))
975        .map(|owner| match endpoint.clone() {
976            Some(endpoint) => owner.with_endpoint(Some(endpoint)),
977            None => owner,
978        });
979
980    let mut results = Vec::new();
981    for envelope in envelopes {
982        let value =
983            apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
984        results.push(value);
985    }
986    Ok(serde_json::json!({ "applied": results }))
987}
988
989async fn apply_a2ui_envelope(
990    state: &Arc<ServerState>,
991    envelope: car_a2ui::A2uiEnvelope,
992    owner: Option<car_a2ui::A2uiSurfaceOwner>,
993    route_auth: Option<A2aRouteAuth>,
994) -> Result<Value, String> {
995    let result = state
996        .a2ui
997        .apply_with_owner(envelope, owner)
998        .await
999        .map_err(|e| e.to_string())?;
1000    update_a2ui_route_auth(state, &result, route_auth).await;
1001    let kind = if result.deleted {
1002        "a2ui.surface_deleted"
1003    } else {
1004        "a2ui.surface_updated"
1005    };
1006    let message = if result.deleted {
1007        format!("A2UI surface {} deleted", result.surface_id)
1008    } else {
1009        format!("A2UI surface {} updated", result.surface_id)
1010    };
1011    let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
1012    state
1013        .host
1014        .record_event(kind, None, message, payload.clone())
1015        .await;
1016    // Push the envelope result to every WS subscriber as an
1017    // `a2ui.event` notification — Parslee-ai/car-releases#29. Late
1018    // joiners catch up via `a2ui/replay` (or `a2ui.surfaces`).
1019    broadcast_a2ui_event(state, kind, &payload).await;
1020    serde_json::to_value(result).map_err(|e| e.to_string())
1021}
1022
1023async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
1024    use futures::SinkExt;
1025    use tokio_tungstenite::tungstenite::Message;
1026    let subscribers: Vec<Arc<crate::session::WsChannel>> = state
1027        .a2ui_subscribers
1028        .lock()
1029        .await
1030        .values()
1031        .cloned()
1032        .collect();
1033    if subscribers.is_empty() {
1034        return;
1035    }
1036    let Ok(json) = serde_json::to_string(&serde_json::json!({
1037        "jsonrpc": "2.0",
1038        "method": "a2ui.event",
1039        "params": {
1040            "kind": kind,
1041            "result": result,
1042        },
1043    })) else {
1044        return;
1045    };
1046    for channel in subscribers {
1047        let _ = channel
1048            .write
1049            .lock()
1050            .await
1051            .send(Message::Text(json.clone().into()))
1052            .await;
1053    }
1054}
1055
1056async fn update_a2ui_route_auth(
1057    state: &Arc<ServerState>,
1058    result: &car_a2ui::A2uiApplyResult,
1059    route_auth: Option<A2aRouteAuth>,
1060) {
1061    let mut auth = state.a2ui_route_auth.lock().await;
1062    if result.deleted {
1063        auth.remove(&result.surface_id);
1064        return;
1065    }
1066
1067    let has_route_endpoint = result
1068        .surface
1069        .as_ref()
1070        .and_then(|surface| surface.owner.as_ref())
1071        .and_then(|owner| owner.endpoint.as_ref())
1072        .is_some();
1073    match (has_route_endpoint, route_auth) {
1074        (true, Some(route_auth)) => {
1075            auth.insert(result.surface_id.clone(), route_auth);
1076        }
1077        _ => {
1078            auth.remove(&result.surface_id);
1079        }
1080    }
1081}
1082
1083fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
1084    serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
1085}
1086
1087async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
1088    let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
1089    if !removed.is_empty() {
1090        let mut auth = state.a2ui_route_auth.lock().await;
1091        for surface_id in &removed {
1092            auth.remove(surface_id);
1093        }
1094    }
1095    Ok(serde_json::json!({ "removed": removed }))
1096}
1097
1098async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
1099    serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
1100}
1101
1102async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
1103    let surface_id = req
1104        .params
1105        .get("surface_id")
1106        .or_else(|| req.params.get("surfaceId"))
1107        .and_then(Value::as_str)
1108        .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
1109    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1110}
1111
1112/// `a2ui/subscribe` — opt this WS connection into `a2ui.event`
1113/// notifications. Subscribers receive every `apply_a2ui_envelope`
1114/// result for as long as they're connected; the cleanup hook in
1115/// `run_dispatch` removes them on disconnect. Closes
1116/// Parslee-ai/car-releases#29.
1117async fn handle_a2ui_subscribe(
1118    session: &crate::session::ClientSession,
1119    state: &Arc<ServerState>,
1120) -> Result<Value, String> {
1121    state
1122        .a2ui_subscribers
1123        .lock()
1124        .await
1125        .insert(session.client_id.clone(), session.channel.clone());
1126    Ok(serde_json::json!({ "subscribed": true }))
1127}
1128
1129/// `a2ui/unsubscribe` — opt out of `a2ui.event` notifications.
1130/// Idempotent: returns `{ subscribed: false }` regardless of prior
1131/// state.
1132async fn handle_a2ui_unsubscribe(
1133    session: &crate::session::ClientSession,
1134    state: &Arc<ServerState>,
1135) -> Result<Value, String> {
1136    state
1137        .a2ui_subscribers
1138        .lock()
1139        .await
1140        .remove(&session.client_id);
1141    Ok(serde_json::json!({ "subscribed": false }))
1142}
1143
1144/// `a2ui/replay` — fetch the current state of one surface. Intended
1145/// for late joiners and reconnect: a client calls `subscribe`, then
1146/// `replay` once per surface it's tracking, and from then on
1147/// notifications keep it in sync. Equivalent to `a2ui.get` on the
1148/// surface store; lives in the subscribe namespace for
1149/// discoverability.
1150async fn handle_a2ui_replay(
1151    req: &JsonRpcMessage,
1152    state: &Arc<ServerState>,
1153) -> Result<Value, String> {
1154    let surface_id = req
1155        .params
1156        .get("surface_id")
1157        .or_else(|| req.params.get("surfaceId"))
1158        .and_then(Value::as_str)
1159        .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
1160    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1161}
1162
1163async fn handle_a2ui_action(
1164    req: &JsonRpcMessage,
1165    state: &Arc<ServerState>,
1166) -> Result<Value, String> {
1167    let action: car_a2ui::ClientAction =
1168        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1169    let owner = state.a2ui.owner(&action.surface_id).await;
1170    let route = route_a2ui_action(state, &action, owner.clone()).await;
1171    let payload = serde_json::json!({
1172        "action": action,
1173        "owner": owner,
1174        "route": route,
1175    });
1176    let event = state
1177        .host
1178        .record_event(
1179            "a2ui.action",
1180            None,
1181            format!(
1182                "A2UI action {} from {}",
1183                action.name, action.source_component_id
1184            ),
1185            payload,
1186        )
1187        .await;
1188    Ok(serde_json::json!({
1189        "event": event,
1190        "route": route,
1191    }))
1192}
1193
1194/// `a2ui.render_report` — renderer-emitted telemetry envelope
1195/// (Parslee-ai/car#180). Fire-and-forget; we record an event in
1196/// the host log (so dev tools and the conversation log see it) and
1197/// broadcast as `a2ui.event { kind: "a2ui.render_report", result }`
1198/// to every WS subscriber. The improvement agent reads the report
1199/// and decides whether to issue a follow-up `patchComponents`.
1200async fn handle_a2ui_render_report(
1201    req: &JsonRpcMessage,
1202    state: &Arc<ServerState>,
1203) -> Result<Value, String> {
1204    // Parse into the typed struct to enforce the schema; we
1205    // re-serialize for the event/broadcast payload so downstream
1206    // consumers don't have to defensively re-validate.
1207    let report: car_a2ui::RenderReport =
1208        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1209    let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
1210    let kind = "a2ui.render_report";
1211    let message = format!("A2UI render report for surface {}", report.surface_id);
1212    let event = state
1213        .host
1214        .record_event(kind, None, message, payload.clone())
1215        .await;
1216    broadcast_a2ui_event(state, kind, &payload).await;
1217
1218    // Hand the report to the in-process UI-improvement agent. The
1219    // agent is sync but cheap; we await the surface lookup before
1220    // calling it so the strategies see the same surface state the
1221    // renderer saw at report-emit time. Best-effort: surface lookup
1222    // misses (the surface might have been deleted between
1223    // report-emit and report-receive) are logged and skipped, never
1224    // surfaced as JSON-RPC errors — render_report is fire-and-forget.
1225    if let Some(surface) = state.a2ui.get(&report.surface_id).await {
1226        // Iteration budget — runaway-loop backstop. try_consume
1227        // claims the slot atomically; if the surface is already at
1228        // the cap, we short-circuit before consulting the agent.
1229        // The slot stays consumed only if the patch actually
1230        // applies — failure paths refund.
1231        if !state.ui_agent_budget.try_consume(&report.surface_id) {
1232            tracing::warn!(
1233                surface_id = %report.surface_id,
1234                count = state.ui_agent_budget.count(&report.surface_id),
1235                max = state.ui_agent_budget.max(),
1236                "ui-agent iteration budget exhausted; skipping agent invocation"
1237            );
1238            return Ok(serde_json::json!({ "event": event }));
1239        }
1240        // From here on, every non-applied branch must `refund` the
1241        // slot we just claimed. Only the successful-apply branch
1242        // keeps it consumed.
1243        match state.ui_agent.on_render_report(&report, &surface) {
1244            car_ui_agent::Decision::Patch {
1245                envelope,
1246                strategy_id,
1247                patch_hash,
1248                elapsed_ns,
1249            } => {
1250                // Runtime-side convergence monitor — neo's deferred
1251                // ask. The agent's no-double-patch guard catches
1252                // same-sequence repeats; this catches A→B→A across
1253                // sequences by tracking recent patch hashes per
1254                // surface. When a proposal would repeat one in the
1255                // window, drop it; the loop waits for the next
1256                // signature change.
1257                if !state
1258                    .ui_agent_oscillation
1259                    .check_and_record(&report.surface_id, patch_hash)
1260                {
1261                    tracing::warn!(
1262                        surface_id = %report.surface_id,
1263                        strategy = %strategy_id,
1264                        patch_hash,
1265                        "ui-agent oscillation detected; suppressing patch"
1266                    );
1267                    // Suppressed patch never applied → release the
1268                    // budget slot we claimed above.
1269                    state.ui_agent_budget.refund(&report.surface_id);
1270                    return Ok(serde_json::json!({ "event": event }));
1271                }
1272                let a2ui_envelope = car_a2ui::A2uiEnvelope {
1273                    patch_components: Some(envelope),
1274                    ..Default::default()
1275                };
1276                if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
1277                    tracing::warn!(
1278                        surface_id = %report.surface_id,
1279                        strategy = %strategy_id,
1280                        patch_hash,
1281                        elapsed_ns,
1282                        error = %e,
1283                        "ui-agent patch apply failed",
1284                    );
1285                    // Apply failed → release the budget slot.
1286                    state.ui_agent_budget.refund(&report.surface_id);
1287                } else {
1288                    tracing::debug!(
1289                        surface_id = %report.surface_id,
1290                        strategy = %strategy_id,
1291                        patch_hash,
1292                        elapsed_ns,
1293                        iteration = state.ui_agent_budget.count(&report.surface_id),
1294                        "ui-agent patch applied",
1295                    );
1296                    // Memgine trace: one Conversation node per
1297                    // successful patch, tagged "ui-agent/<surface>".
1298                    // Spreading activation can surface these on
1299                    // future renders of related surfaces. Spawned
1300                    // off the render-report hot path — ingest walks
1301                    // the graph for spreading activation and can
1302                    // take real time; we don't want two surfaces
1303                    // churning patches to serialize through the
1304                    // memgine mutex behind the WS handler.
1305                    if let Some(memgine) = state.shared_memgine.clone() {
1306                        let speaker = format!("ui-agent/{}", report.surface_id);
1307                        let text = format!("strategy applied: {}", strategy_id);
1308                        tokio::spawn(async move {
1309                            let mut guard = memgine.lock().await;
1310                            guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1311                        });
1312                    }
1313                }
1314            }
1315            car_ui_agent::Decision::StableNoChange => {
1316                // No patch this round → release the slot.
1317                state.ui_agent_budget.refund(&report.surface_id);
1318            }
1319            car_ui_agent::Decision::HardStop { reason } => {
1320                state.ui_agent_budget.refund(&report.surface_id);
1321                // Renderer painted unknown components — contract
1322                // violation between server and renderer. Per
1323                // types.rs docs: "loud failure" + "MUST pause."
1324                // `error!` not `warn!` so it surfaces in production
1325                // logs at the right severity.
1326                tracing::error!(
1327                    surface_id = %report.surface_id,
1328                    reason = %reason,
1329                    "ui-agent hard-stopped improvement loop",
1330                );
1331            }
1332        }
1333    } else {
1334        tracing::debug!(
1335            surface_id = %report.surface_id,
1336            "ui-agent skipped — surface not found in store",
1337        );
1338    }
1339
1340    Ok(serde_json::json!({ "event": event }))
1341}
1342
1343async fn route_a2ui_action(
1344    state: &Arc<ServerState>,
1345    action: &car_a2ui::ClientAction,
1346    owner: Option<car_a2ui::A2uiSurfaceOwner>,
1347) -> Value {
1348    let Some(owner) = owner else {
1349        return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1350    };
1351    if owner.kind != "a2a" {
1352        return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1353    }
1354    let Some(endpoint) = owner.endpoint.clone() else {
1355        return serde_json::json!({
1356            "delivered": false,
1357            "reason": "surface owner has no endpoint",
1358            "owner": owner
1359        });
1360    };
1361
1362    let message = car_a2a::Message {
1363        message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1364        role: car_a2a::MessageRole::User,
1365        parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1366            data: serde_json::json!({
1367                "a2uiAction": action,
1368            }),
1369            metadata: Default::default(),
1370        })],
1371        task_id: owner.task_id.clone(),
1372        context_id: owner.context_id.clone(),
1373        metadata: Default::default(),
1374    };
1375
1376    let auth = state
1377        .a2ui_route_auth
1378        .lock()
1379        .await
1380        .get(&action.surface_id)
1381        .cloned()
1382        .map(client_auth_from_route_auth)
1383        .unwrap_or(car_a2a::ClientAuth::None);
1384
1385    match car_a2a::A2aClient::new(endpoint.clone())
1386        .with_auth(auth)
1387        .send_message(message, false)
1388        .await
1389    {
1390        Ok(result) => serde_json::json!({
1391            "delivered": true,
1392            "owner": owner,
1393            "endpoint": endpoint,
1394            "result": result,
1395        }),
1396        Err(error) => serde_json::json!({
1397            "delivered": false,
1398            "owner": owner,
1399            "endpoint": endpoint,
1400            "error": error.to_string(),
1401        }),
1402    }
1403}
1404
1405fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1406    match auth {
1407        A2aRouteAuth::None => car_a2a::ClientAuth::None,
1408        A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1409        A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1410    }
1411}
1412
1413fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1414    let endpoint = endpoint?;
1415    if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1416        Some(endpoint)
1417    } else {
1418        None
1419    }
1420}
1421
/// True when `endpoint` is a plain-HTTP URL whose host is a loopback
/// address: `localhost`, `127.0.0.1`, or `[::1]`. The host may be
/// followed by an optional numeric port and then a path, query, or
/// fragment.
///
/// The authority is parsed rather than prefix-matched. A prefix check
/// such as `starts_with("http://localhost:")` also accepts
/// `http://localhost:80@evil.example/`, whose real host is `evil.example`
/// (everything before `@` is userinfo). Any `@` in the authority is
/// rejected. Backslashes are rejected too, because URL parsers disagree
/// on whether they end the authority. Matching is case-sensitive, which
/// can only reject an unusual spelling, never accept a foreign host.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };
    // RFC 3986 §3.2: the authority runs up to the first '/', '?' or '#'.
    let authority = match rest.find(['/', '?', '#']) {
        Some(end) => &rest[..end],
        None => rest,
    };
    if authority.contains(['@', '\\']) {
        return false;
    }
    let (host, port) = match authority.strip_prefix("[::1]") {
        Some(after) => ("[::1]", after),
        None => match authority.find(':') {
            Some(colon) => authority.split_at(colon),
            None => (authority, ""),
        },
    };
    let port_ok = match port.strip_prefix(':') {
        // An empty port after ':' is legal URL syntax (default port).
        Some(digits) => digits.bytes().all(|b| b.is_ascii_digit()),
        None => port.is_empty(),
    };
    port_ok && matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}
1433
1434async fn handle_host_register_agent(
1435    req: &JsonRpcMessage,
1436    session: &crate::session::ClientSession,
1437) -> Result<Value, String> {
1438    let request: RegisterHostAgentRequest =
1439        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1440    serde_json::to_value(
1441        session
1442            .host
1443            .register_agent(&session.client_id, request)
1444            .await?,
1445    )
1446    .map_err(|e| e.to_string())
1447}
1448
1449async fn handle_host_unregister_agent(
1450    req: &JsonRpcMessage,
1451    session: &crate::session::ClientSession,
1452) -> Result<Value, String> {
1453    let agent_id = req
1454        .params
1455        .get("agent_id")
1456        .and_then(|v| v.as_str())
1457        .ok_or("missing agent_id")?;
1458    session
1459        .host
1460        .unregister_agent(&session.client_id, agent_id)
1461        .await?;
1462    Ok(serde_json::json!({"ok": true}))
1463}
1464
1465async fn handle_host_set_status(
1466    req: &JsonRpcMessage,
1467    session: &crate::session::ClientSession,
1468) -> Result<Value, String> {
1469    let request: SetHostAgentStatusRequest =
1470        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1471    serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1472        .map_err(|e| e.to_string())
1473}
1474
1475async fn handle_host_notify(
1476    req: &JsonRpcMessage,
1477    session: &crate::session::ClientSession,
1478) -> Result<Value, String> {
1479    let kind = req
1480        .params
1481        .get("kind")
1482        .and_then(|v| v.as_str())
1483        .unwrap_or("host.notification");
1484    let agent_id = req
1485        .params
1486        .get("agent_id")
1487        .and_then(|v| v.as_str())
1488        .map(str::to_string);
1489    let message = req
1490        .params
1491        .get("message")
1492        .and_then(|v| v.as_str())
1493        .unwrap_or("");
1494    let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1495    serde_json::to_value(
1496        session
1497            .host
1498            .record_event(kind, agent_id, message, payload)
1499            .await,
1500    )
1501    .map_err(|e| e.to_string())
1502}
1503
1504async fn handle_host_request_approval(
1505    req: &JsonRpcMessage,
1506    session: &crate::session::ClientSession,
1507) -> Result<Value, String> {
1508    let request: CreateHostApprovalRequest =
1509        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1510    if let Some(agent_id) = &request.agent_id {
1511        // Best-effort. If the caller doesn't own this agent the
1512        // ACL added 2026-05 will refuse the status update — that
1513        // is the correct semantics; we still want the approval row
1514        // itself to land so the UI can render the request.
1515        let _ = session
1516            .host
1517            .set_status(
1518                &session.client_id,
1519                SetHostAgentStatusRequest {
1520                    agent_id: agent_id.clone(),
1521                    status: HostAgentStatus::WaitingForApproval,
1522                    current_task: None,
1523                    message: Some("Waiting for approval".to_string()),
1524                    payload: Value::Null,
1525                },
1526            )
1527            .await;
1528    }
1529    // `system_level: true` opts the approval out of per-session
1530    // ownership. The host-side ACL then allows any authenticated
1531    // session (typically CarHost or `car-host approve`) to resolve.
1532    // Agents requesting user approval should always set this — the
1533    // session-owned mode is only correct when the requesting session
1534    // is also the resolving session, which approval-via-UI never is.
1535    let owner_client_id = if request.system_level {
1536        None
1537    } else {
1538        Some(session.client_id.as_str())
1539    };
1540    serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1541        .map_err(|e| e.to_string())
1542}
1543
1544async fn handle_host_resolve_approval(
1545    req: &JsonRpcMessage,
1546    session: &crate::session::ClientSession,
1547) -> Result<Value, String> {
1548    let request: ResolveHostApprovalRequest =
1549        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1550    serde_json::to_value(
1551        session
1552            .host
1553            .resolve_approval(&session.client_id, request)
1554            .await?,
1555    )
1556    .map_err(|e| e.to_string())
1557}
1558
1559/// `session.auth` — present the per-launch token to unlock the
1560/// connection. When `state.auth_token` is unset, this method is a
1561/// no-op success (auth is disabled). When set, the supplied token
1562/// must equal it (constant-time comparison) — a successful auth
1563/// flips `session.authenticated` to `true` so subsequent methods
1564/// pass the gate. Wrong token returns an error AND leaves the
1565/// session unauthenticated; the dispatcher loop's gate then closes
1566/// the connection on the next non-auth method.
1567///
1568/// Closes Parslee-ai/car-releases#32.
1569async fn handle_session_auth(
1570    req: &JsonRpcMessage,
1571    session: &crate::session::ClientSession,
1572    state: &Arc<ServerState>,
1573) -> Result<Value, String> {
1574    let supplied = req
1575        .params
1576        .get("token")
1577        .and_then(Value::as_str)
1578        .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1579    // #169: optional `agent_id` binds the WS connection to a
1580    // supervised lifecycle agent. When present, the supplied token
1581    // must equal the per-agent token the supervisor minted at upsert
1582    // (NOT the daemon-wide auth token). When absent, fall back to
1583    // the daemon-wide token — preserves the legacy unbound-token
1584    // path for browser/host/CLI clients.
1585    let agent_id = req
1586        .params
1587        .get("agent_id")
1588        .and_then(Value::as_str)
1589        .map(str::to_string);
1590
1591    if let Some(id) = agent_id {
1592        let supervisor = state.supervisor()?;
1593        if !supervisor.validate_agent_token(&id, supplied).await {
1594            return Err(format!(
1595                "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1596            ));
1597        }
1598        // Single-claim: only one connection at a time per
1599        // agent_id. A second claim is rejected so the daemon-side
1600        // per-agent state stays unambiguous.
1601        {
1602            let mut attached = state.attached_agents.lock().await;
1603            if let Some(prior) = attached.get(&id) {
1604                if prior != &session.client_id {
1605                    return Err(format!(
1606                        "auth failed: agent_id `{id}` is already attached on \
1607                         another connection (client_id={prior})"
1608                    ));
1609                }
1610            }
1611            attached.insert(id.clone(), session.client_id.clone());
1612        }
1613        // #170: attach the daemon-owned persistent memgine for
1614        // this agent. Lazy-loaded on first connection per id from
1615        // `~/.car/memory/agents/<id>.jsonl`; retained across
1616        // disconnect so the next session sees the same state.
1617        let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1618        *session.bound_memgine.lock().await = Some(agent_eng);
1619        *session.agent_id.lock().await = Some(id.clone());
1620        session
1621            .authenticated
1622            .store(true, std::sync::atomic::Ordering::Release);
1623        return Ok(serde_json::json!({
1624            "ok": true,
1625            "auth_enabled": true,
1626            "agent_id": id,
1627        }));
1628    }
1629
1630    let expected = match state.auth_token.get() {
1631        Some(t) => t,
1632        None => {
1633            // Auth disabled — accept any token politely so callers
1634            // that always include a session.auth handshake (e.g. the
1635            // FFI proxy) don't fail when the daemon happens to be
1636            // unauthed. Mark the session authenticated anyway so the
1637            // gate is a no-op below.
1638            session
1639                .authenticated
1640                .store(true, std::sync::atomic::Ordering::Release);
1641            return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1642        }
1643    };
1644    if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1645        return Err("auth failed: token mismatch".to_string());
1646    }
1647    session
1648        .authenticated
1649        .store(true, std::sync::atomic::Ordering::Release);
1650    Ok(serde_json::json!({
1651        "ok": true,
1652        "auth_enabled": true,
1653        "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
1654    }))
1655}
1656
/// Compare two byte strings without stopping at the first mismatch.
///
/// Inputs of different length return `false` straight away. Length is
/// therefore the only property observable through timing, which is
/// acceptable for our 43-char fixed-length tokens.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() == b.len() {
        // OR together the XOR of every byte pair; any difference leaves a bit set.
        a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
    } else {
        false
    }
}
1670
1671/// Block dispatch of `method` until the user resolves the approval
1672/// raised on [`HostState`].
1673///
1674/// Called from the dispatcher loop only when
1675/// [`crate::session::ApprovalGate::requires_approval`] returns true.
1676/// `Ok(())` means the user picked "approve"; `Err(reason)` is sent
1677/// to the caller as JSON-RPC error code `-32003` with the supplied
1678/// reason. On timeout, the approval row stays in `Pending` so the
1679/// UI keeps a record of the unanswered request.
1680async fn gate_high_risk_method(
1681    method: &str,
1682    params: &Value,
1683    state: &Arc<ServerState>,
1684) -> Result<(), String> {
1685    let timeout = state.approval_gate.timeout;
1686    let req = CreateHostApprovalRequest {
1687        agent_id: None,
1688        action: format!("ws.method:{method}"),
1689        details: serde_json::json!({
1690            "method": method,
1691            // Truncate params for the UI — full payload is recoverable
1692            // via the request-time host event log if needed. The cap
1693            // keeps a malicious caller from drowning the UI in JSON.
1694            "params_preview": preview_params(params, 2_000),
1695        }),
1696        options: vec!["approve".to_string(), "deny".to_string()],
1697        // The high-risk-method gate is already system-level (it
1698        // passes None as the owner via request_and_wait_approval's
1699        // internal call). This field is informational here.
1700        system_level: true,
1701    };
1702    match state
1703        .host
1704        .request_and_wait_approval(req, "approve", timeout)
1705        .await
1706    {
1707        Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1708        Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1709            "{method} denied by user (approval gate, audit 2026-05). \
1710             To call this method without an interactive prompt, start \
1711             car-server with --no-approvals on a trusted machine."
1712        )),
1713        Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1714            "{method} approval timed out after {}s with no resolution. \
1715             The approval is still visible in `host.approvals` for \
1716             forensics; resubmit the request to retry.",
1717            timeout.as_secs()
1718        )),
1719        Err(e) => Err(format!("approval gate error: {e}")),
1720    }
1721}
1722
1723fn preview_params(value: &Value, max_chars: usize) -> Value {
1724    let s = value.to_string();
1725    if s.len() <= max_chars {
1726        value.clone()
1727    } else {
1728        Value::String(format!("{}… (truncated)", &s[..max_chars]))
1729    }
1730}
1731
1732async fn handle_session_init(
1733    req: &JsonRpcMessage,
1734    session: &crate::session::ClientSession,
1735) -> Result<Value, String> {
1736    let init: SessionInitRequest =
1737        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1738
1739    for tool in &init.tools {
1740        register_from_definition(&session.runtime, tool).await;
1741    }
1742
1743    let mut policy_count = 0;
1744    {
1745        let mut policies = session.runtime.policies.write().await;
1746        for policy_def in &init.policies {
1747            if let Some(check) = build_policy_check(policy_def) {
1748                policies.register(&policy_def.name, check, "");
1749                policy_count += 1;
1750            }
1751        }
1752    }
1753
1754    serde_json::to_value(SessionInitResponse {
1755        session_id: session.client_id.clone(),
1756        tools_registered: init.tools.len(),
1757        policies_registered: policy_count,
1758    })
1759    .map_err(|e| e.to_string())
1760}
1761
1762fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1763    match def.rule.as_str() {
1764        "deny_tool" => {
1765            let target = def.target.clone();
1766            Some(Box::new(
1767                move |action: &car_ir::Action, _: &car_state::StateStore| {
1768                    if action.tool.as_deref() == Some(&target) {
1769                        Some(format!("tool '{}' denied", target))
1770                    } else {
1771                        None
1772                    }
1773                },
1774            ))
1775        }
1776        "require_state" => {
1777            let key = def.key.clone();
1778            let value = def.value.clone();
1779            Some(Box::new(
1780                move |_: &car_ir::Action, state: &car_state::StateStore| {
1781                    if state.get(&key).as_ref() != Some(&value) {
1782                        Some(format!("state['{}'] must be {:?}", key, value))
1783                    } else {
1784                        None
1785                    }
1786                },
1787            ))
1788        }
1789        "deny_tool_param" => {
1790            let target = def.target.clone();
1791            let param = def.key.clone();
1792            let pattern = def.pattern.clone();
1793            Some(Box::new(
1794                move |action: &car_ir::Action, _: &car_state::StateStore| {
1795                    if action.tool.as_deref() != Some(&target) {
1796                        return None;
1797                    }
1798                    if let Some(val) = action.parameters.get(&param) {
1799                        let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1800                        if s.contains(&pattern) {
1801                            return Some(format!("param '{}' matches '{}'", param, pattern));
1802                        }
1803                    }
1804                    None
1805                },
1806            ))
1807        }
1808        _ => None,
1809    }
1810}
1811
1812async fn handle_tools_register(
1813    req: &JsonRpcMessage,
1814    session: &crate::session::ClientSession,
1815) -> Result<Value, String> {
1816    let tools: Vec<ToolDefinition> =
1817        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1818    for tool in &tools {
1819        register_from_definition(&session.runtime, tool).await;
1820    }
1821    Ok(Value::from(tools.len()))
1822}
1823
1824/// Bridge a wire-protocol `ToolDefinition` to the engine's
1825/// schema-aware registration. Carries the full ToolSchema shape
1826/// (description, parameters, returns, idempotency, caching, rate
1827/// limit) through to the validator. An empty `parameters` object is
1828/// the legacy schemaless registration — the validator no-ops for
1829/// those, so pre-v0.5.x callers see no change.
1830async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1831    runtime
1832        .register_tool_schema(car_ir::ToolSchema {
1833            name: def.name.clone(),
1834            description: def.description.clone(),
1835            parameters: def.parameters.clone(),
1836            returns: def.returns.clone(),
1837            idempotent: def.idempotent,
1838            cache_ttl_secs: def.cache_ttl_secs,
1839            rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1840                max_calls: rl.max_calls,
1841                interval_secs: rl.interval_secs,
1842            }),
1843        })
1844        .await;
1845}
1846
1847async fn handle_proposal_submit(
1848    req: &JsonRpcMessage,
1849    session: &crate::session::ClientSession,
1850) -> Result<Value, String> {
1851    let submit: ProposalSubmitRequest =
1852        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1853    // `session_id` is sibling to `proposal` in the params object —
1854    // not part of `ProposalSubmitRequest` (kept proto-compatible). When
1855    // present, executes the proposal under the named session so any
1856    // session-scoped policies layer on top of global ones.
1857    // See docs/proposals/per-session-policy-scoping.md.
1858    let session_id = req
1859        .params
1860        .get("session_id")
1861        .and_then(|v| v.as_str())
1862        .map(str::to_string);
1863
1864    // `scope` is the optional per-execution caller / tenant surface
1865    // added in Parslee-ai/car#187 phase 3. Shape mirrors
1866    // `car_engine::RuntimeScope` ({ callerId, tenantId, claims });
1867    // serde's camelCase rename keeps the wire form consistent with
1868    // the rest of the JSON-RPC surface. When present, the proposal
1869    // routes through `execute_scoped*` so the runtime records the
1870    // identity on the event log and state R/W ops apply the tenant
1871    // prefix (#187 phase 3-B).
1872    let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1873        Some(v) if !v.is_null() => {
1874            Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1875        }
1876        _ => None,
1877    };
1878
1879    let result = match (session_id, scope) {
1880        // session_id + scope: no single combined entry point on
1881        // Runtime today. Scope takes precedence because it's the
1882        // tenant-isolation surface; per-session policies layer in
1883        // a follow-up when there's a real caller asking for both.
1884        (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1885        (Some(sid), None) => {
1886            session
1887                .runtime
1888                .execute_with_session(&submit.proposal, &sid)
1889                .await
1890        }
1891        (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1892        (None, None) => session.runtime.execute(&submit.proposal).await,
1893    };
1894    serde_json::to_value(result).map_err(|e| e.to_string())
1895}
1896
1897async fn handle_session_policy_open(
1898    session: &crate::session::ClientSession,
1899) -> Result<Value, String> {
1900    let id = session.runtime.open_session().await;
1901    Ok(serde_json::json!({ "session_id": id }))
1902}
1903
1904async fn handle_session_policy_close(
1905    req: &JsonRpcMessage,
1906    session: &crate::session::ClientSession,
1907) -> Result<Value, String> {
1908    let sid = req
1909        .params
1910        .get("session_id")
1911        .and_then(|v| v.as_str())
1912        .ok_or("missing 'session_id'")?;
1913    let closed = session.runtime.close_session(sid).await;
1914    Ok(serde_json::json!({ "closed": closed }))
1915}
1916
1917/// `policy.register` — register one policy against this WebSocket
1918/// session's runtime. Mirrors the `PolicyDefinition` shape used by
1919/// `session.init`. When `session_id` is present, the policy is scoped
1920/// to the named in-runtime session opened via `session.policy.open`;
1921/// otherwise it is global.
1922async fn handle_policy_register(
1923    req: &JsonRpcMessage,
1924    session: &crate::session::ClientSession,
1925) -> Result<Value, String> {
1926    let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1927        .map_err(|e| format!("invalid policy params: {e}"))?;
1928    let session_id = req
1929        .params
1930        .get("session_id")
1931        .and_then(|v| v.as_str())
1932        .map(str::to_string);
1933    let check = build_policy_check(&def)
1934        .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1935    match session_id {
1936        Some(sid) => session
1937            .runtime
1938            .register_policy_in_session(&sid, &def.name, check, "")
1939            .await
1940            .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1941        None => {
1942            let mut policies = session.runtime.policies.write().await;
1943            policies.register(&def.name, check, "");
1944            Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1945        }
1946    }
1947}
1948
1949async fn handle_verify(
1950    req: &JsonRpcMessage,
1951    session: &crate::session::ClientSession,
1952) -> Result<Value, String> {
1953    let vr: VerifyRequest =
1954        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1955    // Verify against the full registered schemas so tool_call
1956    // parameters are checked for type mismatches + missing required
1957    // fields, not just tool existence (register_tool_schema's
1958    // documented contract; car-releases#56). The read guard is held
1959    // across the synchronous verify call.
1960    let tools_guard = session.runtime.tools.read().await;
1961    let result =
1962        car_verify::verify_with_schemas(&vr.proposal, Some(&vr.initial_state), Some(&tools_guard), 30);
1963    drop(tools_guard);
1964    serde_json::to_value(VerifyResponse {
1965        valid: result.valid,
1966        issues: result
1967            .issues
1968            .iter()
1969            .map(|i| VerifyIssueProto {
1970                action_id: i.action_id.clone(),
1971                severity: i.severity.clone(),
1972                message: i.message.clone(),
1973            })
1974            .collect(),
1975        simulated_state: result.simulated_state,
1976    })
1977    .map_err(|e| e.to_string())
1978}
1979
1980/// Parse the optional `tenant_id` sibling field from JSON-RPC
1981/// params (Parslee-ai/car#187 phase 3-E). When set and non-empty,
1982/// state R/W routes through `StateStore::scoped(tenant_id)` so
1983/// distinct tenants can't see each other's keys over the WS
1984/// surface — symmetric to the proposal.submit scope plumbing.
1985/// When absent / empty, the legacy unscoped namespace applies.
1986fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1987    req.params
1988        .get("tenant_id")
1989        .and_then(|v| v.as_str())
1990        .filter(|s| !s.is_empty())
1991        .map(str::to_string)
1992}
1993
1994async fn handle_state_get(
1995    req: &JsonRpcMessage,
1996    session: &crate::session::ClientSession,
1997) -> Result<Value, String> {
1998    let key = req
1999        .params
2000        .get("key")
2001        .and_then(|v| v.as_str())
2002        .ok_or("missing 'key'")?;
2003    let tenant = tenant_from_params(req);
2004    Ok(session
2005        .runtime
2006        .state
2007        .scoped(tenant.as_deref())
2008        .get(key)
2009        .unwrap_or(Value::Null))
2010}
2011
2012async fn handle_state_set(
2013    req: &JsonRpcMessage,
2014    session: &crate::session::ClientSession,
2015) -> Result<Value, String> {
2016    let key = req
2017        .params
2018        .get("key")
2019        .and_then(|v| v.as_str())
2020        .ok_or("missing 'key'")?;
2021    let value = req.params.get("value").cloned().unwrap_or(Value::Null);
2022    let tenant = tenant_from_params(req);
2023    session
2024        .runtime
2025        .state
2026        .scoped(tenant.as_deref())
2027        .set(key, value, "client");
2028    Ok(Value::from("ok"))
2029}
2030
2031/// `state.exists` — true if the key is set in this session's state
2032/// store, false otherwise. Cheaper than `state.get` + null-check on
2033/// the client side because it doesn't serialize the value.
2034async fn handle_state_exists(
2035    req: &JsonRpcMessage,
2036    session: &crate::session::ClientSession,
2037) -> Result<Value, String> {
2038    let key = req
2039        .params
2040        .get("key")
2041        .and_then(|v| v.as_str())
2042        .ok_or("missing 'key'")?;
2043    let tenant = tenant_from_params(req);
2044    Ok(Value::Bool(
2045        session.runtime.state.scoped(tenant.as_deref()).exists(key),
2046    ))
2047}
2048
2049/// `state.keys` — list every key currently set in this session's
2050/// state store. Returns a JSON array of strings.
2051async fn handle_state_keys(
2052    req: &JsonRpcMessage,
2053    session: &crate::session::ClientSession,
2054) -> Result<Value, String> {
2055    let tenant = tenant_from_params(req);
2056    Ok(Value::Array(
2057        session
2058            .runtime
2059            .state
2060            .scoped(tenant.as_deref())
2061            .keys()
2062            .into_iter()
2063            .map(Value::String)
2064            .collect(),
2065    ))
2066}
2067
2068/// `state.snapshot` — return the entire session state store as a
2069/// JSON object (`{ key: value, ... }`). Equivalent to iterating
2070/// `state.keys` + `state.get` but in a single round-trip; for
2071/// inspectors/dashboards.
2072///
2073/// Tenant-scoped variant: when `tenant_id` is set, only that
2074/// tenant's keys are returned (prefix stripped on the way out).
2075/// `state.snapshot` with no `tenant_id` returns only unscoped
2076/// keys; consistent with `state.keys`'s filter behaviour and the
2077/// strict-isolation contract from phase 3-B.
2078async fn handle_state_snapshot(
2079    req: &JsonRpcMessage,
2080    session: &crate::session::ClientSession,
2081) -> Result<Value, String> {
2082    let tenant = tenant_from_params(req);
2083    let view = session.runtime.state.scoped(tenant.as_deref());
2084    let mut map = serde_json::Map::new();
2085    for key in view.keys() {
2086        if let Some(value) = view.get(&key) {
2087            map.insert(key, value);
2088        }
2089    }
2090    Ok(Value::Object(map))
2091}
2092
2093// --- Per-agent persistent memgine (#170) ---
2094
2095/// `~/.car/memory/agents/<id>.json` — the per-agent snapshot file.
2096/// Mirrors the existing `memory.persist` shape (flat JSON array of
2097/// fact objects) so the same loader path works.
2098fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
2099    let base = car_ffi_common::memory_path::ensure_base()
2100        .map_err(|e| format!("memory base unavailable: {e}"))?;
2101    let dir = base.join("agents");
2102    std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
2103    Ok(dir.join(format!("{agent_id}.json")))
2104}
2105
2106/// Acquire (or lazy-create + load from disk) the daemon-owned
2107/// persistent memgine for `agent_id`. First call per id reads
2108/// `~/.car/memory/agents/<id>.json` if it exists; subsequent calls
2109/// share the in-memory engine across sessions. Caller stores the
2110/// returned `Arc` on `ClientSession.bound_memgine` so memory.*
2111/// handlers route through it via [`ClientSession::effective_memgine`].
2112async fn get_or_load_agent_memgine(
2113    state: &Arc<ServerState>,
2114    agent_id: &str,
2115) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
2116    {
2117        let map = state.agent_memgines.lock().await;
2118        if let Some(eng) = map.get(agent_id) {
2119            return Ok(eng.clone());
2120        }
2121    }
2122    // Build a fresh engine and try to load from disk.
2123    let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
2124        None,
2125    )));
2126    let path = agent_memgine_snapshot_path(agent_id)?;
2127    if path.exists() {
2128        let content = std::fs::read_to_string(&path)
2129            .map_err(|e| format!("read {}: {}", path.display(), e))?;
2130        let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
2131        let mut g = engine.lock().await;
2132        let mut loaded: u32 = 0;
2133        for fact in &facts {
2134            let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2135            let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2136            let kind = fact
2137                .get("kind")
2138                .and_then(|v| v.as_str())
2139                .unwrap_or("pattern");
2140            let fid = format!("loaded-{loaded}");
2141            g.ingest_fact(
2142                &fid,
2143                subject,
2144                body,
2145                "user",
2146                "peer",
2147                chrono::Utc::now(),
2148                "global",
2149                None,
2150                vec![],
2151                kind == "constraint",
2152            );
2153            loaded += 1;
2154        }
2155    }
2156    let mut map = state.agent_memgines.lock().await;
2157    let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
2158    Ok(stored)
2159}
2160
2161/// Snapshot the agent's memgine to its disk file. Same on-wire shape
2162/// as `memory.persist` so manual snapshots and the daemon-owned
2163/// persistence stay interoperable.
2164async fn persist_agent_memgine(
2165    agent_id: &str,
2166    engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
2167) -> Result<(), String> {
2168    let path = agent_memgine_snapshot_path(agent_id)?;
2169    let g = engine.lock().await;
2170    let facts: Vec<Value> = g
2171        .graph
2172        .inner
2173        .node_indices()
2174        .filter_map(|nix| {
2175            let node = g.graph.inner.node_weight(nix)?;
2176            if !node.is_valid() {
2177                return None;
2178            }
2179            if node.kind == car_memgine::MemKind::Identity
2180                || node.kind == car_memgine::MemKind::Environment
2181            {
2182                return None;
2183            }
2184            Some(serde_json::json!({
2185                "subject": node.key,
2186                "body": node.value,
2187                "kind": match node.kind {
2188                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2189                    car_memgine::MemKind::Conversation => "outcome",
2190                    _ => "pattern",
2191                },
2192                "confidence": 0.5,
2193                "content_type": node.content_type.as_label(),
2194            }))
2195        })
2196        .collect();
2197    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2198    std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2199    Ok(())
2200}
2201
2202// --- Memory handlers ---
2203
2204/// `memory.fact_count` — return `valid_fact_count()` of the
2205/// session's memgine. Used by FFI bindings to mirror their
2206/// embedded `fact_count()` accessor without round-tripping a full
2207/// query. No params.
2208async fn handle_memory_fact_count(
2209    session: &crate::session::ClientSession,
2210) -> Result<Value, String> {
2211    let engine_arc = session.effective_memgine().await;
2212    let engine = engine_arc.lock().await;
2213    Ok(Value::from(engine.valid_fact_count()))
2214}
2215
2216async fn handle_memory_add_fact(
2217    req: &JsonRpcMessage,
2218    session: &crate::session::ClientSession,
2219) -> Result<Value, String> {
2220    let subject = req
2221        .params
2222        .get("subject")
2223        .and_then(|v| v.as_str())
2224        .ok_or("missing subject")?;
2225    let body = req
2226        .params
2227        .get("body")
2228        .and_then(|v| v.as_str())
2229        .ok_or("missing body")?;
2230    let kind = req
2231        .params
2232        .get("kind")
2233        .and_then(|v| v.as_str())
2234        .unwrap_or("pattern");
2235    // Route through `effective_memgine` so connections bound to a
2236    // lifecycle agent (#169) write into the daemon-owned per-agent
2237    // memgine instead of the per-WS ephemeral one (#170).
2238    let engine_arc = session.effective_memgine().await;
2239    let count = {
2240        let mut engine = engine_arc.lock().await;
2241        let fid = format!("ws-{}", engine.valid_fact_count());
2242        engine.ingest_fact(
2243            &fid,
2244            subject,
2245            body,
2246            "user",
2247            "peer",
2248            chrono::Utc::now(),
2249            "global",
2250            None,
2251            vec![],
2252            kind == "constraint",
2253        );
2254        engine.valid_fact_count()
2255    };
2256    // Persist after every add when the session is bound to a
2257    // supervised agent. Synchronous write — small JSON snapshot.
2258    if let Some(id) = session.agent_id.lock().await.clone() {
2259        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2260            tracing::warn!(agent_id = %id, error = %e,
2261                "agent memgine persist failed; in-memory state is canonical");
2262        }
2263    }
2264    Ok(Value::from(count))
2265}
2266
2267async fn handle_memory_query(
2268    req: &JsonRpcMessage,
2269    session: &crate::session::ClientSession,
2270) -> Result<Value, String> {
2271    let query = req
2272        .params
2273        .get("query")
2274        .and_then(|v| v.as_str())
2275        .ok_or("missing query")?;
2276    let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2277    let engine_arc = session.effective_memgine().await;
2278    let engine = engine_arc.lock().await;
2279    let seeds = engine.graph.find_seeds(query, 5);
2280    // FFI parity with NAPI `query_facts` (car-ffi-napi/src/lib.rs:577) —
2281    // both use Personalized PageRank so transport choice doesn't shift
2282    // ranking semantics. Result shape (subject/body/kind/confidence)
2283    // also mirrors NAPI for the same reason.
2284    let hits = if !seeds.is_empty() {
2285        engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2286    } else {
2287        vec![]
2288    };
2289    let results: Vec<Value> = hits
2290        .iter()
2291        .filter_map(|hit| {
2292            let node = engine.graph.inner.node_weight(hit.node_ix)?;
2293            Some(serde_json::json!({
2294                "subject": node.key,
2295                "body": node.value,
2296                "kind": format!("{:?}", node.kind).to_lowercase(),
2297                "confidence": hit.activation,
2298            }))
2299        })
2300        .collect();
2301    serde_json::to_value(results).map_err(|e| e.to_string())
2302}
2303
2304async fn handle_memory_build_context(
2305    req: &JsonRpcMessage,
2306    session: &crate::session::ClientSession,
2307) -> Result<Value, String> {
2308    let query = req
2309        .params
2310        .get("query")
2311        .and_then(|v| v.as_str())
2312        .unwrap_or("");
2313    // FFI parity with NAPI `build_context(query, model_context_window)`.
2314    // When supplied, sizes the assembly budget against the model's window
2315    // instead of the fixed 8K default.
2316    let model_context_window = req
2317        .params
2318        .get("model_context_window")
2319        .and_then(|v| v.as_u64())
2320        .map(|w| w as usize);
2321    let mut engine = session.memgine.lock().await;
2322    Ok(Value::from(
2323        engine.build_context_for_model(query, model_context_window),
2324    ))
2325}
2326
2327/// `memory.build_context_fast` — Fast-mode context assembly for
2328/// latency-sensitive paths (voice, real-time). Skips embedding flush,
2329/// skill lookup, PPR-based scoring, inline repairs, known-unknowns
2330/// extraction. Keeps identity, constraints, facts (creation order),
2331/// conversation, environment.
2332async fn handle_memory_build_context_fast(
2333    req: &JsonRpcMessage,
2334    session: &crate::session::ClientSession,
2335) -> Result<Value, String> {
2336    let query = req
2337        .params
2338        .get("query")
2339        .and_then(|v| v.as_str())
2340        .unwrap_or("");
2341    let model_context_window = req
2342        .params
2343        .get("model_context_window")
2344        .and_then(|v| v.as_u64())
2345        .map(|w| w as usize);
2346    let mut engine = session.memgine.lock().await;
2347    Ok(Value::from(engine.build_context_with_options(
2348        query,
2349        model_context_window,
2350        car_memgine::ContextMode::Fast,
2351        None,
2352    )))
2353}
2354
2355/// `memory.persist` — write the session's memgine to a JSON file
2356/// at `path`. Mirrors NAPI `persist_memory` (car-ffi-napi/src/lib.rs:797)
2357/// so daemon-mode clients can drive checkpoint/restore symmetrically
2358/// with embedded mode. Returns the number of facts written.
2359///
2360/// Filesystem caveat: `path` is interpreted on the daemon's filesystem,
2361/// not the caller's. Since the 2026-05 audit, `path` is also
2362/// sandboxed under `~/.car/memory/` via
2363/// [`car_ffi_common::memory_path::resolve`] — relative paths land
2364/// under the base, absolute paths must already be under the base,
2365/// `..` segments are rejected, symlinks pointing out are rejected.
2366/// Pre-2026-05 the path was passed straight to `std::fs::write` and
2367/// became an arbitrary file-write primitive. The base64-blob escape
2368/// hatch tracked in `Parslee-ai/car-releases#31` will plug into the
2369/// same resolver when it lands.
2370async fn handle_memory_persist(
2371    req: &JsonRpcMessage,
2372    session: &crate::session::ClientSession,
2373) -> Result<Value, String> {
2374    let path = req
2375        .params
2376        .get("path")
2377        .and_then(|v| v.as_str())
2378        .ok_or("missing path")?;
2379    let resolved = car_ffi_common::memory_path::resolve(path)
2380        .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2381    let engine = session.memgine.lock().await;
2382    let facts: Vec<Value> = engine
2383        .graph
2384        .inner
2385        .node_indices()
2386        .filter_map(|nix| {
2387            let node = engine.graph.inner.node_weight(nix)?;
2388            if !node.is_valid() {
2389                return None;
2390            }
2391            if node.kind == car_memgine::MemKind::Identity
2392                || node.kind == car_memgine::MemKind::Environment
2393            {
2394                return None;
2395            }
2396            Some(serde_json::json!({
2397                "subject": node.key,
2398                "body": node.value,
2399                "kind": match node.kind {
2400                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2401                    car_memgine::MemKind::Conversation => "outcome",
2402                    _ => "pattern",
2403                },
2404                "confidence": 0.5,
2405                "content_type": node.content_type.as_label(),
2406            }))
2407        })
2408        .collect();
2409    let count = facts.len();
2410    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2411    std::fs::write(&resolved, json)
2412        .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2413    Ok(Value::from(count as u64))
2414}
2415
2416/// `memory.load` — replace the session's memgine with facts from the
2417/// JSON file at `path`. Mirrors NAPI `load_memory`
2418/// (car-ffi-napi/src/lib.rs:121). Same `~/.car/memory/` sandboxing
2419/// as `memory.persist` since the 2026-05 audit — relative paths
2420/// land under the base, anything that escapes is rejected.
2421async fn handle_memory_load(
2422    req: &JsonRpcMessage,
2423    session: &crate::session::ClientSession,
2424) -> Result<Value, String> {
2425    let path = req
2426        .params
2427        .get("path")
2428        .and_then(|v| v.as_str())
2429        .ok_or("missing path")?;
2430    let resolved = car_ffi_common::memory_path::resolve(path)
2431        .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2432    let content = std::fs::read_to_string(&resolved)
2433        .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2434    let facts: Vec<Value> =
2435        serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2436    let mut engine = session.memgine.lock().await;
2437    engine.reset();
2438    let mut count: u32 = 0;
2439    for fact in &facts {
2440        let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2441        let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2442        let kind = fact
2443            .get("kind")
2444            .and_then(|v| v.as_str())
2445            .unwrap_or("pattern");
2446        let fid = format!("loaded-{}", count);
2447        engine.ingest_fact(
2448            &fid,
2449            subject,
2450            body,
2451            "user",
2452            "peer",
2453            chrono::Utc::now(),
2454            "global",
2455            None,
2456            vec![],
2457            kind == "constraint",
2458        );
2459        count += 1;
2460    }
2461    Ok(Value::from(count))
2462}
2463
2464// --- Skill handlers ---
2465
2466async fn handle_skill_ingest(
2467    req: &JsonRpcMessage,
2468    session: &crate::session::ClientSession,
2469) -> Result<Value, String> {
2470    let name = req
2471        .params
2472        .get("name")
2473        .and_then(|v| v.as_str())
2474        .ok_or("missing name")?;
2475    let code = req
2476        .params
2477        .get("code")
2478        .and_then(|v| v.as_str())
2479        .ok_or("missing code")?;
2480    let platform = req
2481        .params
2482        .get("platform")
2483        .and_then(|v| v.as_str())
2484        .unwrap_or("unknown");
2485    let persona = req
2486        .params
2487        .get("persona")
2488        .and_then(|v| v.as_str())
2489        .unwrap_or("");
2490    let url_pattern = req
2491        .params
2492        .get("url_pattern")
2493        .and_then(|v| v.as_str())
2494        .unwrap_or("");
2495    let description = req
2496        .params
2497        .get("description")
2498        .and_then(|v| v.as_str())
2499        .unwrap_or("");
2500    let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2501    let keywords: Vec<String> = req
2502        .params
2503        .get("task_keywords")
2504        .and_then(|v| v.as_array())
2505        .map(|arr| {
2506            arr.iter()
2507                .filter_map(|v| v.as_str().map(String::from))
2508                .collect()
2509        })
2510        .unwrap_or_default();
2511
2512    let trigger = car_memgine::SkillTrigger {
2513        persona: persona.into(),
2514        url_pattern: url_pattern.into(),
2515        task_keywords: keywords,
2516        structured: None,
2517    };
2518    let mut engine = session.memgine.lock().await;
2519    let node = engine.ingest_skill(
2520        name,
2521        code,
2522        platform,
2523        trigger,
2524        description,
2525        supersedes,
2526        vec![],
2527        vec![],
2528    );
2529    Ok(Value::from(node.index() as u64))
2530}
2531
2532async fn handle_skill_find(
2533    req: &JsonRpcMessage,
2534    session: &crate::session::ClientSession,
2535) -> Result<Value, String> {
2536    let persona = req
2537        .params
2538        .get("persona")
2539        .and_then(|v| v.as_str())
2540        .unwrap_or("");
2541    let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2542    let task = req
2543        .params
2544        .get("task")
2545        .and_then(|v| v.as_str())
2546        .unwrap_or("");
2547    let max = req
2548        .params
2549        .get("max_results")
2550        .and_then(|v| v.as_u64())
2551        .unwrap_or(1) as usize;
2552    let engine = session.memgine.lock().await;
2553    let results = engine.find_skill(persona, url, task, max);
2554    let json: Vec<Value> = results
2555        .iter()
2556        .map(|(m, s)| {
2557            serde_json::json!({
2558                "name": m.name, "code": m.code, "platform": m.platform,
2559                "description": m.description, "stats": m.stats, "match_score": s,
2560            })
2561        })
2562        .collect();
2563    serde_json::to_value(json).map_err(|e| e.to_string())
2564}
2565
2566async fn handle_skill_report(
2567    req: &JsonRpcMessage,
2568    session: &crate::session::ClientSession,
2569) -> Result<Value, String> {
2570    let name = req
2571        .params
2572        .get("skill_name")
2573        .and_then(|v| v.as_str())
2574        .ok_or("missing skill_name")?;
2575    let outcome_str = req
2576        .params
2577        .get("outcome")
2578        .and_then(|v| v.as_str())
2579        .ok_or("missing outcome")?;
2580    let outcome = match outcome_str {
2581        "success" => car_memgine::SkillOutcome::Success,
2582        _ => car_memgine::SkillOutcome::Fail,
2583    };
2584    let mut engine = session.memgine.lock().await;
2585    let stats = engine
2586        .report_outcome(name, outcome)
2587        .ok_or(format!("skill '{}' not found", name))?;
2588    serde_json::to_value(stats).map_err(|e| e.to_string())
2589}
2590
2591// ---------------------------------------------------------------------------
2592// Multi-agent coordination handlers
2593//
2594// The WsAgentRunner sends a `multi.run_agent` JSON-RPC request to the client.
2595// The client runs the model loop and responds with AgentOutput JSON.
2596// ---------------------------------------------------------------------------
2597
2598/// AgentRunner backed by WebSocket callback to the client.
2599struct WsAgentRunner {
2600    channel: Arc<WsChannel>,
2601    host: Arc<crate::host::HostState>,
2602    client_id: String,
2603}
2604
2605#[async_trait::async_trait]
2606impl car_multi::AgentRunner for WsAgentRunner {
2607    async fn run(
2608        &self,
2609        spec: &car_multi::AgentSpec,
2610        task: &str,
2611        _runtime: &car_engine::Runtime,
2612        _mailbox: &car_multi::Mailbox,
2613    ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2614        use futures::SinkExt;
2615
2616        let request_id = self.channel.next_request_id();
2617        let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2618        let agent = self
2619            .host
2620            .register_agent(
2621                &self.client_id,
2622                RegisterHostAgentRequest {
2623                    id: Some(agent_id.clone()),
2624                    name: spec.name.clone(),
2625                    kind: "callback".to_string(),
2626                    capabilities: spec.tools.clone(),
2627                    project: spec
2628                        .metadata
2629                        .get("project")
2630                        .and_then(|v| v.as_str())
2631                        .map(str::to_string),
2632                    pid: None,
2633                    display: serde_json::from_value(
2634                        spec.metadata
2635                            .get("display")
2636                            .cloned()
2637                            .unwrap_or(serde_json::Value::Null),
2638                    )
2639                    .unwrap_or_default(),
2640                    metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2641                },
2642            )
2643            .await
2644            .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2645        let _ = self
2646            .host
2647            .set_status(
2648                &self.client_id,
2649                SetHostAgentStatusRequest {
2650                    agent_id: agent.id.clone(),
2651                    status: HostAgentStatus::Running,
2652                    current_task: Some(task.to_string()),
2653                    message: Some(format!("{} started", spec.name)),
2654                    payload: serde_json::json!({ "task": task }),
2655                },
2656            )
2657            .await;
2658
2659        let rpc_request = serde_json::json!({
2660            "jsonrpc": "2.0",
2661            "method": "multi.run_agent",
2662            "params": {
2663                "spec": spec,
2664                "task": task,
2665            },
2666            "id": request_id,
2667        });
2668
2669        // Create oneshot channel for the response
2670        let (tx, rx) = tokio::sync::oneshot::channel();
2671        self.channel
2672            .pending
2673            .lock()
2674            .await
2675            .insert(request_id.clone(), tx);
2676
2677        let msg = Message::Text(
2678            serde_json::to_string(&rpc_request)
2679                .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2680                .into(),
2681        );
2682        if let Err(e) = self.channel.write.lock().await.send(msg).await {
2683            let _ = self
2684                .host
2685                .set_status(
2686                    &self.client_id,
2687                    SetHostAgentStatusRequest {
2688                        agent_id: agent_id.clone(),
2689                        status: HostAgentStatus::Errored,
2690                        current_task: None,
2691                        message: Some(format!("{} failed to start", spec.name)),
2692                        payload: serde_json::json!({ "error": e.to_string() }),
2693                    },
2694                )
2695                .await;
2696            return Err(car_multi::MultiError::AgentFailed(
2697                spec.name.clone(),
2698                format!("ws send error: {}", e),
2699            ));
2700        }
2701
2702        // Wait for client response (5 min timeout for model loops)
2703        let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2704            Ok(Ok(response)) => response,
2705            Ok(Err(_)) => {
2706                let _ = self
2707                    .host
2708                    .set_status(
2709                        &self.client_id,
2710                        SetHostAgentStatusRequest {
2711                            agent_id: agent_id.clone(),
2712                            status: HostAgentStatus::Errored,
2713                            current_task: None,
2714                            message: Some(format!("{} callback channel closed", spec.name)),
2715                            payload: Value::Null,
2716                        },
2717                    )
2718                    .await;
2719                return Err(car_multi::MultiError::AgentFailed(
2720                    spec.name.clone(),
2721                    "agent callback channel closed".into(),
2722                ));
2723            }
2724            Err(_) => {
2725                let _ = self
2726                    .host
2727                    .set_status(
2728                        &self.client_id,
2729                        SetHostAgentStatusRequest {
2730                            agent_id: agent_id.clone(),
2731                            status: HostAgentStatus::Errored,
2732                            current_task: None,
2733                            message: Some(format!("{} timed out", spec.name)),
2734                            payload: Value::Null,
2735                        },
2736                    )
2737                    .await;
2738                return Err(car_multi::MultiError::AgentFailed(
2739                    spec.name.clone(),
2740                    "agent callback timed out (300s)".into(),
2741                ));
2742            }
2743        };
2744
2745        if let Some(err) = response.error {
2746            let _ = self
2747                .host
2748                .set_status(
2749                    &self.client_id,
2750                    SetHostAgentStatusRequest {
2751                        agent_id: agent_id.clone(),
2752                        status: HostAgentStatus::Errored,
2753                        current_task: None,
2754                        message: Some(format!("{} errored", spec.name)),
2755                        payload: serde_json::json!({ "error": err }),
2756                    },
2757                )
2758                .await;
2759            return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2760        }
2761
2762        let output_value = response.output.unwrap_or(Value::Null);
2763        let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2764            car_multi::MultiError::AgentFailed(
2765                spec.name.clone(),
2766                format!("invalid AgentOutput: {}", e),
2767            )
2768        })?;
2769        let status = if output.error.is_some() {
2770            HostAgentStatus::Errored
2771        } else {
2772            HostAgentStatus::Completed
2773        };
2774        let message = if output.error.is_some() {
2775            format!("{} errored", spec.name)
2776        } else {
2777            format!("{} completed", spec.name)
2778        };
2779        let _ = self
2780            .host
2781            .set_status(
2782                &self.client_id,
2783                SetHostAgentStatusRequest {
2784                    agent_id,
2785                    status,
2786                    current_task: None,
2787                    message: Some(message),
2788                    payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2789                },
2790            )
2791            .await;
2792
2793        Ok(output)
2794    }
2795}
2796
/// Build the host-agent id for one callback run, in the form
/// `<client_id>:<name>:<request_id>`.
///
/// Only `name` is sanitized. Every char outside ASCII alphanumerics,
/// `-` and `_` becomes `-`, one char at a time (so a multi-byte char
/// becomes a single `-`). `client_id` and `request_id` are copied
/// verbatim.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let keep = |c: char| c.is_ascii_alphanumeric() || matches!(c, '-' | '_');
    let mut id = String::with_capacity(client_id.len() + name.len() + request_id.len() + 2);
    id.push_str(client_id);
    id.push(':');
    id.extend(name.chars().map(|c| if keep(c) { c } else { '-' }));
    id.push(':');
    id.push_str(request_id);
    id
}
2810
2811async fn handle_multi_swarm(
2812    req: &JsonRpcMessage,
2813    session: &crate::session::ClientSession,
2814) -> Result<Value, String> {
2815    let mode_str = req
2816        .params
2817        .get("mode")
2818        .and_then(|v| v.as_str())
2819        .ok_or("missing 'mode'")?;
2820    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2821    let task = req
2822        .params
2823        .get("task")
2824        .and_then(|v| v.as_str())
2825        .ok_or("missing 'task'")?;
2826
2827    let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2828        .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2829    let agent_specs: Vec<car_multi::AgentSpec> =
2830        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2831    let synth: Option<car_multi::AgentSpec> = req
2832        .params
2833        .get("synthesizer")
2834        .map(|v| serde_json::from_value(v.clone()))
2835        .transpose()
2836        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2837
2838    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2839        channel: session.channel.clone(),
2840        host: session.host.clone(),
2841        client_id: session.client_id.clone(),
2842    });
2843    let infra = car_multi::SharedInfra::new();
2844
2845    let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2846    if let Some(s) = synth {
2847        swarm = swarm.with_synthesizer(s);
2848    }
2849
2850    let result = swarm
2851        .run(task, &runner, &infra)
2852        .await
2853        .map_err(|e| format!("swarm error: {}", e))?;
2854    serde_json::to_value(result).map_err(|e| e.to_string())
2855}
2856
2857async fn handle_multi_pipeline(
2858    req: &JsonRpcMessage,
2859    session: &crate::session::ClientSession,
2860) -> Result<Value, String> {
2861    let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2862    let task = req
2863        .params
2864        .get("task")
2865        .and_then(|v| v.as_str())
2866        .ok_or("missing 'task'")?;
2867
2868    let stage_specs: Vec<car_multi::AgentSpec> =
2869        serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2870
2871    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2872        channel: session.channel.clone(),
2873        host: session.host.clone(),
2874        client_id: session.client_id.clone(),
2875    });
2876    let infra = car_multi::SharedInfra::new();
2877
2878    let result = car_multi::Pipeline::new(stage_specs)
2879        .run(task, &runner, &infra)
2880        .await
2881        .map_err(|e| format!("pipeline error: {}", e))?;
2882    serde_json::to_value(result).map_err(|e| e.to_string())
2883}
2884
2885async fn handle_multi_supervisor(
2886    req: &JsonRpcMessage,
2887    session: &crate::session::ClientSession,
2888) -> Result<Value, String> {
2889    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2890    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2891    let task = req
2892        .params
2893        .get("task")
2894        .and_then(|v| v.as_str())
2895        .ok_or("missing 'task'")?;
2896    let max_rounds = req
2897        .params
2898        .get("max_rounds")
2899        .and_then(|v| v.as_u64())
2900        .unwrap_or(3) as u32;
2901
2902    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2903        .map_err(|e| format!("invalid workers: {}", e))?;
2904    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2905        .map_err(|e| format!("invalid supervisor: {}", e))?;
2906
2907    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2908        channel: session.channel.clone(),
2909        host: session.host.clone(),
2910        client_id: session.client_id.clone(),
2911    });
2912    let infra = car_multi::SharedInfra::new();
2913
2914    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2915        .with_max_rounds(max_rounds)
2916        .run(task, &runner, &infra)
2917        .await
2918        .map_err(|e| format!("supervisor error: {}", e))?;
2919    serde_json::to_value(result).map_err(|e| e.to_string())
2920}
2921
2922async fn handle_multi_map_reduce(
2923    req: &JsonRpcMessage,
2924    session: &crate::session::ClientSession,
2925) -> Result<Value, String> {
2926    let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2927    let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2928    let task = req
2929        .params
2930        .get("task")
2931        .and_then(|v| v.as_str())
2932        .ok_or("missing 'task'")?;
2933    let items_val = req.params.get("items").ok_or("missing 'items'")?;
2934
2935    let mapper_spec: car_multi::AgentSpec =
2936        serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2937    let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2938        .map_err(|e| format!("invalid reducer: {}", e))?;
2939    let items: Vec<String> =
2940        serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2941
2942    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2943        channel: session.channel.clone(),
2944        host: session.host.clone(),
2945        client_id: session.client_id.clone(),
2946    });
2947    let infra = car_multi::SharedInfra::new();
2948
2949    let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2950        .run(task, &items, &runner, &infra)
2951        .await
2952        .map_err(|e| format!("map_reduce error: {}", e))?;
2953    serde_json::to_value(result).map_err(|e| e.to_string())
2954}
2955
2956async fn handle_multi_vote(
2957    req: &JsonRpcMessage,
2958    session: &crate::session::ClientSession,
2959) -> Result<Value, String> {
2960    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2961    let task = req
2962        .params
2963        .get("task")
2964        .and_then(|v| v.as_str())
2965        .ok_or("missing 'task'")?;
2966
2967    let agent_specs: Vec<car_multi::AgentSpec> =
2968        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2969    let synth: Option<car_multi::AgentSpec> = req
2970        .params
2971        .get("synthesizer")
2972        .map(|v| serde_json::from_value(v.clone()))
2973        .transpose()
2974        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2975
2976    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2977        channel: session.channel.clone(),
2978        host: session.host.clone(),
2979        client_id: session.client_id.clone(),
2980    });
2981    let infra = car_multi::SharedInfra::new();
2982
2983    let mut vote = car_multi::Vote::new(agent_specs);
2984    if let Some(s) = synth {
2985        vote = vote.with_synthesizer(s);
2986    }
2987
2988    let result = vote
2989        .run(task, &runner, &infra)
2990        .await
2991        .map_err(|e| format!("vote error: {}", e))?;
2992    serde_json::to_value(result).map_err(|e| e.to_string())
2993}
2994
2995// ---------------------------------------------------------------------------
2996// Scheduler handlers
2997// ---------------------------------------------------------------------------
2998
2999fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
3000    let name = req
3001        .params
3002        .get("name")
3003        .and_then(|v| v.as_str())
3004        .ok_or("scheduler.create requires 'name'")?;
3005    let prompt = req
3006        .params
3007        .get("prompt")
3008        .and_then(|v| v.as_str())
3009        .ok_or("scheduler.create requires 'prompt'")?;
3010
3011    let mut task = car_scheduler::Task::new(name, prompt);
3012
3013    if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
3014        let trigger = match t {
3015            "once" => car_scheduler::TaskTrigger::Once,
3016            "cron" => car_scheduler::TaskTrigger::Cron,
3017            "interval" => car_scheduler::TaskTrigger::Interval,
3018            "file_watch" => car_scheduler::TaskTrigger::FileWatch,
3019            _ => car_scheduler::TaskTrigger::Manual,
3020        };
3021        let schedule = req
3022            .params
3023            .get("schedule")
3024            .and_then(|v| v.as_str())
3025            .unwrap_or("");
3026        task = task.with_trigger(trigger, schedule);
3027    }
3028
3029    if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
3030        task = task.with_system_prompt(sp);
3031    }
3032
3033    serde_json::to_value(&task).map_err(|e| e.to_string())
3034}
3035
3036async fn handle_scheduler_run(
3037    req: &JsonRpcMessage,
3038    session: &crate::session::ClientSession,
3039) -> Result<Value, String> {
3040    let task_val = req
3041        .params
3042        .get("task")
3043        .ok_or("scheduler.run requires 'task'")?;
3044    let mut task: car_scheduler::Task =
3045        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3046
3047    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3048        channel: session.channel.clone(),
3049        host: session.host.clone(),
3050        client_id: session.client_id.clone(),
3051    });
3052    let executor = car_scheduler::Executor::new(runner);
3053    let execution = executor.run_once(&mut task).await;
3054
3055    serde_json::to_value(&execution).map_err(|e| e.to_string())
3056}
3057
3058async fn handle_scheduler_run_loop(
3059    req: &JsonRpcMessage,
3060    session: &crate::session::ClientSession,
3061) -> Result<Value, String> {
3062    let task_val = req
3063        .params
3064        .get("task")
3065        .ok_or("scheduler.run_loop requires 'task'")?;
3066    let mut task: car_scheduler::Task =
3067        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3068    let max_iterations = req
3069        .params
3070        .get("max_iterations")
3071        .and_then(|v| v.as_u64())
3072        .map(|v| v as u32);
3073
3074    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3075        channel: session.channel.clone(),
3076        host: session.host.clone(),
3077        client_id: session.client_id.clone(),
3078    });
3079    let executor = car_scheduler::Executor::new(runner);
3080    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
3081    let executions = executor
3082        .run_loop(&mut task, max_iterations, cancel_rx)
3083        .await;
3084
3085    serde_json::to_value(&executions).map_err(|e| e.to_string())
3086}
3087
3088// ---------------------------------------------------------------------------
3089// Inference handlers
3090// ---------------------------------------------------------------------------
3091
3092fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
3093    state.inference.get_or_init(|| {
3094        Arc::new(car_inference::InferenceEngine::new(
3095            car_inference::InferenceConfig::default(),
3096        ))
3097    })
3098}
3099
3100async fn handle_infer(
3101    msg: &JsonRpcMessage,
3102    state: &ServerState,
3103    session: &crate::session::ClientSession,
3104) -> Result<Value, String> {
3105    let engine = get_inference_engine(state);
3106    let mut req: car_inference::GenerateRequest =
3107        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3108
3109    // If context_query is provided, build context from memgine and inject it
3110    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3111        let mut memgine = session.memgine.lock().await;
3112        let ctx = memgine.build_context(cq);
3113        if !ctx.is_empty() {
3114            req.context = Some(ctx);
3115        }
3116    }
3117
3118    // Process-wide admission gate. Held for the duration of the
3119    // generation so a burst of concurrent infer RPCs can't multiply
3120    // KV-cache + activation memory and take the host out. The
3121    // `_permit` binding is intentional — its `Drop` releases the slot
3122    // when this future returns.
3123    let _permit = state.admission.acquire().await;
3124
3125    // Use generate_tracked() so tool_calls, usage, model_used, trace_id, and
3126    // latency_ms are preserved in the response. Plain `generate()` discards
3127    // everything except `.text`, which silently breaks tool-use over the
3128    // WebSocket protocol (issue #43).
3129    //
3130    // NOTE: This directly serializes `InferenceResult`. Any field added to
3131    // that struct in `car-inference` becomes part of the public WebSocket
3132    // protocol. The shape is locked by `inference_result_serializes_*` tests
3133    // in car-inference; updating those tests is part of intentionally
3134    // changing the wire contract.
3135    let result = engine
3136        .generate_tracked(req)
3137        .await
3138        .map_err(|e| e.to_string())?;
3139    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3140}
3141
3142/// Streaming inference — mirrors NAPI `inferStream`. Closes
3143/// Parslee-ai/car-releases#30. Same `GenerateRequest` shape as
3144/// `infer`; emits `inference.stream.event` JSON-RPC notifications
3145/// during the run, and returns the final `InferenceResult` as the
3146/// JSON-RPC response when the stream completes.
3147///
3148/// Notification shape (server → client):
3149/// ```jsonc
3150/// {
3151///   "jsonrpc": "2.0",
3152///   "method": "inference.stream.event",
3153///   "params": {
3154///     "request_id": "<original RPC id>",
3155///     "event": { "type": "text" | "tool_start" | "tool_delta" | "usage", ... }
3156///   }
3157/// }
3158/// ```
3159///
3160/// The final `done` event is not pushed as a notification — it's
3161/// the JSON-RPC response with the accumulated `InferenceResult`.
3162/// `video.generate` — daemon-side wrapper for
3163/// `InferenceEngine::generate_video`. Mirrors `handle_infer`'s
3164/// admission gate + JSON request shape (Parslee-ai/car#185).
3165///
3166/// Previously the CLI's `cmd_video` constructed an in-process
3167/// engine and called `generate_video` directly — a v0.7 holdover
3168/// that bypassed the daemon. With this handler the CLI proxies
3169/// here, so the engine-level audio_passthrough gate fires
3170/// inside the daemon process where all FFI surfaces converge.
3171async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3172    let engine = get_inference_engine(state);
3173    let req: car_inference::GenerateImageRequest =
3174        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3175    // Share the same admission gate as text/video generation — a burst
3176    // of image requests shouldn't smuggle around the concurrency cap.
3177    let _permit = state.admission.acquire().await;
3178    let result = engine
3179        .generate_image(req)
3180        .await
3181        .map_err(|e| e.to_string())?;
3182    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3183}
3184
3185async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3186    let engine = get_inference_engine(state);
3187    let req: car_inference::GenerateVideoRequest =
3188        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3189    let _permit = state.admission.acquire().await;
3190    let result = engine
3191        .generate_video(req)
3192        .await
3193        .map_err(|e| e.to_string())?;
3194    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3195}
3196
3197async fn handle_infer_stream(
3198    msg: &JsonRpcMessage,
3199    session: &crate::session::ClientSession,
3200    state: &ServerState,
3201) -> Result<Value, String> {
3202    use futures::SinkExt;
3203    use tokio_tungstenite::tungstenite::Message;
3204
3205    let engine = get_inference_engine(state);
3206    let mut req: car_inference::GenerateRequest =
3207        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3208
3209    // Same context-injection convenience as non-streaming `infer` so
3210    // the two methods have parity on the call shape.
3211    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3212        let mut memgine = session.memgine.lock().await;
3213        let ctx = memgine.build_context(cq);
3214        if !ctx.is_empty() {
3215            req.context = Some(ctx);
3216        }
3217    }
3218
3219    let _permit = state.admission.acquire().await;
3220    let mut rx = engine
3221        .generate_tracked_stream(req)
3222        .await
3223        .map_err(|e| e.to_string())?;
3224
3225    let mut accumulator = car_inference::StreamAccumulator::default();
3226    let request_id = msg.id.clone();
3227
3228    while let Some(event) = rx.recv().await {
3229        let event_payload = match &event {
3230            car_inference::StreamEvent::TextDelta(text) => {
3231                serde_json::json!({"type": "text", "data": text})
3232            }
3233            car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3234                serde_json::json!({"type": "tool_start", "name": name, "index": index})
3235            }
3236            car_inference::StreamEvent::ToolCallDelta {
3237                index,
3238                arguments_delta,
3239            } => serde_json::json!({
3240                "type": "tool_delta",
3241                "index": index,
3242                "data": arguments_delta,
3243            }),
3244            car_inference::StreamEvent::Usage {
3245                input_tokens,
3246                output_tokens,
3247            } => serde_json::json!({
3248                "type": "usage",
3249                "input_tokens": input_tokens,
3250                "output_tokens": output_tokens,
3251            }),
3252            // Done is delivered as the JSON-RPC response, not a
3253            // notification — matches the NAPI contract where the
3254            // standalone function's return value is the accumulated
3255            // result and the callback only sees in-progress events.
3256            car_inference::StreamEvent::Done { .. } => {
3257                accumulator.push(&event);
3258                continue;
3259            }
3260        };
3261
3262        let notif = serde_json::json!({
3263            "jsonrpc": "2.0",
3264            "method": "inference.stream.event",
3265            "params": {
3266                "request_id": request_id,
3267                "event": event_payload,
3268            },
3269        });
3270        if let Ok(text) = serde_json::to_string(&notif) {
3271            let _ = session
3272                .channel
3273                .write
3274                .lock()
3275                .await
3276                .send(Message::Text(text.into()))
3277                .await;
3278        }
3279        accumulator.push(&event);
3280    }
3281
3282    let (text, tool_calls, usage) = accumulator.finish_with_usage();
3283    Ok(serde_json::json!({
3284        "text": text,
3285        "tool_calls": tool_calls,
3286        "usage": usage,
3287    }))
3288}
3289
3290async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3291    let engine = get_inference_engine(state);
3292    let req: car_inference::EmbedRequest =
3293        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3294    // Embeds load their own model weights; share the same admission
3295    // gate as generations so a burst of embed requests can't smuggle
3296    // around the concurrency cap.
3297    let _permit = state.admission.acquire().await;
3298    let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3299    Ok(serde_json::json!({"embeddings": result}))
3300}
3301
3302async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3303    let engine = get_inference_engine(state);
3304    let req: car_inference::ClassifyRequest =
3305        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3306    let _permit = state.admission.acquire().await;
3307    let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3308    Ok(serde_json::json!({"classifications": result}))
3309}
3310
3311/// Surface the current admission state so the menubar tray and
3312/// `car daemon status` can show "queued: N" / "permits: P/T". Read-only
3313/// snapshot — racy by definition but correct enough for status panels.
3314fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3315    let total = state.admission.permits();
3316    let available = state.admission.permits_available();
3317    let in_use = total.saturating_sub(available);
3318    Ok(serde_json::json!({
3319        "permits_total": total,
3320        "permits_available": available,
3321        "permits_in_use": in_use,
3322        "env_override": crate::admission::ENV_MAX_CONCURRENT,
3323    }))
3324}
3325
3326async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3327    let model = msg
3328        .params
3329        .get("model")
3330        .and_then(|v| v.as_str())
3331        .ok_or("missing 'model' parameter")?;
3332    let text = msg
3333        .params
3334        .get("text")
3335        .and_then(|v| v.as_str())
3336        .ok_or("missing 'text' parameter")?;
3337    let engine = get_inference_engine(state);
3338    let ids = engine
3339        .tokenize(model, text)
3340        .await
3341        .map_err(|e| e.to_string())?;
3342    Ok(serde_json::json!({"tokens": ids}))
3343}
3344
3345async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3346    let model = msg
3347        .params
3348        .get("model")
3349        .and_then(|v| v.as_str())
3350        .ok_or("missing 'model' parameter")?;
3351    let tokens: Vec<u32> = msg
3352        .params
3353        .get("tokens")
3354        .and_then(|v| v.as_array())
3355        .ok_or("missing 'tokens' parameter")?
3356        .iter()
3357        .map(|t| {
3358            t.as_u64()
3359                .and_then(|n| u32::try_from(n).ok())
3360                .ok_or_else(|| "tokens[] must be u32 values".to_string())
3361        })
3362        .collect::<Result<Vec<_>, _>>()?;
3363    let engine = get_inference_engine(state);
3364    let text = engine
3365        .detokenize(model, &tokens)
3366        .await
3367        .map_err(|e| e.to_string())?;
3368    Ok(serde_json::json!({"text": text}))
3369}
3370
3371/// `models.register` — persist a user-supplied `ModelSchema` to
3372/// `~/.car/models.json` (Parslee-ai/car-releases#39). Replaces any
3373/// existing entry with the same `id`. Returns `{id, registered}`.
3374///
3375/// **Phase 1 limitation**: the daemon's live `UnifiedRegistry` is
3376/// not updated in-process — the new model becomes visible to
3377/// `models.list`, `infer`, `infer_stream` on the **next daemon
3378/// boot** when `load_user_config` re-reads the file. This is
3379/// enough to unblock opencode's setup flow (register ahead of
3380/// time, then start the daemon). Hot-update requires either an
3381/// `RwLock<InferenceEngine>` on `ServerState` or an
3382/// interior-mutable `UnifiedRegistry`; both touch 20+ call sites
3383/// and are tracked as a follow-up.
3384///
3385/// Until hot-update lands, callers SHOULD register their models
3386/// before issuing `infer` calls against them, and operators
3387/// SHOULD restart the daemon after batches of model
3388/// registrations.
3389async fn handle_models_register(
3390    req: &JsonRpcMessage,
3391    _state: &Arc<ServerState>,
3392) -> Result<Value, String> {
3393    // The params shape mirrors v0.7's FFI `rt.registerModel(schemaJson)`:
3394    // either the bare `ModelSchema` value, OR `{ schema: ModelSchema }`.
3395    // Honor both so existing in-process callers don't have to reshape.
3396    let schema_value = match req.params.get("schema") {
3397        Some(v) => v.clone(),
3398        None => req.params.clone(),
3399    };
3400    let schema: car_inference::ModelSchema =
3401        serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3402    let id = schema.id.clone();
3403
3404    // Resolve the models.json path the same way UnifiedRegistry does:
3405    // `<models_dir>/../models.json` where models_dir defaults to
3406    // `~/.car/models/`. We read whatever's there, swap in the new
3407    // entry, and write back atomically.
3408    let home = std::env::var_os("HOME")
3409        .or_else(|| std::env::var_os("USERPROFILE"))
3410        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3411    let car_dir = std::path::PathBuf::from(home).join(".car");
3412    std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3413    let path = car_dir.join("models.json");
3414
3415    let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3416        let text =
3417            std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3418        if text.trim().is_empty() {
3419            Vec::new()
3420        } else {
3421            serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3422        }
3423    } else {
3424        Vec::new()
3425    };
3426    // Replace existing entry with the same id, else append.
3427    if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3428        *slot = schema;
3429    } else {
3430        models.push(schema);
3431    }
3432    let json =
3433        serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3434    let tmp = path.with_extension("json.tmp");
3435    std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3436    std::fs::rename(&tmp, &path)
3437        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3438    Ok(serde_json::json!({
3439        "id": id,
3440        "registered": true,
3441        "path": path.to_string_lossy(),
3442        "note": "Daemon restart required for live UnifiedRegistry visibility \
3443                 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3444                 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3445    }))
3446}
3447
3448/// `models.unregister` — remove an entry from `~/.car/models.json`
3449/// by id (Parslee-ai/car#186 — symmetric to `models.register`).
3450/// Returns `{ id, unregistered, path }` on success. Returns an error
3451/// when the model isn't present.
3452///
3453/// **Phase 1 limitation** (same as `models.register`): the daemon's
3454/// live `UnifiedRegistry` is not rebuilt — the removal takes effect
3455/// on the next daemon boot. Callers SHOULD restart the daemon after
3456/// a batch of unregistrations if they expect `models.list_unified`
3457/// to reflect the change immediately.
3458async fn handle_models_unregister(
3459    req: &JsonRpcMessage,
3460    _state: &Arc<ServerState>,
3461) -> Result<Value, String> {
3462    // Params shape mirrors the CLI flag: `{ id: string }`. Bare-string
3463    // params are honored for symmetry with the register handler's
3464    // tolerant shape (`{schema: ...}` OR bare schema).
3465    let id = match req.params.get("id") {
3466        Some(v) => v
3467            .as_str()
3468            .ok_or_else(|| "`id` must be a string".to_string())?
3469            .to_string(),
3470        None => match req.params.as_str() {
3471            Some(s) => s.to_string(),
3472            None => return Err("missing `id` parameter".to_string()),
3473        },
3474    };
3475
3476    let home = std::env::var_os("HOME")
3477        .or_else(|| std::env::var_os("USERPROFILE"))
3478        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3479    let car_dir = std::path::PathBuf::from(home).join(".car");
3480    let path = car_dir.join("models.json");
3481
3482    if !path.exists() {
3483        return Err(format!(
3484            "no models.json at {} — nothing to unregister",
3485            path.display()
3486        ));
3487    }
3488    let text =
3489        std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3490    let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3491        Vec::new()
3492    } else {
3493        serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3494    };
3495    let before = models.len();
3496    models.retain(|m| m.id != id);
3497    if models.len() == before {
3498        return Err(format!("model {} not found in {}", id, path.display()));
3499    }
3500    let json =
3501        serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3502    let tmp = path.with_extension("json.tmp");
3503    std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3504    std::fs::rename(&tmp, &path)
3505        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3506    Ok(serde_json::json!({
3507        "id": id,
3508        "unregistered": true,
3509        "path": path.to_string_lossy(),
3510        "note": "Daemon restart required for live UnifiedRegistry visibility \
3511                 (phase 1, matching models.register).",
3512    }))
3513}
3514
3515fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3516    let engine = get_inference_engine(state);
3517    let models = engine.list_models();
3518    serde_json::to_value(&models).map_err(|e| e.to_string())
3519}
3520
3521fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3522    let engine = get_inference_engine(state);
3523    let models = engine.list_models_unified();
3524    serde_json::to_value(&models).map_err(|e| e.to_string())
3525}
3526
3527#[derive(Debug, Deserialize)]
3528#[serde(rename_all = "camelCase")]
3529struct ModelSearchParams {
3530    #[serde(default)]
3531    query: Option<String>,
3532    #[serde(default)]
3533    capability: Option<car_inference::ModelCapability>,
3534    #[serde(default)]
3535    provider: Option<String>,
3536    #[serde(default)]
3537    local_only: bool,
3538    #[serde(default)]
3539    available_only: bool,
3540    #[serde(default)]
3541    limit: Option<usize>,
3542}
3543
3544#[derive(Debug, Serialize)]
3545#[serde(rename_all = "camelCase")]
3546struct ModelSearchEntry {
3547    #[serde(flatten)]
3548    info: car_inference::ModelInfo,
3549    family: String,
3550    version: String,
3551    tags: Vec<String>,
3552    pullable: bool,
3553    upgrade: Option<car_inference::ModelUpgrade>,
3554}
3555
3556#[derive(Debug, Serialize)]
3557#[serde(rename_all = "camelCase")]
3558struct ModelSearchResponse {
3559    models: Vec<ModelSearchEntry>,
3560    upgrades: Vec<car_inference::ModelUpgrade>,
3561    total: usize,
3562    available: usize,
3563    local: usize,
3564    remote: usize,
3565}
3566
3567fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3568    let params: ModelSearchParams =
3569        serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3570            query: None,
3571            capability: None,
3572            provider: None,
3573            local_only: false,
3574            available_only: false,
3575            limit: None,
3576        });
3577    let engine = get_inference_engine(state);
3578    let upgrades = engine.available_model_upgrades();
3579    let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3580        .iter()
3581        .cloned()
3582        .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3583        .collect();
3584    let query = params
3585        .query
3586        .as_deref()
3587        .map(str::trim)
3588        .filter(|q| !q.is_empty())
3589        .map(|q| q.to_ascii_lowercase());
3590    let provider = params
3591        .provider
3592        .as_deref()
3593        .map(str::trim)
3594        .filter(|p| !p.is_empty())
3595        .map(|p| p.to_ascii_lowercase());
3596
3597    let mut entries: Vec<ModelSearchEntry> = engine
3598        .list_schemas()
3599        .into_iter()
3600        .filter(|schema| {
3601            if let Some(capability) = params.capability {
3602                if !schema.has_capability(capability) {
3603                    return false;
3604                }
3605            }
3606            if let Some(provider) = provider.as_deref() {
3607                if schema.provider.to_ascii_lowercase() != provider {
3608                    return false;
3609                }
3610            }
3611            if params.local_only && !schema.is_local() {
3612                return false;
3613            }
3614            if params.available_only && !schema.available {
3615                return false;
3616            }
3617            if let Some(query) = query.as_deref() {
3618                let capability_text = schema
3619                    .capabilities
3620                    .iter()
3621                    .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3622                    .collect::<Vec<_>>()
3623                    .join(" ");
3624                let haystack = format!(
3625                    "{} {} {} {} {} {}",
3626                    schema.id,
3627                    schema.name,
3628                    schema.provider,
3629                    schema.family,
3630                    schema.tags.join(" "),
3631                    capability_text
3632                )
3633                .to_ascii_lowercase();
3634                if !haystack.contains(query) {
3635                    return false;
3636                }
3637            }
3638            true
3639        })
3640        .map(|schema| {
3641            let pullable = !schema.available
3642                && matches!(
3643                    schema.source,
3644                    car_inference::ModelSource::Local { .. }
3645                        | car_inference::ModelSource::Mlx { .. }
3646                );
3647            let info = car_inference::ModelInfo::from(&schema);
3648            let upgrade = upgrades_by_from.get(&schema.id).cloned();
3649            ModelSearchEntry {
3650                info,
3651                family: schema.family,
3652                version: schema.version,
3653                tags: schema.tags,
3654                pullable,
3655                upgrade,
3656            }
3657        })
3658        .collect();
3659    entries.sort_by(|a, b| {
3660        b.info
3661            .available
3662            .cmp(&a.info.available)
3663            .then(b.info.is_local.cmp(&a.info.is_local))
3664            .then(a.info.name.cmp(&b.info.name))
3665    });
3666    if let Some(limit) = params.limit {
3667        entries.truncate(limit);
3668    }
3669
3670    let total = entries.len();
3671    let available = entries.iter().filter(|entry| entry.info.available).count();
3672    let local = entries.iter().filter(|entry| entry.info.is_local).count();
3673    let response = ModelSearchResponse {
3674        models: entries,
3675        upgrades,
3676        total,
3677        available,
3678        local,
3679        remote: total.saturating_sub(local),
3680    };
3681    serde_json::to_value(response).map_err(|e| e.to_string())
3682}
3683
3684fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3685    let engine = get_inference_engine(state);
3686    serde_json::to_value(serde_json::json!({
3687        "upgrades": engine.available_model_upgrades()
3688    }))
3689    .map_err(|e| e.to_string())
3690}
3691
3692async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3693    let name = msg
3694        .params
3695        .get("name")
3696        .or_else(|| msg.params.get("id"))
3697        .or_else(|| msg.params.get("model"))
3698        .and_then(|v| v.as_str())
3699        .ok_or("missing 'name' parameter")?;
3700    let engine = get_inference_engine(state);
3701    let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3702    Ok(serde_json::json!({"path": path.display().to_string()}))
3703}
3704
3705async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3706    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3707        msg.params
3708            .get("events")
3709            .cloned()
3710            .unwrap_or(msg.params.clone()),
3711    )
3712    .map_err(|e| format!("invalid events: {}", e))?;
3713
3714    let inference = get_inference_engine(state).clone();
3715    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3716
3717    let skills = engine.distill_skills(&events).await;
3718    serde_json::to_value(&skills).map_err(|e| e.to_string())
3719}
3720
3721/// Run memory consolidation against this client's session memgine
3722/// (or the daemon-owned per-agent memgine when bound — #170).
3723/// Returns the JSON `ConsolidationReport`.
3724async fn handle_memory_consolidate(
3725    session: &crate::session::ClientSession,
3726) -> Result<Value, String> {
3727    let engine_arc = session.effective_memgine().await;
3728    let report = {
3729        let mut engine = engine_arc.lock().await;
3730        engine.consolidate().await
3731    };
3732    if let Some(id) = session.agent_id.lock().await.clone() {
3733        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3734            tracing::warn!(agent_id = %id, error = %e,
3735                "agent memgine persist after consolidate failed");
3736        }
3737    }
3738    serde_json::to_value(&report).map_err(|e| e.to_string())
3739}
3740
3741/// Repair a degraded skill on this client's session memgine.
3742/// Returns `{ code: "..." }` on success, `null` if the skill
3743/// isn't broken or repair failed.
3744async fn handle_skill_repair(
3745    msg: &JsonRpcMessage,
3746    session: &crate::session::ClientSession,
3747) -> Result<Value, String> {
3748    let name = msg
3749        .params
3750        .get("skill_name")
3751        .and_then(|v| v.as_str())
3752        .ok_or("missing 'skill_name' parameter")?;
3753    let mut engine = session.memgine.lock().await;
3754    let code = engine.repair_skill(name).await;
3755    Ok(match code {
3756        Some(c) => serde_json::json!({ "code": c }),
3757        None => Value::Null,
3758    })
3759}
3760
3761/// Ingest distilled skills into this client's session memgine.
3762/// Returns the number of nodes inserted.
3763async fn handle_skills_ingest_distilled(
3764    msg: &JsonRpcMessage,
3765    session: &crate::session::ClientSession,
3766) -> Result<Value, String> {
3767    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3768        msg.params
3769            .get("skills")
3770            .cloned()
3771            .unwrap_or(msg.params.clone()),
3772    )
3773    .map_err(|e| format!("invalid skills: {}", e))?;
3774    let mut engine = session.memgine.lock().await;
3775    let nodes = engine.ingest_distilled_skills(&skills);
3776    Ok(serde_json::json!({ "ingested": nodes.len() }))
3777}
3778
3779/// Run skill evolution against this session's memgine for a
3780/// specified domain.  Returns the resulting `DistilledSkill` array.
3781async fn handle_skills_evolve(
3782    msg: &JsonRpcMessage,
3783    session: &crate::session::ClientSession,
3784) -> Result<Value, String> {
3785    let domain = msg
3786        .params
3787        .get("domain")
3788        .and_then(|v| v.as_str())
3789        .ok_or("missing 'domain' parameter")?
3790        .to_string();
3791    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3792        msg.params
3793            .get("events")
3794            .cloned()
3795            .unwrap_or(Value::Array(vec![])),
3796    )
3797    .map_err(|e| format!("invalid events: {}", e))?;
3798    let mut engine = session.memgine.lock().await;
3799    let skills = engine.evolve_skills(&events, &domain).await;
3800    serde_json::to_value(&skills).map_err(|e| e.to_string())
3801}
3802
3803/// List domains whose skills are underperforming on this session.
3804async fn handle_skills_domains_needing_evolution(
3805    msg: &JsonRpcMessage,
3806    session: &crate::session::ClientSession,
3807) -> Result<Value, String> {
3808    let threshold = msg
3809        .params
3810        .get("threshold")
3811        .and_then(|v| v.as_f64())
3812        .unwrap_or(0.6);
3813    let engine = session.memgine.lock().await;
3814    let domains = engine.domains_needing_evolution(threshold);
3815    serde_json::to_value(&domains).map_err(|e| e.to_string())
3816}
3817
3818/// Rerank documents against a query using a cross-encoder model.
3819async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3820    let engine = get_inference_engine(state);
3821    let req: car_inference::RerankRequest =
3822        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3823    let _permit = state.admission.acquire().await;
3824    let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3825    serde_json::to_value(&result).map_err(|e| e.to_string())
3826}
3827
3828/// Transcribe audio at the given path. The path is interpreted on
3829/// the daemon's filesystem, not the FFI caller's — Daemon-mode
3830/// callers must pass a path the daemon can read (typically a
3831/// shared `~/.car/...` location or stdin push via the streaming
3832/// API).
3833async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3834    use base64::Engine as _;
3835    let engine = get_inference_engine(state);
3836
3837    // Sandbox-crossing escape hatch (Parslee-ai/car-releases#31): when
3838    // the caller can't share a filesystem view with the daemon (e.g.
3839    // unsandboxed Milo talking to a sandboxed car-host), they pass
3840    // `audio_b64` instead of `audio_path`. We decode to a tempfile,
3841    // run transcribe against the path the engine expects, and clean up
3842    // on drop. Accepts either form; `audio_b64` wins if both are set.
3843    let mut params = msg.params.clone();
3844    let audio_b64 = params
3845        .as_object_mut()
3846        .and_then(|m| m.remove("audio_b64"))
3847        .and_then(|v| v.as_str().map(str::to_string));
3848    let _tmp_audio = if let Some(b64) = audio_b64 {
3849        let bytes = base64::engine::general_purpose::STANDARD
3850            .decode(b64.as_bytes())
3851            .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3852        let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3853        std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3854        let path = tmp.path().to_string_lossy().into_owned();
3855        if let Some(obj) = params.as_object_mut() {
3856            obj.insert("audio_path".to_string(), Value::String(path));
3857        }
3858        Some(tmp)
3859    } else {
3860        None
3861    };
3862
3863    let req: car_inference::TranscribeRequest =
3864        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3865    let _permit = state.admission.acquire().await;
3866    let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3867    serde_json::to_value(&result).map_err(|e| e.to_string())
3868}
3869
3870/// Synthesize speech. By default writes to `output_path` on the
3871/// daemon's filesystem; when `return_b64: true` (or no `output_path`
3872/// was supplied) the result also includes an `audio_b64` field with
3873/// the rendered bytes inline so cross-sandbox callers can avoid
3874/// filesystem coordination. Closes Parslee-ai/car-releases#31.
3875async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3876    use base64::Engine as _;
3877    let engine = get_inference_engine(state);
3878
3879    let mut params = msg.params.clone();
3880    let return_b64 = params
3881        .as_object_mut()
3882        .and_then(|m| m.remove("return_b64"))
3883        .and_then(|v| v.as_bool())
3884        .unwrap_or(false);
3885    let no_output_path = params
3886        .as_object()
3887        .map(|m| !m.contains_key("output_path"))
3888        .unwrap_or(true);
3889
3890    let req: car_inference::SynthesizeRequest =
3891        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3892    let _permit = state.admission.acquire().await;
3893    let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3894    let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3895
3896    // Inline the bytes when the caller asked for them OR when no
3897    // output_path was specified (typical sandbox-crossing case —
3898    // they didn't pick a path because they have no shared one).
3899    if return_b64 || no_output_path {
3900        let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3901            format!(
3902                "synthesize: failed to read rendered audio at {}: {e}",
3903                result.audio_path
3904            )
3905        })?;
3906        let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3907        if let Some(obj) = value.as_object_mut() {
3908            obj.insert("audio_b64".to_string(), Value::String(encoded));
3909        }
3910    }
3911    Ok(value)
3912}
3913
3914/// Prepare the speech runtime (downloads / warmup). Returns a
3915/// JSON status string, mirroring the embedded
3916/// `prepare_speech_runtime` shape.
3917async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3918    let engine = get_inference_engine(state);
3919    let status = engine
3920        .prepare_speech_runtime()
3921        .await
3922        .map_err(|e| e.to_string())?;
3923    serde_json::to_value(&status).map_err(|e| e.to_string())
3924}
3925
3926/// Adaptive route decision for a prompt — returns the routing
3927/// JSON the FFI's `route_model` returns.
3928async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3929    let prompt = msg
3930        .params
3931        .get("prompt")
3932        .and_then(|v| v.as_str())
3933        .ok_or("missing 'prompt' parameter")?;
3934    let engine = get_inference_engine(state);
3935    let decision = engine.route_adaptive(prompt).await;
3936    serde_json::to_value(&decision).map_err(|e| e.to_string())
3937}
3938
3939/// Model performance profiles snapshot.
3940async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3941    let engine = get_inference_engine(state);
3942    let profiles = engine.export_profiles().await;
3943    serde_json::to_value(&profiles).map_err(|e| e.to_string())
3944}
3945
3946#[derive(Deserialize)]
3947#[serde(rename_all = "camelCase")]
3948struct OutcomesResolvePendingParams {
3949    /// Flat `(trace_id, success, confidence, output)` tuples from the
3950    /// caller. Same shape `car-reason`'s session produces from its
3951    /// `ActionOutcome` vector. Daemon side runs the inference rules
3952    /// and writes resolved outcomes back to the shared tracker.
3953    action_results: Vec<(String, bool, f64, String)>,
3954}
3955
3956/// `outcomes.resolve_pending` — write inferred outcomes back to the
3957/// shared engine's `OutcomeTracker` (Parslee-ai/car#189 follow-up).
3958///
3959/// Symmetric to the in-process path
3960/// `ReasoningInferenceHandle::record_inferred_outcomes` on
3961/// `InferenceEngine`: takes the per-action result tuples the
3962/// reasoning session produces, runs
3963/// `OutcomeTracker::infer_outcomes_from_action_sequence` to convert
3964/// them into `InferredOutcome` records, and calls
3965/// `resolve_pending_from_signals` under the tracker write lock. The
3966/// learning loop that adjusts routing decisions therefore survives
3967/// daemon-routed reasoning runs (previously a best-effort no-op on
3968/// the daemon side).
3969///
3970/// Returns `{ recorded: N }` where N is the number of action results
3971/// the caller passed. The tracker doesn't surface how many of those
3972/// actually had pending entries to resolve; that count would require
3973/// expanding the tracker API and isn't load-bearing for any caller
3974/// yet.
3975async fn handle_outcomes_resolve_pending(
3976    req: &JsonRpcMessage,
3977    state: &ServerState,
3978) -> Result<Value, String> {
3979    let params: OutcomesResolvePendingParams =
3980        serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
3981    let engine = get_inference_engine(state);
3982    let mut tracker = engine.outcome_tracker.write().await;
3983    let inferred = tracker.infer_outcomes_from_action_sequence(&params.action_results);
3984    tracker.resolve_pending_from_signals(inferred);
3985    Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3986}
3987
3988/// Per-session event log size.
3989async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3990    let n = session.runtime.log.lock().await.len();
3991    Ok(Value::from(n as u64))
3992}
3993
3994async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3995    let stats = session.runtime.log.lock().await.stats();
3996    serde_json::to_value(stats).map_err(|e| e.to_string())
3997}
3998
3999#[derive(Deserialize)]
4000#[serde(rename_all = "camelCase")]
4001struct EventsTruncateParams {
4002    #[serde(default)]
4003    max_events: Option<usize>,
4004    #[serde(default)]
4005    max_spans: Option<usize>,
4006}
4007
4008async fn handle_events_truncate(
4009    msg: &JsonRpcMessage,
4010    session: &crate::session::ClientSession,
4011) -> Result<Value, String> {
4012    let params: EventsTruncateParams =
4013        serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
4014            max_events: None,
4015            max_spans: None,
4016        });
4017    let mut log = session.runtime.log.lock().await;
4018    let removed_events = params
4019        .max_events
4020        .map(|max| log.truncate_events_keep_last(max))
4021        .unwrap_or(0);
4022    let removed_spans = params
4023        .max_spans
4024        .map(|max| log.truncate_spans_keep_last(max))
4025        .unwrap_or(0);
4026    let stats = log.stats();
4027    Ok(serde_json::json!({
4028        "removedEvents": removed_events,
4029        "removedSpans": removed_spans,
4030        "stats": stats,
4031    }))
4032}
4033
4034async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
4035    let mut log = session.runtime.log.lock().await;
4036    let removed = log.clear();
4037    Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
4038}
4039
4040/// Update the per-session replan config. Wire shape mirrors the
4041/// FFI's positional `set_replan_config` arguments — the engine
4042/// crate's `ReplanConfig` struct doesn't derive Serialize, so we
4043/// reconstruct it from a flat object here.
4044async fn handle_replan_set_config(
4045    msg: &JsonRpcMessage,
4046    session: &crate::session::ClientSession,
4047) -> Result<Value, String> {
4048    let max_replans = msg
4049        .params
4050        .get("max_replans")
4051        .and_then(|v| v.as_u64())
4052        .unwrap_or(0) as u32;
4053    let delay_ms = msg
4054        .params
4055        .get("delay_ms")
4056        .and_then(|v| v.as_u64())
4057        .unwrap_or(0);
4058    let verify_before_execute = msg
4059        .params
4060        .get("verify_before_execute")
4061        .and_then(|v| v.as_bool())
4062        .unwrap_or(true);
4063    let cfg = car_engine::ReplanConfig {
4064        max_replans,
4065        delay_ms,
4066        verify_before_execute,
4067    };
4068    session.runtime.set_replan_config(cfg).await;
4069    Ok(Value::Null)
4070}
4071
4072async fn handle_skills_list(
4073    msg: &JsonRpcMessage,
4074    session: &crate::session::ClientSession,
4075) -> Result<Value, String> {
4076    let domain = msg.params.get("domain").and_then(|v| v.as_str());
4077    let engine = session.memgine.lock().await;
4078    let skills: Vec<serde_json::Value> = engine
4079        .graph
4080        .inner
4081        .node_indices()
4082        .filter_map(|nix| {
4083            let node = engine.graph.inner.node_weight(nix)?;
4084            if node.kind != car_memgine::MemKind::Skill {
4085                return None;
4086            }
4087            let meta = car_memgine::SkillMeta::from_node(node)?;
4088            if let Some(d) = domain {
4089                match &meta.scope {
4090                    car_memgine::SkillScope::Global => {}
4091                    car_memgine::SkillScope::Domain(sd) if sd == d => {}
4092                    _ => return None,
4093                }
4094            }
4095            Some(serde_json::to_value(&meta).unwrap_or_default())
4096        })
4097        .collect();
4098    serde_json::to_value(&skills).map_err(|e| e.to_string())
4099}
4100
4101#[derive(serde::Deserialize)]
4102struct SecretParams {
4103    #[serde(default)]
4104    service: Option<String>,
4105    key: String,
4106    #[serde(default)]
4107    value: Option<String>,
4108}
4109
4110fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
4111    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4112    let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
4113    car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
4114}
4115
4116fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
4117    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4118    car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
4119}
4120
4121fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
4122    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4123    car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
4124}
4125
4126fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
4127    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4128    car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
4129}
4130
4131#[derive(serde::Deserialize)]
4132struct PermParams {
4133    domain: String,
4134    #[serde(default)]
4135    target_bundle_id: Option<String>,
4136}
4137
4138fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
4139    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4140    car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
4141}
4142
4143fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
4144    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4145    car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
4146}
4147
4148fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
4149    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4150    car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
4151}
4152
4153fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
4154    #[derive(serde::Deserialize)]
4155    struct P {
4156        start: String,
4157        end: String,
4158        #[serde(default)]
4159        calendar_ids: Vec<String>,
4160    }
4161    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4162    let start = chrono::DateTime::parse_from_rfc3339(&p.start)
4163        .map_err(|e| format!("parse start: {}", e))?
4164        .with_timezone(&chrono::Utc);
4165    let end = chrono::DateTime::parse_from_rfc3339(&p.end)
4166        .map_err(|e| format!("parse end: {}", e))?
4167        .with_timezone(&chrono::Utc);
4168    car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
4169}
4170
4171fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
4172    #[derive(serde::Deserialize)]
4173    struct P {
4174        query: String,
4175        #[serde(default = "default_limit")]
4176        limit: usize,
4177        #[serde(default)]
4178        container_ids: Vec<String>,
4179    }
4180    fn default_limit() -> usize {
4181        50
4182    }
4183    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4184    car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
4185}
4186
4187fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
4188    #[derive(serde::Deserialize, Default)]
4189    struct P {
4190        #[serde(default)]
4191        account_ids: Vec<String>,
4192    }
4193    let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
4194    car_ffi_common::integrations::mail_inbox(&p.account_ids)
4195}
4196
4197fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
4198    let raw = req.params.to_string();
4199    car_ffi_common::integrations::mail_send(&raw)
4200}
4201
4202fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
4203    #[derive(serde::Deserialize)]
4204    struct P {
4205        #[serde(default = "default_limit")]
4206        limit: usize,
4207    }
4208    fn default_limit() -> usize {
4209        50
4210    }
4211    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4212    car_ffi_common::integrations::messages_chats(p.limit)
4213}
4214
4215fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
4216    let raw = req.params.to_string();
4217    car_ffi_common::integrations::messages_send(&raw)
4218}
4219
4220fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
4221    #[derive(serde::Deserialize)]
4222    struct P {
4223        query: String,
4224        #[serde(default = "default_limit")]
4225        limit: usize,
4226    }
4227    fn default_limit() -> usize {
4228        50
4229    }
4230    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4231    car_ffi_common::integrations::notes_find(&p.query, p.limit)
4232}
4233
4234fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
4235    #[derive(serde::Deserialize)]
4236    struct P {
4237        #[serde(default = "default_limit")]
4238        limit: usize,
4239    }
4240    fn default_limit() -> usize {
4241        50
4242    }
4243    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4244    car_ffi_common::integrations::reminders_items(p.limit)
4245}
4246
4247fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
4248    #[derive(serde::Deserialize)]
4249    struct P {
4250        #[serde(default = "default_limit")]
4251        limit: usize,
4252    }
4253    fn default_limit() -> usize {
4254        100
4255    }
4256    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
4257    car_ffi_common::integrations::bookmarks_list(p.limit)
4258}
4259
4260fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
4261    #[derive(serde::Deserialize)]
4262    struct P {
4263        start: String,
4264        end: String,
4265    }
4266    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4267    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4268        .map_err(|e| format!("parse start: {}", e))?
4269        .with_timezone(&chrono::Utc);
4270    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4271        .map_err(|e| format!("parse end: {}", e))?
4272        .with_timezone(&chrono::Utc);
4273    car_ffi_common::health::sleep_windows(s, e)
4274}
4275
4276fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4277    #[derive(serde::Deserialize)]
4278    struct P {
4279        start: String,
4280        end: String,
4281    }
4282    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4283    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4284        .map_err(|e| format!("parse start: {}", e))?
4285        .with_timezone(&chrono::Utc);
4286    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4287        .map_err(|e| format!("parse end: {}", e))?
4288        .with_timezone(&chrono::Utc);
4289    car_ffi_common::health::workouts(s, e)
4290}
4291
4292fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4293    #[derive(serde::Deserialize)]
4294    struct P {
4295        start: String,
4296        end: String,
4297    }
4298    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4299    let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4300        .map_err(|e| format!("parse start: {}", e))?;
4301    let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4302        .map_err(|e| format!("parse end: {}", e))?;
4303    car_ffi_common::health::activity(s, e)
4304}
4305
4306async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4307    let closed = session.browser.close().await?;
4308    Ok(serde_json::json!({"closed": closed}))
4309}
4310
4311async fn handle_browser_run(
4312    req: &JsonRpcMessage,
4313    session: &crate::session::ClientSession,
4314) -> Result<Value, String> {
4315    #[derive(serde::Deserialize)]
4316    struct BrowserRunParams {
4317        /// Inline JSON string (CLI-compatible), OR the structured object.
4318        script: Value,
4319        #[serde(default)]
4320        width: Option<u32>,
4321        #[serde(default)]
4322        height: Option<u32>,
4323        /// When true, launches a visible Chromium window for interactive
4324        /// flows (first-time auth, 2FA, supervised runs). Only honored on
4325        /// the call that first launches the browser session — subsequent
4326        /// calls reuse the existing browser regardless.
4327        #[serde(default)]
4328        headed: Option<bool>,
4329        /// Extra Chromium command-line flags appended verbatim at
4330        /// launch (#112). Honoured only on the launch call.
4331        #[serde(default)]
4332        extra_args: Option<Vec<String>>,
4333    }
4334    let params: BrowserRunParams =
4335        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4336
4337    // Accept either a JSON string OR a structured object under `script`.
4338    let script_json = match params.script {
4339        Value::String(s) => s,
4340        other => other.to_string(),
4341    };
4342
4343    let browser_session = session
4344        .browser
4345        .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4346            width: params.width.unwrap_or(1280),
4347            height: params.height.unwrap_or(720),
4348            headless: !params.headed.unwrap_or(false),
4349            extra_args: params.extra_args.unwrap_or_default(),
4350        })
4351        .await?;
4352
4353    let trace_json = browser_session.run(&script_json).await?;
4354    serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4355}
4356
4357// ---------------------------------------------------------------------------
4358// Voice streaming JSON-RPC methods
4359//
4360// Events are pushed back to the originating client as JSON-RPC notifications:
4361//   { "jsonrpc": "2.0", "method": "voice.event",
4362//     "params": { "session_id": "...", "event": {...} } }
4363//
4364// The session registry is process-wide (ServerState.voice_sessions); per-call
4365// WsVoiceEventSink instances bind each session to its originating WS so a
4366// client only ever sees events for sessions it started.
4367// ---------------------------------------------------------------------------
4368
4369#[derive(Deserialize)]
4370struct VoiceStartParams {
4371    session_id: String,
4372    audio_source: Value,
4373    #[serde(default)]
4374    options: Option<Value>,
4375}
4376
4377async fn handle_voice_transcribe_stream_start(
4378    req: &JsonRpcMessage,
4379    state: &Arc<ServerState>,
4380    session: &Arc<crate::session::ClientSession>,
4381) -> Result<Value, String> {
4382    let params: VoiceStartParams =
4383        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4384    let audio_source_json =
4385        serde_json::to_string(&params.audio_source).map_err(|e| e.to_string())?;
4386    let options_json = params
4387        .options
4388        .as_ref()
4389        .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4390        .transpose()?;
4391    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4392        channel: session.channel.clone(),
4393    });
4394    let json = car_ffi_common::voice::transcribe_stream_start(
4395        &params.session_id,
4396        &audio_source_json,
4397        options_json.as_deref(),
4398        state.voice_sessions.clone(),
4399        sink,
4400    )
4401    .await?;
4402    serde_json::from_str(&json).map_err(|e| e.to_string())
4403}
4404
4405#[derive(Deserialize)]
4406struct VoiceStopParams {
4407    session_id: String,
4408}
4409
4410async fn handle_voice_transcribe_stream_stop(
4411    req: &JsonRpcMessage,
4412    state: &Arc<ServerState>,
4413) -> Result<Value, String> {
4414    let params: VoiceStopParams =
4415        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4416    let json = car_ffi_common::voice::transcribe_stream_stop(
4417        &params.session_id,
4418        state.voice_sessions.clone(),
4419    )
4420    .await?;
4421    serde_json::from_str(&json).map_err(|e| e.to_string())
4422}
4423
4424#[derive(Deserialize)]
4425struct VoicePushParams {
4426    session_id: String,
4427    /// Base64-encoded 16-bit signed PCM frame. JSON-RPC is text, so binary
4428    /// audio frames have to be encoded; clients in WS-binary contexts that
4429    /// want to skip the round trip can call the FFI directly.
4430    pcm_b64: String,
4431}
4432
4433async fn handle_voice_transcribe_stream_push(
4434    req: &JsonRpcMessage,
4435    state: &Arc<ServerState>,
4436) -> Result<Value, String> {
4437    use base64::Engine;
4438    let params: VoicePushParams =
4439        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4440    let pcm = base64::engine::general_purpose::STANDARD
4441        .decode(&params.pcm_b64)
4442        .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4443    let json = car_ffi_common::voice::transcribe_stream_push(
4444        &params.session_id,
4445        &pcm,
4446        state.voice_sessions.clone(),
4447    )
4448    .await?;
4449    serde_json::from_str(&json).map_err(|e| e.to_string())
4450}
4451
4452fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4453    let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4454    serde_json::from_str(&json).unwrap_or(Value::Null)
4455}
4456
4457#[derive(Deserialize)]
4458struct VoiceTtsStreamStartParams {
4459    /// Caller-chosen opaque id for this stream. Used both as the
4460    /// session_id wrapped onto each `voice.event` notification AND as
4461    /// the key for `voice.tts_stream.cancel`.
4462    stream_id: String,
4463    /// Text to synthesize. Splitting into multiple synth calls (for
4464    /// long-form narration) is the caller's responsibility.
4465    text: String,
4466    /// Optional [`car_ffi_common::voice::TtsStreamOptions`] as a raw
4467    /// JSON value (provider, voice_id, binary_frames).
4468    #[serde(default)]
4469    options: Option<Value>,
4470}
4471
4472async fn handle_voice_tts_stream_start(
4473    req: &JsonRpcMessage,
4474    session: &Arc<crate::session::ClientSession>,
4475) -> Result<Value, String> {
4476    let params: VoiceTtsStreamStartParams =
4477        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4478    let opts_str = params
4479        .options
4480        .as_ref()
4481        .map(|v| v.to_string())
4482        .filter(|s| !s.is_empty());
4483    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4484        channel: session.channel.clone(),
4485    });
4486    let json = car_ffi_common::voice::tts_stream_start(
4487        &params.stream_id,
4488        &params.text,
4489        opts_str.as_deref(),
4490        sink,
4491    )
4492    .await?;
4493    serde_json::from_str(&json).map_err(|e| e.to_string())
4494}
4495
4496#[derive(Deserialize)]
4497struct VoiceTtsStreamCancelParams {
4498    stream_id: String,
4499}
4500
4501async fn handle_voice_tts_stream_cancel(req: &JsonRpcMessage) -> Result<Value, String> {
4502    let params: VoiceTtsStreamCancelParams =
4503        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4504    let json = car_ffi_common::voice::tts_stream_cancel(&params.stream_id).await?;
4505    serde_json::from_str(&json).map_err(|e| e.to_string())
4506}
4507
4508fn handle_voice_tts_stream_list() -> Value {
4509    let json = car_ffi_common::voice::list_tts_streams();
4510    serde_json::from_str(&json).unwrap_or(Value::Null)
4511}
4512
4513async fn handle_voice_dispatch_turn(
4514    req: &JsonRpcMessage,
4515    state: &Arc<ServerState>,
4516    session: &Arc<crate::session::ClientSession>,
4517) -> Result<Value, String> {
4518    let req_value = req.params.clone();
4519    let request: crate::voice_turn::DispatchVoiceTurnRequest =
4520        serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4521    let engine = get_inference_engine(state).clone();
4522    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4523        channel: session.channel.clone(),
4524    });
4525    let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4526    serde_json::to_value(resp).map_err(|e| e.to_string())
4527}
4528
4529async fn handle_voice_cancel_turn() -> Result<Value, String> {
4530    crate::voice_turn::cancel().await;
4531    Ok(serde_json::json!({"cancelled": true}))
4532}
4533
4534async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4535    let engine = get_inference_engine(state).clone();
4536    crate::voice_turn::prewarm(engine).await;
4537    Ok(serde_json::json!({"prewarmed": true}))
4538}
4539
4540// ---------------------------------------------------------------------------
4541// Inference runner over WebSocket — closes Parslee-ai/car-releases#24
4542//
4543// Bidirectional protocol shape:
4544//   1. Client → server: `inference.register_runner` (no params). The
4545//      session that calls this becomes the host for delegated models.
4546//   2. Server → client: `inference.runner.invoke` notification with
4547//      {call_id, request} when CAR needs to dispatch a delegated turn.
4548//   3. Client → server: `inference.runner.event` with {call_id, event}
4549//      for each chunk; `inference.runner.complete` with {call_id, result}
4550//      on success; `inference.runner.fail` with {call_id, error} on
4551//      failure.
4552//
4553// The server-side data is process-wide because only one inference
4554// runner can be registered at a time (matches the FFI bindings'
4555// constraint). The per-call mailboxes live in dedicated DashMaps.
4556// ---------------------------------------------------------------------------
4557
4558fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4559    static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4560        std::sync::OnceLock::new();
4561    SLOT.get_or_init(|| std::sync::RwLock::new(None))
4562}
4563
4564fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4565    static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4566        std::sync::OnceLock::new();
4567    MAP.get_or_init(dashmap::DashMap::new)
4568}
4569
4570fn ws_runner_completions() -> &'static dashmap::DashMap<
4571    String,
4572    tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4573> {
4574    static MAP: std::sync::OnceLock<
4575        dashmap::DashMap<
4576            String,
4577            tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4578        >,
4579    > = std::sync::OnceLock::new();
4580    MAP.get_or_init(dashmap::DashMap::new)
4581}
4582
4583struct WsInferenceRunner;
4584
4585#[async_trait::async_trait]
4586impl car_inference::InferenceRunner for WsInferenceRunner {
4587    async fn run(
4588        &self,
4589        request: car_inference::tasks::generate::GenerateRequest,
4590        emitter: car_inference::EventEmitter,
4591    ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4592        let channel = ws_runner_session()
4593            .read()
4594            .map_err(|e| {
4595                car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4596            })?
4597            .clone()
4598            .ok_or_else(|| {
4599                car_inference::RunnerError::Declined(
4600                    "no WebSocket inference runner registered — call inference.register_runner first"
4601                        .into(),
4602                )
4603            })?;
4604
4605        let call_id = uuid::Uuid::new_v4().to_string();
4606        let request_json = serde_json::to_value(&request)
4607            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4608        let (tx, rx) = tokio::sync::oneshot::channel();
4609        ws_runner_calls().insert(call_id.clone(), emitter);
4610        ws_runner_completions().insert(call_id.clone(), tx);
4611
4612        // Fire the invoke notification.
4613        use futures::SinkExt;
4614        let notification = serde_json::json!({
4615            "jsonrpc": "2.0",
4616            "method": "inference.runner.invoke",
4617            "params": {
4618                "call_id": call_id,
4619                "request": request_json,
4620            },
4621        });
4622        let text = serde_json::to_string(&notification)
4623            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4624        let _ = channel
4625            .write
4626            .lock()
4627            .await
4628            .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4629            .await;
4630
4631        let result = rx.await.map_err(|_| {
4632            car_inference::RunnerError::Failed("runner completion channel dropped".into())
4633        })?;
4634        ws_runner_calls().remove(&call_id);
4635        result.map_err(car_inference::RunnerError::Failed)
4636    }
4637}
4638
4639async fn handle_inference_register_runner(
4640    session: &Arc<crate::session::ClientSession>,
4641) -> Result<Value, String> {
4642    let mut guard = ws_runner_session()
4643        .write()
4644        .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4645    *guard = Some(session.channel.clone());
4646    drop(guard);
4647    car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4648    Ok(serde_json::json!({"registered": true}))
4649}
4650
4651#[derive(serde::Deserialize)]
4652struct InferenceRunnerEventParams {
4653    call_id: String,
4654    event: Value,
4655}
4656
4657async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4658    let params: InferenceRunnerEventParams =
4659        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4660    let stream_event = match parse_runner_event_value(&params.event) {
4661        Some(e) => e,
4662        None => return Err("unrecognised runner event shape".into()),
4663    };
4664    if let Some(entry) = ws_runner_calls().get(&params.call_id) {
4665        let emitter = entry.value().clone();
4666        tokio::spawn(async move { emitter.emit(stream_event).await });
4667    }
4668    Ok(serde_json::json!({"emitted": true}))
4669}
4670
4671#[derive(serde::Deserialize)]
4672struct InferenceRunnerCompleteParams {
4673    call_id: String,
4674    result: Value,
4675}
4676
4677async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4678    let params: InferenceRunnerCompleteParams =
4679        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4680    let result: std::result::Result<car_inference::RunnerResult, String> =
4681        serde_json::from_value(params.result)
4682            .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4683    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4684        let _ = tx.send(result);
4685    }
4686    Ok(serde_json::json!({"completed": true}))
4687}
4688
4689#[derive(serde::Deserialize)]
4690struct InferenceRunnerFailParams {
4691    call_id: String,
4692    error: String,
4693}
4694
4695async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4696    let params: InferenceRunnerFailParams =
4697        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4698    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4699        let _ = tx.send(Err(params.error));
4700    }
4701    Ok(serde_json::json!({"failed": true}))
4702}
4703
4704fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4705    let ty = v.get("type").and_then(|t| t.as_str())?;
4706    match ty {
4707        "text" => Some(car_inference::StreamEvent::TextDelta(
4708            v.get("data")?.as_str()?.to_string(),
4709        )),
4710        "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4711            name: v.get("name")?.as_str()?.to_string(),
4712            index: v.get("index")?.as_u64()? as usize,
4713            id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4714        }),
4715        "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4716            index: v.get("index")?.as_u64()? as usize,
4717            arguments_delta: v.get("data")?.as_str()?.to_string(),
4718        }),
4719        "usage" => Some(car_inference::StreamEvent::Usage {
4720            input_tokens: v.get("input_tokens")?.as_u64()?,
4721            output_tokens: v.get("output_tokens")?.as_u64()?,
4722        }),
4723        "done" => Some(car_inference::StreamEvent::Done {
4724            text: v.get("text")?.as_str()?.to_string(),
4725            tool_calls: v
4726                .get("tool_calls")
4727                .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4728                .unwrap_or_default(),
4729        }),
4730        _ => None,
4731    }
4732}
4733
4734#[derive(Deserialize)]
4735struct EnrollSpeakerParams {
4736    label: String,
4737    audio: Value,
4738}
4739
4740async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4741    let params: EnrollSpeakerParams =
4742        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4743    let audio_json = serde_json::to_string(&params.audio).map_err(|e| e.to_string())?;
4744    let json = car_ffi_common::voice::enroll_speaker(&params.label, &audio_json).await?;
4745    serde_json::from_str(&json).map_err(|e| e.to_string())
4746}
4747
4748#[derive(Deserialize)]
4749struct RemoveEnrollmentParams {
4750    label: String,
4751}
4752
4753fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4754    let params: RemoveEnrollmentParams =
4755        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4756    let json = car_ffi_common::voice::remove_enrollment(&params.label)?;
4757    serde_json::from_str(&json).map_err(|e| e.to_string())
4758}
4759
4760#[derive(Deserialize)]
4761struct WorkflowRunParams {
4762    workflow: Value,
4763}
4764
4765async fn handle_workflow_run(
4766    req: &JsonRpcMessage,
4767    session: &Arc<crate::session::ClientSession>,
4768) -> Result<Value, String> {
4769    let params: WorkflowRunParams =
4770        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4771    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4772    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4773        channel: session.channel.clone(),
4774        host: session.host.clone(),
4775        client_id: session.client_id.clone(),
4776    });
4777    let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4778    serde_json::from_str(&json).map_err(|e| e.to_string())
4779}
4780
4781#[derive(Deserialize)]
4782struct WorkflowVerifyParams {
4783    workflow: Value,
4784}
4785
4786fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4787    let params: WorkflowVerifyParams =
4788        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4789    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4790    let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4791    serde_json::from_str(&json).map_err(|e| e.to_string())
4792}
4793
4794// ---------------------------------------------------------------------------
4795// Meeting JSON-RPC methods
4796// ---------------------------------------------------------------------------
4797
4798async fn handle_meeting_start(
4799    req: &JsonRpcMessage,
4800    state: &Arc<ServerState>,
4801    session: &Arc<crate::session::ClientSession>,
4802) -> Result<Value, String> {
4803    // We need the meeting id BEFORE handing the upstream sink to
4804    // start_meeting so the WsMemgineIngestSink stamps transcripts with
4805    // the correct `meeting/<id>/<source>` speaker. Parse the request
4806    // here, mint an id if none was provided, and pass the same id
4807    // through to start_meeting via the request JSON.
4808    let mut req_value = req.params.clone();
4809    let meeting_id = req_value
4810        .get("id")
4811        .and_then(|v| v.as_str())
4812        .map(str::to_string)
4813        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4814    if let Some(map) = req_value.as_object_mut() {
4815        map.insert("id".into(), Value::String(meeting_id.clone()));
4816    }
4817    let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4818
4819    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4820        Arc::new(crate::session::WsVoiceEventSink {
4821            channel: session.channel.clone(),
4822        });
4823
4824    // Wrap the WS upstream with a memgine-ingest fanout that uses the
4825    // tokio::sync::Mutex-wrapped session memgine. We pass `None` for
4826    // the FFI-common `start_meeting` memgine arg to avoid the
4827    // sync-mutex contract there — ingest happens here instead.
4828    let upstream: Arc<dyn car_voice::VoiceEventSink> =
4829        Arc::new(crate::session::WsMemgineIngestSink {
4830            meeting_id,
4831            engine: session.memgine.clone(),
4832            upstream: ws_upstream,
4833        });
4834
4835    let cwd = std::env::current_dir().ok();
4836    let json = crate::meeting::start_meeting(
4837        &request_json,
4838        state.meetings.clone(),
4839        state.voice_sessions.clone(),
4840        upstream,
4841        None,
4842        cwd,
4843    )
4844    .await?;
4845    serde_json::from_str(&json).map_err(|e| e.to_string())
4846}
4847
4848#[derive(Deserialize)]
4849struct MeetingStopParams {
4850    meeting_id: String,
4851    #[serde(default = "default_summarize")]
4852    summarize: bool,
4853}
4854
4855fn default_summarize() -> bool {
4856    true
4857}
4858
4859async fn handle_meeting_stop(
4860    req: &JsonRpcMessage,
4861    state: &Arc<ServerState>,
4862    _session: &Arc<crate::session::ClientSession>,
4863) -> Result<Value, String> {
4864    let params: MeetingStopParams =
4865        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4866    let inference = if params.summarize {
4867        Some(state.inference.get().cloned()).flatten()
4868    } else {
4869        None
4870    };
4871    let json = crate::meeting::stop_meeting(
4872        &params.meeting_id,
4873        params.summarize,
4874        state.meetings.clone(),
4875        state.voice_sessions.clone(),
4876        inference,
4877    )
4878    .await?;
4879    serde_json::from_str(&json).map_err(|e| e.to_string())
4880}
4881
4882#[derive(Deserialize, Default)]
4883struct MeetingListParams {
4884    #[serde(default)]
4885    root: Option<std::path::PathBuf>,
4886}
4887
4888fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4889    let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4890    let cwd = std::env::current_dir().ok();
4891    let json = crate::meeting::list_meetings(params.root, cwd)?;
4892    serde_json::from_str(&json).map_err(|e| e.to_string())
4893}
4894
4895#[derive(Deserialize)]
4896struct MeetingGetParams {
4897    meeting_id: String,
4898    #[serde(default)]
4899    root: Option<std::path::PathBuf>,
4900}
4901
4902fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4903    let params: MeetingGetParams =
4904        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4905    let cwd = std::env::current_dir().ok();
4906    let json = crate::meeting::get_meeting(&params.meeting_id, params.root, cwd)?;
4907    serde_json::from_str(&json).map_err(|e| e.to_string())
4908}
4909
4910// ---------------------------------------------------------------------------
4911// Agent registry — file-based cross-process discovery (#111)
4912// ---------------------------------------------------------------------------
4913
4914#[derive(Deserialize, Default)]
4915struct RegistryRegisterParams {
4916    /// Caller serializes their AgentEntry as a JSON value; we
4917    /// re-serialize it so the ffi-common helper can validate the
4918    /// shape with the same parser used by the bindings.
4919    entry: Value,
4920    #[serde(default)]
4921    registry_path: Option<std::path::PathBuf>,
4922}
4923
4924fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4925    let params: RegistryRegisterParams =
4926        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4927    let entry_json = serde_json::to_string(&params.entry).map_err(|e| e.to_string())?;
4928    car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4929    Ok(Value::Null)
4930}
4931
4932#[derive(Deserialize, Default)]
4933struct RegistryNameParams {
4934    name: String,
4935    #[serde(default)]
4936    registry_path: Option<std::path::PathBuf>,
4937}
4938
4939fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4940    let params: RegistryNameParams =
4941        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4942    let json = car_ffi_common::registry::agent_heartbeat(&params.name, params.registry_path)?;
4943    serde_json::from_str(&json).map_err(|e| e.to_string())
4944}
4945
4946fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4947    let params: RegistryNameParams =
4948        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4949    car_ffi_common::registry::unregister_agent(&params.name, params.registry_path)?;
4950    Ok(Value::Null)
4951}
4952
4953#[derive(Deserialize, Default)]
4954struct RegistryListParams {
4955    #[serde(default)]
4956    registry_path: Option<std::path::PathBuf>,
4957}
4958
4959fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4960    let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4961    let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4962    serde_json::from_str(&json).map_err(|e| e.to_string())
4963}
4964
4965#[derive(Deserialize, Default)]
4966struct RegistryReapParams {
4967    /// Heartbeats older than this many seconds are reaped. Default
4968    /// 60 — two missed 20s heartbeats trigger removal.
4969    #[serde(default = "default_reap_age")]
4970    max_age_secs: u64,
4971    #[serde(default)]
4972    registry_path: Option<std::path::PathBuf>,
4973}
4974
4975fn default_reap_age() -> u64 {
4976    60
4977}
4978
4979fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4980    let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4981    let json =
4982        car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4983    serde_json::from_str(&json).map_err(|e| e.to_string())
4984}
4985
4986// ---------------------------------------------------------------------------
4987// car-a2a server lifecycle (mirrors NAPI startA2aServer / stopA2aServer /
4988// a2aServerStatus and PyO3 start_a2a_server / stop_a2a_server /
4989// a2a_server_status — closes the binding gap noted in #126).
4990// ---------------------------------------------------------------------------
4991
4992async fn handle_a2a_start(
4993    req: &JsonRpcMessage,
4994    session: &crate::session::ClientSession,
4995) -> Result<Value, String> {
4996    let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4997    // Always hand the session's runtime through. start_a2a uses it
4998    // only when `share_session_runtime: true` is set in params;
4999    // otherwise it falls back to the legacy fresh-Runtime + agent_basics
5000    // path. Passing it unconditionally keeps the FFI layer ignorant of
5001    // the flag's plumbing.
5002    let json = crate::a2a::start_a2a(&params_json, Some(session.runtime.clone())).await?;
5003    serde_json::from_str(&json).map_err(|e| e.to_string())
5004}
5005
5006fn handle_a2a_stop() -> Result<Value, String> {
5007    let json = crate::a2a::stop_a2a()?;
5008    serde_json::from_str(&json).map_err(|e| e.to_string())
5009}
5010
5011fn handle_a2a_status() -> Result<Value, String> {
5012    let json = crate::a2a::a2a_status()?;
5013    serde_json::from_str(&json).map_err(|e| e.to_string())
5014}
5015
5016#[derive(Deserialize)]
5017#[serde(rename_all = "camelCase")]
5018struct A2aSendParams {
5019    endpoint: String,
5020    message: car_a2a::Message,
5021    #[serde(default)]
5022    blocking: bool,
5023    #[serde(default = "default_true")]
5024    ingest_a2ui: bool,
5025    #[serde(default)]
5026    route_auth: Option<A2aRouteAuth>,
5027    #[serde(default)]
5028    allow_untrusted_endpoint: bool,
5029}
5030
5031fn default_true() -> bool {
5032    true
5033}
5034
5035/// In-core A2A dispatcher entry point. Forwards the JSON-RPC method
5036/// + params to the lazy-initialized [`car_a2a::A2aDispatcher`] held
5037/// on `ServerState`. Closes Parslee-ai/car-releases#28.
5038///
5039/// Streaming methods (`message/stream`, `tasks/resubscribe` and their
5040/// PascalCase aliases) return `MethodNotFound` from the dispatcher's
5041/// transport-neutral surface — the standalone `start_a2a_listener`
5042/// HTTP path serves SSE for those, but the in-core WS surface is
5043/// JSON-RPC only. Same trade as the dispatcher itself.
5044async fn handle_a2a_dispatch(
5045    method: &str,
5046    req: &JsonRpcMessage,
5047    state: &Arc<ServerState>,
5048) -> Result<Value, String> {
5049    let dispatcher = state.a2a_dispatcher().await;
5050    dispatcher
5051        .dispatch(method, req.params.clone())
5052        .await
5053        .map_err(|e| e.to_string())
5054}
5055
5056async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
5057    let params: A2aSendParams =
5058        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5059    let endpoint = trusted_route_endpoint(
5060        Some(params.endpoint.clone()),
5061        params.allow_untrusted_endpoint,
5062    )
5063    .ok_or_else(|| {
5064        "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
5065    })?;
5066    let client = match params.route_auth.clone() {
5067        Some(auth) => {
5068            car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
5069        }
5070        None => car_a2a::A2aClient::new(endpoint.clone()),
5071    };
5072    let result = client
5073        .send_message(params.message, params.blocking)
5074        .await
5075        .map_err(|e| e.to_string())?;
5076    let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
5077    let mut applied = Vec::new();
5078    if params.ingest_a2ui {
5079        state
5080            .a2ui
5081            .validate_payload(&result_value)
5082            .map_err(|e| e.to_string())?;
5083        let routed_endpoint = Some(endpoint.clone());
5084        for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
5085            let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
5086                if owner.endpoint.is_none() {
5087                    owner.with_endpoint(routed_endpoint.clone())
5088                } else {
5089                    owner
5090                }
5091            });
5092            applied.push(
5093                apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
5094            );
5095        }
5096    }
5097    Ok(serde_json::json!({
5098        "result": result,
5099        "a2ui": {
5100            "applied": applied,
5101        }
5102    }))
5103}
5104
5105// ---------------------------------------------------------------------------
5106// macOS automation — AppleScript + Shortcuts (car-automation), Vision OCR
5107// (car-vision). Mirrors NAPI runApplescript / listShortcuts / runShortcut /
5108// visionOcr and PyO3 run_applescript / list_shortcuts / run_shortcut /
5109// vision_ocr.
5110// ---------------------------------------------------------------------------
5111
5112async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
5113    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5114    let json = car_ffi_common::automation::run_applescript(&args_json).await?;
5115    serde_json::from_str(&json).map_err(|e| e.to_string())
5116}
5117
5118async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
5119    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5120    let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
5121    serde_json::from_str(&json).map_err(|e| e.to_string())
5122}
5123
5124async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
5125    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5126    let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
5127    serde_json::from_str(&json).map_err(|e| e.to_string())
5128}
5129
5130async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
5131    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5132    let json = car_ffi_common::notifications::local(&args_json).await?;
5133    serde_json::from_str(&json).map_err(|e| e.to_string())
5134}
5135
5136async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
5137    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5138    let json = car_ffi_common::vision::ocr(&args_json).await?;
5139    serde_json::from_str(&json).map_err(|e| e.to_string())
5140}
5141
5142// ---------------------------------------------------------------------------
5143// Lifecycle-managed agents (car_registry::supervisor) — Parslee-ai/car-releases#27
5144// ---------------------------------------------------------------------------
5145
5146async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
5147    // Observe-only mode (Parslee-ai/car-releases#44): a second
5148    // car-server on the host can't take the supervisor lock, so it
5149    // can't drive `Supervisor::list` — but it can still answer
5150    // `agents.list` by reading the on-disk manifest directly. The
5151    // `attached` decoration is local to whichever daemon the caller
5152    // is talking to, so observer-mode entries return `attached:
5153    // false` (this daemon hasn't received `session.auth` from those
5154    // children; the primary one has).
5155    let agents = match state.observer_manifest_path() {
5156        Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
5157            .map_err(|e| e.to_string())?,
5158        None => {
5159            let supervisor = state.supervisor()?;
5160            supervisor.list().await
5161        }
5162    };
5163    // Decorate each entry with `attached` + `session_id` so operators
5164    // see whether the supervised process has actually called
5165    // `session.auth { agent_id }` and bound a WS connection (#169) —
5166    // the lifecycle status (`Running`, etc.) only reports the
5167    // process-level view, which can't tell "alive but never
5168    // attached" from "alive and attached".
5169    let attached = state.attached_agents.lock().await.clone();
5170    let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
5171    for a in agents {
5172        let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
5173        let session_id = attached.get(&a.spec.id).cloned();
5174        if let Some(map) = v.as_object_mut() {
5175            map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
5176            if let Some(sid) = session_id {
5177                map.insert("session_id".to_string(), Value::String(sid));
5178            }
5179        }
5180        decorated.push(v);
5181    }
5182    Ok(Value::Array(decorated))
5183}
5184
5185async fn handle_agents_upsert(
5186    req: &JsonRpcMessage,
5187    state: &Arc<ServerState>,
5188) -> Result<Value, String> {
5189    let mut params = req.params.clone();
5190    // Optional `interpreter` sugar (#171). When present, the
5191    // supervisor resolves the bare program name (`"node"`,
5192    // `"python"`, …) against `$PATH` and writes the absolute path
5193    // into `command` *before* validation. This keeps the strict
5194    // no-PATH-lookup rule at upsert time while letting callers
5195    // stop hand-coding `/opt/homebrew/bin/node` into every
5196    // agents.json entry. Resolution happens once; subsequent PATH
5197    // changes do not silently rewire the binding.
5198    if let Some(name) = params
5199        .get("interpreter")
5200        .and_then(|v| v.as_str())
5201        .map(str::to_string)
5202    {
5203        let resolved =
5204            car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
5205        params["command"] = Value::String(resolved.to_string_lossy().into_owned());
5206    }
5207    let spec: car_registry::supervisor::AgentSpec =
5208        serde_json::from_value(params).map_err(|e| e.to_string())?;
5209    let supervisor = state.supervisor()?;
5210    let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
5211    serde_json::to_value(agent).map_err(|e| e.to_string())
5212}
5213
5214/// `agents.install` — install a contributed-agent manifest
5215/// (Parslee-ai/car#182 phase 3). Caller passes the parsed
5216/// `AgentManifest` JSON; the daemon runs install-time validation
5217/// (`car_min_version`, capability negotiation against the daemon's
5218/// own advertisement) and adopts the manifest. Returns
5219/// `{ report, agent? }` where `agent` is the spawnable
5220/// `ManagedAgent` for `external_process` transports and absent for
5221/// `pure_data` / health_url-only manifests.
5222///
5223/// The host capability advertisement comes from
5224/// `HostCapabilities::daemon_default(car_version)` — operators that
5225/// want a tighter advertisement go through a future config phase;
5226/// this MVP uses the runtime's natural surface.
5227async fn handle_agents_install(
5228    req: &JsonRpcMessage,
5229    state: &Arc<ServerState>,
5230) -> Result<Value, String> {
5231    let manifest: car_registry::manifest::AgentManifest =
5232        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5233    let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
5234    let supervisor = state.supervisor()?;
5235    let (report, managed) = supervisor
5236        .install_manifest(manifest, &host)
5237        .await
5238        .map_err(|e| e.to_string())?;
5239    Ok(serde_json::json!({
5240        "report": {
5241            "missingOptional": report
5242                .missing_optional
5243                .iter()
5244                .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
5245                .collect::<Vec<_>>(),
5246        },
5247        "agent": managed,
5248    }))
5249}
5250
5251async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
5252    // Observe-only mode (Parslee-ai/car-releases#44) — see
5253    // `handle_agents_list` for the rationale. The health view is a
5254    // pure function of each entry's `command` plus the on-disk
5255    // sandbox rules, so reading from the manifest is equivalent to
5256    // calling the live supervisor's `health()`.
5257    let entries = match state.observer_manifest_path() {
5258        Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
5259            .map_err(|e| e.to_string())?,
5260        None => {
5261            let supervisor = state.supervisor()?;
5262            supervisor.health().await
5263        }
5264    };
5265    serde_json::to_value(entries).map_err(|e| e.to_string())
5266}
5267
5268fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
5269    req.params
5270        .get("id")
5271        .and_then(Value::as_str)
5272        .map(str::to_string)
5273        .ok_or_else(|| "missing required `id` parameter".to_string())
5274}
5275
5276async fn handle_agents_remove(
5277    req: &JsonRpcMessage,
5278    state: &Arc<ServerState>,
5279) -> Result<Value, String> {
5280    let id = extract_agent_id(req)?;
5281    let supervisor = state.supervisor()?;
5282    let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
5283    Ok(serde_json::json!({ "removed": removed }))
5284}
5285
5286async fn handle_agents_start(
5287    req: &JsonRpcMessage,
5288    state: &Arc<ServerState>,
5289) -> Result<Value, String> {
5290    let id = extract_agent_id(req)?;
5291    let supervisor = state.supervisor()?;
5292    let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
5293    serde_json::to_value(agent).map_err(|e| e.to_string())
5294}
5295
5296async fn handle_agents_stop(
5297    req: &JsonRpcMessage,
5298    state: &Arc<ServerState>,
5299) -> Result<Value, String> {
5300    let id = extract_agent_id(req)?;
5301    let signal: car_registry::supervisor::StopSignal = req
5302        .params
5303        .get("signal")
5304        .map(|v| serde_json::from_value(v.clone()))
5305        .transpose()
5306        .map_err(|e| e.to_string())?
5307        .unwrap_or_default();
5308    let supervisor = state.supervisor()?;
5309    let agent = supervisor
5310        .stop(&id, signal)
5311        .await
5312        .map_err(|e| e.to_string())?;
5313    serde_json::to_value(agent).map_err(|e| e.to_string())
5314}
5315
5316async fn handle_agents_restart(
5317    req: &JsonRpcMessage,
5318    state: &Arc<ServerState>,
5319) -> Result<Value, String> {
5320    let id = extract_agent_id(req)?;
5321    let supervisor = state.supervisor()?;
5322    let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
5323    serde_json::to_value(agent).map_err(|e| e.to_string())
5324}
5325
5326async fn handle_agents_tail_log(
5327    req: &JsonRpcMessage,
5328    state: &Arc<ServerState>,
5329) -> Result<Value, String> {
5330    let id = extract_agent_id(req)?;
5331    let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
5332    let supervisor = state.supervisor()?;
5333    let lines = supervisor
5334        .tail_log(&id, n)
5335        .await
5336        .map_err(|e| e.to_string())?;
5337    Ok(serde_json::json!({ "lines": lines }))
5338}
5339
5340// ---------------------------------------------------------------------------
5341// External-agent detection (Phase 1 of docs/proposals/external-agent-detection.md)
5342//
5343// Discovery surface for agentic CLIs the user has already installed and
5344// authenticated (Claude Code, Codex, Gemini). Read-only — no invocation
5345// path yet; agents.invoke_external lands in Phase 2 alongside the JSON
5346// stdio adapter. The cache lives in car_ffi_common::external_agents so
5347// the in-process FFI singletons share the same snapshot.
5348// ---------------------------------------------------------------------------
5349
5350async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5351    let include_health = req
5352        .params
5353        .get("include_health")
5354        .and_then(Value::as_bool)
5355        .unwrap_or(false);
5356    let json = car_ffi_common::external_agents::list(include_health).await?;
5357    serde_json::from_str(&json).map_err(|e| e.to_string())
5358}
5359
5360async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5361    let include_health = req
5362        .params
5363        .get("include_health")
5364        .and_then(Value::as_bool)
5365        .unwrap_or(false);
5366    let json = car_ffi_common::external_agents::detect(include_health).await?;
5367    serde_json::from_str(&json).map_err(|e| e.to_string())
5368}
5369
5370/// Per-task invocation of an external CLI agent. Required params:
5371/// `id` (adapter, e.g. `"claude-code"`) and `task` (the prompt).
5372/// Optional: `cwd`, `allowed_tools`, `max_turns`, `timeout_secs`.
5373///
5374/// Phase 2 stage 3 ships with `claude-code` only. Other adapter
5375/// ids return `is_error: true` with a structured `error` so hosts
5376/// can surface the gap without a separate error code.
5377///
5378/// Phase 2 stage 4a (governance): every invocation appends a
5379/// structured audit record to `~/.car/external-agents.jsonl`. The
5380/// record captures id, task, options, result, and the full
5381/// `tool_uses` list the assistant emitted — so even though the
5382/// agent executes its built-in tools in-process (which we can't
5383/// gate via stream-json), there's a complete after-the-fact audit
5384/// trail. Full policy gating (proposing each tool_use to CAR's
5385/// validator + getting a yes/no) requires the MCP server route in
5386/// stage 4b.
5387async fn handle_agents_invoke_external(
5388    req: &JsonRpcMessage,
5389    state: &Arc<ServerState>,
5390    host_session: &Arc<crate::session::ClientSession>,
5391) -> Result<Value, String> {
5392    let id = req
5393        .params
5394        .get("id")
5395        .and_then(Value::as_str)
5396        .ok_or_else(|| "missing required `id` parameter".to_string())?
5397        .to_string();
5398    let task = req
5399        .params
5400        .get("task")
5401        .and_then(Value::as_str)
5402        .ok_or_else(|| "missing required `task` parameter".to_string())?
5403        .to_string();
5404    let stream = req
5405        .params
5406        .get("stream")
5407        .and_then(Value::as_bool)
5408        .unwrap_or(false);
5409    let session_id = req
5410        .params
5411        .get("session_id")
5412        .and_then(Value::as_str)
5413        .map(str::to_string)
5414        .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
5415
5416    // Build the options sub-object directly from req.params so
5417    // hosts can pass `cwd` / `allowed_tools` / `max_turns` /
5418    // `timeout_secs` as siblings of `id`/`task`. Strip the
5419    // dispatch + streaming fields so they don't pollute the
5420    // options serde.
5421    let mut options_value = req.params.clone();
5422    if let Some(obj) = options_value.as_object_mut() {
5423        obj.remove("id");
5424        obj.remove("task");
5425        obj.remove("stream");
5426        obj.remove("session_id");
5427        // Auto-fill `mcp_endpoint` from the bound MCP URL when the
5428        // caller didn't supply one. This is the load-bearing
5429        // wiring of MCP-4: external agents get CAR's tools (memory,
5430        // skills, verify) routed through the daemon's policy +
5431        // shared memgine without any per-call host configuration.
5432        // Callers who want to opt out can pass `"mcp_endpoint": ""`
5433        // (empty string) — the runner skips the temp-file write
5434        // when the value isn't a non-empty URL.
5435        let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5436        if !has_explicit_mcp {
5437            if let Some(url) = state.mcp_url.get() {
5438                obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5439            }
5440        }
5441    }
5442
5443    if !stream {
5444        // Legacy one-shot path. Unchanged shape for FFI consumers
5445        // and any caller that hasn't opted into streaming.
5446        let options_json = options_value.to_string();
5447        let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
5448        let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5449        append_external_agent_audit(&id, &task, &options_value, &result);
5450        return Ok(result);
5451    }
5452
5453    // Streaming path. Returns an ack ({accepted, session_id})
5454    // immediately and streams `agents.chat.event` notifications
5455    // to the host's WS as the runner emits StreamEvents. Reuses
5456    // the chat_sessions routing infrastructure supervised agents
5457    // use — host UIs render both kinds through the same path.
5458    let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
5459        .map_err(|e| format!("invalid options: {e}"))?;
5460
5461    // Register the chat session BEFORE spawning so the host's
5462    // subscriber is correctly bound to this session_id by the
5463    // time the first event arrives. Reusing the supervised
5464    // agent chat infrastructure means `agents.chat.cancel`
5465    // routes to chat_sessions[session_id] and can find this
5466    // entry — though we don't currently honor cancel for
5467    // external invocations (the child process is killed only
5468    // on timeout or task drop; a future iteration can plumb a
5469    // CancellationToken through invoke_with_emitter).
5470    {
5471        // If a chat session is already registered for this id (the
5472        // typical proxy shape: host → agents.chat → supervised agent
5473        // → agents.invoke_external with the same session_id), DO NOT
5474        // overwrite it. The existing entry owns the routing to the
5475        // original host; clobbering it with our caller's client_id
5476        // would send streaming events to the proxying agent instead
5477        // of back to the host that issued agents.chat. Only register
5478        // a fresh entry when the slot is empty (a direct
5479        // host-to-invoke_external call without a prior agents.chat).
5480        let mut chats = state.chat_sessions.lock().await;
5481        chats.entry(session_id.clone()).or_insert_with(|| {
5482            let created_at = std::time::SystemTime::now()
5483                .duration_since(std::time::UNIX_EPOCH)
5484                .map(|d| d.as_secs())
5485                .unwrap_or(0);
5486            crate::session::ChatSession {
5487                agent_id: id.clone(),
5488                host_client_id: host_session.client_id.clone(),
5489                created_at,
5490            }
5491        });
5492    }
5493
5494    // Single drain task pulls StreamEvents off an unbounded
5495    // channel and serializes WS sends to the host. Per-event
5496    // tokio::spawn would let sends race (which token arrives
5497    // first depends on lock acquisition order). The channel
5498    // is unbounded because claude's event volume is bounded
5499    // by user turn count — typically <50 events per invocation.
5500    use tokio::sync::mpsc;
5501    let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
5502
5503    let drain_state = state.clone();
5504    let drain_session_id = session_id.clone();
5505    let drain_agent_id = id.clone();
5506    tokio::spawn(async move {
5507        while let Some(event) = rx.recv().await {
5508            emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
5509        }
5510    });
5511
5512    let emitter_tx = tx.clone();
5513    let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
5514        // Send failure (rx dropped) means the drain task has
5515        // exited — usually because the host disconnected. The
5516        // runner will keep going; let it finish so the audit
5517        // log captures the full result.
5518        let _ = emitter_tx.send(event);
5519    });
5520
5521    // Run the invocation in a separate task so this handler
5522    // can return the ack right away. The runner's child-process
5523    // future owns the spawn lifetime; if the host disconnects
5524    // mid-stream, the runner still completes (its events fall
5525    // on the floor at the drain layer) so the audit log lands.
5526    let spawn_state = state.clone();
5527    let spawn_session_id = session_id.clone();
5528    let spawn_id = id.clone();
5529    let spawn_task = task.clone();
5530    let spawn_options = options_value.clone();
5531    tokio::spawn(async move {
5532        let outcome =
5533            car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
5534                .await;
5535        drop(tx); // signal drain task to exit after queue empties
5536
5537        // Synthesize a terminal agents.chat.event so the host's
5538        // bubble finalizes. The runner doesn't emit a "done" event
5539        // itself — the result is the aggregate InvokeResult. We
5540        // translate it here.
5541        let terminal_params: Value;
5542        let result_value: Value;
5543        match outcome {
5544            Ok(res) => {
5545                // Pack the metadata into `finish_reason` as a
5546                // human-readable summary so the host's existing
5547                // ChatEvent decoder surfaces it without a schema
5548                // change. Hosts that want structured data can
5549                // re-issue `agents.invoke_external` with
5550                // `stream: false` and read the InvokeResult.
5551                let mut parts: Vec<String> = Vec::new();
5552                if res.turns > 0 {
5553                    parts.push(format!(
5554                        "{} turn{}",
5555                        res.turns,
5556                        if res.turns == 1 { "" } else { "s" }
5557                    ));
5558                }
5559                if res.tool_calls > 0 {
5560                    parts.push(format!(
5561                        "{} tool{}",
5562                        res.tool_calls,
5563                        if res.tool_calls == 1 { "" } else { "s" }
5564                    ));
5565                }
5566                if res.duration_ms > 0 {
5567                    parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
5568                }
5569                let summary = if parts.is_empty() {
5570                    "stop".to_string()
5571                } else {
5572                    parts.join(" · ")
5573                };
5574                if res.is_error {
5575                    terminal_params = serde_json::json!({
5576                        "session_id": spawn_session_id,
5577                        "agent_id": spawn_id,
5578                        "kind": "error",
5579                        "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
5580                    });
5581                } else {
5582                    terminal_params = serde_json::json!({
5583                        "session_id": spawn_session_id,
5584                        "agent_id": spawn_id,
5585                        "kind": "done",
5586                        "finish_reason": summary,
5587                    });
5588                }
5589                result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
5590            }
5591            Err(e) => {
5592                let message = format!("{e}");
5593                terminal_params = serde_json::json!({
5594                    "session_id": spawn_session_id,
5595                    "agent_id": spawn_id,
5596                    "kind": "error",
5597                    "error": message.clone(),
5598                });
5599                result_value = serde_json::json!({ "is_error": true, "error": message });
5600            }
5601        }
5602        send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
5603        spawn_state
5604            .chat_sessions
5605            .lock()
5606            .await
5607            .remove(&spawn_session_id);
5608        append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
5609    });
5610
5611    Ok(serde_json::json!({
5612        "accepted": true,
5613        "session_id": session_id,
5614    }))
5615}
5616
5617/// Translate one [`StreamEvent`] from the running external CLI
5618/// into an `agents.chat.event` notification on the originating
5619/// host's WS. Same wire shape supervised agents emit, so host
5620/// UIs render both kinds with one decoder.
5621///
5622/// Mapping:
5623/// - `Assistant` events with `text` content blocks → `kind: "token"`
5624///   per text block. Each block carries the full text the
5625///   assistant emitted in that turn (claude doesn't expose
5626///   word-level deltas via stream-json — it emits per-turn or
5627///   per-content-block chunks).
5628/// - `Assistant` events with `tool_use` blocks → `kind: "tool_call"`
5629///   per block (tool name in `detail`).
5630/// - `System` / `User` / `Result` / others → dropped (Result's
5631///   metadata is folded into the terminal `done` event the
5632///   outer task emits when the invocation finishes).
5633async fn emit_external_chat_event(
5634    state: &Arc<ServerState>,
5635    session_id: &str,
5636    agent_id: &str,
5637    event: car_external_agents::StreamEvent,
5638) {
5639    use car_external_agents::StreamEvent;
5640    match event {
5641        StreamEvent::Assistant(a) => {
5642            if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
5643                for block in content {
5644                    let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
5645                    match block_type {
5646                        "text" => {
5647                            if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
5648                                if !text.is_empty() {
5649                                    let params = serde_json::json!({
5650                                        "session_id": session_id,
5651                                        "agent_id": agent_id,
5652                                        "kind": "token",
5653                                        "delta": text,
5654                                    });
5655                                    send_external_chat_frame(state, session_id, params).await;
5656                                }
5657                            }
5658                        }
5659                        "tool_use" => {
5660                            let name = block
5661                                .get("name")
5662                                .and_then(|v| v.as_str())
5663                                .unwrap_or("(unknown tool)");
5664                            let params = serde_json::json!({
5665                                "session_id": session_id,
5666                                "agent_id": agent_id,
5667                                "kind": "tool_call",
5668                                "detail": name,
5669                            });
5670                            send_external_chat_frame(state, session_id, params).await;
5671                        }
5672                        _ => {}
5673                    }
5674                }
5675            }
5676        }
5677        _ => {
5678            // System (session id init), User (tool result echo),
5679            // Result (final aggregate — folded into terminal
5680            // `done` event by the outer task), RateLimitEvent,
5681            // Other: not surfaced to host.
5682        }
5683    }
5684}
5685
5686/// Send a single `agents.chat.event` notification to the host
5687/// session bound by `session_id`. Best-effort: a missing route
5688/// or a closed WS is silently dropped, the runner continues so
5689/// the audit log lands.
5690async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
5691    use futures::SinkExt;
5692    use tokio_tungstenite::tungstenite::Message;
5693
5694    let host_client_id = state
5695        .chat_sessions
5696        .lock()
5697        .await
5698        .get(session_id)
5699        .map(|s| s.host_client_id.clone());
5700    let Some(host_client_id) = host_client_id else {
5701        return;
5702    };
5703    let host_channel = {
5704        let sessions = state.sessions.lock().await;
5705        sessions.get(&host_client_id).map(|s| s.channel.clone())
5706    };
5707    let Some(channel) = host_channel else {
5708        return;
5709    };
5710    let frame = serde_json::json!({
5711        "jsonrpc": "2.0",
5712        "method": "agents.chat.event",
5713        "params": params,
5714    });
5715    if let Ok(text) = serde_json::to_string(&frame) {
5716        let _ = channel
5717            .write
5718            .lock()
5719            .await
5720            .send(Message::Text(text.into()))
5721            .await;
5722    }
5723}
5724
5725/// Append one JSONL audit record to `~/.car/external-agents.jsonl`.
5726/// Best-effort: a failure to open the journal must NOT fail the
5727/// invocation; the in-memory result already returned is the
5728/// authoritative answer. Logs at warn level when the write fails so
5729/// operators notice repeated failures.
5730fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5731    use std::io::Write;
5732    let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5733        Some(home) => home.join(".car"),
5734        None => return,
5735    };
5736    if std::fs::create_dir_all(&car_dir).is_err() {
5737        return;
5738    }
5739    let path = car_dir.join("external-agents.jsonl");
5740    let record = serde_json::json!({
5741        "ts": chrono::Utc::now().to_rfc3339(),
5742        "adapter_id": id,
5743        "task": task,
5744        "options": options,
5745        "result": result,
5746    });
5747    let line = match serde_json::to_string(&record) {
5748        Ok(s) => s,
5749        Err(_) => return,
5750    };
5751    if let Ok(mut f) = std::fs::OpenOptions::new()
5752        .create(true)
5753        .append(true)
5754        .open(&path)
5755    {
5756        let _ = writeln!(f, "{}", line);
5757    } else {
5758        tracing::warn!(
5759            path = %path.display(),
5760            "failed to append external-agent audit record"
5761        );
5762    }
5763}
5764
5765/// Ground-truth health check. Optional `id` param picks one tool;
5766/// without it, every detected adapter is checked. `force: true`
5767/// bypasses the 30s per-tool TTL cache. Replaces the Phase 1
5768/// credential-file shape heuristic as the load-bearing signal for
5769/// "is this tool ready to invoke."
5770async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5771    let force = req
5772        .params
5773        .get("force")
5774        .and_then(Value::as_bool)
5775        .unwrap_or(false);
5776    if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5777        let json = car_ffi_common::external_agents::health_one(id, force).await?;
5778        serde_json::from_str(&json).map_err(|e| e.to_string())
5779    } else {
5780        let json = car_ffi_common::external_agents::health(force).await?;
5781        serde_json::from_str(&json).map_err(|e| e.to_string())
5782    }
5783}
5784
5785// ---------------------------------------------------------------------------
5786// agents.chat — unified chat surface (docs/proposals/agent-chat-surface.md)
5787// ---------------------------------------------------------------------------
5788//
5789// Host calls `agents.chat { agent_id, prompt, session_id?, stream? }`.
5790// The server looks up the target agent's attached WS connection,
5791// reverse-calls `agent.chat { session_id, prompt, context }` on it
5792// (same pattern as `tools.execute`), and returns once the agent acks.
5793// The agent then streams `agent.chat.event` notifications back, which
5794// the dispatcher intercepts (see `try_forward_agent_chat_event`) and
5795// rewrites as `agents.chat.event` notifications on the originating
5796// host's channel.
5797
/// Timeout, in whole seconds, that the server waits for the agent to ack
/// `agent.chat` (applied via `Duration::from_secs` in `agents.chat`).
/// The streamed tokens arrive later as separate notifications and are
/// not covered by this timeout; it only bounds "did the agent receive
/// the prompt and accept it." Five seconds is generous for a local IPC ack.
const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
5803
5804/// `agents.chat` — host issues a chat turn to a named agent. Returns
5805/// `{ accepted: true, session_id }` once the agent acks; streamed
5806/// tokens arrive on the host's channel as `agents.chat.event`
5807/// notifications keyed by the same `session_id`.
5808async fn handle_agents_chat(
5809    req: &JsonRpcMessage,
5810    state: &Arc<ServerState>,
5811    host_session: &Arc<crate::session::ClientSession>,
5812) -> Result<Value, String> {
5813    use futures::SinkExt;
5814    use tokio::sync::oneshot;
5815    use tokio_tungstenite::tungstenite::Message;
5816
5817    let agent_id = req
5818        .params
5819        .get("agent_id")
5820        .and_then(Value::as_str)
5821        .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
5822        .to_string();
5823    let prompt = req
5824        .params
5825        .get("prompt")
5826        .and_then(Value::as_str)
5827        .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
5828        .to_string();
5829    let session_id = req
5830        .params
5831        .get("session_id")
5832        .and_then(Value::as_str)
5833        .map(str::to_string)
5834        .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
5835    let stream = req
5836        .params
5837        .get("stream")
5838        .and_then(Value::as_bool)
5839        .unwrap_or(true);
5840    let voice_input = req
5841        .params
5842        .get("voice_input")
5843        .and_then(Value::as_bool)
5844        .unwrap_or(false);
5845
5846    // Resolve the agent's attached WS channel via `attached_agents` →
5847    // `sessions` → `channel`. Both lookups must hit; a missing entry on
5848    // either side means the agent is registered in `agents.json` but
5849    // hasn't `session.auth`'d (or has disconnected), so refuse with a
5850    // structured error rather than silently parking the chat.
5851    let agent_client_id = state
5852        .attached_agents
5853        .lock()
5854        .await
5855        .get(&agent_id)
5856        .cloned()
5857        .ok_or_else(|| {
5858            format!(
5859                "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
5860                agent_id
5861            )
5862        })?;
5863    let agent_channel = {
5864        let sessions = state.sessions.lock().await;
5865        sessions
5866            .get(&agent_client_id)
5867            .map(|s| s.channel.clone())
5868            .ok_or_else(|| {
5869                format!(
5870                    "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
5871                    agent_id, agent_client_id
5872                )
5873            })?
5874    };
5875
5876    // Register the chat session BEFORE sending the reverse call so any
5877    // `agent.chat.event` notifications the agent sends as part of
5878    // accepting the chat (e.g. an immediate `token` delta) route
5879    // correctly. Indexed by session_id so the notification interceptor
5880    // can locate the originating host without scanning.
5881    {
5882        let created_at = std::time::SystemTime::now()
5883            .duration_since(std::time::UNIX_EPOCH)
5884            .map(|d| d.as_secs())
5885            .unwrap_or(0);
5886        state.chat_sessions.lock().await.insert(
5887            session_id.clone(),
5888            crate::session::ChatSession {
5889                agent_id: agent_id.clone(),
5890                host_client_id: host_session.client_id.clone(),
5891                created_at,
5892            },
5893        );
5894    }
5895
5896    // Reverse-callback: register a oneshot for the ack, send the
5897    // `agent.chat` JSON-RPC request on the agent's channel, await up
5898    // to AGENT_CHAT_ACK_TIMEOUT_SECS. Uses the same `pending` map the
5899    // tool-callback path uses (`WsToolExecutor`) — the dispatcher's
5900    // response demuxer at the top of `run_dispatch` already routes
5901    // `result` / `error` frames keyed by request id back through it.
5902    let request_id = agent_channel.next_request_id();
5903    let (tx, rx) = oneshot::channel();
5904    agent_channel
5905        .pending
5906        .lock()
5907        .await
5908        .insert(request_id.clone(), tx);
5909
5910    let rpc_request = serde_json::json!({
5911        "jsonrpc": "2.0",
5912        "method": "agent.chat",
5913        "params": {
5914            "session_id": session_id,
5915            "prompt": prompt,
5916            "stream": stream,
5917            "context": {
5918                "host_client_id": host_session.client_id,
5919                "voice_input": voice_input,
5920            },
5921        },
5922        "id": request_id,
5923    });
5924    let msg = Message::Text(
5925        serde_json::to_string(&rpc_request)
5926            .map_err(|e| e.to_string())?
5927            .into(),
5928    );
5929    if let Err(e) = agent_channel.write.lock().await.send(msg).await {
5930        // Send failed — drop the pending waiter and the chat session
5931        // entry so a retry can take a fresh session_id without
5932        // colliding.
5933        agent_channel.pending.lock().await.remove(&request_id);
5934        state.chat_sessions.lock().await.remove(&session_id);
5935        return Err(format!(
5936            "failed to deliver agent.chat to `{}`: {}",
5937            agent_id, e
5938        ));
5939    }
5940
5941    // Await the agent's ack. The dispatcher's response demuxer routes
5942    // the result/error back via the oneshot. Timeout means the agent
5943    // is alive but unresponsive — clean up routing state and surface a
5944    // structured error so the host UI doesn't hang.
5945    let ack = match tokio::time::timeout(
5946        std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
5947        rx,
5948    )
5949    .await
5950    {
5951        Ok(Ok(resp)) => resp,
5952        Ok(Err(_)) => {
5953            // Channel closed — agent disconnected mid-call.
5954            state.chat_sessions.lock().await.remove(&session_id);
5955            return Err(format!(
5956                "agent `{}` disconnected before acking agents.chat",
5957                agent_id
5958            ));
5959        }
5960        Err(_) => {
5961            // Timeout — agent didn't respond in time. Don't keep the
5962            // chat session around: any later events from the agent
5963            // would route to a host that already returned an error.
5964            agent_channel.pending.lock().await.remove(&request_id);
5965            state.chat_sessions.lock().await.remove(&session_id);
5966            return Err(format!(
5967                "agent `{}` did not ack agents.chat within {}s",
5968                agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
5969            ));
5970        }
5971    };
5972
5973    if let Some(err) = ack.error {
5974        // Agent explicitly rejected — drop the session and propagate.
5975        state.chat_sessions.lock().await.remove(&session_id);
5976        return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
5977    }
5978
5979    Ok(serde_json::json!({
5980        "accepted": true,
5981        "session_id": session_id,
5982    }))
5983}
5984
5985/// `agents.chat.cancel` — host aborts an in-flight chat. Forwards
5986/// `agent.chat.cancel` to the bound agent so the agent can short-
5987/// circuit its inference stream + free upstream resources
5988/// (`inference.stream.cancel`). The chat session is dropped from
5989/// routing state immediately whether or not the agent acks the cancel
5990/// — further `agent.chat.event` notifications for this session_id
5991/// fall on the floor by design.
5992async fn handle_agents_chat_cancel(
5993    req: &JsonRpcMessage,
5994    state: &Arc<ServerState>,
5995) -> Result<Value, String> {
5996    use futures::SinkExt;
5997    use tokio_tungstenite::tungstenite::Message;
5998
5999    let session_id = req
6000        .params
6001        .get("session_id")
6002        .and_then(Value::as_str)
6003        .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
6004        .to_string();
6005
6006    let chat = state.chat_sessions.lock().await.remove(&session_id);
6007    let chat = match chat {
6008        Some(c) => c,
6009        None => {
6010            // Already cancelled or never existed — idempotent.
6011            return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
6012        }
6013    };
6014
6015    // Best-effort fire-and-forget to the agent. We've already removed
6016    // the routing entry, so no need to await any agent response.
6017    let agent_client_id = state
6018        .attached_agents
6019        .lock()
6020        .await
6021        .get(&chat.agent_id)
6022        .cloned();
6023    if let Some(client_id) = agent_client_id {
6024        let channel_opt = {
6025            let sessions = state.sessions.lock().await;
6026            sessions.get(&client_id).map(|s| s.channel.clone())
6027        };
6028        if let Some(channel) = channel_opt {
6029            let notification = serde_json::json!({
6030                "jsonrpc": "2.0",
6031                "method": "agent.chat.cancel",
6032                "params": { "session_id": session_id },
6033            });
6034            if let Ok(text) = serde_json::to_string(&notification) {
6035                let _ = channel
6036                    .write
6037                    .lock()
6038                    .await
6039                    .send(Message::Text(text.into()))
6040                    .await;
6041            }
6042        }
6043    }
6044
6045    Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
6046}
6047
6048/// Forward an `agent.chat.event` notification from an agent's
6049/// connection to the originating host's connection, rewritten as an
6050/// `agents.chat.event` notification. Returns `true` if the inbound
6051/// frame was a chat-event we routed (so the dispatcher can `continue`
6052/// past the normal method dispatch and skip the wasted "unknown
6053/// method" response), `false` otherwise.
6054///
6055/// Terminal events (`kind: "done"` / `"error"`) also drop the routing
6056/// entry from `state.chat_sessions` so a later stray notification can
6057/// be rejected as orphaned without leaking memory.
6058pub(crate) async fn try_forward_agent_chat_event(
6059    parsed: &JsonRpcMessage,
6060    state: &Arc<ServerState>,
6061) -> bool {
6062    use futures::SinkExt;
6063    use tokio_tungstenite::tungstenite::Message;
6064
6065    // Notification predicate: method is `agent.chat.event`, id is
6066    // missing/null (per JSON-RPC, notifications have no id), and
6067    // params carry a session_id.
6068    let Some(method) = parsed.method.as_deref() else {
6069        return false;
6070    };
6071    if method != "agent.chat.event" {
6072        return false;
6073    }
6074    if !parsed.id.is_null() {
6075        // Has an id → it's a request, not a notification. Let the
6076        // normal dispatcher handle it (and reply with method-not-found).
6077        return false;
6078    }
6079    let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
6080        return false;
6081    };
6082    let session_id = session_id.to_string();
6083
6084    // Look up the routing entry. If gone (cancelled, agent dropped,
6085    // disconnect cleanup), drop the event silently — late frames from
6086    // a respawned agent for a stale session are not the host's
6087    // problem.
6088    let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
6089    let Some(chat) = chat else {
6090        return true; // recognized the method, but routing has dropped — consumed.
6091    };
6092
6093    // Pull the kind early so terminal-event cleanup runs even if the
6094    // host's send fails. Agents may omit `kind` and signal terminal
6095    // state via `finish_reason` / `error` instead (car#222) — derive
6096    // it from the frame shape so both the cleanup below AND the host
6097    // see a correct, host-protocol-compliant kind. The old code
6098    // defaulted a `finish_reason`-only "done" frame to "token", so
6099    // terminal cleanup never ran and the host (which requires `kind`)
6100    // dropped every frame silently.
6101    let kind = parsed
6102        .params
6103        .get("kind")
6104        .and_then(Value::as_str)
6105        .map(str::to_string)
6106        .unwrap_or_else(|| {
6107            if parsed.params.get("error").is_some() {
6108                "error".to_string()
6109            } else if parsed.params.get("finish_reason").is_some() {
6110                "done".to_string()
6111            } else {
6112                "token".to_string()
6113            }
6114        });
6115
6116    // Forward to the host. Rewrites the method name to the host-facing
6117    // form and attaches `agent_id` so the host doesn't have to remember
6118    // which agent owns each session.
6119    let host_channel = {
6120        let sessions = state.sessions.lock().await;
6121        sessions
6122            .get(&chat.host_client_id)
6123            .map(|s| s.channel.clone())
6124    };
6125    if let Some(channel) = host_channel {
6126        let mut params = parsed.params.clone();
6127        if let Some(obj) = params.as_object_mut() {
6128            obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
6129            // host-protocol.md requires a top-level `kind` on every
6130            // agents.chat.event. Agents that omit it (signalling via
6131            // finish_reason/error) were dropped wholesale by the host
6132            // decoder — normalize here so the contract holds. car#222.
6133            obj.entry("kind")
6134                .or_insert_with(|| Value::String(kind.clone()));
6135        }
6136        let forward = serde_json::json!({
6137            "jsonrpc": "2.0",
6138            "method": "agents.chat.event",
6139            "params": params,
6140        });
6141        if let Ok(text) = serde_json::to_string(&forward) {
6142            let send_result = channel
6143                .write
6144                .lock()
6145                .await
6146                .send(Message::Text(text.into()))
6147                .await;
6148            if let Err(e) = send_result {
6149                tracing::warn!(
6150                    session_id = %session_id,
6151                    agent_id = %chat.agent_id,
6152                    host_client_id = %chat.host_client_id,
6153                    kind = %kind,
6154                    error = %e,
6155                    "agent.chat.event forward to host failed at the WS send step"
6156                );
6157            }
6158        }
6159    } else {
6160        // Host disconnected mid-stream — chat_sessions still holds
6161        // the routing entry but the originating client_id no longer
6162        // resolves to a session. Pre-#233 this was silent and the
6163        // operator had no way to tell whether the event was dropped
6164        // here or never emitted by the agent. Log + drop the
6165        // routing entry so subsequent stray events are no-ops.
6166        tracing::warn!(
6167            session_id = %session_id,
6168            agent_id = %chat.agent_id,
6169            host_client_id = %chat.host_client_id,
6170            kind = %kind,
6171            "agent.chat.event from supervised agent had no host channel \
6172             (host disconnected since `agents.chat`); dropping routing entry"
6173        );
6174        state.chat_sessions.lock().await.remove(&session_id);
6175        return true;
6176    }
6177
6178    // Terminal-kind cleanup. The host_channel branch above already
6179    // forwarded the terminal event; we just remove the routing entry
6180    // here so subsequent stray frames are no-ops.
6181    if matches!(kind.as_str(), "done" | "error") {
6182        state.chat_sessions.lock().await.remove(&session_id);
6183    }
6184
6185    true
6186}
6187
#[cfg(test)]
mod fd_leak_regression {
    //! Regression guard for car#209: a transport-level read error must not
    //! bypass connection teardown.
    //!
    //! Previously `let msg = msg?;` bubbled the error out of `run_dispatch`
    //! before `remove_session` ran. The `state.sessions` entry
    //! (`Arc<ClientSession>` -> `Arc<WsChannel>` -> socket FD) was then never
    //! released after a peer reset or a crash-looping client disconnect.
    use super::run_dispatch;
    use futures::SinkExt;
    use tokio_tungstenite::tungstenite::{Error as WsError, Message};

    #[tokio::test]
    async fn abrupt_read_error_still_runs_session_cleanup() {
        let scratch_dir = tempfile::TempDir::new().unwrap();
        let server_state = std::sync::Arc::new(crate::session::ServerState::standalone(
            scratch_dir.path().to_path_buf(),
        ));

        // Inbound half: exactly one transport error (the peer reset), after
        // which the stream is exhausted. This mimics an ungraceful client
        // disconnect.
        let inbound_frames: Vec<Result<Message, WsError>> = vec![Err(WsError::ConnectionClosed)];
        let inbound = futures::stream::iter(inbound_frames);

        // Outbound half: a drain sink that discards everything written to it.
        let outbound: crate::session::WsSink =
            Box::pin(futures::sink::drain().sink_map_err(|_| WsError::ConnectionClosed));

        let result = run_dispatch(
            inbound,
            outbound,
            "test-peer".to_string(),
            std::sync::Arc::clone(&server_state),
        )
        .await;
        assert!(
            result.is_ok(),
            "run_dispatch must return Ok after cleanup, got {result:?}"
        );

        // Teardown must have run despite the abrupt error. No session, and
        // therefore no channel or FD, may remain registered.
        let no_sessions_left = server_state.sessions.lock().await.is_empty();
        assert!(
            no_sessions_left,
            "state.sessions must be empty after an abrupt disconnect (car#209)"
        );
    }
}