1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::sync::atomic::AtomicU64;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
/// An incoming JSON-RPC frame as deserialized from a client text message.
///
/// Every field is marked `#[serde(default)]`, so a frame that omits any of
/// them still deserializes. The dispatch loop treats a frame with `method`
/// set as a request, and a frame without `method` but with `result` or
/// `error` as a reply to a pending outbound call.
#[derive(Debug, Deserialize)]
// NOTE(review): `jsonrpc` is not read by the dispatch loop, which is
// presumably why dead-code warnings are silenced here — confirm.
#[allow(dead_code)]
pub struct JsonRpcMessage {
    /// Protocol version string sent by the peer; empty when absent.
    #[serde(default)]
    pub jsonrpc: String,
    /// Method name for requests; `None` for replies.
    #[serde(default)]
    pub method: Option<String>,
    /// Method parameters; `Value::Null` when absent.
    #[serde(default)]
    pub params: Value,
    /// Request/reply correlation id; `Value::Null` when absent.
    #[serde(default)]
    pub id: Value,
    /// Success payload of a reply frame.
    #[serde(default)]
    pub result: Option<Value>,
    /// Error payload of a reply frame; its `message` member is read when
    /// the reply is forwarded to a pending caller.
    #[serde(default)]
    pub error: Option<Value>,
}
44
/// An outgoing JSON-RPC reply frame.
///
/// `result` and `error` are omitted from the serialized JSON when `None`;
/// the constructors on this type set exactly one of them.
#[derive(Debug, Serialize)]
pub struct JsonRpcResponse {
    /// Protocol version tag; the constructors always use `"2.0"`.
    pub jsonrpc: &'static str,
    /// Success payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Failure payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
    /// Id copied from the request this reply answers.
    pub id: Value,
}
54
/// The `error` member of a [`JsonRpcResponse`].
#[derive(Debug, Serialize)]
pub struct JsonRpcError {
    /// Numeric error code (for example `-32700` for a parse error).
    pub code: i32,
    /// Human-readable description of the failure.
    pub message: String,
}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
88 name = "ws.connection",
89 skip_all,
90 fields(peer = %peer),
91)]
92pub async fn handle_connection(
93 stream: TcpStream,
94 peer: SocketAddr,
95 state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let ws_stream = accept_async(stream).await?;
98 let (write, read) = ws_stream.split();
99 run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102#[cfg(unix)]
113#[instrument(
114 name = "ws.connection",
115 skip_all,
116 fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119 stream: tokio::net::UnixStream,
120 peer: String,
121 state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123 let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124 let (write, read) = ws_stream.split();
125 run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128#[instrument(
138 name = "ws.dispatch",
139 skip_all,
140 fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143 mut read: R,
144 write: crate::session::WsSink,
145 peer: String,
146 state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149 R: futures::Stream<
150 Item = Result<Message, tokio_tungstenite::tungstenite::Error>,
151 >
152 + Unpin
153 + Send,
154{
155 let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
156 tracing::Span::current().record("client_id", &client_id.as_str());
157
158 info!("New connection from {}", peer);
159
160 let channel = Arc::new(WsChannel {
161 write: Mutex::new(write),
162 pending: Mutex::new(HashMap::new()),
163 next_id: AtomicU64::new(1),
164 });
165
166 let session = state.create_session(&client_id, channel.clone()).await;
167
168 while let Some(msg) = read.next().await {
169 let msg = msg?;
170 if msg.is_text() {
171 let text = msg.to_text()?;
172 let parsed: JsonRpcMessage = match serde_json::from_str(text) {
173 Ok(m) => m,
174 Err(e) => {
175 send_response(
176 &session.channel,
177 JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
178 )
179 .await?;
180 continue;
181 }
182 };
183
184 if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
186 if let Some(id_str) = parsed.id.as_str() {
187 let mut pending = session.channel.pending.lock().await;
188 if let Some(tx) = pending.remove(id_str) {
189 let tool_resp = if let Some(result) = parsed.result {
190 ToolExecuteResponse {
191 action_id: id_str.to_string(),
192 output: Some(result),
193 error: None,
194 }
195 } else {
196 let err_msg = parsed
197 .error
198 .as_ref()
199 .and_then(|e| e.get("message"))
200 .and_then(|m| m.as_str())
201 .unwrap_or("unknown error")
202 .to_string();
203 ToolExecuteResponse {
204 action_id: id_str.to_string(),
205 output: None,
206 error: Some(err_msg),
207 }
208 };
209 let _ = tx.send(tool_resp);
210 continue;
211 }
212 }
213 }
214
215 if let Some(method) = &parsed.method {
217 info!(method = %method, "dispatching JSON-RPC method");
218
219 if state.auth_token.get().is_some()
227 && !session
228 .authenticated
229 .load(std::sync::atomic::Ordering::Acquire)
230 && method != "session.auth"
231 {
232 let resp = JsonRpcResponse::error(
233 parsed.id.clone(),
234 -32001,
235 "auth required: send `session.auth` with the per-launch token \
236 from ~/Library/Application Support/ai.parslee.car/auth-token \
237 (macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
238 as the first frame on this connection",
239 );
240 send_response(&session.channel, resp).await?;
241 info!(client = %client_id, method = %method,
242 "rejecting non-auth method on unauthenticated session; closing");
243 break;
244 }
245
246 if state.approval_gate.requires_approval(method.as_str()) {
258 match gate_high_risk_method(
259 method.as_str(),
260 &parsed.params,
261 &state,
262 )
263 .await
264 {
265 Ok(()) => {}
266 Err(reason) => {
267 let resp = JsonRpcResponse::error(
268 parsed.id.clone(),
269 -32003,
270 &reason,
271 );
272 send_response(&session.channel, resp).await?;
273 info!(
274 client = %client_id,
275 method = %method,
276 reason = %reason,
277 "approval gate blocked dispatch"
278 );
279 continue;
280 }
281 }
282 }
283
284 let session_task = session.clone();
299 let state_task = state.clone();
300 let method_owned = method.clone();
301 let parsed_task = parsed;
302 tokio::spawn(async move {
303 let session = session_task;
304 let state = state_task;
305 let parsed = parsed_task;
306 let result = match method_owned.as_str() {
307 "session.auth" => handle_session_auth(&parsed, &session, &state).await,
308 "session.init" => handle_session_init(&parsed, &session).await,
309 "host.subscribe" => handle_host_subscribe(&session).await,
310 "host.agents" => handle_host_agents(&session).await,
311 "host.events" => handle_host_events(&parsed, &session).await,
312 "host.approvals" => handle_host_approvals(&session).await,
313 "host.register_agent" => handle_host_register_agent(&parsed, &session).await,
314 "host.unregister_agent" => {
315 handle_host_unregister_agent(&parsed, &session).await
316 }
317 "host.set_status" => handle_host_set_status(&parsed, &session).await,
318 "host.notify" => handle_host_notify(&parsed, &session).await,
319 "host.request_approval" => {
320 handle_host_request_approval(&parsed, &session).await
321 }
322 "host.resolve_approval" => {
323 handle_host_resolve_approval(&parsed, &session).await
324 }
325 "tools.register" => handle_tools_register(&parsed, &session).await,
326 "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
327 "policy.register" => handle_policy_register(&parsed, &session).await,
328 "session.policy.open" => handle_session_policy_open(&session).await,
329 "session.policy.close" => {
330 handle_session_policy_close(&parsed, &session).await
331 }
332 "verify" => handle_verify(&parsed, &session).await,
333 "state.get" => handle_state_get(&parsed, &session).await,
334 "state.set" => handle_state_set(&parsed, &session).await,
335 "state.exists" => handle_state_exists(&parsed, &session).await,
336 "state.keys" => handle_state_keys(&session).await,
337 "state.snapshot" => handle_state_snapshot(&session).await,
338 "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
339 "memory.query" => handle_memory_query(&parsed, &session).await,
340 "memory.build_context" => handle_memory_build_context(&parsed, &session).await,
341 "memory.build_context_fast" => {
342 handle_memory_build_context_fast(&parsed, &session).await
343 }
344 "memory.consolidate" => handle_memory_consolidate(&session).await,
345 "memory.fact_count" => handle_memory_fact_count(&session).await,
346 "memory.persist" => handle_memory_persist(&parsed, &session).await,
347 "memory.load" => handle_memory_load(&parsed, &session).await,
348 "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
349 "skill.find" => handle_skill_find(&parsed, &session).await,
350 "skill.report" => handle_skill_report(&parsed, &session).await,
351 "skill.repair" => handle_skill_repair(&parsed, &session).await,
352 "skills.ingest_distilled" => {
353 handle_skills_ingest_distilled(&parsed, &session).await
354 }
355 "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
356 "skills.domains_needing_evolution" => {
357 handle_skills_domains_needing_evolution(&parsed, &session).await
358 }
359 "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
360 "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
361 "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
362 "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
363 "multi.vote" => handle_multi_vote(&parsed, &session).await,
364 "scheduler.create" => handle_scheduler_create(&parsed),
365 "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
366 "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
367 "infer" => handle_infer(&parsed, &state, &session).await,
368 "embed" => handle_embed(&parsed, &state).await,
369 "classify" => handle_classify(&parsed, &state).await,
370 "tokenize" => handle_tokenize(&parsed, &state).await,
371 "detokenize" => handle_detokenize(&parsed, &state).await,
372 "rerank" => handle_rerank(&parsed, &state).await,
373 "transcribe" => handle_transcribe(&parsed, &state).await,
374 "synthesize" => handle_synthesize(&parsed, &state).await,
375 "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
376 "speech.prepare" => handle_speech_prepare(&state).await,
377 "models.route" => handle_models_route(&parsed, &state).await,
378 "models.stats" => handle_models_stats(&state).await,
379 "events.count" => handle_events_count(&session).await,
380 "events.stats" => handle_events_stats(&session).await,
381 "events.truncate" => handle_events_truncate(&parsed, &session).await,
382 "events.clear" => handle_events_clear(&session).await,
383 "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
384 "models.list" => handle_models_list(&state),
385 "models.list_unified" => handle_models_list_unified(&state),
386 "models.search" => handle_models_search(&parsed, &state),
387 "models.upgrades" => handle_models_upgrades(&state),
388 "models.pull" => handle_models_pull(&parsed, &state).await,
389 "models.install" => handle_models_pull(&parsed, &state).await,
390 "skills.distill" => handle_skills_distill(&parsed, &state).await,
391 "skills.list" => handle_skills_list(&parsed, &session).await,
392 "browser.run" => handle_browser_run(&parsed, &session).await,
393 "browser.close" => handle_browser_close(&session).await,
394 "secret.put" => handle_secret_put(&parsed),
395 "secret.get" => handle_secret_get(&parsed),
396 "secret.delete" => handle_secret_delete(&parsed),
397 "secret.status" => handle_secret_status(&parsed),
398 "secret.available" => Ok(car_ffi_common::secrets::is_available()),
399 "permissions.status" => handle_perm_status(&parsed),
400 "permissions.request" => handle_perm_request(&parsed),
401 "permissions.explain" => handle_perm_explain(&parsed),
402 "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
403 "accounts.list" => car_ffi_common::accounts::list(),
404 "accounts.open" => {
405 #[derive(serde::Deserialize, Default)]
406 struct OpenParams {
407 #[serde(default)]
408 account_id: Option<String>,
409 }
410 let p: OpenParams =
411 serde_json::from_value(parsed.params.clone()).unwrap_or_default();
412 car_ffi_common::accounts::open_settings(p.account_id.as_deref())
413 }
414 "calendar.list" => car_ffi_common::integrations::calendar_list(),
415 "calendar.events" => handle_calendar_events(&parsed),
416 "contacts.containers" => car_ffi_common::integrations::contacts_containers(),
417 "contacts.find" => handle_contacts_find(&parsed),
418 "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
419 "mail.inbox" => handle_mail_inbox(&parsed),
420 "mail.send" => handle_mail_send(&parsed),
421 "messages.services" => car_ffi_common::integrations::messages_services(),
422 "messages.chats" => handle_messages_chats(&parsed),
423 "messages.send" => handle_messages_send(&parsed),
424 "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
425 "notes.find" => handle_notes_find(&parsed),
426 "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
427 "reminders.items" => handle_reminders_items(&parsed),
428 "photos.albums" => car_ffi_common::integrations::photos_albums(),
429 "bookmarks.list" => handle_bookmarks_list(&parsed),
430 "files.locations" => car_ffi_common::integrations::files_locations(),
431 "keychain.status" => car_ffi_common::integrations::keychain_status(),
432 "health.status" => car_ffi_common::health::status(),
433 "health.sleep" => handle_health_sleep(&parsed),
434 "health.workouts" => handle_health_workouts(&parsed),
435 "health.activity" => handle_health_activity(&parsed),
436 "voice.transcribe_stream.start" => {
437 handle_voice_transcribe_stream_start(&parsed, &state, &session).await
438 }
439 "voice.transcribe_stream.stop" => {
440 handle_voice_transcribe_stream_stop(&parsed, &state).await
441 }
442 "voice.transcribe_stream.push" => {
443 handle_voice_transcribe_stream_push(&parsed, &state).await
444 }
445 "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
446 "voice.dispatch_turn" => {
447 handle_voice_dispatch_turn(&parsed, &state, &session).await
448 }
449 "voice.cancel_turn" => handle_voice_cancel_turn().await,
450 "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
451 "inference.register_runner" => {
452 handle_inference_register_runner(&session).await
453 }
454 "inference.runner.event" => {
455 handle_inference_runner_event(&parsed).await
456 }
457 "inference.runner.complete" => {
458 handle_inference_runner_complete(&parsed).await
459 }
460 "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
461 "voice.providers.list" => {
462 serde_json::from_str::<serde_json::Value>(
466 &car_voice::list_voice_providers_json(),
467 )
468 .map_err(|e| e.to_string())
469 }
470 "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
471 .await
472 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
473 "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
474 .await
475 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
476 "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
477 "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
478 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
479 "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
480 "workflow.run" => handle_workflow_run(&parsed, &session).await,
481 "workflow.verify" => handle_workflow_verify(&parsed),
482 "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
483 "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
484 "meeting.list" => handle_meeting_list(&parsed),
485 "meeting.get" => handle_meeting_get(&parsed),
486 "registry.register" => handle_registry_register(&parsed),
487 "registry.heartbeat" => handle_registry_heartbeat(&parsed),
488 "registry.unregister" => handle_registry_unregister(&parsed),
489 "registry.list" => handle_registry_list(&parsed),
490 "registry.reap" => handle_registry_reap(&parsed),
491 "admission.status" => handle_admission_status(&state),
492 "a2a.start" => handle_a2a_start(&parsed).await,
493 "a2a.stop" => handle_a2a_stop(),
494 "a2a.status" => handle_a2a_status(),
495 "a2a.send" => handle_a2a_send(&parsed, &state).await,
496 "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
497 "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
498 "a2ui.capabilities" => handle_a2ui_capabilities(&state),
499 "a2ui.reap" => handle_a2ui_reap(&state).await,
500 "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
501 "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
502 "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
503 "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
504 "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
505 "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
506 "automation.run_applescript" => handle_run_applescript(&parsed).await,
507 "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
508 "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
509 "notifications.local" => handle_local_notification(&parsed).await,
510 "vision.ocr" => handle_vision_ocr(&parsed).await,
511 "agents.list" => handle_agents_list(&state).await,
512 "agents.health" => handle_agents_health(&state).await,
513 "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
514 "agents.remove" => handle_agents_remove(&parsed, &state).await,
515 "agents.start" => handle_agents_start(&parsed, &state).await,
516 "agents.stop" => handle_agents_stop(&parsed, &state).await,
517 "agents.restart" => handle_agents_restart(&parsed, &state).await,
518 "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
519 "agents.list_external" => handle_agents_list_external(&parsed).await,
520 "agents.detect_external" => handle_agents_detect_external(&parsed).await,
521 "agents.health_external" => handle_agents_health_external(&parsed).await,
522 "agents.invoke_external" => {
523 handle_agents_invoke_external(&parsed, &state).await
524 }
525 "message/send" | "SendMessage"
532 | "message/stream" | "SendStreamingMessage"
533 | "tasks/get" | "GetTask"
534 | "tasks/list" | "ListTasks"
535 | "tasks/cancel" | "CancelTask"
536 | "tasks/resubscribe" | "SubscribeToTask"
537 | "tasks/pushNotificationConfig/set"
538 | "CreateTaskPushNotificationConfig"
539 | "tasks/pushNotificationConfig/get"
540 | "GetTaskPushNotificationConfig"
541 | "tasks/pushNotificationConfig/list"
542 | "ListTaskPushNotificationConfigs"
543 | "tasks/pushNotificationConfig/delete"
544 | "DeleteTaskPushNotificationConfig"
545 | "agent/getAuthenticatedExtendedCard"
546 | "GetExtendedAgentCard" => handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await,
547 _ => Err(format!("unknown method: {}", method_owned)),
548 };
549
550 let resp = match result {
551 Ok(value) => JsonRpcResponse::success(parsed.id, value),
552 Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
553 };
554 let _ = send_response(&session.channel, resp).await;
555 });
556 }
557 } else if msg.is_close() {
558 info!("Client {} disconnected", client_id);
559 break;
560 }
561 }
562
563 session.host.unsubscribe(&client_id).await;
564 state.a2ui_subscribers.lock().await.remove(&client_id);
565
566 let _removed = state.remove_session(&client_id).await;
577 {
578 let mut pending = session.channel.pending.lock().await;
579 pending.clear();
580 }
581
582 Ok(())
583}
584
585async fn send_response(
586 channel: &WsChannel,
587 resp: JsonRpcResponse,
588) -> Result<(), Box<dyn std::error::Error>> {
589 use futures::SinkExt;
590 let json = serde_json::to_string(&resp)?;
591 channel
592 .write
593 .lock()
594 .await
595 .send(Message::Text(json.into()))
596 .await?;
597 Ok(())
598}
599
600async fn handle_host_subscribe(session: &crate::session::ClientSession) -> Result<Value, String> {
603 session
604 .host
605 .subscribe(&session.client_id, session.channel.clone())
606 .await;
607 serde_json::to_value(HostSnapshot {
608 subscribed: true,
609 agents: session.host.agents().await,
610 approvals: session.host.approvals().await,
611 events: session.host.events(50).await,
612 })
613 .map_err(|e| e.to_string())
614}
615
616async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
617 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
618}
619
620async fn handle_host_events(
621 req: &JsonRpcMessage,
622 session: &crate::session::ClientSession,
623) -> Result<Value, String> {
624 let limit = req
625 .params
626 .get("limit")
627 .and_then(|v| v.as_u64())
628 .unwrap_or(100) as usize;
629 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
630}
631
632async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
633 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
634}
635
636async fn handle_a2ui_apply(
637 req: &JsonRpcMessage,
638 state: &Arc<ServerState>,
639) -> Result<Value, String> {
640 #[derive(Deserialize)]
641 struct Params {
642 #[serde(default)]
643 envelope: Option<car_a2ui::A2uiEnvelope>,
644 #[serde(default)]
645 message: Option<car_a2ui::A2uiEnvelope>,
646 }
647
648 let envelope = if req.params.get("createSurface").is_some()
649 || req.params.get("updateComponents").is_some()
650 || req.params.get("updateDataModel").is_some()
651 || req.params.get("deleteSurface").is_some()
652 {
653 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
654 .map_err(|e| e.to_string())?
655 } else {
656 match serde_json::from_value::<Params>(req.params.clone()) {
657 Ok(params) => params
658 .envelope
659 .or(params.message)
660 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
661 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
662 .map_err(|e| e.to_string())?,
663 }
664 };
665
666 apply_a2ui_envelope(state, envelope, None, None).await
667}
668
669async fn handle_a2ui_ingest(
670 req: &JsonRpcMessage,
671 state: &Arc<ServerState>,
672) -> Result<Value, String> {
673 #[derive(Deserialize)]
674 #[serde(rename_all = "camelCase")]
675 struct Params {
676 #[serde(default)]
677 endpoint: Option<String>,
678 #[serde(default)]
679 a2a_endpoint: Option<String>,
680 #[serde(default)]
681 owner: Option<car_a2ui::A2uiSurfaceOwner>,
682 #[serde(default)]
683 route_auth: Option<A2aRouteAuth>,
684 #[serde(default)]
685 allow_untrusted_endpoint: bool,
686 }
687
688 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
689 endpoint: None,
690 a2a_endpoint: None,
691 owner: None,
692 route_auth: None,
693 allow_untrusted_endpoint: false,
694 });
695 let payload = req.params.get("payload").unwrap_or(&req.params);
696 state
697 .a2ui
698 .validate_payload(payload)
699 .map_err(|e| e.to_string())?;
700 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
701 if envelopes.is_empty() {
702 return Err("no A2UI envelopes found in payload".into());
703 }
704 let endpoint = params.endpoint.or(params.a2a_endpoint);
705 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
706 let owner = params
707 .owner
708 .or_else(|| car_a2ui::owner_from_value(payload))
709 .map(|owner| match endpoint.clone() {
710 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
711 None => owner,
712 });
713
714 let mut results = Vec::new();
715 for envelope in envelopes {
716 let value =
717 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
718 results.push(value);
719 }
720 Ok(serde_json::json!({ "applied": results }))
721}
722
723async fn apply_a2ui_envelope(
724 state: &Arc<ServerState>,
725 envelope: car_a2ui::A2uiEnvelope,
726 owner: Option<car_a2ui::A2uiSurfaceOwner>,
727 route_auth: Option<A2aRouteAuth>,
728) -> Result<Value, String> {
729 let result = state
730 .a2ui
731 .apply_with_owner(envelope, owner)
732 .await
733 .map_err(|e| e.to_string())?;
734 update_a2ui_route_auth(state, &result, route_auth).await;
735 let kind = if result.deleted {
736 "a2ui.surface_deleted"
737 } else {
738 "a2ui.surface_updated"
739 };
740 let message = if result.deleted {
741 format!("A2UI surface {} deleted", result.surface_id)
742 } else {
743 format!("A2UI surface {} updated", result.surface_id)
744 };
745 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
746 state
747 .host
748 .record_event(kind, None, message, payload.clone())
749 .await;
750 broadcast_a2ui_event(state, kind, &payload).await;
754 serde_json::to_value(result).map_err(|e| e.to_string())
755}
756
757async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
758 use futures::SinkExt;
759 use tokio_tungstenite::tungstenite::Message;
760 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
761 .a2ui_subscribers
762 .lock()
763 .await
764 .values()
765 .cloned()
766 .collect();
767 if subscribers.is_empty() {
768 return;
769 }
770 let Ok(json) = serde_json::to_string(&serde_json::json!({
771 "jsonrpc": "2.0",
772 "method": "a2ui.event",
773 "params": {
774 "kind": kind,
775 "result": result,
776 },
777 })) else {
778 return;
779 };
780 for channel in subscribers {
781 let _ = channel
782 .write
783 .lock()
784 .await
785 .send(Message::Text(json.clone().into()))
786 .await;
787 }
788}
789
790async fn update_a2ui_route_auth(
791 state: &Arc<ServerState>,
792 result: &car_a2ui::A2uiApplyResult,
793 route_auth: Option<A2aRouteAuth>,
794) {
795 let mut auth = state.a2ui_route_auth.lock().await;
796 if result.deleted {
797 auth.remove(&result.surface_id);
798 return;
799 }
800
801 let has_route_endpoint = result
802 .surface
803 .as_ref()
804 .and_then(|surface| surface.owner.as_ref())
805 .and_then(|owner| owner.endpoint.as_ref())
806 .is_some();
807 match (has_route_endpoint, route_auth) {
808 (true, Some(route_auth)) => {
809 auth.insert(result.surface_id.clone(), route_auth);
810 }
811 _ => {
812 auth.remove(&result.surface_id);
813 }
814 }
815}
816
817fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
818 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
819}
820
821async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
822 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
823 if !removed.is_empty() {
824 let mut auth = state.a2ui_route_auth.lock().await;
825 for surface_id in &removed {
826 auth.remove(surface_id);
827 }
828 }
829 Ok(serde_json::json!({ "removed": removed }))
830}
831
832async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
833 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
834}
835
836async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
837 let surface_id = req
838 .params
839 .get("surface_id")
840 .or_else(|| req.params.get("surfaceId"))
841 .and_then(Value::as_str)
842 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
843 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
844}
845
846async fn handle_a2ui_subscribe(
852 session: &crate::session::ClientSession,
853 state: &Arc<ServerState>,
854) -> Result<Value, String> {
855 state
856 .a2ui_subscribers
857 .lock()
858 .await
859 .insert(session.client_id.clone(), session.channel.clone());
860 Ok(serde_json::json!({ "subscribed": true }))
861}
862
863async fn handle_a2ui_unsubscribe(
867 session: &crate::session::ClientSession,
868 state: &Arc<ServerState>,
869) -> Result<Value, String> {
870 state.a2ui_subscribers.lock().await.remove(&session.client_id);
871 Ok(serde_json::json!({ "subscribed": false }))
872}
873
874async fn handle_a2ui_replay(
881 req: &JsonRpcMessage,
882 state: &Arc<ServerState>,
883) -> Result<Value, String> {
884 let surface_id = req
885 .params
886 .get("surface_id")
887 .or_else(|| req.params.get("surfaceId"))
888 .and_then(Value::as_str)
889 .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
890 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
891}
892
893async fn handle_a2ui_action(
894 req: &JsonRpcMessage,
895 state: &Arc<ServerState>,
896) -> Result<Value, String> {
897 let action: car_a2ui::ClientAction =
898 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
899 let owner = state.a2ui.owner(&action.surface_id).await;
900 let route = route_a2ui_action(state, &action, owner.clone()).await;
901 let payload = serde_json::json!({
902 "action": action,
903 "owner": owner,
904 "route": route,
905 });
906 let event = state
907 .host
908 .record_event(
909 "a2ui.action",
910 None,
911 format!(
912 "A2UI action {} from {}",
913 action.name, action.source_component_id
914 ),
915 payload,
916 )
917 .await;
918 Ok(serde_json::json!({
919 "event": event,
920 "route": route,
921 }))
922}
923
924async fn route_a2ui_action(
925 state: &Arc<ServerState>,
926 action: &car_a2ui::ClientAction,
927 owner: Option<car_a2ui::A2uiSurfaceOwner>,
928) -> Value {
929 let Some(owner) = owner else {
930 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
931 };
932 if owner.kind != "a2a" {
933 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
934 }
935 let Some(endpoint) = owner.endpoint.clone() else {
936 return serde_json::json!({
937 "delivered": false,
938 "reason": "surface owner has no endpoint",
939 "owner": owner
940 });
941 };
942
943 let message = car_a2a::Message {
944 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
945 role: car_a2a::MessageRole::User,
946 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
947 data: serde_json::json!({
948 "a2uiAction": action,
949 }),
950 metadata: Default::default(),
951 })],
952 task_id: owner.task_id.clone(),
953 context_id: owner.context_id.clone(),
954 metadata: Default::default(),
955 };
956
957 let auth = state
958 .a2ui_route_auth
959 .lock()
960 .await
961 .get(&action.surface_id)
962 .cloned()
963 .map(client_auth_from_route_auth)
964 .unwrap_or(car_a2a::ClientAuth::None);
965
966 match car_a2a::A2aClient::new(endpoint.clone())
967 .with_auth(auth)
968 .send_message(message, false)
969 .await
970 {
971 Ok(result) => serde_json::json!({
972 "delivered": true,
973 "owner": owner,
974 "endpoint": endpoint,
975 "result": result,
976 }),
977 Err(error) => serde_json::json!({
978 "delivered": false,
979 "owner": owner,
980 "endpoint": endpoint,
981 "error": error.to_string(),
982 }),
983 }
984}
985
986fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
987 match auth {
988 A2aRouteAuth::None => car_a2a::ClientAuth::None,
989 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
990 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
991 }
992}
993
994fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
995 let endpoint = endpoint?;
996 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
997 Some(endpoint)
998 } else {
999 None
1000 }
1001}
1002
/// Reports whether `endpoint` is a plain-HTTP URL whose host is one of the
/// loopback names `localhost`, `127.0.0.1` or `[::1]`.
///
/// The authority component (everything between `http://` and the first
/// `/`) must be exactly `host` or `host:port`, where `port` consists only
/// of ASCII digits (it may be empty). Checking the whole authority rather
/// than a string prefix matters: `http://localhost:80@evil.example/`
/// starts with `http://localhost:` but, because of the `@`, actually names
/// the host `evil.example`, so a prefix test would trust a remote host.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };
    // `split` always yields at least one item, so the fallback is unused.
    let authority = rest.split('/').next().unwrap_or("");

    LOOPBACK_HOSTS
        .iter()
        .any(|host| match authority.strip_prefix(*host) {
            // Bare host, no port.
            Some("") => true,
            // Anything after the host must be `:` followed by digits only;
            // this rejects userinfo (`@`) and look-alike host suffixes.
            Some(tail) => tail
                .strip_prefix(':')
                .map_or(false, |port| port.bytes().all(|b| b.is_ascii_digit())),
            None => false,
        })
}
1014
1015async fn handle_host_register_agent(
1016 req: &JsonRpcMessage,
1017 session: &crate::session::ClientSession,
1018) -> Result<Value, String> {
1019 let request: RegisterHostAgentRequest =
1020 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1021 serde_json::to_value(
1022 session
1023 .host
1024 .register_agent(&session.client_id, request)
1025 .await?,
1026 )
1027 .map_err(|e| e.to_string())
1028}
1029
1030async fn handle_host_unregister_agent(
1031 req: &JsonRpcMessage,
1032 session: &crate::session::ClientSession,
1033) -> Result<Value, String> {
1034 let agent_id = req
1035 .params
1036 .get("agent_id")
1037 .and_then(|v| v.as_str())
1038 .ok_or("missing agent_id")?;
1039 session
1040 .host
1041 .unregister_agent(&session.client_id, agent_id)
1042 .await?;
1043 Ok(serde_json::json!({"ok": true}))
1044}
1045
1046async fn handle_host_set_status(
1047 req: &JsonRpcMessage,
1048 session: &crate::session::ClientSession,
1049) -> Result<Value, String> {
1050 let request: SetHostAgentStatusRequest =
1051 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1052 serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1053 .map_err(|e| e.to_string())
1054}
1055
1056async fn handle_host_notify(
1057 req: &JsonRpcMessage,
1058 session: &crate::session::ClientSession,
1059) -> Result<Value, String> {
1060 let kind = req
1061 .params
1062 .get("kind")
1063 .and_then(|v| v.as_str())
1064 .unwrap_or("host.notification");
1065 let agent_id = req
1066 .params
1067 .get("agent_id")
1068 .and_then(|v| v.as_str())
1069 .map(str::to_string);
1070 let message = req
1071 .params
1072 .get("message")
1073 .and_then(|v| v.as_str())
1074 .unwrap_or("");
1075 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1076 serde_json::to_value(
1077 session
1078 .host
1079 .record_event(kind, agent_id, message, payload)
1080 .await,
1081 )
1082 .map_err(|e| e.to_string())
1083}
1084
1085async fn handle_host_request_approval(
1086 req: &JsonRpcMessage,
1087 session: &crate::session::ClientSession,
1088) -> Result<Value, String> {
1089 let request: CreateHostApprovalRequest =
1090 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1091 if let Some(agent_id) = &request.agent_id {
1092 let _ = session
1097 .host
1098 .set_status(
1099 &session.client_id,
1100 SetHostAgentStatusRequest {
1101 agent_id: agent_id.clone(),
1102 status: HostAgentStatus::WaitingForApproval,
1103 current_task: None,
1104 message: Some("Waiting for approval".to_string()),
1105 payload: Value::Null,
1106 },
1107 )
1108 .await;
1109 }
1110 serde_json::to_value(
1111 session
1112 .host
1113 .create_approval(Some(&session.client_id), request)
1114 .await?,
1115 )
1116 .map_err(|e| e.to_string())
1117}
1118
1119async fn handle_host_resolve_approval(
1120 req: &JsonRpcMessage,
1121 session: &crate::session::ClientSession,
1122) -> Result<Value, String> {
1123 let request: ResolveHostApprovalRequest =
1124 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1125 serde_json::to_value(
1126 session
1127 .host
1128 .resolve_approval(&session.client_id, request)
1129 .await?,
1130 )
1131 .map_err(|e| e.to_string())
1132}
1133
1134async fn handle_session_auth(
1145 req: &JsonRpcMessage,
1146 session: &crate::session::ClientSession,
1147 state: &Arc<ServerState>,
1148) -> Result<Value, String> {
1149 let supplied = req
1150 .params
1151 .get("token")
1152 .and_then(Value::as_str)
1153 .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1154 let agent_id = req
1161 .params
1162 .get("agent_id")
1163 .and_then(Value::as_str)
1164 .map(str::to_string);
1165
1166 if let Some(id) = agent_id {
1167 let supervisor = state.supervisor()?;
1168 if !supervisor.validate_agent_token(&id, supplied).await {
1169 return Err(format!(
1170 "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1171 ));
1172 }
1173 {
1177 let mut attached = state.attached_agents.lock().await;
1178 if let Some(prior) = attached.get(&id) {
1179 if prior != &session.client_id {
1180 return Err(format!(
1181 "auth failed: agent_id `{id}` is already attached on \
1182 another connection (client_id={prior})"
1183 ));
1184 }
1185 }
1186 attached.insert(id.clone(), session.client_id.clone());
1187 }
1188 let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1193 *session.bound_memgine.lock().await = Some(agent_eng);
1194 *session.agent_id.lock().await = Some(id.clone());
1195 session
1196 .authenticated
1197 .store(true, std::sync::atomic::Ordering::Release);
1198 return Ok(serde_json::json!({
1199 "ok": true,
1200 "auth_enabled": true,
1201 "agent_id": id,
1202 }));
1203 }
1204
1205 let expected = match state.auth_token.get() {
1206 Some(t) => t,
1207 None => {
1208 session
1214 .authenticated
1215 .store(true, std::sync::atomic::Ordering::Release);
1216 return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1217 }
1218 };
1219 if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1220 return Err("auth failed: token mismatch".to_string());
1221 }
1222 session
1223 .authenticated
1224 .store(true, std::sync::atomic::Ordering::Release);
1225 Ok(serde_json::json!({ "ok": true, "auth_enabled": true }))
1226}
1227
/// Compares two byte slices for equality without stopping at the first
/// differing byte.
///
/// Slices of different length are unequal and return immediately. For
/// equal-length slices every byte pair is XOR-ed and OR-ed into an
/// accumulator, which is zero only when all pairs match.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() == b.len() {
        let accumulated = a
            .iter()
            .zip(b)
            .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs));
        accumulated == 0
    } else {
        false
    }
}
1241
1242async fn gate_high_risk_method(
1252 method: &str,
1253 params: &Value,
1254 state: &Arc<ServerState>,
1255) -> Result<(), String> {
1256 let timeout = state.approval_gate.timeout;
1257 let req = CreateHostApprovalRequest {
1258 agent_id: None,
1259 action: format!("ws.method:{method}"),
1260 details: serde_json::json!({
1261 "method": method,
1262 "params_preview": preview_params(params, 2_000),
1266 }),
1267 options: vec!["approve".to_string(), "deny".to_string()],
1268 };
1269 match state
1270 .host
1271 .request_and_wait_approval(req, "approve", timeout)
1272 .await
1273 {
1274 Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1275 Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1276 "{method} denied by user (approval gate, audit 2026-05). \
1277 To call this method without an interactive prompt, start \
1278 car-server with --no-approvals on a trusted machine."
1279 )),
1280 Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1281 "{method} approval timed out after {}s with no resolution. \
1282 The approval is still visible in `host.approvals` for \
1283 forensics; resubmit the request to retry.",
1284 timeout.as_secs()
1285 )),
1286 Err(e) => Err(format!("approval gate error: {e}")),
1287 }
1288}
1289
1290fn preview_params(value: &Value, max_chars: usize) -> Value {
1291 let s = value.to_string();
1292 if s.len() <= max_chars {
1293 value.clone()
1294 } else {
1295 Value::String(format!("{}… (truncated)", &s[..max_chars]))
1296 }
1297}
1298
1299async fn handle_session_init(
1300 req: &JsonRpcMessage,
1301 session: &crate::session::ClientSession,
1302) -> Result<Value, String> {
1303 let init: SessionInitRequest =
1304 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1305
1306 for tool in &init.tools {
1307 register_from_definition(&session.runtime, tool).await;
1308 }
1309
1310 let mut policy_count = 0;
1311 {
1312 let mut policies = session.runtime.policies.write().await;
1313 for policy_def in &init.policies {
1314 if let Some(check) = build_policy_check(policy_def) {
1315 policies.register(&policy_def.name, check, "");
1316 policy_count += 1;
1317 }
1318 }
1319 }
1320
1321 serde_json::to_value(SessionInitResponse {
1322 session_id: session.client_id.clone(),
1323 tools_registered: init.tools.len(),
1324 policies_registered: policy_count,
1325 })
1326 .map_err(|e| e.to_string())
1327}
1328
1329fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1330 match def.rule.as_str() {
1331 "deny_tool" => {
1332 let target = def.target.clone();
1333 Some(Box::new(
1334 move |action: &car_ir::Action, _: &car_state::StateStore| {
1335 if action.tool.as_deref() == Some(&target) {
1336 Some(format!("tool '{}' denied", target))
1337 } else {
1338 None
1339 }
1340 },
1341 ))
1342 }
1343 "require_state" => {
1344 let key = def.key.clone();
1345 let value = def.value.clone();
1346 Some(Box::new(
1347 move |_: &car_ir::Action, state: &car_state::StateStore| {
1348 if state.get(&key).as_ref() != Some(&value) {
1349 Some(format!("state['{}'] must be {:?}", key, value))
1350 } else {
1351 None
1352 }
1353 },
1354 ))
1355 }
1356 "deny_tool_param" => {
1357 let target = def.target.clone();
1358 let param = def.key.clone();
1359 let pattern = def.pattern.clone();
1360 Some(Box::new(
1361 move |action: &car_ir::Action, _: &car_state::StateStore| {
1362 if action.tool.as_deref() != Some(&target) {
1363 return None;
1364 }
1365 if let Some(val) = action.parameters.get(¶m) {
1366 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1367 if s.contains(&pattern) {
1368 return Some(format!("param '{}' matches '{}'", param, pattern));
1369 }
1370 }
1371 None
1372 },
1373 ))
1374 }
1375 _ => None,
1376 }
1377}
1378
1379async fn handle_tools_register(
1380 req: &JsonRpcMessage,
1381 session: &crate::session::ClientSession,
1382) -> Result<Value, String> {
1383 let tools: Vec<ToolDefinition> =
1384 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1385 for tool in &tools {
1386 register_from_definition(&session.runtime, tool).await;
1387 }
1388 Ok(Value::from(tools.len()))
1389}
1390
1391async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1398 runtime
1399 .register_tool_schema(car_ir::ToolSchema {
1400 name: def.name.clone(),
1401 description: def.description.clone(),
1402 parameters: def.parameters.clone(),
1403 returns: def.returns.clone(),
1404 idempotent: def.idempotent,
1405 cache_ttl_secs: def.cache_ttl_secs,
1406 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1407 max_calls: rl.max_calls,
1408 interval_secs: rl.interval_secs,
1409 }),
1410 })
1411 .await;
1412}
1413
1414async fn handle_proposal_submit(
1415 req: &JsonRpcMessage,
1416 session: &crate::session::ClientSession,
1417) -> Result<Value, String> {
1418 let submit: ProposalSubmitRequest =
1419 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1420 let session_id = req
1426 .params
1427 .get("session_id")
1428 .and_then(|v| v.as_str())
1429 .map(str::to_string);
1430 let result = match session_id {
1431 Some(sid) => session.runtime.execute_with_session(&submit.proposal, &sid).await,
1432 None => session.runtime.execute(&submit.proposal).await,
1433 };
1434 serde_json::to_value(result).map_err(|e| e.to_string())
1435}
1436
1437async fn handle_session_policy_open(
1438 session: &crate::session::ClientSession,
1439) -> Result<Value, String> {
1440 let id = session.runtime.open_session().await;
1441 Ok(serde_json::json!({ "session_id": id }))
1442}
1443
1444async fn handle_session_policy_close(
1445 req: &JsonRpcMessage,
1446 session: &crate::session::ClientSession,
1447) -> Result<Value, String> {
1448 let sid = req
1449 .params
1450 .get("session_id")
1451 .and_then(|v| v.as_str())
1452 .ok_or("missing 'session_id'")?;
1453 let closed = session.runtime.close_session(sid).await;
1454 Ok(serde_json::json!({ "closed": closed }))
1455}
1456
1457async fn handle_policy_register(
1463 req: &JsonRpcMessage,
1464 session: &crate::session::ClientSession,
1465) -> Result<Value, String> {
1466 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1467 .map_err(|e| format!("invalid policy params: {e}"))?;
1468 let session_id = req
1469 .params
1470 .get("session_id")
1471 .and_then(|v| v.as_str())
1472 .map(str::to_string);
1473 let check = build_policy_check(&def).ok_or_else(|| {
1474 format!("unsupported policy rule '{}'", def.rule)
1475 })?;
1476 match session_id {
1477 Some(sid) => session
1478 .runtime
1479 .register_policy_in_session(&sid, &def.name, check, "")
1480 .await
1481 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1482 None => {
1483 let mut policies = session.runtime.policies.write().await;
1484 policies.register(&def.name, check, "");
1485 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1486 }
1487 }
1488}
1489
1490async fn handle_verify(
1491 req: &JsonRpcMessage,
1492 session: &crate::session::ClientSession,
1493) -> Result<Value, String> {
1494 let vr: VerifyRequest =
1495 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1496 let tools: std::collections::HashSet<String> =
1497 session.runtime.tools.read().await.keys().cloned().collect();
1498 let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1499 serde_json::to_value(VerifyResponse {
1500 valid: result.valid,
1501 issues: result
1502 .issues
1503 .iter()
1504 .map(|i| VerifyIssueProto {
1505 action_id: i.action_id.clone(),
1506 severity: i.severity.clone(),
1507 message: i.message.clone(),
1508 })
1509 .collect(),
1510 simulated_state: result.simulated_state,
1511 })
1512 .map_err(|e| e.to_string())
1513}
1514
1515async fn handle_state_get(
1516 req: &JsonRpcMessage,
1517 session: &crate::session::ClientSession,
1518) -> Result<Value, String> {
1519 let key = req
1520 .params
1521 .get("key")
1522 .and_then(|v| v.as_str())
1523 .ok_or("missing 'key'")?;
1524 Ok(session.runtime.state.get(key).unwrap_or(Value::Null))
1525}
1526
1527async fn handle_state_set(
1528 req: &JsonRpcMessage,
1529 session: &crate::session::ClientSession,
1530) -> Result<Value, String> {
1531 let key = req
1532 .params
1533 .get("key")
1534 .and_then(|v| v.as_str())
1535 .ok_or("missing 'key'")?;
1536 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1537 session.runtime.state.set(key, value, "client");
1538 Ok(Value::from("ok"))
1539}
1540
1541async fn handle_state_exists(
1545 req: &JsonRpcMessage,
1546 session: &crate::session::ClientSession,
1547) -> Result<Value, String> {
1548 let key = req
1549 .params
1550 .get("key")
1551 .and_then(|v| v.as_str())
1552 .ok_or("missing 'key'")?;
1553 Ok(Value::Bool(session.runtime.state.exists(key)))
1554}
1555
1556async fn handle_state_keys(session: &crate::session::ClientSession) -> Result<Value, String> {
1559 Ok(Value::Array(
1560 session
1561 .runtime
1562 .state
1563 .keys()
1564 .into_iter()
1565 .map(Value::String)
1566 .collect(),
1567 ))
1568}
1569
1570async fn handle_state_snapshot(
1575 session: &crate::session::ClientSession,
1576) -> Result<Value, String> {
1577 Ok(serde_json::to_value(session.runtime.state.snapshot())
1578 .map_err(|e| format!("serialize snapshot: {e}"))?)
1579}
1580
1581fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
1587 let base = car_ffi_common::memory_path::ensure_base()
1588 .map_err(|e| format!("memory base unavailable: {e}"))?;
1589 let dir = base.join("agents");
1590 std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
1591 Ok(dir.join(format!("{agent_id}.json")))
1592}
1593
1594async fn get_or_load_agent_memgine(
1601 state: &Arc<ServerState>,
1602 agent_id: &str,
1603) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
1604 {
1605 let map = state.agent_memgines.lock().await;
1606 if let Some(eng) = map.get(agent_id) {
1607 return Ok(eng.clone());
1608 }
1609 }
1610 let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(None)));
1612 let path = agent_memgine_snapshot_path(agent_id)?;
1613 if path.exists() {
1614 let content = std::fs::read_to_string(&path)
1615 .map_err(|e| format!("read {}: {}", path.display(), e))?;
1616 let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
1617 let mut g = engine.lock().await;
1618 let mut loaded: u32 = 0;
1619 for fact in &facts {
1620 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
1621 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
1622 let kind = fact
1623 .get("kind")
1624 .and_then(|v| v.as_str())
1625 .unwrap_or("pattern");
1626 let fid = format!("loaded-{loaded}");
1627 g.ingest_fact(
1628 &fid,
1629 subject,
1630 body,
1631 "user",
1632 "peer",
1633 chrono::Utc::now(),
1634 "global",
1635 None,
1636 vec![],
1637 kind == "constraint",
1638 );
1639 loaded += 1;
1640 }
1641 }
1642 let mut map = state.agent_memgines.lock().await;
1643 let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
1644 Ok(stored)
1645}
1646
1647async fn persist_agent_memgine(
1651 agent_id: &str,
1652 engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
1653) -> Result<(), String> {
1654 let path = agent_memgine_snapshot_path(agent_id)?;
1655 let g = engine.lock().await;
1656 let facts: Vec<Value> = g
1657 .graph
1658 .inner
1659 .node_indices()
1660 .filter_map(|nix| {
1661 let node = g.graph.inner.node_weight(nix)?;
1662 if !node.is_valid() {
1663 return None;
1664 }
1665 if node.kind == car_memgine::MemKind::Identity
1666 || node.kind == car_memgine::MemKind::Environment
1667 {
1668 return None;
1669 }
1670 Some(serde_json::json!({
1671 "subject": node.key,
1672 "body": node.value,
1673 "kind": match node.kind {
1674 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
1675 car_memgine::MemKind::Conversation => "outcome",
1676 _ => "pattern",
1677 },
1678 "confidence": 0.5,
1679 "content_type": node.content_type.as_label(),
1680 }))
1681 })
1682 .collect();
1683 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
1684 std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
1685 Ok(())
1686}
1687
1688async fn handle_memory_fact_count(
1695 session: &crate::session::ClientSession,
1696) -> Result<Value, String> {
1697 let engine_arc = session.effective_memgine().await;
1698 let engine = engine_arc.lock().await;
1699 Ok(Value::from(engine.valid_fact_count()))
1700}
1701
1702async fn handle_memory_add_fact(
1703 req: &JsonRpcMessage,
1704 session: &crate::session::ClientSession,
1705) -> Result<Value, String> {
1706 let subject = req
1707 .params
1708 .get("subject")
1709 .and_then(|v| v.as_str())
1710 .ok_or("missing subject")?;
1711 let body = req
1712 .params
1713 .get("body")
1714 .and_then(|v| v.as_str())
1715 .ok_or("missing body")?;
1716 let kind = req
1717 .params
1718 .get("kind")
1719 .and_then(|v| v.as_str())
1720 .unwrap_or("pattern");
1721 let engine_arc = session.effective_memgine().await;
1725 let count = {
1726 let mut engine = engine_arc.lock().await;
1727 let fid = format!("ws-{}", engine.valid_fact_count());
1728 engine.ingest_fact(
1729 &fid,
1730 subject,
1731 body,
1732 "user",
1733 "peer",
1734 chrono::Utc::now(),
1735 "global",
1736 None,
1737 vec![],
1738 kind == "constraint",
1739 );
1740 engine.valid_fact_count()
1741 };
1742 if let Some(id) = session.agent_id.lock().await.clone() {
1745 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
1746 tracing::warn!(agent_id = %id, error = %e,
1747 "agent memgine persist failed; in-memory state is canonical");
1748 }
1749 }
1750 Ok(Value::from(count))
1751}
1752
1753async fn handle_memory_query(
1754 req: &JsonRpcMessage,
1755 session: &crate::session::ClientSession,
1756) -> Result<Value, String> {
1757 let query = req
1758 .params
1759 .get("query")
1760 .and_then(|v| v.as_str())
1761 .ok_or("missing query")?;
1762 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
1763 let engine_arc = session.effective_memgine().await;
1764 let engine = engine_arc.lock().await;
1765 let seeds = engine.graph.find_seeds(query, 5);
1766 let hits = if !seeds.is_empty() {
1771 engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
1772 } else {
1773 vec![]
1774 };
1775 let results: Vec<Value> = hits.iter().filter_map(|hit| {
1776 let node = engine.graph.inner.node_weight(hit.node_ix)?;
1777 Some(serde_json::json!({
1778 "subject": node.key,
1779 "body": node.value,
1780 "kind": format!("{:?}", node.kind).to_lowercase(),
1781 "confidence": hit.activation,
1782 }))
1783 }).collect();
1784 serde_json::to_value(results).map_err(|e| e.to_string())
1785}
1786
1787async fn handle_memory_build_context(
1788 req: &JsonRpcMessage,
1789 session: &crate::session::ClientSession,
1790) -> Result<Value, String> {
1791 let query = req
1792 .params
1793 .get("query")
1794 .and_then(|v| v.as_str())
1795 .unwrap_or("");
1796 let model_context_window = req
1800 .params
1801 .get("model_context_window")
1802 .and_then(|v| v.as_u64())
1803 .map(|w| w as usize);
1804 let mut engine = session.memgine.lock().await;
1805 Ok(Value::from(engine.build_context_for_model(query, model_context_window)))
1806}
1807
1808async fn handle_memory_build_context_fast(
1814 req: &JsonRpcMessage,
1815 session: &crate::session::ClientSession,
1816) -> Result<Value, String> {
1817 let query = req
1818 .params
1819 .get("query")
1820 .and_then(|v| v.as_str())
1821 .unwrap_or("");
1822 let model_context_window = req
1823 .params
1824 .get("model_context_window")
1825 .and_then(|v| v.as_u64())
1826 .map(|w| w as usize);
1827 let mut engine = session.memgine.lock().await;
1828 Ok(Value::from(engine.build_context_with_options(
1829 query,
1830 model_context_window,
1831 car_memgine::ContextMode::Fast,
1832 )))
1833}
1834
1835async fn handle_memory_persist(
1851 req: &JsonRpcMessage,
1852 session: &crate::session::ClientSession,
1853) -> Result<Value, String> {
1854 let path = req
1855 .params
1856 .get("path")
1857 .and_then(|v| v.as_str())
1858 .ok_or("missing path")?;
1859 let resolved = car_ffi_common::memory_path::resolve(path)
1860 .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
1861 let engine = session.memgine.lock().await;
1862 let facts: Vec<Value> = engine
1863 .graph
1864 .inner
1865 .node_indices()
1866 .filter_map(|nix| {
1867 let node = engine.graph.inner.node_weight(nix)?;
1868 if !node.is_valid() {
1869 return None;
1870 }
1871 if node.kind == car_memgine::MemKind::Identity
1872 || node.kind == car_memgine::MemKind::Environment
1873 {
1874 return None;
1875 }
1876 Some(serde_json::json!({
1877 "subject": node.key,
1878 "body": node.value,
1879 "kind": match node.kind {
1880 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
1881 car_memgine::MemKind::Conversation => "outcome",
1882 _ => "pattern",
1883 },
1884 "confidence": 0.5,
1885 "content_type": node.content_type.as_label(),
1886 }))
1887 })
1888 .collect();
1889 let count = facts.len();
1890 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
1891 std::fs::write(&resolved, json)
1892 .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
1893 Ok(Value::from(count as u64))
1894}
1895
1896async fn handle_memory_load(
1902 req: &JsonRpcMessage,
1903 session: &crate::session::ClientSession,
1904) -> Result<Value, String> {
1905 let path = req
1906 .params
1907 .get("path")
1908 .and_then(|v| v.as_str())
1909 .ok_or("missing path")?;
1910 let resolved = car_ffi_common::memory_path::resolve(path)
1911 .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
1912 let content = std::fs::read_to_string(&resolved)
1913 .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
1914 let facts: Vec<Value> =
1915 serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
1916 let mut engine = session.memgine.lock().await;
1917 engine.reset();
1918 let mut count: u32 = 0;
1919 for fact in &facts {
1920 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
1921 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
1922 let kind = fact
1923 .get("kind")
1924 .and_then(|v| v.as_str())
1925 .unwrap_or("pattern");
1926 let fid = format!("loaded-{}", count);
1927 engine.ingest_fact(
1928 &fid,
1929 subject,
1930 body,
1931 "user",
1932 "peer",
1933 chrono::Utc::now(),
1934 "global",
1935 None,
1936 vec![],
1937 kind == "constraint",
1938 );
1939 count += 1;
1940 }
1941 Ok(Value::from(count))
1942}
1943
1944async fn handle_skill_ingest(
1947 req: &JsonRpcMessage,
1948 session: &crate::session::ClientSession,
1949) -> Result<Value, String> {
1950 let name = req
1951 .params
1952 .get("name")
1953 .and_then(|v| v.as_str())
1954 .ok_or("missing name")?;
1955 let code = req
1956 .params
1957 .get("code")
1958 .and_then(|v| v.as_str())
1959 .ok_or("missing code")?;
1960 let platform = req
1961 .params
1962 .get("platform")
1963 .and_then(|v| v.as_str())
1964 .unwrap_or("unknown");
1965 let persona = req
1966 .params
1967 .get("persona")
1968 .and_then(|v| v.as_str())
1969 .unwrap_or("");
1970 let url_pattern = req
1971 .params
1972 .get("url_pattern")
1973 .and_then(|v| v.as_str())
1974 .unwrap_or("");
1975 let description = req
1976 .params
1977 .get("description")
1978 .and_then(|v| v.as_str())
1979 .unwrap_or("");
1980 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
1981 let keywords: Vec<String> = req
1982 .params
1983 .get("task_keywords")
1984 .and_then(|v| v.as_array())
1985 .map(|arr| {
1986 arr.iter()
1987 .filter_map(|v| v.as_str().map(String::from))
1988 .collect()
1989 })
1990 .unwrap_or_default();
1991
1992 let trigger = car_memgine::SkillTrigger {
1993 persona: persona.into(),
1994 url_pattern: url_pattern.into(),
1995 task_keywords: keywords,
1996 };
1997 let mut engine = session.memgine.lock().await;
1998 let node = engine.ingest_skill(
1999 name,
2000 code,
2001 platform,
2002 trigger,
2003 description,
2004 supersedes,
2005 vec![],
2006 vec![],
2007 );
2008 Ok(Value::from(node.index() as u64))
2009}
2010
2011async fn handle_skill_find(
2012 req: &JsonRpcMessage,
2013 session: &crate::session::ClientSession,
2014) -> Result<Value, String> {
2015 let persona = req
2016 .params
2017 .get("persona")
2018 .and_then(|v| v.as_str())
2019 .unwrap_or("");
2020 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2021 let task = req
2022 .params
2023 .get("task")
2024 .and_then(|v| v.as_str())
2025 .unwrap_or("");
2026 let max = req
2027 .params
2028 .get("max_results")
2029 .and_then(|v| v.as_u64())
2030 .unwrap_or(1) as usize;
2031 let engine = session.memgine.lock().await;
2032 let results = engine.find_skill(persona, url, task, max);
2033 let json: Vec<Value> = results
2034 .iter()
2035 .map(|(m, s)| {
2036 serde_json::json!({
2037 "name": m.name, "code": m.code, "platform": m.platform,
2038 "description": m.description, "stats": m.stats, "match_score": s,
2039 })
2040 })
2041 .collect();
2042 serde_json::to_value(json).map_err(|e| e.to_string())
2043}
2044
2045async fn handle_skill_report(
2046 req: &JsonRpcMessage,
2047 session: &crate::session::ClientSession,
2048) -> Result<Value, String> {
2049 let name = req
2050 .params
2051 .get("skill_name")
2052 .and_then(|v| v.as_str())
2053 .ok_or("missing skill_name")?;
2054 let outcome_str = req
2055 .params
2056 .get("outcome")
2057 .and_then(|v| v.as_str())
2058 .ok_or("missing outcome")?;
2059 let outcome = match outcome_str {
2060 "success" => car_memgine::SkillOutcome::Success,
2061 _ => car_memgine::SkillOutcome::Fail,
2062 };
2063 let mut engine = session.memgine.lock().await;
2064 let stats = engine
2065 .report_outcome(name, outcome)
2066 .ok_or(format!("skill '{}' not found", name))?;
2067 serde_json::to_value(stats).map_err(|e| e.to_string())
2068}
2069
/// Agent runner that delegates each agent run to the connected WebSocket
/// client.
///
/// Its `car_multi::AgentRunner` implementation sends a `multi.run_agent`
/// JSON-RPC request over `channel`, waits for the matching response, and
/// reports the agent's lifecycle to `host`.
struct WsAgentRunner {
    /// Connection channel: supplies request ids, the pending-response map,
    /// and the write half used to send the request.
    channel: Arc<WsChannel>,
    /// Host state used to register the agent and update its status.
    host: Arc<crate::host::HostState>,
    /// Id of the client session this runner acts for; passed to every host
    /// call and used as the prefix of generated agent ids.
    client_id: String,
}
2083
2084#[async_trait::async_trait]
2085impl car_multi::AgentRunner for WsAgentRunner {
2086 async fn run(
2087 &self,
2088 spec: &car_multi::AgentSpec,
2089 task: &str,
2090 _runtime: &car_engine::Runtime,
2091 _mailbox: &car_multi::Mailbox,
2092 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2093 use futures::SinkExt;
2094
2095 let request_id = self.channel.next_request_id();
2096 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2097 let agent = self
2098 .host
2099 .register_agent(
2100 &self.client_id,
2101 RegisterHostAgentRequest {
2102 id: Some(agent_id.clone()),
2103 name: spec.name.clone(),
2104 kind: "callback".to_string(),
2105 capabilities: spec.tools.clone(),
2106 project: spec
2107 .metadata
2108 .get("project")
2109 .and_then(|v| v.as_str())
2110 .map(str::to_string),
2111 pid: None,
2112 display: serde_json::from_value(
2113 spec.metadata
2114 .get("display")
2115 .cloned()
2116 .unwrap_or(serde_json::Value::Null),
2117 )
2118 .unwrap_or_default(),
2119 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2120 },
2121 )
2122 .await
2123 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2124 let _ = self
2125 .host
2126 .set_status(&self.client_id, SetHostAgentStatusRequest {
2127 agent_id: agent.id.clone(),
2128 status: HostAgentStatus::Running,
2129 current_task: Some(task.to_string()),
2130 message: Some(format!("{} started", spec.name)),
2131 payload: serde_json::json!({ "task": task }),
2132 })
2133 .await;
2134
2135 let rpc_request = serde_json::json!({
2136 "jsonrpc": "2.0",
2137 "method": "multi.run_agent",
2138 "params": {
2139 "spec": spec,
2140 "task": task,
2141 },
2142 "id": request_id,
2143 });
2144
2145 let (tx, rx) = tokio::sync::oneshot::channel();
2147 self.channel
2148 .pending
2149 .lock()
2150 .await
2151 .insert(request_id.clone(), tx);
2152
2153 let msg = Message::Text(
2154 serde_json::to_string(&rpc_request)
2155 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2156 .into(),
2157 );
2158 if let Err(e) = self.channel.write.lock().await.send(msg).await {
2159 let _ = self
2160 .host
2161 .set_status(&self.client_id, SetHostAgentStatusRequest {
2162 agent_id: agent_id.clone(),
2163 status: HostAgentStatus::Errored,
2164 current_task: None,
2165 message: Some(format!("{} failed to start", spec.name)),
2166 payload: serde_json::json!({ "error": e.to_string() }),
2167 })
2168 .await;
2169 return Err(car_multi::MultiError::AgentFailed(
2170 spec.name.clone(),
2171 format!("ws send error: {}", e),
2172 ));
2173 }
2174
2175 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2177 Ok(Ok(response)) => response,
2178 Ok(Err(_)) => {
2179 let _ = self
2180 .host
2181 .set_status(&self.client_id, SetHostAgentStatusRequest {
2182 agent_id: agent_id.clone(),
2183 status: HostAgentStatus::Errored,
2184 current_task: None,
2185 message: Some(format!("{} callback channel closed", spec.name)),
2186 payload: Value::Null,
2187 })
2188 .await;
2189 return Err(car_multi::MultiError::AgentFailed(
2190 spec.name.clone(),
2191 "agent callback channel closed".into(),
2192 ));
2193 }
2194 Err(_) => {
2195 let _ = self
2196 .host
2197 .set_status(&self.client_id, SetHostAgentStatusRequest {
2198 agent_id: agent_id.clone(),
2199 status: HostAgentStatus::Errored,
2200 current_task: None,
2201 message: Some(format!("{} timed out", spec.name)),
2202 payload: Value::Null,
2203 })
2204 .await;
2205 return Err(car_multi::MultiError::AgentFailed(
2206 spec.name.clone(),
2207 "agent callback timed out (300s)".into(),
2208 ));
2209 }
2210 };
2211
2212 if let Some(err) = response.error {
2213 let _ = self
2214 .host
2215 .set_status(&self.client_id, SetHostAgentStatusRequest {
2216 agent_id: agent_id.clone(),
2217 status: HostAgentStatus::Errored,
2218 current_task: None,
2219 message: Some(format!("{} errored", spec.name)),
2220 payload: serde_json::json!({ "error": err }),
2221 })
2222 .await;
2223 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2224 }
2225
2226 let output_value = response.output.unwrap_or(Value::Null);
2227 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2228 car_multi::MultiError::AgentFailed(
2229 spec.name.clone(),
2230 format!("invalid AgentOutput: {}", e),
2231 )
2232 })?;
2233 let status = if output.error.is_some() {
2234 HostAgentStatus::Errored
2235 } else {
2236 HostAgentStatus::Completed
2237 };
2238 let message = if output.error.is_some() {
2239 format!("{} errored", spec.name)
2240 } else {
2241 format!("{} completed", spec.name)
2242 };
2243 let _ = self
2244 .host
2245 .set_status(&self.client_id, SetHostAgentStatusRequest {
2246 agent_id,
2247 status,
2248 current_task: None,
2249 message: Some(message),
2250 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2251 })
2252 .await;
2253
2254 Ok(output)
2255 }
2256}
2257
/// Builds the agent identifier `"{client_id}:{name}:{request_id}"` for a single run.
///
/// Every character of `name` that is not an ASCII letter, ASCII digit, `-` or `_` is
/// replaced with `-`. `client_id` and `request_id` are inserted unchanged.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '-' });
    }
    format!("{client_id}:{sanitized}:{request_id}")
}
2271
2272async fn handle_multi_swarm(
2273 req: &JsonRpcMessage,
2274 session: &crate::session::ClientSession,
2275) -> Result<Value, String> {
2276 let mode_str = req
2277 .params
2278 .get("mode")
2279 .and_then(|v| v.as_str())
2280 .ok_or("missing 'mode'")?;
2281 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2282 let task = req
2283 .params
2284 .get("task")
2285 .and_then(|v| v.as_str())
2286 .ok_or("missing 'task'")?;
2287
2288 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2289 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2290 let agent_specs: Vec<car_multi::AgentSpec> =
2291 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2292 let synth: Option<car_multi::AgentSpec> = req
2293 .params
2294 .get("synthesizer")
2295 .map(|v| serde_json::from_value(v.clone()))
2296 .transpose()
2297 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2298
2299 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2300 channel: session.channel.clone(),
2301 host: session.host.clone(),
2302 client_id: session.client_id.clone(),
2303 });
2304 let infra = car_multi::SharedInfra::new();
2305
2306 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2307 if let Some(s) = synth {
2308 swarm = swarm.with_synthesizer(s);
2309 }
2310
2311 let result = swarm
2312 .run(task, &runner, &infra)
2313 .await
2314 .map_err(|e| format!("swarm error: {}", e))?;
2315 serde_json::to_value(result).map_err(|e| e.to_string())
2316}
2317
2318async fn handle_multi_pipeline(
2319 req: &JsonRpcMessage,
2320 session: &crate::session::ClientSession,
2321) -> Result<Value, String> {
2322 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2323 let task = req
2324 .params
2325 .get("task")
2326 .and_then(|v| v.as_str())
2327 .ok_or("missing 'task'")?;
2328
2329 let stage_specs: Vec<car_multi::AgentSpec> =
2330 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2331
2332 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2333 channel: session.channel.clone(),
2334 host: session.host.clone(),
2335 client_id: session.client_id.clone(),
2336 });
2337 let infra = car_multi::SharedInfra::new();
2338
2339 let result = car_multi::Pipeline::new(stage_specs)
2340 .run(task, &runner, &infra)
2341 .await
2342 .map_err(|e| format!("pipeline error: {}", e))?;
2343 serde_json::to_value(result).map_err(|e| e.to_string())
2344}
2345
2346async fn handle_multi_supervisor(
2347 req: &JsonRpcMessage,
2348 session: &crate::session::ClientSession,
2349) -> Result<Value, String> {
2350 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2351 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2352 let task = req
2353 .params
2354 .get("task")
2355 .and_then(|v| v.as_str())
2356 .ok_or("missing 'task'")?;
2357 let max_rounds = req
2358 .params
2359 .get("max_rounds")
2360 .and_then(|v| v.as_u64())
2361 .unwrap_or(3) as u32;
2362
2363 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2364 .map_err(|e| format!("invalid workers: {}", e))?;
2365 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2366 .map_err(|e| format!("invalid supervisor: {}", e))?;
2367
2368 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2369 channel: session.channel.clone(),
2370 host: session.host.clone(),
2371 client_id: session.client_id.clone(),
2372 });
2373 let infra = car_multi::SharedInfra::new();
2374
2375 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2376 .with_max_rounds(max_rounds)
2377 .run(task, &runner, &infra)
2378 .await
2379 .map_err(|e| format!("supervisor error: {}", e))?;
2380 serde_json::to_value(result).map_err(|e| e.to_string())
2381}
2382
2383async fn handle_multi_map_reduce(
2384 req: &JsonRpcMessage,
2385 session: &crate::session::ClientSession,
2386) -> Result<Value, String> {
2387 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2388 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2389 let task = req
2390 .params
2391 .get("task")
2392 .and_then(|v| v.as_str())
2393 .ok_or("missing 'task'")?;
2394 let items_val = req.params.get("items").ok_or("missing 'items'")?;
2395
2396 let mapper_spec: car_multi::AgentSpec =
2397 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2398 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2399 .map_err(|e| format!("invalid reducer: {}", e))?;
2400 let items: Vec<String> =
2401 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2402
2403 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2404 channel: session.channel.clone(),
2405 host: session.host.clone(),
2406 client_id: session.client_id.clone(),
2407 });
2408 let infra = car_multi::SharedInfra::new();
2409
2410 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2411 .run(task, &items, &runner, &infra)
2412 .await
2413 .map_err(|e| format!("map_reduce error: {}", e))?;
2414 serde_json::to_value(result).map_err(|e| e.to_string())
2415}
2416
2417async fn handle_multi_vote(
2418 req: &JsonRpcMessage,
2419 session: &crate::session::ClientSession,
2420) -> Result<Value, String> {
2421 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2422 let task = req
2423 .params
2424 .get("task")
2425 .and_then(|v| v.as_str())
2426 .ok_or("missing 'task'")?;
2427
2428 let agent_specs: Vec<car_multi::AgentSpec> =
2429 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2430 let synth: Option<car_multi::AgentSpec> = req
2431 .params
2432 .get("synthesizer")
2433 .map(|v| serde_json::from_value(v.clone()))
2434 .transpose()
2435 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2436
2437 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2438 channel: session.channel.clone(),
2439 host: session.host.clone(),
2440 client_id: session.client_id.clone(),
2441 });
2442 let infra = car_multi::SharedInfra::new();
2443
2444 let mut vote = car_multi::Vote::new(agent_specs);
2445 if let Some(s) = synth {
2446 vote = vote.with_synthesizer(s);
2447 }
2448
2449 let result = vote
2450 .run(task, &runner, &infra)
2451 .await
2452 .map_err(|e| format!("vote error: {}", e))?;
2453 serde_json::to_value(result).map_err(|e| e.to_string())
2454}
2455
2456fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2461 let name = req
2462 .params
2463 .get("name")
2464 .and_then(|v| v.as_str())
2465 .ok_or("scheduler.create requires 'name'")?;
2466 let prompt = req
2467 .params
2468 .get("prompt")
2469 .and_then(|v| v.as_str())
2470 .ok_or("scheduler.create requires 'prompt'")?;
2471
2472 let mut task = car_scheduler::Task::new(name, prompt);
2473
2474 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
2475 let trigger = match t {
2476 "once" => car_scheduler::TaskTrigger::Once,
2477 "cron" => car_scheduler::TaskTrigger::Cron,
2478 "interval" => car_scheduler::TaskTrigger::Interval,
2479 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
2480 _ => car_scheduler::TaskTrigger::Manual,
2481 };
2482 let schedule = req
2483 .params
2484 .get("schedule")
2485 .and_then(|v| v.as_str())
2486 .unwrap_or("");
2487 task = task.with_trigger(trigger, schedule);
2488 }
2489
2490 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
2491 task = task.with_system_prompt(sp);
2492 }
2493
2494 serde_json::to_value(&task).map_err(|e| e.to_string())
2495}
2496
2497async fn handle_scheduler_run(
2498 req: &JsonRpcMessage,
2499 session: &crate::session::ClientSession,
2500) -> Result<Value, String> {
2501 let task_val = req
2502 .params
2503 .get("task")
2504 .ok_or("scheduler.run requires 'task'")?;
2505 let mut task: car_scheduler::Task =
2506 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2507
2508 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2509 channel: session.channel.clone(),
2510 host: session.host.clone(),
2511 client_id: session.client_id.clone(),
2512 });
2513 let executor = car_scheduler::Executor::new(runner);
2514 let execution = executor.run_once(&mut task).await;
2515
2516 serde_json::to_value(&execution).map_err(|e| e.to_string())
2517}
2518
2519async fn handle_scheduler_run_loop(
2520 req: &JsonRpcMessage,
2521 session: &crate::session::ClientSession,
2522) -> Result<Value, String> {
2523 let task_val = req
2524 .params
2525 .get("task")
2526 .ok_or("scheduler.run_loop requires 'task'")?;
2527 let mut task: car_scheduler::Task =
2528 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2529 let max_iterations = req
2530 .params
2531 .get("max_iterations")
2532 .and_then(|v| v.as_u64())
2533 .map(|v| v as u32);
2534
2535 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2536 channel: session.channel.clone(),
2537 host: session.host.clone(),
2538 client_id: session.client_id.clone(),
2539 });
2540 let executor = car_scheduler::Executor::new(runner);
2541 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2542 let executions = executor
2543 .run_loop(&mut task, max_iterations, cancel_rx)
2544 .await;
2545
2546 serde_json::to_value(&executions).map_err(|e| e.to_string())
2547}
2548
2549fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
2554 state.inference.get_or_init(|| {
2555 Arc::new(car_inference::InferenceEngine::new(
2556 car_inference::InferenceConfig::default(),
2557 ))
2558 })
2559}
2560
2561async fn handle_infer(
2562 msg: &JsonRpcMessage,
2563 state: &ServerState,
2564 session: &crate::session::ClientSession,
2565) -> Result<Value, String> {
2566 let engine = get_inference_engine(state);
2567 let mut req: car_inference::GenerateRequest =
2568 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2569
2570 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2572 let mut memgine = session.memgine.lock().await;
2573 let ctx = memgine.build_context(cq);
2574 if !ctx.is_empty() {
2575 req.context = Some(ctx);
2576 }
2577 }
2578
2579 let _permit = state.admission.acquire().await;
2585
2586 let result = engine
2597 .generate_tracked(req)
2598 .await
2599 .map_err(|e| e.to_string())?;
2600 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2601}
2602
2603async fn handle_infer_stream(
2624 msg: &JsonRpcMessage,
2625 session: &crate::session::ClientSession,
2626 state: &ServerState,
2627) -> Result<Value, String> {
2628 use futures::SinkExt;
2629 use tokio_tungstenite::tungstenite::Message;
2630
2631 let engine = get_inference_engine(state);
2632 let mut req: car_inference::GenerateRequest =
2633 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2634
2635 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2638 let mut memgine = session.memgine.lock().await;
2639 let ctx = memgine.build_context(cq);
2640 if !ctx.is_empty() {
2641 req.context = Some(ctx);
2642 }
2643 }
2644
2645 let _permit = state.admission.acquire().await;
2646 let mut rx = engine
2647 .generate_tracked_stream(req)
2648 .await
2649 .map_err(|e| e.to_string())?;
2650
2651 let mut accumulator = car_inference::StreamAccumulator::default();
2652 let request_id = msg.id.clone();
2653
2654 while let Some(event) = rx.recv().await {
2655 let event_payload = match &event {
2656 car_inference::StreamEvent::TextDelta(text) => {
2657 serde_json::json!({"type": "text", "data": text})
2658 }
2659 car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
2660 serde_json::json!({"type": "tool_start", "name": name, "index": index})
2661 }
2662 car_inference::StreamEvent::ToolCallDelta {
2663 index,
2664 arguments_delta,
2665 } => serde_json::json!({
2666 "type": "tool_delta",
2667 "index": index,
2668 "data": arguments_delta,
2669 }),
2670 car_inference::StreamEvent::Usage {
2671 input_tokens,
2672 output_tokens,
2673 } => serde_json::json!({
2674 "type": "usage",
2675 "input_tokens": input_tokens,
2676 "output_tokens": output_tokens,
2677 }),
2678 car_inference::StreamEvent::Done { .. } => {
2683 accumulator.push(&event);
2684 continue;
2685 }
2686 };
2687
2688 let notif = serde_json::json!({
2689 "jsonrpc": "2.0",
2690 "method": "inference.stream.event",
2691 "params": {
2692 "request_id": request_id,
2693 "event": event_payload,
2694 },
2695 });
2696 if let Ok(text) = serde_json::to_string(¬if) {
2697 let _ = session
2698 .channel
2699 .write
2700 .lock()
2701 .await
2702 .send(Message::Text(text.into()))
2703 .await;
2704 }
2705 accumulator.push(&event);
2706 }
2707
2708 let (text, tool_calls, usage) = accumulator.finish_with_usage();
2709 Ok(serde_json::json!({
2710 "text": text,
2711 "tool_calls": tool_calls,
2712 "usage": usage,
2713 }))
2714}
2715
2716async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2717 let engine = get_inference_engine(state);
2718 let req: car_inference::EmbedRequest =
2719 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2720 let _permit = state.admission.acquire().await;
2724 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
2725 Ok(serde_json::json!({"embeddings": result}))
2726}
2727
2728async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2729 let engine = get_inference_engine(state);
2730 let req: car_inference::ClassifyRequest =
2731 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2732 let _permit = state.admission.acquire().await;
2733 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
2734 Ok(serde_json::json!({"classifications": result}))
2735}
2736
2737fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
2741 let total = state.admission.permits();
2742 let available = state.admission.permits_available();
2743 let in_use = total.saturating_sub(available);
2744 Ok(serde_json::json!({
2745 "permits_total": total,
2746 "permits_available": available,
2747 "permits_in_use": in_use,
2748 "env_override": crate::admission::ENV_MAX_CONCURRENT,
2749 }))
2750}
2751
2752async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2753 let model = msg
2754 .params
2755 .get("model")
2756 .and_then(|v| v.as_str())
2757 .ok_or("missing 'model' parameter")?;
2758 let text = msg
2759 .params
2760 .get("text")
2761 .and_then(|v| v.as_str())
2762 .ok_or("missing 'text' parameter")?;
2763 let engine = get_inference_engine(state);
2764 let ids = engine
2765 .tokenize(model, text)
2766 .await
2767 .map_err(|e| e.to_string())?;
2768 Ok(serde_json::json!({"tokens": ids}))
2769}
2770
2771async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2772 let model = msg
2773 .params
2774 .get("model")
2775 .and_then(|v| v.as_str())
2776 .ok_or("missing 'model' parameter")?;
2777 let tokens: Vec<u32> = msg
2778 .params
2779 .get("tokens")
2780 .and_then(|v| v.as_array())
2781 .ok_or("missing 'tokens' parameter")?
2782 .iter()
2783 .map(|t| {
2784 t.as_u64()
2785 .and_then(|n| u32::try_from(n).ok())
2786 .ok_or_else(|| "tokens[] must be u32 values".to_string())
2787 })
2788 .collect::<Result<Vec<_>, _>>()?;
2789 let engine = get_inference_engine(state);
2790 let text = engine
2791 .detokenize(model, &tokens)
2792 .await
2793 .map_err(|e| e.to_string())?;
2794 Ok(serde_json::json!({"text": text}))
2795}
2796
2797fn handle_models_list(state: &ServerState) -> Result<Value, String> {
2798 let engine = get_inference_engine(state);
2799 let models = engine.list_models();
2800 serde_json::to_value(&models).map_err(|e| e.to_string())
2801}
2802
2803fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
2804 let engine = get_inference_engine(state);
2805 let models = engine.list_models_unified();
2806 serde_json::to_value(&models).map_err(|e| e.to_string())
2807}
2808
/// Filter parameters for the model search handler.
///
/// Field names are camelCase on the wire (e.g. `localOnly`, `availableOnly`), and
/// every field is optional.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchParams {
    /// Free-text filter; the handler trims and lowercases it before matching.
    #[serde(default)]
    query: Option<String>,
    /// Keep only models that report this capability.
    #[serde(default)]
    capability: Option<car_inference::ModelCapability>,
    /// Provider name; the handler compares it case-insensitively.
    #[serde(default)]
    provider: Option<String>,
    /// When true, keep only models whose schema reports `is_local()`.
    #[serde(default)]
    local_only: bool,
    /// When true, keep only models whose schema is marked `available`.
    #[serde(default)]
    available_only: bool,
    /// Maximum number of entries to return; applied after sorting.
    #[serde(default)]
    limit: Option<usize>,
}
2825
/// One model in a search response; serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchEntry {
    /// Core model info; its fields are flattened into this entry's JSON object.
    #[serde(flatten)]
    info: car_inference::ModelInfo,
    /// Copied from the model schema's `family`.
    family: String,
    /// Copied from the model schema's `version`.
    version: String,
    /// Copied from the model schema's `tags`.
    tags: Vec<String>,
    /// Set by the search handler when the model is not available and its source is
    /// `Local` or `Mlx`.
    pullable: bool,
    /// Upgrade whose `from_id` matches this model's id, if any.
    upgrade: Option<car_inference::ModelUpgrade>,
}
2837
/// Response payload of the model search handler; serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchResponse {
    /// Matching models, after sorting and the optional limit.
    models: Vec<ModelSearchEntry>,
    /// All upgrades reported by the engine (not restricted to the matched models).
    upgrades: Vec<car_inference::ModelUpgrade>,
    /// Number of entries in `models`.
    total: usize,
    /// Number of entries in `models` marked available.
    available: usize,
    /// Number of entries in `models` marked local.
    local: usize,
    /// `total` minus `local`.
    remote: usize,
}
2848
2849fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2850 let params: ModelSearchParams =
2851 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
2852 query: None,
2853 capability: None,
2854 provider: None,
2855 local_only: false,
2856 available_only: false,
2857 limit: None,
2858 });
2859 let engine = get_inference_engine(state);
2860 let upgrades = engine.available_model_upgrades();
2861 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
2862 .iter()
2863 .cloned()
2864 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
2865 .collect();
2866 let query = params
2867 .query
2868 .as_deref()
2869 .map(str::trim)
2870 .filter(|q| !q.is_empty())
2871 .map(|q| q.to_ascii_lowercase());
2872 let provider = params
2873 .provider
2874 .as_deref()
2875 .map(str::trim)
2876 .filter(|p| !p.is_empty())
2877 .map(|p| p.to_ascii_lowercase());
2878
2879 let mut entries: Vec<ModelSearchEntry> = engine
2880 .list_schemas()
2881 .into_iter()
2882 .filter(|schema| {
2883 if let Some(capability) = params.capability {
2884 if !schema.has_capability(capability) {
2885 return false;
2886 }
2887 }
2888 if let Some(provider) = provider.as_deref() {
2889 if schema.provider.to_ascii_lowercase() != provider {
2890 return false;
2891 }
2892 }
2893 if params.local_only && !schema.is_local() {
2894 return false;
2895 }
2896 if params.available_only && !schema.available {
2897 return false;
2898 }
2899 if let Some(query) = query.as_deref() {
2900 let capability_text = schema
2901 .capabilities
2902 .iter()
2903 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
2904 .collect::<Vec<_>>()
2905 .join(" ");
2906 let haystack = format!(
2907 "{} {} {} {} {} {}",
2908 schema.id,
2909 schema.name,
2910 schema.provider,
2911 schema.family,
2912 schema.tags.join(" "),
2913 capability_text
2914 )
2915 .to_ascii_lowercase();
2916 if !haystack.contains(query) {
2917 return false;
2918 }
2919 }
2920 true
2921 })
2922 .map(|schema| {
2923 let pullable = !schema.available
2924 && matches!(
2925 schema.source,
2926 car_inference::ModelSource::Local { .. } | car_inference::ModelSource::Mlx { .. }
2927 );
2928 let info = car_inference::ModelInfo::from(&schema);
2929 let upgrade = upgrades_by_from.get(&schema.id).cloned();
2930 ModelSearchEntry {
2931 info,
2932 family: schema.family,
2933 version: schema.version,
2934 tags: schema.tags,
2935 pullable,
2936 upgrade,
2937 }
2938 })
2939 .collect();
2940 entries.sort_by(|a, b| {
2941 b.info
2942 .available
2943 .cmp(&a.info.available)
2944 .then(b.info.is_local.cmp(&a.info.is_local))
2945 .then(a.info.name.cmp(&b.info.name))
2946 });
2947 if let Some(limit) = params.limit {
2948 entries.truncate(limit);
2949 }
2950
2951 let total = entries.len();
2952 let available = entries.iter().filter(|entry| entry.info.available).count();
2953 let local = entries.iter().filter(|entry| entry.info.is_local).count();
2954 let response = ModelSearchResponse {
2955 models: entries,
2956 upgrades,
2957 total,
2958 available,
2959 local,
2960 remote: total.saturating_sub(local),
2961 };
2962 serde_json::to_value(response).map_err(|e| e.to_string())
2963}
2964
2965fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
2966 let engine = get_inference_engine(state);
2967 serde_json::to_value(serde_json::json!({
2968 "upgrades": engine.available_model_upgrades()
2969 }))
2970 .map_err(|e| e.to_string())
2971}
2972
2973async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2974 let name = msg
2975 .params
2976 .get("name")
2977 .or_else(|| msg.params.get("id"))
2978 .or_else(|| msg.params.get("model"))
2979 .and_then(|v| v.as_str())
2980 .ok_or("missing 'name' parameter")?;
2981 let engine = get_inference_engine(state);
2982 let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
2983 Ok(serde_json::json!({"path": path.display().to_string()}))
2984}
2985
2986async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2987 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
2988 msg.params
2989 .get("events")
2990 .cloned()
2991 .unwrap_or(msg.params.clone()),
2992 )
2993 .map_err(|e| format!("invalid events: {}", e))?;
2994
2995 let inference = get_inference_engine(state).clone();
2996 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
2997
2998 let skills = engine.distill_skills(&events).await;
2999 serde_json::to_value(&skills).map_err(|e| e.to_string())
3000}
3001
/// Consolidates the session's effective memgine and returns the serialized report.
///
/// When the session has an agent id, the memgine is persisted afterwards via
/// `persist_agent_memgine`; a persistence failure is only logged as a warning and
/// does not fail the request.
async fn handle_memory_consolidate(
    session: &crate::session::ClientSession,
) -> Result<Value, String> {
    let engine_arc = session.effective_memgine().await;
    // The engine guard is scoped to this block, so it is dropped before the persist
    // step below receives `engine_arc`.
    let report = {
        let mut engine = engine_arc.lock().await;
        engine.consolidate().await
    };
    if let Some(id) = session.agent_id.lock().await.clone() {
        // Best-effort persistence: log and continue on failure.
        if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
            tracing::warn!(agent_id = %id, error = %e,
                "agent memgine persist after consolidate failed");
        }
    }
    serde_json::to_value(&report).map_err(|e| e.to_string())
}
3021
3022async fn handle_skill_repair(
3026 msg: &JsonRpcMessage,
3027 session: &crate::session::ClientSession,
3028) -> Result<Value, String> {
3029 let name = msg
3030 .params
3031 .get("skill_name")
3032 .and_then(|v| v.as_str())
3033 .ok_or("missing 'skill_name' parameter")?;
3034 let mut engine = session.memgine.lock().await;
3035 let code = engine.repair_skill(name).await;
3036 Ok(match code {
3037 Some(c) => serde_json::json!({ "code": c }),
3038 None => Value::Null,
3039 })
3040}
3041
3042async fn handle_skills_ingest_distilled(
3045 msg: &JsonRpcMessage,
3046 session: &crate::session::ClientSession,
3047) -> Result<Value, String> {
3048 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3049 msg.params
3050 .get("skills")
3051 .cloned()
3052 .unwrap_or(msg.params.clone()),
3053 )
3054 .map_err(|e| format!("invalid skills: {}", e))?;
3055 let mut engine = session.memgine.lock().await;
3056 let nodes = engine.ingest_distilled_skills(&skills);
3057 Ok(serde_json::json!({ "ingested": nodes.len() }))
3058}
3059
3060async fn handle_skills_evolve(
3063 msg: &JsonRpcMessage,
3064 session: &crate::session::ClientSession,
3065) -> Result<Value, String> {
3066 let domain = msg
3067 .params
3068 .get("domain")
3069 .and_then(|v| v.as_str())
3070 .ok_or("missing 'domain' parameter")?
3071 .to_string();
3072 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3073 msg.params
3074 .get("events")
3075 .cloned()
3076 .unwrap_or(Value::Array(vec![])),
3077 )
3078 .map_err(|e| format!("invalid events: {}", e))?;
3079 let mut engine = session.memgine.lock().await;
3080 let skills = engine.evolve_skills(&events, &domain).await;
3081 serde_json::to_value(&skills).map_err(|e| e.to_string())
3082}
3083
3084async fn handle_skills_domains_needing_evolution(
3086 msg: &JsonRpcMessage,
3087 session: &crate::session::ClientSession,
3088) -> Result<Value, String> {
3089 let threshold = msg
3090 .params
3091 .get("threshold")
3092 .and_then(|v| v.as_f64())
3093 .unwrap_or(0.6);
3094 let engine = session.memgine.lock().await;
3095 let domains = engine.domains_needing_evolution(threshold);
3096 serde_json::to_value(&domains).map_err(|e| e.to_string())
3097}
3098
3099async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3101 let engine = get_inference_engine(state);
3102 let req: car_inference::RerankRequest =
3103 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3104 let _permit = state.admission.acquire().await;
3105 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3106 serde_json::to_value(&result).map_err(|e| e.to_string())
3107}
3108
3109async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3115 use base64::Engine as _;
3116 let engine = get_inference_engine(state);
3117
3118 let mut params = msg.params.clone();
3125 let audio_b64 = params
3126 .as_object_mut()
3127 .and_then(|m| m.remove("audio_b64"))
3128 .and_then(|v| v.as_str().map(str::to_string));
3129 let _tmp_audio = if let Some(b64) = audio_b64 {
3130 let bytes = base64::engine::general_purpose::STANDARD
3131 .decode(b64.as_bytes())
3132 .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3133 let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3134 std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3135 let path = tmp.path().to_string_lossy().into_owned();
3136 if let Some(obj) = params.as_object_mut() {
3137 obj.insert("audio_path".to_string(), Value::String(path));
3138 }
3139 Some(tmp)
3140 } else {
3141 None
3142 };
3143
3144 let req: car_inference::TranscribeRequest =
3145 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3146 let _permit = state.admission.acquire().await;
3147 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3148 serde_json::to_value(&result).map_err(|e| e.to_string())
3149}
3150
3151async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3157 use base64::Engine as _;
3158 let engine = get_inference_engine(state);
3159
3160 let mut params = msg.params.clone();
3161 let return_b64 = params
3162 .as_object_mut()
3163 .and_then(|m| m.remove("return_b64"))
3164 .and_then(|v| v.as_bool())
3165 .unwrap_or(false);
3166 let no_output_path = params
3167 .as_object()
3168 .map(|m| !m.contains_key("output_path"))
3169 .unwrap_or(true);
3170
3171 let req: car_inference::SynthesizeRequest =
3172 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3173 let _permit = state.admission.acquire().await;
3174 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3175 let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3176
3177 if return_b64 || no_output_path {
3181 let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3182 format!(
3183 "synthesize: failed to read rendered audio at {}: {e}",
3184 result.audio_path
3185 )
3186 })?;
3187 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3188 if let Some(obj) = value.as_object_mut() {
3189 obj.insert("audio_b64".to_string(), Value::String(encoded));
3190 }
3191 }
3192 Ok(value)
3193}
3194
3195async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3199 let engine = get_inference_engine(state);
3200 let status = engine
3201 .prepare_speech_runtime()
3202 .await
3203 .map_err(|e| e.to_string())?;
3204 serde_json::to_value(&status).map_err(|e| e.to_string())
3205}
3206
3207async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3210 let prompt = msg
3211 .params
3212 .get("prompt")
3213 .and_then(|v| v.as_str())
3214 .ok_or("missing 'prompt' parameter")?;
3215 let engine = get_inference_engine(state);
3216 let decision = engine.route_adaptive(prompt).await;
3217 serde_json::to_value(&decision).map_err(|e| e.to_string())
3218}
3219
3220async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3222 let engine = get_inference_engine(state);
3223 let profiles = engine.export_profiles().await;
3224 serde_json::to_value(&profiles).map_err(|e| e.to_string())
3225}
3226
3227async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3229 let n = session.runtime.log.lock().await.len();
3230 Ok(Value::from(n as u64))
3231}
3232
3233async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3234 let stats = session.runtime.log.lock().await.stats();
3235 serde_json::to_value(stats).map_err(|e| e.to_string())
3236}
3237
/// Parameters for the event-log truncate handler; keys are camelCase on the wire
/// (`maxEvents`, `maxSpans`). An absent field leaves that part of the log untouched.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
    /// Passed to `truncate_events_keep_last` when present.
    #[serde(default)]
    max_events: Option<usize>,
    /// Passed to `truncate_spans_keep_last` when present.
    #[serde(default)]
    max_spans: Option<usize>,
}
3246
3247async fn handle_events_truncate(
3248 msg: &JsonRpcMessage,
3249 session: &crate::session::ClientSession,
3250) -> Result<Value, String> {
3251 let params: EventsTruncateParams =
3252 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
3253 max_events: None,
3254 max_spans: None,
3255 });
3256 let mut log = session.runtime.log.lock().await;
3257 let removed_events = params
3258 .max_events
3259 .map(|max| log.truncate_events_keep_last(max))
3260 .unwrap_or(0);
3261 let removed_spans = params
3262 .max_spans
3263 .map(|max| log.truncate_spans_keep_last(max))
3264 .unwrap_or(0);
3265 let stats = log.stats();
3266 Ok(serde_json::json!({
3267 "removedEvents": removed_events,
3268 "removedSpans": removed_spans,
3269 "stats": stats,
3270 }))
3271}
3272
3273async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
3274 let mut log = session.runtime.log.lock().await;
3275 let removed = log.clear();
3276 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
3277}
3278
3279async fn handle_replan_set_config(
3284 msg: &JsonRpcMessage,
3285 session: &crate::session::ClientSession,
3286) -> Result<Value, String> {
3287 let max_replans = msg
3288 .params
3289 .get("max_replans")
3290 .and_then(|v| v.as_u64())
3291 .unwrap_or(0) as u32;
3292 let delay_ms = msg
3293 .params
3294 .get("delay_ms")
3295 .and_then(|v| v.as_u64())
3296 .unwrap_or(0);
3297 let verify_before_execute = msg
3298 .params
3299 .get("verify_before_execute")
3300 .and_then(|v| v.as_bool())
3301 .unwrap_or(true);
3302 let cfg = car_engine::ReplanConfig {
3303 max_replans,
3304 delay_ms,
3305 verify_before_execute,
3306 };
3307 session.runtime.set_replan_config(cfg).await;
3308 Ok(Value::Null)
3309}
3310
3311async fn handle_skills_list(
3312 msg: &JsonRpcMessage,
3313 session: &crate::session::ClientSession,
3314) -> Result<Value, String> {
3315 let domain = msg.params.get("domain").and_then(|v| v.as_str());
3316 let engine = session.memgine.lock().await;
3317 let skills: Vec<serde_json::Value> = engine
3318 .graph
3319 .inner
3320 .node_indices()
3321 .filter_map(|nix| {
3322 let node = engine.graph.inner.node_weight(nix)?;
3323 if node.kind != car_memgine::MemKind::Skill {
3324 return None;
3325 }
3326 let meta = car_memgine::SkillMeta::from_node(node)?;
3327 if let Some(d) = domain {
3328 match &meta.scope {
3329 car_memgine::SkillScope::Global => {}
3330 car_memgine::SkillScope::Domain(sd) if sd == d => {}
3331 _ => return None,
3332 }
3333 }
3334 Some(serde_json::to_value(&meta).unwrap_or_default())
3335 })
3336 .collect();
3337 serde_json::to_value(&skills).map_err(|e| e.to_string())
3338}
3339
3340#[derive(serde::Deserialize)]
3341struct SecretParams {
3342 #[serde(default)]
3343 service: Option<String>,
3344 key: String,
3345 #[serde(default)]
3346 value: Option<String>,
3347}
3348
3349fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
3350 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3351 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
3352 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
3353}
3354
3355fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
3356 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3357 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
3358}
3359
3360fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
3361 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3362 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
3363}
3364
3365fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
3366 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3367 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
3368}
3369
3370#[derive(serde::Deserialize)]
3371struct PermParams {
3372 domain: String,
3373 #[serde(default)]
3374 target_bundle_id: Option<String>,
3375}
3376
3377fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
3378 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3379 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
3380}
3381
3382fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
3383 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3384 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
3385}
3386
3387fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
3388 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3389 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
3390}
3391
3392fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
3393 #[derive(serde::Deserialize)]
3394 struct P {
3395 start: String,
3396 end: String,
3397 #[serde(default)]
3398 calendar_ids: Vec<String>,
3399 }
3400 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3401 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
3402 .map_err(|e| format!("parse start: {}", e))?
3403 .with_timezone(&chrono::Utc);
3404 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
3405 .map_err(|e| format!("parse end: {}", e))?
3406 .with_timezone(&chrono::Utc);
3407 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
3408}
3409
3410fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
3411 #[derive(serde::Deserialize)]
3412 struct P {
3413 query: String,
3414 #[serde(default = "default_limit")]
3415 limit: usize,
3416 #[serde(default)]
3417 container_ids: Vec<String>,
3418 }
3419 fn default_limit() -> usize {
3420 50
3421 }
3422 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3423 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
3424}
3425
3426fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
3427 #[derive(serde::Deserialize, Default)]
3428 struct P {
3429 #[serde(default)]
3430 account_ids: Vec<String>,
3431 }
3432 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
3433 car_ffi_common::integrations::mail_inbox(&p.account_ids)
3434}
3435
3436fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
3437 let raw = req.params.to_string();
3438 car_ffi_common::integrations::mail_send(&raw)
3439}
3440
3441fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
3442 #[derive(serde::Deserialize)]
3443 struct P {
3444 #[serde(default = "default_limit")]
3445 limit: usize,
3446 }
3447 fn default_limit() -> usize {
3448 50
3449 }
3450 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
3451 car_ffi_common::integrations::messages_chats(p.limit)
3452}
3453
3454fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
3455 let raw = req.params.to_string();
3456 car_ffi_common::integrations::messages_send(&raw)
3457}
3458
3459fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
3460 #[derive(serde::Deserialize)]
3461 struct P {
3462 query: String,
3463 #[serde(default = "default_limit")]
3464 limit: usize,
3465 }
3466 fn default_limit() -> usize {
3467 50
3468 }
3469 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3470 car_ffi_common::integrations::notes_find(&p.query, p.limit)
3471}
3472
3473fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
3474 #[derive(serde::Deserialize)]
3475 struct P {
3476 #[serde(default = "default_limit")]
3477 limit: usize,
3478 }
3479 fn default_limit() -> usize {
3480 50
3481 }
3482 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
3483 car_ffi_common::integrations::reminders_items(p.limit)
3484}
3485
3486fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
3487 #[derive(serde::Deserialize)]
3488 struct P {
3489 #[serde(default = "default_limit")]
3490 limit: usize,
3491 }
3492 fn default_limit() -> usize {
3493 100
3494 }
3495 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
3496 car_ffi_common::integrations::bookmarks_list(p.limit)
3497}
3498
3499fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
3500 #[derive(serde::Deserialize)]
3501 struct P {
3502 start: String,
3503 end: String,
3504 }
3505 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3506 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
3507 .map_err(|e| format!("parse start: {}", e))?
3508 .with_timezone(&chrono::Utc);
3509 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
3510 .map_err(|e| format!("parse end: {}", e))?
3511 .with_timezone(&chrono::Utc);
3512 car_ffi_common::health::sleep_windows(s, e)
3513}
3514
3515fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
3516 #[derive(serde::Deserialize)]
3517 struct P {
3518 start: String,
3519 end: String,
3520 }
3521 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3522 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
3523 .map_err(|e| format!("parse start: {}", e))?
3524 .with_timezone(&chrono::Utc);
3525 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
3526 .map_err(|e| format!("parse end: {}", e))?
3527 .with_timezone(&chrono::Utc);
3528 car_ffi_common::health::workouts(s, e)
3529}
3530
3531fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
3532 #[derive(serde::Deserialize)]
3533 struct P {
3534 start: String,
3535 end: String,
3536 }
3537 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3538 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
3539 .map_err(|e| format!("parse start: {}", e))?;
3540 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
3541 .map_err(|e| format!("parse end: {}", e))?;
3542 car_ffi_common::health::activity(s, e)
3543}
3544
3545async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
3546 let closed = session.browser.close().await?;
3547 Ok(serde_json::json!({"closed": closed}))
3548}
3549
3550async fn handle_browser_run(
3551 req: &JsonRpcMessage,
3552 session: &crate::session::ClientSession,
3553) -> Result<Value, String> {
3554 #[derive(serde::Deserialize)]
3555 struct BrowserRunParams {
3556 script: Value,
3558 #[serde(default)]
3559 width: Option<u32>,
3560 #[serde(default)]
3561 height: Option<u32>,
3562 #[serde(default)]
3567 headed: Option<bool>,
3568 #[serde(default)]
3571 extra_args: Option<Vec<String>>,
3572 }
3573 let params: BrowserRunParams =
3574 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3575
3576 let script_json = match params.script {
3578 Value::String(s) => s,
3579 other => other.to_string(),
3580 };
3581
3582 let browser_session = session
3583 .browser
3584 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
3585 width: params.width.unwrap_or(1280),
3586 height: params.height.unwrap_or(720),
3587 headless: !params.headed.unwrap_or(false),
3588 extra_args: params.extra_args.unwrap_or_default(),
3589 })
3590 .await?;
3591
3592 let trace_json = browser_session.run(&script_json).await?;
3593 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
3594}
3595
3596#[derive(Deserialize)]
3609struct VoiceStartParams {
3610 session_id: String,
3611 audio_source: Value,
3612 #[serde(default)]
3613 options: Option<Value>,
3614}
3615
3616async fn handle_voice_transcribe_stream_start(
3617 req: &JsonRpcMessage,
3618 state: &Arc<ServerState>,
3619 session: &Arc<crate::session::ClientSession>,
3620) -> Result<Value, String> {
3621 let params: VoiceStartParams =
3622 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3623 let audio_source_json =
3624 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
3625 let options_json = params
3626 .options
3627 .as_ref()
3628 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
3629 .transpose()?;
3630 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
3631 channel: session.channel.clone(),
3632 });
3633 let json = car_ffi_common::voice::transcribe_stream_start(
3634 ¶ms.session_id,
3635 &audio_source_json,
3636 options_json.as_deref(),
3637 state.voice_sessions.clone(),
3638 sink,
3639 )
3640 .await?;
3641 serde_json::from_str(&json).map_err(|e| e.to_string())
3642}
3643
3644#[derive(Deserialize)]
3645struct VoiceStopParams {
3646 session_id: String,
3647}
3648
3649async fn handle_voice_transcribe_stream_stop(
3650 req: &JsonRpcMessage,
3651 state: &Arc<ServerState>,
3652) -> Result<Value, String> {
3653 let params: VoiceStopParams =
3654 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3655 let json = car_ffi_common::voice::transcribe_stream_stop(
3656 ¶ms.session_id,
3657 state.voice_sessions.clone(),
3658 )
3659 .await?;
3660 serde_json::from_str(&json).map_err(|e| e.to_string())
3661}
3662
3663#[derive(Deserialize)]
3664struct VoicePushParams {
3665 session_id: String,
3666 pcm_b64: String,
3670}
3671
3672async fn handle_voice_transcribe_stream_push(
3673 req: &JsonRpcMessage,
3674 state: &Arc<ServerState>,
3675) -> Result<Value, String> {
3676 use base64::Engine;
3677 let params: VoicePushParams =
3678 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3679 let pcm = base64::engine::general_purpose::STANDARD
3680 .decode(¶ms.pcm_b64)
3681 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
3682 let json = car_ffi_common::voice::transcribe_stream_push(
3683 ¶ms.session_id,
3684 &pcm,
3685 state.voice_sessions.clone(),
3686 )
3687 .await?;
3688 serde_json::from_str(&json).map_err(|e| e.to_string())
3689}
3690
3691fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
3692 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
3693 serde_json::from_str(&json).unwrap_or(Value::Null)
3694}
3695
3696async fn handle_voice_dispatch_turn(
3697 req: &JsonRpcMessage,
3698 state: &Arc<ServerState>,
3699 session: &Arc<crate::session::ClientSession>,
3700) -> Result<Value, String> {
3701 let req_value = req.params.clone();
3702 let request: crate::voice_turn::DispatchVoiceTurnRequest =
3703 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
3704 let engine = get_inference_engine(state).clone();
3705 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
3706 channel: session.channel.clone(),
3707 });
3708 let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
3709 serde_json::to_value(resp).map_err(|e| e.to_string())
3710}
3711
3712async fn handle_voice_cancel_turn() -> Result<Value, String> {
3713 crate::voice_turn::cancel().await;
3714 Ok(serde_json::json!({"cancelled": true}))
3715}
3716
3717async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
3718 let engine = get_inference_engine(state).clone();
3719 crate::voice_turn::prewarm(engine).await;
3720 Ok(serde_json::json!({"prewarmed": true}))
3721}
3722
3723fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
3742 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
3743 std::sync::OnceLock::new();
3744 SLOT.get_or_init(|| std::sync::RwLock::new(None))
3745}
3746
3747fn ws_runner_calls(
3748) -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
3749 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
3750 std::sync::OnceLock::new();
3751 MAP.get_or_init(dashmap::DashMap::new)
3752}
3753
3754fn ws_runner_completions() -> &'static dashmap::DashMap<
3755 String,
3756 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
3757> {
3758 static MAP: std::sync::OnceLock<
3759 dashmap::DashMap<
3760 String,
3761 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
3762 >,
3763 > = std::sync::OnceLock::new();
3764 MAP.get_or_init(dashmap::DashMap::new)
3765}
3766
/// Inference runner that forwards generate requests to the WebSocket client
/// currently stored in the `ws_runner_session()` slot.
struct WsInferenceRunner;
3768
3769#[async_trait::async_trait]
3770impl car_inference::InferenceRunner for WsInferenceRunner {
3771 async fn run(
3772 &self,
3773 request: car_inference::tasks::generate::GenerateRequest,
3774 emitter: car_inference::EventEmitter,
3775 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
3776 let channel = ws_runner_session()
3777 .read()
3778 .map_err(|e| {
3779 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
3780 })?
3781 .clone()
3782 .ok_or_else(|| {
3783 car_inference::RunnerError::Declined(
3784 "no WebSocket inference runner registered — call inference.register_runner first"
3785 .into(),
3786 )
3787 })?;
3788
3789 let call_id = uuid::Uuid::new_v4().to_string();
3790 let request_json = serde_json::to_value(&request)
3791 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
3792 let (tx, rx) = tokio::sync::oneshot::channel();
3793 ws_runner_calls().insert(call_id.clone(), emitter);
3794 ws_runner_completions().insert(call_id.clone(), tx);
3795
3796 use futures::SinkExt;
3798 let notification = serde_json::json!({
3799 "jsonrpc": "2.0",
3800 "method": "inference.runner.invoke",
3801 "params": {
3802 "call_id": call_id,
3803 "request": request_json,
3804 },
3805 });
3806 let text = serde_json::to_string(¬ification)
3807 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
3808 let _ = channel
3809 .write
3810 .lock()
3811 .await
3812 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
3813 .await;
3814
3815 let result = rx.await.map_err(|_| {
3816 car_inference::RunnerError::Failed("runner completion channel dropped".into())
3817 })?;
3818 ws_runner_calls().remove(&call_id);
3819 result.map_err(car_inference::RunnerError::Failed)
3820 }
3821}
3822
3823async fn handle_inference_register_runner(
3824 session: &Arc<crate::session::ClientSession>,
3825) -> Result<Value, String> {
3826 let mut guard = ws_runner_session()
3827 .write()
3828 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
3829 *guard = Some(session.channel.clone());
3830 drop(guard);
3831 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
3832 Ok(serde_json::json!({"registered": true}))
3833}
3834
3835#[derive(serde::Deserialize)]
3836struct InferenceRunnerEventParams {
3837 call_id: String,
3838 event: Value,
3839}
3840
3841async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
3842 let params: InferenceRunnerEventParams =
3843 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3844 let stream_event = match parse_runner_event_value(¶ms.event) {
3845 Some(e) => e,
3846 None => return Err("unrecognised runner event shape".into()),
3847 };
3848 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
3849 let emitter = entry.value().clone();
3850 tokio::spawn(async move { emitter.emit(stream_event).await });
3851 }
3852 Ok(serde_json::json!({"emitted": true}))
3853}
3854
3855#[derive(serde::Deserialize)]
3856struct InferenceRunnerCompleteParams {
3857 call_id: String,
3858 result: Value,
3859}
3860
3861async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
3862 let params: InferenceRunnerCompleteParams =
3863 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3864 let result: std::result::Result<car_inference::RunnerResult, String> =
3865 serde_json::from_value(params.result)
3866 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
3867 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
3868 let _ = tx.send(result);
3869 }
3870 Ok(serde_json::json!({"completed": true}))
3871}
3872
3873#[derive(serde::Deserialize)]
3874struct InferenceRunnerFailParams {
3875 call_id: String,
3876 error: String,
3877}
3878
3879async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
3880 let params: InferenceRunnerFailParams =
3881 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3882 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
3883 let _ = tx.send(Err(params.error));
3884 }
3885 Ok(serde_json::json!({"failed": true}))
3886}
3887
3888fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
3889 let ty = v.get("type").and_then(|t| t.as_str())?;
3890 match ty {
3891 "text" => Some(car_inference::StreamEvent::TextDelta(
3892 v.get("data")?.as_str()?.to_string(),
3893 )),
3894 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
3895 name: v.get("name")?.as_str()?.to_string(),
3896 index: v.get("index")?.as_u64()? as usize,
3897 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
3898 }),
3899 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
3900 index: v.get("index")?.as_u64()? as usize,
3901 arguments_delta: v.get("data")?.as_str()?.to_string(),
3902 }),
3903 "usage" => Some(car_inference::StreamEvent::Usage {
3904 input_tokens: v.get("input_tokens")?.as_u64()?,
3905 output_tokens: v.get("output_tokens")?.as_u64()?,
3906 }),
3907 "done" => Some(car_inference::StreamEvent::Done {
3908 text: v.get("text")?.as_str()?.to_string(),
3909 tool_calls: v
3910 .get("tool_calls")
3911 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
3912 .unwrap_or_default(),
3913 }),
3914 _ => None,
3915 }
3916}
3917
3918#[derive(Deserialize)]
3919struct EnrollSpeakerParams {
3920 label: String,
3921 audio: Value,
3922}
3923
3924async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
3925 let params: EnrollSpeakerParams =
3926 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3927 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
3928 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
3929 serde_json::from_str(&json).map_err(|e| e.to_string())
3930}
3931
3932#[derive(Deserialize)]
3933struct RemoveEnrollmentParams {
3934 label: String,
3935}
3936
3937fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
3938 let params: RemoveEnrollmentParams =
3939 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3940 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
3941 serde_json::from_str(&json).map_err(|e| e.to_string())
3942}
3943
3944#[derive(Deserialize)]
3945struct WorkflowRunParams {
3946 workflow: Value,
3947}
3948
3949async fn handle_workflow_run(
3950 req: &JsonRpcMessage,
3951 session: &Arc<crate::session::ClientSession>,
3952) -> Result<Value, String> {
3953 let params: WorkflowRunParams =
3954 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3955 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
3956 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3957 channel: session.channel.clone(),
3958 host: session.host.clone(),
3959 client_id: session.client_id.clone(),
3960 });
3961 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
3962 serde_json::from_str(&json).map_err(|e| e.to_string())
3963}
3964
3965#[derive(Deserialize)]
3966struct WorkflowVerifyParams {
3967 workflow: Value,
3968}
3969
3970fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
3971 let params: WorkflowVerifyParams =
3972 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3973 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
3974 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
3975 serde_json::from_str(&json).map_err(|e| e.to_string())
3976}
3977
3978async fn handle_meeting_start(
3983 req: &JsonRpcMessage,
3984 state: &Arc<ServerState>,
3985 session: &Arc<crate::session::ClientSession>,
3986) -> Result<Value, String> {
3987 let mut req_value = req.params.clone();
3993 let meeting_id = req_value
3994 .get("id")
3995 .and_then(|v| v.as_str())
3996 .map(str::to_string)
3997 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
3998 if let Some(map) = req_value.as_object_mut() {
3999 map.insert("id".into(), Value::String(meeting_id.clone()));
4000 }
4001 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4002
4003 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4004 Arc::new(crate::session::WsVoiceEventSink {
4005 channel: session.channel.clone(),
4006 });
4007
4008 let upstream: Arc<dyn car_voice::VoiceEventSink> =
4013 Arc::new(crate::session::WsMemgineIngestSink {
4014 meeting_id,
4015 engine: session.memgine.clone(),
4016 upstream: ws_upstream,
4017 });
4018
4019 let cwd = std::env::current_dir().ok();
4020 let json = crate::meeting::start_meeting(
4021 &request_json,
4022 state.meetings.clone(),
4023 state.voice_sessions.clone(),
4024 upstream,
4025 None,
4026 cwd,
4027 )
4028 .await?;
4029 serde_json::from_str(&json).map_err(|e| e.to_string())
4030}
4031
4032#[derive(Deserialize)]
4033struct MeetingStopParams {
4034 meeting_id: String,
4035 #[serde(default = "default_summarize")]
4036 summarize: bool,
4037}
4038
/// Serde default for `MeetingStopParams::summarize`: summarise unless told
/// otherwise.
fn default_summarize() -> bool { true }
4042
4043async fn handle_meeting_stop(
4044 req: &JsonRpcMessage,
4045 state: &Arc<ServerState>,
4046 _session: &Arc<crate::session::ClientSession>,
4047) -> Result<Value, String> {
4048 let params: MeetingStopParams =
4049 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4050 let inference = if params.summarize {
4051 Some(state.inference.get().cloned()).flatten()
4052 } else {
4053 None
4054 };
4055 let json = crate::meeting::stop_meeting(
4056 ¶ms.meeting_id,
4057 params.summarize,
4058 state.meetings.clone(),
4059 state.voice_sessions.clone(),
4060 inference,
4061 )
4062 .await?;
4063 serde_json::from_str(&json).map_err(|e| e.to_string())
4064}
4065
4066#[derive(Deserialize, Default)]
4067struct MeetingListParams {
4068 #[serde(default)]
4069 root: Option<std::path::PathBuf>,
4070}
4071
4072fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4073 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4074 let cwd = std::env::current_dir().ok();
4075 let json = crate::meeting::list_meetings(params.root, cwd)?;
4076 serde_json::from_str(&json).map_err(|e| e.to_string())
4077}
4078
4079#[derive(Deserialize)]
4080struct MeetingGetParams {
4081 meeting_id: String,
4082 #[serde(default)]
4083 root: Option<std::path::PathBuf>,
4084}
4085
4086fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4087 let params: MeetingGetParams =
4088 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4089 let cwd = std::env::current_dir().ok();
4090 let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
4091 serde_json::from_str(&json).map_err(|e| e.to_string())
4092}
4093
4094#[derive(Deserialize, Default)]
4099struct RegistryRegisterParams {
4100 entry: Value,
4104 #[serde(default)]
4105 registry_path: Option<std::path::PathBuf>,
4106}
4107
4108fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4109 let params: RegistryRegisterParams =
4110 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4111 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
4112 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4113 Ok(Value::Null)
4114}
4115
4116#[derive(Deserialize, Default)]
4117struct RegistryNameParams {
4118 name: String,
4119 #[serde(default)]
4120 registry_path: Option<std::path::PathBuf>,
4121}
4122
4123fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4124 let params: RegistryNameParams =
4125 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4126 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
4127 serde_json::from_str(&json).map_err(|e| e.to_string())
4128}
4129
4130fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4131 let params: RegistryNameParams =
4132 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4133 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
4134 Ok(Value::Null)
4135}
4136
4137#[derive(Deserialize, Default)]
4138struct RegistryListParams {
4139 #[serde(default)]
4140 registry_path: Option<std::path::PathBuf>,
4141}
4142
4143fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4144 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4145 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4146 serde_json::from_str(&json).map_err(|e| e.to_string())
4147}
4148
4149#[derive(Deserialize, Default)]
4150struct RegistryReapParams {
4151 #[serde(default = "default_reap_age")]
4154 max_age_secs: u64,
4155 #[serde(default)]
4156 registry_path: Option<std::path::PathBuf>,
4157}
4158
4159fn default_reap_age() -> u64 {
4160 60
4161}
4162
4163fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4164 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4165 let json =
4166 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4167 serde_json::from_str(&json).map_err(|e| e.to_string())
4168}
4169
4170async fn handle_a2a_start(req: &JsonRpcMessage) -> Result<Value, String> {
4177 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4178 let json = crate::a2a::start_a2a(¶ms_json).await?;
4179 serde_json::from_str(&json).map_err(|e| e.to_string())
4180}
4181
4182fn handle_a2a_stop() -> Result<Value, String> {
4183 let json = crate::a2a::stop_a2a()?;
4184 serde_json::from_str(&json).map_err(|e| e.to_string())
4185}
4186
4187fn handle_a2a_status() -> Result<Value, String> {
4188 let json = crate::a2a::a2a_status()?;
4189 serde_json::from_str(&json).map_err(|e| e.to_string())
4190}
4191
4192#[derive(Deserialize)]
4193#[serde(rename_all = "camelCase")]
4194struct A2aSendParams {
4195 endpoint: String,
4196 message: car_a2a::Message,
4197 #[serde(default)]
4198 blocking: bool,
4199 #[serde(default = "default_true")]
4200 ingest_a2ui: bool,
4201 #[serde(default)]
4202 route_auth: Option<A2aRouteAuth>,
4203 #[serde(default)]
4204 allow_untrusted_endpoint: bool,
4205}
4206
/// Serde default helper for boolean fields that should be on when omitted.
fn default_true() -> bool { true }
4210
4211async fn handle_a2a_dispatch(
4221 method: &str,
4222 req: &JsonRpcMessage,
4223 state: &Arc<ServerState>,
4224) -> Result<Value, String> {
4225 let dispatcher = state.a2a_dispatcher().await;
4226 dispatcher
4227 .dispatch(method, req.params.clone())
4228 .await
4229 .map_err(|e| e.to_string())
4230}
4231
4232async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4233 let params: A2aSendParams =
4234 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4235 let endpoint = trusted_route_endpoint(
4236 Some(params.endpoint.clone()),
4237 params.allow_untrusted_endpoint,
4238 )
4239 .ok_or_else(|| {
4240 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
4241 })?;
4242 let client = match params.route_auth.clone() {
4243 Some(auth) => {
4244 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
4245 }
4246 None => car_a2a::A2aClient::new(endpoint.clone()),
4247 };
4248 let result = client
4249 .send_message(params.message, params.blocking)
4250 .await
4251 .map_err(|e| e.to_string())?;
4252 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4253 let mut applied = Vec::new();
4254 if params.ingest_a2ui {
4255 state
4256 .a2ui
4257 .validate_payload(&result_value)
4258 .map_err(|e| e.to_string())?;
4259 let routed_endpoint = Some(endpoint.clone());
4260 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
4261 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
4262 if owner.endpoint.is_none() {
4263 owner.with_endpoint(routed_endpoint.clone())
4264 } else {
4265 owner
4266 }
4267 });
4268 applied.push(
4269 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
4270 );
4271 }
4272 }
4273 Ok(serde_json::json!({
4274 "result": result,
4275 "a2ui": {
4276 "applied": applied,
4277 }
4278 }))
4279}
4280
4281async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
4289 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4290 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
4291 serde_json::from_str(&json).map_err(|e| e.to_string())
4292}
4293
4294async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
4295 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4296 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
4297 serde_json::from_str(&json).map_err(|e| e.to_string())
4298}
4299
4300async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
4301 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4302 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
4303 serde_json::from_str(&json).map_err(|e| e.to_string())
4304}
4305
4306async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
4307 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4308 let json = car_ffi_common::notifications::local(&args_json).await?;
4309 serde_json::from_str(&json).map_err(|e| e.to_string())
4310}
4311
4312async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
4313 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4314 let json = car_ffi_common::vision::ocr(&args_json).await?;
4315 serde_json::from_str(&json).map_err(|e| e.to_string())
4316}
4317
4318async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
4323 let supervisor = state.supervisor()?;
4324 let agents = supervisor.list().await;
4325 let attached = state.attached_agents.lock().await.clone();
4332 let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
4333 for a in agents {
4334 let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
4335 let session_id = attached.get(&a.spec.id).cloned();
4336 if let Some(map) = v.as_object_mut() {
4337 map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
4338 if let Some(sid) = session_id {
4339 map.insert("session_id".to_string(), Value::String(sid));
4340 }
4341 }
4342 decorated.push(v);
4343 }
4344 Ok(Value::Array(decorated))
4345}
4346
4347async fn handle_agents_upsert(
4348 req: &JsonRpcMessage,
4349 state: &Arc<ServerState>,
4350) -> Result<Value, String> {
4351 let mut params = req.params.clone();
4352 if let Some(name) = params
4361 .get("interpreter")
4362 .and_then(|v| v.as_str())
4363 .map(str::to_string)
4364 {
4365 let resolved = car_registry::supervisor::resolve_interpreter(&name)
4366 .map_err(|e| e.to_string())?;
4367 params["command"] = Value::String(resolved.to_string_lossy().into_owned());
4368 }
4369 let spec: car_registry::supervisor::AgentSpec =
4370 serde_json::from_value(params).map_err(|e| e.to_string())?;
4371 let supervisor = state.supervisor()?;
4372 let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
4373 serde_json::to_value(agent).map_err(|e| e.to_string())
4374}
4375
4376async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
4377 let supervisor = state.supervisor()?;
4378 let entries = supervisor.health().await;
4379 serde_json::to_value(entries).map_err(|e| e.to_string())
4380}
4381
4382fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
4383 req.params
4384 .get("id")
4385 .and_then(Value::as_str)
4386 .map(str::to_string)
4387 .ok_or_else(|| "missing required `id` parameter".to_string())
4388}
4389
4390async fn handle_agents_remove(
4391 req: &JsonRpcMessage,
4392 state: &Arc<ServerState>,
4393) -> Result<Value, String> {
4394 let id = extract_agent_id(req)?;
4395 let supervisor = state.supervisor()?;
4396 let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
4397 Ok(serde_json::json!({ "removed": removed }))
4398}
4399
4400async fn handle_agents_start(
4401 req: &JsonRpcMessage,
4402 state: &Arc<ServerState>,
4403) -> Result<Value, String> {
4404 let id = extract_agent_id(req)?;
4405 let supervisor = state.supervisor()?;
4406 let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
4407 serde_json::to_value(agent).map_err(|e| e.to_string())
4408}
4409
4410async fn handle_agents_stop(
4411 req: &JsonRpcMessage,
4412 state: &Arc<ServerState>,
4413) -> Result<Value, String> {
4414 let id = extract_agent_id(req)?;
4415 let signal: car_registry::supervisor::StopSignal = req
4416 .params
4417 .get("signal")
4418 .map(|v| serde_json::from_value(v.clone()))
4419 .transpose()
4420 .map_err(|e| e.to_string())?
4421 .unwrap_or_default();
4422 let supervisor = state.supervisor()?;
4423 let agent = supervisor
4424 .stop(&id, signal)
4425 .await
4426 .map_err(|e| e.to_string())?;
4427 serde_json::to_value(agent).map_err(|e| e.to_string())
4428}
4429
4430async fn handle_agents_restart(
4431 req: &JsonRpcMessage,
4432 state: &Arc<ServerState>,
4433) -> Result<Value, String> {
4434 let id = extract_agent_id(req)?;
4435 let supervisor = state.supervisor()?;
4436 let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
4437 serde_json::to_value(agent).map_err(|e| e.to_string())
4438}
4439
4440async fn handle_agents_tail_log(
4441 req: &JsonRpcMessage,
4442 state: &Arc<ServerState>,
4443) -> Result<Value, String> {
4444 let id = extract_agent_id(req)?;
4445 let n = req
4446 .params
4447 .get("n")
4448 .and_then(Value::as_u64)
4449 .unwrap_or(100) as usize;
4450 let supervisor = state.supervisor()?;
4451 let lines = supervisor
4452 .tail_log(&id, n)
4453 .await
4454 .map_err(|e| e.to_string())?;
4455 Ok(serde_json::json!({ "lines": lines }))
4456}
4457
4458async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
4469 let include_health = req
4470 .params
4471 .get("include_health")
4472 .and_then(Value::as_bool)
4473 .unwrap_or(false);
4474 let json = car_ffi_common::external_agents::list(include_health).await?;
4475 serde_json::from_str(&json).map_err(|e| e.to_string())
4476}
4477
4478async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
4479 let include_health = req
4480 .params
4481 .get("include_health")
4482 .and_then(Value::as_bool)
4483 .unwrap_or(false);
4484 let json = car_ffi_common::external_agents::detect(include_health).await?;
4485 serde_json::from_str(&json).map_err(|e| e.to_string())
4486}
4487
4488async fn handle_agents_invoke_external(
4506 req: &JsonRpcMessage,
4507 state: &Arc<ServerState>,
4508) -> Result<Value, String> {
4509 let id = req
4510 .params
4511 .get("id")
4512 .and_then(Value::as_str)
4513 .ok_or_else(|| "missing required `id` parameter".to_string())?;
4514 let task = req
4515 .params
4516 .get("task")
4517 .and_then(Value::as_str)
4518 .ok_or_else(|| "missing required `task` parameter".to_string())?;
4519 let mut options_value = req.params.clone();
4524 if let Some(obj) = options_value.as_object_mut() {
4525 obj.remove("id");
4526 obj.remove("task");
4527 let has_explicit_mcp = obj.contains_key("mcp_endpoint");
4536 if !has_explicit_mcp {
4537 if let Some(url) = state.mcp_url.get() {
4538 obj.insert(
4539 "mcp_endpoint".to_string(),
4540 Value::String(url.clone()),
4541 );
4542 }
4543 }
4544 }
4545 let options_json = options_value.to_string();
4546 let json = car_ffi_common::external_agents::invoke(id, task, &options_json).await?;
4547 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
4548 append_external_agent_audit(id, task, &options_value, &result);
4549 Ok(result)
4550}
4551
4552fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
4558 use std::io::Write;
4559 let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
4560 Some(home) => home.join(".car"),
4561 None => return,
4562 };
4563 if std::fs::create_dir_all(&car_dir).is_err() {
4564 return;
4565 }
4566 let path = car_dir.join("external-agents.jsonl");
4567 let record = serde_json::json!({
4568 "ts": chrono::Utc::now().to_rfc3339(),
4569 "adapter_id": id,
4570 "task": task,
4571 "options": options,
4572 "result": result,
4573 });
4574 let line = match serde_json::to_string(&record) {
4575 Ok(s) => s,
4576 Err(_) => return,
4577 };
4578 if let Ok(mut f) = std::fs::OpenOptions::new()
4579 .create(true)
4580 .append(true)
4581 .open(&path)
4582 {
4583 let _ = writeln!(f, "{}", line);
4584 } else {
4585 tracing::warn!(
4586 path = %path.display(),
4587 "failed to append external-agent audit record"
4588 );
4589 }
4590}
4591
4592async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
4598 let force = req
4599 .params
4600 .get("force")
4601 .and_then(Value::as_bool)
4602 .unwrap_or(false);
4603 if let Some(id) = req.params.get("id").and_then(Value::as_str) {
4604 let json = car_ffi_common::external_agents::health_one(id, force).await?;
4605 serde_json::from_str(&json).map_err(|e| e.to_string())
4606 } else {
4607 let json = car_ffi_common::external_agents::health(force).await?;
4608 serde_json::from_str(&json).map_err(|e| e.to_string())
4609 }
4610}