Skip to main content

car_server_core/
handler.rs

1//! WebSocket connection handler — bidirectional JSON-RPC.
2//!
3//! Tool callback flow:
4//! 1. Client submits proposal via proposal.submit
5//! 2. Runtime encounters a ToolCall action
6//! 3. WsToolExecutor sends tools.execute request to client via shared write half
7//! 4. WsToolExecutor awaits response on a oneshot channel
8//! 5. Client executes tool locally, sends JSON-RPC response back
9//! 6. Handler receives the response, resolves the oneshot
10//! 7. Runtime continues execution with the tool result
11
12use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
/// An incoming JSON-RPC 2.0 frame, as read off the WebSocket.
///
/// One shape covers both kinds of traffic the client sends:
/// - requests/notifications (`method` + `params`, with an optional `id`);
/// - responses to server-initiated `tools.execute` callbacks (`result` or
///   `error`, plus the `id` the server issued).
///
/// Every field is `#[serde(default)]`, so a frame that omits any of them
/// still parses. Missing values become `""`, `None`, or `Value::Null`.
#[derive(Debug, Deserialize)]
// `jsonrpc` is deserialized but never read by the visible dispatch code.
// NOTE(review): presumably that is why `dead_code` is allowed — confirm
// before removing.
#[allow(dead_code)]
pub struct JsonRpcMessage {
    /// Protocol version string. This type does not validate it.
    #[serde(default)]
    pub jsonrpc: String,
    /// Method name for requests/notifications; `None` on response frames.
    #[serde(default)]
    pub method: Option<String>,
    /// Method parameters; `Value::Null` when absent.
    #[serde(default)]
    pub params: Value,
    /// Correlation id; `Value::Null` when absent. The dispatch loop only
    /// matches string ids against pending tool callbacks.
    #[serde(default)]
    pub id: Value,
    // Response fields
    /// Success payload of a response frame.
    #[serde(default)]
    pub result: Option<Value>,
    /// Error object of a response frame. The dispatch loop reads its
    /// `message` string, falling back to "unknown error" when it is missing.
    #[serde(default)]
    pub error: Option<Value>,
}
44
/// An outgoing JSON-RPC 2.0 response frame.
///
/// Normally built with the `success` / `error` constructors. Both set
/// `jsonrpc` to `"2.0"` and fill exactly one of `result` / `error`.
/// serde's derive serializes fields in declaration order, so the order
/// below is the key order on the wire.
#[derive(Debug, Serialize)]
pub struct JsonRpcResponse {
    /// Protocol version; the constructors always set `"2.0"`.
    pub jsonrpc: &'static str,
    /// Success payload; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Error object; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
    /// Id echoed back to the client. It is always serialized, even when it
    /// is `Value::Null` (as for parse errors, where no id was recovered).
    pub id: Value,
}
54
/// The error object carried in the `error` field of a `JsonRpcResponse`.
///
/// Codes emitted by this module's dispatch loop include:
/// - `-32700`: parse error;
/// - `-32001`: auth required;
/// - `-32003`: blocked by the approval gate;
/// - `-32603`: a method handler returned an error, or the method is unknown.
#[derive(Debug, Serialize)]
pub struct JsonRpcError {
    /// Numeric JSON-RPC error code.
    pub code: i32,
    /// Human-readable description sent to the client.
    pub message: String,
}
60
61impl JsonRpcResponse {
62    pub fn success(id: Value, result: Value) -> Self {
63        Self {
64            jsonrpc: "2.0",
65            result: Some(result),
66            error: None,
67            id,
68        }
69    }
70    pub fn error(id: Value, code: i32, message: &str) -> Self {
71        Self {
72            jsonrpc: "2.0",
73            result: None,
74            error: Some(JsonRpcError {
75                code,
76                message: message.to_string(),
77            }),
78            id,
79        }
80    }
81}
82
83/// Convenience wrapper for the standalone `car-server` binary: accepts
84/// the WebSocket handshake on a raw [`TcpStream`] then delegates to
85/// [`run_dispatch`]. Embedders that already have a handshake-completed
86/// `WebSocketStream` skip this and call `run_dispatch` directly.
87#[instrument(
88    name = "ws.connection",
89    skip_all,
90    fields(peer = %peer),
91)]
92pub async fn handle_connection(
93    stream: TcpStream,
94    peer: SocketAddr,
95    state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97    let ws_stream = accept_async(stream).await?;
98    let (write, read) = ws_stream.split();
99    run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102/// Convenience wrapper for the daemon-as-default Unix-socket
103/// listener. Same shape as [`handle_connection`] but accepts a
104/// `UnixStream` — used by the per-user UDS listener in
105/// `car-server::main` (default transport for FFI thin clients,
106/// since UDS is faster + permission-scoped vs localhost TCP).
107///
108/// Unix-only — `tokio::net::UnixStream` is gated on
109/// `cfg(all(unix, feature = "net"))`. On Windows the daemon binds
110/// only the TCP listener (loopback) and this entry point is
111/// compiled out; consumers must use `handle_connection` instead.
112#[cfg(unix)]
113#[instrument(
114    name = "ws.connection",
115    skip_all,
116    fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119    stream: tokio::net::UnixStream,
120    peer: String,
121    state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123    let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124    let (write, read) = ws_stream.split();
125    run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128/// Transport-neutral entry point: drives the JSON-RPC dispatch loop
129/// against an already-handshake-completed split WebSocket. Generic
130/// over the read half (any `Stream<Item = Result<Message, WsError>>`)
131/// and the write half (a [`WsSink`] — type-erased so this function
132/// doesn't templatize every downstream consumer of `WsChannel`).
133///
134/// `peer` is a free-form string ("127.0.0.1:1234" for TCP,
135/// "uds:/path/sock" for UDS, "axum:..." for embedders) — used only
136/// for tracing fields, never for dispatch logic.
137#[instrument(
138    name = "ws.dispatch",
139    skip_all,
140    fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143    mut read: R,
144    write: crate::session::WsSink,
145    peer: String,
146    state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149    R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150        + Unpin
151        + Send,
152{
153    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154    tracing::Span::current().record("client_id", &client_id.as_str());
155
156    info!("New connection from {}", peer);
157
158    let channel = Arc::new(WsChannel {
159        write: Mutex::new(write),
160        pending: Mutex::new(HashMap::new()),
161        next_id: AtomicU64::new(1),
162    });
163
164    let session = state.create_session(&client_id, channel.clone()).await;
165
166    while let Some(msg) = read.next().await {
167        let msg = msg?;
168        if msg.is_text() {
169            let text = msg.to_text()?;
170            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
171                Ok(m) => m,
172                Err(e) => {
173                    send_response(
174                        &session.channel,
175                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
176                    )
177                    .await?;
178                    continue;
179                }
180            };
181
182            // Is this a response to a pending tool callback?
183            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
184                if let Some(id_str) = parsed.id.as_str() {
185                    let mut pending = session.channel.pending.lock().await;
186                    if let Some(tx) = pending.remove(id_str) {
187                        let tool_resp = if let Some(result) = parsed.result {
188                            ToolExecuteResponse {
189                                action_id: id_str.to_string(),
190                                output: Some(result),
191                                error: None,
192                            }
193                        } else {
194                            let err_msg = parsed
195                                .error
196                                .as_ref()
197                                .and_then(|e| e.get("message"))
198                                .and_then(|m| m.as_str())
199                                .unwrap_or("unknown error")
200                                .to_string();
201                            ToolExecuteResponse {
202                                action_id: id_str.to_string(),
203                                output: None,
204                                error: Some(err_msg),
205                            }
206                        };
207                        let _ = tx.send(tool_resp);
208                        continue;
209                    }
210                }
211            }
212
213            // Agent → host chat-event interceptor. `agent.chat.event`
214            // notifications coming up the WS from a connected agent
215            // are forwarded to the originating host's channel as
216            // `agents.chat.event`. Lives ahead of the regular method
217            // dispatch so the dispatcher doesn't reply with
218            // "method-not-found" on what is a fire-and-forget
219            // notification (no id). See
220            // `docs/proposals/agent-chat-surface.md`.
221            if try_forward_agent_chat_event(&parsed, &state).await {
222                continue;
223            }
224
225            // Otherwise it's a client request
226            if let Some(method) = &parsed.method {
227                info!(method = %method, "dispatching JSON-RPC method");
228
229                // Auth gate (Parslee-ai/car-releases#32). When the
230                // server has an auth token installed, every method
231                // other than `session.auth` is rejected on
232                // unauthenticated sessions and the connection is
233                // closed after the error response goes out. When no
234                // token is installed (default), this branch never
235                // fires — preserves pre-#32 behaviour.
236                if state.auth_token.get().is_some()
237                    && !session
238                        .authenticated
239                        .load(std::sync::atomic::Ordering::Acquire)
240                    && method != "session.auth"
241                {
242                    let resp = JsonRpcResponse::error(
243                        parsed.id.clone(),
244                        -32001,
245                        "auth required: send `session.auth` with the per-launch token \
246                         from ~/Library/Application Support/ai.parslee.car/auth-token \
247                         (macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
248                         as the first frame on this connection",
249                    );
250                    send_response(&session.channel, resp).await?;
251                    info!(client = %client_id, method = %method,
252                        "rejecting non-auth method on unauthenticated session; closing");
253                    break;
254                }
255
256                // Approval gate (audit 2026-05). High-risk methods —
257                // anything that drives macOS automation or sends
258                // messages on the user's behalf — must be acked by
259                // the user via `host.resolve_approval` before they
260                // dispatch. The gate raises an `approval.requested`
261                // host event the local UI can render approve/deny on,
262                // then parks until resolved or the configured
263                // timeout fires. Returns a JSON-RPC error and
264                // continues the dispatch loop on deny / timeout —
265                // no connection close, since the caller may want to
266                // retry with revised parameters.
267                if state.approval_gate.requires_approval(method.as_str()) {
268                    match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
269                        Ok(()) => {}
270                        Err(reason) => {
271                            let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
272                            send_response(&session.channel, resp).await?;
273                            info!(
274                                client = %client_id,
275                                method = %method,
276                                reason = %reason,
277                                "approval gate blocked dispatch"
278                            );
279                            continue;
280                        }
281                    }
282                }
283
284                // Spawn the per-method dispatch in a task so the read
285                // loop keeps reading frames. Without this, methods
286                // that trigger server-initiated `tools.execute`
287                // callbacks (`proposal.submit`, `workflow.run`,
288                // `multi.*` paths that fire a registered tool)
289                // deadlock the connection: the handler awaits the
290                // callback response on a oneshot, but the response is
291                // another frame on this same read half — which the
292                // synchronous `.await` here would prevent the loop
293                // from ever picking up. Surfaced by the
294                // `executeProposal: echo tool via JS callback` smoke
295                // (#173). Response ordering becomes id-keyed (the
296                // JSON-RPC demuxing contract) rather than
297                // arrival-ordered.
298                let session_task = session.clone();
299                let state_task = state.clone();
300                let method_owned = method.clone();
301                let parsed_task = parsed;
302                tokio::spawn(async move {
303                    let session = session_task;
304                    let state = state_task;
305                    let parsed = parsed_task;
306                    let result = match method_owned.as_str() {
307                        "session.auth" => handle_session_auth(&parsed, &session, &state).await,
308                        "parslee.auth" => handle_parslee_auth().await,
309                        "session.init" => handle_session_init(&parsed, &session).await,
310                        "host.subscribe" => handle_host_subscribe(&session, &state).await,
311                        "host.agents" => handle_host_agents(&session).await,
312                        "host.events" => handle_host_events(&parsed, &session).await,
313                        "host.approvals" => handle_host_approvals(&session).await,
314                        "host.register_agent" => {
315                            handle_host_register_agent(&parsed, &session).await
316                        }
317                        "host.unregister_agent" => {
318                            handle_host_unregister_agent(&parsed, &session).await
319                        }
320                        "host.set_status" => handle_host_set_status(&parsed, &session).await,
321                        "host.notify" => handle_host_notify(&parsed, &session).await,
322                        "host.request_approval" => {
323                            handle_host_request_approval(&parsed, &session).await
324                        }
325                        "host.resolve_approval" => {
326                            handle_host_resolve_approval(&parsed, &session).await
327                        }
328                        "tools.register" => handle_tools_register(&parsed, &session).await,
329                        "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
330                        "policy.register" => handle_policy_register(&parsed, &session).await,
331                        "session.policy.open" => handle_session_policy_open(&session).await,
332                        "session.policy.close" => {
333                            handle_session_policy_close(&parsed, &session).await
334                        }
335                        "verify" => handle_verify(&parsed, &session).await,
336                        "state.get" => handle_state_get(&parsed, &session).await,
337                        "state.set" => handle_state_set(&parsed, &session).await,
338                        "state.exists" => handle_state_exists(&parsed, &session).await,
339                        "state.keys" => handle_state_keys(&parsed, &session).await,
340                        "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
341                        "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
342                        "memory.query" => handle_memory_query(&parsed, &session).await,
343                        "memory.build_context" => {
344                            handle_memory_build_context(&parsed, &session).await
345                        }
346                        "memory.build_context_fast" => {
347                            handle_memory_build_context_fast(&parsed, &session).await
348                        }
349                        "memory.consolidate" => handle_memory_consolidate(&session).await,
350                        "memory.fact_count" => handle_memory_fact_count(&session).await,
351                        "memory.persist" => handle_memory_persist(&parsed, &session).await,
352                        "memory.load" => handle_memory_load(&parsed, &session).await,
353                        "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
354                        "skill.find" => handle_skill_find(&parsed, &session).await,
355                        "skill.report" => handle_skill_report(&parsed, &session).await,
356                        "skill.repair" => handle_skill_repair(&parsed, &session).await,
357                        "skills.ingest_distilled" => {
358                            handle_skills_ingest_distilled(&parsed, &session).await
359                        }
360                        "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
361                        "skills.domains_needing_evolution" => {
362                            handle_skills_domains_needing_evolution(&parsed, &session).await
363                        }
364                        "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
365                        "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
366                        "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
367                        "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
368                        "multi.vote" => handle_multi_vote(&parsed, &session).await,
369                        "scheduler.create" => handle_scheduler_create(&parsed),
370                        "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
371                        "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
372                        "infer" => handle_infer(&parsed, &state, &session).await,
373                        "image.generate" => handle_image_generate(&parsed, &state).await,
374                        "video.generate" => handle_video_generate(&parsed, &state).await,
375                        "embed" => handle_embed(&parsed, &state).await,
376                        "classify" => handle_classify(&parsed, &state).await,
377                        "tokenize" => handle_tokenize(&parsed, &state).await,
378                        "detokenize" => handle_detokenize(&parsed, &state).await,
379                        "rerank" => handle_rerank(&parsed, &state).await,
380                        "transcribe" => handle_transcribe(&parsed, &state).await,
381                        "synthesize" => handle_synthesize(&parsed, &state).await,
382                        "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
383                        "speech.prepare" => handle_speech_prepare(&state).await,
384                        "models.route" => handle_models_route(&parsed, &state).await,
385                        "models.stats" => handle_models_stats(&state).await,
386                        "outcomes.resolve_pending" => {
387                            handle_outcomes_resolve_pending(&parsed, &state).await
388                        }
389                        "events.count" => handle_events_count(&session).await,
390                        "events.stats" => handle_events_stats(&session).await,
391                        "events.truncate" => handle_events_truncate(&parsed, &session).await,
392                        "events.clear" => handle_events_clear(&session).await,
393                        "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
394                        "models.list" => handle_models_list(&state),
395                        "models.register" => handle_models_register(&parsed, &state).await,
396                        "models.unregister" => handle_models_unregister(&parsed, &state).await,
397                        "models.list_unified" => handle_models_list_unified(&state),
398                        "models.search" => handle_models_search(&parsed, &state),
399                        "models.upgrades" => handle_models_upgrades(&state),
400                        "models.pull" => handle_models_pull(&parsed, &state).await,
401                        "models.install" => handle_models_pull(&parsed, &state).await,
402                        "skills.distill" => handle_skills_distill(&parsed, &state).await,
403                        "skills.list" => handle_skills_list(&parsed, &session).await,
404                        "browser.run" => handle_browser_run(&parsed, &session).await,
405                        "browser.close" => handle_browser_close(&session).await,
406                        "secret.put" => handle_secret_put(&parsed),
407                        "secret.get" => handle_secret_get(&parsed),
408                        "secret.delete" => handle_secret_delete(&parsed),
409                        "secret.status" => handle_secret_status(&parsed),
410                        "secret.available" => Ok(car_ffi_common::secrets::is_available()),
411                        "permissions.status" => handle_perm_status(&parsed),
412                        "permissions.request" => handle_perm_request(&parsed),
413                        "permissions.explain" => handle_perm_explain(&parsed),
414                        "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
415                        "accounts.list" => car_ffi_common::accounts::list(),
416                        "accounts.open" => {
417                            #[derive(serde::Deserialize, Default)]
418                            struct OpenParams {
419                                #[serde(default)]
420                                account_id: Option<String>,
421                            }
422                            let p: OpenParams =
423                                serde_json::from_value(parsed.params.clone()).unwrap_or_default();
424                            car_ffi_common::accounts::open_settings(p.account_id.as_deref())
425                        }
426                        "calendar.list" => car_ffi_common::integrations::calendar_list(),
427                        "calendar.events" => handle_calendar_events(&parsed),
428                        "contacts.containers" => {
429                            car_ffi_common::integrations::contacts_containers()
430                        }
431                        "contacts.find" => handle_contacts_find(&parsed),
432                        "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
433                        "mail.inbox" => handle_mail_inbox(&parsed),
434                        "mail.send" => handle_mail_send(&parsed),
435                        "messages.services" => car_ffi_common::integrations::messages_services(),
436                        "messages.chats" => handle_messages_chats(&parsed),
437                        "messages.send" => handle_messages_send(&parsed),
438                        "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
439                        "notes.find" => handle_notes_find(&parsed),
440                        "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
441                        "reminders.items" => handle_reminders_items(&parsed),
442                        "photos.albums" => car_ffi_common::integrations::photos_albums(),
443                        "bookmarks.list" => handle_bookmarks_list(&parsed),
444                        "files.locations" => car_ffi_common::integrations::files_locations(),
445                        "keychain.status" => car_ffi_common::integrations::keychain_status(),
446                        "health.status" => car_ffi_common::health::status(),
447                        "health.sleep" => handle_health_sleep(&parsed),
448                        "health.workouts" => handle_health_workouts(&parsed),
449                        "health.activity" => handle_health_activity(&parsed),
450                        "voice.transcribe_stream.start" => {
451                            handle_voice_transcribe_stream_start(&parsed, &state, &session).await
452                        }
453                        "voice.transcribe_stream.stop" => {
454                            handle_voice_transcribe_stream_stop(&parsed, &state).await
455                        }
456                        "voice.transcribe_stream.push" => {
457                            handle_voice_transcribe_stream_push(&parsed, &state).await
458                        }
459                        "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
460                        "voice.dispatch_turn" => {
461                            handle_voice_dispatch_turn(&parsed, &state, &session).await
462                        }
463                        "voice.cancel_turn" => handle_voice_cancel_turn().await,
464                        "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
465                        "inference.register_runner" => {
466                            handle_inference_register_runner(&session).await
467                        }
468                        "inference.runner.event" => handle_inference_runner_event(&parsed).await,
469                        "inference.runner.complete" => {
470                            handle_inference_runner_complete(&parsed).await
471                        }
472                        "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
473                        "voice.providers.list" => {
474                            // Stateless: enumerates STT/TTS providers compiled into
475                            // this build. Runtime readiness (API key, permission,
476                            // model download) is reported via per-provider errors.
477                            serde_json::from_str::<serde_json::Value>(
478                                &car_voice::list_voice_providers_json(),
479                            )
480                            .map_err(|e| e.to_string())
481                        }
482                        "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
483                            .await
484                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
485                        "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
486                            .await
487                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
488                        "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
489                        "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
490                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
491                        "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
492                        "workflow.run" => handle_workflow_run(&parsed, &session).await,
493                        "workflow.verify" => handle_workflow_verify(&parsed),
494                        "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
495                        "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
496                        "meeting.list" => handle_meeting_list(&parsed),
497                        "meeting.get" => handle_meeting_get(&parsed),
498                        "registry.register" => handle_registry_register(&parsed),
499                        "registry.heartbeat" => handle_registry_heartbeat(&parsed),
500                        "registry.unregister" => handle_registry_unregister(&parsed),
501                        "registry.list" => handle_registry_list(&parsed),
502                        "registry.reap" => handle_registry_reap(&parsed),
503                        "admission.status" => handle_admission_status(&state),
504                        "a2a.start" => handle_a2a_start(&parsed, &session).await,
505                        "a2a.stop" => handle_a2a_stop(),
506                        "a2a.status" => handle_a2a_status(),
507                        "a2a.send" => handle_a2a_send(&parsed, &state).await,
508                        "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
509                        "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
510                        "a2ui.capabilities" => handle_a2ui_capabilities(&state),
511                        "a2ui.reap" => handle_a2ui_reap(&state).await,
512                        "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
513                        "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
514                        "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
515                        "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
516                        "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
517                        "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
518                        "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
519                        "automation.run_applescript" => handle_run_applescript(&parsed).await,
520                        "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
521                        "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
522                        "notifications.local" => handle_local_notification(&parsed).await,
523                        "vision.ocr" => handle_vision_ocr(&parsed).await,
524                        "agents.list" => handle_agents_list(&state).await,
525                        "agents.health" => handle_agents_health(&state).await,
526                        "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
527                        "agents.install" => handle_agents_install(&parsed, &state).await,
528                        "agents.remove" => handle_agents_remove(&parsed, &state).await,
529                        "agents.start" => handle_agents_start(&parsed, &state).await,
530                        "agents.stop" => handle_agents_stop(&parsed, &state).await,
531                        "agents.restart" => handle_agents_restart(&parsed, &state).await,
532                        "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
533                        "agents.list_external" => handle_agents_list_external(&parsed).await,
534                        "agents.detect_external" => handle_agents_detect_external(&parsed).await,
535                        "agents.health_external" => handle_agents_health_external(&parsed).await,
536                        "agents.invoke_external" => {
537                            handle_agents_invoke_external(&parsed, &state, &session).await
538                        }
539                        "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
540                        "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
541                        // A2A v1.0 (PascalCase) + v0.3 (slash form) — both
542                        // alias to the same in-core dispatcher per
543                        // Parslee-ai/car-releases#28. Embedders that need a
544                        // custom AgentCardSource / TaskStore plug them in
545                        // via ServerStateConfig::with_a2a_card_source /
546                        // with_a2a_store before any handler runs.
547                        "message/send"
548                        | "SendMessage"
549                        | "message/stream"
550                        | "SendStreamingMessage"
551                        | "tasks/get"
552                        | "GetTask"
553                        | "tasks/list"
554                        | "ListTasks"
555                        | "tasks/cancel"
556                        | "CancelTask"
557                        | "tasks/resubscribe"
558                        | "SubscribeToTask"
559                        | "tasks/pushNotificationConfig/set"
560                        | "CreateTaskPushNotificationConfig"
561                        | "tasks/pushNotificationConfig/get"
562                        | "GetTaskPushNotificationConfig"
563                        | "tasks/pushNotificationConfig/list"
564                        | "ListTaskPushNotificationConfigs"
565                        | "tasks/pushNotificationConfig/delete"
566                        | "DeleteTaskPushNotificationConfig"
567                        | "agent/getAuthenticatedExtendedCard"
568                        | "GetExtendedAgentCard" => {
569                            handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
570                        }
571                        _ => Err(format!("unknown method: {}", method_owned)),
572                    };
573
574                    let resp = match result {
575                        Ok(value) => JsonRpcResponse::success(parsed.id, value),
576                        Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
577                    };
578                    let _ = send_response(&session.channel, resp).await;
579                });
580            }
581        } else if msg.is_close() {
582            info!("Client {} disconnected", client_id);
583            break;
584        }
585    }
586
587    session.host.unsubscribe(&client_id).await;
588    state.a2ui_subscribers.lock().await.remove(&client_id);
589
590    // Fix for MULTI-4 / WS-3: drop the session from the registry and
591    // drain any pending tool callbacks. Without this, every connection
592    // we ever accepted keeps an `Arc<ClientSession>` alive in
593    // `state.sessions`, and outstanding `oneshot::Sender`s in
594    // `session.channel.pending` outlive the closed connection until
595    // their per-call timeout (60s). Dropping the senders here causes any
596    // awaiting `recv()` in `WsToolExecutor::execute` to return
597    // `RecvError` immediately, which the existing error-handler path
598    // already maps to "callback channel closed" — same shape as the
599    // timeout path, just faster.
600    let _removed = state.remove_session(&client_id).await;
601    {
602        let mut pending = session.channel.pending.lock().await;
603        pending.clear();
604    }
605
606    Ok(())
607}
608
609async fn send_response(
610    channel: &WsChannel,
611    resp: JsonRpcResponse,
612) -> Result<(), Box<dyn std::error::Error>> {
613    use futures::SinkExt;
614    let json = serde_json::to_string(&resp)?;
615    channel
616        .write
617        .lock()
618        .await
619        .send(Message::Text(json.into()))
620        .await?;
621    Ok(())
622}
623
624// --- Request handlers ---
625
626async fn handle_host_subscribe(
627    session: &crate::session::ClientSession,
628    state: &Arc<ServerState>,
629) -> Result<Value, String> {
630    session
631        .host
632        .subscribe(&session.client_id, session.channel.clone())
633        .await;
634    serde_json::to_value(HostSnapshot {
635        subscribed: true,
636        agents: session.host.agents().await,
637        approvals: session.host.approvals().await,
638        events: session.host.events(50).await,
639        identity: Some(daemon_identity(state)),
640    })
641    .map_err(|e| e.to_string())
642}
643
644/// Snapshot the daemon-identity facts for a fresh subscriber.
645/// Cheap: non-acquiring reads on `OnceLock`s + a single
646/// `to_string_lossy` on the manifest path. Critically uses
647/// [`ServerState::supervisor_if_installed`] — not the lazy-init
648/// `supervisor()` — so a Heisenberg subscribe can't *cause* the
649/// daemon to acquire the manifest lock just by asking whether it
650/// owns one.
651fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
652    // Observer takes precedence: when both supervisor and observer
653    // markers are set (currently unreachable through the standalone
654    // binary, but an embedder could install both racily), the
655    // observer marker is the authoritative role since the
656    // supervisor handle is only installed when this daemon owns
657    // the lock.
658    let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
659        (
660            Some(p.to_string_lossy().into_owned()),
661            car_proto::HostManifestRole::Observer,
662        )
663    } else if let Some(s) = state.supervisor_if_installed() {
664        (
665            Some(s.manifest_path().to_string_lossy().into_owned()),
666            car_proto::HostManifestRole::Owner,
667        )
668    } else {
669        (None, car_proto::HostManifestRole::None)
670    };
671    car_proto::HostIdentity {
672        version: env!("CARGO_PKG_VERSION").to_string(),
673        pid: std::process::id(),
674        manifest_path,
675        manifest_role,
676        parslee: state
677            .parslee_session
678            .get()
679            .map(|session| session.identity.clone()),
680    }
681}
682
683/// Return the Parslee cloud credential for an authenticated CAR
684/// connection. Managed agents already receive `CAR_AUTH_TOKEN` and
685/// authenticate to the local daemon with `session.auth`; this method is
686/// their supported bridge from local CAR auth to the user's Parslee
687/// backend auth.
688///
689/// The bearer token is intentionally not injected into every managed
690/// child process environment. Agents ask for it only when they need it,
691/// through the same local auth gate that protects the rest of the
692/// daemon.
693async fn handle_parslee_auth() -> Result<Value, String> {
694    let session = crate::parslee_auth::load_or_refresh()
695        .await?
696        .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
697    Ok(serde_json::json!({
698        "authenticated": true,
699        "token_type": "Bearer",
700        "access_token": session.access_token,
701        "authorization_header": format!("Bearer {}", session.access_token),
702        "identity": session.identity,
703    }))
704}
705
706async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
707    serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
708}
709
710async fn handle_host_events(
711    req: &JsonRpcMessage,
712    session: &crate::session::ClientSession,
713) -> Result<Value, String> {
714    let limit = req
715        .params
716        .get("limit")
717        .and_then(|v| v.as_u64())
718        .unwrap_or(100) as usize;
719    serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
720}
721
722async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
723    serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
724}
725
726async fn handle_a2ui_apply(
727    req: &JsonRpcMessage,
728    state: &Arc<ServerState>,
729) -> Result<Value, String> {
730    #[derive(Deserialize)]
731    struct Params {
732        #[serde(default)]
733        envelope: Option<car_a2ui::A2uiEnvelope>,
734        #[serde(default)]
735        message: Option<car_a2ui::A2uiEnvelope>,
736    }
737
738    let envelope = if req.params.get("createSurface").is_some()
739        || req.params.get("updateComponents").is_some()
740        || req.params.get("updateDataModel").is_some()
741        || req.params.get("deleteSurface").is_some()
742    {
743        serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
744            .map_err(|e| e.to_string())?
745    } else {
746        match serde_json::from_value::<Params>(req.params.clone()) {
747            Ok(params) => params
748                .envelope
749                .or(params.message)
750                .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
751            Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
752                .map_err(|e| e.to_string())?,
753        }
754    };
755
756    apply_a2ui_envelope(state, envelope, None, None).await
757}
758
759async fn handle_a2ui_ingest(
760    req: &JsonRpcMessage,
761    state: &Arc<ServerState>,
762) -> Result<Value, String> {
763    #[derive(Deserialize)]
764    #[serde(rename_all = "camelCase")]
765    struct Params {
766        #[serde(default)]
767        endpoint: Option<String>,
768        #[serde(default)]
769        a2a_endpoint: Option<String>,
770        #[serde(default)]
771        owner: Option<car_a2ui::A2uiSurfaceOwner>,
772        #[serde(default)]
773        route_auth: Option<A2aRouteAuth>,
774        #[serde(default)]
775        allow_untrusted_endpoint: bool,
776    }
777
778    let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
779        endpoint: None,
780        a2a_endpoint: None,
781        owner: None,
782        route_auth: None,
783        allow_untrusted_endpoint: false,
784    });
785    let payload = req.params.get("payload").unwrap_or(&req.params);
786    state
787        .a2ui
788        .validate_payload(payload)
789        .map_err(|e| e.to_string())?;
790    let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
791    if envelopes.is_empty() {
792        return Err("no A2UI envelopes found in payload".into());
793    }
794    let endpoint = params.endpoint.or(params.a2a_endpoint);
795    let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
796    let owner = params
797        .owner
798        .or_else(|| car_a2ui::owner_from_value(payload))
799        .map(|owner| match endpoint.clone() {
800            Some(endpoint) => owner.with_endpoint(Some(endpoint)),
801            None => owner,
802        });
803
804    let mut results = Vec::new();
805    for envelope in envelopes {
806        let value =
807            apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
808        results.push(value);
809    }
810    Ok(serde_json::json!({ "applied": results }))
811}
812
813async fn apply_a2ui_envelope(
814    state: &Arc<ServerState>,
815    envelope: car_a2ui::A2uiEnvelope,
816    owner: Option<car_a2ui::A2uiSurfaceOwner>,
817    route_auth: Option<A2aRouteAuth>,
818) -> Result<Value, String> {
819    let result = state
820        .a2ui
821        .apply_with_owner(envelope, owner)
822        .await
823        .map_err(|e| e.to_string())?;
824    update_a2ui_route_auth(state, &result, route_auth).await;
825    let kind = if result.deleted {
826        "a2ui.surface_deleted"
827    } else {
828        "a2ui.surface_updated"
829    };
830    let message = if result.deleted {
831        format!("A2UI surface {} deleted", result.surface_id)
832    } else {
833        format!("A2UI surface {} updated", result.surface_id)
834    };
835    let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
836    state
837        .host
838        .record_event(kind, None, message, payload.clone())
839        .await;
840    // Push the envelope result to every WS subscriber as an
841    // `a2ui.event` notification — Parslee-ai/car-releases#29. Late
842    // joiners catch up via `a2ui/replay` (or `a2ui.surfaces`).
843    broadcast_a2ui_event(state, kind, &payload).await;
844    serde_json::to_value(result).map_err(|e| e.to_string())
845}
846
847async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
848    use futures::SinkExt;
849    use tokio_tungstenite::tungstenite::Message;
850    let subscribers: Vec<Arc<crate::session::WsChannel>> = state
851        .a2ui_subscribers
852        .lock()
853        .await
854        .values()
855        .cloned()
856        .collect();
857    if subscribers.is_empty() {
858        return;
859    }
860    let Ok(json) = serde_json::to_string(&serde_json::json!({
861        "jsonrpc": "2.0",
862        "method": "a2ui.event",
863        "params": {
864            "kind": kind,
865            "result": result,
866        },
867    })) else {
868        return;
869    };
870    for channel in subscribers {
871        let _ = channel
872            .write
873            .lock()
874            .await
875            .send(Message::Text(json.clone().into()))
876            .await;
877    }
878}
879
880async fn update_a2ui_route_auth(
881    state: &Arc<ServerState>,
882    result: &car_a2ui::A2uiApplyResult,
883    route_auth: Option<A2aRouteAuth>,
884) {
885    let mut auth = state.a2ui_route_auth.lock().await;
886    if result.deleted {
887        auth.remove(&result.surface_id);
888        return;
889    }
890
891    let has_route_endpoint = result
892        .surface
893        .as_ref()
894        .and_then(|surface| surface.owner.as_ref())
895        .and_then(|owner| owner.endpoint.as_ref())
896        .is_some();
897    match (has_route_endpoint, route_auth) {
898        (true, Some(route_auth)) => {
899            auth.insert(result.surface_id.clone(), route_auth);
900        }
901        _ => {
902            auth.remove(&result.surface_id);
903        }
904    }
905}
906
907fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
908    serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
909}
910
911async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
912    let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
913    if !removed.is_empty() {
914        let mut auth = state.a2ui_route_auth.lock().await;
915        for surface_id in &removed {
916            auth.remove(surface_id);
917        }
918    }
919    Ok(serde_json::json!({ "removed": removed }))
920}
921
922async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
923    serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
924}
925
926async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
927    let surface_id = req
928        .params
929        .get("surface_id")
930        .or_else(|| req.params.get("surfaceId"))
931        .and_then(Value::as_str)
932        .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
933    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
934}
935
936/// `a2ui/subscribe` — opt this WS connection into `a2ui.event`
937/// notifications. Subscribers receive every `apply_a2ui_envelope`
938/// result for as long as they're connected; the cleanup hook in
939/// `run_dispatch` removes them on disconnect. Closes
940/// Parslee-ai/car-releases#29.
941async fn handle_a2ui_subscribe(
942    session: &crate::session::ClientSession,
943    state: &Arc<ServerState>,
944) -> Result<Value, String> {
945    state
946        .a2ui_subscribers
947        .lock()
948        .await
949        .insert(session.client_id.clone(), session.channel.clone());
950    Ok(serde_json::json!({ "subscribed": true }))
951}
952
953/// `a2ui/unsubscribe` — opt out of `a2ui.event` notifications.
954/// Idempotent: returns `{ subscribed: false }` regardless of prior
955/// state.
956async fn handle_a2ui_unsubscribe(
957    session: &crate::session::ClientSession,
958    state: &Arc<ServerState>,
959) -> Result<Value, String> {
960    state
961        .a2ui_subscribers
962        .lock()
963        .await
964        .remove(&session.client_id);
965    Ok(serde_json::json!({ "subscribed": false }))
966}
967
968/// `a2ui/replay` — fetch the current state of one surface. Intended
969/// for late joiners and reconnect: a client calls `subscribe`, then
970/// `replay` once per surface it's tracking, and from then on
971/// notifications keep it in sync. Equivalent to `a2ui.get` on the
972/// surface store; lives in the subscribe namespace for
973/// discoverability.
974async fn handle_a2ui_replay(
975    req: &JsonRpcMessage,
976    state: &Arc<ServerState>,
977) -> Result<Value, String> {
978    let surface_id = req
979        .params
980        .get("surface_id")
981        .or_else(|| req.params.get("surfaceId"))
982        .and_then(Value::as_str)
983        .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
984    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
985}
986
987async fn handle_a2ui_action(
988    req: &JsonRpcMessage,
989    state: &Arc<ServerState>,
990) -> Result<Value, String> {
991    let action: car_a2ui::ClientAction =
992        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
993    let owner = state.a2ui.owner(&action.surface_id).await;
994    let route = route_a2ui_action(state, &action, owner.clone()).await;
995    let payload = serde_json::json!({
996        "action": action,
997        "owner": owner,
998        "route": route,
999    });
1000    let event = state
1001        .host
1002        .record_event(
1003            "a2ui.action",
1004            None,
1005            format!(
1006                "A2UI action {} from {}",
1007                action.name, action.source_component_id
1008            ),
1009            payload,
1010        )
1011        .await;
1012    Ok(serde_json::json!({
1013        "event": event,
1014        "route": route,
1015    }))
1016}
1017
1018/// `a2ui.render_report` — renderer-emitted telemetry envelope
1019/// (Parslee-ai/car#180). Fire-and-forget; we record an event in
1020/// the host log (so dev tools and the conversation log see it) and
1021/// broadcast as `a2ui.event { kind: "a2ui.render_report", result }`
1022/// to every WS subscriber. The improvement agent reads the report
1023/// and decides whether to issue a follow-up `patchComponents`.
1024async fn handle_a2ui_render_report(
1025    req: &JsonRpcMessage,
1026    state: &Arc<ServerState>,
1027) -> Result<Value, String> {
1028    // Parse into the typed struct to enforce the schema; we
1029    // re-serialize for the event/broadcast payload so downstream
1030    // consumers don't have to defensively re-validate.
1031    let report: car_a2ui::RenderReport =
1032        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1033    let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
1034    let kind = "a2ui.render_report";
1035    let message = format!("A2UI render report for surface {}", report.surface_id);
1036    let event = state
1037        .host
1038        .record_event(kind, None, message, payload.clone())
1039        .await;
1040    broadcast_a2ui_event(state, kind, &payload).await;
1041
1042    // Hand the report to the in-process UI-improvement agent. The
1043    // agent is sync but cheap; we await the surface lookup before
1044    // calling it so the strategies see the same surface state the
1045    // renderer saw at report-emit time. Best-effort: surface lookup
1046    // misses (the surface might have been deleted between
1047    // report-emit and report-receive) are logged and skipped, never
1048    // surfaced as JSON-RPC errors — render_report is fire-and-forget.
1049    if let Some(surface) = state.a2ui.get(&report.surface_id).await {
1050        // Iteration budget — runaway-loop backstop. try_consume
1051        // claims the slot atomically; if the surface is already at
1052        // the cap, we short-circuit before consulting the agent.
1053        // The slot stays consumed only if the patch actually
1054        // applies — failure paths refund.
1055        if !state.ui_agent_budget.try_consume(&report.surface_id) {
1056            tracing::warn!(
1057                surface_id = %report.surface_id,
1058                count = state.ui_agent_budget.count(&report.surface_id),
1059                max = state.ui_agent_budget.max(),
1060                "ui-agent iteration budget exhausted; skipping agent invocation"
1061            );
1062            return Ok(serde_json::json!({ "event": event }));
1063        }
1064        // From here on, every non-applied branch must `refund` the
1065        // slot we just claimed. Only the successful-apply branch
1066        // keeps it consumed.
1067        match state.ui_agent.on_render_report(&report, &surface) {
1068            car_ui_agent::Decision::Patch {
1069                envelope,
1070                strategy_id,
1071                patch_hash,
1072                elapsed_ns,
1073            } => {
1074                // Runtime-side convergence monitor — neo's deferred
1075                // ask. The agent's no-double-patch guard catches
1076                // same-sequence repeats; this catches A→B→A across
1077                // sequences by tracking recent patch hashes per
1078                // surface. When a proposal would repeat one in the
1079                // window, drop it; the loop waits for the next
1080                // signature change.
1081                if !state
1082                    .ui_agent_oscillation
1083                    .check_and_record(&report.surface_id, patch_hash)
1084                {
1085                    tracing::warn!(
1086                        surface_id = %report.surface_id,
1087                        strategy = %strategy_id,
1088                        patch_hash,
1089                        "ui-agent oscillation detected; suppressing patch"
1090                    );
1091                    // Suppressed patch never applied → release the
1092                    // budget slot we claimed above.
1093                    state.ui_agent_budget.refund(&report.surface_id);
1094                    return Ok(serde_json::json!({ "event": event }));
1095                }
1096                let a2ui_envelope = car_a2ui::A2uiEnvelope {
1097                    patch_components: Some(envelope),
1098                    ..Default::default()
1099                };
1100                if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
1101                    tracing::warn!(
1102                        surface_id = %report.surface_id,
1103                        strategy = %strategy_id,
1104                        patch_hash,
1105                        elapsed_ns,
1106                        error = %e,
1107                        "ui-agent patch apply failed",
1108                    );
1109                    // Apply failed → release the budget slot.
1110                    state.ui_agent_budget.refund(&report.surface_id);
1111                } else {
1112                    tracing::debug!(
1113                        surface_id = %report.surface_id,
1114                        strategy = %strategy_id,
1115                        patch_hash,
1116                        elapsed_ns,
1117                        iteration = state.ui_agent_budget.count(&report.surface_id),
1118                        "ui-agent patch applied",
1119                    );
1120                    // Memgine trace: one Conversation node per
1121                    // successful patch, tagged "ui-agent/<surface>".
1122                    // Spreading activation can surface these on
1123                    // future renders of related surfaces. Spawned
1124                    // off the render-report hot path — ingest walks
1125                    // the graph for spreading activation and can
1126                    // take real time; we don't want two surfaces
1127                    // churning patches to serialize through the
1128                    // memgine mutex behind the WS handler.
1129                    if let Some(memgine) = state.shared_memgine.clone() {
1130                        let speaker = format!("ui-agent/{}", report.surface_id);
1131                        let text = format!("strategy applied: {}", strategy_id);
1132                        tokio::spawn(async move {
1133                            let mut guard = memgine.lock().await;
1134                            guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1135                        });
1136                    }
1137                }
1138            }
1139            car_ui_agent::Decision::StableNoChange => {
1140                // No patch this round → release the slot.
1141                state.ui_agent_budget.refund(&report.surface_id);
1142            }
1143            car_ui_agent::Decision::HardStop { reason } => {
1144                state.ui_agent_budget.refund(&report.surface_id);
1145                // Renderer painted unknown components — contract
1146                // violation between server and renderer. Per
1147                // types.rs docs: "loud failure" + "MUST pause."
1148                // `error!` not `warn!` so it surfaces in production
1149                // logs at the right severity.
1150                tracing::error!(
1151                    surface_id = %report.surface_id,
1152                    reason = %reason,
1153                    "ui-agent hard-stopped improvement loop",
1154                );
1155            }
1156        }
1157    } else {
1158        tracing::debug!(
1159            surface_id = %report.surface_id,
1160            "ui-agent skipped — surface not found in store",
1161        );
1162    }
1163
1164    Ok(serde_json::json!({ "event": event }))
1165}
1166
1167async fn route_a2ui_action(
1168    state: &Arc<ServerState>,
1169    action: &car_a2ui::ClientAction,
1170    owner: Option<car_a2ui::A2uiSurfaceOwner>,
1171) -> Value {
1172    let Some(owner) = owner else {
1173        return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1174    };
1175    if owner.kind != "a2a" {
1176        return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1177    }
1178    let Some(endpoint) = owner.endpoint.clone() else {
1179        return serde_json::json!({
1180            "delivered": false,
1181            "reason": "surface owner has no endpoint",
1182            "owner": owner
1183        });
1184    };
1185
1186    let message = car_a2a::Message {
1187        message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1188        role: car_a2a::MessageRole::User,
1189        parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1190            data: serde_json::json!({
1191                "a2uiAction": action,
1192            }),
1193            metadata: Default::default(),
1194        })],
1195        task_id: owner.task_id.clone(),
1196        context_id: owner.context_id.clone(),
1197        metadata: Default::default(),
1198    };
1199
1200    let auth = state
1201        .a2ui_route_auth
1202        .lock()
1203        .await
1204        .get(&action.surface_id)
1205        .cloned()
1206        .map(client_auth_from_route_auth)
1207        .unwrap_or(car_a2a::ClientAuth::None);
1208
1209    match car_a2a::A2aClient::new(endpoint.clone())
1210        .with_auth(auth)
1211        .send_message(message, false)
1212        .await
1213    {
1214        Ok(result) => serde_json::json!({
1215            "delivered": true,
1216            "owner": owner,
1217            "endpoint": endpoint,
1218            "result": result,
1219        }),
1220        Err(error) => serde_json::json!({
1221            "delivered": false,
1222            "owner": owner,
1223            "endpoint": endpoint,
1224            "error": error.to_string(),
1225        }),
1226    }
1227}
1228
1229fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1230    match auth {
1231        A2aRouteAuth::None => car_a2a::ClientAuth::None,
1232        A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1233        A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1234    }
1235}
1236
1237fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1238    let endpoint = endpoint?;
1239    if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1240        Some(endpoint)
1241    } else {
1242        None
1243    }
1244}
1245
/// Return `true` when `endpoint` is a plain-`http` URL whose host is a
/// loopback literal (`localhost`, `127.0.0.1` or `[::1]`). The host may
/// be followed by an optional numeric port and then an optional path,
/// query or fragment.
///
/// The authority (everything between `http://` and the first `/`, `?`
/// or `#`) is parsed rather than prefix-matched. A prefix check on
/// `"http://localhost:"` also accepts `http://localhost:x@evil.example/`.
/// There, `localhost:x` is *userinfo* and the real host is
/// `evil.example` (RFC 3986 §3.2.1). Any `@` in the authority is
/// therefore rejected, and the port must consist of ASCII digits.
///
/// Matching stays case-sensitive, so unusual spellings such as
/// `HTTP://LOCALHOST` are conservatively treated as untrusted.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    if authority.contains('@') {
        return false;
    }

    // Split `host[:port]`. IPv6 literals keep their brackets so the
    // colons inside them are not mistaken for the port separator.
    let (host, port) = if let Some(inner) = authority.strip_prefix('[') {
        let Some(close) = inner.find(']') else {
            return false;
        };
        (&authority[..close + 2], &inner[close + 1..])
    } else {
        match authority.find(':') {
            Some(colon) => (&authority[..colon], &authority[colon..]),
            None => (authority, ""),
        }
    };

    // RFC 3986 §3.2.3: port = *DIGIT, so an empty port after ':' is
    // still well-formed.
    let port_ok = port.is_empty()
        || port
            .strip_prefix(':')
            .is_some_and(|digits| digits.bytes().all(|b| b.is_ascii_digit()));

    port_ok && matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}
