Skip to main content

car_server_core/
handler.rs

1//! WebSocket connection handler — bidirectional JSON-RPC.
2//!
3//! Tool callback flow:
4//! 1. Client submits proposal via proposal.submit
5//! 2. Runtime encounters a ToolCall action
6//! 3. WsToolExecutor sends tools.execute request to client via shared write half
7//! 4. WsToolExecutor awaits response on a oneshot channel
8//! 5. Client executes tool locally, sends JSON-RPC response back
9//! 6. Handler receives the response, resolves the oneshot
10//! 7. Runtime continues execution with the tool result
11
12use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::sync::atomic::AtomicU64;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30    #[serde(default)]
31    pub jsonrpc: String,
32    #[serde(default)]
33    pub method: Option<String>,
34    #[serde(default)]
35    pub params: Value,
36    #[serde(default)]
37    pub id: Value,
38    // Response fields
39    #[serde(default)]
40    pub result: Option<Value>,
41    #[serde(default)]
42    pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47    pub jsonrpc: &'static str,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub result: Option<Value>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub error: Option<JsonRpcError>,
52    pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57    pub code: i32,
58    pub message: String,
59}
60
61impl JsonRpcResponse {
62    pub fn success(id: Value, result: Value) -> Self {
63        Self {
64            jsonrpc: "2.0",
65            result: Some(result),
66            error: None,
67            id,
68        }
69    }
70    pub fn error(id: Value, code: i32, message: &str) -> Self {
71        Self {
72            jsonrpc: "2.0",
73            result: None,
74            error: Some(JsonRpcError {
75                code,
76                message: message.to_string(),
77            }),
78            id,
79        }
80    }
81}
82
83/// Convenience wrapper for the standalone `car-server` binary: accepts
84/// the WebSocket handshake on a raw [`TcpStream`] then delegates to
85/// [`run_dispatch`]. Embedders that already have a handshake-completed
86/// `WebSocketStream` skip this and call `run_dispatch` directly.
87#[instrument(
88    name = "ws.connection",
89    skip_all,
90    fields(peer = %peer),
91)]
92pub async fn handle_connection(
93    stream: TcpStream,
94    peer: SocketAddr,
95    state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97    let ws_stream = accept_async(stream).await?;
98    let (write, read) = ws_stream.split();
99    run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102/// Convenience wrapper for the daemon-as-default Unix-socket
103/// listener. Same shape as [`handle_connection`] but accepts a
104/// `UnixStream` — used by the per-user UDS listener in
105/// `car-server::main` (default transport for FFI thin clients,
106/// since UDS is faster + permission-scoped vs localhost TCP).
107///
108/// Unix-only — `tokio::net::UnixStream` is gated on
109/// `cfg(all(unix, feature = "net"))`. On Windows the daemon binds
110/// only the TCP listener (loopback) and this entry point is
111/// compiled out; consumers must use `handle_connection` instead.
112#[cfg(unix)]
113#[instrument(
114    name = "ws.connection",
115    skip_all,
116    fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119    stream: tokio::net::UnixStream,
120    peer: String,
121    state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123    let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124    let (write, read) = ws_stream.split();
125    run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128/// Transport-neutral entry point: drives the JSON-RPC dispatch loop
129/// against an already-handshake-completed split WebSocket. Generic
130/// over the read half (any `Stream<Item = Result<Message, WsError>>`)
131/// and the write half (a [`WsSink`] — type-erased so this function
132/// doesn't templatize every downstream consumer of `WsChannel`).
133///
134/// `peer` is a free-form string ("127.0.0.1:1234" for TCP,
135/// "uds:/path/sock" for UDS, "axum:..." for embedders) — used only
136/// for tracing fields, never for dispatch logic.
137#[instrument(
138    name = "ws.dispatch",
139    skip_all,
140    fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143    mut read: R,
144    write: crate::session::WsSink,
145    peer: String,
146    state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149    R: futures::Stream<
150            Item = Result<Message, tokio_tungstenite::tungstenite::Error>,
151        >
152        + Unpin
153        + Send,
154{
155    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
156    tracing::Span::current().record("client_id", &client_id.as_str());
157
158    info!("New connection from {}", peer);
159
160    let channel = Arc::new(WsChannel {
161        write: Mutex::new(write),
162        pending: Mutex::new(HashMap::new()),
163        next_id: AtomicU64::new(1),
164    });
165
166    let session = state.create_session(&client_id, channel.clone()).await;
167
168    while let Some(msg) = read.next().await {
169        let msg = msg?;
170        if msg.is_text() {
171            let text = msg.to_text()?;
172            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
173                Ok(m) => m,
174                Err(e) => {
175                    send_response(
176                        &session.channel,
177                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
178                    )
179                    .await?;
180                    continue;
181                }
182            };
183
184            // Is this a response to a pending tool callback?
185            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
186                if let Some(id_str) = parsed.id.as_str() {
187                    let mut pending = session.channel.pending.lock().await;
188                    if let Some(tx) = pending.remove(id_str) {
189                        let tool_resp = if let Some(result) = parsed.result {
190                            ToolExecuteResponse {
191                                action_id: id_str.to_string(),
192                                output: Some(result),
193                                error: None,
194                            }
195                        } else {
196                            let err_msg = parsed
197                                .error
198                                .as_ref()
199                                .and_then(|e| e.get("message"))
200                                .and_then(|m| m.as_str())
201                                .unwrap_or("unknown error")
202                                .to_string();
203                            ToolExecuteResponse {
204                                action_id: id_str.to_string(),
205                                output: None,
206                                error: Some(err_msg),
207                            }
208                        };
209                        let _ = tx.send(tool_resp);
210                        continue;
211                    }
212                }
213            }
214
215            // Otherwise it's a client request
216            if let Some(method) = &parsed.method {
217                info!(method = %method, "dispatching JSON-RPC method");
218
219                // Auth gate (Parslee-ai/car-releases#32). When the
220                // server has an auth token installed, every method
221                // other than `session.auth` is rejected on
222                // unauthenticated sessions and the connection is
223                // closed after the error response goes out. When no
224                // token is installed (default), this branch never
225                // fires — preserves pre-#32 behaviour.
226                if state.auth_token.get().is_some()
227                    && !session
228                        .authenticated
229                        .load(std::sync::atomic::Ordering::Acquire)
230                    && method != "session.auth"
231                {
232                    let resp = JsonRpcResponse::error(
233                        parsed.id.clone(),
234                        -32001,
235                        "auth required: send `session.auth` with the per-launch token \
236                         from ~/Library/Application Support/ai.parslee.car/auth-token \
237                         (macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
238                         as the first frame on this connection",
239                    );
240                    send_response(&session.channel, resp).await?;
241                    info!(client = %client_id, method = %method,
242                        "rejecting non-auth method on unauthenticated session; closing");
243                    break;
244                }
245
246                // Approval gate (audit 2026-05). High-risk methods —
247                // anything that drives macOS automation or sends
248                // messages on the user's behalf — must be acked by
249                // the user via `host.resolve_approval` before they
250                // dispatch. The gate raises an `approval.requested`
251                // host event the local UI can render approve/deny on,
252                // then parks until resolved or the configured
253                // timeout fires. Returns a JSON-RPC error and
254                // continues the dispatch loop on deny / timeout —
255                // no connection close, since the caller may want to
256                // retry with revised parameters.
257                if state.approval_gate.requires_approval(method.as_str()) {
258                    match gate_high_risk_method(
259                        method.as_str(),
260                        &parsed.params,
261                        &state,
262                    )
263                    .await
264                    {
265                        Ok(()) => {}
266                        Err(reason) => {
267                            let resp = JsonRpcResponse::error(
268                                parsed.id.clone(),
269                                -32003,
270                                &reason,
271                            );
272                            send_response(&session.channel, resp).await?;
273                            info!(
274                                client = %client_id,
275                                method = %method,
276                                reason = %reason,
277                                "approval gate blocked dispatch"
278                            );
279                            continue;
280                        }
281                    }
282                }
283
284                // Spawn the per-method dispatch in a task so the read
285                // loop keeps reading frames. Without this, methods
286                // that trigger server-initiated `tools.execute`
287                // callbacks (`proposal.submit`, `workflow.run`,
288                // `multi.*` paths that fire a registered tool)
289                // deadlock the connection: the handler awaits the
290                // callback response on a oneshot, but the response is
291                // another frame on this same read half — which the
292                // synchronous `.await` here would prevent the loop
293                // from ever picking up. Surfaced by the
294                // `executeProposal: echo tool via JS callback` smoke
295                // (#173). Response ordering becomes id-keyed (the
296                // JSON-RPC demuxing contract) rather than
297                // arrival-ordered.
298                let session_task = session.clone();
299                let state_task = state.clone();
300                let method_owned = method.clone();
301                let parsed_task = parsed;
302                tokio::spawn(async move {
303                    let session = session_task;
304                    let state = state_task;
305                    let parsed = parsed_task;
306                    let result = match method_owned.as_str() {
307                        "session.auth" => handle_session_auth(&parsed, &session, &state).await,
308                    "session.init" => handle_session_init(&parsed, &session).await,
309                    "host.subscribe" => handle_host_subscribe(&session).await,
310                    "host.agents" => handle_host_agents(&session).await,
311                    "host.events" => handle_host_events(&parsed, &session).await,
312                    "host.approvals" => handle_host_approvals(&session).await,
313                    "host.register_agent" => handle_host_register_agent(&parsed, &session).await,
314                    "host.unregister_agent" => {
315                        handle_host_unregister_agent(&parsed, &session).await
316                    }
317                    "host.set_status" => handle_host_set_status(&parsed, &session).await,
318                    "host.notify" => handle_host_notify(&parsed, &session).await,
319                    "host.request_approval" => {
320                        handle_host_request_approval(&parsed, &session).await
321                    }
322                    "host.resolve_approval" => {
323                        handle_host_resolve_approval(&parsed, &session).await
324                    }
325                    "tools.register" => handle_tools_register(&parsed, &session).await,
326                    "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
327                    "policy.register" => handle_policy_register(&parsed, &session).await,
328                    "session.policy.open" => handle_session_policy_open(&session).await,
329                    "session.policy.close" => {
330                        handle_session_policy_close(&parsed, &session).await
331                    }
332                    "verify" => handle_verify(&parsed, &session).await,
333                    "state.get" => handle_state_get(&parsed, &session).await,
334                    "state.set" => handle_state_set(&parsed, &session).await,
335                    "state.exists" => handle_state_exists(&parsed, &session).await,
336                    "state.keys" => handle_state_keys(&session).await,
337                    "state.snapshot" => handle_state_snapshot(&session).await,
338                    "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
339                    "memory.query" => handle_memory_query(&parsed, &session).await,
340                    "memory.build_context" => handle_memory_build_context(&parsed, &session).await,
341                    "memory.build_context_fast" => {
342                        handle_memory_build_context_fast(&parsed, &session).await
343                    }
344                    "memory.consolidate" => handle_memory_consolidate(&session).await,
345                    "memory.fact_count" => handle_memory_fact_count(&session).await,
346                    "memory.persist" => handle_memory_persist(&parsed, &session).await,
347                    "memory.load" => handle_memory_load(&parsed, &session).await,
348                    "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
349                    "skill.find" => handle_skill_find(&parsed, &session).await,
350                    "skill.report" => handle_skill_report(&parsed, &session).await,
351                    "skill.repair" => handle_skill_repair(&parsed, &session).await,
352                    "skills.ingest_distilled" => {
353                        handle_skills_ingest_distilled(&parsed, &session).await
354                    }
355                    "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
356                    "skills.domains_needing_evolution" => {
357                        handle_skills_domains_needing_evolution(&parsed, &session).await
358                    }
359                    "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
360                    "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
361                    "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
362                    "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
363                    "multi.vote" => handle_multi_vote(&parsed, &session).await,
364                    "scheduler.create" => handle_scheduler_create(&parsed),
365                    "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
366                    "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
367                    "infer" => handle_infer(&parsed, &state, &session).await,
368                    "embed" => handle_embed(&parsed, &state).await,
369                    "classify" => handle_classify(&parsed, &state).await,
370                    "tokenize" => handle_tokenize(&parsed, &state).await,
371                    "detokenize" => handle_detokenize(&parsed, &state).await,
372                    "rerank" => handle_rerank(&parsed, &state).await,
373                    "transcribe" => handle_transcribe(&parsed, &state).await,
374                    "synthesize" => handle_synthesize(&parsed, &state).await,
375                    "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
376                    "speech.prepare" => handle_speech_prepare(&state).await,
377                    "models.route" => handle_models_route(&parsed, &state).await,
378                    "models.stats" => handle_models_stats(&state).await,
379                    "events.count" => handle_events_count(&session).await,
380                    "events.stats" => handle_events_stats(&session).await,
381                    "events.truncate" => handle_events_truncate(&parsed, &session).await,
382                    "events.clear" => handle_events_clear(&session).await,
383                    "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
384                    "models.list" => handle_models_list(&state),
385                    "models.list_unified" => handle_models_list_unified(&state),
386                    "models.search" => handle_models_search(&parsed, &state),
387                    "models.upgrades" => handle_models_upgrades(&state),
388                    "models.pull" => handle_models_pull(&parsed, &state).await,
389                    "models.install" => handle_models_pull(&parsed, &state).await,
390                    "skills.distill" => handle_skills_distill(&parsed, &state).await,
391                    "skills.list" => handle_skills_list(&parsed, &session).await,
392                    "browser.run" => handle_browser_run(&parsed, &session).await,
393                    "browser.close" => handle_browser_close(&session).await,
394                    "secret.put" => handle_secret_put(&parsed),
395                    "secret.get" => handle_secret_get(&parsed),
396                    "secret.delete" => handle_secret_delete(&parsed),
397                    "secret.status" => handle_secret_status(&parsed),
398                    "secret.available" => Ok(car_ffi_common::secrets::is_available()),
399                    "permissions.status" => handle_perm_status(&parsed),
400                    "permissions.request" => handle_perm_request(&parsed),
401                    "permissions.explain" => handle_perm_explain(&parsed),
402                    "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
403                    "accounts.list" => car_ffi_common::accounts::list(),
404                    "accounts.open" => {
405                        #[derive(serde::Deserialize, Default)]
406                        struct OpenParams {
407                            #[serde(default)]
408                            account_id: Option<String>,
409                        }
410                        let p: OpenParams =
411                            serde_json::from_value(parsed.params.clone()).unwrap_or_default();
412                        car_ffi_common::accounts::open_settings(p.account_id.as_deref())
413                    }
414                    "calendar.list" => car_ffi_common::integrations::calendar_list(),
415                    "calendar.events" => handle_calendar_events(&parsed),
416                    "contacts.containers" => car_ffi_common::integrations::contacts_containers(),
417                    "contacts.find" => handle_contacts_find(&parsed),
418                    "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
419                    "mail.inbox" => handle_mail_inbox(&parsed),
420                    "mail.send" => handle_mail_send(&parsed),
421                    "messages.services" => car_ffi_common::integrations::messages_services(),
422                    "messages.chats" => handle_messages_chats(&parsed),
423                    "messages.send" => handle_messages_send(&parsed),
424                    "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
425                    "notes.find" => handle_notes_find(&parsed),
426                    "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
427                    "reminders.items" => handle_reminders_items(&parsed),
428                    "photos.albums" => car_ffi_common::integrations::photos_albums(),
429                    "bookmarks.list" => handle_bookmarks_list(&parsed),
430                    "files.locations" => car_ffi_common::integrations::files_locations(),
431                    "keychain.status" => car_ffi_common::integrations::keychain_status(),
432                    "health.status" => car_ffi_common::health::status(),
433                    "health.sleep" => handle_health_sleep(&parsed),
434                    "health.workouts" => handle_health_workouts(&parsed),
435                    "health.activity" => handle_health_activity(&parsed),
436                    "voice.transcribe_stream.start" => {
437                        handle_voice_transcribe_stream_start(&parsed, &state, &session).await
438                    }
439                    "voice.transcribe_stream.stop" => {
440                        handle_voice_transcribe_stream_stop(&parsed, &state).await
441                    }
442                    "voice.transcribe_stream.push" => {
443                        handle_voice_transcribe_stream_push(&parsed, &state).await
444                    }
445                    "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
446                    "voice.dispatch_turn" => {
447                        handle_voice_dispatch_turn(&parsed, &state, &session).await
448                    }
449                    "voice.cancel_turn" => handle_voice_cancel_turn().await,
450                    "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
451                    "inference.register_runner" => {
452                        handle_inference_register_runner(&session).await
453                    }
454                    "inference.runner.event" => {
455                        handle_inference_runner_event(&parsed).await
456                    }
457                    "inference.runner.complete" => {
458                        handle_inference_runner_complete(&parsed).await
459                    }
460                    "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
461                    "voice.providers.list" => {
462                        // Stateless: enumerates STT/TTS providers compiled into
463                        // this build. Runtime readiness (API key, permission,
464                        // model download) is reported via per-provider errors.
465                        serde_json::from_str::<serde_json::Value>(
466                            &car_voice::list_voice_providers_json(),
467                        )
468                        .map_err(|e| e.to_string())
469                    }
470                    "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
471                        .await
472                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
473                    "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
474                        .await
475                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
476                    "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
477                    "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
478                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
479                    "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
480                    "workflow.run" => handle_workflow_run(&parsed, &session).await,
481                    "workflow.verify" => handle_workflow_verify(&parsed),
482                    "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
483                    "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
484                    "meeting.list" => handle_meeting_list(&parsed),
485                    "meeting.get" => handle_meeting_get(&parsed),
486                    "registry.register" => handle_registry_register(&parsed),
487                    "registry.heartbeat" => handle_registry_heartbeat(&parsed),
488                    "registry.unregister" => handle_registry_unregister(&parsed),
489                    "registry.list" => handle_registry_list(&parsed),
490                    "registry.reap" => handle_registry_reap(&parsed),
491                    "admission.status" => handle_admission_status(&state),
492                    "a2a.start" => handle_a2a_start(&parsed).await,
493                    "a2a.stop" => handle_a2a_stop(),
494                    "a2a.status" => handle_a2a_status(),
495                    "a2a.send" => handle_a2a_send(&parsed, &state).await,
496                    "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
497                    "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
498                    "a2ui.capabilities" => handle_a2ui_capabilities(&state),
499                    "a2ui.reap" => handle_a2ui_reap(&state).await,
500                    "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
501                    "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
502                    "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
503                    "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
504                    "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
505                    "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
506                    "automation.run_applescript" => handle_run_applescript(&parsed).await,
507                    "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
508                    "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
509                    "notifications.local" => handle_local_notification(&parsed).await,
510                    "vision.ocr" => handle_vision_ocr(&parsed).await,
511                    "agents.list" => handle_agents_list(&state).await,
512                    "agents.health" => handle_agents_health(&state).await,
513                    "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
514                    "agents.remove" => handle_agents_remove(&parsed, &state).await,
515                    "agents.start" => handle_agents_start(&parsed, &state).await,
516                    "agents.stop" => handle_agents_stop(&parsed, &state).await,
517                    "agents.restart" => handle_agents_restart(&parsed, &state).await,
518                    "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
519                    "agents.list_external" => handle_agents_list_external(&parsed).await,
520                    "agents.detect_external" => handle_agents_detect_external(&parsed).await,
521                    "agents.health_external" => handle_agents_health_external(&parsed).await,
522                    "agents.invoke_external" => {
523                        handle_agents_invoke_external(&parsed, &state).await
524                    }
525                    // A2A v1.0 (PascalCase) + v0.3 (slash form) — both
526                    // alias to the same in-core dispatcher per
527                    // Parslee-ai/car-releases#28. Embedders that need a
528                    // custom AgentCardSource / TaskStore plug them in
529                    // via ServerStateConfig::with_a2a_card_source /
530                    // with_a2a_store before any handler runs.
531                    "message/send" | "SendMessage"
532                    | "message/stream" | "SendStreamingMessage"
533                    | "tasks/get" | "GetTask"
534                    | "tasks/list" | "ListTasks"
535                    | "tasks/cancel" | "CancelTask"
536                    | "tasks/resubscribe" | "SubscribeToTask"
537                    | "tasks/pushNotificationConfig/set"
538                    | "CreateTaskPushNotificationConfig"
539                    | "tasks/pushNotificationConfig/get"
540                    | "GetTaskPushNotificationConfig"
541                    | "tasks/pushNotificationConfig/list"
542                    | "ListTaskPushNotificationConfigs"
543                    | "tasks/pushNotificationConfig/delete"
544                    | "DeleteTaskPushNotificationConfig"
545                    | "agent/getAuthenticatedExtendedCard"
546                    | "GetExtendedAgentCard" => handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await,
547                    _ => Err(format!("unknown method: {}", method_owned)),
548                    };
549
550                    let resp = match result {
551                        Ok(value) => JsonRpcResponse::success(parsed.id, value),
552                        Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
553                    };
554                    let _ = send_response(&session.channel, resp).await;
555                });
556            }
557        } else if msg.is_close() {
558            info!("Client {} disconnected", client_id);
559            break;
560        }
561    }
562
563    session.host.unsubscribe(&client_id).await;
564    state.a2ui_subscribers.lock().await.remove(&client_id);
565
566    // Fix for MULTI-4 / WS-3: drop the session from the registry and
567    // drain any pending tool callbacks. Without this, every connection
568    // we ever accepted keeps an `Arc<ClientSession>` alive in
569    // `state.sessions`, and outstanding `oneshot::Sender`s in
570    // `session.channel.pending` outlive the closed connection until
571    // their per-call timeout (60s). Dropping the senders here causes any
572    // awaiting `recv()` in `WsToolExecutor::execute` to return
573    // `RecvError` immediately, which the existing error-handler path
574    // already maps to "callback channel closed" — same shape as the
575    // timeout path, just faster.
576    let _removed = state.remove_session(&client_id).await;
577    {
578        let mut pending = session.channel.pending.lock().await;
579        pending.clear();
580    }
581
582    Ok(())
583}
584
585async fn send_response(
586    channel: &WsChannel,
587    resp: JsonRpcResponse,
588) -> Result<(), Box<dyn std::error::Error>> {
589    use futures::SinkExt;
590    let json = serde_json::to_string(&resp)?;
591    channel
592        .write
593        .lock()
594        .await
595        .send(Message::Text(json.into()))
596        .await?;
597    Ok(())
598}
599
600// --- Request handlers ---
601
602async fn handle_host_subscribe(session: &crate::session::ClientSession) -> Result<Value, String> {
603    session
604        .host
605        .subscribe(&session.client_id, session.channel.clone())
606        .await;
607    serde_json::to_value(HostSnapshot {
608        subscribed: true,
609        agents: session.host.agents().await,
610        approvals: session.host.approvals().await,
611        events: session.host.events(50).await,
612    })
613    .map_err(|e| e.to_string())
614}
615
616async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
617    serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
618}
619
620async fn handle_host_events(
621    req: &JsonRpcMessage,
622    session: &crate::session::ClientSession,
623) -> Result<Value, String> {
624    let limit = req
625        .params
626        .get("limit")
627        .and_then(|v| v.as_u64())
628        .unwrap_or(100) as usize;
629    serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
630}
631
632async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
633    serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
634}
635
636async fn handle_a2ui_apply(
637    req: &JsonRpcMessage,
638    state: &Arc<ServerState>,
639) -> Result<Value, String> {
640    #[derive(Deserialize)]
641    struct Params {
642        #[serde(default)]
643        envelope: Option<car_a2ui::A2uiEnvelope>,
644        #[serde(default)]
645        message: Option<car_a2ui::A2uiEnvelope>,
646    }
647
648    let envelope = if req.params.get("createSurface").is_some()
649        || req.params.get("updateComponents").is_some()
650        || req.params.get("updateDataModel").is_some()
651        || req.params.get("deleteSurface").is_some()
652    {
653        serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
654            .map_err(|e| e.to_string())?
655    } else {
656        match serde_json::from_value::<Params>(req.params.clone()) {
657            Ok(params) => params
658                .envelope
659                .or(params.message)
660                .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
661            Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
662                .map_err(|e| e.to_string())?,
663        }
664    };
665
666    apply_a2ui_envelope(state, envelope, None, None).await
667}
668
669async fn handle_a2ui_ingest(
670    req: &JsonRpcMessage,
671    state: &Arc<ServerState>,
672) -> Result<Value, String> {
673    #[derive(Deserialize)]
674    #[serde(rename_all = "camelCase")]
675    struct Params {
676        #[serde(default)]
677        endpoint: Option<String>,
678        #[serde(default)]
679        a2a_endpoint: Option<String>,
680        #[serde(default)]
681        owner: Option<car_a2ui::A2uiSurfaceOwner>,
682        #[serde(default)]
683        route_auth: Option<A2aRouteAuth>,
684        #[serde(default)]
685        allow_untrusted_endpoint: bool,
686    }
687
688    let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
689        endpoint: None,
690        a2a_endpoint: None,
691        owner: None,
692        route_auth: None,
693        allow_untrusted_endpoint: false,
694    });
695    let payload = req.params.get("payload").unwrap_or(&req.params);
696    state
697        .a2ui
698        .validate_payload(payload)
699        .map_err(|e| e.to_string())?;
700    let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
701    if envelopes.is_empty() {
702        return Err("no A2UI envelopes found in payload".into());
703    }
704    let endpoint = params.endpoint.or(params.a2a_endpoint);
705    let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
706    let owner = params
707        .owner
708        .or_else(|| car_a2ui::owner_from_value(payload))
709        .map(|owner| match endpoint.clone() {
710            Some(endpoint) => owner.with_endpoint(Some(endpoint)),
711            None => owner,
712        });
713
714    let mut results = Vec::new();
715    for envelope in envelopes {
716        let value =
717            apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
718        results.push(value);
719    }
720    Ok(serde_json::json!({ "applied": results }))
721}
722
723async fn apply_a2ui_envelope(
724    state: &Arc<ServerState>,
725    envelope: car_a2ui::A2uiEnvelope,
726    owner: Option<car_a2ui::A2uiSurfaceOwner>,
727    route_auth: Option<A2aRouteAuth>,
728) -> Result<Value, String> {
729    let result = state
730        .a2ui
731        .apply_with_owner(envelope, owner)
732        .await
733        .map_err(|e| e.to_string())?;
734    update_a2ui_route_auth(state, &result, route_auth).await;
735    let kind = if result.deleted {
736        "a2ui.surface_deleted"
737    } else {
738        "a2ui.surface_updated"
739    };
740    let message = if result.deleted {
741        format!("A2UI surface {} deleted", result.surface_id)
742    } else {
743        format!("A2UI surface {} updated", result.surface_id)
744    };
745    let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
746    state
747        .host
748        .record_event(kind, None, message, payload.clone())
749        .await;
750    // Push the envelope result to every WS subscriber as an
751    // `a2ui.event` notification — Parslee-ai/car-releases#29. Late
752    // joiners catch up via `a2ui/replay` (or `a2ui.surfaces`).
753    broadcast_a2ui_event(state, kind, &payload).await;
754    serde_json::to_value(result).map_err(|e| e.to_string())
755}
756
757async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
758    use futures::SinkExt;
759    use tokio_tungstenite::tungstenite::Message;
760    let subscribers: Vec<Arc<crate::session::WsChannel>> = state
761        .a2ui_subscribers
762        .lock()
763        .await
764        .values()
765        .cloned()
766        .collect();
767    if subscribers.is_empty() {
768        return;
769    }
770    let Ok(json) = serde_json::to_string(&serde_json::json!({
771        "jsonrpc": "2.0",
772        "method": "a2ui.event",
773        "params": {
774            "kind": kind,
775            "result": result,
776        },
777    })) else {
778        return;
779    };
780    for channel in subscribers {
781        let _ = channel
782            .write
783            .lock()
784            .await
785            .send(Message::Text(json.clone().into()))
786            .await;
787    }
788}
789
790async fn update_a2ui_route_auth(
791    state: &Arc<ServerState>,
792    result: &car_a2ui::A2uiApplyResult,
793    route_auth: Option<A2aRouteAuth>,
794) {
795    let mut auth = state.a2ui_route_auth.lock().await;
796    if result.deleted {
797        auth.remove(&result.surface_id);
798        return;
799    }
800
801    let has_route_endpoint = result
802        .surface
803        .as_ref()
804        .and_then(|surface| surface.owner.as_ref())
805        .and_then(|owner| owner.endpoint.as_ref())
806        .is_some();
807    match (has_route_endpoint, route_auth) {
808        (true, Some(route_auth)) => {
809            auth.insert(result.surface_id.clone(), route_auth);
810        }
811        _ => {
812            auth.remove(&result.surface_id);
813        }
814    }
815}
816
817fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
818    serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
819}
820
821async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
822    let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
823    if !removed.is_empty() {
824        let mut auth = state.a2ui_route_auth.lock().await;
825        for surface_id in &removed {
826            auth.remove(surface_id);
827        }
828    }
829    Ok(serde_json::json!({ "removed": removed }))
830}
831
832async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
833    serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
834}
835
836async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
837    let surface_id = req
838        .params
839        .get("surface_id")
840        .or_else(|| req.params.get("surfaceId"))
841        .and_then(Value::as_str)
842        .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
843    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
844}
845
846/// `a2ui/subscribe` — opt this WS connection into `a2ui.event`
847/// notifications. Subscribers receive every `apply_a2ui_envelope`
848/// result for as long as they're connected; the cleanup hook in
849/// `run_dispatch` removes them on disconnect. Closes
850/// Parslee-ai/car-releases#29.
851async fn handle_a2ui_subscribe(
852    session: &crate::session::ClientSession,
853    state: &Arc<ServerState>,
854) -> Result<Value, String> {
855    state
856        .a2ui_subscribers
857        .lock()
858        .await
859        .insert(session.client_id.clone(), session.channel.clone());
860    Ok(serde_json::json!({ "subscribed": true }))
861}
862
863/// `a2ui/unsubscribe` — opt out of `a2ui.event` notifications.
864/// Idempotent: returns `{ subscribed: false }` regardless of prior
865/// state.
866async fn handle_a2ui_unsubscribe(
867    session: &crate::session::ClientSession,
868    state: &Arc<ServerState>,
869) -> Result<Value, String> {
870    state.a2ui_subscribers.lock().await.remove(&session.client_id);
871    Ok(serde_json::json!({ "subscribed": false }))
872}
873
874/// `a2ui/replay` — fetch the current state of one surface. Intended
875/// for late joiners and reconnect: a client calls `subscribe`, then
876/// `replay` once per surface it's tracking, and from then on
877/// notifications keep it in sync. Equivalent to `a2ui.get` on the
878/// surface store; lives in the subscribe namespace for
879/// discoverability.
880async fn handle_a2ui_replay(
881    req: &JsonRpcMessage,
882    state: &Arc<ServerState>,
883) -> Result<Value, String> {
884    let surface_id = req
885        .params
886        .get("surface_id")
887        .or_else(|| req.params.get("surfaceId"))
888        .and_then(Value::as_str)
889        .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
890    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
891}
892
893async fn handle_a2ui_action(
894    req: &JsonRpcMessage,
895    state: &Arc<ServerState>,
896) -> Result<Value, String> {
897    let action: car_a2ui::ClientAction =
898        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
899    let owner = state.a2ui.owner(&action.surface_id).await;
900    let route = route_a2ui_action(state, &action, owner.clone()).await;
901    let payload = serde_json::json!({
902        "action": action,
903        "owner": owner,
904        "route": route,
905    });
906    let event = state
907        .host
908        .record_event(
909            "a2ui.action",
910            None,
911            format!(
912                "A2UI action {} from {}",
913                action.name, action.source_component_id
914            ),
915            payload,
916        )
917        .await;
918    Ok(serde_json::json!({
919        "event": event,
920        "route": route,
921    }))
922}
923
924async fn route_a2ui_action(
925    state: &Arc<ServerState>,
926    action: &car_a2ui::ClientAction,
927    owner: Option<car_a2ui::A2uiSurfaceOwner>,
928) -> Value {
929    let Some(owner) = owner else {
930        return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
931    };
932    if owner.kind != "a2a" {
933        return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
934    }
935    let Some(endpoint) = owner.endpoint.clone() else {
936        return serde_json::json!({
937            "delivered": false,
938            "reason": "surface owner has no endpoint",
939            "owner": owner
940        });
941    };
942
943    let message = car_a2a::Message {
944        message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
945        role: car_a2a::MessageRole::User,
946        parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
947            data: serde_json::json!({
948                "a2uiAction": action,
949            }),
950            metadata: Default::default(),
951        })],
952        task_id: owner.task_id.clone(),
953        context_id: owner.context_id.clone(),
954        metadata: Default::default(),
955    };
956
957    let auth = state
958        .a2ui_route_auth
959        .lock()
960        .await
961        .get(&action.surface_id)
962        .cloned()
963        .map(client_auth_from_route_auth)
964        .unwrap_or(car_a2a::ClientAuth::None);
965
966    match car_a2a::A2aClient::new(endpoint.clone())
967        .with_auth(auth)
968        .send_message(message, false)
969        .await
970    {
971        Ok(result) => serde_json::json!({
972            "delivered": true,
973            "owner": owner,
974            "endpoint": endpoint,
975            "result": result,
976        }),
977        Err(error) => serde_json::json!({
978            "delivered": false,
979            "owner": owner,
980            "endpoint": endpoint,
981            "error": error.to_string(),
982        }),
983    }
984}
985
986fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
987    match auth {
988        A2aRouteAuth::None => car_a2a::ClientAuth::None,
989        A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
990        A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
991    }
992}
993
994fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
995    let endpoint = endpoint?;
996    if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
997        Some(endpoint)
998    } else {
999        None
1000    }
1001}
1002
/// Report whether `endpoint` is a plain-HTTP URL whose host is a
/// loopback name or address (`localhost`, `127.0.0.1` or `[::1]`),
/// optionally followed by a numeric port.
///
/// The check looks at the URL *authority* — everything between
/// `http://` and the first `/`, `?` or `#` — rather than at a string
/// prefix. A prefix test would accept
/// `http://localhost:80@evil.example/`, where `localhost:80` is
/// userinfo and the real host is remote. Any authority containing
/// userinfo or a non-numeric port is therefore rejected. Matching is
/// case-sensitive and only the `http` scheme is accepted.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };
    // `split` always yields at least one (possibly empty) piece.
    let authority = rest
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");

    LOOPBACK_HOSTS
        .iter()
        .any(|&host| match authority.strip_prefix(host) {
            // Host only, no port.
            Some("") => true,
            // Host followed by `:port`; an empty port is tolerated, as before.
            Some(tail) => tail
                .strip_prefix(':')
                .map_or(false, |port| port.bytes().all(|b| b.is_ascii_digit())),
            None => false,
        })
}
1014
1015async fn handle_host_register_agent(
1016    req: &JsonRpcMessage,
1017    session: &crate::session::ClientSession,
1018) -> Result<Value, String> {
1019    let request: RegisterHostAgentRequest =
1020        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1021    serde_json::to_value(
1022        session
1023            .host
1024            .register_agent(&session.client_id, request)
1025            .await?,
1026    )
1027    .map_err(|e| e.to_string())
1028}
1029
1030async fn handle_host_unregister_agent(
1031    req: &JsonRpcMessage,
1032    session: &crate::session::ClientSession,
1033) -> Result<Value, String> {
1034    let agent_id = req
1035        .params
1036        .get("agent_id")
1037        .and_then(|v| v.as_str())
1038        .ok_or("missing agent_id")?;
1039    session
1040        .host
1041        .unregister_agent(&session.client_id, agent_id)
1042        .await?;
1043    Ok(serde_json::json!({"ok": true}))
1044}
1045
1046async fn handle_host_set_status(
1047    req: &JsonRpcMessage,
1048    session: &crate::session::ClientSession,
1049) -> Result<Value, String> {
1050    let request: SetHostAgentStatusRequest =
1051        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1052    serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1053        .map_err(|e| e.to_string())
1054}
1055
1056async fn handle_host_notify(
1057    req: &JsonRpcMessage,
1058    session: &crate::session::ClientSession,
1059) -> Result<Value, String> {
1060    let kind = req
1061        .params
1062        .get("kind")
1063        .and_then(|v| v.as_str())
1064        .unwrap_or("host.notification");
1065    let agent_id = req
1066        .params
1067        .get("agent_id")
1068        .and_then(|v| v.as_str())
1069        .map(str::to_string);
1070    let message = req
1071        .params
1072        .get("message")
1073        .and_then(|v| v.as_str())
1074        .unwrap_or("");
1075    let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1076    serde_json::to_value(
1077        session
1078            .host
1079            .record_event(kind, agent_id, message, payload)
1080            .await,
1081    )
1082    .map_err(|e| e.to_string())
1083}
1084
1085async fn handle_host_request_approval(
1086    req: &JsonRpcMessage,
1087    session: &crate::session::ClientSession,
1088) -> Result<Value, String> {
1089    let request: CreateHostApprovalRequest =
1090        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1091    if let Some(agent_id) = &request.agent_id {
1092        // Best-effort. If the caller doesn't own this agent the
1093        // ACL added 2026-05 will refuse the status update — that
1094        // is the correct semantics; we still want the approval row
1095        // itself to land so the UI can render the request.
1096        let _ = session
1097            .host
1098            .set_status(
1099                &session.client_id,
1100                SetHostAgentStatusRequest {
1101                    agent_id: agent_id.clone(),
1102                    status: HostAgentStatus::WaitingForApproval,
1103                    current_task: None,
1104                    message: Some("Waiting for approval".to_string()),
1105                    payload: Value::Null,
1106                },
1107            )
1108            .await;
1109    }
1110    serde_json::to_value(
1111        session
1112            .host
1113            .create_approval(Some(&session.client_id), request)
1114            .await?,
1115    )
1116    .map_err(|e| e.to_string())
1117}
1118
1119async fn handle_host_resolve_approval(
1120    req: &JsonRpcMessage,
1121    session: &crate::session::ClientSession,
1122) -> Result<Value, String> {
1123    let request: ResolveHostApprovalRequest =
1124        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1125    serde_json::to_value(
1126        session
1127            .host
1128            .resolve_approval(&session.client_id, request)
1129            .await?,
1130    )
1131    .map_err(|e| e.to_string())
1132}
1133
1134/// `session.auth` — present the per-launch token to unlock the
1135/// connection. When `state.auth_token` is unset, this method is a
1136/// no-op success (auth is disabled). When set, the supplied token
1137/// must equal it (constant-time comparison) — a successful auth
1138/// flips `session.authenticated` to `true` so subsequent methods
1139/// pass the gate. Wrong token returns an error AND leaves the
1140/// session unauthenticated; the dispatcher loop's gate then closes
1141/// the connection on the next non-auth method.
1142///
1143/// Closes Parslee-ai/car-releases#32.
1144async fn handle_session_auth(
1145    req: &JsonRpcMessage,
1146    session: &crate::session::ClientSession,
1147    state: &Arc<ServerState>,
1148) -> Result<Value, String> {
1149    let supplied = req
1150        .params
1151        .get("token")
1152        .and_then(Value::as_str)
1153        .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1154    // #169: optional `agent_id` binds the WS connection to a
1155    // supervised lifecycle agent. When present, the supplied token
1156    // must equal the per-agent token the supervisor minted at upsert
1157    // (NOT the daemon-wide auth token). When absent, fall back to
1158    // the daemon-wide token — preserves the legacy unbound-token
1159    // path for browser/host/CLI clients.
1160    let agent_id = req
1161        .params
1162        .get("agent_id")
1163        .and_then(Value::as_str)
1164        .map(str::to_string);
1165
1166    if let Some(id) = agent_id {
1167        let supervisor = state.supervisor()?;
1168        if !supervisor.validate_agent_token(&id, supplied).await {
1169            return Err(format!(
1170                "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1171            ));
1172        }
1173        // Single-claim: only one connection at a time per
1174        // agent_id. A second claim is rejected so the daemon-side
1175        // per-agent state stays unambiguous.
1176        {
1177            let mut attached = state.attached_agents.lock().await;
1178            if let Some(prior) = attached.get(&id) {
1179                if prior != &session.client_id {
1180                    return Err(format!(
1181                        "auth failed: agent_id `{id}` is already attached on \
1182                         another connection (client_id={prior})"
1183                    ));
1184                }
1185            }
1186            attached.insert(id.clone(), session.client_id.clone());
1187        }
1188        // #170: attach the daemon-owned persistent memgine for
1189        // this agent. Lazy-loaded on first connection per id from
1190        // `~/.car/memory/agents/<id>.jsonl`; retained across
1191        // disconnect so the next session sees the same state.
1192        let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1193        *session.bound_memgine.lock().await = Some(agent_eng);
1194        *session.agent_id.lock().await = Some(id.clone());
1195        session
1196            .authenticated
1197            .store(true, std::sync::atomic::Ordering::Release);
1198        return Ok(serde_json::json!({
1199            "ok": true,
1200            "auth_enabled": true,
1201            "agent_id": id,
1202        }));
1203    }
1204
1205    let expected = match state.auth_token.get() {
1206        Some(t) => t,
1207        None => {
1208            // Auth disabled — accept any token politely so callers
1209            // that always include a session.auth handshake (e.g. the
1210            // FFI proxy) don't fail when the daemon happens to be
1211            // unauthed. Mark the session authenticated anyway so the
1212            // gate is a no-op below.
1213            session
1214                .authenticated
1215                .store(true, std::sync::atomic::Ordering::Release);
1216            return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1217        }
1218    };
1219    if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1220        return Err("auth failed: token mismatch".to_string());
1221    }
1222    session
1223        .authenticated
1224        .store(true, std::sync::atomic::Ordering::Release);
1225    Ok(serde_json::json!({ "ok": true, "auth_enabled": true }))
1226}
1227
/// Compare two byte strings without short-circuiting on the first
/// differing byte.
///
/// Inputs of different length are rejected up front, so length is the
/// only thing the timing can reveal — acceptable for our 43-char
/// fixed-length tokens. For equal lengths every byte pair is XOR-ed and
/// OR-accumulated; the inputs match only if the accumulator stays zero.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len()
        && a.iter()
            .zip(b)
            .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs))
            == 0
}
1241
1242/// Block dispatch of `method` until the user resolves the approval
1243/// raised on [`HostState`].
1244///
1245/// Called from the dispatcher loop only when
1246/// [`crate::session::ApprovalGate::requires_approval`] returns true.
1247/// `Ok(())` means the user picked "approve"; `Err(reason)` is sent
1248/// to the caller as JSON-RPC error code `-32003` with the supplied
1249/// reason. On timeout, the approval row stays in `Pending` so the
1250/// UI keeps a record of the unanswered request.
1251async fn gate_high_risk_method(
1252    method: &str,
1253    params: &Value,
1254    state: &Arc<ServerState>,
1255) -> Result<(), String> {
1256    let timeout = state.approval_gate.timeout;
1257    let req = CreateHostApprovalRequest {
1258        agent_id: None,
1259        action: format!("ws.method:{method}"),
1260        details: serde_json::json!({
1261            "method": method,
1262            // Truncate params for the UI — full payload is recoverable
1263            // via the request-time host event log if needed. The cap
1264            // keeps a malicious caller from drowning the UI in JSON.
1265            "params_preview": preview_params(params, 2_000),
1266        }),
1267        options: vec!["approve".to_string(), "deny".to_string()],
1268    };
1269    match state
1270        .host
1271        .request_and_wait_approval(req, "approve", timeout)
1272        .await
1273    {
1274        Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1275        Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1276            "{method} denied by user (approval gate, audit 2026-05). \
1277             To call this method without an interactive prompt, start \
1278             car-server with --no-approvals on a trusted machine."
1279        )),
1280        Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1281            "{method} approval timed out after {}s with no resolution. \
1282             The approval is still visible in `host.approvals` for \
1283             forensics; resubmit the request to retry.",
1284            timeout.as_secs()
1285        )),
1286        Err(e) => Err(format!("approval gate error: {e}")),
1287    }
1288}
1289
1290fn preview_params(value: &Value, max_chars: usize) -> Value {
1291    let s = value.to_string();
1292    if s.len() <= max_chars {
1293        value.clone()
1294    } else {
1295        Value::String(format!("{}… (truncated)", &s[..max_chars]))
1296    }
1297}
1298
1299async fn handle_session_init(
1300    req: &JsonRpcMessage,
1301    session: &crate::session::ClientSession,
1302) -> Result<Value, String> {
1303    let init: SessionInitRequest =
1304        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1305
1306    for tool in &init.tools {
1307        register_from_definition(&session.runtime, tool).await;
1308    }
1309
1310    let mut policy_count = 0;
1311    {
1312        let mut policies = session.runtime.policies.write().await;
1313        for policy_def in &init.policies {
1314            if let Some(check) = build_policy_check(policy_def) {
1315                policies.register(&policy_def.name, check, "");
1316                policy_count += 1;
1317            }
1318        }
1319    }
1320
1321    serde_json::to_value(SessionInitResponse {
1322        session_id: session.client_id.clone(),
1323        tools_registered: init.tools.len(),
1324        policies_registered: policy_count,
1325    })
1326    .map_err(|e| e.to_string())
1327}
1328
1329fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1330    match def.rule.as_str() {
1331        "deny_tool" => {
1332            let target = def.target.clone();
1333            Some(Box::new(
1334                move |action: &car_ir::Action, _: &car_state::StateStore| {
1335                    if action.tool.as_deref() == Some(&target) {
1336                        Some(format!("tool '{}' denied", target))
1337                    } else {
1338                        None
1339                    }
1340                },
1341            ))
1342        }
1343        "require_state" => {
1344            let key = def.key.clone();
1345            let value = def.value.clone();
1346            Some(Box::new(
1347                move |_: &car_ir::Action, state: &car_state::StateStore| {
1348                    if state.get(&key).as_ref() != Some(&value) {
1349                        Some(format!("state['{}'] must be {:?}", key, value))
1350                    } else {
1351                        None
1352                    }
1353                },
1354            ))
1355        }
1356        "deny_tool_param" => {
1357            let target = def.target.clone();
1358            let param = def.key.clone();
1359            let pattern = def.pattern.clone();
1360            Some(Box::new(
1361                move |action: &car_ir::Action, _: &car_state::StateStore| {
1362                    if action.tool.as_deref() != Some(&target) {
1363                        return None;
1364                    }
1365                    if let Some(val) = action.parameters.get(&param) {
1366                        let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1367                        if s.contains(&pattern) {
1368                            return Some(format!("param '{}' matches '{}'", param, pattern));
1369                        }
1370                    }
1371                    None
1372                },
1373            ))
1374        }
1375        _ => None,
1376    }
1377}
1378
1379async fn handle_tools_register(
1380    req: &JsonRpcMessage,
1381    session: &crate::session::ClientSession,
1382) -> Result<Value, String> {
1383    let tools: Vec<ToolDefinition> =
1384        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1385    for tool in &tools {
1386        register_from_definition(&session.runtime, tool).await;
1387    }
1388    Ok(Value::from(tools.len()))
1389}
1390
1391/// Bridge a wire-protocol `ToolDefinition` to the engine's
1392/// schema-aware registration. Carries the full ToolSchema shape
1393/// (description, parameters, returns, idempotency, caching, rate
1394/// limit) through to the validator. An empty `parameters` object is
1395/// the legacy schemaless registration — the validator no-ops for
1396/// those, so pre-v0.5.x callers see no change.
1397async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1398    runtime
1399        .register_tool_schema(car_ir::ToolSchema {
1400            name: def.name.clone(),
1401            description: def.description.clone(),
1402            parameters: def.parameters.clone(),
1403            returns: def.returns.clone(),
1404            idempotent: def.idempotent,
1405            cache_ttl_secs: def.cache_ttl_secs,
1406            rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1407                max_calls: rl.max_calls,
1408                interval_secs: rl.interval_secs,
1409            }),
1410        })
1411        .await;
1412}
1413
1414async fn handle_proposal_submit(
1415    req: &JsonRpcMessage,
1416    session: &crate::session::ClientSession,
1417) -> Result<Value, String> {
1418    let submit: ProposalSubmitRequest =
1419        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1420    // `session_id` is sibling to `proposal` in the params object —
1421    // not part of `ProposalSubmitRequest` (kept proto-compatible). When
1422    // present, executes the proposal under the named session so any
1423    // session-scoped policies layer on top of global ones.
1424    // See docs/proposals/per-session-policy-scoping.md.
1425    let session_id = req
1426        .params
1427        .get("session_id")
1428        .and_then(|v| v.as_str())
1429        .map(str::to_string);
1430    let result = match session_id {
1431        Some(sid) => session.runtime.execute_with_session(&submit.proposal, &sid).await,
1432        None => session.runtime.execute(&submit.proposal).await,
1433    };
1434    serde_json::to_value(result).map_err(|e| e.to_string())
1435}
1436
1437async fn handle_session_policy_open(
1438    session: &crate::session::ClientSession,
1439) -> Result<Value, String> {
1440    let id = session.runtime.open_session().await;
1441    Ok(serde_json::json!({ "session_id": id }))
1442}
1443
1444async fn handle_session_policy_close(
1445    req: &JsonRpcMessage,
1446    session: &crate::session::ClientSession,
1447) -> Result<Value, String> {
1448    let sid = req
1449        .params
1450        .get("session_id")
1451        .and_then(|v| v.as_str())
1452        .ok_or("missing 'session_id'")?;
1453    let closed = session.runtime.close_session(sid).await;
1454    Ok(serde_json::json!({ "closed": closed }))
1455}
1456
1457/// `policy.register` — register one policy against this WebSocket
1458/// session's runtime. Mirrors the `PolicyDefinition` shape used by
1459/// `session.init`. When `session_id` is present, the policy is scoped
1460/// to the named in-runtime session opened via `session.policy.open`;
1461/// otherwise it is global.
1462async fn handle_policy_register(
1463    req: &JsonRpcMessage,
1464    session: &crate::session::ClientSession,
1465) -> Result<Value, String> {
1466    let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1467        .map_err(|e| format!("invalid policy params: {e}"))?;
1468    let session_id = req
1469        .params
1470        .get("session_id")
1471        .and_then(|v| v.as_str())
1472        .map(str::to_string);
1473    let check = build_policy_check(&def).ok_or_else(|| {
1474        format!("unsupported policy rule '{}'", def.rule)
1475    })?;
1476    match session_id {
1477        Some(sid) => session
1478            .runtime
1479            .register_policy_in_session(&sid, &def.name, check, "")
1480            .await
1481            .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1482        None => {
1483            let mut policies = session.runtime.policies.write().await;
1484            policies.register(&def.name, check, "");
1485            Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1486        }
1487    }
1488}
1489
1490async fn handle_verify(
1491    req: &JsonRpcMessage,
1492    session: &crate::session::ClientSession,
1493) -> Result<Value, String> {
1494    let vr: VerifyRequest =
1495        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1496    let tools: std::collections::HashSet<String> =
1497        session.runtime.tools.read().await.keys().cloned().collect();
1498    let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1499    serde_json::to_value(VerifyResponse {
1500        valid: result.valid,
1501        issues: result
1502            .issues
1503            .iter()
1504            .map(|i| VerifyIssueProto {
1505                action_id: i.action_id.clone(),
1506                severity: i.severity.clone(),
1507                message: i.message.clone(),
1508            })
1509            .collect(),
1510        simulated_state: result.simulated_state,
1511    })
1512    .map_err(|e| e.to_string())
1513}
1514
1515async fn handle_state_get(
1516    req: &JsonRpcMessage,
1517    session: &crate::session::ClientSession,
1518) -> Result<Value, String> {
1519    let key = req
1520        .params
1521        .get("key")
1522        .and_then(|v| v.as_str())
1523        .ok_or("missing 'key'")?;
1524    Ok(session.runtime.state.get(key).unwrap_or(Value::Null))
1525}
1526
1527async fn handle_state_set(
1528    req: &JsonRpcMessage,
1529    session: &crate::session::ClientSession,
1530) -> Result<Value, String> {
1531    let key = req
1532        .params
1533        .get("key")
1534        .and_then(|v| v.as_str())
1535        .ok_or("missing 'key'")?;
1536    let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1537    session.runtime.state.set(key, value, "client");
1538    Ok(Value::from("ok"))
1539}
1540
1541/// `state.exists` — true if the key is set in this session's state
1542/// store, false otherwise. Cheaper than `state.get` + null-check on
1543/// the client side because it doesn't serialize the value.
1544async fn handle_state_exists(
1545    req: &JsonRpcMessage,
1546    session: &crate::session::ClientSession,
1547) -> Result<Value, String> {
1548    let key = req
1549        .params
1550        .get("key")
1551        .and_then(|v| v.as_str())
1552        .ok_or("missing 'key'")?;
1553    Ok(Value::Bool(session.runtime.state.exists(key)))
1554}
1555
1556/// `state.keys` — list every key currently set in this session's
1557/// state store. Returns a JSON array of strings.
1558async fn handle_state_keys(session: &crate::session::ClientSession) -> Result<Value, String> {
1559    Ok(Value::Array(
1560        session
1561            .runtime
1562            .state
1563            .keys()
1564            .into_iter()
1565            .map(Value::String)
1566            .collect(),
1567    ))
1568}
1569
1570/// `state.snapshot` — return the entire session state store as a
1571/// JSON object (`{ key: value, ... }`). Equivalent to iterating
1572/// `state.keys` + `state.get` but in a single round-trip; for
1573/// inspectors/dashboards.
1574async fn handle_state_snapshot(
1575    session: &crate::session::ClientSession,
1576) -> Result<Value, String> {
1577    Ok(serde_json::to_value(session.runtime.state.snapshot())
1578        .map_err(|e| format!("serialize snapshot: {e}"))?)
1579}
1580
1581// --- Per-agent persistent memgine (#170) ---
1582
1583/// `~/.car/memory/agents/<id>.json` — the per-agent snapshot file.
1584/// Mirrors the existing `memory.persist` shape (flat JSON array of
1585/// fact objects) so the same loader path works.
1586fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
1587    let base = car_ffi_common::memory_path::ensure_base()
1588        .map_err(|e| format!("memory base unavailable: {e}"))?;
1589    let dir = base.join("agents");
1590    std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
1591    Ok(dir.join(format!("{agent_id}.json")))
1592}
1593
1594/// Acquire (or lazy-create + load from disk) the daemon-owned
1595/// persistent memgine for `agent_id`. First call per id reads
1596/// `~/.car/memory/agents/<id>.json` if it exists; subsequent calls
1597/// share the in-memory engine across sessions. Caller stores the
1598/// returned `Arc` on `ClientSession.bound_memgine` so memory.*
1599/// handlers route through it via [`ClientSession::effective_memgine`].
1600async fn get_or_load_agent_memgine(
1601    state: &Arc<ServerState>,
1602    agent_id: &str,
1603) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
1604    {
1605        let map = state.agent_memgines.lock().await;
1606        if let Some(eng) = map.get(agent_id) {
1607            return Ok(eng.clone());
1608        }
1609    }
1610    // Build a fresh engine and try to load from disk.
1611    let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(None)));
1612    let path = agent_memgine_snapshot_path(agent_id)?;
1613    if path.exists() {
1614        let content = std::fs::read_to_string(&path)
1615            .map_err(|e| format!("read {}: {}", path.display(), e))?;
1616        let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
1617        let mut g = engine.lock().await;
1618        let mut loaded: u32 = 0;
1619        for fact in &facts {
1620            let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
1621            let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
1622            let kind = fact
1623                .get("kind")
1624                .and_then(|v| v.as_str())
1625                .unwrap_or("pattern");
1626            let fid = format!("loaded-{loaded}");
1627            g.ingest_fact(
1628                &fid,
1629                subject,
1630                body,
1631                "user",
1632                "peer",
1633                chrono::Utc::now(),
1634                "global",
1635                None,
1636                vec![],
1637                kind == "constraint",
1638            );
1639            loaded += 1;
1640        }
1641    }
1642    let mut map = state.agent_memgines.lock().await;
1643    let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
1644    Ok(stored)
1645}
1646
1647/// Snapshot the agent's memgine to its disk file. Same on-wire shape
1648/// as `memory.persist` so manual snapshots and the daemon-owned
1649/// persistence stay interoperable.
1650async fn persist_agent_memgine(
1651    agent_id: &str,
1652    engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
1653) -> Result<(), String> {
1654    let path = agent_memgine_snapshot_path(agent_id)?;
1655    let g = engine.lock().await;
1656    let facts: Vec<Value> = g
1657        .graph
1658        .inner
1659        .node_indices()
1660        .filter_map(|nix| {
1661            let node = g.graph.inner.node_weight(nix)?;
1662            if !node.is_valid() {
1663                return None;
1664            }
1665            if node.kind == car_memgine::MemKind::Identity
1666                || node.kind == car_memgine::MemKind::Environment
1667            {
1668                return None;
1669            }
1670            Some(serde_json::json!({
1671                "subject": node.key,
1672                "body": node.value,
1673                "kind": match node.kind {
1674                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
1675                    car_memgine::MemKind::Conversation => "outcome",
1676                    _ => "pattern",
1677                },
1678                "confidence": 0.5,
1679                "content_type": node.content_type.as_label(),
1680            }))
1681        })
1682        .collect();
1683    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
1684    std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
1685    Ok(())
1686}
1687
1688// --- Memory handlers ---
1689
1690/// `memory.fact_count` — return `valid_fact_count()` of the
1691/// session's memgine. Used by FFI bindings to mirror their
1692/// embedded `fact_count()` accessor without round-tripping a full
1693/// query. No params.
1694async fn handle_memory_fact_count(
1695    session: &crate::session::ClientSession,
1696) -> Result<Value, String> {
1697    let engine_arc = session.effective_memgine().await;
1698    let engine = engine_arc.lock().await;
1699    Ok(Value::from(engine.valid_fact_count()))
1700}
1701
1702async fn handle_memory_add_fact(
1703    req: &JsonRpcMessage,
1704    session: &crate::session::ClientSession,
1705) -> Result<Value, String> {
1706    let subject = req
1707        .params
1708        .get("subject")
1709        .and_then(|v| v.as_str())
1710        .ok_or("missing subject")?;
1711    let body = req
1712        .params
1713        .get("body")
1714        .and_then(|v| v.as_str())
1715        .ok_or("missing body")?;
1716    let kind = req
1717        .params
1718        .get("kind")
1719        .and_then(|v| v.as_str())
1720        .unwrap_or("pattern");
1721    // Route through `effective_memgine` so connections bound to a
1722    // lifecycle agent (#169) write into the daemon-owned per-agent
1723    // memgine instead of the per-WS ephemeral one (#170).
1724    let engine_arc = session.effective_memgine().await;
1725    let count = {
1726        let mut engine = engine_arc.lock().await;
1727        let fid = format!("ws-{}", engine.valid_fact_count());
1728        engine.ingest_fact(
1729            &fid,
1730            subject,
1731            body,
1732            "user",
1733            "peer",
1734            chrono::Utc::now(),
1735            "global",
1736            None,
1737            vec![],
1738            kind == "constraint",
1739        );
1740        engine.valid_fact_count()
1741    };
1742    // Persist after every add when the session is bound to a
1743    // supervised agent. Synchronous write — small JSON snapshot.
1744    if let Some(id) = session.agent_id.lock().await.clone() {
1745        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
1746            tracing::warn!(agent_id = %id, error = %e,
1747                "agent memgine persist failed; in-memory state is canonical");
1748        }
1749    }
1750    Ok(Value::from(count))
1751}
1752
1753async fn handle_memory_query(
1754    req: &JsonRpcMessage,
1755    session: &crate::session::ClientSession,
1756) -> Result<Value, String> {
1757    let query = req
1758        .params
1759        .get("query")
1760        .and_then(|v| v.as_str())
1761        .ok_or("missing query")?;
1762    let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
1763    let engine_arc = session.effective_memgine().await;
1764    let engine = engine_arc.lock().await;
1765    let seeds = engine.graph.find_seeds(query, 5);
1766    // FFI parity with NAPI `query_facts` (car-ffi-napi/src/lib.rs:577) —
1767    // both use Personalized PageRank so transport choice doesn't shift
1768    // ranking semantics. Result shape (subject/body/kind/confidence)
1769    // also mirrors NAPI for the same reason.
1770    let hits = if !seeds.is_empty() {
1771        engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
1772    } else {
1773        vec![]
1774    };
1775    let results: Vec<Value> = hits.iter().filter_map(|hit| {
1776        let node = engine.graph.inner.node_weight(hit.node_ix)?;
1777        Some(serde_json::json!({
1778            "subject": node.key,
1779            "body": node.value,
1780            "kind": format!("{:?}", node.kind).to_lowercase(),
1781            "confidence": hit.activation,
1782        }))
1783    }).collect();
1784    serde_json::to_value(results).map_err(|e| e.to_string())
1785}
1786
1787async fn handle_memory_build_context(
1788    req: &JsonRpcMessage,
1789    session: &crate::session::ClientSession,
1790) -> Result<Value, String> {
1791    let query = req
1792        .params
1793        .get("query")
1794        .and_then(|v| v.as_str())
1795        .unwrap_or("");
1796    // FFI parity with NAPI `build_context(query, model_context_window)`.
1797    // When supplied, sizes the assembly budget against the model's window
1798    // instead of the fixed 8K default.
1799    let model_context_window = req
1800        .params
1801        .get("model_context_window")
1802        .and_then(|v| v.as_u64())
1803        .map(|w| w as usize);
1804    let mut engine = session.memgine.lock().await;
1805    Ok(Value::from(engine.build_context_for_model(query, model_context_window)))
1806}
1807
1808/// `memory.build_context_fast` — Fast-mode context assembly for
1809/// latency-sensitive paths (voice, real-time). Skips embedding flush,
1810/// skill lookup, PPR-based scoring, inline repairs, known-unknowns
1811/// extraction. Keeps identity, constraints, facts (creation order),
1812/// conversation, environment.
1813async fn handle_memory_build_context_fast(
1814    req: &JsonRpcMessage,
1815    session: &crate::session::ClientSession,
1816) -> Result<Value, String> {
1817    let query = req
1818        .params
1819        .get("query")
1820        .and_then(|v| v.as_str())
1821        .unwrap_or("");
1822    let model_context_window = req
1823        .params
1824        .get("model_context_window")
1825        .and_then(|v| v.as_u64())
1826        .map(|w| w as usize);
1827    let mut engine = session.memgine.lock().await;
1828    Ok(Value::from(engine.build_context_with_options(
1829        query,
1830        model_context_window,
1831        car_memgine::ContextMode::Fast,
1832    )))
1833}
1834
1835/// `memory.persist` — write the session's memgine to a JSON file
1836/// at `path`. Mirrors NAPI `persist_memory` (car-ffi-napi/src/lib.rs:797)
1837/// so daemon-mode clients can drive checkpoint/restore symmetrically
1838/// with embedded mode. Returns the number of facts written.
1839///
1840/// Filesystem caveat: `path` is interpreted on the daemon's filesystem,
1841/// not the caller's. Since the 2026-05 audit, `path` is also
1842/// sandboxed under `~/.car/memory/` via
1843/// [`car_ffi_common::memory_path::resolve`] — relative paths land
1844/// under the base, absolute paths must already be under the base,
1845/// `..` segments are rejected, symlinks pointing out are rejected.
1846/// Pre-2026-05 the path was passed straight to `std::fs::write` and
1847/// became an arbitrary file-write primitive. The base64-blob escape
1848/// hatch tracked in `Parslee-ai/car-releases#31` will plug into the
1849/// same resolver when it lands.
1850async fn handle_memory_persist(
1851    req: &JsonRpcMessage,
1852    session: &crate::session::ClientSession,
1853) -> Result<Value, String> {
1854    let path = req
1855        .params
1856        .get("path")
1857        .and_then(|v| v.as_str())
1858        .ok_or("missing path")?;
1859    let resolved = car_ffi_common::memory_path::resolve(path)
1860        .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
1861    let engine = session.memgine.lock().await;
1862    let facts: Vec<Value> = engine
1863        .graph
1864        .inner
1865        .node_indices()
1866        .filter_map(|nix| {
1867            let node = engine.graph.inner.node_weight(nix)?;
1868            if !node.is_valid() {
1869                return None;
1870            }
1871            if node.kind == car_memgine::MemKind::Identity
1872                || node.kind == car_memgine::MemKind::Environment
1873            {
1874                return None;
1875            }
1876            Some(serde_json::json!({
1877                "subject": node.key,
1878                "body": node.value,
1879                "kind": match node.kind {
1880                    car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
1881                    car_memgine::MemKind::Conversation => "outcome",
1882                    _ => "pattern",
1883                },
1884                "confidence": 0.5,
1885                "content_type": node.content_type.as_label(),
1886            }))
1887        })
1888        .collect();
1889    let count = facts.len();
1890    let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
1891    std::fs::write(&resolved, json)
1892        .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
1893    Ok(Value::from(count as u64))
1894}
1895
1896/// `memory.load` — replace the session's memgine with facts from the
1897/// JSON file at `path`. Mirrors NAPI `load_memory`
1898/// (car-ffi-napi/src/lib.rs:121). Same `~/.car/memory/` sandboxing
1899/// as `memory.persist` since the 2026-05 audit — relative paths
1900/// land under the base, anything that escapes is rejected.
1901async fn handle_memory_load(
1902    req: &JsonRpcMessage,
1903    session: &crate::session::ClientSession,
1904) -> Result<Value, String> {
1905    let path = req
1906        .params
1907        .get("path")
1908        .and_then(|v| v.as_str())
1909        .ok_or("missing path")?;
1910    let resolved = car_ffi_common::memory_path::resolve(path)
1911        .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
1912    let content = std::fs::read_to_string(&resolved)
1913        .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
1914    let facts: Vec<Value> =
1915        serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
1916    let mut engine = session.memgine.lock().await;
1917    engine.reset();
1918    let mut count: u32 = 0;
1919    for fact in &facts {
1920        let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
1921        let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
1922        let kind = fact
1923            .get("kind")
1924            .and_then(|v| v.as_str())
1925            .unwrap_or("pattern");
1926        let fid = format!("loaded-{}", count);
1927        engine.ingest_fact(
1928            &fid,
1929            subject,
1930            body,
1931            "user",
1932            "peer",
1933            chrono::Utc::now(),
1934            "global",
1935            None,
1936            vec![],
1937            kind == "constraint",
1938        );
1939        count += 1;
1940    }
1941    Ok(Value::from(count))
1942}
1943
1944// --- Skill handlers ---
1945
1946async fn handle_skill_ingest(
1947    req: &JsonRpcMessage,
1948    session: &crate::session::ClientSession,
1949) -> Result<Value, String> {
1950    let name = req
1951        .params
1952        .get("name")
1953        .and_then(|v| v.as_str())
1954        .ok_or("missing name")?;
1955    let code = req
1956        .params
1957        .get("code")
1958        .and_then(|v| v.as_str())
1959        .ok_or("missing code")?;
1960    let platform = req
1961        .params
1962        .get("platform")
1963        .and_then(|v| v.as_str())
1964        .unwrap_or("unknown");
1965    let persona = req
1966        .params
1967        .get("persona")
1968        .and_then(|v| v.as_str())
1969        .unwrap_or("");
1970    let url_pattern = req
1971        .params
1972        .get("url_pattern")
1973        .and_then(|v| v.as_str())
1974        .unwrap_or("");
1975    let description = req
1976        .params
1977        .get("description")
1978        .and_then(|v| v.as_str())
1979        .unwrap_or("");
1980    let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
1981    let keywords: Vec<String> = req
1982        .params
1983        .get("task_keywords")
1984        .and_then(|v| v.as_array())
1985        .map(|arr| {
1986            arr.iter()
1987                .filter_map(|v| v.as_str().map(String::from))
1988                .collect()
1989        })
1990        .unwrap_or_default();
1991
1992    let trigger = car_memgine::SkillTrigger {
1993        persona: persona.into(),
1994        url_pattern: url_pattern.into(),
1995        task_keywords: keywords,
1996    };
1997    let mut engine = session.memgine.lock().await;
1998    let node = engine.ingest_skill(
1999        name,
2000        code,
2001        platform,
2002        trigger,
2003        description,
2004        supersedes,
2005        vec![],
2006        vec![],
2007    );
2008    Ok(Value::from(node.index() as u64))
2009}
2010
2011async fn handle_skill_find(
2012    req: &JsonRpcMessage,
2013    session: &crate::session::ClientSession,
2014) -> Result<Value, String> {
2015    let persona = req
2016        .params
2017        .get("persona")
2018        .and_then(|v| v.as_str())
2019        .unwrap_or("");
2020    let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2021    let task = req
2022        .params
2023        .get("task")
2024        .and_then(|v| v.as_str())
2025        .unwrap_or("");
2026    let max = req
2027        .params
2028        .get("max_results")
2029        .and_then(|v| v.as_u64())
2030        .unwrap_or(1) as usize;
2031    let engine = session.memgine.lock().await;
2032    let results = engine.find_skill(persona, url, task, max);
2033    let json: Vec<Value> = results
2034        .iter()
2035        .map(|(m, s)| {
2036            serde_json::json!({
2037                "name": m.name, "code": m.code, "platform": m.platform,
2038                "description": m.description, "stats": m.stats, "match_score": s,
2039            })
2040        })
2041        .collect();
2042    serde_json::to_value(json).map_err(|e| e.to_string())
2043}
2044
2045async fn handle_skill_report(
2046    req: &JsonRpcMessage,
2047    session: &crate::session::ClientSession,
2048) -> Result<Value, String> {
2049    let name = req
2050        .params
2051        .get("skill_name")
2052        .and_then(|v| v.as_str())
2053        .ok_or("missing skill_name")?;
2054    let outcome_str = req
2055        .params
2056        .get("outcome")
2057        .and_then(|v| v.as_str())
2058        .ok_or("missing outcome")?;
2059    let outcome = match outcome_str {
2060        "success" => car_memgine::SkillOutcome::Success,
2061        _ => car_memgine::SkillOutcome::Fail,
2062    };
2063    let mut engine = session.memgine.lock().await;
2064    let stats = engine
2065        .report_outcome(name, outcome)
2066        .ok_or(format!("skill '{}' not found", name))?;
2067    serde_json::to_value(stats).map_err(|e| e.to_string())
2068}
2069
2070// ---------------------------------------------------------------------------
2071// Multi-agent coordination handlers
2072//
2073// The WsAgentRunner sends a `multi.run_agent` JSON-RPC request to the client.
2074// The client runs the model loop and responds with AgentOutput JSON.
2075// ---------------------------------------------------------------------------
2076
/// AgentRunner backed by WebSocket callback to the client.
///
/// Each run is sent to the connected client as a `multi.run_agent`
/// JSON-RPC request; the client's reply becomes the agent's output.
struct WsAgentRunner {
    /// Shared WebSocket channel: supplies request ids, the write half
    /// used to send the request, and the pending-response map.
    channel: Arc<WsChannel>,
    /// Host registry where each run is registered as an agent and its
    /// status updates are reported.
    host: Arc<crate::host::HostState>,
    /// Id of the owning client; passed to every host call and used as
    /// the prefix of the per-run agent id.
    client_id: String,
}
2083
2084#[async_trait::async_trait]
2085impl car_multi::AgentRunner for WsAgentRunner {
2086    async fn run(
2087        &self,
2088        spec: &car_multi::AgentSpec,
2089        task: &str,
2090        _runtime: &car_engine::Runtime,
2091        _mailbox: &car_multi::Mailbox,
2092    ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2093        use futures::SinkExt;
2094
2095        let request_id = self.channel.next_request_id();
2096        let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2097        let agent = self
2098            .host
2099            .register_agent(
2100                &self.client_id,
2101                RegisterHostAgentRequest {
2102                    id: Some(agent_id.clone()),
2103                    name: spec.name.clone(),
2104                    kind: "callback".to_string(),
2105                    capabilities: spec.tools.clone(),
2106                    project: spec
2107                        .metadata
2108                        .get("project")
2109                        .and_then(|v| v.as_str())
2110                        .map(str::to_string),
2111                    pid: None,
2112                    display: serde_json::from_value(
2113                        spec.metadata
2114                            .get("display")
2115                            .cloned()
2116                            .unwrap_or(serde_json::Value::Null),
2117                    )
2118                    .unwrap_or_default(),
2119                    metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2120                },
2121            )
2122            .await
2123            .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2124        let _ = self
2125            .host
2126            .set_status(&self.client_id, SetHostAgentStatusRequest {
2127                agent_id: agent.id.clone(),
2128                status: HostAgentStatus::Running,
2129                current_task: Some(task.to_string()),
2130                message: Some(format!("{} started", spec.name)),
2131                payload: serde_json::json!({ "task": task }),
2132            })
2133            .await;
2134
2135        let rpc_request = serde_json::json!({
2136            "jsonrpc": "2.0",
2137            "method": "multi.run_agent",
2138            "params": {
2139                "spec": spec,
2140                "task": task,
2141            },
2142            "id": request_id,
2143        });
2144
2145        // Create oneshot channel for the response
2146        let (tx, rx) = tokio::sync::oneshot::channel();
2147        self.channel
2148            .pending
2149            .lock()
2150            .await
2151            .insert(request_id.clone(), tx);
2152
2153        let msg = Message::Text(
2154            serde_json::to_string(&rpc_request)
2155                .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2156                .into(),
2157        );
2158        if let Err(e) = self.channel.write.lock().await.send(msg).await {
2159            let _ = self
2160                .host
2161                .set_status(&self.client_id, SetHostAgentStatusRequest {
2162                    agent_id: agent_id.clone(),
2163                    status: HostAgentStatus::Errored,
2164                    current_task: None,
2165                    message: Some(format!("{} failed to start", spec.name)),
2166                    payload: serde_json::json!({ "error": e.to_string() }),
2167                })
2168                .await;
2169            return Err(car_multi::MultiError::AgentFailed(
2170                spec.name.clone(),
2171                format!("ws send error: {}", e),
2172            ));
2173        }
2174
2175        // Wait for client response (5 min timeout for model loops)
2176        let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2177            Ok(Ok(response)) => response,
2178            Ok(Err(_)) => {
2179                let _ = self
2180                    .host
2181                    .set_status(&self.client_id, SetHostAgentStatusRequest {
2182                        agent_id: agent_id.clone(),
2183                        status: HostAgentStatus::Errored,
2184                        current_task: None,
2185                        message: Some(format!("{} callback channel closed", spec.name)),
2186                        payload: Value::Null,
2187                    })
2188                    .await;
2189                return Err(car_multi::MultiError::AgentFailed(
2190                    spec.name.clone(),
2191                    "agent callback channel closed".into(),
2192                ));
2193            }
2194            Err(_) => {
2195                let _ = self
2196                    .host
2197                    .set_status(&self.client_id, SetHostAgentStatusRequest {
2198                        agent_id: agent_id.clone(),
2199                        status: HostAgentStatus::Errored,
2200                        current_task: None,
2201                        message: Some(format!("{} timed out", spec.name)),
2202                        payload: Value::Null,
2203                    })
2204                    .await;
2205                return Err(car_multi::MultiError::AgentFailed(
2206                    spec.name.clone(),
2207                    "agent callback timed out (300s)".into(),
2208                ));
2209            }
2210        };
2211
2212        if let Some(err) = response.error {
2213            let _ = self
2214                .host
2215                .set_status(&self.client_id, SetHostAgentStatusRequest {
2216                    agent_id: agent_id.clone(),
2217                    status: HostAgentStatus::Errored,
2218                    current_task: None,
2219                    message: Some(format!("{} errored", spec.name)),
2220                    payload: serde_json::json!({ "error": err }),
2221                })
2222                .await;
2223            return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2224        }
2225
2226        let output_value = response.output.unwrap_or(Value::Null);
2227        let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2228            car_multi::MultiError::AgentFailed(
2229                spec.name.clone(),
2230                format!("invalid AgentOutput: {}", e),
2231            )
2232        })?;
2233        let status = if output.error.is_some() {
2234            HostAgentStatus::Errored
2235        } else {
2236            HostAgentStatus::Completed
2237        };
2238        let message = if output.error.is_some() {
2239            format!("{} errored", spec.name)
2240        } else {
2241            format!("{} completed", spec.name)
2242        };
2243        let _ = self
2244            .host
2245            .set_status(&self.client_id, SetHostAgentStatusRequest {
2246                agent_id,
2247                status,
2248                current_task: None,
2249                message: Some(message),
2250                payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2251            })
2252            .await;
2253
2254        Ok(output)
2255    }
2256}
2257
/// Builds the agent id used for a single run, in the form
/// `<client_id>:<sanitized name>:<request_id>`.
///
/// Each `char` of `name` that is not ASCII alphanumeric, `-`, or `_` is
/// replaced by a single `-`. `client_id` and `request_id` are inserted
/// verbatim (they are not sanitized here).
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '-' });
    }
    [client_id, sanitized.as_str(), request_id].join(":")
}
2271
2272async fn handle_multi_swarm(
2273    req: &JsonRpcMessage,
2274    session: &crate::session::ClientSession,
2275) -> Result<Value, String> {
2276    let mode_str = req
2277        .params
2278        .get("mode")
2279        .and_then(|v| v.as_str())
2280        .ok_or("missing 'mode'")?;
2281    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2282    let task = req
2283        .params
2284        .get("task")
2285        .and_then(|v| v.as_str())
2286        .ok_or("missing 'task'")?;
2287
2288    let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2289        .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2290    let agent_specs: Vec<car_multi::AgentSpec> =
2291        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2292    let synth: Option<car_multi::AgentSpec> = req
2293        .params
2294        .get("synthesizer")
2295        .map(|v| serde_json::from_value(v.clone()))
2296        .transpose()
2297        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2298
2299    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2300        channel: session.channel.clone(),
2301        host: session.host.clone(),
2302        client_id: session.client_id.clone(),
2303    });
2304    let infra = car_multi::SharedInfra::new();
2305
2306    let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2307    if let Some(s) = synth {
2308        swarm = swarm.with_synthesizer(s);
2309    }
2310
2311    let result = swarm
2312        .run(task, &runner, &infra)
2313        .await
2314        .map_err(|e| format!("swarm error: {}", e))?;
2315    serde_json::to_value(result).map_err(|e| e.to_string())
2316}
2317
2318async fn handle_multi_pipeline(
2319    req: &JsonRpcMessage,
2320    session: &crate::session::ClientSession,
2321) -> Result<Value, String> {
2322    let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2323    let task = req
2324        .params
2325        .get("task")
2326        .and_then(|v| v.as_str())
2327        .ok_or("missing 'task'")?;
2328
2329    let stage_specs: Vec<car_multi::AgentSpec> =
2330        serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2331
2332    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2333        channel: session.channel.clone(),
2334        host: session.host.clone(),
2335        client_id: session.client_id.clone(),
2336    });
2337    let infra = car_multi::SharedInfra::new();
2338
2339    let result = car_multi::Pipeline::new(stage_specs)
2340        .run(task, &runner, &infra)
2341        .await
2342        .map_err(|e| format!("pipeline error: {}", e))?;
2343    serde_json::to_value(result).map_err(|e| e.to_string())
2344}
2345
2346async fn handle_multi_supervisor(
2347    req: &JsonRpcMessage,
2348    session: &crate::session::ClientSession,
2349) -> Result<Value, String> {
2350    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2351    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2352    let task = req
2353        .params
2354        .get("task")
2355        .and_then(|v| v.as_str())
2356        .ok_or("missing 'task'")?;
2357    let max_rounds = req
2358        .params
2359        .get("max_rounds")
2360        .and_then(|v| v.as_u64())
2361        .unwrap_or(3) as u32;
2362
2363    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2364        .map_err(|e| format!("invalid workers: {}", e))?;
2365    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2366        .map_err(|e| format!("invalid supervisor: {}", e))?;
2367
2368    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2369        channel: session.channel.clone(),
2370        host: session.host.clone(),
2371        client_id: session.client_id.clone(),
2372    });
2373    let infra = car_multi::SharedInfra::new();
2374
2375    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2376        .with_max_rounds(max_rounds)
2377        .run(task, &runner, &infra)
2378        .await
2379        .map_err(|e| format!("supervisor error: {}", e))?;
2380    serde_json::to_value(result).map_err(|e| e.to_string())
2381}
2382
2383async fn handle_multi_map_reduce(
2384    req: &JsonRpcMessage,
2385    session: &crate::session::ClientSession,
2386) -> Result<Value, String> {
2387    let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2388    let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2389    let task = req
2390        .params
2391        .get("task")
2392        .and_then(|v| v.as_str())
2393        .ok_or("missing 'task'")?;
2394    let items_val = req.params.get("items").ok_or("missing 'items'")?;
2395
2396    let mapper_spec: car_multi::AgentSpec =
2397        serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2398    let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2399        .map_err(|e| format!("invalid reducer: {}", e))?;
2400    let items: Vec<String> =
2401        serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2402
2403    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2404        channel: session.channel.clone(),
2405        host: session.host.clone(),
2406        client_id: session.client_id.clone(),
2407    });
2408    let infra = car_multi::SharedInfra::new();
2409
2410    let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2411        .run(task, &items, &runner, &infra)
2412        .await
2413        .map_err(|e| format!("map_reduce error: {}", e))?;
2414    serde_json::to_value(result).map_err(|e| e.to_string())
2415}
2416
2417async fn handle_multi_vote(
2418    req: &JsonRpcMessage,
2419    session: &crate::session::ClientSession,
2420) -> Result<Value, String> {
2421    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2422    let task = req
2423        .params
2424        .get("task")
2425        .and_then(|v| v.as_str())
2426        .ok_or("missing 'task'")?;
2427
2428    let agent_specs: Vec<car_multi::AgentSpec> =
2429        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2430    let synth: Option<car_multi::AgentSpec> = req
2431        .params
2432        .get("synthesizer")
2433        .map(|v| serde_json::from_value(v.clone()))
2434        .transpose()
2435        .map_err(|e| format!("invalid synthesizer: {}", e))?;
2436
2437    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2438        channel: session.channel.clone(),
2439        host: session.host.clone(),
2440        client_id: session.client_id.clone(),
2441    });
2442    let infra = car_multi::SharedInfra::new();
2443
2444    let mut vote = car_multi::Vote::new(agent_specs);
2445    if let Some(s) = synth {
2446        vote = vote.with_synthesizer(s);
2447    }
2448
2449    let result = vote
2450        .run(task, &runner, &infra)
2451        .await
2452        .map_err(|e| format!("vote error: {}", e))?;
2453    serde_json::to_value(result).map_err(|e| e.to_string())
2454}
2455
2456// ---------------------------------------------------------------------------
2457// Scheduler handlers
2458// ---------------------------------------------------------------------------
2459
2460fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2461    let name = req
2462        .params
2463        .get("name")
2464        .and_then(|v| v.as_str())
2465        .ok_or("scheduler.create requires 'name'")?;
2466    let prompt = req
2467        .params
2468        .get("prompt")
2469        .and_then(|v| v.as_str())
2470        .ok_or("scheduler.create requires 'prompt'")?;
2471
2472    let mut task = car_scheduler::Task::new(name, prompt);
2473
2474    if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
2475        let trigger = match t {
2476            "once" => car_scheduler::TaskTrigger::Once,
2477            "cron" => car_scheduler::TaskTrigger::Cron,
2478            "interval" => car_scheduler::TaskTrigger::Interval,
2479            "file_watch" => car_scheduler::TaskTrigger::FileWatch,
2480            _ => car_scheduler::TaskTrigger::Manual,
2481        };
2482        let schedule = req
2483            .params
2484            .get("schedule")
2485            .and_then(|v| v.as_str())
2486            .unwrap_or("");
2487        task = task.with_trigger(trigger, schedule);
2488    }
2489
2490    if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
2491        task = task.with_system_prompt(sp);
2492    }
2493
2494    serde_json::to_value(&task).map_err(|e| e.to_string())
2495}
2496
2497async fn handle_scheduler_run(
2498    req: &JsonRpcMessage,
2499    session: &crate::session::ClientSession,
2500) -> Result<Value, String> {
2501    let task_val = req
2502        .params
2503        .get("task")
2504        .ok_or("scheduler.run requires 'task'")?;
2505    let mut task: car_scheduler::Task =
2506        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2507
2508    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2509        channel: session.channel.clone(),
2510        host: session.host.clone(),
2511        client_id: session.client_id.clone(),
2512    });
2513    let executor = car_scheduler::Executor::new(runner);
2514    let execution = executor.run_once(&mut task).await;
2515
2516    serde_json::to_value(&execution).map_err(|e| e.to_string())
2517}
2518
2519async fn handle_scheduler_run_loop(
2520    req: &JsonRpcMessage,
2521    session: &crate::session::ClientSession,
2522) -> Result<Value, String> {
2523    let task_val = req
2524        .params
2525        .get("task")
2526        .ok_or("scheduler.run_loop requires 'task'")?;
2527    let mut task: car_scheduler::Task =
2528        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2529    let max_iterations = req
2530        .params
2531        .get("max_iterations")
2532        .and_then(|v| v.as_u64())
2533        .map(|v| v as u32);
2534
2535    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2536        channel: session.channel.clone(),
2537        host: session.host.clone(),
2538        client_id: session.client_id.clone(),
2539    });
2540    let executor = car_scheduler::Executor::new(runner);
2541    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2542    let executions = executor
2543        .run_loop(&mut task, max_iterations, cancel_rx)
2544        .await;
2545
2546    serde_json::to_value(&executions).map_err(|e| e.to_string())
2547}
2548
2549// ---------------------------------------------------------------------------
2550// Inference handlers
2551// ---------------------------------------------------------------------------
2552
2553fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
2554    state.inference.get_or_init(|| {
2555        Arc::new(car_inference::InferenceEngine::new(
2556            car_inference::InferenceConfig::default(),
2557        ))
2558    })
2559}
2560
2561async fn handle_infer(
2562    msg: &JsonRpcMessage,
2563    state: &ServerState,
2564    session: &crate::session::ClientSession,
2565) -> Result<Value, String> {
2566    let engine = get_inference_engine(state);
2567    let mut req: car_inference::GenerateRequest =
2568        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2569
2570    // If context_query is provided, build context from memgine and inject it
2571    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2572        let mut memgine = session.memgine.lock().await;
2573        let ctx = memgine.build_context(cq);
2574        if !ctx.is_empty() {
2575            req.context = Some(ctx);
2576        }
2577    }
2578
2579    // Process-wide admission gate. Held for the duration of the
2580    // generation so a burst of concurrent infer RPCs can't multiply
2581    // KV-cache + activation memory and take the host out. The
2582    // `_permit` binding is intentional — its `Drop` releases the slot
2583    // when this future returns.
2584    let _permit = state.admission.acquire().await;
2585
2586    // Use generate_tracked() so tool_calls, usage, model_used, trace_id, and
2587    // latency_ms are preserved in the response. Plain `generate()` discards
2588    // everything except `.text`, which silently breaks tool-use over the
2589    // WebSocket protocol (issue #43).
2590    //
2591    // NOTE: This directly serializes `InferenceResult`. Any field added to
2592    // that struct in `car-inference` becomes part of the public WebSocket
2593    // protocol. The shape is locked by `inference_result_serializes_*` tests
2594    // in car-inference; updating those tests is part of intentionally
2595    // changing the wire contract.
2596    let result = engine
2597        .generate_tracked(req)
2598        .await
2599        .map_err(|e| e.to_string())?;
2600    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2601}
2602
2603/// Streaming inference — mirrors NAPI `inferStream`. Closes
2604/// Parslee-ai/car-releases#30. Same `GenerateRequest` shape as
2605/// `infer`; emits `inference.stream.event` JSON-RPC notifications
2606/// during the run, and returns the final `InferenceResult` as the
2607/// JSON-RPC response when the stream completes.
2608///
2609/// Notification shape (server → client):
2610/// ```jsonc
2611/// {
2612///   "jsonrpc": "2.0",
2613///   "method": "inference.stream.event",
2614///   "params": {
2615///     "request_id": "<original RPC id>",
2616///     "event": { "type": "text" | "tool_start" | "tool_delta" | "usage", ... }
2617///   }
2618/// }
2619/// ```
2620///
2621/// The final `done` event is not pushed as a notification — it's
2622/// the JSON-RPC response with the accumulated `InferenceResult`.
2623async fn handle_infer_stream(
2624    msg: &JsonRpcMessage,
2625    session: &crate::session::ClientSession,
2626    state: &ServerState,
2627) -> Result<Value, String> {
2628    use futures::SinkExt;
2629    use tokio_tungstenite::tungstenite::Message;
2630
2631    let engine = get_inference_engine(state);
2632    let mut req: car_inference::GenerateRequest =
2633        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2634
2635    // Same context-injection convenience as non-streaming `infer` so
2636    // the two methods have parity on the call shape.
2637    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2638        let mut memgine = session.memgine.lock().await;
2639        let ctx = memgine.build_context(cq);
2640        if !ctx.is_empty() {
2641            req.context = Some(ctx);
2642        }
2643    }
2644
2645    let _permit = state.admission.acquire().await;
2646    let mut rx = engine
2647        .generate_tracked_stream(req)
2648        .await
2649        .map_err(|e| e.to_string())?;
2650
2651    let mut accumulator = car_inference::StreamAccumulator::default();
2652    let request_id = msg.id.clone();
2653
2654    while let Some(event) = rx.recv().await {
2655        let event_payload = match &event {
2656            car_inference::StreamEvent::TextDelta(text) => {
2657                serde_json::json!({"type": "text", "data": text})
2658            }
2659            car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
2660                serde_json::json!({"type": "tool_start", "name": name, "index": index})
2661            }
2662            car_inference::StreamEvent::ToolCallDelta {
2663                index,
2664                arguments_delta,
2665            } => serde_json::json!({
2666                "type": "tool_delta",
2667                "index": index,
2668                "data": arguments_delta,
2669            }),
2670            car_inference::StreamEvent::Usage {
2671                input_tokens,
2672                output_tokens,
2673            } => serde_json::json!({
2674                "type": "usage",
2675                "input_tokens": input_tokens,
2676                "output_tokens": output_tokens,
2677            }),
2678            // Done is delivered as the JSON-RPC response, not a
2679            // notification — matches the NAPI contract where the
2680            // standalone function's return value is the accumulated
2681            // result and the callback only sees in-progress events.
2682            car_inference::StreamEvent::Done { .. } => {
2683                accumulator.push(&event);
2684                continue;
2685            }
2686        };
2687
2688        let notif = serde_json::json!({
2689            "jsonrpc": "2.0",
2690            "method": "inference.stream.event",
2691            "params": {
2692                "request_id": request_id,
2693                "event": event_payload,
2694            },
2695        });
2696        if let Ok(text) = serde_json::to_string(&notif) {
2697            let _ = session
2698                .channel
2699                .write
2700                .lock()
2701                .await
2702                .send(Message::Text(text.into()))
2703                .await;
2704        }
2705        accumulator.push(&event);
2706    }
2707
2708    let (text, tool_calls, usage) = accumulator.finish_with_usage();
2709    Ok(serde_json::json!({
2710        "text": text,
2711        "tool_calls": tool_calls,
2712        "usage": usage,
2713    }))
2714}
2715
2716async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2717    let engine = get_inference_engine(state);
2718    let req: car_inference::EmbedRequest =
2719        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2720    // Embeds load their own model weights; share the same admission
2721    // gate as generations so a burst of embed requests can't smuggle
2722    // around the concurrency cap.
2723    let _permit = state.admission.acquire().await;
2724    let result = engine.embed(req).await.map_err(|e| e.to_string())?;
2725    Ok(serde_json::json!({"embeddings": result}))
2726}
2727
2728async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2729    let engine = get_inference_engine(state);
2730    let req: car_inference::ClassifyRequest =
2731        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2732    let _permit = state.admission.acquire().await;
2733    let result = engine.classify(req).await.map_err(|e| e.to_string())?;
2734    Ok(serde_json::json!({"classifications": result}))
2735}
2736
2737/// Surface the current admission state so the menubar tray and
2738/// `car daemon status` can show "queued: N" / "permits: P/T". Read-only
2739/// snapshot — racy by definition but correct enough for status panels.
2740fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
2741    let total = state.admission.permits();
2742    let available = state.admission.permits_available();
2743    let in_use = total.saturating_sub(available);
2744    Ok(serde_json::json!({
2745        "permits_total": total,
2746        "permits_available": available,
2747        "permits_in_use": in_use,
2748        "env_override": crate::admission::ENV_MAX_CONCURRENT,
2749    }))
2750}
2751
2752async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2753    let model = msg
2754        .params
2755        .get("model")
2756        .and_then(|v| v.as_str())
2757        .ok_or("missing 'model' parameter")?;
2758    let text = msg
2759        .params
2760        .get("text")
2761        .and_then(|v| v.as_str())
2762        .ok_or("missing 'text' parameter")?;
2763    let engine = get_inference_engine(state);
2764    let ids = engine
2765        .tokenize(model, text)
2766        .await
2767        .map_err(|e| e.to_string())?;
2768    Ok(serde_json::json!({"tokens": ids}))
2769}
2770
2771async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2772    let model = msg
2773        .params
2774        .get("model")
2775        .and_then(|v| v.as_str())
2776        .ok_or("missing 'model' parameter")?;
2777    let tokens: Vec<u32> = msg
2778        .params
2779        .get("tokens")
2780        .and_then(|v| v.as_array())
2781        .ok_or("missing 'tokens' parameter")?
2782        .iter()
2783        .map(|t| {
2784            t.as_u64()
2785                .and_then(|n| u32::try_from(n).ok())
2786                .ok_or_else(|| "tokens[] must be u32 values".to_string())
2787        })
2788        .collect::<Result<Vec<_>, _>>()?;
2789    let engine = get_inference_engine(state);
2790    let text = engine
2791        .detokenize(model, &tokens)
2792        .await
2793        .map_err(|e| e.to_string())?;
2794    Ok(serde_json::json!({"text": text}))
2795}
2796
2797fn handle_models_list(state: &ServerState) -> Result<Value, String> {
2798    let engine = get_inference_engine(state);
2799    let models = engine.list_models();
2800    serde_json::to_value(&models).map_err(|e| e.to_string())
2801}
2802
2803fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
2804    let engine = get_inference_engine(state);
2805    let models = engine.list_models_unified();
2806    serde_json::to_value(&models).map_err(|e| e.to_string())
2807}
2808
/// Filter options accepted by the model search handler.
///
/// Field names are camelCase on the wire (`localOnly`, `availableOnly`), and
/// every field is optional: a missing field takes its `Default` value.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchParams {
    /// Free-text filter; `handle_models_search` trims it, ignores it when
    /// empty, and matches it case-insensitively (ASCII) as a substring.
    #[serde(default)]
    query: Option<String>,
    /// Keep only models that report this capability.
    #[serde(default)]
    capability: Option<car_inference::ModelCapability>,
    /// Keep only models from this provider (compared ASCII
    /// case-insensitively after trimming; empty means no filter).
    #[serde(default)]
    provider: Option<String>,
    /// When true, keep only models for which `is_local()` holds.
    #[serde(default)]
    local_only: bool,
    /// When true, keep only models whose `available` flag is set.
    #[serde(default)]
    available_only: bool,
    /// Maximum number of entries to return (applied after sorting).
    #[serde(default)]
    limit: Option<usize>,
}
2825
/// One model in a search response: the flattened `ModelInfo` fields plus
/// extra catalog metadata copied from the model's schema.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchEntry {
    /// Core model info; its fields are inlined into this object on the wire.
    #[serde(flatten)]
    info: car_inference::ModelInfo,
    /// Schema `family` value.
    family: String,
    /// Schema `version` value.
    version: String,
    /// Schema `tags` list.
    tags: Vec<String>,
    /// True when the model is not available yet and its source is `Local`
    /// or `Mlx` (as computed by `handle_models_search`).
    pullable: bool,
    /// Upgrade whose `from_id` equals this model's id, if one exists.
    upgrade: Option<car_inference::ModelUpgrade>,
}
2837
/// Response body of the model search handler.
///
/// As filled in by `handle_models_search`, the counters describe the
/// returned `models` list (i.e. after any `limit` truncation), not the full
/// catalog.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchResponse {
    /// Matching models, sorted and possibly truncated.
    models: Vec<ModelSearchEntry>,
    /// All upgrades reported by the engine (not filtered by the search).
    upgrades: Vec<car_inference::ModelUpgrade>,
    /// Number of entries in `models`.
    total: usize,
    /// Entries in `models` whose `available` flag is set.
    available: usize,
    /// Entries in `models` that are local.
    local: usize,
    /// `total - local`.
    remote: usize,
}
2848
2849fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2850    let params: ModelSearchParams =
2851        serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
2852            query: None,
2853            capability: None,
2854            provider: None,
2855            local_only: false,
2856            available_only: false,
2857            limit: None,
2858        });
2859    let engine = get_inference_engine(state);
2860    let upgrades = engine.available_model_upgrades();
2861    let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
2862        .iter()
2863        .cloned()
2864        .map(|upgrade| (upgrade.from_id.clone(), upgrade))
2865        .collect();
2866    let query = params
2867        .query
2868        .as_deref()
2869        .map(str::trim)
2870        .filter(|q| !q.is_empty())
2871        .map(|q| q.to_ascii_lowercase());
2872    let provider = params
2873        .provider
2874        .as_deref()
2875        .map(str::trim)
2876        .filter(|p| !p.is_empty())
2877        .map(|p| p.to_ascii_lowercase());
2878
2879    let mut entries: Vec<ModelSearchEntry> = engine
2880        .list_schemas()
2881        .into_iter()
2882        .filter(|schema| {
2883            if let Some(capability) = params.capability {
2884                if !schema.has_capability(capability) {
2885                    return false;
2886                }
2887            }
2888            if let Some(provider) = provider.as_deref() {
2889                if schema.provider.to_ascii_lowercase() != provider {
2890                    return false;
2891                }
2892            }
2893            if params.local_only && !schema.is_local() {
2894                return false;
2895            }
2896            if params.available_only && !schema.available {
2897                return false;
2898            }
2899            if let Some(query) = query.as_deref() {
2900                let capability_text = schema
2901                    .capabilities
2902                    .iter()
2903                    .map(|cap| format!("{cap:?}").to_ascii_lowercase())
2904                    .collect::<Vec<_>>()
2905                    .join(" ");
2906                let haystack = format!(
2907                    "{} {} {} {} {} {}",
2908                    schema.id,
2909                    schema.name,
2910                    schema.provider,
2911                    schema.family,
2912                    schema.tags.join(" "),
2913                    capability_text
2914                )
2915                .to_ascii_lowercase();
2916                if !haystack.contains(query) {
2917                    return false;
2918                }
2919            }
2920            true
2921        })
2922        .map(|schema| {
2923            let pullable = !schema.available
2924                && matches!(
2925                    schema.source,
2926                    car_inference::ModelSource::Local { .. } | car_inference::ModelSource::Mlx { .. }
2927                );
2928            let info = car_inference::ModelInfo::from(&schema);
2929            let upgrade = upgrades_by_from.get(&schema.id).cloned();
2930            ModelSearchEntry {
2931                info,
2932                family: schema.family,
2933                version: schema.version,
2934                tags: schema.tags,
2935                pullable,
2936                upgrade,
2937            }
2938        })
2939        .collect();
2940    entries.sort_by(|a, b| {
2941        b.info
2942            .available
2943            .cmp(&a.info.available)
2944            .then(b.info.is_local.cmp(&a.info.is_local))
2945            .then(a.info.name.cmp(&b.info.name))
2946    });
2947    if let Some(limit) = params.limit {
2948        entries.truncate(limit);
2949    }
2950
2951    let total = entries.len();
2952    let available = entries.iter().filter(|entry| entry.info.available).count();
2953    let local = entries.iter().filter(|entry| entry.info.is_local).count();
2954    let response = ModelSearchResponse {
2955        models: entries,
2956        upgrades,
2957        total,
2958        available,
2959        local,
2960        remote: total.saturating_sub(local),
2961    };
2962    serde_json::to_value(response).map_err(|e| e.to_string())
2963}
2964
2965fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
2966    let engine = get_inference_engine(state);
2967    serde_json::to_value(serde_json::json!({
2968        "upgrades": engine.available_model_upgrades()
2969    }))
2970    .map_err(|e| e.to_string())
2971}
2972
2973async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2974    let name = msg
2975        .params
2976        .get("name")
2977        .or_else(|| msg.params.get("id"))
2978        .or_else(|| msg.params.get("model"))
2979        .and_then(|v| v.as_str())
2980        .ok_or("missing 'name' parameter")?;
2981    let engine = get_inference_engine(state);
2982    let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
2983    Ok(serde_json::json!({"path": path.display().to_string()}))
2984}
2985
2986async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2987    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
2988        msg.params
2989            .get("events")
2990            .cloned()
2991            .unwrap_or(msg.params.clone()),
2992    )
2993    .map_err(|e| format!("invalid events: {}", e))?;
2994
2995    let inference = get_inference_engine(state).clone();
2996    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
2997
2998    let skills = engine.distill_skills(&events).await;
2999    serde_json::to_value(&skills).map_err(|e| e.to_string())
3000}
3001
3002/// Run memory consolidation against this client's session memgine
3003/// (or the daemon-owned per-agent memgine when bound — #170).
3004/// Returns the JSON `ConsolidationReport`.
3005async fn handle_memory_consolidate(
3006    session: &crate::session::ClientSession,
3007) -> Result<Value, String> {
3008    let engine_arc = session.effective_memgine().await;
3009    let report = {
3010        let mut engine = engine_arc.lock().await;
3011        engine.consolidate().await
3012    };
3013    if let Some(id) = session.agent_id.lock().await.clone() {
3014        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3015            tracing::warn!(agent_id = %id, error = %e,
3016                "agent memgine persist after consolidate failed");
3017        }
3018    }
3019    serde_json::to_value(&report).map_err(|e| e.to_string())
3020}
3021
3022/// Repair a degraded skill on this client's session memgine.
3023/// Returns `{ code: "..." }` on success, `null` if the skill
3024/// isn't broken or repair failed.
3025async fn handle_skill_repair(
3026    msg: &JsonRpcMessage,
3027    session: &crate::session::ClientSession,
3028) -> Result<Value, String> {
3029    let name = msg
3030        .params
3031        .get("skill_name")
3032        .and_then(|v| v.as_str())
3033        .ok_or("missing 'skill_name' parameter")?;
3034    let mut engine = session.memgine.lock().await;
3035    let code = engine.repair_skill(name).await;
3036    Ok(match code {
3037        Some(c) => serde_json::json!({ "code": c }),
3038        None => Value::Null,
3039    })
3040}
3041
3042/// Ingest distilled skills into this client's session memgine.
3043/// Returns the number of nodes inserted.
3044async fn handle_skills_ingest_distilled(
3045    msg: &JsonRpcMessage,
3046    session: &crate::session::ClientSession,
3047) -> Result<Value, String> {
3048    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3049        msg.params
3050            .get("skills")
3051            .cloned()
3052            .unwrap_or(msg.params.clone()),
3053    )
3054    .map_err(|e| format!("invalid skills: {}", e))?;
3055    let mut engine = session.memgine.lock().await;
3056    let nodes = engine.ingest_distilled_skills(&skills);
3057    Ok(serde_json::json!({ "ingested": nodes.len() }))
3058}
3059
3060/// Run skill evolution against this session's memgine for a
3061/// specified domain.  Returns the resulting `DistilledSkill` array.
3062async fn handle_skills_evolve(
3063    msg: &JsonRpcMessage,
3064    session: &crate::session::ClientSession,
3065) -> Result<Value, String> {
3066    let domain = msg
3067        .params
3068        .get("domain")
3069        .and_then(|v| v.as_str())
3070        .ok_or("missing 'domain' parameter")?
3071        .to_string();
3072    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3073        msg.params
3074            .get("events")
3075            .cloned()
3076            .unwrap_or(Value::Array(vec![])),
3077    )
3078    .map_err(|e| format!("invalid events: {}", e))?;
3079    let mut engine = session.memgine.lock().await;
3080    let skills = engine.evolve_skills(&events, &domain).await;
3081    serde_json::to_value(&skills).map_err(|e| e.to_string())
3082}
3083
3084/// List domains whose skills are underperforming on this session.
3085async fn handle_skills_domains_needing_evolution(
3086    msg: &JsonRpcMessage,
3087    session: &crate::session::ClientSession,
3088) -> Result<Value, String> {
3089    let threshold = msg
3090        .params
3091        .get("threshold")
3092        .and_then(|v| v.as_f64())
3093        .unwrap_or(0.6);
3094    let engine = session.memgine.lock().await;
3095    let domains = engine.domains_needing_evolution(threshold);
3096    serde_json::to_value(&domains).map_err(|e| e.to_string())
3097}
3098
3099/// Rerank documents against a query using a cross-encoder model.
3100async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3101    let engine = get_inference_engine(state);
3102    let req: car_inference::RerankRequest =
3103        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3104    let _permit = state.admission.acquire().await;
3105    let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3106    serde_json::to_value(&result).map_err(|e| e.to_string())
3107}
3108
3109/// Transcribe audio at the given path. The path is interpreted on
3110/// the daemon's filesystem, not the FFI caller's — Daemon-mode
3111/// callers must pass a path the daemon can read (typically a
3112/// shared `~/.car/...` location or stdin push via the streaming
3113/// API).
3114async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3115    use base64::Engine as _;
3116    let engine = get_inference_engine(state);
3117
3118    // Sandbox-crossing escape hatch (Parslee-ai/car-releases#31): when
3119    // the caller can't share a filesystem view with the daemon (e.g.
3120    // unsandboxed Milo talking to a sandboxed car-host), they pass
3121    // `audio_b64` instead of `audio_path`. We decode to a tempfile,
3122    // run transcribe against the path the engine expects, and clean up
3123    // on drop. Accepts either form; `audio_b64` wins if both are set.
3124    let mut params = msg.params.clone();
3125    let audio_b64 = params
3126        .as_object_mut()
3127        .and_then(|m| m.remove("audio_b64"))
3128        .and_then(|v| v.as_str().map(str::to_string));
3129    let _tmp_audio = if let Some(b64) = audio_b64 {
3130        let bytes = base64::engine::general_purpose::STANDARD
3131            .decode(b64.as_bytes())
3132            .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3133        let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3134        std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3135        let path = tmp.path().to_string_lossy().into_owned();
3136        if let Some(obj) = params.as_object_mut() {
3137            obj.insert("audio_path".to_string(), Value::String(path));
3138        }
3139        Some(tmp)
3140    } else {
3141        None
3142    };
3143
3144    let req: car_inference::TranscribeRequest =
3145        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3146    let _permit = state.admission.acquire().await;
3147    let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3148    serde_json::to_value(&result).map_err(|e| e.to_string())
3149}
3150
3151/// Synthesize speech. By default writes to `output_path` on the
3152/// daemon's filesystem; when `return_b64: true` (or no `output_path`
3153/// was supplied) the result also includes an `audio_b64` field with
3154/// the rendered bytes inline so cross-sandbox callers can avoid
3155/// filesystem coordination. Closes Parslee-ai/car-releases#31.
3156async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3157    use base64::Engine as _;
3158    let engine = get_inference_engine(state);
3159
3160    let mut params = msg.params.clone();
3161    let return_b64 = params
3162        .as_object_mut()
3163        .and_then(|m| m.remove("return_b64"))
3164        .and_then(|v| v.as_bool())
3165        .unwrap_or(false);
3166    let no_output_path = params
3167        .as_object()
3168        .map(|m| !m.contains_key("output_path"))
3169        .unwrap_or(true);
3170
3171    let req: car_inference::SynthesizeRequest =
3172        serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3173    let _permit = state.admission.acquire().await;
3174    let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3175    let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3176
3177    // Inline the bytes when the caller asked for them OR when no
3178    // output_path was specified (typical sandbox-crossing case —
3179    // they didn't pick a path because they have no shared one).
3180    if return_b64 || no_output_path {
3181        let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3182            format!(
3183                "synthesize: failed to read rendered audio at {}: {e}",
3184                result.audio_path
3185            )
3186        })?;
3187        let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3188        if let Some(obj) = value.as_object_mut() {
3189            obj.insert("audio_b64".to_string(), Value::String(encoded));
3190        }
3191    }
3192    Ok(value)
3193}
3194
3195/// Prepare the speech runtime (downloads / warmup). Returns a
3196/// JSON status string, mirroring the embedded
3197/// `prepare_speech_runtime` shape.
3198async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3199    let engine = get_inference_engine(state);
3200    let status = engine
3201        .prepare_speech_runtime()
3202        .await
3203        .map_err(|e| e.to_string())?;
3204    serde_json::to_value(&status).map_err(|e| e.to_string())
3205}
3206
3207/// Adaptive route decision for a prompt — returns the routing
3208/// JSON the FFI's `route_model` returns.
3209async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3210    let prompt = msg
3211        .params
3212        .get("prompt")
3213        .and_then(|v| v.as_str())
3214        .ok_or("missing 'prompt' parameter")?;
3215    let engine = get_inference_engine(state);
3216    let decision = engine.route_adaptive(prompt).await;
3217    serde_json::to_value(&decision).map_err(|e| e.to_string())
3218}
3219
3220/// Model performance profiles snapshot.
3221async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3222    let engine = get_inference_engine(state);
3223    let profiles = engine.export_profiles().await;
3224    serde_json::to_value(&profiles).map_err(|e| e.to_string())
3225}
3226
3227/// Per-session event log size.
3228async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3229    let n = session.runtime.log.lock().await.len();
3230    Ok(Value::from(n as u64))
3231}
3232
3233async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3234    let stats = session.runtime.log.lock().await.stats();
3235    serde_json::to_value(stats).map_err(|e| e.to_string())
3236}
3237
3238#[derive(Deserialize)]
3239#[serde(rename_all = "camelCase")]
3240struct EventsTruncateParams {
3241    #[serde(default)]
3242    max_events: Option<usize>,
3243    #[serde(default)]
3244    max_spans: Option<usize>,
3245}
3246
3247async fn handle_events_truncate(
3248    msg: &JsonRpcMessage,
3249    session: &crate::session::ClientSession,
3250) -> Result<Value, String> {
3251    let params: EventsTruncateParams =
3252        serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
3253            max_events: None,
3254            max_spans: None,
3255        });
3256    let mut log = session.runtime.log.lock().await;
3257    let removed_events = params
3258        .max_events
3259        .map(|max| log.truncate_events_keep_last(max))
3260        .unwrap_or(0);
3261    let removed_spans = params
3262        .max_spans
3263        .map(|max| log.truncate_spans_keep_last(max))
3264        .unwrap_or(0);
3265    let stats = log.stats();
3266    Ok(serde_json::json!({
3267        "removedEvents": removed_events,
3268        "removedSpans": removed_spans,
3269        "stats": stats,
3270    }))
3271}
3272
3273async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
3274    let mut log = session.runtime.log.lock().await;
3275    let removed = log.clear();
3276    Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
3277}
3278
3279/// Update the per-session replan config. Wire shape mirrors the
3280/// FFI's positional `set_replan_config` arguments — the engine
3281/// crate's `ReplanConfig` struct doesn't derive Serialize, so we
3282/// reconstruct it from a flat object here.
3283async fn handle_replan_set_config(
3284    msg: &JsonRpcMessage,
3285    session: &crate::session::ClientSession,
3286) -> Result<Value, String> {
3287    let max_replans = msg
3288        .params
3289        .get("max_replans")
3290        .and_then(|v| v.as_u64())
3291        .unwrap_or(0) as u32;
3292    let delay_ms = msg
3293        .params
3294        .get("delay_ms")
3295        .and_then(|v| v.as_u64())
3296        .unwrap_or(0);
3297    let verify_before_execute = msg
3298        .params
3299        .get("verify_before_execute")
3300        .and_then(|v| v.as_bool())
3301        .unwrap_or(true);
3302    let cfg = car_engine::ReplanConfig {
3303        max_replans,
3304        delay_ms,
3305        verify_before_execute,
3306    };
3307    session.runtime.set_replan_config(cfg).await;
3308    Ok(Value::Null)
3309}
3310
3311async fn handle_skills_list(
3312    msg: &JsonRpcMessage,
3313    session: &crate::session::ClientSession,
3314) -> Result<Value, String> {
3315    let domain = msg.params.get("domain").and_then(|v| v.as_str());
3316    let engine = session.memgine.lock().await;
3317    let skills: Vec<serde_json::Value> = engine
3318        .graph
3319        .inner
3320        .node_indices()
3321        .filter_map(|nix| {
3322            let node = engine.graph.inner.node_weight(nix)?;
3323            if node.kind != car_memgine::MemKind::Skill {
3324                return None;
3325            }
3326            let meta = car_memgine::SkillMeta::from_node(node)?;
3327            if let Some(d) = domain {
3328                match &meta.scope {
3329                    car_memgine::SkillScope::Global => {}
3330                    car_memgine::SkillScope::Domain(sd) if sd == d => {}
3331                    _ => return None,
3332                }
3333            }
3334            Some(serde_json::to_value(&meta).unwrap_or_default())
3335        })
3336        .collect();
3337    serde_json::to_value(&skills).map_err(|e| e.to_string())
3338}
3339
3340#[derive(serde::Deserialize)]
3341struct SecretParams {
3342    #[serde(default)]
3343    service: Option<String>,
3344    key: String,
3345    #[serde(default)]
3346    value: Option<String>,
3347}
3348
3349fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
3350    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3351    let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
3352    car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
3353}
3354
3355fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
3356    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3357    car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
3358}
3359
3360fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
3361    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3362    car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
3363}
3364
3365fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
3366    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3367    car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
3368}
3369
3370#[derive(serde::Deserialize)]
3371struct PermParams {
3372    domain: String,
3373    #[serde(default)]
3374    target_bundle_id: Option<String>,
3375}
3376
3377fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
3378    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3379    car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
3380}
3381
3382fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
3383    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3384    car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
3385}
3386
3387fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
3388    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3389    car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
3390}
3391
3392fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
3393    #[derive(serde::Deserialize)]
3394    struct P {
3395        start: String,
3396        end: String,
3397        #[serde(default)]
3398        calendar_ids: Vec<String>,
3399    }
3400    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3401    let start = chrono::DateTime::parse_from_rfc3339(&p.start)
3402        .map_err(|e| format!("parse start: {}", e))?
3403        .with_timezone(&chrono::Utc);
3404    let end = chrono::DateTime::parse_from_rfc3339(&p.end)
3405        .map_err(|e| format!("parse end: {}", e))?
3406        .with_timezone(&chrono::Utc);
3407    car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
3408}
3409
3410fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
3411    #[derive(serde::Deserialize)]
3412    struct P {
3413        query: String,
3414        #[serde(default = "default_limit")]
3415        limit: usize,
3416        #[serde(default)]
3417        container_ids: Vec<String>,
3418    }
3419    fn default_limit() -> usize {
3420        50
3421    }
3422    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3423    car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
3424}
3425
3426fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
3427    #[derive(serde::Deserialize, Default)]
3428    struct P {
3429        #[serde(default)]
3430        account_ids: Vec<String>,
3431    }
3432    let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
3433    car_ffi_common::integrations::mail_inbox(&p.account_ids)
3434}
3435
3436fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
3437    let raw = req.params.to_string();
3438    car_ffi_common::integrations::mail_send(&raw)
3439}
3440
3441fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
3442    #[derive(serde::Deserialize)]
3443    struct P {
3444        #[serde(default = "default_limit")]
3445        limit: usize,
3446    }
3447    fn default_limit() -> usize {
3448        50
3449    }
3450    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
3451    car_ffi_common::integrations::messages_chats(p.limit)
3452}
3453
3454fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
3455    let raw = req.params.to_string();
3456    car_ffi_common::integrations::messages_send(&raw)
3457}
3458
3459fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
3460    #[derive(serde::Deserialize)]
3461    struct P {
3462        query: String,
3463        #[serde(default = "default_limit")]
3464        limit: usize,
3465    }
3466    fn default_limit() -> usize {
3467        50
3468    }
3469    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3470    car_ffi_common::integrations::notes_find(&p.query, p.limit)
3471}
3472
3473fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
3474    #[derive(serde::Deserialize)]
3475    struct P {
3476        #[serde(default = "default_limit")]
3477        limit: usize,
3478    }
3479    fn default_limit() -> usize {
3480        50
3481    }
3482    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
3483    car_ffi_common::integrations::reminders_items(p.limit)
3484}
3485
3486fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
3487    #[derive(serde::Deserialize)]
3488    struct P {
3489        #[serde(default = "default_limit")]
3490        limit: usize,
3491    }
3492    fn default_limit() -> usize {
3493        100
3494    }
3495    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
3496    car_ffi_common::integrations::bookmarks_list(p.limit)
3497}
3498
3499fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
3500    #[derive(serde::Deserialize)]
3501    struct P {
3502        start: String,
3503        end: String,
3504    }
3505    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3506    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
3507        .map_err(|e| format!("parse start: {}", e))?
3508        .with_timezone(&chrono::Utc);
3509    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
3510        .map_err(|e| format!("parse end: {}", e))?
3511        .with_timezone(&chrono::Utc);
3512    car_ffi_common::health::sleep_windows(s, e)
3513}
3514
3515fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
3516    #[derive(serde::Deserialize)]
3517    struct P {
3518        start: String,
3519        end: String,
3520    }
3521    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3522    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
3523        .map_err(|e| format!("parse start: {}", e))?
3524        .with_timezone(&chrono::Utc);
3525    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
3526        .map_err(|e| format!("parse end: {}", e))?
3527        .with_timezone(&chrono::Utc);
3528    car_ffi_common::health::workouts(s, e)
3529}
3530
3531fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
3532    #[derive(serde::Deserialize)]
3533    struct P {
3534        start: String,
3535        end: String,
3536    }
3537    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3538    let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
3539        .map_err(|e| format!("parse start: {}", e))?;
3540    let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
3541        .map_err(|e| format!("parse end: {}", e))?;
3542    car_ffi_common::health::activity(s, e)
3543}
3544
3545async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
3546    let closed = session.browser.close().await?;
3547    Ok(serde_json::json!({"closed": closed}))
3548}
3549
3550async fn handle_browser_run(
3551    req: &JsonRpcMessage,
3552    session: &crate::session::ClientSession,
3553) -> Result<Value, String> {
3554    #[derive(serde::Deserialize)]
3555    struct BrowserRunParams {
3556        /// Inline JSON string (CLI-compatible), OR the structured object.
3557        script: Value,
3558        #[serde(default)]
3559        width: Option<u32>,
3560        #[serde(default)]
3561        height: Option<u32>,
3562        /// When true, launches a visible Chromium window for interactive
3563        /// flows (first-time auth, 2FA, supervised runs). Only honored on
3564        /// the call that first launches the browser session — subsequent
3565        /// calls reuse the existing browser regardless.
3566        #[serde(default)]
3567        headed: Option<bool>,
3568        /// Extra Chromium command-line flags appended verbatim at
3569        /// launch (#112). Honoured only on the launch call.
3570        #[serde(default)]
3571        extra_args: Option<Vec<String>>,
3572    }
3573    let params: BrowserRunParams =
3574        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3575
3576    // Accept either a JSON string OR a structured object under `script`.
3577    let script_json = match params.script {
3578        Value::String(s) => s,
3579        other => other.to_string(),
3580    };
3581
3582    let browser_session = session
3583        .browser
3584        .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
3585            width: params.width.unwrap_or(1280),
3586            height: params.height.unwrap_or(720),
3587            headless: !params.headed.unwrap_or(false),
3588            extra_args: params.extra_args.unwrap_or_default(),
3589        })
3590        .await?;
3591
3592    let trace_json = browser_session.run(&script_json).await?;
3593    serde_json::from_str(&trace_json).map_err(|e| e.to_string())
3594}
3595
3596// ---------------------------------------------------------------------------
3597// Voice streaming JSON-RPC methods
3598//
3599// Events are pushed back to the originating client as JSON-RPC notifications:
3600//   { "jsonrpc": "2.0", "method": "voice.event",
3601//     "params": { "session_id": "...", "event": {...} } }
3602//
3603// The session registry is process-wide (ServerState.voice_sessions); per-call
3604// WsVoiceEventSink instances bind each session to its originating WS so a
3605// client only ever sees events for sessions it started.
3606// ---------------------------------------------------------------------------
3607
3608#[derive(Deserialize)]
3609struct VoiceStartParams {
3610    session_id: String,
3611    audio_source: Value,
3612    #[serde(default)]
3613    options: Option<Value>,
3614}
3615
3616async fn handle_voice_transcribe_stream_start(
3617    req: &JsonRpcMessage,
3618    state: &Arc<ServerState>,
3619    session: &Arc<crate::session::ClientSession>,
3620) -> Result<Value, String> {
3621    let params: VoiceStartParams =
3622        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3623    let audio_source_json =
3624        serde_json::to_string(&params.audio_source).map_err(|e| e.to_string())?;
3625    let options_json = params
3626        .options
3627        .as_ref()
3628        .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
3629        .transpose()?;
3630    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
3631        channel: session.channel.clone(),
3632    });
3633    let json = car_ffi_common::voice::transcribe_stream_start(
3634        &params.session_id,
3635        &audio_source_json,
3636        options_json.as_deref(),
3637        state.voice_sessions.clone(),
3638        sink,
3639    )
3640    .await?;
3641    serde_json::from_str(&json).map_err(|e| e.to_string())
3642}
3643
3644#[derive(Deserialize)]
3645struct VoiceStopParams {
3646    session_id: String,
3647}
3648
3649async fn handle_voice_transcribe_stream_stop(
3650    req: &JsonRpcMessage,
3651    state: &Arc<ServerState>,
3652) -> Result<Value, String> {
3653    let params: VoiceStopParams =
3654        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3655    let json = car_ffi_common::voice::transcribe_stream_stop(
3656        &params.session_id,
3657        state.voice_sessions.clone(),
3658    )
3659    .await?;
3660    serde_json::from_str(&json).map_err(|e| e.to_string())
3661}
3662
3663#[derive(Deserialize)]
3664struct VoicePushParams {
3665    session_id: String,
3666    /// Base64-encoded 16-bit signed PCM frame. JSON-RPC is text, so binary
3667    /// audio frames have to be encoded; clients in WS-binary contexts that
3668    /// want to skip the round trip can call the FFI directly.
3669    pcm_b64: String,
3670}
3671
3672async fn handle_voice_transcribe_stream_push(
3673    req: &JsonRpcMessage,
3674    state: &Arc<ServerState>,
3675) -> Result<Value, String> {
3676    use base64::Engine;
3677    let params: VoicePushParams =
3678        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3679    let pcm = base64::engine::general_purpose::STANDARD
3680        .decode(&params.pcm_b64)
3681        .map_err(|e| format!("invalid pcm_b64: {}", e))?;
3682    let json = car_ffi_common::voice::transcribe_stream_push(
3683        &params.session_id,
3684        &pcm,
3685        state.voice_sessions.clone(),
3686    )
3687    .await?;
3688    serde_json::from_str(&json).map_err(|e| e.to_string())
3689}
3690
3691fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
3692    let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
3693    serde_json::from_str(&json).unwrap_or(Value::Null)
3694}
3695
3696async fn handle_voice_dispatch_turn(
3697    req: &JsonRpcMessage,
3698    state: &Arc<ServerState>,
3699    session: &Arc<crate::session::ClientSession>,
3700) -> Result<Value, String> {
3701    let req_value = req.params.clone();
3702    let request: crate::voice_turn::DispatchVoiceTurnRequest =
3703        serde_json::from_value(req_value).map_err(|e| e.to_string())?;
3704    let engine = get_inference_engine(state).clone();
3705    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
3706        channel: session.channel.clone(),
3707    });
3708    let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
3709    serde_json::to_value(resp).map_err(|e| e.to_string())
3710}
3711
3712async fn handle_voice_cancel_turn() -> Result<Value, String> {
3713    crate::voice_turn::cancel().await;
3714    Ok(serde_json::json!({"cancelled": true}))
3715}
3716
3717async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
3718    let engine = get_inference_engine(state).clone();
3719    crate::voice_turn::prewarm(engine).await;
3720    Ok(serde_json::json!({"prewarmed": true}))
3721}
3722
3723// ---------------------------------------------------------------------------
3724// Inference runner over WebSocket — closes Parslee-ai/car-releases#24
3725//
3726// Bidirectional protocol shape:
3727//   1. Client → server: `inference.register_runner` (no params). The
3728//      session that calls this becomes the host for delegated models.
3729//   2. Server → client: `inference.runner.invoke` notification with
3730//      {call_id, request} when CAR needs to dispatch a delegated turn.
3731//   3. Client → server: `inference.runner.event` with {call_id, event}
3732//      for each chunk; `inference.runner.complete` with {call_id, result}
3733//      on success; `inference.runner.fail` with {call_id, error} on
3734//      failure.
3735//
3736// The server-side data is process-wide because only one inference
3737// runner can be registered at a time (matches the FFI bindings'
3738// constraint). The per-call mailboxes live in dedicated DashMaps.
3739// ---------------------------------------------------------------------------
3740
3741fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
3742    static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
3743        std::sync::OnceLock::new();
3744    SLOT.get_or_init(|| std::sync::RwLock::new(None))
3745}
3746
3747fn ws_runner_calls(
3748) -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
3749    static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
3750        std::sync::OnceLock::new();
3751    MAP.get_or_init(dashmap::DashMap::new)
3752}
3753
3754fn ws_runner_completions() -> &'static dashmap::DashMap<
3755    String,
3756    tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
3757> {
3758    static MAP: std::sync::OnceLock<
3759        dashmap::DashMap<
3760            String,
3761            tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
3762        >,
3763    > = std::sync::OnceLock::new();
3764    MAP.get_or_init(dashmap::DashMap::new)
3765}
3766
3767struct WsInferenceRunner;
3768
3769#[async_trait::async_trait]
3770impl car_inference::InferenceRunner for WsInferenceRunner {
3771    async fn run(
3772        &self,
3773        request: car_inference::tasks::generate::GenerateRequest,
3774        emitter: car_inference::EventEmitter,
3775    ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
3776        let channel = ws_runner_session()
3777            .read()
3778            .map_err(|e| {
3779                car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
3780            })?
3781            .clone()
3782            .ok_or_else(|| {
3783                car_inference::RunnerError::Declined(
3784                    "no WebSocket inference runner registered — call inference.register_runner first"
3785                        .into(),
3786                )
3787            })?;
3788
3789        let call_id = uuid::Uuid::new_v4().to_string();
3790        let request_json = serde_json::to_value(&request)
3791            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
3792        let (tx, rx) = tokio::sync::oneshot::channel();
3793        ws_runner_calls().insert(call_id.clone(), emitter);
3794        ws_runner_completions().insert(call_id.clone(), tx);
3795
3796        // Fire the invoke notification.
3797        use futures::SinkExt;
3798        let notification = serde_json::json!({
3799            "jsonrpc": "2.0",
3800            "method": "inference.runner.invoke",
3801            "params": {
3802                "call_id": call_id,
3803                "request": request_json,
3804            },
3805        });
3806        let text = serde_json::to_string(&notification)
3807            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
3808        let _ = channel
3809            .write
3810            .lock()
3811            .await
3812            .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
3813            .await;
3814
3815        let result = rx.await.map_err(|_| {
3816            car_inference::RunnerError::Failed("runner completion channel dropped".into())
3817        })?;
3818        ws_runner_calls().remove(&call_id);
3819        result.map_err(car_inference::RunnerError::Failed)
3820    }
3821}
3822
3823async fn handle_inference_register_runner(
3824    session: &Arc<crate::session::ClientSession>,
3825) -> Result<Value, String> {
3826    let mut guard = ws_runner_session()
3827        .write()
3828        .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
3829    *guard = Some(session.channel.clone());
3830    drop(guard);
3831    car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
3832    Ok(serde_json::json!({"registered": true}))
3833}
3834
3835#[derive(serde::Deserialize)]
3836struct InferenceRunnerEventParams {
3837    call_id: String,
3838    event: Value,
3839}
3840
3841async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
3842    let params: InferenceRunnerEventParams =
3843        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3844    let stream_event = match parse_runner_event_value(&params.event) {
3845        Some(e) => e,
3846        None => return Err("unrecognised runner event shape".into()),
3847    };
3848    if let Some(entry) = ws_runner_calls().get(&params.call_id) {
3849        let emitter = entry.value().clone();
3850        tokio::spawn(async move { emitter.emit(stream_event).await });
3851    }
3852    Ok(serde_json::json!({"emitted": true}))
3853}
3854
3855#[derive(serde::Deserialize)]
3856struct InferenceRunnerCompleteParams {
3857    call_id: String,
3858    result: Value,
3859}
3860
3861async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
3862    let params: InferenceRunnerCompleteParams =
3863        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3864    let result: std::result::Result<car_inference::RunnerResult, String> =
3865        serde_json::from_value(params.result)
3866            .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
3867    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
3868        let _ = tx.send(result);
3869    }
3870    Ok(serde_json::json!({"completed": true}))
3871}
3872
3873#[derive(serde::Deserialize)]
3874struct InferenceRunnerFailParams {
3875    call_id: String,
3876    error: String,
3877}
3878
3879async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
3880    let params: InferenceRunnerFailParams =
3881        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3882    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
3883        let _ = tx.send(Err(params.error));
3884    }
3885    Ok(serde_json::json!({"failed": true}))
3886}
3887
3888fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
3889    let ty = v.get("type").and_then(|t| t.as_str())?;
3890    match ty {
3891        "text" => Some(car_inference::StreamEvent::TextDelta(
3892            v.get("data")?.as_str()?.to_string(),
3893        )),
3894        "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
3895            name: v.get("name")?.as_str()?.to_string(),
3896            index: v.get("index")?.as_u64()? as usize,
3897            id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
3898        }),
3899        "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
3900            index: v.get("index")?.as_u64()? as usize,
3901            arguments_delta: v.get("data")?.as_str()?.to_string(),
3902        }),
3903        "usage" => Some(car_inference::StreamEvent::Usage {
3904            input_tokens: v.get("input_tokens")?.as_u64()?,
3905            output_tokens: v.get("output_tokens")?.as_u64()?,
3906        }),
3907        "done" => Some(car_inference::StreamEvent::Done {
3908            text: v.get("text")?.as_str()?.to_string(),
3909            tool_calls: v
3910                .get("tool_calls")
3911                .and_then(|tc| serde_json::from_value(tc.clone()).ok())
3912                .unwrap_or_default(),
3913        }),
3914        _ => None,
3915    }
3916}
3917
3918#[derive(Deserialize)]
3919struct EnrollSpeakerParams {
3920    label: String,
3921    audio: Value,
3922}
3923
3924async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
3925    let params: EnrollSpeakerParams =
3926        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3927    let audio_json = serde_json::to_string(&params.audio).map_err(|e| e.to_string())?;
3928    let json = car_ffi_common::voice::enroll_speaker(&params.label, &audio_json).await?;
3929    serde_json::from_str(&json).map_err(|e| e.to_string())
3930}
3931
3932#[derive(Deserialize)]
3933struct RemoveEnrollmentParams {
3934    label: String,
3935}
3936
3937fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
3938    let params: RemoveEnrollmentParams =
3939        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3940    let json = car_ffi_common::voice::remove_enrollment(&params.label)?;
3941    serde_json::from_str(&json).map_err(|e| e.to_string())
3942}
3943
3944#[derive(Deserialize)]
3945struct WorkflowRunParams {
3946    workflow: Value,
3947}
3948
3949async fn handle_workflow_run(
3950    req: &JsonRpcMessage,
3951    session: &Arc<crate::session::ClientSession>,
3952) -> Result<Value, String> {
3953    let params: WorkflowRunParams =
3954        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3955    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
3956    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3957        channel: session.channel.clone(),
3958        host: session.host.clone(),
3959        client_id: session.client_id.clone(),
3960    });
3961    let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
3962    serde_json::from_str(&json).map_err(|e| e.to_string())
3963}
3964
3965#[derive(Deserialize)]
3966struct WorkflowVerifyParams {
3967    workflow: Value,
3968}
3969
3970fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
3971    let params: WorkflowVerifyParams =
3972        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3973    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
3974    let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
3975    serde_json::from_str(&json).map_err(|e| e.to_string())
3976}
3977
3978// ---------------------------------------------------------------------------
3979// Meeting JSON-RPC methods
3980// ---------------------------------------------------------------------------
3981
3982async fn handle_meeting_start(
3983    req: &JsonRpcMessage,
3984    state: &Arc<ServerState>,
3985    session: &Arc<crate::session::ClientSession>,
3986) -> Result<Value, String> {
3987    // We need the meeting id BEFORE handing the upstream sink to
3988    // start_meeting so the WsMemgineIngestSink stamps transcripts with
3989    // the correct `meeting/<id>/<source>` speaker. Parse the request
3990    // here, mint an id if none was provided, and pass the same id
3991    // through to start_meeting via the request JSON.
3992    let mut req_value = req.params.clone();
3993    let meeting_id = req_value
3994        .get("id")
3995        .and_then(|v| v.as_str())
3996        .map(str::to_string)
3997        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
3998    if let Some(map) = req_value.as_object_mut() {
3999        map.insert("id".into(), Value::String(meeting_id.clone()));
4000    }
4001    let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4002
4003    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4004        Arc::new(crate::session::WsVoiceEventSink {
4005            channel: session.channel.clone(),
4006        });
4007
4008    // Wrap the WS upstream with a memgine-ingest fanout that uses the
4009    // tokio::sync::Mutex-wrapped session memgine. We pass `None` for
4010    // the FFI-common `start_meeting` memgine arg to avoid the
4011    // sync-mutex contract there — ingest happens here instead.
4012    let upstream: Arc<dyn car_voice::VoiceEventSink> =
4013        Arc::new(crate::session::WsMemgineIngestSink {
4014            meeting_id,
4015            engine: session.memgine.clone(),
4016            upstream: ws_upstream,
4017        });
4018
4019    let cwd = std::env::current_dir().ok();
4020    let json = crate::meeting::start_meeting(
4021        &request_json,
4022        state.meetings.clone(),
4023        state.voice_sessions.clone(),
4024        upstream,
4025        None,
4026        cwd,
4027    )
4028    .await?;
4029    serde_json::from_str(&json).map_err(|e| e.to_string())
4030}
4031
4032#[derive(Deserialize)]
4033struct MeetingStopParams {
4034    meeting_id: String,
4035    #[serde(default = "default_summarize")]
4036    summarize: bool,
4037}
4038
4039fn default_summarize() -> bool {
4040    true
4041}
4042
4043async fn handle_meeting_stop(
4044    req: &JsonRpcMessage,
4045    state: &Arc<ServerState>,
4046    _session: &Arc<crate::session::ClientSession>,
4047) -> Result<Value, String> {
4048    let params: MeetingStopParams =
4049        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4050    let inference = if params.summarize {
4051        Some(state.inference.get().cloned()).flatten()
4052    } else {
4053        None
4054    };
4055    let json = crate::meeting::stop_meeting(
4056        &params.meeting_id,
4057        params.summarize,
4058        state.meetings.clone(),
4059        state.voice_sessions.clone(),
4060        inference,
4061    )
4062    .await?;
4063    serde_json::from_str(&json).map_err(|e| e.to_string())
4064}
4065
4066#[derive(Deserialize, Default)]
4067struct MeetingListParams {
4068    #[serde(default)]
4069    root: Option<std::path::PathBuf>,
4070}
4071
4072fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4073    let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4074    let cwd = std::env::current_dir().ok();
4075    let json = crate::meeting::list_meetings(params.root, cwd)?;
4076    serde_json::from_str(&json).map_err(|e| e.to_string())
4077}
4078
4079#[derive(Deserialize)]
4080struct MeetingGetParams {
4081    meeting_id: String,
4082    #[serde(default)]
4083    root: Option<std::path::PathBuf>,
4084}
4085
4086fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4087    let params: MeetingGetParams =
4088        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4089    let cwd = std::env::current_dir().ok();
4090    let json = crate::meeting::get_meeting(&params.meeting_id, params.root, cwd)?;
4091    serde_json::from_str(&json).map_err(|e| e.to_string())
4092}
4093
4094// ---------------------------------------------------------------------------
4095// Agent registry — file-based cross-process discovery (#111)
4096// ---------------------------------------------------------------------------
4097
4098#[derive(Deserialize, Default)]
4099struct RegistryRegisterParams {
4100    /// Caller serializes their AgentEntry as a JSON value; we
4101    /// re-serialize it so the ffi-common helper can validate the
4102    /// shape with the same parser used by the bindings.
4103    entry: Value,
4104    #[serde(default)]
4105    registry_path: Option<std::path::PathBuf>,
4106}
4107
4108fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4109    let params: RegistryRegisterParams =
4110        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4111    let entry_json = serde_json::to_string(&params.entry).map_err(|e| e.to_string())?;
4112    car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4113    Ok(Value::Null)
4114}
4115
4116#[derive(Deserialize, Default)]
4117struct RegistryNameParams {
4118    name: String,
4119    #[serde(default)]
4120    registry_path: Option<std::path::PathBuf>,
4121}
4122
4123fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4124    let params: RegistryNameParams =
4125        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4126    let json = car_ffi_common::registry::agent_heartbeat(&params.name, params.registry_path)?;
4127    serde_json::from_str(&json).map_err(|e| e.to_string())
4128}
4129
4130fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4131    let params: RegistryNameParams =
4132        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4133    car_ffi_common::registry::unregister_agent(&params.name, params.registry_path)?;
4134    Ok(Value::Null)
4135}
4136
4137#[derive(Deserialize, Default)]
4138struct RegistryListParams {
4139    #[serde(default)]
4140    registry_path: Option<std::path::PathBuf>,
4141}
4142
4143fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4144    let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4145    let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4146    serde_json::from_str(&json).map_err(|e| e.to_string())
4147}
4148
4149#[derive(Deserialize, Default)]
4150struct RegistryReapParams {
4151    /// Heartbeats older than this many seconds are reaped. Default
4152    /// 60 — two missed 20s heartbeats trigger removal.
4153    #[serde(default = "default_reap_age")]
4154    max_age_secs: u64,
4155    #[serde(default)]
4156    registry_path: Option<std::path::PathBuf>,
4157}
4158
4159fn default_reap_age() -> u64 {
4160    60
4161}
4162
4163fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4164    let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4165    let json =
4166        car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4167    serde_json::from_str(&json).map_err(|e| e.to_string())
4168}
4169
4170// ---------------------------------------------------------------------------
4171// car-a2a server lifecycle (mirrors NAPI startA2aServer / stopA2aServer /
4172// a2aServerStatus and PyO3 start_a2a_server / stop_a2a_server /
4173// a2a_server_status — closes the binding gap noted in #126).
4174// ---------------------------------------------------------------------------
4175
4176async fn handle_a2a_start(req: &JsonRpcMessage) -> Result<Value, String> {
4177    let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4178    let json = crate::a2a::start_a2a(&params_json).await?;
4179    serde_json::from_str(&json).map_err(|e| e.to_string())
4180}
4181
4182fn handle_a2a_stop() -> Result<Value, String> {
4183    let json = crate::a2a::stop_a2a()?;
4184    serde_json::from_str(&json).map_err(|e| e.to_string())
4185}
4186
4187fn handle_a2a_status() -> Result<Value, String> {
4188    let json = crate::a2a::a2a_status()?;
4189    serde_json::from_str(&json).map_err(|e| e.to_string())
4190}
4191
/// Params of the `a2a.send` handler. Field names are read from JSON in
/// camelCase (see `rename_all`), e.g. `allowUntrustedEndpoint`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct A2aSendParams {
    /// Endpoint of the remote agent to send to.
    endpoint: String,
    /// The A2A message to deliver.
    message: car_a2a::Message,
    /// Passed through to `send_message`; defaults to `false`.
    #[serde(default)]
    blocking: bool,
    /// Whether A2UI envelopes found in the reply are validated and applied;
    /// defaults to `true`.
    #[serde(default = "default_true")]
    ingest_a2ui: bool,
    /// Optional auth applied to the outgoing client and forwarded when
    /// applying A2UI envelopes.
    #[serde(default)]
    route_auth: Option<A2aRouteAuth>,
    /// Passed to `trusted_route_endpoint` when checking the endpoint;
    /// defaults to `false`.
    #[serde(default)]
    allow_untrusted_endpoint: bool,
}

