Skip to main content

car_server_core/
handler.rs

1//! WebSocket connection handler — bidirectional JSON-RPC.
2//!
3//! Tool callback flow:
4//! 1. Client submits proposal via proposal.submit
5//! 2. Runtime encounters a ToolCall action
6//! 3. WsToolExecutor sends tools.execute request to client via shared write half
7//! 4. WsToolExecutor awaits response on a oneshot channel
8//! 5. Client executes tool locally, sends JSON-RPC response back
9//! 6. Handler receives the response, resolves the oneshot
10//! 7. Runtime continues execution with the tool result
11
12use crate::session::{ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30    #[serde(default)]
31    pub jsonrpc: String,
32    #[serde(default)]
33    pub method: Option<String>,
34    #[serde(default)]
35    pub params: Value,
36    #[serde(default)]
37    pub id: Value,
38    // Response fields
39    #[serde(default)]
40    pub result: Option<Value>,
41    #[serde(default)]
42    pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47    pub jsonrpc: &'static str,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub result: Option<Value>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub error: Option<JsonRpcError>,
52    pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57    pub code: i32,
58    pub message: String,
59}
60
61impl JsonRpcResponse {
62    pub fn success(id: Value, result: Value) -> Self {
63        Self {
64            jsonrpc: "2.0",
65            result: Some(result),
66            error: None,
67            id,
68        }
69    }
70    pub fn error(id: Value, code: i32, message: &str) -> Self {
71        Self {
72            jsonrpc: "2.0",
73            result: None,
74            error: Some(JsonRpcError {
75                code,
76                message: message.to_string(),
77            }),
78            id,
79        }
80    }
81}
82
83/// Convenience wrapper for the standalone `car-server` binary: accepts
84/// the WebSocket handshake on a raw [`TcpStream`] then delegates to
85/// [`run_dispatch`]. Embedders that already have a handshake-completed
86/// `WebSocketStream` (e.g. the future tokhn-daemon) skip this and
87/// call `run_dispatch` directly with the split halves.
88#[instrument(
89    name = "ws.connection",
90    skip_all,
91    fields(peer = %peer),
92)]
93pub async fn handle_connection(
94    stream: TcpStream,
95    peer: SocketAddr,
96    state: Arc<ServerState>,
97) -> Result<(), Box<dyn std::error::Error>> {
98    let ws_stream = accept_async(stream).await?;
99    let (write, read) = ws_stream.split();
100    run_dispatch(read, write, peer, state).await
101}
102
103/// Transport-neutral entry point: drives the JSON-RPC dispatch loop
104/// against an already-handshake-completed split WebSocket. Both the
105/// standalone car-server binary and embedders (e.g. tokhn-daemon at
106/// U7) call this same function.
107///
108/// Concrete typing reflects today's pipeline (`tokio-tungstenite`
109/// over [`TcpStream`]). U7 is responsible for any further generalization
110/// — for example, by opening an axum WebSocket via tungstenite-native
111/// or by introducing a sink/stream wrapper that adapts axum's WS to
112/// these concrete types.
113#[instrument(
114    name = "ws.dispatch",
115    skip_all,
116    fields(client_id = tracing::field::Empty, peer = %peer),
117)]
118pub async fn run_dispatch(
119    mut read: futures::stream::SplitStream<
120        tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
121    >,
122    write: futures::stream::SplitSink<
123        tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
124        Message,
125    >,
126    peer: SocketAddr,
127    state: Arc<ServerState>,
128) -> Result<(), Box<dyn std::error::Error>> {
129    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
130    tracing::Span::current().record("client_id", &client_id.as_str());
131
132    info!("New connection from {}", peer);
133
134    let channel = Arc::new(WsChannel {
135        write: Mutex::new(write),
136        pending: Mutex::new(HashMap::new()),
137        next_id: AtomicU64::new(1),
138    });
139
140    let session = state.create_session(&client_id, channel.clone()).await;
141
142    while let Some(msg) = read.next().await {
143        let msg = msg?;
144        if msg.is_text() {
145            let text = msg.to_text()?;
146            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
147                Ok(m) => m,
148                Err(e) => {
149                    send_response(
150                        &session.channel,
151                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
152                    )
153                    .await?;
154                    continue;
155                }
156            };
157
158            // Is this a response to a pending tool callback?
159            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
160                if let Some(id_str) = parsed.id.as_str() {
161                    let mut pending = session.channel.pending.lock().await;
162                    if let Some(tx) = pending.remove(id_str) {
163                        let tool_resp = if let Some(result) = parsed.result {
164                            ToolExecuteResponse {
165                                action_id: id_str.to_string(),
166                                output: Some(result),
167                                error: None,
168                            }
169                        } else {
170                            let err_msg = parsed
171                                .error
172                                .as_ref()
173                                .and_then(|e| e.get("message"))
174                                .and_then(|m| m.as_str())
175                                .unwrap_or("unknown error")
176                                .to_string();
177                            ToolExecuteResponse {
178                                action_id: id_str.to_string(),
179                                output: None,
180                                error: Some(err_msg),
181                            }
182                        };
183                        let _ = tx.send(tool_resp);
184                        continue;
185                    }
186                }
187            }
188
189            // Otherwise it's a client request
190            if let Some(method) = &parsed.method {
191                info!(method = %method, "dispatching JSON-RPC method");
192                let result = match method.as_str() {
193                    "session.init" => handle_session_init(&parsed, &session).await,
194                    "host.subscribe" => handle_host_subscribe(&session).await,
195                    "host.agents" => handle_host_agents(&session).await,
196                    "host.events" => handle_host_events(&parsed, &session).await,
197                    "host.approvals" => handle_host_approvals(&session).await,
198                    "host.register_agent" => handle_host_register_agent(&parsed, &session).await,
199                    "host.unregister_agent" => {
200                        handle_host_unregister_agent(&parsed, &session).await
201                    }
202                    "host.set_status" => handle_host_set_status(&parsed, &session).await,
203                    "host.notify" => handle_host_notify(&parsed, &session).await,
204                    "host.request_approval" => {
205                        handle_host_request_approval(&parsed, &session).await
206                    }
207                    "host.resolve_approval" => {
208                        handle_host_resolve_approval(&parsed, &session).await
209                    }
210                    "tools.register" => handle_tools_register(&parsed, &session).await,
211                    "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
212                    "verify" => handle_verify(&parsed, &session).await,
213                    "state.get" => handle_state_get(&parsed, &session).await,
214                    "state.set" => handle_state_set(&parsed, &session).await,
215                    "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
216                    "memory.query" => handle_memory_query(&parsed, &session).await,
217                    "memory.build_context" => handle_memory_build_context(&parsed, &session).await,
218                    "memory.consolidate" => handle_memory_consolidate(&session).await,
219                    "memory.fact_count" => handle_memory_fact_count(&session).await,
220                    "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
221                    "skill.find" => handle_skill_find(&parsed, &session).await,
222                    "skill.report" => handle_skill_report(&parsed, &session).await,
223                    "skill.repair" => handle_skill_repair(&parsed, &session).await,
224                    "skills.ingest_distilled" => {
225                        handle_skills_ingest_distilled(&parsed, &session).await
226                    }
227                    "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
228                    "skills.domains_needing_evolution" => {
229                        handle_skills_domains_needing_evolution(&parsed, &session).await
230                    }
231                    "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
232                    "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
233                    "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
234                    "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
235                    "multi.vote" => handle_multi_vote(&parsed, &session).await,
236                    "scheduler.create" => handle_scheduler_create(&parsed),
237                    "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
238                    "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
239                    "infer" => handle_infer(&parsed, &state, &session).await,
240                    "embed" => handle_embed(&parsed, &state).await,
241                    "classify" => handle_classify(&parsed, &state).await,
242                    "tokenize" => handle_tokenize(&parsed, &state).await,
243                    "detokenize" => handle_detokenize(&parsed, &state).await,
244                    "rerank" => handle_rerank(&parsed, &state).await,
245                    "transcribe" => handle_transcribe(&parsed, &state).await,
246                    "synthesize" => handle_synthesize(&parsed, &state).await,
247                    "speech.prepare" => handle_speech_prepare(&state).await,
248                    "models.route" => handle_models_route(&parsed, &state).await,
249                    "models.stats" => handle_models_stats(&state).await,
250                    "events.count" => handle_events_count(&session).await,
251                    "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
252                    "models.list" => handle_models_list(&state),
253                    "models.list_unified" => handle_models_list_unified(&state),
254                    "models.pull" => handle_models_pull(&parsed, &state).await,
255                    "skills.distill" => handle_skills_distill(&parsed, &state).await,
256                    "skills.list" => handle_skills_list(&parsed, &session).await,
257                    "browser.run" => handle_browser_run(&parsed, &session).await,
258                    "browser.close" => handle_browser_close(&session).await,
259                    "secret.put" => handle_secret_put(&parsed),
260                    "secret.get" => handle_secret_get(&parsed),
261                    "secret.delete" => handle_secret_delete(&parsed),
262                    "secret.status" => handle_secret_status(&parsed),
263                    "secret.available" => Ok(car_ffi_common::secrets::is_available()),
264                    "permissions.status" => handle_perm_status(&parsed),
265                    "permissions.request" => handle_perm_request(&parsed),
266                    "permissions.explain" => handle_perm_explain(&parsed),
267                    "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
268                    "accounts.list" => car_ffi_common::accounts::list(),
269                    "accounts.open" => {
270                        #[derive(serde::Deserialize, Default)]
271                        struct OpenParams {
272                            #[serde(default)]
273                            account_id: Option<String>,
274                        }
275                        let p: OpenParams =
276                            serde_json::from_value(parsed.params.clone()).unwrap_or_default();
277                        car_ffi_common::accounts::open_settings(p.account_id.as_deref())
278                    }
279                    "calendar.list" => car_ffi_common::integrations::calendar_list(),
280                    "calendar.events" => handle_calendar_events(&parsed),
281                    "contacts.containers" => car_ffi_common::integrations::contacts_containers(),
282                    "contacts.find" => handle_contacts_find(&parsed),
283                    "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
284                    "mail.inbox" => handle_mail_inbox(&parsed),
285                    "mail.send" => handle_mail_send(&parsed),
286                    "health.status" => car_ffi_common::health::status(),
287                    "health.sleep" => handle_health_sleep(&parsed),
288                    "health.workouts" => handle_health_workouts(&parsed),
289                    "health.activity" => handle_health_activity(&parsed),
290                    "voice.transcribe_stream.start" => {
291                        handle_voice_transcribe_stream_start(&parsed, &state, &session).await
292                    }
293                    "voice.transcribe_stream.stop" => {
294                        handle_voice_transcribe_stream_stop(&parsed, &state).await
295                    }
296                    "voice.transcribe_stream.push" => {
297                        handle_voice_transcribe_stream_push(&parsed, &state).await
298                    }
299                    "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
300                    "voice.providers.list" => {
301                        // Stateless: enumerates STT/TTS providers compiled into
302                        // this build. Runtime readiness (API key, permission,
303                        // model download) is reported via per-provider errors.
304                        serde_json::from_str::<serde_json::Value>(
305                            &car_voice::list_voice_providers_json(),
306                        )
307                        .map_err(|e| e.to_string())
308                    }
309                    "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
310                        .await
311                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
312                    "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
313                        .await
314                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
315                    "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
316                    "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
317                        .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
318                    "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
319                    "workflow.run" => handle_workflow_run(&parsed, &session).await,
320                    "workflow.verify" => handle_workflow_verify(&parsed),
321                    "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
322                    "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
323                    "meeting.list" => handle_meeting_list(&parsed),
324                    "meeting.get" => handle_meeting_get(&parsed),
325                    "registry.register" => handle_registry_register(&parsed),
326                    "registry.heartbeat" => handle_registry_heartbeat(&parsed),
327                    "registry.unregister" => handle_registry_unregister(&parsed),
328                    "registry.list" => handle_registry_list(&parsed),
329                    "registry.reap" => handle_registry_reap(&parsed),
330                    "admission.status" => handle_admission_status(&state),
331                    "a2a.start" => handle_a2a_start(&parsed).await,
332                    "a2a.stop" => handle_a2a_stop(),
333                    "a2a.status" => handle_a2a_status(),
334                    "automation.run_applescript" => handle_run_applescript(&parsed).await,
335                    "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
336                    "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
337                    "vision.ocr" => handle_vision_ocr(&parsed).await,
338                    _ => Err(format!("unknown method: {}", method)),
339                };
340
341                let resp = match result {
342                    Ok(value) => JsonRpcResponse::success(parsed.id, value),
343                    Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
344                };
345                send_response(&session.channel, resp).await?;
346            }
347        } else if msg.is_close() {
348            info!("Client {} disconnected", client_id);
349            break;
350        }
351    }
352
353    session.host.unsubscribe(&client_id).await;
354
355    // Fix for MULTI-4 / WS-3: drop the session from the registry and
356    // drain any pending tool callbacks. Without this, every connection
357    // we ever accepted keeps an `Arc<ClientSession>` alive in
358    // `state.sessions`, and outstanding `oneshot::Sender`s in
359    // `session.channel.pending` outlive the closed connection until
360    // their per-call timeout (60s). Dropping the senders here causes any
361    // awaiting `recv()` in `WsToolExecutor::execute` to return
362    // `RecvError` immediately, which the existing error-handler path
363    // already maps to "callback channel closed" — same shape as the
364    // timeout path, just faster.
365    let _removed = state.remove_session(&client_id).await;
366    {
367        let mut pending = session.channel.pending.lock().await;
368        pending.clear();
369    }
370
371    Ok(())
372}
373
374async fn send_response(
375    channel: &WsChannel,
376    resp: JsonRpcResponse,
377) -> Result<(), Box<dyn std::error::Error>> {
378    use futures::SinkExt;
379    let json = serde_json::to_string(&resp)?;
380    channel
381        .write
382        .lock()
383        .await
384        .send(Message::Text(json.into()))
385        .await?;
386    Ok(())
387}
388
389// --- Request handlers ---
390
391async fn handle_host_subscribe(session: &crate::session::ClientSession) -> Result<Value, String> {
392    session
393        .host
394        .subscribe(&session.client_id, session.channel.clone())
395        .await;
396    serde_json::to_value(HostSnapshot {
397        subscribed: true,
398        agents: session.host.agents().await,
399        approvals: session.host.approvals().await,
400        events: session.host.events(50).await,
401    })
402    .map_err(|e| e.to_string())
403}
404
405async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
406    serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
407}
408
409async fn handle_host_events(
410    req: &JsonRpcMessage,
411    session: &crate::session::ClientSession,
412) -> Result<Value, String> {
413    let limit = req
414        .params
415        .get("limit")
416        .and_then(|v| v.as_u64())
417        .unwrap_or(100) as usize;
418    serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
419}
420
421async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
422    serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
423}
424
425async fn handle_host_register_agent(
426    req: &JsonRpcMessage,
427    session: &crate::session::ClientSession,
428) -> Result<Value, String> {
429    let request: RegisterHostAgentRequest =
430        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
431    serde_json::to_value(
432        session
433            .host
434            .register_agent(&session.client_id, request)
435            .await?,
436    )
437    .map_err(|e| e.to_string())
438}
439
440async fn handle_host_unregister_agent(
441    req: &JsonRpcMessage,
442    session: &crate::session::ClientSession,
443) -> Result<Value, String> {
444    let agent_id = req
445        .params
446        .get("agent_id")
447        .and_then(|v| v.as_str())
448        .ok_or("missing agent_id")?;
449    session.host.unregister_agent(agent_id).await?;
450    Ok(serde_json::json!({"ok": true}))
451}
452
453async fn handle_host_set_status(
454    req: &JsonRpcMessage,
455    session: &crate::session::ClientSession,
456) -> Result<Value, String> {
457    let request: SetHostAgentStatusRequest =
458        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
459    serde_json::to_value(session.host.set_status(request).await?).map_err(|e| e.to_string())
460}
461
462async fn handle_host_notify(
463    req: &JsonRpcMessage,
464    session: &crate::session::ClientSession,
465) -> Result<Value, String> {
466    let kind = req
467        .params
468        .get("kind")
469        .and_then(|v| v.as_str())
470        .unwrap_or("host.notification");
471    let agent_id = req
472        .params
473        .get("agent_id")
474        .and_then(|v| v.as_str())
475        .map(str::to_string);
476    let message = req
477        .params
478        .get("message")
479        .and_then(|v| v.as_str())
480        .unwrap_or("");
481    let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
482    serde_json::to_value(
483        session
484            .host
485            .record_event(kind, agent_id, message, payload)
486            .await,
487    )
488    .map_err(|e| e.to_string())
489}
490
491async fn handle_host_request_approval(
492    req: &JsonRpcMessage,
493    session: &crate::session::ClientSession,
494) -> Result<Value, String> {
495    let request: CreateHostApprovalRequest =
496        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
497    if let Some(agent_id) = &request.agent_id {
498        let _ = session
499            .host
500            .set_status(SetHostAgentStatusRequest {
501                agent_id: agent_id.clone(),
502                status: HostAgentStatus::WaitingForApproval,
503                current_task: None,
504                message: Some("Waiting for approval".to_string()),
505                payload: Value::Null,
506            })
507            .await;
508    }
509    serde_json::to_value(session.host.create_approval(request).await?).map_err(|e| e.to_string())
510}
511
512async fn handle_host_resolve_approval(
513    req: &JsonRpcMessage,
514    session: &crate::session::ClientSession,
515) -> Result<Value, String> {
516    let request: ResolveHostApprovalRequest =
517        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
518    serde_json::to_value(session.host.resolve_approval(request).await?).map_err(|e| e.to_string())
519}
520
521async fn handle_session_init(
522    req: &JsonRpcMessage,
523    session: &crate::session::ClientSession,
524) -> Result<Value, String> {
525    let init: SessionInitRequest =
526        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
527
528    for tool in &init.tools {
529        register_from_definition(&session.runtime, tool).await;
530    }
531
532    let mut policy_count = 0;
533    {
534        let mut policies = session.runtime.policies.write().await;
535        for policy_def in &init.policies {
536            if let Some(check) = build_policy_check(policy_def) {
537                policies.register(&policy_def.name, check, "");
538                policy_count += 1;
539            }
540        }
541    }
542
543    serde_json::to_value(SessionInitResponse {
544        session_id: session.client_id.clone(),
545        tools_registered: init.tools.len(),
546        policies_registered: policy_count,
547    })
548    .map_err(|e| e.to_string())
549}
550
551fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
552    match def.rule.as_str() {
553        "deny_tool" => {
554            let target = def.target.clone();
555            Some(Box::new(
556                move |action: &car_ir::Action, _: &car_state::StateStore| {
557                    if action.tool.as_deref() == Some(&target) {
558                        Some(format!("tool '{}' denied", target))
559                    } else {
560                        None
561                    }
562                },
563            ))
564        }
565        "require_state" => {
566            let key = def.key.clone();
567            let value = def.value.clone();
568            Some(Box::new(
569                move |_: &car_ir::Action, state: &car_state::StateStore| {
570                    if state.get(&key).as_ref() != Some(&value) {
571                        Some(format!("state['{}'] must be {:?}", key, value))
572                    } else {
573                        None
574                    }
575                },
576            ))
577        }
578        "deny_tool_param" => {
579            let target = def.target.clone();
580            let param = def.key.clone();
581            let pattern = def.pattern.clone();
582            Some(Box::new(
583                move |action: &car_ir::Action, _: &car_state::StateStore| {
584                    if action.tool.as_deref() != Some(&target) {
585                        return None;
586                    }
587                    if let Some(val) = action.parameters.get(&param) {
588                        let s = val.as_str().unwrap_or(&val.to_string()).to_string();
589                        if s.contains(&pattern) {
590                            return Some(format!("param '{}' matches '{}'", param, pattern));
591                        }
592                    }
593                    None
594                },
595            ))
596        }
597        _ => None,
598    }
599}
600
601async fn handle_tools_register(
602    req: &JsonRpcMessage,
603    session: &crate::session::ClientSession,
604) -> Result<Value, String> {
605    let tools: Vec<ToolDefinition> =
606        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
607    for tool in &tools {
608        register_from_definition(&session.runtime, tool).await;
609    }
610    Ok(Value::from(tools.len()))
611}
612
613/// Bridge a wire-protocol `ToolDefinition` to the engine's
614/// schema-aware registration. Carries the full ToolSchema shape
615/// (description, parameters, returns, idempotency, caching, rate
616/// limit) through to the validator. An empty `parameters` object is
617/// the legacy schemaless registration — the validator no-ops for
618/// those, so pre-v0.5.x callers see no change.
619async fn register_from_definition(
620    runtime: &car_engine::Runtime,
621    def: &ToolDefinition,
622) {
623    runtime
624        .register_tool_schema(car_ir::ToolSchema {
625            name: def.name.clone(),
626            description: def.description.clone(),
627            parameters: def.parameters.clone(),
628            returns: def.returns.clone(),
629            idempotent: def.idempotent,
630            cache_ttl_secs: def.cache_ttl_secs,
631            rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
632                max_calls: rl.max_calls,
633                interval_secs: rl.interval_secs,
634            }),
635        })
636        .await;
637}
638
639async fn handle_proposal_submit(
640    req: &JsonRpcMessage,
641    session: &crate::session::ClientSession,
642) -> Result<Value, String> {
643    let submit: ProposalSubmitRequest =
644        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
645    let result = session.runtime.execute(&submit.proposal).await;
646    serde_json::to_value(result).map_err(|e| e.to_string())
647}
648
649async fn handle_verify(
650    req: &JsonRpcMessage,
651    session: &crate::session::ClientSession,
652) -> Result<Value, String> {
653    let vr: VerifyRequest =
654        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
655    let tools: std::collections::HashSet<String> =
656        session.runtime.tools.read().await.keys().cloned().collect();
657    let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
658    serde_json::to_value(VerifyResponse {
659        valid: result.valid,
660        issues: result
661            .issues
662            .iter()
663            .map(|i| VerifyIssueProto {
664                action_id: i.action_id.clone(),
665                severity: i.severity.clone(),
666                message: i.message.clone(),
667            })
668            .collect(),
669        simulated_state: result.simulated_state,
670    })
671    .map_err(|e| e.to_string())
672}
673
674async fn handle_state_get(
675    req: &JsonRpcMessage,
676    session: &crate::session::ClientSession,
677) -> Result<Value, String> {
678    let key = req
679        .params
680        .get("key")
681        .and_then(|v| v.as_str())
682        .ok_or("missing 'key'")?;
683    Ok(session.runtime.state.get(key).unwrap_or(Value::Null))
684}
685
686async fn handle_state_set(
687    req: &JsonRpcMessage,
688    session: &crate::session::ClientSession,
689) -> Result<Value, String> {
690    let key = req
691        .params
692        .get("key")
693        .and_then(|v| v.as_str())
694        .ok_or("missing 'key'")?;
695    let value = req.params.get("value").cloned().unwrap_or(Value::Null);
696    session.runtime.state.set(key, value, "client");
697    Ok(Value::from("ok"))
698}
699
700// --- Memory handlers ---
701
702/// `memory.fact_count` — return `valid_fact_count()` of the
703/// session's memgine. Used by FFI bindings to mirror their
704/// embedded `fact_count()` accessor without round-tripping a full
705/// query. No params.
706async fn handle_memory_fact_count(
707    session: &crate::session::ClientSession,
708) -> Result<Value, String> {
709    let engine = session.memgine.lock().await;
710    Ok(Value::from(engine.valid_fact_count()))
711}
712
713async fn handle_memory_add_fact(
714    req: &JsonRpcMessage,
715    session: &crate::session::ClientSession,
716) -> Result<Value, String> {
717    let subject = req
718        .params
719        .get("subject")
720        .and_then(|v| v.as_str())
721        .ok_or("missing subject")?;
722    let body = req
723        .params
724        .get("body")
725        .and_then(|v| v.as_str())
726        .ok_or("missing body")?;
727    let kind = req
728        .params
729        .get("kind")
730        .and_then(|v| v.as_str())
731        .unwrap_or("pattern");
732    let mut engine = session.memgine.lock().await;
733    let fid = format!("ws-{}", engine.valid_fact_count());
734    engine.ingest_fact(
735        &fid,
736        subject,
737        body,
738        "user",
739        "peer",
740        chrono::Utc::now(),
741        "global",
742        None,
743        vec![],
744        kind == "constraint",
745    );
746    Ok(Value::from(engine.valid_fact_count()))
747}
748
749async fn handle_memory_query(
750    req: &JsonRpcMessage,
751    session: &crate::session::ClientSession,
752) -> Result<Value, String> {
753    let query = req
754        .params
755        .get("query")
756        .and_then(|v| v.as_str())
757        .ok_or("missing query")?;
758    let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
759    let engine = session.memgine.lock().await;
760    let seeds = engine.graph.find_seeds(query, 5);
761    let hits = if !seeds.is_empty() {
762        engine.graph.retrieve(&seeds, 3, k, 0.6, 0.05)
763    } else {
764        vec![]
765    };
766    let results: Vec<Value> = hits.iter().filter_map(|hit| {
767        let node = engine.graph.inner.node_weight(hit.node_ix)?;
768        Some(serde_json::json!({"subject": node.key, "body": node.value, "activation": hit.activation}))
769    }).collect();
770    serde_json::to_value(results).map_err(|e| e.to_string())
771}
772
773async fn handle_memory_build_context(
774    req: &JsonRpcMessage,
775    session: &crate::session::ClientSession,
776) -> Result<Value, String> {
777    let query = req
778        .params
779        .get("query")
780        .and_then(|v| v.as_str())
781        .unwrap_or("");
782    let mut engine = session.memgine.lock().await;
783    Ok(Value::from(engine.build_context(query)))
784}
785
786// --- Skill handlers ---
787
788async fn handle_skill_ingest(
789    req: &JsonRpcMessage,
790    session: &crate::session::ClientSession,
791) -> Result<Value, String> {
792    let name = req
793        .params
794        .get("name")
795        .and_then(|v| v.as_str())
796        .ok_or("missing name")?;
797    let code = req
798        .params
799        .get("code")
800        .and_then(|v| v.as_str())
801        .ok_or("missing code")?;
802    let platform = req
803        .params
804        .get("platform")
805        .and_then(|v| v.as_str())
806        .unwrap_or("unknown");
807    let persona = req
808        .params
809        .get("persona")
810        .and_then(|v| v.as_str())
811        .unwrap_or("");
812    let url_pattern = req
813        .params
814        .get("url_pattern")
815        .and_then(|v| v.as_str())
816        .unwrap_or("");
817    let description = req
818        .params
819        .get("description")
820        .and_then(|v| v.as_str())
821        .unwrap_or("");
822    let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
823    let keywords: Vec<String> = req
824        .params
825        .get("task_keywords")
826        .and_then(|v| v.as_array())
827        .map(|arr| {
828            arr.iter()
829                .filter_map(|v| v.as_str().map(String::from))
830                .collect()
831        })
832        .unwrap_or_default();
833
834    let trigger = car_memgine::SkillTrigger {
835        persona: persona.into(),
836        url_pattern: url_pattern.into(),
837        task_keywords: keywords,
838    };
839    let mut engine = session.memgine.lock().await;
840    engine.ingest_skill(
841        name,
842        code,
843        platform,
844        trigger,
845        description,
846        supersedes,
847        vec![],
848        vec![],
849    );
850    Ok(Value::from("ok"))
851}
852
853async fn handle_skill_find(
854    req: &JsonRpcMessage,
855    session: &crate::session::ClientSession,
856) -> Result<Value, String> {
857    let persona = req
858        .params
859        .get("persona")
860        .and_then(|v| v.as_str())
861        .unwrap_or("");
862    let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
863    let task = req
864        .params
865        .get("task")
866        .and_then(|v| v.as_str())
867        .unwrap_or("");
868    let max = req
869        .params
870        .get("max_results")
871        .and_then(|v| v.as_u64())
872        .unwrap_or(1) as usize;
873    let engine = session.memgine.lock().await;
874    let results = engine.find_skill(persona, url, task, max);
875    let json: Vec<Value> = results
876        .iter()
877        .map(|(m, s)| {
878            serde_json::json!({
879                "name": m.name, "code": m.code, "platform": m.platform,
880                "description": m.description, "stats": m.stats, "match_score": s,
881            })
882        })
883        .collect();
884    serde_json::to_value(json).map_err(|e| e.to_string())
885}
886
887async fn handle_skill_report(
888    req: &JsonRpcMessage,
889    session: &crate::session::ClientSession,
890) -> Result<Value, String> {
891    let name = req
892        .params
893        .get("skill_name")
894        .and_then(|v| v.as_str())
895        .ok_or("missing skill_name")?;
896    let outcome_str = req
897        .params
898        .get("outcome")
899        .and_then(|v| v.as_str())
900        .ok_or("missing outcome")?;
901    let outcome = match outcome_str {
902        "success" => car_memgine::SkillOutcome::Success,
903        _ => car_memgine::SkillOutcome::Fail,
904    };
905    let mut engine = session.memgine.lock().await;
906    let stats = engine
907        .report_outcome(name, outcome)
908        .ok_or(format!("skill '{}' not found", name))?;
909    serde_json::to_value(stats).map_err(|e| e.to_string())
910}
911
912// ---------------------------------------------------------------------------
913// Multi-agent coordination handlers
914//
915// The WsAgentRunner sends a `multi.run_agent` JSON-RPC request to the client.
916// The client runs the model loop and responds with AgentOutput JSON.
917// ---------------------------------------------------------------------------
918
919/// AgentRunner backed by WebSocket callback to the client.
920struct WsAgentRunner {
921    channel: Arc<WsChannel>,
922    host: Arc<crate::host::HostState>,
923    client_id: String,
924}
925
926#[async_trait::async_trait]
927impl car_multi::AgentRunner for WsAgentRunner {
928    async fn run(
929        &self,
930        spec: &car_multi::AgentSpec,
931        task: &str,
932        _runtime: &car_engine::Runtime,
933        _mailbox: &car_multi::Mailbox,
934    ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
935        use futures::SinkExt;
936
937        let request_id = self.channel.next_request_id();
938        let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
939        let agent = self
940            .host
941            .register_agent(
942                &self.client_id,
943                RegisterHostAgentRequest {
944                    id: Some(agent_id.clone()),
945                    name: spec.name.clone(),
946                    kind: "callback".to_string(),
947                    capabilities: spec.tools.clone(),
948                    project: spec
949                        .metadata
950                        .get("project")
951                        .and_then(|v| v.as_str())
952                        .map(str::to_string),
953                    pid: None,
954                    display: serde_json::from_value(
955                        spec.metadata
956                            .get("display")
957                            .cloned()
958                            .unwrap_or(serde_json::Value::Null),
959                    )
960                    .unwrap_or_default(),
961                    metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
962                },
963            )
964            .await
965            .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
966        let _ = self
967            .host
968            .set_status(SetHostAgentStatusRequest {
969                agent_id: agent.id.clone(),
970                status: HostAgentStatus::Running,
971                current_task: Some(task.to_string()),
972                message: Some(format!("{} started", spec.name)),
973                payload: serde_json::json!({ "task": task }),
974            })
975            .await;
976
977        let rpc_request = serde_json::json!({
978            "jsonrpc": "2.0",
979            "method": "multi.run_agent",
980            "params": {
981                "spec": spec,
982                "task": task,
983            },
984            "id": request_id,
985        });
986
987        // Create oneshot channel for the response
988        let (tx, rx) = tokio::sync::oneshot::channel();
989        self.channel
990            .pending
991            .lock()
992            .await
993            .insert(request_id.clone(), tx);
994
995        let msg = Message::Text(
996            serde_json::to_string(&rpc_request)
997                .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
998                .into(),
999        );
1000        if let Err(e) = self.channel.write.lock().await.send(msg).await {
1001            let _ = self
1002                .host
1003                .set_status(SetHostAgentStatusRequest {
1004                    agent_id: agent_id.clone(),
1005                    status: HostAgentStatus::Errored,
1006                    current_task: None,
1007                    message: Some(format!("{} failed to start", spec.name)),
1008                    payload: serde_json::json!({ "error": e.to_string() }),
1009                })
1010                .await;
1011            return Err(car_multi::MultiError::AgentFailed(
1012                spec.name.clone(),
1013                format!("ws send error: {}", e),
1014            ));
1015        }
1016
1017        // Wait for client response (5 min timeout for model loops)
1018        let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
1019            Ok(Ok(response)) => response,
1020            Ok(Err(_)) => {
1021                let _ = self
1022                    .host
1023                    .set_status(SetHostAgentStatusRequest {
1024                        agent_id: agent_id.clone(),
1025                        status: HostAgentStatus::Errored,
1026                        current_task: None,
1027                        message: Some(format!("{} callback channel closed", spec.name)),
1028                        payload: Value::Null,
1029                    })
1030                    .await;
1031                return Err(car_multi::MultiError::AgentFailed(
1032                    spec.name.clone(),
1033                    "agent callback channel closed".into(),
1034                ));
1035            }
1036            Err(_) => {
1037                let _ = self
1038                    .host
1039                    .set_status(SetHostAgentStatusRequest {
1040                        agent_id: agent_id.clone(),
1041                        status: HostAgentStatus::Errored,
1042                        current_task: None,
1043                        message: Some(format!("{} timed out", spec.name)),
1044                        payload: Value::Null,
1045                    })
1046                    .await;
1047                return Err(car_multi::MultiError::AgentFailed(
1048                    spec.name.clone(),
1049                    "agent callback timed out (300s)".into(),
1050                ));
1051            }
1052        };
1053
1054        if let Some(err) = response.error {
1055            let _ = self
1056                .host
1057                .set_status(SetHostAgentStatusRequest {
1058                    agent_id: agent_id.clone(),
1059                    status: HostAgentStatus::Errored,
1060                    current_task: None,
1061                    message: Some(format!("{} errored", spec.name)),
1062                    payload: serde_json::json!({ "error": err }),
1063                })
1064                .await;
1065            return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
1066        }
1067
1068        let output_value = response.output.unwrap_or(Value::Null);
1069        let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
1070            car_multi::MultiError::AgentFailed(
1071                spec.name.clone(),
1072                format!("invalid AgentOutput: {}", e),
1073            )
1074        })?;
1075        let status = if output.error.is_some() {
1076            HostAgentStatus::Errored
1077        } else {
1078            HostAgentStatus::Completed
1079        };
1080        let message = if output.error.is_some() {
1081            format!("{} errored", spec.name)
1082        } else {
1083            format!("{} completed", spec.name)
1084        };
1085        let _ = self
1086            .host
1087            .set_status(SetHostAgentStatusRequest {
1088                agent_id,
1089                status,
1090                current_task: None,
1091                message: Some(message),
1092                payload: serde_json::to_value(&output).unwrap_or(Value::Null),
1093            })
1094            .await;
1095
1096        Ok(output)
1097    }
1098}
1099
/// Build the host agent id for one run: `<client_id>:<name>:<request_id>`.
///
/// Only `name` is sanitized: every character that is not an ASCII letter,
/// ASCII digit, `-`, or `_` is replaced by a single `-`. `client_id` and
/// `request_id` are inserted verbatim.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut safe_name = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        safe_name.push(if keep { ch } else { '-' });
    }
    [client_id, safe_name.as_str(), request_id].join(":")
}
1113
1114async fn handle_multi_swarm(
1115    req: &JsonRpcMessage,
1116    session: &crate::session::ClientSession,
1117) -> Result<Value, String> {
1118    let mode_str = req
1119        .params
1120        .get("mode")
1121        .and_then(|v| v.as_str())
1122        .ok_or("missing 'mode'")?;
1123    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
1124    let task = req
1125        .params
1126        .get("task")
1127        .and_then(|v| v.as_str())
1128        .ok_or("missing 'task'")?;
1129
1130    let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
1131        .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
1132    let agent_specs: Vec<car_multi::AgentSpec> =
1133        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
1134    let synth: Option<car_multi::AgentSpec> = req
1135        .params
1136        .get("synthesizer")
1137        .map(|v| serde_json::from_value(v.clone()))
1138        .transpose()
1139        .map_err(|e| format!("invalid synthesizer: {}", e))?;
1140
1141    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1142        channel: session.channel.clone(),
1143        host: session.host.clone(),
1144        client_id: session.client_id.clone(),
1145    });
1146    let infra = car_multi::SharedInfra::new();
1147
1148    let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
1149    if let Some(s) = synth {
1150        swarm = swarm.with_synthesizer(s);
1151    }
1152
1153    let result = swarm
1154        .run(task, &runner, &infra)
1155        .await
1156        .map_err(|e| format!("swarm error: {}", e))?;
1157    serde_json::to_value(result).map_err(|e| e.to_string())
1158}
1159
1160async fn handle_multi_pipeline(
1161    req: &JsonRpcMessage,
1162    session: &crate::session::ClientSession,
1163) -> Result<Value, String> {
1164    let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
1165    let task = req
1166        .params
1167        .get("task")
1168        .and_then(|v| v.as_str())
1169        .ok_or("missing 'task'")?;
1170
1171    let stage_specs: Vec<car_multi::AgentSpec> =
1172        serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
1173
1174    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1175        channel: session.channel.clone(),
1176        host: session.host.clone(),
1177        client_id: session.client_id.clone(),
1178    });
1179    let infra = car_multi::SharedInfra::new();
1180
1181    let result = car_multi::Pipeline::new(stage_specs)
1182        .run(task, &runner, &infra)
1183        .await
1184        .map_err(|e| format!("pipeline error: {}", e))?;
1185    serde_json::to_value(result).map_err(|e| e.to_string())
1186}
1187
1188async fn handle_multi_supervisor(
1189    req: &JsonRpcMessage,
1190    session: &crate::session::ClientSession,
1191) -> Result<Value, String> {
1192    let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
1193    let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
1194    let task = req
1195        .params
1196        .get("task")
1197        .and_then(|v| v.as_str())
1198        .ok_or("missing 'task'")?;
1199    let max_rounds = req
1200        .params
1201        .get("max_rounds")
1202        .and_then(|v| v.as_u64())
1203        .unwrap_or(3) as u32;
1204
1205    let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
1206        .map_err(|e| format!("invalid workers: {}", e))?;
1207    let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
1208        .map_err(|e| format!("invalid supervisor: {}", e))?;
1209
1210    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1211        channel: session.channel.clone(),
1212        host: session.host.clone(),
1213        client_id: session.client_id.clone(),
1214    });
1215    let infra = car_multi::SharedInfra::new();
1216
1217    let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
1218        .with_max_rounds(max_rounds)
1219        .run(task, &runner, &infra)
1220        .await
1221        .map_err(|e| format!("supervisor error: {}", e))?;
1222    serde_json::to_value(result).map_err(|e| e.to_string())
1223}
1224
1225async fn handle_multi_map_reduce(
1226    req: &JsonRpcMessage,
1227    session: &crate::session::ClientSession,
1228) -> Result<Value, String> {
1229    let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
1230    let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
1231    let task = req
1232        .params
1233        .get("task")
1234        .and_then(|v| v.as_str())
1235        .ok_or("missing 'task'")?;
1236    let items_val = req.params.get("items").ok_or("missing 'items'")?;
1237
1238    let mapper_spec: car_multi::AgentSpec =
1239        serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
1240    let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
1241        .map_err(|e| format!("invalid reducer: {}", e))?;
1242    let items: Vec<String> =
1243        serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
1244
1245    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1246        channel: session.channel.clone(),
1247        host: session.host.clone(),
1248        client_id: session.client_id.clone(),
1249    });
1250    let infra = car_multi::SharedInfra::new();
1251
1252    let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
1253        .run(task, &items, &runner, &infra)
1254        .await
1255        .map_err(|e| format!("map_reduce error: {}", e))?;
1256    serde_json::to_value(result).map_err(|e| e.to_string())
1257}
1258
1259async fn handle_multi_vote(
1260    req: &JsonRpcMessage,
1261    session: &crate::session::ClientSession,
1262) -> Result<Value, String> {
1263    let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
1264    let task = req
1265        .params
1266        .get("task")
1267        .and_then(|v| v.as_str())
1268        .ok_or("missing 'task'")?;
1269
1270    let agent_specs: Vec<car_multi::AgentSpec> =
1271        serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
1272    let synth: Option<car_multi::AgentSpec> = req
1273        .params
1274        .get("synthesizer")
1275        .map(|v| serde_json::from_value(v.clone()))
1276        .transpose()
1277        .map_err(|e| format!("invalid synthesizer: {}", e))?;
1278
1279    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1280        channel: session.channel.clone(),
1281        host: session.host.clone(),
1282        client_id: session.client_id.clone(),
1283    });
1284    let infra = car_multi::SharedInfra::new();
1285
1286    let mut vote = car_multi::Vote::new(agent_specs);
1287    if let Some(s) = synth {
1288        vote = vote.with_synthesizer(s);
1289    }
1290
1291    let result = vote
1292        .run(task, &runner, &infra)
1293        .await
1294        .map_err(|e| format!("vote error: {}", e))?;
1295    serde_json::to_value(result).map_err(|e| e.to_string())
1296}
1297
1298// ---------------------------------------------------------------------------
1299// Scheduler handlers
1300// ---------------------------------------------------------------------------
1301
1302fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
1303    let name = req
1304        .params
1305        .get("name")
1306        .and_then(|v| v.as_str())
1307        .ok_or("scheduler.create requires 'name'")?;
1308    let prompt = req
1309        .params
1310        .get("prompt")
1311        .and_then(|v| v.as_str())
1312        .ok_or("scheduler.create requires 'prompt'")?;
1313
1314    let mut task = car_scheduler::Task::new(name, prompt);
1315
1316    if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
1317        let trigger = match t {
1318            "once" => car_scheduler::TaskTrigger::Once,
1319            "cron" => car_scheduler::TaskTrigger::Cron,
1320            "interval" => car_scheduler::TaskTrigger::Interval,
1321            "file_watch" => car_scheduler::TaskTrigger::FileWatch,
1322            _ => car_scheduler::TaskTrigger::Manual,
1323        };
1324        let schedule = req
1325            .params
1326            .get("schedule")
1327            .and_then(|v| v.as_str())
1328            .unwrap_or("");
1329        task = task.with_trigger(trigger, schedule);
1330    }
1331
1332    if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
1333        task = task.with_system_prompt(sp);
1334    }
1335
1336    serde_json::to_value(&task).map_err(|e| e.to_string())
1337}
1338
1339async fn handle_scheduler_run(
1340    req: &JsonRpcMessage,
1341    session: &crate::session::ClientSession,
1342) -> Result<Value, String> {
1343    let task_val = req
1344        .params
1345        .get("task")
1346        .ok_or("scheduler.run requires 'task'")?;
1347    let mut task: car_scheduler::Task =
1348        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
1349
1350    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1351        channel: session.channel.clone(),
1352        host: session.host.clone(),
1353        client_id: session.client_id.clone(),
1354    });
1355    let executor = car_scheduler::Executor::new(runner);
1356    let execution = executor.run_once(&mut task).await;
1357
1358    serde_json::to_value(&execution).map_err(|e| e.to_string())
1359}
1360
1361async fn handle_scheduler_run_loop(
1362    req: &JsonRpcMessage,
1363    session: &crate::session::ClientSession,
1364) -> Result<Value, String> {
1365    let task_val = req
1366        .params
1367        .get("task")
1368        .ok_or("scheduler.run_loop requires 'task'")?;
1369    let mut task: car_scheduler::Task =
1370        serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
1371    let max_iterations = req
1372        .params
1373        .get("max_iterations")
1374        .and_then(|v| v.as_u64())
1375        .map(|v| v as u32);
1376
1377    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1378        channel: session.channel.clone(),
1379        host: session.host.clone(),
1380        client_id: session.client_id.clone(),
1381    });
1382    let executor = car_scheduler::Executor::new(runner);
1383    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1384    let executions = executor
1385        .run_loop(&mut task, max_iterations, cancel_rx)
1386        .await;
1387
1388    serde_json::to_value(&executions).map_err(|e| e.to_string())
1389}
1390
1391// ---------------------------------------------------------------------------
1392// Inference handlers
1393// ---------------------------------------------------------------------------
1394
1395fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
1396    state.inference.get_or_init(|| {
1397        Arc::new(car_inference::InferenceEngine::new(
1398            car_inference::InferenceConfig::default(),
1399        ))
1400    })
1401}
1402
1403async fn handle_infer(
1404    msg: &JsonRpcMessage,
1405    state: &ServerState,
1406    session: &crate::session::ClientSession,
1407) -> Result<Value, String> {
1408    let engine = get_inference_engine(state);
1409    let mut req: car_inference::GenerateRequest =
1410        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1411
1412    // If context_query is provided, build context from memgine and inject it
1413    if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
1414        let mut memgine = session.memgine.lock().await;
1415        let ctx = memgine.build_context(cq);
1416        if !ctx.is_empty() {
1417            req.context = Some(ctx);
1418        }
1419    }
1420
1421    // Process-wide admission gate. Held for the duration of the
1422    // generation so a burst of concurrent infer RPCs can't multiply
1423    // KV-cache + activation memory and take the host out. The
1424    // `_permit` binding is intentional — its `Drop` releases the slot
1425    // when this future returns.
1426    let _permit = state.admission.acquire().await;
1427
1428    // Use generate_tracked() so tool_calls, usage, model_used, trace_id, and
1429    // latency_ms are preserved in the response. Plain `generate()` discards
1430    // everything except `.text`, which silently breaks tool-use over the
1431    // WebSocket protocol (issue #43).
1432    //
1433    // NOTE: This directly serializes `InferenceResult`. Any field added to
1434    // that struct in `car-inference` becomes part of the public WebSocket
1435    // protocol. The shape is locked by `inference_result_serializes_*` tests
1436    // in car-inference; updating those tests is part of intentionally
1437    // changing the wire contract.
1438    let result = engine
1439        .generate_tracked(req)
1440        .await
1441        .map_err(|e| e.to_string())?;
1442    serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
1443}
1444
1445async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1446    let engine = get_inference_engine(state);
1447    let req: car_inference::EmbedRequest =
1448        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1449    // Embeds load their own model weights; share the same admission
1450    // gate as generations so a burst of embed requests can't smuggle
1451    // around the concurrency cap.
1452    let _permit = state.admission.acquire().await;
1453    let result = engine.embed(req).await.map_err(|e| e.to_string())?;
1454    Ok(serde_json::json!({"embeddings": result}))
1455}
1456
1457async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1458    let engine = get_inference_engine(state);
1459    let req: car_inference::ClassifyRequest =
1460        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1461    let _permit = state.admission.acquire().await;
1462    let result = engine.classify(req).await.map_err(|e| e.to_string())?;
1463    Ok(serde_json::json!({"classifications": result}))
1464}
1465
1466/// Surface the current admission state so the menubar tray and
1467/// `car daemon status` can show "queued: N" / "permits: P/T". Read-only
1468/// snapshot — racy by definition but correct enough for status panels.
1469fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
1470    let total = state.admission.permits();
1471    let available = state.admission.permits_available();
1472    let in_use = total.saturating_sub(available);
1473    Ok(serde_json::json!({
1474        "permits_total": total,
1475        "permits_available": available,
1476        "permits_in_use": in_use,
1477        "env_override": crate::admission::ENV_MAX_CONCURRENT,
1478    }))
1479}
1480
1481async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1482    let model = msg
1483        .params
1484        .get("model")
1485        .and_then(|v| v.as_str())
1486        .ok_or("missing 'model' parameter")?;
1487    let text = msg
1488        .params
1489        .get("text")
1490        .and_then(|v| v.as_str())
1491        .ok_or("missing 'text' parameter")?;
1492    let engine = get_inference_engine(state);
1493    let ids = engine
1494        .tokenize(model, text)
1495        .await
1496        .map_err(|e| e.to_string())?;
1497    Ok(serde_json::json!({"tokens": ids}))
1498}
1499
1500async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1501    let model = msg
1502        .params
1503        .get("model")
1504        .and_then(|v| v.as_str())
1505        .ok_or("missing 'model' parameter")?;
1506    let tokens: Vec<u32> = msg
1507        .params
1508        .get("tokens")
1509        .and_then(|v| v.as_array())
1510        .ok_or("missing 'tokens' parameter")?
1511        .iter()
1512        .map(|t| {
1513            t.as_u64()
1514                .and_then(|n| u32::try_from(n).ok())
1515                .ok_or_else(|| "tokens[] must be u32 values".to_string())
1516        })
1517        .collect::<Result<Vec<_>, _>>()?;
1518    let engine = get_inference_engine(state);
1519    let text = engine
1520        .detokenize(model, &tokens)
1521        .await
1522        .map_err(|e| e.to_string())?;
1523    Ok(serde_json::json!({"text": text}))
1524}
1525
1526fn handle_models_list(state: &ServerState) -> Result<Value, String> {
1527    let engine = get_inference_engine(state);
1528    let models = engine.list_models();
1529    serde_json::to_value(&models).map_err(|e| e.to_string())
1530}
1531
1532fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
1533    let engine = get_inference_engine(state);
1534    let models = engine.list_models_unified();
1535    serde_json::to_value(&models).map_err(|e| e.to_string())
1536}
1537
1538async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1539    let name = msg
1540        .params
1541        .get("name")
1542        .and_then(|v| v.as_str())
1543        .ok_or("missing 'name' parameter")?;
1544    let engine = get_inference_engine(state);
1545    let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
1546    Ok(serde_json::json!({"path": path.display().to_string()}))
1547}
1548
1549async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1550    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
1551        msg.params
1552            .get("events")
1553            .cloned()
1554            .unwrap_or(msg.params.clone()),
1555    )
1556    .map_err(|e| format!("invalid events: {}", e))?;
1557
1558    let inference = get_inference_engine(state).clone();
1559    let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
1560
1561    let skills = engine.distill_skills(&events).await;
1562    serde_json::to_value(&skills).map_err(|e| e.to_string())
1563}
1564
1565/// Run memory consolidation against this client's session memgine.
1566/// Returns the JSON `ConsolidationReport`.
1567async fn handle_memory_consolidate(
1568    session: &crate::session::ClientSession,
1569) -> Result<Value, String> {
1570    let mut engine = session.memgine.lock().await;
1571    let report = engine.consolidate().await;
1572    serde_json::to_value(&report).map_err(|e| e.to_string())
1573}
1574
1575/// Repair a degraded skill on this client's session memgine.
1576/// Returns `{ code: "..." }` on success, `null` if the skill
1577/// isn't broken or repair failed.
1578async fn handle_skill_repair(
1579    msg: &JsonRpcMessage,
1580    session: &crate::session::ClientSession,
1581) -> Result<Value, String> {
1582    let name = msg
1583        .params
1584        .get("skill_name")
1585        .and_then(|v| v.as_str())
1586        .ok_or("missing 'skill_name' parameter")?;
1587    let mut engine = session.memgine.lock().await;
1588    let code = engine.repair_skill(name).await;
1589    Ok(match code {
1590        Some(c) => serde_json::json!({ "code": c }),
1591        None => Value::Null,
1592    })
1593}
1594
1595/// Ingest distilled skills into this client's session memgine.
1596/// Returns the number of nodes inserted.
1597async fn handle_skills_ingest_distilled(
1598    msg: &JsonRpcMessage,
1599    session: &crate::session::ClientSession,
1600) -> Result<Value, String> {
1601    let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
1602        msg.params
1603            .get("skills")
1604            .cloned()
1605            .unwrap_or(msg.params.clone()),
1606    )
1607    .map_err(|e| format!("invalid skills: {}", e))?;
1608    let mut engine = session.memgine.lock().await;
1609    let nodes = engine.ingest_distilled_skills(&skills);
1610    Ok(serde_json::json!({ "ingested": nodes.len() }))
1611}
1612
1613/// Run skill evolution against this session's memgine for a
1614/// specified domain.  Returns the resulting `DistilledSkill` array.
1615async fn handle_skills_evolve(
1616    msg: &JsonRpcMessage,
1617    session: &crate::session::ClientSession,
1618) -> Result<Value, String> {
1619    let domain = msg
1620        .params
1621        .get("domain")
1622        .and_then(|v| v.as_str())
1623        .ok_or("missing 'domain' parameter")?
1624        .to_string();
1625    let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
1626        msg.params
1627            .get("events")
1628            .cloned()
1629            .unwrap_or(Value::Array(vec![])),
1630    )
1631    .map_err(|e| format!("invalid events: {}", e))?;
1632    let mut engine = session.memgine.lock().await;
1633    let skills = engine.evolve_skills(&events, &domain).await;
1634    serde_json::to_value(&skills).map_err(|e| e.to_string())
1635}
1636
1637/// List domains whose skills are underperforming on this session.
1638async fn handle_skills_domains_needing_evolution(
1639    msg: &JsonRpcMessage,
1640    session: &crate::session::ClientSession,
1641) -> Result<Value, String> {
1642    let threshold = msg
1643        .params
1644        .get("threshold")
1645        .and_then(|v| v.as_f64())
1646        .unwrap_or(0.6);
1647    let engine = session.memgine.lock().await;
1648    let domains = engine.domains_needing_evolution(threshold);
1649    serde_json::to_value(&domains).map_err(|e| e.to_string())
1650}
1651
1652/// Rerank documents against a query using a cross-encoder model.
1653async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1654    let engine = get_inference_engine(state);
1655    let req: car_inference::RerankRequest =
1656        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1657    let _permit = state.admission.acquire().await;
1658    let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
1659    serde_json::to_value(&result).map_err(|e| e.to_string())
1660}
1661
1662/// Transcribe audio at the given path. The path is interpreted on
1663/// the daemon's filesystem, not the FFI caller's — Daemon-mode
1664/// callers must pass a path the daemon can read (typically a
1665/// shared `~/.car/...` location or stdin push via the streaming
1666/// API).
1667async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1668    let engine = get_inference_engine(state);
1669    let req: car_inference::TranscribeRequest =
1670        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1671    let _permit = state.admission.acquire().await;
1672    let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
1673    serde_json::to_value(&result).map_err(|e| e.to_string())
1674}
1675
1676/// Synthesize speech to a daemon-side output path. Same caveat
1677/// as transcribe: `output_path` is on the daemon's filesystem.
1678async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1679    let engine = get_inference_engine(state);
1680    let req: car_inference::SynthesizeRequest =
1681        serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1682    let _permit = state.admission.acquire().await;
1683    let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
1684    serde_json::to_value(&result).map_err(|e| e.to_string())
1685}
1686
1687/// Prepare the speech runtime (downloads / warmup). Returns a
1688/// JSON status string, mirroring the embedded
1689/// `prepare_speech_runtime` shape.
1690async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
1691    let engine = get_inference_engine(state);
1692    let status = engine
1693        .prepare_speech_runtime()
1694        .await
1695        .map_err(|e| e.to_string())?;
1696    serde_json::to_value(&status).map_err(|e| e.to_string())
1697}
1698
1699/// Adaptive route decision for a prompt — returns the routing
1700/// JSON the FFI's `route_model` returns.
1701async fn handle_models_route(
1702    msg: &JsonRpcMessage,
1703    state: &ServerState,
1704) -> Result<Value, String> {
1705    let prompt = msg
1706        .params
1707        .get("prompt")
1708        .and_then(|v| v.as_str())
1709        .ok_or("missing 'prompt' parameter")?;
1710    let engine = get_inference_engine(state);
1711    let decision = engine.route_adaptive(prompt).await;
1712    serde_json::to_value(&decision).map_err(|e| e.to_string())
1713}
1714
1715/// Model performance profiles snapshot.
1716async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
1717    let engine = get_inference_engine(state);
1718    let profiles = engine.export_profiles().await;
1719    serde_json::to_value(&profiles).map_err(|e| e.to_string())
1720}
1721
1722/// Per-session event log size.
1723async fn handle_events_count(
1724    session: &crate::session::ClientSession,
1725) -> Result<Value, String> {
1726    let n = session.runtime.log.lock().await.len();
1727    Ok(Value::from(n as u64))
1728}
1729
1730/// Update the per-session replan config. Wire shape mirrors the
1731/// FFI's positional `set_replan_config` arguments — the engine
1732/// crate's `ReplanConfig` struct doesn't derive Serialize, so we
1733/// reconstruct it from a flat object here.
1734async fn handle_replan_set_config(
1735    msg: &JsonRpcMessage,
1736    session: &crate::session::ClientSession,
1737) -> Result<Value, String> {
1738    let max_replans = msg
1739        .params
1740        .get("max_replans")
1741        .and_then(|v| v.as_u64())
1742        .unwrap_or(0) as u32;
1743    let delay_ms = msg
1744        .params
1745        .get("delay_ms")
1746        .and_then(|v| v.as_u64())
1747        .unwrap_or(0);
1748    let verify_before_execute = msg
1749        .params
1750        .get("verify_before_execute")
1751        .and_then(|v| v.as_bool())
1752        .unwrap_or(true);
1753    let cfg = car_engine::ReplanConfig {
1754        max_replans,
1755        delay_ms,
1756        verify_before_execute,
1757    };
1758    session.runtime.set_replan_config(cfg).await;
1759    Ok(Value::Null)
1760}
1761
1762async fn handle_skills_list(
1763    msg: &JsonRpcMessage,
1764    session: &crate::session::ClientSession,
1765) -> Result<Value, String> {
1766    let domain = msg.params.get("domain").and_then(|v| v.as_str());
1767    let engine = session.memgine.lock().await;
1768    let skills: Vec<serde_json::Value> = engine
1769        .graph
1770        .inner
1771        .node_indices()
1772        .filter_map(|nix| {
1773            let node = engine.graph.inner.node_weight(nix)?;
1774            if node.kind != car_memgine::MemKind::Skill {
1775                return None;
1776            }
1777            let meta = car_memgine::SkillMeta::from_node(node)?;
1778            if let Some(d) = domain {
1779                match &meta.scope {
1780                    car_memgine::SkillScope::Global => {}
1781                    car_memgine::SkillScope::Domain(sd) if sd == d => {}
1782                    _ => return None,
1783                }
1784            }
1785            Some(serde_json::to_value(&meta).unwrap_or_default())
1786        })
1787        .collect();
1788    serde_json::to_value(&skills).map_err(|e| e.to_string())
1789}
1790
1791#[derive(serde::Deserialize)]
1792struct SecretParams {
1793    #[serde(default)]
1794    service: Option<String>,
1795    key: String,
1796    #[serde(default)]
1797    value: Option<String>,
1798}
1799
1800fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
1801    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1802    let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
1803    car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
1804}
1805
1806fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
1807    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1808    car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
1809}
1810
1811fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
1812    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1813    car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
1814}
1815
1816fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
1817    let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1818    car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
1819}
1820
1821#[derive(serde::Deserialize)]
1822struct PermParams {
1823    domain: String,
1824    #[serde(default)]
1825    target_bundle_id: Option<String>,
1826}
1827
1828fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
1829    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1830    car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
1831}
1832
1833fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
1834    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1835    car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
1836}
1837
1838fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
1839    let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1840    car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
1841}
1842
1843fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
1844    #[derive(serde::Deserialize)]
1845    struct P {
1846        start: String,
1847        end: String,
1848        #[serde(default)]
1849        calendar_ids: Vec<String>,
1850    }
1851    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1852    let start = chrono::DateTime::parse_from_rfc3339(&p.start)
1853        .map_err(|e| format!("parse start: {}", e))?
1854        .with_timezone(&chrono::Utc);
1855    let end = chrono::DateTime::parse_from_rfc3339(&p.end)
1856        .map_err(|e| format!("parse end: {}", e))?
1857        .with_timezone(&chrono::Utc);
1858    car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
1859}
1860
1861fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
1862    #[derive(serde::Deserialize)]
1863    struct P {
1864        query: String,
1865        #[serde(default = "default_limit")]
1866        limit: usize,
1867        #[serde(default)]
1868        container_ids: Vec<String>,
1869    }
1870    fn default_limit() -> usize {
1871        50
1872    }
1873    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1874    car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
1875}
1876
1877fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
1878    #[derive(serde::Deserialize, Default)]
1879    struct P {
1880        #[serde(default)]
1881        account_ids: Vec<String>,
1882    }
1883    let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
1884    car_ffi_common::integrations::mail_inbox(&p.account_ids)
1885}
1886
1887fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
1888    let raw = req.params.to_string();
1889    car_ffi_common::integrations::mail_send(&raw)
1890}
1891
1892fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
1893    #[derive(serde::Deserialize)]
1894    struct P {
1895        start: String,
1896        end: String,
1897    }
1898    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1899    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
1900        .map_err(|e| format!("parse start: {}", e))?
1901        .with_timezone(&chrono::Utc);
1902    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
1903        .map_err(|e| format!("parse end: {}", e))?
1904        .with_timezone(&chrono::Utc);
1905    car_ffi_common::health::sleep_windows(s, e)
1906}
1907
1908fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
1909    #[derive(serde::Deserialize)]
1910    struct P {
1911        start: String,
1912        end: String,
1913    }
1914    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1915    let s = chrono::DateTime::parse_from_rfc3339(&p.start)
1916        .map_err(|e| format!("parse start: {}", e))?
1917        .with_timezone(&chrono::Utc);
1918    let e = chrono::DateTime::parse_from_rfc3339(&p.end)
1919        .map_err(|e| format!("parse end: {}", e))?
1920        .with_timezone(&chrono::Utc);
1921    car_ffi_common::health::workouts(s, e)
1922}
1923
1924fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
1925    #[derive(serde::Deserialize)]
1926    struct P {
1927        start: String,
1928        end: String,
1929    }
1930    let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1931    let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
1932        .map_err(|e| format!("parse start: {}", e))?;
1933    let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
1934        .map_err(|e| format!("parse end: {}", e))?;
1935    car_ffi_common::health::activity(s, e)
1936}
1937
1938async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
1939    let closed = session.browser.close().await?;
1940    Ok(serde_json::json!({"closed": closed}))
1941}
1942
1943async fn handle_browser_run(
1944    req: &JsonRpcMessage,
1945    session: &crate::session::ClientSession,
1946) -> Result<Value, String> {
1947    #[derive(serde::Deserialize)]
1948    struct BrowserRunParams {
1949        /// Inline JSON string (CLI-compatible), OR the structured object.
1950        script: Value,
1951        #[serde(default)]
1952        width: Option<u32>,
1953        #[serde(default)]
1954        height: Option<u32>,
1955        /// When true, launches a visible Chromium window for interactive
1956        /// flows (first-time auth, 2FA, supervised runs). Only honored on
1957        /// the call that first launches the browser session — subsequent
1958        /// calls reuse the existing browser regardless.
1959        #[serde(default)]
1960        headed: Option<bool>,
1961        /// Extra Chromium command-line flags appended verbatim at
1962        /// launch (#112). Honoured only on the launch call.
1963        #[serde(default)]
1964        extra_args: Option<Vec<String>>,
1965    }
1966    let params: BrowserRunParams =
1967        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1968
1969    // Accept either a JSON string OR a structured object under `script`.
1970    let script_json = match params.script {
1971        Value::String(s) => s,
1972        other => other.to_string(),
1973    };
1974
1975    let browser_session = session
1976        .browser
1977        .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
1978            width: params.width.unwrap_or(1280),
1979            height: params.height.unwrap_or(720),
1980            headless: !params.headed.unwrap_or(false),
1981            extra_args: params.extra_args.unwrap_or_default(),
1982        })
1983        .await?;
1984
1985    let trace_json = browser_session.run(&script_json).await?;
1986    serde_json::from_str(&trace_json).map_err(|e| e.to_string())
1987}
1988
1989// ---------------------------------------------------------------------------
1990// Voice streaming JSON-RPC methods
1991//
1992// Events are pushed back to the originating client as JSON-RPC notifications:
1993//   { "jsonrpc": "2.0", "method": "voice.event",
1994//     "params": { "session_id": "...", "event": {...} } }
1995//
1996// The session registry is process-wide (ServerState.voice_sessions); per-call
1997// WsVoiceEventSink instances bind each session to its originating WS so a
1998// client only ever sees events for sessions it started.
1999// ---------------------------------------------------------------------------
2000
2001#[derive(Deserialize)]
2002struct VoiceStartParams {
2003    session_id: String,
2004    audio_source: Value,
2005    #[serde(default)]
2006    options: Option<Value>,
2007}
2008
2009async fn handle_voice_transcribe_stream_start(
2010    req: &JsonRpcMessage,
2011    state: &Arc<ServerState>,
2012    session: &Arc<crate::session::ClientSession>,
2013) -> Result<Value, String> {
2014    let params: VoiceStartParams =
2015        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2016    let audio_source_json = serde_json::to_string(&params.audio_source).map_err(|e| e.to_string())?;
2017    let options_json = params
2018        .options
2019        .as_ref()
2020        .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
2021        .transpose()?;
2022    let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
2023        channel: session.channel.clone(),
2024    });
2025    let json = car_ffi_common::voice::transcribe_stream_start(
2026        &params.session_id,
2027        &audio_source_json,
2028        options_json.as_deref(),
2029        state.voice_sessions.clone(),
2030        sink,
2031    )
2032    .await?;
2033    serde_json::from_str(&json).map_err(|e| e.to_string())
2034}
2035
2036#[derive(Deserialize)]
2037struct VoiceStopParams {
2038    session_id: String,
2039}
2040
2041async fn handle_voice_transcribe_stream_stop(
2042    req: &JsonRpcMessage,
2043    state: &Arc<ServerState>,
2044) -> Result<Value, String> {
2045    let params: VoiceStopParams =
2046        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2047    let json =
2048        car_ffi_common::voice::transcribe_stream_stop(&params.session_id, state.voice_sessions.clone())
2049            .await?;
2050    serde_json::from_str(&json).map_err(|e| e.to_string())
2051}
2052
2053#[derive(Deserialize)]
2054struct VoicePushParams {
2055    session_id: String,
2056    /// Base64-encoded 16-bit signed PCM frame. JSON-RPC is text, so binary
2057    /// audio frames have to be encoded; clients in WS-binary contexts that
2058    /// want to skip the round trip can call the FFI directly.
2059    pcm_b64: String,
2060}
2061
2062async fn handle_voice_transcribe_stream_push(
2063    req: &JsonRpcMessage,
2064    state: &Arc<ServerState>,
2065) -> Result<Value, String> {
2066    use base64::Engine;
2067    let params: VoicePushParams =
2068        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2069    let pcm = base64::engine::general_purpose::STANDARD
2070        .decode(&params.pcm_b64)
2071        .map_err(|e| format!("invalid pcm_b64: {}", e))?;
2072    let json = car_ffi_common::voice::transcribe_stream_push(
2073        &params.session_id,
2074        &pcm,
2075        state.voice_sessions.clone(),
2076    )
2077    .await?;
2078    serde_json::from_str(&json).map_err(|e| e.to_string())
2079}
2080
2081fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
2082    let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
2083    serde_json::from_str(&json).unwrap_or(Value::Null)
2084}
2085
2086#[derive(Deserialize)]
2087struct EnrollSpeakerParams {
2088    label: String,
2089    audio: Value,
2090}
2091
2092async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
2093    let params: EnrollSpeakerParams =
2094        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2095    let audio_json = serde_json::to_string(&params.audio).map_err(|e| e.to_string())?;
2096    let json = car_ffi_common::voice::enroll_speaker(&params.label, &audio_json).await?;
2097    serde_json::from_str(&json).map_err(|e| e.to_string())
2098}
2099
2100#[derive(Deserialize)]
2101struct RemoveEnrollmentParams {
2102    label: String,
2103}
2104
2105fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
2106    let params: RemoveEnrollmentParams =
2107        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2108    let json = car_ffi_common::voice::remove_enrollment(&params.label)?;
2109    serde_json::from_str(&json).map_err(|e| e.to_string())
2110}
2111
2112#[derive(Deserialize)]
2113struct WorkflowRunParams {
2114    workflow: Value,
2115}
2116
2117async fn handle_workflow_run(
2118    req: &JsonRpcMessage,
2119    session: &Arc<crate::session::ClientSession>,
2120) -> Result<Value, String> {
2121    let params: WorkflowRunParams =
2122        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2123    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
2124    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2125        channel: session.channel.clone(),
2126        host: session.host.clone(),
2127        client_id: session.client_id.clone(),
2128    });
2129    let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
2130    serde_json::from_str(&json).map_err(|e| e.to_string())
2131}
2132
2133#[derive(Deserialize)]
2134struct WorkflowVerifyParams {
2135    workflow: Value,
2136}
2137
2138fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
2139    let params: WorkflowVerifyParams =
2140        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2141    let workflow_json = serde_json::to_string(&params.workflow).map_err(|e| e.to_string())?;
2142    let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
2143    serde_json::from_str(&json).map_err(|e| e.to_string())
2144}
2145
2146// ---------------------------------------------------------------------------
2147// Meeting JSON-RPC methods
2148// ---------------------------------------------------------------------------
2149
2150async fn handle_meeting_start(
2151    req: &JsonRpcMessage,
2152    state: &Arc<ServerState>,
2153    session: &Arc<crate::session::ClientSession>,
2154) -> Result<Value, String> {
2155    // We need the meeting id BEFORE handing the upstream sink to
2156    // start_meeting so the WsMemgineIngestSink stamps transcripts with
2157    // the correct `meeting/<id>/<source>` speaker. Parse the request
2158    // here, mint an id if none was provided, and pass the same id
2159    // through to start_meeting via the request JSON.
2160    let mut req_value = req.params.clone();
2161    let meeting_id = req_value
2162        .get("id")
2163        .and_then(|v| v.as_str())
2164        .map(str::to_string)
2165        .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
2166    if let Some(map) = req_value.as_object_mut() {
2167        map.insert("id".into(), Value::String(meeting_id.clone()));
2168    }
2169    let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
2170
2171    let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
2172        Arc::new(crate::session::WsVoiceEventSink {
2173            channel: session.channel.clone(),
2174        });
2175
2176    // Wrap the WS upstream with a memgine-ingest fanout that uses the
2177    // tokio::sync::Mutex-wrapped session memgine. We pass `None` for
2178    // the FFI-common `start_meeting` memgine arg to avoid the
2179    // sync-mutex contract there — ingest happens here instead.
2180    let upstream: Arc<dyn car_voice::VoiceEventSink> =
2181        Arc::new(crate::session::WsMemgineIngestSink {
2182            meeting_id,
2183            engine: session.memgine.clone(),
2184            upstream: ws_upstream,
2185        });
2186
2187    let cwd = std::env::current_dir().ok();
2188    let json = car_ffi_common::meeting::start_meeting(
2189        &request_json,
2190        state.meetings.clone(),
2191        state.voice_sessions.clone(),
2192        upstream,
2193        None,
2194        cwd,
2195    )
2196    .await?;
2197    serde_json::from_str(&json).map_err(|e| e.to_string())
2198}
2199
2200#[derive(Deserialize)]
2201struct MeetingStopParams {
2202    meeting_id: String,
2203    #[serde(default = "default_summarize")]
2204    summarize: bool,
2205}
2206
/// Serde default for `MeetingStopParams::summarize`: summarization is on
/// unless the caller explicitly turns it off.
fn default_summarize() -> bool {
    const SUMMARIZE_BY_DEFAULT: bool = true;
    SUMMARIZE_BY_DEFAULT
}
2210
2211async fn handle_meeting_stop(
2212    req: &JsonRpcMessage,
2213    state: &Arc<ServerState>,
2214    _session: &Arc<crate::session::ClientSession>,
2215) -> Result<Value, String> {
2216    let params: MeetingStopParams =
2217        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2218    let inference = if params.summarize {
2219        Some(state.inference.get().cloned()).flatten()
2220    } else {
2221        None
2222    };
2223    let json = car_ffi_common::meeting::stop_meeting(
2224        &params.meeting_id,
2225        params.summarize,
2226        state.meetings.clone(),
2227        state.voice_sessions.clone(),
2228        inference,
2229    )
2230    .await?;
2231    serde_json::from_str(&json).map_err(|e| e.to_string())
2232}
2233
2234#[derive(Deserialize, Default)]
2235struct MeetingListParams {
2236    #[serde(default)]
2237    root: Option<std::path::PathBuf>,
2238}
2239
2240fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
2241    let params: MeetingListParams =
2242        serde_json::from_value(req.params.clone()).unwrap_or_default();
2243    let cwd = std::env::current_dir().ok();
2244    let json = car_ffi_common::meeting::list_meetings(params.root, cwd)?;
2245    serde_json::from_str(&json).map_err(|e| e.to_string())
2246}
2247
2248#[derive(Deserialize)]
2249struct MeetingGetParams {
2250    meeting_id: String,
2251    #[serde(default)]
2252    root: Option<std::path::PathBuf>,
2253}
2254
2255fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
2256    let params: MeetingGetParams =
2257        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2258    let cwd = std::env::current_dir().ok();
2259    let json = car_ffi_common::meeting::get_meeting(&params.meeting_id, params.root, cwd)?;
2260    serde_json::from_str(&json).map_err(|e| e.to_string())
2261}
2262
2263// ---------------------------------------------------------------------------
2264// Agent registry — file-based cross-process discovery (#111)
2265// ---------------------------------------------------------------------------
2266
2267#[derive(Deserialize, Default)]
2268struct RegistryRegisterParams {
2269    /// Caller serializes their AgentEntry as a JSON value; we
2270    /// re-serialize it so the ffi-common helper can validate the
2271    /// shape with the same parser used by the bindings.
2272    entry: Value,
2273    #[serde(default)]
2274    registry_path: Option<std::path::PathBuf>,
2275}
2276
2277fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
2278    let params: RegistryRegisterParams =
2279        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2280    let entry_json = serde_json::to_string(&params.entry).map_err(|e| e.to_string())?;
2281    car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
2282    Ok(Value::Null)
2283}
2284
2285#[derive(Deserialize, Default)]
2286struct RegistryNameParams {
2287    name: String,
2288    #[serde(default)]
2289    registry_path: Option<std::path::PathBuf>,
2290}
2291
2292fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
2293    let params: RegistryNameParams =
2294        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2295    let json = car_ffi_common::registry::agent_heartbeat(&params.name, params.registry_path)?;
2296    serde_json::from_str(&json).map_err(|e| e.to_string())
2297}
2298
2299fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
2300    let params: RegistryNameParams =
2301        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2302    car_ffi_common::registry::unregister_agent(&params.name, params.registry_path)?;
2303    Ok(Value::Null)
2304}
2305
2306#[derive(Deserialize, Default)]
2307struct RegistryListParams {
2308    #[serde(default)]
2309    registry_path: Option<std::path::PathBuf>,
2310}
2311
2312fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
2313    let params: RegistryListParams =
2314        serde_json::from_value(req.params.clone()).unwrap_or_default();
2315    let json = car_ffi_common::registry::list_agents(params.registry_path)?;
2316    serde_json::from_str(&json).map_err(|e| e.to_string())
2317}
2318
2319#[derive(Deserialize, Default)]
2320struct RegistryReapParams {
2321    /// Heartbeats older than this many seconds are reaped. Default
2322    /// 60 — two missed 20s heartbeats trigger removal.
2323    #[serde(default = "default_reap_age")]
2324    max_age_secs: u64,
2325    #[serde(default)]
2326    registry_path: Option<std::path::PathBuf>,
2327}
2328
2329fn default_reap_age() -> u64 {
2330    60
2331}
2332
2333fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
2334    let params: RegistryReapParams =
2335        serde_json::from_value(req.params.clone()).unwrap_or_default();
2336    let json = car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
2337    serde_json::from_str(&json).map_err(|e| e.to_string())
2338}
2339
2340// ---------------------------------------------------------------------------
2341// car-a2a server lifecycle (mirrors NAPI startA2aServer / stopA2aServer /
2342// a2aServerStatus and PyO3 start_a2a_server / stop_a2a_server /
2343// a2a_server_status — closes the binding gap noted in #126).
2344// ---------------------------------------------------------------------------
2345
2346async fn handle_a2a_start(req: &JsonRpcMessage) -> Result<Value, String> {
2347    let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2348    let json = car_ffi_common::a2a::start_a2a(&params_json).await?;
2349    serde_json::from_str(&json).map_err(|e| e.to_string())
2350}
2351
2352fn handle_a2a_stop() -> Result<Value, String> {
2353    let json = car_ffi_common::a2a::stop_a2a()?;
2354    serde_json::from_str(&json).map_err(|e| e.to_string())
2355}
2356
2357fn handle_a2a_status() -> Result<Value, String> {
2358    let json = car_ffi_common::a2a::a2a_status()?;
2359    serde_json::from_str(&json).map_err(|e| e.to_string())
2360}
2361
2362// ---------------------------------------------------------------------------
2363// macOS automation — AppleScript + Shortcuts (car-automation), Vision OCR
2364// (car-vision). Mirrors NAPI runApplescript / listShortcuts / runShortcut /
2365// visionOcr and PyO3 run_applescript / list_shortcuts / run_shortcut /
2366// vision_ocr.
2367// ---------------------------------------------------------------------------
2368
2369async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
2370    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2371    let json = car_ffi_common::automation::run_applescript(&args_json).await?;
2372    serde_json::from_str(&json).map_err(|e| e.to_string())
2373}
2374
2375async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
2376    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2377    let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
2378    serde_json::from_str(&json).map_err(|e| e.to_string())
2379}
2380
2381async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
2382    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2383    let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
2384    serde_json::from_str(&json).map_err(|e| e.to_string())
2385}
2386
2387async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
2388    let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2389    let json = car_ffi_common::vision::ocr(&args_json).await?;
2390    serde_json::from_str(&json).map_err(|e| e.to_string())
2391}