1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
/// One incoming JSON-RPC frame, deserialized from a text WebSocket message.
///
/// Every field is `#[serde(default)]`, so a missing key deserializes to the
/// field's default instead of failing. `run_dispatch` treats a frame that has
/// no `method` but carries `result` or `error` as a response, and a frame with
/// a `method` as a request.
#[derive(Debug, Deserialize)]
// NOTE(review): presumably needed because some fields (e.g. `jsonrpc`) are
// never read in this file — confirm before removing.
#[allow(dead_code)]
pub struct JsonRpcMessage {
    /// Protocol version string; empty when the key is absent.
    #[serde(default)]
    pub jsonrpc: String,
    /// Method name for requests; `None` for responses.
    #[serde(default)]
    pub method: Option<String>,
    /// Method parameters; `Value::Null` when the key is absent.
    #[serde(default)]
    pub params: Value,
    /// Request/response correlation id; `Value::Null` when the key is absent.
    #[serde(default)]
    pub id: Value,
    /// Success payload of a response frame.
    #[serde(default)]
    pub result: Option<Value>,
    /// Error payload of a response frame.
    #[serde(default)]
    pub error: Option<Value>,
}
44
/// Outgoing JSON-RPC reply.
///
/// `result` and `error` are omitted from the serialized JSON when they are
/// `None`; the constructors on this type set exactly one of them.
#[derive(Debug, Serialize)]
pub struct JsonRpcResponse {
    /// Protocol version; the constructors always set `"2.0"`.
    pub jsonrpc: &'static str,
    /// Success payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Failure payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
    /// Id of the request this reply answers.
    pub id: Value,
}
54
/// Error object embedded in a failed [`JsonRpcResponse`].
#[derive(Debug, Serialize)]
pub struct JsonRpcError {
    /// Numeric error code (this file uses -32700, -32603, -32001 and -32003).
    pub code: i32,
    /// Human-readable description of the failure.
    pub message: String,
}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
88 name = "ws.connection",
89 skip_all,
90 fields(peer = %peer),
91)]
92pub async fn handle_connection(
93 stream: TcpStream,
94 peer: SocketAddr,
95 state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let ws_stream = accept_async(stream).await?;
98 let (write, read) = ws_stream.split();
99 run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102#[cfg(unix)]
113#[instrument(
114 name = "ws.connection",
115 skip_all,
116 fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119 stream: tokio::net::UnixStream,
120 peer: String,
121 state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123 let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124 let (write, read) = ws_stream.split();
125 run_dispatch(read, Box::pin(write), peer, state).await
126}
127
/// Drives one client connection from its first frame to teardown.
///
/// Creates a session under a freshly generated client id, then reads frames
/// from `read` until the stream ends, a read error occurs, a close frame
/// arrives, or the auth gate rejects a frame. Text frames are parsed as
/// JSON-RPC; binary frames are parsed as voice frames. Each accepted method
/// call runs in a task owned by a per-connection `JoinSet`, so the read loop
/// keeps reading while handlers are still running. On exit the remaining tasks
/// are aborted and the session's registrations are removed.
///
/// Always returns `Ok(())`: failures inside the loop are logged or reported to
/// the client rather than propagated.
#[instrument(
    name = "ws.dispatch",
    skip_all,
    fields(client_id = tracing::field::Empty, peer = %peer),
)]
pub async fn run_dispatch<R>(
    mut read: R,
    write: crate::session::WsSink,
    peer: String,
    state: Arc<ServerState>,
) -> Result<(), Box<dyn std::error::Error>>
where
    R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
        + Unpin
        + Send,
{
    // The client id is the first 12 characters of a fresh v4 UUID in `simple` form.
    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
    // Fill in the `client_id` span field that `#[instrument]` declared as empty.
    tracing::Span::current().record("client_id", &client_id.as_str());

    info!("New connection from {}", peer);

    let channel = Arc::new(WsChannel {
        write: Mutex::new(write),
        pending: Mutex::new(HashMap::new()),
        next_id: AtomicU64::new(1),
    });

    let session = state.create_session(&client_id, channel.clone()).await;

    // Owns every task spawned for this connection so they can be aborted together on exit.
    let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();

    while let Some(msg) = read.next().await {
        // Reap tasks that have already finished so the set does not keep accumulating them.
        while conn_tasks.try_join_next().is_some() {}

        let msg = match msg {
            Ok(m) => m,
            Err(e) => {
                info!("read error from {}: {}; closing", client_id, e);
                break;
            }
        };
        if msg.is_text() {
            let text = match msg.to_text() {
                Ok(t) => t,
                Err(e) => {
                    info!("non-text frame from {}: {}; closing", client_id, e);
                    break;
                }
            };
            // Unparseable JSON is answered with -32700 and a null id; the connection stays open.
            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
                Ok(m) => m,
                Err(e) => {
                    send_response(
                        &session.channel,
                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
                    )
                    .await
                    .ok();
                    continue;
                }
            };

            // A frame without a method that carries `result` or `error` is a reply.
            // If its string id matches an entry in `pending`, the entry is removed and
            // the reply is delivered through it as a `ToolExecuteResponse`. Replies
            // with no matching entry fall through to the checks below.
            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
                if let Some(id_str) = parsed.id.as_str() {
                    let mut pending = session.channel.pending.lock().await;
                    if let Some(tx) = pending.remove(id_str) {
                        let tool_resp = if let Some(result) = parsed.result {
                            ToolExecuteResponse {
                                action_id: id_str.to_string(),
                                output: Some(result),
                                error: None,
                            }
                        } else {
                            // Use the error object's `message` string, or a fixed fallback.
                            let err_msg = parsed
                                .error
                                .as_ref()
                                .and_then(|e| e.get("message"))
                                .and_then(|m| m.as_str())
                                .unwrap_or("unknown error")
                                .to_string();
                            ToolExecuteResponse {
                                action_id: id_str.to_string(),
                                output: None,
                                error: Some(err_msg),
                            }
                        };
                        // A failed send is ignored.
                        let _ = tx.send(tool_resp);
                        continue;
                    }
                }
            }

            // The helper returns true when it has consumed the frame.
            // NOTE(review): this runs before the auth gate below — confirm the helper
            // is safe to call for frames from an unauthenticated session.
            if try_forward_agent_chat_event(&parsed, &state).await {
                continue;
            }

            if let Some(method) = &parsed.method {
                info!(method = %method, "dispatching JSON-RPC method");

                // Auth gate: when a token is configured and this session has not
                // authenticated, only `session.auth` is accepted. Anything else is
                // answered with -32001 and the connection is closed.
                if state.auth_token.get().is_some()
                    && !session
                        .authenticated
                        .load(std::sync::atomic::Ordering::Acquire)
                    && method != "session.auth"
                {
                    let resp = JsonRpcResponse::error(
                        parsed.id.clone(),
                        -32001,
                        "auth required: send `session.auth` with the per-launch token \
                         from ~/Library/Application Support/ai.parslee.car/auth-token \
                         (macOS), $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux), \
                         or %LOCALAPPDATA%\\ai.parslee.car\\auth-token (Windows) \
                         as the first frame on this connection",
                    );
                    let _ = send_response(&session.channel, resp).await;
                    info!(client = %client_id, method = %method,
                        "rejecting non-auth method on unauthenticated session; closing");
                    break;
                }

                // Approval gate: methods the gate flags must be approved before they
                // are dispatched. A refusal is answered with -32003 and the
                // connection stays open.
                if state.approval_gate.requires_approval(method.as_str()) {
                    match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
                        Ok(()) => {}
                        Err(reason) => {
                            let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
                            let _ = send_response(&session.channel, resp).await;
                            info!(
                                client = %client_id,
                                method = %method,
                                reason = %reason,
                                "approval gate blocked dispatch"
                            );
                            continue;
                        }
                    }
                }

                // Move owned copies into the task so the handler runs independently
                // of the read loop.
                let session_task = session.clone();
                let state_task = state.clone();
                let method_owned = method.clone();
                let parsed_task = parsed;
                conn_tasks.spawn(async move {
                    let session = session_task;
                    let state = state_task;
                    let parsed = parsed_task;
                    // Method table: every arm yields `Result<Value, String>`.
                    let result = match method_owned.as_str() {
                        "session.auth" => handle_session_auth(&parsed, &session, &state).await,
                        "parslee.auth" => handle_parslee_auth().await,
                        "auth.start" => handle_auth_start(&parsed).await,
                        "auth.complete" => handle_auth_complete(&parsed).await,
                        "auth.status" => handle_auth_status().await,
                        "auth.logout" => handle_auth_logout().await,
                        "session.init" => handle_session_init(&parsed, &session).await,
                        "host.subscribe" => handle_host_subscribe(&session, &state).await,
                        "host.agents" => handle_host_agents(&session).await,
                        "host.events" => handle_host_events(&parsed, &session).await,
                        "host.approvals" => handle_host_approvals(&session).await,
                        "host.register_agent" => {
                            handle_host_register_agent(&parsed, &session).await
                        }
                        "host.unregister_agent" => {
                            handle_host_unregister_agent(&parsed, &session).await
                        }
                        "host.set_status" => handle_host_set_status(&parsed, &session).await,
                        "host.notify" => handle_host_notify(&parsed, &session).await,
                        "host.request_approval" => {
                            handle_host_request_approval(&parsed, &session).await
                        }
                        "host.resolve_approval" => {
                            handle_host_resolve_approval(&parsed, &session).await
                        }
                        "tools.register" => handle_tools_register(&parsed, &session).await,
                        "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
                        "policy.register" => handle_policy_register(&parsed, &session).await,
                        "session.policy.open" => handle_session_policy_open(&session).await,
                        "session.policy.close" => {
                            handle_session_policy_close(&parsed, &session).await
                        }
                        "verify" => handle_verify(&parsed, &session).await,
                        "state.get" => handle_state_get(&parsed, &session).await,
                        "state.set" => handle_state_set(&parsed, &session).await,
                        "state.exists" => handle_state_exists(&parsed, &session).await,
                        "state.keys" => handle_state_keys(&parsed, &session).await,
                        "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
                        "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
                        "memory.query" => handle_memory_query(&parsed, &session).await,
                        "memory.build_context" => {
                            handle_memory_build_context(&parsed, &session).await
                        }
                        "memory.build_context_fast" => {
                            handle_memory_build_context_fast(&parsed, &session).await
                        }
                        "memory.consolidate" => handle_memory_consolidate(&session).await,
                        "memory.fact_count" => handle_memory_fact_count(&session).await,
                        "memory.persist" => handle_memory_persist(&parsed, &session).await,
                        "memory.load" => handle_memory_load(&parsed, &session).await,
                        "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
                        "skill.find" => handle_skill_find(&parsed, &session).await,
                        "skill.report" => handle_skill_report(&parsed, &session).await,
                        "skill.repair" => handle_skill_repair(&parsed, &session).await,
                        "skills.ingest_distilled" => {
                            handle_skills_ingest_distilled(&parsed, &session).await
                        }
                        "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
                        "skills.domains_needing_evolution" => {
                            handle_skills_domains_needing_evolution(&parsed, &session).await
                        }
                        "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
                        "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
                        "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
                        "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
                        "multi.vote" => handle_multi_vote(&parsed, &session).await,
                        "scheduler.create" => handle_scheduler_create(&parsed),
                        "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
                        "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
                        "infer" => handle_infer(&parsed, &state, &session).await,
                        "image.generate" => handle_image_generate(&parsed, &state).await,
                        "video.generate" => handle_video_generate(&parsed, &state).await,
                        "embed" => handle_embed(&parsed, &state).await,
                        "classify" => handle_classify(&parsed, &state).await,
                        "tokenize" => handle_tokenize(&parsed, &state).await,
                        "detokenize" => handle_detokenize(&parsed, &state).await,
                        "rerank" => handle_rerank(&parsed, &state).await,
                        "transcribe" => handle_transcribe(&parsed, &state).await,
                        "synthesize" => handle_synthesize(&parsed, &state).await,
                        "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
                        "speech.prepare" => handle_speech_prepare(&state).await,
                        "models.route" => handle_models_route(&parsed, &state).await,
                        "models.stats" => handle_models_stats(&state).await,
                        "outcomes.resolve_pending" => {
                            handle_outcomes_resolve_pending(&parsed, &state).await
                        }
                        "events.count" => handle_events_count(&session).await,
                        "events.stats" => handle_events_stats(&session).await,
                        "events.truncate" => handle_events_truncate(&parsed, &session).await,
                        "events.clear" => handle_events_clear(&session).await,
                        "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
                        "models.list" => handle_models_list(&state),
                        "models.register" => handle_models_register(&parsed, &state).await,
                        "models.unregister" => handle_models_unregister(&parsed, &state).await,
                        "models.list_unified" => handle_models_list_unified(&state),
                        "models.search" => handle_models_search(&parsed, &state),
                        "models.upgrades" => handle_models_upgrades(&state),
                        "models.pull" => handle_models_pull(&parsed, &state).await,
                        // `models.install` is served by the same handler as `models.pull`.
                        "models.install" => handle_models_pull(&parsed, &state).await,
                        "skills.distill" => handle_skills_distill(&parsed, &state).await,
                        "skills.list" => handle_skills_list(&parsed, &session).await,
                        "browser.run" => handle_browser_run(&parsed, &session).await,
                        "browser.close" => handle_browser_close(&session).await,
                        "secret.put" => handle_secret_put(&parsed),
                        "secret.get" => handle_secret_get(&parsed),
                        "secret.delete" => handle_secret_delete(&parsed),
                        "secret.status" => handle_secret_status(&parsed),
                        "secret.available" => Ok(car_ffi_common::secrets::is_available()),
                        "permissions.status" => handle_perm_status(&parsed),
                        "permissions.request" => handle_perm_request(&parsed),
                        "permissions.explain" => handle_perm_explain(&parsed),
                        "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
                        "accounts.list" => car_ffi_common::accounts::list(),
                        "accounts.open" => {
                            // Params that fail to deserialize fall back to the default
                            // (no `account_id`).
                            #[derive(serde::Deserialize, Default)]
                            struct OpenParams {
                                #[serde(default)]
                                account_id: Option<String>,
                            }
                            let p: OpenParams =
                                serde_json::from_value(parsed.params.clone()).unwrap_or_default();
                            car_ffi_common::accounts::open_settings(p.account_id.as_deref())
                        }
                        "calendar.list" => car_ffi_common::integrations::calendar_list(),
                        "calendar.events" => handle_calendar_events(&parsed),
                        "contacts.containers" => {
                            car_ffi_common::integrations::contacts_containers()
                        }
                        "contacts.find" => handle_contacts_find(&parsed),
                        "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
                        "mail.inbox" => handle_mail_inbox(&parsed),
                        "mail.send" => handle_mail_send(&parsed),
                        "messages.services" => car_ffi_common::integrations::messages_services(),
                        "messages.chats" => handle_messages_chats(&parsed),
                        "messages.send" => handle_messages_send(&parsed),
                        "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
                        "notes.find" => handle_notes_find(&parsed),
                        "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
                        "reminders.items" => handle_reminders_items(&parsed),
                        "photos.albums" => car_ffi_common::integrations::photos_albums(),
                        "bookmarks.list" => handle_bookmarks_list(&parsed),
                        "files.locations" => car_ffi_common::integrations::files_locations(),
                        "keychain.status" => car_ffi_common::integrations::keychain_status(),
                        "health.status" => car_ffi_common::health::status(),
                        "health.sleep" => handle_health_sleep(&parsed),
                        "health.workouts" => handle_health_workouts(&parsed),
                        "health.activity" => handle_health_activity(&parsed),
                        "voice.transcribe_stream.start" => {
                            handle_voice_transcribe_stream_start(&parsed, &state, &session).await
                        }
                        "voice.transcribe_stream.stop" => {
                            handle_voice_transcribe_stream_stop(&parsed, &state).await
                        }
                        "voice.transcribe_stream.push" => {
                            handle_voice_transcribe_stream_push(&parsed, &state).await
                        }
                        "voice.tts_stream.start" => {
                            handle_voice_tts_stream_start(&parsed, &session).await
                        }
                        "voice.tts_stream.cancel" => handle_voice_tts_stream_cancel(&parsed).await,
                        "voice.tts_stream.list" => Ok(handle_voice_tts_stream_list()),
                        "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
                        "voice.dispatch_turn" => {
                            handle_voice_dispatch_turn(&parsed, &state, &session).await
                        }
                        "voice.cancel_turn" => handle_voice_cancel_turn().await,
                        "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
                        "inference.register_runner" => {
                            handle_inference_register_runner(&session).await
                        }
                        "inference.runner.event" => handle_inference_runner_event(&parsed).await,
                        "inference.runner.complete" => {
                            handle_inference_runner_complete(&parsed).await
                        }
                        "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
                        // The voice helpers below return JSON text, which is re-parsed
                        // into a `Value` for the reply.
                        "voice.providers.list" => {
                            serde_json::from_str::<serde_json::Value>(
                                &car_voice::list_voice_providers_json(),
                            )
                            .map_err(|e| e.to_string())
                        }
                        "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
                            .await
                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
                        "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
                            .await
                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
                        "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
                        "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
                        "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
                        "workflow.run" => handle_workflow_run(&parsed, &session).await,
                        "workflow.verify" => handle_workflow_verify(&parsed),
                        "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
                        "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
                        "meeting.list" => handle_meeting_list(&parsed),
                        "meeting.get" => handle_meeting_get(&parsed),
                        "registry.register" => handle_registry_register(&parsed),
                        "registry.heartbeat" => handle_registry_heartbeat(&parsed),
                        "registry.unregister" => handle_registry_unregister(&parsed),
                        "registry.list" => handle_registry_list(&parsed),
                        "registry.reap" => handle_registry_reap(&parsed),
                        "admission.status" => handle_admission_status(&state),
                        "a2a.start" => handle_a2a_start(&parsed, &session).await,
                        "a2a.stop" => handle_a2a_stop(),
                        "a2a.status" => handle_a2a_status(),
                        "a2a.send" => handle_a2a_send(&parsed, &state).await,
                        "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
                        "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
                        "a2ui.capabilities" => handle_a2ui_capabilities(&state),
                        "a2ui.reap" => handle_a2ui_reap(&state).await,
                        "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
                        "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
                        "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
                        "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
                        "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
                        "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
                        "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
                        "automation.run_applescript" => handle_run_applescript(&parsed).await,
                        "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
                        "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
                        "notifications.local" => handle_local_notification(&parsed).await,
                        "vision.ocr" => handle_vision_ocr(&parsed).await,
                        "agents.list" => handle_agents_list(&state).await,
                        "agents.health" => handle_agents_health(&state).await,
                        "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
                        "agents.install" => handle_agents_install(&parsed, &state).await,
                        "agents.remove" => handle_agents_remove(&parsed, &state).await,
                        "agents.start" => handle_agents_start(&parsed, &state).await,
                        "agents.stop" => handle_agents_stop(&parsed, &state).await,
                        "agents.restart" => handle_agents_restart(&parsed, &state).await,
                        "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
                        "agents.list_external" => handle_agents_list_external(&parsed).await,
                        "agents.detect_external" => handle_agents_detect_external(&parsed).await,
                        "agents.health_external" => handle_agents_health_external(&parsed).await,
                        "agents.invoke_external" => {
                            handle_agents_invoke_external(&parsed, &state, &session).await
                        }
                        "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
                        "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
                        // All of these names go to one handler, which receives the
                        // method name so it can tell them apart.
                        "message/send"
                        | "SendMessage"
                        | "message/stream"
                        | "SendStreamingMessage"
                        | "tasks/get"
                        | "GetTask"
                        | "tasks/list"
                        | "ListTasks"
                        | "tasks/cancel"
                        | "CancelTask"
                        | "tasks/resubscribe"
                        | "SubscribeToTask"
                        | "tasks/pushNotificationConfig/set"
                        | "CreateTaskPushNotificationConfig"
                        | "tasks/pushNotificationConfig/get"
                        | "GetTaskPushNotificationConfig"
                        | "tasks/pushNotificationConfig/list"
                        | "ListTaskPushNotificationConfigs"
                        | "tasks/pushNotificationConfig/delete"
                        | "DeleteTaskPushNotificationConfig"
                        | "agent/getAuthenticatedExtendedCard"
                        | "GetExtendedAgentCard" => {
                            handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
                        }
                        _ => Err(format!("unknown method: {}", method_owned)),
                    };

                    // Every handler error, including an unknown method, is reported
                    // with code -32603.
                    let resp = match result {
                        Ok(value) => JsonRpcResponse::success(parsed.id, value),
                        Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
                    };
                    let _ = send_response(&session.channel, resp).await;
                });
            }
        } else if msg.is_binary() {
            let bytes = msg.into_data();
            // A frame that fails to parse is logged and dropped; the connection stays open.
            let parsed = match car_ffi_common::voice::binary::parse_frame(&bytes) {
                Ok(p) => p,
                Err(e) => {
                    tracing::warn!("binary frame from {} rejected: {}", client_id, e);
                    continue;
                }
            };
            match parsed.frame_type {
                car_ffi_common::voice::binary::FRAME_TYPE_INBOUND_PCM => {
                    // Copy the payload and session id so the push can run in its own task.
                    let registry = state.voice_sessions.clone();
                    let payload_owned = parsed.payload.to_vec();
                    let session_id_owned = parsed.session_id_hex.clone();
                    conn_tasks.spawn(async move {
                        if let Err(e) = car_ffi_common::voice::transcribe_stream_push(
                            &session_id_owned,
                            &payload_owned,
                            registry,
                        )
                        .await
                        {
                            tracing::warn!(
                                "binary PCM push to session {} failed: {}",
                                session_id_owned,
                                e
                            );
                        }
                    });
                }
                // Any other frame type is only logged.
                other => {
                    tracing::debug!(
                        "binary frame type {:#04x} from {} not accepted server-side",
                        other,
                        client_id
                    );
                }
            }
        } else if msg.is_close() {
            info!("Client {} disconnected", client_id);
            break;
        }
    }

    // Teardown: abort whatever is still running for this connection.
    conn_tasks.abort_all();

    // Remove this client's host subscription, approvals and A2UI subscription.
    session.host.unsubscribe(&client_id).await;
    session.host.reap_session_approvals(&client_id).await;
    state.a2ui_subscribers.lock().await.remove(&client_id);

    let _removed = state.remove_session(&client_id).await;
    // Clearing the map drops any senders that are still waiting for a reply.
    {
        let mut pending = session.channel.pending.lock().await;
        pending.clear();
    }

    Ok(())
}
711
712async fn send_response(
713 channel: &WsChannel,
714 resp: JsonRpcResponse,
715) -> Result<(), Box<dyn std::error::Error>> {
716 use futures::SinkExt;
717 let json = serde_json::to_string(&resp)?;
718 channel
719 .write
720 .lock()
721 .await
722 .send(Message::Text(json.into()))
723 .await?;
724 Ok(())
725}
726
727async fn handle_host_subscribe(
730 session: &crate::session::ClientSession,
731 state: &Arc<ServerState>,
732) -> Result<Value, String> {
733 session
734 .host
735 .subscribe(&session.client_id, session.channel.clone())
736 .await;
737 serde_json::to_value(HostSnapshot {
738 subscribed: true,
739 agents: session.host.agents().await,
740 approvals: session.host.approvals().await,
741 events: session.host.events(50).await,
742 identity: Some(daemon_identity(state)),
743 })
744 .map_err(|e| e.to_string())
745}
746
747fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
755 let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
762 (
763 Some(p.to_string_lossy().into_owned()),
764 car_proto::HostManifestRole::Observer,
765 )
766 } else if let Some(s) = state.supervisor_if_installed() {
767 (
768 Some(s.manifest_path().to_string_lossy().into_owned()),
769 car_proto::HostManifestRole::Owner,
770 )
771 } else {
772 (None, car_proto::HostManifestRole::None)
773 };
774 car_proto::HostIdentity {
775 version: env!("CARGO_PKG_VERSION").to_string(),
776 pid: std::process::id(),
777 manifest_path,
778 manifest_role,
779 parslee: state
780 .parslee_session
781 .get()
782 .map(|session| session.identity.clone()),
783 }
784}
785
786async fn handle_parslee_auth() -> Result<Value, String> {
797 let session = crate::parslee_auth::load_or_refresh()
798 .await?
799 .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
800 Ok(serde_json::json!({
801 "authenticated": true,
802 "token_type": "Bearer",
803 "access_token": session.access_token,
804 "authorization_header": format!("Bearer {}", session.access_token),
805 "identity": session.identity,
806 }))
807}
808
809async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
815 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
816 let client_id = req
817 .params
818 .get("client_id")
819 .and_then(|v| v.as_str())
820 .unwrap_or("parslee-car");
821 let redirect_uri = req
822 .params
823 .get("redirect_uri")
824 .and_then(|v| v.as_str())
825 .ok_or_else(|| "redirect_uri is required".to_string())?;
826 let provider = req.params.get("provider").and_then(|v| v.as_str());
827 let state = car_auth::new_state();
828 let verifier = car_auth::pkce_verifier();
829 let challenge = car_auth::pkce_challenge(&verifier);
830 let url =
831 car_auth::authorize_url(&api_base, client_id, redirect_uri, &state, &challenge, provider)?;
832 Ok(serde_json::json!({
833 "authorize_url": url,
834 "state": state,
835 "verifier": verifier,
836 }))
837}
838
839async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
840 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
841 let client_id = req
842 .params
843 .get("client_id")
844 .and_then(|v| v.as_str())
845 .unwrap_or("parslee-car");
846 let redirect_uri = req
847 .params
848 .get("redirect_uri")
849 .and_then(|v| v.as_str())
850 .ok_or_else(|| "redirect_uri is required".to_string())?;
851 let code = req
852 .params
853 .get("code")
854 .and_then(|v| v.as_str())
855 .ok_or_else(|| "code is required".to_string())?;
856 let verifier = req
857 .params
858 .get("verifier")
859 .and_then(|v| v.as_str())
860 .ok_or_else(|| "verifier is required".to_string())?;
861 let token =
862 car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
863 car_auth::store_tokens(&api_base, &token)?;
864 Ok(serde_json::json!({ "ok": true }))
865}
866
867async fn handle_auth_status() -> Result<Value, String> {
868 match car_auth::fetch_status(None).await? {
869 Some(session_json) => {
870 let session: Value = serde_json::from_str(&session_json).unwrap_or(Value::Null);
871 Ok(serde_json::json!({ "authenticated": true, "session": session }))
872 }
873 None => Ok(serde_json::json!({ "authenticated": false })),
874 }
875}
876
877async fn handle_auth_logout() -> Result<Value, String> {
878 car_auth::clear_tokens()?;
879 Ok(serde_json::json!({ "ok": true }))
880}
881
882async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
883 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
884}
885
886async fn handle_host_events(
887 req: &JsonRpcMessage,
888 session: &crate::session::ClientSession,
889) -> Result<Value, String> {
890 let limit = req
891 .params
892 .get("limit")
893 .and_then(|v| v.as_u64())
894 .unwrap_or(100) as usize;
895 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
896}
897
898async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
899 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
900}
901
902async fn handle_a2ui_apply(
903 req: &JsonRpcMessage,
904 state: &Arc<ServerState>,
905) -> Result<Value, String> {
906 #[derive(Deserialize)]
907 struct Params {
908 #[serde(default)]
909 envelope: Option<car_a2ui::A2uiEnvelope>,
910 #[serde(default)]
911 message: Option<car_a2ui::A2uiEnvelope>,
912 }
913
914 let envelope = if req.params.get("createSurface").is_some()
915 || req.params.get("updateComponents").is_some()
916 || req.params.get("updateDataModel").is_some()
917 || req.params.get("deleteSurface").is_some()
918 {
919 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
920 .map_err(|e| e.to_string())?
921 } else {
922 match serde_json::from_value::<Params>(req.params.clone()) {
923 Ok(params) => params
924 .envelope
925 .or(params.message)
926 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
927 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
928 .map_err(|e| e.to_string())?,
929 }
930 };
931
932 apply_a2ui_envelope(state, envelope, None, None).await
933}
934
935async fn handle_a2ui_ingest(
936 req: &JsonRpcMessage,
937 state: &Arc<ServerState>,
938) -> Result<Value, String> {
939 #[derive(Deserialize)]
940 #[serde(rename_all = "camelCase")]
941 struct Params {
942 #[serde(default)]
943 endpoint: Option<String>,
944 #[serde(default)]
945 a2a_endpoint: Option<String>,
946 #[serde(default)]
947 owner: Option<car_a2ui::A2uiSurfaceOwner>,
948 #[serde(default)]
949 route_auth: Option<A2aRouteAuth>,
950 #[serde(default)]
951 allow_untrusted_endpoint: bool,
952 }
953
954 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
955 endpoint: None,
956 a2a_endpoint: None,
957 owner: None,
958 route_auth: None,
959 allow_untrusted_endpoint: false,
960 });
961 let payload = req.params.get("payload").unwrap_or(&req.params);
962 state
963 .a2ui
964 .validate_payload(payload)
965 .map_err(|e| e.to_string())?;
966 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
967 if envelopes.is_empty() {
968 return Err("no A2UI envelopes found in payload".into());
969 }
970 let endpoint = params.endpoint.or(params.a2a_endpoint);
971 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
972 let owner = params
973 .owner
974 .or_else(|| car_a2ui::owner_from_value(payload))
975 .map(|owner| match endpoint.clone() {
976 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
977 None => owner,
978 });
979
980 let mut results = Vec::new();
981 for envelope in envelopes {
982 let value =
983 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
984 results.push(value);
985 }
986 Ok(serde_json::json!({ "applied": results }))
987}
988
989async fn apply_a2ui_envelope(
990 state: &Arc<ServerState>,
991 envelope: car_a2ui::A2uiEnvelope,
992 owner: Option<car_a2ui::A2uiSurfaceOwner>,
993 route_auth: Option<A2aRouteAuth>,
994) -> Result<Value, String> {
995 let result = state
996 .a2ui
997 .apply_with_owner(envelope, owner)
998 .await
999 .map_err(|e| e.to_string())?;
1000 update_a2ui_route_auth(state, &result, route_auth).await;
1001 let kind = if result.deleted {
1002 "a2ui.surface_deleted"
1003 } else {
1004 "a2ui.surface_updated"
1005 };
1006 let message = if result.deleted {
1007 format!("A2UI surface {} deleted", result.surface_id)
1008 } else {
1009 format!("A2UI surface {} updated", result.surface_id)
1010 };
1011 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
1012 state
1013 .host
1014 .record_event(kind, None, message, payload.clone())
1015 .await;
1016 broadcast_a2ui_event(state, kind, &payload).await;
1020 serde_json::to_value(result).map_err(|e| e.to_string())
1021}
1022
1023async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
1024 use futures::SinkExt;
1025 use tokio_tungstenite::tungstenite::Message;
1026 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
1027 .a2ui_subscribers
1028 .lock()
1029 .await
1030 .values()
1031 .cloned()
1032 .collect();
1033 if subscribers.is_empty() {
1034 return;
1035 }
1036 let Ok(json) = serde_json::to_string(&serde_json::json!({
1037 "jsonrpc": "2.0",
1038 "method": "a2ui.event",
1039 "params": {
1040 "kind": kind,
1041 "result": result,
1042 },
1043 })) else {
1044 return;
1045 };
1046 for channel in subscribers {
1047 let _ = channel
1048 .write
1049 .lock()
1050 .await
1051 .send(Message::Text(json.clone().into()))
1052 .await;
1053 }
1054}
1055
1056async fn update_a2ui_route_auth(
1057 state: &Arc<ServerState>,
1058 result: &car_a2ui::A2uiApplyResult,
1059 route_auth: Option<A2aRouteAuth>,
1060) {
1061 let mut auth = state.a2ui_route_auth.lock().await;
1062 if result.deleted {
1063 auth.remove(&result.surface_id);
1064 return;
1065 }
1066
1067 let has_route_endpoint = result
1068 .surface
1069 .as_ref()
1070 .and_then(|surface| surface.owner.as_ref())
1071 .and_then(|owner| owner.endpoint.as_ref())
1072 .is_some();
1073 match (has_route_endpoint, route_auth) {
1074 (true, Some(route_auth)) => {
1075 auth.insert(result.surface_id.clone(), route_auth);
1076 }
1077 _ => {
1078 auth.remove(&result.surface_id);
1079 }
1080 }
1081}
1082
1083fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
1084 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
1085}
1086
1087async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
1088 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
1089 if !removed.is_empty() {
1090 let mut auth = state.a2ui_route_auth.lock().await;
1091 for surface_id in &removed {
1092 auth.remove(surface_id);
1093 }
1094 }
1095 Ok(serde_json::json!({ "removed": removed }))
1096}
1097
1098async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
1099 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
1100}
1101
1102async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
1103 let surface_id = req
1104 .params
1105 .get("surface_id")
1106 .or_else(|| req.params.get("surfaceId"))
1107 .and_then(Value::as_str)
1108 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
1109 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1110}
1111
1112async fn handle_a2ui_subscribe(
1118 session: &crate::session::ClientSession,
1119 state: &Arc<ServerState>,
1120) -> Result<Value, String> {
1121 state
1122 .a2ui_subscribers
1123 .lock()
1124 .await
1125 .insert(session.client_id.clone(), session.channel.clone());
1126 Ok(serde_json::json!({ "subscribed": true }))
1127}
1128
1129async fn handle_a2ui_unsubscribe(
1133 session: &crate::session::ClientSession,
1134 state: &Arc<ServerState>,
1135) -> Result<Value, String> {
1136 state
1137 .a2ui_subscribers
1138 .lock()
1139 .await
1140 .remove(&session.client_id);
1141 Ok(serde_json::json!({ "subscribed": false }))
1142}
1143
1144async fn handle_a2ui_replay(
1151 req: &JsonRpcMessage,
1152 state: &Arc<ServerState>,
1153) -> Result<Value, String> {
1154 let surface_id = req
1155 .params
1156 .get("surface_id")
1157 .or_else(|| req.params.get("surfaceId"))
1158 .and_then(Value::as_str)
1159 .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
1160 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1161}
1162
1163async fn handle_a2ui_action(
1164 req: &JsonRpcMessage,
1165 state: &Arc<ServerState>,
1166) -> Result<Value, String> {
1167 let action: car_a2ui::ClientAction =
1168 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1169 let owner = state.a2ui.owner(&action.surface_id).await;
1170 let route = route_a2ui_action(state, &action, owner.clone()).await;
1171 let payload = serde_json::json!({
1172 "action": action,
1173 "owner": owner,
1174 "route": route,
1175 });
1176 let event = state
1177 .host
1178 .record_event(
1179 "a2ui.action",
1180 None,
1181 format!(
1182 "A2UI action {} from {}",
1183 action.name, action.source_component_id
1184 ),
1185 payload,
1186 )
1187 .await;
1188 Ok(serde_json::json!({
1189 "event": event,
1190 "route": route,
1191 }))
1192}
1193
/// Records a renderer's report for a surface and lets the UI agent react.
///
/// The report is always logged as an `a2ui.render_report` host event and
/// broadcast to subscribers. If the surface is still in the store and the
/// per-surface iteration budget allows it, `state.ui_agent` is asked for a
/// decision; a `Patch` decision is applied through `apply_a2ui_envelope`.
///
/// The budget unit taken by `try_consume` is refunded on every path that does
/// not end with a successfully applied patch (oscillation, apply failure,
/// `StableNoChange`, `HardStop`).
///
/// Always responds with `{ "event": <recorded event> }`; only a malformed
/// report or a serialisation failure yields `Err`.
async fn handle_a2ui_render_report(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let report: car_a2ui::RenderReport =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
    let kind = "a2ui.render_report";
    let message = format!("A2UI render report for surface {}", report.surface_id);
    let event = state
        .host
        .record_event(kind, None, message, payload.clone())
        .await;
    broadcast_a2ui_event(state, kind, &payload).await;

    if let Some(surface) = state.a2ui.get(&report.surface_id).await {
        // Take one unit of budget before invoking the agent; bail out with
        // just the event when none is left.
        if !state.ui_agent_budget.try_consume(&report.surface_id) {
            tracing::warn!(
                surface_id = %report.surface_id,
                count = state.ui_agent_budget.count(&report.surface_id),
                max = state.ui_agent_budget.max(),
                "ui-agent iteration budget exhausted; skipping agent invocation"
            );
            return Ok(serde_json::json!({ "event": event }));
        }
        match state.ui_agent.on_render_report(&report, &surface) {
            car_ui_agent::Decision::Patch {
                envelope,
                strategy_id,
                patch_hash,
                elapsed_ns,
            } => {
                // A `false` from the oscillation tracker suppresses the patch
                // and hands the budget unit back.
                if !state
                    .ui_agent_oscillation
                    .check_and_record(&report.surface_id, patch_hash)
                {
                    tracing::warn!(
                        surface_id = %report.surface_id,
                        strategy = %strategy_id,
                        patch_hash,
                        "ui-agent oscillation detected; suppressing patch"
                    );
                    state.ui_agent_budget.refund(&report.surface_id);
                    return Ok(serde_json::json!({ "event": event }));
                }
                let a2ui_envelope = car_a2ui::A2uiEnvelope {
                    patch_components: Some(envelope),
                    ..Default::default()
                };
                // NOTE(review): owner and route_auth are both `None` here, and
                // `update_a2ui_route_auth` removes the stored credentials for a
                // surface whenever `route_auth` is `None` — confirm that an
                // agent patch is meant to drop the surface's route auth.
                if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
                    tracing::warn!(
                        surface_id = %report.surface_id,
                        strategy = %strategy_id,
                        patch_hash,
                        elapsed_ns,
                        error = %e,
                        "ui-agent patch apply failed",
                    );
                    // A failed apply does not count against the budget.
                    state.ui_agent_budget.refund(&report.surface_id);
                } else {
                    tracing::debug!(
                        surface_id = %report.surface_id,
                        strategy = %strategy_id,
                        patch_hash,
                        elapsed_ns,
                        iteration = state.ui_agent_budget.count(&report.surface_id),
                        "ui-agent patch applied",
                    );
                    // When a shared memgine is configured, note the applied
                    // strategy in it from a detached task so this handler does
                    // not wait on the engine lock.
                    if let Some(memgine) = state.shared_memgine.clone() {
                        let speaker = format!("ui-agent/{}", report.surface_id);
                        let text = format!("strategy applied: {}", strategy_id);
                        tokio::spawn(async move {
                            let mut guard = memgine.lock().await;
                            guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
                        });
                    }
                }
            }
            car_ui_agent::Decision::StableNoChange => {
                // No patch was produced, so the budget unit is returned.
                state.ui_agent_budget.refund(&report.surface_id);
            }
            car_ui_agent::Decision::HardStop { reason } => {
                state.ui_agent_budget.refund(&report.surface_id);
                tracing::error!(
                    surface_id = %report.surface_id,
                    reason = %reason,
                    "ui-agent hard-stopped improvement loop",
                );
            }
        }
    } else {
        tracing::debug!(
            surface_id = %report.surface_id,
            "ui-agent skipped — surface not found in store",
        );
    }

    Ok(serde_json::json!({ "event": event }))
}
1342
1343async fn route_a2ui_action(
1344 state: &Arc<ServerState>,
1345 action: &car_a2ui::ClientAction,
1346 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1347) -> Value {
1348 let Some(owner) = owner else {
1349 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1350 };
1351 if owner.kind != "a2a" {
1352 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1353 }
1354 let Some(endpoint) = owner.endpoint.clone() else {
1355 return serde_json::json!({
1356 "delivered": false,
1357 "reason": "surface owner has no endpoint",
1358 "owner": owner
1359 });
1360 };
1361
1362 let message = car_a2a::Message {
1363 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1364 role: car_a2a::MessageRole::User,
1365 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1366 data: serde_json::json!({
1367 "a2uiAction": action,
1368 }),
1369 metadata: Default::default(),
1370 })],
1371 task_id: owner.task_id.clone(),
1372 context_id: owner.context_id.clone(),
1373 metadata: Default::default(),
1374 };
1375
1376 let auth = state
1377 .a2ui_route_auth
1378 .lock()
1379 .await
1380 .get(&action.surface_id)
1381 .cloned()
1382 .map(client_auth_from_route_auth)
1383 .unwrap_or(car_a2a::ClientAuth::None);
1384
1385 match car_a2a::A2aClient::new(endpoint.clone())
1386 .with_auth(auth)
1387 .send_message(message, false)
1388 .await
1389 {
1390 Ok(result) => serde_json::json!({
1391 "delivered": true,
1392 "owner": owner,
1393 "endpoint": endpoint,
1394 "result": result,
1395 }),
1396 Err(error) => serde_json::json!({
1397 "delivered": false,
1398 "owner": owner,
1399 "endpoint": endpoint,
1400 "error": error.to_string(),
1401 }),
1402 }
1403}
1404
1405fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1406 match auth {
1407 A2aRouteAuth::None => car_a2a::ClientAuth::None,
1408 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1409 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1410 }
1411}
1412
1413fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1414 let endpoint = endpoint?;
1415 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1416 Some(endpoint)
1417 } else {
1418 None
1419 }
1420}
1421
/// Reports whether `endpoint` is a plain-HTTP URL whose host is a loopback
/// name (`localhost`, `127.0.0.1` or `[::1]`).
///
/// The check looks at the URL authority (everything after `http://` up to the
/// first `/`, `?` or `#`) rather than at raw string prefixes. An authority
/// containing `@` is rejected outright: in `http://localhost:80@evil.example/`
/// the `localhost:80` part is userinfo and the request would go to
/// `evil.example`. A port, when present, must consist of ASCII digits only
/// (an empty port after `:` is still accepted).
///
/// Matching is case-sensitive and the scheme must be exactly `http://`.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };
    let authority_len = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_len];

    // Userinfo (`user:pass@host`) would let a remote host masquerade behind a
    // loopback-looking prefix.
    if authority.contains('@') {
        return false;
    }

    LOOPBACK_HOSTS
        .iter()
        .any(|host| match authority.strip_prefix(*host) {
            Some("") => true,
            Some(tail) => tail
                .strip_prefix(':')
                .map_or(false, |port| port.bytes().all(|b| b.is_ascii_digit())),
            None => false,
        })
}
1433
1434async fn handle_host_register_agent(
1435 req: &JsonRpcMessage,
1436 session: &crate::session::ClientSession,
1437) -> Result<Value, String> {
1438 let request: RegisterHostAgentRequest =
1439 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1440 serde_json::to_value(
1441 session
1442 .host
1443 .register_agent(&session.client_id, request)
1444 .await?,
1445 )
1446 .map_err(|e| e.to_string())
1447}
1448
1449async fn handle_host_unregister_agent(
1450 req: &JsonRpcMessage,
1451 session: &crate::session::ClientSession,
1452) -> Result<Value, String> {
1453 let agent_id = req
1454 .params
1455 .get("agent_id")
1456 .and_then(|v| v.as_str())
1457 .ok_or("missing agent_id")?;
1458 session
1459 .host
1460 .unregister_agent(&session.client_id, agent_id)
1461 .await?;
1462 Ok(serde_json::json!({"ok": true}))
1463}
1464
1465async fn handle_host_set_status(
1466 req: &JsonRpcMessage,
1467 session: &crate::session::ClientSession,
1468) -> Result<Value, String> {
1469 let request: SetHostAgentStatusRequest =
1470 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1471 serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1472 .map_err(|e| e.to_string())
1473}
1474
1475async fn handle_host_notify(
1476 req: &JsonRpcMessage,
1477 session: &crate::session::ClientSession,
1478) -> Result<Value, String> {
1479 let kind = req
1480 .params
1481 .get("kind")
1482 .and_then(|v| v.as_str())
1483 .unwrap_or("host.notification");
1484 let agent_id = req
1485 .params
1486 .get("agent_id")
1487 .and_then(|v| v.as_str())
1488 .map(str::to_string);
1489 let message = req
1490 .params
1491 .get("message")
1492 .and_then(|v| v.as_str())
1493 .unwrap_or("");
1494 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1495 serde_json::to_value(
1496 session
1497 .host
1498 .record_event(kind, agent_id, message, payload)
1499 .await,
1500 )
1501 .map_err(|e| e.to_string())
1502}
1503
1504async fn handle_host_request_approval(
1505 req: &JsonRpcMessage,
1506 session: &crate::session::ClientSession,
1507) -> Result<Value, String> {
1508 let request: CreateHostApprovalRequest =
1509 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1510 if let Some(agent_id) = &request.agent_id {
1511 let _ = session
1516 .host
1517 .set_status(
1518 &session.client_id,
1519 SetHostAgentStatusRequest {
1520 agent_id: agent_id.clone(),
1521 status: HostAgentStatus::WaitingForApproval,
1522 current_task: None,
1523 message: Some("Waiting for approval".to_string()),
1524 payload: Value::Null,
1525 },
1526 )
1527 .await;
1528 }
1529 let owner_client_id = if request.system_level {
1536 None
1537 } else {
1538 Some(session.client_id.as_str())
1539 };
1540 serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1541 .map_err(|e| e.to_string())
1542}
1543
1544async fn handle_host_resolve_approval(
1545 req: &JsonRpcMessage,
1546 session: &crate::session::ClientSession,
1547) -> Result<Value, String> {
1548 let request: ResolveHostApprovalRequest =
1549 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1550 serde_json::to_value(
1551 session
1552 .host
1553 .resolve_approval(&session.client_id, request)
1554 .await?,
1555 )
1556 .map_err(|e| e.to_string())
1557}
1558
/// Handles `session.auth`, marking the session authenticated on success.
///
/// Two modes, selected by the presence of the string param `agent_id`:
///
/// * **Agent mode** (`agent_id` present): the token is checked by the
///   supervisor for that agent, the agent id is bound to this connection in
///   `state.attached_agents`, and the agent's memgine is bound to the session.
/// * **Server-token mode** (no `agent_id`): the token is compared against
///   `state.auth_token`. If no server token is set, the session is accepted
///   and the reply carries `auth_enabled: false`.
///
/// # Errors
/// Fails when `token` is missing or not a string, when the supervisor is
/// unavailable or rejects the token, when the agent is already attached to a
/// different client id, when the agent memgine cannot be loaded, or when the
/// server token does not match.
async fn handle_session_auth(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let supplied = req
        .params
        .get("token")
        .and_then(Value::as_str)
        .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
    let agent_id = req
        .params
        .get("agent_id")
        .and_then(Value::as_str)
        .map(str::to_string);

    if let Some(id) = agent_id {
        let supervisor = state.supervisor()?;
        if !supervisor.validate_agent_token(&id, supplied).await {
            return Err(format!(
                "auth failed: agent_id `{id}` is not supervised, or token mismatch"
            ));
        }
        {
            // Check and insert under one lock so two connections cannot both
            // claim the same agent id. Re-auth from the same client id is
            // allowed.
            let mut attached = state.attached_agents.lock().await;
            if let Some(prior) = attached.get(&id) {
                if prior != &session.client_id {
                    return Err(format!(
                        "auth failed: agent_id `{id}` is already attached on \
                         another connection (client_id={prior})"
                    ));
                }
            }
            attached.insert(id.clone(), session.client_id.clone());
        }
        // NOTE(review): if this load fails, the `attached_agents` entry made
        // above stays in place while the session remains unauthenticated —
        // confirm that disconnect cleanup releases it.
        let agent_eng = get_or_load_agent_memgine(state, &id).await?;
        *session.bound_memgine.lock().await = Some(agent_eng);
        *session.agent_id.lock().await = Some(id.clone());
        session
            .authenticated
            .store(true, std::sync::atomic::Ordering::Release);
        return Ok(serde_json::json!({
            "ok": true,
            "auth_enabled": true,
            "agent_id": id,
        }));
    }

    let expected = match state.auth_token.get() {
        Some(t) => t,
        None => {
            // No server token is set: accept the session and say so.
            session
                .authenticated
                .store(true, std::sync::atomic::Ordering::Release);
            return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
        }
    };
    // Compare via `constant_time_eq` rather than `==`, which can stop at the
    // first differing byte.
    if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
        return Err("auth failed: token mismatch".to_string());
    }
    session
        .authenticated
        .store(true, std::sync::atomic::Ordering::Release);
    Ok(serde_json::json!({
        "ok": true,
        "auth_enabled": true,
        "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
    }))
}
1656
/// Compares two byte strings without stopping at the first differing byte.
///
/// Slices of different length are reported unequal immediately, so only the
/// contents of equal-length inputs are compared in full. Every byte pair is
/// XOR-ed and OR-ed into one accumulator; the inputs are equal iff it ends
/// at zero.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() == b.len() {
        let accumulated = a
            .iter()
            .zip(b)
            .fold(0u8, |bits, (lhs, rhs)| bits | (lhs ^ rhs));
        accumulated == 0
    } else {
        false
    }
}
1670
1671async fn gate_high_risk_method(
1681 method: &str,
1682 params: &Value,
1683 state: &Arc<ServerState>,
1684) -> Result<(), String> {
1685 let timeout = state.approval_gate.timeout;
1686 let req = CreateHostApprovalRequest {
1687 agent_id: None,
1688 action: format!("ws.method:{method}"),
1689 details: serde_json::json!({
1690 "method": method,
1691 "params_preview": preview_params(params, 2_000),
1695 }),
1696 options: vec!["approve".to_string(), "deny".to_string()],
1697 system_level: true,
1701 };
1702 match state
1703 .host
1704 .request_and_wait_approval(req, "approve", timeout)
1705 .await
1706 {
1707 Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1708 Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1709 "{method} denied by user (approval gate, audit 2026-05). \
1710 To call this method without an interactive prompt, start \
1711 car-server with --no-approvals on a trusted machine."
1712 )),
1713 Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1714 "{method} approval timed out after {}s with no resolution. \
1715 The approval is still visible in `host.approvals` for \
1716 forensics; resubmit the request to retry.",
1717 timeout.as_secs()
1718 )),
1719 Err(e) => Err(format!("approval gate error: {e}")),
1720 }
1721}
1722
1723fn preview_params(value: &Value, max_chars: usize) -> Value {
1724 let s = value.to_string();
1725 if s.len() <= max_chars {
1726 value.clone()
1727 } else {
1728 Value::String(format!("{}… (truncated)", &s[..max_chars]))
1729 }
1730}
1731
1732async fn handle_session_init(
1733 req: &JsonRpcMessage,
1734 session: &crate::session::ClientSession,
1735) -> Result<Value, String> {
1736 let init: SessionInitRequest =
1737 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1738
1739 for tool in &init.tools {
1740 register_from_definition(&session.runtime, tool).await;
1741 }
1742
1743 let mut policy_count = 0;
1744 {
1745 let mut policies = session.runtime.policies.write().await;
1746 for policy_def in &init.policies {
1747 if let Some(check) = build_policy_check(policy_def) {
1748 policies.register(&policy_def.name, check, "");
1749 policy_count += 1;
1750 }
1751 }
1752 }
1753
1754 serde_json::to_value(SessionInitResponse {
1755 session_id: session.client_id.clone(),
1756 tools_registered: init.tools.len(),
1757 policies_registered: policy_count,
1758 })
1759 .map_err(|e| e.to_string())
1760}
1761
1762fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1763 match def.rule.as_str() {
1764 "deny_tool" => {
1765 let target = def.target.clone();
1766 Some(Box::new(
1767 move |action: &car_ir::Action, _: &car_state::StateStore| {
1768 if action.tool.as_deref() == Some(&target) {
1769 Some(format!("tool '{}' denied", target))
1770 } else {
1771 None
1772 }
1773 },
1774 ))
1775 }
1776 "require_state" => {
1777 let key = def.key.clone();
1778 let value = def.value.clone();
1779 Some(Box::new(
1780 move |_: &car_ir::Action, state: &car_state::StateStore| {
1781 if state.get(&key).as_ref() != Some(&value) {
1782 Some(format!("state['{}'] must be {:?}", key, value))
1783 } else {
1784 None
1785 }
1786 },
1787 ))
1788 }
1789 "deny_tool_param" => {
1790 let target = def.target.clone();
1791 let param = def.key.clone();
1792 let pattern = def.pattern.clone();
1793 Some(Box::new(
1794 move |action: &car_ir::Action, _: &car_state::StateStore| {
1795 if action.tool.as_deref() != Some(&target) {
1796 return None;
1797 }
1798 if let Some(val) = action.parameters.get(¶m) {
1799 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1800 if s.contains(&pattern) {
1801 return Some(format!("param '{}' matches '{}'", param, pattern));
1802 }
1803 }
1804 None
1805 },
1806 ))
1807 }
1808 _ => None,
1809 }
1810}
1811
1812async fn handle_tools_register(
1813 req: &JsonRpcMessage,
1814 session: &crate::session::ClientSession,
1815) -> Result<Value, String> {
1816 let tools: Vec<ToolDefinition> =
1817 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1818 for tool in &tools {
1819 register_from_definition(&session.runtime, tool).await;
1820 }
1821 Ok(Value::from(tools.len()))
1822}
1823
1824async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1831 runtime
1832 .register_tool_schema(car_ir::ToolSchema {
1833 name: def.name.clone(),
1834 description: def.description.clone(),
1835 parameters: def.parameters.clone(),
1836 returns: def.returns.clone(),
1837 idempotent: def.idempotent,
1838 cache_ttl_secs: def.cache_ttl_secs,
1839 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1840 max_calls: rl.max_calls,
1841 interval_secs: rl.interval_secs,
1842 }),
1843 })
1844 .await;
1845}
1846
1847async fn handle_proposal_submit(
1848 req: &JsonRpcMessage,
1849 session: &crate::session::ClientSession,
1850) -> Result<Value, String> {
1851 let submit: ProposalSubmitRequest =
1852 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1853 let session_id = req
1859 .params
1860 .get("session_id")
1861 .and_then(|v| v.as_str())
1862 .map(str::to_string);
1863
1864 let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1873 Some(v) if !v.is_null() => {
1874 Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1875 }
1876 _ => None,
1877 };
1878
1879 let result = match (session_id, scope) {
1880 (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1885 (Some(sid), None) => {
1886 session
1887 .runtime
1888 .execute_with_session(&submit.proposal, &sid)
1889 .await
1890 }
1891 (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1892 (None, None) => session.runtime.execute(&submit.proposal).await,
1893 };
1894 serde_json::to_value(result).map_err(|e| e.to_string())
1895}
1896
1897async fn handle_session_policy_open(
1898 session: &crate::session::ClientSession,
1899) -> Result<Value, String> {
1900 let id = session.runtime.open_session().await;
1901 Ok(serde_json::json!({ "session_id": id }))
1902}
1903
1904async fn handle_session_policy_close(
1905 req: &JsonRpcMessage,
1906 session: &crate::session::ClientSession,
1907) -> Result<Value, String> {
1908 let sid = req
1909 .params
1910 .get("session_id")
1911 .and_then(|v| v.as_str())
1912 .ok_or("missing 'session_id'")?;
1913 let closed = session.runtime.close_session(sid).await;
1914 Ok(serde_json::json!({ "closed": closed }))
1915}
1916
1917async fn handle_policy_register(
1923 req: &JsonRpcMessage,
1924 session: &crate::session::ClientSession,
1925) -> Result<Value, String> {
1926 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1927 .map_err(|e| format!("invalid policy params: {e}"))?;
1928 let session_id = req
1929 .params
1930 .get("session_id")
1931 .and_then(|v| v.as_str())
1932 .map(str::to_string);
1933 let check = build_policy_check(&def)
1934 .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1935 match session_id {
1936 Some(sid) => session
1937 .runtime
1938 .register_policy_in_session(&sid, &def.name, check, "")
1939 .await
1940 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1941 None => {
1942 let mut policies = session.runtime.policies.write().await;
1943 policies.register(&def.name, check, "");
1944 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1945 }
1946 }
1947}
1948
1949async fn handle_verify(
1950 req: &JsonRpcMessage,
1951 session: &crate::session::ClientSession,
1952) -> Result<Value, String> {
1953 let vr: VerifyRequest =
1954 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1955 let tools_guard = session.runtime.tools.read().await;
1961 let result =
1962 car_verify::verify_with_schemas(&vr.proposal, Some(&vr.initial_state), Some(&tools_guard), 30);
1963 drop(tools_guard);
1964 serde_json::to_value(VerifyResponse {
1965 valid: result.valid,
1966 issues: result
1967 .issues
1968 .iter()
1969 .map(|i| VerifyIssueProto {
1970 action_id: i.action_id.clone(),
1971 severity: i.severity.clone(),
1972 message: i.message.clone(),
1973 })
1974 .collect(),
1975 simulated_state: result.simulated_state,
1976 })
1977 .map_err(|e| e.to_string())
1978}
1979
1980fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1987 req.params
1988 .get("tenant_id")
1989 .and_then(|v| v.as_str())
1990 .filter(|s| !s.is_empty())
1991 .map(str::to_string)
1992}
1993
1994async fn handle_state_get(
1995 req: &JsonRpcMessage,
1996 session: &crate::session::ClientSession,
1997) -> Result<Value, String> {
1998 let key = req
1999 .params
2000 .get("key")
2001 .and_then(|v| v.as_str())
2002 .ok_or("missing 'key'")?;
2003 let tenant = tenant_from_params(req);
2004 Ok(session
2005 .runtime
2006 .state
2007 .scoped(tenant.as_deref())
2008 .get(key)
2009 .unwrap_or(Value::Null))
2010}
2011
2012async fn handle_state_set(
2013 req: &JsonRpcMessage,
2014 session: &crate::session::ClientSession,
2015) -> Result<Value, String> {
2016 let key = req
2017 .params
2018 .get("key")
2019 .and_then(|v| v.as_str())
2020 .ok_or("missing 'key'")?;
2021 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
2022 let tenant = tenant_from_params(req);
2023 session
2024 .runtime
2025 .state
2026 .scoped(tenant.as_deref())
2027 .set(key, value, "client");
2028 Ok(Value::from("ok"))
2029}
2030
2031async fn handle_state_exists(
2035 req: &JsonRpcMessage,
2036 session: &crate::session::ClientSession,
2037) -> Result<Value, String> {
2038 let key = req
2039 .params
2040 .get("key")
2041 .and_then(|v| v.as_str())
2042 .ok_or("missing 'key'")?;
2043 let tenant = tenant_from_params(req);
2044 Ok(Value::Bool(
2045 session.runtime.state.scoped(tenant.as_deref()).exists(key),
2046 ))
2047}
2048
2049async fn handle_state_keys(
2052 req: &JsonRpcMessage,
2053 session: &crate::session::ClientSession,
2054) -> Result<Value, String> {
2055 let tenant = tenant_from_params(req);
2056 Ok(Value::Array(
2057 session
2058 .runtime
2059 .state
2060 .scoped(tenant.as_deref())
2061 .keys()
2062 .into_iter()
2063 .map(Value::String)
2064 .collect(),
2065 ))
2066}
2067
2068async fn handle_state_snapshot(
2079 req: &JsonRpcMessage,
2080 session: &crate::session::ClientSession,
2081) -> Result<Value, String> {
2082 let tenant = tenant_from_params(req);
2083 let view = session.runtime.state.scoped(tenant.as_deref());
2084 let mut map = serde_json::Map::new();
2085 for key in view.keys() {
2086 if let Some(value) = view.get(&key) {
2087 map.insert(key, value);
2088 }
2089 }
2090 Ok(Value::Object(map))
2091}
2092
2093fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
2099 let base = car_ffi_common::memory_path::ensure_base()
2100 .map_err(|e| format!("memory base unavailable: {e}"))?;
2101 let dir = base.join("agents");
2102 std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
2103 Ok(dir.join(format!("{agent_id}.json")))
2104}
2105
2106async fn get_or_load_agent_memgine(
2113 state: &Arc<ServerState>,
2114 agent_id: &str,
2115) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
2116 {
2117 let map = state.agent_memgines.lock().await;
2118 if let Some(eng) = map.get(agent_id) {
2119 return Ok(eng.clone());
2120 }
2121 }
2122 let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
2124 None,
2125 )));
2126 let path = agent_memgine_snapshot_path(agent_id)?;
2127 if path.exists() {
2128 let content = std::fs::read_to_string(&path)
2129 .map_err(|e| format!("read {}: {}", path.display(), e))?;
2130 let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
2131 let mut g = engine.lock().await;
2132 let mut loaded: u32 = 0;
2133 for fact in &facts {
2134 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2135 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2136 let kind = fact
2137 .get("kind")
2138 .and_then(|v| v.as_str())
2139 .unwrap_or("pattern");
2140 let fid = format!("loaded-{loaded}");
2141 g.ingest_fact(
2142 &fid,
2143 subject,
2144 body,
2145 "user",
2146 "peer",
2147 chrono::Utc::now(),
2148 "global",
2149 None,
2150 vec![],
2151 kind == "constraint",
2152 );
2153 loaded += 1;
2154 }
2155 }
2156 let mut map = state.agent_memgines.lock().await;
2157 let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
2158 Ok(stored)
2159}
2160
2161async fn persist_agent_memgine(
2165 agent_id: &str,
2166 engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
2167) -> Result<(), String> {
2168 let path = agent_memgine_snapshot_path(agent_id)?;
2169 let g = engine.lock().await;
2170 let facts: Vec<Value> = g
2171 .graph
2172 .inner
2173 .node_indices()
2174 .filter_map(|nix| {
2175 let node = g.graph.inner.node_weight(nix)?;
2176 if !node.is_valid() {
2177 return None;
2178 }
2179 if node.kind == car_memgine::MemKind::Identity
2180 || node.kind == car_memgine::MemKind::Environment
2181 {
2182 return None;
2183 }
2184 Some(serde_json::json!({
2185 "subject": node.key,
2186 "body": node.value,
2187 "kind": match node.kind {
2188 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2189 car_memgine::MemKind::Conversation => "outcome",
2190 _ => "pattern",
2191 },
2192 "confidence": 0.5,
2193 "content_type": node.content_type.as_label(),
2194 }))
2195 })
2196 .collect();
2197 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2198 std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2199 Ok(())
2200}
2201
2202async fn handle_memory_fact_count(
2209 session: &crate::session::ClientSession,
2210) -> Result<Value, String> {
2211 let engine_arc = session.effective_memgine().await;
2212 let engine = engine_arc.lock().await;
2213 Ok(Value::from(engine.valid_fact_count()))
2214}
2215
2216async fn handle_memory_add_fact(
2217 req: &JsonRpcMessage,
2218 session: &crate::session::ClientSession,
2219) -> Result<Value, String> {
2220 let subject = req
2221 .params
2222 .get("subject")
2223 .and_then(|v| v.as_str())
2224 .ok_or("missing subject")?;
2225 let body = req
2226 .params
2227 .get("body")
2228 .and_then(|v| v.as_str())
2229 .ok_or("missing body")?;
2230 let kind = req
2231 .params
2232 .get("kind")
2233 .and_then(|v| v.as_str())
2234 .unwrap_or("pattern");
2235 let engine_arc = session.effective_memgine().await;
2239 let count = {
2240 let mut engine = engine_arc.lock().await;
2241 let fid = format!("ws-{}", engine.valid_fact_count());
2242 engine.ingest_fact(
2243 &fid,
2244 subject,
2245 body,
2246 "user",
2247 "peer",
2248 chrono::Utc::now(),
2249 "global",
2250 None,
2251 vec![],
2252 kind == "constraint",
2253 );
2254 engine.valid_fact_count()
2255 };
2256 if let Some(id) = session.agent_id.lock().await.clone() {
2259 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2260 tracing::warn!(agent_id = %id, error = %e,
2261 "agent memgine persist failed; in-memory state is canonical");
2262 }
2263 }
2264 Ok(Value::from(count))
2265}
2266
2267async fn handle_memory_query(
2268 req: &JsonRpcMessage,
2269 session: &crate::session::ClientSession,
2270) -> Result<Value, String> {
2271 let query = req
2272 .params
2273 .get("query")
2274 .and_then(|v| v.as_str())
2275 .ok_or("missing query")?;
2276 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2277 let engine_arc = session.effective_memgine().await;
2278 let engine = engine_arc.lock().await;
2279 let seeds = engine.graph.find_seeds(query, 5);
2280 let hits = if !seeds.is_empty() {
2285 engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2286 } else {
2287 vec![]
2288 };
2289 let results: Vec<Value> = hits
2290 .iter()
2291 .filter_map(|hit| {
2292 let node = engine.graph.inner.node_weight(hit.node_ix)?;
2293 Some(serde_json::json!({
2294 "subject": node.key,
2295 "body": node.value,
2296 "kind": format!("{:?}", node.kind).to_lowercase(),
2297 "confidence": hit.activation,
2298 }))
2299 })
2300 .collect();
2301 serde_json::to_value(results).map_err(|e| e.to_string())
2302}
2303
2304async fn handle_memory_build_context(
2305 req: &JsonRpcMessage,
2306 session: &crate::session::ClientSession,
2307) -> Result<Value, String> {
2308 let query = req
2309 .params
2310 .get("query")
2311 .and_then(|v| v.as_str())
2312 .unwrap_or("");
2313 let model_context_window = req
2317 .params
2318 .get("model_context_window")
2319 .and_then(|v| v.as_u64())
2320 .map(|w| w as usize);
2321 let mut engine = session.memgine.lock().await;
2322 Ok(Value::from(
2323 engine.build_context_for_model(query, model_context_window),
2324 ))
2325}
2326
2327async fn handle_memory_build_context_fast(
2333 req: &JsonRpcMessage,
2334 session: &crate::session::ClientSession,
2335) -> Result<Value, String> {
2336 let query = req
2337 .params
2338 .get("query")
2339 .and_then(|v| v.as_str())
2340 .unwrap_or("");
2341 let model_context_window = req
2342 .params
2343 .get("model_context_window")
2344 .and_then(|v| v.as_u64())
2345 .map(|w| w as usize);
2346 let mut engine = session.memgine.lock().await;
2347 Ok(Value::from(engine.build_context_with_options(
2348 query,
2349 model_context_window,
2350 car_memgine::ContextMode::Fast,
2351 None,
2352 )))
2353}
2354
2355async fn handle_memory_persist(
2371 req: &JsonRpcMessage,
2372 session: &crate::session::ClientSession,
2373) -> Result<Value, String> {
2374 let path = req
2375 .params
2376 .get("path")
2377 .and_then(|v| v.as_str())
2378 .ok_or("missing path")?;
2379 let resolved = car_ffi_common::memory_path::resolve(path)
2380 .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2381 let engine = session.memgine.lock().await;
2382 let facts: Vec<Value> = engine
2383 .graph
2384 .inner
2385 .node_indices()
2386 .filter_map(|nix| {
2387 let node = engine.graph.inner.node_weight(nix)?;
2388 if !node.is_valid() {
2389 return None;
2390 }
2391 if node.kind == car_memgine::MemKind::Identity
2392 || node.kind == car_memgine::MemKind::Environment
2393 {
2394 return None;
2395 }
2396 Some(serde_json::json!({
2397 "subject": node.key,
2398 "body": node.value,
2399 "kind": match node.kind {
2400 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2401 car_memgine::MemKind::Conversation => "outcome",
2402 _ => "pattern",
2403 },
2404 "confidence": 0.5,
2405 "content_type": node.content_type.as_label(),
2406 }))
2407 })
2408 .collect();
2409 let count = facts.len();
2410 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2411 std::fs::write(&resolved, json)
2412 .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2413 Ok(Value::from(count as u64))
2414}
2415
2416async fn handle_memory_load(
2422 req: &JsonRpcMessage,
2423 session: &crate::session::ClientSession,
2424) -> Result<Value, String> {
2425 let path = req
2426 .params
2427 .get("path")
2428 .and_then(|v| v.as_str())
2429 .ok_or("missing path")?;
2430 let resolved = car_ffi_common::memory_path::resolve(path)
2431 .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2432 let content = std::fs::read_to_string(&resolved)
2433 .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2434 let facts: Vec<Value> =
2435 serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2436 let mut engine = session.memgine.lock().await;
2437 engine.reset();
2438 let mut count: u32 = 0;
2439 for fact in &facts {
2440 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2441 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2442 let kind = fact
2443 .get("kind")
2444 .and_then(|v| v.as_str())
2445 .unwrap_or("pattern");
2446 let fid = format!("loaded-{}", count);
2447 engine.ingest_fact(
2448 &fid,
2449 subject,
2450 body,
2451 "user",
2452 "peer",
2453 chrono::Utc::now(),
2454 "global",
2455 None,
2456 vec![],
2457 kind == "constraint",
2458 );
2459 count += 1;
2460 }
2461 Ok(Value::from(count))
2462}
2463
2464async fn handle_skill_ingest(
2467 req: &JsonRpcMessage,
2468 session: &crate::session::ClientSession,
2469) -> Result<Value, String> {
2470 let name = req
2471 .params
2472 .get("name")
2473 .and_then(|v| v.as_str())
2474 .ok_or("missing name")?;
2475 let code = req
2476 .params
2477 .get("code")
2478 .and_then(|v| v.as_str())
2479 .ok_or("missing code")?;
2480 let platform = req
2481 .params
2482 .get("platform")
2483 .and_then(|v| v.as_str())
2484 .unwrap_or("unknown");
2485 let persona = req
2486 .params
2487 .get("persona")
2488 .and_then(|v| v.as_str())
2489 .unwrap_or("");
2490 let url_pattern = req
2491 .params
2492 .get("url_pattern")
2493 .and_then(|v| v.as_str())
2494 .unwrap_or("");
2495 let description = req
2496 .params
2497 .get("description")
2498 .and_then(|v| v.as_str())
2499 .unwrap_or("");
2500 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2501 let keywords: Vec<String> = req
2502 .params
2503 .get("task_keywords")
2504 .and_then(|v| v.as_array())
2505 .map(|arr| {
2506 arr.iter()
2507 .filter_map(|v| v.as_str().map(String::from))
2508 .collect()
2509 })
2510 .unwrap_or_default();
2511
2512 let trigger = car_memgine::SkillTrigger {
2513 persona: persona.into(),
2514 url_pattern: url_pattern.into(),
2515 task_keywords: keywords,
2516 structured: None,
2517 };
2518 let mut engine = session.memgine.lock().await;
2519 let node = engine.ingest_skill(
2520 name,
2521 code,
2522 platform,
2523 trigger,
2524 description,
2525 supersedes,
2526 vec![],
2527 vec![],
2528 );
2529 Ok(Value::from(node.index() as u64))
2530}
2531
2532async fn handle_skill_find(
2533 req: &JsonRpcMessage,
2534 session: &crate::session::ClientSession,
2535) -> Result<Value, String> {
2536 let persona = req
2537 .params
2538 .get("persona")
2539 .and_then(|v| v.as_str())
2540 .unwrap_or("");
2541 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2542 let task = req
2543 .params
2544 .get("task")
2545 .and_then(|v| v.as_str())
2546 .unwrap_or("");
2547 let max = req
2548 .params
2549 .get("max_results")
2550 .and_then(|v| v.as_u64())
2551 .unwrap_or(1) as usize;
2552 let engine = session.memgine.lock().await;
2553 let results = engine.find_skill(persona, url, task, max);
2554 let json: Vec<Value> = results
2555 .iter()
2556 .map(|(m, s)| {
2557 serde_json::json!({
2558 "name": m.name, "code": m.code, "platform": m.platform,
2559 "description": m.description, "stats": m.stats, "match_score": s,
2560 })
2561 })
2562 .collect();
2563 serde_json::to_value(json).map_err(|e| e.to_string())
2564}
2565
2566async fn handle_skill_report(
2567 req: &JsonRpcMessage,
2568 session: &crate::session::ClientSession,
2569) -> Result<Value, String> {
2570 let name = req
2571 .params
2572 .get("skill_name")
2573 .and_then(|v| v.as_str())
2574 .ok_or("missing skill_name")?;
2575 let outcome_str = req
2576 .params
2577 .get("outcome")
2578 .and_then(|v| v.as_str())
2579 .ok_or("missing outcome")?;
2580 let outcome = match outcome_str {
2581 "success" => car_memgine::SkillOutcome::Success,
2582 _ => car_memgine::SkillOutcome::Fail,
2583 };
2584 let mut engine = session.memgine.lock().await;
2585 let stats = engine
2586 .report_outcome(name, outcome)
2587 .ok_or(format!("skill '{}' not found", name))?;
2588 serde_json::to_value(stats).map_err(|e| e.to_string())
2589}
2590
2591struct WsAgentRunner {
2600 channel: Arc<WsChannel>,
2601 host: Arc<crate::host::HostState>,
2602 client_id: String,
2603}
2604
2605#[async_trait::async_trait]
2606impl car_multi::AgentRunner for WsAgentRunner {
2607 async fn run(
2608 &self,
2609 spec: &car_multi::AgentSpec,
2610 task: &str,
2611 _runtime: &car_engine::Runtime,
2612 _mailbox: &car_multi::Mailbox,
2613 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2614 use futures::SinkExt;
2615
2616 let request_id = self.channel.next_request_id();
2617 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2618 let agent = self
2619 .host
2620 .register_agent(
2621 &self.client_id,
2622 RegisterHostAgentRequest {
2623 id: Some(agent_id.clone()),
2624 name: spec.name.clone(),
2625 kind: "callback".to_string(),
2626 capabilities: spec.tools.clone(),
2627 project: spec
2628 .metadata
2629 .get("project")
2630 .and_then(|v| v.as_str())
2631 .map(str::to_string),
2632 pid: None,
2633 display: serde_json::from_value(
2634 spec.metadata
2635 .get("display")
2636 .cloned()
2637 .unwrap_or(serde_json::Value::Null),
2638 )
2639 .unwrap_or_default(),
2640 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2641 },
2642 )
2643 .await
2644 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2645 let _ = self
2646 .host
2647 .set_status(
2648 &self.client_id,
2649 SetHostAgentStatusRequest {
2650 agent_id: agent.id.clone(),
2651 status: HostAgentStatus::Running,
2652 current_task: Some(task.to_string()),
2653 message: Some(format!("{} started", spec.name)),
2654 payload: serde_json::json!({ "task": task }),
2655 },
2656 )
2657 .await;
2658
2659 let rpc_request = serde_json::json!({
2660 "jsonrpc": "2.0",
2661 "method": "multi.run_agent",
2662 "params": {
2663 "spec": spec,
2664 "task": task,
2665 },
2666 "id": request_id,
2667 });
2668
2669 let (tx, rx) = tokio::sync::oneshot::channel();
2671 self.channel
2672 .pending
2673 .lock()
2674 .await
2675 .insert(request_id.clone(), tx);
2676
2677 let msg = Message::Text(
2678 serde_json::to_string(&rpc_request)
2679 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2680 .into(),
2681 );
2682 if let Err(e) = self.channel.write.lock().await.send(msg).await {
2683 let _ = self
2684 .host
2685 .set_status(
2686 &self.client_id,
2687 SetHostAgentStatusRequest {
2688 agent_id: agent_id.clone(),
2689 status: HostAgentStatus::Errored,
2690 current_task: None,
2691 message: Some(format!("{} failed to start", spec.name)),
2692 payload: serde_json::json!({ "error": e.to_string() }),
2693 },
2694 )
2695 .await;
2696 return Err(car_multi::MultiError::AgentFailed(
2697 spec.name.clone(),
2698 format!("ws send error: {}", e),
2699 ));
2700 }
2701
2702 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2704 Ok(Ok(response)) => response,
2705 Ok(Err(_)) => {
2706 let _ = self
2707 .host
2708 .set_status(
2709 &self.client_id,
2710 SetHostAgentStatusRequest {
2711 agent_id: agent_id.clone(),
2712 status: HostAgentStatus::Errored,
2713 current_task: None,
2714 message: Some(format!("{} callback channel closed", spec.name)),
2715 payload: Value::Null,
2716 },
2717 )
2718 .await;
2719 return Err(car_multi::MultiError::AgentFailed(
2720 spec.name.clone(),
2721 "agent callback channel closed".into(),
2722 ));
2723 }
2724 Err(_) => {
2725 let _ = self
2726 .host
2727 .set_status(
2728 &self.client_id,
2729 SetHostAgentStatusRequest {
2730 agent_id: agent_id.clone(),
2731 status: HostAgentStatus::Errored,
2732 current_task: None,
2733 message: Some(format!("{} timed out", spec.name)),
2734 payload: Value::Null,
2735 },
2736 )
2737 .await;
2738 return Err(car_multi::MultiError::AgentFailed(
2739 spec.name.clone(),
2740 "agent callback timed out (300s)".into(),
2741 ));
2742 }
2743 };
2744
2745 if let Some(err) = response.error {
2746 let _ = self
2747 .host
2748 .set_status(
2749 &self.client_id,
2750 SetHostAgentStatusRequest {
2751 agent_id: agent_id.clone(),
2752 status: HostAgentStatus::Errored,
2753 current_task: None,
2754 message: Some(format!("{} errored", spec.name)),
2755 payload: serde_json::json!({ "error": err }),
2756 },
2757 )
2758 .await;
2759 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2760 }
2761
2762 let output_value = response.output.unwrap_or(Value::Null);
2763 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2764 car_multi::MultiError::AgentFailed(
2765 spec.name.clone(),
2766 format!("invalid AgentOutput: {}", e),
2767 )
2768 })?;
2769 let status = if output.error.is_some() {
2770 HostAgentStatus::Errored
2771 } else {
2772 HostAgentStatus::Completed
2773 };
2774 let message = if output.error.is_some() {
2775 format!("{} errored", spec.name)
2776 } else {
2777 format!("{} completed", spec.name)
2778 };
2779 let _ = self
2780 .host
2781 .set_status(
2782 &self.client_id,
2783 SetHostAgentStatusRequest {
2784 agent_id,
2785 status,
2786 current_task: None,
2787 message: Some(message),
2788 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2789 },
2790 )
2791 .await;
2792
2793 Ok(output)
2794 }
2795}
2796
/// Builds the host agent id `"<client_id>:<name>:<request_id>"`.
///
/// Every character of `name` that is not an ASCII letter, ASCII digit,
/// `-` or `_` is replaced with `-`. `client_id` and `request_id` are used
/// verbatim.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let allowed = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_');
        sanitized.push(if allowed { ch } else { '-' });
    }
    [client_id, sanitized.as_str(), request_id].join(":")
}
2810
2811async fn handle_multi_swarm(
2812 req: &JsonRpcMessage,
2813 session: &crate::session::ClientSession,
2814) -> Result<Value, String> {
2815 let mode_str = req
2816 .params
2817 .get("mode")
2818 .and_then(|v| v.as_str())
2819 .ok_or("missing 'mode'")?;
2820 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2821 let task = req
2822 .params
2823 .get("task")
2824 .and_then(|v| v.as_str())
2825 .ok_or("missing 'task'")?;
2826
2827 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2828 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2829 let agent_specs: Vec<car_multi::AgentSpec> =
2830 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2831 let synth: Option<car_multi::AgentSpec> = req
2832 .params
2833 .get("synthesizer")
2834 .map(|v| serde_json::from_value(v.clone()))
2835 .transpose()
2836 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2837
2838 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2839 channel: session.channel.clone(),
2840 host: session.host.clone(),
2841 client_id: session.client_id.clone(),
2842 });
2843 let infra = car_multi::SharedInfra::new();
2844
2845 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2846 if let Some(s) = synth {
2847 swarm = swarm.with_synthesizer(s);
2848 }
2849
2850 let result = swarm
2851 .run(task, &runner, &infra)
2852 .await
2853 .map_err(|e| format!("swarm error: {}", e))?;
2854 serde_json::to_value(result).map_err(|e| e.to_string())
2855}
2856
2857async fn handle_multi_pipeline(
2858 req: &JsonRpcMessage,
2859 session: &crate::session::ClientSession,
2860) -> Result<Value, String> {
2861 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2862 let task = req
2863 .params
2864 .get("task")
2865 .and_then(|v| v.as_str())
2866 .ok_or("missing 'task'")?;
2867
2868 let stage_specs: Vec<car_multi::AgentSpec> =
2869 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2870
2871 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2872 channel: session.channel.clone(),
2873 host: session.host.clone(),
2874 client_id: session.client_id.clone(),
2875 });
2876 let infra = car_multi::SharedInfra::new();
2877
2878 let result = car_multi::Pipeline::new(stage_specs)
2879 .run(task, &runner, &infra)
2880 .await
2881 .map_err(|e| format!("pipeline error: {}", e))?;
2882 serde_json::to_value(result).map_err(|e| e.to_string())
2883}
2884
2885async fn handle_multi_supervisor(
2886 req: &JsonRpcMessage,
2887 session: &crate::session::ClientSession,
2888) -> Result<Value, String> {
2889 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2890 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2891 let task = req
2892 .params
2893 .get("task")
2894 .and_then(|v| v.as_str())
2895 .ok_or("missing 'task'")?;
2896 let max_rounds = req
2897 .params
2898 .get("max_rounds")
2899 .and_then(|v| v.as_u64())
2900 .unwrap_or(3) as u32;
2901
2902 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2903 .map_err(|e| format!("invalid workers: {}", e))?;
2904 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2905 .map_err(|e| format!("invalid supervisor: {}", e))?;
2906
2907 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2908 channel: session.channel.clone(),
2909 host: session.host.clone(),
2910 client_id: session.client_id.clone(),
2911 });
2912 let infra = car_multi::SharedInfra::new();
2913
2914 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2915 .with_max_rounds(max_rounds)
2916 .run(task, &runner, &infra)
2917 .await
2918 .map_err(|e| format!("supervisor error: {}", e))?;
2919 serde_json::to_value(result).map_err(|e| e.to_string())
2920}
2921
2922async fn handle_multi_map_reduce(
2923 req: &JsonRpcMessage,
2924 session: &crate::session::ClientSession,
2925) -> Result<Value, String> {
2926 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2927 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2928 let task = req
2929 .params
2930 .get("task")
2931 .and_then(|v| v.as_str())
2932 .ok_or("missing 'task'")?;
2933 let items_val = req.params.get("items").ok_or("missing 'items'")?;
2934
2935 let mapper_spec: car_multi::AgentSpec =
2936 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2937 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2938 .map_err(|e| format!("invalid reducer: {}", e))?;
2939 let items: Vec<String> =
2940 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2941
2942 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2943 channel: session.channel.clone(),
2944 host: session.host.clone(),
2945 client_id: session.client_id.clone(),
2946 });
2947 let infra = car_multi::SharedInfra::new();
2948
2949 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2950 .run(task, &items, &runner, &infra)
2951 .await
2952 .map_err(|e| format!("map_reduce error: {}", e))?;
2953 serde_json::to_value(result).map_err(|e| e.to_string())
2954}
2955
2956async fn handle_multi_vote(
2957 req: &JsonRpcMessage,
2958 session: &crate::session::ClientSession,
2959) -> Result<Value, String> {
2960 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2961 let task = req
2962 .params
2963 .get("task")
2964 .and_then(|v| v.as_str())
2965 .ok_or("missing 'task'")?;
2966
2967 let agent_specs: Vec<car_multi::AgentSpec> =
2968 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2969 let synth: Option<car_multi::AgentSpec> = req
2970 .params
2971 .get("synthesizer")
2972 .map(|v| serde_json::from_value(v.clone()))
2973 .transpose()
2974 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2975
2976 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2977 channel: session.channel.clone(),
2978 host: session.host.clone(),
2979 client_id: session.client_id.clone(),
2980 });
2981 let infra = car_multi::SharedInfra::new();
2982
2983 let mut vote = car_multi::Vote::new(agent_specs);
2984 if let Some(s) = synth {
2985 vote = vote.with_synthesizer(s);
2986 }
2987
2988 let result = vote
2989 .run(task, &runner, &infra)
2990 .await
2991 .map_err(|e| format!("vote error: {}", e))?;
2992 serde_json::to_value(result).map_err(|e| e.to_string())
2993}
2994
2995fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
3000 let name = req
3001 .params
3002 .get("name")
3003 .and_then(|v| v.as_str())
3004 .ok_or("scheduler.create requires 'name'")?;
3005 let prompt = req
3006 .params
3007 .get("prompt")
3008 .and_then(|v| v.as_str())
3009 .ok_or("scheduler.create requires 'prompt'")?;
3010
3011 let mut task = car_scheduler::Task::new(name, prompt);
3012
3013 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
3014 let trigger = match t {
3015 "once" => car_scheduler::TaskTrigger::Once,
3016 "cron" => car_scheduler::TaskTrigger::Cron,
3017 "interval" => car_scheduler::TaskTrigger::Interval,
3018 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
3019 _ => car_scheduler::TaskTrigger::Manual,
3020 };
3021 let schedule = req
3022 .params
3023 .get("schedule")
3024 .and_then(|v| v.as_str())
3025 .unwrap_or("");
3026 task = task.with_trigger(trigger, schedule);
3027 }
3028
3029 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
3030 task = task.with_system_prompt(sp);
3031 }
3032
3033 serde_json::to_value(&task).map_err(|e| e.to_string())
3034}
3035
3036async fn handle_scheduler_run(
3037 req: &JsonRpcMessage,
3038 session: &crate::session::ClientSession,
3039) -> Result<Value, String> {
3040 let task_val = req
3041 .params
3042 .get("task")
3043 .ok_or("scheduler.run requires 'task'")?;
3044 let mut task: car_scheduler::Task =
3045 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3046
3047 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3048 channel: session.channel.clone(),
3049 host: session.host.clone(),
3050 client_id: session.client_id.clone(),
3051 });
3052 let executor = car_scheduler::Executor::new(runner);
3053 let execution = executor.run_once(&mut task).await;
3054
3055 serde_json::to_value(&execution).map_err(|e| e.to_string())
3056}
3057
3058async fn handle_scheduler_run_loop(
3059 req: &JsonRpcMessage,
3060 session: &crate::session::ClientSession,
3061) -> Result<Value, String> {
3062 let task_val = req
3063 .params
3064 .get("task")
3065 .ok_or("scheduler.run_loop requires 'task'")?;
3066 let mut task: car_scheduler::Task =
3067 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3068 let max_iterations = req
3069 .params
3070 .get("max_iterations")
3071 .and_then(|v| v.as_u64())
3072 .map(|v| v as u32);
3073
3074 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3075 channel: session.channel.clone(),
3076 host: session.host.clone(),
3077 client_id: session.client_id.clone(),
3078 });
3079 let executor = car_scheduler::Executor::new(runner);
3080 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
3081 let executions = executor
3082 .run_loop(&mut task, max_iterations, cancel_rx)
3083 .await;
3084
3085 serde_json::to_value(&executions).map_err(|e| e.to_string())
3086}
3087
3088fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
3093 state.inference.get_or_init(|| {
3094 Arc::new(car_inference::InferenceEngine::new(
3095 car_inference::InferenceConfig::default(),
3096 ))
3097 })
3098}
3099
3100async fn handle_infer(
3101 msg: &JsonRpcMessage,
3102 state: &ServerState,
3103 session: &crate::session::ClientSession,
3104) -> Result<Value, String> {
3105 let engine = get_inference_engine(state);
3106 let mut req: car_inference::GenerateRequest =
3107 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3108
3109 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3111 let mut memgine = session.memgine.lock().await;
3112 let ctx = memgine.build_context(cq);
3113 if !ctx.is_empty() {
3114 req.context = Some(ctx);
3115 }
3116 }
3117
3118 let _permit = state.admission.acquire().await;
3124
3125 let result = engine
3136 .generate_tracked(req)
3137 .await
3138 .map_err(|e| e.to_string())?;
3139 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3140}
3141
3142async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3172 let engine = get_inference_engine(state);
3173 let req: car_inference::GenerateImageRequest =
3174 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3175 let _permit = state.admission.acquire().await;
3178 let result = engine
3179 .generate_image(req)
3180 .await
3181 .map_err(|e| e.to_string())?;
3182 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3183}
3184
3185async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3186 let engine = get_inference_engine(state);
3187 let req: car_inference::GenerateVideoRequest =
3188 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3189 let _permit = state.admission.acquire().await;
3190 let result = engine
3191 .generate_video(req)
3192 .await
3193 .map_err(|e| e.to_string())?;
3194 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3195}
3196
3197async fn handle_infer_stream(
3198 msg: &JsonRpcMessage,
3199 session: &crate::session::ClientSession,
3200 state: &ServerState,
3201) -> Result<Value, String> {
3202 use futures::SinkExt;
3203 use tokio_tungstenite::tungstenite::Message;
3204
3205 let engine = get_inference_engine(state);
3206 let mut req: car_inference::GenerateRequest =
3207 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3208
3209 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3212 let mut memgine = session.memgine.lock().await;
3213 let ctx = memgine.build_context(cq);
3214 if !ctx.is_empty() {
3215 req.context = Some(ctx);
3216 }
3217 }
3218
3219 let _permit = state.admission.acquire().await;
3220 let mut rx = engine
3221 .generate_tracked_stream(req)
3222 .await
3223 .map_err(|e| e.to_string())?;
3224
3225 let mut accumulator = car_inference::StreamAccumulator::default();
3226 let request_id = msg.id.clone();
3227
3228 while let Some(event) = rx.recv().await {
3229 let event_payload = match &event {
3230 car_inference::StreamEvent::TextDelta(text) => {
3231 serde_json::json!({"type": "text", "data": text})
3232 }
3233 car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3234 serde_json::json!({"type": "tool_start", "name": name, "index": index})
3235 }
3236 car_inference::StreamEvent::ToolCallDelta {
3237 index,
3238 arguments_delta,
3239 } => serde_json::json!({
3240 "type": "tool_delta",
3241 "index": index,
3242 "data": arguments_delta,
3243 }),
3244 car_inference::StreamEvent::Usage {
3245 input_tokens,
3246 output_tokens,
3247 } => serde_json::json!({
3248 "type": "usage",
3249 "input_tokens": input_tokens,
3250 "output_tokens": output_tokens,
3251 }),
3252 car_inference::StreamEvent::Done { .. } => {
3257 accumulator.push(&event);
3258 continue;
3259 }
3260 };
3261
3262 let notif = serde_json::json!({
3263 "jsonrpc": "2.0",
3264 "method": "inference.stream.event",
3265 "params": {
3266 "request_id": request_id,
3267 "event": event_payload,
3268 },
3269 });
3270 if let Ok(text) = serde_json::to_string(¬if) {
3271 let _ = session
3272 .channel
3273 .write
3274 .lock()
3275 .await
3276 .send(Message::Text(text.into()))
3277 .await;
3278 }
3279 accumulator.push(&event);
3280 }
3281
3282 let (text, tool_calls, usage) = accumulator.finish_with_usage();
3283 Ok(serde_json::json!({
3284 "text": text,
3285 "tool_calls": tool_calls,
3286 "usage": usage,
3287 }))
3288}
3289
3290async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3291 let engine = get_inference_engine(state);
3292 let req: car_inference::EmbedRequest =
3293 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3294 let _permit = state.admission.acquire().await;
3298 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3299 Ok(serde_json::json!({"embeddings": result}))
3300}
3301
3302async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3303 let engine = get_inference_engine(state);
3304 let req: car_inference::ClassifyRequest =
3305 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3306 let _permit = state.admission.acquire().await;
3307 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3308 Ok(serde_json::json!({"classifications": result}))
3309}
3310
3311fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3315 let total = state.admission.permits();
3316 let available = state.admission.permits_available();
3317 let in_use = total.saturating_sub(available);
3318 Ok(serde_json::json!({
3319 "permits_total": total,
3320 "permits_available": available,
3321 "permits_in_use": in_use,
3322 "env_override": crate::admission::ENV_MAX_CONCURRENT,
3323 }))
3324}
3325
3326async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3327 let model = msg
3328 .params
3329 .get("model")
3330 .and_then(|v| v.as_str())
3331 .ok_or("missing 'model' parameter")?;
3332 let text = msg
3333 .params
3334 .get("text")
3335 .and_then(|v| v.as_str())
3336 .ok_or("missing 'text' parameter")?;
3337 let engine = get_inference_engine(state);
3338 let ids = engine
3339 .tokenize(model, text)
3340 .await
3341 .map_err(|e| e.to_string())?;
3342 Ok(serde_json::json!({"tokens": ids}))
3343}
3344
3345async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3346 let model = msg
3347 .params
3348 .get("model")
3349 .and_then(|v| v.as_str())
3350 .ok_or("missing 'model' parameter")?;
3351 let tokens: Vec<u32> = msg
3352 .params
3353 .get("tokens")
3354 .and_then(|v| v.as_array())
3355 .ok_or("missing 'tokens' parameter")?
3356 .iter()
3357 .map(|t| {
3358 t.as_u64()
3359 .and_then(|n| u32::try_from(n).ok())
3360 .ok_or_else(|| "tokens[] must be u32 values".to_string())
3361 })
3362 .collect::<Result<Vec<_>, _>>()?;
3363 let engine = get_inference_engine(state);
3364 let text = engine
3365 .detokenize(model, &tokens)
3366 .await
3367 .map_err(|e| e.to_string())?;
3368 Ok(serde_json::json!({"text": text}))
3369}
3370
3371async fn handle_models_register(
3390 req: &JsonRpcMessage,
3391 _state: &Arc<ServerState>,
3392) -> Result<Value, String> {
3393 let schema_value = match req.params.get("schema") {
3397 Some(v) => v.clone(),
3398 None => req.params.clone(),
3399 };
3400 let schema: car_inference::ModelSchema =
3401 serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3402 let id = schema.id.clone();
3403
3404 let home = std::env::var_os("HOME")
3409 .or_else(|| std::env::var_os("USERPROFILE"))
3410 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3411 let car_dir = std::path::PathBuf::from(home).join(".car");
3412 std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3413 let path = car_dir.join("models.json");
3414
3415 let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3416 let text =
3417 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3418 if text.trim().is_empty() {
3419 Vec::new()
3420 } else {
3421 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3422 }
3423 } else {
3424 Vec::new()
3425 };
3426 if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3428 *slot = schema;
3429 } else {
3430 models.push(schema);
3431 }
3432 let json =
3433 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3434 let tmp = path.with_extension("json.tmp");
3435 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3436 std::fs::rename(&tmp, &path)
3437 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3438 Ok(serde_json::json!({
3439 "id": id,
3440 "registered": true,
3441 "path": path.to_string_lossy(),
3442 "note": "Daemon restart required for live UnifiedRegistry visibility \
3443 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3444 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3445 }))
3446}
3447
3448async fn handle_models_unregister(
3459 req: &JsonRpcMessage,
3460 _state: &Arc<ServerState>,
3461) -> Result<Value, String> {
3462 let id = match req.params.get("id") {
3466 Some(v) => v
3467 .as_str()
3468 .ok_or_else(|| "`id` must be a string".to_string())?
3469 .to_string(),
3470 None => match req.params.as_str() {
3471 Some(s) => s.to_string(),
3472 None => return Err("missing `id` parameter".to_string()),
3473 },
3474 };
3475
3476 let home = std::env::var_os("HOME")
3477 .or_else(|| std::env::var_os("USERPROFILE"))
3478 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3479 let car_dir = std::path::PathBuf::from(home).join(".car");
3480 let path = car_dir.join("models.json");
3481
3482 if !path.exists() {
3483 return Err(format!(
3484 "no models.json at {} — nothing to unregister",
3485 path.display()
3486 ));
3487 }
3488 let text =
3489 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3490 let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3491 Vec::new()
3492 } else {
3493 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3494 };
3495 let before = models.len();
3496 models.retain(|m| m.id != id);
3497 if models.len() == before {
3498 return Err(format!("model {} not found in {}", id, path.display()));
3499 }
3500 let json =
3501 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3502 let tmp = path.with_extension("json.tmp");
3503 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3504 std::fs::rename(&tmp, &path)
3505 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3506 Ok(serde_json::json!({
3507 "id": id,
3508 "unregistered": true,
3509 "path": path.to_string_lossy(),
3510 "note": "Daemon restart required for live UnifiedRegistry visibility \
3511 (phase 1, matching models.register).",
3512 }))
3513}
3514
3515fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3516 let engine = get_inference_engine(state);
3517 let models = engine.list_models();
3518 serde_json::to_value(&models).map_err(|e| e.to_string())
3519}
3520
3521fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3522 let engine = get_inference_engine(state);
3523 let models = engine.list_models_unified();
3524 serde_json::to_value(&models).map_err(|e| e.to_string())
3525}
3526
/// Request parameters for `handle_models_search`.
///
/// Keys are camelCase on the wire (`localOnly`, `availableOnly`); every field
/// falls back to its default when absent.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchParams {
    /// Free-text filter; the handler trims it, ignores it when blank, and
    /// matches it case-insensitively (ASCII) as a substring.
    #[serde(default)]
    query: Option<String>,
    /// When set, only schemas reporting this capability are kept.
    #[serde(default)]
    capability: Option<car_inference::ModelCapability>,
    /// When set (and non-blank), only schemas whose provider equals it,
    /// ignoring ASCII case, are kept.
    #[serde(default)]
    provider: Option<String>,
    /// Keep only schemas for which `is_local()` is true.
    #[serde(default)]
    local_only: bool,
    /// Keep only schemas whose `available` flag is set.
    #[serde(default)]
    available_only: bool,
    /// Maximum number of entries returned; applied after sorting.
    #[serde(default)]
    limit: Option<usize>,
}
3543
/// One model in a `handle_models_search` response, serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchEntry {
    /// Summary built from the schema; its fields are flattened into this object.
    #[serde(flatten)]
    info: car_inference::ModelInfo,
    /// Copied from the schema's `family`.
    family: String,
    /// Copied from the schema's `version`.
    version: String,
    /// Copied from the schema's `tags`.
    tags: Vec<String>,
    /// True when the model is not yet available and its source is `Local` or `Mlx`.
    pullable: bool,
    /// Upgrade whose `from_id` equals this model's id, if any.
    upgrade: Option<car_inference::ModelUpgrade>,
}
3555
/// Response body of `handle_models_search`, serialized with camelCase keys.
///
/// The counters describe the returned `models` list, i.e. they are computed
/// after filtering and after any `limit` truncation.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchResponse {
    /// Matching entries, sorted available-first, then local-first, then by name.
    models: Vec<ModelSearchEntry>,
    /// All upgrades reported by the engine (not restricted to `models`).
    upgrades: Vec<car_inference::ModelUpgrade>,
    /// Number of entries in `models`.
    total: usize,
    /// Entries in `models` whose `available` flag is set.
    available: usize,
    /// Entries in `models` whose `is_local` flag is set.
    local: usize,
    /// `total - local`.
    remote: usize,
}
3566
3567fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3568 let params: ModelSearchParams =
3569 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3570 query: None,
3571 capability: None,
3572 provider: None,
3573 local_only: false,
3574 available_only: false,
3575 limit: None,
3576 });
3577 let engine = get_inference_engine(state);
3578 let upgrades = engine.available_model_upgrades();
3579 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3580 .iter()
3581 .cloned()
3582 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3583 .collect();
3584 let query = params
3585 .query
3586 .as_deref()
3587 .map(str::trim)
3588 .filter(|q| !q.is_empty())
3589 .map(|q| q.to_ascii_lowercase());
3590 let provider = params
3591 .provider
3592 .as_deref()
3593 .map(str::trim)
3594 .filter(|p| !p.is_empty())
3595 .map(|p| p.to_ascii_lowercase());
3596
3597 let mut entries: Vec<ModelSearchEntry> = engine
3598 .list_schemas()
3599 .into_iter()
3600 .filter(|schema| {
3601 if let Some(capability) = params.capability {
3602 if !schema.has_capability(capability) {
3603 return false;
3604 }
3605 }
3606 if let Some(provider) = provider.as_deref() {
3607 if schema.provider.to_ascii_lowercase() != provider {
3608 return false;
3609 }
3610 }
3611 if params.local_only && !schema.is_local() {
3612 return false;
3613 }
3614 if params.available_only && !schema.available {
3615 return false;
3616 }
3617 if let Some(query) = query.as_deref() {
3618 let capability_text = schema
3619 .capabilities
3620 .iter()
3621 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3622 .collect::<Vec<_>>()
3623 .join(" ");
3624 let haystack = format!(
3625 "{} {} {} {} {} {}",
3626 schema.id,
3627 schema.name,
3628 schema.provider,
3629 schema.family,
3630 schema.tags.join(" "),
3631 capability_text
3632 )
3633 .to_ascii_lowercase();
3634 if !haystack.contains(query) {
3635 return false;
3636 }
3637 }
3638 true
3639 })
3640 .map(|schema| {
3641 let pullable = !schema.available
3642 && matches!(
3643 schema.source,
3644 car_inference::ModelSource::Local { .. }
3645 | car_inference::ModelSource::Mlx { .. }
3646 );
3647 let info = car_inference::ModelInfo::from(&schema);
3648 let upgrade = upgrades_by_from.get(&schema.id).cloned();
3649 ModelSearchEntry {
3650 info,
3651 family: schema.family,
3652 version: schema.version,
3653 tags: schema.tags,
3654 pullable,
3655 upgrade,
3656 }
3657 })
3658 .collect();
3659 entries.sort_by(|a, b| {
3660 b.info
3661 .available
3662 .cmp(&a.info.available)
3663 .then(b.info.is_local.cmp(&a.info.is_local))
3664 .then(a.info.name.cmp(&b.info.name))
3665 });
3666 if let Some(limit) = params.limit {
3667 entries.truncate(limit);
3668 }
3669
3670 let total = entries.len();
3671 let available = entries.iter().filter(|entry| entry.info.available).count();
3672 let local = entries.iter().filter(|entry| entry.info.is_local).count();
3673 let response = ModelSearchResponse {
3674 models: entries,
3675 upgrades,
3676 total,
3677 available,
3678 local,
3679 remote: total.saturating_sub(local),
3680 };
3681 serde_json::to_value(response).map_err(|e| e.to_string())
3682}
3683
3684fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3685 let engine = get_inference_engine(state);
3686 serde_json::to_value(serde_json::json!({
3687 "upgrades": engine.available_model_upgrades()
3688 }))
3689 .map_err(|e| e.to_string())
3690}
3691
3692async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3693 let name = msg
3694 .params
3695 .get("name")
3696 .or_else(|| msg.params.get("id"))
3697 .or_else(|| msg.params.get("model"))
3698 .and_then(|v| v.as_str())
3699 .ok_or("missing 'name' parameter")?;
3700 let engine = get_inference_engine(state);
3701 let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3702 Ok(serde_json::json!({"path": path.display().to_string()}))
3703}
3704
3705async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3706 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3707 msg.params
3708 .get("events")
3709 .cloned()
3710 .unwrap_or(msg.params.clone()),
3711 )
3712 .map_err(|e| format!("invalid events: {}", e))?;
3713
3714 let inference = get_inference_engine(state).clone();
3715 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3716
3717 let skills = engine.distill_skills(&events).await;
3718 serde_json::to_value(&skills).map_err(|e| e.to_string())
3719}
3720
3721async fn handle_memory_consolidate(
3725 session: &crate::session::ClientSession,
3726) -> Result<Value, String> {
3727 let engine_arc = session.effective_memgine().await;
3728 let report = {
3729 let mut engine = engine_arc.lock().await;
3730 engine.consolidate().await
3731 };
3732 if let Some(id) = session.agent_id.lock().await.clone() {
3733 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3734 tracing::warn!(agent_id = %id, error = %e,
3735 "agent memgine persist after consolidate failed");
3736 }
3737 }
3738 serde_json::to_value(&report).map_err(|e| e.to_string())
3739}
3740
3741async fn handle_skill_repair(
3745 msg: &JsonRpcMessage,
3746 session: &crate::session::ClientSession,
3747) -> Result<Value, String> {
3748 let name = msg
3749 .params
3750 .get("skill_name")
3751 .and_then(|v| v.as_str())
3752 .ok_or("missing 'skill_name' parameter")?;
3753 let mut engine = session.memgine.lock().await;
3754 let code = engine.repair_skill(name).await;
3755 Ok(match code {
3756 Some(c) => serde_json::json!({ "code": c }),
3757 None => Value::Null,
3758 })
3759}
3760
3761async fn handle_skills_ingest_distilled(
3764 msg: &JsonRpcMessage,
3765 session: &crate::session::ClientSession,
3766) -> Result<Value, String> {
3767 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3768 msg.params
3769 .get("skills")
3770 .cloned()
3771 .unwrap_or(msg.params.clone()),
3772 )
3773 .map_err(|e| format!("invalid skills: {}", e))?;
3774 let mut engine = session.memgine.lock().await;
3775 let nodes = engine.ingest_distilled_skills(&skills);
3776 Ok(serde_json::json!({ "ingested": nodes.len() }))
3777}
3778
3779async fn handle_skills_evolve(
3782 msg: &JsonRpcMessage,
3783 session: &crate::session::ClientSession,
3784) -> Result<Value, String> {
3785 let domain = msg
3786 .params
3787 .get("domain")
3788 .and_then(|v| v.as_str())
3789 .ok_or("missing 'domain' parameter")?
3790 .to_string();
3791 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3792 msg.params
3793 .get("events")
3794 .cloned()
3795 .unwrap_or(Value::Array(vec![])),
3796 )
3797 .map_err(|e| format!("invalid events: {}", e))?;
3798 let mut engine = session.memgine.lock().await;
3799 let skills = engine.evolve_skills(&events, &domain).await;
3800 serde_json::to_value(&skills).map_err(|e| e.to_string())
3801}
3802
3803async fn handle_skills_domains_needing_evolution(
3805 msg: &JsonRpcMessage,
3806 session: &crate::session::ClientSession,
3807) -> Result<Value, String> {
3808 let threshold = msg
3809 .params
3810 .get("threshold")
3811 .and_then(|v| v.as_f64())
3812 .unwrap_or(0.6);
3813 let engine = session.memgine.lock().await;
3814 let domains = engine.domains_needing_evolution(threshold);
3815 serde_json::to_value(&domains).map_err(|e| e.to_string())
3816}
3817
3818async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3820 let engine = get_inference_engine(state);
3821 let req: car_inference::RerankRequest =
3822 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3823 let _permit = state.admission.acquire().await;
3824 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3825 serde_json::to_value(&result).map_err(|e| e.to_string())
3826}
3827
3828async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3834 use base64::Engine as _;
3835 let engine = get_inference_engine(state);
3836
3837 let mut params = msg.params.clone();
3844 let audio_b64 = params
3845 .as_object_mut()
3846 .and_then(|m| m.remove("audio_b64"))
3847 .and_then(|v| v.as_str().map(str::to_string));
3848 let _tmp_audio = if let Some(b64) = audio_b64 {
3849 let bytes = base64::engine::general_purpose::STANDARD
3850 .decode(b64.as_bytes())
3851 .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3852 let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3853 std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3854 let path = tmp.path().to_string_lossy().into_owned();
3855 if let Some(obj) = params.as_object_mut() {
3856 obj.insert("audio_path".to_string(), Value::String(path));
3857 }
3858 Some(tmp)
3859 } else {
3860 None
3861 };
3862
3863 let req: car_inference::TranscribeRequest =
3864 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3865 let _permit = state.admission.acquire().await;
3866 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3867 serde_json::to_value(&result).map_err(|e| e.to_string())
3868}
3869
3870async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3876 use base64::Engine as _;
3877 let engine = get_inference_engine(state);
3878
3879 let mut params = msg.params.clone();
3880 let return_b64 = params
3881 .as_object_mut()
3882 .and_then(|m| m.remove("return_b64"))
3883 .and_then(|v| v.as_bool())
3884 .unwrap_or(false);
3885 let no_output_path = params
3886 .as_object()
3887 .map(|m| !m.contains_key("output_path"))
3888 .unwrap_or(true);
3889
3890 let req: car_inference::SynthesizeRequest =
3891 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3892 let _permit = state.admission.acquire().await;
3893 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3894 let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3895
3896 if return_b64 || no_output_path {
3900 let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3901 format!(
3902 "synthesize: failed to read rendered audio at {}: {e}",
3903 result.audio_path
3904 )
3905 })?;
3906 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3907 if let Some(obj) = value.as_object_mut() {
3908 obj.insert("audio_b64".to_string(), Value::String(encoded));
3909 }
3910 }
3911 Ok(value)
3912}
3913
3914async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3918 let engine = get_inference_engine(state);
3919 let status = engine
3920 .prepare_speech_runtime()
3921 .await
3922 .map_err(|e| e.to_string())?;
3923 serde_json::to_value(&status).map_err(|e| e.to_string())
3924}
3925
3926async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3929 let prompt = msg
3930 .params
3931 .get("prompt")
3932 .and_then(|v| v.as_str())
3933 .ok_or("missing 'prompt' parameter")?;
3934 let engine = get_inference_engine(state);
3935 let decision = engine.route_adaptive(prompt).await;
3936 serde_json::to_value(&decision).map_err(|e| e.to_string())
3937}
3938
3939async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3941 let engine = get_inference_engine(state);
3942 let profiles = engine.export_profiles().await;
3943 serde_json::to_value(&profiles).map_err(|e| e.to_string())
3944}
3945
/// Request parameters for `handle_outcomes_resolve_pending`.
///
/// With `rename_all = "camelCase"` the wire key is `actionResults`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct OutcomesResolvePendingParams {
    /// Tuples passed unchanged to the outcome tracker's
    /// `infer_outcomes_from_action_sequence`.
    // NOTE(review): the meaning of each tuple position is defined by the
    // tracker, not here — confirm against car_inference before documenting.
    action_results: Vec<(String, bool, f64, String)>,
}
3955
3956async fn handle_outcomes_resolve_pending(
3976 req: &JsonRpcMessage,
3977 state: &ServerState,
3978) -> Result<Value, String> {
3979 let params: OutcomesResolvePendingParams =
3980 serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
3981 let engine = get_inference_engine(state);
3982 let mut tracker = engine.outcome_tracker.write().await;
3983 let inferred = tracker.infer_outcomes_from_action_sequence(¶ms.action_results);
3984 tracker.resolve_pending_from_signals(inferred);
3985 Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3986}
3987
3988async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3990 let n = session.runtime.log.lock().await.len();
3991 Ok(Value::from(n as u64))
3992}
3993
3994async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3995 let stats = session.runtime.log.lock().await.stats();
3996 serde_json::to_value(stats).map_err(|e| e.to_string())
3997}
3998
/// Request parameters for `handle_events_truncate`.
///
/// Wire keys are camelCase (`maxEvents`, `maxSpans`); both are optional.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
    /// When set, passed to the log's `truncate_events_keep_last`.
    #[serde(default)]
    max_events: Option<usize>,
    /// When set, passed to the log's `truncate_spans_keep_last`.
    #[serde(default)]
    max_spans: Option<usize>,
}
4007
4008async fn handle_events_truncate(
4009 msg: &JsonRpcMessage,
4010 session: &crate::session::ClientSession,
4011) -> Result<Value, String> {
4012 let params: EventsTruncateParams =
4013 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
4014 max_events: None,
4015 max_spans: None,
4016 });
4017 let mut log = session.runtime.log.lock().await;
4018 let removed_events = params
4019 .max_events
4020 .map(|max| log.truncate_events_keep_last(max))
4021 .unwrap_or(0);
4022 let removed_spans = params
4023 .max_spans
4024 .map(|max| log.truncate_spans_keep_last(max))
4025 .unwrap_or(0);
4026 let stats = log.stats();
4027 Ok(serde_json::json!({
4028 "removedEvents": removed_events,
4029 "removedSpans": removed_spans,
4030 "stats": stats,
4031 }))
4032}
4033
4034async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
4035 let mut log = session.runtime.log.lock().await;
4036 let removed = log.clear();
4037 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
4038}
4039
4040async fn handle_replan_set_config(
4045 msg: &JsonRpcMessage,
4046 session: &crate::session::ClientSession,
4047) -> Result<Value, String> {
4048 let max_replans = msg
4049 .params
4050 .get("max_replans")
4051 .and_then(|v| v.as_u64())
4052 .unwrap_or(0) as u32;
4053 let delay_ms = msg
4054 .params
4055 .get("delay_ms")
4056 .and_then(|v| v.as_u64())
4057 .unwrap_or(0);
4058 let verify_before_execute = msg
4059 .params
4060 .get("verify_before_execute")
4061 .and_then(|v| v.as_bool())
4062 .unwrap_or(true);
4063 let cfg = car_engine::ReplanConfig {
4064 max_replans,
4065 delay_ms,
4066 verify_before_execute,
4067 };
4068 session.runtime.set_replan_config(cfg).await;
4069 Ok(Value::Null)
4070}
4071
4072async fn handle_skills_list(
4073 msg: &JsonRpcMessage,
4074 session: &crate::session::ClientSession,
4075) -> Result<Value, String> {
4076 let domain = msg.params.get("domain").and_then(|v| v.as_str());
4077 let engine = session.memgine.lock().await;
4078 let skills: Vec<serde_json::Value> = engine
4079 .graph
4080 .inner
4081 .node_indices()
4082 .filter_map(|nix| {
4083 let node = engine.graph.inner.node_weight(nix)?;
4084 if node.kind != car_memgine::MemKind::Skill {
4085 return None;
4086 }
4087 let meta = car_memgine::SkillMeta::from_node(node)?;
4088 if let Some(d) = domain {
4089 match &meta.scope {
4090 car_memgine::SkillScope::Global => {}
4091 car_memgine::SkillScope::Domain(sd) if sd == d => {}
4092 _ => return None,
4093 }
4094 }
4095 Some(serde_json::to_value(&meta).unwrap_or_default())
4096 })
4097 .collect();
4098 serde_json::to_value(&skills).map_err(|e| e.to_string())
4099}
4100
/// Parameters shared by the `handle_secret_*` handlers.
#[derive(serde::Deserialize)]
struct SecretParams {
    /// Optional service name, forwarded as `Option<&str>`.
    #[serde(default)]
    service: Option<String>,
    /// Secret key; required.
    key: String,
    /// Secret value; only `handle_secret_put` reads it, and requires it.
    #[serde(default)]
    value: Option<String>,
}
4109
4110fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
4111 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4112 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
4113 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
4114}
4115
4116fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
4117 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4118 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
4119}
4120
4121fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
4122 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4123 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
4124}
4125
4126fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
4127 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4128 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
4129}
4130
/// Parameters shared by the `handle_perm_*` handlers.
#[derive(serde::Deserialize)]
struct PermParams {
    /// Permission domain name; required and forwarded as-is.
    domain: String,
    /// Optional target bundle id, forwarded as `Option<&str>`.
    #[serde(default)]
    target_bundle_id: Option<String>,
}
4137
4138fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
4139 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4140 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
4141}
4142
4143fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
4144 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4145 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
4146}
4147
4148fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
4149 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4150 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
4151}
4152
4153fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
4154 #[derive(serde::Deserialize)]
4155 struct P {
4156 start: String,
4157 end: String,
4158 #[serde(default)]
4159 calendar_ids: Vec<String>,
4160 }
4161 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4162 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
4163 .map_err(|e| format!("parse start: {}", e))?
4164 .with_timezone(&chrono::Utc);
4165 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
4166 .map_err(|e| format!("parse end: {}", e))?
4167 .with_timezone(&chrono::Utc);
4168 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
4169}
4170
4171fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
4172 #[derive(serde::Deserialize)]
4173 struct P {
4174 query: String,
4175 #[serde(default = "default_limit")]
4176 limit: usize,
4177 #[serde(default)]
4178 container_ids: Vec<String>,
4179 }
4180 fn default_limit() -> usize {
4181 50
4182 }
4183 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4184 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
4185}
4186
4187fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
4188 #[derive(serde::Deserialize, Default)]
4189 struct P {
4190 #[serde(default)]
4191 account_ids: Vec<String>,
4192 }
4193 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
4194 car_ffi_common::integrations::mail_inbox(&p.account_ids)
4195}
4196
4197fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
4198 let raw = req.params.to_string();
4199 car_ffi_common::integrations::mail_send(&raw)
4200}
4201
4202fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
4203 #[derive(serde::Deserialize)]
4204 struct P {
4205 #[serde(default = "default_limit")]
4206 limit: usize,
4207 }
4208 fn default_limit() -> usize {
4209 50
4210 }
4211 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4212 car_ffi_common::integrations::messages_chats(p.limit)
4213}
4214
4215fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
4216 let raw = req.params.to_string();
4217 car_ffi_common::integrations::messages_send(&raw)
4218}
4219
4220fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
4221 #[derive(serde::Deserialize)]
4222 struct P {
4223 query: String,
4224 #[serde(default = "default_limit")]
4225 limit: usize,
4226 }
4227 fn default_limit() -> usize {
4228 50
4229 }
4230 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4231 car_ffi_common::integrations::notes_find(&p.query, p.limit)
4232}
4233
4234fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
4235 #[derive(serde::Deserialize)]
4236 struct P {
4237 #[serde(default = "default_limit")]
4238 limit: usize,
4239 }
4240 fn default_limit() -> usize {
4241 50
4242 }
4243 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4244 car_ffi_common::integrations::reminders_items(p.limit)
4245}
4246
4247fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
4248 #[derive(serde::Deserialize)]
4249 struct P {
4250 #[serde(default = "default_limit")]
4251 limit: usize,
4252 }
4253 fn default_limit() -> usize {
4254 100
4255 }
4256 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
4257 car_ffi_common::integrations::bookmarks_list(p.limit)
4258}
4259
4260fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
4261 #[derive(serde::Deserialize)]
4262 struct P {
4263 start: String,
4264 end: String,
4265 }
4266 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4267 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4268 .map_err(|e| format!("parse start: {}", e))?
4269 .with_timezone(&chrono::Utc);
4270 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4271 .map_err(|e| format!("parse end: {}", e))?
4272 .with_timezone(&chrono::Utc);
4273 car_ffi_common::health::sleep_windows(s, e)
4274}
4275
4276fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4277 #[derive(serde::Deserialize)]
4278 struct P {
4279 start: String,
4280 end: String,
4281 }
4282 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4283 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4284 .map_err(|e| format!("parse start: {}", e))?
4285 .with_timezone(&chrono::Utc);
4286 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4287 .map_err(|e| format!("parse end: {}", e))?
4288 .with_timezone(&chrono::Utc);
4289 car_ffi_common::health::workouts(s, e)
4290}
4291
4292fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4293 #[derive(serde::Deserialize)]
4294 struct P {
4295 start: String,
4296 end: String,
4297 }
4298 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4299 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4300 .map_err(|e| format!("parse start: {}", e))?;
4301 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4302 .map_err(|e| format!("parse end: {}", e))?;
4303 car_ffi_common::health::activity(s, e)
4304}
4305
4306async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4307 let closed = session.browser.close().await?;
4308 Ok(serde_json::json!({"closed": closed}))
4309}
4310
4311async fn handle_browser_run(
4312 req: &JsonRpcMessage,
4313 session: &crate::session::ClientSession,
4314) -> Result<Value, String> {
4315 #[derive(serde::Deserialize)]
4316 struct BrowserRunParams {
4317 script: Value,
4319 #[serde(default)]
4320 width: Option<u32>,
4321 #[serde(default)]
4322 height: Option<u32>,
4323 #[serde(default)]
4328 headed: Option<bool>,
4329 #[serde(default)]
4332 extra_args: Option<Vec<String>>,
4333 }
4334 let params: BrowserRunParams =
4335 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4336
4337 let script_json = match params.script {
4339 Value::String(s) => s,
4340 other => other.to_string(),
4341 };
4342
4343 let browser_session = session
4344 .browser
4345 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4346 width: params.width.unwrap_or(1280),
4347 height: params.height.unwrap_or(720),
4348 headless: !params.headed.unwrap_or(false),
4349 extra_args: params.extra_args.unwrap_or_default(),
4350 })
4351 .await?;
4352
4353 let trace_json = browser_session.run(&script_json).await?;
4354 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4355}
4356
/// Request parameters for `handle_voice_transcribe_stream_start`.
#[derive(Deserialize)]
struct VoiceStartParams {
    /// Identifier of the voice session to start.
    session_id: String,
    /// Audio source description; re-serialized to a JSON string and forwarded.
    audio_source: Value,
    /// Optional options object; re-serialized to a JSON string when present.
    #[serde(default)]
    options: Option<Value>,
}
4376
4377async fn handle_voice_transcribe_stream_start(
4378 req: &JsonRpcMessage,
4379 state: &Arc<ServerState>,
4380 session: &Arc<crate::session::ClientSession>,
4381) -> Result<Value, String> {
4382 let params: VoiceStartParams =
4383 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4384 let audio_source_json =
4385 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
4386 let options_json = params
4387 .options
4388 .as_ref()
4389 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4390 .transpose()?;
4391 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4392 channel: session.channel.clone(),
4393 });
4394 let json = car_ffi_common::voice::transcribe_stream_start(
4395 ¶ms.session_id,
4396 &audio_source_json,
4397 options_json.as_deref(),
4398 state.voice_sessions.clone(),
4399 sink,
4400 )
4401 .await?;
4402 serde_json::from_str(&json).map_err(|e| e.to_string())
4403}
4404
/// Request parameters for `handle_voice_transcribe_stream_stop`.
#[derive(Deserialize)]
struct VoiceStopParams {
    /// Identifier of the voice session to stop.
    session_id: String,
}
4409
4410async fn handle_voice_transcribe_stream_stop(
4411 req: &JsonRpcMessage,
4412 state: &Arc<ServerState>,
4413) -> Result<Value, String> {
4414 let params: VoiceStopParams =
4415 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4416 let json = car_ffi_common::voice::transcribe_stream_stop(
4417 ¶ms.session_id,
4418 state.voice_sessions.clone(),
4419 )
4420 .await?;
4421 serde_json::from_str(&json).map_err(|e| e.to_string())
4422}
4423
/// Request parameters for `handle_voice_transcribe_stream_push`.
#[derive(Deserialize)]
struct VoicePushParams {
    /// Identifier of the voice session receiving the audio.
    session_id: String,
    /// Audio bytes encoded with the standard base64 alphabet; the handler
    /// decodes them before forwarding.
    // NOTE(review): the sample format of the decoded PCM is not visible here —
    // confirm against car_ffi_common::voice.
    pcm_b64: String,
}
4432
4433async fn handle_voice_transcribe_stream_push(
4434 req: &JsonRpcMessage,
4435 state: &Arc<ServerState>,
4436) -> Result<Value, String> {
4437 use base64::Engine;
4438 let params: VoicePushParams =
4439 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4440 let pcm = base64::engine::general_purpose::STANDARD
4441 .decode(¶ms.pcm_b64)
4442 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4443 let json = car_ffi_common::voice::transcribe_stream_push(
4444 ¶ms.session_id,
4445 &pcm,
4446 state.voice_sessions.clone(),
4447 )
4448 .await?;
4449 serde_json::from_str(&json).map_err(|e| e.to_string())
4450}
4451
4452fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4453 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4454 serde_json::from_str(&json).unwrap_or(Value::Null)
4455}
4456
/// Request parameters for `handle_voice_tts_stream_start`.
#[derive(Deserialize)]
struct VoiceTtsStreamStartParams {
    /// Identifier of the TTS stream to start.
    stream_id: String,
    /// Text to synthesize.
    text: String,
    /// Optional options value; forwarded as its JSON text when present.
    #[serde(default)]
    options: Option<Value>,
}
4471
4472async fn handle_voice_tts_stream_start(
4473 req: &JsonRpcMessage,
4474 session: &Arc<crate::session::ClientSession>,
4475) -> Result<Value, String> {
4476 let params: VoiceTtsStreamStartParams =
4477 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4478 let opts_str = params
4479 .options
4480 .as_ref()
4481 .map(|v| v.to_string())
4482 .filter(|s| !s.is_empty());
4483 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4484 channel: session.channel.clone(),
4485 });
4486 let json = car_ffi_common::voice::tts_stream_start(
4487 ¶ms.stream_id,
4488 ¶ms.text,
4489 opts_str.as_deref(),
4490 sink,
4491 )
4492 .await?;
4493 serde_json::from_str(&json).map_err(|e| e.to_string())
4494}
4495
/// Request parameters for `handle_voice_tts_stream_cancel`.
#[derive(Deserialize)]
struct VoiceTtsStreamCancelParams {
    /// Identifier of the TTS stream to cancel.
    stream_id: String,
}
4500
4501async fn handle_voice_tts_stream_cancel(req: &JsonRpcMessage) -> Result<Value, String> {
4502 let params: VoiceTtsStreamCancelParams =
4503 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4504 let json = car_ffi_common::voice::tts_stream_cancel(¶ms.stream_id).await?;
4505 serde_json::from_str(&json).map_err(|e| e.to_string())
4506}
4507
4508fn handle_voice_tts_stream_list() -> Value {
4509 let json = car_ffi_common::voice::list_tts_streams();
4510 serde_json::from_str(&json).unwrap_or(Value::Null)
4511}
4512
4513async fn handle_voice_dispatch_turn(
4514 req: &JsonRpcMessage,
4515 state: &Arc<ServerState>,
4516 session: &Arc<crate::session::ClientSession>,
4517) -> Result<Value, String> {
4518 let req_value = req.params.clone();
4519 let request: crate::voice_turn::DispatchVoiceTurnRequest =
4520 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4521 let engine = get_inference_engine(state).clone();
4522 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4523 channel: session.channel.clone(),
4524 });
4525 let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4526 serde_json::to_value(resp).map_err(|e| e.to_string())
4527}
4528
4529async fn handle_voice_cancel_turn() -> Result<Value, String> {
4530 crate::voice_turn::cancel().await;
4531 Ok(serde_json::json!({"cancelled": true}))
4532}
4533
4534async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4535 let engine = get_inference_engine(state).clone();
4536 crate::voice_turn::prewarm(engine).await;
4537 Ok(serde_json::json!({"prewarmed": true}))
4538}
4539
4540fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4559 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4560 std::sync::OnceLock::new();
4561 SLOT.get_or_init(|| std::sync::RwLock::new(None))
4562}
4563
4564fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4565 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4566 std::sync::OnceLock::new();
4567 MAP.get_or_init(dashmap::DashMap::new)
4568}
4569
4570fn ws_runner_completions() -> &'static dashmap::DashMap<
4571 String,
4572 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4573> {
4574 static MAP: std::sync::OnceLock<
4575 dashmap::DashMap<
4576 String,
4577 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4578 >,
4579 > = std::sync::OnceLock::new();
4580 MAP.get_or_init(dashmap::DashMap::new)
4581}
4582
4583struct WsInferenceRunner;
4584
4585#[async_trait::async_trait]
4586impl car_inference::InferenceRunner for WsInferenceRunner {
4587 async fn run(
4588 &self,
4589 request: car_inference::tasks::generate::GenerateRequest,
4590 emitter: car_inference::EventEmitter,
4591 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4592 let channel = ws_runner_session()
4593 .read()
4594 .map_err(|e| {
4595 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4596 })?
4597 .clone()
4598 .ok_or_else(|| {
4599 car_inference::RunnerError::Declined(
4600 "no WebSocket inference runner registered — call inference.register_runner first"
4601 .into(),
4602 )
4603 })?;
4604
4605 let call_id = uuid::Uuid::new_v4().to_string();
4606 let request_json = serde_json::to_value(&request)
4607 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4608 let (tx, rx) = tokio::sync::oneshot::channel();
4609 ws_runner_calls().insert(call_id.clone(), emitter);
4610 ws_runner_completions().insert(call_id.clone(), tx);
4611
4612 use futures::SinkExt;
4614 let notification = serde_json::json!({
4615 "jsonrpc": "2.0",
4616 "method": "inference.runner.invoke",
4617 "params": {
4618 "call_id": call_id,
4619 "request": request_json,
4620 },
4621 });
4622 let text = serde_json::to_string(¬ification)
4623 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4624 let _ = channel
4625 .write
4626 .lock()
4627 .await
4628 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4629 .await;
4630
4631 let result = rx.await.map_err(|_| {
4632 car_inference::RunnerError::Failed("runner completion channel dropped".into())
4633 })?;
4634 ws_runner_calls().remove(&call_id);
4635 result.map_err(car_inference::RunnerError::Failed)
4636 }
4637}
4638
4639async fn handle_inference_register_runner(
4640 session: &Arc<crate::session::ClientSession>,
4641) -> Result<Value, String> {
4642 let mut guard = ws_runner_session()
4643 .write()
4644 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4645 *guard = Some(session.channel.clone());
4646 drop(guard);
4647 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4648 Ok(serde_json::json!({"registered": true}))
4649}
4650
4651#[derive(serde::Deserialize)]
4652struct InferenceRunnerEventParams {
4653 call_id: String,
4654 event: Value,
4655}
4656
4657async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4658 let params: InferenceRunnerEventParams =
4659 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4660 let stream_event = match parse_runner_event_value(¶ms.event) {
4661 Some(e) => e,
4662 None => return Err("unrecognised runner event shape".into()),
4663 };
4664 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
4665 let emitter = entry.value().clone();
4666 tokio::spawn(async move { emitter.emit(stream_event).await });
4667 }
4668 Ok(serde_json::json!({"emitted": true}))
4669}
4670
4671#[derive(serde::Deserialize)]
4672struct InferenceRunnerCompleteParams {
4673 call_id: String,
4674 result: Value,
4675}
4676
4677async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4678 let params: InferenceRunnerCompleteParams =
4679 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4680 let result: std::result::Result<car_inference::RunnerResult, String> =
4681 serde_json::from_value(params.result)
4682 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4683 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4684 let _ = tx.send(result);
4685 }
4686 Ok(serde_json::json!({"completed": true}))
4687}
4688
4689#[derive(serde::Deserialize)]
4690struct InferenceRunnerFailParams {
4691 call_id: String,
4692 error: String,
4693}
4694
4695async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4696 let params: InferenceRunnerFailParams =
4697 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4698 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4699 let _ = tx.send(Err(params.error));
4700 }
4701 Ok(serde_json::json!({"failed": true}))
4702}
4703
4704fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4705 let ty = v.get("type").and_then(|t| t.as_str())?;
4706 match ty {
4707 "text" => Some(car_inference::StreamEvent::TextDelta(
4708 v.get("data")?.as_str()?.to_string(),
4709 )),
4710 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4711 name: v.get("name")?.as_str()?.to_string(),
4712 index: v.get("index")?.as_u64()? as usize,
4713 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4714 }),
4715 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4716 index: v.get("index")?.as_u64()? as usize,
4717 arguments_delta: v.get("data")?.as_str()?.to_string(),
4718 }),
4719 "usage" => Some(car_inference::StreamEvent::Usage {
4720 input_tokens: v.get("input_tokens")?.as_u64()?,
4721 output_tokens: v.get("output_tokens")?.as_u64()?,
4722 }),
4723 "done" => Some(car_inference::StreamEvent::Done {
4724 text: v.get("text")?.as_str()?.to_string(),
4725 tool_calls: v
4726 .get("tool_calls")
4727 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4728 .unwrap_or_default(),
4729 }),
4730 _ => None,
4731 }
4732}
4733
4734#[derive(Deserialize)]
4735struct EnrollSpeakerParams {
4736 label: String,
4737 audio: Value,
4738}
4739
4740async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4741 let params: EnrollSpeakerParams =
4742 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4743 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
4744 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
4745 serde_json::from_str(&json).map_err(|e| e.to_string())
4746}
4747
4748#[derive(Deserialize)]
4749struct RemoveEnrollmentParams {
4750 label: String,
4751}
4752
4753fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4754 let params: RemoveEnrollmentParams =
4755 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4756 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
4757 serde_json::from_str(&json).map_err(|e| e.to_string())
4758}
4759
4760#[derive(Deserialize)]
4761struct WorkflowRunParams {
4762 workflow: Value,
4763}
4764
4765async fn handle_workflow_run(
4766 req: &JsonRpcMessage,
4767 session: &Arc<crate::session::ClientSession>,
4768) -> Result<Value, String> {
4769 let params: WorkflowRunParams =
4770 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4771 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4772 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4773 channel: session.channel.clone(),
4774 host: session.host.clone(),
4775 client_id: session.client_id.clone(),
4776 });
4777 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4778 serde_json::from_str(&json).map_err(|e| e.to_string())
4779}
4780
4781#[derive(Deserialize)]
4782struct WorkflowVerifyParams {
4783 workflow: Value,
4784}
4785
4786fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4787 let params: WorkflowVerifyParams =
4788 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4789 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4790 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4791 serde_json::from_str(&json).map_err(|e| e.to_string())
4792}
4793
4794async fn handle_meeting_start(
4799 req: &JsonRpcMessage,
4800 state: &Arc<ServerState>,
4801 session: &Arc<crate::session::ClientSession>,
4802) -> Result<Value, String> {
4803 let mut req_value = req.params.clone();
4809 let meeting_id = req_value
4810 .get("id")
4811 .and_then(|v| v.as_str())
4812 .map(str::to_string)
4813 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4814 if let Some(map) = req_value.as_object_mut() {
4815 map.insert("id".into(), Value::String(meeting_id.clone()));
4816 }
4817 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4818
4819 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4820 Arc::new(crate::session::WsVoiceEventSink {
4821 channel: session.channel.clone(),
4822 });
4823
4824 let upstream: Arc<dyn car_voice::VoiceEventSink> =
4829 Arc::new(crate::session::WsMemgineIngestSink {
4830 meeting_id,
4831 engine: session.memgine.clone(),
4832 upstream: ws_upstream,
4833 });
4834
4835 let cwd = std::env::current_dir().ok();
4836 let json = crate::meeting::start_meeting(
4837 &request_json,
4838 state.meetings.clone(),
4839 state.voice_sessions.clone(),
4840 upstream,
4841 None,
4842 cwd,
4843 )
4844 .await?;
4845 serde_json::from_str(&json).map_err(|e| e.to_string())
4846}
4847
4848#[derive(Deserialize)]
4849struct MeetingStopParams {
4850 meeting_id: String,
4851 #[serde(default = "default_summarize")]
4852 summarize: bool,
4853}
4854
/// Serde default for `MeetingStopParams::summarize`: `true`.
fn default_summarize() -> bool {
    true
}
4858
4859async fn handle_meeting_stop(
4860 req: &JsonRpcMessage,
4861 state: &Arc<ServerState>,
4862 _session: &Arc<crate::session::ClientSession>,
4863) -> Result<Value, String> {
4864 let params: MeetingStopParams =
4865 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4866 let inference = if params.summarize {
4867 Some(state.inference.get().cloned()).flatten()
4868 } else {
4869 None
4870 };
4871 let json = crate::meeting::stop_meeting(
4872 ¶ms.meeting_id,
4873 params.summarize,
4874 state.meetings.clone(),
4875 state.voice_sessions.clone(),
4876 inference,
4877 )
4878 .await?;
4879 serde_json::from_str(&json).map_err(|e| e.to_string())
4880}
4881
4882#[derive(Deserialize, Default)]
4883struct MeetingListParams {
4884 #[serde(default)]
4885 root: Option<std::path::PathBuf>,
4886}
4887
4888fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4889 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4890 let cwd = std::env::current_dir().ok();
4891 let json = crate::meeting::list_meetings(params.root, cwd)?;
4892 serde_json::from_str(&json).map_err(|e| e.to_string())
4893}
4894
4895#[derive(Deserialize)]
4896struct MeetingGetParams {
4897 meeting_id: String,
4898 #[serde(default)]
4899 root: Option<std::path::PathBuf>,
4900}
4901
4902fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4903 let params: MeetingGetParams =
4904 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4905 let cwd = std::env::current_dir().ok();
4906 let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
4907 serde_json::from_str(&json).map_err(|e| e.to_string())
4908}
4909
4910#[derive(Deserialize, Default)]
4915struct RegistryRegisterParams {
4916 entry: Value,
4920 #[serde(default)]
4921 registry_path: Option<std::path::PathBuf>,
4922}
4923
4924fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4925 let params: RegistryRegisterParams =
4926 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4927 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
4928 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4929 Ok(Value::Null)
4930}
4931
4932#[derive(Deserialize, Default)]
4933struct RegistryNameParams {
4934 name: String,
4935 #[serde(default)]
4936 registry_path: Option<std::path::PathBuf>,
4937}
4938
4939fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4940 let params: RegistryNameParams =
4941 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4942 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
4943 serde_json::from_str(&json).map_err(|e| e.to_string())
4944}
4945
4946fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4947 let params: RegistryNameParams =
4948 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4949 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
4950 Ok(Value::Null)
4951}
4952
4953#[derive(Deserialize, Default)]
4954struct RegistryListParams {
4955 #[serde(default)]
4956 registry_path: Option<std::path::PathBuf>,
4957}
4958
4959fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4960 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4961 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4962 serde_json::from_str(&json).map_err(|e| e.to_string())
4963}
4964
4965#[derive(Deserialize, Default)]
4966struct RegistryReapParams {
4967 #[serde(default = "default_reap_age")]
4970 max_age_secs: u64,
4971 #[serde(default)]
4972 registry_path: Option<std::path::PathBuf>,
4973}
4974
/// Serde default for `RegistryReapParams::max_age_secs`: `60`.
fn default_reap_age() -> u64 {
    60
}
4978
4979fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4980 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4981 let json =
4982 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4983 serde_json::from_str(&json).map_err(|e| e.to_string())
4984}
4985
4986async fn handle_a2a_start(
4993 req: &JsonRpcMessage,
4994 session: &crate::session::ClientSession,
4995) -> Result<Value, String> {
4996 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4997 let json = crate::a2a::start_a2a(¶ms_json, Some(session.runtime.clone())).await?;
5003 serde_json::from_str(&json).map_err(|e| e.to_string())
5004}
5005
5006fn handle_a2a_stop() -> Result<Value, String> {
5007 let json = crate::a2a::stop_a2a()?;
5008 serde_json::from_str(&json).map_err(|e| e.to_string())
5009}
5010
5011fn handle_a2a_status() -> Result<Value, String> {
5012 let json = crate::a2a::a2a_status()?;
5013 serde_json::from_str(&json).map_err(|e| e.to_string())
5014}
5015
5016#[derive(Deserialize)]
5017#[serde(rename_all = "camelCase")]
5018struct A2aSendParams {
5019 endpoint: String,
5020 message: car_a2a::Message,
5021 #[serde(default)]
5022 blocking: bool,
5023 #[serde(default = "default_true")]
5024 ingest_a2ui: bool,
5025 #[serde(default)]
5026 route_auth: Option<A2aRouteAuth>,
5027 #[serde(default)]
5028 allow_untrusted_endpoint: bool,
5029}
5030
/// Serde default helper for boolean fields that default to `true`.
fn default_true() -> bool {
    true
}
5034
5035async fn handle_a2a_dispatch(
5045 method: &str,
5046 req: &JsonRpcMessage,
5047 state: &Arc<ServerState>,
5048) -> Result<Value, String> {
5049 let dispatcher = state.a2a_dispatcher().await;
5050 dispatcher
5051 .dispatch(method, req.params.clone())
5052 .await
5053 .map_err(|e| e.to_string())
5054}
5055
5056async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
5057 let params: A2aSendParams =
5058 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5059 let endpoint = trusted_route_endpoint(
5060 Some(params.endpoint.clone()),
5061 params.allow_untrusted_endpoint,
5062 )
5063 .ok_or_else(|| {
5064 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
5065 })?;
5066 let client = match params.route_auth.clone() {
5067 Some(auth) => {
5068 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
5069 }
5070 None => car_a2a::A2aClient::new(endpoint.clone()),
5071 };
5072 let result = client
5073 .send_message(params.message, params.blocking)
5074 .await
5075 .map_err(|e| e.to_string())?;
5076 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
5077 let mut applied = Vec::new();
5078 if params.ingest_a2ui {
5079 state
5080 .a2ui
5081 .validate_payload(&result_value)
5082 .map_err(|e| e.to_string())?;
5083 let routed_endpoint = Some(endpoint.clone());
5084 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
5085 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
5086 if owner.endpoint.is_none() {
5087 owner.with_endpoint(routed_endpoint.clone())
5088 } else {
5089 owner
5090 }
5091 });
5092 applied.push(
5093 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
5094 );
5095 }
5096 }
5097 Ok(serde_json::json!({
5098 "result": result,
5099 "a2ui": {
5100 "applied": applied,
5101 }
5102 }))
5103}
5104
5105async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
5113 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5114 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
5115 serde_json::from_str(&json).map_err(|e| e.to_string())
5116}
5117
5118async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
5119 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5120 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
5121 serde_json::from_str(&json).map_err(|e| e.to_string())
5122}
5123
5124async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
5125 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5126 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
5127 serde_json::from_str(&json).map_err(|e| e.to_string())
5128}
5129
5130async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
5131 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5132 let json = car_ffi_common::notifications::local(&args_json).await?;
5133 serde_json::from_str(&json).map_err(|e| e.to_string())
5134}
5135
5136async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
5137 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5138 let json = car_ffi_common::vision::ocr(&args_json).await?;
5139 serde_json::from_str(&json).map_err(|e| e.to_string())
5140}
5141
5142async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
5147 let agents = match state.observer_manifest_path() {
5156 Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
5157 .map_err(|e| e.to_string())?,
5158 None => {
5159 let supervisor = state.supervisor()?;
5160 supervisor.list().await
5161 }
5162 };
5163 let attached = state.attached_agents.lock().await.clone();
5170 let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
5171 for a in agents {
5172 let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
5173 let session_id = attached.get(&a.spec.id).cloned();
5174 if let Some(map) = v.as_object_mut() {
5175 map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
5176 if let Some(sid) = session_id {
5177 map.insert("session_id".to_string(), Value::String(sid));
5178 }
5179 }
5180 decorated.push(v);
5181 }
5182 Ok(Value::Array(decorated))
5183}
5184
5185async fn handle_agents_upsert(
5186 req: &JsonRpcMessage,
5187 state: &Arc<ServerState>,
5188) -> Result<Value, String> {
5189 let mut params = req.params.clone();
5190 if let Some(name) = params
5199 .get("interpreter")
5200 .and_then(|v| v.as_str())
5201 .map(str::to_string)
5202 {
5203 let resolved =
5204 car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
5205 params["command"] = Value::String(resolved.to_string_lossy().into_owned());
5206 }
5207 let spec: car_registry::supervisor::AgentSpec =
5208 serde_json::from_value(params).map_err(|e| e.to_string())?;
5209 let supervisor = state.supervisor()?;
5210 let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
5211 serde_json::to_value(agent).map_err(|e| e.to_string())
5212}
5213
5214async fn handle_agents_install(
5228 req: &JsonRpcMessage,
5229 state: &Arc<ServerState>,
5230) -> Result<Value, String> {
5231 let manifest: car_registry::manifest::AgentManifest =
5232 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5233 let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
5234 let supervisor = state.supervisor()?;
5235 let (report, managed) = supervisor
5236 .install_manifest(manifest, &host)
5237 .await
5238 .map_err(|e| e.to_string())?;
5239 Ok(serde_json::json!({
5240 "report": {
5241 "missingOptional": report
5242 .missing_optional
5243 .iter()
5244 .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
5245 .collect::<Vec<_>>(),
5246 },
5247 "agent": managed,
5248 }))
5249}
5250
5251async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
5252 let entries = match state.observer_manifest_path() {
5258 Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
5259 .map_err(|e| e.to_string())?,
5260 None => {
5261 let supervisor = state.supervisor()?;
5262 supervisor.health().await
5263 }
5264 };
5265 serde_json::to_value(entries).map_err(|e| e.to_string())
5266}
5267
5268fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
5269 req.params
5270 .get("id")
5271 .and_then(Value::as_str)
5272 .map(str::to_string)
5273 .ok_or_else(|| "missing required `id` parameter".to_string())
5274}
5275
5276async fn handle_agents_remove(
5277 req: &JsonRpcMessage,
5278 state: &Arc<ServerState>,
5279) -> Result<Value, String> {
5280 let id = extract_agent_id(req)?;
5281 let supervisor = state.supervisor()?;
5282 let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
5283 Ok(serde_json::json!({ "removed": removed }))
5284}
5285
5286async fn handle_agents_start(
5287 req: &JsonRpcMessage,
5288 state: &Arc<ServerState>,
5289) -> Result<Value, String> {
5290 let id = extract_agent_id(req)?;
5291 let supervisor = state.supervisor()?;
5292 let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
5293 serde_json::to_value(agent).map_err(|e| e.to_string())
5294}
5295
5296async fn handle_agents_stop(
5297 req: &JsonRpcMessage,
5298 state: &Arc<ServerState>,
5299) -> Result<Value, String> {
5300 let id = extract_agent_id(req)?;
5301 let signal: car_registry::supervisor::StopSignal = req
5302 .params
5303 .get("signal")
5304 .map(|v| serde_json::from_value(v.clone()))
5305 .transpose()
5306 .map_err(|e| e.to_string())?
5307 .unwrap_or_default();
5308 let supervisor = state.supervisor()?;
5309 let agent = supervisor
5310 .stop(&id, signal)
5311 .await
5312 .map_err(|e| e.to_string())?;
5313 serde_json::to_value(agent).map_err(|e| e.to_string())
5314}
5315
5316async fn handle_agents_restart(
5317 req: &JsonRpcMessage,
5318 state: &Arc<ServerState>,
5319) -> Result<Value, String> {
5320 let id = extract_agent_id(req)?;
5321 let supervisor = state.supervisor()?;
5322 let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
5323 serde_json::to_value(agent).map_err(|e| e.to_string())
5324}
5325
5326async fn handle_agents_tail_log(
5327 req: &JsonRpcMessage,
5328 state: &Arc<ServerState>,
5329) -> Result<Value, String> {
5330 let id = extract_agent_id(req)?;
5331 let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
5332 let supervisor = state.supervisor()?;
5333 let lines = supervisor
5334 .tail_log(&id, n)
5335 .await
5336 .map_err(|e| e.to_string())?;
5337 Ok(serde_json::json!({ "lines": lines }))
5338}
5339
5340async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5351 let include_health = req
5352 .params
5353 .get("include_health")
5354 .and_then(Value::as_bool)
5355 .unwrap_or(false);
5356 let json = car_ffi_common::external_agents::list(include_health).await?;
5357 serde_json::from_str(&json).map_err(|e| e.to_string())
5358}
5359
5360async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5361 let include_health = req
5362 .params
5363 .get("include_health")
5364 .and_then(Value::as_bool)
5365 .unwrap_or(false);
5366 let json = car_ffi_common::external_agents::detect(include_health).await?;
5367 serde_json::from_str(&json).map_err(|e| e.to_string())
5368}
5369
5370async fn handle_agents_invoke_external(
5388 req: &JsonRpcMessage,
5389 state: &Arc<ServerState>,
5390 host_session: &Arc<crate::session::ClientSession>,
5391) -> Result<Value, String> {
5392 let id = req
5393 .params
5394 .get("id")
5395 .and_then(Value::as_str)
5396 .ok_or_else(|| "missing required `id` parameter".to_string())?
5397 .to_string();
5398 let task = req
5399 .params
5400 .get("task")
5401 .and_then(Value::as_str)
5402 .ok_or_else(|| "missing required `task` parameter".to_string())?
5403 .to_string();
5404 let stream = req
5405 .params
5406 .get("stream")
5407 .and_then(Value::as_bool)
5408 .unwrap_or(false);
5409 let session_id = req
5410 .params
5411 .get("session_id")
5412 .and_then(Value::as_str)
5413 .map(str::to_string)
5414 .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
5415
5416 let mut options_value = req.params.clone();
5422 if let Some(obj) = options_value.as_object_mut() {
5423 obj.remove("id");
5424 obj.remove("task");
5425 obj.remove("stream");
5426 obj.remove("session_id");
5427 let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5436 if !has_explicit_mcp {
5437 if let Some(url) = state.mcp_url.get() {
5438 obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5439 }
5440 }
5441 }
5442
5443 if !stream {
5444 let options_json = options_value.to_string();
5447 let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
5448 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5449 append_external_agent_audit(&id, &task, &options_value, &result);
5450 return Ok(result);
5451 }
5452
5453 let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
5459 .map_err(|e| format!("invalid options: {e}"))?;
5460
5461 {
5471 let mut chats = state.chat_sessions.lock().await;
5481 chats.entry(session_id.clone()).or_insert_with(|| {
5482 let created_at = std::time::SystemTime::now()
5483 .duration_since(std::time::UNIX_EPOCH)
5484 .map(|d| d.as_secs())
5485 .unwrap_or(0);
5486 crate::session::ChatSession {
5487 agent_id: id.clone(),
5488 host_client_id: host_session.client_id.clone(),
5489 created_at,
5490 }
5491 });
5492 }
5493
5494 use tokio::sync::mpsc;
5501 let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
5502
5503 let drain_state = state.clone();
5504 let drain_session_id = session_id.clone();
5505 let drain_agent_id = id.clone();
5506 tokio::spawn(async move {
5507 while let Some(event) = rx.recv().await {
5508 emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
5509 }
5510 });
5511
5512 let emitter_tx = tx.clone();
5513 let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
5514 let _ = emitter_tx.send(event);
5519 });
5520
5521 let spawn_state = state.clone();
5527 let spawn_session_id = session_id.clone();
5528 let spawn_id = id.clone();
5529 let spawn_task = task.clone();
5530 let spawn_options = options_value.clone();
5531 tokio::spawn(async move {
5532 let outcome =
5533 car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
5534 .await;
5535 drop(tx); let terminal_params: Value;
5542 let result_value: Value;
5543 match outcome {
5544 Ok(res) => {
5545 let mut parts: Vec<String> = Vec::new();
5552 if res.turns > 0 {
5553 parts.push(format!(
5554 "{} turn{}",
5555 res.turns,
5556 if res.turns == 1 { "" } else { "s" }
5557 ));
5558 }
5559 if res.tool_calls > 0 {
5560 parts.push(format!(
5561 "{} tool{}",
5562 res.tool_calls,
5563 if res.tool_calls == 1 { "" } else { "s" }
5564 ));
5565 }
5566 if res.duration_ms > 0 {
5567 parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
5568 }
5569 let summary = if parts.is_empty() {
5570 "stop".to_string()
5571 } else {
5572 parts.join(" · ")
5573 };
5574 if res.is_error {
5575 terminal_params = serde_json::json!({
5576 "session_id": spawn_session_id,
5577 "agent_id": spawn_id,
5578 "kind": "error",
5579 "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
5580 });
5581 } else {
5582 terminal_params = serde_json::json!({
5583 "session_id": spawn_session_id,
5584 "agent_id": spawn_id,
5585 "kind": "done",
5586 "finish_reason": summary,
5587 });
5588 }
5589 result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
5590 }
5591 Err(e) => {
5592 let message = format!("{e}");
5593 terminal_params = serde_json::json!({
5594 "session_id": spawn_session_id,
5595 "agent_id": spawn_id,
5596 "kind": "error",
5597 "error": message.clone(),
5598 });
5599 result_value = serde_json::json!({ "is_error": true, "error": message });
5600 }
5601 }
5602 send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
5603 spawn_state
5604 .chat_sessions
5605 .lock()
5606 .await
5607 .remove(&spawn_session_id);
5608 append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
5609 });
5610
5611 Ok(serde_json::json!({
5612 "accepted": true,
5613 "session_id": session_id,
5614 }))
5615}
5616
5617async fn emit_external_chat_event(
5634 state: &Arc<ServerState>,
5635 session_id: &str,
5636 agent_id: &str,
5637 event: car_external_agents::StreamEvent,
5638) {
5639 use car_external_agents::StreamEvent;
5640 match event {
5641 StreamEvent::Assistant(a) => {
5642 if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
5643 for block in content {
5644 let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
5645 match block_type {
5646 "text" => {
5647 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
5648 if !text.is_empty() {
5649 let params = serde_json::json!({
5650 "session_id": session_id,
5651 "agent_id": agent_id,
5652 "kind": "token",
5653 "delta": text,
5654 });
5655 send_external_chat_frame(state, session_id, params).await;
5656 }
5657 }
5658 }
5659 "tool_use" => {
5660 let name = block
5661 .get("name")
5662 .and_then(|v| v.as_str())
5663 .unwrap_or("(unknown tool)");
5664 let params = serde_json::json!({
5665 "session_id": session_id,
5666 "agent_id": agent_id,
5667 "kind": "tool_call",
5668 "detail": name,
5669 });
5670 send_external_chat_frame(state, session_id, params).await;
5671 }
5672 _ => {}
5673 }
5674 }
5675 }
5676 }
5677 _ => {
5678 }
5683 }
5684}
5685
5686async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
5691 use futures::SinkExt;
5692 use tokio_tungstenite::tungstenite::Message;
5693
5694 let host_client_id = state
5695 .chat_sessions
5696 .lock()
5697 .await
5698 .get(session_id)
5699 .map(|s| s.host_client_id.clone());
5700 let Some(host_client_id) = host_client_id else {
5701 return;
5702 };
5703 let host_channel = {
5704 let sessions = state.sessions.lock().await;
5705 sessions.get(&host_client_id).map(|s| s.channel.clone())
5706 };
5707 let Some(channel) = host_channel else {
5708 return;
5709 };
5710 let frame = serde_json::json!({
5711 "jsonrpc": "2.0",
5712 "method": "agents.chat.event",
5713 "params": params,
5714 });
5715 if let Ok(text) = serde_json::to_string(&frame) {
5716 let _ = channel
5717 .write
5718 .lock()
5719 .await
5720 .send(Message::Text(text.into()))
5721 .await;
5722 }
5723}
5724
5725fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5731 use std::io::Write;
5732 let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5733 Some(home) => home.join(".car"),
5734 None => return,
5735 };
5736 if std::fs::create_dir_all(&car_dir).is_err() {
5737 return;
5738 }
5739 let path = car_dir.join("external-agents.jsonl");
5740 let record = serde_json::json!({
5741 "ts": chrono::Utc::now().to_rfc3339(),
5742 "adapter_id": id,
5743 "task": task,
5744 "options": options,
5745 "result": result,
5746 });
5747 let line = match serde_json::to_string(&record) {
5748 Ok(s) => s,
5749 Err(_) => return,
5750 };
5751 if let Ok(mut f) = std::fs::OpenOptions::new()
5752 .create(true)
5753 .append(true)
5754 .open(&path)
5755 {
5756 let _ = writeln!(f, "{}", line);
5757 } else {
5758 tracing::warn!(
5759 path = %path.display(),
5760 "failed to append external-agent audit record"
5761 );
5762 }
5763}
5764
5765async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5771 let force = req
5772 .params
5773 .get("force")
5774 .and_then(Value::as_bool)
5775 .unwrap_or(false);
5776 if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5777 let json = car_ffi_common::external_agents::health_one(id, force).await?;
5778 serde_json::from_str(&json).map_err(|e| e.to_string())
5779 } else {
5780 let json = car_ffi_common::external_agents::health(force).await?;
5781 serde_json::from_str(&json).map_err(|e| e.to_string())
5782 }
5783}
5784
5785const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
5803
5804async fn handle_agents_chat(
5809 req: &JsonRpcMessage,
5810 state: &Arc<ServerState>,
5811 host_session: &Arc<crate::session::ClientSession>,
5812) -> Result<Value, String> {
5813 use futures::SinkExt;
5814 use tokio::sync::oneshot;
5815 use tokio_tungstenite::tungstenite::Message;
5816
5817 let agent_id = req
5818 .params
5819 .get("agent_id")
5820 .and_then(Value::as_str)
5821 .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
5822 .to_string();
5823 let prompt = req
5824 .params
5825 .get("prompt")
5826 .and_then(Value::as_str)
5827 .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
5828 .to_string();
5829 let session_id = req
5830 .params
5831 .get("session_id")
5832 .and_then(Value::as_str)
5833 .map(str::to_string)
5834 .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
5835 let stream = req
5836 .params
5837 .get("stream")
5838 .and_then(Value::as_bool)
5839 .unwrap_or(true);
5840 let voice_input = req
5841 .params
5842 .get("voice_input")
5843 .and_then(Value::as_bool)
5844 .unwrap_or(false);
5845
5846 let agent_client_id = state
5852 .attached_agents
5853 .lock()
5854 .await
5855 .get(&agent_id)
5856 .cloned()
5857 .ok_or_else(|| {
5858 format!(
5859 "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
5860 agent_id
5861 )
5862 })?;
5863 let agent_channel = {
5864 let sessions = state.sessions.lock().await;
5865 sessions
5866 .get(&agent_client_id)
5867 .map(|s| s.channel.clone())
5868 .ok_or_else(|| {
5869 format!(
5870 "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
5871 agent_id, agent_client_id
5872 )
5873 })?
5874 };
5875
5876 {
5882 let created_at = std::time::SystemTime::now()
5883 .duration_since(std::time::UNIX_EPOCH)
5884 .map(|d| d.as_secs())
5885 .unwrap_or(0);
5886 state.chat_sessions.lock().await.insert(
5887 session_id.clone(),
5888 crate::session::ChatSession {
5889 agent_id: agent_id.clone(),
5890 host_client_id: host_session.client_id.clone(),
5891 created_at,
5892 },
5893 );
5894 }
5895
5896 let request_id = agent_channel.next_request_id();
5903 let (tx, rx) = oneshot::channel();
5904 agent_channel
5905 .pending
5906 .lock()
5907 .await
5908 .insert(request_id.clone(), tx);
5909
5910 let rpc_request = serde_json::json!({
5911 "jsonrpc": "2.0",
5912 "method": "agent.chat",
5913 "params": {
5914 "session_id": session_id,
5915 "prompt": prompt,
5916 "stream": stream,
5917 "context": {
5918 "host_client_id": host_session.client_id,
5919 "voice_input": voice_input,
5920 },
5921 },
5922 "id": request_id,
5923 });
5924 let msg = Message::Text(
5925 serde_json::to_string(&rpc_request)
5926 .map_err(|e| e.to_string())?
5927 .into(),
5928 );
5929 if let Err(e) = agent_channel.write.lock().await.send(msg).await {
5930 agent_channel.pending.lock().await.remove(&request_id);
5934 state.chat_sessions.lock().await.remove(&session_id);
5935 return Err(format!(
5936 "failed to deliver agent.chat to `{}`: {}",
5937 agent_id, e
5938 ));
5939 }
5940
5941 let ack = match tokio::time::timeout(
5946 std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
5947 rx,
5948 )
5949 .await
5950 {
5951 Ok(Ok(resp)) => resp,
5952 Ok(Err(_)) => {
5953 state.chat_sessions.lock().await.remove(&session_id);
5955 return Err(format!(
5956 "agent `{}` disconnected before acking agents.chat",
5957 agent_id
5958 ));
5959 }
5960 Err(_) => {
5961 agent_channel.pending.lock().await.remove(&request_id);
5965 state.chat_sessions.lock().await.remove(&session_id);
5966 return Err(format!(
5967 "agent `{}` did not ack agents.chat within {}s",
5968 agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
5969 ));
5970 }
5971 };
5972
5973 if let Some(err) = ack.error {
5974 state.chat_sessions.lock().await.remove(&session_id);
5976 return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
5977 }
5978
5979 Ok(serde_json::json!({
5980 "accepted": true,
5981 "session_id": session_id,
5982 }))
5983}
5984
5985async fn handle_agents_chat_cancel(
5993 req: &JsonRpcMessage,
5994 state: &Arc<ServerState>,
5995) -> Result<Value, String> {
5996 use futures::SinkExt;
5997 use tokio_tungstenite::tungstenite::Message;
5998
5999 let session_id = req
6000 .params
6001 .get("session_id")
6002 .and_then(Value::as_str)
6003 .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
6004 .to_string();
6005
6006 let chat = state.chat_sessions.lock().await.remove(&session_id);
6007 let chat = match chat {
6008 Some(c) => c,
6009 None => {
6010 return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
6012 }
6013 };
6014
6015 let agent_client_id = state
6018 .attached_agents
6019 .lock()
6020 .await
6021 .get(&chat.agent_id)
6022 .cloned();
6023 if let Some(client_id) = agent_client_id {
6024 let channel_opt = {
6025 let sessions = state.sessions.lock().await;
6026 sessions.get(&client_id).map(|s| s.channel.clone())
6027 };
6028 if let Some(channel) = channel_opt {
6029 let notification = serde_json::json!({
6030 "jsonrpc": "2.0",
6031 "method": "agent.chat.cancel",
6032 "params": { "session_id": session_id },
6033 });
6034 if let Ok(text) = serde_json::to_string(¬ification) {
6035 let _ = channel
6036 .write
6037 .lock()
6038 .await
6039 .send(Message::Text(text.into()))
6040 .await;
6041 }
6042 }
6043 }
6044
6045 Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
6046}
6047
6048pub(crate) async fn try_forward_agent_chat_event(
6059 parsed: &JsonRpcMessage,
6060 state: &Arc<ServerState>,
6061) -> bool {
6062 use futures::SinkExt;
6063 use tokio_tungstenite::tungstenite::Message;
6064
6065 let Some(method) = parsed.method.as_deref() else {
6069 return false;
6070 };
6071 if method != "agent.chat.event" {
6072 return false;
6073 }
6074 if !parsed.id.is_null() {
6075 return false;
6078 }
6079 let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
6080 return false;
6081 };
6082 let session_id = session_id.to_string();
6083
6084 let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
6089 let Some(chat) = chat else {
6090 return true; };
6092
6093 let kind = parsed
6102 .params
6103 .get("kind")
6104 .and_then(Value::as_str)
6105 .map(str::to_string)
6106 .unwrap_or_else(|| {
6107 if parsed.params.get("error").is_some() {
6108 "error".to_string()
6109 } else if parsed.params.get("finish_reason").is_some() {
6110 "done".to_string()
6111 } else {
6112 "token".to_string()
6113 }
6114 });
6115
6116 let host_channel = {
6120 let sessions = state.sessions.lock().await;
6121 sessions
6122 .get(&chat.host_client_id)
6123 .map(|s| s.channel.clone())
6124 };
6125 if let Some(channel) = host_channel {
6126 let mut params = parsed.params.clone();
6127 if let Some(obj) = params.as_object_mut() {
6128 obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
6129 obj.entry("kind")
6134 .or_insert_with(|| Value::String(kind.clone()));
6135 }
6136 let forward = serde_json::json!({
6137 "jsonrpc": "2.0",
6138 "method": "agents.chat.event",
6139 "params": params,
6140 });
6141 if let Ok(text) = serde_json::to_string(&forward) {
6142 let send_result = channel
6143 .write
6144 .lock()
6145 .await
6146 .send(Message::Text(text.into()))
6147 .await;
6148 if let Err(e) = send_result {
6149 tracing::warn!(
6150 session_id = %session_id,
6151 agent_id = %chat.agent_id,
6152 host_client_id = %chat.host_client_id,
6153 kind = %kind,
6154 error = %e,
6155 "agent.chat.event forward to host failed at the WS send step"
6156 );
6157 }
6158 }
6159 } else {
6160 tracing::warn!(
6167 session_id = %session_id,
6168 agent_id = %chat.agent_id,
6169 host_client_id = %chat.host_client_id,
6170 kind = %kind,
6171 "agent.chat.event from supervised agent had no host channel \
6172 (host disconnected since `agents.chat`); dropping routing entry"
6173 );
6174 state.chat_sessions.lock().await.remove(&session_id);
6175 return true;
6176 }
6177
6178 if matches!(kind.as_str(), "done" | "error") {
6182 state.chat_sessions.lock().await.remove(&session_id);
6183 }
6184
6185 true
6186}
6187
#[cfg(test)]
mod fd_leak_regression {
    //! Regression test for car#209: a connection whose read side fails
    //! abruptly must not leave its entry behind in `state.sessions`.