/// Serde default helper for boolean fields that default to `true`.
fn default_true() -> bool {
    true
}
4210
4211/// In-core A2A dispatcher entry point. Forwards the JSON-RPC method
4212/// + params to the lazy-initialized [`car_a2a::A2aDispatcher`] held
4213/// on `ServerState`. Closes Parslee-ai/car-releases#28.
4214///
4215/// Streaming methods (`message/stream`, `tasks/resubscribe` and their
4216/// PascalCase aliases) return `MethodNotFound` from the dispatcher's
4217/// transport-neutral surface — the standalone `start_a2a_listener`
4218/// HTTP path serves SSE for those, but the in-core WS surface is
4219/// JSON-RPC only. Same trade as the dispatcher itself.
4220async fn handle_a2a_dispatch(
4221    method: &str,
4222    req: &JsonRpcMessage,
4223    state: &Arc<ServerState>,
4224) -> Result<Value, String> {
4225    let dispatcher = state.a2a_dispatcher().await;
4226    dispatcher
4227        .dispatch(method, req.params.clone())
4228        .await
4229        .map_err(|e| e.to_string())
4230}
4231
4232async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4233    let params: A2aSendParams =
4234        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4235    let endpoint = trusted_route_endpoint(
4236        Some(params.endpoint.clone()),
4237        params.allow_untrusted_endpoint,
4238    )
4239    .ok_or_else(|| {
4240        "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
4241    })?;
4242    let client = match params.route_auth.clone() {
4243        Some(auth) => {
4244            car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
4245        }
4246        None => car_a2a::A2aClient::new(endpoint.clone()),
4247    };
4248    let result = client
4249        .send_message(params.message, params.blocking)
4250        .await
4251        .map_err(|e| e.to_string())?;
4252    let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4253    let mut applied = Vec::new();
4254    if params.ingest_a2ui {
4255        state
4256            .a2ui
4257            .validate_payload(&result_value)
4258            .map_err(|e| e.to_string())?;
4259        let routed_endpoint = Some(endpoint.clone());
4260        for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
4261            let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
4262                if owner.endpoint.is_none() {
4263                    owner.with_endpoint(routed_endpoint.clone())
4264                } else {
4265                    owner
4266                }
4267            });
4268            applied.push(
4269                apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
4270            );
4271        }
4272    }
4273    Ok(serde_json::json!({
4274        "result": result,
4275        "a2ui": {
4276            "applied": applied,
4277        }
4278    }))
4279}
4280
4281// ---------------------------------------------------------------------------
4282// macOS automation — AppleScript + Shortcuts (car-automation), Vision OCR
4283// (car-vision). Mirrors NAPI runApplescript / listShortcuts / runShortcut /
4284// visionOcr and PyO3 run_applescript / list_shortcuts / run_shortcut /
4285// vision_ocr.
4286// ---------------------------------------------------------------------------
4287
4288async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
4289    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4290    let json = car_ffi_common::automation::run_applescript(&args_json).await?;
4291    serde_json::from_str(&json).map_err(|e| e.to_string())
4292}
4293
4294async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
4295    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4296    let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
4297    serde_json::from_str(&json).map_err(|e| e.to_string())
4298}
4299
4300async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
4301    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4302    let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
4303    serde_json::from_str(&json).map_err(|e| e.to_string())
4304}
4305
4306async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
4307    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4308    let json = car_ffi_common::notifications::local(&args_json).await?;
4309    serde_json::from_str(&json).map_err(|e| e.to_string())
4310}
4311
4312async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
4313    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4314    let json = car_ffi_common::vision::ocr(&args_json).await?;
4315    serde_json::from_str(&json).map_err(|e| e.to_string())
4316}
4317
4318// ---------------------------------------------------------------------------
4319// Lifecycle-managed agents (car_registry::supervisor) — Parslee-ai/car-releases#27
4320// ---------------------------------------------------------------------------
4321
4322async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
4323    let supervisor = state.supervisor()?;
4324    let agents = supervisor.list().await;
4325    // Decorate each entry with `attached` + `session_id` so operators
4326    // see whether the supervised process has actually called
4327    // `session.auth { agent_id }` and bound a WS connection (#169) —
4328    // the lifecycle status (`Running`, etc.) only reports the
4329    // process-level view, which can't tell "alive but never
4330    // attached" from "alive and attached".
4331    let attached = state.attached_agents.lock().await.clone();
4332    let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
4333    for a in agents {
4334        let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
4335        let session_id = attached.get(&a.spec.id).cloned();
4336        if let Some(map) = v.as_object_mut() {
4337            map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
4338            if let Some(sid) = session_id {
4339                map.insert("session_id".to_string(), Value::String(sid));
4340            }
4341        }
4342        decorated.push(v);
4343    }
4344    Ok(Value::Array(decorated))
4345}
4346
4347async fn handle_agents_upsert(
4348    req: &JsonRpcMessage,
4349    state: &Arc<ServerState>,
4350) -> Result<Value, String> {
4351    let mut params = req.params.clone();
4352    // Optional `interpreter` sugar (#171). When present, the
4353    // supervisor resolves the bare program name (`"node"`,
4354    // `"python"`, …) against `$PATH` and writes the absolute path
4355    // into `command` *before* validation. This keeps the strict
4356    // no-PATH-lookup rule at upsert time while letting callers
4357    // stop hand-coding `/opt/homebrew/bin/node` into every
4358    // agents.json entry. Resolution happens once; subsequent PATH
4359    // changes do not silently rewire the binding.
4360    if let Some(name) = params
4361        .get("interpreter")
4362        .and_then(|v| v.as_str())
4363        .map(str::to_string)
4364    {
4365        let resolved = car_registry::supervisor::resolve_interpreter(&name)
4366            .map_err(|e| e.to_string())?;
4367        params["command"] = Value::String(resolved.to_string_lossy().into_owned());
4368    }
4369    let spec: car_registry::supervisor::AgentSpec =
4370        serde_json::from_value(params).map_err(|e| e.to_string())?;
4371    let supervisor = state.supervisor()?;
4372    let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
4373    serde_json::to_value(agent).map_err(|e| e.to_string())
4374}
4375
4376async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
4377    let supervisor = state.supervisor()?;
4378    let entries = supervisor.health().await;
4379    serde_json::to_value(entries).map_err(|e| e.to_string())
4380}
4381
4382fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
4383    req.params
4384        .get("id")
4385        .and_then(Value::as_str)
4386        .map(str::to_string)
4387        .ok_or_else(|| "missing required `id` parameter".to_string())
4388}
4389
4390async fn handle_agents_remove(
4391    req: &JsonRpcMessage,
4392    state: &Arc<ServerState>,
4393) -> Result<Value, String> {
4394    let id = extract_agent_id(req)?;
4395    let supervisor = state.supervisor()?;
4396    let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
4397    Ok(serde_json::json!({ "removed": removed }))
4398}
4399
4400async fn handle_agents_start(
4401    req: &JsonRpcMessage,
4402    state: &Arc<ServerState>,
4403) -> Result<Value, String> {
4404    let id = extract_agent_id(req)?;
4405    let supervisor = state.supervisor()?;
4406    let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
4407    serde_json::to_value(agent).map_err(|e| e.to_string())
4408}
4409
4410async fn handle_agents_stop(
4411    req: &JsonRpcMessage,
4412    state: &Arc<ServerState>,
4413) -> Result<Value, String> {
4414    let id = extract_agent_id(req)?;
4415    let signal: car_registry::supervisor::StopSignal = req
4416        .params
4417        .get("signal")
4418        .map(|v| serde_json::from_value(v.clone()))
4419        .transpose()
4420        .map_err(|e| e.to_string())?
4421        .unwrap_or_default();
4422    let supervisor = state.supervisor()?;
4423    let agent = supervisor
4424        .stop(&id, signal)
4425        .await
4426        .map_err(|e| e.to_string())?;
4427    serde_json::to_value(agent).map_err(|e| e.to_string())
4428}
4429
4430async fn handle_agents_restart(
4431    req: &JsonRpcMessage,
4432    state: &Arc<ServerState>,
4433) -> Result<Value, String> {
4434    let id = extract_agent_id(req)?;
4435    let supervisor = state.supervisor()?;
4436    let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
4437    serde_json::to_value(agent).map_err(|e| e.to_string())
4438}
4439
4440async fn handle_agents_tail_log(
4441    req: &JsonRpcMessage,
4442    state: &Arc<ServerState>,
4443) -> Result<Value, String> {
4444    let id = extract_agent_id(req)?;
4445    let n = req
4446        .params
4447        .get("n")
4448        .and_then(Value::as_u64)
4449        .unwrap_or(100) as usize;
4450    let supervisor = state.supervisor()?;
4451    let lines = supervisor
4452        .tail_log(&id, n)
4453        .await
4454        .map_err(|e| e.to_string())?;
4455    Ok(serde_json::json!({ "lines": lines }))
4456}
4457
4458// ---------------------------------------------------------------------------
4459// External-agent detection (Phase 1 of docs/proposals/external-agent-detection.md)
4460//
4461// Discovery surface for agentic CLIs the user has already installed and
4462// authenticated (Claude Code, Codex, Gemini). Read-only — no invocation
4463// path yet; agents.invoke_external lands in Phase 2 alongside the JSON
4464// stdio adapter. The cache lives in car_ffi_common::external_agents so
4465// the in-process FFI singletons share the same snapshot.
4466// ---------------------------------------------------------------------------
4467
4468async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
4469    let include_health = req
4470        .params
4471        .get("include_health")
4472        .and_then(Value::as_bool)
4473        .unwrap_or(false);
4474    let json = car_ffi_common::external_agents::list(include_health).await?;
4475    serde_json::from_str(&json).map_err(|e| e.to_string())
4476}
4477
4478async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
4479    let include_health = req
4480        .params
4481        .get("include_health")
4482        .and_then(Value::as_bool)
4483        .unwrap_or(false);
4484    let json = car_ffi_common::external_agents::detect(include_health).await?;
4485    serde_json::from_str(&json).map_err(|e| e.to_string())
4486}
4487
4488/// Per-task invocation of an external CLI agent. Required params:
4489/// `id` (adapter, e.g. `"claude-code"`) and `task` (the prompt).
4490/// Optional: `cwd`, `allowed_tools`, `max_turns`, `timeout_secs`.
4491///
4492/// Phase 2 stage 3 ships with `claude-code` only. Other adapter
4493/// ids return `is_error: true` with a structured `error` so hosts
4494/// can surface the gap without a separate error code.
4495///
4496/// Phase 2 stage 4a (governance): every invocation appends a
4497/// structured audit record to `~/.car/external-agents.jsonl`. The
4498/// record captures id, task, options, result, and the full
4499/// `tool_uses` list the assistant emitted — so even though the
4500/// agent executes its built-in tools in-process (which we can't
4501/// gate via stream-json), there's a complete after-the-fact audit
4502/// trail. Full policy gating (proposing each tool_use to CAR's
4503/// validator + getting a yes/no) requires the MCP server route in
4504/// stage 4b.
4505async fn handle_agents_invoke_external(
4506    req: &JsonRpcMessage,
4507    state: &Arc<ServerState>,
4508) -> Result<Value, String> {
4509    let id = req
4510        .params
4511        .get("id")
4512        .and_then(Value::as_str)
4513        .ok_or_else(|| "missing required `id` parameter".to_string())?;
4514    let task = req
4515        .params
4516        .get("task")
4517        .and_then(Value::as_str)
4518        .ok_or_else(|| "missing required `task` parameter".to_string())?;
4519    // Build the options sub-object directly from req.params so
4520    // hosts can pass `cwd` / `allowed_tools` / `max_turns` /
4521    // `timeout_secs` as siblings of `id`/`task`. Strip the two
4522    // dispatch fields so they don't pollute the options serde.
4523    let mut options_value = req.params.clone();
4524    if let Some(obj) = options_value.as_object_mut() {
4525        obj.remove("id");
4526        obj.remove("task");
4527        // Auto-fill `mcp_endpoint` from the bound MCP URL when the
4528        // caller didn't supply one. This is the load-bearing
4529        // wiring of MCP-4: external agents get CAR's tools (memory,
4530        // skills, verify) routed through the daemon's policy +
4531        // shared memgine without any per-call host configuration.
4532        // Callers who want to opt out can pass `"mcp_endpoint": ""`
4533        // (empty string) — the runner skips the temp-file write
4534        // when the value isn't a non-empty URL.
4535        let has_explicit_mcp = obj.contains_key("mcp_endpoint");
4536        if !has_explicit_mcp {
4537            if let Some(url) = state.mcp_url.get() {
4538                obj.insert(
4539                    "mcp_endpoint".to_string(),
4540                    Value::String(url.clone()),
4541                );
4542            }
4543        }
4544    }
4545    let options_json = options_value.to_string();
4546    let json = car_ffi_common::external_agents::invoke(id, task, &options_json).await?;
4547    let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
4548    append_external_agent_audit(id, task, &options_value, &result);
4549    Ok(result)
4550}
4551
4552/// Append one JSONL audit record to `~/.car/external-agents.jsonl`.
4553/// Best-effort: a failure to open the journal must NOT fail the
4554/// invocation; the in-memory result already returned is the
4555/// authoritative answer. Logs at warn level when the write fails so
4556/// operators notice repeated failures.
4557fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
4558    use std::io::Write;
4559    let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
4560        Some(home) => home.join(".car"),
4561        None => return,
4562    };
4563    if std::fs::create_dir_all(&car_dir).is_err() {
4564        return;
4565    }
4566    let path = car_dir.join("external-agents.jsonl");
4567    let record = serde_json::json!({
4568        "ts": chrono::Utc::now().to_rfc3339(),
4569        "adapter_id": id,
4570        "task": task,
4571        "options": options,
4572        "result": result,
4573    });
4574    let line = match serde_json::to_string(&record) {
4575        Ok(s) => s,
4576        Err(_) => return,
4577    };
4578    if let Ok(mut f) = std::fs::OpenOptions::new()
4579        .create(true)
4580        .append(true)
4581        .open(&path)
4582    {
4583        let _ = writeln!(f, "{}", line);
4584    } else {
4585        tracing::warn!(
4586            path = %path.display(),
4587            "failed to append external-agent audit record"
4588        );
4589    }
4590}
4591
4592/// Ground-truth health check. Optional `id` param picks one tool;
4593/// without it, every detected adapter is checked. `force: true`
4594/// bypasses the 30s per-tool TTL cache. Replaces the Phase 1
4595/// credential-file shape heuristic as the load-bearing signal for
4596/// "is this tool ready to invoke."
4597async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
4598    let force = req
4599        .params
4600        .get("force")
4601        .and_then(Value::as_bool)
4602        .unwrap_or(false);
4603    if let Some(id) = req.params.get("id").and_then(Value::as_str) {
4604        let json = car_ffi_common::external_agents::health_one(id, force).await?;
4605        serde_json::from_str(&json).map_err(|e| e.to_string())
4606    } else {
4607        let json = car_ffi_common::external_agents::health(force).await?;
4608        serde_json::from_str(&json).map_err(|e| e.to_string())
4609    }
4610}