1257
1258async fn handle_host_register_agent(
1259    req: &JsonRpcMessage,
1260    session: &crate::session::ClientSession,
1261) -> Result<Value, String> {
1262    let request: RegisterHostAgentRequest =
1263        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1264    serde_json::to_value(
1265        session
1266            .host
1267            .register_agent(&session.client_id, request)
1268            .await?,
1269    )
1270    .map_err(|e| e.to_string())
1271}
1272
1273async fn handle_host_unregister_agent(
1274    req: &JsonRpcMessage,
1275    session: &crate::session::ClientSession,
1276) -> Result<Value, String> {
1277    let agent_id = req
1278        .params
1279        .get("agent_id")
1280        .and_then(|v| v.as_str())
1281        .ok_or("missing agent_id")?;
1282    session
1283        .host
1284        .unregister_agent(&session.client_id, agent_id)
1285        .await?;
1286    Ok(serde_json::json!({"ok": true}))
1287}
1288
1289async fn handle_host_set_status(
1290    req: &JsonRpcMessage,
1291    session: &crate::session::ClientSession,
1292) -> Result<Value, String> {
1293    let request: SetHostAgentStatusRequest =
1294        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1295    serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1296        .map_err(|e| e.to_string())
1297}
1298
1299async fn handle_host_notify(
1300    req: &JsonRpcMessage,
1301    session: &crate::session::ClientSession,
1302) -> Result<Value, String> {
1303    let kind = req
1304        .params
1305        .get("kind")
1306        .and_then(|v| v.as_str())
1307        .unwrap_or("host.notification");
1308    let agent_id = req
1309        .params
1310        .get("agent_id")
1311        .and_then(|v| v.as_str())
1312        .map(str::to_string);
1313    let message = req
1314        .params
1315        .get("message")
1316        .and_then(|v| v.as_str())
1317        .unwrap_or("");
1318    let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1319    serde_json::to_value(
1320        session
1321            .host
1322            .record_event(kind, agent_id, message, payload)
1323            .await,
1324    )
1325    .map_err(|e| e.to_string())
1326}
1327
1328async fn handle_host_request_approval(
1329    req: &JsonRpcMessage,
1330    session: &crate::session::ClientSession,
1331) -> Result<Value, String> {
1332    let request: CreateHostApprovalRequest =
1333        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1334    if let Some(agent_id) = &request.agent_id {
1335        // Best-effort. If the caller doesn't own this agent the
1336        // ACL added 2026-05 will refuse the status update — that
1337        // is the correct semantics; we still want the approval row
1338        // itself to land so the UI can render the request.
1339        let _ = session
1340            .host
1341            .set_status(
1342                &session.client_id,
1343                SetHostAgentStatusRequest {
1344                    agent_id: agent_id.clone(),
1345                    status: HostAgentStatus::WaitingForApproval,
1346                    current_task: None,
1347                    message: Some("Waiting for approval".to_string()),
1348                    payload: Value::Null,
1349                },
1350            )
1351            .await;
1352    }
1353    // `system_level: true` opts the approval out of per-session
1354    // ownership. The host-side ACL then allows any authenticated
1355    // session (typically CarHost or `car-host approve`) to resolve.
1356    // Agents requesting user approval should always set this — the
1357    // session-owned mode is only correct when the requesting session
1358    // is also the resolving session, which approval-via-UI never is.
1359    let owner_client_id = if request.system_level {
1360        None
1361    } else {
1362        Some(session.client_id.as_str())
1363    };
1364    serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1365        .map_err(|e| e.to_string())
1366}
1367
1368async fn handle_host_resolve_approval(
1369    req: &JsonRpcMessage,
1370    session: &crate::session::ClientSession,
1371) -> Result<Value, String> {
1372    let request: ResolveHostApprovalRequest =
1373        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1374    serde_json::to_value(
1375        session
1376            .host
1377            .resolve_approval(&session.client_id, request)
1378            .await?,
1379    )
1380    .map_err(|e| e.to_string())
1381}
1382
1383/// `session.auth` — present the per-launch token to unlock the
1384/// connection. When `state.auth_token` is unset, this method is a
1385/// no-op success (auth is disabled). When set, the supplied token
1386/// must equal it (constant-time comparison) — a successful auth
1387/// flips `session.authenticated` to `true` so subsequent methods
1388/// pass the gate. Wrong token returns an error AND leaves the
1389/// session unauthenticated; the dispatcher loop's gate then closes
1390/// the connection on the next non-auth method.
1391///
1392/// Closes Parslee-ai/car-releases#32.
1393async fn handle_session_auth(
1394    req: &JsonRpcMessage,
1395    session: &crate::session::ClientSession,
1396    state: &Arc<ServerState>,
1397) -> Result<Value, String> {
1398    let supplied = req
1399        .params
1400        .get("token")
1401        .and_then(Value::as_str)
1402        .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1403    // #169: optional `agent_id` binds the WS connection to a
1404    // supervised lifecycle agent. When present, the supplied token
1405    // must equal the per-agent token the supervisor minted at upsert
1406    // (NOT the daemon-wide auth token). When absent, fall back to
1407    // the daemon-wide token — preserves the legacy unbound-token
1408    // path for browser/host/CLI clients.
1409    let agent_id = req
1410        .params
1411        .get("agent_id")
1412        .and_then(Value::as_str)
1413        .map(str::to_string);
1414
1415    if let Some(id) = agent_id {
1416        let supervisor = state.supervisor()?;
1417        if !supervisor.validate_agent_token(&id, supplied).await {
1418            return Err(format!(
1419                "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1420            ));
1421        }
1422        // Single-claim: only one connection at a time per
1423        // agent_id. A second claim is rejected so the daemon-side
1424        // per-agent state stays unambiguous.
1425        {
1426            let mut attached = state.attached_agents.lock().await;
1427            if let Some(prior) = attached.get(&id) {
1428                if prior != &session.client_id {
1429                    return Err(format!(
1430                        "auth failed: agent_id `{id}` is already attached on \
1431                         another connection (client_id={prior})"
1432                    ));
1433                }
1434            }
1435            attached.insert(id.clone(), session.client_id.clone());
1436        }
1437        // #170: attach the daemon-owned persistent memgine for
1438        // this agent. Lazy-loaded on first connection per id from
1439        // `~/.car/memory/agents/<id>.jsonl`; retained across
1440        // disconnect so the next session sees the same state.
1441        let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1442        *session.bound_memgine.lock().await = Some(agent_eng);
1443        *session.agent_id.lock().await = Some(id.clone());
1444        session
1445            .authenticated
1446            .store(true, std::sync::atomic::Ordering::Release);
1447        return Ok(serde_json::json!({
1448            "ok": true,
1449            "auth_enabled": true,
1450            "agent_id": id,
1451        }));
1452    }
1453
1454    let expected = match state.auth_token.get() {
1455        Some(t) => t,
1456        None => {
1457            // Auth disabled — accept any token politely so callers
1458            // that always include a session.auth handshake (e.g. the
1459            // FFI proxy) don't fail when the daemon happens to be
1460            // unauthed. Mark the session authenticated anyway so the
1461            // gate is a no-op below.
1462            session
1463                .authenticated
1464                .store(true, std::sync::atomic::Ordering::Release);
1465            return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1466        }
1467    };
1468    if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1469        return Err("auth failed: token mismatch".to_string());
1470    }
1471    session
1472        .authenticated
1473        .store(true, std::sync::atomic::Ordering::Release);
1474    Ok(serde_json::json!({
1475        "ok": true,
1476        "auth_enabled": true,
1477        "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
1478    }))
1479}
1480
/// Compare two byte strings in time that depends only on their length.
///
/// Inputs of different lengths short-circuit to `false`, so length is the
/// only thing timing reveals. That is acceptable for the fixed-length
/// (43-char) tokens this guards. Equal-length inputs are always scanned
/// in full: every byte pair is XOR-ed into a single accumulator, and the
/// verdict is read only once the scan has finished.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() == b.len() {
        let acc = a
            .iter()
            .zip(b)
            .fold(0u8, |acc, (&x, &y)| acc | (x ^ y));
        acc == 0
    } else {
        false
    }
}
1494
1495/// Block dispatch of `method` until the user resolves the approval
1496/// raised on [`HostState`].
1497///
1498/// Called from the dispatcher loop only when
1499/// [`crate::session::ApprovalGate::requires_approval`] returns true.
1500/// `Ok(())` means the user picked "approve"; `Err(reason)` is sent
1501/// to the caller as JSON-RPC error code `-32003` with the supplied
1502/// reason. On timeout, the approval row stays in `Pending` so the
1503/// UI keeps a record of the unanswered request.
1504async fn gate_high_risk_method(
1505    method: &str,
1506    params: &Value,
1507    state: &Arc<ServerState>,
1508) -> Result<(), String> {
1509    let timeout = state.approval_gate.timeout;
1510    let req = CreateHostApprovalRequest {
1511        agent_id: None,
1512        action: format!("ws.method:{method}"),
1513        details: serde_json::json!({
1514            "method": method,
1515            // Truncate params for the UI — full payload is recoverable
1516            // via the request-time host event log if needed. The cap
1517            // keeps a malicious caller from drowning the UI in JSON.
1518            "params_preview": preview_params(params, 2_000),
1519        }),
1520        options: vec!["approve".to_string(), "deny".to_string()],
1521        // The high-risk-method gate is already system-level (it
1522        // passes None as the owner via request_and_wait_approval's
1523        // internal call). This field is informational here.
1524        system_level: true,
1525    };
1526    match state
1527        .host
1528        .request_and_wait_approval(req, "approve", timeout)
1529        .await
1530    {
1531        Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1532        Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1533            "{method} denied by user (approval gate, audit 2026-05). \
1534             To call this method without an interactive prompt, start \
1535             car-server with --no-approvals on a trusted machine."
1536        )),
1537        Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1538            "{method} approval timed out after {}s with no resolution. \
1539             The approval is still visible in `host.approvals` for \
1540             forensics; resubmit the request to retry.",
1541            timeout.as_secs()
1542        )),
1543        Err(e) => Err(format!("approval gate error: {e}")),
1544    }
1545}
1546
1547fn preview_params(value: &Value, max_chars: usize) -> Value {
1548    let s = value.to_string();
1549    if s.len() <= max_chars {
1550        value.clone()
1551    } else {
1552        Value::String(format!("{}… (truncated)", &s[..max_chars]))
1553    }
1554}
1555
1556async fn handle_session_init(
1557    req: &JsonRpcMessage,
1558    session: &crate::session::ClientSession,
1559) -> Result<Value, String> {
1560    let init: SessionInitRequest =
1561        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1562
1563    for tool in &init.tools {
1564        register_from_definition(&session.runtime, tool).await;
1565    }
1566
1567    let mut policy_count = 0;
1568    {
1569        let mut policies = session.runtime.policies.write().await;
1570        for policy_def in &init.policies {
1571            if let Some(check) = build_policy_check(policy_def) {
1572                policies.register(&policy_def.name, check, "");
1573                policy_count += 1;
1574            }
1575        }
1576    }
1577
1578    serde_json::to_value(SessionInitResponse {
1579        session_id: session.client_id.clone(),
1580        tools_registered: init.tools.len(),
1581        policies_registered: policy_count,
1582    })
1583    .map_err(|e| e.to_string())
1584}
1585
1586fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1587    match def.rule.as_str() {
1588        "deny_tool" => {
1589            let target = def.target.clone();
1590            Some(Box::new(
1591                move |action: &car_ir::Action, _: &car_state::StateStore| {
1592                    if action.tool.as_deref() == Some(&target) {
1593                        Some(format!("tool '{}' denied", target))
1594                    } else {
1595                        None
1596                    }
1597                },
1598            ))
1599        }
1600        "require_state" => {
1601            let key = def.key.clone();
1602            let value = def.value.clone();
1603            Some(Box::new(
1604                move |_: &car_ir::Action, state: &car_state::StateStore| {
1605                    if state.get(&key).as_ref() != Some(&value) {
1606                        Some(format!("state['{}'] must be {:?}", key, value))
1607                    } else {
1608                        None
1609                    }
1610                },
1611            ))
1612        }
1613        "deny_tool_param" => {
1614            let target = def.target.clone();
1615            let param = def.key.clone();
1616            let pattern = def.pattern.clone();
1617            Some(Box::new(
1618                move |action: &car_ir::Action, _: &car_state::StateStore| {
1619                    if action.tool.as_deref() != Some(&target) {
1620                        return None;
1621                    }
1622                    if let Some(val) = action.parameters.get(&param) {
1623                        let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1624                        if s.contains(&pattern) {
1625                            return Some(format!("param '{}' matches '{}'", param, pattern));
1626                        }
1627                    }
1628                    None
1629                },
1630            ))
1631        }
1632        _ => None,
1633    }
1634}
1635
1636async fn handle_tools_register(
1637    req: &JsonRpcMessage,
1638    session: &crate::session::ClientSession,
1639) -> Result<Value, String> {
1640    let tools: Vec<ToolDefinition> =
1641        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1642    for tool in &tools {
1643        register_from_definition(&session.runtime, tool).await;
1644    }
1645    Ok(Value::from(tools.len()))
1646}
1647
1648/// Bridge a wire-protocol `ToolDefinition` to the engine's
1649/// schema-aware registration. Carries the full ToolSchema shape
1650/// (description, parameters, returns, idempotency, caching, rate
1651/// limit) through to the validator. An empty `parameters` object is
1652/// the legacy schemaless registration — the validator no-ops for
1653/// those, so pre-v0.5.x callers see no change.
1654async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1655    runtime
1656        .register_tool_schema(car_ir::ToolSchema {
1657            name: def.name.clone(),
1658            description: def.description.clone(),
1659            parameters: def.parameters.clone(),
1660            returns: def.returns.clone(),
1661            idempotent: def.idempotent,
1662            cache_ttl_secs: def.cache_ttl_secs,
1663            rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1664                max_calls: rl.max_calls,
1665                interval_secs: rl.interval_secs,
1666            }),
1667        })
1668        .await;
1669}
1670
1671async fn handle_proposal_submit(
1672    req: &JsonRpcMessage,
1673    session: &crate::session::ClientSession,
1674) -> Result<Value, String> {
1675    let submit: ProposalSubmitRequest =
1676        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1677    // `session_id` is sibling to `proposal` in the params object —
1678    // not part of `ProposalSubmitRequest` (kept proto-compatible). When
1679    // present, executes the proposal under the named session so any
1680    // session-scoped policies layer on top of global ones.
1681    // See docs/proposals/per-session-policy-scoping.md.
1682    let session_id = req
1683        .params
1684        .get("session_id")
1685        .and_then(|v| v.as_str())
1686        .map(str::to_string);
1687
1688    // `scope` is the optional per-execution caller / tenant surface
1689    // added in Parslee-ai/car#187 phase 3. Shape mirrors
1690    // `car_engine::RuntimeScope` ({ callerId, tenantId, claims });
1691    // serde's camelCase rename keeps the wire form consistent with
1692    // the rest of the JSON-RPC surface. When present, the proposal
1693    // routes through `execute_scoped*` so the runtime records the
1694    // identity on the event log and state R/W ops apply the tenant
1695    // prefix (#187 phase 3-B).
1696    let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1697        Some(v) if !v.is_null() => {
1698            Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1699        }
1700        _ => None,
1701    };
1702
1703    let result = match (session_id, scope) {
1704        // session_id + scope: no single combined entry point on
1705        // Runtime today. Scope takes precedence because it's the
1706        // tenant-isolation surface; per-session policies layer in
1707        // a follow-up when there's a real caller asking for both.
1708        (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1709        (Some(sid), None) => {
1710            session
1711                .runtime
1712                .execute_with_session(&submit.proposal, &sid)
1713                .await
1714        }
1715        (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1716        (None, None) => session.runtime.execute(&submit.proposal).await,
1717    };
1718    serde_json::to_value(result).map_err(|e| e.to_string())
1719}
1720
1721async fn handle_session_policy_open(
1722    session: &crate::session::ClientSession,
1723) -> Result<Value, String> {
1724    let id = session.runtime.open_session().await;
1725    Ok(serde_json::json!({ "session_id": id }))
1726}
1727
1728async fn handle_session_policy_close(
1729    req: &JsonRpcMessage,
1730    session: &crate::session::ClientSession,
1731) -> Result<Value, String> {
1732    let sid = req
1733        .params
1734        .get("session_id")
1735        .and_then(|v| v.as_str())
1736        .ok_or("missing 'session_id'")?;
1737    let closed = session.runtime.close_session(sid).await;
1738    Ok(serde_json::json!({ "closed": closed }))
1739}
1740
1741/// `policy.register` — register one policy against this WebSocket
1742/// session's runtime. Mirrors the `PolicyDefinition` shape used by
1743/// `session.init`. When `session_id` is present, the policy is scoped
1744/// to the named in-runtime session opened via `session.policy.open`;
1745/// otherwise it is global.
1746async fn handle_policy_register(
1747    req: &JsonRpcMessage,
1748    session: &crate::session::ClientSession,
1749) -> Result<Value, String> {
1750    let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1751        .map_err(|e| format!("invalid policy params: {e}"))?;
1752    let session_id = req
1753        .params
1754        .get("session_id")
1755        .and_then(|v| v.as_str())
1756        .map(str::to_string);
1757    let check = build_policy_check(&def)
1758        .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1759    match session_id {
1760        Some(sid) => session
1761            .runtime
1762            .register_policy_in_session(&sid, &def.name, check, "")
1763            .await
1764            .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1765        None => {
1766            let mut policies = session.runtime.policies.write().await;
1767            policies.register(&def.name, check, "");
1768            Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1769        }
1770    }
1771}
1772
1773async fn handle_verify(
1774    req: &JsonRpcMessage,
1775    session: &crate::session::ClientSession,
1776) -> Result<Value, String> {
1777    let vr: VerifyRequest =
1778        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1779    let tools: std::collections::HashSet<String> =
1780        session.runtime.tools.read().await.keys().cloned().collect();
1781    let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1782    serde_json::to_value(VerifyResponse {
1783        valid: result.valid,
1784        issues: result
1785            .issues
1786            .iter()
1787            .map(|i| VerifyIssueProto {
1788                action_id: i.action_id.clone(),
1789                severity: i.severity.clone(),
1790                message: i.message.clone(),
1791            })
1792            .collect(),
1793        simulated_state: result.simulated_state,
1794    })
1795    .map_err(|e| e.to_string())
1796}
1797
1798/// Parse the optional `tenant_id` sibling field from JSON-RPC
1799/// params (Parslee-ai/car#187 phase 3-E). When set and non-empty,
1800/// state R/W routes through `StateStore::scoped(tenant_id)` so
1801/// distinct tenants can't see each other's keys over the WS
1802/// surface — symmetric to the proposal.submit scope plumbing.
1803/// When absent / empty, the legacy unscoped namespace applies.
1804fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1805    req.params
1806        .get("tenant_id")
1807        .and_then(|v| v.as_str())
1808        .filter(|s| !s.is_empty())
1809        .map(str::to_string)
1810}
1811
1812async fn handle_state_get(
1813    req: &JsonRpcMessage,
1814    session: &crate::session::ClientSession,
1815) -> Result<Value, String> {
1816    let key = req
1817        .params
1818        .get("key")
1819        .and_then(|v| v.as_str())
1820        .ok_or("missing 'key'")?;
1821    let tenant = tenant_from_params(req);
1822    Ok(session
1823        .runtime
1824        .state
1825        .scoped(tenant.as_deref())
1826        .get(key)
1827        .unwrap_or(Value::Null))
1828}
1829
1830async fn handle_state_set(
1831    req: &JsonRpcMessage,
1832    session: &crate::session::ClientSession,
1833) -> Result<Value, String> {
1834    let key = req
1835        .params
1836        .get("key")
1837        .and_then(|v| v.as_str())
1838        .ok_or("missing 'key'")?;
1839    let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1840    let tenant = tenant_from_params(req);
1841    session
1842        .runtime
1843        .state
1844        .scoped(tenant.as_deref())
1845        .set(key, value, "client");
1846    Ok(Value::from("ok"))
1847}
1848
1849/// `state.exists` — true if the key is set in this session's state
1850/// store, false otherwise. Cheaper than `state.get` + null-check on
1851/// the client side because it doesn't serialize the value.
1852async fn handle_state_exists(
1853    req: &JsonRpcMessage,
1854    session: &crate::session::ClientSession,
1855) -> Result<Value, String> {
1856    let key = req
1857        .params
1858        .get("key")
1859        .and_then(|v| v.as_str())
1860        .ok_or("missing 'key'")?;
1861    let tenant = tenant_from_params(req);
1862    Ok(Value::Bool(
1863        session.runtime.state.scoped(tenant.as_deref()).exists(key),
1864    ))
1865}
1866
1867/// `state.keys` — list every key currently set in this session's
1868/// state store. Returns a JSON array of strings.
1869async fn handle_state_keys(
1870    req: &JsonRpcMessage,
1871    session: &crate::session::ClientSession,
1872) -> Result<Value, String> {
1873    let tenant = tenant_from_params(req);
1874    Ok(Value::Array(
1875        session
1876            .runtime
1877            .state
1878            .scoped(tenant.as_deref())
1879            .keys()
1880            .into_iter()
1881            .map(Value::String)
1882            .collect(),
1883    ))
1884}
1885
1886/// `state.snapshot` — return the entire session state store as a
1887/// JSON object (`{ key: value, ... }`). Equivalent to iterating
1888/// `state.keys` + `state.get` but in a single round-trip; for
1889/// inspectors/dashboards.
1890///
1891/// Tenant-scoped variant: when `tenant_id` is set, only that
1892/// tenant's keys are returned (prefix stripped on the way out).
1893/// `state.snapshot` with no `tenant_id` returns only unscoped
1894/// keys; consistent with `state.keys`'s filter behaviour and the
1895/// strict-isolation contract from phase 3-B.
1896async fn handle_state_snapshot(
1897    req: &JsonRpcMessage,
1898    session: &crate::session::ClientSession,
1899) -> Result<Value, String> {
1900    let tenant = tenant_from_params(req);
1901    let view = session.runtime.state.scoped(tenant.as_deref());
1902    let mut map = serde_json::Map::new();
1903    for key in view.keys() {
1904        if let Some(value) = view.get(&key) {
1905            map.insert(key, value);
1906        }
1907    }
1908    Ok(Value::Object(map))
1909}
1910
1911// --- Per-agent persistent memgine (#170) ---
1912
1913/// `~/.car/memory/agents/<id>.json` — the per-agent snapshot file.
1914/// Mirrors the existing `memory.persist` shape (flat JSON array of
1915/// fact objects) so the same loader path works.
1916fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
1917    let base = car_ffi_common::memory_path::ensure_base()
1918        .map_err(|e| format!("memory base unavailable: {e}"))?;
1919    let dir = base.join("agents");
1920    std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
1921    Ok(dir.join(format!("{agent_id}.json")))
1922}
1923
1924/// Acquire (or lazy-create + load from disk) the daemon-owned
1925/// persistent memgine for `agent_id`. First call per id reads
1926/// `~/.car/memory/agents/<id>.json` if it exists; subsequent calls
1927/// share the in-memory engine across sessions. Caller stores the
1928/// returned `Arc` on `ClientSession.bound_memgine` so memory.*
1929/// handlers route through it via [`ClientSession::effective_memgine`].
1930async fn get_or_load_agent_memgine(
1931    state: &Arc<ServerState>,
1932    agent_id: &str,
1933) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
1934    {
1935        let map = state.agent_memgines.lock().await;
1936        if let Some(eng) = map.get(agent_id) {
1937            return Ok(eng.clone());
1938        }
1939    }
1940    // Build a fresh engine and try to load from disk.
1941    let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
1942        None,
1943    )));
1944    let path = agent_memgine_snapshot_path(agent_id)?;
1945    if path.exists() {
1946        let content = std::fs::read_to_string(&path)
1947            .map_err(|e| format!("read {}: {}", path.display(), e))?;
1948        let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
1949        let mut g = engine.lock().await;
1950        let mut loaded: u32 = 0;
1951        for fact in &facts {
1952            let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
1953            let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
1954            let kind = fact
1955                .get("kind")
1956                .and_then(|v| v.as_str())
1957                .unwrap_or("pattern");
1958            let fid = format!("loaded-{loaded}");
1959            g.ingest_fact(
1960                &fid,
1961                subject,
1962                body,
1963                "user",
1964                "peer",
1965                chrono::Utc::now(),
1966                "global",
1967                None,
1968                vec![],
1969                kind == "constraint",
1970            );
1971            loaded += 1;
1972        }
1973    }
1974    let mut map = state.agent_memgines.lock().await;
1975    let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
1976    Ok(stored)
1977}
1978
1979/// Snapshot the agent's memgine to its disk file. Same on-wire shape
1980/// as `memory.persist` so manual snapshots and the daemon-owned
1981/// persistence stay interoperable.
1982async fn persist_agent_memgine(
1983    agent_id: &str,
1984    engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
1985) -> Result<(), String> {
1986    let path = agent_memgine_snapshot_path(agent_id)?;
1987    let g = engine.lock().await;
1988    let facts: Vec<Value> = g
1989        .graph
1990        .inner
1991        .node_indices()
1992        .filter_map(|nix| {
1993            let node = g.graph.inner.node_weight(nix)?;
1994            if !node.is_valid() {
1995                return None;
1996            }
1997            if node.kind == car_memgine::MemKind::Identity
1998                || node.kind == car_memgine::MemKind::Environment
1999            {
2000                return None;
2001            }
2002            Some(serde_json::json!({
2003                "subject": node.key,
2004                "body": node.value,
2005                "kind": match node.kind {
2006                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2007                    car_memgine::MemKind::Conversation => "outcome",
2008                    _ => "pattern",
2009                },
2010                "confidence": 0.5,
2011                "content_type": node.content_type.as_label(),
2012            }))
2013        })
2014        .collect();
2015    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2016    std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2017    Ok(())
2018}
2019
2020// --- Memory handlers ---
2021
2022/// `memory.fact_count` — return `valid_fact_count()` of the
2023/// session's memgine. Used by FFI bindings to mirror their
2024/// embedded `fact_count()` accessor without round-tripping a full
2025/// query. No params.
2026async fn handle_memory_fact_count(
2027    session: &crate::session::ClientSession,
2028) -> Result<Value, String> {
2029    let engine_arc = session.effective_memgine().await;
2030    let engine = engine_arc.lock().await;
2031    Ok(Value::from(engine.valid_fact_count()))
2032}
2033
2034async fn handle_memory_add_fact(
2035    req: &JsonRpcMessage,
2036    session: &crate::session::ClientSession,
2037) -> Result<Value, String> {
2038    let subject = req
2039        .params
2040        .get("subject")
2041        .and_then(|v| v.as_str())
2042        .ok_or("missing subject")?;
2043    let body = req
2044        .params
2045        .get("body")
2046        .and_then(|v| v.as_str())
2047        .ok_or("missing body")?;
2048    let kind = req
2049        .params
2050        .get("kind")
2051        .and_then(|v| v.as_str())
2052        .unwrap_or("pattern");
2053    // Route through `effective_memgine` so connections bound to a
2054    // lifecycle agent (#169) write into the daemon-owned per-agent
2055    // memgine instead of the per-WS ephemeral one (#170).
2056    let engine_arc = session.effective_memgine().await;
2057    let count = {
2058        let mut engine = engine_arc.lock().await;
2059        let fid = format!("ws-{}", engine.valid_fact_count());
2060        engine.ingest_fact(
2061            &fid,
2062            subject,
2063            body,
2064            "user",
2065            "peer",
2066            chrono::Utc::now(),
2067            "global",
2068            None,
2069            vec![],
2070            kind == "constraint",
2071        );
2072        engine.valid_fact_count()
2073    };
2074    // Persist after every add when the session is bound to a
2075    // supervised agent. Synchronous write — small JSON snapshot.
2076    if let Some(id) = session.agent_id.lock().await.clone() {
2077        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2078            tracing::warn!(agent_id = %id, error = %e,
2079                "agent memgine persist failed; in-memory state is canonical");
2080        }
2081    }
2082    Ok(Value::from(count))
2083}
2084
2085async fn handle_memory_query(
2086    req: &JsonRpcMessage,
2087    session: &crate::session::ClientSession,
2088) -> Result<Value, String> {
2089    let query = req
2090        .params
2091        .get("query")
2092        .and_then(|v| v.as_str())
2093        .ok_or("missing query")?;
2094    let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2095    let engine_arc = session.effective_memgine().await;
2096    let engine = engine_arc.lock().await;
2097    let seeds = engine.graph.find_seeds(query, 5);
2098    // FFI parity with NAPI `query_facts` (car-ffi-napi/src/lib.rs:577) —
2099    // both use Personalized PageRank so transport choice doesn't shift
2100    // ranking semantics. Result shape (subject/body/kind/confidence)
2101    // also mirrors NAPI for the same reason.
2102    let hits = if !seeds.is_empty() {
2103        engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2104    } else {
2105        vec![]
2106    };
2107    let results: Vec<Value> = hits
2108        .iter()
2109        .filter_map(|hit| {
2110            let node = engine.graph.inner.node_weight(hit.node_ix)?;
2111            Some(serde_json::json!({
2112                "subject": node.key,
2113                "body": node.value,
2114                "kind": format!("{:?}", node.kind).to_lowercase(),
2115                "confidence": hit.activation,
2116            }))
2117        })
2118        .collect();
2119    serde_json::to_value(results).map_err(|e| e.to_string())
2120}
2121
2122async fn handle_memory_build_context(
2123    req: &JsonRpcMessage,
2124    session: &crate::session::ClientSession,
2125) -> Result<Value, String> {
2126    let query = req
2127        .params
2128        .get("query")
2129        .and_then(|v| v.as_str())
2130        .unwrap_or("");
2131    // FFI parity with NAPI `build_context(query, model_context_window)`.
2132    // When supplied, sizes the assembly budget against the model's window
2133    // instead of the fixed 8K default.
2134    let model_context_window = req
2135        .params
2136        .get("model_context_window")
2137        .and_then(|v| v.as_u64())
2138        .map(|w| w as usize);
2139    let mut engine = session.memgine.lock().await;
2140    Ok(Value::from(
2141        engine.build_context_for_model(query, model_context_window),
2142    ))
2143}
2144
2145/// `memory.build_context_fast` — Fast-mode context assembly for
2146/// latency-sensitive paths (voice, real-time). Skips embedding flush,
2147/// skill lookup, PPR-based scoring, inline repairs, known-unknowns
2148/// extraction. Keeps identity, constraints, facts (creation order),
2149/// conversation, environment.
2150async fn handle_memory_build_context_fast(
2151    req: &JsonRpcMessage,
2152    session: &crate::session::ClientSession,
2153) -> Result<Value, String> {
2154    let query = req
2155        .params
2156        .get("query")
2157        .and_then(|v| v.as_str())
2158        .unwrap_or("");
2159    let model_context_window = req
2160        .params
2161        .get("model_context_window")
2162        .and_then(|v| v.as_u64())
2163        .map(|w| w as usize);
2164    let mut engine = session.memgine.lock().await;
2165    Ok(Value::from(engine.build_context_with_options(
2166        query,
2167        model_context_window,
2168        car_memgine::ContextMode::Fast,
2169        None,
2170    )))
2171}
2172
2173/// `memory.persist` — write the session's memgine to a JSON file
2174/// at `path`. Mirrors NAPI `persist_memory` (car-ffi-napi/src/lib.rs:797)
2175/// so daemon-mode clients can drive checkpoint/restore symmetrically
2176/// with embedded mode. Returns the number of facts written.
2177///
2178/// Filesystem caveat: `path` is interpreted on the daemon's filesystem,
2179/// not the caller's. Since the 2026-05 audit, `path` is also
2180/// sandboxed under `~/.car/memory/` via
2181/// [`car_ffi_common::memory_path::resolve`] — relative paths land
2182/// under the base, absolute paths must already be under the base,
2183/// `..` segments are rejected, symlinks pointing out are rejected.
2184/// Pre-2026-05 the path was passed straight to `std::fs::write` and
2185/// became an arbitrary file-write primitive. The base64-blob escape
2186/// hatch tracked in `Parslee-ai/car-releases#31` will plug into the
2187/// same resolver when it lands.
2188async fn handle_memory_persist(
2189    req: &JsonRpcMessage,
2190    session: &crate::session::ClientSession,
2191) -> Result<Value, String> {
2192    let path = req
2193        .params
2194        .get("path")
2195        .and_then(|v| v.as_str())
2196        .ok_or("missing path")?;
2197    let resolved = car_ffi_common::memory_path::resolve(path)
2198        .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2199    let engine = session.memgine.lock().await;
2200    let facts: Vec<Value> = engine
2201        .graph
2202        .inner
2203        .node_indices()
2204        .filter_map(|nix| {
2205            let node = engine.graph.inner.node_weight(nix)?;
2206            if !node.is_valid() {
2207                return None;
2208            }
2209            if node.kind == car_memgine::MemKind::Identity
2210                || node.kind == car_memgine::MemKind::Environment
2211            {
2212                return None;
2213            }
2214            Some(serde_json::json!({
2215                "subject": node.key,
2216                "body": node.value,
2217                "kind": match node.kind {
2218                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2219                    car_memgine::MemKind::Conversation => "outcome",
2220                    _ => "pattern",
2221                },
2222                "confidence": 0.5,
2223                "content_type": node.content_type.as_label(),
2224            }))
2225        })
2226        .collect();
2227    let count = facts.len();
2228    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2229    std::fs::write(&resolved, json)
2230        .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2231    Ok(Value::from(count as u64))
2232}
2233
2234/// `memory.load` — replace the session's memgine with facts from the
2235/// JSON file at `path`. Mirrors NAPI `load_memory`
2236/// (car-ffi-napi/src/lib.rs:121). Same `~/.car/memory/` sandboxing
2237/// as `memory.persist` since the 2026-05 audit — relative paths
2238/// land under the base, anything that escapes is rejected.
2239async fn handle_memory_load(
2240    req: &JsonRpcMessage,
2241    session: &crate::session::ClientSession,
2242) -> Result<Value, String> {
2243    let path = req
2244        .params
2245        .get("path")
2246        .and_then(|v| v.as_str())
2247        .ok_or("missing path")?;
2248    let resolved = car_ffi_common::memory_path::resolve(path)
2249        .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2250    let content = std::fs::read_to_string(&resolved)
2251        .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2252    let facts: Vec<Value> =
2253        serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2254    let mut engine = session.memgine.lock().await;
2255    engine.reset();
2256    let mut count: u32 = 0;
2257    for fact in &facts {
2258        let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2259        let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2260        let kind = fact
2261            .get("kind")
2262            .and_then(|v| v.as_str())
2263            .unwrap_or("pattern");
2264        let fid = format!("loaded-{}", count);
2265        engine.ingest_fact(
2266            &fid,
2267            subject,
2268            body,
2269            "user",
2270            "peer",
2271            chrono::Utc::now(),
2272            "global",
2273            None,
2274            vec![],
2275            kind == "constraint",
2276        );
2277        count += 1;
2278    }
2279    Ok(Value::from(count))
2280}
2281
2282// --- Skill handlers ---
2283
2284async fn handle_skill_ingest(
2285    req: &JsonRpcMessage,
2286    session: &crate::session::ClientSession,
2287) -> Result<Value, String> {
2288    let name = req
2289        .params
2290        .get("name")
2291        .and_then(|v| v.as_str())
2292        .ok_or("missing name")?;
2293    let code = req
2294        .params
2295        .get("code")
2296        .and_then(|v| v.as_str())
2297        .ok_or("missing code")?;
2298    let platform = req
2299        .params
2300        .get("platform")
2301        .and_then(|v| v.as_str())
2302        .unwrap_or("unknown");
2303    let persona = req
2304        .params
2305        .get("persona")
2306        .and_then(|v| v.as_str())
2307        .unwrap_or("");
2308    let url_pattern = req
2309        .params
2310        .get("url_pattern")
2311        .and_then(|v| v.as_str())
2312        .unwrap_or("");
2313    let description = req
2314        .params
2315        .get("description")
2316        .and_then(|v| v.as_str())
2317        .unwrap_or("");
2318    let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2319    let keywords: Vec<String> = req
2320        .params
2321        .get("task_keywords")
2322        .and_then(|v| v.as_array())
2323        .map(|arr| {
2324            arr.iter()
2325                .filter_map(|v| v.as_str().map(String::from))
2326                .collect()
2327        })
2328        .unwrap_or_default();
2329
2330    let trigger = car_memgine::SkillTrigger {
2331        persona: persona.into(),
2332        url_pattern: url_pattern.into(),
2333        task_keywords: keywords,
2334        structured: None,
2335    };
2336    let mut engine = session.memgine.lock().await;
2337    let node = engine.ingest_skill(
2338        name,
2339        code,
2340        platform,
2341        trigger,
2342        description,
2343        supersedes,
2344        vec![],
2345        vec![],
2346    );
2347    Ok(Value::from(node.index() as u64))
2348}
2349
2350async fn handle_skill_find(
2351    req: &JsonRpcMessage,
2352    session: &crate::session::ClientSession,
2353) -> Result<Value, String> {
2354    let persona = req
2355        .params
2356        .get("persona")
2357        .and_then(|v| v.as_str())
2358        .unwrap_or("");
2359    let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2360    let task = req
2361        .params
2362        .get("task")
2363        .and_then(|v| v.as_str())
2364        .unwrap_or("");
2365    let max = req
2366        .params
2367        .get("max_results")
2368        .and_then(|v| v.as_u64())
2369        .unwrap_or(1) as usize;
2370    let engine = session.memgine.lock().await;
2371    let results = engine.find_skill(persona, url, task, max);
2372    let json: Vec<Value> = results
2373        .iter()
2374        .map(|(m, s)| {
2375            serde_json::json!({
2376                "name": m.name, "code": m.code, "platform": m.platform,
2377                "description": m.description, "stats": m.stats, "match_score": s,
2378            })
2379        })
2380        .collect();
2381    serde_json::to_value(json).map_err(|e| e.to_string())
2382}
2383
2384async fn handle_skill_report(
2385    req: &JsonRpcMessage,
2386    session: &crate::session::ClientSession,
2387) -> Result<Value, String> {
2388    let name = req
2389        .params
2390        .get("skill_name")
2391        .and_then(|v| v.as_str())
2392        .ok_or("missing skill_name")?;
2393    let outcome_str = req
2394        .params
2395        .get("outcome")
2396        .and_then(|v| v.as_str())
2397        .ok_or("missing outcome")?;
2398    let outcome = match outcome_str {
2399        "success" => car_memgine::SkillOutcome::Success,
2400        _ => car_memgine::SkillOutcome::Fail,
2401    };
2402    let mut engine = session.memgine.lock().await;
2403    let stats = engine
2404        .report_outcome(name, outcome)
2405        .ok_or(format!("skill '{}' not found", name))?;
2406    serde_json::to_value(stats).map_err(|e| e.to_string())
2407}
2408
2409// ---------------------------------------------------------------------------
2410// Multi-agent coordination handlers
2411//
2412// The WsAgentRunner sends a `multi.run_agent` JSON-RPC request to the client.
2413// The client runs the model loop and responds with AgentOutput JSON.
2414// ---------------------------------------------------------------------------
2415
2416/// AgentRunner backed by WebSocket callback to the client.
2417struct WsAgentRunner {
2418    channel: Arc<WsChannel>,
2419    host: Arc<crate::host::HostState>,
2420    client_id: String,
2421}
2422
2423#[async_trait::async_trait]
2424impl car_multi::AgentRunner for WsAgentRunner {
2425    async fn run(
2426        &self,
2427        spec: &car_multi::AgentSpec,
2428        task: &str,
2429        _runtime: &car_engine::Runtime,
2430        _mailbox: &car_multi::Mailbox,
2431    ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2432        use futures::SinkExt;
2433
2434        let request_id = self.channel.next_request_id();
2435        let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2436        let agent = self
2437            .host
2438            .register_agent(
2439                &self.client_id,
2440                RegisterHostAgentRequest {
2441                    id: Some(agent_id.clone()),
2442                    name: spec.name.clone(),
2443                    kind: "callback".to_string(),
2444                    capabilities: spec.tools.clone(),
2445                    project: spec
2446                        .metadata
2447                        .get("project")
2448                        .and_then(|v| v.as_str())
2449                        .map(str::to_string),
2450                    pid: None,
2451                    display: serde_json::from_value(
2452                        spec.metadata
2453                            .get("display")
2454                            .cloned()
2455                            .unwrap_or(serde_json::Value::Null),
2456                    )
2457                    .unwrap_or_default(),
2458                    metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2459                },
2460            )
2461            .await
2462            .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2463        let _ = self
2464            .host
2465            .set_status(
2466                &self.client_id,
2467                SetHostAgentStatusRequest {
2468                    agent_id: agent.id.clone(),
2469                    status: HostAgentStatus::Running,
2470                    current_task: Some(task.to_string()),
2471                    message: Some(format!("{} started", spec.name)),
2472                    payload: serde_json::json!({ "task": task }),
2473                },
2474            )
2475            .await;
2476
2477        let rpc_request = serde_json::json!({
2478            "jsonrpc": "2.0",
2479            "method": "multi.run_agent",
2480            "params": {
2481                "spec": spec,
2482                "task": task,
2483            },
2484            "id": request_id,
2485        });
2486
2487        // Create oneshot channel for the response
2488        let (tx, rx) = tokio::sync::oneshot::channel();
2489        self.channel
2490            .pending
2491            .lock()
2492            .await
2493            .insert(request_id.clone(), tx);
2494
2495        let msg = Message::Text(
2496            serde_json::to_string(&rpc_request)
2497                .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2498                .into(),
2499        );
2500        if let Err(e) = self.channel.write.lock().await.send(msg).await {
2501            let _ = self
2502                .host
2503                .set_status(
2504                    &self.client_id,
2505                    SetHostAgentStatusRequest {
2506                        agent_id: agent_id.clone(),
2507                        status: HostAgentStatus::Errored,
2508                        current_task: None,
2509                        message: Some(format!("{} failed to start", spec.name)),
2510                        payload: serde_json::json!({ "error": e.to_string() }),
2511                    },
2512                )
2513                .await;
2514            return Err(car_multi::MultiError::AgentFailed(
2515                spec.name.clone(),
2516                format!("ws send error: {}", e),
2517            ));
2518        }
2519
2520        // Wait for client response (5 min timeout for model loops)
2521        let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2522            Ok(Ok(response)) => response,
2523            Ok(Err(_)) => {
2524                let _ = self
2525                    .host
2526                    .set_status(
2527                        &self.client_id,
2528                        SetHostAgentStatusRequest {
2529                            agent_id: agent_id.clone(),
2530                            status: HostAgentStatus::Errored,
2531                            current_task: None,
2532                            message: Some(format!("{} callback channel closed", spec.name)),
2533                            payload: Value::Null,
2534                        },
2535                    )
2536                    .await;
2537                return Err(car_multi::MultiError::AgentFailed(
2538                    spec.name.clone(),
2539                    "agent callback channel closed".into(),
2540                ));
2541            }
2542            Err(_) => {
2543                let _ = self
2544                    .host
2545                    .set_status(
2546                        &self.client_id,
2547                        SetHostAgentStatusRequest {
2548                            agent_id: agent_id.clone(),
2549                            status: HostAgentStatus::Errored,
2550                            current_task: None,
2551                            message: Some(format!("{} timed out", spec.name)),
2552                            payload: Value::Null,
2553                        },
2554                    )
2555                    .await;
2556                return Err(car_multi::MultiError::AgentFailed(
2557                    spec.name.clone(),
2558                    "agent callback timed out (300s)".into(),
2559                ));
2560            }
2561        };
2562
2563        if let Some(err) = response.error {
2564            let _ = self
2565                .host
2566                .set_status(
2567                    &self.client_id,
2568                    SetHostAgentStatusRequest {
2569                        agent_id: agent_id.clone(),
2570                        status: HostAgentStatus::Errored,
2571                        current_task: None,
2572                        message: Some(format!("{} errored", spec.name)),
2573                        payload: serde_json::json!({ "error": err }),
2574                    },
2575                )
2576                .await;
2577            return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2578        }
2579
2580        let output_value = response.output.unwrap_or(Value::Null);
2581        let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2582            car_multi::MultiError::AgentFailed(
2583                spec.name.clone(),
2584                format!("invalid AgentOutput: {}", e),
2585            )
2586        })?;
2587        let status = if output.error.is_some() {
2588            HostAgentStatus::Errored
2589        } else {
2590            HostAgentStatus::Completed
2591        };
2592        let message = if output.error.is_some() {
2593            format!("{} errored", spec.name)
2594        } else {
2595            format!("{} completed", spec.name)
2596        };
2597        let _ = self
2598            .host
2599            .set_status(
2600                &self.client_id,
2601                SetHostAgentStatusRequest {
2602                    agent_id,
2603                    status,
2604                    current_task: None,
2605                    message: Some(message),
2606                    payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2607                },
2608            )
2609            .await;
2610
2611        Ok(output)
2612    }
2613}
2614
/// Build the host agent id for one run:
/// `<client_id>:<sanitized name>:<request_id>`.
///
/// Every character of `name` outside `[A-Za-z0-9_-]` (including any
/// non-ASCII character) is replaced with `-`. `client_id` and `request_id`
/// are used verbatim.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let is_safe = |ch: char| ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_');

    let mut id = String::with_capacity(client_id.len() + name.len() + request_id.len() + 2);
    id.push_str(client_id);
    id.push(':');
    id.extend(name.chars().map(|ch| if is_safe(ch) { ch } else { '-' }));
    id.push(':');
    id.push_str(request_id);
    id
}
2628
2629async fn handle_multi_swarm(
2630    req: &JsonRpcMessage,
2631    session: &crate::session::ClientSession,
2632) -> Result<Value, String> {
2633    let mode_str = req
2634        .params
2635        .get("mode")
2636        .and_then(|v| v.as_str())
2637        .ok_or("missing 'mode'")?;
2638    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2639    let task = req
2640        .params
2641        .get("task")
2642        .and_then(|v| v.as_str())
2643        .ok_or("missing 'task'")?;
2644
2645    let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2646        .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2647    let agent_specs: Vec<car_multi::AgentSpec> =
2648        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2649    let synth: Option<car_multi::AgentSpec> = req
2650        .params
2651        .get("synthesizer")
2652        .map(|v| serde_json::from_value(v.clone()))
2653        .transpose()
2654        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2655
2656    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2657        channel: session.channel.clone(),
2658        host: session.host.clone(),
2659        client_id: session.client_id.clone(),
2660    });
2661    let infra = car_multi::SharedInfra::new();
2662
2663    let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2664    if let Some(s) = synth {
2665        swarm = swarm.with_synthesizer(s);
2666    }
2667
2668    let result = swarm
2669        .run(task, &runner, &infra)
2670        .await
2671        .map_err(|e| format!("swarm error: {}", e))?;
2672    serde_json::to_value(result).map_err(|e| e.to_string())
2673}
2674
2675async fn handle_multi_pipeline(
2676    req: &JsonRpcMessage,
2677    session: &crate::session::ClientSession,
2678) -> Result<Value, String> {
2679    let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2680    let task = req
2681        .params
2682        .get("task")
2683        .and_then(|v| v.as_str())
2684        .ok_or("missing 'task'")?;
2685
2686    let stage_specs: Vec<car_multi::AgentSpec> =
2687        serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2688
2689    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2690        channel: session.channel.clone(),
2691        host: session.host.clone(),
2692        client_id: session.client_id.clone(),
2693    });
2694    let infra = car_multi::SharedInfra::new();
2695
2696    let result = car_multi::Pipeline::new(stage_specs)
2697        .run(task, &runner, &infra)
2698        .await
2699        .map_err(|e| format!("pipeline error: {}", e))?;
2700    serde_json::to_value(result).map_err(|e| e.to_string())
2701}
2702
2703async fn handle_multi_supervisor(
2704    req: &JsonRpcMessage,
2705    session: &crate::session::ClientSession,
2706) -> Result<Value, String> {
2707    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2708    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2709    let task = req
2710        .params
2711        .get("task")
2712        .and_then(|v| v.as_str())
2713        .ok_or("missing 'task'")?;
2714    let max_rounds = req
2715        .params
2716        .get("max_rounds")
2717        .and_then(|v| v.as_u64())
2718        .unwrap_or(3) as u32;
2719
2720    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2721        .map_err(|e| format!("invalid workers: {}", e))?;
2722    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2723        .map_err(|e| format!("invalid supervisor: {}", e))?;
2724
2725    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2726        channel: session.channel.clone(),
2727        host: session.host.clone(),
2728        client_id: session.client_id.clone(),
2729    });
2730    let infra = car_multi::SharedInfra::new();
2731
2732    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2733        .with_max_rounds(max_rounds)
2734        .run(task, &runner, &infra)
2735        .await
2736        .map_err(|e| format!("supervisor error: {}", e))?;
2737    serde_json::to_value(result).map_err(|e| e.to_string())
2738}
2739
2740async fn handle_multi_map_reduce(
2741    req: &JsonRpcMessage,
2742    session: &crate::session::ClientSession,
2743) -> Result<Value, String> {
2744    let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2745    let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2746    let task = req
2747        .params
2748        .get("task")
2749        .and_then(|v| v.as_str())
2750        .ok_or("missing 'task'")?;
2751    let items_val = req.params.get("items").ok_or("missing 'items'")?;
2752
2753    let mapper_spec: car_multi::AgentSpec =
2754        serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2755    let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2756        .map_err(|e| format!("invalid reducer: {}", e))?;
2757    let items: Vec<String> =
2758        serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2759
2760    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2761        channel: session.channel.clone(),
2762        host: session.host.clone(),
2763        client_id: session.client_id.clone(),
2764    });
2765    let infra = car_multi::SharedInfra::new();
2766
2767    let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2768        .run(task, &items, &runner, &infra)
2769        .await
2770        .map_err(|e| format!("map_reduce error: {}", e))?;
2771    serde_json::to_value(result).map_err(|e| e.to_string())
2772}
2773
2774async fn handle_multi_vote(
2775    req: &JsonRpcMessage,
2776    session: &crate::session::ClientSession,
2777) -> Result<Value, String> {
2778    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2779    let task = req
2780        .params
2781        .get("task")
2782        .and_then(|v| v.as_str())
2783        .ok_or("missing 'task'")?;
2784
2785    let agent_specs: Vec<car_multi::AgentSpec> =
2786        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2787    let synth: Option<car_multi::AgentSpec> = req
2788        .params
2789        .get("synthesizer")
2790        .map(|v| serde_json::from_value(v.clone()))
2791        .transpose()
2792        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2793
2794    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2795        channel: session.channel.clone(),
2796        host: session.host.clone(),
2797        client_id: session.client_id.clone(),
2798    });
2799    let infra = car_multi::SharedInfra::new();
2800
2801    let mut vote = car_multi::Vote::new(agent_specs);
2802    if let Some(s) = synth {
2803        vote = vote.with_synthesizer(s);
2804    }
2805
2806    let result = vote
2807        .run(task, &runner, &infra)
2808        .await
2809        .map_err(|e| format!("vote error: {}", e))?;
2810    serde_json::to_value(result).map_err(|e| e.to_string())
2811}
2812
2813// ---------------------------------------------------------------------------
2814// Scheduler handlers
2815// ---------------------------------------------------------------------------
2816
2817fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2818    let name = req
2819        .params
2820        .get("name")
2821        .and_then(|v| v.as_str())
2822        .ok_or("scheduler.create requires 'name'")?;
2823    let prompt = req
2824        .params
2825        .get("prompt")
2826        .and_then(|v| v.as_str())
2827        .ok_or("scheduler.create requires 'prompt'")?;
2828
2829    let mut task = car_scheduler::Task::new(name, prompt);
2830
2831    if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
2832        let trigger = match t {
2833            "once" => car_scheduler::TaskTrigger::Once,
2834            "cron" => car_scheduler::TaskTrigger::Cron,
2835            "interval" => car_scheduler::TaskTrigger::Interval,
2836            "file_watch" => car_scheduler::TaskTrigger::FileWatch,
2837            _ => car_scheduler::TaskTrigger::Manual,
2838        };
2839        let schedule = req
2840            .params
2841            .get("schedule")
2842            .and_then(|v| v.as_str())
2843            .unwrap_or("");
2844        task = task.with_trigger(trigger, schedule);
2845    }
2846
2847    if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
2848        task = task.with_system_prompt(sp);
2849    }
2850
2851    serde_json::to_value(&task).map_err(|e| e.to_string())
2852}
2853
2854async fn handle_scheduler_run(
2855    req: &JsonRpcMessage,
2856    session: &crate::session::ClientSession,
2857) -> Result<Value, String> {
2858    let task_val = req
2859        .params
2860        .get("task")
2861        .ok_or("scheduler.run requires 'task'")?;
2862    let mut task: car_scheduler::Task =
2863        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2864
2865    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2866        channel: session.channel.clone(),
2867        host: session.host.clone(),
2868        client_id: session.client_id.clone(),
2869    });
2870    let executor = car_scheduler::Executor::new(runner);
2871    let execution = executor.run_once(&mut task).await;
2872
2873    serde_json::to_value(&execution).map_err(|e| e.to_string())
2874}
2875
2876async fn handle_scheduler_run_loop(
2877    req: &JsonRpcMessage,
2878    session: &crate::session::ClientSession,
2879) -> Result<Value, String> {
2880    let task_val = req
2881        .params
2882        .get("task")
2883        .ok_or("scheduler.run_loop requires 'task'")?;
2884    let mut task: car_scheduler::Task =
2885        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2886    let max_iterations = req
2887        .params
2888        .get("max_iterations")
2889        .and_then(|v| v.as_u64())
2890        .map(|v| v as u32);
2891
2892    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2893        channel: session.channel.clone(),
2894        host: session.host.clone(),
2895        client_id: session.client_id.clone(),
2896    });
2897    let executor = car_scheduler::Executor::new(runner);
2898    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2899    let executions = executor
2900        .run_loop(&mut task, max_iterations, cancel_rx)
2901        .await;
2902
2903    serde_json::to_value(&executions).map_err(|e| e.to_string())
2904}
2905
2906// ---------------------------------------------------------------------------
2907// Inference handlers
2908// ---------------------------------------------------------------------------
2909
2910fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
2911    state.inference.get_or_init(|| {
2912        Arc::new(car_inference::InferenceEngine::new(
2913            car_inference::InferenceConfig::default(),
2914        ))
2915    })
2916}
2917
2918async fn handle_infer(
2919    msg: &JsonRpcMessage,
2920    state: &ServerState,
2921    session: &crate::session::ClientSession,
2922) -> Result<Value, String> {
2923    let engine = get_inference_engine(state);
2924    let mut req: car_inference::GenerateRequest =
2925        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2926
2927    // If context_query is provided, build context from memgine and inject it
2928    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2929        let mut memgine = session.memgine.lock().await;
2930        let ctx = memgine.build_context(cq);
2931        if !ctx.is_empty() {
2932            req.context = Some(ctx);
2933        }
2934    }
2935
2936    // Process-wide admission gate. Held for the duration of the
2937    // generation so a burst of concurrent infer RPCs can't multiply
2938    // KV-cache + activation memory and take the host out. The
2939    // `_permit` binding is intentional — its `Drop` releases the slot
2940    // when this future returns.
2941    let _permit = state.admission.acquire().await;
2942
2943    // Use generate_tracked() so tool_calls, usage, model_used, trace_id, and
2944    // latency_ms are preserved in the response. Plain `generate()` discards
2945    // everything except `.text`, which silently breaks tool-use over the
2946    // WebSocket protocol (issue #43).
2947    //
2948    // NOTE: This directly serializes `InferenceResult`. Any field added to
2949    // that struct in `car-inference` becomes part of the public WebSocket
2950    // protocol. The shape is locked by `inference_result_serializes_*` tests
2951    // in car-inference; updating those tests is part of intentionally
2952    // changing the wire contract.
2953    let result = engine
2954        .generate_tracked(req)
2955        .await
2956        .map_err(|e| e.to_string())?;
2957    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2958}
2959
2960/// Streaming inference — mirrors NAPI `inferStream`. Closes
2961/// Parslee-ai/car-releases#30. Same `GenerateRequest` shape as
2962/// `infer`; emits `inference.stream.event` JSON-RPC notifications
2963/// during the run, and returns the final `InferenceResult` as the
2964/// JSON-RPC response when the stream completes.
2965///
2966/// Notification shape (server → client):
2967/// ```jsonc
2968/// {
2969///   "jsonrpc": "2.0",
2970///   "method": "inference.stream.event",
2971///   "params": {
2972///     "request_id": "<original RPC id>",
2973///     "event": { "type": "text" | "tool_start" | "tool_delta" | "usage", ... }
2974///   }
2975/// }
2976/// ```
2977///
2978/// The final `done` event is not pushed as a notification — it's
2979/// the JSON-RPC response with the accumulated `InferenceResult`.
2980/// `video.generate` — daemon-side wrapper for
2981/// `InferenceEngine::generate_video`. Mirrors `handle_infer`'s
2982/// admission gate + JSON request shape (Parslee-ai/car#185).
2983///
2984/// Previously the CLI's `cmd_video` constructed an in-process
2985/// engine and called `generate_video` directly — a v0.7 holdover
2986/// that bypassed the daemon. With this handler the CLI proxies
2987/// here, so the engine-level audio_passthrough gate fires
2988/// inside the daemon process where all FFI surfaces converge.
2989async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2990    let engine = get_inference_engine(state);
2991    let req: car_inference::GenerateImageRequest =
2992        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2993    // Share the same admission gate as text/video generation — a burst
2994    // of image requests shouldn't smuggle around the concurrency cap.
2995    let _permit = state.admission.acquire().await;
2996    let result = engine
2997        .generate_image(req)
2998        .await
2999        .map_err(|e| e.to_string())?;
3000    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3001}
3002
3003async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3004    let engine = get_inference_engine(state);
3005    let req: car_inference::GenerateVideoRequest =
3006        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3007    let _permit = state.admission.acquire().await;
3008    let result = engine
3009        .generate_video(req)
3010        .await
3011        .map_err(|e| e.to_string())?;
3012    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3013}
3014
3015async fn handle_infer_stream(
3016    msg: &JsonRpcMessage,
3017    session: &crate::session::ClientSession,
3018    state: &ServerState,
3019) -> Result<Value, String> {
3020    use futures::SinkExt;
3021    use tokio_tungstenite::tungstenite::Message;
3022
3023    let engine = get_inference_engine(state);
3024    let mut req: car_inference::GenerateRequest =
3025        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3026
3027    // Same context-injection convenience as non-streaming `infer` so
3028    // the two methods have parity on the call shape.
3029    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3030        let mut memgine = session.memgine.lock().await;
3031        let ctx = memgine.build_context(cq);
3032        if !ctx.is_empty() {
3033            req.context = Some(ctx);
3034        }
3035    }
3036
3037    let _permit = state.admission.acquire().await;
3038    let mut rx = engine
3039        .generate_tracked_stream(req)
3040        .await
3041        .map_err(|e| e.to_string())?;
3042
3043    let mut accumulator = car_inference::StreamAccumulator::default();
3044    let request_id = msg.id.clone();
3045
3046    while let Some(event) = rx.recv().await {
3047        let event_payload = match &event {
3048            car_inference::StreamEvent::TextDelta(text) => {
3049                serde_json::json!({"type": "text", "data": text})
3050            }
3051            car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3052                serde_json::json!({"type": "tool_start", "name": name, "index": index})
3053            }
3054            car_inference::StreamEvent::ToolCallDelta {
3055                index,
3056                arguments_delta,
3057            } => serde_json::json!({
3058                "type": "tool_delta",
3059                "index": index,
3060                "data": arguments_delta,
3061            }),
3062            car_inference::StreamEvent::Usage {
3063                input_tokens,
3064                output_tokens,
3065            } => serde_json::json!({
3066                "type": "usage",
3067                "input_tokens": input_tokens,
3068                "output_tokens": output_tokens,
3069            }),
3070            // Done is delivered as the JSON-RPC response, not a
3071            // notification — matches the NAPI contract where the
3072            // standalone function's return value is the accumulated
3073            // result and the callback only sees in-progress events.
3074            car_inference::StreamEvent::Done { .. } => {
3075                accumulator.push(&event);
3076                continue;
3077            }
3078        };
3079
3080        let notif = serde_json::json!({
3081            "jsonrpc": "2.0",
3082            "method": "inference.stream.event",
3083            "params": {
3084                "request_id": request_id,
3085                "event": event_payload,
3086            },
3087        });
3088        if let Ok(text) = serde_json::to_string(&notif) {
3089            let _ = session
3090                .channel
3091                .write
3092                .lock()
3093                .await
3094                .send(Message::Text(text.into()))
3095                .await;
3096        }
3097        accumulator.push(&event);
3098    }
3099
3100    let (text, tool_calls, usage) = accumulator.finish_with_usage();
3101    Ok(serde_json::json!({
3102        "text": text,
3103        "tool_calls": tool_calls,
3104        "usage": usage,
3105    }))
3106}
3107
3108async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3109    let engine = get_inference_engine(state);
3110    let req: car_inference::EmbedRequest =
3111        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3112    // Embeds load their own model weights; share the same admission
3113    // gate as generations so a burst of embed requests can't smuggle
3114    // around the concurrency cap.
3115    let _permit = state.admission.acquire().await;
3116    let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3117    Ok(serde_json::json!({"embeddings": result}))
3118}
3119
3120async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3121    let engine = get_inference_engine(state);
3122    let req: car_inference::ClassifyRequest =
3123        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3124    let _permit = state.admission.acquire().await;
3125    let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3126    Ok(serde_json::json!({"classifications": result}))
3127}
3128
3129/// Surface the current admission state so the menubar tray and
3130/// `car daemon status` can show "queued: N" / "permits: P/T". Read-only
3131/// snapshot — racy by definition but correct enough for status panels.
3132fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3133    let total = state.admission.permits();
3134    let available = state.admission.permits_available();
3135    let in_use = total.saturating_sub(available);
3136    Ok(serde_json::json!({
3137        "permits_total": total,
3138        "permits_available": available,
3139        "permits_in_use": in_use,
3140        "env_override": crate::admission::ENV_MAX_CONCURRENT,
3141    }))
3142}
3143
3144async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3145    let model = msg
3146        .params
3147        .get("model")
3148        .and_then(|v| v.as_str())
3149        .ok_or("missing 'model' parameter")?;
3150    let text = msg
3151        .params
3152        .get("text")
3153        .and_then(|v| v.as_str())
3154        .ok_or("missing 'text' parameter")?;
3155    let engine = get_inference_engine(state);
3156    let ids = engine
3157        .tokenize(model, text)
3158        .await
3159        .map_err(|e| e.to_string())?;
3160    Ok(serde_json::json!({"tokens": ids}))
3161}
3162
3163async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3164    let model = msg
3165        .params
3166        .get("model")
3167        .and_then(|v| v.as_str())
3168        .ok_or("missing 'model' parameter")?;
3169    let tokens: Vec<u32> = msg
3170        .params
3171        .get("tokens")
3172        .and_then(|v| v.as_array())
3173        .ok_or("missing 'tokens' parameter")?
3174        .iter()
3175        .map(|t| {
3176            t.as_u64()
3177                .and_then(|n| u32::try_from(n).ok())
3178                .ok_or_else(|| "tokens[] must be u32 values".to_string())
3179        })
3180        .collect::<Result<Vec<_>, _>>()?;
3181    let engine = get_inference_engine(state);
3182    let text = engine
3183        .detokenize(model, &tokens)
3184        .await
3185        .map_err(|e| e.to_string())?;
3186    Ok(serde_json::json!({"text": text}))
3187}
3188
3189/// `models.register` — persist a user-supplied `ModelSchema` to
3190/// `~/.car/models.json` (Parslee-ai/car-releases#39). Replaces any
3191/// existing entry with the same `id`. Returns `{id, registered}`.
3192///
3193/// **Phase 1 limitation**: the daemon's live `UnifiedRegistry` is
3194/// not updated in-process — the new model becomes visible to
3195/// `models.list`, `infer`, `infer_stream` on the **next daemon
3196/// boot** when `load_user_config` re-reads the file. This is
3197/// enough to unblock opencode's setup flow (register ahead of
3198/// time, then start the daemon). Hot-update requires either an
3199/// `RwLock<InferenceEngine>` on `ServerState` or an
3200/// interior-mutable `UnifiedRegistry`; both touch 20+ call sites
3201/// and are tracked as a follow-up.
3202///
3203/// Until hot-update lands, callers SHOULD register their models
3204/// before issuing `infer` calls against them, and operators
3205/// SHOULD restart the daemon after batches of model
3206/// registrations.
3207async fn handle_models_register(
3208    req: &JsonRpcMessage,
3209    _state: &Arc<ServerState>,
3210) -> Result<Value, String> {
3211    // The params shape mirrors v0.7's FFI `rt.registerModel(schemaJson)`:
3212    // either the bare `ModelSchema` value, OR `{ schema: ModelSchema }`.
3213    // Honor both so existing in-process callers don't have to reshape.
3214    let schema_value = match req.params.get("schema") {
3215        Some(v) => v.clone(),
3216        None => req.params.clone(),
3217    };
3218    let schema: car_inference::ModelSchema =
3219        serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3220    let id = schema.id.clone();
3221
3222    // Resolve the models.json path the same way UnifiedRegistry does:
3223    // `<models_dir>/../models.json` where models_dir defaults to
3224    // `~/.car/models/`. We read whatever's there, swap in the new
3225    // entry, and write back atomically.
3226    let home = std::env::var_os("HOME")
3227        .or_else(|| std::env::var_os("USERPROFILE"))
3228        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3229    let car_dir = std::path::PathBuf::from(home).join(".car");
3230    std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3231    let path = car_dir.join("models.json");
3232
3233    let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3234        let text =
3235            std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3236        if text.trim().is_empty() {
3237            Vec::new()
3238        } else {
3239            serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3240        }
3241    } else {
3242        Vec::new()
3243    };
3244    // Replace existing entry with the same id, else append.
3245    if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3246        *slot = schema;
3247    } else {
3248        models.push(schema);
3249    }
3250    let json =
3251        serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3252    let tmp = path.with_extension("json.tmp");
3253    std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3254    std::fs::rename(&tmp, &path)
3255        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3256    Ok(serde_json::json!({
3257        "id": id,
3258        "registered": true,
3259        "path": path.to_string_lossy(),
3260        "note": "Daemon restart required for live UnifiedRegistry visibility \
3261                 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3262                 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3263    }))
3264}
3265
3266/// `models.unregister` — remove an entry from `~/.car/models.json`
3267/// by id (Parslee-ai/car#186 — symmetric to `models.register`).
3268/// Returns `{ id, unregistered, path }` on success. Returns an error
3269/// when the model isn't present.
3270///
3271/// **Phase 1 limitation** (same as `models.register`): the daemon's
3272/// live `UnifiedRegistry` is not rebuilt — the removal takes effect
3273/// on the next daemon boot. Callers SHOULD restart the daemon after
3274/// a batch of unregistrations if they expect `models.list_unified`
3275/// to reflect the change immediately.
3276async fn handle_models_unregister(
3277    req: &JsonRpcMessage,
3278    _state: &Arc<ServerState>,
3279) -> Result<Value, String> {
3280    // Params shape mirrors the CLI flag: `{ id: string }`. Bare-string
3281    // params are honored for symmetry with the register handler's
3282    // tolerant shape (`{schema: ...}` OR bare schema).
3283    let id = match req.params.get("id") {
3284        Some(v) => v
3285            .as_str()
3286            .ok_or_else(|| "`id` must be a string".to_string())?
3287            .to_string(),
3288        None => match req.params.as_str() {
3289            Some(s) => s.to_string(),
3290            None => return Err("missing `id` parameter".to_string()),
3291        },
3292    };
3293
3294    let home = std::env::var_os("HOME")
3295        .or_else(|| std::env::var_os("USERPROFILE"))
3296        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3297    let car_dir = std::path::PathBuf::from(home).join(".car");
3298    let path = car_dir.join("models.json");
3299
3300    if !path.exists() {
3301        return Err(format!(
3302            "no models.json at {} — nothing to unregister",
3303            path.display()
3304        ));
3305    }
3306    let text =
3307        std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3308    let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3309        Vec::new()
3310    } else {
3311        serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3312    };
3313    let before = models.len();
3314    models.retain(|m| m.id != id);
3315    if models.len() == before {
3316        return Err(format!("model {} not found in {}", id, path.display()));
3317    }
3318    let json =
3319        serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3320    let tmp = path.with_extension("json.tmp");
3321    std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3322    std::fs::rename(&tmp, &path)
3323        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3324    Ok(serde_json::json!({
3325        "id": id,
3326        "unregistered": true,
3327        "path": path.to_string_lossy(),
3328        "note": "Daemon restart required for live UnifiedRegistry visibility \
3329                 (phase 1, matching models.register).",
3330    }))
3331}
3332
3333fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3334    let engine = get_inference_engine(state);
3335    let models = engine.list_models();
3336    serde_json::to_value(&models).map_err(|e| e.to_string())
3337}
3338
3339fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3340    let engine = get_inference_engine(state);
3341    let models = engine.list_models_unified();
3342    serde_json::to_value(&models).map_err(|e| e.to_string())
3343}
3344
3345#[derive(Debug, Deserialize)]
3346#[serde(rename_all = "camelCase")]
3347struct ModelSearchParams {
3348    #[serde(default)]
3349    query: Option<String>,
3350    #[serde(default)]
3351    capability: Option<car_inference::ModelCapability>,
3352    #[serde(default)]
3353    provider: Option<String>,
3354    #[serde(default)]
3355    local_only: bool,
3356    #[serde(default)]
3357    available_only: bool,
3358    #[serde(default)]
3359    limit: Option<usize>,
3360}
3361
3362#[derive(Debug, Serialize)]
3363#[serde(rename_all = "camelCase")]
3364struct ModelSearchEntry {
3365    #[serde(flatten)]
3366    info: car_inference::ModelInfo,
3367    family: String,
3368    version: String,
3369    tags: Vec<String>,
3370    pullable: bool,
3371    upgrade: Option<car_inference::ModelUpgrade>,
3372}
3373
3374#[derive(Debug, Serialize)]
3375#[serde(rename_all = "camelCase")]
3376struct ModelSearchResponse {
3377    models: Vec<ModelSearchEntry>,
3378    upgrades: Vec<car_inference::ModelUpgrade>,
3379    total: usize,
3380    available: usize,
3381    local: usize,
3382    remote: usize,
3383}
3384
3385fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3386    let params: ModelSearchParams =
3387        serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3388            query: None,
3389            capability: None,
3390            provider: None,
3391            local_only: false,
3392            available_only: false,
3393            limit: None,
3394        });
3395    let engine = get_inference_engine(state);
3396    let upgrades = engine.available_model_upgrades();
3397    let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3398        .iter()
3399        .cloned()
3400        .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3401        .collect();
3402    let query = params
3403        .query
3404        .as_deref()
3405        .map(str::trim)
3406        .filter(|q| !q.is_empty())
3407        .map(|q| q.to_ascii_lowercase());
3408    let provider = params
3409        .provider
3410        .as_deref()
3411        .map(str::trim)
3412        .filter(|p| !p.is_empty())
3413        .map(|p| p.to_ascii_lowercase());
3414
3415    let mut entries: Vec<ModelSearchEntry> = engine
3416        .list_schemas()
3417        .into_iter()
3418        .filter(|schema| {
3419            if let Some(capability) = params.capability {
3420                if !schema.has_capability(capability) {
3421                    return false;
3422                }
3423            }
3424            if let Some(provider) = provider.as_deref() {
3425                if schema.provider.to_ascii_lowercase() != provider {
3426                    return false;
3427                }
3428            }
3429            if params.local_only && !schema.is_local() {
3430                return false;
3431            }
3432            if params.available_only && !schema.available {
3433                return false;
3434            }
3435            if let Some(query) = query.as_deref() {
3436                let capability_text = schema
3437                    .capabilities
3438                    .iter()
3439                    .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3440                    .collect::<Vec<_>>()
3441                    .join(" ");
3442                let haystack = format!(
3443                    "{} {} {} {} {} {}",
3444                    schema.id,
3445                    schema.name,
3446                    schema.provider,
3447                    schema.family,
3448                    schema.tags.join(" "),
3449                    capability_text
3450                )
3451                .to_ascii_lowercase();
3452                if !haystack.contains(query) {
3453                    return false;
3454                }
3455            }
3456            true
3457        })
3458        .map(|schema| {
3459            let pullable = !schema.available
3460                && matches!(
3461                    schema.source,
3462                    car_inference::ModelSource::Local { .. }
3463                        | car_inference::ModelSource::Mlx { .. }
3464                );
3465            let info = car_inference::ModelInfo::from(&schema);
3466            let upgrade = upgrades_by_from.get(&schema.id).cloned();
3467            ModelSearchEntry {
3468                info,
3469                family: schema.family,
3470                version: schema.version,
3471                tags: schema.tags,
3472                pullable,
3473                upgrade,
3474            }
3475        })
3476        .collect();
3477    entries.sort_by(|a, b| {
3478        b.info
3479            .available
3480            .cmp(&a.info.available)
3481            .then(b.info.is_local.cmp(&a.info.is_local))
3482            .then(a.info.name.cmp(&b.info.name))
3483    });
3484    if let Some(limit) = params.limit {
3485        entries.truncate(limit);
3486    }
3487
3488    let total = entries.len();
3489    let available = entries.iter().filter(|entry| entry.info.available).count();
3490    let local = entries.iter().filter(|entry| entry.info.is_local).count();
3491    let response = ModelSearchResponse {
3492        models: entries,
3493        upgrades,
3494        total,
3495        available,
3496        local,
3497        remote: total.saturating_sub(local),
3498    };
3499    serde_json::to_value(response).map_err(|e| e.to_string())
3500}
3501
3502fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3503    let engine = get_inference_engine(state);
3504    serde_json::to_value(serde_json::json!({
3505        "upgrades": engine.available_model_upgrades()
3506    }))
3507    .map_err(|e| e.to_string())
3508}
3509
3510async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3511    let name = msg
3512        .params
3513        .get("name")
3514        .or_else(|| msg.params.get("id"))
3515        .or_else(|| msg.params.get("model"))
3516        .and_then(|v| v.as_str())
3517        .ok_or("missing 'name' parameter")?;
3518    let engine = get_inference_engine(state);
3519    let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3520    Ok(serde_json::json!({"path": path.display().to_string()}))
3521}
3522
3523async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3524    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3525        msg.params
3526            .get("events")
3527            .cloned()
3528            .unwrap_or(msg.params.clone()),
3529    )
3530    .map_err(|e| format!("invalid events: {}", e))?;
3531
3532    let inference = get_inference_engine(state).clone();
3533    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3534
3535    let skills = engine.distill_skills(&events).await;
3536    serde_json::to_value(&skills).map_err(|e| e.to_string())
3537}
3538
3539/// Run memory consolidation against this client's session memgine
3540/// (or the daemon-owned per-agent memgine when bound — #170).
3541/// Returns the JSON `ConsolidationReport`.
3542async fn handle_memory_consolidate(
3543    session: &crate::session::ClientSession,
3544) -> Result<Value, String> {
3545    let engine_arc = session.effective_memgine().await;
3546    let report = {
3547        let mut engine = engine_arc.lock().await;
3548        engine.consolidate().await
3549    };
3550    if let Some(id) = session.agent_id.lock().await.clone() {
3551        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3552            tracing::warn!(agent_id = %id, error = %e,
3553                "agent memgine persist after consolidate failed");
3554        }
3555    }
3556    serde_json::to_value(&report).map_err(|e| e.to_string())
3557}
3558
3559/// Repair a degraded skill on this client's session memgine.
3560/// Returns `{ code: "..." }` on success, `null` if the skill
3561/// isn't broken or repair failed.
3562async fn handle_skill_repair(
3563    msg: &JsonRpcMessage,
3564    session: &crate::session::ClientSession,
3565) -> Result<Value, String> {
3566    let name = msg
3567        .params
3568        .get("skill_name")
3569        .and_then(|v| v.as_str())
3570        .ok_or("missing 'skill_name' parameter")?;
3571    let mut engine = session.memgine.lock().await;
3572    let code = engine.repair_skill(name).await;
3573    Ok(match code {
3574        Some(c) => serde_json::json!({ "code": c }),
3575        None => Value::Null,
3576    })
3577}
3578
3579/// Ingest distilled skills into this client's session memgine.
3580/// Returns the number of nodes inserted.
3581async fn handle_skills_ingest_distilled(
3582    msg: &JsonRpcMessage,
3583    session: &crate::session::ClientSession,
3584) -> Result<Value, String> {
3585    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3586        msg.params
3587            .get("skills")
3588            .cloned()
3589            .unwrap_or(msg.params.clone()),
3590    )
3591    .map_err(|e| format!("invalid skills: {}", e))?;
3592    let mut engine = session.memgine.lock().await;
3593    let nodes = engine.ingest_distilled_skills(&skills);
3594    Ok(serde_json::json!({ "ingested": nodes.len() }))
3595}
3596
3597/// Run skill evolution against this session's memgine for a
3598/// specified domain.  Returns the resulting `DistilledSkill` array.
3599async fn handle_skills_evolve(
3600    msg: &JsonRpcMessage,
3601    session: &crate::session::ClientSession,
3602) -> Result<Value, String> {
3603    let domain = msg
3604        .params
3605        .get("domain")
3606        .and_then(|v| v.as_str())
3607        .ok_or("missing 'domain' parameter")?
3608        .to_string();
3609    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3610        msg.params
3611            .get("events")
3612            .cloned()
3613            .unwrap_or(Value::Array(vec![])),
3614    )
3615    .map_err(|e| format!("invalid events: {}", e))?;
3616    let mut engine = session.memgine.lock().await;
3617    let skills = engine.evolve_skills(&events, &domain).await;
3618    serde_json::to_value(&skills).map_err(|e| e.to_string())
3619}
3620
3621/// List domains whose skills are underperforming on this session.
3622async fn handle_skills_domains_needing_evolution(
3623    msg: &JsonRpcMessage,
3624    session: &crate::session::ClientSession,
3625) -> Result<Value, String> {
3626    let threshold = msg
3627        .params
3628        .get("threshold")
3629        .and_then(|v| v.as_f64())
3630        .unwrap_or(0.6);
3631    let engine = session.memgine.lock().await;
3632    let domains = engine.domains_needing_evolution(threshold);
3633    serde_json::to_value(&domains).map_err(|e| e.to_string())
3634}
3635
3636/// Rerank documents against a query using a cross-encoder model.
3637async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3638    let engine = get_inference_engine(state);
3639    let req: car_inference::RerankRequest =
3640        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3641    let _permit = state.admission.acquire().await;
3642    let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3643    serde_json::to_value(&result).map_err(|e| e.to_string())
3644}
3645
3646/// Transcribe audio at the given path. The path is interpreted on
3647/// the daemon's filesystem, not the FFI caller's — Daemon-mode
3648/// callers must pass a path the daemon can read (typically a
3649/// shared `~/.car/...` location or stdin push via the streaming
3650/// API).
3651async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3652    use base64::Engine as _;
3653    let engine = get_inference_engine(state);
3654
3655    // Sandbox-crossing escape hatch (Parslee-ai/car-releases#31): when
3656    // the caller can't share a filesystem view with the daemon (e.g.
3657    // unsandboxed Milo talking to a sandboxed car-host), they pass
3658    // `audio_b64` instead of `audio_path`. We decode to a tempfile,
3659    // run transcribe against the path the engine expects, and clean up
3660    // on drop. Accepts either form; `audio_b64` wins if both are set.
3661    let mut params = msg.params.clone();
3662    let audio_b64 = params
3663        .as_object_mut()
3664        .and_then(|m| m.remove("audio_b64"))
3665        .and_then(|v| v.as_str().map(str::to_string));
3666    let _tmp_audio = if let Some(b64) = audio_b64 {
3667        let bytes = base64::engine::general_purpose::STANDARD
3668            .decode(b64.as_bytes())
3669            .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3670        let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3671        std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3672        let path = tmp.path().to_string_lossy().into_owned();
3673        if let Some(obj) = params.as_object_mut() {
3674            obj.insert("audio_path".to_string(), Value::String(path));
3675        }
3676        Some(tmp)
3677    } else {
3678        None
3679    };
3680
3681    let req: car_inference::TranscribeRequest =
3682        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3683    let _permit = state.admission.acquire().await;
3684    let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3685    serde_json::to_value(&result).map_err(|e| e.to_string())
3686}
3687
3688/// Synthesize speech. By default writes to `output_path` on the
3689/// daemon's filesystem; when `return_b64: true` (or no `output_path`
3690/// was supplied) the result also includes an `audio_b64` field with
3691/// the rendered bytes inline so cross-sandbox callers can avoid
3692/// filesystem coordination. Closes Parslee-ai/car-releases#31.
3693async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3694    use base64::Engine as _;
3695    let engine = get_inference_engine(state);
3696
3697    let mut params = msg.params.clone();
3698    let return_b64 = params
3699        .as_object_mut()
3700        .and_then(|m| m.remove("return_b64"))
3701        .and_then(|v| v.as_bool())
3702        .unwrap_or(false);
3703    let no_output_path = params
3704        .as_object()
3705        .map(|m| !m.contains_key("output_path"))
3706        .unwrap_or(true);
3707
3708    let req: car_inference::SynthesizeRequest =
3709        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3710    let _permit = state.admission.acquire().await;
3711    let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3712    let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3713
3714    // Inline the bytes when the caller asked for them OR when no
3715    // output_path was specified (typical sandbox-crossing case —
3716    // they didn't pick a path because they have no shared one).
3717    if return_b64 || no_output_path {
3718        let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3719            format!(
3720                "synthesize: failed to read rendered audio at {}: {e}",
3721                result.audio_path
3722            )
3723        })?;
3724        let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3725        if let Some(obj) = value.as_object_mut() {
3726            obj.insert("audio_b64".to_string(), Value::String(encoded));
3727        }
3728    }
3729    Ok(value)
3730}
3731
3732/// Prepare the speech runtime (downloads / warmup). Returns a
3733/// JSON status string, mirroring the embedded
3734/// `prepare_speech_runtime` shape.
3735async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3736    let engine = get_inference_engine(state);
3737    let status = engine
3738        .prepare_speech_runtime()
3739        .await
3740        .map_err(|e| e.to_string())?;
3741    serde_json::to_value(&status).map_err(|e| e.to_string())
3742}
3743
3744/// Adaptive route decision for a prompt — returns the routing
3745/// JSON the FFI's `route_model` returns.
3746async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3747    let prompt = msg
3748        .params
3749        .get("prompt")
3750        .and_then(|v| v.as_str())
3751        .ok_or("missing 'prompt' parameter")?;
3752    let engine = get_inference_engine(state);
3753    let decision = engine.route_adaptive(prompt).await;
3754    serde_json::to_value(&decision).map_err(|e| e.to_string())
3755}
3756
3757/// Model performance profiles snapshot.
3758async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3759    let engine = get_inference_engine(state);
3760    let profiles = engine.export_profiles().await;
3761    serde_json::to_value(&profiles).map_err(|e| e.to_string())
3762}
3763
/// Params for `outcomes.resolve_pending`.
///
/// Because of `rename_all = "camelCase"`, the wire key is `actionResults`.
/// Each tuple arrives as a 4-element JSON array.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct OutcomesResolvePendingParams {
    /// Flat `(trace_id, success, confidence, output)` tuples from the
    /// caller. Same shape `car-reason`'s session produces from its
    /// `ActionOutcome` vector. Daemon side runs the inference rules
    /// and writes resolved outcomes back to the shared tracker.
    action_results: Vec<(String, bool, f64, String)>,
}
3773
3774/// `outcomes.resolve_pending` — write inferred outcomes back to the
3775/// shared engine's `OutcomeTracker` (Parslee-ai/car#189 follow-up).
3776///
3777/// Symmetric to the in-process path
3778/// `ReasoningInferenceHandle::record_inferred_outcomes` on
3779/// `InferenceEngine`: takes the per-action result tuples the
3780/// reasoning session produces, runs
3781/// `OutcomeTracker::infer_outcomes_from_action_sequence` to convert
3782/// them into `InferredOutcome` records, and calls
3783/// `resolve_pending_from_signals` under the tracker write lock. The
3784/// learning loop that adjusts routing decisions therefore survives
3785/// daemon-routed reasoning runs (previously a best-effort no-op on
3786/// the daemon side).
3787///
3788/// Returns `{ recorded: N }` where N is the number of action results
3789/// the caller passed. The tracker doesn't surface how many of those
3790/// actually had pending entries to resolve; that count would require
3791/// expanding the tracker API and isn't load-bearing for any caller
3792/// yet.
3793async fn handle_outcomes_resolve_pending(
3794    req: &JsonRpcMessage,
3795    state: &ServerState,
3796) -> Result<Value, String> {
3797    let params: OutcomesResolvePendingParams =
3798        serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
3799    let engine = get_inference_engine(state);
3800    let mut tracker = engine.outcome_tracker.write().await;
3801    let inferred = tracker.infer_outcomes_from_action_sequence(&params.action_results);
3802    tracker.resolve_pending_from_signals(inferred);
3803    Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3804}
3805
3806/// Per-session event log size.
3807async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3808    let n = session.runtime.log.lock().await.len();
3809    Ok(Value::from(n as u64))
3810}
3811
3812async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3813    let stats = session.runtime.log.lock().await.stats();
3814    serde_json::to_value(stats).map_err(|e| e.to_string())
3815}
3816
/// Params for `handle_events_truncate`.
///
/// Wire keys are camelCase (`maxEvents`, `maxSpans`). A missing key leaves
/// that part of the log untouched.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
    /// Keep only the last this-many events.
    #[serde(default)]
    max_events: Option<usize>,
    /// Keep only the last this-many spans.
    #[serde(default)]
    max_spans: Option<usize>,
}
3825
3826async fn handle_events_truncate(
3827    msg: &JsonRpcMessage,
3828    session: &crate::session::ClientSession,
3829) -> Result<Value, String> {
3830    let params: EventsTruncateParams =
3831        serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
3832            max_events: None,
3833            max_spans: None,
3834        });
3835    let mut log = session.runtime.log.lock().await;
3836    let removed_events = params
3837        .max_events
3838        .map(|max| log.truncate_events_keep_last(max))
3839        .unwrap_or(0);
3840    let removed_spans = params
3841        .max_spans
3842        .map(|max| log.truncate_spans_keep_last(max))
3843        .unwrap_or(0);
3844    let stats = log.stats();
3845    Ok(serde_json::json!({
3846        "removedEvents": removed_events,
3847        "removedSpans": removed_spans,
3848        "stats": stats,
3849    }))
3850}
3851
3852async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
3853    let mut log = session.runtime.log.lock().await;
3854    let removed = log.clear();
3855    Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
3856}
3857
3858/// Update the per-session replan config. Wire shape mirrors the
3859/// FFI's positional `set_replan_config` arguments — the engine
3860/// crate's `ReplanConfig` struct doesn't derive Serialize, so we
3861/// reconstruct it from a flat object here.
3862async fn handle_replan_set_config(
3863    msg: &JsonRpcMessage,
3864    session: &crate::session::ClientSession,
3865) -> Result<Value, String> {
3866    let max_replans = msg
3867        .params
3868        .get("max_replans")
3869        .and_then(|v| v.as_u64())
3870        .unwrap_or(0) as u32;
3871    let delay_ms = msg
3872        .params
3873        .get("delay_ms")
3874        .and_then(|v| v.as_u64())
3875        .unwrap_or(0);
3876    let verify_before_execute = msg
3877        .params
3878        .get("verify_before_execute")
3879        .and_then(|v| v.as_bool())
3880        .unwrap_or(true);
3881    let cfg = car_engine::ReplanConfig {
3882        max_replans,
3883        delay_ms,
3884        verify_before_execute,
3885    };
3886    session.runtime.set_replan_config(cfg).await;
3887    Ok(Value::Null)
3888}
3889
3890async fn handle_skills_list(
3891    msg: &JsonRpcMessage,
3892    session: &crate::session::ClientSession,
3893) -> Result<Value, String> {
3894    let domain = msg.params.get("domain").and_then(|v| v.as_str());
3895    let engine = session.memgine.lock().await;
3896    let skills: Vec<serde_json::Value> = engine
3897        .graph
3898        .inner
3899        .node_indices()
3900        .filter_map(|nix| {
3901            let node = engine.graph.inner.node_weight(nix)?;
3902            if node.kind != car_memgine::MemKind::Skill {
3903                return None;
3904            }
3905            let meta = car_memgine::SkillMeta::from_node(node)?;
3906            if let Some(d) = domain {
3907                match &meta.scope {
3908                    car_memgine::SkillScope::Global => {}
3909                    car_memgine::SkillScope::Domain(sd) if sd == d => {}
3910                    _ => return None,
3911                }
3912            }
3913            Some(serde_json::to_value(&meta).unwrap_or_default())
3914        })
3915        .collect();
3916    serde_json::to_value(&skills).map_err(|e| e.to_string())
3917}
3918
/// Shared params for the `handle_secret_*` handlers. Keys are snake_case on
/// the wire (no `rename_all`).
#[derive(serde::Deserialize)]
struct SecretParams {
    /// Optional service name, forwarded to `car_ffi_common::secrets` as
    /// `Option<&str>`.
    #[serde(default)]
    service: Option<String>,
    /// Secret key; required by every handler.
    key: String,
    /// Secret value; only `handle_secret_put` requires it.
    #[serde(default)]
    value: Option<String>,
}
3927
3928fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
3929    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3930    let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
3931    car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
3932}
3933
3934fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
3935    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3936    car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
3937}
3938
3939fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
3940    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3941    car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
3942}
3943
3944fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
3945    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3946    car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
3947}
3948
/// Shared params for the `handle_perm_*` handlers. Keys are snake_case on
/// the wire (no `rename_all`).
#[derive(serde::Deserialize)]
struct PermParams {
    /// Permission domain, forwarded verbatim to `car_ffi_common::permissions`.
    domain: String,
    /// Optional target bundle id, forwarded as `Option<&str>` (presumably an
    /// app bundle identifier — confirm against `car_ffi_common::permissions`).
    #[serde(default)]
    target_bundle_id: Option<String>,
}
3955
3956fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
3957    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3958    car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
3959}
3960
3961fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
3962    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3963    car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
3964}
3965
3966fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
3967    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3968    car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
3969}
3970
3971fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
3972    #[derive(serde::Deserialize)]
3973    struct P {
3974        start: String,
3975        end: String,
3976        #[serde(default)]
3977        calendar_ids: Vec<String>,
3978    }
3979    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3980    let start = chrono::DateTime::parse_from_rfc3339(&p.start)
3981        .map_err(|e| format!("parse start: {}", e))?
3982        .with_timezone(&chrono::Utc);
3983    let end = chrono::DateTime::parse_from_rfc3339(&p.end)
3984        .map_err(|e| format!("parse end: {}", e))?
3985        .with_timezone(&chrono::Utc);
3986    car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
3987}
3988
3989fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
3990    #[derive(serde::Deserialize)]
3991    struct P {
3992        query: String,
3993        #[serde(default = "default_limit")]
3994        limit: usize,
3995        #[serde(default)]
3996        container_ids: Vec<String>,
3997    }
3998    fn default_limit() -> usize {
3999        50
4000    }
4001    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4002    car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
4003}
4004
4005fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
4006    #[derive(serde::Deserialize, Default)]
4007    struct P {
4008        #[serde(default)]
4009        account_ids: Vec<String>,
4010    }
4011    let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
4012    car_ffi_common::integrations::mail_inbox(&p.account_ids)
4013}
4014
4015fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
4016    let raw = req.params.to_string();
4017    car_ffi_common::integrations::mail_send(&raw)
4018}
4019
4020fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
4021    #[derive(serde::Deserialize)]
4022    struct P {
4023        #[serde(default = "default_limit")]
4024        limit: usize,
4025    }
4026    fn default_limit() -> usize {
4027        50
4028    }
4029    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4030    car_ffi_common::integrations::messages_chats(p.limit)
4031}
4032
4033fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
4034    let raw = req.params.to_string();
4035    car_ffi_common::integrations::messages_send(&raw)
4036}
4037
4038fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
4039    #[derive(serde::Deserialize)]
4040    struct P {
4041        query: String,
4042        #[serde(default = "default_limit")]
4043        limit: usize,
4044    }
4045    fn default_limit() -> usize {
4046        50
4047    }
4048    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4049    car_ffi_common::integrations::notes_find(&p.query, p.limit)
4050}
4051
4052fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
4053    #[derive(serde::Deserialize)]
4054    struct P {
4055        #[serde(default = "default_limit")]
4056        limit: usize,
4057    }
4058    fn default_limit() -> usize {
4059        50
4060    }
4061    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4062    car_ffi_common::integrations::reminders_items(p.limit)
4063}
4064
4065fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
4066    #[derive(serde::Deserialize)]
4067    struct P {
4068        #[serde(default = "default_limit")]
4069        limit: usize,
4070    }
4071    fn default_limit() -> usize {
4072        100
4073    }
4074    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
4075    car_ffi_common::integrations::bookmarks_list(p.limit)
4076}
4077
4078fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
4079    #[derive(serde::Deserialize)]
4080    struct P {
4081        start: String,
4082        end: String,
4083    }
4084    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4085    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4086        .map_err(|e| format!("parse start: {}", e))?
4087        .with_timezone(&chrono::Utc);
4088    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4089        .map_err(|e| format!("parse end: {}", e))?
4090        .with_timezone(&chrono::Utc);
4091    car_ffi_common::health::sleep_windows(s, e)
4092}
4093
4094fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4095    #[derive(serde::Deserialize)]
4096    struct P {
4097        start: String,
4098        end: String,
4099    }
4100    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4101    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4102        .map_err(|e| format!("parse start: {}", e))?
4103        .with_timezone(&chrono::Utc);
4104    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4105        .map_err(|e| format!("parse end: {}", e))?
4106        .with_timezone(&chrono::Utc);
4107    car_ffi_common::health::workouts(s, e)
4108}
4109
4110fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4111    #[derive(serde::Deserialize)]
4112    struct P {
4113        start: String,
4114        end: String,
4115    }
4116    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4117    let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4118        .map_err(|e| format!("parse start: {}", e))?;
4119    let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4120        .map_err(|e| format!("parse end: {}", e))?;
4121    car_ffi_common::health::activity(s, e)
4122}
4123
4124async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4125    let closed = session.browser.close().await?;
4126    Ok(serde_json::json!({"closed": closed}))
4127}
4128
4129async fn handle_browser_run(
4130    req: &JsonRpcMessage,
4131    session: &crate::session::ClientSession,
4132) -> Result<Value, String> {
4133    #[derive(serde::Deserialize)]
4134    struct BrowserRunParams {
4135        /// Inline JSON string (CLI-compatible), OR the structured object.
4136        script: Value,
4137        #[serde(default)]
4138        width: Option<u32>,
4139        #[serde(default)]
4140        height: Option<u32>,
4141        /// When true, launches a visible Chromium window for interactive
4142        /// flows (first-time auth, 2FA, supervised runs). Only honored on
4143        /// the call that first launches the browser session — subsequent
4144        /// calls reuse the existing browser regardless.
4145        #[serde(default)]
4146        headed: Option<bool>,
4147        /// Extra Chromium command-line flags appended verbatim at
4148        /// launch (#112). Honoured only on the launch call.
4149        #[serde(default)]
4150        extra_args: Option<Vec<String>>,
4151    }
4152    let params: BrowserRunParams =
4153        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4154
4155    // Accept either a JSON string OR a structured object under `script`.
4156    let script_json = match params.script {
4157        Value::String(s) => s,
4158        other => other.to_string(),
4159    };
4160
4161    let browser_session = session
4162        .browser
4163        .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4164            width: params.width.unwrap_or(1280),
4165            height: params.height.unwrap_or(720),
4166            headless: !params.headed.unwrap_or(false),
4167            extra_args: params.extra_args.unwrap_or_default(),
4168        })
4169        .await?;
4170
4171    let trace_json = browser_session.run(&script_json).await?;
4172    serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4173}
4174
4175// ---------------------------------------------------------------------------
4176// Voice streaming JSON-RPC methods
4177//
4178// Events are pushed back to the originating client as JSON-RPC notifications:
4179//   { "jsonrpc": "2.0", "method": "voice.event",
4180//     "params": { "session_id": "...", "event": {...} } }
4181//
4182// The session registry is process-wide (ServerState.voice_sessions); per-call
4183// WsVoiceEventSink instances bind each session to its originating WS so a
4184// client only ever sees events for sessions it started.
4185// ---------------------------------------------------------------------------
4186
/// Params for `handle_voice_transcribe_stream_start`.
///
/// `audio_source` and `options` are kept as raw JSON. They are
/// re-serialized to strings for
/// `car_ffi_common::voice::transcribe_stream_start`, which interprets them.
#[derive(Deserialize)]
struct VoiceStartParams {
    /// Caller-supplied id for the streaming session.
    session_id: String,
    /// Audio source description, passed through as JSON text.
    audio_source: Value,
    /// Optional stream options, passed through as JSON text when present.
    #[serde(default)]
    options: Option<Value>,
}
4194
4195async fn handle_voice_transcribe_stream_start(
4196    req: &JsonRpcMessage,
4197    state: &Arc<ServerState>,
4198    session: &Arc<crate::session::ClientSession>,
4199) -> Result<Value, String> {
4200    let params: VoiceStartParams =
4201        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4202    let audio_source_json =
4203        serde_json::to_string(&params.audio_source).map_err(|e| e.to_string())?;
4204    let options_json = params
4205        .options
4206        .as_ref()
4207        .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4208        .transpose()?;
4209    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4210        channel: session.channel.clone(),
4211    });
4212    let json = car_ffi_common::voice::transcribe_stream_start(
4213        &params.session_id,
4214        &audio_source_json,
4215        options_json.as_deref(),
4216        state.voice_sessions.clone(),
4217        sink,
4218    )
4219    .await?;
4220    serde_json::from_str(&json).map_err(|e| e.to_string())
4221}
4222
/// Params for `handle_voice_transcribe_stream_stop`.
#[derive(Deserialize)]
struct VoiceStopParams {
    /// Id of the streaming session to stop.
    session_id: String,
}
4227
4228async fn handle_voice_transcribe_stream_stop(
4229    req: &JsonRpcMessage,
4230    state: &Arc<ServerState>,
4231) -> Result<Value, String> {
4232    let params: VoiceStopParams =
4233        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4234    let json = car_ffi_common::voice::transcribe_stream_stop(
4235        &params.session_id,
4236        state.voice_sessions.clone(),
4237    )
4238    .await?;
4239    serde_json::from_str(&json).map_err(|e| e.to_string())
4240}
4241
/// Params for `handle_voice_transcribe_stream_push`.
#[derive(Deserialize)]
struct VoicePushParams {
    /// Id of the streaming session to feed.
    session_id: String,
    /// Base64-encoded 16-bit signed PCM frame. JSON-RPC is text, so binary
    /// audio frames have to be encoded; clients in WS-binary contexts that
    /// want to skip the round trip can call the FFI directly.
    pcm_b64: String,
}
4250
4251async fn handle_voice_transcribe_stream_push(
4252    req: &JsonRpcMessage,
4253    state: &Arc<ServerState>,
4254) -> Result<Value, String> {
4255    use base64::Engine;
4256    let params: VoicePushParams =
4257        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4258    let pcm = base64::engine::general_purpose::STANDARD
4259        .decode(&params.pcm_b64)
4260        .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4261    let json = car_ffi_common::voice::transcribe_stream_push(
4262        &params.session_id,
4263        &pcm,
4264        state.voice_sessions.clone(),
4265    )
4266    .await?;
4267    serde_json::from_str(&json).map_err(|e| e.to_string())
4268}
4269
4270fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4271    let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4272    serde_json::from_str(&json).unwrap_or(Value::Null)
4273}
4274
4275async fn handle_voice_dispatch_turn(
4276    req: &JsonRpcMessage,
4277    state: &Arc<ServerState>,
4278    session: &Arc<crate::session::ClientSession>,
4279) -> Result<Value, String> {
4280    let req_value = req.params.clone();
4281    let request: crate::voice_turn::DispatchVoiceTurnRequest =
4282        serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4283    let engine = get_inference_engine(state).clone();
4284    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4285        channel: session.channel.clone(),
4286    });
4287    let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4288    serde_json::to_value(resp).map_err(|e| e.to_string())
4289}
4290
4291async fn handle_voice_cancel_turn() -> Result<Value, String> {
4292    crate::voice_turn::cancel().await;
4293    Ok(serde_json::json!({"cancelled": true}))
4294}
4295
4296async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4297    let engine = get_inference_engine(state).clone();
4298    crate::voice_turn::prewarm(engine).await;
4299    Ok(serde_json::json!({"prewarmed": true}))
4300}
4301
4302// ---------------------------------------------------------------------------
4303// Inference runner over WebSocket — closes Parslee-ai/car-releases#24
4304//
4305// Bidirectional protocol shape:
4306//   1. Client → server: `inference.register_runner` (no params). The
4307//      session that calls this becomes the host for delegated models.
4308//   2. Server → client: `inference.runner.invoke` notification with
4309//      {call_id, request} when CAR needs to dispatch a delegated turn.
4310//   3. Client → server: `inference.runner.event` with {call_id, event}
4311//      for each chunk; `inference.runner.complete` with {call_id, result}
4312//      on success; `inference.runner.fail` with {call_id, error} on
4313//      failure.
4314//
4315// The server-side data is process-wide because only one inference
4316// runner can be registered at a time (matches the FFI bindings'
4317// constraint). The per-call mailboxes live in dedicated DashMaps.
4318// ---------------------------------------------------------------------------
4319
4320fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4321    static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4322        std::sync::OnceLock::new();
4323    SLOT.get_or_init(|| std::sync::RwLock::new(None))
4324}
4325
4326fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4327    static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4328        std::sync::OnceLock::new();
4329    MAP.get_or_init(dashmap::DashMap::new)
4330}
4331
4332fn ws_runner_completions() -> &'static dashmap::DashMap<
4333    String,
4334    tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4335> {
4336    static MAP: std::sync::OnceLock<
4337        dashmap::DashMap<
4338            String,
4339            tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4340        >,
4341    > = std::sync::OnceLock::new();
4342    MAP.get_or_init(dashmap::DashMap::new)
4343}
4344
/// `car_inference::InferenceRunner` that hands each delegated generate turn
/// to the WebSocket client registered via `inference.register_runner`.
/// It carries no state of its own. The target channel and the per-call
/// mailboxes live in the process-wide `ws_runner_*` slots.
struct WsInferenceRunner;
4346
4347#[async_trait::async_trait]
4348impl car_inference::InferenceRunner for WsInferenceRunner {
4349    async fn run(
4350        &self,
4351        request: car_inference::tasks::generate::GenerateRequest,
4352        emitter: car_inference::EventEmitter,
4353    ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4354        let channel = ws_runner_session()
4355            .read()
4356            .map_err(|e| {
4357                car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4358            })?
4359            .clone()
4360            .ok_or_else(|| {
4361                car_inference::RunnerError::Declined(
4362                    "no WebSocket inference runner registered — call inference.register_runner first"
4363                        .into(),
4364                )
4365            })?;
4366
4367        let call_id = uuid::Uuid::new_v4().to_string();
4368        let request_json = serde_json::to_value(&request)
4369            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4370        let (tx, rx) = tokio::sync::oneshot::channel();
4371        ws_runner_calls().insert(call_id.clone(), emitter);
4372        ws_runner_completions().insert(call_id.clone(), tx);
4373
4374        // Fire the invoke notification.
4375        use futures::SinkExt;
4376        let notification = serde_json::json!({
4377            "jsonrpc": "2.0",
4378            "method": "inference.runner.invoke",
4379            "params": {
4380                "call_id": call_id,
4381                "request": request_json,
4382            },
4383        });
4384        let text = serde_json::to_string(&notification)
4385            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4386        let _ = channel
4387            .write
4388            .lock()
4389            .await
4390            .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4391            .await;
4392
4393        let result = rx.await.map_err(|_| {
4394            car_inference::RunnerError::Failed("runner completion channel dropped".into())
4395        })?;
4396        ws_runner_calls().remove(&call_id);
4397        result.map_err(car_inference::RunnerError::Failed)
4398    }
4399}
4400
4401async fn handle_inference_register_runner(
4402    session: &Arc<crate::session::ClientSession>,
4403) -> Result<Value, String> {
4404    let mut guard = ws_runner_session()
4405        .write()
4406        .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4407    *guard = Some(session.channel.clone());
4408    drop(guard);
4409    car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4410    Ok(serde_json::json!({"registered": true}))
4411}
4412
/// Params for `inference.runner.event`.
#[derive(serde::Deserialize)]
struct InferenceRunnerEventParams {
    /// Id from the matching `inference.runner.invoke` notification.
    call_id: String,
    /// Raw event object; decoded by `parse_runner_event_value`.
    event: Value,
}
4418
4419async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4420    let params: InferenceRunnerEventParams =
4421        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4422    let stream_event = match parse_runner_event_value(&params.event) {
4423        Some(e) => e,
4424        None => return Err("unrecognised runner event shape".into()),
4425    };
4426    if let Some(entry) = ws_runner_calls().get(&params.call_id) {
4427        let emitter = entry.value().clone();
4428        tokio::spawn(async move { emitter.emit(stream_event).await });
4429    }
4430    Ok(serde_json::json!({"emitted": true}))
4431}
4432
/// Params for `inference.runner.complete`.
#[derive(serde::Deserialize)]
struct InferenceRunnerCompleteParams {
    /// Id from the matching `inference.runner.invoke` notification.
    call_id: String,
    /// Final result; deserialized as `car_inference::RunnerResult`.
    result: Value,
}
4438
4439async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4440    let params: InferenceRunnerCompleteParams =
4441        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4442    let result: std::result::Result<car_inference::RunnerResult, String> =
4443        serde_json::from_value(params.result)
4444            .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4445    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4446        let _ = tx.send(result);
4447    }
4448    Ok(serde_json::json!({"completed": true}))
4449}
4450
/// Params for `inference.runner.fail`.
#[derive(serde::Deserialize)]
struct InferenceRunnerFailParams {
    /// Id from the matching `inference.runner.invoke` notification.
    call_id: String,
    /// Failure message, surfaced to the runner as `RunnerError::Failed`.
    error: String,
}
4456
4457async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4458    let params: InferenceRunnerFailParams =
4459        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4460    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4461        let _ = tx.send(Err(params.error));
4462    }
4463    Ok(serde_json::json!({"failed": true}))
4464}
4465
4466fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4467    let ty = v.get("type").and_then(|t| t.as_str())?;
4468    match ty {
4469        "text" => Some(car_inference::StreamEvent::TextDelta(
4470            v.get("data")?.as_str()?.to_string(),
4471        )),
4472        "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4473            name: v.get("name")?.as_str()?.to_string(),
4474            index: v.get("index")?.as_u64()? as usize,
4475            id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4476        }),
4477        "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4478            index: v.get("index")?.as_u64()? as usize,
4479            arguments_delta: v.get("data")?.as_str()?.to_string(),
4480        }),
4481        "usage" => Some(car_inference::StreamEvent::Usage {
4482            input_tokens: v.get("input_tokens")?.as_u64()?,
4483            output_tokens: v.get("output_tokens")?.as_u64()?,
4484        }),
4485        "done" => Some(car_inference::StreamEvent::Done {
4486            text: v.get("text")?.as_str()?.to_string(),
4487            tool_calls: v
4488                .get("tool_calls")
4489                .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4490                .unwrap_or_default(),
4491        }),
4492        _ => None,
4493    }
4494}
4495
4496#[derive(Deserialize)]
4497struct EnrollSpeakerParams {
4498    label: String,
4499    audio: Value,
4500}
4501
4502async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4503    let params: EnrollSpeakerParams =
4504        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4505    let audio_json = serde_json::to_string(&params.audio).map_err(|e| e.to_string())?;
4506    let json = car_ffi_common::voice::enroll_speaker(&params.label, &audio_json).await?;
4507    serde_json::from_str(&json).map_err(|e| e.to_string())
4508}
4509
4510#[derive(Deserialize)]
4511struct RemoveEnrollmentParams {
4512    label: String,
4513}
4514
4515fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4516    let params: RemoveEnrollmentParams =
4517        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4518    let json = car_ffi_common::voice::remove_enrollment(&params.label)?;
4519    serde_json::from_str(&json).map_err(|e| e.to_string())
4520}
4521
4522#[derive(Deserialize)]
4523struct WorkflowRunParams {
4524    workflow: Value,
4525}
4526
4527async fn handle_workflow_run(
4528    req: &JsonRpcMessage,
4529    session: &Arc<crate::session::ClientSession>,
4530) -> Result<Value, String> {
4531    let params: WorkflowRunParams =
4532        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4533    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4534    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4535        channel: session.channel.clone(),
4536        host: session.host.clone(),
4537        client_id: session.client_id.clone(),
4538    });
4539    let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4540    serde_json::from_str(&json).map_err(|e| e.to_string())
4541}
4542
4543#[derive(Deserialize)]
4544struct WorkflowVerifyParams {
4545    workflow: Value,
4546}
4547
4548fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4549    let params: WorkflowVerifyParams =
4550        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4551    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4552    let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4553    serde_json::from_str(&json).map_err(|e| e.to_string())
4554}
4555
4556// ---------------------------------------------------------------------------
4557// Meeting JSON-RPC methods
4558// ---------------------------------------------------------------------------
4559
4560async fn handle_meeting_start(
4561    req: &JsonRpcMessage,
4562    state: &Arc<ServerState>,
4563    session: &Arc<crate::session::ClientSession>,
4564) -> Result<Value, String> {
4565    // We need the meeting id BEFORE handing the upstream sink to
4566    // start_meeting so the WsMemgineIngestSink stamps transcripts with
4567    // the correct `meeting/<id>/<source>` speaker. Parse the request
4568    // here, mint an id if none was provided, and pass the same id
4569    // through to start_meeting via the request JSON.
4570    let mut req_value = req.params.clone();
4571    let meeting_id = req_value
4572        .get("id")
4573        .and_then(|v| v.as_str())
4574        .map(str::to_string)
4575        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4576    if let Some(map) = req_value.as_object_mut() {
4577        map.insert("id".into(), Value::String(meeting_id.clone()));
4578    }
4579    let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4580
4581    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4582        Arc::new(crate::session::WsVoiceEventSink {
4583            channel: session.channel.clone(),
4584        });
4585
4586    // Wrap the WS upstream with a memgine-ingest fanout that uses the
4587    // tokio::sync::Mutex-wrapped session memgine. We pass `None` for
4588    // the FFI-common `start_meeting` memgine arg to avoid the
4589    // sync-mutex contract there — ingest happens here instead.
4590    let upstream: Arc<dyn car_voice::VoiceEventSink> =
4591        Arc::new(crate::session::WsMemgineIngestSink {
4592            meeting_id,
4593            engine: session.memgine.clone(),
4594            upstream: ws_upstream,
4595        });
4596
4597    let cwd = std::env::current_dir().ok();
4598    let json = crate::meeting::start_meeting(
4599        &request_json,
4600        state.meetings.clone(),
4601        state.voice_sessions.clone(),
4602        upstream,
4603        None,
4604        cwd,
4605    )
4606    .await?;
4607    serde_json::from_str(&json).map_err(|e| e.to_string())
4608}
4609
4610#[derive(Deserialize)]
4611struct MeetingStopParams {
4612    meeting_id: String,
4613    #[serde(default = "default_summarize")]
4614    summarize: bool,
4615}
4616
4617fn default_summarize() -> bool {
4618    true
4619}
4620
4621async fn handle_meeting_stop(
4622    req: &JsonRpcMessage,
4623    state: &Arc<ServerState>,
4624    _session: &Arc<crate::session::ClientSession>,
4625) -> Result<Value, String> {
4626    let params: MeetingStopParams =
4627        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4628    let inference = if params.summarize {
4629        Some(state.inference.get().cloned()).flatten()
4630    } else {
4631        None
4632    };
4633    let json = crate::meeting::stop_meeting(
4634        &params.meeting_id,
4635        params.summarize,
4636        state.meetings.clone(),
4637        state.voice_sessions.clone(),
4638        inference,
4639    )
4640    .await?;
4641    serde_json::from_str(&json).map_err(|e| e.to_string())
4642}
4643
4644#[derive(Deserialize, Default)]
4645struct MeetingListParams {
4646    #[serde(default)]
4647    root: Option<std::path::PathBuf>,
4648}
4649
4650fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4651    let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4652    let cwd = std::env::current_dir().ok();
4653    let json = crate::meeting::list_meetings(params.root, cwd)?;
4654    serde_json::from_str(&json).map_err(|e| e.to_string())
4655}
4656
4657#[derive(Deserialize)]
4658struct MeetingGetParams {
4659    meeting_id: String,
4660    #[serde(default)]
4661    root: Option<std::path::PathBuf>,
4662}
4663
4664fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4665    let params: MeetingGetParams =
4666        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4667    let cwd = std::env::current_dir().ok();
4668    let json = crate::meeting::get_meeting(&params.meeting_id, params.root, cwd)?;
4669    serde_json::from_str(&json).map_err(|e| e.to_string())
4670}
4671
4672// ---------------------------------------------------------------------------
4673// Agent registry — file-based cross-process discovery (#111)
4674// ---------------------------------------------------------------------------
4675
4676#[derive(Deserialize, Default)]
4677struct RegistryRegisterParams {
4678    /// Caller serializes their AgentEntry as a JSON value; we
4679    /// re-serialize it so the ffi-common helper can validate the
4680    /// shape with the same parser used by the bindings.
4681    entry: Value,
4682    #[serde(default)]
4683    registry_path: Option<std::path::PathBuf>,
4684}
4685
4686fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4687    let params: RegistryRegisterParams =
4688        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4689    let entry_json = serde_json::to_string(&params.entry).map_err(|e| e.to_string())?;
4690    car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4691    Ok(Value::Null)
4692}
4693
4694#[derive(Deserialize, Default)]
4695struct RegistryNameParams {
4696    name: String,
4697    #[serde(default)]
4698    registry_path: Option<std::path::PathBuf>,
4699}
4700
4701fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4702    let params: RegistryNameParams =
4703        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4704    let json = car_ffi_common::registry::agent_heartbeat(&params.name, params.registry_path)?;
4705    serde_json::from_str(&json).map_err(|e| e.to_string())
4706}
4707
4708fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4709    let params: RegistryNameParams =
4710        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4711    car_ffi_common::registry::unregister_agent(&params.name, params.registry_path)?;
4712    Ok(Value::Null)
4713}
4714
4715#[derive(Deserialize, Default)]
4716struct RegistryListParams {
4717    #[serde(default)]
4718    registry_path: Option<std::path::PathBuf>,
4719}
4720
4721fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4722    let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4723    let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4724    serde_json::from_str(&json).map_err(|e| e.to_string())
4725}
4726
4727#[derive(Deserialize, Default)]
4728struct RegistryReapParams {
4729    /// Heartbeats older than this many seconds are reaped. Default
4730    /// 60 — two missed 20s heartbeats trigger removal.
4731    #[serde(default = "default_reap_age")]
4732    max_age_secs: u64,
4733    #[serde(default)]
4734    registry_path: Option<std::path::PathBuf>,
4735}
4736
4737fn default_reap_age() -> u64 {
4738    60
4739}
4740
4741fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4742    let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4743    let json =
4744        car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4745    serde_json::from_str(&json).map_err(|e| e.to_string())
4746}
4747
4748// ---------------------------------------------------------------------------
4749// car-a2a server lifecycle (mirrors NAPI startA2aServer / stopA2aServer /
4750// a2aServerStatus and PyO3 start_a2a_server / stop_a2a_server /
4751// a2a_server_status — closes the binding gap noted in #126).
4752// ---------------------------------------------------------------------------
4753
4754async fn handle_a2a_start(
4755    req: &JsonRpcMessage,
4756    session: &crate::session::ClientSession,
4757) -> Result<Value, String> {
4758    let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4759    // Always hand the session's runtime through. start_a2a uses it
4760    // only when `share_session_runtime: true` is set in params;
4761    // otherwise it falls back to the legacy fresh-Runtime + agent_basics
4762    // path. Passing it unconditionally keeps the FFI layer ignorant of
4763    // the flag's plumbing.
4764    let json = crate::a2a::start_a2a(&params_json, Some(session.runtime.clone())).await?;
4765    serde_json::from_str(&json).map_err(|e| e.to_string())
4766}
4767
4768fn handle_a2a_stop() -> Result<Value, String> {
4769    let json = crate::a2a::stop_a2a()?;
4770    serde_json::from_str(&json).map_err(|e| e.to_string())
4771}
4772
4773fn handle_a2a_status() -> Result<Value, String> {
4774    let json = crate::a2a::a2a_status()?;
4775    serde_json::from_str(&json).map_err(|e| e.to_string())
4776}
4777
/// Parameters accepted by `handle_a2a_send`. Wire field names are camelCase
/// (`rename_all`), e.g. `ingestA2ui`, `routeAuth`, `allowUntrustedEndpoint`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct A2aSendParams {
    /// Target A2A endpoint. `handle_a2a_send` rejects it unless it is
    /// loopback or `allowUntrustedEndpoint` is true.
    endpoint: String,
    /// Message passed to `A2aClient::send_message`.
    message: car_a2a::Message,
    /// Forwarded as `send_message`'s `blocking` argument; defaults to false.
    #[serde(default)]
    blocking: bool,
    /// When true (the default), A2UI envelopes in the reply are validated
    /// and applied.
    #[serde(default = "default_true")]
    ingest_a2ui: bool,
    /// Optional credentials attached to the client and also passed along
    /// when applying A2UI envelopes.
    #[serde(default)]
    route_auth: Option<A2aRouteAuth>,
    /// Opt-in to sending to non-loopback endpoints; defaults to false.
    #[serde(default)]
    allow_untrusted_endpoint: bool,
}

