Skip to main content

car_server_core/
handler.rs

1//! WebSocket connection handler — bidirectional JSON-RPC.
2//!
3//! Tool callback flow:
4//! 1. Client submits proposal via proposal.submit
5//! 2. Runtime encounters a ToolCall action
6//! 3. WsToolExecutor sends tools.execute request to client via shared write half
7//! 4. WsToolExecutor awaits response on a oneshot channel
8//! 5. Client executes tool locally, sends JSON-RPC response back
9//! 6. Handler receives the response, resolves the oneshot
10//! 7. Runtime continues execution with the tool result
11
12use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30    #[serde(default)]
31    pub jsonrpc: String,
32    #[serde(default)]
33    pub method: Option<String>,
34    #[serde(default)]
35    pub params: Value,
36    #[serde(default)]
37    pub id: Value,
38    // Response fields
39    #[serde(default)]
40    pub result: Option<Value>,
41    #[serde(default)]
42    pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47    pub jsonrpc: &'static str,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub result: Option<Value>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub error: Option<JsonRpcError>,
52    pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57    pub code: i32,
58    pub message: String,
59}
60
61impl JsonRpcResponse {
62    pub fn success(id: Value, result: Value) -> Self {
63        Self {
64            jsonrpc: "2.0",
65            result: Some(result),
66            error: None,
67            id,
68        }
69    }
70    pub fn error(id: Value, code: i32, message: &str) -> Self {
71        Self {
72            jsonrpc: "2.0",
73            result: None,
74            error: Some(JsonRpcError {
75                code,
76                message: message.to_string(),
77            }),
78            id,
79        }
80    }
81}
82
83/// Convenience wrapper for the standalone `car-server` binary: accepts
84/// the WebSocket handshake on a raw [`TcpStream`] then delegates to
85/// [`run_dispatch`]. Embedders that already have a handshake-completed
86/// `WebSocketStream` skip this and call `run_dispatch` directly.
87#[instrument(
88    name = "ws.connection",
89    skip_all,
90    fields(peer = %peer),
91)]
92pub async fn handle_connection(
93    stream: TcpStream,
94    peer: SocketAddr,
95    state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97    let ws_stream = accept_async(stream).await?;
98    let (write, read) = ws_stream.split();
99    run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102/// Convenience wrapper for the daemon-as-default Unix-socket
103/// listener. Same shape as [`handle_connection`] but accepts a
104/// `UnixStream` — used by the per-user UDS listener in
105/// `car-server::main` (default transport for FFI thin clients,
106/// since UDS is faster + permission-scoped vs localhost TCP).
107///
108/// Unix-only — `tokio::net::UnixStream` is gated on
109/// `cfg(all(unix, feature = "net"))`. On Windows the daemon binds
110/// only the TCP listener (loopback) and this entry point is
111/// compiled out; consumers must use `handle_connection` instead.
112#[cfg(unix)]
113#[instrument(
114    name = "ws.connection",
115    skip_all,
116    fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119    stream: tokio::net::UnixStream,
120    peer: String,
121    state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123    let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124    let (write, read) = ws_stream.split();
125    run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128/// Transport-neutral entry point: drives the JSON-RPC dispatch loop
129/// against an already-handshake-completed split WebSocket. Generic
130/// over the read half (any `Stream<Item = Result<Message, WsError>>`)
131/// and the write half (a [`WsSink`] — type-erased so this function
132/// doesn't templatize every downstream consumer of `WsChannel`).
133///
134/// `peer` is a free-form string ("127.0.0.1:1234" for TCP,
135/// "uds:/path/sock" for UDS, "axum:..." for embedders) — used only
136/// for tracing fields, never for dispatch logic.
137#[instrument(
138    name = "ws.dispatch",
139    skip_all,
140    fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143    mut read: R,
144    write: crate::session::WsSink,
145    peer: String,
146    state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149    R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150        + Unpin
151        + Send,
152{
153    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154    tracing::Span::current().record("client_id", &client_id.as_str());
155
156    info!("New connection from {}", peer);
157
158    let channel = Arc::new(WsChannel {
159        write: Mutex::new(write),
160        pending: Mutex::new(HashMap::new()),
161        next_id: AtomicU64::new(1),
162    });
163
164    let session = state.create_session(&client_id, channel.clone()).await;
165
166    // car#209: per-request handlers are spawned detached (below) and
167    // each clones `Arc<ClientSession>` → `Arc<WsChannel>`. A bare
168    // `tokio::spawn` outlives the connection, so a slow/hung handler
169    // pins the split sink and the inbound socket lingers in CLOSED
170    // until the daemon hits EMFILE. Own them in a per-connection
171    // `JoinSet` so every in-flight handler is aborted the instant the
172    // WS drops (`abort_all` in the cleanup block; `JoinSet`'s Drop
173    // also aborts on any early return), releasing the FD immediately.
174    let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
175
176    while let Some(msg) = read.next().await {
177        // Reap finished handlers so a long-lived connection doesn't
178        // retain their `JoinHandle`s unbounded (memory, not FDs).
179        while conn_tasks.try_join_next().is_some() {}
180
181        // car#209: on an abrupt transport error (peer reset / network
182        // drop — the trader crash-loop case) `read.next()` yields
183        // `Some(Err(_))`. The old `let msg = msg?;` propagated that
184        // out of the function, *skipping the cleanup block below*, so
185        // `state.sessions` (+ the host/a2ui/chat registries) kept the
186        // session + channel — and thus the socket FD — forever. Break
187        // instead: every disconnect, clean or not, runs the single
188        // cleanup path.
189        let msg = match msg {
190            Ok(m) => m,
191            Err(e) => {
192                info!("read error from {}: {}; closing", client_id, e);
193                break;
194            }
195        };
196        if msg.is_text() {
197            let text = match msg.to_text() {
198                Ok(t) => t,
199                Err(e) => {
200                    info!("non-text frame from {}: {}; closing", client_id, e);
201                    break;
202                }
203            };
204            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
205                Ok(m) => m,
206                Err(e) => {
207                    send_response(
208                        &session.channel,
209                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
210                    )
211                    .await
212                    .ok();
213                    continue;
214                }
215            };
216
217            // Is this a response to a pending tool callback?
218            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
219                if let Some(id_str) = parsed.id.as_str() {
220                    let mut pending = session.channel.pending.lock().await;
221                    if let Some(tx) = pending.remove(id_str) {
222                        let tool_resp = if let Some(result) = parsed.result {
223                            ToolExecuteResponse {
224                                action_id: id_str.to_string(),
225                                output: Some(result),
226                                error: None,
227                            }
228                        } else {
229                            let err_msg = parsed
230                                .error
231                                .as_ref()
232                                .and_then(|e| e.get("message"))
233                                .and_then(|m| m.as_str())
234                                .unwrap_or("unknown error")
235                                .to_string();
236                            ToolExecuteResponse {
237                                action_id: id_str.to_string(),
238                                output: None,
239                                error: Some(err_msg),
240                            }
241                        };
242                        let _ = tx.send(tool_resp);
243                        continue;
244                    }
245                }
246            }
247
248            // Agent → host chat-event interceptor. `agent.chat.event`
249            // notifications coming up the WS from a connected agent
250            // are forwarded to the originating host's channel as
251            // `agents.chat.event`. Lives ahead of the regular method
252            // dispatch so the dispatcher doesn't reply with
253            // "method-not-found" on what is a fire-and-forget
254            // notification (no id). See
255            // `docs/proposals/agent-chat-surface.md`.
256            if try_forward_agent_chat_event(&parsed, &state).await {
257                continue;
258            }
259
260            // Otherwise it's a client request
261            if let Some(method) = &parsed.method {
262                info!(method = %method, "dispatching JSON-RPC method");
263
264                // Auth gate (Parslee-ai/car-releases#32). When the
265                // server has an auth token installed, every method
266                // other than `session.auth` is rejected on
267                // unauthenticated sessions and the connection is
268                // closed after the error response goes out. When no
269                // token is installed (default), this branch never
270                // fires — preserves pre-#32 behaviour.
271                if state.auth_token.get().is_some()
272                    && !session
273                        .authenticated
274                        .load(std::sync::atomic::Ordering::Acquire)
275                    && method != "session.auth"
276                {
277                    let resp = JsonRpcResponse::error(
278                        parsed.id.clone(),
279                        -32001,
280                        "auth required: send `session.auth` with the per-launch token \
281                         from ~/Library/Application Support/ai.parslee.car/auth-token \
282                         (macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
283                         as the first frame on this connection",
284                    );
285                    let _ = send_response(&session.channel, resp).await;
286                    info!(client = %client_id, method = %method,
287                        "rejecting non-auth method on unauthenticated session; closing");
288                    break;
289                }
290
291                // Approval gate (audit 2026-05). High-risk methods —
292                // anything that drives macOS automation or sends
293                // messages on the user's behalf — must be acked by
294                // the user via `host.resolve_approval` before they
295                // dispatch. The gate raises an `approval.requested`
296                // host event the local UI can render approve/deny on,
297                // then parks until resolved or the configured
298                // timeout fires. Returns a JSON-RPC error and
299                // continues the dispatch loop on deny / timeout —
300                // no connection close, since the caller may want to
301                // retry with revised parameters.
302                if state.approval_gate.requires_approval(method.as_str()) {
303                    match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
304                        Ok(()) => {}
305                        Err(reason) => {
306                            let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
307                            let _ = send_response(&session.channel, resp).await;
308                            info!(
309                                client = %client_id,
310                                method = %method,
311                                reason = %reason,
312                                "approval gate blocked dispatch"
313                            );
314                            continue;
315                        }
316                    }
317                }
318
319                // Spawn the per-method dispatch in a task so the read
320                // loop keeps reading frames. Without this, methods
321                // that trigger server-initiated `tools.execute`
322                // callbacks (`proposal.submit`, `workflow.run`,
323                // `multi.*` paths that fire a registered tool)
324                // deadlock the connection: the handler awaits the
325                // callback response on a oneshot, but the response is
326                // another frame on this same read half — which the
327                // synchronous `.await` here would prevent the loop
328                // from ever picking up. Surfaced by the
329                // `executeProposal: echo tool via JS callback` smoke
330                // (#173). Response ordering becomes id-keyed (the
331                // JSON-RPC demuxing contract) rather than
332                // arrival-ordered.
333                let session_task = session.clone();
334                let state_task = state.clone();
335                let method_owned = method.clone();
336                let parsed_task = parsed;
337                // car#209: owned by the per-connection JoinSet so it's
338                // aborted on disconnect instead of leaking the channel.
339                conn_tasks.spawn(async move {
340                    let session = session_task;
341                    let state = state_task;
342                    let parsed = parsed_task;
343                    let result = match method_owned.as_str() {
344                        "session.auth" => handle_session_auth(&parsed, &session, &state).await,
345                        "parslee.auth" => handle_parslee_auth().await,
346                        "auth.start" => handle_auth_start(&parsed).await,
347                        "auth.complete" => handle_auth_complete(&parsed).await,
348                        "auth.status" => handle_auth_status().await,
349                        "auth.logout" => handle_auth_logout().await,
350                        "session.init" => handle_session_init(&parsed, &session).await,
351                        "host.subscribe" => handle_host_subscribe(&session, &state).await,
352                        "host.agents" => handle_host_agents(&session).await,
353                        "host.events" => handle_host_events(&parsed, &session).await,
354                        "host.approvals" => handle_host_approvals(&session).await,
355                        "host.register_agent" => {
356                            handle_host_register_agent(&parsed, &session).await
357                        }
358                        "host.unregister_agent" => {
359                            handle_host_unregister_agent(&parsed, &session).await
360                        }
361                        "host.set_status" => handle_host_set_status(&parsed, &session).await,
362                        "host.notify" => handle_host_notify(&parsed, &session).await,
363                        "host.request_approval" => {
364                            handle_host_request_approval(&parsed, &session).await
365                        }
366                        "host.resolve_approval" => {
367                            handle_host_resolve_approval(&parsed, &session).await
368                        }
369                        "tools.register" => handle_tools_register(&parsed, &session).await,
370                        "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
371                        "policy.register" => handle_policy_register(&parsed, &session).await,
372                        "session.policy.open" => handle_session_policy_open(&session).await,
373                        "session.policy.close" => {
374                            handle_session_policy_close(&parsed, &session).await
375                        }
376                        "verify" => handle_verify(&parsed, &session).await,
377                        "state.get" => handle_state_get(&parsed, &session).await,
378                        "state.set" => handle_state_set(&parsed, &session).await,
379                        "state.exists" => handle_state_exists(&parsed, &session).await,
380                        "state.keys" => handle_state_keys(&parsed, &session).await,
381                        "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
382                        "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
383                        "memory.query" => handle_memory_query(&parsed, &session).await,
384                        "memory.build_context" => {
385                            handle_memory_build_context(&parsed, &session).await
386                        }
387                        "memory.build_context_fast" => {
388                            handle_memory_build_context_fast(&parsed, &session).await
389                        }
390                        "memory.consolidate" => handle_memory_consolidate(&session).await,
391                        "memory.fact_count" => handle_memory_fact_count(&session).await,
392                        "memory.persist" => handle_memory_persist(&parsed, &session).await,
393                        "memory.load" => handle_memory_load(&parsed, &session).await,
394                        "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
395                        "skill.find" => handle_skill_find(&parsed, &session).await,
396                        "skill.report" => handle_skill_report(&parsed, &session).await,
397                        "skill.repair" => handle_skill_repair(&parsed, &session).await,
398                        "skills.ingest_distilled" => {
399                            handle_skills_ingest_distilled(&parsed, &session).await
400                        }
401                        "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
402                        "skills.domains_needing_evolution" => {
403                            handle_skills_domains_needing_evolution(&parsed, &session).await
404                        }
405                        "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
406                        "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
407                        "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
408                        "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
409                        "multi.vote" => handle_multi_vote(&parsed, &session).await,
410                        "scheduler.create" => handle_scheduler_create(&parsed),
411                        "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
412                        "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
413                        "infer" => handle_infer(&parsed, &state, &session).await,
414                        "image.generate" => handle_image_generate(&parsed, &state).await,
415                        "video.generate" => handle_video_generate(&parsed, &state).await,
416                        "embed" => handle_embed(&parsed, &state).await,
417                        "classify" => handle_classify(&parsed, &state).await,
418                        "tokenize" => handle_tokenize(&parsed, &state).await,
419                        "detokenize" => handle_detokenize(&parsed, &state).await,
420                        "rerank" => handle_rerank(&parsed, &state).await,
421                        "transcribe" => handle_transcribe(&parsed, &state).await,
422                        "synthesize" => handle_synthesize(&parsed, &state).await,
423                        "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
424                        "speech.prepare" => handle_speech_prepare(&state).await,
425                        "models.route" => handle_models_route(&parsed, &state).await,
426                        "models.stats" => handle_models_stats(&state).await,
427                        "outcomes.resolve_pending" => {
428                            handle_outcomes_resolve_pending(&parsed, &state).await
429                        }
430                        "events.count" => handle_events_count(&session).await,
431                        "events.stats" => handle_events_stats(&session).await,
432                        "events.truncate" => handle_events_truncate(&parsed, &session).await,
433                        "events.clear" => handle_events_clear(&session).await,
434                        "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
435                        "models.list" => handle_models_list(&state),
436                        "models.register" => handle_models_register(&parsed, &state).await,
437                        "models.unregister" => handle_models_unregister(&parsed, &state).await,
438                        "models.list_unified" => handle_models_list_unified(&state),
439                        "models.search" => handle_models_search(&parsed, &state),
440                        "models.upgrades" => handle_models_upgrades(&state),
441                        "models.pull" => handle_models_pull(&parsed, &state).await,
442                        "models.install" => handle_models_pull(&parsed, &state).await,
443                        "skills.distill" => handle_skills_distill(&parsed, &state).await,
444                        "skills.list" => handle_skills_list(&parsed, &session).await,
445                        "browser.run" => handle_browser_run(&parsed, &session).await,
446                        "browser.close" => handle_browser_close(&session).await,
447                        "secret.put" => handle_secret_put(&parsed),
448                        "secret.get" => handle_secret_get(&parsed),
449                        "secret.delete" => handle_secret_delete(&parsed),
450                        "secret.status" => handle_secret_status(&parsed),
451                        "secret.available" => Ok(car_ffi_common::secrets::is_available()),
452                        "permissions.status" => handle_perm_status(&parsed),
453                        "permissions.request" => handle_perm_request(&parsed),
454                        "permissions.explain" => handle_perm_explain(&parsed),
455                        "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
456                        "accounts.list" => car_ffi_common::accounts::list(),
457                        "accounts.open" => {
458                            #[derive(serde::Deserialize, Default)]
459                            struct OpenParams {
460                                #[serde(default)]
461                                account_id: Option<String>,
462                            }
463                            let p: OpenParams =
464                                serde_json::from_value(parsed.params.clone()).unwrap_or_default();
465                            car_ffi_common::accounts::open_settings(p.account_id.as_deref())
466                        }
467                        "calendar.list" => car_ffi_common::integrations::calendar_list(),
468                        "calendar.events" => handle_calendar_events(&parsed),
469                        "contacts.containers" => {
470                            car_ffi_common::integrations::contacts_containers()
471                        }
472                        "contacts.find" => handle_contacts_find(&parsed),
473                        "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
474                        "mail.inbox" => handle_mail_inbox(&parsed),
475                        "mail.send" => handle_mail_send(&parsed),
476                        "messages.services" => car_ffi_common::integrations::messages_services(),
477                        "messages.chats" => handle_messages_chats(&parsed),
478                        "messages.send" => handle_messages_send(&parsed),
479                        "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
480                        "notes.find" => handle_notes_find(&parsed),
481                        "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
482                        "reminders.items" => handle_reminders_items(&parsed),
483                        "photos.albums" => car_ffi_common::integrations::photos_albums(),
484                        "bookmarks.list" => handle_bookmarks_list(&parsed),
485                        "files.locations" => car_ffi_common::integrations::files_locations(),
486                        "keychain.status" => car_ffi_common::integrations::keychain_status(),
487                        "health.status" => car_ffi_common::health::status(),
488                        "health.sleep" => handle_health_sleep(&parsed),
489                        "health.workouts" => handle_health_workouts(&parsed),
490                        "health.activity" => handle_health_activity(&parsed),
491                        "voice.transcribe_stream.start" => {
492                            handle_voice_transcribe_stream_start(&parsed, &state, &session).await
493                        }
494                        "voice.transcribe_stream.stop" => {
495                            handle_voice_transcribe_stream_stop(&parsed, &state).await
496                        }
497                        "voice.transcribe_stream.push" => {
498                            handle_voice_transcribe_stream_push(&parsed, &state).await
499                        }
500                        "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
501                        "voice.dispatch_turn" => {
502                            handle_voice_dispatch_turn(&parsed, &state, &session).await
503                        }
504                        "voice.cancel_turn" => handle_voice_cancel_turn().await,
505                        "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
506                        "inference.register_runner" => {
507                            handle_inference_register_runner(&session).await
508                        }
509                        "inference.runner.event" => handle_inference_runner_event(&parsed).await,
510                        "inference.runner.complete" => {
511                            handle_inference_runner_complete(&parsed).await
512                        }
513                        "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
514                        "voice.providers.list" => {
515                            // Stateless: enumerates STT/TTS providers compiled into
516                            // this build. Runtime readiness (API key, permission,
517                            // model download) is reported via per-provider errors.
518                            serde_json::from_str::<serde_json::Value>(
519                                &car_voice::list_voice_providers_json(),
520                            )
521                            .map_err(|e| e.to_string())
522                        }
523                        "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
524                            .await
525                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
526                        "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
527                            .await
528                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
529                        "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
530                        "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
531                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
532                        "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
533                        "workflow.run" => handle_workflow_run(&parsed, &session).await,
534                        "workflow.verify" => handle_workflow_verify(&parsed),
535                        "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
536                        "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
537                        "meeting.list" => handle_meeting_list(&parsed),
538                        "meeting.get" => handle_meeting_get(&parsed),
539                        "registry.register" => handle_registry_register(&parsed),
540                        "registry.heartbeat" => handle_registry_heartbeat(&parsed),
541                        "registry.unregister" => handle_registry_unregister(&parsed),
542                        "registry.list" => handle_registry_list(&parsed),
543                        "registry.reap" => handle_registry_reap(&parsed),
544                        "admission.status" => handle_admission_status(&state),
545                        "a2a.start" => handle_a2a_start(&parsed, &session).await,
546                        "a2a.stop" => handle_a2a_stop(),
547                        "a2a.status" => handle_a2a_status(),
548                        "a2a.send" => handle_a2a_send(&parsed, &state).await,
549                        "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
550                        "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
551                        "a2ui.capabilities" => handle_a2ui_capabilities(&state),
552                        "a2ui.reap" => handle_a2ui_reap(&state).await,
553                        "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
554                        "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
555                        "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
556                        "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
557                        "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
558                        "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
559                        "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
560                        "automation.run_applescript" => handle_run_applescript(&parsed).await,
561                        "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
562                        "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
563                        "notifications.local" => handle_local_notification(&parsed).await,
564                        "vision.ocr" => handle_vision_ocr(&parsed).await,
565                        "agents.list" => handle_agents_list(&state).await,
566                        "agents.health" => handle_agents_health(&state).await,
567                        "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
568                        "agents.install" => handle_agents_install(&parsed, &state).await,
569                        "agents.remove" => handle_agents_remove(&parsed, &state).await,
570                        "agents.start" => handle_agents_start(&parsed, &state).await,
571                        "agents.stop" => handle_agents_stop(&parsed, &state).await,
572                        "agents.restart" => handle_agents_restart(&parsed, &state).await,
573                        "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
574                        "agents.list_external" => handle_agents_list_external(&parsed).await,
575                        "agents.detect_external" => handle_agents_detect_external(&parsed).await,
576                        "agents.health_external" => handle_agents_health_external(&parsed).await,
577                        "agents.invoke_external" => {
578                            handle_agents_invoke_external(&parsed, &state, &session).await
579                        }
580                        "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
581                        "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
582                        // A2A v1.0 (PascalCase) + v0.3 (slash form) — both
583                        // alias to the same in-core dispatcher per
584                        // Parslee-ai/car-releases#28. Embedders that need a
585                        // custom AgentCardSource / TaskStore plug them in
586                        // via ServerStateConfig::with_a2a_card_source /
587                        // with_a2a_store before any handler runs.
588                        "message/send"
589                        | "SendMessage"
590                        | "message/stream"
591                        | "SendStreamingMessage"
592                        | "tasks/get"
593                        | "GetTask"
594                        | "tasks/list"
595                        | "ListTasks"
596                        | "tasks/cancel"
597                        | "CancelTask"
598                        | "tasks/resubscribe"
599                        | "SubscribeToTask"
600                        | "tasks/pushNotificationConfig/set"
601                        | "CreateTaskPushNotificationConfig"
602                        | "tasks/pushNotificationConfig/get"
603                        | "GetTaskPushNotificationConfig"
604                        | "tasks/pushNotificationConfig/list"
605                        | "ListTaskPushNotificationConfigs"
606                        | "tasks/pushNotificationConfig/delete"
607                        | "DeleteTaskPushNotificationConfig"
608                        | "agent/getAuthenticatedExtendedCard"
609                        | "GetExtendedAgentCard" => {
610                            handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
611                        }
612                        _ => Err(format!("unknown method: {}", method_owned)),
613                    };
614
615                    let resp = match result {
616                        Ok(value) => JsonRpcResponse::success(parsed.id, value),
617                        Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
618                    };
619                    let _ = send_response(&session.channel, resp).await;
620                });
621            }
622        } else if msg.is_close() {
623            info!("Client {} disconnected", client_id);
624            break;
625        }
626    }
627
628    // car#209: abort every in-flight handler for this connection
629    // *first* — they each hold an `Arc<ClientSession>` → `Arc<WsChannel>`
630    // clone; until they're gone the split sink (and the inbound socket
631    // FD) can't drop, even after the registries below are cleared.
632    conn_tasks.abort_all();
633
634    session.host.unsubscribe(&client_id).await;
635    // Auto-cancel this session's pending approvals so the queue stays
636    // in sync with what's actually decidable — covers graceful
637    // unregister+close, hard crash (TCP reset), and ping timeout in
638    // one place. System-level gate approvals (client_id None) are not
639    // touched. car-releases#48.
640    session.host.reap_session_approvals(&client_id).await;
641    state.a2ui_subscribers.lock().await.remove(&client_id);
642
643    // Fix for MULTI-4 / WS-3: drop the session from the registry and
644    // drain any pending tool callbacks. Without this, every connection
645    // we ever accepted keeps an `Arc<ClientSession>` alive in
646    // `state.sessions`, and outstanding `oneshot::Sender`s in
647    // `session.channel.pending` outlive the closed connection until
648    // their per-call timeout (60s). Dropping the senders here causes any
649    // awaiting `recv()` in `WsToolExecutor::execute` to return
650    // `RecvError` immediately, which the existing error-handler path
651    // already maps to "callback channel closed" — same shape as the
652    // timeout path, just faster.
653    let _removed = state.remove_session(&client_id).await;
654    {
655        let mut pending = session.channel.pending.lock().await;
656        pending.clear();
657    }
658
659    Ok(())
660}
661
662async fn send_response(
663    channel: &WsChannel,
664    resp: JsonRpcResponse,
665) -> Result<(), Box<dyn std::error::Error>> {
666    use futures::SinkExt;
667    let json = serde_json::to_string(&resp)?;
668    channel
669        .write
670        .lock()
671        .await
672        .send(Message::Text(json.into()))
673        .await?;
674    Ok(())
675}
676
677// --- Request handlers ---
678
679async fn handle_host_subscribe(
680    session: &crate::session::ClientSession,
681    state: &Arc<ServerState>,
682) -> Result<Value, String> {
683    session
684        .host
685        .subscribe(&session.client_id, session.channel.clone())
686        .await;
687    serde_json::to_value(HostSnapshot {
688        subscribed: true,
689        agents: session.host.agents().await,
690        approvals: session.host.approvals().await,
691        events: session.host.events(50).await,
692        identity: Some(daemon_identity(state)),
693    })
694    .map_err(|e| e.to_string())
695}
696
697/// Snapshot the daemon-identity facts for a fresh subscriber.
698/// Cheap: non-acquiring reads on `OnceLock`s + a single
699/// `to_string_lossy` on the manifest path. Critically uses
700/// [`ServerState::supervisor_if_installed`] — not the lazy-init
701/// `supervisor()` — so a Heisenberg subscribe can't *cause* the
702/// daemon to acquire the manifest lock just by asking whether it
703/// owns one.
704fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
705    // Observer takes precedence: when both supervisor and observer
706    // markers are set (currently unreachable through the standalone
707    // binary, but an embedder could install both racily), the
708    // observer marker is the authoritative role since the
709    // supervisor handle is only installed when this daemon owns
710    // the lock.
711    let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
712        (
713            Some(p.to_string_lossy().into_owned()),
714            car_proto::HostManifestRole::Observer,
715        )
716    } else if let Some(s) = state.supervisor_if_installed() {
717        (
718            Some(s.manifest_path().to_string_lossy().into_owned()),
719            car_proto::HostManifestRole::Owner,
720        )
721    } else {
722        (None, car_proto::HostManifestRole::None)
723    };
724    car_proto::HostIdentity {
725        version: env!("CARGO_PKG_VERSION").to_string(),
726        pid: std::process::id(),
727        manifest_path,
728        manifest_role,
729        parslee: state
730            .parslee_session
731            .get()
732            .map(|session| session.identity.clone()),
733    }
734}
735
736/// Return the Parslee cloud credential for an authenticated CAR
737/// connection. Managed agents already receive `CAR_AUTH_TOKEN` and
738/// authenticate to the local daemon with `session.auth`; this method is
739/// their supported bridge from local CAR auth to the user's Parslee
740/// backend auth.
741///
742/// The bearer token is intentionally not injected into every managed
743/// child process environment. Agents ask for it only when they need it,
744/// through the same local auth gate that protects the rest of the
745/// daemon.
746async fn handle_parslee_auth() -> Result<Value, String> {
747    let session = crate::parslee_auth::load_or_refresh()
748        .await?
749        .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
750    Ok(serde_json::json!({
751        "authenticated": true,
752        "token_type": "Bearer",
753        "access_token": session.access_token,
754        "authorization_header": format!("Bearer {}", session.access_token),
755        "identity": session.identity,
756    }))
757}
758
759// --- auth.* : GUI-driven Parslee sign-in (CAR Host.app).
760// Shares one implementation with `car auth login` via the car-auth
761// crate. Stateless: the trusted in-process GUI holds the PKCE
762// verifier + state between auth.start and auth.complete (same trust
763// boundary as the keychain it ultimately writes).
764async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
765    let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
766    let client_id = req
767        .params
768        .get("client_id")
769        .and_then(|v| v.as_str())
770        .unwrap_or("parslee-car");
771    let redirect_uri = req
772        .params
773        .get("redirect_uri")
774        .and_then(|v| v.as_str())
775        .ok_or_else(|| "redirect_uri is required".to_string())?;
776    let provider = req.params.get("provider").and_then(|v| v.as_str());
777    let state = car_auth::new_state();
778    let verifier = car_auth::pkce_verifier();
779    let challenge = car_auth::pkce_challenge(&verifier);
780    let url =
781        car_auth::authorize_url(&api_base, client_id, redirect_uri, &state, &challenge, provider)?;
782    Ok(serde_json::json!({
783        "authorize_url": url,
784        "state": state,
785        "verifier": verifier,
786    }))
787}
788
789async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
790    let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
791    let client_id = req
792        .params
793        .get("client_id")
794        .and_then(|v| v.as_str())
795        .unwrap_or("parslee-car");
796    let redirect_uri = req
797        .params
798        .get("redirect_uri")
799        .and_then(|v| v.as_str())
800        .ok_or_else(|| "redirect_uri is required".to_string())?;
801    let code = req
802        .params
803        .get("code")
804        .and_then(|v| v.as_str())
805        .ok_or_else(|| "code is required".to_string())?;
806    let verifier = req
807        .params
808        .get("verifier")
809        .and_then(|v| v.as_str())
810        .ok_or_else(|| "verifier is required".to_string())?;
811    let token =
812        car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
813    car_auth::store_tokens(&api_base, &token)?;
814    Ok(serde_json::json!({ "ok": true }))
815}
816
817async fn handle_auth_status() -> Result<Value, String> {
818    match car_auth::fetch_status(None).await? {
819        Some(session_json) => {
820            let session: Value = serde_json::from_str(&session_json).unwrap_or(Value::Null);
821            Ok(serde_json::json!({ "authenticated": true, "session": session }))
822        }
823        None => Ok(serde_json::json!({ "authenticated": false })),
824    }
825}
826
827async fn handle_auth_logout() -> Result<Value, String> {
828    car_auth::clear_tokens()?;
829    Ok(serde_json::json!({ "ok": true }))
830}
831
832async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
833    serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
834}
835
836async fn handle_host_events(
837    req: &JsonRpcMessage,
838    session: &crate::session::ClientSession,
839) -> Result<Value, String> {
840    let limit = req
841        .params
842        .get("limit")
843        .and_then(|v| v.as_u64())
844        .unwrap_or(100) as usize;
845    serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
846}
847
848async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
849    serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
850}
851
852async fn handle_a2ui_apply(
853    req: &JsonRpcMessage,
854    state: &Arc<ServerState>,
855) -> Result<Value, String> {
856    #[derive(Deserialize)]
857    struct Params {
858        #[serde(default)]
859        envelope: Option<car_a2ui::A2uiEnvelope>,
860        #[serde(default)]
861        message: Option<car_a2ui::A2uiEnvelope>,
862    }
863
864    let envelope = if req.params.get("createSurface").is_some()
865        || req.params.get("updateComponents").is_some()
866        || req.params.get("updateDataModel").is_some()
867        || req.params.get("deleteSurface").is_some()
868    {
869        serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
870            .map_err(|e| e.to_string())?
871    } else {
872        match serde_json::from_value::<Params>(req.params.clone()) {
873            Ok(params) => params
874                .envelope
875                .or(params.message)
876                .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
877            Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
878                .map_err(|e| e.to_string())?,
879        }
880    };
881
882    apply_a2ui_envelope(state, envelope, None, None).await
883}
884
885async fn handle_a2ui_ingest(
886    req: &JsonRpcMessage,
887    state: &Arc<ServerState>,
888) -> Result<Value, String> {
889    #[derive(Deserialize)]
890    #[serde(rename_all = "camelCase")]
891    struct Params {
892        #[serde(default)]
893        endpoint: Option<String>,
894        #[serde(default)]
895        a2a_endpoint: Option<String>,
896        #[serde(default)]
897        owner: Option<car_a2ui::A2uiSurfaceOwner>,
898        #[serde(default)]
899        route_auth: Option<A2aRouteAuth>,
900        #[serde(default)]
901        allow_untrusted_endpoint: bool,
902    }
903
904    let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
905        endpoint: None,
906        a2a_endpoint: None,
907        owner: None,
908        route_auth: None,
909        allow_untrusted_endpoint: false,
910    });
911    let payload = req.params.get("payload").unwrap_or(&req.params);
912    state
913        .a2ui
914        .validate_payload(payload)
915        .map_err(|e| e.to_string())?;
916    let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
917    if envelopes.is_empty() {
918        return Err("no A2UI envelopes found in payload".into());
919    }
920    let endpoint = params.endpoint.or(params.a2a_endpoint);
921    let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
922    let owner = params
923        .owner
924        .or_else(|| car_a2ui::owner_from_value(payload))
925        .map(|owner| match endpoint.clone() {
926            Some(endpoint) => owner.with_endpoint(Some(endpoint)),
927            None => owner,
928        });
929
930    let mut results = Vec::new();
931    for envelope in envelopes {
932        let value =
933            apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
934        results.push(value);
935    }
936    Ok(serde_json::json!({ "applied": results }))
937}
938
939async fn apply_a2ui_envelope(
940    state: &Arc<ServerState>,
941    envelope: car_a2ui::A2uiEnvelope,
942    owner: Option<car_a2ui::A2uiSurfaceOwner>,
943    route_auth: Option<A2aRouteAuth>,
944) -> Result<Value, String> {
945    let result = state
946        .a2ui
947        .apply_with_owner(envelope, owner)
948        .await
949        .map_err(|e| e.to_string())?;
950    update_a2ui_route_auth(state, &result, route_auth).await;
951    let kind = if result.deleted {
952        "a2ui.surface_deleted"
953    } else {
954        "a2ui.surface_updated"
955    };
956    let message = if result.deleted {
957        format!("A2UI surface {} deleted", result.surface_id)
958    } else {
959        format!("A2UI surface {} updated", result.surface_id)
960    };
961    let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
962    state
963        .host
964        .record_event(kind, None, message, payload.clone())
965        .await;
966    // Push the envelope result to every WS subscriber as an
967    // `a2ui.event` notification — Parslee-ai/car-releases#29. Late
968    // joiners catch up via `a2ui/replay` (or `a2ui.surfaces`).
969    broadcast_a2ui_event(state, kind, &payload).await;
970    serde_json::to_value(result).map_err(|e| e.to_string())
971}
972
973async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
974    use futures::SinkExt;
975    use tokio_tungstenite::tungstenite::Message;
976    let subscribers: Vec<Arc<crate::session::WsChannel>> = state
977        .a2ui_subscribers
978        .lock()
979        .await
980        .values()
981        .cloned()
982        .collect();
983    if subscribers.is_empty() {
984        return;
985    }
986    let Ok(json) = serde_json::to_string(&serde_json::json!({
987        "jsonrpc": "2.0",
988        "method": "a2ui.event",
989        "params": {
990            "kind": kind,
991            "result": result,
992        },
993    })) else {
994        return;
995    };
996    for channel in subscribers {
997        let _ = channel
998            .write
999            .lock()
1000            .await
1001            .send(Message::Text(json.clone().into()))
1002            .await;
1003    }
1004}
1005
1006async fn update_a2ui_route_auth(
1007    state: &Arc<ServerState>,
1008    result: &car_a2ui::A2uiApplyResult,
1009    route_auth: Option<A2aRouteAuth>,
1010) {
1011    let mut auth = state.a2ui_route_auth.lock().await;
1012    if result.deleted {
1013        auth.remove(&result.surface_id);
1014        return;
1015    }
1016
1017    let has_route_endpoint = result
1018        .surface
1019        .as_ref()
1020        .and_then(|surface| surface.owner.as_ref())
1021        .and_then(|owner| owner.endpoint.as_ref())
1022        .is_some();
1023    match (has_route_endpoint, route_auth) {
1024        (true, Some(route_auth)) => {
1025            auth.insert(result.surface_id.clone(), route_auth);
1026        }
1027        _ => {
1028            auth.remove(&result.surface_id);
1029        }
1030    }
1031}
1032
1033fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
1034    serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
1035}
1036
1037async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
1038    let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
1039    if !removed.is_empty() {
1040        let mut auth = state.a2ui_route_auth.lock().await;
1041        for surface_id in &removed {
1042            auth.remove(surface_id);
1043        }
1044    }
1045    Ok(serde_json::json!({ "removed": removed }))
1046}
1047
1048async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
1049    serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
1050}
1051
1052async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
1053    let surface_id = req
1054        .params
1055        .get("surface_id")
1056        .or_else(|| req.params.get("surfaceId"))
1057        .and_then(Value::as_str)
1058        .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
1059    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1060}
1061
1062/// `a2ui/subscribe` — opt this WS connection into `a2ui.event`
1063/// notifications. Subscribers receive every `apply_a2ui_envelope`
1064/// result for as long as they're connected; the cleanup hook in
1065/// `run_dispatch` removes them on disconnect. Closes
1066/// Parslee-ai/car-releases#29.
1067async fn handle_a2ui_subscribe(
1068    session: &crate::session::ClientSession,
1069    state: &Arc<ServerState>,
1070) -> Result<Value, String> {
1071    state
1072        .a2ui_subscribers
1073        .lock()
1074        .await
1075        .insert(session.client_id.clone(), session.channel.clone());
1076    Ok(serde_json::json!({ "subscribed": true }))
1077}
1078
1079/// `a2ui/unsubscribe` — opt out of `a2ui.event` notifications.
1080/// Idempotent: returns `{ subscribed: false }` regardless of prior
1081/// state.
1082async fn handle_a2ui_unsubscribe(
1083    session: &crate::session::ClientSession,
1084    state: &Arc<ServerState>,
1085) -> Result<Value, String> {
1086    state
1087        .a2ui_subscribers
1088        .lock()
1089        .await
1090        .remove(&session.client_id);
1091    Ok(serde_json::json!({ "subscribed": false }))
1092}
1093
1094/// `a2ui/replay` — fetch the current state of one surface. Intended
1095/// for late joiners and reconnect: a client calls `subscribe`, then
1096/// `replay` once per surface it's tracking, and from then on
1097/// notifications keep it in sync. Equivalent to `a2ui.get` on the
1098/// surface store; lives in the subscribe namespace for
1099/// discoverability.
1100async fn handle_a2ui_replay(
1101    req: &JsonRpcMessage,
1102    state: &Arc<ServerState>,
1103) -> Result<Value, String> {
1104    let surface_id = req
1105        .params
1106        .get("surface_id")
1107        .or_else(|| req.params.get("surfaceId"))
1108        .and_then(Value::as_str)
1109        .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
1110    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1111}
1112
1113async fn handle_a2ui_action(
1114    req: &JsonRpcMessage,
1115    state: &Arc<ServerState>,
1116) -> Result<Value, String> {
1117    let action: car_a2ui::ClientAction =
1118        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1119    let owner = state.a2ui.owner(&action.surface_id).await;
1120    let route = route_a2ui_action(state, &action, owner.clone()).await;
1121    let payload = serde_json::json!({
1122        "action": action,
1123        "owner": owner,
1124        "route": route,
1125    });
1126    let event = state
1127        .host
1128        .record_event(
1129            "a2ui.action",
1130            None,
1131            format!(
1132                "A2UI action {} from {}",
1133                action.name, action.source_component_id
1134            ),
1135            payload,
1136        )
1137        .await;
1138    Ok(serde_json::json!({
1139        "event": event,
1140        "route": route,
1141    }))
1142}
1143
1144/// `a2ui.render_report` — renderer-emitted telemetry envelope
1145/// (Parslee-ai/car#180). Fire-and-forget; we record an event in
1146/// the host log (so dev tools and the conversation log see it) and
1147/// broadcast as `a2ui.event { kind: "a2ui.render_report", result }`
1148/// to every WS subscriber. The improvement agent reads the report
1149/// and decides whether to issue a follow-up `patchComponents`.
1150async fn handle_a2ui_render_report(
1151    req: &JsonRpcMessage,
1152    state: &Arc<ServerState>,
1153) -> Result<Value, String> {
1154    // Parse into the typed struct to enforce the schema; we
1155    // re-serialize for the event/broadcast payload so downstream
1156    // consumers don't have to defensively re-validate.
1157    let report: car_a2ui::RenderReport =
1158        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1159    let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
1160    let kind = "a2ui.render_report";
1161    let message = format!("A2UI render report for surface {}", report.surface_id);
1162    let event = state
1163        .host
1164        .record_event(kind, None, message, payload.clone())
1165        .await;
1166    broadcast_a2ui_event(state, kind, &payload).await;
1167
1168    // Hand the report to the in-process UI-improvement agent. The
1169    // agent is sync but cheap; we await the surface lookup before
1170    // calling it so the strategies see the same surface state the
1171    // renderer saw at report-emit time. Best-effort: surface lookup
1172    // misses (the surface might have been deleted between
1173    // report-emit and report-receive) are logged and skipped, never
1174    // surfaced as JSON-RPC errors — render_report is fire-and-forget.
1175    if let Some(surface) = state.a2ui.get(&report.surface_id).await {
1176        // Iteration budget — runaway-loop backstop. try_consume
1177        // claims the slot atomically; if the surface is already at
1178        // the cap, we short-circuit before consulting the agent.
1179        // The slot stays consumed only if the patch actually
1180        // applies — failure paths refund.
1181        if !state.ui_agent_budget.try_consume(&report.surface_id) {
1182            tracing::warn!(
1183                surface_id = %report.surface_id,
1184                count = state.ui_agent_budget.count(&report.surface_id),
1185                max = state.ui_agent_budget.max(),
1186                "ui-agent iteration budget exhausted; skipping agent invocation"
1187            );
1188            return Ok(serde_json::json!({ "event": event }));
1189        }
1190        // From here on, every non-applied branch must `refund` the
1191        // slot we just claimed. Only the successful-apply branch
1192        // keeps it consumed.
1193        match state.ui_agent.on_render_report(&report, &surface) {
1194            car_ui_agent::Decision::Patch {
1195                envelope,
1196                strategy_id,
1197                patch_hash,
1198                elapsed_ns,
1199            } => {
1200                // Runtime-side convergence monitor — neo's deferred
1201                // ask. The agent's no-double-patch guard catches
1202                // same-sequence repeats; this catches A→B→A across
1203                // sequences by tracking recent patch hashes per
1204                // surface. When a proposal would repeat one in the
1205                // window, drop it; the loop waits for the next
1206                // signature change.
1207                if !state
1208                    .ui_agent_oscillation
1209                    .check_and_record(&report.surface_id, patch_hash)
1210                {
1211                    tracing::warn!(
1212                        surface_id = %report.surface_id,
1213                        strategy = %strategy_id,
1214                        patch_hash,
1215                        "ui-agent oscillation detected; suppressing patch"
1216                    );
1217                    // Suppressed patch never applied → release the
1218                    // budget slot we claimed above.
1219                    state.ui_agent_budget.refund(&report.surface_id);
1220                    return Ok(serde_json::json!({ "event": event }));
1221                }
1222                let a2ui_envelope = car_a2ui::A2uiEnvelope {
1223                    patch_components: Some(envelope),
1224                    ..Default::default()
1225                };
1226                if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
1227                    tracing::warn!(
1228                        surface_id = %report.surface_id,
1229                        strategy = %strategy_id,
1230                        patch_hash,
1231                        elapsed_ns,
1232                        error = %e,
1233                        "ui-agent patch apply failed",
1234                    );
1235                    // Apply failed → release the budget slot.
1236                    state.ui_agent_budget.refund(&report.surface_id);
1237                } else {
1238                    tracing::debug!(
1239                        surface_id = %report.surface_id,
1240                        strategy = %strategy_id,
1241                        patch_hash,
1242                        elapsed_ns,
1243                        iteration = state.ui_agent_budget.count(&report.surface_id),
1244                        "ui-agent patch applied",
1245                    );
1246                    // Memgine trace: one Conversation node per
1247                    // successful patch, tagged "ui-agent/<surface>".
1248                    // Spreading activation can surface these on
1249                    // future renders of related surfaces. Spawned
1250                    // off the render-report hot path — ingest walks
1251                    // the graph for spreading activation and can
1252                    // take real time; we don't want two surfaces
1253                    // churning patches to serialize through the
1254                    // memgine mutex behind the WS handler.
1255                    if let Some(memgine) = state.shared_memgine.clone() {
1256                        let speaker = format!("ui-agent/{}", report.surface_id);
1257                        let text = format!("strategy applied: {}", strategy_id);
1258                        tokio::spawn(async move {
1259                            let mut guard = memgine.lock().await;
1260                            guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1261                        });
1262                    }
1263                }
1264            }
1265            car_ui_agent::Decision::StableNoChange => {
1266                // No patch this round → release the slot.
1267                state.ui_agent_budget.refund(&report.surface_id);
1268            }
1269            car_ui_agent::Decision::HardStop { reason } => {
1270                state.ui_agent_budget.refund(&report.surface_id);
1271                // Renderer painted unknown components — contract
1272                // violation between server and renderer. Per
1273                // types.rs docs: "loud failure" + "MUST pause."
1274                // `error!` not `warn!` so it surfaces in production
1275                // logs at the right severity.
1276                tracing::error!(
1277                    surface_id = %report.surface_id,
1278                    reason = %reason,
1279                    "ui-agent hard-stopped improvement loop",
1280                );
1281            }
1282        }
1283    } else {
1284        tracing::debug!(
1285            surface_id = %report.surface_id,
1286            "ui-agent skipped — surface not found in store",
1287        );
1288    }
1289
1290    Ok(serde_json::json!({ "event": event }))
1291}
1292
1293async fn route_a2ui_action(
1294    state: &Arc<ServerState>,
1295    action: &car_a2ui::ClientAction,
1296    owner: Option<car_a2ui::A2uiSurfaceOwner>,
1297) -> Value {
1298    let Some(owner) = owner else {
1299        return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1300    };
1301    if owner.kind != "a2a" {
1302        return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1303    }
1304    let Some(endpoint) = owner.endpoint.clone() else {
1305        return serde_json::json!({
1306            "delivered": false,
1307            "reason": "surface owner has no endpoint",
1308            "owner": owner
1309        });
1310    };
1311
1312    let message = car_a2a::Message {
1313        message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1314        role: car_a2a::MessageRole::User,
1315        parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1316            data: serde_json::json!({
1317                "a2uiAction": action,
1318            }),
1319            metadata: Default::default(),
1320        })],
1321        task_id: owner.task_id.clone(),
1322        context_id: owner.context_id.clone(),
1323        metadata: Default::default(),
1324    };
1325
1326    let auth = state
1327        .a2ui_route_auth
1328        .lock()
1329        .await
1330        .get(&action.surface_id)
1331        .cloned()
1332        .map(client_auth_from_route_auth)
1333        .unwrap_or(car_a2a::ClientAuth::None);
1334
1335    match car_a2a::A2aClient::new(endpoint.clone())
1336        .with_auth(auth)
1337        .send_message(message, false)
1338        .await
1339    {
1340        Ok(result) => serde_json::json!({
1341            "delivered": true,
1342            "owner": owner,
1343            "endpoint": endpoint,
1344            "result": result,
1345        }),
1346        Err(error) => serde_json::json!({
1347            "delivered": false,
1348            "owner": owner,
1349            "endpoint": endpoint,
1350            "error": error.to_string(),
1351        }),
1352    }
1353}
1354
1355fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1356    match auth {
1357        A2aRouteAuth::None => car_a2a::ClientAuth::None,
1358        A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1359        A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1360    }
1361}
1362
1363fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1364    let endpoint = endpoint?;
1365    if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1366        Some(endpoint)
1367    } else {
1368        None
1369    }
1370}
1371
/// Report whether `endpoint` is a plain-HTTP URL whose host is the
/// local machine (`localhost`, `127.0.0.1`, or `[::1]`), optionally
/// followed by a numeric port and then a path, query, or fragment.
///
/// The authority component is isolated and matched exactly instead of
/// prefix-matching the whole URL. A prefix test on
/// `"http://localhost:"` also accepts `http://localhost:80@evil.example/`,
/// where `localhost:80` is userinfo and the real host is remote.
/// Requiring `host[:digits]` rejects any authority containing `@` or
/// other unexpected characters. Matching stays case-sensitive and
/// `http`-only.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };

    // The authority runs up to the first path / query / fragment
    // delimiter (all ASCII, so the byte index is a char boundary).
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];

    LOOPBACK_HOSTS.iter().any(|host| {
        match authority.strip_prefix(host) {
            // Bare host, no port.
            Some("") => true,
            // Host followed by a port: digits only, so userinfo tricks
            // such as `localhost:80@evil.example` are refused.
            Some(tail) => tail
                .strip_prefix(':')
                .map_or(false, |port| port.bytes().all(|b| b.is_ascii_digit())),
            None => false,
        }
    })
}
1383
1384async fn handle_host_register_agent(
1385    req: &JsonRpcMessage,
1386    session: &crate::session::ClientSession,
1387) -> Result<Value, String> {
1388    let request: RegisterHostAgentRequest =
1389        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1390    serde_json::to_value(
1391        session
1392            .host
1393            .register_agent(&session.client_id, request)
1394            .await?,
1395    )
1396    .map_err(|e| e.to_string())
1397}
1398
1399async fn handle_host_unregister_agent(
1400    req: &JsonRpcMessage,
1401    session: &crate::session::ClientSession,
1402) -> Result<Value, String> {
1403    let agent_id = req
1404        .params
1405        .get("agent_id")
1406        .and_then(|v| v.as_str())
1407        .ok_or("missing agent_id")?;
1408    session
1409        .host
1410        .unregister_agent(&session.client_id, agent_id)
1411        .await?;
1412    Ok(serde_json::json!({"ok": true}))
1413}
1414
1415async fn handle_host_set_status(
1416    req: &JsonRpcMessage,
1417    session: &crate::session::ClientSession,
1418) -> Result<Value, String> {
1419    let request: SetHostAgentStatusRequest =
1420        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1421    serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1422        .map_err(|e| e.to_string())
1423}
1424
1425async fn handle_host_notify(
1426    req: &JsonRpcMessage,
1427    session: &crate::session::ClientSession,
1428) -> Result<Value, String> {
1429    let kind = req
1430        .params
1431        .get("kind")
1432        .and_then(|v| v.as_str())
1433        .unwrap_or("host.notification");
1434    let agent_id = req
1435        .params
1436        .get("agent_id")
1437        .and_then(|v| v.as_str())
1438        .map(str::to_string);
1439    let message = req
1440        .params
1441        .get("message")
1442        .and_then(|v| v.as_str())
1443        .unwrap_or("");
1444    let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1445    serde_json::to_value(
1446        session
1447            .host
1448            .record_event(kind, agent_id, message, payload)
1449            .await,
1450    )
1451    .map_err(|e| e.to_string())
1452}
1453
1454async fn handle_host_request_approval(
1455    req: &JsonRpcMessage,
1456    session: &crate::session::ClientSession,
1457) -> Result<Value, String> {
1458    let request: CreateHostApprovalRequest =
1459        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1460    if let Some(agent_id) = &request.agent_id {
1461        // Best-effort. If the caller doesn't own this agent the
1462        // ACL added 2026-05 will refuse the status update — that
1463        // is the correct semantics; we still want the approval row
1464        // itself to land so the UI can render the request.
1465        let _ = session
1466            .host
1467            .set_status(
1468                &session.client_id,
1469                SetHostAgentStatusRequest {
1470                    agent_id: agent_id.clone(),
1471                    status: HostAgentStatus::WaitingForApproval,
1472                    current_task: None,
1473                    message: Some("Waiting for approval".to_string()),
1474                    payload: Value::Null,
1475                },
1476            )
1477            .await;
1478    }
1479    // `system_level: true` opts the approval out of per-session
1480    // ownership. The host-side ACL then allows any authenticated
1481    // session (typically CarHost or `car-host approve`) to resolve.
1482    // Agents requesting user approval should always set this — the
1483    // session-owned mode is only correct when the requesting session
1484    // is also the resolving session, which approval-via-UI never is.
1485    let owner_client_id = if request.system_level {
1486        None
1487    } else {
1488        Some(session.client_id.as_str())
1489    };
1490    serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1491        .map_err(|e| e.to_string())
1492}
1493
1494async fn handle_host_resolve_approval(
1495    req: &JsonRpcMessage,
1496    session: &crate::session::ClientSession,
1497) -> Result<Value, String> {
1498    let request: ResolveHostApprovalRequest =
1499        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1500    serde_json::to_value(
1501        session
1502            .host
1503            .resolve_approval(&session.client_id, request)
1504            .await?,
1505    )
1506    .map_err(|e| e.to_string())
1507}
1508
1509/// `session.auth` — present the per-launch token to unlock the
1510/// connection. When `state.auth_token` is unset, this method is a
1511/// no-op success (auth is disabled). When set, the supplied token
1512/// must equal it (constant-time comparison) — a successful auth
1513/// flips `session.authenticated` to `true` so subsequent methods
1514/// pass the gate. Wrong token returns an error AND leaves the
1515/// session unauthenticated; the dispatcher loop's gate then closes
1516/// the connection on the next non-auth method.
1517///
1518/// Closes Parslee-ai/car-releases#32.
1519async fn handle_session_auth(
1520    req: &JsonRpcMessage,
1521    session: &crate::session::ClientSession,
1522    state: &Arc<ServerState>,
1523) -> Result<Value, String> {
1524    let supplied = req
1525        .params
1526        .get("token")
1527        .and_then(Value::as_str)
1528        .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1529    // #169: optional `agent_id` binds the WS connection to a
1530    // supervised lifecycle agent. When present, the supplied token
1531    // must equal the per-agent token the supervisor minted at upsert
1532    // (NOT the daemon-wide auth token). When absent, fall back to
1533    // the daemon-wide token — preserves the legacy unbound-token
1534    // path for browser/host/CLI clients.
1535    let agent_id = req
1536        .params
1537        .get("agent_id")
1538        .and_then(Value::as_str)
1539        .map(str::to_string);
1540
1541    if let Some(id) = agent_id {
1542        let supervisor = state.supervisor()?;
1543        if !supervisor.validate_agent_token(&id, supplied).await {
1544            return Err(format!(
1545                "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1546            ));
1547        }
1548        // Single-claim: only one connection at a time per
1549        // agent_id. A second claim is rejected so the daemon-side
1550        // per-agent state stays unambiguous.
1551        {
1552            let mut attached = state.attached_agents.lock().await;
1553            if let Some(prior) = attached.get(&id) {
1554                if prior != &session.client_id {
1555                    return Err(format!(
1556                        "auth failed: agent_id `{id}` is already attached on \
1557                         another connection (client_id={prior})"
1558                    ));
1559                }
1560            }
1561            attached.insert(id.clone(), session.client_id.clone());
1562        }
1563        // #170: attach the daemon-owned persistent memgine for
1564        // this agent. Lazy-loaded on first connection per id from
1565        // `~/.car/memory/agents/<id>.jsonl`; retained across
1566        // disconnect so the next session sees the same state.
1567        let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1568        *session.bound_memgine.lock().await = Some(agent_eng);
1569        *session.agent_id.lock().await = Some(id.clone());
1570        session
1571            .authenticated
1572            .store(true, std::sync::atomic::Ordering::Release);
1573        return Ok(serde_json::json!({
1574            "ok": true,
1575            "auth_enabled": true,
1576            "agent_id": id,
1577        }));
1578    }
1579
1580    let expected = match state.auth_token.get() {
1581        Some(t) => t,
1582        None => {
1583            // Auth disabled — accept any token politely so callers
1584            // that always include a session.auth handshake (e.g. the
1585            // FFI proxy) don't fail when the daemon happens to be
1586            // unauthed. Mark the session authenticated anyway so the
1587            // gate is a no-op below.
1588            session
1589                .authenticated
1590                .store(true, std::sync::atomic::Ordering::Release);
1591            return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1592        }
1593    };
1594    if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1595        return Err("auth failed: token mismatch".to_string());
1596    }
1597    session
1598        .authenticated
1599        .store(true, std::sync::atomic::Ordering::Release);
1600    Ok(serde_json::json!({
1601        "ok": true,
1602        "auth_enabled": true,
1603        "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
1604    }))
1605}
1606
/// Compare two byte strings without an early exit on the first
/// differing byte.
///
/// Inputs of different length are rejected immediately, so the length
/// is the only thing timing can reveal — acceptable for the
/// fixed-length (43-char) tokens this is used with. For equal lengths
/// every byte pair is XOR-ed and OR-ed into one accumulator, which is
/// zero only when all pairs match.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let accumulated = a
        .iter()
        .zip(b)
        .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs));
    accumulated == 0
}
1620
1621/// Block dispatch of `method` until the user resolves the approval
1622/// raised on [`HostState`].
1623///
1624/// Called from the dispatcher loop only when
1625/// [`crate::session::ApprovalGate::requires_approval`] returns true.
1626/// `Ok(())` means the user picked "approve"; `Err(reason)` is sent
1627/// to the caller as JSON-RPC error code `-32003` with the supplied
1628/// reason. On timeout, the approval row stays in `Pending` so the
1629/// UI keeps a record of the unanswered request.
1630async fn gate_high_risk_method(
1631    method: &str,
1632    params: &Value,
1633    state: &Arc<ServerState>,
1634) -> Result<(), String> {
1635    let timeout = state.approval_gate.timeout;
1636    let req = CreateHostApprovalRequest {
1637        agent_id: None,
1638        action: format!("ws.method:{method}"),
1639        details: serde_json::json!({
1640            "method": method,
1641            // Truncate params for the UI — full payload is recoverable
1642            // via the request-time host event log if needed. The cap
1643            // keeps a malicious caller from drowning the UI in JSON.
1644            "params_preview": preview_params(params, 2_000),
1645        }),
1646        options: vec!["approve".to_string(), "deny".to_string()],
1647        // The high-risk-method gate is already system-level (it
1648        // passes None as the owner via request_and_wait_approval's
1649        // internal call). This field is informational here.
1650        system_level: true,
1651    };
1652    match state
1653        .host
1654        .request_and_wait_approval(req, "approve", timeout)
1655        .await
1656    {
1657        Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1658        Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1659            "{method} denied by user (approval gate, audit 2026-05). \
1660             To call this method without an interactive prompt, start \
1661             car-server with --no-approvals on a trusted machine."
1662        )),
1663        Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1664            "{method} approval timed out after {}s with no resolution. \
1665             The approval is still visible in `host.approvals` for \
1666             forensics; resubmit the request to retry.",
1667            timeout.as_secs()
1668        )),
1669        Err(e) => Err(format!("approval gate error: {e}")),
1670    }
1671}
1672
1673fn preview_params(value: &Value, max_chars: usize) -> Value {
1674    let s = value.to_string();
1675    if s.len() <= max_chars {
1676        value.clone()
1677    } else {
1678        Value::String(format!("{}… (truncated)", &s[..max_chars]))
1679    }
1680}
1681
1682async fn handle_session_init(
1683    req: &JsonRpcMessage,
1684    session: &crate::session::ClientSession,
1685) -> Result<Value, String> {
1686    let init: SessionInitRequest =
1687        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1688
1689    for tool in &init.tools {
1690        register_from_definition(&session.runtime, tool).await;
1691    }
1692
1693    let mut policy_count = 0;
1694    {
1695        let mut policies = session.runtime.policies.write().await;
1696        for policy_def in &init.policies {
1697            if let Some(check) = build_policy_check(policy_def) {
1698                policies.register(&policy_def.name, check, "");
1699                policy_count += 1;
1700            }
1701        }
1702    }
1703
1704    serde_json::to_value(SessionInitResponse {
1705        session_id: session.client_id.clone(),
1706        tools_registered: init.tools.len(),
1707        policies_registered: policy_count,
1708    })
1709    .map_err(|e| e.to_string())
1710}
1711
1712fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1713    match def.rule.as_str() {
1714        "deny_tool" => {
1715            let target = def.target.clone();
1716            Some(Box::new(
1717                move |action: &car_ir::Action, _: &car_state::StateStore| {
1718                    if action.tool.as_deref() == Some(&target) {
1719                        Some(format!("tool '{}' denied", target))
1720                    } else {
1721                        None
1722                    }
1723                },
1724            ))
1725        }
1726        "require_state" => {
1727            let key = def.key.clone();
1728            let value = def.value.clone();
1729            Some(Box::new(
1730                move |_: &car_ir::Action, state: &car_state::StateStore| {
1731                    if state.get(&key).as_ref() != Some(&value) {
1732                        Some(format!("state['{}'] must be {:?}", key, value))
1733                    } else {
1734                        None
1735                    }
1736                },
1737            ))
1738        }
1739        "deny_tool_param" => {
1740            let target = def.target.clone();
1741            let param = def.key.clone();
1742            let pattern = def.pattern.clone();
1743            Some(Box::new(
1744                move |action: &car_ir::Action, _: &car_state::StateStore| {
1745                    if action.tool.as_deref() != Some(&target) {
1746                        return None;
1747                    }
1748                    if let Some(val) = action.parameters.get(&param) {
1749                        let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1750                        if s.contains(&pattern) {
1751                            return Some(format!("param '{}' matches '{}'", param, pattern));
1752                        }
1753                    }
1754                    None
1755                },
1756            ))
1757        }
1758        _ => None,
1759    }
1760}
1761
1762async fn handle_tools_register(
1763    req: &JsonRpcMessage,
1764    session: &crate::session::ClientSession,
1765) -> Result<Value, String> {
1766    let tools: Vec<ToolDefinition> =
1767        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1768    for tool in &tools {
1769        register_from_definition(&session.runtime, tool).await;
1770    }
1771    Ok(Value::from(tools.len()))
1772}
1773
1774/// Bridge a wire-protocol `ToolDefinition` to the engine's
1775/// schema-aware registration. Carries the full ToolSchema shape
1776/// (description, parameters, returns, idempotency, caching, rate
1777/// limit) through to the validator. An empty `parameters` object is
1778/// the legacy schemaless registration — the validator no-ops for
1779/// those, so pre-v0.5.x callers see no change.
1780async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1781    runtime
1782        .register_tool_schema(car_ir::ToolSchema {
1783            name: def.name.clone(),
1784            description: def.description.clone(),
1785            parameters: def.parameters.clone(),
1786            returns: def.returns.clone(),
1787            idempotent: def.idempotent,
1788            cache_ttl_secs: def.cache_ttl_secs,
1789            rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1790                max_calls: rl.max_calls,
1791                interval_secs: rl.interval_secs,
1792            }),
1793        })
1794        .await;
1795}
1796
1797async fn handle_proposal_submit(
1798    req: &JsonRpcMessage,
1799    session: &crate::session::ClientSession,
1800) -> Result<Value, String> {
1801    let submit: ProposalSubmitRequest =
1802        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1803    // `session_id` is sibling to `proposal` in the params object —
1804    // not part of `ProposalSubmitRequest` (kept proto-compatible). When
1805    // present, executes the proposal under the named session so any
1806    // session-scoped policies layer on top of global ones.
1807    // See docs/proposals/per-session-policy-scoping.md.
1808    let session_id = req
1809        .params
1810        .get("session_id")
1811        .and_then(|v| v.as_str())
1812        .map(str::to_string);
1813
1814    // `scope` is the optional per-execution caller / tenant surface
1815    // added in Parslee-ai/car#187 phase 3. Shape mirrors
1816    // `car_engine::RuntimeScope` ({ callerId, tenantId, claims });
1817    // serde's camelCase rename keeps the wire form consistent with
1818    // the rest of the JSON-RPC surface. When present, the proposal
1819    // routes through `execute_scoped*` so the runtime records the
1820    // identity on the event log and state R/W ops apply the tenant
1821    // prefix (#187 phase 3-B).
1822    let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1823        Some(v) if !v.is_null() => {
1824            Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1825        }
1826        _ => None,
1827    };
1828
1829    let result = match (session_id, scope) {
1830        // session_id + scope: no single combined entry point on
1831        // Runtime today. Scope takes precedence because it's the
1832        // tenant-isolation surface; per-session policies layer in
1833        // a follow-up when there's a real caller asking for both.
1834        (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1835        (Some(sid), None) => {
1836            session
1837                .runtime
1838                .execute_with_session(&submit.proposal, &sid)
1839                .await
1840        }
1841        (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1842        (None, None) => session.runtime.execute(&submit.proposal).await,
1843    };
1844    serde_json::to_value(result).map_err(|e| e.to_string())
1845}
1846
1847async fn handle_session_policy_open(
1848    session: &crate::session::ClientSession,
1849) -> Result<Value, String> {
1850    let id = session.runtime.open_session().await;
1851    Ok(serde_json::json!({ "session_id": id }))
1852}
1853
1854async fn handle_session_policy_close(
1855    req: &JsonRpcMessage,
1856    session: &crate::session::ClientSession,
1857) -> Result<Value, String> {
1858    let sid = req
1859        .params
1860        .get("session_id")
1861        .and_then(|v| v.as_str())
1862        .ok_or("missing 'session_id'")?;
1863    let closed = session.runtime.close_session(sid).await;
1864    Ok(serde_json::json!({ "closed": closed }))
1865}
1866
1867/// `policy.register` — register one policy against this WebSocket
1868/// session's runtime. Mirrors the `PolicyDefinition` shape used by
1869/// `session.init`. When `session_id` is present, the policy is scoped
1870/// to the named in-runtime session opened via `session.policy.open`;
1871/// otherwise it is global.
1872async fn handle_policy_register(
1873    req: &JsonRpcMessage,
1874    session: &crate::session::ClientSession,
1875) -> Result<Value, String> {
1876    let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1877        .map_err(|e| format!("invalid policy params: {e}"))?;
1878    let session_id = req
1879        .params
1880        .get("session_id")
1881        .and_then(|v| v.as_str())
1882        .map(str::to_string);
1883    let check = build_policy_check(&def)
1884        .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1885    match session_id {
1886        Some(sid) => session
1887            .runtime
1888            .register_policy_in_session(&sid, &def.name, check, "")
1889            .await
1890            .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1891        None => {
1892            let mut policies = session.runtime.policies.write().await;
1893            policies.register(&def.name, check, "");
1894            Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1895        }
1896    }
1897}
1898
1899async fn handle_verify(
1900    req: &JsonRpcMessage,
1901    session: &crate::session::ClientSession,
1902) -> Result<Value, String> {
1903    let vr: VerifyRequest =
1904        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1905    let tools: std::collections::HashSet<String> =
1906        session.runtime.tools.read().await.keys().cloned().collect();
1907    let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1908    serde_json::to_value(VerifyResponse {
1909        valid: result.valid,
1910        issues: result
1911            .issues
1912            .iter()
1913            .map(|i| VerifyIssueProto {
1914                action_id: i.action_id.clone(),
1915                severity: i.severity.clone(),
1916                message: i.message.clone(),
1917            })
1918            .collect(),
1919        simulated_state: result.simulated_state,
1920    })
1921    .map_err(|e| e.to_string())
1922}
1923
1924/// Parse the optional `tenant_id` sibling field from JSON-RPC
1925/// params (Parslee-ai/car#187 phase 3-E). When set and non-empty,
1926/// state R/W routes through `StateStore::scoped(tenant_id)` so
1927/// distinct tenants can't see each other's keys over the WS
1928/// surface — symmetric to the proposal.submit scope plumbing.
1929/// When absent / empty, the legacy unscoped namespace applies.
1930fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1931    req.params
1932        .get("tenant_id")
1933        .and_then(|v| v.as_str())
1934        .filter(|s| !s.is_empty())
1935        .map(str::to_string)
1936}
1937
1938async fn handle_state_get(
1939    req: &JsonRpcMessage,
1940    session: &crate::session::ClientSession,
1941) -> Result<Value, String> {
1942    let key = req
1943        .params
1944        .get("key")
1945        .and_then(|v| v.as_str())
1946        .ok_or("missing 'key'")?;
1947    let tenant = tenant_from_params(req);
1948    Ok(session
1949        .runtime
1950        .state
1951        .scoped(tenant.as_deref())
1952        .get(key)
1953        .unwrap_or(Value::Null))
1954}
1955
1956async fn handle_state_set(
1957    req: &JsonRpcMessage,
1958    session: &crate::session::ClientSession,
1959) -> Result<Value, String> {
1960    let key = req
1961        .params
1962        .get("key")
1963        .and_then(|v| v.as_str())
1964        .ok_or("missing 'key'")?;
1965    let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1966    let tenant = tenant_from_params(req);
1967    session
1968        .runtime
1969        .state
1970        .scoped(tenant.as_deref())
1971        .set(key, value, "client");
1972    Ok(Value::from("ok"))
1973}
1974
1975/// `state.exists` — true if the key is set in this session's state
1976/// store, false otherwise. Cheaper than `state.get` + null-check on
1977/// the client side because it doesn't serialize the value.
1978async fn handle_state_exists(
1979    req: &JsonRpcMessage,
1980    session: &crate::session::ClientSession,
1981) -> Result<Value, String> {
1982    let key = req
1983        .params
1984        .get("key")
1985        .and_then(|v| v.as_str())
1986        .ok_or("missing 'key'")?;
1987    let tenant = tenant_from_params(req);
1988    Ok(Value::Bool(
1989        session.runtime.state.scoped(tenant.as_deref()).exists(key),
1990    ))
1991}
1992
1993/// `state.keys` — list every key currently set in this session's
1994/// state store. Returns a JSON array of strings.
1995async fn handle_state_keys(
1996    req: &JsonRpcMessage,
1997    session: &crate::session::ClientSession,
1998) -> Result<Value, String> {
1999    let tenant = tenant_from_params(req);
2000    Ok(Value::Array(
2001        session
2002            .runtime
2003            .state
2004            .scoped(tenant.as_deref())
2005            .keys()
2006            .into_iter()
2007            .map(Value::String)
2008            .collect(),
2009    ))
2010}
2011
2012/// `state.snapshot` — return the entire session state store as a
2013/// JSON object (`{ key: value, ... }`). Equivalent to iterating
2014/// `state.keys` + `state.get` but in a single round-trip; for
2015/// inspectors/dashboards.
2016///
2017/// Tenant-scoped variant: when `tenant_id` is set, only that
2018/// tenant's keys are returned (prefix stripped on the way out).
2019/// `state.snapshot` with no `tenant_id` returns only unscoped
2020/// keys; consistent with `state.keys`'s filter behaviour and the
2021/// strict-isolation contract from phase 3-B.
2022async fn handle_state_snapshot(
2023    req: &JsonRpcMessage,
2024    session: &crate::session::ClientSession,
2025) -> Result<Value, String> {
2026    let tenant = tenant_from_params(req);
2027    let view = session.runtime.state.scoped(tenant.as_deref());
2028    let mut map = serde_json::Map::new();
2029    for key in view.keys() {
2030        if let Some(value) = view.get(&key) {
2031            map.insert(key, value);
2032        }
2033    }
2034    Ok(Value::Object(map))
2035}
2036
2037// --- Per-agent persistent memgine (#170) ---
2038
2039/// `~/.car/memory/agents/<id>.json` — the per-agent snapshot file.
2040/// Mirrors the existing `memory.persist` shape (flat JSON array of
2041/// fact objects) so the same loader path works.
2042fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
2043    let base = car_ffi_common::memory_path::ensure_base()
2044        .map_err(|e| format!("memory base unavailable: {e}"))?;
2045    let dir = base.join("agents");
2046    std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
2047    Ok(dir.join(format!("{agent_id}.json")))
2048}
2049
2050/// Acquire (or lazy-create + load from disk) the daemon-owned
2051/// persistent memgine for `agent_id`. First call per id reads
2052/// `~/.car/memory/agents/<id>.json` if it exists; subsequent calls
2053/// share the in-memory engine across sessions. Caller stores the
2054/// returned `Arc` on `ClientSession.bound_memgine` so memory.*
2055/// handlers route through it via [`ClientSession::effective_memgine`].
2056async fn get_or_load_agent_memgine(
2057    state: &Arc<ServerState>,
2058    agent_id: &str,
2059) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
2060    {
2061        let map = state.agent_memgines.lock().await;
2062        if let Some(eng) = map.get(agent_id) {
2063            return Ok(eng.clone());
2064        }
2065    }
2066    // Build a fresh engine and try to load from disk.
2067    let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
2068        None,
2069    )));
2070    let path = agent_memgine_snapshot_path(agent_id)?;
2071    if path.exists() {
2072        let content = std::fs::read_to_string(&path)
2073            .map_err(|e| format!("read {}: {}", path.display(), e))?;
2074        let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
2075        let mut g = engine.lock().await;
2076        let mut loaded: u32 = 0;
2077        for fact in &facts {
2078            let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2079            let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2080            let kind = fact
2081                .get("kind")
2082                .and_then(|v| v.as_str())
2083                .unwrap_or("pattern");
2084            let fid = format!("loaded-{loaded}");
2085            g.ingest_fact(
2086                &fid,
2087                subject,
2088                body,
2089                "user",
2090                "peer",
2091                chrono::Utc::now(),
2092                "global",
2093                None,
2094                vec![],
2095                kind == "constraint",
2096            );
2097            loaded += 1;
2098        }
2099    }
2100    let mut map = state.agent_memgines.lock().await;
2101    let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
2102    Ok(stored)
2103}
2104
2105/// Snapshot the agent's memgine to its disk file. Same on-wire shape
2106/// as `memory.persist` so manual snapshots and the daemon-owned
2107/// persistence stay interoperable.
2108async fn persist_agent_memgine(
2109    agent_id: &str,
2110    engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
2111) -> Result<(), String> {
2112    let path = agent_memgine_snapshot_path(agent_id)?;
2113    let g = engine.lock().await;
2114    let facts: Vec<Value> = g
2115        .graph
2116        .inner
2117        .node_indices()
2118        .filter_map(|nix| {
2119            let node = g.graph.inner.node_weight(nix)?;
2120            if !node.is_valid() {
2121                return None;
2122            }
2123            if node.kind == car_memgine::MemKind::Identity
2124                || node.kind == car_memgine::MemKind::Environment
2125            {
2126                return None;
2127            }
2128            Some(serde_json::json!({
2129                "subject": node.key,
2130                "body": node.value,
2131                "kind": match node.kind {
2132                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2133                    car_memgine::MemKind::Conversation => "outcome",
2134                    _ => "pattern",
2135                },
2136                "confidence": 0.5,
2137                "content_type": node.content_type.as_label(),
2138            }))
2139        })
2140        .collect();
2141    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2142    std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2143    Ok(())
2144}
2145
2146// --- Memory handlers ---
2147
2148/// `memory.fact_count` — return `valid_fact_count()` of the
2149/// session's memgine. Used by FFI bindings to mirror their
2150/// embedded `fact_count()` accessor without round-tripping a full
2151/// query. No params.
2152async fn handle_memory_fact_count(
2153    session: &crate::session::ClientSession,
2154) -> Result<Value, String> {
2155    let engine_arc = session.effective_memgine().await;
2156    let engine = engine_arc.lock().await;
2157    Ok(Value::from(engine.valid_fact_count()))
2158}
2159
2160async fn handle_memory_add_fact(
2161    req: &JsonRpcMessage,
2162    session: &crate::session::ClientSession,
2163) -> Result<Value, String> {
2164    let subject = req
2165        .params
2166        .get("subject")
2167        .and_then(|v| v.as_str())
2168        .ok_or("missing subject")?;
2169    let body = req
2170        .params
2171        .get("body")
2172        .and_then(|v| v.as_str())
2173        .ok_or("missing body")?;
2174    let kind = req
2175        .params
2176        .get("kind")
2177        .and_then(|v| v.as_str())
2178        .unwrap_or("pattern");
2179    // Route through `effective_memgine` so connections bound to a
2180    // lifecycle agent (#169) write into the daemon-owned per-agent
2181    // memgine instead of the per-WS ephemeral one (#170).
2182    let engine_arc = session.effective_memgine().await;
2183    let count = {
2184        let mut engine = engine_arc.lock().await;
2185        let fid = format!("ws-{}", engine.valid_fact_count());
2186        engine.ingest_fact(
2187            &fid,
2188            subject,
2189            body,
2190            "user",
2191            "peer",
2192            chrono::Utc::now(),
2193            "global",
2194            None,
2195            vec![],
2196            kind == "constraint",
2197        );
2198        engine.valid_fact_count()
2199    };
2200    // Persist after every add when the session is bound to a
2201    // supervised agent. Synchronous write — small JSON snapshot.
2202    if let Some(id) = session.agent_id.lock().await.clone() {
2203        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2204            tracing::warn!(agent_id = %id, error = %e,
2205                "agent memgine persist failed; in-memory state is canonical");
2206        }
2207    }
2208    Ok(Value::from(count))
2209}
2210
2211async fn handle_memory_query(
2212    req: &JsonRpcMessage,
2213    session: &crate::session::ClientSession,
2214) -> Result<Value, String> {
2215    let query = req
2216        .params
2217        .get("query")
2218        .and_then(|v| v.as_str())
2219        .ok_or("missing query")?;
2220    let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2221    let engine_arc = session.effective_memgine().await;
2222    let engine = engine_arc.lock().await;
2223    let seeds = engine.graph.find_seeds(query, 5);
2224    // FFI parity with NAPI `query_facts` (car-ffi-napi/src/lib.rs:577) —
2225    // both use Personalized PageRank so transport choice doesn't shift
2226    // ranking semantics. Result shape (subject/body/kind/confidence)
2227    // also mirrors NAPI for the same reason.
2228    let hits = if !seeds.is_empty() {
2229        engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2230    } else {
2231        vec![]
2232    };
2233    let results: Vec<Value> = hits
2234        .iter()
2235        .filter_map(|hit| {
2236            let node = engine.graph.inner.node_weight(hit.node_ix)?;
2237            Some(serde_json::json!({
2238                "subject": node.key,
2239                "body": node.value,
2240                "kind": format!("{:?}", node.kind).to_lowercase(),
2241                "confidence": hit.activation,
2242            }))
2243        })
2244        .collect();
2245    serde_json::to_value(results).map_err(|e| e.to_string())
2246}
2247
2248async fn handle_memory_build_context(
2249    req: &JsonRpcMessage,
2250    session: &crate::session::ClientSession,
2251) -> Result<Value, String> {
2252    let query = req
2253        .params
2254        .get("query")
2255        .and_then(|v| v.as_str())
2256        .unwrap_or("");
2257    // FFI parity with NAPI `build_context(query, model_context_window)`.
2258    // When supplied, sizes the assembly budget against the model's window
2259    // instead of the fixed 8K default.
2260    let model_context_window = req
2261        .params
2262        .get("model_context_window")
2263        .and_then(|v| v.as_u64())
2264        .map(|w| w as usize);
2265    let mut engine = session.memgine.lock().await;
2266    Ok(Value::from(
2267        engine.build_context_for_model(query, model_context_window),
2268    ))
2269}
2270
2271/// `memory.build_context_fast` — Fast-mode context assembly for
2272/// latency-sensitive paths (voice, real-time). Skips embedding flush,
2273/// skill lookup, PPR-based scoring, inline repairs, known-unknowns
2274/// extraction. Keeps identity, constraints, facts (creation order),
2275/// conversation, environment.
2276async fn handle_memory_build_context_fast(
2277    req: &JsonRpcMessage,
2278    session: &crate::session::ClientSession,
2279) -> Result<Value, String> {
2280    let query = req
2281        .params
2282        .get("query")
2283        .and_then(|v| v.as_str())
2284        .unwrap_or("");
2285    let model_context_window = req
2286        .params
2287        .get("model_context_window")
2288        .and_then(|v| v.as_u64())
2289        .map(|w| w as usize);
2290    let mut engine = session.memgine.lock().await;
2291    Ok(Value::from(engine.build_context_with_options(
2292        query,
2293        model_context_window,
2294        car_memgine::ContextMode::Fast,
2295        None,
2296    )))
2297}
2298
2299/// `memory.persist` — write the session's memgine to a JSON file
2300/// at `path`. Mirrors NAPI `persist_memory` (car-ffi-napi/src/lib.rs:797)
2301/// so daemon-mode clients can drive checkpoint/restore symmetrically
2302/// with embedded mode. Returns the number of facts written.
2303///
2304/// Filesystem caveat: `path` is interpreted on the daemon's filesystem,
2305/// not the caller's. Since the 2026-05 audit, `path` is also
2306/// sandboxed under `~/.car/memory/` via
2307/// [`car_ffi_common::memory_path::resolve`] — relative paths land
2308/// under the base, absolute paths must already be under the base,
2309/// `..` segments are rejected, symlinks pointing out are rejected.
2310/// Pre-2026-05 the path was passed straight to `std::fs::write` and
2311/// became an arbitrary file-write primitive. The base64-blob escape
2312/// hatch tracked in `Parslee-ai/car-releases#31` will plug into the
2313/// same resolver when it lands.
2314async fn handle_memory_persist(
2315    req: &JsonRpcMessage,
2316    session: &crate::session::ClientSession,
2317) -> Result<Value, String> {
2318    let path = req
2319        .params
2320        .get("path")
2321        .and_then(|v| v.as_str())
2322        .ok_or("missing path")?;
2323    let resolved = car_ffi_common::memory_path::resolve(path)
2324        .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2325    let engine = session.memgine.lock().await;
2326    let facts: Vec<Value> = engine
2327        .graph
2328        .inner
2329        .node_indices()
2330        .filter_map(|nix| {
2331            let node = engine.graph.inner.node_weight(nix)?;
2332            if !node.is_valid() {
2333                return None;
2334            }
2335            if node.kind == car_memgine::MemKind::Identity
2336                || node.kind == car_memgine::MemKind::Environment
2337            {
2338                return None;
2339            }
2340            Some(serde_json::json!({
2341                "subject": node.key,
2342                "body": node.value,
2343                "kind": match node.kind {
2344                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2345                    car_memgine::MemKind::Conversation => "outcome",
2346                    _ => "pattern",
2347                },
2348                "confidence": 0.5,
2349                "content_type": node.content_type.as_label(),
2350            }))
2351        })
2352        .collect();
2353    let count = facts.len();
2354    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2355    std::fs::write(&resolved, json)
2356        .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2357    Ok(Value::from(count as u64))
2358}
2359
2360/// `memory.load` — replace the session's memgine with facts from the
2361/// JSON file at `path`. Mirrors NAPI `load_memory`
2362/// (car-ffi-napi/src/lib.rs:121). Same `~/.car/memory/` sandboxing
2363/// as `memory.persist` since the 2026-05 audit — relative paths
2364/// land under the base, anything that escapes is rejected.
2365async fn handle_memory_load(
2366    req: &JsonRpcMessage,
2367    session: &crate::session::ClientSession,
2368) -> Result<Value, String> {
2369    let path = req
2370        .params
2371        .get("path")
2372        .and_then(|v| v.as_str())
2373        .ok_or("missing path")?;
2374    let resolved = car_ffi_common::memory_path::resolve(path)
2375        .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2376    let content = std::fs::read_to_string(&resolved)
2377        .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2378    let facts: Vec<Value> =
2379        serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2380    let mut engine = session.memgine.lock().await;
2381    engine.reset();
2382    let mut count: u32 = 0;
2383    for fact in &facts {
2384        let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2385        let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2386        let kind = fact
2387            .get("kind")
2388            .and_then(|v| v.as_str())
2389            .unwrap_or("pattern");
2390        let fid = format!("loaded-{}", count);
2391        engine.ingest_fact(
2392            &fid,
2393            subject,
2394            body,
2395            "user",
2396            "peer",
2397            chrono::Utc::now(),
2398            "global",
2399            None,
2400            vec![],
2401            kind == "constraint",
2402        );
2403        count += 1;
2404    }
2405    Ok(Value::from(count))
2406}
2407
2408// --- Skill handlers ---
2409
2410async fn handle_skill_ingest(
2411    req: &JsonRpcMessage,
2412    session: &crate::session::ClientSession,
2413) -> Result<Value, String> {
2414    let name = req
2415        .params
2416        .get("name")
2417        .and_then(|v| v.as_str())
2418        .ok_or("missing name")?;
2419    let code = req
2420        .params
2421        .get("code")
2422        .and_then(|v| v.as_str())
2423        .ok_or("missing code")?;
2424    let platform = req
2425        .params
2426        .get("platform")
2427        .and_then(|v| v.as_str())
2428        .unwrap_or("unknown");
2429    let persona = req
2430        .params
2431        .get("persona")
2432        .and_then(|v| v.as_str())
2433        .unwrap_or("");
2434    let url_pattern = req
2435        .params
2436        .get("url_pattern")
2437        .and_then(|v| v.as_str())
2438        .unwrap_or("");
2439    let description = req
2440        .params
2441        .get("description")
2442        .and_then(|v| v.as_str())
2443        .unwrap_or("");
2444    let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2445    let keywords: Vec<String> = req
2446        .params
2447        .get("task_keywords")
2448        .and_then(|v| v.as_array())
2449        .map(|arr| {
2450            arr.iter()
2451                .filter_map(|v| v.as_str().map(String::from))
2452                .collect()
2453        })
2454        .unwrap_or_default();
2455
2456    let trigger = car_memgine::SkillTrigger {
2457        persona: persona.into(),
2458        url_pattern: url_pattern.into(),
2459        task_keywords: keywords,
2460        structured: None,
2461    };
2462    let mut engine = session.memgine.lock().await;
2463    let node = engine.ingest_skill(
2464        name,
2465        code,
2466        platform,
2467        trigger,
2468        description,
2469        supersedes,
2470        vec![],
2471        vec![],
2472    );
2473    Ok(Value::from(node.index() as u64))
2474}
2475
2476async fn handle_skill_find(
2477    req: &JsonRpcMessage,
2478    session: &crate::session::ClientSession,
2479) -> Result<Value, String> {
2480    let persona = req
2481        .params
2482        .get("persona")
2483        .and_then(|v| v.as_str())
2484        .unwrap_or("");
2485    let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2486    let task = req
2487        .params
2488        .get("task")
2489        .and_then(|v| v.as_str())
2490        .unwrap_or("");
2491    let max = req
2492        .params
2493        .get("max_results")
2494        .and_then(|v| v.as_u64())
2495        .unwrap_or(1) as usize;
2496    let engine = session.memgine.lock().await;
2497    let results = engine.find_skill(persona, url, task, max);
2498    let json: Vec<Value> = results
2499        .iter()
2500        .map(|(m, s)| {
2501            serde_json::json!({
2502                "name": m.name, "code": m.code, "platform": m.platform,
2503                "description": m.description, "stats": m.stats, "match_score": s,
2504            })
2505        })
2506        .collect();
2507    serde_json::to_value(json).map_err(|e| e.to_string())
2508}
2509
2510async fn handle_skill_report(
2511    req: &JsonRpcMessage,
2512    session: &crate::session::ClientSession,
2513) -> Result<Value, String> {
2514    let name = req
2515        .params
2516        .get("skill_name")
2517        .and_then(|v| v.as_str())
2518        .ok_or("missing skill_name")?;
2519    let outcome_str = req
2520        .params
2521        .get("outcome")
2522        .and_then(|v| v.as_str())
2523        .ok_or("missing outcome")?;
2524    let outcome = match outcome_str {
2525        "success" => car_memgine::SkillOutcome::Success,
2526        _ => car_memgine::SkillOutcome::Fail,
2527    };
2528    let mut engine = session.memgine.lock().await;
2529    let stats = engine
2530        .report_outcome(name, outcome)
2531        .ok_or(format!("skill '{}' not found", name))?;
2532    serde_json::to_value(stats).map_err(|e| e.to_string())
2533}
2534
2535// ---------------------------------------------------------------------------
2536// Multi-agent coordination handlers
2537//
2538// The WsAgentRunner sends a `multi.run_agent` JSON-RPC request to the client.
2539// The client runs the model loop and responds with AgentOutput JSON.
2540// ---------------------------------------------------------------------------
2541
/// AgentRunner backed by WebSocket callback to the client.
///
/// One instance is built per `multi.*` request from the requesting session's
/// channel, host state and client id.
struct WsAgentRunner {
    /// Channel to the client: supplies request ids, holds the pending
    /// response slots, and owns the write half used to send the request.
    channel: Arc<WsChannel>,
    /// Host state used to register the per-run agent and publish its status.
    host: Arc<crate::host::HostState>,
    /// Id of the owning client; passed to every host call and used as the
    /// prefix of the generated agent id.
    client_id: String,
}
2548
2549#[async_trait::async_trait]
2550impl car_multi::AgentRunner for WsAgentRunner {
2551    async fn run(
2552        &self,
2553        spec: &car_multi::AgentSpec,
2554        task: &str,
2555        _runtime: &car_engine::Runtime,
2556        _mailbox: &car_multi::Mailbox,
2557    ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2558        use futures::SinkExt;
2559
2560        let request_id = self.channel.next_request_id();
2561        let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2562        let agent = self
2563            .host
2564            .register_agent(
2565                &self.client_id,
2566                RegisterHostAgentRequest {
2567                    id: Some(agent_id.clone()),
2568                    name: spec.name.clone(),
2569                    kind: "callback".to_string(),
2570                    capabilities: spec.tools.clone(),
2571                    project: spec
2572                        .metadata
2573                        .get("project")
2574                        .and_then(|v| v.as_str())
2575                        .map(str::to_string),
2576                    pid: None,
2577                    display: serde_json::from_value(
2578                        spec.metadata
2579                            .get("display")
2580                            .cloned()
2581                            .unwrap_or(serde_json::Value::Null),
2582                    )
2583                    .unwrap_or_default(),
2584                    metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2585                },
2586            )
2587            .await
2588            .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2589        let _ = self
2590            .host
2591            .set_status(
2592                &self.client_id,
2593                SetHostAgentStatusRequest {
2594                    agent_id: agent.id.clone(),
2595                    status: HostAgentStatus::Running,
2596                    current_task: Some(task.to_string()),
2597                    message: Some(format!("{} started", spec.name)),
2598                    payload: serde_json::json!({ "task": task }),
2599                },
2600            )
2601            .await;
2602
2603        let rpc_request = serde_json::json!({
2604            "jsonrpc": "2.0",
2605            "method": "multi.run_agent",
2606            "params": {
2607                "spec": spec,
2608                "task": task,
2609            },
2610            "id": request_id,
2611        });
2612
2613        // Create oneshot channel for the response
2614        let (tx, rx) = tokio::sync::oneshot::channel();
2615        self.channel
2616            .pending
2617            .lock()
2618            .await
2619            .insert(request_id.clone(), tx);
2620
2621        let msg = Message::Text(
2622            serde_json::to_string(&rpc_request)
2623                .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2624                .into(),
2625        );
2626        if let Err(e) = self.channel.write.lock().await.send(msg).await {
2627            let _ = self
2628                .host
2629                .set_status(
2630                    &self.client_id,
2631                    SetHostAgentStatusRequest {
2632                        agent_id: agent_id.clone(),
2633                        status: HostAgentStatus::Errored,
2634                        current_task: None,
2635                        message: Some(format!("{} failed to start", spec.name)),
2636                        payload: serde_json::json!({ "error": e.to_string() }),
2637                    },
2638                )
2639                .await;
2640            return Err(car_multi::MultiError::AgentFailed(
2641                spec.name.clone(),
2642                format!("ws send error: {}", e),
2643            ));
2644        }
2645
2646        // Wait for client response (5 min timeout for model loops)
2647        let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2648            Ok(Ok(response)) => response,
2649            Ok(Err(_)) => {
2650                let _ = self
2651                    .host
2652                    .set_status(
2653                        &self.client_id,
2654                        SetHostAgentStatusRequest {
2655                            agent_id: agent_id.clone(),
2656                            status: HostAgentStatus::Errored,
2657                            current_task: None,
2658                            message: Some(format!("{} callback channel closed", spec.name)),
2659                            payload: Value::Null,
2660                        },
2661                    )
2662                    .await;
2663                return Err(car_multi::MultiError::AgentFailed(
2664                    spec.name.clone(),
2665                    "agent callback channel closed".into(),
2666                ));
2667            }
2668            Err(_) => {
2669                let _ = self
2670                    .host
2671                    .set_status(
2672                        &self.client_id,
2673                        SetHostAgentStatusRequest {
2674                            agent_id: agent_id.clone(),
2675                            status: HostAgentStatus::Errored,
2676                            current_task: None,
2677                            message: Some(format!("{} timed out", spec.name)),
2678                            payload: Value::Null,
2679                        },
2680                    )
2681                    .await;
2682                return Err(car_multi::MultiError::AgentFailed(
2683                    spec.name.clone(),
2684                    "agent callback timed out (300s)".into(),
2685                ));
2686            }
2687        };
2688
2689        if let Some(err) = response.error {
2690            let _ = self
2691                .host
2692                .set_status(
2693                    &self.client_id,
2694                    SetHostAgentStatusRequest {
2695                        agent_id: agent_id.clone(),
2696                        status: HostAgentStatus::Errored,
2697                        current_task: None,
2698                        message: Some(format!("{} errored", spec.name)),
2699                        payload: serde_json::json!({ "error": err }),
2700                    },
2701                )
2702                .await;
2703            return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2704        }
2705
2706        let output_value = response.output.unwrap_or(Value::Null);
2707        let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2708            car_multi::MultiError::AgentFailed(
2709                spec.name.clone(),
2710                format!("invalid AgentOutput: {}", e),
2711            )
2712        })?;
2713        let status = if output.error.is_some() {
2714            HostAgentStatus::Errored
2715        } else {
2716            HostAgentStatus::Completed
2717        };
2718        let message = if output.error.is_some() {
2719            format!("{} errored", spec.name)
2720        } else {
2721            format!("{} completed", spec.name)
2722        };
2723        let _ = self
2724            .host
2725            .set_status(
2726                &self.client_id,
2727                SetHostAgentStatusRequest {
2728                    agent_id,
2729                    status,
2730                    current_task: None,
2731                    message: Some(message),
2732                    payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2733                },
2734            )
2735            .await;
2736
2737        Ok(output)
2738    }
2739}
2740
/// Build the host agent id for one run: `client_id`, a sanitized copy of
/// `name`, and `request_id`, joined with `:`.
///
/// In `name`, every character other than an ASCII letter, an ASCII digit,
/// `-` or `_` is replaced with `-`. `client_id` and `request_id` are used
/// unchanged.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '-' });
    }
    [client_id, sanitized.as_str(), request_id].join(":")
}
2754
2755async fn handle_multi_swarm(
2756    req: &JsonRpcMessage,
2757    session: &crate::session::ClientSession,
2758) -> Result<Value, String> {
2759    let mode_str = req
2760        .params
2761        .get("mode")
2762        .and_then(|v| v.as_str())
2763        .ok_or("missing 'mode'")?;
2764    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2765    let task = req
2766        .params
2767        .get("task")
2768        .and_then(|v| v.as_str())
2769        .ok_or("missing 'task'")?;
2770
2771    let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2772        .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2773    let agent_specs: Vec<car_multi::AgentSpec> =
2774        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2775    let synth: Option<car_multi::AgentSpec> = req
2776        .params
2777        .get("synthesizer")
2778        .map(|v| serde_json::from_value(v.clone()))
2779        .transpose()
2780        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2781
2782    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2783        channel: session.channel.clone(),
2784        host: session.host.clone(),
2785        client_id: session.client_id.clone(),
2786    });
2787    let infra = car_multi::SharedInfra::new();
2788
2789    let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2790    if let Some(s) = synth {
2791        swarm = swarm.with_synthesizer(s);
2792    }
2793
2794    let result = swarm
2795        .run(task, &runner, &infra)
2796        .await
2797        .map_err(|e| format!("swarm error: {}", e))?;
2798    serde_json::to_value(result).map_err(|e| e.to_string())
2799}
2800
2801async fn handle_multi_pipeline(
2802    req: &JsonRpcMessage,
2803    session: &crate::session::ClientSession,
2804) -> Result<Value, String> {
2805    let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2806    let task = req
2807        .params
2808        .get("task")
2809        .and_then(|v| v.as_str())
2810        .ok_or("missing 'task'")?;
2811
2812    let stage_specs: Vec<car_multi::AgentSpec> =
2813        serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2814
2815    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2816        channel: session.channel.clone(),
2817        host: session.host.clone(),
2818        client_id: session.client_id.clone(),
2819    });
2820    let infra = car_multi::SharedInfra::new();
2821
2822    let result = car_multi::Pipeline::new(stage_specs)
2823        .run(task, &runner, &infra)
2824        .await
2825        .map_err(|e| format!("pipeline error: {}", e))?;
2826    serde_json::to_value(result).map_err(|e| e.to_string())
2827}
2828
2829async fn handle_multi_supervisor(
2830    req: &JsonRpcMessage,
2831    session: &crate::session::ClientSession,
2832) -> Result<Value, String> {
2833    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2834    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2835    let task = req
2836        .params
2837        .get("task")
2838        .and_then(|v| v.as_str())
2839        .ok_or("missing 'task'")?;
2840    let max_rounds = req
2841        .params
2842        .get("max_rounds")
2843        .and_then(|v| v.as_u64())
2844        .unwrap_or(3) as u32;
2845
2846    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2847        .map_err(|e| format!("invalid workers: {}", e))?;
2848    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2849        .map_err(|e| format!("invalid supervisor: {}", e))?;
2850
2851    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2852        channel: session.channel.clone(),
2853        host: session.host.clone(),
2854        client_id: session.client_id.clone(),
2855    });
2856    let infra = car_multi::SharedInfra::new();
2857
2858    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2859        .with_max_rounds(max_rounds)
2860        .run(task, &runner, &infra)
2861        .await
2862        .map_err(|e| format!("supervisor error: {}", e))?;
2863    serde_json::to_value(result).map_err(|e| e.to_string())
2864}
2865
2866async fn handle_multi_map_reduce(
2867    req: &JsonRpcMessage,
2868    session: &crate::session::ClientSession,
2869) -> Result<Value, String> {
2870    let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2871    let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2872    let task = req
2873        .params
2874        .get("task")
2875        .and_then(|v| v.as_str())
2876        .ok_or("missing 'task'")?;
2877    let items_val = req.params.get("items").ok_or("missing 'items'")?;
2878
2879    let mapper_spec: car_multi::AgentSpec =
2880        serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2881    let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2882        .map_err(|e| format!("invalid reducer: {}", e))?;
2883    let items: Vec<String> =
2884        serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2885
2886    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2887        channel: session.channel.clone(),
2888        host: session.host.clone(),
2889        client_id: session.client_id.clone(),
2890    });
2891    let infra = car_multi::SharedInfra::new();
2892
2893    let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2894        .run(task, &items, &runner, &infra)
2895        .await
2896        .map_err(|e| format!("map_reduce error: {}", e))?;
2897    serde_json::to_value(result).map_err(|e| e.to_string())
2898}
2899
2900async fn handle_multi_vote(
2901    req: &JsonRpcMessage,
2902    session: &crate::session::ClientSession,
2903) -> Result<Value, String> {
2904    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2905    let task = req
2906        .params
2907        .get("task")
2908        .and_then(|v| v.as_str())
2909        .ok_or("missing 'task'")?;
2910
2911    let agent_specs: Vec<car_multi::AgentSpec> =
2912        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2913    let synth: Option<car_multi::AgentSpec> = req
2914        .params
2915        .get("synthesizer")
2916        .map(|v| serde_json::from_value(v.clone()))
2917        .transpose()
2918        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2919
2920    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2921        channel: session.channel.clone(),
2922        host: session.host.clone(),
2923        client_id: session.client_id.clone(),
2924    });
2925    let infra = car_multi::SharedInfra::new();
2926
2927    let mut vote = car_multi::Vote::new(agent_specs);
2928    if let Some(s) = synth {
2929        vote = vote.with_synthesizer(s);
2930    }
2931
2932    let result = vote
2933        .run(task, &runner, &infra)
2934        .await
2935        .map_err(|e| format!("vote error: {}", e))?;
2936    serde_json::to_value(result).map_err(|e| e.to_string())
2937}
2938
2939// ---------------------------------------------------------------------------
2940// Scheduler handlers
2941// ---------------------------------------------------------------------------
2942
2943fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2944    let name = req
2945        .params
2946        .get("name")
2947        .and_then(|v| v.as_str())
2948        .ok_or("scheduler.create requires 'name'")?;
2949    let prompt = req
2950        .params
2951        .get("prompt")
2952        .and_then(|v| v.as_str())
2953        .ok_or("scheduler.create requires 'prompt'")?;
2954
2955    let mut task = car_scheduler::Task::new(name, prompt);
2956
2957    if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
2958        let trigger = match t {
2959            "once" => car_scheduler::TaskTrigger::Once,
2960            "cron" => car_scheduler::TaskTrigger::Cron,
2961            "interval" => car_scheduler::TaskTrigger::Interval,
2962            "file_watch" => car_scheduler::TaskTrigger::FileWatch,
2963            _ => car_scheduler::TaskTrigger::Manual,
2964        };
2965        let schedule = req
2966            .params
2967            .get("schedule")
2968            .and_then(|v| v.as_str())
2969            .unwrap_or("");
2970        task = task.with_trigger(trigger, schedule);
2971    }
2972
2973    if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
2974        task = task.with_system_prompt(sp);
2975    }
2976
2977    serde_json::to_value(&task).map_err(|e| e.to_string())
2978}
2979
2980async fn handle_scheduler_run(
2981    req: &JsonRpcMessage,
2982    session: &crate::session::ClientSession,
2983) -> Result<Value, String> {
2984    let task_val = req
2985        .params
2986        .get("task")
2987        .ok_or("scheduler.run requires 'task'")?;
2988    let mut task: car_scheduler::Task =
2989        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2990
2991    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2992        channel: session.channel.clone(),
2993        host: session.host.clone(),
2994        client_id: session.client_id.clone(),
2995    });
2996    let executor = car_scheduler::Executor::new(runner);
2997    let execution = executor.run_once(&mut task).await;
2998
2999    serde_json::to_value(&execution).map_err(|e| e.to_string())
3000}
3001
3002async fn handle_scheduler_run_loop(
3003    req: &JsonRpcMessage,
3004    session: &crate::session::ClientSession,
3005) -> Result<Value, String> {
3006    let task_val = req
3007        .params
3008        .get("task")
3009        .ok_or("scheduler.run_loop requires 'task'")?;
3010    let mut task: car_scheduler::Task =
3011        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3012    let max_iterations = req
3013        .params
3014        .get("max_iterations")
3015        .and_then(|v| v.as_u64())
3016        .map(|v| v as u32);
3017
3018    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3019        channel: session.channel.clone(),
3020        host: session.host.clone(),
3021        client_id: session.client_id.clone(),
3022    });
3023    let executor = car_scheduler::Executor::new(runner);
3024    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
3025    let executions = executor
3026        .run_loop(&mut task, max_iterations, cancel_rx)
3027        .await;
3028
3029    serde_json::to_value(&executions).map_err(|e| e.to_string())
3030}
3031
3032// ---------------------------------------------------------------------------
3033// Inference handlers
3034// ---------------------------------------------------------------------------
3035
3036fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
3037    state.inference.get_or_init(|| {
3038        Arc::new(car_inference::InferenceEngine::new(
3039            car_inference::InferenceConfig::default(),
3040        ))
3041    })
3042}
3043
3044async fn handle_infer(
3045    msg: &JsonRpcMessage,
3046    state: &ServerState,
3047    session: &crate::session::ClientSession,
3048) -> Result<Value, String> {
3049    let engine = get_inference_engine(state);
3050    let mut req: car_inference::GenerateRequest =
3051        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3052
3053    // If context_query is provided, build context from memgine and inject it
3054    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3055        let mut memgine = session.memgine.lock().await;
3056        let ctx = memgine.build_context(cq);
3057        if !ctx.is_empty() {
3058            req.context = Some(ctx);
3059        }
3060    }
3061
3062    // Process-wide admission gate. Held for the duration of the
3063    // generation so a burst of concurrent infer RPCs can't multiply
3064    // KV-cache + activation memory and take the host out. The
3065    // `_permit` binding is intentional — its `Drop` releases the slot
3066    // when this future returns.
3067    let _permit = state.admission.acquire().await;
3068
3069    // Use generate_tracked() so tool_calls, usage, model_used, trace_id, and
3070    // latency_ms are preserved in the response. Plain `generate()` discards
3071    // everything except `.text`, which silently breaks tool-use over the
3072    // WebSocket protocol (issue #43).
3073    //
3074    // NOTE: This directly serializes `InferenceResult`. Any field added to
3075    // that struct in `car-inference` becomes part of the public WebSocket
3076    // protocol. The shape is locked by `inference_result_serializes_*` tests
3077    // in car-inference; updating those tests is part of intentionally
3078    // changing the wire contract.
3079    let result = engine
3080        .generate_tracked(req)
3081        .await
3082        .map_err(|e| e.to_string())?;
3083    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3084}
3085
3086/// Streaming inference — mirrors NAPI `inferStream`. Closes
3087/// Parslee-ai/car-releases#30. Same `GenerateRequest` shape as
3088/// `infer`; emits `inference.stream.event` JSON-RPC notifications
3089/// during the run, and returns the final `InferenceResult` as the
3090/// JSON-RPC response when the stream completes.
3091///
3092/// Notification shape (server → client):
3093/// ```jsonc
3094/// {
3095///   "jsonrpc": "2.0",
3096///   "method": "inference.stream.event",
3097///   "params": {
3098///     "request_id": "<original RPC id>",
3099///     "event": { "type": "text" | "tool_start" | "tool_delta" | "usage", ... }
3100///   }
3101/// }
3102/// ```
3103///
3104/// The final `done` event is not pushed as a notification — it's
3105/// the JSON-RPC response with the accumulated `InferenceResult`.
3106/// `video.generate` — daemon-side wrapper for
3107/// `InferenceEngine::generate_video`. Mirrors `handle_infer`'s
3108/// admission gate + JSON request shape (Parslee-ai/car#185).
3109///
3110/// Previously the CLI's `cmd_video` constructed an in-process
3111/// engine and called `generate_video` directly — a v0.7 holdover
3112/// that bypassed the daemon. With this handler the CLI proxies
3113/// here, so the engine-level audio_passthrough gate fires
3114/// inside the daemon process where all FFI surfaces converge.
3115async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3116    let engine = get_inference_engine(state);
3117    let req: car_inference::GenerateImageRequest =
3118        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3119    // Share the same admission gate as text/video generation — a burst
3120    // of image requests shouldn't smuggle around the concurrency cap.
3121    let _permit = state.admission.acquire().await;
3122    let result = engine
3123        .generate_image(req)
3124        .await
3125        .map_err(|e| e.to_string())?;
3126    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3127}
3128
3129async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3130    let engine = get_inference_engine(state);
3131    let req: car_inference::GenerateVideoRequest =
3132        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3133    let _permit = state.admission.acquire().await;
3134    let result = engine
3135        .generate_video(req)
3136        .await
3137        .map_err(|e| e.to_string())?;
3138    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3139}
3140
3141async fn handle_infer_stream(
3142    msg: &JsonRpcMessage,
3143    session: &crate::session::ClientSession,
3144    state: &ServerState,
3145) -> Result<Value, String> {
3146    use futures::SinkExt;
3147    use tokio_tungstenite::tungstenite::Message;
3148
3149    let engine = get_inference_engine(state);
3150    let mut req: car_inference::GenerateRequest =
3151        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3152
3153    // Same context-injection convenience as non-streaming `infer` so
3154    // the two methods have parity on the call shape.
3155    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3156        let mut memgine = session.memgine.lock().await;
3157        let ctx = memgine.build_context(cq);
3158        if !ctx.is_empty() {
3159            req.context = Some(ctx);
3160        }
3161    }
3162
3163    let _permit = state.admission.acquire().await;
3164    let mut rx = engine
3165        .generate_tracked_stream(req)
3166        .await
3167        .map_err(|e| e.to_string())?;
3168
3169    let mut accumulator = car_inference::StreamAccumulator::default();
3170    let request_id = msg.id.clone();
3171
3172    while let Some(event) = rx.recv().await {
3173        let event_payload = match &event {
3174            car_inference::StreamEvent::TextDelta(text) => {
3175                serde_json::json!({"type": "text", "data": text})
3176            }
3177            car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3178                serde_json::json!({"type": "tool_start", "name": name, "index": index})
3179            }
3180            car_inference::StreamEvent::ToolCallDelta {
3181                index,
3182                arguments_delta,
3183            } => serde_json::json!({
3184                "type": "tool_delta",
3185                "index": index,
3186                "data": arguments_delta,
3187            }),
3188            car_inference::StreamEvent::Usage {
3189                input_tokens,
3190                output_tokens,
3191            } => serde_json::json!({
3192                "type": "usage",
3193                "input_tokens": input_tokens,
3194                "output_tokens": output_tokens,
3195            }),
3196            // Done is delivered as the JSON-RPC response, not a
3197            // notification — matches the NAPI contract where the
3198            // standalone function's return value is the accumulated
3199            // result and the callback only sees in-progress events.
3200            car_inference::StreamEvent::Done { .. } => {
3201                accumulator.push(&event);
3202                continue;
3203            }
3204        };
3205
3206        let notif = serde_json::json!({
3207            "jsonrpc": "2.0",
3208            "method": "inference.stream.event",
3209            "params": {
3210                "request_id": request_id,
3211                "event": event_payload,
3212            },
3213        });
3214        if let Ok(text) = serde_json::to_string(&notif) {
3215            let _ = session
3216                .channel
3217                .write
3218                .lock()
3219                .await
3220                .send(Message::Text(text.into()))
3221                .await;
3222        }
3223        accumulator.push(&event);
3224    }
3225
3226    let (text, tool_calls, usage) = accumulator.finish_with_usage();
3227    Ok(serde_json::json!({
3228        "text": text,
3229        "tool_calls": tool_calls,
3230        "usage": usage,
3231    }))
3232}
3233
3234async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3235    let engine = get_inference_engine(state);
3236    let req: car_inference::EmbedRequest =
3237        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3238    // Embeds load their own model weights; share the same admission
3239    // gate as generations so a burst of embed requests can't smuggle
3240    // around the concurrency cap.
3241    let _permit = state.admission.acquire().await;
3242    let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3243    Ok(serde_json::json!({"embeddings": result}))
3244}
3245
3246async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3247    let engine = get_inference_engine(state);
3248    let req: car_inference::ClassifyRequest =
3249        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3250    let _permit = state.admission.acquire().await;
3251    let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3252    Ok(serde_json::json!({"classifications": result}))
3253}
3254
3255/// Surface the current admission state so the menubar tray and
3256/// `car daemon status` can show "queued: N" / "permits: P/T". Read-only
3257/// snapshot — racy by definition but correct enough for status panels.
3258fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3259    let total = state.admission.permits();
3260    let available = state.admission.permits_available();
3261    let in_use = total.saturating_sub(available);
3262    Ok(serde_json::json!({
3263        "permits_total": total,
3264        "permits_available": available,
3265        "permits_in_use": in_use,
3266        "env_override": crate::admission::ENV_MAX_CONCURRENT,
3267    }))
3268}
3269
3270async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3271    let model = msg
3272        .params
3273        .get("model")
3274        .and_then(|v| v.as_str())
3275        .ok_or("missing 'model' parameter")?;
3276    let text = msg
3277        .params
3278        .get("text")
3279        .and_then(|v| v.as_str())
3280        .ok_or("missing 'text' parameter")?;
3281    let engine = get_inference_engine(state);
3282    let ids = engine
3283        .tokenize(model, text)
3284        .await
3285        .map_err(|e| e.to_string())?;
3286    Ok(serde_json::json!({"tokens": ids}))
3287}
3288
3289async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3290    let model = msg
3291        .params
3292        .get("model")
3293        .and_then(|v| v.as_str())
3294        .ok_or("missing 'model' parameter")?;
3295    let tokens: Vec<u32> = msg
3296        .params
3297        .get("tokens")
3298        .and_then(|v| v.as_array())
3299        .ok_or("missing 'tokens' parameter")?
3300        .iter()
3301        .map(|t| {
3302            t.as_u64()
3303                .and_then(|n| u32::try_from(n).ok())
3304                .ok_or_else(|| "tokens[] must be u32 values".to_string())
3305        })
3306        .collect::<Result<Vec<_>, _>>()?;
3307    let engine = get_inference_engine(state);
3308    let text = engine
3309        .detokenize(model, &tokens)
3310        .await
3311        .map_err(|e| e.to_string())?;
3312    Ok(serde_json::json!({"text": text}))
3313}
3314
3315/// `models.register` — persist a user-supplied `ModelSchema` to
3316/// `~/.car/models.json` (Parslee-ai/car-releases#39). Replaces any
3317/// existing entry with the same `id`. Returns `{id, registered}`.
3318///
3319/// **Phase 1 limitation**: the daemon's live `UnifiedRegistry` is
3320/// not updated in-process — the new model becomes visible to
3321/// `models.list`, `infer`, `infer_stream` on the **next daemon
3322/// boot** when `load_user_config` re-reads the file. This is
3323/// enough to unblock opencode's setup flow (register ahead of
3324/// time, then start the daemon). Hot-update requires either an
3325/// `RwLock<InferenceEngine>` on `ServerState` or an
3326/// interior-mutable `UnifiedRegistry`; both touch 20+ call sites
3327/// and are tracked as a follow-up.
3328///
3329/// Until hot-update lands, callers SHOULD register their models
3330/// before issuing `infer` calls against them, and operators
3331/// SHOULD restart the daemon after batches of model
3332/// registrations.
3333async fn handle_models_register(
3334    req: &JsonRpcMessage,
3335    _state: &Arc<ServerState>,
3336) -> Result<Value, String> {
3337    // The params shape mirrors v0.7's FFI `rt.registerModel(schemaJson)`:
3338    // either the bare `ModelSchema` value, OR `{ schema: ModelSchema }`.
3339    // Honor both so existing in-process callers don't have to reshape.
3340    let schema_value = match req.params.get("schema") {
3341        Some(v) => v.clone(),
3342        None => req.params.clone(),
3343    };
3344    let schema: car_inference::ModelSchema =
3345        serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3346    let id = schema.id.clone();
3347
3348    // Resolve the models.json path the same way UnifiedRegistry does:
3349    // `<models_dir>/../models.json` where models_dir defaults to
3350    // `~/.car/models/`. We read whatever's there, swap in the new
3351    // entry, and write back atomically.
3352    let home = std::env::var_os("HOME")
3353        .or_else(|| std::env::var_os("USERPROFILE"))
3354        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3355    let car_dir = std::path::PathBuf::from(home).join(".car");
3356    std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3357    let path = car_dir.join("models.json");
3358
3359    let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3360        let text =
3361            std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3362        if text.trim().is_empty() {
3363            Vec::new()
3364        } else {
3365            serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3366        }
3367    } else {
3368        Vec::new()
3369    };
3370    // Replace existing entry with the same id, else append.
3371    if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3372        *slot = schema;
3373    } else {
3374        models.push(schema);
3375    }
3376    let json =
3377        serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3378    let tmp = path.with_extension("json.tmp");
3379    std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3380    std::fs::rename(&tmp, &path)
3381        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3382    Ok(serde_json::json!({
3383        "id": id,
3384        "registered": true,
3385        "path": path.to_string_lossy(),
3386        "note": "Daemon restart required for live UnifiedRegistry visibility \
3387                 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3388                 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3389    }))
3390}
3391
3392/// `models.unregister` — remove an entry from `~/.car/models.json`
3393/// by id (Parslee-ai/car#186 — symmetric to `models.register`).
3394/// Returns `{ id, unregistered, path }` on success. Returns an error
3395/// when the model isn't present.
3396///
3397/// **Phase 1 limitation** (same as `models.register`): the daemon's
3398/// live `UnifiedRegistry` is not rebuilt — the removal takes effect
3399/// on the next daemon boot. Callers SHOULD restart the daemon after
3400/// a batch of unregistrations if they expect `models.list_unified`
3401/// to reflect the change immediately.
3402async fn handle_models_unregister(
3403    req: &JsonRpcMessage,
3404    _state: &Arc<ServerState>,
3405) -> Result<Value, String> {
3406    // Params shape mirrors the CLI flag: `{ id: string }`. Bare-string
3407    // params are honored for symmetry with the register handler's
3408    // tolerant shape (`{schema: ...}` OR bare schema).
3409    let id = match req.params.get("id") {
3410        Some(v) => v
3411            .as_str()
3412            .ok_or_else(|| "`id` must be a string".to_string())?
3413            .to_string(),
3414        None => match req.params.as_str() {
3415            Some(s) => s.to_string(),
3416            None => return Err("missing `id` parameter".to_string()),
3417        },
3418    };
3419
3420    let home = std::env::var_os("HOME")
3421        .or_else(|| std::env::var_os("USERPROFILE"))
3422        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3423    let car_dir = std::path::PathBuf::from(home).join(".car");
3424    let path = car_dir.join("models.json");
3425
3426    if !path.exists() {
3427        return Err(format!(
3428            "no models.json at {} — nothing to unregister",
3429            path.display()
3430        ));
3431    }
3432    let text =
3433        std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3434    let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3435        Vec::new()
3436    } else {
3437        serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3438    };
3439    let before = models.len();
3440    models.retain(|m| m.id != id);
3441    if models.len() == before {
3442        return Err(format!("model {} not found in {}", id, path.display()));
3443    }
3444    let json =
3445        serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3446    let tmp = path.with_extension("json.tmp");
3447    std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3448    std::fs::rename(&tmp, &path)
3449        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3450    Ok(serde_json::json!({
3451        "id": id,
3452        "unregistered": true,
3453        "path": path.to_string_lossy(),
3454        "note": "Daemon restart required for live UnifiedRegistry visibility \
3455                 (phase 1, matching models.register).",
3456    }))
3457}
3458
3459fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3460    let engine = get_inference_engine(state);
3461    let models = engine.list_models();
3462    serde_json::to_value(&models).map_err(|e| e.to_string())
3463}
3464
3465fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3466    let engine = get_inference_engine(state);
3467    let models = engine.list_models_unified();
3468    serde_json::to_value(&models).map_err(|e| e.to_string())
3469}
3470
/// Filters accepted by `handle_models_search`. Every field is optional,
/// and JSON keys are camelCase (e.g. `localOnly`, `availableOnly`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchParams {
    /// Free-text filter; trimmed, and ignored when blank. Matched as a
    /// case-insensitive substring of the model's id, name, provider,
    /// family, tags and capability names.
    #[serde(default)]
    query: Option<String>,
    /// Keep only models that have this capability.
    #[serde(default)]
    capability: Option<car_inference::ModelCapability>,
    /// Keep only models whose provider equals this value, ignoring ASCII
    /// case; trimmed, and ignored when blank.
    #[serde(default)]
    provider: Option<String>,
    /// Keep only models for which `is_local()` is true.
    #[serde(default)]
    local_only: bool,
    /// Keep only models flagged `available`.
    #[serde(default)]
    available_only: bool,
    /// Maximum number of entries returned; applied after sorting.
    #[serde(default)]
    limit: Option<usize>,
}
3487
/// One model in a search response: the `ModelInfo` fields flattened into
/// the object, plus extra fields copied or derived from the model schema.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchEntry {
    /// Base model info; its fields are serialized inline at this level.
    #[serde(flatten)]
    info: car_inference::ModelInfo,
    /// Copied from the schema's `family`.
    family: String,
    /// Copied from the schema's `version`.
    version: String,
    /// Copied from the schema's `tags`.
    tags: Vec<String>,
    /// True when the model is not yet available and its source is
    /// `ModelSource::Local` or `ModelSource::Mlx`.
    pullable: bool,
    /// The available upgrade whose `from_id` equals this model's id, if any.
    upgrade: Option<car_inference::ModelUpgrade>,
}
3499
/// Response body produced by `handle_models_search`.
///
/// The counters describe the returned `models` list, i.e. they are
/// computed after `limit` has been applied.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchResponse {
    /// Matching models, sorted and limited.
    models: Vec<ModelSearchEntry>,
    /// All upgrades reported by the engine, independent of the filters.
    upgrades: Vec<car_inference::ModelUpgrade>,
    /// Number of entries in `models`.
    total: usize,
    /// Entries in `models` whose info is flagged `available`.
    available: usize,
    /// Entries in `models` whose info is flagged `is_local`.
    local: usize,
    /// `total - local`, saturating at zero.
    remote: usize,
}
3510
3511fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3512    let params: ModelSearchParams =
3513        serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3514            query: None,
3515            capability: None,
3516            provider: None,
3517            local_only: false,
3518            available_only: false,
3519            limit: None,
3520        });
3521    let engine = get_inference_engine(state);
3522    let upgrades = engine.available_model_upgrades();
3523    let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3524        .iter()
3525        .cloned()
3526        .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3527        .collect();
3528    let query = params
3529        .query
3530        .as_deref()
3531        .map(str::trim)
3532        .filter(|q| !q.is_empty())
3533        .map(|q| q.to_ascii_lowercase());
3534    let provider = params
3535        .provider
3536        .as_deref()
3537        .map(str::trim)
3538        .filter(|p| !p.is_empty())
3539        .map(|p| p.to_ascii_lowercase());
3540
3541    let mut entries: Vec<ModelSearchEntry> = engine
3542        .list_schemas()
3543        .into_iter()
3544        .filter(|schema| {
3545            if let Some(capability) = params.capability {
3546                if !schema.has_capability(capability) {
3547                    return false;
3548                }
3549            }
3550            if let Some(provider) = provider.as_deref() {
3551                if schema.provider.to_ascii_lowercase() != provider {
3552                    return false;
3553                }
3554            }
3555            if params.local_only && !schema.is_local() {
3556                return false;
3557            }
3558            if params.available_only && !schema.available {
3559                return false;
3560            }
3561            if let Some(query) = query.as_deref() {
3562                let capability_text = schema
3563                    .capabilities
3564                    .iter()
3565                    .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3566                    .collect::<Vec<_>>()
3567                    .join(" ");
3568                let haystack = format!(
3569                    "{} {} {} {} {} {}",
3570                    schema.id,
3571                    schema.name,
3572                    schema.provider,
3573                    schema.family,
3574                    schema.tags.join(" "),
3575                    capability_text
3576                )
3577                .to_ascii_lowercase();
3578                if !haystack.contains(query) {
3579                    return false;
3580                }
3581            }
3582            true
3583        })
3584        .map(|schema| {
3585            let pullable = !schema.available
3586                && matches!(
3587                    schema.source,
3588                    car_inference::ModelSource::Local { .. }
3589                        | car_inference::ModelSource::Mlx { .. }
3590                );
3591            let info = car_inference::ModelInfo::from(&schema);
3592            let upgrade = upgrades_by_from.get(&schema.id).cloned();
3593            ModelSearchEntry {
3594                info,
3595                family: schema.family,
3596                version: schema.version,
3597                tags: schema.tags,
3598                pullable,
3599                upgrade,
3600            }
3601        })
3602        .collect();
3603    entries.sort_by(|a, b| {
3604        b.info
3605            .available
3606            .cmp(&a.info.available)
3607            .then(b.info.is_local.cmp(&a.info.is_local))
3608            .then(a.info.name.cmp(&b.info.name))
3609    });
3610    if let Some(limit) = params.limit {
3611        entries.truncate(limit);
3612    }
3613
3614    let total = entries.len();
3615    let available = entries.iter().filter(|entry| entry.info.available).count();
3616    let local = entries.iter().filter(|entry| entry.info.is_local).count();
3617    let response = ModelSearchResponse {
3618        models: entries,
3619        upgrades,
3620        total,
3621        available,
3622        local,
3623        remote: total.saturating_sub(local),
3624    };
3625    serde_json::to_value(response).map_err(|e| e.to_string())
3626}
3627
3628fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3629    let engine = get_inference_engine(state);
3630    serde_json::to_value(serde_json::json!({
3631        "upgrades": engine.available_model_upgrades()
3632    }))
3633    .map_err(|e| e.to_string())
3634}
3635
3636async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3637    let name = msg
3638        .params
3639        .get("name")
3640        .or_else(|| msg.params.get("id"))
3641        .or_else(|| msg.params.get("model"))
3642        .and_then(|v| v.as_str())
3643        .ok_or("missing 'name' parameter")?;
3644    let engine = get_inference_engine(state);
3645    let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3646    Ok(serde_json::json!({"path": path.display().to_string()}))
3647}
3648
3649async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3650    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3651        msg.params
3652            .get("events")
3653            .cloned()
3654            .unwrap_or(msg.params.clone()),
3655    )
3656    .map_err(|e| format!("invalid events: {}", e))?;
3657
3658    let inference = get_inference_engine(state).clone();
3659    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3660
3661    let skills = engine.distill_skills(&events).await;
3662    serde_json::to_value(&skills).map_err(|e| e.to_string())
3663}
3664
3665/// Run memory consolidation against this client's session memgine
3666/// (or the daemon-owned per-agent memgine when bound — #170).
3667/// Returns the JSON `ConsolidationReport`.
3668async fn handle_memory_consolidate(
3669    session: &crate::session::ClientSession,
3670) -> Result<Value, String> {
3671    let engine_arc = session.effective_memgine().await;
3672    let report = {
3673        let mut engine = engine_arc.lock().await;
3674        engine.consolidate().await
3675    };
3676    if let Some(id) = session.agent_id.lock().await.clone() {
3677        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3678            tracing::warn!(agent_id = %id, error = %e,
3679                "agent memgine persist after consolidate failed");
3680        }
3681    }
3682    serde_json::to_value(&report).map_err(|e| e.to_string())
3683}
3684
3685/// Repair a degraded skill on this client's session memgine.
3686/// Returns `{ code: "..." }` on success, `null` if the skill
3687/// isn't broken or repair failed.
3688async fn handle_skill_repair(
3689    msg: &JsonRpcMessage,
3690    session: &crate::session::ClientSession,
3691) -> Result<Value, String> {
3692    let name = msg
3693        .params
3694        .get("skill_name")
3695        .and_then(|v| v.as_str())
3696        .ok_or("missing 'skill_name' parameter")?;
3697    let mut engine = session.memgine.lock().await;
3698    let code = engine.repair_skill(name).await;
3699    Ok(match code {
3700        Some(c) => serde_json::json!({ "code": c }),
3701        None => Value::Null,
3702    })
3703}
3704
3705/// Ingest distilled skills into this client's session memgine.
3706/// Returns the number of nodes inserted.
3707async fn handle_skills_ingest_distilled(
3708    msg: &JsonRpcMessage,
3709    session: &crate::session::ClientSession,
3710) -> Result<Value, String> {
3711    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3712        msg.params
3713            .get("skills")
3714            .cloned()
3715            .unwrap_or(msg.params.clone()),
3716    )
3717    .map_err(|e| format!("invalid skills: {}", e))?;
3718    let mut engine = session.memgine.lock().await;
3719    let nodes = engine.ingest_distilled_skills(&skills);
3720    Ok(serde_json::json!({ "ingested": nodes.len() }))
3721}
3722
3723/// Run skill evolution against this session's memgine for a
3724/// specified domain.  Returns the resulting `DistilledSkill` array.
3725async fn handle_skills_evolve(
3726    msg: &JsonRpcMessage,
3727    session: &crate::session::ClientSession,
3728) -> Result<Value, String> {
3729    let domain = msg
3730        .params
3731        .get("domain")
3732        .and_then(|v| v.as_str())
3733        .ok_or("missing 'domain' parameter")?
3734        .to_string();
3735    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3736        msg.params
3737            .get("events")
3738            .cloned()
3739            .unwrap_or(Value::Array(vec![])),
3740    )
3741    .map_err(|e| format!("invalid events: {}", e))?;
3742    let mut engine = session.memgine.lock().await;
3743    let skills = engine.evolve_skills(&events, &domain).await;
3744    serde_json::to_value(&skills).map_err(|e| e.to_string())
3745}
3746
3747/// List domains whose skills are underperforming on this session.
3748async fn handle_skills_domains_needing_evolution(
3749    msg: &JsonRpcMessage,
3750    session: &crate::session::ClientSession,
3751) -> Result<Value, String> {
3752    let threshold = msg
3753        .params
3754        .get("threshold")
3755        .and_then(|v| v.as_f64())
3756        .unwrap_or(0.6);
3757    let engine = session.memgine.lock().await;
3758    let domains = engine.domains_needing_evolution(threshold);
3759    serde_json::to_value(&domains).map_err(|e| e.to_string())
3760}
3761
3762/// Rerank documents against a query using a cross-encoder model.
3763async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3764    let engine = get_inference_engine(state);
3765    let req: car_inference::RerankRequest =
3766        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3767    let _permit = state.admission.acquire().await;
3768    let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3769    serde_json::to_value(&result).map_err(|e| e.to_string())
3770}
3771
3772/// Transcribe audio at the given path. The path is interpreted on
3773/// the daemon's filesystem, not the FFI caller's — Daemon-mode
3774/// callers must pass a path the daemon can read (typically a
3775/// shared `~/.car/...` location or stdin push via the streaming
3776/// API).
3777async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3778    use base64::Engine as _;
3779    let engine = get_inference_engine(state);
3780
3781    // Sandbox-crossing escape hatch (Parslee-ai/car-releases#31): when
3782    // the caller can't share a filesystem view with the daemon (e.g.
3783    // unsandboxed Milo talking to a sandboxed car-host), they pass
3784    // `audio_b64` instead of `audio_path`. We decode to a tempfile,
3785    // run transcribe against the path the engine expects, and clean up
3786    // on drop. Accepts either form; `audio_b64` wins if both are set.
3787    let mut params = msg.params.clone();
3788    let audio_b64 = params
3789        .as_object_mut()
3790        .and_then(|m| m.remove("audio_b64"))
3791        .and_then(|v| v.as_str().map(str::to_string));
3792    let _tmp_audio = if let Some(b64) = audio_b64 {
3793        let bytes = base64::engine::general_purpose::STANDARD
3794            .decode(b64.as_bytes())
3795            .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3796        let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3797        std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3798        let path = tmp.path().to_string_lossy().into_owned();
3799        if let Some(obj) = params.as_object_mut() {
3800            obj.insert("audio_path".to_string(), Value::String(path));
3801        }
3802        Some(tmp)
3803    } else {
3804        None
3805    };
3806
3807    let req: car_inference::TranscribeRequest =
3808        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3809    let _permit = state.admission.acquire().await;
3810    let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3811    serde_json::to_value(&result).map_err(|e| e.to_string())
3812}
3813
3814/// Synthesize speech. By default writes to `output_path` on the
3815/// daemon's filesystem; when `return_b64: true` (or no `output_path`
3816/// was supplied) the result also includes an `audio_b64` field with
3817/// the rendered bytes inline so cross-sandbox callers can avoid
3818/// filesystem coordination. Closes Parslee-ai/car-releases#31.
3819async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3820    use base64::Engine as _;
3821    let engine = get_inference_engine(state);
3822
3823    let mut params = msg.params.clone();
3824    let return_b64 = params
3825        .as_object_mut()
3826        .and_then(|m| m.remove("return_b64"))
3827        .and_then(|v| v.as_bool())
3828        .unwrap_or(false);
3829    let no_output_path = params
3830        .as_object()
3831        .map(|m| !m.contains_key("output_path"))
3832        .unwrap_or(true);
3833
3834    let req: car_inference::SynthesizeRequest =
3835        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3836    let _permit = state.admission.acquire().await;
3837    let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3838    let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3839
3840    // Inline the bytes when the caller asked for them OR when no
3841    // output_path was specified (typical sandbox-crossing case —
3842    // they didn't pick a path because they have no shared one).
3843    if return_b64 || no_output_path {
3844        let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3845            format!(
3846                "synthesize: failed to read rendered audio at {}: {e}",
3847                result.audio_path
3848            )
3849        })?;
3850        let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3851        if let Some(obj) = value.as_object_mut() {
3852            obj.insert("audio_b64".to_string(), Value::String(encoded));
3853        }
3854    }
3855    Ok(value)
3856}
3857
3858/// Prepare the speech runtime (downloads / warmup). Returns a
3859/// JSON status string, mirroring the embedded
3860/// `prepare_speech_runtime` shape.
3861async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3862    let engine = get_inference_engine(state);
3863    let status = engine
3864        .prepare_speech_runtime()
3865        .await
3866        .map_err(|e| e.to_string())?;
3867    serde_json::to_value(&status).map_err(|e| e.to_string())
3868}
3869
3870/// Adaptive route decision for a prompt — returns the routing
3871/// JSON the FFI's `route_model` returns.
3872async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3873    let prompt = msg
3874        .params
3875        .get("prompt")
3876        .and_then(|v| v.as_str())
3877        .ok_or("missing 'prompt' parameter")?;
3878    let engine = get_inference_engine(state);
3879    let decision = engine.route_adaptive(prompt).await;
3880    serde_json::to_value(&decision).map_err(|e| e.to_string())
3881}
3882
3883/// Model performance profiles snapshot.
3884async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3885    let engine = get_inference_engine(state);
3886    let profiles = engine.export_profiles().await;
3887    serde_json::to_value(&profiles).map_err(|e| e.to_string())
3888}
3889
/// Params for `outcomes.resolve_pending`. Keys are camelCase on the
/// wire, so the single field arrives as `actionResults`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct OutcomesResolvePendingParams {
    /// Flat `(trace_id, success, confidence, output)` tuples from the
    /// caller. Same shape `car-reason`'s session produces from its
    /// `ActionOutcome` vector. Daemon side runs the inference rules
    /// and writes resolved outcomes back to the shared tracker.
    action_results: Vec<(String, bool, f64, String)>,
}
3899
3900/// `outcomes.resolve_pending` — write inferred outcomes back to the
3901/// shared engine's `OutcomeTracker` (Parslee-ai/car#189 follow-up).
3902///
3903/// Symmetric to the in-process path
3904/// `ReasoningInferenceHandle::record_inferred_outcomes` on
3905/// `InferenceEngine`: takes the per-action result tuples the
3906/// reasoning session produces, runs
3907/// `OutcomeTracker::infer_outcomes_from_action_sequence` to convert
3908/// them into `InferredOutcome` records, and calls
3909/// `resolve_pending_from_signals` under the tracker write lock. The
3910/// learning loop that adjusts routing decisions therefore survives
3911/// daemon-routed reasoning runs (previously a best-effort no-op on
3912/// the daemon side).
3913///
3914/// Returns `{ recorded: N }` where N is the number of action results
3915/// the caller passed. The tracker doesn't surface how many of those
3916/// actually had pending entries to resolve; that count would require
3917/// expanding the tracker API and isn't load-bearing for any caller
3918/// yet.
3919async fn handle_outcomes_resolve_pending(
3920    req: &JsonRpcMessage,
3921    state: &ServerState,
3922) -> Result<Value, String> {
3923    let params: OutcomesResolvePendingParams =
3924        serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
3925    let engine = get_inference_engine(state);
3926    let mut tracker = engine.outcome_tracker.write().await;
3927    let inferred = tracker.infer_outcomes_from_action_sequence(&params.action_results);
3928    tracker.resolve_pending_from_signals(inferred);
3929    Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3930}
3931
3932/// Per-session event log size.
3933async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3934    let n = session.runtime.log.lock().await.len();
3935    Ok(Value::from(n as u64))
3936}
3937
3938async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3939    let stats = session.runtime.log.lock().await.stats();
3940    serde_json::to_value(stats).map_err(|e| e.to_string())
3941}
3942
/// Params for the event-log truncate handler. Keys are camelCase on the
/// wire (`maxEvents`, `maxSpans`); both are optional.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
    /// When set, passed to `truncate_events_keep_last`.
    #[serde(default)]
    max_events: Option<usize>,
    /// When set, passed to `truncate_spans_keep_last`.
    #[serde(default)]
    max_spans: Option<usize>,
}
3951
3952async fn handle_events_truncate(
3953    msg: &JsonRpcMessage,
3954    session: &crate::session::ClientSession,
3955) -> Result<Value, String> {
3956    let params: EventsTruncateParams =
3957        serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
3958            max_events: None,
3959            max_spans: None,
3960        });
3961    let mut log = session.runtime.log.lock().await;
3962    let removed_events = params
3963        .max_events
3964        .map(|max| log.truncate_events_keep_last(max))
3965        .unwrap_or(0);
3966    let removed_spans = params
3967        .max_spans
3968        .map(|max| log.truncate_spans_keep_last(max))
3969        .unwrap_or(0);
3970    let stats = log.stats();
3971    Ok(serde_json::json!({
3972        "removedEvents": removed_events,
3973        "removedSpans": removed_spans,
3974        "stats": stats,
3975    }))
3976}
3977
3978async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
3979    let mut log = session.runtime.log.lock().await;
3980    let removed = log.clear();
3981    Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
3982}
3983
3984/// Update the per-session replan config. Wire shape mirrors the
3985/// FFI's positional `set_replan_config` arguments — the engine
3986/// crate's `ReplanConfig` struct doesn't derive Serialize, so we
3987/// reconstruct it from a flat object here.
3988async fn handle_replan_set_config(
3989    msg: &JsonRpcMessage,
3990    session: &crate::session::ClientSession,
3991) -> Result<Value, String> {
3992    let max_replans = msg
3993        .params
3994        .get("max_replans")
3995        .and_then(|v| v.as_u64())
3996        .unwrap_or(0) as u32;
3997    let delay_ms = msg
3998        .params
3999        .get("delay_ms")
4000        .and_then(|v| v.as_u64())
4001        .unwrap_or(0);
4002    let verify_before_execute = msg
4003        .params
4004        .get("verify_before_execute")
4005        .and_then(|v| v.as_bool())
4006        .unwrap_or(true);
4007    let cfg = car_engine::ReplanConfig {
4008        max_replans,
4009        delay_ms,
4010        verify_before_execute,
4011    };
4012    session.runtime.set_replan_config(cfg).await;
4013    Ok(Value::Null)
4014}
4015
4016async fn handle_skills_list(
4017    msg: &JsonRpcMessage,
4018    session: &crate::session::ClientSession,
4019) -> Result<Value, String> {
4020    let domain = msg.params.get("domain").and_then(|v| v.as_str());
4021    let engine = session.memgine.lock().await;
4022    let skills: Vec<serde_json::Value> = engine
4023        .graph
4024        .inner
4025        .node_indices()
4026        .filter_map(|nix| {
4027            let node = engine.graph.inner.node_weight(nix)?;
4028            if node.kind != car_memgine::MemKind::Skill {
4029                return None;
4030            }
4031            let meta = car_memgine::SkillMeta::from_node(node)?;
4032            if let Some(d) = domain {
4033                match &meta.scope {
4034                    car_memgine::SkillScope::Global => {}
4035                    car_memgine::SkillScope::Domain(sd) if sd == d => {}
4036                    _ => return None,
4037                }
4038            }
4039            Some(serde_json::to_value(&meta).unwrap_or_default())
4040        })
4041        .collect();
4042    serde_json::to_value(&skills).map_err(|e| e.to_string())
4043}
4044
/// Params shared by the secret put / get / delete / status handlers.
#[derive(serde::Deserialize)]
struct SecretParams {
    /// Optional service name, forwarded as `Option<&str>`.
    #[serde(default)]
    service: Option<String>,
    /// Key of the secret; required.
    key: String,
    /// Secret value; only `handle_secret_put` reads it and rejects the
    /// call when it is missing.
    #[serde(default)]
    value: Option<String>,
}
4053
4054fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
4055    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4056    let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
4057    car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
4058}
4059
4060fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
4061    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4062    car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
4063}
4064
4065fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
4066    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4067    car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
4068}
4069
4070fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
4071    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4072    car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
4073}
4074
/// Params shared by the permission status / request / explain handlers.
#[derive(serde::Deserialize)]
struct PermParams {
    /// Permission domain, forwarded verbatim to `car_ffi_common::permissions`.
    domain: String,
    /// Optional target bundle id, forwarded as `Option<&str>`.
    #[serde(default)]
    target_bundle_id: Option<String>,
}
4081
4082fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
4083    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4084    car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
4085}
4086
4087fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
4088    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4089    car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
4090}
4091
4092fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
4093    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4094    car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
4095}
4096
4097fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
4098    #[derive(serde::Deserialize)]
4099    struct P {
4100        start: String,
4101        end: String,
4102        #[serde(default)]
4103        calendar_ids: Vec<String>,
4104    }
4105    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4106    let start = chrono::DateTime::parse_from_rfc3339(&p.start)
4107        .map_err(|e| format!("parse start: {}", e))?
4108        .with_timezone(&chrono::Utc);
4109    let end = chrono::DateTime::parse_from_rfc3339(&p.end)
4110        .map_err(|e| format!("parse end: {}", e))?
4111        .with_timezone(&chrono::Utc);
4112    car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
4113}
4114
4115fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
4116    #[derive(serde::Deserialize)]
4117    struct P {
4118        query: String,
4119        #[serde(default = "default_limit")]
4120        limit: usize,
4121        #[serde(default)]
4122        container_ids: Vec<String>,
4123    }
4124    fn default_limit() -> usize {
4125        50
4126    }
4127    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4128    car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
4129}
4130
4131fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
4132    #[derive(serde::Deserialize, Default)]
4133    struct P {
4134        #[serde(default)]
4135        account_ids: Vec<String>,
4136    }
4137    let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
4138    car_ffi_common::integrations::mail_inbox(&p.account_ids)
4139}
4140
4141fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
4142    let raw = req.params.to_string();
4143    car_ffi_common::integrations::mail_send(&raw)
4144}
4145
4146fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
4147    #[derive(serde::Deserialize)]
4148    struct P {
4149        #[serde(default = "default_limit")]
4150        limit: usize,
4151    }
4152    fn default_limit() -> usize {
4153        50
4154    }
4155    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4156    car_ffi_common::integrations::messages_chats(p.limit)
4157}
4158
4159fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
4160    let raw = req.params.to_string();
4161    car_ffi_common::integrations::messages_send(&raw)
4162}
4163
4164fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
4165    #[derive(serde::Deserialize)]
4166    struct P {
4167        query: String,
4168        #[serde(default = "default_limit")]
4169        limit: usize,
4170    }
4171    fn default_limit() -> usize {
4172        50
4173    }
4174    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4175    car_ffi_common::integrations::notes_find(&p.query, p.limit)
4176}
4177
4178fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
4179    #[derive(serde::Deserialize)]
4180    struct P {
4181        #[serde(default = "default_limit")]
4182        limit: usize,
4183    }
4184    fn default_limit() -> usize {
4185        50
4186    }
4187    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4188    car_ffi_common::integrations::reminders_items(p.limit)
4189}
4190
4191fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
4192    #[derive(serde::Deserialize)]
4193    struct P {
4194        #[serde(default = "default_limit")]
4195        limit: usize,
4196    }
4197    fn default_limit() -> usize {
4198        100
4199    }
4200    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
4201    car_ffi_common::integrations::bookmarks_list(p.limit)
4202}
4203
4204fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
4205    #[derive(serde::Deserialize)]
4206    struct P {
4207        start: String,
4208        end: String,
4209    }
4210    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4211    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4212        .map_err(|e| format!("parse start: {}", e))?
4213        .with_timezone(&chrono::Utc);
4214    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4215        .map_err(|e| format!("parse end: {}", e))?
4216        .with_timezone(&chrono::Utc);
4217    car_ffi_common::health::sleep_windows(s, e)
4218}
4219
4220fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4221    #[derive(serde::Deserialize)]
4222    struct P {
4223        start: String,
4224        end: String,
4225    }
4226    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4227    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4228        .map_err(|e| format!("parse start: {}", e))?
4229        .with_timezone(&chrono::Utc);
4230    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4231        .map_err(|e| format!("parse end: {}", e))?
4232        .with_timezone(&chrono::Utc);
4233    car_ffi_common::health::workouts(s, e)
4234}
4235
4236fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4237    #[derive(serde::Deserialize)]
4238    struct P {
4239        start: String,
4240        end: String,
4241    }
4242    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4243    let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4244        .map_err(|e| format!("parse start: {}", e))?;
4245    let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4246        .map_err(|e| format!("parse end: {}", e))?;
4247    car_ffi_common::health::activity(s, e)
4248}
4249
4250async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4251    let closed = session.browser.close().await?;
4252    Ok(serde_json::json!({"closed": closed}))
4253}
4254
4255async fn handle_browser_run(
4256    req: &JsonRpcMessage,
4257    session: &crate::session::ClientSession,
4258) -> Result<Value, String> {
4259    #[derive(serde::Deserialize)]
4260    struct BrowserRunParams {
4261        /// Inline JSON string (CLI-compatible), OR the structured object.
4262        script: Value,
4263        #[serde(default)]
4264        width: Option<u32>,
4265        #[serde(default)]
4266        height: Option<u32>,
4267        /// When true, launches a visible Chromium window for interactive
4268        /// flows (first-time auth, 2FA, supervised runs). Only honored on
4269        /// the call that first launches the browser session — subsequent
4270        /// calls reuse the existing browser regardless.
4271        #[serde(default)]
4272        headed: Option<bool>,
4273        /// Extra Chromium command-line flags appended verbatim at
4274        /// launch (#112). Honoured only on the launch call.
4275        #[serde(default)]
4276        extra_args: Option<Vec<String>>,
4277    }
4278    let params: BrowserRunParams =
4279        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4280
4281    // Accept either a JSON string OR a structured object under `script`.
4282    let script_json = match params.script {
4283        Value::String(s) => s,
4284        other => other.to_string(),
4285    };
4286
4287    let browser_session = session
4288        .browser
4289        .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4290            width: params.width.unwrap_or(1280),
4291            height: params.height.unwrap_or(720),
4292            headless: !params.headed.unwrap_or(false),
4293            extra_args: params.extra_args.unwrap_or_default(),
4294        })
4295        .await?;
4296
4297    let trace_json = browser_session.run(&script_json).await?;
4298    serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4299}
4300
4301// ---------------------------------------------------------------------------
4302// Voice streaming JSON-RPC methods
4303//
4304// Events are pushed back to the originating client as JSON-RPC notifications:
4305//   { "jsonrpc": "2.0", "method": "voice.event",
4306//     "params": { "session_id": "...", "event": {...} } }
4307//
4308// The session registry is process-wide (ServerState.voice_sessions); per-call
4309// WsVoiceEventSink instances bind each session to its originating WS so a
4310// client only ever sees events for sessions it started.
4311// ---------------------------------------------------------------------------
4312
/// Params for starting a streaming transcription session.
#[derive(Deserialize)]
struct VoiceStartParams {
    /// Caller-chosen id for the voice session.
    session_id: String,
    /// Audio source description; re-serialized to a JSON string and
    /// forwarded as-is by the start handler.
    audio_source: Value,
    /// Optional options object, forwarded as a JSON string when present.
    #[serde(default)]
    options: Option<Value>,
}
4320
4321async fn handle_voice_transcribe_stream_start(
4322    req: &JsonRpcMessage,
4323    state: &Arc<ServerState>,
4324    session: &Arc<crate::session::ClientSession>,
4325) -> Result<Value, String> {
4326    let params: VoiceStartParams =
4327        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4328    let audio_source_json =
4329        serde_json::to_string(&params.audio_source).map_err(|e| e.to_string())?;
4330    let options_json = params
4331        .options
4332        .as_ref()
4333        .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4334        .transpose()?;
4335    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4336        channel: session.channel.clone(),
4337    });
4338    let json = car_ffi_common::voice::transcribe_stream_start(
4339        &params.session_id,
4340        &audio_source_json,
4341        options_json.as_deref(),
4342        state.voice_sessions.clone(),
4343        sink,
4344    )
4345    .await?;
4346    serde_json::from_str(&json).map_err(|e| e.to_string())
4347}
4348
/// Params for stopping a streaming transcription session.
#[derive(Deserialize)]
struct VoiceStopParams {
    /// Id of the voice session to stop.
    session_id: String,
}
4353
4354async fn handle_voice_transcribe_stream_stop(
4355    req: &JsonRpcMessage,
4356    state: &Arc<ServerState>,
4357) -> Result<Value, String> {
4358    let params: VoiceStopParams =
4359        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4360    let json = car_ffi_common::voice::transcribe_stream_stop(
4361        &params.session_id,
4362        state.voice_sessions.clone(),
4363    )
4364    .await?;
4365    serde_json::from_str(&json).map_err(|e| e.to_string())
4366}
4367
/// Params for pushing one audio frame into a streaming transcription
/// session.
#[derive(Deserialize)]
struct VoicePushParams {
    /// Id of the voice session receiving the frame.
    session_id: String,
    /// Base64-encoded 16-bit signed PCM frame. JSON-RPC is text, so binary
    /// audio frames have to be encoded; clients in WS-binary contexts that
    /// want to skip the round trip can call the FFI directly.
    pcm_b64: String,
}
4376
4377async fn handle_voice_transcribe_stream_push(
4378    req: &JsonRpcMessage,
4379    state: &Arc<ServerState>,
4380) -> Result<Value, String> {
4381    use base64::Engine;
4382    let params: VoicePushParams =
4383        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4384    let pcm = base64::engine::general_purpose::STANDARD
4385        .decode(&params.pcm_b64)
4386        .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4387    let json = car_ffi_common::voice::transcribe_stream_push(
4388        &params.session_id,
4389        &pcm,
4390        state.voice_sessions.clone(),
4391    )
4392    .await?;
4393    serde_json::from_str(&json).map_err(|e| e.to_string())
4394}
4395
4396fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4397    let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4398    serde_json::from_str(&json).unwrap_or(Value::Null)
4399}
4400
4401async fn handle_voice_dispatch_turn(
4402    req: &JsonRpcMessage,
4403    state: &Arc<ServerState>,
4404    session: &Arc<crate::session::ClientSession>,
4405) -> Result<Value, String> {
4406    let req_value = req.params.clone();
4407    let request: crate::voice_turn::DispatchVoiceTurnRequest =
4408        serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4409    let engine = get_inference_engine(state).clone();
4410    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4411        channel: session.channel.clone(),
4412    });
4413    let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4414    serde_json::to_value(resp).map_err(|e| e.to_string())
4415}
4416
4417async fn handle_voice_cancel_turn() -> Result<Value, String> {
4418    crate::voice_turn::cancel().await;
4419    Ok(serde_json::json!({"cancelled": true}))
4420}
4421
4422async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4423    let engine = get_inference_engine(state).clone();
4424    crate::voice_turn::prewarm(engine).await;
4425    Ok(serde_json::json!({"prewarmed": true}))
4426}
4427
4428// ---------------------------------------------------------------------------
4429// Inference runner over WebSocket — closes Parslee-ai/car-releases#24
4430//
4431// Bidirectional protocol shape:
4432//   1. Client → server: `inference.register_runner` (no params). The
4433//      session that calls this becomes the host for delegated models.
4434//   2. Server → client: `inference.runner.invoke` notification with
4435//      {call_id, request} when CAR needs to dispatch a delegated turn.
4436//   3. Client → server: `inference.runner.event` with {call_id, event}
4437//      for each chunk; `inference.runner.complete` with {call_id, result}
4438//      on success; `inference.runner.fail` with {call_id, error} on
4439//      failure.
4440//
4441// The server-side data is process-wide because only one inference
4442// runner can be registered at a time (matches the FFI bindings'
4443// constraint). The per-call mailboxes live in dedicated DashMaps.
4444// ---------------------------------------------------------------------------
4445
4446fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4447    static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4448        std::sync::OnceLock::new();
4449    SLOT.get_or_init(|| std::sync::RwLock::new(None))
4450}
4451
4452fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4453    static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4454        std::sync::OnceLock::new();
4455    MAP.get_or_init(dashmap::DashMap::new)
4456}
4457
4458fn ws_runner_completions() -> &'static dashmap::DashMap<
4459    String,
4460    tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4461> {
4462    static MAP: std::sync::OnceLock<
4463        dashmap::DashMap<
4464            String,
4465            tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4466        >,
4467    > = std::sync::OnceLock::new();
4468    MAP.get_or_init(dashmap::DashMap::new)
4469}
4470
4471struct WsInferenceRunner;
4472
4473#[async_trait::async_trait]
4474impl car_inference::InferenceRunner for WsInferenceRunner {
4475    async fn run(
4476        &self,
4477        request: car_inference::tasks::generate::GenerateRequest,
4478        emitter: car_inference::EventEmitter,
4479    ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4480        let channel = ws_runner_session()
4481            .read()
4482            .map_err(|e| {
4483                car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4484            })?
4485            .clone()
4486            .ok_or_else(|| {
4487                car_inference::RunnerError::Declined(
4488                    "no WebSocket inference runner registered — call inference.register_runner first"
4489                        .into(),
4490                )
4491            })?;
4492
4493        let call_id = uuid::Uuid::new_v4().to_string();
4494        let request_json = serde_json::to_value(&request)
4495            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4496        let (tx, rx) = tokio::sync::oneshot::channel();
4497        ws_runner_calls().insert(call_id.clone(), emitter);
4498        ws_runner_completions().insert(call_id.clone(), tx);
4499
4500        // Fire the invoke notification.
4501        use futures::SinkExt;
4502        let notification = serde_json::json!({
4503            "jsonrpc": "2.0",
4504            "method": "inference.runner.invoke",
4505            "params": {
4506                "call_id": call_id,
4507                "request": request_json,
4508            },
4509        });
4510        let text = serde_json::to_string(&notification)
4511            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4512        let _ = channel
4513            .write
4514            .lock()
4515            .await
4516            .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4517            .await;
4518
4519        let result = rx.await.map_err(|_| {
4520            car_inference::RunnerError::Failed("runner completion channel dropped".into())
4521        })?;
4522        ws_runner_calls().remove(&call_id);
4523        result.map_err(car_inference::RunnerError::Failed)
4524    }
4525}
4526
4527async fn handle_inference_register_runner(
4528    session: &Arc<crate::session::ClientSession>,
4529) -> Result<Value, String> {
4530    let mut guard = ws_runner_session()
4531        .write()
4532        .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4533    *guard = Some(session.channel.clone());
4534    drop(guard);
4535    car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4536    Ok(serde_json::json!({"registered": true}))
4537}
4538
4539#[derive(serde::Deserialize)]
4540struct InferenceRunnerEventParams {
4541    call_id: String,
4542    event: Value,
4543}
4544
4545async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4546    let params: InferenceRunnerEventParams =
4547        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4548    let stream_event = match parse_runner_event_value(&params.event) {
4549        Some(e) => e,
4550        None => return Err("unrecognised runner event shape".into()),
4551    };
4552    if let Some(entry) = ws_runner_calls().get(&params.call_id) {
4553        let emitter = entry.value().clone();
4554        tokio::spawn(async move { emitter.emit(stream_event).await });
4555    }
4556    Ok(serde_json::json!({"emitted": true}))
4557}
4558
4559#[derive(serde::Deserialize)]
4560struct InferenceRunnerCompleteParams {
4561    call_id: String,
4562    result: Value,
4563}
4564
4565async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4566    let params: InferenceRunnerCompleteParams =
4567        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4568    let result: std::result::Result<car_inference::RunnerResult, String> =
4569        serde_json::from_value(params.result)
4570            .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4571    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4572        let _ = tx.send(result);
4573    }
4574    Ok(serde_json::json!({"completed": true}))
4575}
4576
4577#[derive(serde::Deserialize)]
4578struct InferenceRunnerFailParams {
4579    call_id: String,
4580    error: String,
4581}
4582
4583async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4584    let params: InferenceRunnerFailParams =
4585        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4586    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4587        let _ = tx.send(Err(params.error));
4588    }
4589    Ok(serde_json::json!({"failed": true}))
4590}
4591
4592fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4593    let ty = v.get("type").and_then(|t| t.as_str())?;
4594    match ty {
4595        "text" => Some(car_inference::StreamEvent::TextDelta(
4596            v.get("data")?.as_str()?.to_string(),
4597        )),
4598        "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4599            name: v.get("name")?.as_str()?.to_string(),
4600            index: v.get("index")?.as_u64()? as usize,
4601            id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4602        }),
4603        "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4604            index: v.get("index")?.as_u64()? as usize,
4605            arguments_delta: v.get("data")?.as_str()?.to_string(),
4606        }),
4607        "usage" => Some(car_inference::StreamEvent::Usage {
4608            input_tokens: v.get("input_tokens")?.as_u64()?,
4609            output_tokens: v.get("output_tokens")?.as_u64()?,
4610        }),
4611        "done" => Some(car_inference::StreamEvent::Done {
4612            text: v.get("text")?.as_str()?.to_string(),
4613            tool_calls: v
4614                .get("tool_calls")
4615                .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4616                .unwrap_or_default(),
4617        }),
4618        _ => None,
4619    }
4620}
4621
4622#[derive(Deserialize)]
4623struct EnrollSpeakerParams {
4624    label: String,
4625    audio: Value,
4626}
4627
4628async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4629    let params: EnrollSpeakerParams =
4630        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4631    let audio_json = serde_json::to_string(&params.audio).map_err(|e| e.to_string())?;
4632    let json = car_ffi_common::voice::enroll_speaker(&params.label, &audio_json).await?;
4633    serde_json::from_str(&json).map_err(|e| e.to_string())
4634}
4635
4636#[derive(Deserialize)]
4637struct RemoveEnrollmentParams {
4638    label: String,
4639}
4640
4641fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4642    let params: RemoveEnrollmentParams =
4643        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4644    let json = car_ffi_common::voice::remove_enrollment(&params.label)?;
4645    serde_json::from_str(&json).map_err(|e| e.to_string())
4646}
4647
4648#[derive(Deserialize)]
4649struct WorkflowRunParams {
4650    workflow: Value,
4651}
4652
4653async fn handle_workflow_run(
4654    req: &JsonRpcMessage,
4655    session: &Arc<crate::session::ClientSession>,
4656) -> Result<Value, String> {
4657    let params: WorkflowRunParams =
4658        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4659    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4660    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4661        channel: session.channel.clone(),
4662        host: session.host.clone(),
4663        client_id: session.client_id.clone(),
4664    });
4665    let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4666    serde_json::from_str(&json).map_err(|e| e.to_string())
4667}
4668
4669#[derive(Deserialize)]
4670struct WorkflowVerifyParams {
4671    workflow: Value,
4672}
4673
4674fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4675    let params: WorkflowVerifyParams =
4676        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4677    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4678    let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4679    serde_json::from_str(&json).map_err(|e| e.to_string())
4680}
4681
4682// ---------------------------------------------------------------------------
4683// Meeting JSON-RPC methods
4684// ---------------------------------------------------------------------------
4685
4686async fn handle_meeting_start(
4687    req: &JsonRpcMessage,
4688    state: &Arc<ServerState>,
4689    session: &Arc<crate::session::ClientSession>,
4690) -> Result<Value, String> {
4691    // We need the meeting id BEFORE handing the upstream sink to
4692    // start_meeting so the WsMemgineIngestSink stamps transcripts with
4693    // the correct `meeting/<id>/<source>` speaker. Parse the request
4694    // here, mint an id if none was provided, and pass the same id
4695    // through to start_meeting via the request JSON.
4696    let mut req_value = req.params.clone();
4697    let meeting_id = req_value
4698        .get("id")
4699        .and_then(|v| v.as_str())
4700        .map(str::to_string)
4701        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4702    if let Some(map) = req_value.as_object_mut() {
4703        map.insert("id".into(), Value::String(meeting_id.clone()));
4704    }
4705    let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4706
4707    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4708        Arc::new(crate::session::WsVoiceEventSink {
4709            channel: session.channel.clone(),
4710        });
4711
4712    // Wrap the WS upstream with a memgine-ingest fanout that uses the
4713    // tokio::sync::Mutex-wrapped session memgine. We pass `None` for
4714    // the FFI-common `start_meeting` memgine arg to avoid the
4715    // sync-mutex contract there — ingest happens here instead.
4716    let upstream: Arc<dyn car_voice::VoiceEventSink> =
4717        Arc::new(crate::session::WsMemgineIngestSink {
4718            meeting_id,
4719            engine: session.memgine.clone(),
4720            upstream: ws_upstream,
4721        });
4722
4723    let cwd = std::env::current_dir().ok();
4724    let json = crate::meeting::start_meeting(
4725        &request_json,
4726        state.meetings.clone(),
4727        state.voice_sessions.clone(),
4728        upstream,
4729        None,
4730        cwd,
4731    )
4732    .await?;
4733    serde_json::from_str(&json).map_err(|e| e.to_string())
4734}
4735
4736#[derive(Deserialize)]
4737struct MeetingStopParams {
4738    meeting_id: String,
4739    #[serde(default = "default_summarize")]
4740    summarize: bool,
4741}
4742
/// Serde default for `MeetingStopParams::summarize`: summarization is on
/// unless the caller opts out.
fn default_summarize() -> bool {
    true
}
4746
4747async fn handle_meeting_stop(
4748    req: &JsonRpcMessage,
4749    state: &Arc<ServerState>,
4750    _session: &Arc<crate::session::ClientSession>,
4751) -> Result<Value, String> {
4752    let params: MeetingStopParams =
4753        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4754    let inference = if params.summarize {
4755        Some(state.inference.get().cloned()).flatten()
4756    } else {
4757        None
4758    };
4759    let json = crate::meeting::stop_meeting(
4760        &params.meeting_id,
4761        params.summarize,
4762        state.meetings.clone(),
4763        state.voice_sessions.clone(),
4764        inference,
4765    )
4766    .await?;
4767    serde_json::from_str(&json).map_err(|e| e.to_string())
4768}
4769
4770#[derive(Deserialize, Default)]
4771struct MeetingListParams {
4772    #[serde(default)]
4773    root: Option<std::path::PathBuf>,
4774}
4775
4776fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4777    let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4778    let cwd = std::env::current_dir().ok();
4779    let json = crate::meeting::list_meetings(params.root, cwd)?;
4780    serde_json::from_str(&json).map_err(|e| e.to_string())
4781}
4782
4783#[derive(Deserialize)]
4784struct MeetingGetParams {
4785    meeting_id: String,
4786    #[serde(default)]
4787    root: Option<std::path::PathBuf>,
4788}
4789
4790fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4791    let params: MeetingGetParams =
4792        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4793    let cwd = std::env::current_dir().ok();
4794    let json = crate::meeting::get_meeting(&params.meeting_id, params.root, cwd)?;
4795    serde_json::from_str(&json).map_err(|e| e.to_string())
4796}
4797
4798// ---------------------------------------------------------------------------
4799// Agent registry — file-based cross-process discovery (#111)
4800// ---------------------------------------------------------------------------
4801
4802#[derive(Deserialize, Default)]
4803struct RegistryRegisterParams {
4804    /// Caller serializes their AgentEntry as a JSON value; we
4805    /// re-serialize it so the ffi-common helper can validate the
4806    /// shape with the same parser used by the bindings.
4807    entry: Value,
4808    #[serde(default)]
4809    registry_path: Option<std::path::PathBuf>,
4810}
4811
4812fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4813    let params: RegistryRegisterParams =
4814        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4815    let entry_json = serde_json::to_string(&params.entry).map_err(|e| e.to_string())?;
4816    car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4817    Ok(Value::Null)
4818}
4819
4820#[derive(Deserialize, Default)]
4821struct RegistryNameParams {
4822    name: String,
4823    #[serde(default)]
4824    registry_path: Option<std::path::PathBuf>,
4825}
4826
4827fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4828    let params: RegistryNameParams =
4829        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4830    let json = car_ffi_common::registry::agent_heartbeat(&params.name, params.registry_path)?;
4831    serde_json::from_str(&json).map_err(|e| e.to_string())
4832}
4833
4834fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4835    let params: RegistryNameParams =
4836        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4837    car_ffi_common::registry::unregister_agent(&params.name, params.registry_path)?;
4838    Ok(Value::Null)
4839}
4840
4841#[derive(Deserialize, Default)]
4842struct RegistryListParams {
4843    #[serde(default)]
4844    registry_path: Option<std::path::PathBuf>,
4845}
4846
4847fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4848    let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4849    let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4850    serde_json::from_str(&json).map_err(|e| e.to_string())
4851}
4852
4853#[derive(Deserialize, Default)]
4854struct RegistryReapParams {
4855    /// Heartbeats older than this many seconds are reaped. Default
4856    /// 60 — two missed 20s heartbeats trigger removal.
4857    #[serde(default = "default_reap_age")]
4858    max_age_secs: u64,
4859    #[serde(default)]
4860    registry_path: Option<std::path::PathBuf>,
4861}
4862
/// Serde default for `RegistryReapParams::max_age_secs`: 60 seconds.
fn default_reap_age() -> u64 {
    60
}
4866
4867fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4868    let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4869    let json =
4870        car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4871    serde_json::from_str(&json).map_err(|e| e.to_string())
4872}
4873
4874// ---------------------------------------------------------------------------
4875// car-a2a server lifecycle (mirrors NAPI startA2aServer / stopA2aServer /
4876// a2aServerStatus and PyO3 start_a2a_server / stop_a2a_server /
4877// a2a_server_status — closes the binding gap noted in #126).
4878// ---------------------------------------------------------------------------
4879
4880async fn handle_a2a_start(
4881    req: &JsonRpcMessage,
4882    session: &crate::session::ClientSession,
4883) -> Result<Value, String> {
4884    let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4885    // Always hand the session's runtime through. start_a2a uses it
4886    // only when `share_session_runtime: true` is set in params;
4887    // otherwise it falls back to the legacy fresh-Runtime + agent_basics
4888    // path. Passing it unconditionally keeps the FFI layer ignorant of
4889    // the flag's plumbing.
4890    let json = crate::a2a::start_a2a(&params_json, Some(session.runtime.clone())).await?;
4891    serde_json::from_str(&json).map_err(|e| e.to_string())
4892}
4893
4894fn handle_a2a_stop() -> Result<Value, String> {
4895    let json = crate::a2a::stop_a2a()?;
4896    serde_json::from_str(&json).map_err(|e| e.to_string())
4897}
4898
4899fn handle_a2a_status() -> Result<Value, String> {
4900    let json = crate::a2a::a2a_status()?;
4901    serde_json::from_str(&json).map_err(|e| e.to_string())
4902}
4903
4904#[derive(Deserialize)]
4905#[serde(rename_all = "camelCase")]
4906struct A2aSendParams {
4907    endpoint: String,
4908    message: car_a2a::Message,
4909    #[serde(default)]
4910    blocking: bool,
4911    #[serde(default = "default_true")]
4912    ingest_a2ui: bool,
4913    #[serde(default)]
4914    route_auth: Option<A2aRouteAuth>,
4915    #[serde(default)]
4916    allow_untrusted_endpoint: bool,
4917}
4918
/// Serde default helper for boolean fields that are on unless the caller
/// opts out.
fn default_true() -> bool {
    true
}
4922
4923/// In-core A2A dispatcher entry point. Forwards the JSON-RPC method
4924/// + params to the lazy-initialized [`car_a2a::A2aDispatcher`] held
4925/// on `ServerState`. Closes Parslee-ai/car-releases#28.
4926///
4927/// Streaming methods (`message/stream`, `tasks/resubscribe` and their
4928/// PascalCase aliases) return `MethodNotFound` from the dispatcher's
4929/// transport-neutral surface — the standalone `start_a2a_listener`
4930/// HTTP path serves SSE for those, but the in-core WS surface is
4931/// JSON-RPC only. Same trade as the dispatcher itself.
4932async fn handle_a2a_dispatch(
4933    method: &str,
4934    req: &JsonRpcMessage,
4935    state: &Arc<ServerState>,
4936) -> Result<Value, String> {
4937    let dispatcher = state.a2a_dispatcher().await;
4938    dispatcher
4939        .dispatch(method, req.params.clone())
4940        .await
4941        .map_err(|e| e.to_string())
4942}
4943
4944async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4945    let params: A2aSendParams =
4946        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4947    let endpoint = trusted_route_endpoint(
4948        Some(params.endpoint.clone()),
4949        params.allow_untrusted_endpoint,
4950    )
4951    .ok_or_else(|| {
4952        "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
4953    })?;
4954    let client = match params.route_auth.clone() {
4955        Some(auth) => {
4956            car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
4957        }
4958        None => car_a2a::A2aClient::new(endpoint.clone()),
4959    };
4960    let result = client
4961        .send_message(params.message, params.blocking)
4962        .await
4963        .map_err(|e| e.to_string())?;
4964    let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4965    let mut applied = Vec::new();
4966    if params.ingest_a2ui {
4967        state
4968            .a2ui
4969            .validate_payload(&result_value)
4970            .map_err(|e| e.to_string())?;
4971        let routed_endpoint = Some(endpoint.clone());
4972        for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
4973            let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
4974                if owner.endpoint.is_none() {
4975                    owner.with_endpoint(routed_endpoint.clone())
4976                } else {
4977                    owner
4978                }
4979            });
4980            applied.push(
4981                apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
4982            );
4983        }
4984    }
4985    Ok(serde_json::json!({
4986        "result": result,
4987        "a2ui": {
4988            "applied": applied,
4989        }
4990    }))
4991}
4992
4993// ---------------------------------------------------------------------------
4994// macOS automation — AppleScript + Shortcuts (car-automation), Vision OCR
4995// (car-vision). Mirrors NAPI runApplescript / listShortcuts / runShortcut /
4996// visionOcr and PyO3 run_applescript / list_shortcuts / run_shortcut /
4997// vision_ocr.
4998// ---------------------------------------------------------------------------
4999
5000async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
5001    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5002    let json = car_ffi_common::automation::run_applescript(&args_json).await?;
5003    serde_json::from_str(&json).map_err(|e| e.to_string())
5004}
5005
5006async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
5007    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5008    let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
5009    serde_json::from_str(&json).map_err(|e| e.to_string())
5010}
5011
5012async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
5013    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5014    let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
5015    serde_json::from_str(&json).map_err(|e| e.to_string())
5016}
5017
5018async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
5019    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5020    let json = car_ffi_common::notifications::local(&args_json).await?;
5021    serde_json::from_str(&json).map_err(|e| e.to_string())
5022}
5023
5024async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
5025    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5026    let json = car_ffi_common::vision::ocr(&args_json).await?;
5027    serde_json::from_str(&json).map_err(|e| e.to_string())
5028}
5029
5030// ---------------------------------------------------------------------------
5031// Lifecycle-managed agents (car_registry::supervisor) — Parslee-ai/car-releases#27
5032// ---------------------------------------------------------------------------
5033
5034async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
5035    // Observe-only mode (Parslee-ai/car-releases#44): a second
5036    // car-server on the host can't take the supervisor lock, so it
5037    // can't drive `Supervisor::list` — but it can still answer
5038    // `agents.list` by reading the on-disk manifest directly. The
5039    // `attached` decoration is local to whichever daemon the caller
5040    // is talking to, so observer-mode entries return `attached:
5041    // false` (this daemon hasn't received `session.auth` from those
5042    // children; the primary one has).
5043    let agents = match state.observer_manifest_path() {
5044        Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
5045            .map_err(|e| e.to_string())?,
5046        None => {
5047            let supervisor = state.supervisor()?;
5048            supervisor.list().await
5049        }
5050    };
5051    // Decorate each entry with `attached` + `session_id` so operators
5052    // see whether the supervised process has actually called
5053    // `session.auth { agent_id }` and bound a WS connection (#169) —
5054    // the lifecycle status (`Running`, etc.) only reports the
5055    // process-level view, which can't tell "alive but never
5056    // attached" from "alive and attached".
5057    let attached = state.attached_agents.lock().await.clone();
5058    let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
5059    for a in agents {
5060        let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
5061        let session_id = attached.get(&a.spec.id).cloned();
5062        if let Some(map) = v.as_object_mut() {
5063            map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
5064            if let Some(sid) = session_id {
5065                map.insert("session_id".to_string(), Value::String(sid));
5066            }
5067        }
5068        decorated.push(v);
5069    }
5070    Ok(Value::Array(decorated))
5071}
5072
5073async fn handle_agents_upsert(
5074    req: &JsonRpcMessage,
5075    state: &Arc<ServerState>,
5076) -> Result<Value, String> {
5077    let mut params = req.params.clone();
5078    // Optional `interpreter` sugar (#171). When present, the
5079    // supervisor resolves the bare program name (`"node"`,
5080    // `"python"`, …) against `$PATH` and writes the absolute path
5081    // into `command` *before* validation. This keeps the strict
5082    // no-PATH-lookup rule at upsert time while letting callers
5083    // stop hand-coding `/opt/homebrew/bin/node` into every
5084    // agents.json entry. Resolution happens once; subsequent PATH
5085    // changes do not silently rewire the binding.
5086    if let Some(name) = params
5087        .get("interpreter")
5088        .and_then(|v| v.as_str())
5089        .map(str::to_string)
5090    {
5091        let resolved =
5092            car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
5093        params["command"] = Value::String(resolved.to_string_lossy().into_owned());
5094    }
5095    let spec: car_registry::supervisor::AgentSpec =
5096        serde_json::from_value(params).map_err(|e| e.to_string())?;
5097    let supervisor = state.supervisor()?;
5098    let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
5099    serde_json::to_value(agent).map_err(|e| e.to_string())
5100}
5101
5102/// `agents.install` — install a contributed-agent manifest
5103/// (Parslee-ai/car#182 phase 3). Caller passes the parsed
5104/// `AgentManifest` JSON; the daemon runs install-time validation
5105/// (`car_min_version`, capability negotiation against the daemon's
5106/// own advertisement) and adopts the manifest. Returns
5107/// `{ report, agent? }` where `agent` is the spawnable
5108/// `ManagedAgent` for `external_process` transports and absent for
5109/// `pure_data` / health_url-only manifests.
5110///
5111/// The host capability advertisement comes from
5112/// `HostCapabilities::daemon_default(car_version)` — operators that
5113/// want a tighter advertisement go through a future config phase;
5114/// this MVP uses the runtime's natural surface.
5115async fn handle_agents_install(
5116    req: &JsonRpcMessage,
5117    state: &Arc<ServerState>,
5118) -> Result<Value, String> {
5119    let manifest: car_registry::manifest::AgentManifest =
5120        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5121    let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
5122    let supervisor = state.supervisor()?;
5123    let (report, managed) = supervisor
5124        .install_manifest(manifest, &host)
5125        .await
5126        .map_err(|e| e.to_string())?;
5127    Ok(serde_json::json!({
5128        "report": {
5129            "missingOptional": report
5130                .missing_optional
5131                .iter()
5132                .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
5133                .collect::<Vec<_>>(),
5134        },
5135        "agent": managed,
5136    }))
5137}
5138
5139async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
5140    // Observe-only mode (Parslee-ai/car-releases#44) — see
5141    // `handle_agents_list` for the rationale. The health view is a
5142    // pure function of each entry's `command` plus the on-disk
5143    // sandbox rules, so reading from the manifest is equivalent to
5144    // calling the live supervisor's `health()`.
5145    let entries = match state.observer_manifest_path() {
5146        Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
5147            .map_err(|e| e.to_string())?,
5148        None => {
5149            let supervisor = state.supervisor()?;
5150            supervisor.health().await
5151        }
5152    };
5153    serde_json::to_value(entries).map_err(|e| e.to_string())
5154}
5155
5156fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
5157    req.params
5158        .get("id")
5159        .and_then(Value::as_str)
5160        .map(str::to_string)
5161        .ok_or_else(|| "missing required `id` parameter".to_string())
5162}
5163
5164async fn handle_agents_remove(
5165    req: &JsonRpcMessage,
5166    state: &Arc<ServerState>,
5167) -> Result<Value, String> {
5168    let id = extract_agent_id(req)?;
5169    let supervisor = state.supervisor()?;
5170    let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
5171    Ok(serde_json::json!({ "removed": removed }))
5172}
5173
5174async fn handle_agents_start(
5175    req: &JsonRpcMessage,
5176    state: &Arc<ServerState>,
5177) -> Result<Value, String> {
5178    let id = extract_agent_id(req)?;
5179    let supervisor = state.supervisor()?;
5180    let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
5181    serde_json::to_value(agent).map_err(|e| e.to_string())
5182}
5183
5184async fn handle_agents_stop(
5185    req: &JsonRpcMessage,
5186    state: &Arc<ServerState>,
5187) -> Result<Value, String> {
5188    let id = extract_agent_id(req)?;
5189    let signal: car_registry::supervisor::StopSignal = req
5190        .params
5191        .get("signal")
5192        .map(|v| serde_json::from_value(v.clone()))
5193        .transpose()
5194        .map_err(|e| e.to_string())?
5195        .unwrap_or_default();
5196    let supervisor = state.supervisor()?;
5197    let agent = supervisor
5198        .stop(&id, signal)
5199        .await
5200        .map_err(|e| e.to_string())?;
5201    serde_json::to_value(agent).map_err(|e| e.to_string())
5202}
5203
5204async fn handle_agents_restart(
5205    req: &JsonRpcMessage,
5206    state: &Arc<ServerState>,
5207) -> Result<Value, String> {
5208    let id = extract_agent_id(req)?;
5209    let supervisor = state.supervisor()?;
5210    let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
5211    serde_json::to_value(agent).map_err(|e| e.to_string())
5212}
5213
5214async fn handle_agents_tail_log(
5215    req: &JsonRpcMessage,
5216    state: &Arc<ServerState>,
5217) -> Result<Value, String> {
5218    let id = extract_agent_id(req)?;
5219    let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
5220    let supervisor = state.supervisor()?;
5221    let lines = supervisor
5222        .tail_log(&id, n)
5223        .await
5224        .map_err(|e| e.to_string())?;
5225    Ok(serde_json::json!({ "lines": lines }))
5226}
5227
5228// ---------------------------------------------------------------------------
5229// External-agent detection (Phase 1 of docs/proposals/external-agent-detection.md)
5230//
// Discovery surface for agentic CLIs the user has already installed and
// authenticated (Claude Code, Codex, Gemini). Listing and detection are
// read-only; per-task invocation (Phase 2) is handled separately by
// `handle_agents_invoke_external`. The cache lives in
// car_ffi_common::external_agents so the in-process FFI singletons share
// the same snapshot.
5236// ---------------------------------------------------------------------------
5237
5238async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5239    let include_health = req
5240        .params
5241        .get("include_health")
5242        .and_then(Value::as_bool)
5243        .unwrap_or(false);
5244    let json = car_ffi_common::external_agents::list(include_health).await?;
5245    serde_json::from_str(&json).map_err(|e| e.to_string())
5246}
5247
5248async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5249    let include_health = req
5250        .params
5251        .get("include_health")
5252        .and_then(Value::as_bool)
5253        .unwrap_or(false);
5254    let json = car_ffi_common::external_agents::detect(include_health).await?;
5255    serde_json::from_str(&json).map_err(|e| e.to_string())
5256}
5257
5258/// Per-task invocation of an external CLI agent. Required params:
5259/// `id` (adapter, e.g. `"claude-code"`) and `task` (the prompt).
5260/// Optional: `cwd`, `allowed_tools`, `max_turns`, `timeout_secs`.
5261///
5262/// Phase 2 stage 3 ships with `claude-code` only. Other adapter
5263/// ids return `is_error: true` with a structured `error` so hosts
5264/// can surface the gap without a separate error code.
5265///
5266/// Phase 2 stage 4a (governance): every invocation appends a
5267/// structured audit record to `~/.car/external-agents.jsonl`. The
5268/// record captures id, task, options, result, and the full
5269/// `tool_uses` list the assistant emitted — so even though the
5270/// agent executes its built-in tools in-process (which we can't
5271/// gate via stream-json), there's a complete after-the-fact audit
5272/// trail. Full policy gating (proposing each tool_use to CAR's
5273/// validator + getting a yes/no) requires the MCP server route in
5274/// stage 4b.
5275async fn handle_agents_invoke_external(
5276    req: &JsonRpcMessage,
5277    state: &Arc<ServerState>,
5278    host_session: &Arc<crate::session::ClientSession>,
5279) -> Result<Value, String> {
5280    let id = req
5281        .params
5282        .get("id")
5283        .and_then(Value::as_str)
5284        .ok_or_else(|| "missing required `id` parameter".to_string())?
5285        .to_string();
5286    let task = req
5287        .params
5288        .get("task")
5289        .and_then(Value::as_str)
5290        .ok_or_else(|| "missing required `task` parameter".to_string())?
5291        .to_string();
5292    let stream = req
5293        .params
5294        .get("stream")
5295        .and_then(Value::as_bool)
5296        .unwrap_or(false);
5297    let session_id = req
5298        .params
5299        .get("session_id")
5300        .and_then(Value::as_str)
5301        .map(str::to_string)
5302        .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
5303
5304    // Build the options sub-object directly from req.params so
5305    // hosts can pass `cwd` / `allowed_tools` / `max_turns` /
5306    // `timeout_secs` as siblings of `id`/`task`. Strip the
5307    // dispatch + streaming fields so they don't pollute the
5308    // options serde.
5309    let mut options_value = req.params.clone();
5310    if let Some(obj) = options_value.as_object_mut() {
5311        obj.remove("id");
5312        obj.remove("task");
5313        obj.remove("stream");
5314        obj.remove("session_id");
5315        // Auto-fill `mcp_endpoint` from the bound MCP URL when the
5316        // caller didn't supply one. This is the load-bearing
5317        // wiring of MCP-4: external agents get CAR's tools (memory,
5318        // skills, verify) routed through the daemon's policy +
5319        // shared memgine without any per-call host configuration.
5320        // Callers who want to opt out can pass `"mcp_endpoint": ""`
5321        // (empty string) — the runner skips the temp-file write
5322        // when the value isn't a non-empty URL.
5323        let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5324        if !has_explicit_mcp {
5325            if let Some(url) = state.mcp_url.get() {
5326                obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5327            }
5328        }
5329    }
5330
5331    if !stream {
5332        // Legacy one-shot path. Unchanged shape for FFI consumers
5333        // and any caller that hasn't opted into streaming.
5334        let options_json = options_value.to_string();
5335        let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
5336        let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5337        append_external_agent_audit(&id, &task, &options_value, &result);
5338        return Ok(result);
5339    }
5340
5341    // Streaming path. Returns an ack ({accepted, session_id})
5342    // immediately and streams `agents.chat.event` notifications
5343    // to the host's WS as the runner emits StreamEvents. Reuses
5344    // the chat_sessions routing infrastructure supervised agents
5345    // use — host UIs render both kinds through the same path.
5346    let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
5347        .map_err(|e| format!("invalid options: {e}"))?;
5348
5349    // Register the chat session BEFORE spawning so the host's
5350    // subscriber is correctly bound to this session_id by the
5351    // time the first event arrives. Reusing the supervised
5352    // agent chat infrastructure means `agents.chat.cancel`
5353    // routes to chat_sessions[session_id] and can find this
5354    // entry — though we don't currently honor cancel for
5355    // external invocations (the child process is killed only
5356    // on timeout or task drop; a future iteration can plumb a
5357    // CancellationToken through invoke_with_emitter).
5358    {
5359        // If a chat session is already registered for this id (the
5360        // typical proxy shape: host → agents.chat → supervised agent
5361        // → agents.invoke_external with the same session_id), DO NOT
5362        // overwrite it. The existing entry owns the routing to the
5363        // original host; clobbering it with our caller's client_id
5364        // would send streaming events to the proxying agent instead
5365        // of back to the host that issued agents.chat. Only register
5366        // a fresh entry when the slot is empty (a direct
5367        // host-to-invoke_external call without a prior agents.chat).
5368        let mut chats = state.chat_sessions.lock().await;
5369        chats.entry(session_id.clone()).or_insert_with(|| {
5370            let created_at = std::time::SystemTime::now()
5371                .duration_since(std::time::UNIX_EPOCH)
5372                .map(|d| d.as_secs())
5373                .unwrap_or(0);
5374            crate::session::ChatSession {
5375                agent_id: id.clone(),
5376                host_client_id: host_session.client_id.clone(),
5377                created_at,
5378            }
5379        });
5380    }
5381
5382    // Single drain task pulls StreamEvents off an unbounded
5383    // channel and serializes WS sends to the host. Per-event
5384    // tokio::spawn would let sends race (which token arrives
5385    // first depends on lock acquisition order). The channel
5386    // is unbounded because claude's event volume is bounded
5387    // by user turn count — typically <50 events per invocation.
5388    use tokio::sync::mpsc;
5389    let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
5390
5391    let drain_state = state.clone();
5392    let drain_session_id = session_id.clone();
5393    let drain_agent_id = id.clone();
5394    tokio::spawn(async move {
5395        while let Some(event) = rx.recv().await {
5396            emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
5397        }
5398    });
5399
5400    let emitter_tx = tx.clone();
5401    let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
5402        // Send failure (rx dropped) means the drain task has
5403        // exited — usually because the host disconnected. The
5404        // runner will keep going; let it finish so the audit
5405        // log captures the full result.
5406        let _ = emitter_tx.send(event);
5407    });
5408
5409    // Run the invocation in a separate task so this handler
5410    // can return the ack right away. The runner's child-process
5411    // future owns the spawn lifetime; if the host disconnects
5412    // mid-stream, the runner still completes (its events fall
5413    // on the floor at the drain layer) so the audit log lands.
5414    let spawn_state = state.clone();
5415    let spawn_session_id = session_id.clone();
5416    let spawn_id = id.clone();
5417    let spawn_task = task.clone();
5418    let spawn_options = options_value.clone();
5419    tokio::spawn(async move {
5420        let outcome =
5421            car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
5422                .await;
5423        drop(tx); // signal drain task to exit after queue empties
5424
5425        // Synthesize a terminal agents.chat.event so the host's
5426        // bubble finalizes. The runner doesn't emit a "done" event
5427        // itself — the result is the aggregate InvokeResult. We
5428        // translate it here.
5429        let terminal_params: Value;
5430        let result_value: Value;
5431        match outcome {
5432            Ok(res) => {
5433                // Pack the metadata into `finish_reason` as a
5434                // human-readable summary so the host's existing
5435                // ChatEvent decoder surfaces it without a schema
5436                // change. Hosts that want structured data can
5437                // re-issue `agents.invoke_external` with
5438                // `stream: false` and read the InvokeResult.
5439                let mut parts: Vec<String> = Vec::new();
5440                if res.turns > 0 {
5441                    parts.push(format!(
5442                        "{} turn{}",
5443                        res.turns,
5444                        if res.turns == 1 { "" } else { "s" }
5445                    ));
5446                }
5447                if res.tool_calls > 0 {
5448                    parts.push(format!(
5449                        "{} tool{}",
5450                        res.tool_calls,
5451                        if res.tool_calls == 1 { "" } else { "s" }
5452                    ));
5453                }
5454                if res.duration_ms > 0 {
5455                    parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
5456                }
5457                let summary = if parts.is_empty() {
5458                    "stop".to_string()
5459                } else {
5460                    parts.join(" · ")
5461                };
5462                if res.is_error {
5463                    terminal_params = serde_json::json!({
5464                        "session_id": spawn_session_id,
5465                        "agent_id": spawn_id,
5466                        "kind": "error",
5467                        "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
5468                    });
5469                } else {
5470                    terminal_params = serde_json::json!({
5471                        "session_id": spawn_session_id,
5472                        "agent_id": spawn_id,
5473                        "kind": "done",
5474                        "finish_reason": summary,
5475                    });
5476                }
5477                result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
5478            }
5479            Err(e) => {
5480                let message = format!("{e}");
5481                terminal_params = serde_json::json!({
5482                    "session_id": spawn_session_id,
5483                    "agent_id": spawn_id,
5484                    "kind": "error",
5485                    "error": message.clone(),
5486                });
5487                result_value = serde_json::json!({ "is_error": true, "error": message });
5488            }
5489        }
5490        send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
5491        spawn_state
5492            .chat_sessions
5493            .lock()
5494            .await
5495            .remove(&spawn_session_id);
5496        append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
5497    });
5498
5499    Ok(serde_json::json!({
5500        "accepted": true,
5501        "session_id": session_id,
5502    }))
5503}
5504
5505/// Translate one [`StreamEvent`] from the running external CLI
5506/// into an `agents.chat.event` notification on the originating
5507/// host's WS. Same wire shape supervised agents emit, so host
5508/// UIs render both kinds with one decoder.
5509///
5510/// Mapping:
5511/// - `Assistant` events with `text` content blocks → `kind: "token"`
5512///   per text block. Each block carries the full text the
5513///   assistant emitted in that turn (claude doesn't expose
5514///   word-level deltas via stream-json — it emits per-turn or
5515///   per-content-block chunks).
5516/// - `Assistant` events with `tool_use` blocks → `kind: "tool_call"`
5517///   per block (tool name in `detail`).
5518/// - `System` / `User` / `Result` / others → dropped (Result's
5519///   metadata is folded into the terminal `done` event the
5520///   outer task emits when the invocation finishes).
5521async fn emit_external_chat_event(
5522    state: &Arc<ServerState>,
5523    session_id: &str,
5524    agent_id: &str,
5525    event: car_external_agents::StreamEvent,
5526) {
5527    use car_external_agents::StreamEvent;
5528    match event {
5529        StreamEvent::Assistant(a) => {
5530            if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
5531                for block in content {
5532                    let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
5533                    match block_type {
5534                        "text" => {
5535                            if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
5536                                if !text.is_empty() {
5537                                    let params = serde_json::json!({
5538                                        "session_id": session_id,
5539                                        "agent_id": agent_id,
5540                                        "kind": "token",
5541                                        "delta": text,
5542                                    });
5543                                    send_external_chat_frame(state, session_id, params).await;
5544                                }
5545                            }
5546                        }
5547                        "tool_use" => {
5548                            let name = block
5549                                .get("name")
5550                                .and_then(|v| v.as_str())
5551                                .unwrap_or("(unknown tool)");
5552                            let params = serde_json::json!({
5553                                "session_id": session_id,
5554                                "agent_id": agent_id,
5555                                "kind": "tool_call",
5556                                "detail": name,
5557                            });
5558                            send_external_chat_frame(state, session_id, params).await;
5559                        }
5560                        _ => {}
5561                    }
5562                }
5563            }
5564        }
5565        _ => {
5566            // System (session id init), User (tool result echo),
5567            // Result (final aggregate — folded into terminal
5568            // `done` event by the outer task), RateLimitEvent,
5569            // Other: not surfaced to host.
5570        }
5571    }
5572}
5573
5574/// Send a single `agents.chat.event` notification to the host
5575/// session bound by `session_id`. Best-effort: a missing route
5576/// or a closed WS is silently dropped, the runner continues so
5577/// the audit log lands.
5578async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
5579    use futures::SinkExt;
5580    use tokio_tungstenite::tungstenite::Message;
5581
5582    let host_client_id = state
5583        .chat_sessions
5584        .lock()
5585        .await
5586        .get(session_id)
5587        .map(|s| s.host_client_id.clone());
5588    let Some(host_client_id) = host_client_id else {
5589        return;
5590    };
5591    let host_channel = {
5592        let sessions = state.sessions.lock().await;
5593        sessions.get(&host_client_id).map(|s| s.channel.clone())
5594    };
5595    let Some(channel) = host_channel else {
5596        return;
5597    };
5598    let frame = serde_json::json!({
5599        "jsonrpc": "2.0",
5600        "method": "agents.chat.event",
5601        "params": params,
5602    });
5603    if let Ok(text) = serde_json::to_string(&frame) {
5604        let _ = channel
5605            .write
5606            .lock()
5607            .await
5608            .send(Message::Text(text.into()))
5609            .await;
5610    }
5611}
5612
5613/// Append one JSONL audit record to `~/.car/external-agents.jsonl`.
5614/// Best-effort: a failure to open the journal must NOT fail the
5615/// invocation; the in-memory result already returned is the
5616/// authoritative answer. Logs at warn level when the write fails so
5617/// operators notice repeated failures.
5618fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5619    use std::io::Write;
5620    let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5621        Some(home) => home.join(".car"),
5622        None => return,
5623    };
5624    if std::fs::create_dir_all(&car_dir).is_err() {
5625        return;
5626    }
5627    let path = car_dir.join("external-agents.jsonl");
5628    let record = serde_json::json!({
5629        "ts": chrono::Utc::now().to_rfc3339(),
5630        "adapter_id": id,
5631        "task": task,
5632        "options": options,
5633        "result": result,
5634    });
5635    let line = match serde_json::to_string(&record) {
5636        Ok(s) => s,
5637        Err(_) => return,
5638    };
5639    if let Ok(mut f) = std::fs::OpenOptions::new()
5640        .create(true)
5641        .append(true)
5642        .open(&path)
5643    {
5644        let _ = writeln!(f, "{}", line);
5645    } else {
5646        tracing::warn!(
5647            path = %path.display(),
5648            "failed to append external-agent audit record"
5649        );
5650    }
5651}
5652
5653/// Ground-truth health check. Optional `id` param picks one tool;
5654/// without it, every detected adapter is checked. `force: true`
5655/// bypasses the 30s per-tool TTL cache. Replaces the Phase 1
5656/// credential-file shape heuristic as the load-bearing signal for
5657/// "is this tool ready to invoke."
5658async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5659    let force = req
5660        .params
5661        .get("force")
5662        .and_then(Value::as_bool)
5663        .unwrap_or(false);
5664    if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5665        let json = car_ffi_common::external_agents::health_one(id, force).await?;
5666        serde_json::from_str(&json).map_err(|e| e.to_string())
5667    } else {
5668        let json = car_ffi_common::external_agents::health(force).await?;
5669        serde_json::from_str(&json).map_err(|e| e.to_string())
5670    }
5671}
5672
5673// ---------------------------------------------------------------------------
5674// agents.chat — unified chat surface (docs/proposals/agent-chat-surface.md)
5675// ---------------------------------------------------------------------------
5676//
5677// Host calls `agents.chat { agent_id, prompt, session_id?, stream? }`.
5678// The server looks up the target agent's attached WS connection,
5679// reverse-calls `agent.chat { session_id, prompt, context }` on it
5680// (same pattern as `tools.execute`), and returns once the agent acks.
5681// The agent then streams `agent.chat.event` notifications back, which
5682// the dispatcher intercepts (see `try_forward_agent_chat_event`) and
5683// rewrites as `agents.chat.event` notifications on the originating
5684// host's channel.
5685
/// Timeout, in seconds, the server waits for the agent to ack
/// `agent.chat`. `handle_agents_chat` fails the call with a "did not
/// ack" error when it elapses. The streamed tokens come later as
/// separate notifications and have no bearing on this — this is just
/// "did the agent receive the prompt and accept it." Five seconds is
/// generous for a local IPC ack.
const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
5691
5692/// `agents.chat` — host issues a chat turn to a named agent. Returns
5693/// `{ accepted: true, session_id }` once the agent acks; streamed
5694/// tokens arrive on the host's channel as `agents.chat.event`
5695/// notifications keyed by the same `session_id`.
5696async fn handle_agents_chat(
5697    req: &JsonRpcMessage,
5698    state: &Arc<ServerState>,
5699    host_session: &Arc<crate::session::ClientSession>,
5700) -> Result<Value, String> {
5701    use futures::SinkExt;
5702    use tokio::sync::oneshot;
5703    use tokio_tungstenite::tungstenite::Message;
5704
5705    let agent_id = req
5706        .params
5707        .get("agent_id")
5708        .and_then(Value::as_str)
5709        .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
5710        .to_string();
5711    let prompt = req
5712        .params
5713        .get("prompt")
5714        .and_then(Value::as_str)
5715        .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
5716        .to_string();
5717    let session_id = req
5718        .params
5719        .get("session_id")
5720        .and_then(Value::as_str)
5721        .map(str::to_string)
5722        .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
5723    let stream = req
5724        .params
5725        .get("stream")
5726        .and_then(Value::as_bool)
5727        .unwrap_or(true);
5728    let voice_input = req
5729        .params
5730        .get("voice_input")
5731        .and_then(Value::as_bool)
5732        .unwrap_or(false);
5733
5734    // Resolve the agent's attached WS channel via `attached_agents` →
5735    // `sessions` → `channel`. Both lookups must hit; a missing entry on
5736    // either side means the agent is registered in `agents.json` but
5737    // hasn't `session.auth`'d (or has disconnected), so refuse with a
5738    // structured error rather than silently parking the chat.
5739    let agent_client_id = state
5740        .attached_agents
5741        .lock()
5742        .await
5743        .get(&agent_id)
5744        .cloned()
5745        .ok_or_else(|| {
5746            format!(
5747                "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
5748                agent_id
5749            )
5750        })?;
5751    let agent_channel = {
5752        let sessions = state.sessions.lock().await;
5753        sessions
5754            .get(&agent_client_id)
5755            .map(|s| s.channel.clone())
5756            .ok_or_else(|| {
5757                format!(
5758                    "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
5759                    agent_id, agent_client_id
5760                )
5761            })?
5762    };
5763
5764    // Register the chat session BEFORE sending the reverse call so any
5765    // `agent.chat.event` notifications the agent sends as part of
5766    // accepting the chat (e.g. an immediate `token` delta) route
5767    // correctly. Indexed by session_id so the notification interceptor
5768    // can locate the originating host without scanning.
5769    {
5770        let created_at = std::time::SystemTime::now()
5771            .duration_since(std::time::UNIX_EPOCH)
5772            .map(|d| d.as_secs())
5773            .unwrap_or(0);
5774        state.chat_sessions.lock().await.insert(
5775            session_id.clone(),
5776            crate::session::ChatSession {
5777                agent_id: agent_id.clone(),
5778                host_client_id: host_session.client_id.clone(),
5779                created_at,
5780            },
5781        );
5782    }
5783
5784    // Reverse-callback: register a oneshot for the ack, send the
5785    // `agent.chat` JSON-RPC request on the agent's channel, await up
5786    // to AGENT_CHAT_ACK_TIMEOUT_SECS. Uses the same `pending` map the
5787    // tool-callback path uses (`WsToolExecutor`) — the dispatcher's
5788    // response demuxer at the top of `run_dispatch` already routes
5789    // `result` / `error` frames keyed by request id back through it.
5790    let request_id = agent_channel.next_request_id();
5791    let (tx, rx) = oneshot::channel();
5792    agent_channel
5793        .pending
5794        .lock()
5795        .await
5796        .insert(request_id.clone(), tx);
5797
5798    let rpc_request = serde_json::json!({
5799        "jsonrpc": "2.0",
5800        "method": "agent.chat",
5801        "params": {
5802            "session_id": session_id,
5803            "prompt": prompt,
5804            "stream": stream,
5805            "context": {
5806                "host_client_id": host_session.client_id,
5807                "voice_input": voice_input,
5808            },
5809        },
5810        "id": request_id,
5811    });
5812    let msg = Message::Text(
5813        serde_json::to_string(&rpc_request)
5814            .map_err(|e| e.to_string())?
5815            .into(),
5816    );
5817    if let Err(e) = agent_channel.write.lock().await.send(msg).await {
5818        // Send failed — drop the pending waiter and the chat session
5819        // entry so a retry can take a fresh session_id without
5820        // colliding.
5821        agent_channel.pending.lock().await.remove(&request_id);
5822        state.chat_sessions.lock().await.remove(&session_id);
5823        return Err(format!(
5824            "failed to deliver agent.chat to `{}`: {}",
5825            agent_id, e
5826        ));
5827    }
5828
5829    // Await the agent's ack. The dispatcher's response demuxer routes
5830    // the result/error back via the oneshot. Timeout means the agent
5831    // is alive but unresponsive — clean up routing state and surface a
5832    // structured error so the host UI doesn't hang.
5833    let ack = match tokio::time::timeout(
5834        std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
5835        rx,
5836    )
5837    .await
5838    {
5839        Ok(Ok(resp)) => resp,
5840        Ok(Err(_)) => {
5841            // Channel closed — agent disconnected mid-call.
5842            state.chat_sessions.lock().await.remove(&session_id);
5843            return Err(format!(
5844                "agent `{}` disconnected before acking agents.chat",
5845                agent_id
5846            ));
5847        }
5848        Err(_) => {
5849            // Timeout — agent didn't respond in time. Don't keep the
5850            // chat session around: any later events from the agent
5851            // would route to a host that already returned an error.
5852            agent_channel.pending.lock().await.remove(&request_id);
5853            state.chat_sessions.lock().await.remove(&session_id);
5854            return Err(format!(
5855                "agent `{}` did not ack agents.chat within {}s",
5856                agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
5857            ));
5858        }
5859    };
5860
5861    if let Some(err) = ack.error {
5862        // Agent explicitly rejected — drop the session and propagate.
5863        state.chat_sessions.lock().await.remove(&session_id);
5864        return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
5865    }
5866
5867    Ok(serde_json::json!({
5868        "accepted": true,
5869        "session_id": session_id,
5870    }))
5871}
5872
5873/// `agents.chat.cancel` — host aborts an in-flight chat. Forwards
5874/// `agent.chat.cancel` to the bound agent so the agent can short-
5875/// circuit its inference stream + free upstream resources
5876/// (`inference.stream.cancel`). The chat session is dropped from
5877/// routing state immediately whether or not the agent acks the cancel
5878/// — further `agent.chat.event` notifications for this session_id
5879/// fall on the floor by design.
5880async fn handle_agents_chat_cancel(
5881    req: &JsonRpcMessage,
5882    state: &Arc<ServerState>,
5883) -> Result<Value, String> {
5884    use futures::SinkExt;
5885    use tokio_tungstenite::tungstenite::Message;
5886
5887    let session_id = req
5888        .params
5889        .get("session_id")
5890        .and_then(Value::as_str)
5891        .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
5892        .to_string();
5893
5894    let chat = state.chat_sessions.lock().await.remove(&session_id);
5895    let chat = match chat {
5896        Some(c) => c,
5897        None => {
5898            // Already cancelled or never existed — idempotent.
5899            return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
5900        }
5901    };
5902
5903    // Best-effort fire-and-forget to the agent. We've already removed
5904    // the routing entry, so no need to await any agent response.
5905    let agent_client_id = state
5906        .attached_agents
5907        .lock()
5908        .await
5909        .get(&chat.agent_id)
5910        .cloned();
5911    if let Some(client_id) = agent_client_id {
5912        let channel_opt = {
5913            let sessions = state.sessions.lock().await;
5914            sessions.get(&client_id).map(|s| s.channel.clone())
5915        };
5916        if let Some(channel) = channel_opt {
5917            let notification = serde_json::json!({
5918                "jsonrpc": "2.0",
5919                "method": "agent.chat.cancel",
5920                "params": { "session_id": session_id },
5921            });
5922            if let Ok(text) = serde_json::to_string(&notification) {
5923                let _ = channel
5924                    .write
5925                    .lock()
5926                    .await
5927                    .send(Message::Text(text.into()))
5928                    .await;
5929            }
5930        }
5931    }
5932
5933    Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
5934}
5935
5936/// Forward an `agent.chat.event` notification from an agent's
5937/// connection to the originating host's connection, rewritten as an
5938/// `agents.chat.event` notification. Returns `true` if the inbound
5939/// frame was a chat-event we routed (so the dispatcher can `continue`
5940/// past the normal method dispatch and skip the wasted "unknown
5941/// method" response), `false` otherwise.
5942///
5943/// Terminal events (`kind: "done"` / `"error"`) also drop the routing
5944/// entry from `state.chat_sessions` so a later stray notification can
5945/// be rejected as orphaned without leaking memory.
5946pub(crate) async fn try_forward_agent_chat_event(
5947    parsed: &JsonRpcMessage,
5948    state: &Arc<ServerState>,
5949) -> bool {
5950    use futures::SinkExt;
5951    use tokio_tungstenite::tungstenite::Message;
5952
5953    // Notification predicate: method is `agent.chat.event`, id is
5954    // missing/null (per JSON-RPC, notifications have no id), and
5955    // params carry a session_id.
5956    let Some(method) = parsed.method.as_deref() else {
5957        return false;
5958    };
5959    if method != "agent.chat.event" {
5960        return false;
5961    }
5962    if !parsed.id.is_null() {
5963        // Has an id → it's a request, not a notification. Let the
5964        // normal dispatcher handle it (and reply with method-not-found).
5965        return false;
5966    }
5967    let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
5968        return false;
5969    };
5970    let session_id = session_id.to_string();
5971
5972    // Look up the routing entry. If gone (cancelled, agent dropped,
5973    // disconnect cleanup), drop the event silently — late frames from
5974    // a respawned agent for a stale session are not the host's
5975    // problem.
5976    let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
5977    let Some(chat) = chat else {
5978        return true; // recognized the method, but routing has dropped — consumed.
5979    };
5980
5981    // Pull the kind early so terminal-event cleanup runs even if the
5982    // host's send fails.
5983    let kind = parsed
5984        .params
5985        .get("kind")
5986        .and_then(Value::as_str)
5987        .unwrap_or("token")
5988        .to_string();
5989
5990    // Forward to the host. Rewrites the method name to the host-facing
5991    // form and attaches `agent_id` so the host doesn't have to remember
5992    // which agent owns each session.
5993    let host_channel = {
5994        let sessions = state.sessions.lock().await;
5995        sessions
5996            .get(&chat.host_client_id)
5997            .map(|s| s.channel.clone())
5998    };
5999    if let Some(channel) = host_channel {
6000        let mut params = parsed.params.clone();
6001        if let Some(obj) = params.as_object_mut() {
6002            obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
6003        }
6004        let forward = serde_json::json!({
6005            "jsonrpc": "2.0",
6006            "method": "agents.chat.event",
6007            "params": params,
6008        });
6009        if let Ok(text) = serde_json::to_string(&forward) {
6010            let _ = channel
6011                .write
6012                .lock()
6013                .await
6014                .send(Message::Text(text.into()))
6015                .await;
6016        }
6017    }
6018    // Else: host disconnected mid-stream. Drop the event silently +
6019    // cancel the session so the agent isn't streaming into the void.
6020
6021    // Terminal-kind cleanup. The host_channel branch above already
6022    // forwarded the terminal event; we just remove the routing entry
6023    // here so subsequent stray frames are no-ops.
6024    if matches!(kind.as_str(), "done" | "error") {
6025        state.chat_sessions.lock().await.remove(&session_id);
6026    }
6027
6028    true
6029}
6030
#[cfg(test)]
mod fd_leak_regression {
    //! Regression test for car#209: connection cleanup has to run even
    //! when the transport fails abruptly. Previously `let msg = msg?;`
    //! bubbled the read error straight out of `run_dispatch`, which
    //! bypassed `remove_session` and left the entry in
    //! `state.sessions` (`Arc<ClientSession>` -> `Arc<WsChannel>` ->
    //! socket FD) behind after every peer reset or crash-loop
    //! disconnect.
    use super::run_dispatch;
    use futures::SinkExt;
    use std::sync::Arc;
    use tokio_tungstenite::tungstenite::{Error as WsError, Message};

