Skip to main content

car_server_core/
handler.rs

1//! WebSocket connection handler — bidirectional JSON-RPC.
2//!
3//! Tool callback flow:
4//! 1. Client submits proposal via proposal.submit
5//! 2. Runtime encounters a ToolCall action
6//! 3. WsToolExecutor sends tools.execute request to client via shared write half
7//! 4. WsToolExecutor awaits response on a oneshot channel
8//! 5. Client executes tool locally, sends JSON-RPC response back
9//! 6. Handler receives the response, resolves the oneshot
10//! 7. Runtime continues execution with the tool result
11
12use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::sync::atomic::AtomicU64;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30    #[serde(default)]
31    pub jsonrpc: String,
32    #[serde(default)]
33    pub method: Option<String>,
34    #[serde(default)]
35    pub params: Value,
36    #[serde(default)]
37    pub id: Value,
38    // Response fields
39    #[serde(default)]
40    pub result: Option<Value>,
41    #[serde(default)]
42    pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47    pub jsonrpc: &'static str,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub result: Option<Value>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub error: Option<JsonRpcError>,
52    pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57    pub code: i32,
58    pub message: String,
59}
60
61impl JsonRpcResponse {
62    pub fn success(id: Value, result: Value) -> Self {
63        Self {
64            jsonrpc: "2.0",
65            result: Some(result),
66            error: None,
67            id,
68        }
69    }
70    pub fn error(id: Value, code: i32, message: &str) -> Self {
71        Self {
72            jsonrpc: "2.0",
73            result: None,
74            error: Some(JsonRpcError {
75                code,
76                message: message.to_string(),
77            }),
78            id,
79        }
80    }
81}
82
83/// Convenience wrapper for the standalone `car-server` binary: accepts
84/// the WebSocket handshake on a raw [`TcpStream`] then delegates to
85/// [`run_dispatch`]. Embedders that already have a handshake-completed
86/// `WebSocketStream` (e.g. the future tokhn-daemon) skip this and
87/// call `run_dispatch` directly with the split halves.
88#[instrument(
89    name = "ws.connection",
90    skip_all,
91    fields(peer = %peer),
92)]
93pub async fn handle_connection(
94    stream: TcpStream,
95    peer: SocketAddr,
96    state: Arc<ServerState>,
97) -> Result<(), Box<dyn std::error::Error>> {
98    let ws_stream = accept_async(stream).await?;
99    let (write, read) = ws_stream.split();
100    run_dispatch(read, write, peer, state).await
101}
102
103/// Transport-neutral entry point: drives the JSON-RPC dispatch loop
104/// against an already-handshake-completed split WebSocket. Both the
105/// standalone car-server binary and embedders (e.g. tokhn-daemon at
106/// U7) call this same function.
107///
108/// Concrete typing reflects today's pipeline (`tokio-tungstenite`
109/// over [`TcpStream`]). U7 is responsible for any further generalization
110/// — for example, by opening an axum WebSocket via tungstenite-native
111/// or by introducing a sink/stream wrapper that adapts axum's WS to
112/// these concrete types.
113#[instrument(
114    name = "ws.dispatch",
115    skip_all,
116    fields(client_id = tracing::field::Empty, peer = %peer),
117)]
118pub async fn run_dispatch(
119    mut read: futures::stream::SplitStream<
120        tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
121    >,
122    write: futures::stream::SplitSink<
123        tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
124        Message,
125    >,
126    peer: SocketAddr,
127    state: Arc<ServerState>,
128) -> Result<(), Box<dyn std::error::Error>> {
129    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
130    tracing::Span::current().record("client_id", &client_id.as_str());
131
132    info!("New connection from {}", peer);
133
134    let channel = Arc::new(WsChannel {
135        write: Mutex::new(write),
136        pending: Mutex::new(HashMap::new()),
137        next_id: AtomicU64::new(1),
138    });
139
140    let session = state.create_session(&client_id, channel.clone()).await;
141
142    while let Some(msg) = read.next().await {
143        let msg = msg?;
144        if msg.is_text() {
145            let text = msg.to_text()?;
146            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
147                Ok(m) => m,
148                Err(e) => {
149                    send_response(
150                        &session.channel,
151                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
152                    )
153                    .await?;
154                    continue;
155                }
156            };
157
158            // Is this a response to a pending tool callback?
159            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
160                if let Some(id_str) = parsed.id.as_str() {
161                    let mut pending = session.channel.pending.lock().await;
162                    if let Some(tx) = pending.remove(id_str) {
163                        let tool_resp = if let Some(result) = parsed.result {
164                            ToolExecuteResponse {
165                                action_id: id_str.to_string(),
166                                output: Some(result),
167                                error: None,
168                            }
169                        } else {
170                            let err_msg = parsed
171                                .error
172                                .as_ref()
173                                .and_then(|e| e.get("message"))
174                                .and_then(|m| m.as_str())
175                                .unwrap_or("unknown error")
176                                .to_string();
177                            ToolExecuteResponse {
178                                action_id: id_str.to_string(),
179                                output: None,
180                                error: Some(err_msg),
181                            }
182                        };
183                        let _ = tx.send(tool_resp);
184                        continue;
185                    }
186                }
187            }
188
189            // Otherwise it's a client request
190            if let Some(method) = &parsed.method {
191                info!(method = %method, "dispatching JSON-RPC method");
192                let result = match method.as_str() {
193                    "session.init" => handle_session_init(&parsed, &session).await,
194                    "host.subscribe" => handle_host_subscribe(&session).await,
195                    "host.agents" => handle_host_agents(&session).await,
196                    "host.events" => handle_host_events(&parsed, &session).await,
197                    "host.approvals" => handle_host_approvals(&session).await,
198                    "host.register_agent" => handle_host_register_agent(&parsed, &session).await,
199                    "host.unregister_agent" => {
200                        handle_host_unregister_agent(&parsed, &session).await
201                    }
202                    "host.set_status" => handle_host_set_status(&parsed, &session).await,
203                    "host.notify" => handle_host_notify(&parsed, &session).await,
204                    "host.request_approval" => {
205                        handle_host_request_approval(&parsed, &session).await
206                    }
207                    "host.resolve_approval" => {
208                        handle_host_resolve_approval(&parsed, &session).await
209                    }
210                    "tools.register" => handle_tools_register(&parsed, &session).await,
211                    "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
212                    "policy.register" => handle_policy_register(&parsed, &session).await,
213                    "session.policy.open" => handle_session_policy_open(&session).await,
214                    "session.policy.close" => {
215                        handle_session_policy_close(&parsed, &session).await
216                    }
217                    "verify" => handle_verify(&parsed, &session).await,
218                    "state.get" => handle_state_get(&parsed, &session).await,
219                    "state.set" => handle_state_set(&parsed, &session).await,
220                    "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
221                    "memory.query" => handle_memory_query(&parsed, &session).await,
222                    "memory.build_context" => handle_memory_build_context(&parsed, &session).await,
223                    "memory.consolidate" => handle_memory_consolidate(&session).await,
224                    "memory.fact_count" => handle_memory_fact_count(&session).await,
225                    "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
226                    "skill.find" => handle_skill_find(&parsed, &session).await,
227                    "skill.report" => handle_skill_report(&parsed, &session).await,
228                    "skill.repair" => handle_skill_repair(&parsed, &session).await,
229                    "skills.ingest_distilled" => {
230                        handle_skills_ingest_distilled(&parsed, &session).await
231                    }
232                    "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
233                    "skills.domains_needing_evolution" => {
234                        handle_skills_domains_needing_evolution(&parsed, &session).await
235                    }
236                    "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
237                    "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
238                    "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
239                    "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
240                    "multi.vote" => handle_multi_vote(&parsed, &session).await,
241                    "scheduler.create" => handle_scheduler_create(&parsed),
242                    "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
243                    "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
244                    "infer" => handle_infer(&parsed, &state, &session).await,
245                    "embed" => handle_embed(&parsed, &state).await,
246                    "classify" => handle_classify(&parsed, &state).await,
247                    "tokenize" => handle_tokenize(&parsed, &state).await,
248                    "detokenize" => handle_detokenize(&parsed, &state).await,
249                    "rerank" => handle_rerank(&parsed, &state).await,
250                    "transcribe" => handle_transcribe(&parsed, &state).await,
251                    "synthesize" => handle_synthesize(&parsed, &state).await,
252                    "speech.prepare" => handle_speech_prepare(&state).await,
253                    "models.route" => handle_models_route(&parsed, &state).await,
254                    "models.stats" => handle_models_stats(&state).await,
255                    "events.count" => handle_events_count(&session).await,
256                    "events.stats" => handle_events_stats(&session).await,
257                    "events.truncate" => handle_events_truncate(&parsed, &session).await,
258                    "events.clear" => handle_events_clear(&session).await,
259                    "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
260                    "models.list" => handle_models_list(&state),
261                    "models.list_unified" => handle_models_list_unified(&state),
262                    "models.search" => handle_models_search(&parsed, &state),
263                    "models.upgrades" => handle_models_upgrades(&state),
264                    "models.pull" => handle_models_pull(&parsed, &state).await,
265                    "models.install" => handle_models_pull(&parsed, &state).await,
266                    "skills.distill" => handle_skills_distill(&parsed, &state).await,
267                    "skills.list" => handle_skills_list(&parsed, &session).await,
268                    "browser.run" => handle_browser_run(&parsed, &session).await,
269                    "browser.close" => handle_browser_close(&session).await,
270                    "secret.put" => handle_secret_put(&parsed),
271                    "secret.get" => handle_secret_get(&parsed),
272                    "secret.delete" => handle_secret_delete(&parsed),
273                    "secret.status" => handle_secret_status(&parsed),
274                    "secret.available" => Ok(car_ffi_common::secrets::is_available()),
275                    "permissions.status" => handle_perm_status(&parsed),
276                    "permissions.request" => handle_perm_request(&parsed),
277                    "permissions.explain" => handle_perm_explain(&parsed),
278                    "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
279                    "accounts.list" => car_ffi_common::accounts::list(),
280                    "accounts.open" => {
281                        #[derive(serde::Deserialize, Default)]
282                        struct OpenParams {
283                            #[serde(default)]
284                            account_id: Option<String>,
285                        }
286                        let p: OpenParams =
287                            serde_json::from_value(parsed.params.clone()).unwrap_or_default();
288                        car_ffi_common::accounts::open_settings(p.account_id.as_deref())
289                    }
290                    "calendar.list" => car_ffi_common::integrations::calendar_list(),
291                    "calendar.events" => handle_calendar_events(&parsed),
292                    "contacts.containers" => car_ffi_common::integrations::contacts_containers(),
293                    "contacts.find" => handle_contacts_find(&parsed),
294                    "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
295                    "mail.inbox" => handle_mail_inbox(&parsed),
296                    "mail.send" => handle_mail_send(&parsed),
297                    "messages.services" => car_ffi_common::integrations::messages_services(),
298                    "messages.chats" => handle_messages_chats(&parsed),
299                    "messages.send" => handle_messages_send(&parsed),
300                    "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
301                    "notes.find" => handle_notes_find(&parsed),
302                    "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
303                    "reminders.items" => handle_reminders_items(&parsed),
304                    "photos.albums" => car_ffi_common::integrations::photos_albums(),
305                    "bookmarks.list" => handle_bookmarks_list(&parsed),
306                    "files.locations" => car_ffi_common::integrations::files_locations(),
307                    "keychain.status" => car_ffi_common::integrations::keychain_status(),
308                    "health.status" => car_ffi_common::health::status(),
309                    "health.sleep" => handle_health_sleep(&parsed),
310                    "health.workouts" => handle_health_workouts(&parsed),
311                    "health.activity" => handle_health_activity(&parsed),
312                    "voice.transcribe_stream.start" => {
313                        handle_voice_transcribe_stream_start(&parsed, &state, &session).await
314                    }
315                    "voice.transcribe_stream.stop" => {
316                        handle_voice_transcribe_stream_stop(&parsed, &state).await
317                    }
318                    "voice.transcribe_stream.push" => {
319                        handle_voice_transcribe_stream_push(&parsed, &state).await
320                    }
321                    "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
322                    "voice.dispatch_turn" => {
323                        handle_voice_dispatch_turn(&parsed, &state, &session).await
324                    }
325                    "voice.cancel_turn" => handle_voice_cancel_turn().await,
326                    "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
327                    "inference.register_runner" => {
328                        handle_inference_register_runner(&session).await
329                    }
330                    "inference.runner.event" => {
331                        handle_inference_runner_event(&parsed).await
332                    }
333                    "inference.runner.complete" => {
334                        handle_inference_runner_complete(&parsed).await
335                    }
336                    "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
337                    "voice.providers.list" => {
338                        // Stateless: enumerates STT/TTS providers compiled into
339                        // this build. Runtime readiness (API key, permission,
340                        // model download) is reported via per-provider errors.
341                        serde_json::from_str::<serde_json::Value>(
342                            &car_voice::list_voice_providers_json(),
343                        )
344                        .map_err(|e| e.to_string())
345                    }
346                    "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
347                        .await
348                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
349                    "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
350                        .await
351                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
352                    "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
353                    "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
354                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
355                    "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
356                    "workflow.run" => handle_workflow_run(&parsed, &session).await,
357                    "workflow.verify" => handle_workflow_verify(&parsed),
358                    "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
359                    "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
360                    "meeting.list" => handle_meeting_list(&parsed),
361                    "meeting.get" => handle_meeting_get(&parsed),
362                    "registry.register" => handle_registry_register(&parsed),
363                    "registry.heartbeat" => handle_registry_heartbeat(&parsed),
364                    "registry.unregister" => handle_registry_unregister(&parsed),
365                    "registry.list" => handle_registry_list(&parsed),
366                    "registry.reap" => handle_registry_reap(&parsed),
367                    "admission.status" => handle_admission_status(&state),
368                    "a2a.start" => handle_a2a_start(&parsed).await,
369                    "a2a.stop" => handle_a2a_stop(),
370                    "a2a.status" => handle_a2a_status(),
371                    "a2a.send" => handle_a2a_send(&parsed, &state).await,
372                    "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
373                    "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
374                    "a2ui.capabilities" => handle_a2ui_capabilities(&state),
375                    "a2ui.reap" => handle_a2ui_reap(&state).await,
376                    "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
377                    "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
378                    "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
379                    "automation.run_applescript" => handle_run_applescript(&parsed).await,
380                    "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
381                    "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
382                    "vision.ocr" => handle_vision_ocr(&parsed).await,
383                    _ => Err(format!("unknown method: {}", method)),
384                };
385
386                let resp = match result {
387                    Ok(value) => JsonRpcResponse::success(parsed.id, value),
388                    Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
389                };
390                send_response(&session.channel, resp).await?;
391            }
392        } else if msg.is_close() {
393            info!("Client {} disconnected", client_id);
394            break;
395        }
396    }
397
398    session.host.unsubscribe(&client_id).await;
399
400    // Fix for MULTI-4 / WS-3: drop the session from the registry and
401    // drain any pending tool callbacks. Without this, every connection
402    // we ever accepted keeps an `Arc<ClientSession>` alive in
403    // `state.sessions`, and outstanding `oneshot::Sender`s in
404    // `session.channel.pending` outlive the closed connection until
405    // their per-call timeout (60s). Dropping the senders here causes any
406    // awaiting `recv()` in `WsToolExecutor::execute` to return
407    // `RecvError` immediately, which the existing error-handler path
408    // already maps to "callback channel closed" — same shape as the
409    // timeout path, just faster.
410    let _removed = state.remove_session(&client_id).await;
411    {
412        let mut pending = session.channel.pending.lock().await;
413        pending.clear();
414    }
415
416    Ok(())
417}
418
419async fn send_response(
420    channel: &WsChannel,
421    resp: JsonRpcResponse,
422) -> Result<(), Box<dyn std::error::Error>> {
423    use futures::SinkExt;
424    let json = serde_json::to_string(&resp)?;
425    channel
426        .write
427        .lock()
428        .await
429        .send(Message::Text(json.into()))
430        .await?;
431    Ok(())
432}
433
434// --- Request handlers ---
435
436async fn handle_host_subscribe(session: &crate::session::ClientSession) -> Result<Value, String> {
437    session
438        .host
439        .subscribe(&session.client_id, session.channel.clone())
440        .await;
441    serde_json::to_value(HostSnapshot {
442        subscribed: true,
443        agents: session.host.agents().await,
444        approvals: session.host.approvals().await,
445        events: session.host.events(50).await,
446    })
447    .map_err(|e| e.to_string())
448}
449
450async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
451    serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
452}
453
454async fn handle_host_events(
455    req: &JsonRpcMessage,
456    session: &crate::session::ClientSession,
457) -> Result<Value, String> {
458    let limit = req
459        .params
460        .get("limit")
461        .and_then(|v| v.as_u64())
462        .unwrap_or(100) as usize;
463    serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
464}
465
466async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
467    serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
468}
469
470async fn handle_a2ui_apply(
471    req: &JsonRpcMessage,
472    state: &Arc<ServerState>,
473) -> Result<Value, String> {
474    #[derive(Deserialize)]
475    struct Params {
476        #[serde(default)]
477        envelope: Option<car_a2ui::A2uiEnvelope>,
478        #[serde(default)]
479        message: Option<car_a2ui::A2uiEnvelope>,
480    }
481
482    let envelope = if req.params.get("createSurface").is_some()
483        || req.params.get("updateComponents").is_some()
484        || req.params.get("updateDataModel").is_some()
485        || req.params.get("deleteSurface").is_some()
486    {
487        serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
488            .map_err(|e| e.to_string())?
489    } else {
490        match serde_json::from_value::<Params>(req.params.clone()) {
491            Ok(params) => params
492                .envelope
493                .or(params.message)
494                .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
495            Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
496                .map_err(|e| e.to_string())?,
497        }
498    };
499
500    apply_a2ui_envelope(state, envelope, None, None).await
501}
502
503async fn handle_a2ui_ingest(
504    req: &JsonRpcMessage,
505    state: &Arc<ServerState>,
506) -> Result<Value, String> {
507    #[derive(Deserialize)]
508    #[serde(rename_all = "camelCase")]
509    struct Params {
510        #[serde(default)]
511        endpoint: Option<String>,
512        #[serde(default)]
513        a2a_endpoint: Option<String>,
514        #[serde(default)]
515        owner: Option<car_a2ui::A2uiSurfaceOwner>,
516        #[serde(default)]
517        route_auth: Option<A2aRouteAuth>,
518        #[serde(default)]
519        allow_untrusted_endpoint: bool,
520    }
521
522    let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
523        endpoint: None,
524        a2a_endpoint: None,
525        owner: None,
526        route_auth: None,
527        allow_untrusted_endpoint: false,
528    });
529    let payload = req.params.get("payload").unwrap_or(&req.params);
530    state
531        .a2ui
532        .validate_payload(payload)
533        .map_err(|e| e.to_string())?;
534    let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
535    if envelopes.is_empty() {
536        return Err("no A2UI envelopes found in payload".into());
537    }
538    let endpoint = params.endpoint.or(params.a2a_endpoint);
539    let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
540    let owner = params
541        .owner
542        .or_else(|| car_a2ui::owner_from_value(payload))
543        .map(|owner| match endpoint.clone() {
544            Some(endpoint) => owner.with_endpoint(Some(endpoint)),
545            None => owner,
546        });
547
548    let mut results = Vec::new();
549    for envelope in envelopes {
550        let value =
551            apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
552        results.push(value);
553    }
554    Ok(serde_json::json!({ "applied": results }))
555}
556
557async fn apply_a2ui_envelope(
558    state: &Arc<ServerState>,
559    envelope: car_a2ui::A2uiEnvelope,
560    owner: Option<car_a2ui::A2uiSurfaceOwner>,
561    route_auth: Option<A2aRouteAuth>,
562) -> Result<Value, String> {
563    let result = state
564        .a2ui
565        .apply_with_owner(envelope, owner)
566        .await
567        .map_err(|e| e.to_string())?;
568    update_a2ui_route_auth(state, &result, route_auth).await;
569    let kind = if result.deleted {
570        "a2ui.surface_deleted"
571    } else {
572        "a2ui.surface_updated"
573    };
574    let message = if result.deleted {
575        format!("A2UI surface {} deleted", result.surface_id)
576    } else {
577        format!("A2UI surface {} updated", result.surface_id)
578    };
579    let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
580    state.host.record_event(kind, None, message, payload).await;
581    serde_json::to_value(result).map_err(|e| e.to_string())
582}
583
584async fn update_a2ui_route_auth(
585    state: &Arc<ServerState>,
586    result: &car_a2ui::A2uiApplyResult,
587    route_auth: Option<A2aRouteAuth>,
588) {
589    let mut auth = state.a2ui_route_auth.lock().await;
590    if result.deleted {
591        auth.remove(&result.surface_id);
592        return;
593    }
594
595    let has_route_endpoint = result
596        .surface
597        .as_ref()
598        .and_then(|surface| surface.owner.as_ref())
599        .and_then(|owner| owner.endpoint.as_ref())
600        .is_some();
601    match (has_route_endpoint, route_auth) {
602        (true, Some(route_auth)) => {
603            auth.insert(result.surface_id.clone(), route_auth);
604        }
605        _ => {
606            auth.remove(&result.surface_id);
607        }
608    }
609}
610
611fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
612    serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
613}
614
615async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
616    let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
617    if !removed.is_empty() {
618        let mut auth = state.a2ui_route_auth.lock().await;
619        for surface_id in &removed {
620            auth.remove(surface_id);
621        }
622    }
623    Ok(serde_json::json!({ "removed": removed }))
624}
625
626async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
627    serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
628}
629
630async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
631    let surface_id = req
632        .params
633        .get("surface_id")
634        .or_else(|| req.params.get("surfaceId"))
635        .and_then(Value::as_str)
636        .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
637    serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
638}
639
640async fn handle_a2ui_action(
641    req: &JsonRpcMessage,
642    state: &Arc<ServerState>,
643) -> Result<Value, String> {
644    let action: car_a2ui::ClientAction =
645        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
646    let owner = state.a2ui.owner(&action.surface_id).await;
647    let route = route_a2ui_action(state, &action, owner.clone()).await;
648    let payload = serde_json::json!({
649        "action": action,
650        "owner": owner,
651        "route": route,
652    });
653    let event = state
654        .host
655        .record_event(
656            "a2ui.action",
657            None,
658            format!(
659                "A2UI action {} from {}",
660                action.name, action.source_component_id
661            ),
662            payload,
663        )
664        .await;
665    Ok(serde_json::json!({
666        "event": event,
667        "route": route,
668    }))
669}
670
671async fn route_a2ui_action(
672    state: &Arc<ServerState>,
673    action: &car_a2ui::ClientAction,
674    owner: Option<car_a2ui::A2uiSurfaceOwner>,
675) -> Value {
676    let Some(owner) = owner else {
677        return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
678    };
679    if owner.kind != "a2a" {
680        return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
681    }
682    let Some(endpoint) = owner.endpoint.clone() else {
683        return serde_json::json!({
684            "delivered": false,
685            "reason": "surface owner has no endpoint",
686            "owner": owner
687        });
688    };
689
690    let message = car_a2a::Message {
691        message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
692        role: car_a2a::MessageRole::User,
693        parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
694            data: serde_json::json!({
695                "a2uiAction": action,
696            }),
697            metadata: Default::default(),
698        })],
699        task_id: owner.task_id.clone(),
700        context_id: owner.context_id.clone(),
701        metadata: Default::default(),
702    };
703
704    let auth = state
705        .a2ui_route_auth
706        .lock()
707        .await
708        .get(&action.surface_id)
709        .cloned()
710        .map(client_auth_from_route_auth)
711        .unwrap_or(car_a2a::ClientAuth::None);
712
713    match car_a2a::A2aClient::new(endpoint.clone())
714        .with_auth(auth)
715        .send_message(message, false)
716        .await
717    {
718        Ok(result) => serde_json::json!({
719            "delivered": true,
720            "owner": owner,
721            "endpoint": endpoint,
722            "result": result,
723        }),
724        Err(error) => serde_json::json!({
725            "delivered": false,
726            "owner": owner,
727            "endpoint": endpoint,
728            "error": error.to_string(),
729        }),
730    }
731}
732
733fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
734    match auth {
735        A2aRouteAuth::None => car_a2a::ClientAuth::None,
736        A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
737        A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
738    }
739}
740
741fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
742    let endpoint = endpoint?;
743    if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
744        Some(endpoint)
745    } else {
746        None
747    }
748}
749
750fn is_loopback_http_endpoint(endpoint: &str) -> bool {
751    endpoint == "http://localhost"
752        || endpoint.starts_with("http://localhost:")
753        || endpoint.starts_with("http://localhost/")
754        || endpoint == "http://127.0.0.1"
755        || endpoint.starts_with("http://127.0.0.1:")
756        || endpoint.starts_with("http://127.0.0.1/")
757        || endpoint == "http://[::1]"
758        || endpoint.starts_with("http://[::1]:")
759        || endpoint.starts_with("http://[::1]/")
760}
761
762async fn handle_host_register_agent(
763    req: &JsonRpcMessage,
764    session: &crate::session::ClientSession,
765) -> Result<Value, String> {
766    let request: RegisterHostAgentRequest =
767        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
768    serde_json::to_value(
769        session
770            .host
771            .register_agent(&session.client_id, request)
772            .await?,
773    )
774    .map_err(|e| e.to_string())
775}
776
777async fn handle_host_unregister_agent(
778    req: &JsonRpcMessage,
779    session: &crate::session::ClientSession,
780) -> Result<Value, String> {
781    let agent_id = req
782        .params
783        .get("agent_id")
784        .and_then(|v| v.as_str())
785        .ok_or("missing agent_id")?;
786    session.host.unregister_agent(agent_id).await?;
787    Ok(serde_json::json!({"ok": true}))
788}
789
790async fn handle_host_set_status(
791    req: &JsonRpcMessage,
792    session: &crate::session::ClientSession,
793) -> Result<Value, String> {
794    let request: SetHostAgentStatusRequest =
795        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
796    serde_json::to_value(session.host.set_status(request).await?).map_err(|e| e.to_string())
797}
798
799async fn handle_host_notify(
800    req: &JsonRpcMessage,
801    session: &crate::session::ClientSession,
802) -> Result<Value, String> {
803    let kind = req
804        .params
805        .get("kind")
806        .and_then(|v| v.as_str())
807        .unwrap_or("host.notification");
808    let agent_id = req
809        .params
810        .get("agent_id")
811        .and_then(|v| v.as_str())
812        .map(str::to_string);
813    let message = req
814        .params
815        .get("message")
816        .and_then(|v| v.as_str())
817        .unwrap_or("");
818    let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
819    serde_json::to_value(
820        session
821            .host
822            .record_event(kind, agent_id, message, payload)
823            .await,
824    )
825    .map_err(|e| e.to_string())
826}
827
828async fn handle_host_request_approval(
829    req: &JsonRpcMessage,
830    session: &crate::session::ClientSession,
831) -> Result<Value, String> {
832    let request: CreateHostApprovalRequest =
833        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
834    if let Some(agent_id) = &request.agent_id {
835        let _ = session
836            .host
837            .set_status(SetHostAgentStatusRequest {
838                agent_id: agent_id.clone(),
839                status: HostAgentStatus::WaitingForApproval,
840                current_task: None,
841                message: Some("Waiting for approval".to_string()),
842                payload: Value::Null,
843            })
844            .await;
845    }
846    serde_json::to_value(session.host.create_approval(request).await?).map_err(|e| e.to_string())
847}
848
849async fn handle_host_resolve_approval(
850    req: &JsonRpcMessage,
851    session: &crate::session::ClientSession,
852) -> Result<Value, String> {
853    let request: ResolveHostApprovalRequest =
854        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
855    serde_json::to_value(session.host.resolve_approval(request).await?).map_err(|e| e.to_string())
856}
857
858async fn handle_session_init(
859    req: &JsonRpcMessage,
860    session: &crate::session::ClientSession,
861) -> Result<Value, String> {
862    let init: SessionInitRequest =
863        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
864
865    for tool in &init.tools {
866        register_from_definition(&session.runtime, tool).await;
867    }
868
869    let mut policy_count = 0;
870    {
871        let mut policies = session.runtime.policies.write().await;
872        for policy_def in &init.policies {
873            if let Some(check) = build_policy_check(policy_def) {
874                policies.register(&policy_def.name, check, "");
875                policy_count += 1;
876            }
877        }
878    }
879
880    serde_json::to_value(SessionInitResponse {
881        session_id: session.client_id.clone(),
882        tools_registered: init.tools.len(),
883        policies_registered: policy_count,
884    })
885    .map_err(|e| e.to_string())
886}
887
888fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
889    match def.rule.as_str() {
890        "deny_tool" => {
891            let target = def.target.clone();
892            Some(Box::new(
893                move |action: &car_ir::Action, _: &car_state::StateStore| {
894                    if action.tool.as_deref() == Some(&target) {
895                        Some(format!("tool '{}' denied", target))
896                    } else {
897                        None
898                    }
899                },
900            ))
901        }
902        "require_state" => {
903            let key = def.key.clone();
904            let value = def.value.clone();
905            Some(Box::new(
906                move |_: &car_ir::Action, state: &car_state::StateStore| {
907                    if state.get(&key).as_ref() != Some(&value) {
908                        Some(format!("state['{}'] must be {:?}", key, value))
909                    } else {
910                        None
911                    }
912                },
913            ))
914        }
915        "deny_tool_param" => {
916            let target = def.target.clone();
917            let param = def.key.clone();
918            let pattern = def.pattern.clone();
919            Some(Box::new(
920                move |action: &car_ir::Action, _: &car_state::StateStore| {
921                    if action.tool.as_deref() != Some(&target) {
922                        return None;
923                    }
924                    if let Some(val) = action.parameters.get(&param) {
925                        let s = val.as_str().unwrap_or(&val.to_string()).to_string();
926                        if s.contains(&pattern) {
927                            return Some(format!("param '{}' matches '{}'", param, pattern));
928                        }
929                    }
930                    None
931                },
932            ))
933        }
934        _ => None,
935    }
936}
937
938async fn handle_tools_register(
939    req: &JsonRpcMessage,
940    session: &crate::session::ClientSession,
941) -> Result<Value, String> {
942    let tools: Vec<ToolDefinition> =
943        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
944    for tool in &tools {
945        register_from_definition(&session.runtime, tool).await;
946    }
947    Ok(Value::from(tools.len()))
948}
949
950/// Bridge a wire-protocol `ToolDefinition` to the engine's
951/// schema-aware registration. Carries the full ToolSchema shape
952/// (description, parameters, returns, idempotency, caching, rate
953/// limit) through to the validator. An empty `parameters` object is
954/// the legacy schemaless registration — the validator no-ops for
955/// those, so pre-v0.5.x callers see no change.
956async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
957    runtime
958        .register_tool_schema(car_ir::ToolSchema {
959            name: def.name.clone(),
960            description: def.description.clone(),
961            parameters: def.parameters.clone(),
962            returns: def.returns.clone(),
963            idempotent: def.idempotent,
964            cache_ttl_secs: def.cache_ttl_secs,
965            rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
966                max_calls: rl.max_calls,
967                interval_secs: rl.interval_secs,
968            }),
969        })
970        .await;
971}
972
973async fn handle_proposal_submit(
974    req: &JsonRpcMessage,
975    session: &crate::session::ClientSession,
976) -> Result<Value, String> {
977    let submit: ProposalSubmitRequest =
978        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
979    // `session_id` is sibling to `proposal` in the params object —
980    // not part of `ProposalSubmitRequest` (kept proto-compatible). When
981    // present, executes the proposal under the named session so any
982    // session-scoped policies layer on top of global ones.
983    // See docs/proposals/per-session-policy-scoping.md.
984    let session_id = req
985        .params
986        .get("session_id")
987        .and_then(|v| v.as_str())
988        .map(str::to_string);
989    let result = match session_id {
990        Some(sid) => session.runtime.execute_with_session(&submit.proposal, &sid).await,
991        None => session.runtime.execute(&submit.proposal).await,
992    };
993    serde_json::to_value(result).map_err(|e| e.to_string())
994}
995
996async fn handle_session_policy_open(
997    session: &crate::session::ClientSession,
998) -> Result<Value, String> {
999    let id = session.runtime.open_session().await;
1000    Ok(serde_json::json!({ "session_id": id }))
1001}
1002
1003async fn handle_session_policy_close(
1004    req: &JsonRpcMessage,
1005    session: &crate::session::ClientSession,
1006) -> Result<Value, String> {
1007    let sid = req
1008        .params
1009        .get("session_id")
1010        .and_then(|v| v.as_str())
1011        .ok_or("missing 'session_id'")?;
1012    let closed = session.runtime.close_session(sid).await;
1013    Ok(serde_json::json!({ "closed": closed }))
1014}
1015
1016/// `policy.register` — register one policy against this WebSocket
1017/// session's runtime. Mirrors the `PolicyDefinition` shape used by
1018/// `session.init`. When `session_id` is present, the policy is scoped
1019/// to the named in-runtime session opened via `session.policy.open`;
1020/// otherwise it is global.
1021async fn handle_policy_register(
1022    req: &JsonRpcMessage,
1023    session: &crate::session::ClientSession,
1024) -> Result<Value, String> {
1025    let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1026        .map_err(|e| format!("invalid policy params: {e}"))?;
1027    let session_id = req
1028        .params
1029        .get("session_id")
1030        .and_then(|v| v.as_str())
1031        .map(str::to_string);
1032    let check = build_policy_check(&def).ok_or_else(|| {
1033        format!("unsupported policy rule '{}'", def.rule)
1034    })?;
1035    match session_id {
1036        Some(sid) => session
1037            .runtime
1038            .register_policy_in_session(&sid, &def.name, check, "")
1039            .await
1040            .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1041        None => {
1042            let mut policies = session.runtime.policies.write().await;
1043            policies.register(&def.name, check, "");
1044            Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1045        }
1046    }
1047}
1048
1049async fn handle_verify(
1050    req: &JsonRpcMessage,
1051    session: &crate::session::ClientSession,
1052) -> Result<Value, String> {
1053    let vr: VerifyRequest =
1054        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1055    let tools: std::collections::HashSet<String> =
1056        session.runtime.tools.read().await.keys().cloned().collect();
1057    let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1058    serde_json::to_value(VerifyResponse {
1059        valid: result.valid,
1060        issues: result
1061            .issues
1062            .iter()
1063            .map(|i| VerifyIssueProto {
1064                action_id: i.action_id.clone(),
1065                severity: i.severity.clone(),
1066                message: i.message.clone(),
1067            })
1068            .collect(),
1069        simulated_state: result.simulated_state,
1070    })
1071    .map_err(|e| e.to_string())
1072}
1073
1074async fn handle_state_get(
1075    req: &JsonRpcMessage,
1076    session: &crate::session::ClientSession,
1077) -> Result<Value, String> {
1078    let key = req
1079        .params
1080        .get("key")
1081        .and_then(|v| v.as_str())
1082        .ok_or("missing 'key'")?;
1083    Ok(session.runtime.state.get(key).unwrap_or(Value::Null))
1084}
1085
1086async fn handle_state_set(
1087    req: &JsonRpcMessage,
1088    session: &crate::session::ClientSession,
1089) -> Result<Value, String> {
1090    let key = req
1091        .params
1092        .get("key")
1093        .and_then(|v| v.as_str())
1094        .ok_or("missing 'key'")?;
1095    let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1096    session.runtime.state.set(key, value, "client");
1097    Ok(Value::from("ok"))
1098}
1099
1100// --- Memory handlers ---
1101
1102/// `memory.fact_count` — return `valid_fact_count()` of the
1103/// session's memgine. Used by FFI bindings to mirror their
1104/// embedded `fact_count()` accessor without round-tripping a full
1105/// query. No params.
1106async fn handle_memory_fact_count(
1107    session: &crate::session::ClientSession,
1108) -> Result<Value, String> {
1109    let engine = session.memgine.lock().await;
1110    Ok(Value::from(engine.valid_fact_count()))
1111}
1112
1113async fn handle_memory_add_fact(
1114    req: &JsonRpcMessage,
1115    session: &crate::session::ClientSession,
1116) -> Result<Value, String> {
1117    let subject = req
1118        .params
1119        .get("subject")
1120        .and_then(|v| v.as_str())
1121        .ok_or("missing subject")?;
1122    let body = req
1123        .params
1124        .get("body")
1125        .and_then(|v| v.as_str())
1126        .ok_or("missing body")?;
1127    let kind = req
1128        .params
1129        .get("kind")
1130        .and_then(|v| v.as_str())
1131        .unwrap_or("pattern");
1132    let mut engine = session.memgine.lock().await;
1133    let fid = format!("ws-{}", engine.valid_fact_count());
1134    engine.ingest_fact(
1135        &fid,
1136        subject,
1137        body,
1138        "user",
1139        "peer",
1140        chrono::Utc::now(),
1141        "global",
1142        None,
1143        vec![],
1144        kind == "constraint",
1145    );
1146    Ok(Value::from(engine.valid_fact_count()))
1147}
1148
1149async fn handle_memory_query(
1150    req: &JsonRpcMessage,
1151    session: &crate::session::ClientSession,
1152) -> Result<Value, String> {
1153    let query = req
1154        .params
1155        .get("query")
1156        .and_then(|v| v.as_str())
1157        .ok_or("missing query")?;
1158    let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
1159    let engine = session.memgine.lock().await;
1160    let seeds = engine.graph.find_seeds(query, 5);
1161    let hits = if !seeds.is_empty() {
1162        engine.graph.retrieve(&seeds, 3, k, 0.6, 0.05)
1163    } else {
1164        vec![]
1165    };
1166    let results: Vec<Value> = hits.iter().filter_map(|hit| {
1167        let node = engine.graph.inner.node_weight(hit.node_ix)?;
1168        Some(serde_json::json!({"subject": node.key, "body": node.value, "activation": hit.activation}))
1169    }).collect();
1170    serde_json::to_value(results).map_err(|e| e.to_string())
1171}
1172
1173async fn handle_memory_build_context(
1174    req: &JsonRpcMessage,
1175    session: &crate::session::ClientSession,
1176) -> Result<Value, String> {
1177    let query = req
1178        .params
1179        .get("query")
1180        .and_then(|v| v.as_str())
1181        .unwrap_or("");
1182    let mut engine = session.memgine.lock().await;
1183    Ok(Value::from(engine.build_context(query)))
1184}
1185
1186// --- Skill handlers ---
1187
1188async fn handle_skill_ingest(
1189    req: &JsonRpcMessage,
1190    session: &crate::session::ClientSession,
1191) -> Result<Value, String> {
1192    let name = req
1193        .params
1194        .get("name")
1195        .and_then(|v| v.as_str())
1196        .ok_or("missing name")?;
1197    let code = req
1198        .params
1199        .get("code")
1200        .and_then(|v| v.as_str())
1201        .ok_or("missing code")?;
1202    let platform = req
1203        .params
1204        .get("platform")
1205        .and_then(|v| v.as_str())
1206        .unwrap_or("unknown");
1207    let persona = req
1208        .params
1209        .get("persona")
1210        .and_then(|v| v.as_str())
1211        .unwrap_or("");
1212    let url_pattern = req
1213        .params
1214        .get("url_pattern")
1215        .and_then(|v| v.as_str())
1216        .unwrap_or("");
1217    let description = req
1218        .params
1219        .get("description")
1220        .and_then(|v| v.as_str())
1221        .unwrap_or("");
1222    let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
1223    let keywords: Vec<String> = req
1224        .params
1225        .get("task_keywords")
1226        .and_then(|v| v.as_array())
1227        .map(|arr| {
1228            arr.iter()
1229                .filter_map(|v| v.as_str().map(String::from))
1230                .collect()
1231        })
1232        .unwrap_or_default();
1233
1234    let trigger = car_memgine::SkillTrigger {
1235        persona: persona.into(),
1236        url_pattern: url_pattern.into(),
1237        task_keywords: keywords,
1238    };
1239    let mut engine = session.memgine.lock().await;
1240    engine.ingest_skill(
1241        name,
1242        code,
1243        platform,
1244        trigger,
1245        description,
1246        supersedes,
1247        vec![],
1248        vec![],
1249    );
1250    Ok(Value::from("ok"))
1251}
1252
1253async fn handle_skill_find(
1254    req: &JsonRpcMessage,
1255    session: &crate::session::ClientSession,
1256) -> Result<Value, String> {
1257    let persona = req
1258        .params
1259        .get("persona")
1260        .and_then(|v| v.as_str())
1261        .unwrap_or("");
1262    let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
1263    let task = req
1264        .params
1265        .get("task")
1266        .and_then(|v| v.as_str())
1267        .unwrap_or("");
1268    let max = req
1269        .params
1270        .get("max_results")
1271        .and_then(|v| v.as_u64())
1272        .unwrap_or(1) as usize;
1273    let engine = session.memgine.lock().await;
1274    let results = engine.find_skill(persona, url, task, max);
1275    let json: Vec<Value> = results
1276        .iter()
1277        .map(|(m, s)| {
1278            serde_json::json!({
1279                "name": m.name, "code": m.code, "platform": m.platform,
1280                "description": m.description, "stats": m.stats, "match_score": s,
1281            })
1282        })
1283        .collect();
1284    serde_json::to_value(json).map_err(|e| e.to_string())
1285}
1286
1287async fn handle_skill_report(
1288    req: &JsonRpcMessage,
1289    session: &crate::session::ClientSession,
1290) -> Result<Value, String> {
1291    let name = req
1292        .params
1293        .get("skill_name")
1294        .and_then(|v| v.as_str())
1295        .ok_or("missing skill_name")?;
1296    let outcome_str = req
1297        .params
1298        .get("outcome")
1299        .and_then(|v| v.as_str())
1300        .ok_or("missing outcome")?;
1301    let outcome = match outcome_str {
1302        "success" => car_memgine::SkillOutcome::Success,
1303        _ => car_memgine::SkillOutcome::Fail,
1304    };
1305    let mut engine = session.memgine.lock().await;
1306    let stats = engine
1307        .report_outcome(name, outcome)
1308        .ok_or(format!("skill '{}' not found", name))?;
1309    serde_json::to_value(stats).map_err(|e| e.to_string())
1310}
1311
1312// ---------------------------------------------------------------------------
1313// Multi-agent coordination handlers
1314//
1315// The WsAgentRunner sends a `multi.run_agent` JSON-RPC request to the client.
1316// The client runs the model loop and responds with AgentOutput JSON.
1317// ---------------------------------------------------------------------------
1318
1319/// AgentRunner backed by WebSocket callback to the client.
1320struct WsAgentRunner {
1321    channel: Arc<WsChannel>,
1322    host: Arc<crate::host::HostState>,
1323    client_id: String,
1324}
1325
1326#[async_trait::async_trait]
1327impl car_multi::AgentRunner for WsAgentRunner {
1328    async fn run(
1329        &self,
1330        spec: &car_multi::AgentSpec,
1331        task: &str,
1332        _runtime: &car_engine::Runtime,
1333        _mailbox: &car_multi::Mailbox,
1334    ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
1335        use futures::SinkExt;
1336
1337        let request_id = self.channel.next_request_id();
1338        let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
1339        let agent = self
1340            .host
1341            .register_agent(
1342                &self.client_id,
1343                RegisterHostAgentRequest {
1344                    id: Some(agent_id.clone()),
1345                    name: spec.name.clone(),
1346                    kind: "callback".to_string(),
1347                    capabilities: spec.tools.clone(),
1348                    project: spec
1349                        .metadata
1350                        .get("project")
1351                        .and_then(|v| v.as_str())
1352                        .map(str::to_string),
1353                    pid: None,
1354                    display: serde_json::from_value(
1355                        spec.metadata
1356                            .get("display")
1357                            .cloned()
1358                            .unwrap_or(serde_json::Value::Null),
1359                    )
1360                    .unwrap_or_default(),
1361                    metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
1362                },
1363            )
1364            .await
1365            .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
1366        let _ = self
1367            .host
1368            .set_status(SetHostAgentStatusRequest {
1369                agent_id: agent.id.clone(),
1370                status: HostAgentStatus::Running,
1371                current_task: Some(task.to_string()),
1372                message: Some(format!("{} started", spec.name)),
1373                payload: serde_json::json!({ "task": task }),
1374            })
1375            .await;
1376
1377        let rpc_request = serde_json::json!({
1378            "jsonrpc": "2.0",
1379            "method": "multi.run_agent",
1380            "params": {
1381                "spec": spec,
1382                "task": task,
1383            },
1384            "id": request_id,
1385        });
1386
1387        // Create oneshot channel for the response
1388        let (tx, rx) = tokio::sync::oneshot::channel();
1389        self.channel
1390            .pending
1391            .lock()
1392            .await
1393            .insert(request_id.clone(), tx);
1394
1395        let msg = Message::Text(
1396            serde_json::to_string(&rpc_request)
1397                .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
1398                .into(),
1399        );
1400        if let Err(e) = self.channel.write.lock().await.send(msg).await {
1401            let _ = self
1402                .host
1403                .set_status(SetHostAgentStatusRequest {
1404                    agent_id: agent_id.clone(),
1405                    status: HostAgentStatus::Errored,
1406                    current_task: None,
1407                    message: Some(format!("{} failed to start", spec.name)),
1408                    payload: serde_json::json!({ "error": e.to_string() }),
1409                })
1410                .await;
1411            return Err(car_multi::MultiError::AgentFailed(
1412                spec.name.clone(),
1413                format!("ws send error: {}", e),
1414            ));
1415        }
1416
1417        // Wait for client response (5 min timeout for model loops)
1418        let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
1419            Ok(Ok(response)) => response,
1420            Ok(Err(_)) => {
1421                let _ = self
1422                    .host
1423                    .set_status(SetHostAgentStatusRequest {
1424                        agent_id: agent_id.clone(),
1425                        status: HostAgentStatus::Errored,
1426                        current_task: None,
1427                        message: Some(format!("{} callback channel closed", spec.name)),
1428                        payload: Value::Null,
1429                    })
1430                    .await;
1431                return Err(car_multi::MultiError::AgentFailed(
1432                    spec.name.clone(),
1433                    "agent callback channel closed".into(),
1434                ));
1435            }
1436            Err(_) => {
1437                let _ = self
1438                    .host
1439                    .set_status(SetHostAgentStatusRequest {
1440                        agent_id: agent_id.clone(),
1441                        status: HostAgentStatus::Errored,
1442                        current_task: None,
1443                        message: Some(format!("{} timed out", spec.name)),
1444                        payload: Value::Null,
1445                    })
1446                    .await;
1447                return Err(car_multi::MultiError::AgentFailed(
1448                    spec.name.clone(),
1449                    "agent callback timed out (300s)".into(),
1450                ));
1451            }
1452        };
1453
1454        if let Some(err) = response.error {
1455            let _ = self
1456                .host
1457                .set_status(SetHostAgentStatusRequest {
1458                    agent_id: agent_id.clone(),
1459                    status: HostAgentStatus::Errored,
1460                    current_task: None,
1461                    message: Some(format!("{} errored", spec.name)),
1462                    payload: serde_json::json!({ "error": err }),
1463                })
1464                .await;
1465            return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
1466        }
1467
1468        let output_value = response.output.unwrap_or(Value::Null);
1469        let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
1470            car_multi::MultiError::AgentFailed(
1471                spec.name.clone(),
1472                format!("invalid AgentOutput: {}", e),
1473            )
1474        })?;
1475        let status = if output.error.is_some() {
1476            HostAgentStatus::Errored
1477        } else {
1478            HostAgentStatus::Completed
1479        };
1480        let message = if output.error.is_some() {
1481            format!("{} errored", spec.name)
1482        } else {
1483            format!("{} completed", spec.name)
1484        };
1485        let _ = self
1486            .host
1487            .set_status(SetHostAgentStatusRequest {
1488                agent_id,
1489                status,
1490                current_task: None,
1491                message: Some(message),
1492                payload: serde_json::to_value(&output).unwrap_or(Value::Null),
1493            })
1494            .await;
1495
1496        Ok(output)
1497    }
1498}
1499
1500fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
1501    let safe_name: String = name
1502        .chars()
1503        .map(|c| {
1504            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
1505                c
1506            } else {
1507                '-'
1508            }
1509        })
1510        .collect();
1511    format!("{}:{}:{}", client_id, safe_name, request_id)
1512}
1513
1514async fn handle_multi_swarm(
1515    req: &JsonRpcMessage,
1516    session: &crate::session::ClientSession,
1517) -> Result<Value, String> {
1518    let mode_str = req
1519        .params
1520        .get("mode")
1521        .and_then(|v| v.as_str())
1522        .ok_or("missing 'mode'")?;
1523    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
1524    let task = req
1525        .params
1526        .get("task")
1527        .and_then(|v| v.as_str())
1528        .ok_or("missing 'task'")?;
1529
1530    let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
1531        .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
1532    let agent_specs: Vec<car_multi::AgentSpec> =
1533        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
1534    let synth: Option<car_multi::AgentSpec> = req
1535        .params
1536        .get("synthesizer")
1537        .map(|v| serde_json::from_value(v.clone()))
1538        .transpose()
1539        .map_err(|e| format!("invalid synthesizer: {}", e))?;
1540
1541    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1542        channel: session.channel.clone(),
1543        host: session.host.clone(),
1544        client_id: session.client_id.clone(),
1545    });
1546    let infra = car_multi::SharedInfra::new();
1547
1548    let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
1549    if let Some(s) = synth {
1550        swarm = swarm.with_synthesizer(s);
1551    }
1552
1553    let result = swarm
1554        .run(task, &runner, &infra)
1555        .await
1556        .map_err(|e| format!("swarm error: {}", e))?;
1557    serde_json::to_value(result).map_err(|e| e.to_string())
1558}
1559
1560async fn handle_multi_pipeline(
1561    req: &JsonRpcMessage,
1562    session: &crate::session::ClientSession,
1563) -> Result<Value, String> {
1564    let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
1565    let task = req
1566        .params
1567        .get("task")
1568        .and_then(|v| v.as_str())
1569        .ok_or("missing 'task'")?;
1570
1571    let stage_specs: Vec<car_multi::AgentSpec> =
1572        serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
1573
1574    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1575        channel: session.channel.clone(),
1576        host: session.host.clone(),
1577        client_id: session.client_id.clone(),
1578    });
1579    let infra = car_multi::SharedInfra::new();
1580
1581    let result = car_multi::Pipeline::new(stage_specs)
1582        .run(task, &runner, &infra)
1583        .await
1584        .map_err(|e| format!("pipeline error: {}", e))?;
1585    serde_json::to_value(result).map_err(|e| e.to_string())
1586}
1587
1588async fn handle_multi_supervisor(
1589    req: &JsonRpcMessage,
1590    session: &crate::session::ClientSession,
1591) -> Result<Value, String> {
1592    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
1593    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
1594    let task = req
1595        .params
1596        .get("task")
1597        .and_then(|v| v.as_str())
1598        .ok_or("missing 'task'")?;
1599    let max_rounds = req
1600        .params
1601        .get("max_rounds")
1602        .and_then(|v| v.as_u64())
1603        .unwrap_or(3) as u32;
1604
1605    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
1606        .map_err(|e| format!("invalid workers: {}", e))?;
1607    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
1608        .map_err(|e| format!("invalid supervisor: {}", e))?;
1609
1610    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1611        channel: session.channel.clone(),
1612        host: session.host.clone(),
1613        client_id: session.client_id.clone(),
1614    });
1615    let infra = car_multi::SharedInfra::new();
1616
1617    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
1618        .with_max_rounds(max_rounds)
1619        .run(task, &runner, &infra)
1620        .await
1621        .map_err(|e| format!("supervisor error: {}", e))?;
1622    serde_json::to_value(result).map_err(|e| e.to_string())
1623}
1624
1625async fn handle_multi_map_reduce(
1626    req: &JsonRpcMessage,
1627    session: &crate::session::ClientSession,
1628) -> Result<Value, String> {
1629    let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
1630    let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
1631    let task = req
1632        .params
1633        .get("task")
1634        .and_then(|v| v.as_str())
1635        .ok_or("missing 'task'")?;
1636    let items_val = req.params.get("items").ok_or("missing 'items'")?;
1637
1638    let mapper_spec: car_multi::AgentSpec =
1639        serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
1640    let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
1641        .map_err(|e| format!("invalid reducer: {}", e))?;
1642    let items: Vec<String> =
1643        serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
1644
1645    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1646        channel: session.channel.clone(),
1647        host: session.host.clone(),
1648        client_id: session.client_id.clone(),
1649    });
1650    let infra = car_multi::SharedInfra::new();
1651
1652    let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
1653        .run(task, &items, &runner, &infra)
1654        .await
1655        .map_err(|e| format!("map_reduce error: {}", e))?;
1656    serde_json::to_value(result).map_err(|e| e.to_string())
1657}
1658
1659async fn handle_multi_vote(
1660    req: &JsonRpcMessage,
1661    session: &crate::session::ClientSession,
1662) -> Result<Value, String> {
1663    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
1664    let task = req
1665        .params
1666        .get("task")
1667        .and_then(|v| v.as_str())
1668        .ok_or("missing 'task'")?;
1669
1670    let agent_specs: Vec<car_multi::AgentSpec> =
1671        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
1672    let synth: Option<car_multi::AgentSpec> = req
1673        .params
1674        .get("synthesizer")
1675        .map(|v| serde_json::from_value(v.clone()))
1676        .transpose()
1677        .map_err(|e| format!("invalid synthesizer: {}", e))?;
1678
1679    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1680        channel: session.channel.clone(),
1681        host: session.host.clone(),
1682        client_id: session.client_id.clone(),
1683    });
1684    let infra = car_multi::SharedInfra::new();
1685
1686    let mut vote = car_multi::Vote::new(agent_specs);
1687    if let Some(s) = synth {
1688        vote = vote.with_synthesizer(s);
1689    }
1690
1691    let result = vote
1692        .run(task, &runner, &infra)
1693        .await
1694        .map_err(|e| format!("vote error: {}", e))?;
1695    serde_json::to_value(result).map_err(|e| e.to_string())
1696}
1697
1698// ---------------------------------------------------------------------------
1699// Scheduler handlers
1700// ---------------------------------------------------------------------------
1701
1702fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
1703    let name = req
1704        .params
1705        .get("name")
1706        .and_then(|v| v.as_str())
1707        .ok_or("scheduler.create requires 'name'")?;
1708    let prompt = req
1709        .params
1710        .get("prompt")
1711        .and_then(|v| v.as_str())
1712        .ok_or("scheduler.create requires 'prompt'")?;
1713
1714    let mut task = car_scheduler::Task::new(name, prompt);
1715
1716    if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
1717        let trigger = match t {
1718            "once" => car_scheduler::TaskTrigger::Once,
1719            "cron" => car_scheduler::TaskTrigger::Cron,
1720            "interval" => car_scheduler::TaskTrigger::Interval,
1721            "file_watch" => car_scheduler::TaskTrigger::FileWatch,
1722            _ => car_scheduler::TaskTrigger::Manual,
1723        };
1724        let schedule = req
1725            .params
1726            .get("schedule")
1727            .and_then(|v| v.as_str())
1728            .unwrap_or("");
1729        task = task.with_trigger(trigger, schedule);
1730    }
1731
1732    if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
1733        task = task.with_system_prompt(sp);
1734    }
1735
1736    serde_json::to_value(&task).map_err(|e| e.to_string())
1737}
1738
1739async fn handle_scheduler_run(
1740    req: &JsonRpcMessage,
1741    session: &crate::session::ClientSession,
1742) -> Result<Value, String> {
1743    let task_val = req
1744        .params
1745        .get("task")
1746        .ok_or("scheduler.run requires 'task'")?;
1747    let mut task: car_scheduler::Task =
1748        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
1749
1750    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1751        channel: session.channel.clone(),
1752        host: session.host.clone(),
1753        client_id: session.client_id.clone(),
1754    });
1755    let executor = car_scheduler::Executor::new(runner);
1756    let execution = executor.run_once(&mut task).await;
1757
1758    serde_json::to_value(&execution).map_err(|e| e.to_string())
1759}
1760
1761async fn handle_scheduler_run_loop(
1762    req: &JsonRpcMessage,
1763    session: &crate::session::ClientSession,
1764) -> Result<Value, String> {
1765    let task_val = req
1766        .params
1767        .get("task")
1768        .ok_or("scheduler.run_loop requires 'task'")?;
1769    let mut task: car_scheduler::Task =
1770        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
1771    let max_iterations = req
1772        .params
1773        .get("max_iterations")
1774        .and_then(|v| v.as_u64())
1775        .map(|v| v as u32);
1776
1777    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1778        channel: session.channel.clone(),
1779        host: session.host.clone(),
1780        client_id: session.client_id.clone(),
1781    });
1782    let executor = car_scheduler::Executor::new(runner);
1783    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1784    let executions = executor
1785        .run_loop(&mut task, max_iterations, cancel_rx)
1786        .await;
1787
1788    serde_json::to_value(&executions).map_err(|e| e.to_string())
1789}
1790
1791// ---------------------------------------------------------------------------
1792// Inference handlers
1793// ---------------------------------------------------------------------------
1794
1795fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
1796    state.inference.get_or_init(|| {
1797        Arc::new(car_inference::InferenceEngine::new(
1798            car_inference::InferenceConfig::default(),
1799        ))
1800    })
1801}
1802
1803async fn handle_infer(
1804    msg: &JsonRpcMessage,
1805    state: &ServerState,
1806    session: &crate::session::ClientSession,
1807) -> Result<Value, String> {
1808    let engine = get_inference_engine(state);
1809    let mut req: car_inference::GenerateRequest =
1810        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1811
1812    // If context_query is provided, build context from memgine and inject it
1813    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
1814        let mut memgine = session.memgine.lock().await;
1815        let ctx = memgine.build_context(cq);
1816        if !ctx.is_empty() {
1817            req.context = Some(ctx);
1818        }
1819    }
1820
1821    // Process-wide admission gate. Held for the duration of the
1822    // generation so a burst of concurrent infer RPCs can't multiply
1823    // KV-cache + activation memory and take the host out. The
1824    // `_permit` binding is intentional — its `Drop` releases the slot
1825    // when this future returns.
1826    let _permit = state.admission.acquire().await;
1827
1828    // Use generate_tracked() so tool_calls, usage, model_used, trace_id, and
1829    // latency_ms are preserved in the response. Plain `generate()` discards
1830    // everything except `.text`, which silently breaks tool-use over the
1831    // WebSocket protocol (issue #43).
1832    //
1833    // NOTE: This directly serializes `InferenceResult`. Any field added to
1834    // that struct in `car-inference` becomes part of the public WebSocket
1835    // protocol. The shape is locked by `inference_result_serializes_*` tests
1836    // in car-inference; updating those tests is part of intentionally
1837    // changing the wire contract.
1838    let result = engine
1839        .generate_tracked(req)
1840        .await
1841        .map_err(|e| e.to_string())?;
1842    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
1843}
1844
1845async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1846    let engine = get_inference_engine(state);
1847    let req: car_inference::EmbedRequest =
1848        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1849    // Embeds load their own model weights; share the same admission
1850    // gate as generations so a burst of embed requests can't smuggle
1851    // around the concurrency cap.
1852    let _permit = state.admission.acquire().await;
1853    let result = engine.embed(req).await.map_err(|e| e.to_string())?;
1854    Ok(serde_json::json!({"embeddings": result}))
1855}
1856
1857async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1858    let engine = get_inference_engine(state);
1859    let req: car_inference::ClassifyRequest =
1860        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1861    let _permit = state.admission.acquire().await;
1862    let result = engine.classify(req).await.map_err(|e| e.to_string())?;
1863    Ok(serde_json::json!({"classifications": result}))
1864}
1865
1866/// Surface the current admission state so the menubar tray and
1867/// `car daemon status` can show "queued: N" / "permits: P/T". Read-only
1868/// snapshot — racy by definition but correct enough for status panels.
1869fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
1870    let total = state.admission.permits();
1871    let available = state.admission.permits_available();
1872    let in_use = total.saturating_sub(available);
1873    Ok(serde_json::json!({
1874        "permits_total": total,
1875        "permits_available": available,
1876        "permits_in_use": in_use,
1877        "env_override": crate::admission::ENV_MAX_CONCURRENT,
1878    }))
1879}
1880
1881async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1882    let model = msg
1883        .params
1884        .get("model")
1885        .and_then(|v| v.as_str())
1886        .ok_or("missing 'model' parameter")?;
1887    let text = msg
1888        .params
1889        .get("text")
1890        .and_then(|v| v.as_str())
1891        .ok_or("missing 'text' parameter")?;
1892    let engine = get_inference_engine(state);
1893    let ids = engine
1894        .tokenize(model, text)
1895        .await
1896        .map_err(|e| e.to_string())?;
1897    Ok(serde_json::json!({"tokens": ids}))
1898}
1899
1900async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1901    let model = msg
1902        .params
1903        .get("model")
1904        .and_then(|v| v.as_str())
1905        .ok_or("missing 'model' parameter")?;
1906    let tokens: Vec<u32> = msg
1907        .params
1908        .get("tokens")
1909        .and_then(|v| v.as_array())
1910        .ok_or("missing 'tokens' parameter")?
1911        .iter()
1912        .map(|t| {
1913            t.as_u64()
1914                .and_then(|n| u32::try_from(n).ok())
1915                .ok_or_else(|| "tokens[] must be u32 values".to_string())
1916        })
1917        .collect::<Result<Vec<_>, _>>()?;
1918    let engine = get_inference_engine(state);
1919    let text = engine
1920        .detokenize(model, &tokens)
1921        .await
1922        .map_err(|e| e.to_string())?;
1923    Ok(serde_json::json!({"text": text}))
1924}
1925
1926fn handle_models_list(state: &ServerState) -> Result<Value, String> {
1927    let engine = get_inference_engine(state);
1928    let models = engine.list_models();
1929    serde_json::to_value(&models).map_err(|e| e.to_string())
1930}
1931
1932fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
1933    let engine = get_inference_engine(state);
1934    let models = engine.list_models_unified();
1935    serde_json::to_value(&models).map_err(|e| e.to_string())
1936}
1937
1938#[derive(Debug, Deserialize)]
1939#[serde(rename_all = "camelCase")]
1940struct ModelSearchParams {
1941    #[serde(default)]
1942    query: Option<String>,
1943    #[serde(default)]
1944    capability: Option<car_inference::ModelCapability>,
1945    #[serde(default)]
1946    provider: Option<String>,
1947    #[serde(default)]
1948    local_only: bool,
1949    #[serde(default)]
1950    available_only: bool,
1951    #[serde(default)]
1952    limit: Option<usize>,
1953}
1954
1955#[derive(Debug, Serialize)]
1956#[serde(rename_all = "camelCase")]
1957struct ModelSearchEntry {
1958    #[serde(flatten)]
1959    info: car_inference::ModelInfo,
1960    family: String,
1961    version: String,
1962    tags: Vec<String>,
1963    pullable: bool,
1964    upgrade: Option<car_inference::ModelUpgrade>,
1965}
1966
1967#[derive(Debug, Serialize)]
1968#[serde(rename_all = "camelCase")]
1969struct ModelSearchResponse {
1970    models: Vec<ModelSearchEntry>,
1971    upgrades: Vec<car_inference::ModelUpgrade>,
1972    total: usize,
1973    available: usize,
1974    local: usize,
1975    remote: usize,
1976}
1977
1978fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1979    let params: ModelSearchParams =
1980        serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
1981            query: None,
1982            capability: None,
1983            provider: None,
1984            local_only: false,
1985            available_only: false,
1986            limit: None,
1987        });
1988    let engine = get_inference_engine(state);
1989    let upgrades = engine.available_model_upgrades();
1990    let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
1991        .iter()
1992        .cloned()
1993        .map(|upgrade| (upgrade.from_id.clone(), upgrade))
1994        .collect();
1995    let query = params
1996        .query
1997        .as_deref()
1998        .map(str::trim)
1999        .filter(|q| !q.is_empty())
2000        .map(|q| q.to_ascii_lowercase());
2001    let provider = params
2002        .provider
2003        .as_deref()
2004        .map(str::trim)
2005        .filter(|p| !p.is_empty())
2006        .map(|p| p.to_ascii_lowercase());
2007
2008    let mut entries: Vec<ModelSearchEntry> = engine
2009        .list_schemas()
2010        .into_iter()
2011        .filter(|schema| {
2012            if let Some(capability) = params.capability {
2013                if !schema.has_capability(capability) {
2014                    return false;
2015                }
2016            }
2017            if let Some(provider) = provider.as_deref() {
2018                if schema.provider.to_ascii_lowercase() != provider {
2019                    return false;
2020                }
2021            }
2022            if params.local_only && !schema.is_local() {
2023                return false;
2024            }
2025            if params.available_only && !schema.available {
2026                return false;
2027            }
2028            if let Some(query) = query.as_deref() {
2029                let capability_text = schema
2030                    .capabilities
2031                    .iter()
2032                    .map(|cap| format!("{cap:?}").to_ascii_lowercase())
2033                    .collect::<Vec<_>>()
2034                    .join(" ");
2035                let haystack = format!(
2036                    "{} {} {} {} {} {}",
2037                    schema.id,
2038                    schema.name,
2039                    schema.provider,
2040                    schema.family,
2041                    schema.tags.join(" "),
2042                    capability_text
2043                )
2044                .to_ascii_lowercase();
2045                if !haystack.contains(query) {
2046                    return false;
2047                }
2048            }
2049            true
2050        })
2051        .map(|schema| {
2052            let pullable = !schema.available
2053                && matches!(
2054                    schema.source,
2055                    car_inference::ModelSource::Local { .. } | car_inference::ModelSource::Mlx { .. }
2056                );
2057            let info = car_inference::ModelInfo::from(&schema);
2058            let upgrade = upgrades_by_from.get(&schema.id).cloned();
2059            ModelSearchEntry {
2060                info,
2061                family: schema.family,
2062                version: schema.version,
2063                tags: schema.tags,
2064                pullable,
2065                upgrade,
2066            }
2067        })
2068        .collect();
2069    entries.sort_by(|a, b| {
2070        b.info
2071            .available
2072            .cmp(&a.info.available)
2073            .then(b.info.is_local.cmp(&a.info.is_local))
2074            .then(a.info.name.cmp(&b.info.name))
2075    });
2076    if let Some(limit) = params.limit {
2077        entries.truncate(limit);
2078    }
2079
2080    let total = entries.len();
2081    let available = entries.iter().filter(|entry| entry.info.available).count();
2082    let local = entries.iter().filter(|entry| entry.info.is_local).count();
2083    let response = ModelSearchResponse {
2084        models: entries,
2085        upgrades,
2086        total,
2087        available,
2088        local,
2089        remote: total.saturating_sub(local),
2090    };
2091    serde_json::to_value(response).map_err(|e| e.to_string())
2092}
2093
2094fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
2095    let engine = get_inference_engine(state);
2096    serde_json::to_value(serde_json::json!({
2097        "upgrades": engine.available_model_upgrades()
2098    }))
2099    .map_err(|e| e.to_string())
2100}
2101
2102async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2103    let name = msg
2104        .params
2105        .get("name")
2106        .or_else(|| msg.params.get("id"))
2107        .or_else(|| msg.params.get("model"))
2108        .and_then(|v| v.as_str())
2109        .ok_or("missing 'name' parameter")?;
2110    let engine = get_inference_engine(state);
2111    let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
2112    Ok(serde_json::json!({"path": path.display().to_string()}))
2113}
2114
2115async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2116    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
2117        msg.params
2118            .get("events")
2119            .cloned()
2120            .unwrap_or(msg.params.clone()),
2121    )
2122    .map_err(|e| format!("invalid events: {}", e))?;
2123
2124    let inference = get_inference_engine(state).clone();
2125    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
2126
2127    let skills = engine.distill_skills(&events).await;
2128    serde_json::to_value(&skills).map_err(|e| e.to_string())
2129}
2130
2131/// Run memory consolidation against this client's session memgine.
2132/// Returns the JSON `ConsolidationReport`.
2133async fn handle_memory_consolidate(
2134    session: &crate::session::ClientSession,
2135) -> Result<Value, String> {
2136    let mut engine = session.memgine.lock().await;
2137    let report = engine.consolidate().await;
2138    serde_json::to_value(&report).map_err(|e| e.to_string())
2139}
2140
2141/// Repair a degraded skill on this client's session memgine.
2142/// Returns `{ code: "..." }` on success, `null` if the skill
2143/// isn't broken or repair failed.
2144async fn handle_skill_repair(
2145    msg: &JsonRpcMessage,
2146    session: &crate::session::ClientSession,
2147) -> Result<Value, String> {
2148    let name = msg
2149        .params
2150        .get("skill_name")
2151        .and_then(|v| v.as_str())
2152        .ok_or("missing 'skill_name' parameter")?;
2153    let mut engine = session.memgine.lock().await;
2154    let code = engine.repair_skill(name).await;
2155    Ok(match code {
2156        Some(c) => serde_json::json!({ "code": c }),
2157        None => Value::Null,
2158    })
2159}
2160
2161/// Ingest distilled skills into this client's session memgine.
2162/// Returns the number of nodes inserted.
2163async fn handle_skills_ingest_distilled(
2164    msg: &JsonRpcMessage,
2165    session: &crate::session::ClientSession,
2166) -> Result<Value, String> {
2167    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
2168        msg.params
2169            .get("skills")
2170            .cloned()
2171            .unwrap_or(msg.params.clone()),
2172    )
2173    .map_err(|e| format!("invalid skills: {}", e))?;
2174    let mut engine = session.memgine.lock().await;
2175    let nodes = engine.ingest_distilled_skills(&skills);
2176    Ok(serde_json::json!({ "ingested": nodes.len() }))
2177}
2178
2179/// Run skill evolution against this session's memgine for a
2180/// specified domain.  Returns the resulting `DistilledSkill` array.
2181async fn handle_skills_evolve(
2182    msg: &JsonRpcMessage,
2183    session: &crate::session::ClientSession,
2184) -> Result<Value, String> {
2185    let domain = msg
2186        .params
2187        .get("domain")
2188        .and_then(|v| v.as_str())
2189        .ok_or("missing 'domain' parameter")?
2190        .to_string();
2191    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
2192        msg.params
2193            .get("events")
2194            .cloned()
2195            .unwrap_or(Value::Array(vec![])),
2196    )
2197    .map_err(|e| format!("invalid events: {}", e))?;
2198    let mut engine = session.memgine.lock().await;
2199    let skills = engine.evolve_skills(&events, &domain).await;
2200    serde_json::to_value(&skills).map_err(|e| e.to_string())
2201}
2202
2203/// List domains whose skills are underperforming on this session.
2204async fn handle_skills_domains_needing_evolution(
2205    msg: &JsonRpcMessage,
2206    session: &crate::session::ClientSession,
2207) -> Result<Value, String> {
2208    let threshold = msg
2209        .params
2210        .get("threshold")
2211        .and_then(|v| v.as_f64())
2212        .unwrap_or(0.6);
2213    let engine = session.memgine.lock().await;
2214    let domains = engine.domains_needing_evolution(threshold);
2215    serde_json::to_value(&domains).map_err(|e| e.to_string())
2216}
2217
2218/// Rerank documents against a query using a cross-encoder model.
2219async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2220    let engine = get_inference_engine(state);
2221    let req: car_inference::RerankRequest =
2222        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2223    let _permit = state.admission.acquire().await;
2224    let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
2225    serde_json::to_value(&result).map_err(|e| e.to_string())
2226}
2227
2228/// Transcribe audio at the given path. The path is interpreted on
2229/// the daemon's filesystem, not the FFI caller's — Daemon-mode
2230/// callers must pass a path the daemon can read (typically a
2231/// shared `~/.car/...` location or stdin push via the streaming
2232/// API).
2233async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2234    let engine = get_inference_engine(state);
2235    let req: car_inference::TranscribeRequest =
2236        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2237    let _permit = state.admission.acquire().await;
2238    let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
2239    serde_json::to_value(&result).map_err(|e| e.to_string())
2240}
2241
2242/// Synthesize speech to a daemon-side output path. Same caveat
2243/// as transcribe: `output_path` is on the daemon's filesystem.
2244async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2245    let engine = get_inference_engine(state);
2246    let req: car_inference::SynthesizeRequest =
2247        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2248    let _permit = state.admission.acquire().await;
2249    let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
2250    serde_json::to_value(&result).map_err(|e| e.to_string())
2251}
2252
2253/// Prepare the speech runtime (downloads / warmup). Returns a
2254/// JSON status string, mirroring the embedded
2255/// `prepare_speech_runtime` shape.
2256async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
2257    let engine = get_inference_engine(state);
2258    let status = engine
2259        .prepare_speech_runtime()
2260        .await
2261        .map_err(|e| e.to_string())?;
2262    serde_json::to_value(&status).map_err(|e| e.to_string())
2263}
2264
2265/// Adaptive route decision for a prompt — returns the routing
2266/// JSON the FFI's `route_model` returns.
2267async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2268    let prompt = msg
2269        .params
2270        .get("prompt")
2271        .and_then(|v| v.as_str())
2272        .ok_or("missing 'prompt' parameter")?;
2273    let engine = get_inference_engine(state);
2274    let decision = engine.route_adaptive(prompt).await;
2275    serde_json::to_value(&decision).map_err(|e| e.to_string())
2276}
2277
2278/// Model performance profiles snapshot.
2279async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
2280    let engine = get_inference_engine(state);
2281    let profiles = engine.export_profiles().await;
2282    serde_json::to_value(&profiles).map_err(|e| e.to_string())
2283}
2284
2285/// Per-session event log size.
2286async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
2287    let n = session.runtime.log.lock().await.len();
2288    Ok(Value::from(n as u64))
2289}
2290
2291async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
2292    let stats = session.runtime.log.lock().await.stats();
2293    serde_json::to_value(stats).map_err(|e| e.to_string())
2294}
2295
2296#[derive(Deserialize)]
2297#[serde(rename_all = "camelCase")]
2298struct EventsTruncateParams {
2299    #[serde(default)]
2300    max_events: Option<usize>,
2301    #[serde(default)]
2302    max_spans: Option<usize>,
2303}
2304
2305async fn handle_events_truncate(
2306    msg: &JsonRpcMessage,
2307    session: &crate::session::ClientSession,
2308) -> Result<Value, String> {
2309    let params: EventsTruncateParams =
2310        serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
2311            max_events: None,
2312            max_spans: None,
2313        });
2314    let mut log = session.runtime.log.lock().await;
2315    let removed_events = params
2316        .max_events
2317        .map(|max| log.truncate_events_keep_last(max))
2318        .unwrap_or(0);
2319    let removed_spans = params
2320        .max_spans
2321        .map(|max| log.truncate_spans_keep_last(max))
2322        .unwrap_or(0);
2323    let stats = log.stats();
2324    Ok(serde_json::json!({
2325        "removedEvents": removed_events,
2326        "removedSpans": removed_spans,
2327        "stats": stats,
2328    }))
2329}
2330
2331async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
2332    let mut log = session.runtime.log.lock().await;
2333    let removed = log.clear();
2334    Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
2335}
2336
2337/// Update the per-session replan config. Wire shape mirrors the
2338/// FFI's positional `set_replan_config` arguments — the engine
2339/// crate's `ReplanConfig` struct doesn't derive Serialize, so we
2340/// reconstruct it from a flat object here.
2341async fn handle_replan_set_config(
2342    msg: &JsonRpcMessage,
2343    session: &crate::session::ClientSession,
2344) -> Result<Value, String> {
2345    let max_replans = msg
2346        .params
2347        .get("max_replans")
2348        .and_then(|v| v.as_u64())
2349        .unwrap_or(0) as u32;
2350    let delay_ms = msg
2351        .params
2352        .get("delay_ms")
2353        .and_then(|v| v.as_u64())
2354        .unwrap_or(0);
2355    let verify_before_execute = msg
2356        .params
2357        .get("verify_before_execute")
2358        .and_then(|v| v.as_bool())
2359        .unwrap_or(true);
2360    let cfg = car_engine::ReplanConfig {
2361        max_replans,
2362        delay_ms,
2363        verify_before_execute,
2364    };
2365    session.runtime.set_replan_config(cfg).await;
2366    Ok(Value::Null)
2367}
2368
2369async fn handle_skills_list(
2370    msg: &JsonRpcMessage,
2371    session: &crate::session::ClientSession,
2372) -> Result<Value, String> {
2373    let domain = msg.params.get("domain").and_then(|v| v.as_str());
2374    let engine = session.memgine.lock().await;
2375    let skills: Vec<serde_json::Value> = engine
2376        .graph
2377        .inner
2378        .node_indices()
2379        .filter_map(|nix| {
2380            let node = engine.graph.inner.node_weight(nix)?;
2381            if node.kind != car_memgine::MemKind::Skill {
2382                return None;
2383            }
2384            let meta = car_memgine::SkillMeta::from_node(node)?;
2385            if let Some(d) = domain {
2386                match &meta.scope {
2387                    car_memgine::SkillScope::Global => {}
2388                    car_memgine::SkillScope::Domain(sd) if sd == d => {}
2389                    _ => return None,
2390                }
2391            }
2392            Some(serde_json::to_value(&meta).unwrap_or_default())
2393        })
2394        .collect();
2395    serde_json::to_value(&skills).map_err(|e| e.to_string())
2396}
2397
2398#[derive(serde::Deserialize)]
2399struct SecretParams {
2400    #[serde(default)]
2401    service: Option<String>,
2402    key: String,
2403    #[serde(default)]
2404    value: Option<String>,
2405}
2406
2407fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
2408    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2409    let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
2410    car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
2411}
2412
2413fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
2414    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2415    car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
2416}
2417
2418fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
2419    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2420    car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
2421}
2422
2423fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
2424    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2425    car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
2426}
2427
2428#[derive(serde::Deserialize)]
2429struct PermParams {
2430    domain: String,
2431    #[serde(default)]
2432    target_bundle_id: Option<String>,
2433}
2434
2435fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
2436    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2437    car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
2438}
2439
2440fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
2441    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2442    car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
2443}
2444
2445fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
2446    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2447    car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
2448}
2449
2450fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
2451    #[derive(serde::Deserialize)]
2452    struct P {
2453        start: String,
2454        end: String,
2455        #[serde(default)]
2456        calendar_ids: Vec<String>,
2457    }
2458    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2459    let start = chrono::DateTime::parse_from_rfc3339(&p.start)
2460        .map_err(|e| format!("parse start: {}", e))?
2461        .with_timezone(&chrono::Utc);
2462    let end = chrono::DateTime::parse_from_rfc3339(&p.end)
2463        .map_err(|e| format!("parse end: {}", e))?
2464        .with_timezone(&chrono::Utc);
2465    car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
2466}
2467
2468fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
2469    #[derive(serde::Deserialize)]
2470    struct P {
2471        query: String,
2472        #[serde(default = "default_limit")]
2473        limit: usize,
2474        #[serde(default)]
2475        container_ids: Vec<String>,
2476    }
2477    fn default_limit() -> usize {
2478        50
2479    }
2480    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2481    car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
2482}
2483
2484fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
2485    #[derive(serde::Deserialize, Default)]
2486    struct P {
2487        #[serde(default)]
2488        account_ids: Vec<String>,
2489    }
2490    let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
2491    car_ffi_common::integrations::mail_inbox(&p.account_ids)
2492}
2493
2494fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
2495    let raw = req.params.to_string();
2496    car_ffi_common::integrations::mail_send(&raw)
2497}
2498
2499fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
2500    #[derive(serde::Deserialize)]
2501    struct P {
2502        #[serde(default = "default_limit")]
2503        limit: usize,
2504    }
2505    fn default_limit() -> usize {
2506        50
2507    }
2508    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
2509    car_ffi_common::integrations::messages_chats(p.limit)
2510}
2511
2512fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
2513    let raw = req.params.to_string();
2514    car_ffi_common::integrations::messages_send(&raw)
2515}
2516
2517fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
2518    #[derive(serde::Deserialize)]
2519    struct P {
2520        query: String,
2521        #[serde(default = "default_limit")]
2522        limit: usize,
2523    }
2524    fn default_limit() -> usize {
2525        50
2526    }
2527    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2528    car_ffi_common::integrations::notes_find(&p.query, p.limit)
2529}
2530
2531fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
2532    #[derive(serde::Deserialize)]
2533    struct P {
2534        #[serde(default = "default_limit")]
2535        limit: usize,
2536    }
2537    fn default_limit() -> usize {
2538        50
2539    }
2540    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
2541    car_ffi_common::integrations::reminders_items(p.limit)
2542}
2543
2544fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
2545    #[derive(serde::Deserialize)]
2546    struct P {
2547        #[serde(default = "default_limit")]
2548        limit: usize,
2549    }
2550    fn default_limit() -> usize {
2551        100
2552    }
2553    let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
2554    car_ffi_common::integrations::bookmarks_list(p.limit)
2555}
2556
2557fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
2558    #[derive(serde::Deserialize)]
2559    struct P {
2560        start: String,
2561        end: String,
2562    }
2563    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2564    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
2565        .map_err(|e| format!("parse start: {}", e))?
2566        .with_timezone(&chrono::Utc);
2567    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
2568        .map_err(|e| format!("parse end: {}", e))?
2569        .with_timezone(&chrono::Utc);
2570    car_ffi_common::health::sleep_windows(s, e)
2571}
2572
2573fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
2574    #[derive(serde::Deserialize)]
2575    struct P {
2576        start: String,
2577        end: String,
2578    }
2579    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2580    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
2581        .map_err(|e| format!("parse start: {}", e))?
2582        .with_timezone(&chrono::Utc);
2583    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
2584        .map_err(|e| format!("parse end: {}", e))?
2585        .with_timezone(&chrono::Utc);
2586    car_ffi_common::health::workouts(s, e)
2587}
2588
2589fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
2590    #[derive(serde::Deserialize)]
2591    struct P {
2592        start: String,
2593        end: String,
2594    }
2595    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2596    let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
2597        .map_err(|e| format!("parse start: {}", e))?;
2598    let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
2599        .map_err(|e| format!("parse end: {}", e))?;
2600    car_ffi_common::health::activity(s, e)
2601}
2602
2603async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
2604    let closed = session.browser.close().await?;
2605    Ok(serde_json::json!({"closed": closed}))
2606}
2607
2608async fn handle_browser_run(
2609    req: &JsonRpcMessage,
2610    session: &crate::session::ClientSession,
2611) -> Result<Value, String> {
2612    #[derive(serde::Deserialize)]
2613    struct BrowserRunParams {
2614        /// Inline JSON string (CLI-compatible), OR the structured object.
2615        script: Value,
2616        #[serde(default)]
2617        width: Option<u32>,
2618        #[serde(default)]
2619        height: Option<u32>,
2620        /// When true, launches a visible Chromium window for interactive
2621        /// flows (first-time auth, 2FA, supervised runs). Only honored on
2622        /// the call that first launches the browser session — subsequent
2623        /// calls reuse the existing browser regardless.
2624        #[serde(default)]
2625        headed: Option<bool>,
2626        /// Extra Chromium command-line flags appended verbatim at
2627        /// launch (#112). Honoured only on the launch call.
2628        #[serde(default)]
2629        extra_args: Option<Vec<String>>,
2630    }
2631    let params: BrowserRunParams =
2632        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2633
2634    // Accept either a JSON string OR a structured object under `script`.
2635    let script_json = match params.script {
2636        Value::String(s) => s,
2637        other => other.to_string(),
2638    };
2639
2640    let browser_session = session
2641        .browser
2642        .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
2643            width: params.width.unwrap_or(1280),
2644            height: params.height.unwrap_or(720),
2645            headless: !params.headed.unwrap_or(false),
2646            extra_args: params.extra_args.unwrap_or_default(),
2647        })
2648        .await?;
2649
2650    let trace_json = browser_session.run(&script_json).await?;
2651    serde_json::from_str(&trace_json).map_err(|e| e.to_string())
2652}
2653
2654// ---------------------------------------------------------------------------
2655// Voice streaming JSON-RPC methods
2656//
2657// Events are pushed back to the originating client as JSON-RPC notifications:
2658//   { "jsonrpc": "2.0", "method": "voice.event",
2659//     "params": { "session_id": "...", "event": {...} } }
2660//
2661// The session registry is process-wide (ServerState.voice_sessions); per-call
2662// WsVoiceEventSink instances bind each session to its originating WS so a
2663// client only ever sees events for sessions it started.
2664// ---------------------------------------------------------------------------
2665
2666#[derive(Deserialize)]
2667struct VoiceStartParams {
2668    session_id: String,
2669    audio_source: Value,
2670    #[serde(default)]
2671    options: Option<Value>,
2672}
2673
2674async fn handle_voice_transcribe_stream_start(
2675    req: &JsonRpcMessage,
2676    state: &Arc<ServerState>,
2677    session: &Arc<crate::session::ClientSession>,
2678) -> Result<Value, String> {
2679    let params: VoiceStartParams =
2680        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2681    let audio_source_json =
2682        serde_json::to_string(&params.audio_source).map_err(|e| e.to_string())?;
2683    let options_json = params
2684        .options
2685        .as_ref()
2686        .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
2687        .transpose()?;
2688    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
2689        channel: session.channel.clone(),
2690    });
2691    let json = car_ffi_common::voice::transcribe_stream_start(
2692        &params.session_id,
2693        &audio_source_json,
2694        options_json.as_deref(),
2695        state.voice_sessions.clone(),
2696        sink,
2697    )
2698    .await?;
2699    serde_json::from_str(&json).map_err(|e| e.to_string())
2700}
2701
2702#[derive(Deserialize)]
2703struct VoiceStopParams {
2704    session_id: String,
2705}
2706
2707async fn handle_voice_transcribe_stream_stop(
2708    req: &JsonRpcMessage,
2709    state: &Arc<ServerState>,
2710) -> Result<Value, String> {
2711    let params: VoiceStopParams =
2712        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2713    let json = car_ffi_common::voice::transcribe_stream_stop(
2714        &params.session_id,
2715        state.voice_sessions.clone(),
2716    )
2717    .await?;
2718    serde_json::from_str(&json).map_err(|e| e.to_string())
2719}
2720
2721#[derive(Deserialize)]
2722struct VoicePushParams {
2723    session_id: String,
2724    /// Base64-encoded 16-bit signed PCM frame. JSON-RPC is text, so binary
2725    /// audio frames have to be encoded; clients in WS-binary contexts that
2726    /// want to skip the round trip can call the FFI directly.
2727    pcm_b64: String,
2728}
2729
2730async fn handle_voice_transcribe_stream_push(
2731    req: &JsonRpcMessage,
2732    state: &Arc<ServerState>,
2733) -> Result<Value, String> {
2734    use base64::Engine;
2735    let params: VoicePushParams =
2736        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2737    let pcm = base64::engine::general_purpose::STANDARD
2738        .decode(&params.pcm_b64)
2739        .map_err(|e| format!("invalid pcm_b64: {}", e))?;
2740    let json = car_ffi_common::voice::transcribe_stream_push(
2741        &params.session_id,
2742        &pcm,
2743        state.voice_sessions.clone(),
2744    )
2745    .await?;
2746    serde_json::from_str(&json).map_err(|e| e.to_string())
2747}
2748
2749fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
2750    let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
2751    serde_json::from_str(&json).unwrap_or(Value::Null)
2752}
2753
2754async fn handle_voice_dispatch_turn(
2755    req: &JsonRpcMessage,
2756    state: &Arc<ServerState>,
2757    session: &Arc<crate::session::ClientSession>,
2758) -> Result<Value, String> {
2759    let req_value = req.params.clone();
2760    let request: car_ffi_common::voice_turn::DispatchVoiceTurnRequest =
2761        serde_json::from_value(req_value).map_err(|e| e.to_string())?;
2762    let engine = get_inference_engine(state).clone();
2763    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
2764        channel: session.channel.clone(),
2765    });
2766    let resp = car_ffi_common::voice_turn::dispatch(engine, request, sink).await?;
2767    serde_json::to_value(resp).map_err(|e| e.to_string())
2768}
2769
2770async fn handle_voice_cancel_turn() -> Result<Value, String> {
2771    car_ffi_common::voice_turn::cancel().await;
2772    Ok(serde_json::json!({"cancelled": true}))
2773}
2774
2775async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
2776    let engine = get_inference_engine(state).clone();
2777    car_ffi_common::voice_turn::prewarm(engine).await;
2778    Ok(serde_json::json!({"prewarmed": true}))
2779}
2780
2781// ---------------------------------------------------------------------------
2782// Inference runner over WebSocket — closes Parslee-ai/car-releases#24
2783//
2784// Bidirectional protocol shape:
2785//   1. Client → server: `inference.register_runner` (no params). The
2786//      session that calls this becomes the host for delegated models.
2787//   2. Server → client: `inference.runner.invoke` notification with
2788//      {call_id, request} when CAR needs to dispatch a delegated turn.
2789//   3. Client → server: `inference.runner.event` with {call_id, event}
2790//      for each chunk; `inference.runner.complete` with {call_id, result}
2791//      on success; `inference.runner.fail` with {call_id, error} on
2792//      failure.
2793//
2794// The server-side data is process-wide because only one inference
2795// runner can be registered at a time (matches the FFI bindings'
2796// constraint). The per-call mailboxes live in dedicated DashMaps.
2797// ---------------------------------------------------------------------------
2798
2799fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
2800    static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
2801        std::sync::OnceLock::new();
2802    SLOT.get_or_init(|| std::sync::RwLock::new(None))
2803}
2804
2805fn ws_runner_calls(
2806) -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
2807    static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
2808        std::sync::OnceLock::new();
2809    MAP.get_or_init(dashmap::DashMap::new)
2810}
2811
2812fn ws_runner_completions() -> &'static dashmap::DashMap<
2813    String,
2814    tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
2815> {
2816    static MAP: std::sync::OnceLock<
2817        dashmap::DashMap<
2818            String,
2819            tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
2820        >,
2821    > = std::sync::OnceLock::new();
2822    MAP.get_or_init(dashmap::DashMap::new)
2823}
2824
2825struct WsInferenceRunner;
2826
2827#[async_trait::async_trait]
2828impl car_inference::InferenceRunner for WsInferenceRunner {
2829    async fn run(
2830        &self,
2831        request: car_inference::tasks::generate::GenerateRequest,
2832        emitter: car_inference::EventEmitter,
2833    ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
2834        let channel = ws_runner_session()
2835            .read()
2836            .map_err(|e| {
2837                car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
2838            })?
2839            .clone()
2840            .ok_or_else(|| {
2841                car_inference::RunnerError::Declined(
2842                    "no WebSocket inference runner registered — call inference.register_runner first"
2843                        .into(),
2844                )
2845            })?;
2846
2847        let call_id = uuid::Uuid::new_v4().to_string();
2848        let request_json = serde_json::to_value(&request)
2849            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
2850        let (tx, rx) = tokio::sync::oneshot::channel();
2851        ws_runner_calls().insert(call_id.clone(), emitter);
2852        ws_runner_completions().insert(call_id.clone(), tx);
2853
2854        // Fire the invoke notification.
2855        use futures::SinkExt;
2856        let notification = serde_json::json!({
2857            "jsonrpc": "2.0",
2858            "method": "inference.runner.invoke",
2859            "params": {
2860                "call_id": call_id,
2861                "request": request_json,
2862            },
2863        });
2864        let text = serde_json::to_string(&notification)
2865            .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
2866        let _ = channel
2867            .write
2868            .lock()
2869            .await
2870            .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
2871            .await;
2872
2873        let result = rx.await.map_err(|_| {
2874            car_inference::RunnerError::Failed("runner completion channel dropped".into())
2875        })?;
2876        ws_runner_calls().remove(&call_id);
2877        result.map_err(car_inference::RunnerError::Failed)
2878    }
2879}
2880
2881async fn handle_inference_register_runner(
2882    session: &Arc<crate::session::ClientSession>,
2883) -> Result<Value, String> {
2884    let mut guard = ws_runner_session()
2885        .write()
2886        .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
2887    *guard = Some(session.channel.clone());
2888    drop(guard);
2889    car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
2890    Ok(serde_json::json!({"registered": true}))
2891}
2892
2893#[derive(serde::Deserialize)]
2894struct InferenceRunnerEventParams {
2895    call_id: String,
2896    event: Value,
2897}
2898
2899async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
2900    let params: InferenceRunnerEventParams =
2901        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2902    let stream_event = match parse_runner_event_value(&params.event) {
2903        Some(e) => e,
2904        None => return Err("unrecognised runner event shape".into()),
2905    };
2906    if let Some(entry) = ws_runner_calls().get(&params.call_id) {
2907        let emitter = entry.value().clone();
2908        tokio::spawn(async move { emitter.emit(stream_event).await });
2909    }
2910    Ok(serde_json::json!({"emitted": true}))
2911}
2912
2913#[derive(serde::Deserialize)]
2914struct InferenceRunnerCompleteParams {
2915    call_id: String,
2916    result: Value,
2917}
2918
2919async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
2920    let params: InferenceRunnerCompleteParams =
2921        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2922    let result: std::result::Result<car_inference::RunnerResult, String> =
2923        serde_json::from_value(params.result)
2924            .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
2925    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
2926        let _ = tx.send(result);
2927    }
2928    Ok(serde_json::json!({"completed": true}))
2929}
2930
2931#[derive(serde::Deserialize)]
2932struct InferenceRunnerFailParams {
2933    call_id: String,
2934    error: String,
2935}
2936
2937async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
2938    let params: InferenceRunnerFailParams =
2939        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2940    if let Some((_, tx)) = ws_runner_completions().remove(&params.call_id) {
2941        let _ = tx.send(Err(params.error));
2942    }
2943    Ok(serde_json::json!({"failed": true}))
2944}
2945
2946fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
2947    let ty = v.get("type").and_then(|t| t.as_str())?;
2948    match ty {
2949        "text" => Some(car_inference::StreamEvent::TextDelta(
2950            v.get("data")?.as_str()?.to_string(),
2951        )),
2952        "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
2953            name: v.get("name")?.as_str()?.to_string(),
2954            index: v.get("index")?.as_u64()? as usize,
2955            id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
2956        }),
2957        "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
2958            index: v.get("index")?.as_u64()? as usize,
2959            arguments_delta: v.get("data")?.as_str()?.to_string(),
2960        }),
2961        "usage" => Some(car_inference::StreamEvent::Usage {
2962            input_tokens: v.get("input_tokens")?.as_u64()?,
2963            output_tokens: v.get("output_tokens")?.as_u64()?,
2964        }),
2965        "done" => Some(car_inference::StreamEvent::Done {
2966            text: v.get("text")?.as_str()?.to_string(),
2967            tool_calls: v
2968                .get("tool_calls")
2969                .and_then(|tc| serde_json::from_value(tc.clone()).ok())
2970                .unwrap_or_default(),
2971        }),
2972        _ => None,
2973    }
2974}
2975
2976#[derive(Deserialize)]
2977struct EnrollSpeakerParams {
2978    label: String,
2979    audio: Value,
2980}
2981
2982async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
2983    let params: EnrollSpeakerParams =
2984        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2985    let audio_json = serde_json::to_string(&params.audio).map_err(|e| e.to_string())?;
2986    let json = car_ffi_common::voice::enroll_speaker(&params.label, &audio_json).await?;
2987    serde_json::from_str(&json).map_err(|e| e.to_string())
2988}
2989
2990#[derive(Deserialize)]
2991struct RemoveEnrollmentParams {
2992    label: String,
2993}
2994
2995fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
2996    let params: RemoveEnrollmentParams =
2997        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2998    let json = car_ffi_common::voice::remove_enrollment(&params.label)?;
2999    serde_json::from_str(&json).map_err(|e| e.to_string())
3000}
3001
3002#[derive(Deserialize)]
3003struct WorkflowRunParams {
3004    workflow: Value,
3005}
3006
3007async fn handle_workflow_run(
3008    req: &JsonRpcMessage,
3009    session: &Arc<crate::session::ClientSession>,
3010) -> Result<Value, String> {
3011    let params: WorkflowRunParams =
3012        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3013    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
3014    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3015        channel: session.channel.clone(),
3016        host: session.host.clone(),
3017        client_id: session.client_id.clone(),
3018    });
3019    let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
3020    serde_json::from_str(&json).map_err(|e| e.to_string())
3021}
3022
3023#[derive(Deserialize)]
3024struct WorkflowVerifyParams {
3025    workflow: Value,
3026}
3027
3028fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
3029    let params: WorkflowVerifyParams =
3030        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3031    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
3032    let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
3033    serde_json::from_str(&json).map_err(|e| e.to_string())
3034}
3035
3036// ---------------------------------------------------------------------------
3037// Meeting JSON-RPC methods
3038// ---------------------------------------------------------------------------
3039
3040async fn handle_meeting_start(
3041    req: &JsonRpcMessage,
3042    state: &Arc<ServerState>,
3043    session: &Arc<crate::session::ClientSession>,
3044) -> Result<Value, String> {
3045    // We need the meeting id BEFORE handing the upstream sink to
3046    // start_meeting so the WsMemgineIngestSink stamps transcripts with
3047    // the correct `meeting/<id>/<source>` speaker. Parse the request
3048    // here, mint an id if none was provided, and pass the same id
3049    // through to start_meeting via the request JSON.
3050    let mut req_value = req.params.clone();
3051    let meeting_id = req_value
3052        .get("id")
3053        .and_then(|v| v.as_str())
3054        .map(str::to_string)
3055        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
3056    if let Some(map) = req_value.as_object_mut() {
3057        map.insert("id".into(), Value::String(meeting_id.clone()));
3058    }
3059    let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
3060
3061    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
3062        Arc::new(crate::session::WsVoiceEventSink {
3063            channel: session.channel.clone(),
3064        });
3065
3066    // Wrap the WS upstream with a memgine-ingest fanout that uses the
3067    // tokio::sync::Mutex-wrapped session memgine. We pass `None` for
3068    // the FFI-common `start_meeting` memgine arg to avoid the
3069    // sync-mutex contract there — ingest happens here instead.
3070    let upstream: Arc<dyn car_voice::VoiceEventSink> =
3071        Arc::new(crate::session::WsMemgineIngestSink {
3072            meeting_id,
3073            engine: session.memgine.clone(),
3074            upstream: ws_upstream,
3075        });
3076
3077    let cwd = std::env::current_dir().ok();
3078    let json = car_ffi_common::meeting::start_meeting(
3079        &request_json,
3080        state.meetings.clone(),
3081        state.voice_sessions.clone(),
3082        upstream,
3083        None,
3084        cwd,
3085    )
3086    .await?;
3087    serde_json::from_str(&json).map_err(|e| e.to_string())
3088}
3089
3090#[derive(Deserialize)]
3091struct MeetingStopParams {
3092    meeting_id: String,
3093    #[serde(default = "default_summarize")]
3094    summarize: bool,
3095}
3096
3097fn default_summarize() -> bool {
3098    true
3099}
3100
3101async fn handle_meeting_stop(
3102    req: &JsonRpcMessage,
3103    state: &Arc<ServerState>,
3104    _session: &Arc<crate::session::ClientSession>,
3105) -> Result<Value, String> {
3106    let params: MeetingStopParams =
3107        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3108    let inference = if params.summarize {
3109        Some(state.inference.get().cloned()).flatten()
3110    } else {
3111        None
3112    };
3113    let json = car_ffi_common::meeting::stop_meeting(
3114        &params.meeting_id,
3115        params.summarize,
3116        state.meetings.clone(),
3117        state.voice_sessions.clone(),
3118        inference,
3119    )
3120    .await?;
3121    serde_json::from_str(&json).map_err(|e| e.to_string())
3122}
3123
3124#[derive(Deserialize, Default)]
3125struct MeetingListParams {
3126    #[serde(default)]
3127    root: Option<std::path::PathBuf>,
3128}
3129
3130fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
3131    let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
3132    let cwd = std::env::current_dir().ok();
3133    let json = car_ffi_common::meeting::list_meetings(params.root, cwd)?;
3134    serde_json::from_str(&json).map_err(|e| e.to_string())
3135}
3136
3137#[derive(Deserialize)]
3138struct MeetingGetParams {
3139    meeting_id: String,
3140    #[serde(default)]
3141    root: Option<std::path::PathBuf>,
3142}
3143
3144fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
3145    let params: MeetingGetParams =
3146        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3147    let cwd = std::env::current_dir().ok();
3148    let json = car_ffi_common::meeting::get_meeting(&params.meeting_id, params.root, cwd)?;
3149    serde_json::from_str(&json).map_err(|e| e.to_string())
3150}
3151
3152// ---------------------------------------------------------------------------
3153// Agent registry — file-based cross-process discovery (#111)
3154// ---------------------------------------------------------------------------
3155
3156#[derive(Deserialize, Default)]
3157struct RegistryRegisterParams {
3158    /// Caller serializes their AgentEntry as a JSON value; we
3159    /// re-serialize it so the ffi-common helper can validate the
3160    /// shape with the same parser used by the bindings.
3161    entry: Value,
3162    #[serde(default)]
3163    registry_path: Option<std::path::PathBuf>,
3164}
3165
3166fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
3167    let params: RegistryRegisterParams =
3168        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3169    let entry_json = serde_json::to_string(&params.entry).map_err(|e| e.to_string())?;
3170    car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
3171    Ok(Value::Null)
3172}
3173
3174#[derive(Deserialize, Default)]
3175struct RegistryNameParams {
3176    name: String,
3177    #[serde(default)]
3178    registry_path: Option<std::path::PathBuf>,
3179}
3180
3181fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
3182    let params: RegistryNameParams =
3183        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3184    let json = car_ffi_common::registry::agent_heartbeat(&params.name, params.registry_path)?;
3185    serde_json::from_str(&json).map_err(|e| e.to_string())
3186}
3187
3188fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
3189    let params: RegistryNameParams =
3190        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3191    car_ffi_common::registry::unregister_agent(&params.name, params.registry_path)?;
3192    Ok(Value::Null)
3193}
3194
3195#[derive(Deserialize, Default)]
3196struct RegistryListParams {
3197    #[serde(default)]
3198    registry_path: Option<std::path::PathBuf>,
3199}
3200
3201fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
3202    let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
3203    let json = car_ffi_common::registry::list_agents(params.registry_path)?;
3204    serde_json::from_str(&json).map_err(|e| e.to_string())
3205}
3206
3207#[derive(Deserialize, Default)]
3208struct RegistryReapParams {
3209    /// Heartbeats older than this many seconds are reaped. Default
3210    /// 60 — two missed 20s heartbeats trigger removal.
3211    #[serde(default = "default_reap_age")]
3212    max_age_secs: u64,
3213    #[serde(default)]
3214    registry_path: Option<std::path::PathBuf>,
3215}
3216
3217fn default_reap_age() -> u64 {
3218    60
3219}
3220
3221fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
3222    let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
3223    let json =
3224        car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
3225    serde_json::from_str(&json).map_err(|e| e.to_string())
3226}
3227
3228// ---------------------------------------------------------------------------
3229// car-a2a server lifecycle (mirrors NAPI startA2aServer / stopA2aServer /
3230// a2aServerStatus and PyO3 start_a2a_server / stop_a2a_server /
3231// a2a_server_status — closes the binding gap noted in #126).
3232// ---------------------------------------------------------------------------
3233
3234async fn handle_a2a_start(req: &JsonRpcMessage) -> Result<Value, String> {
3235    let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3236    let json = car_ffi_common::a2a::start_a2a(&params_json).await?;
3237    serde_json::from_str(&json).map_err(|e| e.to_string())
3238}
3239
3240fn handle_a2a_stop() -> Result<Value, String> {
3241    let json = car_ffi_common::a2a::stop_a2a()?;
3242    serde_json::from_str(&json).map_err(|e| e.to_string())
3243}
3244
3245fn handle_a2a_status() -> Result<Value, String> {
3246    let json = car_ffi_common::a2a::a2a_status()?;
3247    serde_json::from_str(&json).map_err(|e| e.to_string())
3248}
3249
3250#[derive(Deserialize)]
3251#[serde(rename_all = "camelCase")]
3252struct A2aSendParams {
3253    endpoint: String,
3254    message: car_a2a::Message,
3255    #[serde(default)]
3256    blocking: bool,
3257    #[serde(default = "default_true")]
3258    ingest_a2ui: bool,
3259    #[serde(default)]
3260    route_auth: Option<A2aRouteAuth>,
3261    #[serde(default)]
3262    allow_untrusted_endpoint: bool,
3263}
3264
3265fn default_true() -> bool {
3266    true
3267}
3268
3269async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
3270    let params: A2aSendParams =
3271        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3272    let endpoint = trusted_route_endpoint(
3273        Some(params.endpoint.clone()),
3274        params.allow_untrusted_endpoint,
3275    )
3276    .ok_or_else(|| {
3277        "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
3278    })?;
3279    let client = match params.route_auth.clone() {
3280        Some(auth) => {
3281            car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
3282        }
3283        None => car_a2a::A2aClient::new(endpoint.clone()),
3284    };
3285    let result = client
3286        .send_message(params.message, params.blocking)
3287        .await
3288        .map_err(|e| e.to_string())?;
3289    let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3290    let mut applied = Vec::new();
3291    if params.ingest_a2ui {
3292        state
3293            .a2ui
3294            .validate_payload(&result_value)
3295            .map_err(|e| e.to_string())?;
3296        let routed_endpoint = Some(endpoint.clone());
3297        for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
3298            let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
3299                if owner.endpoint.is_none() {
3300                    owner.with_endpoint(routed_endpoint.clone())
3301                } else {
3302                    owner
3303                }
3304            });
3305            applied.push(
3306                apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
3307            );
3308        }
3309    }
3310    Ok(serde_json::json!({
3311        "result": result,
3312        "a2ui": {
3313            "applied": applied,
3314        }
3315    }))
3316}
3317
3318// ---------------------------------------------------------------------------
3319// macOS automation — AppleScript + Shortcuts (car-automation), Vision OCR
3320// (car-vision). Mirrors NAPI runApplescript / listShortcuts / runShortcut /
3321// visionOcr and PyO3 run_applescript / list_shortcuts / run_shortcut /
3322// vision_ocr.
3323// ---------------------------------------------------------------------------
3324
3325async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
3326    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3327    let json = car_ffi_common::automation::run_applescript(&args_json).await?;
3328    serde_json::from_str(&json).map_err(|e| e.to_string())
3329}
3330
3331async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
3332    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3333    let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
3334    serde_json::from_str(&json).map_err(|e| e.to_string())
3335}
3336
3337async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
3338    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3339    let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
3340    serde_json::from_str(&json).map_err(|e| e.to_string())
3341}
3342
3343async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
3344    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3345    let json = car_ffi_common::vision::ocr(&args_json).await?;
3346    serde_json::from_str(&json).map_err(|e| e.to_string())
3347}