/// Serde default helper for boolean fields that are on unless explicitly
/// disabled (e.g. `A2aSendParams::ingest_a2ui`).
fn default_true() -> bool {
    true
}
4796
4797/// In-core A2A dispatcher entry point. Forwards the JSON-RPC method
4798/// + params to the lazy-initialized [`car_a2a::A2aDispatcher`] held
4799/// on `ServerState`. Closes Parslee-ai/car-releases#28.
4800///
4801/// Streaming methods (`message/stream`, `tasks/resubscribe` and their
4802/// PascalCase aliases) return `MethodNotFound` from the dispatcher's
4803/// transport-neutral surface — the standalone `start_a2a_listener`
4804/// HTTP path serves SSE for those, but the in-core WS surface is
4805/// JSON-RPC only. Same trade as the dispatcher itself.
4806async fn handle_a2a_dispatch(
4807    method: &str,
4808    req: &JsonRpcMessage,
4809    state: &Arc<ServerState>,
4810) -> Result<Value, String> {
4811    let dispatcher = state.a2a_dispatcher().await;
4812    dispatcher
4813        .dispatch(method, req.params.clone())
4814        .await
4815        .map_err(|e| e.to_string())
4816}
4817
/// Send a message to an A2A endpoint and, optionally, apply any A2UI
/// envelopes contained in the reply.
///
/// Steps:
/// 1. The endpoint is checked with `trusted_route_endpoint`. The call is
///    rejected unless the endpoint is loopback or `allowUntrustedEndpoint`
///    is set.
/// 2. An `A2aClient` is built for it, carrying `routeAuth` credentials when
///    supplied, and `send_message` is awaited with the `blocking` flag.
/// 3. When `ingestA2ui` is true (the default), `state.a2ui` validates the
///    serialized reply. Every envelope extracted from the reply is then
///    applied through `apply_a2ui_envelope`, with the reply's owner given
///    the routed endpoint when it carries none.
///
/// Returns `{ "result": <send_message reply>, "a2ui": { "applied": [...] } }`.
/// `applied` is empty when ingestion is disabled.
///
/// # Errors
/// Returns a string for any parameter, endpoint-trust, transport,
/// validation, or envelope-application failure. This handler does not roll
/// back envelopes that were applied before a failing one.
async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
    let params: A2aSendParams =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    // `None` from trusted_route_endpoint means the endpoint was refused.
    let endpoint = trusted_route_endpoint(
        Some(params.endpoint.clone()),
        params.allow_untrusted_endpoint,
    )
    .ok_or_else(|| {
        "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
    })?;
    let client = match params.route_auth.clone() {
        Some(auth) => {
            car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
        }
        None => car_a2a::A2aClient::new(endpoint.clone()),
    };
    let result = client
        .send_message(params.message, params.blocking)
        .await
        .map_err(|e| e.to_string())?;
    // A2UI helpers operate on the JSON form of the reply.
    let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
    let mut applied = Vec::new();
    if params.ingest_a2ui {
        // Validate the whole payload before applying any envelope from it.
        state
            .a2ui
            .validate_payload(&result_value)
            .map_err(|e| e.to_string())?;
        let routed_endpoint = Some(endpoint.clone());
        for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
            // Fall back to the endpoint this message was routed to when the
            // reply's owner does not name one itself.
            let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
                if owner.endpoint.is_none() {
                    owner.with_endpoint(routed_endpoint.clone())
                } else {
                    owner
                }
            });
            applied.push(
                apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
            );
        }
    }
    Ok(serde_json::json!({
        "result": result,
        "a2ui": {
            "applied": applied,
        }
    }))
}
4866
4867// ---------------------------------------------------------------------------
4868// macOS automation — AppleScript + Shortcuts (car-automation), Vision OCR
4869// (car-vision). Mirrors NAPI runApplescript / listShortcuts / runShortcut /
4870// visionOcr and PyO3 run_applescript / list_shortcuts / run_shortcut /
4871// vision_ocr.
4872// ---------------------------------------------------------------------------
4873
4874async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
4875    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4876    let json = car_ffi_common::automation::run_applescript(&args_json).await?;
4877    serde_json::from_str(&json).map_err(|e| e.to_string())
4878}
4879
4880async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
4881    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4882    let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
4883    serde_json::from_str(&json).map_err(|e| e.to_string())
4884}
4885
4886async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
4887    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4888    let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
4889    serde_json::from_str(&json).map_err(|e| e.to_string())
4890}
4891
4892async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
4893    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4894    let json = car_ffi_common::notifications::local(&args_json).await?;
4895    serde_json::from_str(&json).map_err(|e| e.to_string())
4896}
4897
4898async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
4899    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4900    let json = car_ffi_common::vision::ocr(&args_json).await?;
4901    serde_json::from_str(&json).map_err(|e| e.to_string())
4902}
4903
4904// ---------------------------------------------------------------------------
4905// Lifecycle-managed agents (car_registry::supervisor) — Parslee-ai/car-releases#27
4906// ---------------------------------------------------------------------------
4907
4908async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
4909    // Observe-only mode (Parslee-ai/car-releases#44): a second
4910    // car-server on the host can't take the supervisor lock, so it
4911    // can't drive `Supervisor::list` — but it can still answer
4912    // `agents.list` by reading the on-disk manifest directly. The
4913    // `attached` decoration is local to whichever daemon the caller
4914    // is talking to, so observer-mode entries return `attached:
4915    // false` (this daemon hasn't received `session.auth` from those
4916    // children; the primary one has).
4917    let agents = match state.observer_manifest_path() {
4918        Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
4919            .map_err(|e| e.to_string())?,
4920        None => {
4921            let supervisor = state.supervisor()?;
4922            supervisor.list().await
4923        }
4924    };
4925    // Decorate each entry with `attached` + `session_id` so operators
4926    // see whether the supervised process has actually called
4927    // `session.auth { agent_id }` and bound a WS connection (#169) —
4928    // the lifecycle status (`Running`, etc.) only reports the
4929    // process-level view, which can't tell "alive but never
4930    // attached" from "alive and attached".
4931    let attached = state.attached_agents.lock().await.clone();
4932    let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
4933    for a in agents {
4934        let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
4935        let session_id = attached.get(&a.spec.id).cloned();
4936        if let Some(map) = v.as_object_mut() {
4937            map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
4938            if let Some(sid) = session_id {
4939                map.insert("session_id".to_string(), Value::String(sid));
4940            }
4941        }
4942        decorated.push(v);
4943    }
4944    Ok(Value::Array(decorated))
4945}
4946
4947async fn handle_agents_upsert(
4948    req: &JsonRpcMessage,
4949    state: &Arc<ServerState>,
4950) -> Result<Value, String> {
4951    let mut params = req.params.clone();
4952    // Optional `interpreter` sugar (#171). When present, the
4953    // supervisor resolves the bare program name (`"node"`,
4954    // `"python"`, …) against `$PATH` and writes the absolute path
4955    // into `command` *before* validation. This keeps the strict
4956    // no-PATH-lookup rule at upsert time while letting callers
4957    // stop hand-coding `/opt/homebrew/bin/node` into every
4958    // agents.json entry. Resolution happens once; subsequent PATH
4959    // changes do not silently rewire the binding.
4960    if let Some(name) = params
4961        .get("interpreter")
4962        .and_then(|v| v.as_str())
4963        .map(str::to_string)
4964    {
4965        let resolved =
4966            car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
4967        params["command"] = Value::String(resolved.to_string_lossy().into_owned());
4968    }
4969    let spec: car_registry::supervisor::AgentSpec =
4970        serde_json::from_value(params).map_err(|e| e.to_string())?;
4971    let supervisor = state.supervisor()?;
4972    let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
4973    serde_json::to_value(agent).map_err(|e| e.to_string())
4974}
4975
/// `agents.install` — install a contributed-agent manifest
/// (Parslee-ai/car#182 phase 3). Caller passes the parsed
/// `AgentManifest` JSON; the daemon runs install-time validation
/// (`car_min_version`, capability negotiation against the daemon's
/// own advertisement) and adopts the manifest.
///
/// Returns `{ report: { missingOptional: [{ namespace, feature }] }, agent }`.
/// `agent` is the spawnable `ManagedAgent` for `external_process`
/// transports. For `pure_data` / health_url-only manifests, `managed` is
/// presumably `None`, which `json!` renders as `"agent": null`. The key is
/// always present, never omitted.
///
/// The host capability advertisement comes from
/// `HostCapabilities::daemon_default`, called with this crate's
/// `CARGO_PKG_VERSION`. Operators who want a tighter advertisement go
/// through a future config phase; this MVP uses the runtime's natural
/// surface.
async fn handle_agents_install(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let manifest: car_registry::manifest::AgentManifest =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
    let supervisor = state.supervisor()?;
    let (report, managed) = supervisor
        .install_manifest(manifest, &host)
        .await
        .map_err(|e| e.to_string())?;
    // Each missing optional capability is a (namespace, feature) pair,
    // flattened into camelCase JSON objects for the wire.
    Ok(serde_json::json!({
        "report": {
            "missingOptional": report
                .missing_optional
                .iter()
                .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
                .collect::<Vec<_>>(),
        },
        "agent": managed,
    }))
}
5012
5013async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
5014    // Observe-only mode (Parslee-ai/car-releases#44) — see
5015    // `handle_agents_list` for the rationale. The health view is a
5016    // pure function of each entry's `command` plus the on-disk
5017    // sandbox rules, so reading from the manifest is equivalent to
5018    // calling the live supervisor's `health()`.
5019    let entries = match state.observer_manifest_path() {
5020        Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
5021            .map_err(|e| e.to_string())?,
5022        None => {
5023            let supervisor = state.supervisor()?;
5024            supervisor.health().await
5025        }
5026    };
5027    serde_json::to_value(entries).map_err(|e| e.to_string())
5028}
5029
5030fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
5031    req.params
5032        .get("id")
5033        .and_then(Value::as_str)
5034        .map(str::to_string)
5035        .ok_or_else(|| "missing required `id` parameter".to_string())
5036}
5037
5038async fn handle_agents_remove(
5039    req: &JsonRpcMessage,
5040    state: &Arc<ServerState>,
5041) -> Result<Value, String> {
5042    let id = extract_agent_id(req)?;
5043    let supervisor = state.supervisor()?;
5044    let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
5045    Ok(serde_json::json!({ "removed": removed }))
5046}
5047
5048async fn handle_agents_start(
5049    req: &JsonRpcMessage,
5050    state: &Arc<ServerState>,
5051) -> Result<Value, String> {
5052    let id = extract_agent_id(req)?;
5053    let supervisor = state.supervisor()?;
5054    let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
5055    serde_json::to_value(agent).map_err(|e| e.to_string())
5056}
5057
5058async fn handle_agents_stop(
5059    req: &JsonRpcMessage,
5060    state: &Arc<ServerState>,
5061) -> Result<Value, String> {
5062    let id = extract_agent_id(req)?;
5063    let signal: car_registry::supervisor::StopSignal = req
5064        .params
5065        .get("signal")
5066        .map(|v| serde_json::from_value(v.clone()))
5067        .transpose()
5068        .map_err(|e| e.to_string())?
5069        .unwrap_or_default();
5070    let supervisor = state.supervisor()?;
5071    let agent = supervisor
5072        .stop(&id, signal)
5073        .await
5074        .map_err(|e| e.to_string())?;
5075    serde_json::to_value(agent).map_err(|e| e.to_string())
5076}
5077
5078async fn handle_agents_restart(
5079    req: &JsonRpcMessage,
5080    state: &Arc<ServerState>,
5081) -> Result<Value, String> {
5082    let id = extract_agent_id(req)?;
5083    let supervisor = state.supervisor()?;
5084    let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
5085    serde_json::to_value(agent).map_err(|e| e.to_string())
5086}
5087
5088async fn handle_agents_tail_log(
5089    req: &JsonRpcMessage,
5090    state: &Arc<ServerState>,
5091) -> Result<Value, String> {
5092    let id = extract_agent_id(req)?;
5093    let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
5094    let supervisor = state.supervisor()?;
5095    let lines = supervisor
5096        .tail_log(&id, n)
5097        .await
5098        .map_err(|e| e.to_string())?;
5099    Ok(serde_json::json!({ "lines": lines }))
5100}
5101
// ---------------------------------------------------------------------------
// External agents (docs/proposals/external-agent-detection.md)
//
// Surface for agentic CLIs the user has already installed and authenticated
// (Claude Code, Codex, Gemini). Listing and detection (Phase 1) are
// read-only. Per-task invocation is handled by agents.invoke_external
// (Phase 2), which currently supports only the `claude-code` adapter. The
// detection cache lives in car_ffi_common::external_agents so the
// in-process FFI singletons share the same snapshot.
// ---------------------------------------------------------------------------
5111
5112async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5113    let include_health = req
5114        .params
5115        .get("include_health")
5116        .and_then(Value::as_bool)
5117        .unwrap_or(false);
5118    let json = car_ffi_common::external_agents::list(include_health).await?;
5119    serde_json::from_str(&json).map_err(|e| e.to_string())
5120}
5121
5122async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5123    let include_health = req
5124        .params
5125        .get("include_health")
5126        .and_then(Value::as_bool)
5127        .unwrap_or(false);
5128    let json = car_ffi_common::external_agents::detect(include_health).await?;
5129    serde_json::from_str(&json).map_err(|e| e.to_string())
5130}
5131
5132/// Per-task invocation of an external CLI agent. Required params:
5133/// `id` (adapter, e.g. `"claude-code"`) and `task` (the prompt).
5134/// Optional: `cwd`, `allowed_tools`, `max_turns`, `timeout_secs`.
5135///
5136/// Phase 2 stage 3 ships with `claude-code` only. Other adapter
5137/// ids return `is_error: true` with a structured `error` so hosts
5138/// can surface the gap without a separate error code.
5139///
5140/// Phase 2 stage 4a (governance): every invocation appends a
5141/// structured audit record to `~/.car/external-agents.jsonl`. The
5142/// record captures id, task, options, result, and the full
5143/// `tool_uses` list the assistant emitted — so even though the
5144/// agent executes its built-in tools in-process (which we can't
5145/// gate via stream-json), there's a complete after-the-fact audit
5146/// trail. Full policy gating (proposing each tool_use to CAR's
5147/// validator + getting a yes/no) requires the MCP server route in
5148/// stage 4b.
5149async fn handle_agents_invoke_external(
5150    req: &JsonRpcMessage,
5151    state: &Arc<ServerState>,
5152    host_session: &Arc<crate::session::ClientSession>,
5153) -> Result<Value, String> {
5154    let id = req
5155        .params
5156        .get("id")
5157        .and_then(Value::as_str)
5158        .ok_or_else(|| "missing required `id` parameter".to_string())?
5159        .to_string();
5160    let task = req
5161        .params
5162        .get("task")
5163        .and_then(Value::as_str)
5164        .ok_or_else(|| "missing required `task` parameter".to_string())?
5165        .to_string();
5166    let stream = req
5167        .params
5168        .get("stream")
5169        .and_then(Value::as_bool)
5170        .unwrap_or(false);
5171    let session_id = req
5172        .params
5173        .get("session_id")
5174        .and_then(Value::as_str)
5175        .map(str::to_string)
5176        .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
5177
5178    // Build the options sub-object directly from req.params so
5179    // hosts can pass `cwd` / `allowed_tools` / `max_turns` /
5180    // `timeout_secs` as siblings of `id`/`task`. Strip the
5181    // dispatch + streaming fields so they don't pollute the
5182    // options serde.
5183    let mut options_value = req.params.clone();
5184    if let Some(obj) = options_value.as_object_mut() {
5185        obj.remove("id");
5186        obj.remove("task");
5187        obj.remove("stream");
5188        obj.remove("session_id");
5189        // Auto-fill `mcp_endpoint` from the bound MCP URL when the
5190        // caller didn't supply one. This is the load-bearing
5191        // wiring of MCP-4: external agents get CAR's tools (memory,
5192        // skills, verify) routed through the daemon's policy +
5193        // shared memgine without any per-call host configuration.
5194        // Callers who want to opt out can pass `"mcp_endpoint": ""`
5195        // (empty string) — the runner skips the temp-file write
5196        // when the value isn't a non-empty URL.
5197        let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5198        if !has_explicit_mcp {
5199            if let Some(url) = state.mcp_url.get() {
5200                obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5201            }
5202        }
5203    }
5204
5205    if !stream {
5206        // Legacy one-shot path. Unchanged shape for FFI consumers
5207        // and any caller that hasn't opted into streaming.
5208        let options_json = options_value.to_string();
5209        let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
5210        let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5211        append_external_agent_audit(&id, &task, &options_value, &result);
5212        return Ok(result);
5213    }
5214
5215    // Streaming path. Returns an ack ({accepted, session_id})
5216    // immediately and streams `agents.chat.event` notifications
5217    // to the host's WS as the runner emits StreamEvents. Reuses
5218    // the chat_sessions routing infrastructure supervised agents
5219    // use — host UIs render both kinds through the same path.
5220    let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
5221        .map_err(|e| format!("invalid options: {e}"))?;
5222
5223    // Register the chat session BEFORE spawning so the host's
5224    // subscriber is correctly bound to this session_id by the
5225    // time the first event arrives. Reusing the supervised
5226    // agent chat infrastructure means `agents.chat.cancel`
5227    // routes to chat_sessions[session_id] and can find this
5228    // entry — though we don't currently honor cancel for
5229    // external invocations (the child process is killed only
5230    // on timeout or task drop; a future iteration can plumb a
5231    // CancellationToken through invoke_with_emitter).
5232    {
5233        // If a chat session is already registered for this id (the
5234        // typical proxy shape: host → agents.chat → supervised agent
5235        // → agents.invoke_external with the same session_id), DO NOT
5236        // overwrite it. The existing entry owns the routing to the
5237        // original host; clobbering it with our caller's client_id
5238        // would send streaming events to the proxying agent instead
5239        // of back to the host that issued agents.chat. Only register
5240        // a fresh entry when the slot is empty (a direct
5241        // host-to-invoke_external call without a prior agents.chat).
5242        let mut chats = state.chat_sessions.lock().await;
5243        chats.entry(session_id.clone()).or_insert_with(|| {
5244            let created_at = std::time::SystemTime::now()
5245                .duration_since(std::time::UNIX_EPOCH)
5246                .map(|d| d.as_secs())
5247                .unwrap_or(0);
5248            crate::session::ChatSession {
5249                agent_id: id.clone(),
5250                host_client_id: host_session.client_id.clone(),
5251                created_at,
5252            }
5253        });
5254    }
5255
5256    // Single drain task pulls StreamEvents off an unbounded
5257    // channel and serializes WS sends to the host. Per-event
5258    // tokio::spawn would let sends race (which token arrives
5259    // first depends on lock acquisition order). The channel
5260    // is unbounded because claude's event volume is bounded
5261    // by user turn count — typically <50 events per invocation.
5262    use tokio::sync::mpsc;
5263    let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
5264
5265    let drain_state = state.clone();
5266    let drain_session_id = session_id.clone();
5267    let drain_agent_id = id.clone();
5268    tokio::spawn(async move {
5269        while let Some(event) = rx.recv().await {
5270            emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
5271        }
5272    });
5273
5274    let emitter_tx = tx.clone();
5275    let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
5276        // Send failure (rx dropped) means the drain task has
5277        // exited — usually because the host disconnected. The
5278        // runner will keep going; let it finish so the audit
5279        // log captures the full result.
5280        let _ = emitter_tx.send(event);
5281    });
5282
5283    // Run the invocation in a separate task so this handler
5284    // can return the ack right away. The runner's child-process
5285    // future owns the spawn lifetime; if the host disconnects
5286    // mid-stream, the runner still completes (its events fall
5287    // on the floor at the drain layer) so the audit log lands.
5288    let spawn_state = state.clone();
5289    let spawn_session_id = session_id.clone();
5290    let spawn_id = id.clone();
5291    let spawn_task = task.clone();
5292    let spawn_options = options_value.clone();
5293    tokio::spawn(async move {
5294        let outcome =
5295            car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
5296                .await;
5297        drop(tx); // signal drain task to exit after queue empties
5298
5299        // Synthesize a terminal agents.chat.event so the host's
5300        // bubble finalizes. The runner doesn't emit a "done" event
5301        // itself — the result is the aggregate InvokeResult. We
5302        // translate it here.
5303        let terminal_params: Value;
5304        let result_value: Value;
5305        match outcome {
5306            Ok(res) => {
5307                // Pack the metadata into `finish_reason` as a
5308                // human-readable summary so the host's existing
5309                // ChatEvent decoder surfaces it without a schema
5310                // change. Hosts that want structured data can
5311                // re-issue `agents.invoke_external` with
5312                // `stream: false` and read the InvokeResult.
5313                let mut parts: Vec<String> = Vec::new();
5314                if res.turns > 0 {
5315                    parts.push(format!(
5316                        "{} turn{}",
5317                        res.turns,
5318                        if res.turns == 1 { "" } else { "s" }
5319                    ));
5320                }
5321                if res.tool_calls > 0 {
5322                    parts.push(format!(
5323                        "{} tool{}",
5324                        res.tool_calls,
5325                        if res.tool_calls == 1 { "" } else { "s" }
5326                    ));
5327                }
5328                if res.duration_ms > 0 {
5329                    parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
5330                }
5331                let summary = if parts.is_empty() {
5332                    "stop".to_string()
5333                } else {
5334                    parts.join(" · ")
5335                };
5336                if res.is_error {
5337                    terminal_params = serde_json::json!({
5338                        "session_id": spawn_session_id,
5339                        "agent_id": spawn_id,
5340                        "kind": "error",
5341                        "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
5342                    });
5343                } else {
5344                    terminal_params = serde_json::json!({
5345                        "session_id": spawn_session_id,
5346                        "agent_id": spawn_id,
5347                        "kind": "done",
5348                        "finish_reason": summary,
5349                    });
5350                }
5351                result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
5352            }
5353            Err(e) => {
5354                let message = format!("{e}");
5355                terminal_params = serde_json::json!({
5356                    "session_id": spawn_session_id,
5357                    "agent_id": spawn_id,
5358                    "kind": "error",
5359                    "error": message.clone(),
5360                });
5361                result_value = serde_json::json!({ "is_error": true, "error": message });
5362            }
5363        }
5364        send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
5365        spawn_state
5366            .chat_sessions
5367            .lock()
5368            .await
5369            .remove(&spawn_session_id);
5370        append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
5371    });
5372
5373    Ok(serde_json::json!({
5374        "accepted": true,
5375        "session_id": session_id,
5376    }))
5377}
5378
5379/// Translate one [`StreamEvent`] from the running external CLI
5380/// into an `agents.chat.event` notification on the originating
5381/// host's WS. Same wire shape supervised agents emit, so host
5382/// UIs render both kinds with one decoder.
5383///
5384/// Mapping:
5385/// - `Assistant` events with `text` content blocks → `kind: "token"`
5386///   per text block. Each block carries the full text the
5387///   assistant emitted in that turn (claude doesn't expose
5388///   word-level deltas via stream-json — it emits per-turn or
5389///   per-content-block chunks).
5390/// - `Assistant` events with `tool_use` blocks → `kind: "tool_call"`
5391///   per block (tool name in `detail`).
5392/// - `System` / `User` / `Result` / others → dropped (Result's
5393///   metadata is folded into the terminal `done` event the
5394///   outer task emits when the invocation finishes).
5395async fn emit_external_chat_event(
5396    state: &Arc<ServerState>,
5397    session_id: &str,
5398    agent_id: &str,
5399    event: car_external_agents::StreamEvent,
5400) {
5401    use car_external_agents::StreamEvent;
5402    match event {
5403        StreamEvent::Assistant(a) => {
5404            if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
5405                for block in content {
5406                    let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
5407                    match block_type {
5408                        "text" => {
5409                            if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
5410                                if !text.is_empty() {
5411                                    let params = serde_json::json!({
5412                                        "session_id": session_id,
5413                                        "agent_id": agent_id,
5414                                        "kind": "token",
5415                                        "delta": text,
5416                                    });
5417                                    send_external_chat_frame(state, session_id, params).await;
5418                                }
5419                            }
5420                        }
5421                        "tool_use" => {
5422                            let name = block
5423                                .get("name")
5424                                .and_then(|v| v.as_str())
5425                                .unwrap_or("(unknown tool)");
5426                            let params = serde_json::json!({
5427                                "session_id": session_id,
5428                                "agent_id": agent_id,
5429                                "kind": "tool_call",
5430                                "detail": name,
5431                            });
5432                            send_external_chat_frame(state, session_id, params).await;
5433                        }
5434                        _ => {}
5435                    }
5436                }
5437            }
5438        }
5439        _ => {
5440            // System (session id init), User (tool result echo),
5441            // Result (final aggregate — folded into terminal
5442            // `done` event by the outer task), RateLimitEvent,
5443            // Other: not surfaced to host.
5444        }
5445    }
5446}
5447
5448/// Send a single `agents.chat.event` notification to the host
5449/// session bound by `session_id`. Best-effort: a missing route
5450/// or a closed WS is silently dropped, the runner continues so
5451/// the audit log lands.
5452async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
5453    use futures::SinkExt;
5454    use tokio_tungstenite::tungstenite::Message;
5455
5456    let host_client_id = state
5457        .chat_sessions
5458        .lock()
5459        .await
5460        .get(session_id)
5461        .map(|s| s.host_client_id.clone());
5462    let Some(host_client_id) = host_client_id else {
5463        return;
5464    };
5465    let host_channel = {
5466        let sessions = state.sessions.lock().await;
5467        sessions.get(&host_client_id).map(|s| s.channel.clone())
5468    };
5469    let Some(channel) = host_channel else {
5470        return;
5471    };
5472    let frame = serde_json::json!({
5473        "jsonrpc": "2.0",
5474        "method": "agents.chat.event",
5475        "params": params,
5476    });
5477    if let Ok(text) = serde_json::to_string(&frame) {
5478        let _ = channel
5479            .write
5480            .lock()
5481            .await
5482            .send(Message::Text(text.into()))
5483            .await;
5484    }
5485}
5486
5487/// Append one JSONL audit record to `~/.car/external-agents.jsonl`.
5488/// Best-effort: a failure to open the journal must NOT fail the
5489/// invocation; the in-memory result already returned is the
5490/// authoritative answer. Logs at warn level when the write fails so
5491/// operators notice repeated failures.
5492fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5493    use std::io::Write;
5494    let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5495        Some(home) => home.join(".car"),
5496        None => return,
5497    };
5498    if std::fs::create_dir_all(&car_dir).is_err() {
5499        return;
5500    }
5501    let path = car_dir.join("external-agents.jsonl");
5502    let record = serde_json::json!({
5503        "ts": chrono::Utc::now().to_rfc3339(),
5504        "adapter_id": id,
5505        "task": task,
5506        "options": options,
5507        "result": result,
5508    });
5509    let line = match serde_json::to_string(&record) {
5510        Ok(s) => s,
5511        Err(_) => return,
5512    };
5513    if let Ok(mut f) = std::fs::OpenOptions::new()
5514        .create(true)
5515        .append(true)
5516        .open(&path)
5517    {
5518        let _ = writeln!(f, "{}", line);
5519    } else {
5520        tracing::warn!(
5521            path = %path.display(),
5522            "failed to append external-agent audit record"
5523        );
5524    }
5525}
5526
5527/// Ground-truth health check. Optional `id` param picks one tool;
5528/// without it, every detected adapter is checked. `force: true`
5529/// bypasses the 30s per-tool TTL cache. Replaces the Phase 1
5530/// credential-file shape heuristic as the load-bearing signal for
5531/// "is this tool ready to invoke."
5532async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5533    let force = req
5534        .params
5535        .get("force")
5536        .and_then(Value::as_bool)
5537        .unwrap_or(false);
5538    if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5539        let json = car_ffi_common::external_agents::health_one(id, force).await?;
5540        serde_json::from_str(&json).map_err(|e| e.to_string())
5541    } else {
5542        let json = car_ffi_common::external_agents::health(force).await?;
5543        serde_json::from_str(&json).map_err(|e| e.to_string())
5544    }
5545}
5546
5547// ---------------------------------------------------------------------------
5548// agents.chat — unified chat surface (docs/proposals/agent-chat-surface.md)
5549// ---------------------------------------------------------------------------
5550//
// Host calls `agents.chat { agent_id, prompt, session_id?, stream?, voice_input? }`.
// The server looks up the target agent's attached WS connection and
// reverse-calls `agent.chat { session_id, prompt, stream, context }` on it
// (same pattern as `tools.execute`). It returns once the agent acks.
// The agent then streams `agent.chat.event` notifications back. The
// dispatcher intercepts them (see `try_forward_agent_chat_event`) and
// rewrites them as `agents.chat.event` notifications on the originating
// host's channel.
5559
/// Seconds `agents.chat` waits for the target agent to acknowledge the
/// forwarded `agent.chat` request before failing with an error.
///
/// This bounds only the "prompt received and accepted" handshake. The
/// streamed tokens arrive later as separate notifications and are not
/// subject to it. Five seconds leaves ample headroom for a local IPC ack.
const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
5565
5566/// `agents.chat` — host issues a chat turn to a named agent. Returns
5567/// `{ accepted: true, session_id }` once the agent acks; streamed
5568/// tokens arrive on the host's channel as `agents.chat.event`
5569/// notifications keyed by the same `session_id`.
5570async fn handle_agents_chat(
5571    req: &JsonRpcMessage,
5572    state: &Arc<ServerState>,
5573    host_session: &Arc<crate::session::ClientSession>,
5574) -> Result<Value, String> {
5575    use futures::SinkExt;
5576    use tokio::sync::oneshot;
5577    use tokio_tungstenite::tungstenite::Message;
5578
5579    let agent_id = req
5580        .params
5581        .get("agent_id")
5582        .and_then(Value::as_str)
5583        .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
5584        .to_string();
5585    let prompt = req
5586        .params
5587        .get("prompt")
5588        .and_then(Value::as_str)
5589        .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
5590        .to_string();
5591    let session_id = req
5592        .params
5593        .get("session_id")
5594        .and_then(Value::as_str)
5595        .map(str::to_string)
5596        .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
5597    let stream = req
5598        .params
5599        .get("stream")
5600        .and_then(Value::as_bool)
5601        .unwrap_or(true);
5602    let voice_input = req
5603        .params
5604        .get("voice_input")
5605        .and_then(Value::as_bool)
5606        .unwrap_or(false);
5607
5608    // Resolve the agent's attached WS channel via `attached_agents` →
5609    // `sessions` → `channel`. Both lookups must hit; a missing entry on
5610    // either side means the agent is registered in `agents.json` but
5611    // hasn't `session.auth`'d (or has disconnected), so refuse with a
5612    // structured error rather than silently parking the chat.
5613    let agent_client_id = state
5614        .attached_agents
5615        .lock()
5616        .await
5617        .get(&agent_id)
5618        .cloned()
5619        .ok_or_else(|| {
5620            format!(
5621                "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
5622                agent_id
5623            )
5624        })?;
5625    let agent_channel = {
5626        let sessions = state.sessions.lock().await;
5627        sessions
5628            .get(&agent_client_id)
5629            .map(|s| s.channel.clone())
5630            .ok_or_else(|| {
5631                format!(
5632                    "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
5633                    agent_id, agent_client_id
5634                )
5635            })?
5636    };
5637
5638    // Register the chat session BEFORE sending the reverse call so any
5639    // `agent.chat.event` notifications the agent sends as part of
5640    // accepting the chat (e.g. an immediate `token` delta) route
5641    // correctly. Indexed by session_id so the notification interceptor
5642    // can locate the originating host without scanning.
5643    {
5644        let created_at = std::time::SystemTime::now()
5645            .duration_since(std::time::UNIX_EPOCH)
5646            .map(|d| d.as_secs())
5647            .unwrap_or(0);
5648        state.chat_sessions.lock().await.insert(
5649            session_id.clone(),
5650            crate::session::ChatSession {
5651                agent_id: agent_id.clone(),
5652                host_client_id: host_session.client_id.clone(),
5653                created_at,
5654            },
5655        );
5656    }
5657
5658    // Reverse-callback: register a oneshot for the ack, send the
5659    // `agent.chat` JSON-RPC request on the agent's channel, await up
5660    // to AGENT_CHAT_ACK_TIMEOUT_SECS. Uses the same `pending` map the
5661    // tool-callback path uses (`WsToolExecutor`) — the dispatcher's
5662    // response demuxer at the top of `run_dispatch` already routes
5663    // `result` / `error` frames keyed by request id back through it.
5664    let request_id = agent_channel.next_request_id();
5665    let (tx, rx) = oneshot::channel();
5666    agent_channel
5667        .pending
5668        .lock()
5669        .await
5670        .insert(request_id.clone(), tx);
5671
5672    let rpc_request = serde_json::json!({
5673        "jsonrpc": "2.0",
5674        "method": "agent.chat",
5675        "params": {
5676            "session_id": session_id,
5677            "prompt": prompt,
5678            "stream": stream,
5679            "context": {
5680                "host_client_id": host_session.client_id,
5681                "voice_input": voice_input,
5682            },
5683        },
5684        "id": request_id,
5685    });
5686    let msg = Message::Text(
5687        serde_json::to_string(&rpc_request)
5688            .map_err(|e| e.to_string())?
5689            .into(),
5690    );
5691    if let Err(e) = agent_channel.write.lock().await.send(msg).await {
5692        // Send failed — drop the pending waiter and the chat session
5693        // entry so a retry can take a fresh session_id without
5694        // colliding.
5695        agent_channel.pending.lock().await.remove(&request_id);
5696        state.chat_sessions.lock().await.remove(&session_id);
5697        return Err(format!(
5698            "failed to deliver agent.chat to `{}`: {}",
5699            agent_id, e
5700        ));
5701    }
5702
5703    // Await the agent's ack. The dispatcher's response demuxer routes
5704    // the result/error back via the oneshot. Timeout means the agent
5705    // is alive but unresponsive — clean up routing state and surface a
5706    // structured error so the host UI doesn't hang.
5707    let ack = match tokio::time::timeout(
5708        std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
5709        rx,
5710    )
5711    .await
5712    {
5713        Ok(Ok(resp)) => resp,
5714        Ok(Err(_)) => {
5715            // Channel closed — agent disconnected mid-call.
5716            state.chat_sessions.lock().await.remove(&session_id);
5717            return Err(format!(
5718                "agent `{}` disconnected before acking agents.chat",
5719                agent_id
5720            ));
5721        }
5722        Err(_) => {
5723            // Timeout — agent didn't respond in time. Don't keep the
5724            // chat session around: any later events from the agent
5725            // would route to a host that already returned an error.
5726            agent_channel.pending.lock().await.remove(&request_id);
5727            state.chat_sessions.lock().await.remove(&session_id);
5728            return Err(format!(
5729                "agent `{}` did not ack agents.chat within {}s",
5730                agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
5731            ));
5732        }
5733    };
5734
5735    if let Some(err) = ack.error {
5736        // Agent explicitly rejected — drop the session and propagate.
5737        state.chat_sessions.lock().await.remove(&session_id);
5738        return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
5739    }
5740
5741    Ok(serde_json::json!({
5742        "accepted": true,
5743        "session_id": session_id,
5744    }))
5745}
5746
5747/// `agents.chat.cancel` — host aborts an in-flight chat. Forwards
5748/// `agent.chat.cancel` to the bound agent so the agent can short-
5749/// circuit its inference stream + free upstream resources
5750/// (`inference.stream.cancel`). The chat session is dropped from
5751/// routing state immediately whether or not the agent acks the cancel
5752/// — further `agent.chat.event` notifications for this session_id
5753/// fall on the floor by design.
5754async fn handle_agents_chat_cancel(
5755    req: &JsonRpcMessage,
5756    state: &Arc<ServerState>,
5757) -> Result<Value, String> {
5758    use futures::SinkExt;
5759    use tokio_tungstenite::tungstenite::Message;
5760
5761    let session_id = req
5762        .params
5763        .get("session_id")
5764        .and_then(Value::as_str)
5765        .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
5766        .to_string();
5767
5768    let chat = state.chat_sessions.lock().await.remove(&session_id);
5769    let chat = match chat {
5770        Some(c) => c,
5771        None => {
5772            // Already cancelled or never existed — idempotent.
5773            return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
5774        }
5775    };
5776
5777    // Best-effort fire-and-forget to the agent. We've already removed
5778    // the routing entry, so no need to await any agent response.
5779    let agent_client_id = state
5780        .attached_agents
5781        .lock()
5782        .await
5783        .get(&chat.agent_id)
5784        .cloned();
5785    if let Some(client_id) = agent_client_id {
5786        let channel_opt = {
5787            let sessions = state.sessions.lock().await;
5788            sessions.get(&client_id).map(|s| s.channel.clone())
5789        };
5790        if let Some(channel) = channel_opt {
5791            let notification = serde_json::json!({
5792                "jsonrpc": "2.0",
5793                "method": "agent.chat.cancel",
5794                "params": { "session_id": session_id },
5795            });
5796            if let Ok(text) = serde_json::to_string(&notification) {
5797                let _ = channel
5798                    .write
5799                    .lock()
5800                    .await
5801                    .send(Message::Text(text.into()))
5802                    .await;
5803            }
5804        }
5805    }
5806
5807    Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
5808}
5809
5810/// Forward an `agent.chat.event` notification from an agent's
5811/// connection to the originating host's connection, rewritten as an
5812/// `agents.chat.event` notification. Returns `true` if the inbound
5813/// frame was a chat-event we routed (so the dispatcher can `continue`
5814/// past the normal method dispatch and skip the wasted "unknown
5815/// method" response), `false` otherwise.
5816///
5817/// Terminal events (`kind: "done"` / `"error"`) also drop the routing
5818/// entry from `state.chat_sessions` so a later stray notification can
5819/// be rejected as orphaned without leaking memory.
5820pub(crate) async fn try_forward_agent_chat_event(
5821    parsed: &JsonRpcMessage,
5822    state: &Arc<ServerState>,
5823) -> bool {
5824    use futures::SinkExt;
5825    use tokio_tungstenite::tungstenite::Message;
5826
5827    // Notification predicate: method is `agent.chat.event`, id is
5828    // missing/null (per JSON-RPC, notifications have no id), and
5829    // params carry a session_id.
5830    let Some(method) = parsed.method.as_deref() else {
5831        return false;
5832    };
5833    if method != "agent.chat.event" {
5834        return false;
5835    }
5836    if !parsed.id.is_null() {
5837        // Has an id → it's a request, not a notification. Let the
5838        // normal dispatcher handle it (and reply with method-not-found).
5839        return false;
5840    }
5841    let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
5842        return false;
5843    };
5844    let session_id = session_id.to_string();
5845
5846    // Look up the routing entry. If gone (cancelled, agent dropped,
5847    // disconnect cleanup), drop the event silently — late frames from
5848    // a respawned agent for a stale session are not the host's
5849    // problem.
5850    let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
5851    let Some(chat) = chat else {
5852        return true; // recognized the method, but routing has dropped — consumed.
5853    };
5854
5855    // Pull the kind early so terminal-event cleanup runs even if the
5856    // host's send fails.
5857    let kind = parsed
5858        .params
5859        .get("kind")
5860        .and_then(Value::as_str)
5861        .unwrap_or("token")
5862        .to_string();
5863
5864    // Forward to the host. Rewrites the method name to the host-facing
5865    // form and attaches `agent_id` so the host doesn't have to remember
5866    // which agent owns each session.
5867    let host_channel = {
5868        let sessions = state.sessions.lock().await;
5869        sessions
5870            .get(&chat.host_client_id)
5871            .map(|s| s.channel.clone())
5872    };
5873    if let Some(channel) = host_channel {
5874        let mut params = parsed.params.clone();
5875        if let Some(obj) = params.as_object_mut() {
5876            obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
5877        }
5878        let forward = serde_json::json!({
5879            "jsonrpc": "2.0",
5880            "method": "agents.chat.event",
5881            "params": params,
5882        });
5883        if let Ok(text) = serde_json::to_string(&forward) {
5884            let _ = channel
5885                .write
5886                .lock()
5887                .await
5888                .send(Message::Text(text.into()))
5889                .await;
5890        }
5891    }
5892    // Else: host disconnected mid-stream. Drop the event silently +
5893    // cancel the session so the agent isn't streaming into the void.
5894
5895    // Terminal-kind cleanup. The host_channel branch above already
5896    // forwarded the terminal event; we just remove the routing entry
5897    // here so subsequent stray frames are no-ops.
5898    if matches!(kind.as_str(), "done" | "error") {
5899        state.chat_sessions.lock().await.remove(&session_id);
5900    }
5901
5902    true
5903}