    #[tokio::test]
    async fn abrupt_read_error_still_runs_session_cleanup() {
        let scratch = tempfile::TempDir::new().unwrap();
        let data_dir = scratch.path().to_path_buf();
        let state = Arc::new(crate::session::ServerState::standalone(data_dir));

        // Inbound side: a single transport error and then end of
        // stream, which is what an ungraceful client disconnect looks
        // like to the dispatcher.
        let peer_reset: Result<Message, WsError> = Err(WsError::ConnectionClosed);
        let read = futures::stream::iter(vec![peer_reset]);

        // Outbound side: a sink that discards everything written to it.
        let discard = futures::sink::drain().sink_map_err(|_| WsError::ConnectionClosed);
        let write: crate::session::WsSink = Box::pin(discard);

        let peer = "test-peer".to_string();
        let result = run_dispatch(read, write, peer, Arc::clone(&state)).await;
        assert!(
            result.is_ok(),
            "run_dispatch must return Ok after cleanup, got {result:?}"
        );

        // Cleanup ran despite the abrupt error, so no session (and
        // therefore no channel/FD) is left registered.
        let remaining = state.sessions.lock().await;
        assert!(
            remaining.is_empty(),
            "state.sessions must be empty after an abrupt disconnect (car#209)"
        );
    }
}