    use super::run_dispatch;
    use futures::SinkExt;
    use std::sync::Arc;
    use tokio_tungstenite::tungstenite::{Error as WsError, Message};

    /// Drives `run_dispatch` with a read stream whose only item is an error
    /// and a write sink that discards everything, then checks that dispatch
    /// returns `Ok` and that no session remains registered.
    #[tokio::test]
    async fn abrupt_read_error_still_runs_session_cleanup() {
        let scratch = tempfile::TempDir::new().unwrap();
        let state = Arc::new(crate::session::ServerState::standalone(
            scratch.path().to_path_buf(),
        ));

        // Read half: a single `ConnectionClosed` error, then end of stream.
        let read_failure: Result<Message, WsError> = Err(WsError::ConnectionClosed);
        let read = futures::stream::iter(vec![read_failure]);

        // Write half: accepts and drops every frame.
        let discard = futures::sink::drain().sink_map_err(|_| WsError::ConnectionClosed);
        let write: crate::session::WsSink = Box::pin(discard);

        let result = run_dispatch(read, write, "test-peer".to_string(), Arc::clone(&state)).await;
        assert!(
            result.is_ok(),
            "run_dispatch must return Ok after cleanup, got {result:?}"
        );

        let remaining = state.sessions.lock().await;
        assert!(
            remaining.is_empty(),
            "state.sessions must be empty after an abrupt disconnect (car#209)"
        );
    }
}