Skip to main content

car_server_core/
handler.rs

1//! WebSocket connection handler — bidirectional JSON-RPC.
2//!
3//! Tool callback flow:
4//! 1. Client submits proposal via proposal.submit
5//! 2. Runtime encounters a ToolCall action
6//! 3. WsToolExecutor sends tools.execute request to client via shared write half
7//! 4. WsToolExecutor awaits response on a oneshot channel
8//! 5. Client executes tool locally, sends JSON-RPC response back
9//! 6. Handler receives the response, resolves the oneshot
10//! 7. Runtime continues execution with the tool result
11
12use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30    #[serde(default)]
31    pub jsonrpc: String,
32    #[serde(default)]
33    pub method: Option<String>,
34    #[serde(default)]
35    pub params: Value,
36    #[serde(default)]
37    pub id: Value,
38    // Response fields
39    #[serde(default)]
40    pub result: Option<Value>,
41    #[serde(default)]
42    pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47    pub jsonrpc: &'static str,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub result: Option<Value>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub error: Option<JsonRpcError>,
52    pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57    pub code: i32,
58    pub message: String,
59}
60
61impl JsonRpcResponse {
62    pub fn success(id: Value, result: Value) -> Self {
63        Self {
64            jsonrpc: "2.0",
65            result: Some(result),
66            error: None,
67            id,
68        }
69    }
70    pub fn error(id: Value, code: i32, message: &str) -> Self {
71        Self {
72            jsonrpc: "2.0",
73            result: None,
74            error: Some(JsonRpcError {
75                code,
76                message: message.to_string(),
77            }),
78            id,
79        }
80    }
81}
82
83/// Convenience wrapper for the standalone `car-server` binary: accepts
84/// the WebSocket handshake on a raw [`TcpStream`] then delegates to
85/// [`run_dispatch`]. Embedders that already have a handshake-completed
86/// `WebSocketStream` skip this and call `run_dispatch` directly.
87#[instrument(
88    name = "ws.connection",
89    skip_all,
90    fields(peer = %peer),
91)]
92pub async fn handle_connection(
93    stream: TcpStream,
94    peer: SocketAddr,
95    state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97    let ws_stream = accept_async(stream).await?;
98    let (write, read) = ws_stream.split();
99    run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102/// Convenience wrapper for the daemon-as-default Unix-socket
103/// listener. Same shape as [`handle_connection`] but accepts a
104/// `UnixStream` — used by the per-user UDS listener in
105/// `car-server::main` (default transport for FFI thin clients,
106/// since UDS is faster + permission-scoped vs localhost TCP).
107///
108/// Unix-only — `tokio::net::UnixStream` is gated on
109/// `cfg(all(unix, feature = "net"))`. On Windows the daemon binds
110/// only the TCP listener (loopback) and this entry point is
111/// compiled out; consumers must use `handle_connection` instead.
112#[cfg(unix)]
113#[instrument(
114    name = "ws.connection",
115    skip_all,
116    fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119    stream: tokio::net::UnixStream,
120    peer: String,
121    state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123    let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124    let (write, read) = ws_stream.split();
125    run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128/// Transport-neutral entry point: drives the JSON-RPC dispatch loop
129/// against an already-handshake-completed split WebSocket. Generic
130/// over the read half (any `Stream<Item = Result<Message, WsError>>`)
131/// and the write half (a [`WsSink`] — type-erased so this function
132/// doesn't templatize every downstream consumer of `WsChannel`).
133///
134/// `peer` is a free-form string ("127.0.0.1:1234" for TCP,
135/// "uds:/path/sock" for UDS, "axum:..." for embedders) — used only
136/// for tracing fields, never for dispatch logic.
137#[instrument(
138    name = "ws.dispatch",
139    skip_all,
140    fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143    mut read: R,
144    write: crate::session::WsSink,
145    peer: String,
146    state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149    R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150        + Unpin
151        + Send,
152{
153    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154    tracing::Span::current().record("client_id", &client_id.as_str());
155
156    info!("New connection from {}", peer);
157
158    let channel = Arc::new(WsChannel {
159        write: Mutex::new(write),
160        pending: Mutex::new(HashMap::new()),
161        next_id: AtomicU64::new(1),
162    });
163
164    let session = state.create_session(&client_id, channel.clone()).await;
165
166    while let Some(msg) = read.next().await {
167        let msg = msg?;
168        if msg.is_text() {
169            let text = msg.to_text()?;
170            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
171                Ok(m) => m,
172                Err(e) => {
173                    send_response(
174                        &session.channel,
175                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
176                    )
177                    .await?;
178                    continue;
179                }
180            };
181
182            // Is this a response to a pending tool callback?
183            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
184                if let Some(id_str) = parsed.id.as_str() {
185                    let mut pending = session.channel.pending.lock().await;
186                    if let Some(tx) = pending.remove(id_str) {
187                        let tool_resp = if let Some(result) = parsed.result {
188                            ToolExecuteResponse {
189                                action_id: id_str.to_string(),
190                                output: Some(result),
191                                error: None,
192                            }
193                        } else {
194                            let err_msg = parsed
195                                .error
196                                .as_ref()
197                                .and_then(|e| e.get("message"))
198                                .and_then(|m| m.as_str())
199                                .unwrap_or("unknown error")
200                                .to_string();
201                            ToolExecuteResponse {
202                                action_id: id_str.to_string(),
203                                output: None,
204                                error: Some(err_msg),
205                            }
206                        };
207                        let _ = tx.send(tool_resp);
208                        continue;
209                    }
210                }
211            }
212
213            // Otherwise it's a client request
214            if let Some(method) = &parsed.method {
215                info!(method = %method, "dispatching JSON-RPC method");
216
217                // Auth gate (Parslee-ai/car-releases#32). When the
218                // server has an auth token installed, every method
219                // other than `session.auth` is rejected on
220                // unauthenticated sessions and the connection is
221                // closed after the error response goes out. When no
222                // token is installed (default), this branch never
223                // fires — preserves pre-#32 behaviour.
224                if state.auth_token.get().is_some()
225                    && !session
226                        .authenticated
227                        .load(std::sync::atomic::Ordering::Acquire)
228                    && method != "session.auth"
229                {
230                    let resp = JsonRpcResponse::error(
231                        parsed.id.clone(),
232                        -32001,
233                        "auth required: send `session.auth` with the per-launch token \
234                         from ~/Library/Application Support/ai.parslee.car/auth-token \
235                         (macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
236                         as the first frame on this connection",
237                    );
238                    send_response(&session.channel, resp).await?;
239                    info!(client = %client_id, method = %method,
240                        "rejecting non-auth method on unauthenticated session; closing");
241                    break;
242                }
243
244                // Approval gate (audit 2026-05). High-risk methods —
245                // anything that drives macOS automation or sends
246                // messages on the user's behalf — must be acked by
247                // the user via `host.resolve_approval` before they
248                // dispatch. The gate raises an `approval.requested`
249                // host event the local UI can render approve/deny on,
250                // then parks until resolved or the configured
251                // timeout fires. Returns a JSON-RPC error and
252                // continues the dispatch loop on deny / timeout —
253                // no connection close, since the caller may want to
254                // retry with revised parameters.
255                if state.approval_gate.requires_approval(method.as_str()) {
256                    match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
257                        Ok(()) => {}
258                        Err(reason) => {
259                            let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
260                            send_response(&session.channel, resp).await?;
261                            info!(
262                                client = %client_id,
263                                method = %method,
264                                reason = %reason,
265                                "approval gate blocked dispatch"
266                            );
267                            continue;
268                        }
269                    }
270                }
271
272                // Spawn the per-method dispatch in a task so the read
273                // loop keeps reading frames. Without this, methods
274                // that trigger server-initiated `tools.execute`
275                // callbacks (`proposal.submit`, `workflow.run`,
276                // `multi.*` paths that fire a registered tool)
277                // deadlock the connection: the handler awaits the
278                // callback response on a oneshot, but the response is
279                // another frame on this same read half — which the
280                // synchronous `.await` here would prevent the loop
281                // from ever picking up. Surfaced by the
282                // `executeProposal: echo tool via JS callback` smoke
283                // (#173). Response ordering becomes id-keyed (the
284                // JSON-RPC demuxing contract) rather than
285                // arrival-ordered.
286                let session_task = session.clone();
287                let state_task = state.clone();
288                let method_owned = method.clone();
289                let parsed_task = parsed;
290                tokio::spawn(async move {
291                    let session = session_task;
292                    let state = state_task;
293                    let parsed = parsed_task;
294                    let result = match method_owned.as_str() {
295                        "session.auth" => handle_session_auth(&parsed, &session, &state).await,
296                        "session.init" => handle_session_init(&parsed, &session).await,
297                        "host.subscribe" => handle_host_subscribe(&session).await,
298                        "host.agents" => handle_host_agents(&session).await,
299                        "host.events" => handle_host_events(&parsed, &session).await,
300                        "host.approvals" => handle_host_approvals(&session).await,
301                        "host.register_agent" => {
302                            handle_host_register_agent(&parsed, &session).await
303                        }
304                        "host.unregister_agent" => {
305                            handle_host_unregister_agent(&parsed, &session).await
306                        }
307                        "host.set_status" => handle_host_set_status(&parsed, &session).await,
308                        "host.notify" => handle_host_notify(&parsed, &session).await,
309                        "host.request_approval" => {
310                            handle_host_request_approval(&parsed, &session).await
311                        }
312                        "host.resolve_approval" => {
313                            handle_host_resolve_approval(&parsed, &session).await
314                        }
315                        "tools.register" => handle_tools_register(&parsed, &session).await,
316                        "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
317                        "policy.register" => handle_policy_register(&parsed, &session).await,
318                        "session.policy.open" => handle_session_policy_open(&session).await,
319                        "session.policy.close" => {
320                            handle_session_policy_close(&parsed, &session).await
321                        }
322                        "verify" => handle_verify(&parsed, &session).await,
323                        "state.get" => handle_state_get(&parsed, &session).await,
324                        "state.set" => handle_state_set(&parsed, &session).await,
325                        "state.exists" => handle_state_exists(&parsed, &session).await,
326                        "state.keys" => handle_state_keys(&parsed, &session).await,
327                        "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
328                        "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
329                        "memory.query" => handle_memory_query(&parsed, &session).await,
330                        "memory.build_context" => {
331                            handle_memory_build_context(&parsed, &session).await
332                        }
333                        "memory.build_context_fast" => {
334                            handle_memory_build_context_fast(&parsed, &session).await
335                        }
336                        "memory.consolidate" => handle_memory_consolidate(&session).await,
337                        "memory.fact_count" => handle_memory_fact_count(&session).await,
338                        "memory.persist" => handle_memory_persist(&parsed, &session).await,
339                        "memory.load" => handle_memory_load(&parsed, &session).await,
340                        "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
341                        "skill.find" => handle_skill_find(&parsed, &session).await,
342                        "skill.report" => handle_skill_report(&parsed, &session).await,
343                        "skill.repair" => handle_skill_repair(&parsed, &session).await,
344                        "skills.ingest_distilled" => {
345                            handle_skills_ingest_distilled(&parsed, &session).await
346                        }
347                        "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
348                        "skills.domains_needing_evolution" => {
349                            handle_skills_domains_needing_evolution(&parsed, &session).await
350                        }
351                        "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
352                        "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
353                        "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
354                        "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
355                        "multi.vote" => handle_multi_vote(&parsed, &session).await,
356                        "scheduler.create" => handle_scheduler_create(&parsed),
357                        "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
358                        "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
359                        "infer" => handle_infer(&parsed, &state, &session).await,
360                        "image.generate" => handle_image_generate(&parsed, &state).await,
361                        "video.generate" => handle_video_generate(&parsed, &state).await,
362                        "embed" => handle_embed(&parsed, &state).await,
363                        "classify" => handle_classify(&parsed, &state).await,
364                        "tokenize" => handle_tokenize(&parsed, &state).await,
365                        "detokenize" => handle_detokenize(&parsed, &state).await,
366                        "rerank" => handle_rerank(&parsed, &state).await,
367                        "transcribe" => handle_transcribe(&parsed, &state).await,
368                        "synthesize" => handle_synthesize(&parsed, &state).await,
369                        "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
370                        "speech.prepare" => handle_speech_prepare(&state).await,
371                        "models.route" => handle_models_route(&parsed, &state).await,
372                        "models.stats" => handle_models_stats(&state).await,
373                        "outcomes.resolve_pending" => {
374                            handle_outcomes_resolve_pending(&parsed, &state).await
375                        }
376                        "events.count" => handle_events_count(&session).await,
377                        "events.stats" => handle_events_stats(&session).await,
378                        "events.truncate" => handle_events_truncate(&parsed, &session).await,
379                        "events.clear" => handle_events_clear(&session).await,
380                        "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
381                        "models.list" => handle_models_list(&state),
382                        "models.register" => handle_models_register(&parsed, &state).await,
383                        "models.unregister" => handle_models_unregister(&parsed, &state).await,
384                        "models.list_unified" => handle_models_list_unified(&state),
385                        "models.search" => handle_models_search(&parsed, &state),
386                        "models.upgrades" => handle_models_upgrades(&state),
387                        "models.pull" => handle_models_pull(&parsed, &state).await,
388                        "models.install" => handle_models_pull(&parsed, &state).await,
389                        "skills.distill" => handle_skills_distill(&parsed, &state).await,
390                        "skills.list" => handle_skills_list(&parsed, &session).await,
391                        "browser.run" => handle_browser_run(&parsed, &session).await,
392                        "browser.close" => handle_browser_close(&session).await,
393                        "secret.put" => handle_secret_put(&parsed),
394                        "secret.get" => handle_secret_get(&parsed),
395                        "secret.delete" => handle_secret_delete(&parsed),
396                        "secret.status" => handle_secret_status(&parsed),
397                        "secret.available" => Ok(car_ffi_common::secrets::is_available()),
398                        "permissions.status" => handle_perm_status(&parsed),
399                        "permissions.request" => handle_perm_request(&parsed),
400                        "permissions.explain" => handle_perm_explain(&parsed),
401                        "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
402                        "accounts.list" => car_ffi_common::accounts::list(),
403                        "accounts.open" => {
404                            #[derive(serde::Deserialize, Default)]
405                            struct OpenParams {
406                                #[serde(default)]
407                                account_id: Option<String>,
408                            }
409                            let p: OpenParams =
410                                serde_json::from_value(parsed.params.clone()).unwrap_or_default();
411                            car_ffi_common::accounts::open_settings(p.account_id.as_deref())
412                        }
413                        "calendar.list" => car_ffi_common::integrations::calendar_list(),
414                        "calendar.events" => handle_calendar_events(&parsed),
415                        "contacts.containers" => {
416                            car_ffi_common::integrations::contacts_containers()
417                        }
418                        "contacts.find" => handle_contacts_find(&parsed),
419                        "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
420                        "mail.inbox" => handle_mail_inbox(&parsed),
421                        "mail.send" => handle_mail_send(&parsed),
422                        "messages.services" => car_ffi_common::integrations::messages_services(),
423                        "messages.chats" => handle_messages_chats(&parsed),
424                        "messages.send" => handle_messages_send(&parsed),
425                        "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
426                        "notes.find" => handle_notes_find(&parsed),
427                        "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
428                        "reminders.items" => handle_reminders_items(&parsed),
429                        "photos.albums" => car_ffi_common::integrations::photos_albums(),
430                        "bookmarks.list" => handle_bookmarks_list(&parsed),
431                        "files.locations" => car_ffi_common::integrations::files_locations(),
432                        "keychain.status" => car_ffi_common::integrations::keychain_status(),
433                        "health.status" => car_ffi_common::health::status(),
434                        "health.sleep" => handle_health_sleep(&parsed),
435                        "health.workouts" => handle_health_workouts(&parsed),
436                        "health.activity" => handle_health_activity(&parsed),
437                        "voice.transcribe_stream.start" => {
438                            handle_voice_transcribe_stream_start(&parsed, &state, &session).await
439                        }
440                        "voice.transcribe_stream.stop" => {
441                            handle_voice_transcribe_stream_stop(&parsed, &state).await
442                        }
443                        "voice.transcribe_stream.push" => {
444                            handle_voice_transcribe_stream_push(&parsed, &state).await
445                        }
446                        "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
447                        "voice.dispatch_turn" => {
448                            handle_voice_dispatch_turn(&parsed, &state, &session).await
449                        }
450                        "voice.cancel_turn" => handle_voice_cancel_turn().await,
451                        "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
452                        "inference.register_runner" => {
453                            handle_inference_register_runner(&session).await
454                        }
455                        "inference.runner.event" => handle_inference_runner_event(&parsed).await,
456                        "inference.runner.complete" => {
457                            handle_inference_runner_complete(&parsed).await
458                        }
459                        "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
460                        "voice.providers.list" => {
461                            // Stateless: enumerates STT/TTS providers compiled into
462                            // this build. Runtime readiness (API key, permission,
463                            // model download) is reported via per-provider errors.
464                            serde_json::from_str::<serde_json::Value>(
465                                &car_voice::list_voice_providers_json(),
466                            )
467                            .map_err(|e| e.to_string())
468                        }
469                        "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
470                            .await
471                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
472                        "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
473                            .await
474                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
475                        "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
476                        "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
477                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
478                        "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
479                        "workflow.run" => handle_workflow_run(&parsed, &session).await,
480                        "workflow.verify" => handle_workflow_verify(&parsed),
481                        "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
482                        "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
483                        "meeting.list" => handle_meeting_list(&parsed),
484                        "meeting.get" => handle_meeting_get(&parsed),
485                        "registry.register" => handle_registry_register(&parsed),
486                        "registry.heartbeat" => handle_registry_heartbeat(&parsed),
487                        "registry.unregister" => handle_registry_unregister(&parsed),
488                        "registry.list" => handle_registry_list(&parsed),
489                        "registry.reap" => handle_registry_reap(&parsed),
490                        "admission.status" => handle_admission_status(&state),
491                        "a2a.start" => handle_a2a_start(&parsed, &session).await,
492                        "a2a.stop" => handle_a2a_stop(),
493                        "a2a.status" => handle_a2a_status(),
494                        "a2a.send" => handle_a2a_send(&parsed, &state).await,
495                        "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
496                        "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
497                        "a2ui.capabilities" => handle_a2ui_capabilities(&state),
498                        "a2ui.reap" => handle_a2ui_reap(&state).await,
499                        "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
500                        "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
501                        "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
502                        "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
503                        "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
504                        "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
505                        "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
506                        "automation.run_applescript" => handle_run_applescript(&parsed).await,
507                        "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
508                        "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
509                        "notifications.local" => handle_local_notification(&parsed).await,
510                        "vision.ocr" => handle_vision_ocr(&parsed).await,
511                        "agents.list" => handle_agents_list(&state).await,
512                        "agents.health" => handle_agents_health(&state).await,
513                        "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
514                    "agents.install" => handle_agents_install(&parsed, &state).await,
515                        "agents.remove" => handle_agents_remove(&parsed, &state).await,
516                        "agents.start" => handle_agents_start(&parsed, &state).await,
517                        "agents.stop" => handle_agents_stop(&parsed, &state).await,
518                        "agents.restart" => handle_agents_restart(&parsed, &state).await,
519                        "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
520                        "agents.list_external" => handle_agents_list_external(&parsed).await,
521                        "agents.detect_external" => handle_agents_detect_external(&parsed).await,
522                        "agents.health_external" => handle_agents_health_external(&parsed).await,
523                        "agents.invoke_external" => {
524                            handle_agents_invoke_external(&parsed, &state).await
525                        }
526                        // A2A v1.0 (PascalCase) + v0.3 (slash form) — both
527                        // alias to the same in-core dispatcher per
528                        // Parslee-ai/car-releases#28. Embedders that need a
529                        // custom AgentCardSource / TaskStore plug them in
530                        // via ServerStateConfig::with_a2a_card_source /
531                        // with_a2a_store before any handler runs.
532                        "message/send"
533                        | "SendMessage"
534                        | "message/stream"
535                        | "SendStreamingMessage"
536                        | "tasks/get"
537                        | "GetTask"
538                        | "tasks/list"
539                        | "ListTasks"
540                        | "tasks/cancel"
541                        | "CancelTask"
542                        | "tasks/resubscribe"
543                        | "SubscribeToTask"
544                        | "tasks/pushNotificationConfig/set"
545                        | "CreateTaskPushNotificationConfig"
546                        | "tasks/pushNotificationConfig/get"
547                        | "GetTaskPushNotificationConfig"
548                        | "tasks/pushNotificationConfig/list"
549                        | "ListTaskPushNotificationConfigs"
550                        | "tasks/pushNotificationConfig/delete"
551                        | "DeleteTaskPushNotificationConfig"
552                        | "agent/getAuthenticatedExtendedCard"
553                        | "GetExtendedAgentCard" => {
554                            handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
555                        }
556                        _ => Err(format!("unknown method: {}", method_owned)),
557                    };
558
559                    let resp = match result {
560                        Ok(value) => JsonRpcResponse::success(parsed.id, value),
561                        Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
562                    };
563                    let _ = send_response(&session.channel, resp).await;
564                });
565            }
566        } else if msg.is_close() {
567            info!("Client {} disconnected", client_id);
568            break;
569        }
570    }
571
572    session.host.unsubscribe(&client_id).await;
573    state.a2ui_subscribers.lock().await.remove(&client_id);
574
575    // Fix for MULTI-4 / WS-3: drop the session from the registry and
576    // drain any pending tool callbacks. Without this, every connection
577    // we ever accepted keeps an `Arc<ClientSession>` alive in
578    // `state.sessions`, and outstanding `oneshot::Sender`s in
579    // `session.channel.pending` outlive the closed connection until
580    // their per-call timeout (60s). Dropping the senders here causes any
581    // awaiting `recv()` in `WsToolExecutor::execute` to return
582    // `RecvError` immediately, which the existing error-handler path
583    // already maps to "callback channel closed" — same shape as the
584    // timeout path, just faster.
585    let _removed = state.remove_session(&client_id).await;
586    {
587        let mut pending = session.channel.pending.lock().await;
588        pending.clear();
589    }
590
591    Ok(())
592}
593
594async fn send_response(
595    channel: &WsChannel,
596    resp: JsonRpcResponse,
597) -> Result<(), Box<dyn std::error::Error>> {
598    use futures::SinkExt;
599    let json = serde_json::to_string(&resp)?;
600    channel
601        .write
602        .lock()
603        .await
604        .send(Message::Text(json.into()))
605        .await?;
606    Ok(())
607}
608
609// --- Request handlers ---
610
611async fn handle_host_subscribe(session: &crate::session::ClientSession) -> Result<Value, String> {
612    session
613        .host
614        .subscribe(&session.client_id, session.channel.clone())
615        .await;
616    serde_json::to_value(HostSnapshot {
617        subscribed: true,
618        agents: session.host.agents().await,
619        approvals: session.host.approvals().await,
620        events: session.host.events(50).await,
621    })
622    .map_err(|e| e.to_string())
623}
624
625async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
626    serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
627}
628
629async fn handle_host_events(
630    req: &JsonRpcMessage,
631    session: &crate::session::ClientSession,
632) -> Result<Value, String> {
633    let limit = req
634        .params
635        .get("limit")
636        .and_then(|v| v.as_u64())
637        .unwrap_or(100) as usize;
638    serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
639}
640
641async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
642    serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
643}
644
645async fn handle_a2ui_apply(
646    req: &JsonRpcMessage,
647    state: &Arc<ServerState>,
648) -> Result<Value, String> {
649    #[derive(Deserialize)]
650    struct Params {
651        #[serde(default)]
652        envelope: Option<car_a2ui::A2uiEnvelope>,
653        #[serde(default)]
654        message: Option<car_a2ui::A2uiEnvelope>,
655    }
656
657    let envelope = if req.params.get("createSurface").is_some()
658        || req.params.get("updateComponents").is_some()
659        || req.params.get("updateDataModel").is_some()
660        || req.params.get("deleteSurface").is_some()
661    {
662        serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
663            .map_err(|e| e.to_string())?
664    } else {
665        match serde_json::from_value::<Params>(req.params.clone()) {
666            Ok(params) => params
667                .envelope
668                .or(params.message)
669                .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
670            Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
671                .map_err(|e| e.to_string())?,
672        }
673    };
674
675    apply_a2ui_envelope(state, envelope, None, None).await
676}
677
678async fn handle_a2ui_ingest(
679    req: &JsonRpcMessage,
680    state: &Arc<ServerState>,
681) -> Result<Value, String> {
682    #[derive(Deserialize)]
683    #[serde(rename_all = "camelCase")]
684    struct Params {
685        #[serde(default)]
686        endpoint: Option<String>,
687        #[serde(default)]
688        a2a_endpoint: Option<String>,
689        #[serde(default)]
690        owner: Option<car_a2ui::A2uiSurfaceOwner>,
691        #[serde(default)]
692        route_auth: Option<A2aRouteAuth>,
693        #[serde(default)]
694        allow_untrusted_endpoint: bool,
695    }
696
697    let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
698        endpoint: None,
699        a2a_endpoint: None,
700        owner: None,
701        route_auth: None,
702        allow_untrusted_endpoint: false,
703    });
704    let payload = req.params.get("payload").unwrap_or(&req.params);
705    state
706        .a2ui
707        .validate_payload(payload)
708        .map_err(|e| e.to_string())?;
709    let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
710    if envelopes.is_empty() {
711        return Err("no A2UI envelopes found in payload".into());
712    }
713    let endpoint = params.endpoint.or(params.a2a_endpoint);
714    let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
715    let owner = params
716        .owner
717        .or_else(|| car_a2ui::owner_from_value(payload))
718        .map(|owner| match endpoint.clone() {
719            Some(endpoint) => owner.with_endpoint(Some(endpoint)),
720            None => owner,
721        });
722
723    let mut results = Vec::new();
724    for envelope in envelopes {
725        let value =
726            apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
727        results.push(value);
728    }
729    Ok(serde_json::json!({ "applied": results }))
730}
731
732async fn apply_a2ui_envelope(
733    state: &Arc<ServerState>,
734    envelope: car_a2ui::A2uiEnvelope,
735    owner: Option<car_a2ui::A2uiSurfaceOwner>,
736    route_auth: Option<A2aRouteAuth>,
737) -> Result<Value, String> {
738    let result = state
739        .a2ui
740        .apply_with_owner(envelope, owner)
741        .await
742        .map_err(|e| e.to_string())?;
743    update_a2ui_route_auth(state, &result, route_auth).await;
744    let kind = if result.deleted {
745        "a2ui.surface_deleted"
746    } else {
747        "a2ui.surface_updated"
748    };
749    let message = if result.deleted {
750        format!("A2UI surface {} deleted", result.surface_id)
751    } else {
752        format!("A2UI surface {} updated", result.surface_id)
753    };
754    let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
755    state
756        .host
757        .record_event(kind, None, message, payload.clone())
758        .await;
759    // Push the envelope result to every WS subscriber as an
760    // `a2ui.event` notification — Parslee-ai/car-releases#29. Late
761    // joiners catch up via `a2ui/replay` (or `a2ui.surfaces`).
762    broadcast_a2ui_event(state, kind, &payload).await;
763    serde_json::to_value(result).map_err(|e| e.to_string())
764}
765
766async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
767    use futures::SinkExt;
768    use tokio_tungstenite::tungstenite::Message;
769    let subscribers: Vec<Arc<crate::session::WsChannel>> = state
770        .a2ui_subscribers
771        .lock()
772        .await
773        .values()
774        .cloned()
775        .collect();
776    if subscribers.is_empty() {
777        return;
778    }
779    let Ok(json) = serde_json::to_string(&serde_json::json!({
780        "jsonrpc": "2.0",
781        "method": "a2ui.event",
782        "params": {
783            "kind": kind,
784            "result": result,
785        },
786    })) else {
787        return;
788    };
789    for channel in subscribers {
790        let _ = channel
791            .write
792            .lock()
793            .await
794            .send(Message::Text(json.clone().into()))
795            .await;
796    }
797}
798
799async fn update_a2ui_route_auth(
800    state: &Arc<ServerState>,
801    result: &car_a2ui::A2uiApplyResult,
802    route_auth: Option<A2aRouteAuth>,
803) {
804    let mut auth = state.a2ui_route_auth.lock().await;
805    if result.deleted {
806        auth.remove(&result.surface_id);
807        return;
808    }
809
810    let has_route_endpoint = result
811        .surface
812        .as_ref()
813        .and_then(|surface| surface.owner.as_ref())
814        .and_then(|owner| owner.endpoint.as_ref())
815        .is_some();
816    match (has_route_endpoint, route_auth) {
817        (true, Some(route_auth)) => {
818            auth.insert(result.surface_id.clone(), route_auth);
819        }
820        _ => {
821            auth.remove(&result.surface_id);
822        }
823    }
824}
825
826fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
827    serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
828}
829
830async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
831    let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
832    if !removed.is_empty() {
833        let mut auth = state.a2ui_route_auth.lock().await;
834        for surface_id in &removed {
835            auth.remove(surface_id);
836        }
837    }
838    Ok(serde_json::json!({ "removed": removed }))
839}
840
841async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
842    serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
843}
844
845async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
846    let surface_id = req
847        .params
848        .get("surface_id")
849        .or_else(|| req.params.get("surfaceId"))
850        .and_then(Value::as_str)
851        .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
852    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
853}
854
855/// `a2ui/subscribe` — opt this WS connection into `a2ui.event`
856/// notifications. Subscribers receive every `apply_a2ui_envelope`
857/// result for as long as they're connected; the cleanup hook in
858/// `run_dispatch` removes them on disconnect. Closes
859/// Parslee-ai/car-releases#29.
860async fn handle_a2ui_subscribe(
861    session: &crate::session::ClientSession,
862    state: &Arc<ServerState>,
863) -> Result<Value, String> {
864    state
865        .a2ui_subscribers
866        .lock()
867        .await
868        .insert(session.client_id.clone(), session.channel.clone());
869    Ok(serde_json::json!({ "subscribed": true }))
870}
871
872/// `a2ui/unsubscribe` — opt out of `a2ui.event` notifications.
873/// Idempotent: returns `{ subscribed: false }` regardless of prior
874/// state.
875async fn handle_a2ui_unsubscribe(
876    session: &crate::session::ClientSession,
877    state: &Arc<ServerState>,
878) -> Result<Value, String> {
879    state
880        .a2ui_subscribers
881        .lock()
882        .await
883        .remove(&session.client_id);
884    Ok(serde_json::json!({ "subscribed": false }))
885}
886
887/// `a2ui/replay` — fetch the current state of one surface. Intended
888/// for late joiners and reconnect: a client calls `subscribe`, then
889/// `replay` once per surface it's tracking, and from then on
890/// notifications keep it in sync. Equivalent to `a2ui.get` on the
891/// surface store; lives in the subscribe namespace for
892/// discoverability.
893async fn handle_a2ui_replay(
894    req: &JsonRpcMessage,
895    state: &Arc<ServerState>,
896) -> Result<Value, String> {
897    let surface_id = req
898        .params
899        .get("surface_id")
900        .or_else(|| req.params.get("surfaceId"))
901        .and_then(Value::as_str)
902        .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
903    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
904}
905
906async fn handle_a2ui_action(
907    req: &JsonRpcMessage,
908    state: &Arc<ServerState>,
909) -> Result<Value, String> {
910    let action: car_a2ui::ClientAction =
911        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
912    let owner = state.a2ui.owner(&action.surface_id).await;
913    let route = route_a2ui_action(state, &action, owner.clone()).await;
914    let payload = serde_json::json!({
915        "action": action,
916        "owner": owner,
917        "route": route,
918    });
919    let event = state
920        .host
921        .record_event(
922            "a2ui.action",
923            None,
924            format!(
925                "A2UI action {} from {}",
926                action.name, action.source_component_id
927            ),
928            payload,
929        )
930        .await;
931    Ok(serde_json::json!({
932        "event": event,
933        "route": route,
934    }))
935}
936
937/// `a2ui.render_report` — renderer-emitted telemetry envelope
938/// (Parslee-ai/car#180). Fire-and-forget; we record an event in
939/// the host log (so dev tools and the conversation log see it) and
940/// broadcast as `a2ui.event { kind: "a2ui.render_report", result }`
941/// to every WS subscriber. The improvement agent reads the report
942/// and decides whether to issue a follow-up `patchComponents`.
943async fn handle_a2ui_render_report(
944    req: &JsonRpcMessage,
945    state: &Arc<ServerState>,
946) -> Result<Value, String> {
947    // Parse into the typed struct to enforce the schema; we
948    // re-serialize for the event/broadcast payload so downstream
949    // consumers don't have to defensively re-validate.
950    let report: car_a2ui::RenderReport =
951        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
952    let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
953    let kind = "a2ui.render_report";
954    let message = format!("A2UI render report for surface {}", report.surface_id);
955    let event = state
956        .host
957        .record_event(kind, None, message, payload.clone())
958        .await;
959    broadcast_a2ui_event(state, kind, &payload).await;
960
961    // Hand the report to the in-process UI-improvement agent. The
962    // agent is sync but cheap; we await the surface lookup before
963    // calling it so the strategies see the same surface state the
964    // renderer saw at report-emit time. Best-effort: surface lookup
965    // misses (the surface might have been deleted between
966    // report-emit and report-receive) are logged and skipped, never
967    // surfaced as JSON-RPC errors — render_report is fire-and-forget.
968    if let Some(surface) = state.a2ui.get(&report.surface_id).await {
969        // Iteration budget — runaway-loop backstop. try_consume
970        // claims the slot atomically; if the surface is already at
971        // the cap, we short-circuit before consulting the agent.
972        // The slot stays consumed only if the patch actually
973        // applies — failure paths refund.
974        if !state.ui_agent_budget.try_consume(&report.surface_id) {
975            tracing::warn!(
976                surface_id = %report.surface_id,
977                count = state.ui_agent_budget.count(&report.surface_id),
978                max = state.ui_agent_budget.max(),
979                "ui-agent iteration budget exhausted; skipping agent invocation"
980            );
981            return Ok(serde_json::json!({ "event": event }));
982        }
983        // From here on, every non-applied branch must `refund` the
984        // slot we just claimed. Only the successful-apply branch
985        // keeps it consumed.
986        match state.ui_agent.on_render_report(&report, &surface) {
987            car_ui_agent::Decision::Patch {
988                envelope,
989                strategy_id,
990                patch_hash,
991                elapsed_ns,
992            } => {
993                // Runtime-side convergence monitor — neo's deferred
994                // ask. The agent's no-double-patch guard catches
995                // same-sequence repeats; this catches A→B→A across
996                // sequences by tracking recent patch hashes per
997                // surface. When a proposal would repeat one in the
998                // window, drop it; the loop waits for the next
999                // signature change.
1000                if !state
1001                    .ui_agent_oscillation
1002                    .check_and_record(&report.surface_id, patch_hash)
1003                {
1004                    tracing::warn!(
1005                        surface_id = %report.surface_id,
1006                        strategy = %strategy_id,
1007                        patch_hash,
1008                        "ui-agent oscillation detected; suppressing patch"
1009                    );
1010                    // Suppressed patch never applied → release the
1011                    // budget slot we claimed above.
1012                    state.ui_agent_budget.refund(&report.surface_id);
1013                    return Ok(serde_json::json!({ "event": event }));
1014                }
1015                let a2ui_envelope = car_a2ui::A2uiEnvelope {
1016                    patch_components: Some(envelope),
1017                    ..Default::default()
1018                };
1019                if let Err(e) =
1020                    apply_a2ui_envelope(state, a2ui_envelope, None, None).await
1021                {
1022                    tracing::warn!(
1023                        surface_id = %report.surface_id,
1024                        strategy = %strategy_id,
1025                        patch_hash,
1026                        elapsed_ns,
1027                        error = %e,
1028                        "ui-agent patch apply failed",
1029                    );
1030                    // Apply failed → release the budget slot.
1031                    state.ui_agent_budget.refund(&report.surface_id);
1032                } else {
1033                    tracing::debug!(
1034                        surface_id = %report.surface_id,
1035                        strategy = %strategy_id,
1036                        patch_hash,
1037                        elapsed_ns,
1038                        iteration = state.ui_agent_budget.count(&report.surface_id),
1039                        "ui-agent patch applied",
1040                    );
1041                    // Memgine trace: one Conversation node per
1042                    // successful patch, tagged "ui-agent/<surface>".
1043                    // Spreading activation can surface these on
1044                    // future renders of related surfaces. Spawned
1045                    // off the render-report hot path — ingest walks
1046                    // the graph for spreading activation and can
1047                    // take real time; we don't want two surfaces
1048                    // churning patches to serialize through the
1049                    // memgine mutex behind the WS handler.
1050                    if let Some(memgine) = state.shared_memgine.clone() {
1051                        let speaker = format!("ui-agent/{}", report.surface_id);
1052                        let text = format!("strategy applied: {}", strategy_id);
1053                        tokio::spawn(async move {
1054                            let mut guard = memgine.lock().await;
1055                            guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1056                        });
1057                    }
1058                }
1059            }
1060            car_ui_agent::Decision::StableNoChange => {
1061                // No patch this round → release the slot.
1062                state.ui_agent_budget.refund(&report.surface_id);
1063            }
1064            car_ui_agent::Decision::HardStop { reason } => {
1065                state.ui_agent_budget.refund(&report.surface_id);
1066                // Renderer painted unknown components — contract
1067                // violation between server and renderer. Per
1068                // types.rs docs: "loud failure" + "MUST pause."
1069                // `error!` not `warn!` so it surfaces in production
1070                // logs at the right severity.
1071                tracing::error!(
1072                    surface_id = %report.surface_id,
1073                    reason = %reason,
1074                    "ui-agent hard-stopped improvement loop",
1075                );
1076            }
1077        }
1078    } else {
1079        tracing::debug!(
1080            surface_id = %report.surface_id,
1081            "ui-agent skipped — surface not found in store",
1082        );
1083    }
1084
1085    Ok(serde_json::json!({ "event": event }))
1086}
1087
1088async fn route_a2ui_action(
1089    state: &Arc<ServerState>,
1090    action: &car_a2ui::ClientAction,
1091    owner: Option<car_a2ui::A2uiSurfaceOwner>,
1092) -> Value {
1093    let Some(owner) = owner else {
1094        return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1095    };
1096    if owner.kind != "a2a" {
1097        return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1098    }
1099    let Some(endpoint) = owner.endpoint.clone() else {
1100        return serde_json::json!({
1101            "delivered": false,
1102            "reason": "surface owner has no endpoint",
1103            "owner": owner
1104        });
1105    };
1106
1107    let message = car_a2a::Message {
1108        message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1109        role: car_a2a::MessageRole::User,
1110        parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1111            data: serde_json::json!({
1112                "a2uiAction": action,
1113            }),
1114            metadata: Default::default(),
1115        })],
1116        task_id: owner.task_id.clone(),
1117        context_id: owner.context_id.clone(),
1118        metadata: Default::default(),
1119    };
1120
1121    let auth = state
1122        .a2ui_route_auth
1123        .lock()
1124        .await
1125        .get(&action.surface_id)
1126        .cloned()
1127        .map(client_auth_from_route_auth)
1128        .unwrap_or(car_a2a::ClientAuth::None);
1129
1130    match car_a2a::A2aClient::new(endpoint.clone())
1131        .with_auth(auth)
1132        .send_message(message, false)
1133        .await
1134    {
1135        Ok(result) => serde_json::json!({
1136            "delivered": true,
1137            "owner": owner,
1138            "endpoint": endpoint,
1139            "result": result,
1140        }),
1141        Err(error) => serde_json::json!({
1142            "delivered": false,
1143            "owner": owner,
1144            "endpoint": endpoint,
1145            "error": error.to_string(),
1146        }),
1147    }
1148}
1149
1150fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1151    match auth {
1152        A2aRouteAuth::None => car_a2a::ClientAuth::None,
1153        A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1154        A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1155    }
1156}
1157
1158fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1159    let endpoint = endpoint?;
1160    if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1161        Some(endpoint)
1162    } else {
1163        None
1164    }
1165}
1166
/// Reports whether `endpoint` is a plain-`http://` URL whose host is a
/// loopback name: `localhost`, `127.0.0.1`, or `[::1]`, optionally
/// followed by `:<digits>`.
///
/// The check is made on the URL's authority component (everything
/// between `http://` and the first `/`, `?`, or `#`), not on a string
/// prefix. A prefix test accepts `http://localhost:80@evil.example/`,
/// where `localhost:80` is userinfo and the real host is remote. Here
/// the whole authority must be the loopback host plus an all-digit
/// port, so userinfo, look-alike hosts, and non-numeric ports are
/// rejected.
///
/// The scheme and host are matched case-sensitively. An empty port
/// (`http://localhost:`) is accepted.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };
    // The authority ends at the first path, query, or fragment delimiter.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(rest);

    LOOPBACK_HOSTS
        .iter()
        .any(|host| match authority.strip_prefix(*host) {
            // Host only, no port.
            Some("") => true,
            // Anything after the host must be `:` plus digits; this is
            // what rules out `localhost:80@evil` and `localhost.evil`.
            Some(tail) => tail
                .strip_prefix(':')
                .map_or(false, |port| port.bytes().all(|b| b.is_ascii_digit())),
            None => false,
        })
}
1178
1179async fn handle_host_register_agent(
1180    req: &JsonRpcMessage,
1181    session: &crate::session::ClientSession,
1182) -> Result<Value, String> {
1183    let request: RegisterHostAgentRequest =
1184        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1185    serde_json::to_value(
1186        session
1187            .host
1188            .register_agent(&session.client_id, request)
1189            .await?,
1190    )
1191    .map_err(|e| e.to_string())
1192}
1193
1194async fn handle_host_unregister_agent(
1195    req: &JsonRpcMessage,
1196    session: &crate::session::ClientSession,
1197) -> Result<Value, String> {
1198    let agent_id = req
1199        .params
1200        .get("agent_id")
1201        .and_then(|v| v.as_str())
1202        .ok_or("missing agent_id")?;
1203    session
1204        .host
1205        .unregister_agent(&session.client_id, agent_id)
1206        .await?;
1207    Ok(serde_json::json!({"ok": true}))
1208}
1209
1210async fn handle_host_set_status(
1211    req: &JsonRpcMessage,
1212    session: &crate::session::ClientSession,
1213) -> Result<Value, String> {
1214    let request: SetHostAgentStatusRequest =
1215        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1216    serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1217        .map_err(|e| e.to_string())
1218}
1219
1220async fn handle_host_notify(
1221    req: &JsonRpcMessage,
1222    session: &crate::session::ClientSession,
1223) -> Result<Value, String> {
1224    let kind = req
1225        .params
1226        .get("kind")
1227        .and_then(|v| v.as_str())
1228        .unwrap_or("host.notification");
1229    let agent_id = req
1230        .params
1231        .get("agent_id")
1232        .and_then(|v| v.as_str())
1233        .map(str::to_string);
1234    let message = req
1235        .params
1236        .get("message")
1237        .and_then(|v| v.as_str())
1238        .unwrap_or("");
1239    let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1240    serde_json::to_value(
1241        session
1242            .host
1243            .record_event(kind, agent_id, message, payload)
1244            .await,
1245    )
1246    .map_err(|e| e.to_string())
1247}
1248
1249async fn handle_host_request_approval(
1250    req: &JsonRpcMessage,
1251    session: &crate::session::ClientSession,
1252) -> Result<Value, String> {
1253    let request: CreateHostApprovalRequest =
1254        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1255    if let Some(agent_id) = &request.agent_id {
1256        // Best-effort. If the caller doesn't own this agent the
1257        // ACL added 2026-05 will refuse the status update — that
1258        // is the correct semantics; we still want the approval row
1259        // itself to land so the UI can render the request.
1260        let _ = session
1261            .host
1262            .set_status(
1263                &session.client_id,
1264                SetHostAgentStatusRequest {
1265                    agent_id: agent_id.clone(),
1266                    status: HostAgentStatus::WaitingForApproval,
1267                    current_task: None,
1268                    message: Some("Waiting for approval".to_string()),
1269                    payload: Value::Null,
1270                },
1271            )
1272            .await;
1273    }
1274    serde_json::to_value(
1275        session
1276            .host
1277            .create_approval(Some(&session.client_id), request)
1278            .await?,
1279    )
1280    .map_err(|e| e.to_string())
1281}
1282
1283async fn handle_host_resolve_approval(
1284    req: &JsonRpcMessage,
1285    session: &crate::session::ClientSession,
1286) -> Result<Value, String> {
1287    let request: ResolveHostApprovalRequest =
1288        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1289    serde_json::to_value(
1290        session
1291            .host
1292            .resolve_approval(&session.client_id, request)
1293            .await?,
1294    )
1295    .map_err(|e| e.to_string())
1296}
1297
1298/// `session.auth` — present the per-launch token to unlock the
1299/// connection. When `state.auth_token` is unset, this method is a
1300/// no-op success (auth is disabled). When set, the supplied token
1301/// must equal it (constant-time comparison) — a successful auth
1302/// flips `session.authenticated` to `true` so subsequent methods
1303/// pass the gate. Wrong token returns an error AND leaves the
1304/// session unauthenticated; the dispatcher loop's gate then closes
1305/// the connection on the next non-auth method.
1306///
1307/// Closes Parslee-ai/car-releases#32.
1308async fn handle_session_auth(
1309    req: &JsonRpcMessage,
1310    session: &crate::session::ClientSession,
1311    state: &Arc<ServerState>,
1312) -> Result<Value, String> {
1313    let supplied = req
1314        .params
1315        .get("token")
1316        .and_then(Value::as_str)
1317        .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1318    // #169: optional `agent_id` binds the WS connection to a
1319    // supervised lifecycle agent. When present, the supplied token
1320    // must equal the per-agent token the supervisor minted at upsert
1321    // (NOT the daemon-wide auth token). When absent, fall back to
1322    // the daemon-wide token — preserves the legacy unbound-token
1323    // path for browser/host/CLI clients.
1324    let agent_id = req
1325        .params
1326        .get("agent_id")
1327        .and_then(Value::as_str)
1328        .map(str::to_string);
1329
1330    if let Some(id) = agent_id {
1331        let supervisor = state.supervisor()?;
1332        if !supervisor.validate_agent_token(&id, supplied).await {
1333            return Err(format!(
1334                "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1335            ));
1336        }
1337        // Single-claim: only one connection at a time per
1338        // agent_id. A second claim is rejected so the daemon-side
1339        // per-agent state stays unambiguous.
1340        {
1341            let mut attached = state.attached_agents.lock().await;
1342            if let Some(prior) = attached.get(&id) {
1343                if prior != &session.client_id {
1344                    return Err(format!(
1345                        "auth failed: agent_id `{id}` is already attached on \
1346                         another connection (client_id={prior})"
1347                    ));
1348                }
1349            }
1350            attached.insert(id.clone(), session.client_id.clone());
1351        }
1352        // #170: attach the daemon-owned persistent memgine for
1353        // this agent. Lazy-loaded on first connection per id from
1354        // `~/.car/memory/agents/<id>.jsonl`; retained across
1355        // disconnect so the next session sees the same state.
1356        let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1357        *session.bound_memgine.lock().await = Some(agent_eng);
1358        *session.agent_id.lock().await = Some(id.clone());
1359        session
1360            .authenticated
1361            .store(true, std::sync::atomic::Ordering::Release);
1362        return Ok(serde_json::json!({
1363            "ok": true,
1364            "auth_enabled": true,
1365            "agent_id": id,
1366        }));
1367    }
1368
1369    let expected = match state.auth_token.get() {
1370        Some(t) => t,
1371        None => {
1372            // Auth disabled — accept any token politely so callers
1373            // that always include a session.auth handshake (e.g. the
1374            // FFI proxy) don't fail when the daemon happens to be
1375            // unauthed. Mark the session authenticated anyway so the
1376            // gate is a no-op below.
1377            session
1378                .authenticated
1379                .store(true, std::sync::atomic::Ordering::Release);
1380            return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1381        }
1382    };
1383    if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1384        return Err("auth failed: token mismatch".to_string());
1385    }
1386    session
1387        .authenticated
1388        .store(true, std::sync::atomic::Ordering::Release);
1389    Ok(serde_json::json!({ "ok": true, "auth_enabled": true }))
1390}
1391
/// Byte-slice equality that inspects every byte pair instead of
/// stopping at the first mismatch.
///
/// Slices of different lengths compare unequal immediately, so the
/// length is the one thing timing can reveal — acceptable here because
/// the tokens being compared are fixed-length (43 chars).
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of each pair; any differing bit anywhere
    // leaves the accumulator non-zero.
    let accumulated = a
        .iter()
        .zip(b)
        .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs));
    accumulated == 0
}
1405
1406/// Block dispatch of `method` until the user resolves the approval
1407/// raised on [`HostState`].
1408///
1409/// Called from the dispatcher loop only when
1410/// [`crate::session::ApprovalGate::requires_approval`] returns true.
1411/// `Ok(())` means the user picked "approve"; `Err(reason)` is sent
1412/// to the caller as JSON-RPC error code `-32003` with the supplied
1413/// reason. On timeout, the approval row stays in `Pending` so the
1414/// UI keeps a record of the unanswered request.
1415async fn gate_high_risk_method(
1416    method: &str,
1417    params: &Value,
1418    state: &Arc<ServerState>,
1419) -> Result<(), String> {
1420    let timeout = state.approval_gate.timeout;
1421    let req = CreateHostApprovalRequest {
1422        agent_id: None,
1423        action: format!("ws.method:{method}"),
1424        details: serde_json::json!({
1425            "method": method,
1426            // Truncate params for the UI — full payload is recoverable
1427            // via the request-time host event log if needed. The cap
1428            // keeps a malicious caller from drowning the UI in JSON.
1429            "params_preview": preview_params(params, 2_000),
1430        }),
1431        options: vec!["approve".to_string(), "deny".to_string()],
1432    };
1433    match state
1434        .host
1435        .request_and_wait_approval(req, "approve", timeout)
1436        .await
1437    {
1438        Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1439        Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1440            "{method} denied by user (approval gate, audit 2026-05). \
1441             To call this method without an interactive prompt, start \
1442             car-server with --no-approvals on a trusted machine."
1443        )),
1444        Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1445            "{method} approval timed out after {}s with no resolution. \
1446             The approval is still visible in `host.approvals` for \
1447             forensics; resubmit the request to retry.",
1448            timeout.as_secs()
1449        )),
1450        Err(e) => Err(format!("approval gate error: {e}")),
1451    }
1452}
1453
1454fn preview_params(value: &Value, max_chars: usize) -> Value {
1455    let s = value.to_string();
1456    if s.len() <= max_chars {
1457        value.clone()
1458    } else {
1459        Value::String(format!("{}… (truncated)", &s[..max_chars]))
1460    }
1461}
1462
1463async fn handle_session_init(
1464    req: &JsonRpcMessage,
1465    session: &crate::session::ClientSession,
1466) -> Result<Value, String> {
1467    let init: SessionInitRequest =
1468        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1469
1470    for tool in &init.tools {
1471        register_from_definition(&session.runtime, tool).await;
1472    }
1473
1474    let mut policy_count = 0;
1475    {
1476        let mut policies = session.runtime.policies.write().await;
1477        for policy_def in &init.policies {
1478            if let Some(check) = build_policy_check(policy_def) {
1479                policies.register(&policy_def.name, check, "");
1480                policy_count += 1;
1481            }
1482        }
1483    }
1484
1485    serde_json::to_value(SessionInitResponse {
1486        session_id: session.client_id.clone(),
1487        tools_registered: init.tools.len(),
1488        policies_registered: policy_count,
1489    })
1490    .map_err(|e| e.to_string())
1491}
1492
1493fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1494    match def.rule.as_str() {
1495        "deny_tool" => {
1496            let target = def.target.clone();
1497            Some(Box::new(
1498                move |action: &car_ir::Action, _: &car_state::StateStore| {
1499                    if action.tool.as_deref() == Some(&target) {
1500                        Some(format!("tool '{}' denied", target))
1501                    } else {
1502                        None
1503                    }
1504                },
1505            ))
1506        }
1507        "require_state" => {
1508            let key = def.key.clone();
1509            let value = def.value.clone();
1510            Some(Box::new(
1511                move |_: &car_ir::Action, state: &car_state::StateStore| {
1512                    if state.get(&key).as_ref() != Some(&value) {
1513                        Some(format!("state['{}'] must be {:?}", key, value))
1514                    } else {
1515                        None
1516                    }
1517                },
1518            ))
1519        }
1520        "deny_tool_param" => {
1521            let target = def.target.clone();
1522            let param = def.key.clone();
1523            let pattern = def.pattern.clone();
1524            Some(Box::new(
1525                move |action: &car_ir::Action, _: &car_state::StateStore| {
1526                    if action.tool.as_deref() != Some(&target) {
1527                        return None;
1528                    }
1529                    if let Some(val) = action.parameters.get(&param) {
1530                        let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1531                        if s.contains(&pattern) {
1532                            return Some(format!("param '{}' matches '{}'", param, pattern));
1533                        }
1534                    }
1535                    None
1536                },
1537            ))
1538        }
1539        _ => None,
1540    }
1541}
1542
1543async fn handle_tools_register(
1544    req: &JsonRpcMessage,
1545    session: &crate::session::ClientSession,
1546) -> Result<Value, String> {
1547    let tools: Vec<ToolDefinition> =
1548        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1549    for tool in &tools {
1550        register_from_definition(&session.runtime, tool).await;
1551    }
1552    Ok(Value::from(tools.len()))
1553}
1554
1555/// Bridge a wire-protocol `ToolDefinition` to the engine's
1556/// schema-aware registration. Carries the full ToolSchema shape
1557/// (description, parameters, returns, idempotency, caching, rate
1558/// limit) through to the validator. An empty `parameters` object is
1559/// the legacy schemaless registration — the validator no-ops for
1560/// those, so pre-v0.5.x callers see no change.
1561async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1562    runtime
1563        .register_tool_schema(car_ir::ToolSchema {
1564            name: def.name.clone(),
1565            description: def.description.clone(),
1566            parameters: def.parameters.clone(),
1567            returns: def.returns.clone(),
1568            idempotent: def.idempotent,
1569            cache_ttl_secs: def.cache_ttl_secs,
1570            rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1571                max_calls: rl.max_calls,
1572                interval_secs: rl.interval_secs,
1573            }),
1574        })
1575        .await;
1576}
1577
1578async fn handle_proposal_submit(
1579    req: &JsonRpcMessage,
1580    session: &crate::session::ClientSession,
1581) -> Result<Value, String> {
1582    let submit: ProposalSubmitRequest =
1583        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1584    // `session_id` is sibling to `proposal` in the params object —
1585    // not part of `ProposalSubmitRequest` (kept proto-compatible). When
1586    // present, executes the proposal under the named session so any
1587    // session-scoped policies layer on top of global ones.
1588    // See docs/proposals/per-session-policy-scoping.md.
1589    let session_id = req
1590        .params
1591        .get("session_id")
1592        .and_then(|v| v.as_str())
1593        .map(str::to_string);
1594
1595    // `scope` is the optional per-execution caller / tenant surface
1596    // added in Parslee-ai/car#187 phase 3. Shape mirrors
1597    // `car_engine::RuntimeScope` ({ callerId, tenantId, claims });
1598    // serde's camelCase rename keeps the wire form consistent with
1599    // the rest of the JSON-RPC surface. When present, the proposal
1600    // routes through `execute_scoped*` so the runtime records the
1601    // identity on the event log and state R/W ops apply the tenant
1602    // prefix (#187 phase 3-B).
1603    let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1604        Some(v) if !v.is_null() => Some(
1605            serde_json::from_value(v.clone())
1606                .map_err(|e| format!("invalid scope: {e}"))?,
1607        ),
1608        _ => None,
1609    };
1610
1611    let result = match (session_id, scope) {
1612        // session_id + scope: no single combined entry point on
1613        // Runtime today. Scope takes precedence because it's the
1614        // tenant-isolation surface; per-session policies layer in
1615        // a follow-up when there's a real caller asking for both.
1616        (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1617        (Some(sid), None) => {
1618            session
1619                .runtime
1620                .execute_with_session(&submit.proposal, &sid)
1621                .await
1622        }
1623        (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1624        (None, None) => session.runtime.execute(&submit.proposal).await,
1625    };
1626    serde_json::to_value(result).map_err(|e| e.to_string())
1627}
1628
1629async fn handle_session_policy_open(
1630    session: &crate::session::ClientSession,
1631) -> Result<Value, String> {
1632    let id = session.runtime.open_session().await;
1633    Ok(serde_json::json!({ "session_id": id }))
1634}
1635
1636async fn handle_session_policy_close(
1637    req: &JsonRpcMessage,
1638    session: &crate::session::ClientSession,
1639) -> Result<Value, String> {
1640    let sid = req
1641        .params
1642        .get("session_id")
1643        .and_then(|v| v.as_str())
1644        .ok_or("missing 'session_id'")?;
1645    let closed = session.runtime.close_session(sid).await;
1646    Ok(serde_json::json!({ "closed": closed }))
1647}
1648
1649/// `policy.register` — register one policy against this WebSocket
1650/// session's runtime. Mirrors the `PolicyDefinition` shape used by
1651/// `session.init`. When `session_id` is present, the policy is scoped
1652/// to the named in-runtime session opened via `session.policy.open`;
1653/// otherwise it is global.
1654async fn handle_policy_register(
1655    req: &JsonRpcMessage,
1656    session: &crate::session::ClientSession,
1657) -> Result<Value, String> {
1658    let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1659        .map_err(|e| format!("invalid policy params: {e}"))?;
1660    let session_id = req
1661        .params
1662        .get("session_id")
1663        .and_then(|v| v.as_str())
1664        .map(str::to_string);
1665    let check = build_policy_check(&def)
1666        .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1667    match session_id {
1668        Some(sid) => session
1669            .runtime
1670            .register_policy_in_session(&sid, &def.name, check, "")
1671            .await
1672            .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1673        None => {
1674            let mut policies = session.runtime.policies.write().await;
1675            policies.register(&def.name, check, "");
1676            Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1677        }
1678    }
1679}
1680
1681async fn handle_verify(
1682    req: &JsonRpcMessage,
1683    session: &crate::session::ClientSession,
1684) -> Result<Value, String> {
1685    let vr: VerifyRequest =
1686        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1687    let tools: std::collections::HashSet<String> =
1688        session.runtime.tools.read().await.keys().cloned().collect();
1689    let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1690    serde_json::to_value(VerifyResponse {
1691        valid: result.valid,
1692        issues: result
1693            .issues
1694            .iter()
1695            .map(|i| VerifyIssueProto {
1696                action_id: i.action_id.clone(),
1697                severity: i.severity.clone(),
1698                message: i.message.clone(),
1699            })
1700            .collect(),
1701        simulated_state: result.simulated_state,
1702    })
1703    .map_err(|e| e.to_string())
1704}
1705
1706/// Parse the optional `tenant_id` sibling field from JSON-RPC
1707/// params (Parslee-ai/car#187 phase 3-E). When set and non-empty,
1708/// state R/W routes through `StateStore::scoped(tenant_id)` so
1709/// distinct tenants can't see each other's keys over the WS
1710/// surface — symmetric to the proposal.submit scope plumbing.
1711/// When absent / empty, the legacy unscoped namespace applies.
1712fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1713    req.params
1714        .get("tenant_id")
1715        .and_then(|v| v.as_str())
1716        .filter(|s| !s.is_empty())
1717        .map(str::to_string)
1718}
1719
1720async fn handle_state_get(
1721    req: &JsonRpcMessage,
1722    session: &crate::session::ClientSession,
1723) -> Result<Value, String> {
1724    let key = req
1725        .params
1726        .get("key")
1727        .and_then(|v| v.as_str())
1728        .ok_or("missing 'key'")?;
1729    let tenant = tenant_from_params(req);
1730    Ok(session
1731        .runtime
1732        .state
1733        .scoped(tenant.as_deref())
1734        .get(key)
1735        .unwrap_or(Value::Null))
1736}
1737
1738async fn handle_state_set(
1739    req: &JsonRpcMessage,
1740    session: &crate::session::ClientSession,
1741) -> Result<Value, String> {
1742    let key = req
1743        .params
1744        .get("key")
1745        .and_then(|v| v.as_str())
1746        .ok_or("missing 'key'")?;
1747    let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1748    let tenant = tenant_from_params(req);
1749    session
1750        .runtime
1751        .state
1752        .scoped(tenant.as_deref())
1753        .set(key, value, "client");
1754    Ok(Value::from("ok"))
1755}
1756
1757/// `state.exists` — true if the key is set in this session's state
1758/// store, false otherwise. Cheaper than `state.get` + null-check on
1759/// the client side because it doesn't serialize the value.
1760async fn handle_state_exists(
1761    req: &JsonRpcMessage,
1762    session: &crate::session::ClientSession,
1763) -> Result<Value, String> {
1764    let key = req
1765        .params
1766        .get("key")
1767        .and_then(|v| v.as_str())
1768        .ok_or("missing 'key'")?;
1769    let tenant = tenant_from_params(req);
1770    Ok(Value::Bool(
1771        session
1772            .runtime
1773            .state
1774            .scoped(tenant.as_deref())
1775            .exists(key),
1776    ))
1777}
1778
1779/// `state.keys` — list every key currently set in this session's
1780/// state store. Returns a JSON array of strings.
1781async fn handle_state_keys(
1782    req: &JsonRpcMessage,
1783    session: &crate::session::ClientSession,
1784) -> Result<Value, String> {
1785    let tenant = tenant_from_params(req);
1786    Ok(Value::Array(
1787        session
1788            .runtime
1789            .state
1790            .scoped(tenant.as_deref())
1791            .keys()
1792            .into_iter()
1793            .map(Value::String)
1794            .collect(),
1795    ))
1796}
1797
1798/// `state.snapshot` — return the entire session state store as a
1799/// JSON object (`{ key: value, ... }`). Equivalent to iterating
1800/// `state.keys` + `state.get` but in a single round-trip; for
1801/// inspectors/dashboards.
1802///
1803/// Tenant-scoped variant: when `tenant_id` is set, only that
1804/// tenant's keys are returned (prefix stripped on the way out).
1805/// `state.snapshot` with no `tenant_id` returns only unscoped
1806/// keys; consistent with `state.keys`'s filter behaviour and the
1807/// strict-isolation contract from phase 3-B.
1808async fn handle_state_snapshot(
1809    req: &JsonRpcMessage,
1810    session: &crate::session::ClientSession,
1811) -> Result<Value, String> {
1812    let tenant = tenant_from_params(req);
1813    let view = session.runtime.state.scoped(tenant.as_deref());
1814    let mut map = serde_json::Map::new();
1815    for key in view.keys() {
1816        if let Some(value) = view.get(&key) {
1817            map.insert(key, value);
1818        }
1819    }
1820    Ok(Value::Object(map))
1821}
1822
1823// --- Per-agent persistent memgine (#170) ---
1824
1825/// `~/.car/memory/agents/<id>.json` — the per-agent snapshot file.
1826/// Mirrors the existing `memory.persist` shape (flat JSON array of
1827/// fact objects) so the same loader path works.
1828fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
1829    let base = car_ffi_common::memory_path::ensure_base()
1830        .map_err(|e| format!("memory base unavailable: {e}"))?;
1831    let dir = base.join("agents");
1832    std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
1833    Ok(dir.join(format!("{agent_id}.json")))
1834}
1835
1836/// Acquire (or lazy-create + load from disk) the daemon-owned
1837/// persistent memgine for `agent_id`. First call per id reads
1838/// `~/.car/memory/agents/<id>.json` if it exists; subsequent calls
1839/// share the in-memory engine across sessions. Caller stores the
1840/// returned `Arc` on `ClientSession.bound_memgine` so memory.*
1841/// handlers route through it via [`ClientSession::effective_memgine`].
1842async fn get_or_load_agent_memgine(
1843    state: &Arc<ServerState>,
1844    agent_id: &str,
1845) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
1846    {
1847        let map = state.agent_memgines.lock().await;
1848        if let Some(eng) = map.get(agent_id) {
1849            return Ok(eng.clone());
1850        }
1851    }
1852    // Build a fresh engine and try to load from disk.
1853    let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
1854        None,
1855    )));
1856    let path = agent_memgine_snapshot_path(agent_id)?;
1857    if path.exists() {
1858        let content = std::fs::read_to_string(&path)
1859            .map_err(|e| format!("read {}: {}", path.display(), e))?;
1860        let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
1861        let mut g = engine.lock().await;
1862        let mut loaded: u32 = 0;
1863        for fact in &facts {
1864            let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
1865            let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
1866            let kind = fact
1867                .get("kind")
1868                .and_then(|v| v.as_str())
1869                .unwrap_or("pattern");
1870            let fid = format!("loaded-{loaded}");
1871            g.ingest_fact(
1872                &fid,
1873                subject,
1874                body,
1875                "user",
1876                "peer",
1877                chrono::Utc::now(),
1878                "global",
1879                None,
1880                vec![],
1881                kind == "constraint",
1882            );
1883            loaded += 1;
1884        }
1885    }
1886    let mut map = state.agent_memgines.lock().await;
1887    let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
1888    Ok(stored)
1889}
1890
1891/// Snapshot the agent's memgine to its disk file. Same on-wire shape
1892/// as `memory.persist` so manual snapshots and the daemon-owned
1893/// persistence stay interoperable.
1894async fn persist_agent_memgine(
1895    agent_id: &str,
1896    engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
1897) -> Result<(), String> {
1898    let path = agent_memgine_snapshot_path(agent_id)?;
1899    let g = engine.lock().await;
1900    let facts: Vec<Value> = g
1901        .graph
1902        .inner
1903        .node_indices()
1904        .filter_map(|nix| {
1905            let node = g.graph.inner.node_weight(nix)?;
1906            if !node.is_valid() {
1907                return None;
1908            }
1909            if node.kind == car_memgine::MemKind::Identity
1910                || node.kind == car_memgine::MemKind::Environment
1911            {
1912                return None;
1913            }
1914            Some(serde_json::json!({
1915                "subject": node.key,
1916                "body": node.value,
1917                "kind": match node.kind {
1918                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
1919                    car_memgine::MemKind::Conversation => "outcome",
1920                    _ => "pattern",
1921                },
1922                "confidence": 0.5,
1923                "content_type": node.content_type.as_label(),
1924            }))
1925        })
1926        .collect();
1927    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
1928    std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
1929    Ok(())
1930}
1931
1932// --- Memory handlers ---
1933
1934/// `memory.fact_count` — return `valid_fact_count()` of the
1935/// session's memgine. Used by FFI bindings to mirror their
1936/// embedded `fact_count()` accessor without round-tripping a full
1937/// query. No params.
1938async fn handle_memory_fact_count(
1939    session: &crate::session::ClientSession,
1940) -> Result<Value, String> {
1941    let engine_arc = session.effective_memgine().await;
1942    let engine = engine_arc.lock().await;
1943    Ok(Value::from(engine.valid_fact_count()))
1944}
1945
1946async fn handle_memory_add_fact(
1947    req: &JsonRpcMessage,
1948    session: &crate::session::ClientSession,
1949) -> Result<Value, String> {
1950    let subject = req
1951        .params
1952        .get("subject")
1953        .and_then(|v| v.as_str())
1954        .ok_or("missing subject")?;
1955    let body = req
1956        .params
1957        .get("body")
1958        .and_then(|v| v.as_str())
1959        .ok_or("missing body")?;
1960    let kind = req
1961        .params
1962        .get("kind")
1963        .and_then(|v| v.as_str())
1964        .unwrap_or("pattern");
1965    // Route through `effective_memgine` so connections bound to a
1966    // lifecycle agent (#169) write into the daemon-owned per-agent
1967    // memgine instead of the per-WS ephemeral one (#170).
1968    let engine_arc = session.effective_memgine().await;
1969    let count = {
1970        let mut engine = engine_arc.lock().await;
1971        let fid = format!("ws-{}", engine.valid_fact_count());
1972        engine.ingest_fact(
1973            &fid,
1974            subject,
1975            body,
1976            "user",
1977            "peer",
1978            chrono::Utc::now(),
1979            "global",
1980            None,
1981            vec![],
1982            kind == "constraint",
1983        );
1984        engine.valid_fact_count()
1985    };
1986    // Persist after every add when the session is bound to a
1987    // supervised agent. Synchronous write — small JSON snapshot.
1988    if let Some(id) = session.agent_id.lock().await.clone() {
1989        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
1990            tracing::warn!(agent_id = %id, error = %e,
1991                "agent memgine persist failed; in-memory state is canonical");
1992        }
1993    }
1994    Ok(Value::from(count))
1995}
1996
1997async fn handle_memory_query(
1998    req: &JsonRpcMessage,
1999    session: &crate::session::ClientSession,
2000) -> Result<Value, String> {
2001    let query = req
2002        .params
2003        .get("query")
2004        .and_then(|v| v.as_str())
2005        .ok_or("missing query")?;
2006    let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2007    let engine_arc = session.effective_memgine().await;
2008    let engine = engine_arc.lock().await;
2009    let seeds = engine.graph.find_seeds(query, 5);
2010    // FFI parity with NAPI `query_facts` (car-ffi-napi/src/lib.rs:577) —
2011    // both use Personalized PageRank so transport choice doesn't shift
2012    // ranking semantics. Result shape (subject/body/kind/confidence)
2013    // also mirrors NAPI for the same reason.
2014    let hits = if !seeds.is_empty() {
2015        engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2016    } else {
2017        vec![]
2018    };
2019    let results: Vec<Value> = hits
2020        .iter()
2021        .filter_map(|hit| {
2022            let node = engine.graph.inner.node_weight(hit.node_ix)?;
2023            Some(serde_json::json!({
2024                "subject": node.key,
2025                "body": node.value,
2026                "kind": format!("{:?}", node.kind).to_lowercase(),
2027                "confidence": hit.activation,
2028            }))
2029        })
2030        .collect();
2031    serde_json::to_value(results).map_err(|e| e.to_string())
2032}
2033
2034async fn handle_memory_build_context(
2035    req: &JsonRpcMessage,
2036    session: &crate::session::ClientSession,
2037) -> Result<Value, String> {
2038    let query = req
2039        .params
2040        .get("query")
2041        .and_then(|v| v.as_str())
2042        .unwrap_or("");
2043    // FFI parity with NAPI `build_context(query, model_context_window)`.
2044    // When supplied, sizes the assembly budget against the model's window
2045    // instead of the fixed 8K default.
2046    let model_context_window = req
2047        .params
2048        .get("model_context_window")
2049        .and_then(|v| v.as_u64())
2050        .map(|w| w as usize);
2051    let mut engine = session.memgine.lock().await;
2052    Ok(Value::from(
2053        engine.build_context_for_model(query, model_context_window),
2054    ))
2055}
2056
2057/// `memory.build_context_fast` — Fast-mode context assembly for
2058/// latency-sensitive paths (voice, real-time). Skips embedding flush,
2059/// skill lookup, PPR-based scoring, inline repairs, known-unknowns
2060/// extraction. Keeps identity, constraints, facts (creation order),
2061/// conversation, environment.
2062async fn handle_memory_build_context_fast(
2063    req: &JsonRpcMessage,
2064    session: &crate::session::ClientSession,
2065) -> Result<Value, String> {
2066    let query = req
2067        .params
2068        .get("query")
2069        .and_then(|v| v.as_str())
2070        .unwrap_or("");
2071    let model_context_window = req
2072        .params
2073        .get("model_context_window")
2074        .and_then(|v| v.as_u64())
2075        .map(|w| w as usize);
2076    let mut engine = session.memgine.lock().await;
2077    Ok(Value::from(engine.build_context_with_options(
2078        query,
2079        model_context_window,
2080        car_memgine::ContextMode::Fast,
2081        None,
2082    )))
2083}
2084
2085/// `memory.persist` — write the session's memgine to a JSON file
2086/// at `path`. Mirrors NAPI `persist_memory` (car-ffi-napi/src/lib.rs:797)
2087/// so daemon-mode clients can drive checkpoint/restore symmetrically
2088/// with embedded mode. Returns the number of facts written.
2089///
2090/// Filesystem caveat: `path` is interpreted on the daemon's filesystem,
2091/// not the caller's. Since the 2026-05 audit, `path` is also
2092/// sandboxed under `~/.car/memory/` via
2093/// [`car_ffi_common::memory_path::resolve`] — relative paths land
2094/// under the base, absolute paths must already be under the base,
2095/// `..` segments are rejected, symlinks pointing out are rejected.
2096/// Pre-2026-05 the path was passed straight to `std::fs::write` and
2097/// became an arbitrary file-write primitive. The base64-blob escape
2098/// hatch tracked in `Parslee-ai/car-releases#31` will plug into the
2099/// same resolver when it lands.
2100async fn handle_memory_persist(
2101    req: &JsonRpcMessage,
2102    session: &crate::session::ClientSession,
2103) -> Result<Value, String> {
2104    let path = req
2105        .params
2106        .get("path")
2107        .and_then(|v| v.as_str())
2108        .ok_or("missing path")?;
2109    let resolved = car_ffi_common::memory_path::resolve(path)
2110        .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2111    let engine = session.memgine.lock().await;
2112    let facts: Vec<Value> = engine
2113        .graph
2114        .inner
2115        .node_indices()
2116        .filter_map(|nix| {
2117            let node = engine.graph.inner.node_weight(nix)?;
2118            if !node.is_valid() {
2119                return None;
2120            }
2121            if node.kind == car_memgine::MemKind::Identity
2122                || node.kind == car_memgine::MemKind::Environment
2123            {
2124                return None;
2125            }
2126            Some(serde_json::json!({
2127                "subject": node.key,
2128                "body": node.value,
2129                "kind": match node.kind {
2130                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2131                    car_memgine::MemKind::Conversation => "outcome",
2132                    _ => "pattern",
2133                },
2134                "confidence": 0.5,
2135                "content_type": node.content_type.as_label(),
2136            }))
2137        })
2138        .collect();
2139    let count = facts.len();
2140    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2141    std::fs::write(&resolved, json)
2142        .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2143    Ok(Value::from(count as u64))
2144}
2145
2146/// `memory.load` — replace the session's memgine with facts from the
2147/// JSON file at `path`. Mirrors NAPI `load_memory`
2148/// (car-ffi-napi/src/lib.rs:121). Same `~/.car/memory/` sandboxing
2149/// as `memory.persist` since the 2026-05 audit — relative paths
2150/// land under the base, anything that escapes is rejected.
2151async fn handle_memory_load(
2152    req: &JsonRpcMessage,
2153    session: &crate::session::ClientSession,
2154) -> Result<Value, String> {
2155    let path = req
2156        .params
2157        .get("path")
2158        .and_then(|v| v.as_str())
2159        .ok_or("missing path")?;
2160    let resolved = car_ffi_common::memory_path::resolve(path)
2161        .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2162    let content = std::fs::read_to_string(&resolved)
2163        .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2164    let facts: Vec<Value> =
2165        serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2166    let mut engine = session.memgine.lock().await;
2167    engine.reset();
2168    let mut count: u32 = 0;
2169    for fact in &facts {
2170        let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2171        let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2172        let kind = fact
2173            .get("kind")
2174            .and_then(|v| v.as_str())
2175            .unwrap_or("pattern");
2176        let fid = format!("loaded-{}", count);
2177        engine.ingest_fact(
2178            &fid,
2179            subject,
2180            body,
2181            "user",
2182            "peer",
2183            chrono::Utc::now(),
2184            "global",
2185            None,
2186            vec![],
2187            kind == "constraint",
2188        );
2189        count += 1;
2190    }
2191    Ok(Value::from(count))
2192}
2193
2194// --- Skill handlers ---
2195
2196async fn handle_skill_ingest(
2197    req: &JsonRpcMessage,
2198    session: &crate::session::ClientSession,
2199) -> Result<Value, String> {
2200    let name = req
2201        .params
2202        .get("name")
2203        .and_then(|v| v.as_str())
2204        .ok_or("missing name")?;
2205    let code = req
2206        .params
2207        .get("code")
2208        .and_then(|v| v.as_str())
2209        .ok_or("missing code")?;
2210    let platform = req
2211        .params
2212        .get("platform")
2213        .and_then(|v| v.as_str())
2214        .unwrap_or("unknown");
2215    let persona = req
2216        .params
2217        .get("persona")
2218        .and_then(|v| v.as_str())
2219        .unwrap_or("");
2220    let url_pattern = req
2221        .params
2222        .get("url_pattern")
2223        .and_then(|v| v.as_str())
2224        .unwrap_or("");
2225    let description = req
2226        .params
2227        .get("description")
2228        .and_then(|v| v.as_str())
2229        .unwrap_or("");
2230    let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2231    let keywords: Vec<String> = req
2232        .params
2233        .get("task_keywords")
2234        .and_then(|v| v.as_array())
2235        .map(|arr| {
2236            arr.iter()
2237                .filter_map(|v| v.as_str().map(String::from))
2238                .collect()
2239        })
2240        .unwrap_or_default();
2241
2242    let trigger = car_memgine::SkillTrigger {
2243        persona: persona.into(),
2244        url_pattern: url_pattern.into(),
2245        task_keywords: keywords,
2246        structured: None,
2247    };
2248    let mut engine = session.memgine.lock().await;
2249    let node = engine.ingest_skill(
2250        name,
2251        code,
2252        platform,
2253        trigger,
2254        description,
2255        supersedes,
2256        vec![],
2257        vec![],
2258    );
2259    Ok(Value::from(node.index() as u64))
2260}
2261
2262async fn handle_skill_find(
2263    req: &JsonRpcMessage,
2264    session: &crate::session::ClientSession,
2265) -> Result<Value, String> {
2266    let persona = req
2267        .params
2268        .get("persona")
2269        .and_then(|v| v.as_str())
2270        .unwrap_or("");
2271    let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2272    let task = req
2273        .params
2274        .get("task")
2275        .and_then(|v| v.as_str())
2276        .unwrap_or("");
2277    let max = req
2278        .params
2279        .get("max_results")
2280        .and_then(|v| v.as_u64())
2281        .unwrap_or(1) as usize;
2282    let engine = session.memgine.lock().await;
2283    let results = engine.find_skill(persona, url, task, max);
2284    let json: Vec<Value> = results
2285        .iter()
2286        .map(|(m, s)| {
2287            serde_json::json!({
2288                "name": m.name, "code": m.code, "platform": m.platform,
2289                "description": m.description, "stats": m.stats, "match_score": s,
2290            })
2291        })
2292        .collect();
2293    serde_json::to_value(json).map_err(|e| e.to_string())
2294}
2295
2296async fn handle_skill_report(
2297    req: &JsonRpcMessage,
2298    session: &crate::session::ClientSession,
2299) -> Result<Value, String> {
2300    let name = req
2301        .params
2302        .get("skill_name")
2303        .and_then(|v| v.as_str())
2304        .ok_or("missing skill_name")?;
2305    let outcome_str = req
2306        .params
2307        .get("outcome")
2308        .and_then(|v| v.as_str())
2309        .ok_or("missing outcome")?;
2310    let outcome = match outcome_str {
2311        "success" => car_memgine::SkillOutcome::Success,
2312        _ => car_memgine::SkillOutcome::Fail,
2313    };
2314    let mut engine = session.memgine.lock().await;
2315    let stats = engine
2316        .report_outcome(name, outcome)
2317        .ok_or(format!("skill '{}' not found", name))?;
2318    serde_json::to_value(stats).map_err(|e| e.to_string())
2319}
2320
2321// ---------------------------------------------------------------------------
2322// Multi-agent coordination handlers
2323//
2324// The WsAgentRunner sends a `multi.run_agent` JSON-RPC request to the client.
2325// The client runs the model loop and responds with AgentOutput JSON.
2326// ---------------------------------------------------------------------------
2327
2328/// AgentRunner backed by WebSocket callback to the client.
2329struct WsAgentRunner {
2330    channel: Arc<WsChannel>,
2331    host: Arc<crate::host::HostState>,
2332    client_id: String,
2333}
2334
2335#[async_trait::async_trait]
2336impl car_multi::AgentRunner for WsAgentRunner {
2337    async fn run(
2338        &self,
2339        spec: &car_multi::AgentSpec,
2340        task: &str,
2341        _runtime: &car_engine::Runtime,
2342        _mailbox: &car_multi::Mailbox,
2343    ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2344        use futures::SinkExt;
2345
2346        let request_id = self.channel.next_request_id();
2347        let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2348        let agent = self
2349            .host
2350            .register_agent(
2351                &self.client_id,
2352                RegisterHostAgentRequest {
2353                    id: Some(agent_id.clone()),
2354                    name: spec.name.clone(),
2355                    kind: "callback".to_string(),
2356                    capabilities: spec.tools.clone(),
2357                    project: spec
2358                        .metadata
2359                        .get("project")
2360                        .and_then(|v| v.as_str())
2361                        .map(str::to_string),
2362                    pid: None,
2363                    display: serde_json::from_value(
2364                        spec.metadata
2365                            .get("display")
2366                            .cloned()
2367                            .unwrap_or(serde_json::Value::Null),
2368                    )
2369                    .unwrap_or_default(),
2370                    metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2371                },
2372            )
2373            .await
2374            .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2375        let _ = self
2376            .host
2377            .set_status(
2378                &self.client_id,
2379                SetHostAgentStatusRequest {
2380                    agent_id: agent.id.clone(),
2381                    status: HostAgentStatus::Running,
2382                    current_task: Some(task.to_string()),
2383                    message: Some(format!("{} started", spec.name)),
2384                    payload: serde_json::json!({ "task": task }),
2385                },
2386            )
2387            .await;
2388
2389        let rpc_request = serde_json::json!({
2390            "jsonrpc": "2.0",
2391            "method": "multi.run_agent",
2392            "params": {
2393                "spec": spec,
2394                "task": task,
2395            },
2396            "id": request_id,
2397        });
2398
2399        // Create oneshot channel for the response
2400        let (tx, rx) = tokio::sync::oneshot::channel();
2401        self.channel
2402            .pending
2403            .lock()
2404            .await
2405            .insert(request_id.clone(), tx);
2406
2407        let msg = Message::Text(
2408            serde_json::to_string(&rpc_request)
2409                .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2410                .into(),
2411        );
2412        if let Err(e) = self.channel.write.lock().await.send(msg).await {
2413            let _ = self
2414                .host
2415                .set_status(
2416                    &self.client_id,
2417                    SetHostAgentStatusRequest {
2418                        agent_id: agent_id.clone(),
2419                        status: HostAgentStatus::Errored,
2420                        current_task: None,
2421                        message: Some(format!("{} failed to start", spec.name)),
2422                        payload: serde_json::json!({ "error": e.to_string() }),
2423                    },
2424                )
2425                .await;
2426            return Err(car_multi::MultiError::AgentFailed(
2427                spec.name.clone(),
2428                format!("ws send error: {}", e),
2429            ));
2430        }
2431
2432        // Wait for client response (5 min timeout for model loops)
2433        let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2434            Ok(Ok(response)) => response,
2435            Ok(Err(_)) => {
2436                let _ = self
2437                    .host
2438                    .set_status(
2439                        &self.client_id,
2440                        SetHostAgentStatusRequest {
2441                            agent_id: agent_id.clone(),
2442                            status: HostAgentStatus::Errored,
2443                            current_task: None,
2444                            message: Some(format!("{} callback channel closed", spec.name)),
2445                            payload: Value::Null,
2446                        },
2447                    )
2448                    .await;
2449                return Err(car_multi::MultiError::AgentFailed(
2450                    spec.name.clone(),
2451                    "agent callback channel closed".into(),
2452                ));
2453            }
2454            Err(_) => {
2455                let _ = self
2456                    .host
2457                    .set_status(
2458                        &self.client_id,
2459                        SetHostAgentStatusRequest {
2460                            agent_id: agent_id.clone(),
2461                            status: HostAgentStatus::Errored,
2462                            current_task: None,
2463                            message: Some(format!("{} timed out", spec.name)),
2464                            payload: Value::Null,
2465                        },
2466                    )
2467                    .await;
2468                return Err(car_multi::MultiError::AgentFailed(
2469                    spec.name.clone(),
2470                    "agent callback timed out (300s)".into(),
2471                ));
2472            }
2473        };
2474
2475        if let Some(err) = response.error {
2476            let _ = self
2477                .host
2478                .set_status(
2479                    &self.client_id,
2480                    SetHostAgentStatusRequest {
2481                        agent_id: agent_id.clone(),
2482                        status: HostAgentStatus::Errored,
2483                        current_task: None,
2484                        message: Some(format!("{} errored", spec.name)),
2485                        payload: serde_json::json!({ "error": err }),
2486                    },
2487                )
2488                .await;
2489            return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2490        }
2491
2492        let output_value = response.output.unwrap_or(Value::Null);
2493        let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2494            car_multi::MultiError::AgentFailed(
2495                spec.name.clone(),
2496                format!("invalid AgentOutput: {}", e),
2497            )
2498        })?;
2499        let status = if output.error.is_some() {
2500            HostAgentStatus::Errored
2501        } else {
2502            HostAgentStatus::Completed
2503        };
2504        let message = if output.error.is_some() {
2505            format!("{} errored", spec.name)
2506        } else {
2507            format!("{} completed", spec.name)
2508        };
2509        let _ = self
2510            .host
2511            .set_status(
2512                &self.client_id,
2513                SetHostAgentStatusRequest {
2514                    agent_id,
2515                    status,
2516                    current_task: None,
2517                    message: Some(message),
2518                    payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2519                },
2520            )
2521            .await;
2522
2523        Ok(output)
2524    }
2525}
2526
/// Builds the host agent id for one run as `client_id:name:request_id`.
///
/// Only `name` is sanitized: every character other than an ASCII letter,
/// ASCII digit, `-` or `_` is replaced by `-`. `client_id` and `request_id`
/// are inserted verbatim.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut id = String::with_capacity(client_id.len() + name.len() + request_id.len() + 2);
    id.push_str(client_id);
    id.push(':');
    for ch in name.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        id.push(if keep { ch } else { '-' });
    }
    id.push(':');
    id.push_str(request_id);
    id
}
2540
2541async fn handle_multi_swarm(
2542    req: &JsonRpcMessage,
2543    session: &crate::session::ClientSession,
2544) -> Result<Value, String> {
2545    let mode_str = req
2546        .params
2547        .get("mode")
2548        .and_then(|v| v.as_str())
2549        .ok_or("missing 'mode'")?;
2550    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2551    let task = req
2552        .params
2553        .get("task")
2554        .and_then(|v| v.as_str())
2555        .ok_or("missing 'task'")?;
2556
2557    let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2558        .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2559    let agent_specs: Vec<car_multi::AgentSpec> =
2560        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2561    let synth: Option<car_multi::AgentSpec> = req
2562        .params
2563        .get("synthesizer")
2564        .map(|v| serde_json::from_value(v.clone()))
2565        .transpose()
2566        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2567
2568    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2569        channel: session.channel.clone(),
2570        host: session.host.clone(),
2571        client_id: session.client_id.clone(),
2572    });
2573    let infra = car_multi::SharedInfra::new();
2574
2575    let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2576    if let Some(s) = synth {
2577        swarm = swarm.with_synthesizer(s);
2578    }
2579
2580    let result = swarm
2581        .run(task, &runner, &infra)
2582        .await
2583        .map_err(|e| format!("swarm error: {}", e))?;
2584    serde_json::to_value(result).map_err(|e| e.to_string())
2585}
2586
2587async fn handle_multi_pipeline(
2588    req: &JsonRpcMessage,
2589    session: &crate::session::ClientSession,
2590) -> Result<Value, String> {
2591    let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2592    let task = req
2593        .params
2594        .get("task")
2595        .and_then(|v| v.as_str())
2596        .ok_or("missing 'task'")?;
2597
2598    let stage_specs: Vec<car_multi::AgentSpec> =
2599        serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2600
2601    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2602        channel: session.channel.clone(),
2603        host: session.host.clone(),
2604        client_id: session.client_id.clone(),
2605    });
2606    let infra = car_multi::SharedInfra::new();
2607
2608    let result = car_multi::Pipeline::new(stage_specs)
2609        .run(task, &runner, &infra)
2610        .await
2611        .map_err(|e| format!("pipeline error: {}", e))?;
2612    serde_json::to_value(result).map_err(|e| e.to_string())
2613}
2614
2615async fn handle_multi_supervisor(
2616    req: &JsonRpcMessage,
2617    session: &crate::session::ClientSession,
2618) -> Result<Value, String> {
2619    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2620    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2621    let task = req
2622        .params
2623        .get("task")
2624        .and_then(|v| v.as_str())
2625        .ok_or("missing 'task'")?;
2626    let max_rounds = req
2627        .params
2628        .get("max_rounds")
2629        .and_then(|v| v.as_u64())
2630        .unwrap_or(3) as u32;
2631
2632    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2633        .map_err(|e| format!("invalid workers: {}", e))?;
2634    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2635        .map_err(|e| format!("invalid supervisor: {}", e))?;
2636
2637    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2638        channel: session.channel.clone(),
2639        host: session.host.clone(),
2640        client_id: session.client_id.clone(),
2641    });
2642    let infra = car_multi::SharedInfra::new();
2643
2644    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2645        .with_max_rounds(max_rounds)
2646        .run(task, &runner, &infra)
2647        .await
2648        .map_err(|e| format!("supervisor error: {}", e))?;
2649    serde_json::to_value(result).map_err(|e| e.to_string())
2650}
2651
2652async fn handle_multi_map_reduce(
2653    req: &JsonRpcMessage,
2654    session: &crate::session::ClientSession,
2655) -> Result<Value, String> {
2656    let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2657    let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2658    let task = req
2659        .params
2660        .get("task")
2661        .and_then(|v| v.as_str())
2662        .ok_or("missing 'task'")?;
2663    let items_val = req.params.get("items").ok_or("missing 'items'")?;
2664
2665    let mapper_spec: car_multi::AgentSpec =
2666        serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2667    let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2668        .map_err(|e| format!("invalid reducer: {}", e))?;
2669    let items: Vec<String> =
2670        serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2671
2672    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2673        channel: session.channel.clone(),
2674        host: session.host.clone(),
2675        client_id: session.client_id.clone(),
2676    });
2677    let infra = car_multi::SharedInfra::new();
2678
2679    let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2680        .run(task, &items, &runner, &infra)
2681        .await
2682        .map_err(|e| format!("map_reduce error: {}", e))?;
2683    serde_json::to_value(result).map_err(|e| e.to_string())
2684}
2685
2686async fn handle_multi_vote(
2687    req: &JsonRpcMessage,
2688    session: &crate::session::ClientSession,
2689) -> Result<Value, String> {
2690    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2691    let task = req
2692        .params
2693        .get("task")
2694        .and_then(|v| v.as_str())
2695        .ok_or("missing 'task'")?;
2696
2697    let agent_specs: Vec<car_multi::AgentSpec> =
2698        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2699    let synth: Option<car_multi::AgentSpec> = req
2700        .params
2701        .get("synthesizer")
2702        .map(|v| serde_json::from_value(v.clone()))
2703        .transpose()
2704        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2705
2706    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2707        channel: session.channel.clone(),
2708        host: session.host.clone(),
2709        client_id: session.client_id.clone(),
2710    });
2711    let infra = car_multi::SharedInfra::new();
2712
2713    let mut vote = car_multi::Vote::new(agent_specs);
2714    if let Some(s) = synth {
2715        vote = vote.with_synthesizer(s);
2716    }
2717
2718    let result = vote
2719        .run(task, &runner, &infra)
2720        .await
2721        .map_err(|e| format!("vote error: {}", e))?;
2722    serde_json::to_value(result).map_err(|e| e.to_string())
2723}
2724
2725// ---------------------------------------------------------------------------
2726// Scheduler handlers
2727// ---------------------------------------------------------------------------
2728
2729fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2730    let name = req
2731        .params
2732        .get("name")
2733        .and_then(|v| v.as_str())
2734        .ok_or("scheduler.create requires 'name'")?;
2735    let prompt = req
2736        .params
2737        .get("prompt")
2738        .and_then(|v| v.as_str())
2739        .ok_or("scheduler.create requires 'prompt'")?;
2740
2741    let mut task = car_scheduler::Task::new(name, prompt);
2742
2743    if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
2744        let trigger = match t {
2745            "once" => car_scheduler::TaskTrigger::Once,
2746            "cron" => car_scheduler::TaskTrigger::Cron,
2747            "interval" => car_scheduler::TaskTrigger::Interval,
2748            "file_watch" => car_scheduler::TaskTrigger::FileWatch,
2749            _ => car_scheduler::TaskTrigger::Manual,
2750        };
2751        let schedule = req
2752            .params
2753            .get("schedule")
2754            .and_then(|v| v.as_str())
2755            .unwrap_or("");
2756        task = task.with_trigger(trigger, schedule);
2757    }
2758
2759    if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
2760        task = task.with_system_prompt(sp);
2761    }
2762
2763    serde_json::to_value(&task).map_err(|e| e.to_string())
2764}
2765
2766async fn handle_scheduler_run(
2767    req: &JsonRpcMessage,
2768    session: &crate::session::ClientSession,
2769) -> Result<Value, String> {
2770    let task_val = req
2771        .params
2772        .get("task")
2773        .ok_or("scheduler.run requires 'task'")?;
2774    let mut task: car_scheduler::Task =
2775        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2776
2777    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2778        channel: session.channel.clone(),
2779        host: session.host.clone(),
2780        client_id: session.client_id.clone(),
2781    });
2782    let executor = car_scheduler::Executor::new(runner);
2783    let execution = executor.run_once(&mut task).await;
2784
2785    serde_json::to_value(&execution).map_err(|e| e.to_string())
2786}
2787
2788async fn handle_scheduler_run_loop(
2789    req: &JsonRpcMessage,
2790    session: &crate::session::ClientSession,
2791) -> Result<Value, String> {
2792    let task_val = req
2793        .params
2794        .get("task")
2795        .ok_or("scheduler.run_loop requires 'task'")?;
2796    let mut task: car_scheduler::Task =
2797        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2798    let max_iterations = req
2799        .params
2800        .get("max_iterations")
2801        .and_then(|v| v.as_u64())
2802        .map(|v| v as u32);
2803
2804    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2805        channel: session.channel.clone(),
2806        host: session.host.clone(),
2807        client_id: session.client_id.clone(),
2808    });
2809    let executor = car_scheduler::Executor::new(runner);
2810    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2811    let executions = executor
2812        .run_loop(&mut task, max_iterations, cancel_rx)
2813        .await;
2814
2815    serde_json::to_value(&executions).map_err(|e| e.to_string())
2816}
2817
2818// ---------------------------------------------------------------------------
2819// Inference handlers
2820// ---------------------------------------------------------------------------
2821
2822fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
2823    state.inference.get_or_init(|| {
2824        Arc::new(car_inference::InferenceEngine::new(
2825            car_inference::InferenceConfig::default(),
2826        ))
2827    })
2828}
2829
2830async fn handle_infer(
2831    msg: &JsonRpcMessage,
2832    state: &ServerState,
2833    session: &crate::session::ClientSession,
2834) -> Result<Value, String> {
2835    let engine = get_inference_engine(state);
2836    let mut req: car_inference::GenerateRequest =
2837        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2838
2839    // If context_query is provided, build context from memgine and inject it
2840    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2841        let mut memgine = session.memgine.lock().await;
2842        let ctx = memgine.build_context(cq);
2843        if !ctx.is_empty() {
2844            req.context = Some(ctx);
2845        }
2846    }
2847
2848    // Process-wide admission gate. Held for the duration of the
2849    // generation so a burst of concurrent infer RPCs can't multiply
2850    // KV-cache + activation memory and take the host out. The
2851    // `_permit` binding is intentional — its `Drop` releases the slot
2852    // when this future returns.
2853    let _permit = state.admission.acquire().await;
2854
2855    // Use generate_tracked() so tool_calls, usage, model_used, trace_id, and
2856    // latency_ms are preserved in the response. Plain `generate()` discards
2857    // everything except `.text`, which silently breaks tool-use over the
2858    // WebSocket protocol (issue #43).
2859    //
2860    // NOTE: This directly serializes `InferenceResult`. Any field added to
2861    // that struct in `car-inference` becomes part of the public WebSocket
2862    // protocol. The shape is locked by `inference_result_serializes_*` tests
2863    // in car-inference; updating those tests is part of intentionally
2864    // changing the wire contract.
2865    let result = engine
2866        .generate_tracked(req)
2867        .await
2868        .map_err(|e| e.to_string())?;
2869    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2870}
2871
2872/// Streaming inference — mirrors NAPI `inferStream`. Closes
2873/// Parslee-ai/car-releases#30. Same `GenerateRequest` shape as
2874/// `infer`; emits `inference.stream.event` JSON-RPC notifications
2875/// during the run, and returns the final `InferenceResult` as the
2876/// JSON-RPC response when the stream completes.
2877///
2878/// Notification shape (server → client):
2879/// ```jsonc
2880/// {
2881///   "jsonrpc": "2.0",
2882///   "method": "inference.stream.event",
2883///   "params": {
2884///     "request_id": "<original RPC id>",
2885///     "event": { "type": "text" | "tool_start" | "tool_delta" | "usage", ... }
2886///   }
2887/// }
2888/// ```
2889///
2890/// The final `done` event is not pushed as a notification — it's
2891/// the JSON-RPC response with the accumulated `InferenceResult`.
2892/// `video.generate` — daemon-side wrapper for
2893/// `InferenceEngine::generate_video`. Mirrors `handle_infer`'s
2894/// admission gate + JSON request shape (Parslee-ai/car#185).
2895///
2896/// Previously the CLI's `cmd_video` constructed an in-process
2897/// engine and called `generate_video` directly — a v0.7 holdover
2898/// that bypassed the daemon. With this handler the CLI proxies
2899/// here, so the engine-level audio_passthrough gate fires
2900/// inside the daemon process where all FFI surfaces converge.
2901async fn handle_image_generate(
2902    msg: &JsonRpcMessage,
2903    state: &ServerState,
2904) -> Result<Value, String> {
2905    let engine = get_inference_engine(state);
2906    let req: car_inference::GenerateImageRequest = serde_json::from_value(msg.params.clone())
2907        .map_err(|e| format!("invalid params: {}", e))?;
2908    // Share the same admission gate as text/video generation — a burst
2909    // of image requests shouldn't smuggle around the concurrency cap.
2910    let _permit = state.admission.acquire().await;
2911    let result = engine
2912        .generate_image(req)
2913        .await
2914        .map_err(|e| e.to_string())?;
2915    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2916}
2917
2918async fn handle_video_generate(
2919    msg: &JsonRpcMessage,
2920    state: &ServerState,
2921) -> Result<Value, String> {
2922    let engine = get_inference_engine(state);
2923    let req: car_inference::GenerateVideoRequest = serde_json::from_value(msg.params.clone())
2924        .map_err(|e| format!("invalid params: {}", e))?;
2925    let _permit = state.admission.acquire().await;
2926    let result = engine
2927        .generate_video(req)
2928        .await
2929        .map_err(|e| e.to_string())?;
2930    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2931}
2932
2933async fn handle_infer_stream(
2934    msg: &JsonRpcMessage,
2935    session: &crate::session::ClientSession,
2936    state: &ServerState,
2937) -> Result<Value, String> {
2938    use futures::SinkExt;
2939    use tokio_tungstenite::tungstenite::Message;
2940
2941    let engine = get_inference_engine(state);
2942    let mut req: car_inference::GenerateRequest =
2943        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2944
2945    // Same context-injection convenience as non-streaming `infer` so
2946    // the two methods have parity on the call shape.
2947    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2948        let mut memgine = session.memgine.lock().await;
2949        let ctx = memgine.build_context(cq);
2950        if !ctx.is_empty() {
2951            req.context = Some(ctx);
2952        }
2953    }
2954
2955    let _permit = state.admission.acquire().await;
2956    let mut rx = engine
2957        .generate_tracked_stream(req)
2958        .await
2959        .map_err(|e| e.to_string())?;
2960
2961    let mut accumulator = car_inference::StreamAccumulator::default();
2962    let request_id = msg.id.clone();
2963
2964    while let Some(event) = rx.recv().await {
2965        let event_payload = match &event {
2966            car_inference::StreamEvent::TextDelta(text) => {
2967                serde_json::json!({"type": "text", "data": text})
2968            }
2969            car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
2970                serde_json::json!({"type": "tool_start", "name": name, "index": index})
2971            }
2972            car_inference::StreamEvent::ToolCallDelta {
2973                index,
2974                arguments_delta,
2975            } => serde_json::json!({
2976                "type": "tool_delta",
2977                "index": index,
2978                "data": arguments_delta,
2979            }),
2980            car_inference::StreamEvent::Usage {
2981                input_tokens,
2982                output_tokens,
2983            } => serde_json::json!({
2984                "type": "usage",
2985                "input_tokens": input_tokens,
2986                "output_tokens": output_tokens,
2987            }),
2988            // Done is delivered as the JSON-RPC response, not a
2989            // notification — matches the NAPI contract where the
2990            // standalone function's return value is the accumulated
2991            // result and the callback only sees in-progress events.
2992            car_inference::StreamEvent::Done { .. } => {
2993                accumulator.push(&event);
2994                continue;
2995            }
2996        };
2997
2998        let notif = serde_json::json!({
2999            "jsonrpc": "2.0",
3000            "method": "inference.stream.event",
3001            "params": {
3002                "request_id": request_id,
3003                "event": event_payload,
3004            },
3005        });
3006        if let Ok(text) = serde_json::to_string(&notif) {
3007            let _ = session
3008                .channel
3009                .write
3010                .lock()
3011                .await
3012                .send(Message::Text(text.into()))
3013                .await;
3014        }
3015        accumulator.push(&event);
3016    }
3017
3018    let (text, tool_calls, usage) = accumulator.finish_with_usage();
3019    Ok(serde_json::json!({
3020        "text": text,
3021        "tool_calls": tool_calls,
3022        "usage": usage,
3023    }))
3024}
3025
3026async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3027    let engine = get_inference_engine(state);
3028    let req: car_inference::EmbedRequest =
3029        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3030    // Embeds load their own model weights; share the same admission
3031    // gate as generations so a burst of embed requests can't smuggle
3032    // around the concurrency cap.
3033    let _permit = state.admission.acquire().await;
3034    let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3035    Ok(serde_json::json!({"embeddings": result}))
3036}
3037
3038async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3039    let engine = get_inference_engine(state);
3040    let req: car_inference::ClassifyRequest =
3041        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3042    let _permit = state.admission.acquire().await;
3043    let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3044    Ok(serde_json::json!({"classifications": result}))
3045}
3046
3047/// Surface the current admission state so the menubar tray and
3048/// `car daemon status` can show "queued: N" / "permits: P/T". Read-only
3049/// snapshot — racy by definition but correct enough for status panels.
3050fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3051    let total = state.admission.permits();
3052    let available = state.admission.permits_available();
3053    let in_use = total.saturating_sub(available);
3054    Ok(serde_json::json!({
3055        "permits_total": total,
3056        "permits_available": available,
3057        "permits_in_use": in_use,
3058        "env_override": crate::admission::ENV_MAX_CONCURRENT,
3059    }))
3060}
3061
3062async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3063    let model = msg
3064        .params
3065        .get("model")
3066        .and_then(|v| v.as_str())
3067        .ok_or("missing 'model' parameter")?;
3068    let text = msg
3069        .params
3070        .get("text")
3071        .and_then(|v| v.as_str())
3072        .ok_or("missing 'text' parameter")?;
3073    let engine = get_inference_engine(state);
3074    let ids = engine
3075        .tokenize(model, text)
3076        .await
3077        .map_err(|e| e.to_string())?;
3078    Ok(serde_json::json!({"tokens": ids}))
3079}
3080
3081async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3082    let model = msg
3083        .params
3084        .get("model")
3085        .and_then(|v| v.as_str())
3086        .ok_or("missing 'model' parameter")?;
3087    let tokens: Vec<u32> = msg
3088        .params
3089        .get("tokens")
3090        .and_then(|v| v.as_array())
3091        .ok_or("missing 'tokens' parameter")?
3092        .iter()
3093        .map(|t| {
3094            t.as_u64()
3095                .and_then(|n| u32::try_from(n).ok())
3096                .ok_or_else(|| "tokens[] must be u32 values".to_string())
3097        })
3098        .collect::<Result<Vec<_>, _>>()?;
3099    let engine = get_inference_engine(state);
3100    let text = engine
3101        .detokenize(model, &tokens)
3102        .await
3103        .map_err(|e| e.to_string())?;
3104    Ok(serde_json::json!({"text": text}))
3105}
3106
3107/// `models.register` — persist a user-supplied `ModelSchema` to
3108/// `~/.car/models.json` (Parslee-ai/car-releases#39). Replaces any
3109/// existing entry with the same `id`. Returns `{id, registered}`.
3110///
3111/// **Phase 1 limitation**: the daemon's live `UnifiedRegistry` is
3112/// not updated in-process — the new model becomes visible to
3113/// `models.list`, `infer`, `infer_stream` on the **next daemon
3114/// boot** when `load_user_config` re-reads the file. This is
3115/// enough to unblock opencode's setup flow (register ahead of
3116/// time, then start the daemon). Hot-update requires either an
3117/// `RwLock<InferenceEngine>` on `ServerState` or an
3118/// interior-mutable `UnifiedRegistry`; both touch 20+ call sites
3119/// and are tracked as a follow-up.
3120///
3121/// Until hot-update lands, callers SHOULD register their models
3122/// before issuing `infer` calls against them, and operators
3123/// SHOULD restart the daemon after batches of model
3124/// registrations.
3125async fn handle_models_register(
3126    req: &JsonRpcMessage,
3127    _state: &Arc<ServerState>,
3128) -> Result<Value, String> {
3129    // The params shape mirrors v0.7's FFI `rt.registerModel(schemaJson)`:
3130    // either the bare `ModelSchema` value, OR `{ schema: ModelSchema }`.
3131    // Honor both so existing in-process callers don't have to reshape.
3132    let schema_value = match req.params.get("schema") {
3133        Some(v) => v.clone(),
3134        None => req.params.clone(),
3135    };
3136    let schema: car_inference::ModelSchema = serde_json::from_value(schema_value)
3137        .map_err(|e| format!("invalid ModelSchema: {e}"))?;
3138    let id = schema.id.clone();
3139
3140    // Resolve the models.json path the same way UnifiedRegistry does:
3141    // `<models_dir>/../models.json` where models_dir defaults to
3142    // `~/.car/models/`. We read whatever's there, swap in the new
3143    // entry, and write back atomically.
3144    let home = std::env::var_os("HOME")
3145        .or_else(|| std::env::var_os("USERPROFILE"))
3146        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3147    let car_dir = std::path::PathBuf::from(home).join(".car");
3148    std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3149    let path = car_dir.join("models.json");
3150
3151    let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3152        let text = std::fs::read_to_string(&path)
3153            .map_err(|e| format!("read {}: {e}", path.display()))?;
3154        if text.trim().is_empty() {
3155            Vec::new()
3156        } else {
3157            serde_json::from_str(&text)
3158                .map_err(|e| format!("parse {}: {e}", path.display()))?
3159        }
3160    } else {
3161        Vec::new()
3162    };
3163    // Replace existing entry with the same id, else append.
3164    if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3165        *slot = schema;
3166    } else {
3167        models.push(schema);
3168    }
3169    let json = serde_json::to_string_pretty(&models)
3170        .map_err(|e| format!("serialize models.json: {e}"))?;
3171    let tmp = path.with_extension("json.tmp");
3172    std::fs::write(&tmp, json)
3173        .map_err(|e| format!("write {}: {e}", tmp.display()))?;
3174    std::fs::rename(&tmp, &path)
3175        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3176    Ok(serde_json::json!({
3177        "id": id,
3178        "registered": true,
3179        "path": path.to_string_lossy(),
3180        "note": "Daemon restart required for live UnifiedRegistry visibility \
3181                 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3182                 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3183    }))
3184}
3185
3186/// `models.unregister` — remove an entry from `~/.car/models.json`
3187/// by id (Parslee-ai/car#186 — symmetric to `models.register`).
3188/// Returns `{ id, unregistered, path }` on success. Returns an error
3189/// when the model isn't present.
3190///
3191/// **Phase 1 limitation** (same as `models.register`): the daemon's
3192/// live `UnifiedRegistry` is not rebuilt — the removal takes effect
3193/// on the next daemon boot. Callers SHOULD restart the daemon after
3194/// a batch of unregistrations if they expect `models.list_unified`
3195/// to reflect the change immediately.
3196async fn handle_models_unregister(
3197    req: &JsonRpcMessage,
3198    _state: &Arc<ServerState>,
3199) -> Result<Value, String> {
3200    // Params shape mirrors the CLI flag: `{ id: string }`. Bare-string
3201    // params are honored for symmetry with the register handler's
3202    // tolerant shape (`{schema: ...}` OR bare schema).
3203    let id = match req.params.get("id") {
3204        Some(v) => v
3205            .as_str()
3206            .ok_or_else(|| "`id` must be a string".to_string())?
3207            .to_string(),
3208        None => match req.params.as_str() {
3209            Some(s) => s.to_string(),
3210            None => return Err("missing `id` parameter".to_string()),
3211        },
3212    };
3213
3214    let home = std::env::var_os("HOME")
3215        .or_else(|| std::env::var_os("USERPROFILE"))
3216        .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3217    let car_dir = std::path::PathBuf::from(home).join(".car");
3218    let path = car_dir.join("models.json");
3219
3220    if !path.exists() {
3221        return Err(format!("no models.json at {} — nothing to unregister", path.display()));
3222    }
3223    let text = std::fs::read_to_string(&path)
3224        .map_err(|e| format!("read {}: {e}", path.display()))?;
3225    let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3226        Vec::new()
3227    } else {
3228        serde_json::from_str(&text)
3229            .map_err(|e| format!("parse {}: {e}", path.display()))?
3230    };
3231    let before = models.len();
3232    models.retain(|m| m.id != id);
3233    if models.len() == before {
3234        return Err(format!("model {} not found in {}", id, path.display()));
3235    }
3236    let json = serde_json::to_string_pretty(&models)
3237        .map_err(|e| format!("serialize models.json: {e}"))?;
3238    let tmp = path.with_extension("json.tmp");
3239    std::fs::write(&tmp, json)
3240        .map_err(|e| format!("write {}: {e}", tmp.display()))?;
3241    std::fs::rename(&tmp, &path)
3242        .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3243    Ok(serde_json::json!({
3244        "id": id,
3245        "unregistered": true,
3246        "path": path.to_string_lossy(),
3247        "note": "Daemon restart required for live UnifiedRegistry visibility \
3248                 (phase 1, matching models.register).",
3249    }))
3250}
3251
3252fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3253    let engine = get_inference_engine(state);
3254    let models = engine.list_models();
3255    serde_json::to_value(&models).map_err(|e| e.to_string())
3256}
3257
3258fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3259    let engine = get_inference_engine(state);
3260    let models = engine.list_models_unified();
3261    serde_json::to_value(&models).map_err(|e| e.to_string())
3262}
3263
3264#[derive(Debug, Deserialize)]
3265#[serde(rename_all = "camelCase")]
3266struct ModelSearchParams {
3267    #[serde(default)]
3268    query: Option<String>,
3269    #[serde(default)]
3270    capability: Option<car_inference::ModelCapability>,
3271    #[serde(default)]
3272    provider: Option<String>,
3273    #[serde(default)]
3274    local_only: bool,
3275    #[serde(default)]
3276    available_only: bool,
3277    #[serde(default)]
3278    limit: Option<usize>,
3279}
3280
3281#[derive(Debug, Serialize)]
3282#[serde(rename_all = "camelCase")]
3283struct ModelSearchEntry {
3284    #[serde(flatten)]
3285    info: car_inference::ModelInfo,
3286    family: String,
3287    version: String,
3288    tags: Vec<String>,
3289    pullable: bool,
3290    upgrade: Option<car_inference::ModelUpgrade>,
3291}
3292
3293#[derive(Debug, Serialize)]
3294#[serde(rename_all = "camelCase")]
3295struct ModelSearchResponse {
3296    models: Vec<ModelSearchEntry>,
3297    upgrades: Vec<car_inference::ModelUpgrade>,
3298    total: usize,
3299    available: usize,
3300    local: usize,
3301    remote: usize,
3302}
3303
3304fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3305    let params: ModelSearchParams =
3306        serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3307            query: None,
3308            capability: None,
3309            provider: None,
3310            local_only: false,
3311            available_only: false,
3312            limit: None,
3313        });
3314    let engine = get_inference_engine(state);
3315    let upgrades = engine.available_model_upgrades();
3316    let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3317        .iter()
3318        .cloned()
3319        .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3320        .collect();
3321    let query = params
3322        .query
3323        .as_deref()
3324        .map(str::trim)
3325        .filter(|q| !q.is_empty())
3326        .map(|q| q.to_ascii_lowercase());
3327    let provider = params
3328        .provider
3329        .as_deref()
3330        .map(str::trim)
3331        .filter(|p| !p.is_empty())
3332        .map(|p| p.to_ascii_lowercase());
3333
3334    let mut entries: Vec<ModelSearchEntry> = engine
3335        .list_schemas()
3336        .into_iter()
3337        .filter(|schema| {
3338            if let Some(capability) = params.capability {
3339                if !schema.has_capability(capability) {
3340                    return false;
3341                }
3342            }
3343            if let Some(provider) = provider.as_deref() {
3344                if schema.provider.to_ascii_lowercase() != provider {
3345                    return false;
3346                }
3347            }
3348            if params.local_only && !schema.is_local() {
3349                return false;
3350            }
3351            if params.available_only && !schema.available {
3352                return false;
3353            }
3354            if let Some(query) = query.as_deref() {
3355                let capability_text = schema
3356                    .capabilities
3357                    .iter()
3358                    .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3359                    .collect::<Vec<_>>()
3360                    .join(" ");
3361                let haystack = format!(
3362                    "{} {} {} {} {} {}",
3363                    schema.id,
3364                    schema.name,
3365                    schema.provider,
3366                    schema.family,
3367                    schema.tags.join(" "),
3368                    capability_text
3369                )
3370                .to_ascii_lowercase();
3371                if !haystack.contains(query) {
3372                    return false;
3373                }
3374            }
3375            true
3376        })
3377        .map(|schema| {
3378            let pullable = !schema.available
3379                && matches!(
3380                    schema.source,
3381                    car_inference::ModelSource::Local { .. }
3382                        | car_inference::ModelSource::Mlx { .. }
3383                );
3384            let info = car_inference::ModelInfo::from(&schema);
3385            let upgrade = upgrades_by_from.get(&schema.id).cloned();
3386            ModelSearchEntry {
3387                info,
3388                family: schema.family,
3389                version: schema.version,
3390                tags: schema.tags,
3391                pullable,
3392                upgrade,
3393            }
3394        })
3395        .collect();
3396    entries.sort_by(|a, b| {
3397        b.info
3398            .available
3399            .cmp(&a.info.available)
3400            .then(b.info.is_local.cmp(&a.info.is_local))
3401            .then(a.info.name.cmp(&b.info.name))
3402    });
3403    if let Some(limit) = params.limit {
3404        entries.truncate(limit);
3405    }
3406
3407    let total = entries.len();
3408    let available = entries.iter().filter(|entry| entry.info.available).count();
3409    let local = entries.iter().filter(|entry| entry.info.is_local).count();
3410    let response = ModelSearchResponse {
3411        models: entries,
3412        upgrades,
3413        total,
3414        available,
3415        local,
3416        remote: total.saturating_sub(local),
3417    };
3418    serde_json::to_value(response).map_err(|e| e.to_string())
3419}
3420
3421fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3422    let engine = get_inference_engine(state);
3423    serde_json::to_value(serde_json::json!({
3424        "upgrades": engine.available_model_upgrades()
3425    }))
3426    .map_err(|e| e.to_string())
3427}
3428
3429async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3430    let name = msg
3431        .params
3432        .get("name")
3433        .or_else(|| msg.params.get("id"))
3434        .or_else(|| msg.params.get("model"))
3435        .and_then(|v| v.as_str())
3436        .ok_or("missing 'name' parameter")?;
3437    let engine = get_inference_engine(state);
3438    let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3439    Ok(serde_json::json!({"path": path.display().to_string()}))
3440}
3441
3442async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3443    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3444        msg.params
3445            .get("events")
3446            .cloned()
3447            .unwrap_or(msg.params.clone()),
3448    )
3449    .map_err(|e| format!("invalid events: {}", e))?;
3450
3451    let inference = get_inference_engine(state).clone();
3452    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3453
3454    let skills = engine.distill_skills(&events).await;
3455    serde_json::to_value(&skills).map_err(|e| e.to_string())
3456}
3457
3458/// Run memory consolidation against this client's session memgine
3459/// (or the daemon-owned per-agent memgine when bound — #170).
3460/// Returns the JSON `ConsolidationReport`.
3461async fn handle_memory_consolidate(
3462    session: &crate::session::ClientSession,
3463) -> Result<Value, String> {
3464    let engine_arc = session.effective_memgine().await;
3465    let report = {
3466        let mut engine = engine_arc.lock().await;
3467        engine.consolidate().await
3468    };
3469    if let Some(id) = session.agent_id.lock().await.clone() {
3470        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3471            tracing::warn!(agent_id = %id, error = %e,
3472                "agent memgine persist after consolidate failed");
3473        }
3474    }
3475    serde_json::to_value(&report).map_err(|e| e.to_string())
3476}
3477
3478/// Repair a degraded skill on this client's session memgine.
3479/// Returns `{ code: "..." }` on success, `null` if the skill
3480/// isn't broken or repair failed.
3481async fn handle_skill_repair(
3482    msg: &JsonRpcMessage,
3483    session: &crate::session::ClientSession,
3484) -> Result<Value, String> {
3485    let name = msg
3486        .params
3487        .get("skill_name")
3488        .and_then(|v| v.as_str())
3489        .ok_or("missing 'skill_name' parameter")?;
3490    let mut engine = session.memgine.lock().await;
3491    let code = engine.repair_skill(name).await;
3492    Ok(match code {
3493        Some(c) => serde_json::json!({ "code": c }),
3494        None => Value::Null,
3495    })
3496}
3497
3498/// Ingest distilled skills into this client's session memgine.
3499/// Returns the number of nodes inserted.
3500async fn handle_skills_ingest_distilled(
3501    msg: &JsonRpcMessage,
3502    session: &crate::session::ClientSession,
3503) -> Result<Value, String> {
3504    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3505        msg.params
3506            .get("skills")
3507            .cloned()
3508            .unwrap_or(msg.params.clone()),
3509    )
3510    .map_err(|e| format!("invalid skills: {}", e))?;
3511    let mut engine = session.memgine.lock().await;
3512    let nodes = engine.ingest_distilled_skills(&skills);
3513    Ok(serde_json::json!({ "ingested": nodes.len() }))
3514}
3515
3516/// Run skill evolution against this session's memgine for a
3517/// specified domain.  Returns the resulting `DistilledSkill` array.
3518async fn handle_skills_evolve(
3519    msg: &JsonRpcMessage,
3520    session: &crate::session::ClientSession,
3521) -> Result<Value, String> {
3522    let domain = msg
3523        .params
3524        .get("domain")
3525        .and_then(|v| v.as_str())
3526        .ok_or("missing 'domain' parameter")?
3527        .to_string();
3528    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3529        msg.params
3530            .get("events")
3531            .cloned()
3532            .unwrap_or(Value::Array(vec![])),
3533    )
3534    .map_err(|e| format!("invalid events: {}", e))?;
3535    let mut engine = session.memgine.lock().await;
3536    let skills = engine.evolve_skills(&events, &domain).await;
3537    serde_json::to_value(&skills).map_err(|e| e.to_string())
3538}
3539
3540/// List domains whose skills are underperforming on this session.
3541async fn handle_skills_domains_needing_evolution(
3542    msg: &JsonRpcMessage,
3543    session: &crate::session::ClientSession,
3544) -> Result<Value, String> {
3545    let threshold = msg
3546        .params
3547        .get("threshold")
3548        .and_then(|v| v.as_f64())
3549        .unwrap_or(0.6);
3550    let engine = session.memgine.lock().await;
3551    let domains = engine.domains_needing_evolution(threshold);
3552    serde_json::to_value(&domains).map_err(|e| e.to_string())
3553}
3554
3555/// Rerank documents against a query using a cross-encoder model.
3556async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3557    let engine = get_inference_engine(state);
3558    let req: car_inference::RerankRequest =
3559        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3560    let _permit = state.admission.acquire().await;
3561    let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3562    serde_json::to_value(&result).map_err(|e| e.to_string())
3563}
3564
3565/// Transcribe audio at the given path. The path is interpreted on
3566/// the daemon's filesystem, not the FFI caller's — Daemon-mode
3567/// callers must pass a path the daemon can read (typically a
3568/// shared `~/.car/...` location or stdin push via the streaming
3569/// API).
3570async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3571    use base64::Engine as _;
3572    let engine = get_inference_engine(state);
3573
3574    // Sandbox-crossing escape hatch (Parslee-ai/car-releases#31): when
3575    // the caller can't share a filesystem view with the daemon (e.g.
3576    // unsandboxed Milo talking to a sandboxed car-host), they pass
3577    // `audio_b64` instead of `audio_path`. We decode to a tempfile,
3578    // run transcribe against the path the engine expects, and clean up
3579    // on drop. Accepts either form; `audio_b64` wins if both are set.
3580    let mut params = msg.params.clone();
3581    let audio_b64 = params
3582        .as_object_mut()
3583        .and_then(|m| m.remove("audio_b64"))
3584        .and_then(|v| v.as_str().map(str::to_string));
3585    let _tmp_audio = if let Some(b64) = audio_b64 {
3586        let bytes = base64::engine::general_purpose::STANDARD
3587            .decode(b64.as_bytes())
3588            .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3589        let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3590        std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3591        let path = tmp.path().to_string_lossy().into_owned();
3592        if let Some(obj) = params.as_object_mut() {
3593            obj.insert("audio_path".to_string(), Value::String(path));
3594        }
3595        Some(tmp)
3596    } else {
3597        None
3598    };
3599
3600    let req: car_inference::TranscribeRequest =
3601        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3602    let _permit = state.admission.acquire().await;
3603    let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3604    serde_json::to_value(&result).map_err(|e| e.to_string())
3605}
3606
3607/// Synthesize speech. By default writes to `output_path` on the
3608/// daemon's filesystem; when `return_b64: true` (or no `output_path`
3609/// was supplied) the result also includes an `audio_b64` field with
3610/// the rendered bytes inline so cross-sandbox callers can avoid
3611/// filesystem coordination. Closes Parslee-ai/car-releases#31.
3612async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3613    use base64::Engine as _;
3614    let engine = get_inference_engine(state);
3615
3616    let mut params = msg.params.clone();
3617    let return_b64 = params
3618        .as_object_mut()
3619        .and_then(|m| m.remove("return_b64"))
3620        .and_then(|v| v.as_bool())
3621        .unwrap_or(false);
3622    let no_output_path = params
3623        .as_object()
3624        .map(|m| !m.contains_key("output_path"))
3625        .unwrap_or(true);
3626
3627    let req: car_inference::SynthesizeRequest =
3628        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3629    let _permit = state.admission.acquire().await;
3630    let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3631    let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3632
3633    // Inline the bytes when the caller asked for them OR when no
3634    // output_path was specified (typical sandbox-crossing case —
3635    // they didn't pick a path because they have no shared one).
3636    if return_b64 || no_output_path {
3637        let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3638            format!(
3639                "synthesize: failed to read rendered audio at {}: {e}",
3640                result.audio_path
3641            )
3642        })?;
3643        let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3644        if let Some(obj) = value.as_object_mut() {
3645            obj.insert("audio_b64".to_string(), Value::String(encoded));
3646        }
3647    }
3648    Ok(value)
3649}
3650
3651/// Prepare the speech runtime (downloads / warmup). Returns a
3652/// JSON status string, mirroring the embedded
3653/// `prepare_speech_runtime` shape.
3654async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3655    let engine = get_inference_engine(state);
3656    let status = engine
3657        .prepare_speech_runtime()
3658        .await
3659        .map_err(|e| e.to_string())?;
3660    serde_json::to_value(&status).map_err(|e| e.to_string())
3661}
3662
3663/// Adaptive route decision for a prompt — returns the routing
3664/// JSON the FFI's `route_model` returns.
3665async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3666    let prompt = msg
3667        .params
3668        .get("prompt")
3669        .and_then(|v| v.as_str())
3670        .ok_or("missing 'prompt' parameter")?;
3671    let engine = get_inference_engine(state);
3672    let decision = engine.route_adaptive(prompt).await;
3673    serde_json::to_value(&decision).map_err(|e| e.to_string())
3674}
3675
3676/// Model performance profiles snapshot.
3677async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3678    let engine = get_inference_engine(state);
3679    let profiles = engine.export_profiles().await;
3680    serde_json::to_value(&profiles).map_err(|e| e.to_string())
3681}
3682
3683#[derive(Deserialize)]
3684#[serde(rename_all = "camelCase")]
3685struct OutcomesResolvePendingParams {
3686    /// Flat `(trace_id, success, confidence, output)` tuples from the
3687    /// caller. Same shape `car-reason`'s session produces from its
3688    /// `ActionOutcome` vector. Daemon side runs the inference rules
3689    /// and writes resolved outcomes back to the shared tracker.
3690    action_results: Vec<(String, bool, f64, String)>,
3691}
3692
3693/// `outcomes.resolve_pending` — write inferred outcomes back to the
3694/// shared engine's `OutcomeTracker` (Parslee-ai/car#189 follow-up).
3695///
3696/// Symmetric to the in-process path
3697/// `ReasoningInferenceHandle::record_inferred_outcomes` on
3698/// `InferenceEngine`: takes the per-action result tuples the
3699/// reasoning session produces, runs
3700/// `OutcomeTracker::infer_outcomes_from_action_sequence` to convert
3701/// them into `InferredOutcome` records, and calls
3702/// `resolve_pending_from_signals` under the tracker write lock. The
3703/// learning loop that adjusts routing decisions therefore survives
3704/// daemon-routed reasoning runs (previously a best-effort no-op on
3705/// the daemon side).
3706///
3707/// Returns `{ recorded: N }` where N is the number of action results
3708/// the caller passed. The tracker doesn't surface how many of those
3709/// actually had pending entries to resolve; that count would require
3710/// expanding the tracker API and isn't load-bearing for any caller
3711/// yet.
3712async fn handle_outcomes_resolve_pending(
3713    req: &JsonRpcMessage,
3714    state: &ServerState,
3715) -> Result<Value, String> {
3716    let params: OutcomesResolvePendingParams = serde_json::from_value(req.params.clone())
3717        .map_err(|e| format!("invalid params: {e}"))?;
3718    let engine = get_inference_engine(state);
3719    let mut tracker = engine.outcome_tracker.write().await;
3720    let inferred = tracker.infer_outcomes_from_action_sequence(&params.action_results);
3721    tracker.resolve_pending_from_signals(inferred);
3722    Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3723}
3724
3725/// Per-session event log size.
3726async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3727    let n = session.runtime.log.lock().await.len();
3728    Ok(Value::from(n as u64))
3729}
3730
3731async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3732    let stats = session.runtime.log.lock().await.stats();
3733    serde_json::to_value(stats).map_err(|e| e.to_string())
3734}
3735
/// Request params for the event-log truncate handler. Keys are
/// camelCase on the wire (`maxEvents`, `maxSpans`); both are optional
/// and an omitted key leaves that part of the log untouched.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
    /// When set, passed to `truncate_events_keep_last`.
    #[serde(default)]
    max_events: Option<usize>,
    /// When set, passed to `truncate_spans_keep_last`.
    #[serde(default)]
    max_spans: Option<usize>,
}
3744
3745async fn handle_events_truncate(
3746    msg: &JsonRpcMessage,
3747    session: &crate::session::ClientSession,
3748) -> Result<Value, String> {
3749    let params: EventsTruncateParams =
3750        serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
3751            max_events: None,
3752            max_spans: None,
3753        });
3754    let mut log = session.runtime.log.lock().await;
3755    let removed_events = params
3756        .max_events
3757        .map(|max| log.truncate_events_keep_last(max))
3758        .unwrap_or(0);
3759    let removed_spans = params
3760        .max_spans
3761        .map(|max| log.truncate_spans_keep_last(max))
3762        .unwrap_or(0);
3763    let stats = log.stats();
3764    Ok(serde_json::json!({
3765        "removedEvents": removed_events,
3766        "removedSpans": removed_spans,
3767        "stats": stats,
3768    }))
3769}
3770
3771async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
3772    let mut log = session.runtime.log.lock().await;
3773    let removed = log.clear();
3774    Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
3775}
3776
3777/// Update the per-session replan config. Wire shape mirrors the
3778/// FFI's positional `set_replan_config` arguments — the engine
3779/// crate's `ReplanConfig` struct doesn't derive Serialize, so we
3780/// reconstruct it from a flat object here.
3781async fn handle_replan_set_config(
3782    msg: &JsonRpcMessage,
3783    session: &crate::session::ClientSession,
3784) -> Result<Value, String> {
3785    let max_replans = msg
3786        .params
3787        .get("max_replans")
3788        .and_then(|v| v.as_u64())
3789        .unwrap_or(0) as u32;
3790    let delay_ms = msg
3791        .params
3792        .get("delay_ms")
3793        .and_then(|v| v.as_u64())
3794        .unwrap_or(0);
3795    let verify_before_execute = msg
3796        .params
3797        .get("verify_before_execute")
3798        .and_then(|v| v.as_bool())
3799        .unwrap_or(true);
3800    let cfg = car_engine::ReplanConfig {
3801        max_replans,
3802        delay_ms,
3803        verify_before_execute,
3804    };
3805    session.runtime.set_replan_config(cfg).await;
3806    Ok(Value::Null)
3807}
3808
3809async fn handle_skills_list(
3810    msg: &JsonRpcMessage,
3811    session: &crate::session::ClientSession,
3812) -> Result<Value, String> {
3813    let domain = msg.params.get("domain").and_then(|v| v.as_str());
3814    let engine = session.memgine.lock().await;
3815    let skills: Vec<serde_json::Value> = engine
3816        .graph
3817        .inner
3818        .node_indices()
3819        .filter_map(|nix| {
3820            let node = engine.graph.inner.node_weight(nix)?;
3821            if node.kind != car_memgine::MemKind::Skill {
3822                return None;
3823            }
3824            let meta = car_memgine::SkillMeta::from_node(node)?;
3825            if let Some(d) = domain {
3826                match &meta.scope {
3827                    car_memgine::SkillScope::Global => {}
3828                    car_memgine::SkillScope::Domain(sd) if sd == d => {}
3829                    _ => return None,
3830                }
3831            }
3832            Some(serde_json::to_value(&meta).unwrap_or_default())
3833        })
3834        .collect();
3835    serde_json::to_value(&skills).map_err(|e| e.to_string())
3836}
3837
/// Request params shared by the secret put/get/delete/status handlers.
#[derive(serde::Deserialize)]
struct SecretParams {
    /// Optional service name, forwarded to the secrets backend as
    /// `Option<&str>`.
    #[serde(default)]
    service: Option<String>,
    /// Secret key; required by every handler.
    key: String,
    /// Secret value; only `handle_secret_put` reads it and rejects the
    /// request when it is absent.
    #[serde(default)]
    value: Option<String>,
}
3846
3847fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
3848    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3849    let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
3850    car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
3851}
3852
3853fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
3854    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3855    car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
3856}
3857
3858fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
3859    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3860    car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
3861}
3862
3863fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
3864    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3865    car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
3866}
3867
/// Request params shared by the permission status/request/explain
/// handlers.
#[derive(serde::Deserialize)]
struct PermParams {
    /// Permission domain, forwarded verbatim to
    /// `car_ffi_common::permissions`.
    domain: String,
    /// Optional target bundle id, forwarded as `Option<&str>`.
    #[serde(default)]
    target_bundle_id: Option<String>,
}
3874
3875fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
3876    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3877    car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
3878}
3879
3880fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
3881    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3882    car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
3883}
3884
3885fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
3886    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3887    car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
3888}
3889
3890fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
3891    #[derive(serde::Deserialize)]
3892    struct P {
3893        start: String,
3894        end: String,
3895        #[serde(default)]
3896        calendar_ids: Vec<String>,
3897    }
3898    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3899    let start = chrono::DateTime::parse_from_rfc3339(&p.start)
3900        .map_err(|e| format!("parse start: {}", e))?
3901        .with_timezone(&chrono::Utc);
3902    let end = chrono::DateTime::parse_from_rfc3339(&p.end)
3903        .map_err(|e| format!("parse end: {}", e))?
3904        .with_timezone(&chrono::Utc);
3905    car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
3906}
3907
3908fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
3909    #[derive(serde::Deserialize)]
3910    struct P {
3911        query: String,
3912        #[serde(default = "default_limit")]
3913        limit: usize,
3914        #[serde(default)]
3915        container_ids: Vec<String>,
3916    }
3917    fn default_limit() -> usize {
3918        50
3919    }
3920    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3921    car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
3922}
3923
3924fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
3925    #[derive(serde::Deserialize, Default)]
3926    struct P {
3927        #[serde(default)]
3928        account_ids: Vec<String>,
3929    }
3930    let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
3931    car_ffi_common::integrations::mail_inbox(&p.account_ids)
3932}
3933
3934fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
3935    let raw = req.params.to_string();
3936    car_ffi_common::integrations::mail_send(&raw)
3937}
3938
3939fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
3940    #[derive(serde::Deserialize)]
3941    struct P {
3942        #[serde(default = "default_limit")]
3943        limit: usize,
3944    }
3945    fn default_limit() -> usize {
3946        50
3947    }
3948    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
3949    car_ffi_common::integrations::messages_chats(p.limit)
3950}
3951
3952fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
3953    let raw = req.params.to_string();
3954    car_ffi_common::integrations::messages_send(&raw)
3955}
3956
3957fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
3958    #[derive(serde::Deserialize)]
3959    struct P {
3960        query: String,
3961        #[serde(default = "default_limit")]
3962        limit: usize,
3963    }
3964    fn default_limit() -> usize {
3965        50
3966    }
3967    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3968    car_ffi_common::integrations::notes_find(&p.query, p.limit)
3969}
3970
3971fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
3972    #[derive(serde::Deserialize)]
3973    struct P {
3974        #[serde(default = "default_limit")]
3975        limit: usize,
3976    }
3977    fn default_limit() -> usize {
3978        50
3979    }
3980    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
3981    car_ffi_common::integrations::reminders_items(p.limit)
3982}
3983
3984fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
3985    #[derive(serde::Deserialize)]
3986    struct P {
3987        #[serde(default = "default_limit")]
3988        limit: usize,
3989    }
3990    fn default_limit() -> usize {
3991        100
3992    }
3993    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
3994    car_ffi_common::integrations::bookmarks_list(p.limit)
3995}
3996
3997fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
3998    #[derive(serde::Deserialize)]
3999    struct P {
4000        start: String,
4001        end: String,
4002    }
4003    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4004    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4005        .map_err(|e| format!("parse start: {}", e))?
4006        .with_timezone(&chrono::Utc);
4007    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4008        .map_err(|e| format!("parse end: {}", e))?
4009        .with_timezone(&chrono::Utc);
4010    car_ffi_common::health::sleep_windows(s, e)
4011}
4012
4013fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4014    #[derive(serde::Deserialize)]
4015    struct P {
4016        start: String,
4017        end: String,
4018    }
4019    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4020    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4021        .map_err(|e| format!("parse start: {}", e))?
4022        .with_timezone(&chrono::Utc);
4023    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4024        .map_err(|e| format!("parse end: {}", e))?
4025        .with_timezone(&chrono::Utc);
4026    car_ffi_common::health::workouts(s, e)
4027}
4028
4029fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4030    #[derive(serde::Deserialize)]
4031    struct P {
4032        start: String,
4033        end: String,
4034    }
4035    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4036    let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4037        .map_err(|e| format!("parse start: {}", e))?;
4038    let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4039        .map_err(|e| format!("parse end: {}", e))?;
4040    car_ffi_common::health::activity(s, e)
4041}
4042
4043async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4044    let closed = session.browser.close().await?;
4045    Ok(serde_json::json!({"closed": closed}))
4046}
4047
4048async fn handle_browser_run(
4049    req: &JsonRpcMessage,
4050    session: &crate::session::ClientSession,
4051) -> Result<Value, String> {
4052    #[derive(serde::Deserialize)]
4053    struct BrowserRunParams {
4054        /// Inline JSON string (CLI-compatible), OR the structured object.
4055        script: Value,
4056        #[serde(default)]
4057        width: Option<u32>,
4058        #[serde(default)]
4059        height: Option<u32>,
4060        /// When true, launches a visible Chromium window for interactive
4061        /// flows (first-time auth, 2FA, supervised runs). Only honored on
4062        /// the call that first launches the browser session — subsequent
4063        /// calls reuse the existing browser regardless.
4064        #[serde(default)]
4065        headed: Option<bool>,
4066        /// Extra Chromium command-line flags appended verbatim at
4067        /// launch (#112). Honoured only on the launch call.
4068        #[serde(default)]
4069        extra_args: Option<Vec<String>>,
4070    }
4071    let params: BrowserRunParams =
4072        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4073
4074    // Accept either a JSON string OR a structured object under `script`.
4075    let script_json = match params.script {
4076        Value::String(s) => s,
4077        other => other.to_string(),
4078    };
4079
4080    let browser_session = session
4081        .browser
4082        .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4083            width: params.width.unwrap_or(1280),
4084            height: params.height.unwrap_or(720),
4085            headless: !params.headed.unwrap_or(false),
4086            extra_args: params.extra_args.unwrap_or_default(),
4087        })
4088        .await?;
4089
4090    let trace_json = browser_session.run(&script_json).await?;
4091    serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4092}
4093
4094// ---------------------------------------------------------------------------
4095// Voice streaming JSON-RPC methods
4096//
4097// Events are pushed back to the originating client as JSON-RPC notifications:
4098//   { "jsonrpc": "2.0", "method": "voice.event",
4099//     "params": { "session_id": "...", "event": {...} } }
4100//
4101// The session registry is process-wide (ServerState.voice_sessions); per-call
4102// WsVoiceEventSink instances bind each session to its originating WS so a
4103// client only ever sees events for sessions it started.
4104// ---------------------------------------------------------------------------
4105
/// Request params for starting a streaming transcription session.
#[derive(Deserialize)]
struct VoiceStartParams {
    /// Caller-chosen session identifier.
    session_id: String,
    /// Audio source description; re-serialised to a JSON string and
    /// forwarded to `car_ffi_common::voice::transcribe_stream_start`.
    audio_source: Value,
    /// Optional options object, forwarded the same way when present.
    #[serde(default)]
    options: Option<Value>,
}
4113
4114async fn handle_voice_transcribe_stream_start(
4115    req: &JsonRpcMessage,
4116    state: &Arc<ServerState>,
4117    session: &Arc<crate::session::ClientSession>,
4118) -> Result<Value, String> {
4119    let params: VoiceStartParams =
4120        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4121    let audio_source_json =
4122        serde_json::to_string(&params.audio_source).map_err(|e| e.to_string())?;
4123    let options_json = params
4124        .options
4125        .as_ref()
4126        .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4127        .transpose()?;
4128    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4129        channel: session.channel.clone(),
4130    });
4131    let json = car_ffi_common::voice::transcribe_stream_start(
4132        &params.session_id,
4133        &audio_source_json,
4134        options_json.as_deref(),
4135        state.voice_sessions.clone(),
4136        sink,
4137    )
4138    .await?;
4139    serde_json::from_str(&json).map_err(|e| e.to_string())
4140}
4141
/// Request params for stopping a streaming transcription session.
#[derive(Deserialize)]
struct VoiceStopParams {
    /// Identifier of the session to stop.
    session_id: String,
}
4146
4147async fn handle_voice_transcribe_stream_stop(
4148    req: &JsonRpcMessage,
4149    state: &Arc<ServerState>,
4150) -> Result<Value, String> {
4151    let params: VoiceStopParams =
4152        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4153    let json = car_ffi_common::voice::transcribe_stream_stop(
4154        &params.session_id,
4155        state.voice_sessions.clone(),
4156    )
4157    .await?;
4158    serde_json::from_str(&json).map_err(|e| e.to_string())
4159}
4160
/// Request params for pushing one audio frame into a streaming
/// transcription session.
#[derive(Deserialize)]
struct VoicePushParams {
    /// Identifier of the session receiving the frame.
    session_id: String,
    /// Base64-encoded 16-bit signed PCM frame. JSON-RPC is text, so binary
    /// audio frames have to be encoded; clients in WS-binary contexts that
    /// want to skip the round trip can call the FFI directly.
    pcm_b64: String,
}
4169
4170async fn handle_voice_transcribe_stream_push(
4171    req: &JsonRpcMessage,
4172    state: &Arc<ServerState>,
4173) -> Result<Value, String> {
4174    use base64::Engine;
4175    let params: VoicePushParams =
4176        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4177    let pcm = base64::engine::general_purpose::STANDARD
4178        .decode(&params.pcm_b64)
4179        .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4180    let json = car_ffi_common::voice::transcribe_stream_push(
4181        &params.session_id,
4182        &pcm,
4183        state.voice_sessions.clone(),
4184    )
4185    .await?;
4186    serde_json::from_str(&json).map_err(|e| e.to_string())
4187}
4188
4189fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4190    let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4191    serde_json::from_str(&json).unwrap_or(Value::Null)
4192}
4193
4194async fn handle_voice_dispatch_turn(
4195    req: &JsonRpcMessage,
4196    state: &Arc<ServerState>,
4197    session: &Arc<crate::session::ClientSession>,
4198) -> Result<Value, String> {
4199    let req_value = req.params.clone();
4200    let request: crate::voice_turn::DispatchVoiceTurnRequest =
4201        serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4202    let engine = get_inference_engine(state).clone();
4203    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4204        channel: session.channel.clone(),
4205    });
4206    let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4207    serde_json::to_value(resp).map_err(|e| e.to_string())
4208}
4209
4210async fn handle_voice_cancel_turn() -> Result<Value, String> {
4211    crate::voice_turn::cancel().await;
4212    Ok(serde_json::json!({"cancelled": true}))
4213}
4214
4215async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4216    let engine = get_inference_engine(state).clone();
4217    crate::voice_turn::prewarm(engine).await;
4218    Ok(serde_json::json!({"prewarmed": true}))
4219}
4220
4221// ---------------------------------------------------------------------------
4222// Inference runner over WebSocket — closes Parslee-ai/car-releases#24
4223//
4224// Bidirectional protocol shape:
4225//   1. Client → server: `inference.register_runner` (no params). The
4226//      session that calls this becomes the host for delegated models.
4227//   2. Server → client: `inference.runner.invoke` notification with
4228//      {call_id, request} when CAR needs to dispatch a delegated turn.
4229//   3. Client → server: `inference.runner.event` with {call_id, event}
4230//      for each chunk; `inference.runner.complete` with {call_id, result}
4231//      on success; `inference.runner.fail` with {call_id, error} on
4232//      failure.
4233//
4234// The server-side data is process-wide because only one inference
4235// runner can be registered at a time (matches the FFI bindings'
4236// constraint). The per-call mailboxes live in dedicated DashMaps.
4237// ---------------------------------------------------------------------------
4238
4239fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4240    static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4241        std::sync::OnceLock::new();
4242    SLOT.get_or_init(|| std::sync::RwLock::new(None))
4243}
4244
4245fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4246    static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4247        std::sync::OnceLock::new();
4248    MAP.get_or_init(dashmap::DashMap::new)
4249}
4250
4251fn ws_runner_completions() -> &'static dashmap::DashMap<
4252    String,
4253    tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4254> {
4255    static MAP: std::sync::OnceLock<
4256        dashmap::DashMap<
4257            String,
4258            tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4259        >,
4260    > = std::sync::OnceLock::new();
4261    MAP.get_or_init(dashmap::DashMap::new)
4262}
4263
4264struct WsInferenceRunner;
4265
4266#[async_trait::async_trait]
4267impl car_inference::InferenceRunner for WsInferenceRunner {
4268    async fn run(
4269        &self,
4270        request: car_inference::tasks::generate::GenerateRequest,
4271        emitter: car_inference::EventEmitter,
4272    ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4273        let channel = ws_runner_session()
4274            .read()
4275            .map_err(|e| {
4276                car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4277            })?
4278            .clone()
4279            .ok_or_else(|| {
4280                car_inference::RunnerError::Declined(
4281                    "no WebSocket inference runner registered — call inference.register_runner first"
4282                        .into(),
4283                )
4284            })?;
4285
4286        let call_id = uuid::Uuid::new_v4().to_string();
4287        let request_json = serde_json::to_value(&request)
4288            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4289        let (tx, rx) = tokio::sync::oneshot::channel();
4290        ws_runner_calls().insert(call_id.clone(), emitter);
4291        ws_runner_completions().insert(call_id.clone(), tx);
4292
4293        // Fire the invoke notification.
4294        use futures::SinkExt;
4295        let notification = serde_json::json!({
4296            "jsonrpc": "2.0",
4297            "method": "inference.runner.invoke",
4298            "params": {
4299                "call_id": call_id,
4300                "request": request_json,
4301            },
4302        });
4303        let text = serde_json::to_string(&notification)
4304            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4305        let _ = channel
4306            .write
4307            .lock()
4308            .await
4309            .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4310            .await;
4311
4312        let result = rx.await.map_err(|_| {
4313            car_inference::RunnerError::Failed("runner completion channel dropped".into())
4314        })?;
4315        ws_runner_calls().remove(&call_id);
4316        result.map_err(car_inference::RunnerError::Failed)
4317    }
4318}
4319
4320async fn handle_inference_register_runner(
4321    session: &Arc<crate::session::ClientSession>,
4322) -> Result<Value, String> {
4323    let mut guard = ws_runner_session()
4324        .write()
4325        .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4326    *guard = Some(session.channel.clone());
4327    drop(guard);
4328    car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4329    Ok(serde_json::json!({"registered": true}))
4330}
4331
/// Request params for `inference.runner.event`.
#[derive(serde::Deserialize)]
struct InferenceRunnerEventParams {
    /// Identifier of the in-flight call, used to look up its emitter
    /// in `ws_runner_calls()`.
    call_id: String,
    /// Raw event object, interpreted by `parse_runner_event_value`.
    event: Value,
}
4337
4338async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4339    let params: InferenceRunnerEventParams =
4340        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4341    let stream_event = match parse_runner_event_value(&params.event) {
4342        Some(e) => e,
4343        None => return Err("unrecognised runner event shape".into()),
4344    };
4345    if let Some(entry) = ws_runner_calls().get(&params.call_id) {
4346        let emitter = entry.value().clone();
4347        tokio::spawn(async move { emitter.emit(stream_event).await });
4348    }
4349    Ok(serde_json::json!({"emitted": true}))
4350}
4351
/// Request params for `inference.runner.complete`.
#[derive(serde::Deserialize)]
struct InferenceRunnerCompleteParams {
    /// Identifier of the in-flight call being completed.
    call_id: String,
    /// Raw result object; the handler deserialises it as a
    /// `car_inference::RunnerResult`.
    result: Value,
}
4357
4358async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4359    let params: InferenceRunnerCompleteParams =
4360        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4361    let result: std::result::Result<car_inference::RunnerResult, String> =
4362        serde_json::from_value(params.result)
4363            .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4364    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4365        let _ = tx.send(result);
4366    }
4367    Ok(serde_json::json!({"completed": true}))
4368}
4369
/// Request params for `inference.runner.fail`.
#[derive(serde::Deserialize)]
struct InferenceRunnerFailParams {
    /// Identifier of the in-flight call being failed.
    call_id: String,
    /// Error message delivered to the waiting call.
    error: String,
}
4375
4376async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4377    let params: InferenceRunnerFailParams =
4378        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4379    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
4380        let _ = tx.send(Err(params.error));
4381    }
4382    Ok(serde_json::json!({"failed": true}))
4383}
4384
4385fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4386    let ty = v.get("type").and_then(|t| t.as_str())?;
4387    match ty {
4388        "text" => Some(car_inference::StreamEvent::TextDelta(
4389            v.get("data")?.as_str()?.to_string(),
4390        )),
4391        "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4392            name: v.get("name")?.as_str()?.to_string(),
4393            index: v.get("index")?.as_u64()? as usize,
4394            id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4395        }),
4396        "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4397            index: v.get("index")?.as_u64()? as usize,
4398            arguments_delta: v.get("data")?.as_str()?.to_string(),
4399        }),
4400        "usage" => Some(car_inference::StreamEvent::Usage {
4401            input_tokens: v.get("input_tokens")?.as_u64()?,
4402            output_tokens: v.get("output_tokens")?.as_u64()?,
4403        }),
4404        "done" => Some(car_inference::StreamEvent::Done {
4405            text: v.get("text")?.as_str()?.to_string(),
4406            tool_calls: v
4407                .get("tool_calls")
4408                .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4409                .unwrap_or_default(),
4410        }),
4411        _ => None,
4412    }
4413}
4414
/// Request params for the speaker enrollment handler.
#[derive(Deserialize)]
struct EnrollSpeakerParams {
    /// Label to enroll the speaker under.
    label: String,
    /// Audio description; re-serialised to a JSON string and forwarded
    /// to `car_ffi_common::voice::enroll_speaker`.
    audio: Value,
}
4420
4421async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4422    let params: EnrollSpeakerParams =
4423        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4424    let audio_json = serde_json::to_string(&params.audio).map_err(|e| e.to_string())?;
4425    let json = car_ffi_common::voice::enroll_speaker(&params.label, &audio_json).await?;
4426    serde_json::from_str(&json).map_err(|e| e.to_string())
4427}
4428
/// Request params for the enrollment removal handler.
#[derive(Deserialize)]
struct RemoveEnrollmentParams {
    /// Label of the enrollment to remove.
    label: String,
}
4433
4434fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4435    let params: RemoveEnrollmentParams =
4436        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4437    let json = car_ffi_common::voice::remove_enrollment(&params.label)?;
4438    serde_json::from_str(&json).map_err(|e| e.to_string())
4439}
4440
/// Request params for the workflow run handler.
#[derive(Deserialize)]
struct WorkflowRunParams {
    /// Workflow definition; re-serialised to a JSON string and
    /// forwarded to `car_ffi_common::workflow::run_workflow`.
    workflow: Value,
}
4445
4446async fn handle_workflow_run(
4447    req: &JsonRpcMessage,
4448    session: &Arc<crate::session::ClientSession>,
4449) -> Result<Value, String> {
4450    let params: WorkflowRunParams =
4451        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4452    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4453    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4454        channel: session.channel.clone(),
4455        host: session.host.clone(),
4456        client_id: session.client_id.clone(),
4457    });
4458    let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4459    serde_json::from_str(&json).map_err(|e| e.to_string())
4460}
4461
/// Request params for the workflow verify handler.
#[derive(Deserialize)]
struct WorkflowVerifyParams {
    /// Workflow definition; re-serialised to a JSON string and
    /// forwarded to `car_ffi_common::workflow::verify_workflow`.
    workflow: Value,
}
4466
4467fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4468    let params: WorkflowVerifyParams =
4469        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4470    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
4471    let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4472    serde_json::from_str(&json).map_err(|e| e.to_string())
4473}
4474
4475// ---------------------------------------------------------------------------
4476// Meeting JSON-RPC methods
4477// ---------------------------------------------------------------------------
4478
4479async fn handle_meeting_start(
4480    req: &JsonRpcMessage,
4481    state: &Arc<ServerState>,
4482    session: &Arc<crate::session::ClientSession>,
4483) -> Result<Value, String> {
4484    // We need the meeting id BEFORE handing the upstream sink to
4485    // start_meeting so the WsMemgineIngestSink stamps transcripts with
4486    // the correct `meeting/<id>/<source>` speaker. Parse the request
4487    // here, mint an id if none was provided, and pass the same id
4488    // through to start_meeting via the request JSON.
4489    let mut req_value = req.params.clone();
4490    let meeting_id = req_value
4491        .get("id")
4492        .and_then(|v| v.as_str())
4493        .map(str::to_string)
4494        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4495    if let Some(map) = req_value.as_object_mut() {
4496        map.insert("id".into(), Value::String(meeting_id.clone()));
4497    }
4498    let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4499
4500    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4501        Arc::new(crate::session::WsVoiceEventSink {
4502            channel: session.channel.clone(),
4503        });
4504
4505    // Wrap the WS upstream with a memgine-ingest fanout that uses the
4506    // tokio::sync::Mutex-wrapped session memgine. We pass `None` for
4507    // the FFI-common `start_meeting` memgine arg to avoid the
4508    // sync-mutex contract there — ingest happens here instead.
4509    let upstream: Arc<dyn car_voice::VoiceEventSink> =
4510        Arc::new(crate::session::WsMemgineIngestSink {
4511            meeting_id,
4512            engine: session.memgine.clone(),
4513            upstream: ws_upstream,
4514        });
4515
4516    let cwd = std::env::current_dir().ok();
4517    let json = crate::meeting::start_meeting(
4518        &request_json,
4519        state.meetings.clone(),
4520        state.voice_sessions.clone(),
4521        upstream,
4522        None,
4523        cwd,
4524    )
4525    .await?;
4526    serde_json::from_str(&json).map_err(|e| e.to_string())
4527}
4528
4529#[derive(Deserialize)]
4530struct MeetingStopParams {
4531    meeting_id: String,
4532    #[serde(default = "default_summarize")]
4533    summarize: bool,
4534}
4535
/// Serde default for `MeetingStopParams::summarize`: summarization is on
/// unless the caller turns it off.
fn default_summarize() -> bool {
    true
}
4539
4540async fn handle_meeting_stop(
4541    req: &JsonRpcMessage,
4542    state: &Arc<ServerState>,
4543    _session: &Arc<crate::session::ClientSession>,
4544) -> Result<Value, String> {
4545    let params: MeetingStopParams =
4546        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4547    let inference = if params.summarize {
4548        Some(state.inference.get().cloned()).flatten()
4549    } else {
4550        None
4551    };
4552    let json = crate::meeting::stop_meeting(
4553        &params.meeting_id,
4554        params.summarize,
4555        state.meetings.clone(),
4556        state.voice_sessions.clone(),
4557        inference,
4558    )
4559    .await?;
4560    serde_json::from_str(&json).map_err(|e| e.to_string())
4561}
4562
4563#[derive(Deserialize, Default)]
4564struct MeetingListParams {
4565    #[serde(default)]
4566    root: Option<std::path::PathBuf>,
4567}
4568
4569fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4570    let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4571    let cwd = std::env::current_dir().ok();
4572    let json = crate::meeting::list_meetings(params.root, cwd)?;
4573    serde_json::from_str(&json).map_err(|e| e.to_string())
4574}
4575
4576#[derive(Deserialize)]
4577struct MeetingGetParams {
4578    meeting_id: String,
4579    #[serde(default)]
4580    root: Option<std::path::PathBuf>,
4581}
4582
4583fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4584    let params: MeetingGetParams =
4585        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4586    let cwd = std::env::current_dir().ok();
4587    let json = crate::meeting::get_meeting(&params.meeting_id, params.root, cwd)?;
4588    serde_json::from_str(&json).map_err(|e| e.to_string())
4589}
4590
4591// ---------------------------------------------------------------------------
4592// Agent registry — file-based cross-process discovery (#111)
4593// ---------------------------------------------------------------------------
4594
4595#[derive(Deserialize, Default)]
4596struct RegistryRegisterParams {
4597    /// Caller serializes their AgentEntry as a JSON value; we
4598    /// re-serialize it so the ffi-common helper can validate the
4599    /// shape with the same parser used by the bindings.
4600    entry: Value,
4601    #[serde(default)]
4602    registry_path: Option<std::path::PathBuf>,
4603}
4604
4605fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4606    let params: RegistryRegisterParams =
4607        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4608    let entry_json = serde_json::to_string(&params.entry).map_err(|e| e.to_string())?;
4609    car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4610    Ok(Value::Null)
4611}
4612
4613#[derive(Deserialize, Default)]
4614struct RegistryNameParams {
4615    name: String,
4616    #[serde(default)]
4617    registry_path: Option<std::path::PathBuf>,
4618}
4619
4620fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4621    let params: RegistryNameParams =
4622        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4623    let json = car_ffi_common::registry::agent_heartbeat(&params.name, params.registry_path)?;
4624    serde_json::from_str(&json).map_err(|e| e.to_string())
4625}
4626
4627fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4628    let params: RegistryNameParams =
4629        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4630    car_ffi_common::registry::unregister_agent(&params.name, params.registry_path)?;
4631    Ok(Value::Null)
4632}
4633
4634#[derive(Deserialize, Default)]
4635struct RegistryListParams {
4636    #[serde(default)]
4637    registry_path: Option<std::path::PathBuf>,
4638}
4639
4640fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4641    let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4642    let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4643    serde_json::from_str(&json).map_err(|e| e.to_string())
4644}
4645
4646#[derive(Deserialize, Default)]
4647struct RegistryReapParams {
4648    /// Heartbeats older than this many seconds are reaped. Default
4649    /// 60 — two missed 20s heartbeats trigger removal.
4650    #[serde(default = "default_reap_age")]
4651    max_age_secs: u64,
4652    #[serde(default)]
4653    registry_path: Option<std::path::PathBuf>,
4654}
4655
/// Serde default for `RegistryReapParams::max_age_secs`, in seconds.
fn default_reap_age() -> u64 {
    60
}
4659
4660fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4661    let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4662    let json =
4663        car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4664    serde_json::from_str(&json).map_err(|e| e.to_string())
4665}
4666
4667// ---------------------------------------------------------------------------
4668// car-a2a server lifecycle (mirrors NAPI startA2aServer / stopA2aServer /
4669// a2aServerStatus and PyO3 start_a2a_server / stop_a2a_server /
4670// a2a_server_status — closes the binding gap noted in #126).
4671// ---------------------------------------------------------------------------
4672
4673async fn handle_a2a_start(
4674    req: &JsonRpcMessage,
4675    session: &crate::session::ClientSession,
4676) -> Result<Value, String> {
4677    let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4678    // Always hand the session's runtime through. start_a2a uses it
4679    // only when `share_session_runtime: true` is set in params;
4680    // otherwise it falls back to the legacy fresh-Runtime + agent_basics
4681    // path. Passing it unconditionally keeps the FFI layer ignorant of
4682    // the flag's plumbing.
4683    let json =
4684        crate::a2a::start_a2a(&params_json, Some(session.runtime.clone())).await?;
4685    serde_json::from_str(&json).map_err(|e| e.to_string())
4686}
4687
4688fn handle_a2a_stop() -> Result<Value, String> {
4689    let json = crate::a2a::stop_a2a()?;
4690    serde_json::from_str(&json).map_err(|e| e.to_string())
4691}
4692
4693fn handle_a2a_status() -> Result<Value, String> {
4694    let json = crate::a2a::a2a_status()?;
4695    serde_json::from_str(&json).map_err(|e| e.to_string())
4696}
4697
4698#[derive(Deserialize)]
4699#[serde(rename_all = "camelCase")]
4700struct A2aSendParams {
4701    endpoint: String,
4702    message: car_a2a::Message,
4703    #[serde(default)]
4704    blocking: bool,
4705    #[serde(default = "default_true")]
4706    ingest_a2ui: bool,
4707    #[serde(default)]
4708    route_auth: Option<A2aRouteAuth>,
4709    #[serde(default)]
4710    allow_untrusted_endpoint: bool,
4711}
4712
/// Serde default helper for boolean fields that are on unless disabled.
fn default_true() -> bool {
    true
}
4716
4717/// In-core A2A dispatcher entry point. Forwards the JSON-RPC method
4718/// + params to the lazy-initialized [`car_a2a::A2aDispatcher`] held
4719/// on `ServerState`. Closes Parslee-ai/car-releases#28.
4720///
4721/// Streaming methods (`message/stream`, `tasks/resubscribe` and their
4722/// PascalCase aliases) return `MethodNotFound` from the dispatcher's
4723/// transport-neutral surface — the standalone `start_a2a_listener`
4724/// HTTP path serves SSE for those, but the in-core WS surface is
4725/// JSON-RPC only. Same trade as the dispatcher itself.
4726async fn handle_a2a_dispatch(
4727    method: &str,
4728    req: &JsonRpcMessage,
4729    state: &Arc<ServerState>,
4730) -> Result<Value, String> {
4731    let dispatcher = state.a2a_dispatcher().await;
4732    dispatcher
4733        .dispatch(method, req.params.clone())
4734        .await
4735        .map_err(|e| e.to_string())
4736}
4737
4738async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4739    let params: A2aSendParams =
4740        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4741    let endpoint = trusted_route_endpoint(
4742        Some(params.endpoint.clone()),
4743        params.allow_untrusted_endpoint,
4744    )
4745    .ok_or_else(|| {
4746        "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
4747    })?;
4748    let client = match params.route_auth.clone() {
4749        Some(auth) => {
4750            car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
4751        }
4752        None => car_a2a::A2aClient::new(endpoint.clone()),
4753    };
4754    let result = client
4755        .send_message(params.message, params.blocking)
4756        .await
4757        .map_err(|e| e.to_string())?;
4758    let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4759    let mut applied = Vec::new();
4760    if params.ingest_a2ui {
4761        state
4762            .a2ui
4763            .validate_payload(&result_value)
4764            .map_err(|e| e.to_string())?;
4765        let routed_endpoint = Some(endpoint.clone());
4766        for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
4767            let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
4768                if owner.endpoint.is_none() {
4769                    owner.with_endpoint(routed_endpoint.clone())
4770                } else {
4771                    owner
4772                }
4773            });
4774            applied.push(
4775                apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
4776            );
4777        }
4778    }
4779    Ok(serde_json::json!({
4780        "result": result,
4781        "a2ui": {
4782            "applied": applied,
4783        }
4784    }))
4785}
4786
4787// ---------------------------------------------------------------------------
4788// macOS automation — AppleScript + Shortcuts (car-automation), Vision OCR
4789// (car-vision). Mirrors NAPI runApplescript / listShortcuts / runShortcut /
4790// visionOcr and PyO3 run_applescript / list_shortcuts / run_shortcut /
4791// vision_ocr.
4792// ---------------------------------------------------------------------------
4793
4794async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
4795    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4796    let json = car_ffi_common::automation::run_applescript(&args_json).await?;
4797    serde_json::from_str(&json).map_err(|e| e.to_string())
4798}
4799
4800async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
4801    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4802    let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
4803    serde_json::from_str(&json).map_err(|e| e.to_string())
4804}
4805
4806async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
4807    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4808    let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
4809    serde_json::from_str(&json).map_err(|e| e.to_string())
4810}
4811
4812async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
4813    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4814    let json = car_ffi_common::notifications::local(&args_json).await?;
4815    serde_json::from_str(&json).map_err(|e| e.to_string())
4816}
4817
4818async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
4819    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4820    let json = car_ffi_common::vision::ocr(&args_json).await?;
4821    serde_json::from_str(&json).map_err(|e| e.to_string())
4822}
4823
4824// ---------------------------------------------------------------------------
4825// Lifecycle-managed agents (car_registry::supervisor) — Parslee-ai/car-releases#27
4826// ---------------------------------------------------------------------------
4827
4828async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
4829    let supervisor = state.supervisor()?;
4830    let agents = supervisor.list().await;
4831    // Decorate each entry with `attached` + `session_id` so operators
4832    // see whether the supervised process has actually called
4833    // `session.auth { agent_id }` and bound a WS connection (#169) —
4834    // the lifecycle status (`Running`, etc.) only reports the
4835    // process-level view, which can't tell "alive but never
4836    // attached" from "alive and attached".
4837    let attached = state.attached_agents.lock().await.clone();
4838    let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
4839    for a in agents {
4840        let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
4841        let session_id = attached.get(&a.spec.id).cloned();
4842        if let Some(map) = v.as_object_mut() {
4843            map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
4844            if let Some(sid) = session_id {
4845                map.insert("session_id".to_string(), Value::String(sid));
4846            }
4847        }
4848        decorated.push(v);
4849    }
4850    Ok(Value::Array(decorated))
4851}
4852
4853async fn handle_agents_upsert(
4854    req: &JsonRpcMessage,
4855    state: &Arc<ServerState>,
4856) -> Result<Value, String> {
4857    let mut params = req.params.clone();
4858    // Optional `interpreter` sugar (#171). When present, the
4859    // supervisor resolves the bare program name (`"node"`,
4860    // `"python"`, …) against `$PATH` and writes the absolute path
4861    // into `command` *before* validation. This keeps the strict
4862    // no-PATH-lookup rule at upsert time while letting callers
4863    // stop hand-coding `/opt/homebrew/bin/node` into every
4864    // agents.json entry. Resolution happens once; subsequent PATH
4865    // changes do not silently rewire the binding.
4866    if let Some(name) = params
4867        .get("interpreter")
4868        .and_then(|v| v.as_str())
4869        .map(str::to_string)
4870    {
4871        let resolved =
4872            car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
4873        params["command"] = Value::String(resolved.to_string_lossy().into_owned());
4874    }
4875    let spec: car_registry::supervisor::AgentSpec =
4876        serde_json::from_value(params).map_err(|e| e.to_string())?;
4877    let supervisor = state.supervisor()?;
4878    let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
4879    serde_json::to_value(agent).map_err(|e| e.to_string())
4880}
4881
4882/// `agents.install` — install a contributed-agent manifest
4883/// (Parslee-ai/car#182 phase 3). Caller passes the parsed
4884/// `AgentManifest` JSON; the daemon runs install-time validation
4885/// (`car_min_version`, capability negotiation against the daemon's
4886/// own advertisement) and adopts the manifest. Returns
4887/// `{ report, agent? }` where `agent` is the spawnable
4888/// `ManagedAgent` for `external_process` transports and absent for
4889/// `pure_data` / health_url-only manifests.
4890///
4891/// The host capability advertisement comes from
4892/// `HostCapabilities::daemon_default(car_version)` — operators that
4893/// want a tighter advertisement go through a future config phase;
4894/// this MVP uses the runtime's natural surface.
4895async fn handle_agents_install(
4896    req: &JsonRpcMessage,
4897    state: &Arc<ServerState>,
4898) -> Result<Value, String> {
4899    let manifest: car_registry::manifest::AgentManifest =
4900        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4901    let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
4902    let supervisor = state.supervisor()?;
4903    let (report, managed) = supervisor
4904        .install_manifest(manifest, &host)
4905        .await
4906        .map_err(|e| e.to_string())?;
4907    Ok(serde_json::json!({
4908        "report": {
4909            "missingOptional": report
4910                .missing_optional
4911                .iter()
4912                .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
4913                .collect::<Vec<_>>(),
4914        },
4915        "agent": managed,
4916    }))
4917}
4918
4919async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
4920    let supervisor = state.supervisor()?;
4921    let entries = supervisor.health().await;
4922    serde_json::to_value(entries).map_err(|e| e.to_string())
4923}
4924
4925fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
4926    req.params
4927        .get("id")
4928        .and_then(Value::as_str)
4929        .map(str::to_string)
4930        .ok_or_else(|| "missing required `id` parameter".to_string())
4931}
4932
4933async fn handle_agents_remove(
4934    req: &JsonRpcMessage,
4935    state: &Arc<ServerState>,
4936) -> Result<Value, String> {
4937    let id = extract_agent_id(req)?;
4938    let supervisor = state.supervisor()?;
4939    let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
4940    Ok(serde_json::json!({ "removed": removed }))
4941}
4942
4943async fn handle_agents_start(
4944    req: &JsonRpcMessage,
4945    state: &Arc<ServerState>,
4946) -> Result<Value, String> {
4947    let id = extract_agent_id(req)?;
4948    let supervisor = state.supervisor()?;
4949    let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
4950    serde_json::to_value(agent).map_err(|e| e.to_string())
4951}
4952
4953async fn handle_agents_stop(
4954    req: &JsonRpcMessage,
4955    state: &Arc<ServerState>,
4956) -> Result<Value, String> {
4957    let id = extract_agent_id(req)?;
4958    let signal: car_registry::supervisor::StopSignal = req
4959        .params
4960        .get("signal")
4961        .map(|v| serde_json::from_value(v.clone()))
4962        .transpose()
4963        .map_err(|e| e.to_string())?
4964        .unwrap_or_default();
4965    let supervisor = state.supervisor()?;
4966    let agent = supervisor
4967        .stop(&id, signal)
4968        .await
4969        .map_err(|e| e.to_string())?;
4970    serde_json::to_value(agent).map_err(|e| e.to_string())
4971}
4972
4973async fn handle_agents_restart(
4974    req: &JsonRpcMessage,
4975    state: &Arc<ServerState>,
4976) -> Result<Value, String> {
4977    let id = extract_agent_id(req)?;
4978    let supervisor = state.supervisor()?;
4979    let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
4980    serde_json::to_value(agent).map_err(|e| e.to_string())
4981}
4982
4983async fn handle_agents_tail_log(
4984    req: &JsonRpcMessage,
4985    state: &Arc<ServerState>,
4986) -> Result<Value, String> {
4987    let id = extract_agent_id(req)?;
4988    let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
4989    let supervisor = state.supervisor()?;
4990    let lines = supervisor
4991        .tail_log(&id, n)
4992        .await
4993        .map_err(|e| e.to_string())?;
4994    Ok(serde_json::json!({ "lines": lines }))
4995}
4996
// ---------------------------------------------------------------------------
// External-agent detection (Phase 1 of docs/proposals/external-agent-detection.md)
//
// Discovery surface for agentic CLIs the user has already installed and
// authenticated (Claude Code, Codex, Gemini). Discovery itself is
// read-only; the Phase 2 invocation path (agents.invoke_external) is
// handled by `handle_agents_invoke_external` further down in this
// section. The cache lives in car_ffi_common::external_agents so
// the in-process FFI singletons share the same snapshot.
// ---------------------------------------------------------------------------
5006
5007async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5008    let include_health = req
5009        .params
5010        .get("include_health")
5011        .and_then(Value::as_bool)
5012        .unwrap_or(false);
5013    let json = car_ffi_common::external_agents::list(include_health).await?;
5014    serde_json::from_str(&json).map_err(|e| e.to_string())
5015}
5016
5017async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5018    let include_health = req
5019        .params
5020        .get("include_health")
5021        .and_then(Value::as_bool)
5022        .unwrap_or(false);
5023    let json = car_ffi_common::external_agents::detect(include_health).await?;
5024    serde_json::from_str(&json).map_err(|e| e.to_string())
5025}
5026
5027/// Per-task invocation of an external CLI agent. Required params:
5028/// `id` (adapter, e.g. `"claude-code"`) and `task` (the prompt).
5029/// Optional: `cwd`, `allowed_tools`, `max_turns`, `timeout_secs`.
5030///
5031/// Phase 2 stage 3 ships with `claude-code` only. Other adapter
5032/// ids return `is_error: true` with a structured `error` so hosts
5033/// can surface the gap without a separate error code.
5034///
5035/// Phase 2 stage 4a (governance): every invocation appends a
5036/// structured audit record to `~/.car/external-agents.jsonl`. The
5037/// record captures id, task, options, result, and the full
5038/// `tool_uses` list the assistant emitted — so even though the
5039/// agent executes its built-in tools in-process (which we can't
5040/// gate via stream-json), there's a complete after-the-fact audit
5041/// trail. Full policy gating (proposing each tool_use to CAR's
5042/// validator + getting a yes/no) requires the MCP server route in
5043/// stage 4b.
5044async fn handle_agents_invoke_external(
5045    req: &JsonRpcMessage,
5046    state: &Arc<ServerState>,
5047) -> Result<Value, String> {
5048    let id = req
5049        .params
5050        .get("id")
5051        .and_then(Value::as_str)
5052        .ok_or_else(|| "missing required `id` parameter".to_string())?;
5053    let task = req
5054        .params
5055        .get("task")
5056        .and_then(Value::as_str)
5057        .ok_or_else(|| "missing required `task` parameter".to_string())?;
5058    // Build the options sub-object directly from req.params so
5059    // hosts can pass `cwd` / `allowed_tools` / `max_turns` /
5060    // `timeout_secs` as siblings of `id`/`task`. Strip the two
5061    // dispatch fields so they don't pollute the options serde.
5062    let mut options_value = req.params.clone();
5063    if let Some(obj) = options_value.as_object_mut() {
5064        obj.remove("id");
5065        obj.remove("task");
5066        // Auto-fill `mcp_endpoint` from the bound MCP URL when the
5067        // caller didn't supply one. This is the load-bearing
5068        // wiring of MCP-4: external agents get CAR's tools (memory,
5069        // skills, verify) routed through the daemon's policy +
5070        // shared memgine without any per-call host configuration.
5071        // Callers who want to opt out can pass `"mcp_endpoint": ""`
5072        // (empty string) — the runner skips the temp-file write
5073        // when the value isn't a non-empty URL.
5074        let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5075        if !has_explicit_mcp {
5076            if let Some(url) = state.mcp_url.get() {
5077                obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5078            }
5079        }
5080    }
5081    let options_json = options_value.to_string();
5082    let json = car_ffi_common::external_agents::invoke(id, task, &options_json).await?;
5083    let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5084    append_external_agent_audit(id, task, &options_value, &result);
5085    Ok(result)
5086}
5087
5088/// Append one JSONL audit record to `~/.car/external-agents.jsonl`.
5089/// Best-effort: a failure to open the journal must NOT fail the
5090/// invocation; the in-memory result already returned is the
5091/// authoritative answer. Logs at warn level when the write fails so
5092/// operators notice repeated failures.
5093fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5094    use std::io::Write;
5095    let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5096        Some(home) => home.join(".car"),
5097        None => return,
5098    };
5099    if std::fs::create_dir_all(&car_dir).is_err() {
5100        return;
5101    }
5102    let path = car_dir.join("external-agents.jsonl");
5103    let record = serde_json::json!({
5104        "ts": chrono::Utc::now().to_rfc3339(),
5105        "adapter_id": id,
5106        "task": task,
5107        "options": options,
5108        "result": result,
5109    });
5110    let line = match serde_json::to_string(&record) {
5111        Ok(s) => s,
5112        Err(_) => return,
5113    };
5114    if let Ok(mut f) = std::fs::OpenOptions::new()
5115        .create(true)
5116        .append(true)
5117        .open(&path)
5118    {
5119        let _ = writeln!(f, "{}", line);
5120    } else {
5121        tracing::warn!(
5122            path = %path.display(),
5123            "failed to append external-agent audit record"
5124        );
5125    }
5126}
5127
5128/// Ground-truth health check. Optional `id` param picks one tool;
5129/// without it, every detected adapter is checked. `force: true`
5130/// bypasses the 30s per-tool TTL cache. Replaces the Phase 1
5131/// credential-file shape heuristic as the load-bearing signal for
5132/// "is this tool ready to invoke."
5133async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5134    let force = req
5135        .params
5136        .get("force")
5137        .and_then(Value::as_bool)
5138        .unwrap_or(false);
5139    if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5140        let json = car_ffi_common::external_agents::health_one(id, force).await?;
5141        serde_json::from_str(&json).map_err(|e| e.to_string())
5142    } else {
5143        let json = car_ffi_common::external_agents::health(force).await?;
5144        serde_json::from_str(&json).map_err(|e| e.to_string())
5145    }
5146}