1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30 #[serde(default)]
31 pub jsonrpc: String,
32 #[serde(default)]
33 pub method: Option<String>,
34 #[serde(default)]
35 pub params: Value,
36 #[serde(default)]
37 pub id: Value,
38 #[serde(default)]
40 pub result: Option<Value>,
41 #[serde(default)]
42 pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47 pub jsonrpc: &'static str,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub result: Option<Value>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<JsonRpcError>,
52 pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57 pub code: i32,
58 pub message: String,
59}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
88 name = "ws.connection",
89 skip_all,
90 fields(peer = %peer),
91)]
92pub async fn handle_connection(
93 stream: TcpStream,
94 peer: SocketAddr,
95 state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let ws_stream = accept_async(stream).await?;
98 let (write, read) = ws_stream.split();
99 run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102#[cfg(unix)]
113#[instrument(
114 name = "ws.connection",
115 skip_all,
116 fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119 stream: tokio::net::UnixStream,
120 peer: String,
121 state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123 let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124 let (write, read) = ws_stream.split();
125 run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128#[instrument(
138 name = "ws.dispatch",
139 skip_all,
140 fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143 mut read: R,
144 write: crate::session::WsSink,
145 peer: String,
146 state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149 R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150 + Unpin
151 + Send,
152{
153 let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154 tracing::Span::current().record("client_id", &client_id.as_str());
155
156 info!("New connection from {}", peer);
157
158 let channel = Arc::new(WsChannel {
159 write: Mutex::new(write),
160 pending: Mutex::new(HashMap::new()),
161 next_id: AtomicU64::new(1),
162 });
163
164 let session = state.create_session(&client_id, channel.clone()).await;
165
166 let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
175
176 while let Some(msg) = read.next().await {
177 while conn_tasks.try_join_next().is_some() {}
180
181 let msg = match msg {
190 Ok(m) => m,
191 Err(e) => {
192 info!("read error from {}: {}; closing", client_id, e);
193 break;
194 }
195 };
196 if msg.is_text() {
197 let text = match msg.to_text() {
198 Ok(t) => t,
199 Err(e) => {
200 info!("non-text frame from {}: {}; closing", client_id, e);
201 break;
202 }
203 };
204 let parsed: JsonRpcMessage = match serde_json::from_str(text) {
205 Ok(m) => m,
206 Err(e) => {
207 send_response(
208 &session.channel,
209 JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
210 )
211 .await
212 .ok();
213 continue;
214 }
215 };
216
217 if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
219 if let Some(id_str) = parsed.id.as_str() {
220 let mut pending = session.channel.pending.lock().await;
221 if let Some(tx) = pending.remove(id_str) {
222 let tool_resp = if let Some(result) = parsed.result {
223 ToolExecuteResponse {
224 action_id: id_str.to_string(),
225 output: Some(result),
226 error: None,
227 }
228 } else {
229 let err_msg = parsed
230 .error
231 .as_ref()
232 .and_then(|e| e.get("message"))
233 .and_then(|m| m.as_str())
234 .unwrap_or("unknown error")
235 .to_string();
236 ToolExecuteResponse {
237 action_id: id_str.to_string(),
238 output: None,
239 error: Some(err_msg),
240 }
241 };
242 let _ = tx.send(tool_resp);
243 continue;
244 }
245 }
246 }
247
248 if try_forward_agent_chat_event(&parsed, &state).await {
257 continue;
258 }
259
260 if let Some(method) = &parsed.method {
262 info!(method = %method, "dispatching JSON-RPC method");
263
264 if state.auth_token.get().is_some()
272 && !session
273 .authenticated
274 .load(std::sync::atomic::Ordering::Acquire)
275 && method != "session.auth"
276 {
277 let resp = JsonRpcResponse::error(
278 parsed.id.clone(),
279 -32001,
280 "auth required: send `session.auth` with the per-launch token \
281 from ~/Library/Application Support/ai.parslee.car/auth-token \
282 (macOS), $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux), \
283 or %LOCALAPPDATA%\\ai.parslee.car\\auth-token (Windows) \
284 as the first frame on this connection",
285 );
286 let _ = send_response(&session.channel, resp).await;
287 info!(client = %client_id, method = %method,
288 "rejecting non-auth method on unauthenticated session; closing");
289 break;
290 }
291
292 if state.approval_gate.requires_approval(method.as_str()) {
304 match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
305 Ok(()) => {}
306 Err(reason) => {
307 let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
308 let _ = send_response(&session.channel, resp).await;
309 info!(
310 client = %client_id,
311 method = %method,
312 reason = %reason,
313 "approval gate blocked dispatch"
314 );
315 continue;
316 }
317 }
318 }
319
320 let session_task = session.clone();
335 let state_task = state.clone();
336 let method_owned = method.clone();
337 let parsed_task = parsed;
338 conn_tasks.spawn(async move {
341 let session = session_task;
342 let state = state_task;
343 let parsed = parsed_task;
344 let result = match method_owned.as_str() {
345 "session.auth" => handle_session_auth(&parsed, &session, &state).await,
346 "parslee.auth" => handle_parslee_auth().await,
347 "auth.start" => handle_auth_start(&parsed).await,
348 "auth.complete" => handle_auth_complete(&parsed).await,
349 "auth.status" => handle_auth_status().await,
350 "auth.logout" => handle_auth_logout().await,
351 "session.init" => handle_session_init(&parsed, &session).await,
352 "host.subscribe" => handle_host_subscribe(&session, &state).await,
353 "host.agents" => handle_host_agents(&session).await,
354 "host.events" => handle_host_events(&parsed, &session).await,
355 "host.approvals" => handle_host_approvals(&session).await,
356 "host.register_agent" => {
357 handle_host_register_agent(&parsed, &session).await
358 }
359 "host.unregister_agent" => {
360 handle_host_unregister_agent(&parsed, &session).await
361 }
362 "host.set_status" => handle_host_set_status(&parsed, &session).await,
363 "host.notify" => handle_host_notify(&parsed, &session).await,
364 "host.request_approval" => {
365 handle_host_request_approval(&parsed, &session).await
366 }
367 "host.resolve_approval" => {
368 handle_host_resolve_approval(&parsed, &session).await
369 }
370 "tools.register" => handle_tools_register(&parsed, &session).await,
371 "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
372 "policy.register" => handle_policy_register(&parsed, &session).await,
373 "session.policy.open" => handle_session_policy_open(&session).await,
374 "session.policy.close" => {
375 handle_session_policy_close(&parsed, &session).await
376 }
377 "verify" => handle_verify(&parsed, &session).await,
378 "state.get" => handle_state_get(&parsed, &session).await,
379 "state.set" => handle_state_set(&parsed, &session).await,
380 "state.exists" => handle_state_exists(&parsed, &session).await,
381 "state.keys" => handle_state_keys(&parsed, &session).await,
382 "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
383 "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
384 "memory.query" => handle_memory_query(&parsed, &session).await,
385 "memory.build_context" => {
386 handle_memory_build_context(&parsed, &session).await
387 }
388 "memory.build_context_fast" => {
389 handle_memory_build_context_fast(&parsed, &session).await
390 }
391 "memory.consolidate" => handle_memory_consolidate(&session).await,
392 "memory.fact_count" => handle_memory_fact_count(&session).await,
393 "memory.persist" => handle_memory_persist(&parsed, &session).await,
394 "memory.load" => handle_memory_load(&parsed, &session).await,
395 "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
396 "skill.find" => handle_skill_find(&parsed, &session).await,
397 "skill.report" => handle_skill_report(&parsed, &session).await,
398 "skill.repair" => handle_skill_repair(&parsed, &session).await,
399 "skills.ingest_distilled" => {
400 handle_skills_ingest_distilled(&parsed, &session).await
401 }
402 "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
403 "skills.domains_needing_evolution" => {
404 handle_skills_domains_needing_evolution(&parsed, &session).await
405 }
406 "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
407 "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
408 "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
409 "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
410 "multi.vote" => handle_multi_vote(&parsed, &session).await,
411 "scheduler.create" => handle_scheduler_create(&parsed),
412 "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
413 "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
414 "infer" => handle_infer(&parsed, &state, &session).await,
415 "image.generate" => handle_image_generate(&parsed, &state).await,
416 "video.generate" => handle_video_generate(&parsed, &state).await,
417 "embed" => handle_embed(&parsed, &state).await,
418 "classify" => handle_classify(&parsed, &state).await,
419 "tokenize" => handle_tokenize(&parsed, &state).await,
420 "detokenize" => handle_detokenize(&parsed, &state).await,
421 "rerank" => handle_rerank(&parsed, &state).await,
422 "transcribe" => handle_transcribe(&parsed, &state).await,
423 "synthesize" => handle_synthesize(&parsed, &state).await,
424 "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
425 "speech.prepare" => handle_speech_prepare(&state).await,
426 "models.route" => handle_models_route(&parsed, &state).await,
427 "models.stats" => handle_models_stats(&state).await,
428 "outcomes.resolve_pending" => {
429 handle_outcomes_resolve_pending(&parsed, &state).await
430 }
431 "events.count" => handle_events_count(&session).await,
432 "events.stats" => handle_events_stats(&session).await,
433 "events.truncate" => handle_events_truncate(&parsed, &session).await,
434 "events.clear" => handle_events_clear(&session).await,
435 "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
436 "models.list" => handle_models_list(&state),
437 "models.register" => handle_models_register(&parsed, &state).await,
438 "models.unregister" => handle_models_unregister(&parsed, &state).await,
439 "models.list_unified" => handle_models_list_unified(&state),
440 "models.search" => handle_models_search(&parsed, &state),
441 "models.upgrades" => handle_models_upgrades(&state),
442 "models.pull" => handle_models_pull(&parsed, &state).await,
443 "models.install" => handle_models_pull(&parsed, &state).await,
444 "skills.distill" => handle_skills_distill(&parsed, &state).await,
445 "skills.list" => handle_skills_list(&parsed, &session).await,
446 "browser.run" => handle_browser_run(&parsed, &session).await,
447 "browser.close" => handle_browser_close(&session).await,
448 "secret.put" => handle_secret_put(&parsed),
449 "secret.get" => handle_secret_get(&parsed),
450 "secret.delete" => handle_secret_delete(&parsed),
451 "secret.status" => handle_secret_status(&parsed),
452 "secret.available" => Ok(car_ffi_common::secrets::is_available()),
453 "permissions.status" => handle_perm_status(&parsed),
454 "permissions.request" => handle_perm_request(&parsed),
455 "permissions.explain" => handle_perm_explain(&parsed),
456 "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
457 "accounts.list" => car_ffi_common::accounts::list(),
458 "accounts.open" => {
459 #[derive(serde::Deserialize, Default)]
460 struct OpenParams {
461 #[serde(default)]
462 account_id: Option<String>,
463 }
464 let p: OpenParams =
465 serde_json::from_value(parsed.params.clone()).unwrap_or_default();
466 car_ffi_common::accounts::open_settings(p.account_id.as_deref())
467 }
468 "calendar.list" => car_ffi_common::integrations::calendar_list(),
469 "calendar.events" => handle_calendar_events(&parsed),
470 "contacts.containers" => {
471 car_ffi_common::integrations::contacts_containers()
472 }
473 "contacts.find" => handle_contacts_find(&parsed),
474 "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
475 "mail.inbox" => handle_mail_inbox(&parsed),
476 "mail.send" => handle_mail_send(&parsed),
477 "messages.services" => car_ffi_common::integrations::messages_services(),
478 "messages.chats" => handle_messages_chats(&parsed),
479 "messages.send" => handle_messages_send(&parsed),
480 "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
481 "notes.find" => handle_notes_find(&parsed),
482 "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
483 "reminders.items" => handle_reminders_items(&parsed),
484 "photos.albums" => car_ffi_common::integrations::photos_albums(),
485 "bookmarks.list" => handle_bookmarks_list(&parsed),
486 "files.locations" => car_ffi_common::integrations::files_locations(),
487 "keychain.status" => car_ffi_common::integrations::keychain_status(),
488 "health.status" => car_ffi_common::health::status(),
489 "health.sleep" => handle_health_sleep(&parsed),
490 "health.workouts" => handle_health_workouts(&parsed),
491 "health.activity" => handle_health_activity(&parsed),
492 "voice.transcribe_stream.start" => {
493 handle_voice_transcribe_stream_start(&parsed, &state, &session).await
494 }
495 "voice.transcribe_stream.stop" => {
496 handle_voice_transcribe_stream_stop(&parsed, &state).await
497 }
498 "voice.transcribe_stream.push" => {
499 handle_voice_transcribe_stream_push(&parsed, &state).await
500 }
501 "voice.tts_stream.start" => {
502 handle_voice_tts_stream_start(&parsed, &session).await
503 }
504 "voice.tts_stream.cancel" => handle_voice_tts_stream_cancel(&parsed).await,
505 "voice.tts_stream.list" => Ok(handle_voice_tts_stream_list()),
506 "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
507 "voice.dispatch_turn" => {
508 handle_voice_dispatch_turn(&parsed, &state, &session).await
509 }
510 "voice.cancel_turn" => handle_voice_cancel_turn().await,
511 "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
512 "inference.register_runner" => {
513 handle_inference_register_runner(&session).await
514 }
515 "inference.runner.event" => handle_inference_runner_event(&parsed).await,
516 "inference.runner.complete" => {
517 handle_inference_runner_complete(&parsed).await
518 }
519 "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
520 "voice.providers.list" => {
521 serde_json::from_str::<serde_json::Value>(
525 &car_voice::list_voice_providers_json(),
526 )
527 .map_err(|e| e.to_string())
528 }
529 "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
530 .await
531 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
532 "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
533 .await
534 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
535 "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
536 "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
537 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
538 "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
539 "workflow.run" => handle_workflow_run(&parsed, &session).await,
540 "workflow.verify" => handle_workflow_verify(&parsed),
541 "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
542 "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
543 "meeting.list" => handle_meeting_list(&parsed),
544 "meeting.get" => handle_meeting_get(&parsed),
545 "registry.register" => handle_registry_register(&parsed),
546 "registry.heartbeat" => handle_registry_heartbeat(&parsed),
547 "registry.unregister" => handle_registry_unregister(&parsed),
548 "registry.list" => handle_registry_list(&parsed),
549 "registry.reap" => handle_registry_reap(&parsed),
550 "admission.status" => handle_admission_status(&state),
551 "a2a.start" => handle_a2a_start(&parsed, &session).await,
552 "a2a.stop" => handle_a2a_stop(),
553 "a2a.status" => handle_a2a_status(),
554 "a2a.send" => handle_a2a_send(&parsed, &state).await,
555 "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
556 "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
557 "a2ui.capabilities" => handle_a2ui_capabilities(&state),
558 "a2ui.reap" => handle_a2ui_reap(&state).await,
559 "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
560 "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
561 "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
562 "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
563 "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
564 "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
565 "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
566 "automation.run_applescript" => handle_run_applescript(&parsed).await,
567 "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
568 "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
569 "notifications.local" => handle_local_notification(&parsed).await,
570 "vision.ocr" => handle_vision_ocr(&parsed).await,
571 "agents.list" => handle_agents_list(&state).await,
572 "agents.health" => handle_agents_health(&state).await,
573 "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
574 "agents.install" => handle_agents_install(&parsed, &state).await,
575 "agents.remove" => handle_agents_remove(&parsed, &state).await,
576 "agents.start" => handle_agents_start(&parsed, &state).await,
577 "agents.stop" => handle_agents_stop(&parsed, &state).await,
578 "agents.restart" => handle_agents_restart(&parsed, &state).await,
579 "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
580 "agents.list_external" => handle_agents_list_external(&parsed).await,
581 "agents.detect_external" => handle_agents_detect_external(&parsed).await,
582 "agents.health_external" => handle_agents_health_external(&parsed).await,
583 "agents.invoke_external" => {
584 handle_agents_invoke_external(&parsed, &state, &session).await
585 }
586 "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
587 "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
588 "message/send"
595 | "SendMessage"
596 | "message/stream"
597 | "SendStreamingMessage"
598 | "tasks/get"
599 | "GetTask"
600 | "tasks/list"
601 | "ListTasks"
602 | "tasks/cancel"
603 | "CancelTask"
604 | "tasks/resubscribe"
605 | "SubscribeToTask"
606 | "tasks/pushNotificationConfig/set"
607 | "CreateTaskPushNotificationConfig"
608 | "tasks/pushNotificationConfig/get"
609 | "GetTaskPushNotificationConfig"
610 | "tasks/pushNotificationConfig/list"
611 | "ListTaskPushNotificationConfigs"
612 | "tasks/pushNotificationConfig/delete"
613 | "DeleteTaskPushNotificationConfig"
614 | "agent/getAuthenticatedExtendedCard"
615 | "GetExtendedAgentCard" => {
616 handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
617 }
618 _ => Err(format!("unknown method: {}", method_owned)),
619 };
620
621 let resp = match result {
622 Ok(value) => JsonRpcResponse::success(parsed.id, value),
623 Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
624 };
625 let _ = send_response(&session.channel, resp).await;
626 });
627 }
628 } else if msg.is_binary() {
629 let bytes = msg.into_data();
636 let parsed = match car_ffi_common::voice::binary::parse_frame(&bytes) {
637 Ok(p) => p,
638 Err(e) => {
639 tracing::warn!("binary frame from {} rejected: {}", client_id, e);
640 continue;
641 }
642 };
643 match parsed.frame_type {
644 car_ffi_common::voice::binary::FRAME_TYPE_INBOUND_PCM => {
645 let registry = state.voice_sessions.clone();
646 let payload_owned = parsed.payload.to_vec();
647 let session_id_owned = parsed.session_id_hex.clone();
648 conn_tasks.spawn(async move {
649 if let Err(e) = car_ffi_common::voice::transcribe_stream_push(
650 &session_id_owned,
651 &payload_owned,
652 registry,
653 )
654 .await
655 {
656 tracing::warn!(
657 "binary PCM push to session {} failed: {}",
658 session_id_owned,
659 e
660 );
661 }
662 });
663 }
664 other => {
665 tracing::debug!(
666 "binary frame type {:#04x} from {} not accepted server-side",
667 other,
668 client_id
669 );
670 }
671 }
672 } else if msg.is_close() {
673 info!("Client {} disconnected", client_id);
674 break;
675 }
676 }
677
678 conn_tasks.abort_all();
683
684 session.host.unsubscribe(&client_id).await;
685 session.host.reap_session_approvals(&client_id).await;
691 state.a2ui_subscribers.lock().await.remove(&client_id);
692
693 let _removed = state.remove_session(&client_id).await;
704 {
705 let mut pending = session.channel.pending.lock().await;
706 pending.clear();
707 }
708
709 Ok(())
710}
711
712async fn send_response(
713 channel: &WsChannel,
714 resp: JsonRpcResponse,
715) -> Result<(), Box<dyn std::error::Error>> {
716 use futures::SinkExt;
717 let json = serde_json::to_string(&resp)?;
718 channel
719 .write
720 .lock()
721 .await
722 .send(Message::Text(json.into()))
723 .await?;
724 Ok(())
725}
726
727async fn handle_host_subscribe(
730 session: &crate::session::ClientSession,
731 state: &Arc<ServerState>,
732) -> Result<Value, String> {
733 session
734 .host
735 .subscribe(&session.client_id, session.channel.clone())
736 .await;
737 serde_json::to_value(HostSnapshot {
738 subscribed: true,
739 agents: session.host.agents().await,
740 approvals: session.host.approvals().await,
741 events: session.host.events(50).await,
742 identity: Some(daemon_identity(state)),
743 })
744 .map_err(|e| e.to_string())
745}
746
747fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
755 let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
762 (
763 Some(p.to_string_lossy().into_owned()),
764 car_proto::HostManifestRole::Observer,
765 )
766 } else if let Some(s) = state.supervisor_if_installed() {
767 (
768 Some(s.manifest_path().to_string_lossy().into_owned()),
769 car_proto::HostManifestRole::Owner,
770 )
771 } else {
772 (None, car_proto::HostManifestRole::None)
773 };
774 car_proto::HostIdentity {
775 version: env!("CARGO_PKG_VERSION").to_string(),
776 pid: std::process::id(),
777 manifest_path,
778 manifest_role,
779 parslee: state
780 .parslee_session
781 .get()
782 .map(|session| session.identity.clone()),
783 }
784}
785
786async fn handle_parslee_auth() -> Result<Value, String> {
797 let session = crate::parslee_auth::load_or_refresh()
798 .await?
799 .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
800 Ok(serde_json::json!({
801 "authenticated": true,
802 "token_type": "Bearer",
803 "access_token": session.access_token,
804 "authorization_header": format!("Bearer {}", session.access_token),
805 "identity": session.identity,
806 }))
807}
808
809async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
815 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
816 let client_id = req
817 .params
818 .get("client_id")
819 .and_then(|v| v.as_str())
820 .unwrap_or("parslee-car");
821 let redirect_uri = req
822 .params
823 .get("redirect_uri")
824 .and_then(|v| v.as_str())
825 .ok_or_else(|| "redirect_uri is required".to_string())?;
826 let provider = req.params.get("provider").and_then(|v| v.as_str());
827 let state = car_auth::new_state();
828 let verifier = car_auth::pkce_verifier();
829 let challenge = car_auth::pkce_challenge(&verifier);
830 let url =
831 car_auth::authorize_url(&api_base, client_id, redirect_uri, &state, &challenge, provider)?;
832 Ok(serde_json::json!({
833 "authorize_url": url,
834 "state": state,
835 "verifier": verifier,
836 }))
837}
838
839async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
840 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
841 let client_id = req
842 .params
843 .get("client_id")
844 .and_then(|v| v.as_str())
845 .unwrap_or("parslee-car");
846 let redirect_uri = req
847 .params
848 .get("redirect_uri")
849 .and_then(|v| v.as_str())
850 .ok_or_else(|| "redirect_uri is required".to_string())?;
851 let code = req
852 .params
853 .get("code")
854 .and_then(|v| v.as_str())
855 .ok_or_else(|| "code is required".to_string())?;
856 let verifier = req
857 .params
858 .get("verifier")
859 .and_then(|v| v.as_str())
860 .ok_or_else(|| "verifier is required".to_string())?;
861 let token =
862 car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
863 car_auth::store_tokens(&api_base, &token)?;
864 Ok(serde_json::json!({ "ok": true }))
865}
866
867async fn handle_auth_status() -> Result<Value, String> {
868 match car_auth::fetch_status(None).await? {
869 Some(session_json) => {
870 let session: Value = serde_json::from_str(&session_json).unwrap_or(Value::Null);
871 Ok(serde_json::json!({ "authenticated": true, "session": session }))
872 }
873 None => Ok(serde_json::json!({ "authenticated": false })),
874 }
875}
876
877async fn handle_auth_logout() -> Result<Value, String> {
878 car_auth::clear_tokens()?;
879 Ok(serde_json::json!({ "ok": true }))
880}
881
882async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
883 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
884}
885
886async fn handle_host_events(
887 req: &JsonRpcMessage,
888 session: &crate::session::ClientSession,
889) -> Result<Value, String> {
890 let limit = req
891 .params
892 .get("limit")
893 .and_then(|v| v.as_u64())
894 .unwrap_or(100) as usize;
895 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
896}
897
898async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
899 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
900}
901
902async fn handle_a2ui_apply(
903 req: &JsonRpcMessage,
904 state: &Arc<ServerState>,
905) -> Result<Value, String> {
906 #[derive(Deserialize)]
907 struct Params {
908 #[serde(default)]
909 envelope: Option<car_a2ui::A2uiEnvelope>,
910 #[serde(default)]
911 message: Option<car_a2ui::A2uiEnvelope>,
912 }
913
914 let envelope = if req.params.get("createSurface").is_some()
915 || req.params.get("updateComponents").is_some()
916 || req.params.get("updateDataModel").is_some()
917 || req.params.get("deleteSurface").is_some()
918 {
919 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
920 .map_err(|e| e.to_string())?
921 } else {
922 match serde_json::from_value::<Params>(req.params.clone()) {
923 Ok(params) => params
924 .envelope
925 .or(params.message)
926 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
927 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
928 .map_err(|e| e.to_string())?,
929 }
930 };
931
932 apply_a2ui_envelope(state, envelope, None, None).await
933}
934
935async fn handle_a2ui_ingest(
936 req: &JsonRpcMessage,
937 state: &Arc<ServerState>,
938) -> Result<Value, String> {
939 #[derive(Deserialize)]
940 #[serde(rename_all = "camelCase")]
941 struct Params {
942 #[serde(default)]
943 endpoint: Option<String>,
944 #[serde(default)]
945 a2a_endpoint: Option<String>,
946 #[serde(default)]
947 owner: Option<car_a2ui::A2uiSurfaceOwner>,
948 #[serde(default)]
949 route_auth: Option<A2aRouteAuth>,
950 #[serde(default)]
951 allow_untrusted_endpoint: bool,
952 }
953
954 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
955 endpoint: None,
956 a2a_endpoint: None,
957 owner: None,
958 route_auth: None,
959 allow_untrusted_endpoint: false,
960 });
961 let payload = req.params.get("payload").unwrap_or(&req.params);
962 state
963 .a2ui
964 .validate_payload(payload)
965 .map_err(|e| e.to_string())?;
966 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
967 if envelopes.is_empty() {
968 return Err("no A2UI envelopes found in payload".into());
969 }
970 let endpoint = params.endpoint.or(params.a2a_endpoint);
971 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
972 let owner = params
973 .owner
974 .or_else(|| car_a2ui::owner_from_value(payload))
975 .map(|owner| match endpoint.clone() {
976 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
977 None => owner,
978 });
979
980 let mut results = Vec::new();
981 for envelope in envelopes {
982 let value =
983 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
984 results.push(value);
985 }
986 Ok(serde_json::json!({ "applied": results }))
987}
988
989async fn apply_a2ui_envelope(
990 state: &Arc<ServerState>,
991 envelope: car_a2ui::A2uiEnvelope,
992 owner: Option<car_a2ui::A2uiSurfaceOwner>,
993 route_auth: Option<A2aRouteAuth>,
994) -> Result<Value, String> {
995 let result = state
996 .a2ui
997 .apply_with_owner(envelope, owner)
998 .await
999 .map_err(|e| e.to_string())?;
1000 update_a2ui_route_auth(state, &result, route_auth).await;
1001 let kind = if result.deleted {
1002 "a2ui.surface_deleted"
1003 } else {
1004 "a2ui.surface_updated"
1005 };
1006 let message = if result.deleted {
1007 format!("A2UI surface {} deleted", result.surface_id)
1008 } else {
1009 format!("A2UI surface {} updated", result.surface_id)
1010 };
1011 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
1012 state
1013 .host
1014 .record_event(kind, None, message, payload.clone())
1015 .await;
1016 broadcast_a2ui_event(state, kind, &payload).await;
1020 serde_json::to_value(result).map_err(|e| e.to_string())
1021}
1022
1023async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
1024 use futures::SinkExt;
1025 use tokio_tungstenite::tungstenite::Message;
1026 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
1027 .a2ui_subscribers
1028 .lock()
1029 .await
1030 .values()
1031 .cloned()
1032 .collect();
1033 if subscribers.is_empty() {
1034 return;
1035 }
1036 let Ok(json) = serde_json::to_string(&serde_json::json!({
1037 "jsonrpc": "2.0",
1038 "method": "a2ui.event",
1039 "params": {
1040 "kind": kind,
1041 "result": result,
1042 },
1043 })) else {
1044 return;
1045 };
1046 for channel in subscribers {
1047 let _ = channel
1048 .write
1049 .lock()
1050 .await
1051 .send(Message::Text(json.clone().into()))
1052 .await;
1053 }
1054}
1055
1056async fn update_a2ui_route_auth(
1057 state: &Arc<ServerState>,
1058 result: &car_a2ui::A2uiApplyResult,
1059 route_auth: Option<A2aRouteAuth>,
1060) {
1061 let mut auth = state.a2ui_route_auth.lock().await;
1062 if result.deleted {
1063 auth.remove(&result.surface_id);
1064 return;
1065 }
1066
1067 let has_route_endpoint = result
1068 .surface
1069 .as_ref()
1070 .and_then(|surface| surface.owner.as_ref())
1071 .and_then(|owner| owner.endpoint.as_ref())
1072 .is_some();
1073 match (has_route_endpoint, route_auth) {
1074 (true, Some(route_auth)) => {
1075 auth.insert(result.surface_id.clone(), route_auth);
1076 }
1077 _ => {
1078 auth.remove(&result.surface_id);
1079 }
1080 }
1081}
1082
1083fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
1084 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
1085}
1086
1087async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
1088 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
1089 if !removed.is_empty() {
1090 let mut auth = state.a2ui_route_auth.lock().await;
1091 for surface_id in &removed {
1092 auth.remove(surface_id);
1093 }
1094 }
1095 Ok(serde_json::json!({ "removed": removed }))
1096}
1097
1098async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
1099 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
1100}
1101
1102async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
1103 let surface_id = req
1104 .params
1105 .get("surface_id")
1106 .or_else(|| req.params.get("surfaceId"))
1107 .and_then(Value::as_str)
1108 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
1109 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1110}
1111
1112async fn handle_a2ui_subscribe(
1118 session: &crate::session::ClientSession,
1119 state: &Arc<ServerState>,
1120) -> Result<Value, String> {
1121 state
1122 .a2ui_subscribers
1123 .lock()
1124 .await
1125 .insert(session.client_id.clone(), session.channel.clone());
1126 Ok(serde_json::json!({ "subscribed": true }))
1127}
1128
1129async fn handle_a2ui_unsubscribe(
1133 session: &crate::session::ClientSession,
1134 state: &Arc<ServerState>,
1135) -> Result<Value, String> {
1136 state
1137 .a2ui_subscribers
1138 .lock()
1139 .await
1140 .remove(&session.client_id);
1141 Ok(serde_json::json!({ "subscribed": false }))
1142}
1143
1144async fn handle_a2ui_replay(
1151 req: &JsonRpcMessage,
1152 state: &Arc<ServerState>,
1153) -> Result<Value, String> {
1154 let surface_id = req
1155 .params
1156 .get("surface_id")
1157 .or_else(|| req.params.get("surfaceId"))
1158 .and_then(Value::as_str)
1159 .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
1160 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1161}
1162
1163async fn handle_a2ui_action(
1164 req: &JsonRpcMessage,
1165 state: &Arc<ServerState>,
1166) -> Result<Value, String> {
1167 let action: car_a2ui::ClientAction =
1168 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1169 let owner = state.a2ui.owner(&action.surface_id).await;
1170 let route = route_a2ui_action(state, &action, owner.clone()).await;
1171 let payload = serde_json::json!({
1172 "action": action,
1173 "owner": owner,
1174 "route": route,
1175 });
1176 let event = state
1177 .host
1178 .record_event(
1179 "a2ui.action",
1180 None,
1181 format!(
1182 "A2UI action {} from {}",
1183 action.name, action.source_component_id
1184 ),
1185 payload,
1186 )
1187 .await;
1188 Ok(serde_json::json!({
1189 "event": event,
1190 "route": route,
1191 }))
1192}
1193
1194async fn handle_a2ui_render_report(
1201 req: &JsonRpcMessage,
1202 state: &Arc<ServerState>,
1203) -> Result<Value, String> {
1204 let report: car_a2ui::RenderReport =
1208 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1209 let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
1210 let kind = "a2ui.render_report";
1211 let message = format!("A2UI render report for surface {}", report.surface_id);
1212 let event = state
1213 .host
1214 .record_event(kind, None, message, payload.clone())
1215 .await;
1216 broadcast_a2ui_event(state, kind, &payload).await;
1217
1218 if let Some(surface) = state.a2ui.get(&report.surface_id).await {
1226 if !state.ui_agent_budget.try_consume(&report.surface_id) {
1232 tracing::warn!(
1233 surface_id = %report.surface_id,
1234 count = state.ui_agent_budget.count(&report.surface_id),
1235 max = state.ui_agent_budget.max(),
1236 "ui-agent iteration budget exhausted; skipping agent invocation"
1237 );
1238 return Ok(serde_json::json!({ "event": event }));
1239 }
1240 match state.ui_agent.on_render_report(&report, &surface) {
1244 car_ui_agent::Decision::Patch {
1245 envelope,
1246 strategy_id,
1247 patch_hash,
1248 elapsed_ns,
1249 } => {
1250 if !state
1258 .ui_agent_oscillation
1259 .check_and_record(&report.surface_id, patch_hash)
1260 {
1261 tracing::warn!(
1262 surface_id = %report.surface_id,
1263 strategy = %strategy_id,
1264 patch_hash,
1265 "ui-agent oscillation detected; suppressing patch"
1266 );
1267 state.ui_agent_budget.refund(&report.surface_id);
1270 return Ok(serde_json::json!({ "event": event }));
1271 }
1272 let a2ui_envelope = car_a2ui::A2uiEnvelope {
1273 patch_components: Some(envelope),
1274 ..Default::default()
1275 };
1276 if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
1277 tracing::warn!(
1278 surface_id = %report.surface_id,
1279 strategy = %strategy_id,
1280 patch_hash,
1281 elapsed_ns,
1282 error = %e,
1283 "ui-agent patch apply failed",
1284 );
1285 state.ui_agent_budget.refund(&report.surface_id);
1287 } else {
1288 tracing::debug!(
1289 surface_id = %report.surface_id,
1290 strategy = %strategy_id,
1291 patch_hash,
1292 elapsed_ns,
1293 iteration = state.ui_agent_budget.count(&report.surface_id),
1294 "ui-agent patch applied",
1295 );
1296 if let Some(memgine) = state.shared_memgine.clone() {
1306 let speaker = format!("ui-agent/{}", report.surface_id);
1307 let text = format!("strategy applied: {}", strategy_id);
1308 tokio::spawn(async move {
1309 let mut guard = memgine.lock().await;
1310 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1311 });
1312 }
1313 }
1314 }
1315 car_ui_agent::Decision::StableNoChange => {
1316 state.ui_agent_budget.refund(&report.surface_id);
1318 }
1319 car_ui_agent::Decision::HardStop { reason } => {
1320 state.ui_agent_budget.refund(&report.surface_id);
1321 tracing::error!(
1327 surface_id = %report.surface_id,
1328 reason = %reason,
1329 "ui-agent hard-stopped improvement loop",
1330 );
1331 }
1332 }
1333 } else {
1334 tracing::debug!(
1335 surface_id = %report.surface_id,
1336 "ui-agent skipped — surface not found in store",
1337 );
1338 }
1339
1340 Ok(serde_json::json!({ "event": event }))
1341}
1342
1343async fn route_a2ui_action(
1344 state: &Arc<ServerState>,
1345 action: &car_a2ui::ClientAction,
1346 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1347) -> Value {
1348 let Some(owner) = owner else {
1349 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1350 };
1351 if owner.kind != "a2a" {
1352 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1353 }
1354 let Some(endpoint) = owner.endpoint.clone() else {
1355 return serde_json::json!({
1356 "delivered": false,
1357 "reason": "surface owner has no endpoint",
1358 "owner": owner
1359 });
1360 };
1361
1362 let message = car_a2a::Message {
1363 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1364 role: car_a2a::MessageRole::User,
1365 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1366 data: serde_json::json!({
1367 "a2uiAction": action,
1368 }),
1369 metadata: Default::default(),
1370 })],
1371 task_id: owner.task_id.clone(),
1372 context_id: owner.context_id.clone(),
1373 metadata: Default::default(),
1374 };
1375
1376 let auth = state
1377 .a2ui_route_auth
1378 .lock()
1379 .await
1380 .get(&action.surface_id)
1381 .cloned()
1382 .map(client_auth_from_route_auth)
1383 .unwrap_or(car_a2a::ClientAuth::None);
1384
1385 match car_a2a::A2aClient::new(endpoint.clone())
1386 .with_auth(auth)
1387 .send_message(message, false)
1388 .await
1389 {
1390 Ok(result) => serde_json::json!({
1391 "delivered": true,
1392 "owner": owner,
1393 "endpoint": endpoint,
1394 "result": result,
1395 }),
1396 Err(error) => serde_json::json!({
1397 "delivered": false,
1398 "owner": owner,
1399 "endpoint": endpoint,
1400 "error": error.to_string(),
1401 }),
1402 }
1403}
1404
1405fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1406 match auth {
1407 A2aRouteAuth::None => car_a2a::ClientAuth::None,
1408 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1409 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1410 }
1411}
1412
1413fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1414 let endpoint = endpoint?;
1415 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1416 Some(endpoint)
1417 } else {
1418 None
1419 }
1420}
1421
1422fn is_loopback_http_endpoint(endpoint: &str) -> bool {
1423 endpoint == "http://localhost"
1424 || endpoint.starts_with("http://localhost:")
1425 || endpoint.starts_with("http://localhost/")
1426 || endpoint == "http://127.0.0.1"
1427 || endpoint.starts_with("http://127.0.0.1:")
1428 || endpoint.starts_with("http://127.0.0.1/")
1429 || endpoint == "http://[::1]"
1430 || endpoint.starts_with("http://[::1]:")
1431 || endpoint.starts_with("http://[::1]/")
1432}
1433
1434async fn handle_host_register_agent(
1435 req: &JsonRpcMessage,
1436 session: &crate::session::ClientSession,
1437) -> Result<Value, String> {
1438 let request: RegisterHostAgentRequest =
1439 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1440 serde_json::to_value(
1441 session
1442 .host
1443 .register_agent(&session.client_id, request)
1444 .await?,
1445 )
1446 .map_err(|e| e.to_string())
1447}
1448
1449async fn handle_host_unregister_agent(
1450 req: &JsonRpcMessage,
1451 session: &crate::session::ClientSession,
1452) -> Result<Value, String> {
1453 let agent_id = req
1454 .params
1455 .get("agent_id")
1456 .and_then(|v| v.as_str())
1457 .ok_or("missing agent_id")?;
1458 session
1459 .host
1460 .unregister_agent(&session.client_id, agent_id)
1461 .await?;
1462 Ok(serde_json::json!({"ok": true}))
1463}
1464
1465async fn handle_host_set_status(
1466 req: &JsonRpcMessage,
1467 session: &crate::session::ClientSession,
1468) -> Result<Value, String> {
1469 let request: SetHostAgentStatusRequest =
1470 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1471 serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1472 .map_err(|e| e.to_string())
1473}
1474
1475async fn handle_host_notify(
1476 req: &JsonRpcMessage,
1477 session: &crate::session::ClientSession,
1478) -> Result<Value, String> {
1479 let kind = req
1480 .params
1481 .get("kind")
1482 .and_then(|v| v.as_str())
1483 .unwrap_or("host.notification");
1484 let agent_id = req
1485 .params
1486 .get("agent_id")
1487 .and_then(|v| v.as_str())
1488 .map(str::to_string);
1489 let message = req
1490 .params
1491 .get("message")
1492 .and_then(|v| v.as_str())
1493 .unwrap_or("");
1494 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1495 serde_json::to_value(
1496 session
1497 .host
1498 .record_event(kind, agent_id, message, payload)
1499 .await,
1500 )
1501 .map_err(|e| e.to_string())
1502}
1503
1504async fn handle_host_request_approval(
1505 req: &JsonRpcMessage,
1506 session: &crate::session::ClientSession,
1507) -> Result<Value, String> {
1508 let request: CreateHostApprovalRequest =
1509 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1510 if let Some(agent_id) = &request.agent_id {
1511 let _ = session
1516 .host
1517 .set_status(
1518 &session.client_id,
1519 SetHostAgentStatusRequest {
1520 agent_id: agent_id.clone(),
1521 status: HostAgentStatus::WaitingForApproval,
1522 current_task: None,
1523 message: Some("Waiting for approval".to_string()),
1524 payload: Value::Null,
1525 },
1526 )
1527 .await;
1528 }
1529 let owner_client_id = if request.system_level {
1536 None
1537 } else {
1538 Some(session.client_id.as_str())
1539 };
1540 serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1541 .map_err(|e| e.to_string())
1542}
1543
1544async fn handle_host_resolve_approval(
1545 req: &JsonRpcMessage,
1546 session: &crate::session::ClientSession,
1547) -> Result<Value, String> {
1548 let request: ResolveHostApprovalRequest =
1549 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1550 serde_json::to_value(
1551 session
1552 .host
1553 .resolve_approval(&session.client_id, request)
1554 .await?,
1555 )
1556 .map_err(|e| e.to_string())
1557}
1558
1559async fn handle_session_auth(
1570 req: &JsonRpcMessage,
1571 session: &crate::session::ClientSession,
1572 state: &Arc<ServerState>,
1573) -> Result<Value, String> {
1574 let supplied = req
1575 .params
1576 .get("token")
1577 .and_then(Value::as_str)
1578 .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1579 let agent_id = req
1586 .params
1587 .get("agent_id")
1588 .and_then(Value::as_str)
1589 .map(str::to_string);
1590
1591 if let Some(id) = agent_id {
1592 let supervisor = state.supervisor()?;
1593 if !supervisor.validate_agent_token(&id, supplied).await {
1594 return Err(format!(
1595 "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1596 ));
1597 }
1598 {
1602 let mut attached = state.attached_agents.lock().await;
1603 if let Some(prior) = attached.get(&id) {
1604 if prior != &session.client_id {
1605 return Err(format!(
1606 "auth failed: agent_id `{id}` is already attached on \
1607 another connection (client_id={prior})"
1608 ));
1609 }
1610 }
1611 attached.insert(id.clone(), session.client_id.clone());
1612 }
1613 let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1618 *session.bound_memgine.lock().await = Some(agent_eng);
1619 *session.agent_id.lock().await = Some(id.clone());
1620 session
1621 .authenticated
1622 .store(true, std::sync::atomic::Ordering::Release);
1623 return Ok(serde_json::json!({
1624 "ok": true,
1625 "auth_enabled": true,
1626 "agent_id": id,
1627 }));
1628 }
1629
1630 let expected = match state.auth_token.get() {
1631 Some(t) => t,
1632 None => {
1633 session
1639 .authenticated
1640 .store(true, std::sync::atomic::Ordering::Release);
1641 return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1642 }
1643 };
1644 if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1645 return Err("auth failed: token mismatch".to_string());
1646 }
1647 session
1648 .authenticated
1649 .store(true, std::sync::atomic::Ordering::Release);
1650 Ok(serde_json::json!({
1651 "ok": true,
1652 "auth_enabled": true,
1653 "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
1654 }))
1655}
1656
1657fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
1661 if a.len() != b.len() {
1662 return false;
1663 }
1664 let mut diff: u8 = 0;
1665 for (x, y) in a.iter().zip(b.iter()) {
1666 diff |= x ^ y;
1667 }
1668 diff == 0
1669}
1670
1671async fn gate_high_risk_method(
1681 method: &str,
1682 params: &Value,
1683 state: &Arc<ServerState>,
1684) -> Result<(), String> {
1685 let timeout = state.approval_gate.timeout;
1686 let req = CreateHostApprovalRequest {
1687 agent_id: None,
1688 action: format!("ws.method:{method}"),
1689 details: serde_json::json!({
1690 "method": method,
1691 "params_preview": preview_params(params, 2_000),
1695 }),
1696 options: vec!["approve".to_string(), "deny".to_string()],
1697 system_level: true,
1701 };
1702 match state
1703 .host
1704 .request_and_wait_approval(req, "approve", timeout)
1705 .await
1706 {
1707 Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1708 Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1709 "{method} denied by user (approval gate, audit 2026-05). \
1710 To call this method without an interactive prompt, start \
1711 car-server with --no-approvals on a trusted machine."
1712 )),
1713 Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1714 "{method} approval timed out after {}s with no resolution. \
1715 The approval is still visible in `host.approvals` for \
1716 forensics; resubmit the request to retry.",
1717 timeout.as_secs()
1718 )),
1719 Err(e) => Err(format!("approval gate error: {e}")),
1720 }
1721}
1722
1723fn preview_params(value: &Value, max_chars: usize) -> Value {
1724 let s = value.to_string();
1725 if s.len() <= max_chars {
1726 value.clone()
1727 } else {
1728 Value::String(format!("{}… (truncated)", &s[..max_chars]))
1729 }
1730}
1731
1732async fn handle_session_init(
1733 req: &JsonRpcMessage,
1734 session: &crate::session::ClientSession,
1735) -> Result<Value, String> {
1736 let init: SessionInitRequest =
1737 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1738
1739 for tool in &init.tools {
1740 register_from_definition(&session.runtime, tool).await;
1741 }
1742
1743 let mut policy_count = 0;
1744 {
1745 let mut policies = session.runtime.policies.write().await;
1746 for policy_def in &init.policies {
1747 if let Some(check) = build_policy_check(policy_def) {
1748 policies.register(&policy_def.name, check, "");
1749 policy_count += 1;
1750 }
1751 }
1752 }
1753
1754 serde_json::to_value(SessionInitResponse {
1755 session_id: session.client_id.clone(),
1756 tools_registered: init.tools.len(),
1757 policies_registered: policy_count,
1758 })
1759 .map_err(|e| e.to_string())
1760}
1761
1762fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1763 match def.rule.as_str() {
1764 "deny_tool" => {
1765 let target = def.target.clone();
1766 Some(Box::new(
1767 move |action: &car_ir::Action, _: &car_state::StateStore| {
1768 if action.tool.as_deref() == Some(&target) {
1769 Some(format!("tool '{}' denied", target))
1770 } else {
1771 None
1772 }
1773 },
1774 ))
1775 }
1776 "require_state" => {
1777 let key = def.key.clone();
1778 let value = def.value.clone();
1779 Some(Box::new(
1780 move |_: &car_ir::Action, state: &car_state::StateStore| {
1781 if state.get(&key).as_ref() != Some(&value) {
1782 Some(format!("state['{}'] must be {:?}", key, value))
1783 } else {
1784 None
1785 }
1786 },
1787 ))
1788 }
1789 "deny_tool_param" => {
1790 let target = def.target.clone();
1791 let param = def.key.clone();
1792 let pattern = def.pattern.clone();
1793 Some(Box::new(
1794 move |action: &car_ir::Action, _: &car_state::StateStore| {
1795 if action.tool.as_deref() != Some(&target) {
1796 return None;
1797 }
1798 if let Some(val) = action.parameters.get(¶m) {
1799 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1800 if s.contains(&pattern) {
1801 return Some(format!("param '{}' matches '{}'", param, pattern));
1802 }
1803 }
1804 None
1805 },
1806 ))
1807 }
1808 _ => None,
1809 }
1810}
1811
1812async fn handle_tools_register(
1813 req: &JsonRpcMessage,
1814 session: &crate::session::ClientSession,
1815) -> Result<Value, String> {
1816 let tools: Vec<ToolDefinition> =
1817 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1818 for tool in &tools {
1819 register_from_definition(&session.runtime, tool).await;
1820 }
1821 Ok(Value::from(tools.len()))
1822}
1823
1824async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1831 runtime
1832 .register_tool_schema(car_ir::ToolSchema {
1833 name: def.name.clone(),
1834 description: def.description.clone(),
1835 parameters: def.parameters.clone(),
1836 returns: def.returns.clone(),
1837 idempotent: def.idempotent,
1838 cache_ttl_secs: def.cache_ttl_secs,
1839 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1840 max_calls: rl.max_calls,
1841 interval_secs: rl.interval_secs,
1842 }),
1843 })
1844 .await;
1845}
1846
1847async fn handle_proposal_submit(
1848 req: &JsonRpcMessage,
1849 session: &crate::session::ClientSession,
1850) -> Result<Value, String> {
1851 let submit: ProposalSubmitRequest =
1852 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1853 let session_id = req
1859 .params
1860 .get("session_id")
1861 .and_then(|v| v.as_str())
1862 .map(str::to_string);
1863
1864 let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1873 Some(v) if !v.is_null() => {
1874 Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1875 }
1876 _ => None,
1877 };
1878
1879 let result = match (session_id, scope) {
1880 (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1885 (Some(sid), None) => {
1886 session
1887 .runtime
1888 .execute_with_session(&submit.proposal, &sid)
1889 .await
1890 }
1891 (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1892 (None, None) => session.runtime.execute(&submit.proposal).await,
1893 };
1894 serde_json::to_value(result).map_err(|e| e.to_string())
1895}
1896
1897async fn handle_session_policy_open(
1898 session: &crate::session::ClientSession,
1899) -> Result<Value, String> {
1900 let id = session.runtime.open_session().await;
1901 Ok(serde_json::json!({ "session_id": id }))
1902}
1903
1904async fn handle_session_policy_close(
1905 req: &JsonRpcMessage,
1906 session: &crate::session::ClientSession,
1907) -> Result<Value, String> {
1908 let sid = req
1909 .params
1910 .get("session_id")
1911 .and_then(|v| v.as_str())
1912 .ok_or("missing 'session_id'")?;
1913 let closed = session.runtime.close_session(sid).await;
1914 Ok(serde_json::json!({ "closed": closed }))
1915}
1916
1917async fn handle_policy_register(
1923 req: &JsonRpcMessage,
1924 session: &crate::session::ClientSession,
1925) -> Result<Value, String> {
1926 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1927 .map_err(|e| format!("invalid policy params: {e}"))?;
1928 let session_id = req
1929 .params
1930 .get("session_id")
1931 .and_then(|v| v.as_str())
1932 .map(str::to_string);
1933 let check = build_policy_check(&def)
1934 .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1935 match session_id {
1936 Some(sid) => session
1937 .runtime
1938 .register_policy_in_session(&sid, &def.name, check, "")
1939 .await
1940 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1941 None => {
1942 let mut policies = session.runtime.policies.write().await;
1943 policies.register(&def.name, check, "");
1944 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1945 }
1946 }
1947}
1948
1949async fn handle_verify(
1950 req: &JsonRpcMessage,
1951 session: &crate::session::ClientSession,
1952) -> Result<Value, String> {
1953 let vr: VerifyRequest =
1954 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1955 let tools: std::collections::HashSet<String> =
1956 session.runtime.tools.read().await.keys().cloned().collect();
1957 let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1958 serde_json::to_value(VerifyResponse {
1959 valid: result.valid,
1960 issues: result
1961 .issues
1962 .iter()
1963 .map(|i| VerifyIssueProto {
1964 action_id: i.action_id.clone(),
1965 severity: i.severity.clone(),
1966 message: i.message.clone(),
1967 })
1968 .collect(),
1969 simulated_state: result.simulated_state,
1970 })
1971 .map_err(|e| e.to_string())
1972}
1973
1974fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1981 req.params
1982 .get("tenant_id")
1983 .and_then(|v| v.as_str())
1984 .filter(|s| !s.is_empty())
1985 .map(str::to_string)
1986}
1987
1988async fn handle_state_get(
1989 req: &JsonRpcMessage,
1990 session: &crate::session::ClientSession,
1991) -> Result<Value, String> {
1992 let key = req
1993 .params
1994 .get("key")
1995 .and_then(|v| v.as_str())
1996 .ok_or("missing 'key'")?;
1997 let tenant = tenant_from_params(req);
1998 Ok(session
1999 .runtime
2000 .state
2001 .scoped(tenant.as_deref())
2002 .get(key)
2003 .unwrap_or(Value::Null))
2004}
2005
2006async fn handle_state_set(
2007 req: &JsonRpcMessage,
2008 session: &crate::session::ClientSession,
2009) -> Result<Value, String> {
2010 let key = req
2011 .params
2012 .get("key")
2013 .and_then(|v| v.as_str())
2014 .ok_or("missing 'key'")?;
2015 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
2016 let tenant = tenant_from_params(req);
2017 session
2018 .runtime
2019 .state
2020 .scoped(tenant.as_deref())
2021 .set(key, value, "client");
2022 Ok(Value::from("ok"))
2023}
2024
2025async fn handle_state_exists(
2029 req: &JsonRpcMessage,
2030 session: &crate::session::ClientSession,
2031) -> Result<Value, String> {
2032 let key = req
2033 .params
2034 .get("key")
2035 .and_then(|v| v.as_str())
2036 .ok_or("missing 'key'")?;
2037 let tenant = tenant_from_params(req);
2038 Ok(Value::Bool(
2039 session.runtime.state.scoped(tenant.as_deref()).exists(key),
2040 ))
2041}
2042
2043async fn handle_state_keys(
2046 req: &JsonRpcMessage,
2047 session: &crate::session::ClientSession,
2048) -> Result<Value, String> {
2049 let tenant = tenant_from_params(req);
2050 Ok(Value::Array(
2051 session
2052 .runtime
2053 .state
2054 .scoped(tenant.as_deref())
2055 .keys()
2056 .into_iter()
2057 .map(Value::String)
2058 .collect(),
2059 ))
2060}
2061
2062async fn handle_state_snapshot(
2073 req: &JsonRpcMessage,
2074 session: &crate::session::ClientSession,
2075) -> Result<Value, String> {
2076 let tenant = tenant_from_params(req);
2077 let view = session.runtime.state.scoped(tenant.as_deref());
2078 let mut map = serde_json::Map::new();
2079 for key in view.keys() {
2080 if let Some(value) = view.get(&key) {
2081 map.insert(key, value);
2082 }
2083 }
2084 Ok(Value::Object(map))
2085}
2086
2087fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
2093 let base = car_ffi_common::memory_path::ensure_base()
2094 .map_err(|e| format!("memory base unavailable: {e}"))?;
2095 let dir = base.join("agents");
2096 std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
2097 Ok(dir.join(format!("{agent_id}.json")))
2098}
2099
2100async fn get_or_load_agent_memgine(
2107 state: &Arc<ServerState>,
2108 agent_id: &str,
2109) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
2110 {
2111 let map = state.agent_memgines.lock().await;
2112 if let Some(eng) = map.get(agent_id) {
2113 return Ok(eng.clone());
2114 }
2115 }
2116 let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
2118 None,
2119 )));
2120 let path = agent_memgine_snapshot_path(agent_id)?;
2121 if path.exists() {
2122 let content = std::fs::read_to_string(&path)
2123 .map_err(|e| format!("read {}: {}", path.display(), e))?;
2124 let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
2125 let mut g = engine.lock().await;
2126 let mut loaded: u32 = 0;
2127 for fact in &facts {
2128 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2129 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2130 let kind = fact
2131 .get("kind")
2132 .and_then(|v| v.as_str())
2133 .unwrap_or("pattern");
2134 let fid = format!("loaded-{loaded}");
2135 g.ingest_fact(
2136 &fid,
2137 subject,
2138 body,
2139 "user",
2140 "peer",
2141 chrono::Utc::now(),
2142 "global",
2143 None,
2144 vec![],
2145 kind == "constraint",
2146 );
2147 loaded += 1;
2148 }
2149 }
2150 let mut map = state.agent_memgines.lock().await;
2151 let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
2152 Ok(stored)
2153}
2154
2155async fn persist_agent_memgine(
2159 agent_id: &str,
2160 engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
2161) -> Result<(), String> {
2162 let path = agent_memgine_snapshot_path(agent_id)?;
2163 let g = engine.lock().await;
2164 let facts: Vec<Value> = g
2165 .graph
2166 .inner
2167 .node_indices()
2168 .filter_map(|nix| {
2169 let node = g.graph.inner.node_weight(nix)?;
2170 if !node.is_valid() {
2171 return None;
2172 }
2173 if node.kind == car_memgine::MemKind::Identity
2174 || node.kind == car_memgine::MemKind::Environment
2175 {
2176 return None;
2177 }
2178 Some(serde_json::json!({
2179 "subject": node.key,
2180 "body": node.value,
2181 "kind": match node.kind {
2182 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2183 car_memgine::MemKind::Conversation => "outcome",
2184 _ => "pattern",
2185 },
2186 "confidence": 0.5,
2187 "content_type": node.content_type.as_label(),
2188 }))
2189 })
2190 .collect();
2191 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2192 std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2193 Ok(())
2194}
2195
2196async fn handle_memory_fact_count(
2203 session: &crate::session::ClientSession,
2204) -> Result<Value, String> {
2205 let engine_arc = session.effective_memgine().await;
2206 let engine = engine_arc.lock().await;
2207 Ok(Value::from(engine.valid_fact_count()))
2208}
2209
2210async fn handle_memory_add_fact(
2211 req: &JsonRpcMessage,
2212 session: &crate::session::ClientSession,
2213) -> Result<Value, String> {
2214 let subject = req
2215 .params
2216 .get("subject")
2217 .and_then(|v| v.as_str())
2218 .ok_or("missing subject")?;
2219 let body = req
2220 .params
2221 .get("body")
2222 .and_then(|v| v.as_str())
2223 .ok_or("missing body")?;
2224 let kind = req
2225 .params
2226 .get("kind")
2227 .and_then(|v| v.as_str())
2228 .unwrap_or("pattern");
2229 let engine_arc = session.effective_memgine().await;
2233 let count = {
2234 let mut engine = engine_arc.lock().await;
2235 let fid = format!("ws-{}", engine.valid_fact_count());
2236 engine.ingest_fact(
2237 &fid,
2238 subject,
2239 body,
2240 "user",
2241 "peer",
2242 chrono::Utc::now(),
2243 "global",
2244 None,
2245 vec![],
2246 kind == "constraint",
2247 );
2248 engine.valid_fact_count()
2249 };
2250 if let Some(id) = session.agent_id.lock().await.clone() {
2253 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2254 tracing::warn!(agent_id = %id, error = %e,
2255 "agent memgine persist failed; in-memory state is canonical");
2256 }
2257 }
2258 Ok(Value::from(count))
2259}
2260
2261async fn handle_memory_query(
2262 req: &JsonRpcMessage,
2263 session: &crate::session::ClientSession,
2264) -> Result<Value, String> {
2265 let query = req
2266 .params
2267 .get("query")
2268 .and_then(|v| v.as_str())
2269 .ok_or("missing query")?;
2270 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2271 let engine_arc = session.effective_memgine().await;
2272 let engine = engine_arc.lock().await;
2273 let seeds = engine.graph.find_seeds(query, 5);
2274 let hits = if !seeds.is_empty() {
2279 engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2280 } else {
2281 vec![]
2282 };
2283 let results: Vec<Value> = hits
2284 .iter()
2285 .filter_map(|hit| {
2286 let node = engine.graph.inner.node_weight(hit.node_ix)?;
2287 Some(serde_json::json!({
2288 "subject": node.key,
2289 "body": node.value,
2290 "kind": format!("{:?}", node.kind).to_lowercase(),
2291 "confidence": hit.activation,
2292 }))
2293 })
2294 .collect();
2295 serde_json::to_value(results).map_err(|e| e.to_string())
2296}
2297
2298async fn handle_memory_build_context(
2299 req: &JsonRpcMessage,
2300 session: &crate::session::ClientSession,
2301) -> Result<Value, String> {
2302 let query = req
2303 .params
2304 .get("query")
2305 .and_then(|v| v.as_str())
2306 .unwrap_or("");
2307 let model_context_window = req
2311 .params
2312 .get("model_context_window")
2313 .and_then(|v| v.as_u64())
2314 .map(|w| w as usize);
2315 let mut engine = session.memgine.lock().await;
2316 Ok(Value::from(
2317 engine.build_context_for_model(query, model_context_window),
2318 ))
2319}
2320
2321async fn handle_memory_build_context_fast(
2327 req: &JsonRpcMessage,
2328 session: &crate::session::ClientSession,
2329) -> Result<Value, String> {
2330 let query = req
2331 .params
2332 .get("query")
2333 .and_then(|v| v.as_str())
2334 .unwrap_or("");
2335 let model_context_window = req
2336 .params
2337 .get("model_context_window")
2338 .and_then(|v| v.as_u64())
2339 .map(|w| w as usize);
2340 let mut engine = session.memgine.lock().await;
2341 Ok(Value::from(engine.build_context_with_options(
2342 query,
2343 model_context_window,
2344 car_memgine::ContextMode::Fast,
2345 None,
2346 )))
2347}
2348
2349async fn handle_memory_persist(
2365 req: &JsonRpcMessage,
2366 session: &crate::session::ClientSession,
2367) -> Result<Value, String> {
2368 let path = req
2369 .params
2370 .get("path")
2371 .and_then(|v| v.as_str())
2372 .ok_or("missing path")?;
2373 let resolved = car_ffi_common::memory_path::resolve(path)
2374 .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2375 let engine = session.memgine.lock().await;
2376 let facts: Vec<Value> = engine
2377 .graph
2378 .inner
2379 .node_indices()
2380 .filter_map(|nix| {
2381 let node = engine.graph.inner.node_weight(nix)?;
2382 if !node.is_valid() {
2383 return None;
2384 }
2385 if node.kind == car_memgine::MemKind::Identity
2386 || node.kind == car_memgine::MemKind::Environment
2387 {
2388 return None;
2389 }
2390 Some(serde_json::json!({
2391 "subject": node.key,
2392 "body": node.value,
2393 "kind": match node.kind {
2394 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2395 car_memgine::MemKind::Conversation => "outcome",
2396 _ => "pattern",
2397 },
2398 "confidence": 0.5,
2399 "content_type": node.content_type.as_label(),
2400 }))
2401 })
2402 .collect();
2403 let count = facts.len();
2404 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2405 std::fs::write(&resolved, json)
2406 .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2407 Ok(Value::from(count as u64))
2408}
2409
2410async fn handle_memory_load(
2416 req: &JsonRpcMessage,
2417 session: &crate::session::ClientSession,
2418) -> Result<Value, String> {
2419 let path = req
2420 .params
2421 .get("path")
2422 .and_then(|v| v.as_str())
2423 .ok_or("missing path")?;
2424 let resolved = car_ffi_common::memory_path::resolve(path)
2425 .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2426 let content = std::fs::read_to_string(&resolved)
2427 .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2428 let facts: Vec<Value> =
2429 serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2430 let mut engine = session.memgine.lock().await;
2431 engine.reset();
2432 let mut count: u32 = 0;
2433 for fact in &facts {
2434 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2435 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2436 let kind = fact
2437 .get("kind")
2438 .and_then(|v| v.as_str())
2439 .unwrap_or("pattern");
2440 let fid = format!("loaded-{}", count);
2441 engine.ingest_fact(
2442 &fid,
2443 subject,
2444 body,
2445 "user",
2446 "peer",
2447 chrono::Utc::now(),
2448 "global",
2449 None,
2450 vec![],
2451 kind == "constraint",
2452 );
2453 count += 1;
2454 }
2455 Ok(Value::from(count))
2456}
2457
2458async fn handle_skill_ingest(
2461 req: &JsonRpcMessage,
2462 session: &crate::session::ClientSession,
2463) -> Result<Value, String> {
2464 let name = req
2465 .params
2466 .get("name")
2467 .and_then(|v| v.as_str())
2468 .ok_or("missing name")?;
2469 let code = req
2470 .params
2471 .get("code")
2472 .and_then(|v| v.as_str())
2473 .ok_or("missing code")?;
2474 let platform = req
2475 .params
2476 .get("platform")
2477 .and_then(|v| v.as_str())
2478 .unwrap_or("unknown");
2479 let persona = req
2480 .params
2481 .get("persona")
2482 .and_then(|v| v.as_str())
2483 .unwrap_or("");
2484 let url_pattern = req
2485 .params
2486 .get("url_pattern")
2487 .and_then(|v| v.as_str())
2488 .unwrap_or("");
2489 let description = req
2490 .params
2491 .get("description")
2492 .and_then(|v| v.as_str())
2493 .unwrap_or("");
2494 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2495 let keywords: Vec<String> = req
2496 .params
2497 .get("task_keywords")
2498 .and_then(|v| v.as_array())
2499 .map(|arr| {
2500 arr.iter()
2501 .filter_map(|v| v.as_str().map(String::from))
2502 .collect()
2503 })
2504 .unwrap_or_default();
2505
2506 let trigger = car_memgine::SkillTrigger {
2507 persona: persona.into(),
2508 url_pattern: url_pattern.into(),
2509 task_keywords: keywords,
2510 structured: None,
2511 };
2512 let mut engine = session.memgine.lock().await;
2513 let node = engine.ingest_skill(
2514 name,
2515 code,
2516 platform,
2517 trigger,
2518 description,
2519 supersedes,
2520 vec![],
2521 vec![],
2522 );
2523 Ok(Value::from(node.index() as u64))
2524}
2525
2526async fn handle_skill_find(
2527 req: &JsonRpcMessage,
2528 session: &crate::session::ClientSession,
2529) -> Result<Value, String> {
2530 let persona = req
2531 .params
2532 .get("persona")
2533 .and_then(|v| v.as_str())
2534 .unwrap_or("");
2535 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2536 let task = req
2537 .params
2538 .get("task")
2539 .and_then(|v| v.as_str())
2540 .unwrap_or("");
2541 let max = req
2542 .params
2543 .get("max_results")
2544 .and_then(|v| v.as_u64())
2545 .unwrap_or(1) as usize;
2546 let engine = session.memgine.lock().await;
2547 let results = engine.find_skill(persona, url, task, max);
2548 let json: Vec<Value> = results
2549 .iter()
2550 .map(|(m, s)| {
2551 serde_json::json!({
2552 "name": m.name, "code": m.code, "platform": m.platform,
2553 "description": m.description, "stats": m.stats, "match_score": s,
2554 })
2555 })
2556 .collect();
2557 serde_json::to_value(json).map_err(|e| e.to_string())
2558}
2559
2560async fn handle_skill_report(
2561 req: &JsonRpcMessage,
2562 session: &crate::session::ClientSession,
2563) -> Result<Value, String> {
2564 let name = req
2565 .params
2566 .get("skill_name")
2567 .and_then(|v| v.as_str())
2568 .ok_or("missing skill_name")?;
2569 let outcome_str = req
2570 .params
2571 .get("outcome")
2572 .and_then(|v| v.as_str())
2573 .ok_or("missing outcome")?;
2574 let outcome = match outcome_str {
2575 "success" => car_memgine::SkillOutcome::Success,
2576 _ => car_memgine::SkillOutcome::Fail,
2577 };
2578 let mut engine = session.memgine.lock().await;
2579 let stats = engine
2580 .report_outcome(name, outcome)
2581 .ok_or(format!("skill '{}' not found", name))?;
2582 serde_json::to_value(stats).map_err(|e| e.to_string())
2583}
2584
2585struct WsAgentRunner {
2594 channel: Arc<WsChannel>,
2595 host: Arc<crate::host::HostState>,
2596 client_id: String,
2597}
2598
2599#[async_trait::async_trait]
2600impl car_multi::AgentRunner for WsAgentRunner {
2601 async fn run(
2602 &self,
2603 spec: &car_multi::AgentSpec,
2604 task: &str,
2605 _runtime: &car_engine::Runtime,
2606 _mailbox: &car_multi::Mailbox,
2607 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2608 use futures::SinkExt;
2609
2610 let request_id = self.channel.next_request_id();
2611 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2612 let agent = self
2613 .host
2614 .register_agent(
2615 &self.client_id,
2616 RegisterHostAgentRequest {
2617 id: Some(agent_id.clone()),
2618 name: spec.name.clone(),
2619 kind: "callback".to_string(),
2620 capabilities: spec.tools.clone(),
2621 project: spec
2622 .metadata
2623 .get("project")
2624 .and_then(|v| v.as_str())
2625 .map(str::to_string),
2626 pid: None,
2627 display: serde_json::from_value(
2628 spec.metadata
2629 .get("display")
2630 .cloned()
2631 .unwrap_or(serde_json::Value::Null),
2632 )
2633 .unwrap_or_default(),
2634 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2635 },
2636 )
2637 .await
2638 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2639 let _ = self
2640 .host
2641 .set_status(
2642 &self.client_id,
2643 SetHostAgentStatusRequest {
2644 agent_id: agent.id.clone(),
2645 status: HostAgentStatus::Running,
2646 current_task: Some(task.to_string()),
2647 message: Some(format!("{} started", spec.name)),
2648 payload: serde_json::json!({ "task": task }),
2649 },
2650 )
2651 .await;
2652
2653 let rpc_request = serde_json::json!({
2654 "jsonrpc": "2.0",
2655 "method": "multi.run_agent",
2656 "params": {
2657 "spec": spec,
2658 "task": task,
2659 },
2660 "id": request_id,
2661 });
2662
2663 let (tx, rx) = tokio::sync::oneshot::channel();
2665 self.channel
2666 .pending
2667 .lock()
2668 .await
2669 .insert(request_id.clone(), tx);
2670
2671 let msg = Message::Text(
2672 serde_json::to_string(&rpc_request)
2673 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2674 .into(),
2675 );
2676 if let Err(e) = self.channel.write.lock().await.send(msg).await {
2677 let _ = self
2678 .host
2679 .set_status(
2680 &self.client_id,
2681 SetHostAgentStatusRequest {
2682 agent_id: agent_id.clone(),
2683 status: HostAgentStatus::Errored,
2684 current_task: None,
2685 message: Some(format!("{} failed to start", spec.name)),
2686 payload: serde_json::json!({ "error": e.to_string() }),
2687 },
2688 )
2689 .await;
2690 return Err(car_multi::MultiError::AgentFailed(
2691 spec.name.clone(),
2692 format!("ws send error: {}", e),
2693 ));
2694 }
2695
2696 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2698 Ok(Ok(response)) => response,
2699 Ok(Err(_)) => {
2700 let _ = self
2701 .host
2702 .set_status(
2703 &self.client_id,
2704 SetHostAgentStatusRequest {
2705 agent_id: agent_id.clone(),
2706 status: HostAgentStatus::Errored,
2707 current_task: None,
2708 message: Some(format!("{} callback channel closed", spec.name)),
2709 payload: Value::Null,
2710 },
2711 )
2712 .await;
2713 return Err(car_multi::MultiError::AgentFailed(
2714 spec.name.clone(),
2715 "agent callback channel closed".into(),
2716 ));
2717 }
2718 Err(_) => {
2719 let _ = self
2720 .host
2721 .set_status(
2722 &self.client_id,
2723 SetHostAgentStatusRequest {
2724 agent_id: agent_id.clone(),
2725 status: HostAgentStatus::Errored,
2726 current_task: None,
2727 message: Some(format!("{} timed out", spec.name)),
2728 payload: Value::Null,
2729 },
2730 )
2731 .await;
2732 return Err(car_multi::MultiError::AgentFailed(
2733 spec.name.clone(),
2734 "agent callback timed out (300s)".into(),
2735 ));
2736 }
2737 };
2738
2739 if let Some(err) = response.error {
2740 let _ = self
2741 .host
2742 .set_status(
2743 &self.client_id,
2744 SetHostAgentStatusRequest {
2745 agent_id: agent_id.clone(),
2746 status: HostAgentStatus::Errored,
2747 current_task: None,
2748 message: Some(format!("{} errored", spec.name)),
2749 payload: serde_json::json!({ "error": err }),
2750 },
2751 )
2752 .await;
2753 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2754 }
2755
2756 let output_value = response.output.unwrap_or(Value::Null);
2757 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2758 car_multi::MultiError::AgentFailed(
2759 spec.name.clone(),
2760 format!("invalid AgentOutput: {}", e),
2761 )
2762 })?;
2763 let status = if output.error.is_some() {
2764 HostAgentStatus::Errored
2765 } else {
2766 HostAgentStatus::Completed
2767 };
2768 let message = if output.error.is_some() {
2769 format!("{} errored", spec.name)
2770 } else {
2771 format!("{} completed", spec.name)
2772 };
2773 let _ = self
2774 .host
2775 .set_status(
2776 &self.client_id,
2777 SetHostAgentStatusRequest {
2778 agent_id,
2779 status,
2780 current_task: None,
2781 message: Some(message),
2782 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2783 },
2784 )
2785 .await;
2786
2787 Ok(output)
2788 }
2789}
2790
2791fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
2792 let safe_name: String = name
2793 .chars()
2794 .map(|c| {
2795 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
2796 c
2797 } else {
2798 '-'
2799 }
2800 })
2801 .collect();
2802 format!("{}:{}:{}", client_id, safe_name, request_id)
2803}
2804
2805async fn handle_multi_swarm(
2806 req: &JsonRpcMessage,
2807 session: &crate::session::ClientSession,
2808) -> Result<Value, String> {
2809 let mode_str = req
2810 .params
2811 .get("mode")
2812 .and_then(|v| v.as_str())
2813 .ok_or("missing 'mode'")?;
2814 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2815 let task = req
2816 .params
2817 .get("task")
2818 .and_then(|v| v.as_str())
2819 .ok_or("missing 'task'")?;
2820
2821 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2822 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2823 let agent_specs: Vec<car_multi::AgentSpec> =
2824 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2825 let synth: Option<car_multi::AgentSpec> = req
2826 .params
2827 .get("synthesizer")
2828 .map(|v| serde_json::from_value(v.clone()))
2829 .transpose()
2830 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2831
2832 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2833 channel: session.channel.clone(),
2834 host: session.host.clone(),
2835 client_id: session.client_id.clone(),
2836 });
2837 let infra = car_multi::SharedInfra::new();
2838
2839 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2840 if let Some(s) = synth {
2841 swarm = swarm.with_synthesizer(s);
2842 }
2843
2844 let result = swarm
2845 .run(task, &runner, &infra)
2846 .await
2847 .map_err(|e| format!("swarm error: {}", e))?;
2848 serde_json::to_value(result).map_err(|e| e.to_string())
2849}
2850
2851async fn handle_multi_pipeline(
2852 req: &JsonRpcMessage,
2853 session: &crate::session::ClientSession,
2854) -> Result<Value, String> {
2855 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2856 let task = req
2857 .params
2858 .get("task")
2859 .and_then(|v| v.as_str())
2860 .ok_or("missing 'task'")?;
2861
2862 let stage_specs: Vec<car_multi::AgentSpec> =
2863 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2864
2865 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2866 channel: session.channel.clone(),
2867 host: session.host.clone(),
2868 client_id: session.client_id.clone(),
2869 });
2870 let infra = car_multi::SharedInfra::new();
2871
2872 let result = car_multi::Pipeline::new(stage_specs)
2873 .run(task, &runner, &infra)
2874 .await
2875 .map_err(|e| format!("pipeline error: {}", e))?;
2876 serde_json::to_value(result).map_err(|e| e.to_string())
2877}
2878
2879async fn handle_multi_supervisor(
2880 req: &JsonRpcMessage,
2881 session: &crate::session::ClientSession,
2882) -> Result<Value, String> {
2883 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2884 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2885 let task = req
2886 .params
2887 .get("task")
2888 .and_then(|v| v.as_str())
2889 .ok_or("missing 'task'")?;
2890 let max_rounds = req
2891 .params
2892 .get("max_rounds")
2893 .and_then(|v| v.as_u64())
2894 .unwrap_or(3) as u32;
2895
2896 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2897 .map_err(|e| format!("invalid workers: {}", e))?;
2898 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2899 .map_err(|e| format!("invalid supervisor: {}", e))?;
2900
2901 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2902 channel: session.channel.clone(),
2903 host: session.host.clone(),
2904 client_id: session.client_id.clone(),
2905 });
2906 let infra = car_multi::SharedInfra::new();
2907
2908 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2909 .with_max_rounds(max_rounds)
2910 .run(task, &runner, &infra)
2911 .await
2912 .map_err(|e| format!("supervisor error: {}", e))?;
2913 serde_json::to_value(result).map_err(|e| e.to_string())
2914}
2915
2916async fn handle_multi_map_reduce(
2917 req: &JsonRpcMessage,
2918 session: &crate::session::ClientSession,
2919) -> Result<Value, String> {
2920 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2921 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2922 let task = req
2923 .params
2924 .get("task")
2925 .and_then(|v| v.as_str())
2926 .ok_or("missing 'task'")?;
2927 let items_val = req.params.get("items").ok_or("missing 'items'")?;
2928
2929 let mapper_spec: car_multi::AgentSpec =
2930 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2931 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2932 .map_err(|e| format!("invalid reducer: {}", e))?;
2933 let items: Vec<String> =
2934 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2935
2936 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2937 channel: session.channel.clone(),
2938 host: session.host.clone(),
2939 client_id: session.client_id.clone(),
2940 });
2941 let infra = car_multi::SharedInfra::new();
2942
2943 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2944 .run(task, &items, &runner, &infra)
2945 .await
2946 .map_err(|e| format!("map_reduce error: {}", e))?;
2947 serde_json::to_value(result).map_err(|e| e.to_string())
2948}
2949
2950async fn handle_multi_vote(
2951 req: &JsonRpcMessage,
2952 session: &crate::session::ClientSession,
2953) -> Result<Value, String> {
2954 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2955 let task = req
2956 .params
2957 .get("task")
2958 .and_then(|v| v.as_str())
2959 .ok_or("missing 'task'")?;
2960
2961 let agent_specs: Vec<car_multi::AgentSpec> =
2962 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2963 let synth: Option<car_multi::AgentSpec> = req
2964 .params
2965 .get("synthesizer")
2966 .map(|v| serde_json::from_value(v.clone()))
2967 .transpose()
2968 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2969
2970 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2971 channel: session.channel.clone(),
2972 host: session.host.clone(),
2973 client_id: session.client_id.clone(),
2974 });
2975 let infra = car_multi::SharedInfra::new();
2976
2977 let mut vote = car_multi::Vote::new(agent_specs);
2978 if let Some(s) = synth {
2979 vote = vote.with_synthesizer(s);
2980 }
2981
2982 let result = vote
2983 .run(task, &runner, &infra)
2984 .await
2985 .map_err(|e| format!("vote error: {}", e))?;
2986 serde_json::to_value(result).map_err(|e| e.to_string())
2987}
2988
2989fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2994 let name = req
2995 .params
2996 .get("name")
2997 .and_then(|v| v.as_str())
2998 .ok_or("scheduler.create requires 'name'")?;
2999 let prompt = req
3000 .params
3001 .get("prompt")
3002 .and_then(|v| v.as_str())
3003 .ok_or("scheduler.create requires 'prompt'")?;
3004
3005 let mut task = car_scheduler::Task::new(name, prompt);
3006
3007 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
3008 let trigger = match t {
3009 "once" => car_scheduler::TaskTrigger::Once,
3010 "cron" => car_scheduler::TaskTrigger::Cron,
3011 "interval" => car_scheduler::TaskTrigger::Interval,
3012 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
3013 _ => car_scheduler::TaskTrigger::Manual,
3014 };
3015 let schedule = req
3016 .params
3017 .get("schedule")
3018 .and_then(|v| v.as_str())
3019 .unwrap_or("");
3020 task = task.with_trigger(trigger, schedule);
3021 }
3022
3023 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
3024 task = task.with_system_prompt(sp);
3025 }
3026
3027 serde_json::to_value(&task).map_err(|e| e.to_string())
3028}
3029
3030async fn handle_scheduler_run(
3031 req: &JsonRpcMessage,
3032 session: &crate::session::ClientSession,
3033) -> Result<Value, String> {
3034 let task_val = req
3035 .params
3036 .get("task")
3037 .ok_or("scheduler.run requires 'task'")?;
3038 let mut task: car_scheduler::Task =
3039 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3040
3041 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3042 channel: session.channel.clone(),
3043 host: session.host.clone(),
3044 client_id: session.client_id.clone(),
3045 });
3046 let executor = car_scheduler::Executor::new(runner);
3047 let execution = executor.run_once(&mut task).await;
3048
3049 serde_json::to_value(&execution).map_err(|e| e.to_string())
3050}
3051
3052async fn handle_scheduler_run_loop(
3053 req: &JsonRpcMessage,
3054 session: &crate::session::ClientSession,
3055) -> Result<Value, String> {
3056 let task_val = req
3057 .params
3058 .get("task")
3059 .ok_or("scheduler.run_loop requires 'task'")?;
3060 let mut task: car_scheduler::Task =
3061 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3062 let max_iterations = req
3063 .params
3064 .get("max_iterations")
3065 .and_then(|v| v.as_u64())
3066 .map(|v| v as u32);
3067
3068 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3069 channel: session.channel.clone(),
3070 host: session.host.clone(),
3071 client_id: session.client_id.clone(),
3072 });
3073 let executor = car_scheduler::Executor::new(runner);
3074 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
3075 let executions = executor
3076 .run_loop(&mut task, max_iterations, cancel_rx)
3077 .await;
3078
3079 serde_json::to_value(&executions).map_err(|e| e.to_string())
3080}
3081
3082fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
3087 state.inference.get_or_init(|| {
3088 Arc::new(car_inference::InferenceEngine::new(
3089 car_inference::InferenceConfig::default(),
3090 ))
3091 })
3092}
3093
3094async fn handle_infer(
3095 msg: &JsonRpcMessage,
3096 state: &ServerState,
3097 session: &crate::session::ClientSession,
3098) -> Result<Value, String> {
3099 let engine = get_inference_engine(state);
3100 let mut req: car_inference::GenerateRequest =
3101 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3102
3103 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3105 let mut memgine = session.memgine.lock().await;
3106 let ctx = memgine.build_context(cq);
3107 if !ctx.is_empty() {
3108 req.context = Some(ctx);
3109 }
3110 }
3111
3112 let _permit = state.admission.acquire().await;
3118
3119 let result = engine
3130 .generate_tracked(req)
3131 .await
3132 .map_err(|e| e.to_string())?;
3133 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3134}
3135
3136async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3166 let engine = get_inference_engine(state);
3167 let req: car_inference::GenerateImageRequest =
3168 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3169 let _permit = state.admission.acquire().await;
3172 let result = engine
3173 .generate_image(req)
3174 .await
3175 .map_err(|e| e.to_string())?;
3176 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3177}
3178
3179async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3180 let engine = get_inference_engine(state);
3181 let req: car_inference::GenerateVideoRequest =
3182 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3183 let _permit = state.admission.acquire().await;
3184 let result = engine
3185 .generate_video(req)
3186 .await
3187 .map_err(|e| e.to_string())?;
3188 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3189}
3190
3191async fn handle_infer_stream(
3192 msg: &JsonRpcMessage,
3193 session: &crate::session::ClientSession,
3194 state: &ServerState,
3195) -> Result<Value, String> {
3196 use futures::SinkExt;
3197 use tokio_tungstenite::tungstenite::Message;
3198
3199 let engine = get_inference_engine(state);
3200 let mut req: car_inference::GenerateRequest =
3201 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3202
3203 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3206 let mut memgine = session.memgine.lock().await;
3207 let ctx = memgine.build_context(cq);
3208 if !ctx.is_empty() {
3209 req.context = Some(ctx);
3210 }
3211 }
3212
3213 let _permit = state.admission.acquire().await;
3214 let mut rx = engine
3215 .generate_tracked_stream(req)
3216 .await
3217 .map_err(|e| e.to_string())?;
3218
3219 let mut accumulator = car_inference::StreamAccumulator::default();
3220 let request_id = msg.id.clone();
3221
3222 while let Some(event) = rx.recv().await {
3223 let event_payload = match &event {
3224 car_inference::StreamEvent::TextDelta(text) => {
3225 serde_json::json!({"type": "text", "data": text})
3226 }
3227 car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3228 serde_json::json!({"type": "tool_start", "name": name, "index": index})
3229 }
3230 car_inference::StreamEvent::ToolCallDelta {
3231 index,
3232 arguments_delta,
3233 } => serde_json::json!({
3234 "type": "tool_delta",
3235 "index": index,
3236 "data": arguments_delta,
3237 }),
3238 car_inference::StreamEvent::Usage {
3239 input_tokens,
3240 output_tokens,
3241 } => serde_json::json!({
3242 "type": "usage",
3243 "input_tokens": input_tokens,
3244 "output_tokens": output_tokens,
3245 }),
3246 car_inference::StreamEvent::Done { .. } => {
3251 accumulator.push(&event);
3252 continue;
3253 }
3254 };
3255
3256 let notif = serde_json::json!({
3257 "jsonrpc": "2.0",
3258 "method": "inference.stream.event",
3259 "params": {
3260 "request_id": request_id,
3261 "event": event_payload,
3262 },
3263 });
3264 if let Ok(text) = serde_json::to_string(¬if) {
3265 let _ = session
3266 .channel
3267 .write
3268 .lock()
3269 .await
3270 .send(Message::Text(text.into()))
3271 .await;
3272 }
3273 accumulator.push(&event);
3274 }
3275
3276 let (text, tool_calls, usage) = accumulator.finish_with_usage();
3277 Ok(serde_json::json!({
3278 "text": text,
3279 "tool_calls": tool_calls,
3280 "usage": usage,
3281 }))
3282}
3283
3284async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3285 let engine = get_inference_engine(state);
3286 let req: car_inference::EmbedRequest =
3287 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3288 let _permit = state.admission.acquire().await;
3292 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3293 Ok(serde_json::json!({"embeddings": result}))
3294}
3295
3296async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3297 let engine = get_inference_engine(state);
3298 let req: car_inference::ClassifyRequest =
3299 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3300 let _permit = state.admission.acquire().await;
3301 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3302 Ok(serde_json::json!({"classifications": result}))
3303}
3304
3305fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3309 let total = state.admission.permits();
3310 let available = state.admission.permits_available();
3311 let in_use = total.saturating_sub(available);
3312 Ok(serde_json::json!({
3313 "permits_total": total,
3314 "permits_available": available,
3315 "permits_in_use": in_use,
3316 "env_override": crate::admission::ENV_MAX_CONCURRENT,
3317 }))
3318}
3319
3320async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3321 let model = msg
3322 .params
3323 .get("model")
3324 .and_then(|v| v.as_str())
3325 .ok_or("missing 'model' parameter")?;
3326 let text = msg
3327 .params
3328 .get("text")
3329 .and_then(|v| v.as_str())
3330 .ok_or("missing 'text' parameter")?;
3331 let engine = get_inference_engine(state);
3332 let ids = engine
3333 .tokenize(model, text)
3334 .await
3335 .map_err(|e| e.to_string())?;
3336 Ok(serde_json::json!({"tokens": ids}))
3337}
3338
3339async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3340 let model = msg
3341 .params
3342 .get("model")
3343 .and_then(|v| v.as_str())
3344 .ok_or("missing 'model' parameter")?;
3345 let tokens: Vec<u32> = msg
3346 .params
3347 .get("tokens")
3348 .and_then(|v| v.as_array())
3349 .ok_or("missing 'tokens' parameter")?
3350 .iter()
3351 .map(|t| {
3352 t.as_u64()
3353 .and_then(|n| u32::try_from(n).ok())
3354 .ok_or_else(|| "tokens[] must be u32 values".to_string())
3355 })
3356 .collect::<Result<Vec<_>, _>>()?;
3357 let engine = get_inference_engine(state);
3358 let text = engine
3359 .detokenize(model, &tokens)
3360 .await
3361 .map_err(|e| e.to_string())?;
3362 Ok(serde_json::json!({"text": text}))
3363}
3364
3365async fn handle_models_register(
3384 req: &JsonRpcMessage,
3385 _state: &Arc<ServerState>,
3386) -> Result<Value, String> {
3387 let schema_value = match req.params.get("schema") {
3391 Some(v) => v.clone(),
3392 None => req.params.clone(),
3393 };
3394 let schema: car_inference::ModelSchema =
3395 serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3396 let id = schema.id.clone();
3397
3398 let home = std::env::var_os("HOME")
3403 .or_else(|| std::env::var_os("USERPROFILE"))
3404 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3405 let car_dir = std::path::PathBuf::from(home).join(".car");
3406 std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3407 let path = car_dir.join("models.json");
3408
3409 let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3410 let text =
3411 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3412 if text.trim().is_empty() {
3413 Vec::new()
3414 } else {
3415 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3416 }
3417 } else {
3418 Vec::new()
3419 };
3420 if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3422 *slot = schema;
3423 } else {
3424 models.push(schema);
3425 }
3426 let json =
3427 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3428 let tmp = path.with_extension("json.tmp");
3429 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3430 std::fs::rename(&tmp, &path)
3431 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3432 Ok(serde_json::json!({
3433 "id": id,
3434 "registered": true,
3435 "path": path.to_string_lossy(),
3436 "note": "Daemon restart required for live UnifiedRegistry visibility \
3437 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3438 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3439 }))
3440}
3441
3442async fn handle_models_unregister(
3453 req: &JsonRpcMessage,
3454 _state: &Arc<ServerState>,
3455) -> Result<Value, String> {
3456 let id = match req.params.get("id") {
3460 Some(v) => v
3461 .as_str()
3462 .ok_or_else(|| "`id` must be a string".to_string())?
3463 .to_string(),
3464 None => match req.params.as_str() {
3465 Some(s) => s.to_string(),
3466 None => return Err("missing `id` parameter".to_string()),
3467 },
3468 };
3469
3470 let home = std::env::var_os("HOME")
3471 .or_else(|| std::env::var_os("USERPROFILE"))
3472 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3473 let car_dir = std::path::PathBuf::from(home).join(".car");
3474 let path = car_dir.join("models.json");
3475
3476 if !path.exists() {
3477 return Err(format!(
3478 "no models.json at {} — nothing to unregister",
3479 path.display()
3480 ));
3481 }
3482 let text =
3483 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3484 let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3485 Vec::new()
3486 } else {
3487 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3488 };
3489 let before = models.len();
3490 models.retain(|m| m.id != id);
3491 if models.len() == before {
3492 return Err(format!("model {} not found in {}", id, path.display()));
3493 }
3494 let json =
3495 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3496 let tmp = path.with_extension("json.tmp");
3497 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3498 std::fs::rename(&tmp, &path)
3499 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3500 Ok(serde_json::json!({
3501 "id": id,
3502 "unregistered": true,
3503 "path": path.to_string_lossy(),
3504 "note": "Daemon restart required for live UnifiedRegistry visibility \
3505 (phase 1, matching models.register).",
3506 }))
3507}
3508
3509fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3510 let engine = get_inference_engine(state);
3511 let models = engine.list_models();
3512 serde_json::to_value(&models).map_err(|e| e.to_string())
3513}
3514
3515fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3516 let engine = get_inference_engine(state);
3517 let models = engine.list_models_unified();
3518 serde_json::to_value(&models).map_err(|e| e.to_string())
3519}
3520
3521#[derive(Debug, Deserialize)]
3522#[serde(rename_all = "camelCase")]
3523struct ModelSearchParams {
3524 #[serde(default)]
3525 query: Option<String>,
3526 #[serde(default)]
3527 capability: Option<car_inference::ModelCapability>,
3528 #[serde(default)]
3529 provider: Option<String>,
3530 #[serde(default)]
3531 local_only: bool,
3532 #[serde(default)]
3533 available_only: bool,
3534 #[serde(default)]
3535 limit: Option<usize>,
3536}
3537
3538#[derive(Debug, Serialize)]
3539#[serde(rename_all = "camelCase")]
3540struct ModelSearchEntry {
3541 #[serde(flatten)]
3542 info: car_inference::ModelInfo,
3543 family: String,
3544 version: String,
3545 tags: Vec<String>,
3546 pullable: bool,
3547 upgrade: Option<car_inference::ModelUpgrade>,
3548}
3549
3550#[derive(Debug, Serialize)]
3551#[serde(rename_all = "camelCase")]
3552struct ModelSearchResponse {
3553 models: Vec<ModelSearchEntry>,
3554 upgrades: Vec<car_inference::ModelUpgrade>,
3555 total: usize,
3556 available: usize,
3557 local: usize,
3558 remote: usize,
3559}
3560
3561fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3562 let params: ModelSearchParams =
3563 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3564 query: None,
3565 capability: None,
3566 provider: None,
3567 local_only: false,
3568 available_only: false,
3569 limit: None,
3570 });
3571 let engine = get_inference_engine(state);
3572 let upgrades = engine.available_model_upgrades();
3573 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3574 .iter()
3575 .cloned()
3576 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3577 .collect();
3578 let query = params
3579 .query
3580 .as_deref()
3581 .map(str::trim)
3582 .filter(|q| !q.is_empty())
3583 .map(|q| q.to_ascii_lowercase());
3584 let provider = params
3585 .provider
3586 .as_deref()
3587 .map(str::trim)
3588 .filter(|p| !p.is_empty())
3589 .map(|p| p.to_ascii_lowercase());
3590
3591 let mut entries: Vec<ModelSearchEntry> = engine
3592 .list_schemas()
3593 .into_iter()
3594 .filter(|schema| {
3595 if let Some(capability) = params.capability {
3596 if !schema.has_capability(capability) {
3597 return false;
3598 }
3599 }
3600 if let Some(provider) = provider.as_deref() {
3601 if schema.provider.to_ascii_lowercase() != provider {
3602 return false;
3603 }
3604 }
3605 if params.local_only && !schema.is_local() {
3606 return false;
3607 }
3608 if params.available_only && !schema.available {
3609 return false;
3610 }
3611 if let Some(query) = query.as_deref() {
3612 let capability_text = schema
3613 .capabilities
3614 .iter()
3615 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3616 .collect::<Vec<_>>()
3617 .join(" ");
3618 let haystack = format!(
3619 "{} {} {} {} {} {}",
3620 schema.id,
3621 schema.name,
3622 schema.provider,
3623 schema.family,
3624 schema.tags.join(" "),
3625 capability_text
3626 )
3627 .to_ascii_lowercase();
3628 if !haystack.contains(query) {
3629 return false;
3630 }
3631 }
3632 true
3633 })
3634 .map(|schema| {
3635 let pullable = !schema.available
3636 && matches!(
3637 schema.source,
3638 car_inference::ModelSource::Local { .. }
3639 | car_inference::ModelSource::Mlx { .. }
3640 );
3641 let info = car_inference::ModelInfo::from(&schema);
3642 let upgrade = upgrades_by_from.get(&schema.id).cloned();
3643 ModelSearchEntry {
3644 info,
3645 family: schema.family,
3646 version: schema.version,
3647 tags: schema.tags,
3648 pullable,
3649 upgrade,
3650 }
3651 })
3652 .collect();
3653 entries.sort_by(|a, b| {
3654 b.info
3655 .available
3656 .cmp(&a.info.available)
3657 .then(b.info.is_local.cmp(&a.info.is_local))
3658 .then(a.info.name.cmp(&b.info.name))
3659 });
3660 if let Some(limit) = params.limit {
3661 entries.truncate(limit);
3662 }
3663
3664 let total = entries.len();
3665 let available = entries.iter().filter(|entry| entry.info.available).count();
3666 let local = entries.iter().filter(|entry| entry.info.is_local).count();
3667 let response = ModelSearchResponse {
3668 models: entries,
3669 upgrades,
3670 total,
3671 available,
3672 local,
3673 remote: total.saturating_sub(local),
3674 };
3675 serde_json::to_value(response).map_err(|e| e.to_string())
3676}
3677
3678fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3679 let engine = get_inference_engine(state);
3680 serde_json::to_value(serde_json::json!({
3681 "upgrades": engine.available_model_upgrades()
3682 }))
3683 .map_err(|e| e.to_string())
3684}
3685
3686async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3687 let name = msg
3688 .params
3689 .get("name")
3690 .or_else(|| msg.params.get("id"))
3691 .or_else(|| msg.params.get("model"))
3692 .and_then(|v| v.as_str())
3693 .ok_or("missing 'name' parameter")?;
3694 let engine = get_inference_engine(state);
3695 let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3696 Ok(serde_json::json!({"path": path.display().to_string()}))
3697}
3698
3699async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3700 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3701 msg.params
3702 .get("events")
3703 .cloned()
3704 .unwrap_or(msg.params.clone()),
3705 )
3706 .map_err(|e| format!("invalid events: {}", e))?;
3707
3708 let inference = get_inference_engine(state).clone();
3709 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3710
3711 let skills = engine.distill_skills(&events).await;
3712 serde_json::to_value(&skills).map_err(|e| e.to_string())
3713}
3714
3715async fn handle_memory_consolidate(
3719 session: &crate::session::ClientSession,
3720) -> Result<Value, String> {
3721 let engine_arc = session.effective_memgine().await;
3722 let report = {
3723 let mut engine = engine_arc.lock().await;
3724 engine.consolidate().await
3725 };
3726 if let Some(id) = session.agent_id.lock().await.clone() {
3727 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3728 tracing::warn!(agent_id = %id, error = %e,
3729 "agent memgine persist after consolidate failed");
3730 }
3731 }
3732 serde_json::to_value(&report).map_err(|e| e.to_string())
3733}
3734
3735async fn handle_skill_repair(
3739 msg: &JsonRpcMessage,
3740 session: &crate::session::ClientSession,
3741) -> Result<Value, String> {
3742 let name = msg
3743 .params
3744 .get("skill_name")
3745 .and_then(|v| v.as_str())
3746 .ok_or("missing 'skill_name' parameter")?;
3747 let mut engine = session.memgine.lock().await;
3748 let code = engine.repair_skill(name).await;
3749 Ok(match code {
3750 Some(c) => serde_json::json!({ "code": c }),
3751 None => Value::Null,
3752 })
3753}
3754
3755async fn handle_skills_ingest_distilled(
3758 msg: &JsonRpcMessage,
3759 session: &crate::session::ClientSession,
3760) -> Result<Value, String> {
3761 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3762 msg.params
3763 .get("skills")
3764 .cloned()
3765 .unwrap_or(msg.params.clone()),
3766 )
3767 .map_err(|e| format!("invalid skills: {}", e))?;
3768 let mut engine = session.memgine.lock().await;
3769 let nodes = engine.ingest_distilled_skills(&skills);
3770 Ok(serde_json::json!({ "ingested": nodes.len() }))
3771}
3772
3773async fn handle_skills_evolve(
3776 msg: &JsonRpcMessage,
3777 session: &crate::session::ClientSession,
3778) -> Result<Value, String> {
3779 let domain = msg
3780 .params
3781 .get("domain")
3782 .and_then(|v| v.as_str())
3783 .ok_or("missing 'domain' parameter")?
3784 .to_string();
3785 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3786 msg.params
3787 .get("events")
3788 .cloned()
3789 .unwrap_or(Value::Array(vec![])),
3790 )
3791 .map_err(|e| format!("invalid events: {}", e))?;
3792 let mut engine = session.memgine.lock().await;
3793 let skills = engine.evolve_skills(&events, &domain).await;
3794 serde_json::to_value(&skills).map_err(|e| e.to_string())
3795}
3796
3797async fn handle_skills_domains_needing_evolution(
3799 msg: &JsonRpcMessage,
3800 session: &crate::session::ClientSession,
3801) -> Result<Value, String> {
3802 let threshold = msg
3803 .params
3804 .get("threshold")
3805 .and_then(|v| v.as_f64())
3806 .unwrap_or(0.6);
3807 let engine = session.memgine.lock().await;
3808 let domains = engine.domains_needing_evolution(threshold);
3809 serde_json::to_value(&domains).map_err(|e| e.to_string())
3810}
3811
3812async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3814 let engine = get_inference_engine(state);
3815 let req: car_inference::RerankRequest =
3816 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3817 let _permit = state.admission.acquire().await;
3818 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3819 serde_json::to_value(&result).map_err(|e| e.to_string())
3820}
3821
3822async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3828 use base64::Engine as _;
3829 let engine = get_inference_engine(state);
3830
3831 let mut params = msg.params.clone();
3838 let audio_b64 = params
3839 .as_object_mut()
3840 .and_then(|m| m.remove("audio_b64"))
3841 .and_then(|v| v.as_str().map(str::to_string));
3842 let _tmp_audio = if let Some(b64) = audio_b64 {
3843 let bytes = base64::engine::general_purpose::STANDARD
3844 .decode(b64.as_bytes())
3845 .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3846 let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3847 std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3848 let path = tmp.path().to_string_lossy().into_owned();
3849 if let Some(obj) = params.as_object_mut() {
3850 obj.insert("audio_path".to_string(), Value::String(path));
3851 }
3852 Some(tmp)
3853 } else {
3854 None
3855 };
3856
3857 let req: car_inference::TranscribeRequest =
3858 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3859 let _permit = state.admission.acquire().await;
3860 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3861 serde_json::to_value(&result).map_err(|e| e.to_string())
3862}
3863
3864async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3870 use base64::Engine as _;
3871 let engine = get_inference_engine(state);
3872
3873 let mut params = msg.params.clone();
3874 let return_b64 = params
3875 .as_object_mut()
3876 .and_then(|m| m.remove("return_b64"))
3877 .and_then(|v| v.as_bool())
3878 .unwrap_or(false);
3879 let no_output_path = params
3880 .as_object()
3881 .map(|m| !m.contains_key("output_path"))
3882 .unwrap_or(true);
3883
3884 let req: car_inference::SynthesizeRequest =
3885 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3886 let _permit = state.admission.acquire().await;
3887 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3888 let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3889
3890 if return_b64 || no_output_path {
3894 let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3895 format!(
3896 "synthesize: failed to read rendered audio at {}: {e}",
3897 result.audio_path
3898 )
3899 })?;
3900 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3901 if let Some(obj) = value.as_object_mut() {
3902 obj.insert("audio_b64".to_string(), Value::String(encoded));
3903 }
3904 }
3905 Ok(value)
3906}
3907
3908async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3912 let engine = get_inference_engine(state);
3913 let status = engine
3914 .prepare_speech_runtime()
3915 .await
3916 .map_err(|e| e.to_string())?;
3917 serde_json::to_value(&status).map_err(|e| e.to_string())
3918}
3919
3920async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3923 let prompt = msg
3924 .params
3925 .get("prompt")
3926 .and_then(|v| v.as_str())
3927 .ok_or("missing 'prompt' parameter")?;
3928 let engine = get_inference_engine(state);
3929 let decision = engine.route_adaptive(prompt).await;
3930 serde_json::to_value(&decision).map_err(|e| e.to_string())
3931}
3932
3933async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3935 let engine = get_inference_engine(state);
3936 let profiles = engine.export_profiles().await;
3937 serde_json::to_value(&profiles).map_err(|e| e.to_string())
3938}
3939
3940#[derive(Deserialize)]
3941#[serde(rename_all = "camelCase")]
3942struct OutcomesResolvePendingParams {
3943 action_results: Vec<(String, bool, f64, String)>,
3948}
3949
3950async fn handle_outcomes_resolve_pending(
3970 req: &JsonRpcMessage,
3971 state: &ServerState,
3972) -> Result<Value, String> {
3973 let params: OutcomesResolvePendingParams =
3974 serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
3975 let engine = get_inference_engine(state);
3976 let mut tracker = engine.outcome_tracker.write().await;
3977 let inferred = tracker.infer_outcomes_from_action_sequence(¶ms.action_results);
3978 tracker.resolve_pending_from_signals(inferred);
3979 Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3980}
3981
3982async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3984 let n = session.runtime.log.lock().await.len();
3985 Ok(Value::from(n as u64))
3986}
3987
3988async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3989 let stats = session.runtime.log.lock().await.stats();
3990 serde_json::to_value(stats).map_err(|e| e.to_string())
3991}
3992
3993#[derive(Deserialize)]
3994#[serde(rename_all = "camelCase")]
3995struct EventsTruncateParams {
3996 #[serde(default)]
3997 max_events: Option<usize>,
3998 #[serde(default)]
3999 max_spans: Option<usize>,
4000}
4001
4002async fn handle_events_truncate(
4003 msg: &JsonRpcMessage,
4004 session: &crate::session::ClientSession,
4005) -> Result<Value, String> {
4006 let params: EventsTruncateParams =
4007 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
4008 max_events: None,
4009 max_spans: None,
4010 });
4011 let mut log = session.runtime.log.lock().await;
4012 let removed_events = params
4013 .max_events
4014 .map(|max| log.truncate_events_keep_last(max))
4015 .unwrap_or(0);
4016 let removed_spans = params
4017 .max_spans
4018 .map(|max| log.truncate_spans_keep_last(max))
4019 .unwrap_or(0);
4020 let stats = log.stats();
4021 Ok(serde_json::json!({
4022 "removedEvents": removed_events,
4023 "removedSpans": removed_spans,
4024 "stats": stats,
4025 }))
4026}
4027
4028async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
4029 let mut log = session.runtime.log.lock().await;
4030 let removed = log.clear();
4031 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
4032}
4033
4034async fn handle_replan_set_config(
4039 msg: &JsonRpcMessage,
4040 session: &crate::session::ClientSession,
4041) -> Result<Value, String> {
4042 let max_replans = msg
4043 .params
4044 .get("max_replans")
4045 .and_then(|v| v.as_u64())
4046 .unwrap_or(0) as u32;
4047 let delay_ms = msg
4048 .params
4049 .get("delay_ms")
4050 .and_then(|v| v.as_u64())
4051 .unwrap_or(0);
4052 let verify_before_execute = msg
4053 .params
4054 .get("verify_before_execute")
4055 .and_then(|v| v.as_bool())
4056 .unwrap_or(true);
4057 let cfg = car_engine::ReplanConfig {
4058 max_replans,
4059 delay_ms,
4060 verify_before_execute,
4061 };
4062 session.runtime.set_replan_config(cfg).await;
4063 Ok(Value::Null)
4064}
4065
4066async fn handle_skills_list(
4067 msg: &JsonRpcMessage,
4068 session: &crate::session::ClientSession,
4069) -> Result<Value, String> {
4070 let domain = msg.params.get("domain").and_then(|v| v.as_str());
4071 let engine = session.memgine.lock().await;
4072 let skills: Vec<serde_json::Value> = engine
4073 .graph
4074 .inner
4075 .node_indices()
4076 .filter_map(|nix| {
4077 let node = engine.graph.inner.node_weight(nix)?;
4078 if node.kind != car_memgine::MemKind::Skill {
4079 return None;
4080 }
4081 let meta = car_memgine::SkillMeta::from_node(node)?;
4082 if let Some(d) = domain {
4083 match &meta.scope {
4084 car_memgine::SkillScope::Global => {}
4085 car_memgine::SkillScope::Domain(sd) if sd == d => {}
4086 _ => return None,
4087 }
4088 }
4089 Some(serde_json::to_value(&meta).unwrap_or_default())
4090 })
4091 .collect();
4092 serde_json::to_value(&skills).map_err(|e| e.to_string())
4093}
4094
4095#[derive(serde::Deserialize)]
4096struct SecretParams {
4097 #[serde(default)]
4098 service: Option<String>,
4099 key: String,
4100 #[serde(default)]
4101 value: Option<String>,
4102}
4103
4104fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
4105 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4106 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
4107 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
4108}
4109
4110fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
4111 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4112 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
4113}
4114
4115fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
4116 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4117 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
4118}
4119
4120fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
4121 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4122 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
4123}
4124
4125#[derive(serde::Deserialize)]
4126struct PermParams {
4127 domain: String,
4128 #[serde(default)]
4129 target_bundle_id: Option<String>,
4130}
4131
4132fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
4133 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4134 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
4135}
4136
4137fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
4138 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4139 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
4140}
4141
4142fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
4143 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4144 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
4145}
4146
4147fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
4148 #[derive(serde::Deserialize)]
4149 struct P {
4150 start: String,
4151 end: String,
4152 #[serde(default)]
4153 calendar_ids: Vec<String>,
4154 }
4155 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4156 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
4157 .map_err(|e| format!("parse start: {}", e))?
4158 .with_timezone(&chrono::Utc);
4159 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
4160 .map_err(|e| format!("parse end: {}", e))?
4161 .with_timezone(&chrono::Utc);
4162 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
4163}
4164
4165fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
4166 #[derive(serde::Deserialize)]
4167 struct P {
4168 query: String,
4169 #[serde(default = "default_limit")]
4170 limit: usize,
4171 #[serde(default)]
4172 container_ids: Vec<String>,
4173 }
4174 fn default_limit() -> usize {
4175 50
4176 }
4177 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4178 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
4179}
4180
4181fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
4182 #[derive(serde::Deserialize, Default)]
4183 struct P {
4184 #[serde(default)]
4185 account_ids: Vec<String>,
4186 }
4187 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
4188 car_ffi_common::integrations::mail_inbox(&p.account_ids)
4189}
4190
4191fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
4192 let raw = req.params.to_string();
4193 car_ffi_common::integrations::mail_send(&raw)
4194}
4195
4196fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
4197 #[derive(serde::Deserialize)]
4198 struct P {
4199 #[serde(default = "default_limit")]
4200 limit: usize,
4201 }
4202 fn default_limit() -> usize {
4203 50
4204 }
4205 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4206 car_ffi_common::integrations::messages_chats(p.limit)
4207}
4208
4209fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
4210 let raw = req.params.to_string();
4211 car_ffi_common::integrations::messages_send(&raw)
4212}
4213
4214fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
4215 #[derive(serde::Deserialize)]
4216 struct P {
4217 query: String,
4218 #[serde(default = "default_limit")]
4219 limit: usize,
4220 }
4221 fn default_limit() -> usize {
4222 50
4223 }
4224 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4225 car_ffi_common::integrations::notes_find(&p.query, p.limit)
4226}
4227
4228fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
4229 #[derive(serde::Deserialize)]
4230 struct P {
4231 #[serde(default = "default_limit")]
4232 limit: usize,
4233 }
4234 fn default_limit() -> usize {
4235 50
4236 }
4237 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4238 car_ffi_common::integrations::reminders_items(p.limit)
4239}
4240
4241fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
4242 #[derive(serde::Deserialize)]
4243 struct P {
4244 #[serde(default = "default_limit")]
4245 limit: usize,
4246 }
4247 fn default_limit() -> usize {
4248 100
4249 }
4250 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
4251 car_ffi_common::integrations::bookmarks_list(p.limit)
4252}
4253
4254fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
4255 #[derive(serde::Deserialize)]
4256 struct P {
4257 start: String,
4258 end: String,
4259 }
4260 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4261 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4262 .map_err(|e| format!("parse start: {}", e))?
4263 .with_timezone(&chrono::Utc);
4264 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4265 .map_err(|e| format!("parse end: {}", e))?
4266 .with_timezone(&chrono::Utc);
4267 car_ffi_common::health::sleep_windows(s, e)
4268}
4269
4270fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4271 #[derive(serde::Deserialize)]
4272 struct P {
4273 start: String,
4274 end: String,
4275 }
4276 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4277 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4278 .map_err(|e| format!("parse start: {}", e))?
4279 .with_timezone(&chrono::Utc);
4280 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4281 .map_err(|e| format!("parse end: {}", e))?
4282 .with_timezone(&chrono::Utc);
4283 car_ffi_common::health::workouts(s, e)
4284}
4285
4286fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4287 #[derive(serde::Deserialize)]
4288 struct P {
4289 start: String,
4290 end: String,
4291 }
4292 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4293 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4294 .map_err(|e| format!("parse start: {}", e))?;
4295 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4296 .map_err(|e| format!("parse end: {}", e))?;
4297 car_ffi_common::health::activity(s, e)
4298}
4299
4300async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4301 let closed = session.browser.close().await?;
4302 Ok(serde_json::json!({"closed": closed}))
4303}
4304
4305async fn handle_browser_run(
4306 req: &JsonRpcMessage,
4307 session: &crate::session::ClientSession,
4308) -> Result<Value, String> {
4309 #[derive(serde::Deserialize)]
4310 struct BrowserRunParams {
4311 script: Value,
4313 #[serde(default)]
4314 width: Option<u32>,
4315 #[serde(default)]
4316 height: Option<u32>,
4317 #[serde(default)]
4322 headed: Option<bool>,
4323 #[serde(default)]
4326 extra_args: Option<Vec<String>>,
4327 }
4328 let params: BrowserRunParams =
4329 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4330
4331 let script_json = match params.script {
4333 Value::String(s) => s,
4334 other => other.to_string(),
4335 };
4336
4337 let browser_session = session
4338 .browser
4339 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4340 width: params.width.unwrap_or(1280),
4341 height: params.height.unwrap_or(720),
4342 headless: !params.headed.unwrap_or(false),
4343 extra_args: params.extra_args.unwrap_or_default(),
4344 })
4345 .await?;
4346
4347 let trace_json = browser_session.run(&script_json).await?;
4348 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4349}
4350
4351#[derive(Deserialize)]
4364struct VoiceStartParams {
4365 session_id: String,
4366 audio_source: Value,
4367 #[serde(default)]
4368 options: Option<Value>,
4369}
4370
4371async fn handle_voice_transcribe_stream_start(
4372 req: &JsonRpcMessage,
4373 state: &Arc<ServerState>,
4374 session: &Arc<crate::session::ClientSession>,
4375) -> Result<Value, String> {
4376 let params: VoiceStartParams =
4377 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4378 let audio_source_json =
4379 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
4380 let options_json = params
4381 .options
4382 .as_ref()
4383 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4384 .transpose()?;
4385 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4386 channel: session.channel.clone(),
4387 });
4388 let json = car_ffi_common::voice::transcribe_stream_start(
4389 ¶ms.session_id,
4390 &audio_source_json,
4391 options_json.as_deref(),
4392 state.voice_sessions.clone(),
4393 sink,
4394 )
4395 .await?;
4396 serde_json::from_str(&json).map_err(|e| e.to_string())
4397}
4398
4399#[derive(Deserialize)]
4400struct VoiceStopParams {
4401 session_id: String,
4402}
4403
4404async fn handle_voice_transcribe_stream_stop(
4405 req: &JsonRpcMessage,
4406 state: &Arc<ServerState>,
4407) -> Result<Value, String> {
4408 let params: VoiceStopParams =
4409 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4410 let json = car_ffi_common::voice::transcribe_stream_stop(
4411 ¶ms.session_id,
4412 state.voice_sessions.clone(),
4413 )
4414 .await?;
4415 serde_json::from_str(&json).map_err(|e| e.to_string())
4416}
4417
4418#[derive(Deserialize)]
4419struct VoicePushParams {
4420 session_id: String,
4421 pcm_b64: String,
4425}
4426
4427async fn handle_voice_transcribe_stream_push(
4428 req: &JsonRpcMessage,
4429 state: &Arc<ServerState>,
4430) -> Result<Value, String> {
4431 use base64::Engine;
4432 let params: VoicePushParams =
4433 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4434 let pcm = base64::engine::general_purpose::STANDARD
4435 .decode(¶ms.pcm_b64)
4436 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4437 let json = car_ffi_common::voice::transcribe_stream_push(
4438 ¶ms.session_id,
4439 &pcm,
4440 state.voice_sessions.clone(),
4441 )
4442 .await?;
4443 serde_json::from_str(&json).map_err(|e| e.to_string())
4444}
4445
4446fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4447 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4448 serde_json::from_str(&json).unwrap_or(Value::Null)
4449}
4450
4451#[derive(Deserialize)]
4452struct VoiceTtsStreamStartParams {
4453 stream_id: String,
4457 text: String,
4460 #[serde(default)]
4463 options: Option<Value>,
4464}
4465
4466async fn handle_voice_tts_stream_start(
4467 req: &JsonRpcMessage,
4468 session: &Arc<crate::session::ClientSession>,
4469) -> Result<Value, String> {
4470 let params: VoiceTtsStreamStartParams =
4471 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4472 let opts_str = params
4473 .options
4474 .as_ref()
4475 .map(|v| v.to_string())
4476 .filter(|s| !s.is_empty());
4477 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4478 channel: session.channel.clone(),
4479 });
4480 let json = car_ffi_common::voice::tts_stream_start(
4481 ¶ms.stream_id,
4482 ¶ms.text,
4483 opts_str.as_deref(),
4484 sink,
4485 )
4486 .await?;
4487 serde_json::from_str(&json).map_err(|e| e.to_string())
4488}
4489
4490#[derive(Deserialize)]
4491struct VoiceTtsStreamCancelParams {
4492 stream_id: String,
4493}
4494
4495async fn handle_voice_tts_stream_cancel(req: &JsonRpcMessage) -> Result<Value, String> {
4496 let params: VoiceTtsStreamCancelParams =
4497 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4498 let json = car_ffi_common::voice::tts_stream_cancel(¶ms.stream_id).await?;
4499 serde_json::from_str(&json).map_err(|e| e.to_string())
4500}
4501
4502fn handle_voice_tts_stream_list() -> Value {
4503 let json = car_ffi_common::voice::list_tts_streams();
4504 serde_json::from_str(&json).unwrap_or(Value::Null)
4505}
4506
4507async fn handle_voice_dispatch_turn(
4508 req: &JsonRpcMessage,
4509 state: &Arc<ServerState>,
4510 session: &Arc<crate::session::ClientSession>,
4511) -> Result<Value, String> {
4512 let req_value = req.params.clone();
4513 let request: crate::voice_turn::DispatchVoiceTurnRequest =
4514 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4515 let engine = get_inference_engine(state).clone();
4516 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4517 channel: session.channel.clone(),
4518 });
4519 let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4520 serde_json::to_value(resp).map_err(|e| e.to_string())
4521}
4522
4523async fn handle_voice_cancel_turn() -> Result<Value, String> {
4524 crate::voice_turn::cancel().await;
4525 Ok(serde_json::json!({"cancelled": true}))
4526}
4527
4528async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4529 let engine = get_inference_engine(state).clone();
4530 crate::voice_turn::prewarm(engine).await;
4531 Ok(serde_json::json!({"prewarmed": true}))
4532}
4533
4534fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4553 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4554 std::sync::OnceLock::new();
4555 SLOT.get_or_init(|| std::sync::RwLock::new(None))
4556}
4557
4558fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4559 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4560 std::sync::OnceLock::new();
4561 MAP.get_or_init(dashmap::DashMap::new)
4562}
4563
4564fn ws_runner_completions() -> &'static dashmap::DashMap<
4565 String,
4566 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4567> {
4568 static MAP: std::sync::OnceLock<
4569 dashmap::DashMap<
4570 String,
4571 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4572 >,
4573 > = std::sync::OnceLock::new();
4574 MAP.get_or_init(dashmap::DashMap::new)
4575}
4576
4577struct WsInferenceRunner;
4578
4579#[async_trait::async_trait]
4580impl car_inference::InferenceRunner for WsInferenceRunner {
4581 async fn run(
4582 &self,
4583 request: car_inference::tasks::generate::GenerateRequest,
4584 emitter: car_inference::EventEmitter,
4585 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4586 let channel = ws_runner_session()
4587 .read()
4588 .map_err(|e| {
4589 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4590 })?
4591 .clone()
4592 .ok_or_else(|| {
4593 car_inference::RunnerError::Declined(
4594 "no WebSocket inference runner registered — call inference.register_runner first"
4595 .into(),
4596 )
4597 })?;
4598
4599 let call_id = uuid::Uuid::new_v4().to_string();
4600 let request_json = serde_json::to_value(&request)
4601 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4602 let (tx, rx) = tokio::sync::oneshot::channel();
4603 ws_runner_calls().insert(call_id.clone(), emitter);
4604 ws_runner_completions().insert(call_id.clone(), tx);
4605
4606 use futures::SinkExt;
4608 let notification = serde_json::json!({
4609 "jsonrpc": "2.0",
4610 "method": "inference.runner.invoke",
4611 "params": {
4612 "call_id": call_id,
4613 "request": request_json,
4614 },
4615 });
4616 let text = serde_json::to_string(¬ification)
4617 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4618 let _ = channel
4619 .write
4620 .lock()
4621 .await
4622 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4623 .await;
4624
4625 let result = rx.await.map_err(|_| {
4626 car_inference::RunnerError::Failed("runner completion channel dropped".into())
4627 })?;
4628 ws_runner_calls().remove(&call_id);
4629 result.map_err(car_inference::RunnerError::Failed)
4630 }
4631}
4632
4633async fn handle_inference_register_runner(
4634 session: &Arc<crate::session::ClientSession>,
4635) -> Result<Value, String> {
4636 let mut guard = ws_runner_session()
4637 .write()
4638 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4639 *guard = Some(session.channel.clone());
4640 drop(guard);
4641 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4642 Ok(serde_json::json!({"registered": true}))
4643}
4644
4645#[derive(serde::Deserialize)]
4646struct InferenceRunnerEventParams {
4647 call_id: String,
4648 event: Value,
4649}
4650
4651async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4652 let params: InferenceRunnerEventParams =
4653 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4654 let stream_event = match parse_runner_event_value(¶ms.event) {
4655 Some(e) => e,
4656 None => return Err("unrecognised runner event shape".into()),
4657 };
4658 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
4659 let emitter = entry.value().clone();
4660 tokio::spawn(async move { emitter.emit(stream_event).await });
4661 }
4662 Ok(serde_json::json!({"emitted": true}))
4663}
4664
4665#[derive(serde::Deserialize)]
4666struct InferenceRunnerCompleteParams {
4667 call_id: String,
4668 result: Value,
4669}
4670
4671async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4672 let params: InferenceRunnerCompleteParams =
4673 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4674 let result: std::result::Result<car_inference::RunnerResult, String> =
4675 serde_json::from_value(params.result)
4676 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4677 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4678 let _ = tx.send(result);
4679 }
4680 Ok(serde_json::json!({"completed": true}))
4681}
4682
4683#[derive(serde::Deserialize)]
4684struct InferenceRunnerFailParams {
4685 call_id: String,
4686 error: String,
4687}
4688
4689async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4690 let params: InferenceRunnerFailParams =
4691 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4692 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4693 let _ = tx.send(Err(params.error));
4694 }
4695 Ok(serde_json::json!({"failed": true}))
4696}
4697
4698fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4699 let ty = v.get("type").and_then(|t| t.as_str())?;
4700 match ty {
4701 "text" => Some(car_inference::StreamEvent::TextDelta(
4702 v.get("data")?.as_str()?.to_string(),
4703 )),
4704 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4705 name: v.get("name")?.as_str()?.to_string(),
4706 index: v.get("index")?.as_u64()? as usize,
4707 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4708 }),
4709 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4710 index: v.get("index")?.as_u64()? as usize,
4711 arguments_delta: v.get("data")?.as_str()?.to_string(),
4712 }),
4713 "usage" => Some(car_inference::StreamEvent::Usage {
4714 input_tokens: v.get("input_tokens")?.as_u64()?,
4715 output_tokens: v.get("output_tokens")?.as_u64()?,
4716 }),
4717 "done" => Some(car_inference::StreamEvent::Done {
4718 text: v.get("text")?.as_str()?.to_string(),
4719 tool_calls: v
4720 .get("tool_calls")
4721 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4722 .unwrap_or_default(),
4723 }),
4724 _ => None,
4725 }
4726}
4727
4728#[derive(Deserialize)]
4729struct EnrollSpeakerParams {
4730 label: String,
4731 audio: Value,
4732}
4733
4734async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4735 let params: EnrollSpeakerParams =
4736 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4737 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
4738 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
4739 serde_json::from_str(&json).map_err(|e| e.to_string())
4740}
4741
4742#[derive(Deserialize)]
4743struct RemoveEnrollmentParams {
4744 label: String,
4745}
4746
4747fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4748 let params: RemoveEnrollmentParams =
4749 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4750 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
4751 serde_json::from_str(&json).map_err(|e| e.to_string())
4752}
4753
4754#[derive(Deserialize)]
4755struct WorkflowRunParams {
4756 workflow: Value,
4757}
4758
4759async fn handle_workflow_run(
4760 req: &JsonRpcMessage,
4761 session: &Arc<crate::session::ClientSession>,
4762) -> Result<Value, String> {
4763 let params: WorkflowRunParams =
4764 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4765 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4766 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4767 channel: session.channel.clone(),
4768 host: session.host.clone(),
4769 client_id: session.client_id.clone(),
4770 });
4771 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4772 serde_json::from_str(&json).map_err(|e| e.to_string())
4773}
4774
4775#[derive(Deserialize)]
4776struct WorkflowVerifyParams {
4777 workflow: Value,
4778}
4779
4780fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4781 let params: WorkflowVerifyParams =
4782 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4783 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4784 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4785 serde_json::from_str(&json).map_err(|e| e.to_string())
4786}
4787
4788async fn handle_meeting_start(
4793 req: &JsonRpcMessage,
4794 state: &Arc<ServerState>,
4795 session: &Arc<crate::session::ClientSession>,
4796) -> Result<Value, String> {
4797 let mut req_value = req.params.clone();
4803 let meeting_id = req_value
4804 .get("id")
4805 .and_then(|v| v.as_str())
4806 .map(str::to_string)
4807 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4808 if let Some(map) = req_value.as_object_mut() {
4809 map.insert("id".into(), Value::String(meeting_id.clone()));
4810 }
4811 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4812
4813 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4814 Arc::new(crate::session::WsVoiceEventSink {
4815 channel: session.channel.clone(),
4816 });
4817
4818 let upstream: Arc<dyn car_voice::VoiceEventSink> =
4823 Arc::new(crate::session::WsMemgineIngestSink {
4824 meeting_id,
4825 engine: session.memgine.clone(),
4826 upstream: ws_upstream,
4827 });
4828
4829 let cwd = std::env::current_dir().ok();
4830 let json = crate::meeting::start_meeting(
4831 &request_json,
4832 state.meetings.clone(),
4833 state.voice_sessions.clone(),
4834 upstream,
4835 None,
4836 cwd,
4837 )
4838 .await?;
4839 serde_json::from_str(&json).map_err(|e| e.to_string())
4840}
4841
4842#[derive(Deserialize)]
4843struct MeetingStopParams {
4844 meeting_id: String,
4845 #[serde(default = "default_summarize")]
4846 summarize: bool,
4847}
4848
4849fn default_summarize() -> bool {
4850 true
4851}
4852
4853async fn handle_meeting_stop(
4854 req: &JsonRpcMessage,
4855 state: &Arc<ServerState>,
4856 _session: &Arc<crate::session::ClientSession>,
4857) -> Result<Value, String> {
4858 let params: MeetingStopParams =
4859 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4860 let inference = if params.summarize {
4861 Some(state.inference.get().cloned()).flatten()
4862 } else {
4863 None
4864 };
4865 let json = crate::meeting::stop_meeting(
4866 ¶ms.meeting_id,
4867 params.summarize,
4868 state.meetings.clone(),
4869 state.voice_sessions.clone(),
4870 inference,
4871 )
4872 .await?;
4873 serde_json::from_str(&json).map_err(|e| e.to_string())
4874}
4875
4876#[derive(Deserialize, Default)]
4877struct MeetingListParams {
4878 #[serde(default)]
4879 root: Option<std::path::PathBuf>,
4880}
4881
4882fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4883 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4884 let cwd = std::env::current_dir().ok();
4885 let json = crate::meeting::list_meetings(params.root, cwd)?;
4886 serde_json::from_str(&json).map_err(|e| e.to_string())
4887}
4888
4889#[derive(Deserialize)]
4890struct MeetingGetParams {
4891 meeting_id: String,
4892 #[serde(default)]
4893 root: Option<std::path::PathBuf>,
4894}
4895
4896fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4897 let params: MeetingGetParams =
4898 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4899 let cwd = std::env::current_dir().ok();
4900 let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
4901 serde_json::from_str(&json).map_err(|e| e.to_string())
4902}
4903
4904#[derive(Deserialize, Default)]
4909struct RegistryRegisterParams {
4910 entry: Value,
4914 #[serde(default)]
4915 registry_path: Option<std::path::PathBuf>,
4916}
4917
4918fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4919 let params: RegistryRegisterParams =
4920 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4921 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
4922 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4923 Ok(Value::Null)
4924}
4925
4926#[derive(Deserialize, Default)]
4927struct RegistryNameParams {
4928 name: String,
4929 #[serde(default)]
4930 registry_path: Option<std::path::PathBuf>,
4931}
4932
4933fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4934 let params: RegistryNameParams =
4935 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4936 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
4937 serde_json::from_str(&json).map_err(|e| e.to_string())
4938}
4939
4940fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4941 let params: RegistryNameParams =
4942 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4943 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
4944 Ok(Value::Null)
4945}
4946
4947#[derive(Deserialize, Default)]
4948struct RegistryListParams {
4949 #[serde(default)]
4950 registry_path: Option<std::path::PathBuf>,
4951}
4952
4953fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4954 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4955 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4956 serde_json::from_str(&json).map_err(|e| e.to_string())
4957}
4958
4959#[derive(Deserialize, Default)]
4960struct RegistryReapParams {
4961 #[serde(default = "default_reap_age")]
4964 max_age_secs: u64,
4965 #[serde(default)]
4966 registry_path: Option<std::path::PathBuf>,
4967}
4968
4969fn default_reap_age() -> u64 {
4970 60
4971}
4972
4973fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4974 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4975 let json =
4976 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4977 serde_json::from_str(&json).map_err(|e| e.to_string())
4978}
4979
4980async fn handle_a2a_start(
4987 req: &JsonRpcMessage,
4988 session: &crate::session::ClientSession,
4989) -> Result<Value, String> {
4990 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4991 let json = crate::a2a::start_a2a(¶ms_json, Some(session.runtime.clone())).await?;
4997 serde_json::from_str(&json).map_err(|e| e.to_string())
4998}
4999
5000fn handle_a2a_stop() -> Result<Value, String> {
5001 let json = crate::a2a::stop_a2a()?;
5002 serde_json::from_str(&json).map_err(|e| e.to_string())
5003}
5004
5005fn handle_a2a_status() -> Result<Value, String> {
5006 let json = crate::a2a::a2a_status()?;
5007 serde_json::from_str(&json).map_err(|e| e.to_string())
5008}
5009
5010#[derive(Deserialize)]
5011#[serde(rename_all = "camelCase")]
5012struct A2aSendParams {
5013 endpoint: String,
5014 message: car_a2a::Message,
5015 #[serde(default)]
5016 blocking: bool,
5017 #[serde(default = "default_true")]
5018 ingest_a2ui: bool,
5019 #[serde(default)]
5020 route_auth: Option<A2aRouteAuth>,
5021 #[serde(default)]
5022 allow_untrusted_endpoint: bool,
5023}
5024
5025fn default_true() -> bool {
5026 true
5027}
5028
5029async fn handle_a2a_dispatch(
5039 method: &str,
5040 req: &JsonRpcMessage,
5041 state: &Arc<ServerState>,
5042) -> Result<Value, String> {
5043 let dispatcher = state.a2a_dispatcher().await;
5044 dispatcher
5045 .dispatch(method, req.params.clone())
5046 .await
5047 .map_err(|e| e.to_string())
5048}
5049
5050async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
5051 let params: A2aSendParams =
5052 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5053 let endpoint = trusted_route_endpoint(
5054 Some(params.endpoint.clone()),
5055 params.allow_untrusted_endpoint,
5056 )
5057 .ok_or_else(|| {
5058 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
5059 })?;
5060 let client = match params.route_auth.clone() {
5061 Some(auth) => {
5062 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
5063 }
5064 None => car_a2a::A2aClient::new(endpoint.clone()),
5065 };
5066 let result = client
5067 .send_message(params.message, params.blocking)
5068 .await
5069 .map_err(|e| e.to_string())?;
5070 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
5071 let mut applied = Vec::new();
5072 if params.ingest_a2ui {
5073 state
5074 .a2ui
5075 .validate_payload(&result_value)
5076 .map_err(|e| e.to_string())?;
5077 let routed_endpoint = Some(endpoint.clone());
5078 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
5079 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
5080 if owner.endpoint.is_none() {
5081 owner.with_endpoint(routed_endpoint.clone())
5082 } else {
5083 owner
5084 }
5085 });
5086 applied.push(
5087 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
5088 );
5089 }
5090 }
5091 Ok(serde_json::json!({
5092 "result": result,
5093 "a2ui": {
5094 "applied": applied,
5095 }
5096 }))
5097}
5098
5099async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
5107 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5108 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
5109 serde_json::from_str(&json).map_err(|e| e.to_string())
5110}
5111
5112async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
5113 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5114 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
5115 serde_json::from_str(&json).map_err(|e| e.to_string())
5116}
5117
5118async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
5119 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5120 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
5121 serde_json::from_str(&json).map_err(|e| e.to_string())
5122}
5123
5124async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
5125 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5126 let json = car_ffi_common::notifications::local(&args_json).await?;
5127 serde_json::from_str(&json).map_err(|e| e.to_string())
5128}
5129
5130async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
5131 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5132 let json = car_ffi_common::vision::ocr(&args_json).await?;
5133 serde_json::from_str(&json).map_err(|e| e.to_string())
5134}
5135
5136async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
5141 let agents = match state.observer_manifest_path() {
5150 Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
5151 .map_err(|e| e.to_string())?,
5152 None => {
5153 let supervisor = state.supervisor()?;
5154 supervisor.list().await
5155 }
5156 };
5157 let attached = state.attached_agents.lock().await.clone();
5164 let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
5165 for a in agents {
5166 let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
5167 let session_id = attached.get(&a.spec.id).cloned();
5168 if let Some(map) = v.as_object_mut() {
5169 map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
5170 if let Some(sid) = session_id {
5171 map.insert("session_id".to_string(), Value::String(sid));
5172 }
5173 }
5174 decorated.push(v);
5175 }
5176 Ok(Value::Array(decorated))
5177}
5178
5179async fn handle_agents_upsert(
5180 req: &JsonRpcMessage,
5181 state: &Arc<ServerState>,
5182) -> Result<Value, String> {
5183 let mut params = req.params.clone();
5184 if let Some(name) = params
5193 .get("interpreter")
5194 .and_then(|v| v.as_str())
5195 .map(str::to_string)
5196 {
5197 let resolved =
5198 car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
5199 params["command"] = Value::String(resolved.to_string_lossy().into_owned());
5200 }
5201 let spec: car_registry::supervisor::AgentSpec =
5202 serde_json::from_value(params).map_err(|e| e.to_string())?;
5203 let supervisor = state.supervisor()?;
5204 let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
5205 serde_json::to_value(agent).map_err(|e| e.to_string())
5206}
5207
5208async fn handle_agents_install(
5222 req: &JsonRpcMessage,
5223 state: &Arc<ServerState>,
5224) -> Result<Value, String> {
5225 let manifest: car_registry::manifest::AgentManifest =
5226 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5227 let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
5228 let supervisor = state.supervisor()?;
5229 let (report, managed) = supervisor
5230 .install_manifest(manifest, &host)
5231 .await
5232 .map_err(|e| e.to_string())?;
5233 Ok(serde_json::json!({
5234 "report": {
5235 "missingOptional": report
5236 .missing_optional
5237 .iter()
5238 .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
5239 .collect::<Vec<_>>(),
5240 },
5241 "agent": managed,
5242 }))
5243}
5244
5245async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
5246 let entries = match state.observer_manifest_path() {
5252 Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
5253 .map_err(|e| e.to_string())?,
5254 None => {
5255 let supervisor = state.supervisor()?;
5256 supervisor.health().await
5257 }
5258 };
5259 serde_json::to_value(entries).map_err(|e| e.to_string())
5260}
5261
5262fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
5263 req.params
5264 .get("id")
5265 .and_then(Value::as_str)
5266 .map(str::to_string)
5267 .ok_or_else(|| "missing required `id` parameter".to_string())
5268}
5269
5270async fn handle_agents_remove(
5271 req: &JsonRpcMessage,
5272 state: &Arc<ServerState>,
5273) -> Result<Value, String> {
5274 let id = extract_agent_id(req)?;
5275 let supervisor = state.supervisor()?;
5276 let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
5277 Ok(serde_json::json!({ "removed": removed }))
5278}
5279
5280async fn handle_agents_start(
5281 req: &JsonRpcMessage,
5282 state: &Arc<ServerState>,
5283) -> Result<Value, String> {
5284 let id = extract_agent_id(req)?;
5285 let supervisor = state.supervisor()?;
5286 let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
5287 serde_json::to_value(agent).map_err(|e| e.to_string())
5288}
5289
5290async fn handle_agents_stop(
5291 req: &JsonRpcMessage,
5292 state: &Arc<ServerState>,
5293) -> Result<Value, String> {
5294 let id = extract_agent_id(req)?;
5295 let signal: car_registry::supervisor::StopSignal = req
5296 .params
5297 .get("signal")
5298 .map(|v| serde_json::from_value(v.clone()))
5299 .transpose()
5300 .map_err(|e| e.to_string())?
5301 .unwrap_or_default();
5302 let supervisor = state.supervisor()?;
5303 let agent = supervisor
5304 .stop(&id, signal)
5305 .await
5306 .map_err(|e| e.to_string())?;
5307 serde_json::to_value(agent).map_err(|e| e.to_string())
5308}
5309
5310async fn handle_agents_restart(
5311 req: &JsonRpcMessage,
5312 state: &Arc<ServerState>,
5313) -> Result<Value, String> {
5314 let id = extract_agent_id(req)?;
5315 let supervisor = state.supervisor()?;
5316 let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
5317 serde_json::to_value(agent).map_err(|e| e.to_string())
5318}
5319
5320async fn handle_agents_tail_log(
5321 req: &JsonRpcMessage,
5322 state: &Arc<ServerState>,
5323) -> Result<Value, String> {
5324 let id = extract_agent_id(req)?;
5325 let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
5326 let supervisor = state.supervisor()?;
5327 let lines = supervisor
5328 .tail_log(&id, n)
5329 .await
5330 .map_err(|e| e.to_string())?;
5331 Ok(serde_json::json!({ "lines": lines }))
5332}
5333
5334async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5345 let include_health = req
5346 .params
5347 .get("include_health")
5348 .and_then(Value::as_bool)
5349 .unwrap_or(false);
5350 let json = car_ffi_common::external_agents::list(include_health).await?;
5351 serde_json::from_str(&json).map_err(|e| e.to_string())
5352}
5353
5354async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5355 let include_health = req
5356 .params
5357 .get("include_health")
5358 .and_then(Value::as_bool)
5359 .unwrap_or(false);
5360 let json = car_ffi_common::external_agents::detect(include_health).await?;
5361 serde_json::from_str(&json).map_err(|e| e.to_string())
5362}
5363
5364async fn handle_agents_invoke_external(
5382 req: &JsonRpcMessage,
5383 state: &Arc<ServerState>,
5384 host_session: &Arc<crate::session::ClientSession>,
5385) -> Result<Value, String> {
5386 let id = req
5387 .params
5388 .get("id")
5389 .and_then(Value::as_str)
5390 .ok_or_else(|| "missing required `id` parameter".to_string())?
5391 .to_string();
5392 let task = req
5393 .params
5394 .get("task")
5395 .and_then(Value::as_str)
5396 .ok_or_else(|| "missing required `task` parameter".to_string())?
5397 .to_string();
5398 let stream = req
5399 .params
5400 .get("stream")
5401 .and_then(Value::as_bool)
5402 .unwrap_or(false);
5403 let session_id = req
5404 .params
5405 .get("session_id")
5406 .and_then(Value::as_str)
5407 .map(str::to_string)
5408 .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
5409
5410 let mut options_value = req.params.clone();
5416 if let Some(obj) = options_value.as_object_mut() {
5417 obj.remove("id");
5418 obj.remove("task");
5419 obj.remove("stream");
5420 obj.remove("session_id");
5421 let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5430 if !has_explicit_mcp {
5431 if let Some(url) = state.mcp_url.get() {
5432 obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5433 }
5434 }
5435 }
5436
5437 if !stream {
5438 let options_json = options_value.to_string();
5441 let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
5442 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5443 append_external_agent_audit(&id, &task, &options_value, &result);
5444 return Ok(result);
5445 }
5446
5447 let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
5453 .map_err(|e| format!("invalid options: {e}"))?;
5454
5455 {
5465 let mut chats = state.chat_sessions.lock().await;
5475 chats.entry(session_id.clone()).or_insert_with(|| {
5476 let created_at = std::time::SystemTime::now()
5477 .duration_since(std::time::UNIX_EPOCH)
5478 .map(|d| d.as_secs())
5479 .unwrap_or(0);
5480 crate::session::ChatSession {
5481 agent_id: id.clone(),
5482 host_client_id: host_session.client_id.clone(),
5483 created_at,
5484 }
5485 });
5486 }
5487
5488 use tokio::sync::mpsc;
5495 let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
5496
5497 let drain_state = state.clone();
5498 let drain_session_id = session_id.clone();
5499 let drain_agent_id = id.clone();
5500 tokio::spawn(async move {
5501 while let Some(event) = rx.recv().await {
5502 emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
5503 }
5504 });
5505
5506 let emitter_tx = tx.clone();
5507 let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
5508 let _ = emitter_tx.send(event);
5513 });
5514
5515 let spawn_state = state.clone();
5521 let spawn_session_id = session_id.clone();
5522 let spawn_id = id.clone();
5523 let spawn_task = task.clone();
5524 let spawn_options = options_value.clone();
5525 tokio::spawn(async move {
5526 let outcome =
5527 car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
5528 .await;
5529 drop(tx); let terminal_params: Value;
5536 let result_value: Value;
5537 match outcome {
5538 Ok(res) => {
5539 let mut parts: Vec<String> = Vec::new();
5546 if res.turns > 0 {
5547 parts.push(format!(
5548 "{} turn{}",
5549 res.turns,
5550 if res.turns == 1 { "" } else { "s" }
5551 ));
5552 }
5553 if res.tool_calls > 0 {
5554 parts.push(format!(
5555 "{} tool{}",
5556 res.tool_calls,
5557 if res.tool_calls == 1 { "" } else { "s" }
5558 ));
5559 }
5560 if res.duration_ms > 0 {
5561 parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
5562 }
5563 let summary = if parts.is_empty() {
5564 "stop".to_string()
5565 } else {
5566 parts.join(" · ")
5567 };
5568 if res.is_error {
5569 terminal_params = serde_json::json!({
5570 "session_id": spawn_session_id,
5571 "agent_id": spawn_id,
5572 "kind": "error",
5573 "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
5574 });
5575 } else {
5576 terminal_params = serde_json::json!({
5577 "session_id": spawn_session_id,
5578 "agent_id": spawn_id,
5579 "kind": "done",
5580 "finish_reason": summary,
5581 });
5582 }
5583 result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
5584 }
5585 Err(e) => {
5586 let message = format!("{e}");
5587 terminal_params = serde_json::json!({
5588 "session_id": spawn_session_id,
5589 "agent_id": spawn_id,
5590 "kind": "error",
5591 "error": message.clone(),
5592 });
5593 result_value = serde_json::json!({ "is_error": true, "error": message });
5594 }
5595 }
5596 send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
5597 spawn_state
5598 .chat_sessions
5599 .lock()
5600 .await
5601 .remove(&spawn_session_id);
5602 append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
5603 });
5604
5605 Ok(serde_json::json!({
5606 "accepted": true,
5607 "session_id": session_id,
5608 }))
5609}
5610
5611async fn emit_external_chat_event(
5628 state: &Arc<ServerState>,
5629 session_id: &str,
5630 agent_id: &str,
5631 event: car_external_agents::StreamEvent,
5632) {
5633 use car_external_agents::StreamEvent;
5634 match event {
5635 StreamEvent::Assistant(a) => {
5636 if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
5637 for block in content {
5638 let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
5639 match block_type {
5640 "text" => {
5641 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
5642 if !text.is_empty() {
5643 let params = serde_json::json!({
5644 "session_id": session_id,
5645 "agent_id": agent_id,
5646 "kind": "token",
5647 "delta": text,
5648 });
5649 send_external_chat_frame(state, session_id, params).await;
5650 }
5651 }
5652 }
5653 "tool_use" => {
5654 let name = block
5655 .get("name")
5656 .and_then(|v| v.as_str())
5657 .unwrap_or("(unknown tool)");
5658 let params = serde_json::json!({
5659 "session_id": session_id,
5660 "agent_id": agent_id,
5661 "kind": "tool_call",
5662 "detail": name,
5663 });
5664 send_external_chat_frame(state, session_id, params).await;
5665 }
5666 _ => {}
5667 }
5668 }
5669 }
5670 }
5671 _ => {
5672 }
5677 }
5678}
5679
5680async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
5685 use futures::SinkExt;
5686 use tokio_tungstenite::tungstenite::Message;
5687
5688 let host_client_id = state
5689 .chat_sessions
5690 .lock()
5691 .await
5692 .get(session_id)
5693 .map(|s| s.host_client_id.clone());
5694 let Some(host_client_id) = host_client_id else {
5695 return;
5696 };
5697 let host_channel = {
5698 let sessions = state.sessions.lock().await;
5699 sessions.get(&host_client_id).map(|s| s.channel.clone())
5700 };
5701 let Some(channel) = host_channel else {
5702 return;
5703 };
5704 let frame = serde_json::json!({
5705 "jsonrpc": "2.0",
5706 "method": "agents.chat.event",
5707 "params": params,
5708 });
5709 if let Ok(text) = serde_json::to_string(&frame) {
5710 let _ = channel
5711 .write
5712 .lock()
5713 .await
5714 .send(Message::Text(text.into()))
5715 .await;
5716 }
5717}
5718
5719fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5725 use std::io::Write;
5726 let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5727 Some(home) => home.join(".car"),
5728 None => return,
5729 };
5730 if std::fs::create_dir_all(&car_dir).is_err() {
5731 return;
5732 }
5733 let path = car_dir.join("external-agents.jsonl");
5734 let record = serde_json::json!({
5735 "ts": chrono::Utc::now().to_rfc3339(),
5736 "adapter_id": id,
5737 "task": task,
5738 "options": options,
5739 "result": result,
5740 });
5741 let line = match serde_json::to_string(&record) {
5742 Ok(s) => s,
5743 Err(_) => return,
5744 };
5745 if let Ok(mut f) = std::fs::OpenOptions::new()
5746 .create(true)
5747 .append(true)
5748 .open(&path)
5749 {
5750 let _ = writeln!(f, "{}", line);
5751 } else {
5752 tracing::warn!(
5753 path = %path.display(),
5754 "failed to append external-agent audit record"
5755 );
5756 }
5757}
5758
5759async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5765 let force = req
5766 .params
5767 .get("force")
5768 .and_then(Value::as_bool)
5769 .unwrap_or(false);
5770 if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5771 let json = car_ffi_common::external_agents::health_one(id, force).await?;
5772 serde_json::from_str(&json).map_err(|e| e.to_string())
5773 } else {
5774 let json = car_ffi_common::external_agents::health(force).await?;
5775 serde_json::from_str(&json).map_err(|e| e.to_string())
5776 }
5777}
5778
5779const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
5797
5798async fn handle_agents_chat(
5803 req: &JsonRpcMessage,
5804 state: &Arc<ServerState>,
5805 host_session: &Arc<crate::session::ClientSession>,
5806) -> Result<Value, String> {
5807 use futures::SinkExt;
5808 use tokio::sync::oneshot;
5809 use tokio_tungstenite::tungstenite::Message;
5810
5811 let agent_id = req
5812 .params
5813 .get("agent_id")
5814 .and_then(Value::as_str)
5815 .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
5816 .to_string();
5817 let prompt = req
5818 .params
5819 .get("prompt")
5820 .and_then(Value::as_str)
5821 .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
5822 .to_string();
5823 let session_id = req
5824 .params
5825 .get("session_id")
5826 .and_then(Value::as_str)
5827 .map(str::to_string)
5828 .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
5829 let stream = req
5830 .params
5831 .get("stream")
5832 .and_then(Value::as_bool)
5833 .unwrap_or(true);
5834 let voice_input = req
5835 .params
5836 .get("voice_input")
5837 .and_then(Value::as_bool)
5838 .unwrap_or(false);
5839
5840 let agent_client_id = state
5846 .attached_agents
5847 .lock()
5848 .await
5849 .get(&agent_id)
5850 .cloned()
5851 .ok_or_else(|| {
5852 format!(
5853 "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
5854 agent_id
5855 )
5856 })?;
5857 let agent_channel = {
5858 let sessions = state.sessions.lock().await;
5859 sessions
5860 .get(&agent_client_id)
5861 .map(|s| s.channel.clone())
5862 .ok_or_else(|| {
5863 format!(
5864 "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
5865 agent_id, agent_client_id
5866 )
5867 })?
5868 };
5869
5870 {
5876 let created_at = std::time::SystemTime::now()
5877 .duration_since(std::time::UNIX_EPOCH)
5878 .map(|d| d.as_secs())
5879 .unwrap_or(0);
5880 state.chat_sessions.lock().await.insert(
5881 session_id.clone(),
5882 crate::session::ChatSession {
5883 agent_id: agent_id.clone(),
5884 host_client_id: host_session.client_id.clone(),
5885 created_at,
5886 },
5887 );
5888 }
5889
5890 let request_id = agent_channel.next_request_id();
5897 let (tx, rx) = oneshot::channel();
5898 agent_channel
5899 .pending
5900 .lock()
5901 .await
5902 .insert(request_id.clone(), tx);
5903
5904 let rpc_request = serde_json::json!({
5905 "jsonrpc": "2.0",
5906 "method": "agent.chat",
5907 "params": {
5908 "session_id": session_id,
5909 "prompt": prompt,
5910 "stream": stream,
5911 "context": {
5912 "host_client_id": host_session.client_id,
5913 "voice_input": voice_input,
5914 },
5915 },
5916 "id": request_id,
5917 });
5918 let msg = Message::Text(
5919 serde_json::to_string(&rpc_request)
5920 .map_err(|e| e.to_string())?
5921 .into(),
5922 );
5923 if let Err(e) = agent_channel.write.lock().await.send(msg).await {
5924 agent_channel.pending.lock().await.remove(&request_id);
5928 state.chat_sessions.lock().await.remove(&session_id);
5929 return Err(format!(
5930 "failed to deliver agent.chat to `{}`: {}",
5931 agent_id, e
5932 ));
5933 }
5934
5935 let ack = match tokio::time::timeout(
5940 std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
5941 rx,
5942 )
5943 .await
5944 {
5945 Ok(Ok(resp)) => resp,
5946 Ok(Err(_)) => {
5947 state.chat_sessions.lock().await.remove(&session_id);
5949 return Err(format!(
5950 "agent `{}` disconnected before acking agents.chat",
5951 agent_id
5952 ));
5953 }
5954 Err(_) => {
5955 agent_channel.pending.lock().await.remove(&request_id);
5959 state.chat_sessions.lock().await.remove(&session_id);
5960 return Err(format!(
5961 "agent `{}` did not ack agents.chat within {}s",
5962 agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
5963 ));
5964 }
5965 };
5966
5967 if let Some(err) = ack.error {
5968 state.chat_sessions.lock().await.remove(&session_id);
5970 return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
5971 }
5972
5973 Ok(serde_json::json!({
5974 "accepted": true,
5975 "session_id": session_id,
5976 }))
5977}
5978
5979async fn handle_agents_chat_cancel(
5987 req: &JsonRpcMessage,
5988 state: &Arc<ServerState>,
5989) -> Result<Value, String> {
5990 use futures::SinkExt;
5991 use tokio_tungstenite::tungstenite::Message;
5992
5993 let session_id = req
5994 .params
5995 .get("session_id")
5996 .and_then(Value::as_str)
5997 .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
5998 .to_string();
5999
6000 let chat = state.chat_sessions.lock().await.remove(&session_id);
6001 let chat = match chat {
6002 Some(c) => c,
6003 None => {
6004 return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
6006 }
6007 };
6008
6009 let agent_client_id = state
6012 .attached_agents
6013 .lock()
6014 .await
6015 .get(&chat.agent_id)
6016 .cloned();
6017 if let Some(client_id) = agent_client_id {
6018 let channel_opt = {
6019 let sessions = state.sessions.lock().await;
6020 sessions.get(&client_id).map(|s| s.channel.clone())
6021 };
6022 if let Some(channel) = channel_opt {
6023 let notification = serde_json::json!({
6024 "jsonrpc": "2.0",
6025 "method": "agent.chat.cancel",
6026 "params": { "session_id": session_id },
6027 });
6028 if let Ok(text) = serde_json::to_string(¬ification) {
6029 let _ = channel
6030 .write
6031 .lock()
6032 .await
6033 .send(Message::Text(text.into()))
6034 .await;
6035 }
6036 }
6037 }
6038
6039 Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
6040}
6041
6042pub(crate) async fn try_forward_agent_chat_event(
6053 parsed: &JsonRpcMessage,
6054 state: &Arc<ServerState>,
6055) -> bool {
6056 use futures::SinkExt;
6057 use tokio_tungstenite::tungstenite::Message;
6058
6059 let Some(method) = parsed.method.as_deref() else {
6063 return false;
6064 };
6065 if method != "agent.chat.event" {
6066 return false;
6067 }
6068 if !parsed.id.is_null() {
6069 return false;
6072 }
6073 let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
6074 return false;
6075 };
6076 let session_id = session_id.to_string();
6077
6078 let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
6083 let Some(chat) = chat else {
6084 return true; };
6086
6087 let kind = parsed
6096 .params
6097 .get("kind")
6098 .and_then(Value::as_str)
6099 .map(str::to_string)
6100 .unwrap_or_else(|| {
6101 if parsed.params.get("error").is_some() {
6102 "error".to_string()
6103 } else if parsed.params.get("finish_reason").is_some() {
6104 "done".to_string()
6105 } else {
6106 "token".to_string()
6107 }
6108 });
6109
6110 let host_channel = {
6114 let sessions = state.sessions.lock().await;
6115 sessions
6116 .get(&chat.host_client_id)
6117 .map(|s| s.channel.clone())
6118 };
6119 if let Some(channel) = host_channel {
6120 let mut params = parsed.params.clone();
6121 if let Some(obj) = params.as_object_mut() {
6122 obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
6123 obj.entry("kind")
6128 .or_insert_with(|| Value::String(kind.clone()));
6129 }
6130 let forward = serde_json::json!({
6131 "jsonrpc": "2.0",
6132 "method": "agents.chat.event",
6133 "params": params,
6134 });
6135 if let Ok(text) = serde_json::to_string(&forward) {
6136 let send_result = channel
6137 .write
6138 .lock()
6139 .await
6140 .send(Message::Text(text.into()))
6141 .await;
6142 if let Err(e) = send_result {
6143 tracing::warn!(
6144 session_id = %session_id,
6145 agent_id = %chat.agent_id,
6146 host_client_id = %chat.host_client_id,
6147 kind = %kind,
6148 error = %e,
6149 "agent.chat.event forward to host failed at the WS send step"
6150 );
6151 }
6152 }
6153 } else {
6154 tracing::warn!(
6161 session_id = %session_id,
6162 agent_id = %chat.agent_id,
6163 host_client_id = %chat.host_client_id,
6164 kind = %kind,
6165 "agent.chat.event from supervised agent had no host channel \
6166 (host disconnected since `agents.chat`); dropping routing entry"
6167 );
6168 state.chat_sessions.lock().await.remove(&session_id);
6169 return true;
6170 }
6171
6172 if matches!(kind.as_str(), "done" | "error") {
6176 state.chat_sessions.lock().await.remove(&session_id);
6177 }
6178
6179 true
6180}
6181
6182#[cfg(test)]
6183mod fd_leak_regression {
6184 use super::run_dispatch;
6191 use futures::SinkExt;
6192 use std::sync::Arc;
6193 use tokio_tungstenite::tungstenite::{Error as WsError, Message};
6194
6195 #[tokio::test]
6196 async fn abrupt_read_error_still_runs_session_cleanup() {
6197 let tmp = tempfile::TempDir::new().unwrap();
6198 let state = Arc::new(crate::session::ServerState::standalone(
6199 tmp.path().to_path_buf(),
6200 ));
6201
6202 let read = futures::stream::iter(vec![Err::<Message, WsError>(
6206 WsError::ConnectionClosed,
6207 )]);
6208 let write: crate::session::WsSink = Box::pin(
6209 futures::sink::drain().sink_map_err(|_| WsError::ConnectionClosed),
6210 );
6211
6212 let result =
6213 run_dispatch(read, write, "test-peer".to_string(), state.clone()).await;
6214 assert!(
6215 result.is_ok(),
6216 "run_dispatch must return Ok after cleanup, got {result:?}"
6217 );
6218
6219 assert!(
6222 state.sessions.lock().await.is_empty(),
6223 "state.sessions must be empty after an abrupt disconnect (car#209)"
6224 );
6225 }
6226}