1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
/// An inbound JSON-RPC frame as read off the socket.
///
/// One struct covers both directions of traffic: a frame with `method` set is
/// treated as a request, while a frame with no `method` but a `result` or
/// `error` is treated as a reply to something the server sent earlier. Every
/// field is `#[serde(default)]`, so missing keys deserialize to their default
/// value instead of failing the parse.
#[derive(Debug, Deserialize)]
// NOTE(review): presumably here because not every field is read after
// deserialization (e.g. `jsonrpc`) — confirm before removing.
#[allow(dead_code)]
pub struct JsonRpcMessage {
    /// Protocol version string as sent by the peer; empty when absent.
    #[serde(default)]
    pub jsonrpc: String,
    /// Method name for requests; `None` for reply frames.
    #[serde(default)]
    pub method: Option<String>,
    /// Request parameters; `Value::Null` when absent.
    #[serde(default)]
    pub params: Value,
    /// Correlation id; `Value::Null` when absent.
    #[serde(default)]
    pub id: Value,
    /// Success payload of a reply frame.
    #[serde(default)]
    pub result: Option<Value>,
    /// Error payload of a reply frame.
    #[serde(default)]
    pub error: Option<Value>,
}
44
/// An outbound JSON-RPC reply.
///
/// `result` and `error` are each omitted from the serialized output when
/// `None`. Use [`JsonRpcResponse::success`] or [`JsonRpcResponse::error`] to
/// build a value with exactly one of them set.
#[derive(Debug, Serialize)]
pub struct JsonRpcResponse {
    /// Protocol version tag written into every reply.
    pub jsonrpc: &'static str,
    /// Success payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Failure payload; skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
    /// Id of the request being answered.
    pub id: Value,
}
54
/// The `error` member of a JSON-RPC reply.
#[derive(Debug, Serialize)]
pub struct JsonRpcError {
    /// Numeric error code reported to the client.
    pub code: i32,
    /// Human-readable description of the failure.
    pub message: String,
}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
88 name = "ws.connection",
89 skip_all,
90 fields(peer = %peer),
91)]
92pub async fn handle_connection(
93 stream: TcpStream,
94 peer: SocketAddr,
95 state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let ws_stream = accept_async(stream).await?;
98 let (write, read) = ws_stream.split();
99 run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102#[cfg(unix)]
113#[instrument(
114 name = "ws.connection",
115 skip_all,
116 fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119 stream: tokio::net::UnixStream,
120 peer: String,
121 state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123 let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124 let (write, read) = ws_stream.split();
125 run_dispatch(read, Box::pin(write), peer, state).await
126}
127
/// Runs the message loop for one WebSocket connection, from first frame to
/// teardown.
///
/// A fresh client id and [`WsChannel`] are created, a session is registered on
/// `state`, and frames are then read from `read` until the stream ends, a read
/// error occurs, the client sends a close frame, or an unauthenticated client
/// sends a method other than `session.auth`.
///
/// Text frames are parsed as JSON-RPC:
/// * a parse failure is answered with code `-32700` and a null id;
/// * a reply frame (no `method`, with `result` or `error`) whose string id
///   matches an entry in the channel's `pending` map is delivered to that
///   waiter as a `ToolExecuteResponse`;
/// * a frame accepted by `try_forward_agent_chat_event` is consumed there;
/// * a request frame passes the auth check and the approval gate, then runs
///   in its own task on a per-connection `JoinSet`, which sends the reply.
///
/// Binary frames are parsed with `car_ffi_common::voice::binary::parse_frame`;
/// inbound PCM frames are pushed to the voice session registry from a spawned
/// task, and other frame types are logged and dropped.
///
/// On exit all outstanding tasks are aborted and the session's host
/// subscription, approvals, A2UI subscription, session entry, and pending map
/// are cleaned up.
///
/// # Errors
///
/// The visible body always returns `Ok(())`; read errors end the loop rather
/// than propagating.
#[instrument(
    name = "ws.dispatch",
    skip_all,
    fields(client_id = tracing::field::Empty, peer = %peer),
)]
pub async fn run_dispatch<R>(
    mut read: R,
    write: crate::session::WsSink,
    peer: String,
    state: Arc<ServerState>,
) -> Result<(), Box<dyn std::error::Error>>
where
    R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
        + Unpin
        + Send,
{
    // Client id: the first 12 characters of a hyphen-less v4 UUID. It is also
    // recorded on the current span, whose `client_id` field starts out empty.
    let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
    tracing::Span::current().record("client_id", &client_id.as_str());

    info!("New connection from {}", peer);

    // Shared write half plus the table of in-flight server-to-client requests.
    let channel = Arc::new(WsChannel {
        write: Mutex::new(write),
        pending: Mutex::new(HashMap::new()),
        next_id: AtomicU64::new(1),
    });

    let session = state.create_session(&client_id, channel.clone()).await;

    // Every request handler and PCM push runs as a task in this set so they
    // can all be aborted together when the connection ends.
    let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();

    while let Some(msg) = read.next().await {
        // Reap tasks that have already finished so the set does not grow
        // without bound; their outcomes are discarded.
        while conn_tasks.try_join_next().is_some() {}

        let msg = match msg {
            Ok(m) => m,
            Err(e) => {
                info!("read error from {}: {}; closing", client_id, e);
                break;
            }
        };
        if msg.is_text() {
            let text = match msg.to_text() {
                Ok(t) => t,
                Err(e) => {
                    info!("non-text frame from {}: {}; closing", client_id, e);
                    break;
                }
            };
            let parsed: JsonRpcMessage = match serde_json::from_str(text) {
                Ok(m) => m,
                Err(e) => {
                    // Malformed JSON: report it and keep the connection open.
                    // A failure to send the report is ignored.
                    send_response(
                        &session.channel,
                        JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
                    )
                    .await
                    .ok();
                    continue;
                }
            };

            // Reply frame: route it to whoever is waiting on this id. Frames
            // with a non-string id, or an id with no waiter, fall through to
            // the handling below.
            if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
                if let Some(id_str) = parsed.id.as_str() {
                    let mut pending = session.channel.pending.lock().await;
                    if let Some(tx) = pending.remove(id_str) {
                        let tool_resp = if let Some(result) = parsed.result {
                            ToolExecuteResponse {
                                action_id: id_str.to_string(),
                                output: Some(result),
                                error: None,
                            }
                        } else {
                            // Only the error's `message` string is kept; a
                            // missing or non-string message becomes
                            // "unknown error".
                            let err_msg = parsed
                                .error
                                .as_ref()
                                .and_then(|e| e.get("message"))
                                .and_then(|m| m.as_str())
                                .unwrap_or("unknown error")
                                .to_string();
                            ToolExecuteResponse {
                                action_id: id_str.to_string(),
                                output: None,
                                error: Some(err_msg),
                            }
                        };
                        // The waiter may already be gone; that is not an error.
                        let _ = tx.send(tool_resp);
                        continue;
                    }
                }
            }

            // NOTE(review): this runs before the auth check below — confirm
            // that is intended.
            if try_forward_agent_chat_event(&parsed, &state).await {
                continue;
            }

            if let Some(method) = &parsed.method {
                info!(method = %method, "dispatching JSON-RPC method");

                // Auth check: when a server auth token is configured, the only
                // method an unauthenticated session may call is `session.auth`.
                // Anything else gets an error reply and the connection closes.
                if state.auth_token.get().is_some()
                    && !session
                        .authenticated
                        .load(std::sync::atomic::Ordering::Acquire)
                    && method != "session.auth"
                {
                    let resp = JsonRpcResponse::error(
                        parsed.id.clone(),
                        -32001,
                        "auth required: send `session.auth` with the per-launch token \
                         from ~/Library/Application Support/ai.parslee.car/auth-token \
                         (macOS), $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux), \
                         or %LOCALAPPDATA%\\ai.parslee.car\\auth-token (Windows) \
                         as the first frame on this connection",
                    );
                    let _ = send_response(&session.channel, resp).await;
                    info!(client = %client_id, method = %method,
                        "rejecting non-auth method on unauthenticated session; closing");
                    break;
                }

                // Approval gate: a blocked method is answered with -32003 and
                // the connection stays open.
                if state.approval_gate.requires_approval(method.as_str()) {
                    match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
                        Ok(()) => {}
                        Err(reason) => {
                            let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
                            let _ = send_response(&session.channel, resp).await;
                            info!(
                                client = %client_id,
                                method = %method,
                                reason = %reason,
                                "approval gate blocked dispatch"
                            );
                            continue;
                        }
                    }
                }

                // Move owned handles into the task so the read loop can go on
                // to the next frame while this request is being served.
                let session_task = session.clone();
                let state_task = state.clone();
                let method_owned = method.clone();
                let parsed_task = parsed;
                conn_tasks.spawn(async move {
                    let session = session_task;
                    let state = state_task;
                    let parsed = parsed_task;
                    // Method table: each arm yields `Result<Value, String>`.
                    let result = match method_owned.as_str() {
                        "session.auth" => handle_session_auth(&parsed, &session, &state).await,
                        "parslee.auth" => handle_parslee_auth().await,
                        "auth.start" => handle_auth_start(&parsed).await,
                        "auth.complete" => handle_auth_complete(&parsed).await,
                        "auth.status" => handle_auth_status().await,
                        "auth.logout" => handle_auth_logout().await,
                        "session.init" => handle_session_init(&parsed, &session).await,
                        "host.subscribe" => handle_host_subscribe(&session, &state).await,
                        "host.agents" => handle_host_agents(&session).await,
                        "host.events" => handle_host_events(&parsed, &session).await,
                        "host.approvals" => handle_host_approvals(&session).await,
                        "host.register_agent" => {
                            handle_host_register_agent(&parsed, &session).await
                        }
                        "host.unregister_agent" => {
                            handle_host_unregister_agent(&parsed, &session).await
                        }
                        "host.set_status" => handle_host_set_status(&parsed, &session).await,
                        "host.notify" => handle_host_notify(&parsed, &session).await,
                        "host.request_approval" => {
                            handle_host_request_approval(&parsed, &session).await
                        }
                        "host.resolve_approval" => {
                            handle_host_resolve_approval(&parsed, &session).await
                        }
                        "tools.register" => handle_tools_register(&parsed, &session).await,
                        "proposal.submit" => {
                            handle_proposal_submit(&parsed, &session, &state).await
                        }
                        "policy.register" => handle_policy_register(&parsed, &session).await,
                        "session.policy.open" => handle_session_policy_open(&session).await,
                        "session.policy.close" => {
                            handle_session_policy_close(&parsed, &session).await
                        }
                        "verify" => handle_verify(&parsed, &session).await,
                        "state.get" => handle_state_get(&parsed, &session).await,
                        "state.set" => handle_state_set(&parsed, &session).await,
                        "state.exists" => handle_state_exists(&parsed, &session).await,
                        "state.keys" => handle_state_keys(&parsed, &session).await,
                        "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
                        "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
                        "memory.query" => handle_memory_query(&parsed, &session).await,
                        "memory.build_context" => {
                            handle_memory_build_context(&parsed, &session).await
                        }
                        "memory.build_context_fast" => {
                            handle_memory_build_context_fast(&parsed, &session).await
                        }
                        "memory.consolidate" => handle_memory_consolidate(&session).await,
                        "memory.fact_count" => handle_memory_fact_count(&session).await,
                        "memory.persist" => handle_memory_persist(&parsed, &session).await,
                        "memory.load" => handle_memory_load(&parsed, &session).await,
                        "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
                        "skill.find" => handle_skill_find(&parsed, &session).await,
                        "skill.report" => handle_skill_report(&parsed, &session).await,
                        "skill.repair" => handle_skill_repair(&parsed, &session).await,
                        "skills.ingest_distilled" => {
                            handle_skills_ingest_distilled(&parsed, &session).await
                        }
                        "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
                        "skills.domains_needing_evolution" => {
                            handle_skills_domains_needing_evolution(&parsed, &session).await
                        }
                        "skills.ingest_provisional" => {
                            handle_skills_ingest_provisional(&parsed, &session).await
                        }
                        "skills.gate" => handle_skills_gate(&parsed, &session).await,
                        "skill.meta" => handle_skill_meta(&parsed, &session).await,
                        "skill.export" => handle_skill_export(&parsed, &session).await,
                        "skill.import" => handle_skill_import(&parsed, &session).await,
                        "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
                        "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
                        "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
                        "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
                        "multi.vote" => handle_multi_vote(&parsed, &session).await,
                        "multi.tournament" => handle_multi_tournament(&parsed, &session).await,
                        "multi.subtask" => handle_multi_subtask(&parsed, &session).await,
                        "scheduler.create" => handle_scheduler_create(&parsed),
                        "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
                        "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
                        "infer" => handle_infer(&parsed, &state, &session).await,
                        "image.generate" => handle_image_generate(&parsed, &state).await,
                        "video.generate" => handle_video_generate(&parsed, &state).await,
                        "embed" => handle_embed(&parsed, &state).await,
                        "classify" => handle_classify(&parsed, &state).await,
                        "tokenize" => handle_tokenize(&parsed, &state).await,
                        "detokenize" => handle_detokenize(&parsed, &state).await,
                        "rerank" => handle_rerank(&parsed, &state).await,
                        "transcribe" => handle_transcribe(&parsed, &state).await,
                        "synthesize" => handle_synthesize(&parsed, &state).await,
                        "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
                        "speech.prepare" => handle_speech_prepare(&state).await,
                        "models.route" => handle_models_route(&parsed, &state).await,
                        "models.stats" => handle_models_stats(&state).await,
                        "outcomes.resolve_pending" => {
                            handle_outcomes_resolve_pending(&parsed, &state).await
                        }
                        "events.count" => handle_events_count(&session).await,
                        "events.stats" => handle_events_stats(&session).await,
                        "events.truncate" => handle_events_truncate(&parsed, &session).await,
                        "events.clear" => handle_events_clear(&session).await,
                        "runs.start" => handle_runs_start(&parsed, &session, &state).await,
                        "runs.complete" => {
                            handle_runs_complete(&parsed, &session, &state).await
                        }
                        "runs.record_turns" => {
                            handle_runs_record_turns(&parsed, &session, &state).await
                        }
                        "runs.subscribe" => {
                            handle_runs_subscribe(&parsed, &session, &state).await
                        }
                        "runs.unsubscribe" => {
                            handle_runs_unsubscribe(&parsed, &session, &state).await
                        }
                        "runs.list" => handle_runs_list(&parsed, &session, &state).await,
                        "runs.get_trace" => {
                            handle_runs_get_trace(&parsed, &session, &state).await
                        }
                        "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
                        "models.list" => handle_models_list(&state),
                        "models.register" => handle_models_register(&parsed, &state).await,
                        "models.unregister" => handle_models_unregister(&parsed, &state).await,
                        "models.list_unified" => handle_models_list_unified(&state),
                        "models.search" => handle_models_search(&parsed, &state),
                        "models.recommend" => handle_models_recommend(&parsed, &state),
                        "models.setup_plan" => handle_models_setup_plan(&parsed, &state),
                        "models.upgrades" => handle_models_upgrades(&state),
                        "models.detect_upgrades" => handle_models_detect_upgrades(&state).await,
                        "models.check_upgrade_nudge" => {
                            handle_models_check_upgrade_nudge(&parsed, &state).await
                        }
                        "models.dismiss_upgrade" => {
                            handle_models_dismiss_upgrade(&parsed, &state)
                        }
                        "models.check_concierge" => {
                            handle_models_check_concierge(&parsed, &state).await
                        }
                        "models.dismiss_suggestion" => {
                            handle_models_dismiss_suggestion(&parsed, &state)
                        }
                        "models.update_prefs_get" => handle_models_update_prefs_get(&state),
                        "models.update_prefs_set" => {
                            handle_models_update_prefs_set(&parsed, &state)
                        }
                        "models.pull" => handle_models_pull(&parsed, &state).await,
                        // `models.install` is served by the same handler as
                        // `models.pull`.
                        "models.install" => handle_models_pull(&parsed, &state).await,
                        "skills.distill" => handle_skills_distill(&parsed, &state).await,
                        "skills.list" => handle_skills_list(&parsed, &session).await,
                        "browser.run" => handle_browser_run(&parsed, &session).await,
                        "browser.close" => handle_browser_close(&session).await,
                        "secret.put" => handle_secret_put(&parsed),
                        "secret.get" => handle_secret_get(&parsed),
                        "secret.delete" => handle_secret_delete(&parsed),
                        "secret.status" => handle_secret_status(&parsed),
                        "secret.available" => Ok(car_ffi_common::secrets::is_available()),
                        "permissions.status" => handle_perm_status(&parsed),
                        "permissions.request" => handle_perm_request(&parsed),
                        "permissions.explain" => handle_perm_explain(&parsed),
                        "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
                        "accounts.list" => car_ffi_common::accounts::list(),
                        "accounts.open" => {
                            // Params that fail to deserialize are treated as
                            // "no account id" rather than rejected.
                            #[derive(serde::Deserialize, Default)]
                            struct OpenParams {
                                #[serde(default)]
                                account_id: Option<String>,
                            }
                            let p: OpenParams =
                                serde_json::from_value(parsed.params.clone()).unwrap_or_default();
                            car_ffi_common::accounts::open_settings(p.account_id.as_deref())
                        }
                        "calendar.list" => car_ffi_common::integrations::calendar_list(),
                        "calendar.events" => handle_calendar_events(&parsed),
                        "calendar.create_event" => handle_calendar_create_event(&parsed),
                        "calendar.update_event" => handle_calendar_update_event(&parsed),
                        "calendar.delete_event" => handle_calendar_delete_event(&parsed),
                        "contacts.containers" => {
                            car_ffi_common::integrations::contacts_containers()
                        }
                        "contacts.find" => handle_contacts_find(&parsed),
                        "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
                        "mail.inbox" => handle_mail_inbox(&parsed),
                        "mail.send" => handle_mail_send(&parsed),
                        "messages.services" => car_ffi_common::integrations::messages_services(),
                        "messages.chats" => handle_messages_chats(&parsed),
                        "messages.send" => handle_messages_send(&parsed),
                        "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
                        "notes.find" => handle_notes_find(&parsed),
                        "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
                        "reminders.items" => handle_reminders_items(&parsed),
                        "photos.albums" => car_ffi_common::integrations::photos_albums(),
                        "bookmarks.list" => handle_bookmarks_list(&parsed),
                        "files.locations" => car_ffi_common::integrations::files_locations(),
                        "keychain.status" => car_ffi_common::integrations::keychain_status(),
                        "health.status" => car_ffi_common::health::status(),
                        "health.sleep" => handle_health_sleep(&parsed),
                        "health.workouts" => handle_health_workouts(&parsed),
                        "health.activity" => handle_health_activity(&parsed),
                        "voice.transcribe_stream.start" => {
                            handle_voice_transcribe_stream_start(&parsed, &state, &session).await
                        }
                        "voice.transcribe_stream.stop" => {
                            handle_voice_transcribe_stream_stop(&parsed, &state).await
                        }
                        "voice.transcribe_stream.push" => {
                            handle_voice_transcribe_stream_push(&parsed, &state).await
                        }
                        "voice.tts_stream.start" => {
                            handle_voice_tts_stream_start(&parsed, &session).await
                        }
                        "voice.tts_stream.cancel" => handle_voice_tts_stream_cancel(&parsed).await,
                        "voice.tts_stream.list" => Ok(handle_voice_tts_stream_list()),
                        "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
                        "voice.dispatch_turn" => {
                            handle_voice_dispatch_turn(&parsed, &state, &session).await
                        }
                        "voice.cancel_turn" => handle_voice_cancel_turn().await,
                        "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
                        "inference.register_runner" => {
                            handle_inference_register_runner(&session).await
                        }
                        "inference.runner.event" => handle_inference_runner_event(&parsed).await,
                        "inference.runner.complete" => {
                            handle_inference_runner_complete(&parsed).await
                        }
                        "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
                        // The next few voice helpers return JSON text, which is
                        // re-parsed into a `Value` before being sent back.
                        "voice.providers.list" => {
                            serde_json::from_str::<serde_json::Value>(
                                &car_voice::list_voice_providers_json(),
                            )
                            .map_err(|e| e.to_string())
                        }
                        "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
                            .await
                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
                        "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
                            .await
                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
                        "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
                        "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
                            .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
                        "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
                        "workflow.run" => handle_workflow_run(&parsed, &session).await,
                        "workflow.resume" => handle_workflow_resume(&parsed, &session).await,
                        "builder.build" => handle_builder_build(&parsed, &state, &session).await,
                        "workflow.verify" => handle_workflow_verify(&parsed),
                        "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
                        "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
                        "meeting.list" => handle_meeting_list(&parsed),
                        "meeting.get" => handle_meeting_get(&parsed),
                        "registry.register" => handle_registry_register(&parsed),
                        "registry.heartbeat" => handle_registry_heartbeat(&parsed),
                        "registry.unregister" => handle_registry_unregister(&parsed),
                        "registry.list" => handle_registry_list(&parsed),
                        "registry.reap" => handle_registry_reap(&parsed),
                        "admission.status" => handle_admission_status(&state),
                        "a2a.start" => handle_a2a_start(&parsed, &session).await,
                        "a2a.stop" => handle_a2a_stop(),
                        "a2a.status" => handle_a2a_status(),
                        "a2a.send" => handle_a2a_send(&parsed, &state).await,
                        "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
                        "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
                        "a2ui.capabilities" => handle_a2ui_capabilities(&state),
                        "a2ui.reap" => handle_a2ui_reap(&state).await,
                        "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
                        "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
                        "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
                        "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
                        "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
                        "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
                        "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
                        "automation.run_applescript" => handle_run_applescript(&parsed).await,
                        "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
                        "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
                        "notifications.local" => handle_local_notification(&parsed).await,
                        "vision.ocr" => handle_vision_ocr(&parsed).await,
                        "agents.list" => handle_agents_list(&state).await,
                        "agents.health" => handle_agents_health(&state).await,
                        "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
                        "agents.install" => handle_agents_install(&parsed, &state).await,
                        "agents.remove" => handle_agents_remove(&parsed, &state).await,
                        "agents.start" => handle_agents_start(&parsed, &state).await,
                        "agents.stop" => handle_agents_stop(&parsed, &state).await,
                        "agents.restart" => handle_agents_restart(&parsed, &state).await,
                        "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
                        "agents.list_external" => handle_agents_list_external(&parsed).await,
                        "agents.detect_external" => handle_agents_detect_external(&parsed).await,
                        "agents.health_external" => handle_agents_health_external(&parsed).await,
                        "agents.invoke_external" => {
                            handle_agents_invoke_external(&parsed, &state, &session).await
                        }
                        "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
                        "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
                        // Slash-style and PascalCase method names are both
                        // accepted; all are forwarded to one A2A dispatcher
                        // together with the method name.
                        "message/send"
                        | "SendMessage"
                        | "message/stream"
                        | "SendStreamingMessage"
                        | "tasks/get"
                        | "GetTask"
                        | "tasks/list"
                        | "ListTasks"
                        | "tasks/cancel"
                        | "CancelTask"
                        | "tasks/resubscribe"
                        | "SubscribeToTask"
                        | "tasks/pushNotificationConfig/set"
                        | "CreateTaskPushNotificationConfig"
                        | "tasks/pushNotificationConfig/get"
                        | "GetTaskPushNotificationConfig"
                        | "tasks/pushNotificationConfig/list"
                        | "ListTaskPushNotificationConfigs"
                        | "tasks/pushNotificationConfig/delete"
                        | "DeleteTaskPushNotificationConfig"
                        | "agent/getAuthenticatedExtendedCard"
                        | "GetExtendedAgentCard" => {
                            handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
                        }
                        // NOTE(review): an unknown method ends up reported with
                        // code -32603 below; JSON-RPC 2.0 reserves -32601 for
                        // "Method not found" — confirm this is intentional.
                        _ => Err(format!("unknown method: {}", method_owned)),
                    };

                    // Every handler failure is reported with the same code.
                    let resp = match result {
                        Ok(value) => JsonRpcResponse::success(parsed.id, value),
                        Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
                    };
                    let _ = send_response(&session.channel, resp).await;
                });
            }
        } else if msg.is_binary() {
            // NOTE(review): binary frames do not pass through the auth check
            // applied to text-frame methods above — confirm this is intended.
            let bytes = msg.into_data();
            let parsed = match car_ffi_common::voice::binary::parse_frame(&bytes) {
                Ok(p) => p,
                Err(e) => {
                    tracing::warn!("binary frame from {} rejected: {}", client_id, e);
                    continue;
                }
            };
            match parsed.frame_type {
                car_ffi_common::voice::binary::FRAME_TYPE_INBOUND_PCM => {
                    // Copy the payload and session id so the task owns its
                    // data independently of the frame buffer.
                    let registry = state.voice_sessions.clone();
                    let payload_owned = parsed.payload.to_vec();
                    let session_id_owned = parsed.session_id_hex.clone();
                    conn_tasks.spawn(async move {
                        if let Err(e) = car_ffi_common::voice::transcribe_stream_push(
                            &session_id_owned,
                            &payload_owned,
                            registry,
                        )
                        .await
                        {
                            tracing::warn!(
                                "binary PCM push to session {} failed: {}",
                                session_id_owned,
                                e
                            );
                        }
                    });
                }
                other => {
                    tracing::debug!(
                        "binary frame type {:#04x} from {} not accepted server-side",
                        other,
                        client_id
                    );
                }
            }
        } else if msg.is_close() {
            info!("Client {} disconnected", client_id);
            break;
        }
        // Any other frame kind is ignored here.
    }

    // Teardown: cancel whatever is still running for this connection.
    conn_tasks.abort_all();

    session.host.unsubscribe(&client_id).await;
    session.host.reap_session_approvals(&client_id).await;
    state.a2ui_subscribers.lock().await.remove(&client_id);

    let _removed = state.remove_session(&client_id).await;
    {
        // Drop every stored reply sender for requests that never got a reply.
        let mut pending = session.channel.pending.lock().await;
        pending.clear();
    }

    Ok(())
}
787
788async fn send_response(
789 channel: &WsChannel,
790 resp: JsonRpcResponse,
791) -> Result<(), Box<dyn std::error::Error>> {
792 use futures::SinkExt;
793 let json = serde_json::to_string(&resp)?;
794 channel
795 .write
796 .lock()
797 .await
798 .send(Message::Text(json.into()))
799 .await?;
800 Ok(())
801}
802
803async fn handle_host_subscribe(
806 session: &crate::session::ClientSession,
807 state: &Arc<ServerState>,
808) -> Result<Value, String> {
809 session
810 .host
811 .subscribe(&session.client_id, session.channel.clone())
812 .await;
813 serde_json::to_value(HostSnapshot {
814 subscribed: true,
815 agents: session.host.agents().await,
816 approvals: session.host.approvals().await,
817 events: session.host.events(50).await,
818 identity: Some(daemon_identity(state)),
819 })
820 .map_err(|e| e.to_string())
821}
822
823fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
831 let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
838 (
839 Some(p.to_string_lossy().into_owned()),
840 car_proto::HostManifestRole::Observer,
841 )
842 } else if let Some(s) = state.supervisor_if_installed() {
843 (
844 Some(s.manifest_path().to_string_lossy().into_owned()),
845 car_proto::HostManifestRole::Owner,
846 )
847 } else {
848 (None, car_proto::HostManifestRole::None)
849 };
850 car_proto::HostIdentity {
851 version: env!("CARGO_PKG_VERSION").to_string(),
852 pid: std::process::id(),
853 manifest_path,
854 manifest_role,
855 parslee: state
856 .parslee_session
857 .get()
858 .map(|session| session.identity.clone()),
859 }
860}
861
862async fn handle_parslee_auth() -> Result<Value, String> {
873 let session = crate::parslee_auth::load_or_refresh()
874 .await?
875 .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
876 Ok(serde_json::json!({
877 "authenticated": true,
878 "token_type": "Bearer",
879 "access_token": session.access_token,
880 "authorization_header": format!("Bearer {}", session.access_token),
881 "identity": session.identity,
882 }))
883}
884
885async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
891 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
892 let client_id = req
893 .params
894 .get("client_id")
895 .and_then(|v| v.as_str())
896 .unwrap_or("parslee-car");
897 let redirect_uri = req
898 .params
899 .get("redirect_uri")
900 .and_then(|v| v.as_str())
901 .ok_or_else(|| "redirect_uri is required".to_string())?;
902 let provider = req.params.get("provider").and_then(|v| v.as_str());
903 let state = car_auth::new_state();
904 let verifier = car_auth::pkce_verifier();
905 let challenge = car_auth::pkce_challenge(&verifier);
906 let url =
907 car_auth::authorize_url(&api_base, client_id, redirect_uri, &state, &challenge, provider)?;
908 Ok(serde_json::json!({
909 "authorize_url": url,
910 "state": state,
911 "verifier": verifier,
912 }))
913}
914
915async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
916 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
917 let client_id = req
918 .params
919 .get("client_id")
920 .and_then(|v| v.as_str())
921 .unwrap_or("parslee-car");
922 let redirect_uri = req
923 .params
924 .get("redirect_uri")
925 .and_then(|v| v.as_str())
926 .ok_or_else(|| "redirect_uri is required".to_string())?;
927 let code = req
928 .params
929 .get("code")
930 .and_then(|v| v.as_str())
931 .ok_or_else(|| "code is required".to_string())?;
932 let verifier = req
933 .params
934 .get("verifier")
935 .and_then(|v| v.as_str())
936 .ok_or_else(|| "verifier is required".to_string())?;
937 let token =
938 car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
939 car_auth::store_tokens(&api_base, &token)?;
940 Ok(serde_json::json!({ "ok": true }))
941}
942
943async fn handle_auth_status() -> Result<Value, String> {
944 match car_auth::fetch_status(None).await? {
945 Some(session_json) => {
946 let session: Value = serde_json::from_str(&session_json).unwrap_or(Value::Null);
947 Ok(serde_json::json!({ "authenticated": true, "session": session }))
948 }
949 None => Ok(serde_json::json!({ "authenticated": false })),
950 }
951}
952
953async fn handle_auth_logout() -> Result<Value, String> {
954 car_auth::clear_tokens()?;
955 Ok(serde_json::json!({ "ok": true }))
956}
957
958async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
959 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
960}
961
962async fn handle_host_events(
963 req: &JsonRpcMessage,
964 session: &crate::session::ClientSession,
965) -> Result<Value, String> {
966 let limit = req
967 .params
968 .get("limit")
969 .and_then(|v| v.as_u64())
970 .unwrap_or(100) as usize;
971 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
972}
973
974async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
975 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
976}
977
978async fn handle_a2ui_apply(
979 req: &JsonRpcMessage,
980 state: &Arc<ServerState>,
981) -> Result<Value, String> {
982 #[derive(Deserialize)]
983 struct Params {
984 #[serde(default)]
985 envelope: Option<car_a2ui::A2uiEnvelope>,
986 #[serde(default)]
987 message: Option<car_a2ui::A2uiEnvelope>,
988 }
989
990 let envelope = if req.params.get("createSurface").is_some()
991 || req.params.get("updateComponents").is_some()
992 || req.params.get("updateDataModel").is_some()
993 || req.params.get("deleteSurface").is_some()
994 {
995 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
996 .map_err(|e| e.to_string())?
997 } else {
998 match serde_json::from_value::<Params>(req.params.clone()) {
999 Ok(params) => params
1000 .envelope
1001 .or(params.message)
1002 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
1003 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
1004 .map_err(|e| e.to_string())?,
1005 }
1006 };
1007
1008 apply_a2ui_envelope(state, envelope, None, None).await
1009}
1010
1011async fn handle_a2ui_ingest(
1012 req: &JsonRpcMessage,
1013 state: &Arc<ServerState>,
1014) -> Result<Value, String> {
1015 #[derive(Deserialize)]
1016 #[serde(rename_all = "camelCase")]
1017 struct Params {
1018 #[serde(default)]
1019 endpoint: Option<String>,
1020 #[serde(default)]
1021 a2a_endpoint: Option<String>,
1022 #[serde(default)]
1023 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1024 #[serde(default)]
1025 route_auth: Option<A2aRouteAuth>,
1026 #[serde(default)]
1027 allow_untrusted_endpoint: bool,
1028 }
1029
1030 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
1031 endpoint: None,
1032 a2a_endpoint: None,
1033 owner: None,
1034 route_auth: None,
1035 allow_untrusted_endpoint: false,
1036 });
1037 let payload = req.params.get("payload").unwrap_or(&req.params);
1038 state
1039 .a2ui
1040 .validate_payload(payload)
1041 .map_err(|e| e.to_string())?;
1042 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
1043 if envelopes.is_empty() {
1044 return Err("no A2UI envelopes found in payload".into());
1045 }
1046 let endpoint = params.endpoint.or(params.a2a_endpoint);
1047 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
1048 let owner = params
1049 .owner
1050 .or_else(|| car_a2ui::owner_from_value(payload))
1051 .map(|owner| match endpoint.clone() {
1052 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
1053 None => owner,
1054 });
1055
1056 let mut results = Vec::new();
1057 for envelope in envelopes {
1058 let value =
1059 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
1060 results.push(value);
1061 }
1062 Ok(serde_json::json!({ "applied": results }))
1063}
1064
1065async fn apply_a2ui_envelope(
1066 state: &Arc<ServerState>,
1067 envelope: car_a2ui::A2uiEnvelope,
1068 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1069 route_auth: Option<A2aRouteAuth>,
1070) -> Result<Value, String> {
1071 let result = state
1072 .a2ui
1073 .apply_with_owner(envelope, owner)
1074 .await
1075 .map_err(|e| e.to_string())?;
1076 update_a2ui_route_auth(state, &result, route_auth).await;
1077 let kind = if result.deleted {
1078 "a2ui.surface_deleted"
1079 } else {
1080 "a2ui.surface_updated"
1081 };
1082 let message = if result.deleted {
1083 format!("A2UI surface {} deleted", result.surface_id)
1084 } else {
1085 format!("A2UI surface {} updated", result.surface_id)
1086 };
1087 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
1088 state
1089 .host
1090 .record_event(kind, None, message, payload.clone())
1091 .await;
1092 broadcast_a2ui_event(state, kind, &payload).await;
1096 serde_json::to_value(result).map_err(|e| e.to_string())
1097}
1098
1099async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
1100 use futures::SinkExt;
1101 use tokio_tungstenite::tungstenite::Message;
1102 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
1103 .a2ui_subscribers
1104 .lock()
1105 .await
1106 .values()
1107 .cloned()
1108 .collect();
1109 if subscribers.is_empty() {
1110 return;
1111 }
1112 let Ok(json) = serde_json::to_string(&serde_json::json!({
1113 "jsonrpc": "2.0",
1114 "method": "a2ui.event",
1115 "params": {
1116 "kind": kind,
1117 "result": result,
1118 },
1119 })) else {
1120 return;
1121 };
1122 for channel in subscribers {
1123 let _ = channel
1124 .write
1125 .lock()
1126 .await
1127 .send(Message::Text(json.clone().into()))
1128 .await;
1129 }
1130}
1131
1132async fn update_a2ui_route_auth(
1133 state: &Arc<ServerState>,
1134 result: &car_a2ui::A2uiApplyResult,
1135 route_auth: Option<A2aRouteAuth>,
1136) {
1137 let mut auth = state.a2ui_route_auth.lock().await;
1138 if result.deleted {
1139 auth.remove(&result.surface_id);
1140 return;
1141 }
1142
1143 let has_route_endpoint = result
1144 .surface
1145 .as_ref()
1146 .and_then(|surface| surface.owner.as_ref())
1147 .and_then(|owner| owner.endpoint.as_ref())
1148 .is_some();
1149 match (has_route_endpoint, route_auth) {
1150 (true, Some(route_auth)) => {
1151 auth.insert(result.surface_id.clone(), route_auth);
1152 }
1153 _ => {
1154 auth.remove(&result.surface_id);
1155 }
1156 }
1157}
1158
1159fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
1160 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
1161}
1162
1163async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
1164 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
1165 if !removed.is_empty() {
1166 let mut auth = state.a2ui_route_auth.lock().await;
1167 for surface_id in &removed {
1168 auth.remove(surface_id);
1169 }
1170 }
1171 Ok(serde_json::json!({ "removed": removed }))
1172}
1173
1174async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
1175 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
1176}
1177
1178async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
1179 let surface_id = req
1180 .params
1181 .get("surface_id")
1182 .or_else(|| req.params.get("surfaceId"))
1183 .and_then(Value::as_str)
1184 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
1185 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1186}
1187
1188async fn handle_a2ui_subscribe(
1194 session: &crate::session::ClientSession,
1195 state: &Arc<ServerState>,
1196) -> Result<Value, String> {
1197 state
1198 .a2ui_subscribers
1199 .lock()
1200 .await
1201 .insert(session.client_id.clone(), session.channel.clone());
1202 Ok(serde_json::json!({ "subscribed": true }))
1203}
1204
1205async fn handle_a2ui_unsubscribe(
1209 session: &crate::session::ClientSession,
1210 state: &Arc<ServerState>,
1211) -> Result<Value, String> {
1212 state
1213 .a2ui_subscribers
1214 .lock()
1215 .await
1216 .remove(&session.client_id);
1217 Ok(serde_json::json!({ "subscribed": false }))
1218}
1219
1220async fn handle_a2ui_replay(
1227 req: &JsonRpcMessage,
1228 state: &Arc<ServerState>,
1229) -> Result<Value, String> {
1230 let surface_id = req
1231 .params
1232 .get("surface_id")
1233 .or_else(|| req.params.get("surfaceId"))
1234 .and_then(Value::as_str)
1235 .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
1236 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1237}
1238
1239async fn handle_a2ui_action(
1240 req: &JsonRpcMessage,
1241 state: &Arc<ServerState>,
1242) -> Result<Value, String> {
1243 let action: car_a2ui::ClientAction =
1244 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1245 let owner = state.a2ui.owner(&action.surface_id).await;
1246
1247 let action_result = serde_json::json!({
1257 "surfaceId": action.surface_id,
1258 "action": action,
1259 "owner": owner,
1260 });
1261 broadcast_a2ui_event(state, "a2ui.action", &action_result).await;
1262
1263 let route = route_a2ui_action(state, &action, owner.clone()).await;
1264 let payload = serde_json::json!({
1265 "action": action,
1266 "owner": owner,
1267 "route": route,
1268 });
1269 let event = state
1270 .host
1271 .record_event(
1272 "a2ui.action",
1273 None,
1274 format!(
1275 "A2UI action {} from {}",
1276 action.name, action.source_component_id
1277 ),
1278 payload,
1279 )
1280 .await;
1281 Ok(serde_json::json!({
1282 "event": event,
1283 "route": route,
1284 }))
1285}
1286
/// Handles a render report from a client and, when the surface is known,
/// gives the UI agent a chance to patch it.
///
/// The report is always recorded as an `a2ui.render_report` host event and
/// broadcast to A2UI subscribers. The UI agent is then consulted only if the
/// surface exists in the store and the per-surface iteration budget allows it.
/// Every path returns `{ "event": ... }` for the recorded event; agent
/// failures are logged, never returned as errors.
async fn handle_a2ui_render_report(
    req: &JsonRpcMessage,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    let report: car_a2ui::RenderReport =
        serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
    let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
    let kind = "a2ui.render_report";
    let message = format!("A2UI render report for surface {}", report.surface_id);
    let event = state
        .host
        .record_event(kind, None, message, payload.clone())
        .await;
    broadcast_a2ui_event(state, kind, &payload).await;

    if let Some(surface) = state.a2ui.get(&report.surface_id).await {
        // One unit of budget is taken up front; the branches below refund it
        // whenever no patch ends up applied.
        if !state.ui_agent_budget.try_consume(&report.surface_id) {
            tracing::warn!(
                surface_id = %report.surface_id,
                count = state.ui_agent_budget.count(&report.surface_id),
                max = state.ui_agent_budget.max(),
                "ui-agent iteration budget exhausted; skipping agent invocation"
            );
            return Ok(serde_json::json!({ "event": event }));
        }
        match state.ui_agent.on_render_report(&report, &surface) {
            car_ui_agent::Decision::Patch {
                envelope,
                strategy_id,
                patch_hash,
                elapsed_ns,
            } => {
                // A `false` from the oscillation tracker suppresses the patch
                // and gives the budget unit back.
                if !state
                    .ui_agent_oscillation
                    .check_and_record(&report.surface_id, patch_hash)
                {
                    tracing::warn!(
                        surface_id = %report.surface_id,
                        strategy = %strategy_id,
                        patch_hash,
                        "ui-agent oscillation detected; suppressing patch"
                    );
                    state.ui_agent_budget.refund(&report.surface_id);
                    return Ok(serde_json::json!({ "event": event }));
                }
                // The agent's patch goes through the regular apply path, with
                // no owner override and no route auth.
                let a2ui_envelope = car_a2ui::A2uiEnvelope {
                    patch_components: Some(envelope),
                    ..Default::default()
                };
                if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
                    tracing::warn!(
                        surface_id = %report.surface_id,
                        strategy = %strategy_id,
                        patch_hash,
                        elapsed_ns,
                        error = %e,
                        "ui-agent patch apply failed",
                    );
                    // A failed apply does not count against the budget.
                    state.ui_agent_budget.refund(&report.surface_id);
                } else {
                    tracing::debug!(
                        surface_id = %report.surface_id,
                        strategy = %strategy_id,
                        patch_hash,
                        elapsed_ns,
                        iteration = state.ui_agent_budget.count(&report.surface_id),
                        "ui-agent patch applied",
                    );
                    // When a shared memgine is configured, note the applied
                    // strategy in it from a detached task so this handler does
                    // not wait on the memgine lock.
                    if let Some(memgine) = state.shared_memgine.clone() {
                        let speaker = format!("ui-agent/{}", report.surface_id);
                        let text = format!("strategy applied: {}", strategy_id);
                        tokio::spawn(async move {
                            let mut guard = memgine.lock().await;
                            guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
                        });
                    }
                }
            }
            car_ui_agent::Decision::StableNoChange => {
                // No patch was produced, so the budget unit is returned.
                state.ui_agent_budget.refund(&report.surface_id);
            }
            car_ui_agent::Decision::HardStop { reason } => {
                state.ui_agent_budget.refund(&report.surface_id);
                tracing::error!(
                    surface_id = %report.surface_id,
                    reason = %reason,
                    "ui-agent hard-stopped improvement loop",
                );
            }
        }
    } else {
        tracing::debug!(
            surface_id = %report.surface_id,
            "ui-agent skipped — surface not found in store",
        );
    }

    Ok(serde_json::json!({ "event": event }))
}
1435
1436async fn route_a2ui_action(
1437 state: &Arc<ServerState>,
1438 action: &car_a2ui::ClientAction,
1439 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1440) -> Value {
1441 let Some(owner) = owner else {
1442 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1443 };
1444 if owner.kind != "a2a" {
1445 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1446 }
1447 let Some(endpoint) = owner.endpoint.clone() else {
1448 return serde_json::json!({
1449 "delivered": false,
1450 "reason": "surface owner has no endpoint",
1451 "owner": owner
1452 });
1453 };
1454
1455 let message = car_a2a::Message {
1456 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1457 role: car_a2a::MessageRole::User,
1458 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1459 data: serde_json::json!({
1460 "a2uiAction": action,
1461 }),
1462 metadata: Default::default(),
1463 })],
1464 task_id: owner.task_id.clone(),
1465 context_id: owner.context_id.clone(),
1466 metadata: Default::default(),
1467 };
1468
1469 let auth = state
1470 .a2ui_route_auth
1471 .lock()
1472 .await
1473 .get(&action.surface_id)
1474 .cloned()
1475 .map(client_auth_from_route_auth)
1476 .unwrap_or(car_a2a::ClientAuth::None);
1477
1478 match car_a2a::A2aClient::new(endpoint.clone())
1479 .with_auth(auth)
1480 .send_message(message, false)
1481 .await
1482 {
1483 Ok(result) => serde_json::json!({
1484 "delivered": true,
1485 "owner": owner,
1486 "endpoint": endpoint,
1487 "result": result,
1488 }),
1489 Err(error) => serde_json::json!({
1490 "delivered": false,
1491 "owner": owner,
1492 "endpoint": endpoint,
1493 "error": error.to_string(),
1494 }),
1495 }
1496}
1497
1498fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1499 match auth {
1500 A2aRouteAuth::None => car_a2a::ClientAuth::None,
1501 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1502 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1503 }
1504}
1505
1506fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1507 let endpoint = endpoint?;
1508 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1509 Some(endpoint)
1510 } else {
1511 None
1512 }
1513}
1514
/// Returns `true` when `endpoint` is a plain-HTTP URL whose host is one of the
/// loopback literals `localhost`, `127.0.0.1` or `[::1]`.
///
/// The authority component (everything between `http://` and the first `/`,
/// `?` or `#`) is inspected directly rather than prefix-matching the whole
/// string. A prefix match on `http://localhost:` also accepts
/// `http://localhost:80@evil.example/`, where `localhost:80` is userinfo and
/// the real host is `evil.example`; such URLs are rejected here.
///
/// Accepted authority forms are `<host>` and `<host>:<port>`, where `<port>`
/// is zero or more ASCII digits. Matching is case-sensitive, so only the
/// lowercase `http` scheme and lowercase `localhost` qualify.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };

    // The authority ends at the first path, query or fragment delimiter.
    let authority_end = rest.find(&['/', '?', '#'][..]).unwrap_or(rest.len());
    let authority = &rest[..authority_end];

    // Any '@' means userinfo is present and the visible "host" is not the host.
    if authority.contains('@') {
        return false;
    }

    LOOPBACK_HOSTS
        .iter()
        .any(|host| match authority.strip_prefix(host) {
            Some("") => true,
            Some(tail) => tail
                .strip_prefix(':')
                .map_or(false, |port| port.bytes().all(|b| b.is_ascii_digit())),
            None => false,
        })
}
1526
1527async fn handle_host_register_agent(
1528 req: &JsonRpcMessage,
1529 session: &crate::session::ClientSession,
1530) -> Result<Value, String> {
1531 let request: RegisterHostAgentRequest =
1532 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1533 serde_json::to_value(
1534 session
1535 .host
1536 .register_agent(&session.client_id, request)
1537 .await?,
1538 )
1539 .map_err(|e| e.to_string())
1540}
1541
1542async fn handle_host_unregister_agent(
1543 req: &JsonRpcMessage,
1544 session: &crate::session::ClientSession,
1545) -> Result<Value, String> {
1546 let agent_id = req
1547 .params
1548 .get("agent_id")
1549 .and_then(|v| v.as_str())
1550 .ok_or("missing agent_id")?;
1551 session
1552 .host
1553 .unregister_agent(&session.client_id, agent_id)
1554 .await?;
1555 Ok(serde_json::json!({"ok": true}))
1556}
1557
1558async fn handle_host_set_status(
1559 req: &JsonRpcMessage,
1560 session: &crate::session::ClientSession,
1561) -> Result<Value, String> {
1562 let request: SetHostAgentStatusRequest =
1563 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1564 serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1565 .map_err(|e| e.to_string())
1566}
1567
1568async fn handle_host_notify(
1569 req: &JsonRpcMessage,
1570 session: &crate::session::ClientSession,
1571) -> Result<Value, String> {
1572 let kind = req
1573 .params
1574 .get("kind")
1575 .and_then(|v| v.as_str())
1576 .unwrap_or("host.notification");
1577 let agent_id = req
1578 .params
1579 .get("agent_id")
1580 .and_then(|v| v.as_str())
1581 .map(str::to_string);
1582 let message = req
1583 .params
1584 .get("message")
1585 .and_then(|v| v.as_str())
1586 .unwrap_or("");
1587 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1588 serde_json::to_value(
1589 session
1590 .host
1591 .record_event(kind, agent_id, message, payload)
1592 .await,
1593 )
1594 .map_err(|e| e.to_string())
1595}
1596
1597async fn handle_host_request_approval(
1598 req: &JsonRpcMessage,
1599 session: &crate::session::ClientSession,
1600) -> Result<Value, String> {
1601 let request: CreateHostApprovalRequest =
1602 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1603 if let Some(agent_id) = &request.agent_id {
1604 let _ = session
1609 .host
1610 .set_status(
1611 &session.client_id,
1612 SetHostAgentStatusRequest {
1613 agent_id: agent_id.clone(),
1614 status: HostAgentStatus::WaitingForApproval,
1615 current_task: None,
1616 message: Some("Waiting for approval".to_string()),
1617 payload: Value::Null,
1618 },
1619 )
1620 .await;
1621 }
1622 let owner_client_id = if request.system_level {
1629 None
1630 } else {
1631 Some(session.client_id.as_str())
1632 };
1633 serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1634 .map_err(|e| e.to_string())
1635}
1636
1637async fn handle_host_resolve_approval(
1638 req: &JsonRpcMessage,
1639 session: &crate::session::ClientSession,
1640) -> Result<Value, String> {
1641 let request: ResolveHostApprovalRequest =
1642 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1643 serde_json::to_value(
1644 session
1645 .host
1646 .resolve_approval(&session.client_id, request)
1647 .await?,
1648 )
1649 .map_err(|e| e.to_string())
1650}
1651
/// Authenticates the calling session. Three modes are tried in order:
///
/// 1. **Host** — params contain a string `host_token`: it must match the
///    daemon's host token; on success the session is marked authenticated and
///    flagged as host.
/// 2. **Agent** — params contain string `token` and string `agent_id`: the
///    supervisor validates the pair, the agent id is bound to this connection
///    (rejected if already attached to a different client), and the agent's
///    memgine is attached to the session.
/// 3. **Shared token** — params contain only `token`: it must match the
///    daemon's auth token. If the daemon has no auth token configured, the
///    session is authenticated unconditionally and `auth_enabled` is reported
///    as `false`.
///
/// Returns a JSON object with `ok: true` and mode-specific fields, or an
/// error string describing why authentication failed.
async fn handle_session_auth(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    // Mode 1: host authentication takes precedence whenever `host_token` is a string.
    if let Some(host_supplied) = req.params.get("host_token").and_then(Value::as_str) {
        let expected = state.host_token.get().ok_or_else(|| {
            "host auth unavailable: this daemon has no host token (started with --no-auth?)"
                .to_string()
        })?;
        if !constant_time_eq(host_supplied.as_bytes(), expected.as_bytes()) {
            return Err("auth failed: host token mismatch".to_string());
        }
        session
            .authenticated
            .store(true, std::sync::atomic::Ordering::Release);
        session
            .is_host
            .store(true, std::sync::atomic::Ordering::Release);
        return Ok(serde_json::json!({
            "ok": true,
            "auth_enabled": true,
            "role": "host",
        }));
    }

    // Modes 2 and 3 both require a string `token`.
    let supplied = req
        .params
        .get("token")
        .and_then(Value::as_str)
        .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
    let agent_id = req
        .params
        .get("agent_id")
        .and_then(Value::as_str)
        .map(str::to_string);

    // Mode 2: per-agent authentication.
    if let Some(id) = agent_id {
        let supervisor = state.supervisor()?;
        if !supervisor.validate_agent_token(&id, supplied).await {
            return Err(format!(
                "auth failed: agent_id `{id}` is not supervised, or token mismatch"
            ));
        }
        {
            // An agent id may be attached to one client at a time; the same
            // client re-authenticating is allowed.
            let mut attached = state.attached_agents.lock().await;
            if let Some(prior) = attached.get(&id) {
                if prior != &session.client_id {
                    return Err(format!(
                        "auth failed: agent_id `{id}` is already attached on \
                         another connection (client_id={prior})"
                    ));
                }
            }
            attached.insert(id.clone(), session.client_id.clone());
        }
        // NOTE(review): if loading the memgine fails here, the `attached_agents`
        // entry inserted above is left in place while the session stays
        // unauthenticated — confirm that cleanup elsewhere releases it.
        let agent_eng = get_or_load_agent_memgine(state, &id).await?;
        *session.bound_memgine.lock().await = Some(agent_eng);
        *session.agent_id.lock().await = Some(id.clone());
        session
            .authenticated
            .store(true, std::sync::atomic::Ordering::Release);
        return Ok(serde_json::json!({
            "ok": true,
            "auth_enabled": true,
            "agent_id": id,
        }));
    }

    // Mode 3: shared daemon token.
    let expected = match state.auth_token.get() {
        Some(t) => t,
        None => {
            // No auth token configured: any supplied token is accepted.
            session
                .authenticated
                .store(true, std::sync::atomic::Ordering::Release);
            return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
        }
    };
    if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
        return Err("auth failed: token mismatch".to_string());
    }
    session
        .authenticated
        .store(true, std::sync::atomic::Ordering::Release);
    Ok(serde_json::json!({
        "ok": true,
        "auth_enabled": true,
        "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
    }))
}
1779
/// Compares two byte slices without short-circuiting on the first differing
/// byte.
///
/// Slices of different length compare unequal immediately. For equal-length
/// slices every byte pair is XOR-ed into one accumulator, so the loop always
/// visits the whole input regardless of where a mismatch occurs.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let accumulated = a
        .iter()
        .zip(b)
        .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs));
    accumulated == 0
}
1793
1794async fn gate_high_risk_method(
1804 method: &str,
1805 params: &Value,
1806 state: &Arc<ServerState>,
1807) -> Result<(), String> {
1808 let timeout = state.approval_gate.timeout;
1809 let req = CreateHostApprovalRequest {
1810 agent_id: None,
1811 action: format!("ws.method:{method}"),
1812 details: serde_json::json!({
1813 "method": method,
1814 "params_preview": preview_params(params, 2_000),
1818 }),
1819 options: vec!["approve".to_string(), "deny".to_string()],
1820 system_level: true,
1824 };
1825 match state
1826 .host
1827 .request_and_wait_approval(req, "approve", timeout)
1828 .await
1829 {
1830 Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1831 Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1832 "{method} denied by user (approval gate, audit 2026-05). \
1833 To call this method without an interactive prompt, start \
1834 car-server with --no-approvals on a trusted machine."
1835 )),
1836 Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1837 "{method} approval timed out after {}s with no resolution. \
1838 The approval is still visible in `host.approvals` for \
1839 forensics; resubmit the request to retry.",
1840 timeout.as_secs()
1841 )),
1842 Err(e) => Err(format!("approval gate error: {e}")),
1843 }
1844}
1845
1846fn preview_params(value: &Value, max_chars: usize) -> Value {
1847 let s = value.to_string();
1848 if s.len() <= max_chars {
1849 value.clone()
1850 } else {
1851 Value::String(format!("{}… (truncated)", &s[..max_chars]))
1852 }
1853}
1854
1855async fn handle_session_init(
1856 req: &JsonRpcMessage,
1857 session: &crate::session::ClientSession,
1858) -> Result<Value, String> {
1859 let init: SessionInitRequest =
1860 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1861
1862 for tool in &init.tools {
1863 register_from_definition(&session.runtime, tool).await;
1864 }
1865
1866 let mut policy_count = 0;
1867 {
1868 let mut policies = session.runtime.policies.write().await;
1869 for policy_def in &init.policies {
1870 if let Some(check) = build_policy_check(policy_def) {
1871 policies.register(&policy_def.name, check, "");
1872 policy_count += 1;
1873 }
1874 }
1875 }
1876
1877 serde_json::to_value(SessionInitResponse {
1878 session_id: session.client_id.clone(),
1879 tools_registered: init.tools.len(),
1880 policies_registered: policy_count,
1881 })
1882 .map_err(|e| e.to_string())
1883}
1884
1885fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1886 match def.rule.as_str() {
1887 "deny_tool" => {
1888 let target = def.target.clone();
1889 Some(Box::new(
1890 move |action: &car_ir::Action, _: &car_state::StateStore| {
1891 if action.tool.as_deref() == Some(&target) {
1892 Some(format!("tool '{}' denied", target))
1893 } else {
1894 None
1895 }
1896 },
1897 ))
1898 }
1899 "require_state" => {
1900 let key = def.key.clone();
1901 let value = def.value.clone();
1902 Some(Box::new(
1903 move |_: &car_ir::Action, state: &car_state::StateStore| {
1904 if state.get(&key).as_ref() != Some(&value) {
1905 Some(format!("state['{}'] must be {:?}", key, value))
1906 } else {
1907 None
1908 }
1909 },
1910 ))
1911 }
1912 "deny_tool_param" => {
1913 let target = def.target.clone();
1914 let param = def.key.clone();
1915 let pattern = def.pattern.clone();
1916 Some(Box::new(
1917 move |action: &car_ir::Action, _: &car_state::StateStore| {
1918 if action.tool.as_deref() != Some(&target) {
1919 return None;
1920 }
1921 if let Some(val) = action.parameters.get(¶m) {
1922 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1923 if s.contains(&pattern) {
1924 return Some(format!("param '{}' matches '{}'", param, pattern));
1925 }
1926 }
1927 None
1928 },
1929 ))
1930 }
1931 _ => None,
1932 }
1933}
1934
1935async fn handle_tools_register(
1936 req: &JsonRpcMessage,
1937 session: &crate::session::ClientSession,
1938) -> Result<Value, String> {
1939 let tools: Vec<ToolDefinition> =
1940 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1941 for tool in &tools {
1942 register_from_definition(&session.runtime, tool).await;
1943 }
1944 Ok(Value::from(tools.len()))
1945}
1946
1947async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1954 runtime
1955 .register_tool_schema(car_ir::ToolSchema {
1956 name: def.name.clone(),
1957 description: def.description.clone(),
1958 parameters: def.parameters.clone(),
1959 returns: def.returns.clone(),
1960 idempotent: def.idempotent,
1961 cache_ttl_secs: def.cache_ttl_secs,
1962 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1963 max_calls: rl.max_calls,
1964 interval_secs: rl.interval_secs,
1965 }),
1966 })
1967 .await;
1968}
1969
1970async fn handle_proposal_submit(
1971 req: &JsonRpcMessage,
1972 session: &crate::session::ClientSession,
1973 state: &Arc<ServerState>,
1974) -> Result<Value, String> {
1975 let submit: ProposalSubmitRequest =
1976 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1977 let session_id = req
1983 .params
1984 .get("session_id")
1985 .and_then(|v| v.as_str())
1986 .map(str::to_string);
1987
1988 let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1997 Some(v) if !v.is_null() => {
1998 Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1999 }
2000 _ => None,
2001 };
2002
2003 let result = match (session_id, scope) {
2004 (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
2009 (Some(sid), None) => {
2010 session
2011 .runtime
2012 .execute_with_session(&submit.proposal, &sid)
2013 .await
2014 }
2015 (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
2016 (None, None) => session.runtime.execute(&submit.proposal).await,
2017 };
2018
2019 let current_run = session.current_run_id.lock().await.clone();
2031 if let Some(run_id) = current_run {
2032 let start_index = state.run_turn_count(&run_id).await;
2033 let records =
2034 crate::run_trace::record_turns(&submit.proposal, &result.results, start_index);
2035 if !records.is_empty() {
2036 match state.record_run_turns(&run_id, records).await {
2045 crate::session::RecordRunTurnsOutcome::RefusedCeiling => {
2046 tracing::warn!(
2047 run_id = %run_id,
2048 "proposal-recorder: run hit the turn ceiling; trace append refused (runaway backstop)"
2049 );
2050 }
2051 crate::session::RecordRunTurnsOutcome::Appended { .. }
2052 | crate::session::RecordRunTurnsOutcome::UnknownOrTerminal => {}
2053 }
2054 }
2055 }
2056
2057 serde_json::to_value(result).map_err(|e| e.to_string())
2058}
2059
2060async fn handle_session_policy_open(
2061 session: &crate::session::ClientSession,
2062) -> Result<Value, String> {
2063 let id = session.runtime.open_session().await;
2064 Ok(serde_json::json!({ "session_id": id }))
2065}
2066
2067async fn handle_session_policy_close(
2068 req: &JsonRpcMessage,
2069 session: &crate::session::ClientSession,
2070) -> Result<Value, String> {
2071 let sid = req
2072 .params
2073 .get("session_id")
2074 .and_then(|v| v.as_str())
2075 .ok_or("missing 'session_id'")?;
2076 let closed = session.runtime.close_session(sid).await;
2077 Ok(serde_json::json!({ "closed": closed }))
2078}
2079
2080async fn handle_policy_register(
2086 req: &JsonRpcMessage,
2087 session: &crate::session::ClientSession,
2088) -> Result<Value, String> {
2089 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
2090 .map_err(|e| format!("invalid policy params: {e}"))?;
2091 let session_id = req
2092 .params
2093 .get("session_id")
2094 .and_then(|v| v.as_str())
2095 .map(str::to_string);
2096 let check = build_policy_check(&def)
2097 .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
2098 match session_id {
2099 Some(sid) => session
2100 .runtime
2101 .register_policy_in_session(&sid, &def.name, check, "")
2102 .await
2103 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
2104 None => {
2105 let mut policies = session.runtime.policies.write().await;
2106 policies.register(&def.name, check, "");
2107 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
2108 }
2109 }
2110}
2111
2112async fn handle_verify(
2113 req: &JsonRpcMessage,
2114 session: &crate::session::ClientSession,
2115) -> Result<Value, String> {
2116 let vr: VerifyRequest =
2117 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2118 let tools_guard = session.runtime.tools.read().await;
2124 let result =
2125 car_verify::verify_with_schemas(&vr.proposal, Some(&vr.initial_state), Some(&tools_guard), 30);
2126 drop(tools_guard);
2127 serde_json::to_value(VerifyResponse {
2128 valid: result.valid,
2129 issues: result
2130 .issues
2131 .iter()
2132 .map(|i| VerifyIssueProto {
2133 action_id: i.action_id.clone(),
2134 severity: i.severity.clone(),
2135 message: i.message.clone(),
2136 })
2137 .collect(),
2138 simulated_state: result.simulated_state,
2139 })
2140 .map_err(|e| e.to_string())
2141}
2142
2143fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
2150 req.params
2151 .get("tenant_id")
2152 .and_then(|v| v.as_str())
2153 .filter(|s| !s.is_empty())
2154 .map(str::to_string)
2155}
2156
2157async fn handle_state_get(
2158 req: &JsonRpcMessage,
2159 session: &crate::session::ClientSession,
2160) -> Result<Value, String> {
2161 let key = req
2162 .params
2163 .get("key")
2164 .and_then(|v| v.as_str())
2165 .ok_or("missing 'key'")?;
2166 let tenant = tenant_from_params(req);
2167 Ok(session
2168 .runtime
2169 .state
2170 .scoped(tenant.as_deref())
2171 .get(key)
2172 .unwrap_or(Value::Null))
2173}
2174
2175async fn handle_state_set(
2176 req: &JsonRpcMessage,
2177 session: &crate::session::ClientSession,
2178) -> Result<Value, String> {
2179 let key = req
2180 .params
2181 .get("key")
2182 .and_then(|v| v.as_str())
2183 .ok_or("missing 'key'")?;
2184 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
2185 let tenant = tenant_from_params(req);
2186 session
2187 .runtime
2188 .state
2189 .scoped(tenant.as_deref())
2190 .set(key, value, "client");
2191 Ok(Value::from("ok"))
2192}
2193
2194async fn handle_state_exists(
2198 req: &JsonRpcMessage,
2199 session: &crate::session::ClientSession,
2200) -> Result<Value, String> {
2201 let key = req
2202 .params
2203 .get("key")
2204 .and_then(|v| v.as_str())
2205 .ok_or("missing 'key'")?;
2206 let tenant = tenant_from_params(req);
2207 Ok(Value::Bool(
2208 session.runtime.state.scoped(tenant.as_deref()).exists(key),
2209 ))
2210}
2211
2212async fn handle_state_keys(
2215 req: &JsonRpcMessage,
2216 session: &crate::session::ClientSession,
2217) -> Result<Value, String> {
2218 let tenant = tenant_from_params(req);
2219 Ok(Value::Array(
2220 session
2221 .runtime
2222 .state
2223 .scoped(tenant.as_deref())
2224 .keys()
2225 .into_iter()
2226 .map(Value::String)
2227 .collect(),
2228 ))
2229}
2230
2231async fn handle_state_snapshot(
2242 req: &JsonRpcMessage,
2243 session: &crate::session::ClientSession,
2244) -> Result<Value, String> {
2245 let tenant = tenant_from_params(req);
2246 let view = session.runtime.state.scoped(tenant.as_deref());
2247 let mut map = serde_json::Map::new();
2248 for key in view.keys() {
2249 if let Some(value) = view.get(&key) {
2250 map.insert(key, value);
2251 }
2252 }
2253 Ok(Value::Object(map))
2254}
2255
2256fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
2262 let base = car_ffi_common::memory_path::ensure_base()
2263 .map_err(|e| format!("memory base unavailable: {e}"))?;
2264 let dir = base.join("agents");
2265 std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
2266 Ok(dir.join(format!("{agent_id}.json")))
2267}
2268
2269async fn get_or_load_agent_memgine(
2276 state: &Arc<ServerState>,
2277 agent_id: &str,
2278) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
2279 {
2280 let map = state.agent_memgines.lock().await;
2281 if let Some(eng) = map.get(agent_id) {
2282 return Ok(eng.clone());
2283 }
2284 }
2285 let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
2287 None,
2288 )));
2289 let path = agent_memgine_snapshot_path(agent_id)?;
2290 if path.exists() {
2291 let content = std::fs::read_to_string(&path)
2292 .map_err(|e| format!("read {}: {}", path.display(), e))?;
2293 let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
2294 let mut g = engine.lock().await;
2295 let mut loaded: u32 = 0;
2296 for fact in &facts {
2297 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2298 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2299 let kind = fact
2300 .get("kind")
2301 .and_then(|v| v.as_str())
2302 .unwrap_or("pattern");
2303 let fid = format!("loaded-{loaded}");
2304 g.ingest_fact(
2305 &fid,
2306 subject,
2307 body,
2308 "user",
2309 "peer",
2310 chrono::Utc::now(),
2311 "global",
2312 None,
2313 vec![],
2314 kind == "constraint",
2315 );
2316 loaded += 1;
2317 }
2318 }
2319 let mut map = state.agent_memgines.lock().await;
2320 let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
2321 Ok(stored)
2322}
2323
2324async fn persist_agent_memgine(
2328 agent_id: &str,
2329 engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
2330) -> Result<(), String> {
2331 let path = agent_memgine_snapshot_path(agent_id)?;
2332 let g = engine.lock().await;
2333 let facts: Vec<Value> = g
2334 .graph
2335 .inner
2336 .node_indices()
2337 .filter_map(|nix| {
2338 let node = g.graph.inner.node_weight(nix)?;
2339 if !node.is_valid() {
2340 return None;
2341 }
2342 if node.kind == car_memgine::MemKind::Identity
2343 || node.kind == car_memgine::MemKind::Environment
2344 {
2345 return None;
2346 }
2347 Some(serde_json::json!({
2348 "subject": node.key,
2349 "body": node.value,
2350 "kind": match node.kind {
2351 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2352 car_memgine::MemKind::Conversation => "outcome",
2353 _ => "pattern",
2354 },
2355 "confidence": 0.5,
2356 "content_type": node.content_type.as_label(),
2357 }))
2358 })
2359 .collect();
2360 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2361 std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2362 Ok(())
2363}
2364
2365async fn handle_memory_fact_count(
2372 session: &crate::session::ClientSession,
2373) -> Result<Value, String> {
2374 let engine_arc = session.effective_memgine().await;
2375 let engine = engine_arc.lock().await;
2376 Ok(Value::from(engine.valid_fact_count()))
2377}
2378
2379async fn handle_memory_add_fact(
2380 req: &JsonRpcMessage,
2381 session: &crate::session::ClientSession,
2382) -> Result<Value, String> {
2383 let subject = req
2384 .params
2385 .get("subject")
2386 .and_then(|v| v.as_str())
2387 .ok_or("missing subject")?;
2388 let body = req
2389 .params
2390 .get("body")
2391 .and_then(|v| v.as_str())
2392 .ok_or("missing body")?;
2393 let kind = req
2394 .params
2395 .get("kind")
2396 .and_then(|v| v.as_str())
2397 .unwrap_or("pattern");
2398 let engine_arc = session.effective_memgine().await;
2402 let count = {
2403 let mut engine = engine_arc.lock().await;
2404 let fid = format!("ws-{}", engine.valid_fact_count());
2405 engine.ingest_fact(
2406 &fid,
2407 subject,
2408 body,
2409 "user",
2410 "peer",
2411 chrono::Utc::now(),
2412 "global",
2413 None,
2414 vec![],
2415 kind == "constraint",
2416 );
2417 engine.valid_fact_count()
2418 };
2419 if let Some(id) = session.agent_id.lock().await.clone() {
2422 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2423 tracing::warn!(agent_id = %id, error = %e,
2424 "agent memgine persist failed; in-memory state is canonical");
2425 }
2426 }
2427 Ok(Value::from(count))
2428}
2429
2430async fn handle_memory_query(
2431 req: &JsonRpcMessage,
2432 session: &crate::session::ClientSession,
2433) -> Result<Value, String> {
2434 let query = req
2435 .params
2436 .get("query")
2437 .and_then(|v| v.as_str())
2438 .ok_or("missing query")?;
2439 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2440 let engine_arc = session.effective_memgine().await;
2441 let engine = engine_arc.lock().await;
2442 let seeds = engine.graph.find_seeds(query, 5);
2443 let hits = if !seeds.is_empty() {
2448 engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2449 } else {
2450 vec![]
2451 };
2452 let results: Vec<Value> = hits
2453 .iter()
2454 .filter_map(|hit| {
2455 let node = engine.graph.inner.node_weight(hit.node_ix)?;
2456 Some(serde_json::json!({
2457 "subject": node.key,
2458 "body": node.value,
2459 "kind": format!("{:?}", node.kind).to_lowercase(),
2460 "confidence": hit.activation,
2461 }))
2462 })
2463 .collect();
2464 serde_json::to_value(results).map_err(|e| e.to_string())
2465}
2466
2467async fn handle_memory_build_context(
2468 req: &JsonRpcMessage,
2469 session: &crate::session::ClientSession,
2470) -> Result<Value, String> {
2471 let query = req
2472 .params
2473 .get("query")
2474 .and_then(|v| v.as_str())
2475 .unwrap_or("");
2476 let model_context_window = req
2480 .params
2481 .get("model_context_window")
2482 .and_then(|v| v.as_u64())
2483 .map(|w| w as usize);
2484 let mut engine = session.memgine.lock().await;
2485 Ok(Value::from(
2486 engine.build_context_for_model(query, model_context_window),
2487 ))
2488}
2489
2490async fn handle_memory_build_context_fast(
2496 req: &JsonRpcMessage,
2497 session: &crate::session::ClientSession,
2498) -> Result<Value, String> {
2499 let query = req
2500 .params
2501 .get("query")
2502 .and_then(|v| v.as_str())
2503 .unwrap_or("");
2504 let model_context_window = req
2505 .params
2506 .get("model_context_window")
2507 .and_then(|v| v.as_u64())
2508 .map(|w| w as usize);
2509 let mut engine = session.memgine.lock().await;
2510 Ok(Value::from(engine.build_context_with_options(
2511 query,
2512 model_context_window,
2513 car_memgine::ContextMode::Fast,
2514 None,
2515 )))
2516}
2517
2518async fn handle_memory_persist(
2534 req: &JsonRpcMessage,
2535 session: &crate::session::ClientSession,
2536) -> Result<Value, String> {
2537 let path = req
2538 .params
2539 .get("path")
2540 .and_then(|v| v.as_str())
2541 .ok_or("missing path")?;
2542 let resolved = car_ffi_common::memory_path::resolve(path)
2543 .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2544 let engine = session.memgine.lock().await;
2545 let facts: Vec<Value> = engine
2546 .graph
2547 .inner
2548 .node_indices()
2549 .filter_map(|nix| {
2550 let node = engine.graph.inner.node_weight(nix)?;
2551 if !node.is_valid() {
2552 return None;
2553 }
2554 if node.kind == car_memgine::MemKind::Identity
2555 || node.kind == car_memgine::MemKind::Environment
2556 {
2557 return None;
2558 }
2559 Some(serde_json::json!({
2560 "subject": node.key,
2561 "body": node.value,
2562 "kind": match node.kind {
2563 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2564 car_memgine::MemKind::Conversation => "outcome",
2565 _ => "pattern",
2566 },
2567 "confidence": 0.5,
2568 "content_type": node.content_type.as_label(),
2569 }))
2570 })
2571 .collect();
2572 let count = facts.len();
2573 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2574 std::fs::write(&resolved, json)
2575 .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2576 Ok(Value::from(count as u64))
2577}
2578
2579async fn handle_memory_load(
2585 req: &JsonRpcMessage,
2586 session: &crate::session::ClientSession,
2587) -> Result<Value, String> {
2588 let path = req
2589 .params
2590 .get("path")
2591 .and_then(|v| v.as_str())
2592 .ok_or("missing path")?;
2593 let resolved = car_ffi_common::memory_path::resolve(path)
2594 .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2595 let content = std::fs::read_to_string(&resolved)
2596 .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2597 let facts: Vec<Value> =
2598 serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2599 let mut engine = session.memgine.lock().await;
2600 engine.reset();
2601 let mut count: u32 = 0;
2602 for fact in &facts {
2603 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2604 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2605 let kind = fact
2606 .get("kind")
2607 .and_then(|v| v.as_str())
2608 .unwrap_or("pattern");
2609 let fid = format!("loaded-{}", count);
2610 engine.ingest_fact(
2611 &fid,
2612 subject,
2613 body,
2614 "user",
2615 "peer",
2616 chrono::Utc::now(),
2617 "global",
2618 None,
2619 vec![],
2620 kind == "constraint",
2621 );
2622 count += 1;
2623 }
2624 Ok(Value::from(count))
2625}
2626
2627async fn handle_skill_ingest(
2630 req: &JsonRpcMessage,
2631 session: &crate::session::ClientSession,
2632) -> Result<Value, String> {
2633 let name = req
2634 .params
2635 .get("name")
2636 .and_then(|v| v.as_str())
2637 .ok_or("missing name")?;
2638 let code = req
2639 .params
2640 .get("code")
2641 .and_then(|v| v.as_str())
2642 .ok_or("missing code")?;
2643 let platform = req
2644 .params
2645 .get("platform")
2646 .and_then(|v| v.as_str())
2647 .unwrap_or("unknown");
2648 let persona = req
2649 .params
2650 .get("persona")
2651 .and_then(|v| v.as_str())
2652 .unwrap_or("");
2653 let url_pattern = req
2654 .params
2655 .get("url_pattern")
2656 .and_then(|v| v.as_str())
2657 .unwrap_or("");
2658 let description = req
2659 .params
2660 .get("description")
2661 .and_then(|v| v.as_str())
2662 .unwrap_or("");
2663 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2664 let keywords: Vec<String> = req
2665 .params
2666 .get("task_keywords")
2667 .and_then(|v| v.as_array())
2668 .map(|arr| {
2669 arr.iter()
2670 .filter_map(|v| v.as_str().map(String::from))
2671 .collect()
2672 })
2673 .unwrap_or_default();
2674
2675 let trigger = car_memgine::SkillTrigger {
2676 persona: persona.into(),
2677 url_pattern: url_pattern.into(),
2678 task_keywords: keywords,
2679 structured: None,
2680 };
2681 let mut engine = session.memgine.lock().await;
2682 let node = engine.ingest_skill(
2683 name,
2684 code,
2685 platform,
2686 trigger,
2687 description,
2688 supersedes,
2689 vec![],
2690 vec![],
2691 );
2692 Ok(Value::from(node.index() as u64))
2693}
2694
2695async fn handle_skill_find(
2696 req: &JsonRpcMessage,
2697 session: &crate::session::ClientSession,
2698) -> Result<Value, String> {
2699 let persona = req
2700 .params
2701 .get("persona")
2702 .and_then(|v| v.as_str())
2703 .unwrap_or("");
2704 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2705 let task = req
2706 .params
2707 .get("task")
2708 .and_then(|v| v.as_str())
2709 .unwrap_or("");
2710 let max = req
2711 .params
2712 .get("max_results")
2713 .and_then(|v| v.as_u64())
2714 .unwrap_or(1) as usize;
2715 let engine = session.memgine.lock().await;
2716 let results = engine.find_skill(persona, url, task, max);
2717 let json: Vec<Value> = results
2718 .iter()
2719 .map(|(m, s)| {
2720 serde_json::json!({
2721 "name": m.name, "code": m.code, "platform": m.platform,
2722 "description": m.description, "stats": m.stats, "match_score": s,
2723 })
2724 })
2725 .collect();
2726 serde_json::to_value(json).map_err(|e| e.to_string())
2727}
2728
2729async fn handle_skill_report(
2730 req: &JsonRpcMessage,
2731 session: &crate::session::ClientSession,
2732) -> Result<Value, String> {
2733 let name = req
2734 .params
2735 .get("skill_name")
2736 .and_then(|v| v.as_str())
2737 .ok_or("missing skill_name")?;
2738 let outcome_str = req
2739 .params
2740 .get("outcome")
2741 .and_then(|v| v.as_str())
2742 .ok_or("missing outcome")?;
2743 let outcome = match outcome_str {
2744 "success" => car_memgine::SkillOutcome::Success,
2745 _ => car_memgine::SkillOutcome::Fail,
2746 };
2747 let mut engine = session.memgine.lock().await;
2748 let stats = engine
2749 .report_outcome(name, outcome)
2750 .ok_or(format!("skill '{}' not found", name))?;
2751 serde_json::to_value(stats).map_err(|e| e.to_string())
2752}
2753
/// `car_multi::AgentRunner` implementation that sends each agent run to the
/// connected WebSocket client as a `multi.run_agent` request and waits for
/// the client's reply.
struct WsAgentRunner {
    /// Connection channel: supplies request ids, the outgoing write sink, and
    /// the table of pending response senders.
    channel: Arc<WsChannel>,
    /// Host state used to register each run as an agent and update its status.
    host: Arc<crate::host::HostState>,
    /// Client id passed to host calls and embedded in generated agent ids.
    client_id: String,
}
2767
2768#[async_trait::async_trait]
2769impl car_multi::AgentRunner for WsAgentRunner {
2770 async fn run(
2771 &self,
2772 spec: &car_multi::AgentSpec,
2773 task: &str,
2774 _runtime: &car_engine::Runtime,
2775 _mailbox: &car_multi::Mailbox,
2776 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2777 use futures::SinkExt;
2778
2779 let request_id = self.channel.next_request_id();
2780 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2781 let agent = self
2782 .host
2783 .register_agent(
2784 &self.client_id,
2785 RegisterHostAgentRequest {
2786 id: Some(agent_id.clone()),
2787 name: spec.name.clone(),
2788 kind: "callback".to_string(),
2789 capabilities: spec.tools.clone(),
2790 project: spec
2791 .metadata
2792 .get("project")
2793 .and_then(|v| v.as_str())
2794 .map(str::to_string),
2795 pid: None,
2796 display: serde_json::from_value(
2797 spec.metadata
2798 .get("display")
2799 .cloned()
2800 .unwrap_or(serde_json::Value::Null),
2801 )
2802 .unwrap_or_default(),
2803 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2804 },
2805 )
2806 .await
2807 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2808 let _ = self
2809 .host
2810 .set_status(
2811 &self.client_id,
2812 SetHostAgentStatusRequest {
2813 agent_id: agent.id.clone(),
2814 status: HostAgentStatus::Running,
2815 current_task: Some(task.to_string()),
2816 message: Some(format!("{} started", spec.name)),
2817 payload: serde_json::json!({ "task": task }),
2818 },
2819 )
2820 .await;
2821
2822 let rpc_request = serde_json::json!({
2823 "jsonrpc": "2.0",
2824 "method": "multi.run_agent",
2825 "params": {
2826 "spec": spec,
2827 "task": task,
2828 },
2829 "id": request_id,
2830 });
2831
2832 let (tx, rx) = tokio::sync::oneshot::channel();
2834 self.channel
2835 .pending
2836 .lock()
2837 .await
2838 .insert(request_id.clone(), tx);
2839
2840 let msg = Message::Text(
2841 serde_json::to_string(&rpc_request)
2842 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2843 .into(),
2844 );
2845 if let Err(e) = self.channel.write.lock().await.send(msg).await {
2846 let _ = self
2847 .host
2848 .set_status(
2849 &self.client_id,
2850 SetHostAgentStatusRequest {
2851 agent_id: agent_id.clone(),
2852 status: HostAgentStatus::Errored,
2853 current_task: None,
2854 message: Some(format!("{} failed to start", spec.name)),
2855 payload: serde_json::json!({ "error": e.to_string() }),
2856 },
2857 )
2858 .await;
2859 return Err(car_multi::MultiError::AgentFailed(
2860 spec.name.clone(),
2861 format!("ws send error: {}", e),
2862 ));
2863 }
2864
2865 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2867 Ok(Ok(response)) => response,
2868 Ok(Err(_)) => {
2869 let _ = self
2870 .host
2871 .set_status(
2872 &self.client_id,
2873 SetHostAgentStatusRequest {
2874 agent_id: agent_id.clone(),
2875 status: HostAgentStatus::Errored,
2876 current_task: None,
2877 message: Some(format!("{} callback channel closed", spec.name)),
2878 payload: Value::Null,
2879 },
2880 )
2881 .await;
2882 return Err(car_multi::MultiError::AgentFailed(
2883 spec.name.clone(),
2884 "agent callback channel closed".into(),
2885 ));
2886 }
2887 Err(_) => {
2888 let _ = self
2889 .host
2890 .set_status(
2891 &self.client_id,
2892 SetHostAgentStatusRequest {
2893 agent_id: agent_id.clone(),
2894 status: HostAgentStatus::Errored,
2895 current_task: None,
2896 message: Some(format!("{} timed out", spec.name)),
2897 payload: Value::Null,
2898 },
2899 )
2900 .await;
2901 return Err(car_multi::MultiError::AgentFailed(
2902 spec.name.clone(),
2903 "agent callback timed out (300s)".into(),
2904 ));
2905 }
2906 };
2907
2908 if let Some(err) = response.error {
2909 let _ = self
2910 .host
2911 .set_status(
2912 &self.client_id,
2913 SetHostAgentStatusRequest {
2914 agent_id: agent_id.clone(),
2915 status: HostAgentStatus::Errored,
2916 current_task: None,
2917 message: Some(format!("{} errored", spec.name)),
2918 payload: serde_json::json!({ "error": err }),
2919 },
2920 )
2921 .await;
2922 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2923 }
2924
2925 let output_value = response.output.unwrap_or(Value::Null);
2926 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2927 car_multi::MultiError::AgentFailed(
2928 spec.name.clone(),
2929 format!("invalid AgentOutput: {}", e),
2930 )
2931 })?;
2932 let status = if output.error.is_some() {
2933 HostAgentStatus::Errored
2934 } else {
2935 HostAgentStatus::Completed
2936 };
2937 let message = if output.error.is_some() {
2938 format!("{} errored", spec.name)
2939 } else {
2940 format!("{} completed", spec.name)
2941 };
2942 let _ = self
2943 .host
2944 .set_status(
2945 &self.client_id,
2946 SetHostAgentStatusRequest {
2947 agent_id,
2948 status,
2949 current_task: None,
2950 message: Some(message),
2951 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2952 },
2953 )
2954 .await;
2955
2956 Ok(output)
2957 }
2958}
2959
/// Build the agent id for one run as `client_id:name:request_id`.
///
/// Only `name` is sanitized: every character that is not ASCII alphanumeric,
/// `-` or `_` is replaced by a single `-`. `client_id` and `request_id` are
/// inserted unchanged.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let needs_replacing = |c: char| !(c.is_ascii_alphanumeric() || c == '-' || c == '_');
    let safe_name = name.replace(needs_replacing, "-");
    format!("{client_id}:{safe_name}:{request_id}")
}
2973
2974fn multi_infra_with_budget(req: &JsonRpcMessage) -> Result<car_multi::SharedInfra, String> {
2979 let infra = car_multi::SharedInfra::new();
2980 match req.params.get("budget") {
2981 None | Some(Value::Null) => Ok(infra),
2982 Some(v) => {
2983 let limits: car_multi::BudgetLimits =
2984 serde_json::from_value(v.clone()).map_err(|e| format!("invalid budget: {}", e))?;
2985 Ok(infra.with_budget(limits))
2986 }
2987 }
2988}
2989
2990async fn handle_multi_swarm(
2991 req: &JsonRpcMessage,
2992 session: &crate::session::ClientSession,
2993) -> Result<Value, String> {
2994 let mode_str = req
2995 .params
2996 .get("mode")
2997 .and_then(|v| v.as_str())
2998 .ok_or("missing 'mode'")?;
2999 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
3000 let task = req
3001 .params
3002 .get("task")
3003 .and_then(|v| v.as_str())
3004 .ok_or("missing 'task'")?;
3005
3006 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
3007 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
3008 let agent_specs: Vec<car_multi::AgentSpec> =
3009 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
3010 let synth: Option<car_multi::AgentSpec> = req
3011 .params
3012 .get("synthesizer")
3013 .map(|v| serde_json::from_value(v.clone()))
3014 .transpose()
3015 .map_err(|e| format!("invalid synthesizer: {}", e))?;
3016
3017 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3018 channel: session.channel.clone(),
3019 host: session.host.clone(),
3020 client_id: session.client_id.clone(),
3021 });
3022 let infra = multi_infra_with_budget(req)?;
3023
3024 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
3025 if let Some(s) = synth {
3026 swarm = swarm.with_synthesizer(s);
3027 }
3028
3029 let result = swarm
3030 .run(task, &runner, &infra)
3031 .await
3032 .map_err(|e| format!("swarm error: {}", e))?;
3033 serde_json::to_value(result).map_err(|e| e.to_string())
3034}
3035
3036async fn handle_multi_pipeline(
3037 req: &JsonRpcMessage,
3038 session: &crate::session::ClientSession,
3039) -> Result<Value, String> {
3040 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
3041 let task = req
3042 .params
3043 .get("task")
3044 .and_then(|v| v.as_str())
3045 .ok_or("missing 'task'")?;
3046
3047 let stage_specs: Vec<car_multi::AgentSpec> =
3048 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
3049
3050 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3051 channel: session.channel.clone(),
3052 host: session.host.clone(),
3053 client_id: session.client_id.clone(),
3054 });
3055 let infra = multi_infra_with_budget(req)?;
3056
3057 let result = car_multi::Pipeline::new(stage_specs)
3058 .run(task, &runner, &infra)
3059 .await
3060 .map_err(|e| format!("pipeline error: {}", e))?;
3061 serde_json::to_value(result).map_err(|e| e.to_string())
3062}
3063
3064async fn handle_multi_supervisor(
3065 req: &JsonRpcMessage,
3066 session: &crate::session::ClientSession,
3067) -> Result<Value, String> {
3068 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
3069 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
3070 let task = req
3071 .params
3072 .get("task")
3073 .and_then(|v| v.as_str())
3074 .ok_or("missing 'task'")?;
3075 let max_rounds = req
3076 .params
3077 .get("max_rounds")
3078 .and_then(|v| v.as_u64())
3079 .unwrap_or(3) as u32;
3080
3081 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
3082 .map_err(|e| format!("invalid workers: {}", e))?;
3083 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
3084 .map_err(|e| format!("invalid supervisor: {}", e))?;
3085
3086 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3087 channel: session.channel.clone(),
3088 host: session.host.clone(),
3089 client_id: session.client_id.clone(),
3090 });
3091 let infra = multi_infra_with_budget(req)?;
3092
3093 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
3094 .with_max_rounds(max_rounds)
3095 .run(task, &runner, &infra)
3096 .await
3097 .map_err(|e| format!("supervisor error: {}", e))?;
3098 serde_json::to_value(result).map_err(|e| e.to_string())
3099}
3100
3101async fn handle_multi_map_reduce(
3102 req: &JsonRpcMessage,
3103 session: &crate::session::ClientSession,
3104) -> Result<Value, String> {
3105 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
3106 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
3107 let task = req
3108 .params
3109 .get("task")
3110 .and_then(|v| v.as_str())
3111 .ok_or("missing 'task'")?;
3112 let items_val = req.params.get("items").ok_or("missing 'items'")?;
3113
3114 let mapper_spec: car_multi::AgentSpec =
3115 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
3116 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
3117 .map_err(|e| format!("invalid reducer: {}", e))?;
3118 let items: Vec<String> =
3119 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
3120
3121 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3122 channel: session.channel.clone(),
3123 host: session.host.clone(),
3124 client_id: session.client_id.clone(),
3125 });
3126 let infra = multi_infra_with_budget(req)?;
3127
3128 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
3129 .run(task, &items, &runner, &infra)
3130 .await
3131 .map_err(|e| format!("map_reduce error: {}", e))?;
3132 serde_json::to_value(result).map_err(|e| e.to_string())
3133}
3134
3135async fn handle_multi_vote(
3136 req: &JsonRpcMessage,
3137 session: &crate::session::ClientSession,
3138) -> Result<Value, String> {
3139 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
3140 let task = req
3141 .params
3142 .get("task")
3143 .and_then(|v| v.as_str())
3144 .ok_or("missing 'task'")?;
3145
3146 let agent_specs: Vec<car_multi::AgentSpec> =
3147 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
3148 let synth: Option<car_multi::AgentSpec> = req
3149 .params
3150 .get("synthesizer")
3151 .map(|v| serde_json::from_value(v.clone()))
3152 .transpose()
3153 .map_err(|e| format!("invalid synthesizer: {}", e))?;
3154
3155 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3156 channel: session.channel.clone(),
3157 host: session.host.clone(),
3158 client_id: session.client_id.clone(),
3159 });
3160 let infra = multi_infra_with_budget(req)?;
3161
3162 let mut vote = car_multi::Vote::new(agent_specs);
3163 if let Some(s) = synth {
3164 vote = vote.with_synthesizer(s);
3165 }
3166
3167 let result = vote
3168 .run(task, &runner, &infra)
3169 .await
3170 .map_err(|e| format!("vote error: {}", e))?;
3171 serde_json::to_value(result).map_err(|e| e.to_string())
3172}
3173
3174async fn handle_multi_tournament(
3175 req: &JsonRpcMessage,
3176 session: &crate::session::ClientSession,
3177) -> Result<Value, String> {
3178 let competitors_val = req.params.get("competitors").ok_or("missing 'competitors'")?;
3179 let judge_val = req.params.get("judge").ok_or("missing 'judge'")?;
3180 let task = req
3181 .params
3182 .get("task")
3183 .and_then(|v| v.as_str())
3184 .ok_or("missing 'task'")?;
3185
3186 let competitors: Vec<car_multi::AgentSpec> = serde_json::from_value(competitors_val.clone())
3187 .map_err(|e| format!("invalid competitors: {}", e))?;
3188 let judge: car_multi::AgentSpec =
3189 serde_json::from_value(judge_val.clone()).map_err(|e| format!("invalid judge: {}", e))?;
3190
3191 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3192 channel: session.channel.clone(),
3193 host: session.host.clone(),
3194 client_id: session.client_id.clone(),
3195 });
3196 let infra = multi_infra_with_budget(req)?;
3197
3198 let result = car_multi::Tournament::new(competitors, judge)
3199 .run(task, &runner, &infra)
3200 .await
3201 .map_err(|e| format!("tournament error: {}", e))?;
3202 serde_json::to_value(result).map_err(|e| e.to_string())
3203}
3204
3205async fn handle_multi_subtask(
3206 req: &JsonRpcMessage,
3207 session: &crate::session::ClientSession,
3208) -> Result<Value, String> {
3209 let main_val = req.params.get("main").ok_or("missing 'main'")?;
3210 let task = req
3211 .params
3212 .get("task")
3213 .and_then(|v| v.as_str())
3214 .ok_or("missing 'task'")?;
3215
3216 let main_spec: car_multi::AgentSpec =
3217 serde_json::from_value(main_val.clone()).map_err(|e| format!("invalid main: {}", e))?;
3218
3219 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3220 channel: session.channel.clone(),
3221 host: session.host.clone(),
3222 client_id: session.client_id.clone(),
3223 });
3224 let infra = multi_infra_with_budget(req)?;
3225
3226 let result = car_multi::SpawnSubtask::new(main_spec)
3227 .run(task, &runner, &infra)
3228 .await
3229 .map_err(|e| format!("spawn_subtask error: {}", e))?;
3230 serde_json::to_value(result).map_err(|e| e.to_string())
3231}
3232
3233fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
3238 let name = req
3239 .params
3240 .get("name")
3241 .and_then(|v| v.as_str())
3242 .ok_or("scheduler.create requires 'name'")?;
3243 let prompt = req
3244 .params
3245 .get("prompt")
3246 .and_then(|v| v.as_str())
3247 .ok_or("scheduler.create requires 'prompt'")?;
3248
3249 let mut task = car_scheduler::Task::new(name, prompt);
3250
3251 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
3252 let trigger = match t {
3253 "once" => car_scheduler::TaskTrigger::Once,
3254 "cron" => car_scheduler::TaskTrigger::Cron,
3255 "interval" => car_scheduler::TaskTrigger::Interval,
3256 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
3257 _ => car_scheduler::TaskTrigger::Manual,
3258 };
3259 let schedule = req
3260 .params
3261 .get("schedule")
3262 .and_then(|v| v.as_str())
3263 .unwrap_or("");
3264 task = task.with_trigger(trigger, schedule);
3265 }
3266
3267 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
3268 task = task.with_system_prompt(sp);
3269 }
3270
3271 serde_json::to_value(&task).map_err(|e| e.to_string())
3272}
3273
3274async fn handle_scheduler_run(
3275 req: &JsonRpcMessage,
3276 session: &crate::session::ClientSession,
3277) -> Result<Value, String> {
3278 let task_val = req
3279 .params
3280 .get("task")
3281 .ok_or("scheduler.run requires 'task'")?;
3282 let mut task: car_scheduler::Task =
3283 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3284
3285 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3286 channel: session.channel.clone(),
3287 host: session.host.clone(),
3288 client_id: session.client_id.clone(),
3289 });
3290 let executor = car_scheduler::Executor::new(runner);
3291 let execution = executor.run_once(&mut task).await;
3292
3293 serde_json::to_value(&execution).map_err(|e| e.to_string())
3294}
3295
3296async fn handle_scheduler_run_loop(
3297 req: &JsonRpcMessage,
3298 session: &crate::session::ClientSession,
3299) -> Result<Value, String> {
3300 let task_val = req
3301 .params
3302 .get("task")
3303 .ok_or("scheduler.run_loop requires 'task'")?;
3304 let mut task: car_scheduler::Task =
3305 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3306 let max_iterations = req
3307 .params
3308 .get("max_iterations")
3309 .and_then(|v| v.as_u64())
3310 .map(|v| v as u32);
3311
3312 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3313 channel: session.channel.clone(),
3314 host: session.host.clone(),
3315 client_id: session.client_id.clone(),
3316 });
3317 let executor = car_scheduler::Executor::new(runner);
3318 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
3319 let executions = executor
3320 .run_loop(&mut task, max_iterations, cancel_rx)
3321 .await;
3322
3323 serde_json::to_value(&executions).map_err(|e| e.to_string())
3324}
3325
3326fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
3331 state.inference.get_or_init(|| {
3332 Arc::new(car_inference::InferenceEngine::new(
3333 car_inference::InferenceConfig::default(),
3334 ))
3335 })
3336}
3337
3338async fn handle_infer(
3339 msg: &JsonRpcMessage,
3340 state: &ServerState,
3341 session: &crate::session::ClientSession,
3342) -> Result<Value, String> {
3343 let engine = get_inference_engine(state);
3344 let mut req: car_inference::GenerateRequest =
3345 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3346
3347 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3349 let mut memgine = session.memgine.lock().await;
3350 let ctx = memgine.build_context(cq);
3351 if !ctx.is_empty() {
3352 req.context = Some(ctx);
3353 }
3354 }
3355
3356 let _permit = state.admission.acquire().await;
3362
3363 let result = engine
3374 .generate_tracked(req)
3375 .await
3376 .map_err(|e| e.to_string())?;
3377 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3378}
3379
3380async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3410 let engine = get_inference_engine(state);
3411 let req: car_inference::GenerateImageRequest =
3412 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3413 let _permit = state.admission.acquire().await;
3416 let result = engine
3417 .generate_image(req)
3418 .await
3419 .map_err(|e| e.to_string())?;
3420 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3421}
3422
3423async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3424 let engine = get_inference_engine(state);
3425 let req: car_inference::GenerateVideoRequest =
3426 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3427 let _permit = state.admission.acquire().await;
3428 let result = engine
3429 .generate_video(req)
3430 .await
3431 .map_err(|e| e.to_string())?;
3432 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3433}
3434
3435async fn handle_infer_stream(
3436 msg: &JsonRpcMessage,
3437 session: &crate::session::ClientSession,
3438 state: &ServerState,
3439) -> Result<Value, String> {
3440 use futures::SinkExt;
3441 use tokio_tungstenite::tungstenite::Message;
3442
3443 let engine = get_inference_engine(state);
3444 let mut req: car_inference::GenerateRequest =
3445 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3446
3447 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3450 let mut memgine = session.memgine.lock().await;
3451 let ctx = memgine.build_context(cq);
3452 if !ctx.is_empty() {
3453 req.context = Some(ctx);
3454 }
3455 }
3456
3457 let _permit = state.admission.acquire().await;
3458 let mut rx = engine
3459 .generate_tracked_stream(req)
3460 .await
3461 .map_err(|e| e.to_string())?;
3462
3463 let mut accumulator = car_inference::StreamAccumulator::default();
3464 let request_id = msg.id.clone();
3465
3466 while let Some(event) = rx.recv().await {
3467 let event_payload = match &event {
3468 car_inference::StreamEvent::TextDelta(text) => {
3469 serde_json::json!({"type": "text", "data": text})
3470 }
3471 car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3472 serde_json::json!({"type": "tool_start", "name": name, "index": index})
3473 }
3474 car_inference::StreamEvent::ToolCallDelta {
3475 index,
3476 arguments_delta,
3477 } => serde_json::json!({
3478 "type": "tool_delta",
3479 "index": index,
3480 "data": arguments_delta,
3481 }),
3482 car_inference::StreamEvent::Usage {
3483 input_tokens,
3484 output_tokens,
3485 } => serde_json::json!({
3486 "type": "usage",
3487 "input_tokens": input_tokens,
3488 "output_tokens": output_tokens,
3489 }),
3490 car_inference::StreamEvent::Done { .. } => {
3495 accumulator.push(&event);
3496 continue;
3497 }
3498 };
3499
3500 let notif = serde_json::json!({
3501 "jsonrpc": "2.0",
3502 "method": "inference.stream.event",
3503 "params": {
3504 "request_id": request_id,
3505 "event": event_payload,
3506 },
3507 });
3508 if let Ok(text) = serde_json::to_string(¬if) {
3509 let _ = session
3510 .channel
3511 .write
3512 .lock()
3513 .await
3514 .send(Message::Text(text.into()))
3515 .await;
3516 }
3517 accumulator.push(&event);
3518 }
3519
3520 let (text, tool_calls, usage) = accumulator.finish_with_usage();
3521 Ok(serde_json::json!({
3522 "text": text,
3523 "tool_calls": tool_calls,
3524 "usage": usage,
3525 }))
3526}
3527
3528async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3529 let engine = get_inference_engine(state);
3530 let req: car_inference::EmbedRequest =
3531 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3532 let _permit = state.admission.acquire().await;
3536 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3537 Ok(serde_json::json!({"embeddings": result}))
3538}
3539
3540async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3541 let engine = get_inference_engine(state);
3542 let req: car_inference::ClassifyRequest =
3543 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3544 let _permit = state.admission.acquire().await;
3545 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3546 Ok(serde_json::json!({"classifications": result}))
3547}
3548
3549fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3553 let total = state.admission.permits();
3554 let available = state.admission.permits_available();
3555 let in_use = total.saturating_sub(available);
3556 Ok(serde_json::json!({
3557 "permits_total": total,
3558 "permits_available": available,
3559 "permits_in_use": in_use,
3560 "env_override": crate::admission::ENV_MAX_CONCURRENT,
3561 }))
3562}
3563
3564async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3565 let model = msg
3566 .params
3567 .get("model")
3568 .and_then(|v| v.as_str())
3569 .ok_or("missing 'model' parameter")?;
3570 let text = msg
3571 .params
3572 .get("text")
3573 .and_then(|v| v.as_str())
3574 .ok_or("missing 'text' parameter")?;
3575 let engine = get_inference_engine(state);
3576 let ids = engine
3577 .tokenize(model, text)
3578 .await
3579 .map_err(|e| e.to_string())?;
3580 Ok(serde_json::json!({"tokens": ids}))
3581}
3582
3583async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3584 let model = msg
3585 .params
3586 .get("model")
3587 .and_then(|v| v.as_str())
3588 .ok_or("missing 'model' parameter")?;
3589 let tokens: Vec<u32> = msg
3590 .params
3591 .get("tokens")
3592 .and_then(|v| v.as_array())
3593 .ok_or("missing 'tokens' parameter")?
3594 .iter()
3595 .map(|t| {
3596 t.as_u64()
3597 .and_then(|n| u32::try_from(n).ok())
3598 .ok_or_else(|| "tokens[] must be u32 values".to_string())
3599 })
3600 .collect::<Result<Vec<_>, _>>()?;
3601 let engine = get_inference_engine(state);
3602 let text = engine
3603 .detokenize(model, &tokens)
3604 .await
3605 .map_err(|e| e.to_string())?;
3606 Ok(serde_json::json!({"text": text}))
3607}
3608
3609async fn handle_models_register(
3628 req: &JsonRpcMessage,
3629 _state: &Arc<ServerState>,
3630) -> Result<Value, String> {
3631 let schema_value = match req.params.get("schema") {
3635 Some(v) => v.clone(),
3636 None => req.params.clone(),
3637 };
3638 let schema: car_inference::ModelSchema =
3639 serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3640 let id = schema.id.clone();
3641
3642 let home = std::env::var_os("HOME")
3647 .or_else(|| std::env::var_os("USERPROFILE"))
3648 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3649 let car_dir = std::path::PathBuf::from(home).join(".car");
3650 std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3651 let path = car_dir.join("models.json");
3652
3653 let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3654 let text =
3655 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3656 if text.trim().is_empty() {
3657 Vec::new()
3658 } else {
3659 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3660 }
3661 } else {
3662 Vec::new()
3663 };
3664 if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3666 *slot = schema;
3667 } else {
3668 models.push(schema);
3669 }
3670 let json =
3671 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3672 let tmp = path.with_extension("json.tmp");
3673 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3674 std::fs::rename(&tmp, &path)
3675 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3676 Ok(serde_json::json!({
3677 "id": id,
3678 "registered": true,
3679 "path": path.to_string_lossy(),
3680 "note": "Daemon restart required for live UnifiedRegistry visibility \
3681 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3682 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3683 }))
3684}
3685
3686async fn handle_models_unregister(
3697 req: &JsonRpcMessage,
3698 _state: &Arc<ServerState>,
3699) -> Result<Value, String> {
3700 let id = match req.params.get("id") {
3704 Some(v) => v
3705 .as_str()
3706 .ok_or_else(|| "`id` must be a string".to_string())?
3707 .to_string(),
3708 None => match req.params.as_str() {
3709 Some(s) => s.to_string(),
3710 None => return Err("missing `id` parameter".to_string()),
3711 },
3712 };
3713
3714 let home = std::env::var_os("HOME")
3715 .or_else(|| std::env::var_os("USERPROFILE"))
3716 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3717 let car_dir = std::path::PathBuf::from(home).join(".car");
3718 let path = car_dir.join("models.json");
3719
3720 if !path.exists() {
3721 return Err(format!(
3722 "no models.json at {} — nothing to unregister",
3723 path.display()
3724 ));
3725 }
3726 let text =
3727 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3728 let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3729 Vec::new()
3730 } else {
3731 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3732 };
3733 let before = models.len();
3734 models.retain(|m| m.id != id);
3735 if models.len() == before {
3736 return Err(format!("model {} not found in {}", id, path.display()));
3737 }
3738 let json =
3739 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3740 let tmp = path.with_extension("json.tmp");
3741 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3742 std::fs::rename(&tmp, &path)
3743 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3744 Ok(serde_json::json!({
3745 "id": id,
3746 "unregistered": true,
3747 "path": path.to_string_lossy(),
3748 "note": "Daemon restart required for live UnifiedRegistry visibility \
3749 (phase 1, matching models.register).",
3750 }))
3751}
3752
3753fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3754 let engine = get_inference_engine(state);
3755 let models = engine.list_models();
3756 serde_json::to_value(&models).map_err(|e| e.to_string())
3757}
3758
3759fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3760 let engine = get_inference_engine(state);
3761 let models = engine.list_models_unified();
3762 serde_json::to_value(&models).map_err(|e| e.to_string())
3763}
3764
3765#[derive(Debug, Deserialize)]
3766#[serde(rename_all = "camelCase")]
3767struct ModelSearchParams {
3768 #[serde(default)]
3769 query: Option<String>,
3770 #[serde(default)]
3771 capability: Option<car_inference::ModelCapability>,
3772 #[serde(default)]
3773 provider: Option<String>,
3774 #[serde(default)]
3775 local_only: bool,
3776 #[serde(default)]
3777 available_only: bool,
3778 #[serde(default)]
3779 limit: Option<usize>,
3780}
3781
3782#[derive(Debug, Serialize)]
3783#[serde(rename_all = "camelCase")]
3784struct ModelSearchEntry {
3785 #[serde(flatten)]
3786 info: car_inference::ModelInfo,
3787 family: String,
3788 version: String,
3789 tags: Vec<String>,
3790 pullable: bool,
3791 upgrade: Option<car_inference::ModelUpgrade>,
3792}
3793
3794#[derive(Debug, Serialize)]
3795#[serde(rename_all = "camelCase")]
3796struct ModelSearchResponse {
3797 models: Vec<ModelSearchEntry>,
3798 upgrades: Vec<car_inference::ModelUpgrade>,
3799 total: usize,
3800 available: usize,
3801 local: usize,
3802 remote: usize,
3803}
3804
3805fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3806 let params: ModelSearchParams =
3807 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3808 query: None,
3809 capability: None,
3810 provider: None,
3811 local_only: false,
3812 available_only: false,
3813 limit: None,
3814 });
3815 let engine = get_inference_engine(state);
3816 let upgrades = engine.available_model_upgrades();
3817 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3818 .iter()
3819 .cloned()
3820 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3821 .collect();
3822 let query = params
3823 .query
3824 .as_deref()
3825 .map(str::trim)
3826 .filter(|q| !q.is_empty())
3827 .map(|q| q.to_ascii_lowercase());
3828 let provider = params
3829 .provider
3830 .as_deref()
3831 .map(str::trim)
3832 .filter(|p| !p.is_empty())
3833 .map(|p| p.to_ascii_lowercase());
3834
3835 let mut entries: Vec<ModelSearchEntry> = engine
3836 .list_schemas()
3837 .into_iter()
3838 .filter(|schema| {
3839 if let Some(capability) = params.capability {
3840 if !schema.has_capability(capability) {
3841 return false;
3842 }
3843 }
3844 if let Some(provider) = provider.as_deref() {
3845 if schema.provider.to_ascii_lowercase() != provider {
3846 return false;
3847 }
3848 }
3849 if params.local_only && !schema.is_local() {
3850 return false;
3851 }
3852 if params.available_only && !schema.available {
3853 return false;
3854 }
3855 if let Some(query) = query.as_deref() {
3856 let capability_text = schema
3857 .capabilities
3858 .iter()
3859 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3860 .collect::<Vec<_>>()
3861 .join(" ");
3862 let haystack = format!(
3863 "{} {} {} {} {} {}",
3864 schema.id,
3865 schema.name,
3866 schema.provider,
3867 schema.family,
3868 schema.tags.join(" "),
3869 capability_text
3870 )
3871 .to_ascii_lowercase();
3872 if !haystack.contains(query) {
3873 return false;
3874 }
3875 }
3876 true
3877 })
3878 .map(|schema| {
3879 let pullable = !schema.available
3880 && matches!(
3881 schema.source,
3882 car_inference::ModelSource::Local { .. }
3883 | car_inference::ModelSource::Mlx { .. }
3884 );
3885 let info = car_inference::ModelInfo::from(&schema);
3886 let upgrade = upgrades_by_from.get(&schema.id).cloned();
3887 ModelSearchEntry {
3888 info,
3889 family: schema.family,
3890 version: schema.version,
3891 tags: schema.tags,
3892 pullable,
3893 upgrade,
3894 }
3895 })
3896 .collect();
3897 entries.sort_by(|a, b| {
3898 b.info
3899 .available
3900 .cmp(&a.info.available)
3901 .then(b.info.is_local.cmp(&a.info.is_local))
3902 .then(a.info.name.cmp(&b.info.name))
3903 });
3904 if let Some(limit) = params.limit {
3905 entries.truncate(limit);
3906 }
3907
3908 let total = entries.len();
3909 let available = entries.iter().filter(|entry| entry.info.available).count();
3910 let local = entries.iter().filter(|entry| entry.info.is_local).count();
3911 let response = ModelSearchResponse {
3912 models: entries,
3913 upgrades,
3914 total,
3915 available,
3916 local,
3917 remote: total.saturating_sub(local),
3918 };
3919 serde_json::to_value(response).map_err(|e| e.to_string())
3920}
3921
3922fn optional_enum_param<T: serde::de::DeserializeOwned>(
3926 req: &JsonRpcMessage,
3927 key: &str,
3928) -> Result<Option<T>, String> {
3929 match req.params.get(key) {
3930 None | Some(Value::Null) => Ok(None),
3931 Some(v) => serde_json::from_value(v.clone())
3932 .map(Some)
3933 .map_err(|_| format!("invalid '{key}': {v}")),
3934 }
3935}
3936
3937fn recommend_from_params(
3942 req: &JsonRpcMessage,
3943 engine: &car_inference::InferenceEngine,
3944) -> Result<car_inference::RecommendationSet, String> {
3945 let use_case = optional_enum_param::<car_inference::UseCase>(req, "use_case")?.unwrap_or_default();
3946 let tier = optional_enum_param::<car_inference::QualityTier>(req, "tier")?.unwrap_or_default();
3947 let privacy = if req
3948 .params
3949 .get("cloud_ok")
3950 .and_then(|v| v.as_bool())
3951 .unwrap_or(false)
3952 {
3953 car_inference::Privacy::CloudOk
3954 } else {
3955 car_inference::Privacy::OnDevice
3956 };
3957 let hw = car_inference::HardwareInfo::detect();
3958 let schemas = engine.list_schemas();
3959 let refs: Vec<&car_inference::ModelSchema> = schemas.iter().collect();
3960 Ok(car_inference::recommend(&refs, &hw, use_case, tier, privacy))
3961}
3962
3963fn handle_models_recommend(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3965 let engine = get_inference_engine(state);
3966 let set = recommend_from_params(req, engine)?;
3967 serde_json::to_value(set).map_err(|e| e.to_string())
3968}
3969
3970fn handle_models_setup_plan(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3973 let engine = get_inference_engine(state);
3974 let set = recommend_from_params(req, engine)?;
3975 let hw = car_inference::HardwareInfo::detect();
3976 let mut picks = set.picks.into_iter();
3977 let recommended = picks.next();
3978 let alternatives: Vec<_> = picks.collect();
3979 serde_json::to_value(serde_json::json!({
3980 "machine": describe_machine_for_plan(&hw),
3981 "recommended": recommended,
3982 "alternatives": alternatives,
3983 "needs_more_memory": set.not_enough_memory,
3984 "note": set.note,
3985 }))
3986 .map_err(|e| e.to_string())
3987}
3988
3989fn describe_machine_for_plan(hw: &car_inference::HardwareInfo) -> String {
3991 use car_inference::hardware::SupportedAcceleration::*;
3992 match hw.supported_acceleration() {
3993 Apple { unified_memory_mb } => format!(
3994 "Apple Silicon, {} GB unified memory (Metal)",
3995 unified_memory_mb / 1024
3996 ),
3997 Cuda { device_memory_mb } => match device_memory_mb {
3998 Some(mb) => format!("NVIDIA GPU, {} GB VRAM (CUDA)", mb / 1024),
3999 None => "NVIDIA GPU (CUDA)".to_string(),
4000 },
4001 UnsupportedDiscreteGpu { name, .. } => format!(
4002 "{} GB RAM, CPU inference ({name} not yet supported)",
4003 hw.total_ram_mb / 1024
4004 ),
4005 Cpu => format!("{} GB RAM, CPU inference", hw.total_ram_mb / 1024),
4006 }
4007}
4008
4009fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
4010 let engine = get_inference_engine(state);
4011 serde_json::to_value(serde_json::json!({
4012 "upgrades": engine.available_model_upgrades()
4013 }))
4014 .map_err(|e| e.to_string())
4015}
4016
4017async fn handle_models_detect_upgrades(state: &ServerState) -> Result<Value, String> {
4020 let engine = get_inference_engine(state);
4021 let findings = engine.detect_upgrades().await;
4022 serde_json::to_value(serde_json::json!({ "upgrades": findings })).map_err(|e| e.to_string())
4023}
4024
4025async fn handle_models_check_upgrade_nudge(
4029 req: &JsonRpcMessage,
4030 state: &ServerState,
4031) -> Result<Value, String> {
4032 let engine = get_inference_engine(state);
4033 let inference_active = req
4034 .params
4035 .get("inference_active")
4036 .and_then(|v| v.as_bool())
4037 .unwrap_or(false);
4038 let (decision, _state) = engine.check_upgrade_nudge(inference_active).await;
4039 serde_json::to_value(decision).map_err(|e| e.to_string())
4040}
4041
4042fn handle_models_dismiss_upgrade(
4044 req: &JsonRpcMessage,
4045 state: &ServerState,
4046) -> Result<Value, String> {
4047 let key = req
4048 .params
4049 .get("dismiss_key")
4050 .and_then(|v| v.as_str())
4051 .map(str::trim)
4052 .filter(|s| !s.is_empty())
4053 .ok_or("missing or empty 'dismiss_key' parameter")?;
4054 let engine = get_inference_engine(state);
4055 engine.dismiss_upgrade_nudge(key).map_err(|e| e.to_string())?;
4056 Ok(serde_json::json!({ "dismissed": key }))
4057}
4058
4059async fn handle_models_check_concierge(
4063 req: &JsonRpcMessage,
4064 state: &ServerState,
4065) -> Result<Value, String> {
4066 let engine = get_inference_engine(state);
4067 let inference_active = req
4068 .params
4069 .get("inference_active")
4070 .and_then(|v| v.as_bool())
4071 .unwrap_or(false);
4072 let (suggestions, _state) = engine.check_concierge(inference_active).await;
4073 serde_json::to_value(serde_json::json!({ "suggestions": suggestions }))
4074 .map_err(|e| e.to_string())
4075}
4076
4077fn handle_models_dismiss_suggestion(
4080 req: &JsonRpcMessage,
4081 state: &ServerState,
4082) -> Result<Value, String> {
4083 let key = req
4084 .params
4085 .get("dismiss_key")
4086 .and_then(|v| v.as_str())
4087 .map(str::trim)
4088 .filter(|s| !s.is_empty())
4089 .ok_or("missing or empty 'dismiss_key' parameter")?;
4090 let engine = get_inference_engine(state);
4091 engine
4092 .dismiss_concierge_suggestion(key)
4093 .map_err(|e| e.to_string())?;
4094 Ok(serde_json::json!({ "dismissed": key }))
4095}
4096
4097fn handle_models_update_prefs_get(state: &ServerState) -> Result<Value, String> {
4099 let engine = get_inference_engine(state);
4100 serde_json::to_value(engine.update_prefs()).map_err(|e| e.to_string())
4101}
4102
4103fn handle_models_update_prefs_set(
4106 req: &JsonRpcMessage,
4107 state: &ServerState,
4108) -> Result<Value, String> {
4109 let prefs: car_inference::UpdatePreferences =
4110 serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid preferences: {e}"))?;
4111 let engine = get_inference_engine(state);
4112 engine.set_update_prefs(&prefs).map_err(|e| e.to_string())?;
4113 serde_json::to_value(prefs).map_err(|e| e.to_string())
4114}
4115
4116pub async fn run_upgrade_nudge_check(state: &Arc<ServerState>) {
4122 let engine = get_inference_engine(state);
4123 let inference_active = state.admission.in_flight() > 0;
4126 let (decision, mut nstate) = engine.check_upgrade_nudge(inference_active).await;
4127 if let Some(nudge) = decision.nudge {
4128 let delivered = broadcast_upgrade_nudge(state, &nudge).await;
4129 if delivered > 0 {
4133 let now = std::time::SystemTime::now()
4134 .duration_since(std::time::UNIX_EPOCH)
4135 .map(|d| d.as_secs())
4136 .unwrap_or(0);
4137 nstate.last_nudge_secs = now;
4138 let _ = nstate.save_to(&car_inference::NudgeState::default_path());
4139 }
4140 }
4141}
4142
4143pub async fn broadcast_upgrade_nudge(
4149 state: &Arc<ServerState>,
4150 nudge: &car_inference::UpgradeNudge,
4151) -> usize {
4152 use futures::SinkExt;
4153 use tokio_tungstenite::tungstenite::Message;
4154 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
4155 .a2ui_subscribers
4156 .lock()
4157 .await
4158 .values()
4159 .cloned()
4160 .collect();
4161 if subscribers.is_empty() {
4162 return 0;
4163 }
4164 let Ok(json) = serde_json::to_string(&serde_json::json!({
4165 "jsonrpc": "2.0",
4166 "method": "models.upgrade_available",
4167 "params": nudge,
4168 })) else {
4169 return 0;
4170 };
4171 let mut delivered = 0;
4172 for channel in subscribers {
4173 if channel
4174 .write
4175 .lock()
4176 .await
4177 .send(Message::Text(json.clone().into()))
4178 .await
4179 .is_ok()
4180 {
4181 delivered += 1;
4182 }
4183 }
4184 delivered
4185}
4186
4187pub async fn run_concierge_check(state: &Arc<ServerState>) {
4193 let engine = get_inference_engine(state);
4194 let inference_active = state.admission.in_flight() > 0;
4195 let (suggestions, mut nstate) = engine.check_concierge(inference_active).await;
4196 if suggestions.is_empty() {
4197 return;
4198 }
4199 let mut any_delivered = false;
4200 for suggestion in &suggestions {
4201 if broadcast_concierge_suggestion(state, suggestion).await > 0 {
4202 any_delivered = true;
4203 }
4204 }
4205 if any_delivered {
4209 let now = std::time::SystemTime::now()
4210 .duration_since(std::time::UNIX_EPOCH)
4211 .map(|d| d.as_secs())
4212 .unwrap_or(0);
4213 nstate.last_concierge_secs = now;
4214 let _ = nstate.save_to(&car_inference::NudgeState::default_path());
4215 }
4216}
4217
4218pub async fn broadcast_concierge_suggestion(
4222 state: &Arc<ServerState>,
4223 suggestion: &car_inference::ConciergeSuggestion,
4224) -> usize {
4225 use futures::SinkExt;
4226 use tokio_tungstenite::tungstenite::Message;
4227 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
4228 .a2ui_subscribers
4229 .lock()
4230 .await
4231 .values()
4232 .cloned()
4233 .collect();
4234 if subscribers.is_empty() {
4235 return 0;
4236 }
4237 let Ok(json) = serde_json::to_string(&serde_json::json!({
4238 "jsonrpc": "2.0",
4239 "method": "models.suggestion_available",
4240 "params": suggestion,
4241 })) else {
4242 return 0;
4243 };
4244 let mut delivered = 0;
4245 for channel in subscribers {
4246 if channel
4247 .write
4248 .lock()
4249 .await
4250 .send(Message::Text(json.clone().into()))
4251 .await
4252 .is_ok()
4253 {
4254 delivered += 1;
4255 }
4256 }
4257 delivered
4258}
4259
4260struct PullProgressSink {
4264 tx: tokio::sync::mpsc::UnboundedSender<car_inference::DownloadEvent>,
4265}
4266
4267impl car_inference::DownloadProgress for PullProgressSink {
4268 fn on_event(&self, event: &car_inference::DownloadEvent) {
4269 let _ = self.tx.send(event.clone());
4271 }
4272}
4273
4274async fn handle_models_pull(msg: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4275 let name = msg
4276 .params
4277 .get("name")
4278 .or_else(|| msg.params.get("id"))
4279 .or_else(|| msg.params.get("model"))
4280 .and_then(|v| v.as_str())
4281 .ok_or("missing 'name' parameter")?
4282 .to_string();
4283 let engine = get_inference_engine(state);
4284
4285 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<car_inference::DownloadEvent>();
4291 let sink = car_inference::ProgressSink::new(Arc::new(PullProgressSink { tx }));
4292 let broadcaster_state = state.clone();
4293 let model_label = name.clone();
4294 let broadcaster = tokio::spawn(async move {
4295 while let Some(event) = rx.recv().await {
4296 broadcast_pull_progress(&broadcaster_state, &model_label, &event).await;
4297 }
4298 });
4299
4300 let result = engine.pull_model_with_progress(&name, &sink).await;
4301 drop(sink);
4303 if let Err(e) = broadcaster.await {
4305 tracing::warn!(error = %e, "pull-progress broadcaster task failed");
4306 }
4307
4308 let path = result.map_err(|e| e.to_string())?;
4309 Ok(serde_json::json!({"path": path.display().to_string()}))
4310}
4311
4312async fn broadcast_pull_progress(
4315 state: &Arc<ServerState>,
4316 model: &str,
4317 event: &car_inference::DownloadEvent,
4318) {
4319 use futures::SinkExt;
4320 use tokio_tungstenite::tungstenite::Message;
4321 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
4322 .a2ui_subscribers
4323 .lock()
4324 .await
4325 .values()
4326 .cloned()
4327 .collect();
4328 if subscribers.is_empty() {
4329 return;
4330 }
4331 let Ok(json) = serde_json::to_string(&serde_json::json!({
4332 "jsonrpc": "2.0",
4333 "method": "models.pull_progress",
4334 "params": { "model": model, "event": event },
4335 })) else {
4336 return;
4337 };
4338 for channel in subscribers {
4339 let _ = channel
4340 .write
4341 .lock()
4342 .await
4343 .send(Message::Text(json.clone().into()))
4344 .await;
4345 }
4346}
4347
4348async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4349 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
4350 msg.params
4351 .get("events")
4352 .cloned()
4353 .unwrap_or(msg.params.clone()),
4354 )
4355 .map_err(|e| format!("invalid events: {}", e))?;
4356
4357 let inference = get_inference_engine(state).clone();
4358 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
4359
4360 let skills = engine.distill_skills(&events).await;
4361 serde_json::to_value(&skills).map_err(|e| e.to_string())
4362}
4363
4364async fn handle_memory_consolidate(
4368 session: &crate::session::ClientSession,
4369) -> Result<Value, String> {
4370 let engine_arc = session.effective_memgine().await;
4371 let report = {
4372 let mut engine = engine_arc.lock().await;
4373 engine.consolidate().await
4374 };
4375 if let Some(id) = session.agent_id.lock().await.clone() {
4376 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
4377 tracing::warn!(agent_id = %id, error = %e,
4378 "agent memgine persist after consolidate failed");
4379 }
4380 }
4381 serde_json::to_value(&report).map_err(|e| e.to_string())
4382}
4383
4384async fn handle_skill_repair(
4388 msg: &JsonRpcMessage,
4389 session: &crate::session::ClientSession,
4390) -> Result<Value, String> {
4391 let name = msg
4392 .params
4393 .get("skill_name")
4394 .and_then(|v| v.as_str())
4395 .ok_or("missing 'skill_name' parameter")?;
4396 let mut engine = session.memgine.lock().await;
4397 let code = engine.repair_skill(name).await;
4398 Ok(match code {
4399 Some(c) => serde_json::json!({ "code": c }),
4400 None => Value::Null,
4401 })
4402}
4403
4404async fn handle_skills_ingest_distilled(
4407 msg: &JsonRpcMessage,
4408 session: &crate::session::ClientSession,
4409) -> Result<Value, String> {
4410 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
4411 msg.params
4412 .get("skills")
4413 .cloned()
4414 .unwrap_or(msg.params.clone()),
4415 )
4416 .map_err(|e| format!("invalid skills: {}", e))?;
4417 let mut engine = session.memgine.lock().await;
4418 let nodes = engine.ingest_distilled_skills(&skills);
4419 Ok(serde_json::json!({ "ingested": nodes.len() }))
4420}
4421
4422async fn handle_skills_evolve(
4425 msg: &JsonRpcMessage,
4426 session: &crate::session::ClientSession,
4427) -> Result<Value, String> {
4428 let domain = msg
4429 .params
4430 .get("domain")
4431 .and_then(|v| v.as_str())
4432 .ok_or("missing 'domain' parameter")?
4433 .to_string();
4434 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
4435 msg.params
4436 .get("events")
4437 .cloned()
4438 .unwrap_or(Value::Array(vec![])),
4439 )
4440 .map_err(|e| format!("invalid events: {}", e))?;
4441 let mut engine = session.memgine.lock().await;
4442 let skills = engine.evolve_skills(&events, &domain).await;
4443 serde_json::to_value(&skills).map_err(|e| e.to_string())
4444}
4445
4446async fn handle_skills_domains_needing_evolution(
4448 msg: &JsonRpcMessage,
4449 session: &crate::session::ClientSession,
4450) -> Result<Value, String> {
4451 let threshold = msg
4452 .params
4453 .get("threshold")
4454 .and_then(|v| v.as_f64())
4455 .unwrap_or(0.6);
4456 let engine = session.memgine.lock().await;
4457 let domains = engine.domains_needing_evolution(threshold);
4458 serde_json::to_value(&domains).map_err(|e| e.to_string())
4459}
4460
4461async fn handle_skills_ingest_provisional(
4467 msg: &JsonRpcMessage,
4468 session: &crate::session::ClientSession,
4469) -> Result<Value, String> {
4470 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
4471 msg.params
4472 .get("skills")
4473 .cloned()
4474 .unwrap_or(msg.params.clone()),
4475 )
4476 .map_err(|e| format!("invalid skills: {}", e))?;
4477 let tenant = msg.params.get("tenant").and_then(|v| v.as_str());
4478 let mut engine = session.memgine.lock().await;
4479 let ingested = engine.ingest_provisional_candidates(&skills, tenant);
4480 Ok(serde_json::json!({ "ingested": ingested }))
4481}
4482
4483async fn handle_skills_gate(
4488 _msg: &JsonRpcMessage,
4489 session: &crate::session::ClientSession,
4490) -> Result<Value, String> {
4491 let mut engine = session.memgine.lock().await;
4492 let (promoted, rejected) = engine.gate_skill_candidates();
4493 Ok(serde_json::json!({ "promoted": promoted, "rejected": rejected }))
4494}
4495
4496async fn handle_skill_meta(
4500 msg: &JsonRpcMessage,
4501 session: &crate::session::ClientSession,
4502) -> Result<Value, String> {
4503 let key = msg
4504 .params
4505 .get("key")
4506 .and_then(|v| v.as_str())
4507 .ok_or("missing 'key' parameter")?;
4508 let engine = session.memgine.lock().await;
4509 match engine.skill_meta(key) {
4510 Some(meta) => serde_json::to_value(&meta).map_err(|e| e.to_string()),
4511 None => Ok(Value::Null),
4512 }
4513}
4514
4515async fn handle_skill_export(
4519 msg: &JsonRpcMessage,
4520 session: &crate::session::ClientSession,
4521) -> Result<Value, String> {
4522 let key = msg
4523 .params
4524 .get("key")
4525 .and_then(|v| v.as_str())
4526 .ok_or("missing 'key' parameter")?;
4527 let engine = session.memgine.lock().await;
4528 Ok(engine.export_skill(key).map(Value::String).unwrap_or(Value::Null))
4529}
4530
4531async fn handle_skill_import(
4535 msg: &JsonRpcMessage,
4536 session: &crate::session::ClientSession,
4537) -> Result<Value, String> {
4538 let md = msg
4539 .params
4540 .get("markdown")
4541 .and_then(|v| v.as_str())
4542 .ok_or("missing 'markdown' parameter")?;
4543 let mut engine = session.memgine.lock().await;
4544 engine.import_skill_markdown(md)?;
4545 Ok(serde_json::json!({ "imported": true }))
4546}
4547
4548async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4550 let engine = get_inference_engine(state);
4551 let req: car_inference::RerankRequest =
4552 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
4553 let _permit = state.admission.acquire().await;
4554 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
4555 serde_json::to_value(&result).map_err(|e| e.to_string())
4556}
4557
4558async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4564 use base64::Engine as _;
4565 let engine = get_inference_engine(state);
4566
4567 let mut params = msg.params.clone();
4574 let audio_b64 = params
4575 .as_object_mut()
4576 .and_then(|m| m.remove("audio_b64"))
4577 .and_then(|v| v.as_str().map(str::to_string));
4578 let _tmp_audio = if let Some(b64) = audio_b64 {
4579 let bytes = base64::engine::general_purpose::STANDARD
4580 .decode(b64.as_bytes())
4581 .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
4582 let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
4583 std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
4584 let path = tmp.path().to_string_lossy().into_owned();
4585 if let Some(obj) = params.as_object_mut() {
4586 obj.insert("audio_path".to_string(), Value::String(path));
4587 }
4588 Some(tmp)
4589 } else {
4590 None
4591 };
4592
4593 let req: car_inference::TranscribeRequest =
4594 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
4595 let _permit = state.admission.acquire().await;
4596 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
4597 serde_json::to_value(&result).map_err(|e| e.to_string())
4598}
4599
4600async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4606 use base64::Engine as _;
4607 let engine = get_inference_engine(state);
4608
4609 let mut params = msg.params.clone();
4610 let return_b64 = params
4611 .as_object_mut()
4612 .and_then(|m| m.remove("return_b64"))
4613 .and_then(|v| v.as_bool())
4614 .unwrap_or(false);
4615 let no_output_path = params
4616 .as_object()
4617 .map(|m| !m.contains_key("output_path"))
4618 .unwrap_or(true);
4619
4620 let req: car_inference::SynthesizeRequest =
4621 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
4622 let _permit = state.admission.acquire().await;
4623 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
4624 let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4625
4626 if return_b64 || no_output_path {
4630 let bytes = std::fs::read(&result.audio_path).map_err(|e| {
4631 format!(
4632 "synthesize: failed to read rendered audio at {}: {e}",
4633 result.audio_path
4634 )
4635 })?;
4636 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
4637 if let Some(obj) = value.as_object_mut() {
4638 obj.insert("audio_b64".to_string(), Value::String(encoded));
4639 }
4640 }
4641 Ok(value)
4642}
4643
4644async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
4648 let engine = get_inference_engine(state);
4649 let status = engine
4650 .prepare_speech_runtime()
4651 .await
4652 .map_err(|e| e.to_string())?;
4653 serde_json::to_value(&status).map_err(|e| e.to_string())
4654}
4655
4656async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4659 let prompt = msg
4660 .params
4661 .get("prompt")
4662 .and_then(|v| v.as_str())
4663 .ok_or("missing 'prompt' parameter")?;
4664 let engine = get_inference_engine(state);
4665 let decision = engine.route_adaptive(prompt).await;
4666 serde_json::to_value(&decision).map_err(|e| e.to_string())
4667}
4668
4669async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
4671 let engine = get_inference_engine(state);
4672 let profiles = engine.export_profiles().await;
4673 serde_json::to_value(&profiles).map_err(|e| e.to_string())
4674}
4675
/// Parameters for `handle_outcomes_resolve_pending`.
///
/// Because of `rename_all = "camelCase"`, the field is read from the JSON key
/// `actionResults`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct OutcomesResolvePendingParams {
    /// One tuple per action, passed unchanged to the outcome tracker's
    /// `infer_outcomes_from_action_sequence`.
    // NOTE(review): the meaning of the four positions (String, bool, f64,
    // String) is defined by the tracker and is not visible here — confirm there.
    action_results: Vec<(String, bool, f64, String)>,
}
4685
4686async fn handle_outcomes_resolve_pending(
4706 req: &JsonRpcMessage,
4707 state: &ServerState,
4708) -> Result<Value, String> {
4709 let params: OutcomesResolvePendingParams =
4710 serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
4711 let engine = get_inference_engine(state);
4712 let mut tracker = engine.outcome_tracker.write().await;
4713 let inferred = tracker.infer_outcomes_from_action_sequence(¶ms.action_results);
4714 tracker.resolve_pending_from_signals(inferred);
4715 Ok(serde_json::json!({ "recorded": params.action_results.len() }))
4716}
4717
4718async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
4720 let n = session.runtime.log.lock().await.len();
4721 Ok(Value::from(n as u64))
4722}
4723
4724async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
4725 let stats = session.runtime.log.lock().await.stats();
4726 serde_json::to_value(stats).map_err(|e| e.to_string())
4727}
4728
/// Parameters for `handle_events_truncate`.
///
/// Because of `rename_all = "camelCase"`, the JSON keys are `maxEvents` and
/// `maxSpans`. Both are optional.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
    /// Passed to the log's `truncate_events_keep_last`; `None` skips event truncation.
    #[serde(default)]
    max_events: Option<usize>,
    /// Passed to the log's `truncate_spans_keep_last`; `None` skips span truncation.
    #[serde(default)]
    max_spans: Option<usize>,
}
4737
4738async fn handle_events_truncate(
4739 msg: &JsonRpcMessage,
4740 session: &crate::session::ClientSession,
4741) -> Result<Value, String> {
4742 let params: EventsTruncateParams =
4743 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
4744 max_events: None,
4745 max_spans: None,
4746 });
4747 let mut log = session.runtime.log.lock().await;
4748 let removed_events = params
4749 .max_events
4750 .map(|max| log.truncate_events_keep_last(max))
4751 .unwrap_or(0);
4752 let removed_spans = params
4753 .max_spans
4754 .map(|max| log.truncate_spans_keep_last(max))
4755 .unwrap_or(0);
4756 let stats = log.stats();
4757 Ok(serde_json::json!({
4758 "removedEvents": removed_events,
4759 "removedSpans": removed_spans,
4760 "stats": stats,
4761 }))
4762}
4763
4764async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
4765 let mut log = session.runtime.log.lock().await;
4766 let removed = log.clear();
4767 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
4768}
4769
4770async fn resolve_run_agent_id(
4791 session: &crate::session::ClientSession,
4792 req: &car_proto::RunStartRequest,
4793) -> Result<String, String> {
4794 if let Some(bound) = session.agent_id.lock().await.clone() {
4802 if let Some(id) = req.agent_id.as_deref() {
4803 let id = id.trim();
4804 if !id.is_empty() && id != bound {
4805 return Err(format!(
4806 "runs.start `agent_id` (`{id}`) does not match this session's bound \
4807 agent: a bound connection can only record runs under its own agent"
4808 ));
4809 }
4810 }
4811 return Ok(bound);
4812 }
4813 if let Some(id) = req.agent_id.as_deref() {
4817 let id = id.trim();
4818 if !id.is_empty() {
4819 return Ok(id.to_string());
4820 }
4821 }
4822 if let Ok(env_id) = std::env::var("CAR_AGENT_ID") {
4824 let env_id = env_id.trim().to_string();
4825 if !env_id.is_empty() {
4826 return Ok(env_id);
4827 }
4828 }
4829 if let Some(name) = req.agent_name.as_deref() {
4832 if let Some(synth) = synthesize_agent_id(name) {
4833 return Ok(synth);
4834 }
4835 }
4836 Err(
4837 "runs.start could not resolve an agent_id: no `agent_id` param, no bound \
4838 session.auth {agent_id}, no CAR_AGENT_ID env, and no usable `agent_name` \
4839 to synthesize one from"
4840 .to_string(),
4841 )
4842}
4843
/// Derives an agent id of the form `name:<slug>` from a display name.
///
/// The slug is built from the name's runs of ASCII alphanumerics, lowercased
/// and joined with single `-` characters. Every other character (including
/// non-ASCII letters) acts as a separator, and the slug never starts or ends
/// with `-`. Returns `None` when the name has no ASCII alphanumerics.
fn synthesize_agent_id(name: &str) -> Option<String> {
    let words: Vec<String> = name
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|word| !word.is_empty())
        .map(str::to_ascii_lowercase)
        .collect();
    if words.is_empty() {
        None
    } else {
        Some(format!("name:{}", words.join("-")))
    }
}
4869
4870async fn handle_runs_start(
4871 req: &JsonRpcMessage,
4872 session: &crate::session::ClientSession,
4873 state: &Arc<ServerState>,
4874) -> Result<Value, String> {
4875 let params: car_proto::RunStartRequest = serde_json::from_value(req.params.clone())
4876 .map_err(|e| format!("runs.start requires {{ intent, agent_id?, agent_name?, outcome_description? }}: {e}"))?;
4877 if params.intent.trim().is_empty() {
4878 return Err("runs.start requires a non-empty `intent`".to_string());
4879 }
4880
4881 let agent_id = resolve_run_agent_id(session, ¶ms).await?;
4882 let run_id = uuid::Uuid::new_v4().to_string();
4883 let started_at = chrono::Utc::now();
4884
4885 *session.current_run_id.lock().await = Some(run_id.clone());
4889
4890 state
4891 .start_run(crate::session::RunMeta {
4892 run_id: run_id.clone(),
4893 agent_id: agent_id.clone(),
4894 client_id: session.client_id.clone(),
4895 intent: params.intent.clone(),
4896 outcome_description: params.outcome_description.clone(),
4897 started_at,
4898 termination: None,
4899 ended_at: None,
4900 turns: Vec::new(),
4901 })
4902 .await;
4903
4904 serde_json::to_value(car_proto::RunStartResponse { run_id, agent_id })
4905 .map_err(|e| e.to_string())
4906}
4907
4908async fn handle_runs_complete(
4909 req: &JsonRpcMessage,
4910 session: &crate::session::ClientSession,
4911 state: &Arc<ServerState>,
4912) -> Result<Value, String> {
4913 let params: car_proto::RunCompleteRequest = serde_json::from_value(req.params.clone())
4914 .map_err(|e| format!("runs.complete requires {{ run_id, outcome }}: {e}"))?;
4915
4916 let termination = car_proto::RunTermination::Outcome {
4917 status: params.outcome.status,
4918 outcome: params.outcome.clone(),
4919 };
4920 state.complete_run(¶ms.run_id, termination).await?;
4921
4922 {
4926 let mut cur = session.current_run_id.lock().await;
4927 if cur.as_deref() == Some(params.run_id.as_str()) {
4928 *cur = None;
4929 }
4930 }
4931
4932 serde_json::to_value(car_proto::RunCompleteResponse {
4933 run_id: params.run_id,
4934 ok: true,
4935 })
4936 .map_err(|e| e.to_string())
4937}
4938
/// Maximum byte length (16 KiB) a single string may have before
/// `truncate_turn_string` cuts it.
const RECORD_TURN_FIELD_CAP_BYTES: usize = 16 * 1024;

/// Suffix appended by `truncate_turn_string` to a string it has cut.
const RECORD_TURN_TRUNC_MARKER: &str = "…[truncated]";

/// Byte budget (256 KiB) for one JSON-encoded turn; `RECORD_TURN_MEASURE_CAP`
/// is derived from it.
const RECORD_TURN_MAX_BYTES: usize = 256 * 1024;

/// Placeholder that `enforce_turn_byte_cap` substitutes for `parameters`,
/// `output` and `prompt` when a turn is still over budget.
const RECORD_TURN_OVERSIZE_MARKER: &str = "…[truncated: turn exceeded 256 KiB]";

/// Maximum number of turns accepted in one `runs.record_turns` call.
const RECORD_TURNS_MAX_BATCH: usize = 256;

/// Maximum number of turns a run may hold in total; re-exported from
/// `crate::session` and checked in `handle_runs_record_turns`.
const RECORD_TURNS_RUN_CEILING: usize = crate::session::RECORD_TURNS_RUN_CEILING;
4990
4991fn truncate_turn_string(s: &mut String) {
4995 if s.len() > RECORD_TURN_FIELD_CAP_BYTES {
4996 let mut end = RECORD_TURN_FIELD_CAP_BYTES;
4998 while end > 0 && !s.is_char_boundary(end) {
4999 end -= 1;
5000 }
5001 s.truncate(end);
5002 s.push_str(RECORD_TURN_TRUNC_MARKER);
5003 }
5004}
5005
5006fn truncate_turn_value(value: &mut Value) {
5012 match value {
5013 Value::String(s) => truncate_turn_string(s),
5014 Value::Array(items) => {
5015 for item in items.iter_mut() {
5016 truncate_turn_value(item);
5017 }
5018 }
5019 Value::Object(map) => {
5020 for (_k, v) in map.iter_mut() {
5021 truncate_turn_value(v);
5022 }
5023 }
5024 _ => {}
5025 }
5026}
5027
/// Bytes held back from `RECORD_TURN_MAX_BYTES` when measuring a turn.
// NOTE(review): presumably this leaves room for the turn's `index` to be
// rewritten to a larger number after measurement (the handler measures turns
// with `index` set to 0) — confirm against the code that assigns indices.
const RECORD_TURN_REINDEX_HEADROOM: usize = 16;

/// Threshold actually compared against a turn's encoded length in
/// `enforce_turn_byte_cap`: the per-turn budget minus the reindex headroom.
const RECORD_TURN_MEASURE_CAP: usize = RECORD_TURN_MAX_BYTES - RECORD_TURN_REINDEX_HEADROOM;
5043
5044fn turn_encoded_len(turn: &car_proto::RunTurn) -> usize {
5048 serde_json::to_vec(turn).map_or(usize::MAX, |v| v.len())
5049}
5050
/// Shrinks `turn` in place until its JSON encoding fits within
/// `RECORD_TURN_MEASURE_CAP`, in escalating stages.
///
/// Returns `true` as soon as the turn fits, and `false` when it is still too
/// large after every stage (the caller then drops the batch).
fn enforce_turn_byte_cap(turn: &mut car_proto::RunTurn) -> bool {
    // Exhaustive destructure without `..`: adding a field to `RunTurn` makes
    // this stop compiling.
    // NOTE(review): presumably a deliberate guard so new fields get considered
    // by the shrinking stages below — confirm intent.
    let car_proto::RunTurn {
        index: _,
        prompt: _,
        tool: _,
        parameters: _,
        output: _,
        cli_outcome: _,
        verifier_verdict: _,
        policy_rejected: _,
    } = turn;

    // Stage 0: already within budget, nothing to do.
    if turn_encoded_len(turn) <= RECORD_TURN_MEASURE_CAP {
        return true;
    }
    // Stage 1: replace the three payload fields with the oversize marker,
    // but only where they currently hold a value.
    if !turn.parameters.is_null() {
        turn.parameters = Value::String(RECORD_TURN_OVERSIZE_MARKER.to_string());
    }
    if turn.output.is_some() {
        turn.output = Some(Value::String(RECORD_TURN_OVERSIZE_MARKER.to_string()));
    }
    if turn.prompt.is_some() {
        turn.prompt = Some(RECORD_TURN_OVERSIZE_MARKER.to_string());
    }
    if turn_encoded_len(turn) <= RECORD_TURN_MEASURE_CAP {
        return true;
    }
    // Stage 2: cap the remaining free-form strings (tool name and the
    // policy-rejection details) at the per-field limit.
    if let Some(tool) = turn.tool.as_mut() {
        truncate_turn_string(tool);
    }
    if let Some(pr) = turn.policy_rejected.as_mut() {
        truncate_turn_string(&mut pr.rule);
        if let Some(param) = pr.param.as_mut() {
            truncate_turn_string(param);
        }
    }
    // Final verdict after all stages.
    turn_encoded_len(turn) <= RECORD_TURN_MEASURE_CAP
}
5122
5123fn record_turns_dropped(run_id: &str, reason: &str) -> Result<Value, String> {
5127 serde_json::to_value(car_proto::RunRecordTurnsResponse {
5128 run_id: run_id.to_string(),
5129 base_index: 0,
5130 count: 0,
5131 ok: false,
5132 dropped: Some(reason.to_string()),
5133 })
5134 .map_err(|e| e.to_string())
5135}
5136
/// Appends a batch of turns to an existing run.
///
/// Malformed requests (missing or blank `run_id`, `turns` not an array, empty
/// or over-sized batch, a turn that is not an object or does not deserialize)
/// are returned as JSON-RPC errors. Every other refusal is returned as a
/// successful result built by `record_turns_dropped`, with `dropped` set to
/// one of `turn_too_large`, `run_not_found`, `run_terminal` or
/// `run_turn_limit`.
///
/// On success the response reports `count` (turns appended) and `base_index`
/// (`new_total - count`).
async fn handle_runs_record_turns(
    req: &JsonRpcMessage,
    session: &crate::session::ClientSession,
    state: &Arc<ServerState>,
) -> Result<Value, String> {
    // `run_id` is read directly from the raw params; a blank id is rejected.
    let raw_run_id = req
        .params
        .get("run_id")
        .and_then(Value::as_str)
        .map(str::to_string);
    let run_id = match raw_run_id {
        Some(id) if !id.trim().is_empty() => id,
        _ => return Err("runs.record_turns requires { run_id, turns: [RunTurn] }".to_string()),
    };

    let raw_turns = match req.params.get("turns").and_then(Value::as_array) {
        Some(arr) => arr,
        None => {
            return Err(
                "runs.record_turns requires { run_id, turns: [RunTurn] }: `turns` must be an array"
                    .to_string(),
            )
        }
    };
    if raw_turns.is_empty() {
        return Err(
            "runs.record_turns requires a non-empty `turns` array".to_string(),
        );
    }
    // Per-call batch cap, checked before any per-turn work is done.
    if raw_turns.len() > RECORD_TURNS_MAX_BATCH {
        return Err(format!(
            "runs.record_turns batch too large: {} turns exceeds the per-call cap of {}",
            raw_turns.len(),
            RECORD_TURNS_MAX_BATCH
        ));
    }

    let mut turns: Vec<car_proto::RunTurn> = Vec::with_capacity(raw_turns.len());
    // Each raw turn is normalized as JSON first, then deserialized and
    // size-checked.
    for (i, raw) in raw_turns.iter().enumerate() {
        let mut obj = raw.clone();
        match obj.as_object_mut() {
            Some(map) => {
                // Any caller-supplied `index` is overwritten with 0.
                // NOTE(review): presumably the real index is assigned when the
                // batch is appended — confirm in `record_run_turns`.
                map.insert("index".to_string(), Value::Number(0.into()));
                // A turn without a verdict is recorded as "not_run".
                map.entry("verifier_verdict")
                    .or_insert_with(|| Value::String("not_run".to_string()));
                // Cap every string inside the three payload fields before the
                // turn is deserialized.
                if let Some(p) = map.get_mut("prompt") {
                    truncate_turn_value(p);
                }
                if let Some(o) = map.get_mut("output") {
                    truncate_turn_value(o);
                }
                if let Some(params) = map.get_mut("parameters") {
                    truncate_turn_value(params);
                }
            }
            None => {
                return Err(format!(
                    "runs.record_turns requires {{ run_id, turns: [RunTurn] }}: \
                     turn {i} is not an object"
                ));
            }
        }
        let mut turn: car_proto::RunTurn = serde_json::from_value(obj).map_err(|e| {
            format!("runs.record_turns requires {{ run_id, turns: [RunTurn] }}: turn {i}: {e}")
        })?;
        // One turn that cannot be bounded causes the whole batch to be dropped.
        if !enforce_turn_byte_cap(&mut turn) {
            tracing::warn!(
                run_id = %run_id,
                turn = i,
                "runs.record_turns: turn could not be bounded under the per-turn byte cap; dropping batch"
            );
            return record_turns_dropped(&run_id, "turn_too_large");
        }
        turns.push(turn);
    }

    // Find the run's owning agent: first from the in-memory header (a tuple
    // of agent id, terminal flag and current turn count), otherwise from the
    // run store.
    let header = state.run_header(&run_id).await;
    let owning_agent = match &header {
        Some((agent_id, _terminal, _len)) => agent_id.clone(),
        None => match state.run_store.agent_for_run(&run_id) {
            Some(a) => a,
            None => return record_turns_dropped(&run_id, "run_not_found"),
        },
    };

    // Only a session bound to the owning agent may record turns. A denial is
    // reported as `run_not_found`, the same answer an unknown run gets.
    let bound = session.agent_id.lock().await.clone();
    if bound.as_deref() != Some(owning_agent.as_str()) {
        tracing::debug!(
            run_id = %run_id,
            owning_agent = %owning_agent,
            client_id = %session.client_id,
            "runs.record_turns denied (not the owning agent); returning uniform not-found"
        );
        return record_turns_dropped(&run_id, "run_not_found");
    }

    // Pre-checks against the header, when one was found: refuse terminal runs
    // and batches that would push the run past its turn ceiling.
    if let Some((_agent_id, terminal, len)) = &header {
        if *terminal {
            return record_turns_dropped(&run_id, "run_terminal");
        }
        if *len + turns.len() > RECORD_TURNS_RUN_CEILING {
            return record_turns_dropped(&run_id, "run_turn_limit");
        }
    }

    let count = turns.len();
    let records: Vec<car_proto::RunRecord> =
        turns.into_iter().map(car_proto::RunRecord::Turn).collect();
    // The append itself can still refuse; its outcomes are mapped onto the
    // same `dropped` reasons as the pre-checks above.
    let new_total = match state.record_run_turns(&run_id, records).await {
        crate::session::RecordRunTurnsOutcome::Appended { new_total } => new_total,
        crate::session::RecordRunTurnsOutcome::RefusedCeiling => {
            return record_turns_dropped(&run_id, "run_turn_limit");
        }
        crate::session::RecordRunTurnsOutcome::UnknownOrTerminal => {
            return record_turns_dropped(&run_id, "run_terminal");
        }
    };

    serde_json::to_value(car_proto::RunRecordTurnsResponse {
        run_id,
        // Index of the first turn of this batch within the run.
        base_index: new_total - count,
        count,
        ok: true,
        dropped: None,
    })
    .map_err(|e| e.to_string())
}
5361
5362async fn authorize_run_access(
5398 session: &crate::session::ClientSession,
5399 state: &Arc<ServerState>,
5400 owning_agent_id: &str,
5401) -> Result<(), String> {
5402 if let Some(bound) = session.agent_id.lock().await.clone() {
5404 if bound == owning_agent_id {
5405 return Ok(());
5406 }
5407 }
5408 if session.is_host.load(std::sync::atomic::Ordering::Acquire) {
5416 return Ok(());
5417 }
5418 tracing::debug!(
5423 owning_agent = %owning_agent_id,
5424 client_id = %session.client_id,
5425 "run access denied: connection neither owns the agent nor is the host client"
5426 );
5427 Err("not authorized to access this run: this connection neither owns the \
5428 owning agent (via session.auth) nor is the host management client"
5429 .to_string())
5430}
5431
5432async fn handle_runs_subscribe(
5447 req: &JsonRpcMessage,
5448 session: &crate::session::ClientSession,
5449 state: &Arc<ServerState>,
5450) -> Result<Value, String> {
5451 let params: car_proto::RunSubscribeRequest = serde_json::from_value(req.params.clone())
5452 .map_err(|e| format!("runs.subscribe requires {{ run_id }}: {e}"))?;
5453
5454 let not_found = || {
5457 serde_json::to_value(serde_json::json!({
5458 "run_id": params.run_id,
5459 "not_found": true,
5460 }))
5461 .map_err(|e| e.to_string())
5462 };
5463
5464 let owning_agent = match state.run_meta(¶ms.run_id).await {
5468 Some(meta) => meta.agent_id,
5469 None => match state.run_store.agent_for_run(¶ms.run_id) {
5470 Some(a) => a,
5471 None => return not_found(),
5472 },
5473 };
5474
5475 if authorize_run_access(session, state, &owning_agent)
5479 .await
5480 .is_err()
5481 {
5482 tracing::debug!(
5483 run_id = %params.run_id,
5484 owning_agent = %owning_agent,
5485 client_id = %session.client_id,
5486 "runs.subscribe denied (unauthorized); returning uniform not-found"
5487 );
5488 return not_found();
5489 }
5490
5491 let snapshot = match state
5492 .subscribe_run(¶ms.run_id, &session.client_id, session.channel.clone())
5493 .await
5494 {
5495 Some(s) => s,
5496 None => return not_found(),
5498 };
5499
5500 serde_json::to_value(snapshot).map_err(|e| e.to_string())
5501}
5502
5503async fn handle_runs_unsubscribe(
5508 req: &JsonRpcMessage,
5509 session: &crate::session::ClientSession,
5510 state: &Arc<ServerState>,
5511) -> Result<Value, String> {
5512 let params: car_proto::RunUnsubscribeRequest = serde_json::from_value(req.params.clone())
5513 .map_err(|e| format!("runs.unsubscribe requires {{ run_id }}: {e}"))?;
5514 let removed = state
5515 .unsubscribe_run(¶ms.run_id, &session.client_id)
5516 .await;
5517 serde_json::to_value(car_proto::RunUnsubscribeResponse {
5518 run_id: params.run_id,
5519 removed,
5520 })
5521 .map_err(|e| e.to_string())
5522}
5523
5524async fn handle_runs_list(
5535 req: &JsonRpcMessage,
5536 session: &crate::session::ClientSession,
5537 state: &Arc<ServerState>,
5538) -> Result<Value, String> {
5539 let params: car_proto::RunListRequest = serde_json::from_value(req.params.clone())
5540 .map_err(|e| format!("runs.list requires {{ agent_id }}: {e}"))?;
5541
5542 authorize_run_access(session, state, ¶ms.agent_id).await?;
5545
5546 let runs = state.run_store.list_runs(¶ms.agent_id);
5547 serde_json::to_value(serde_json::json!({
5548 "agent_id": params.agent_id,
5549 "runs": runs,
5550 }))
5551 .map_err(|e| e.to_string())
5552}
5553
5554async fn handle_runs_get_trace(
5570 req: &JsonRpcMessage,
5571 session: &crate::session::ClientSession,
5572 state: &Arc<ServerState>,
5573) -> Result<Value, String> {
5574 let params: car_proto::RunGetTraceRequest = serde_json::from_value(req.params.clone())
5575 .map_err(|e| format!("runs.get_trace requires {{ run_id, cursor? }}: {e}"))?;
5576
5577 let Some(owning_agent) = state.run_store.agent_for_run(¶ms.run_id) else {
5584 return serde_json::to_value(serde_json::json!({
5585 "run_id": params.run_id,
5586 "not_found": true,
5587 }))
5588 .map_err(|e| e.to_string());
5589 };
5590
5591 authorize_run_access(session, state, &owning_agent).await?;
5593
5594 let records = state
5597 .run_store
5598 .get_run_trace_for(&owning_agent, ¶ms.run_id)
5599 .unwrap_or_default();
5600
5601 let cursor = params.cursor.unwrap_or(0);
5606 let paged: Vec<car_proto::RunRecord> = if cursor == 0 {
5607 records
5608 } else {
5609 records.into_iter().skip(cursor).collect()
5610 };
5611
5612 serde_json::to_value(serde_json::json!({
5613 "run_id": params.run_id,
5614 "agent_id": owning_agent,
5615 "records": paged,
5616 "cursor": cursor,
5617 }))
5618 .map_err(|e| e.to_string())
5619}
5620
5621async fn handle_replan_set_config(
5626 msg: &JsonRpcMessage,
5627 session: &crate::session::ClientSession,
5628) -> Result<Value, String> {
5629 let max_replans = msg
5630 .params
5631 .get("max_replans")
5632 .and_then(|v| v.as_u64())
5633 .unwrap_or(0) as u32;
5634 let delay_ms = msg
5635 .params
5636 .get("delay_ms")
5637 .and_then(|v| v.as_u64())
5638 .unwrap_or(0);
5639 let verify_before_execute = msg
5640 .params
5641 .get("verify_before_execute")
5642 .and_then(|v| v.as_bool())
5643 .unwrap_or(true);
5644 let cfg = car_engine::ReplanConfig {
5645 max_replans,
5646 delay_ms,
5647 verify_before_execute,
5648 };
5649 session.runtime.set_replan_config(cfg).await;
5650 Ok(Value::Null)
5651}
5652
5653async fn handle_skills_list(
5654 msg: &JsonRpcMessage,
5655 session: &crate::session::ClientSession,
5656) -> Result<Value, String> {
5657 let domain = msg.params.get("domain").and_then(|v| v.as_str());
5658 let engine = session.memgine.lock().await;
5659 let skills: Vec<serde_json::Value> = engine
5660 .graph
5661 .inner
5662 .node_indices()
5663 .filter_map(|nix| {
5664 let node = engine.graph.inner.node_weight(nix)?;
5665 if node.kind != car_memgine::MemKind::Skill {
5666 return None;
5667 }
5668 let meta = car_memgine::SkillMeta::from_node(node)?;
5669 if let Some(d) = domain {
5670 match &meta.scope {
5671 car_memgine::SkillScope::Global => {}
5672 car_memgine::SkillScope::Domain(sd) if sd == d => {}
5673 _ => return None,
5674 }
5675 }
5676 Some(serde_json::to_value(&meta).unwrap_or_default())
5677 })
5678 .collect();
5679 serde_json::to_value(&skills).map_err(|e| e.to_string())
5680}
5681
/// Parameters shared by the `handle_secret_*` handlers.
#[derive(serde::Deserialize)]
struct SecretParams {
    /// Optional service name, forwarded to `car_ffi_common::secrets` as `Option<&str>`.
    #[serde(default)]
    service: Option<String>,
    /// Key of the secret; required.
    key: String,
    /// Secret value; only `handle_secret_put` reads it, and it rejects a missing value.
    #[serde(default)]
    value: Option<String>,
}
5690
5691fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
5692 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5693 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
5694 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
5695}
5696
5697fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
5698 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5699 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
5700}
5701
5702fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
5703 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5704 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
5705}
5706
5707fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
5708 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5709 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
5710}
5711
/// Parameters shared by the `handle_perm_*` handlers.
#[derive(serde::Deserialize)]
struct PermParams {
    /// Permission domain, forwarded to `car_ffi_common::permissions`; required.
    domain: String,
    /// Optional bundle id, forwarded as `Option<&str>`.
    // NOTE(review): which domains make use of the bundle id is decided by
    // `car_ffi_common::permissions` and is not visible here.
    #[serde(default)]
    target_bundle_id: Option<String>,
}
5718
5719fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
5720 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5721 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
5722}
5723
5724fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
5725 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5726 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
5727}
5728
5729fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
5730 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5731 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
5732}
5733
5734fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
5735 #[derive(serde::Deserialize)]
5736 struct P {
5737 start: String,
5738 end: String,
5739 #[serde(default)]
5740 calendar_ids: Vec<String>,
5741 }
5742 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5743 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
5744 .map_err(|e| format!("parse start: {}", e))?
5745 .with_timezone(&chrono::Utc);
5746 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
5747 .map_err(|e| format!("parse end: {}", e))?
5748 .with_timezone(&chrono::Utc);
5749 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
5750}
5751
5752fn handle_calendar_create_event(req: &JsonRpcMessage) -> Result<Value, String> {
5753 let raw = req.params.to_string();
5754 car_ffi_common::integrations::calendar_create_event(&raw)
5755}
5756
5757fn handle_calendar_update_event(req: &JsonRpcMessage) -> Result<Value, String> {
5758 let raw = req.params.to_string();
5759 car_ffi_common::integrations::calendar_update_event(&raw)
5760}
5761
5762fn handle_calendar_delete_event(req: &JsonRpcMessage) -> Result<Value, String> {
5763 #[derive(serde::Deserialize)]
5764 struct P {
5765 event_id: String,
5766 }
5767 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5768 car_ffi_common::integrations::calendar_delete_event(&p.event_id)
5769}
5770
5771fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
5772 #[derive(serde::Deserialize)]
5773 struct P {
5774 query: String,
5775 #[serde(default = "default_limit")]
5776 limit: usize,
5777 #[serde(default)]
5778 container_ids: Vec<String>,
5779 }
5780 fn default_limit() -> usize {
5781 50
5782 }
5783 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5784 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
5785}
5786
5787fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
5788 #[derive(serde::Deserialize, Default)]
5789 struct P {
5790 #[serde(default)]
5791 account_ids: Vec<String>,
5792 }
5793 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
5794 car_ffi_common::integrations::mail_inbox(&p.account_ids)
5795}
5796
5797fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
5798 let raw = req.params.to_string();
5799 car_ffi_common::integrations::mail_send(&raw)
5800}
5801
5802fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
5803 #[derive(serde::Deserialize)]
5804 struct P {
5805 #[serde(default = "default_limit")]
5806 limit: usize,
5807 }
5808 fn default_limit() -> usize {
5809 50
5810 }
5811 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
5812 car_ffi_common::integrations::messages_chats(p.limit)
5813}
5814
5815fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
5816 let raw = req.params.to_string();
5817 car_ffi_common::integrations::messages_send(&raw)
5818}
5819
5820fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
5821 #[derive(serde::Deserialize)]
5822 struct P {
5823 query: String,
5824 #[serde(default = "default_limit")]
5825 limit: usize,
5826 }
5827 fn default_limit() -> usize {
5828 50
5829 }
5830 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5831 car_ffi_common::integrations::notes_find(&p.query, p.limit)
5832}
5833
5834fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
5835 #[derive(serde::Deserialize)]
5836 struct P {
5837 #[serde(default = "default_limit")]
5838 limit: usize,
5839 }
5840 fn default_limit() -> usize {
5841 50
5842 }
5843 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
5844 car_ffi_common::integrations::reminders_items(p.limit)
5845}
5846
5847fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
5848 #[derive(serde::Deserialize)]
5849 struct P {
5850 #[serde(default = "default_limit")]
5851 limit: usize,
5852 }
5853 fn default_limit() -> usize {
5854 100
5855 }
5856 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
5857 car_ffi_common::integrations::bookmarks_list(p.limit)
5858}
5859
5860fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
5861 #[derive(serde::Deserialize)]
5862 struct P {
5863 start: String,
5864 end: String,
5865 }
5866 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5867 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
5868 .map_err(|e| format!("parse start: {}", e))?
5869 .with_timezone(&chrono::Utc);
5870 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
5871 .map_err(|e| format!("parse end: {}", e))?
5872 .with_timezone(&chrono::Utc);
5873 car_ffi_common::health::sleep_windows(s, e)
5874}
5875
5876fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
5877 #[derive(serde::Deserialize)]
5878 struct P {
5879 start: String,
5880 end: String,
5881 }
5882 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5883 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
5884 .map_err(|e| format!("parse start: {}", e))?
5885 .with_timezone(&chrono::Utc);
5886 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
5887 .map_err(|e| format!("parse end: {}", e))?
5888 .with_timezone(&chrono::Utc);
5889 car_ffi_common::health::workouts(s, e)
5890}
5891
5892fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
5893 #[derive(serde::Deserialize)]
5894 struct P {
5895 start: String,
5896 end: String,
5897 }
5898 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5899 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
5900 .map_err(|e| format!("parse start: {}", e))?;
5901 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
5902 .map_err(|e| format!("parse end: {}", e))?;
5903 car_ffi_common::health::activity(s, e)
5904}
5905
5906async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
5907 let closed = session.browser.close().await?;
5908 Ok(serde_json::json!({"closed": closed}))
5909}
5910
5911async fn handle_browser_run(
5912 req: &JsonRpcMessage,
5913 session: &crate::session::ClientSession,
5914) -> Result<Value, String> {
5915 #[derive(serde::Deserialize)]
5916 struct BrowserRunParams {
5917 script: Value,
5919 #[serde(default)]
5920 width: Option<u32>,
5921 #[serde(default)]
5922 height: Option<u32>,
5923 #[serde(default)]
5928 headed: Option<bool>,
5929 #[serde(default)]
5932 extra_args: Option<Vec<String>>,
5933 }
5934 let params: BrowserRunParams =
5935 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5936
5937 let script_json = match params.script {
5939 Value::String(s) => s,
5940 other => other.to_string(),
5941 };
5942
5943 let browser_session = session
5944 .browser
5945 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
5946 width: params.width.unwrap_or(1280),
5947 height: params.height.unwrap_or(720),
5948 headless: !params.headed.unwrap_or(false),
5949 extra_args: params.extra_args.unwrap_or_default(),
5950 })
5951 .await?;
5952
5953 let trace_json = browser_session.run(&script_json).await?;
5954 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
5955}
5956
/// Parameters for `handle_voice_transcribe_stream_start`.
#[derive(Deserialize)]
struct VoiceStartParams {
    /// Identifier of the voice session to start; required.
    session_id: String,
    /// Audio source description. It is re-serialized to a JSON string and
    /// passed to `car_ffi_common::voice::transcribe_stream_start`; its schema
    /// is defined there.
    audio_source: Value,
    /// Optional options object, likewise forwarded as a JSON string.
    #[serde(default)]
    options: Option<Value>,
}
5976
5977async fn handle_voice_transcribe_stream_start(
5978 req: &JsonRpcMessage,
5979 state: &Arc<ServerState>,
5980 session: &Arc<crate::session::ClientSession>,
5981) -> Result<Value, String> {
5982 let params: VoiceStartParams =
5983 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5984 let audio_source_json =
5985 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
5986 let options_json = params
5987 .options
5988 .as_ref()
5989 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
5990 .transpose()?;
5991 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
5992 channel: session.channel.clone(),
5993 });
5994 let json = car_ffi_common::voice::transcribe_stream_start(
5995 ¶ms.session_id,
5996 &audio_source_json,
5997 options_json.as_deref(),
5998 state.voice_sessions.clone(),
5999 sink,
6000 )
6001 .await?;
6002 serde_json::from_str(&json).map_err(|e| e.to_string())
6003}
6004
/// Request parameters deserialized by `handle_voice_transcribe_stream_stop`.
#[derive(Deserialize)]
struct VoiceStopParams {
    /// Identifier of the session to stop.
    session_id: String,
}
6009
6010async fn handle_voice_transcribe_stream_stop(
6011 req: &JsonRpcMessage,
6012 state: &Arc<ServerState>,
6013) -> Result<Value, String> {
6014 let params: VoiceStopParams =
6015 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6016 let json = car_ffi_common::voice::transcribe_stream_stop(
6017 ¶ms.session_id,
6018 state.voice_sessions.clone(),
6019 )
6020 .await?;
6021 serde_json::from_str(&json).map_err(|e| e.to_string())
6022}
6023
/// Request parameters deserialized by `handle_voice_transcribe_stream_push`.
#[derive(Deserialize)]
struct VoicePushParams {
    /// Identifier of the session receiving the audio.
    session_id: String,
    /// Base64 text; the handler decodes it with the standard base64 engine.
    pcm_b64: String,
}
6032
6033async fn handle_voice_transcribe_stream_push(
6034 req: &JsonRpcMessage,
6035 state: &Arc<ServerState>,
6036) -> Result<Value, String> {
6037 use base64::Engine;
6038 let params: VoicePushParams =
6039 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6040 let pcm = base64::engine::general_purpose::STANDARD
6041 .decode(¶ms.pcm_b64)
6042 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
6043 let json = car_ffi_common::voice::transcribe_stream_push(
6044 ¶ms.session_id,
6045 &pcm,
6046 state.voice_sessions.clone(),
6047 )
6048 .await?;
6049 serde_json::from_str(&json).map_err(|e| e.to_string())
6050}
6051
6052fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
6053 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
6054 serde_json::from_str(&json).unwrap_or(Value::Null)
6055}
6056
/// Request parameters deserialized by `handle_voice_tts_stream_start`.
#[derive(Deserialize)]
struct VoiceTtsStreamStartParams {
    /// Identifier forwarded to `tts_stream_start` as the stream id.
    stream_id: String,
    /// Text forwarded to `tts_stream_start`.
    text: String,
    /// Optional options payload; `None` when the field is absent.
    #[serde(default)]
    options: Option<Value>,
}
6071
6072async fn handle_voice_tts_stream_start(
6073 req: &JsonRpcMessage,
6074 session: &Arc<crate::session::ClientSession>,
6075) -> Result<Value, String> {
6076 let params: VoiceTtsStreamStartParams =
6077 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6078 let opts_str = params
6079 .options
6080 .as_ref()
6081 .map(|v| v.to_string())
6082 .filter(|s| !s.is_empty());
6083 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
6084 channel: session.channel.clone(),
6085 });
6086 let json = car_ffi_common::voice::tts_stream_start(
6087 ¶ms.stream_id,
6088 ¶ms.text,
6089 opts_str.as_deref(),
6090 sink,
6091 )
6092 .await?;
6093 serde_json::from_str(&json).map_err(|e| e.to_string())
6094}
6095
/// Request parameters deserialized by `handle_voice_tts_stream_cancel`.
#[derive(Deserialize)]
struct VoiceTtsStreamCancelParams {
    /// Identifier of the stream to cancel.
    stream_id: String,
}
6100
6101async fn handle_voice_tts_stream_cancel(req: &JsonRpcMessage) -> Result<Value, String> {
6102 let params: VoiceTtsStreamCancelParams =
6103 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6104 let json = car_ffi_common::voice::tts_stream_cancel(¶ms.stream_id).await?;
6105 serde_json::from_str(&json).map_err(|e| e.to_string())
6106}
6107
6108fn handle_voice_tts_stream_list() -> Value {
6109 let json = car_ffi_common::voice::list_tts_streams();
6110 serde_json::from_str(&json).unwrap_or(Value::Null)
6111}
6112
6113async fn handle_voice_dispatch_turn(
6114 req: &JsonRpcMessage,
6115 state: &Arc<ServerState>,
6116 session: &Arc<crate::session::ClientSession>,
6117) -> Result<Value, String> {
6118 let req_value = req.params.clone();
6119 let request: crate::voice_turn::DispatchVoiceTurnRequest =
6120 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
6121 let engine = get_inference_engine(state).clone();
6122 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
6123 channel: session.channel.clone(),
6124 });
6125 let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
6126 serde_json::to_value(resp).map_err(|e| e.to_string())
6127}
6128
6129async fn handle_voice_cancel_turn() -> Result<Value, String> {
6130 crate::voice_turn::cancel().await;
6131 Ok(serde_json::json!({"cancelled": true}))
6132}
6133
6134async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
6135 let engine = get_inference_engine(state).clone();
6136 crate::voice_turn::prewarm(engine).await;
6137 Ok(serde_json::json!({"prewarmed": true}))
6138}
6139
6140fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
6159 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
6160 std::sync::OnceLock::new();
6161 SLOT.get_or_init(|| std::sync::RwLock::new(None))
6162}
6163
6164fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
6165 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
6166 std::sync::OnceLock::new();
6167 MAP.get_or_init(dashmap::DashMap::new)
6168}
6169
6170fn ws_runner_completions() -> &'static dashmap::DashMap<
6171 String,
6172 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
6173> {
6174 static MAP: std::sync::OnceLock<
6175 dashmap::DashMap<
6176 String,
6177 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
6178 >,
6179 > = std::sync::OnceLock::new();
6180 MAP.get_or_init(dashmap::DashMap::new)
6181}
6182
6183struct WsInferenceRunner;
6184
6185#[async_trait::async_trait]
6186impl car_inference::InferenceRunner for WsInferenceRunner {
6187 async fn run(
6188 &self,
6189 request: car_inference::tasks::generate::GenerateRequest,
6190 emitter: car_inference::EventEmitter,
6191 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
6192 let channel = ws_runner_session()
6193 .read()
6194 .map_err(|e| {
6195 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
6196 })?
6197 .clone()
6198 .ok_or_else(|| {
6199 car_inference::RunnerError::Declined(
6200 "no WebSocket inference runner registered — call inference.register_runner first"
6201 .into(),
6202 )
6203 })?;
6204
6205 let call_id = uuid::Uuid::new_v4().to_string();
6206 let request_json = serde_json::to_value(&request)
6207 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
6208 let (tx, rx) = tokio::sync::oneshot::channel();
6209 ws_runner_calls().insert(call_id.clone(), emitter);
6210 ws_runner_completions().insert(call_id.clone(), tx);
6211
6212 use futures::SinkExt;
6214 let notification = serde_json::json!({
6215 "jsonrpc": "2.0",
6216 "method": "inference.runner.invoke",
6217 "params": {
6218 "call_id": call_id,
6219 "request": request_json,
6220 },
6221 });
6222 let text = serde_json::to_string(¬ification)
6223 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
6224 let _ = channel
6225 .write
6226 .lock()
6227 .await
6228 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
6229 .await;
6230
6231 let result = rx.await.map_err(|_| {
6232 car_inference::RunnerError::Failed("runner completion channel dropped".into())
6233 })?;
6234 ws_runner_calls().remove(&call_id);
6235 result.map_err(car_inference::RunnerError::Failed)
6236 }
6237}
6238
6239async fn handle_inference_register_runner(
6240 session: &Arc<crate::session::ClientSession>,
6241) -> Result<Value, String> {
6242 let mut guard = ws_runner_session()
6243 .write()
6244 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
6245 *guard = Some(session.channel.clone());
6246 drop(guard);
6247 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
6248 Ok(serde_json::json!({"registered": true}))
6249}
6250
/// Request parameters deserialized by `handle_inference_runner_event`.
#[derive(serde::Deserialize)]
struct InferenceRunnerEventParams {
    /// Key used to look up the call's emitter in `ws_runner_calls`.
    call_id: String,
    /// Raw event object, decoded by `parse_runner_event_value`.
    event: Value,
}
6256
6257async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
6258 let params: InferenceRunnerEventParams =
6259 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6260 let stream_event = match parse_runner_event_value(¶ms.event) {
6261 Some(e) => e,
6262 None => return Err("unrecognised runner event shape".into()),
6263 };
6264 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
6265 let emitter = entry.value().clone();
6266 tokio::spawn(async move { emitter.emit(stream_event).await });
6267 }
6268 Ok(serde_json::json!({"emitted": true}))
6269}
6270
/// Request parameters deserialized by `handle_inference_runner_complete`.
#[derive(serde::Deserialize)]
struct InferenceRunnerCompleteParams {
    /// Key used to look up the pending completion in `ws_runner_completions`.
    call_id: String,
    /// Raw result; the handler decodes it as a `car_inference::RunnerResult`.
    result: Value,
}
6276
6277async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
6278 let params: InferenceRunnerCompleteParams =
6279 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6280 let result: std::result::Result<car_inference::RunnerResult, String> =
6281 serde_json::from_value(params.result)
6282 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
6283 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
6284 let _ = tx.send(result);
6285 }
6286 Ok(serde_json::json!({"completed": true}))
6287}
6288
/// Request parameters deserialized by `handle_inference_runner_fail`.
#[derive(serde::Deserialize)]
struct InferenceRunnerFailParams {
    /// Key used to look up the pending completion in `ws_runner_completions`.
    call_id: String,
    /// Error text delivered to the waiter as the call's `Err` value.
    error: String,
}
6294
6295async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
6296 let params: InferenceRunnerFailParams =
6297 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6298 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
6299 let _ = tx.send(Err(params.error));
6300 }
6301 Ok(serde_json::json!({"failed": true}))
6302}
6303
6304fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
6305 let ty = v.get("type").and_then(|t| t.as_str())?;
6306 match ty {
6307 "text" => Some(car_inference::StreamEvent::TextDelta(
6308 v.get("data")?.as_str()?.to_string(),
6309 )),
6310 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
6311 name: v.get("name")?.as_str()?.to_string(),
6312 index: v.get("index")?.as_u64()? as usize,
6313 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
6314 }),
6315 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
6316 index: v.get("index")?.as_u64()? as usize,
6317 arguments_delta: v.get("data")?.as_str()?.to_string(),
6318 }),
6319 "usage" => Some(car_inference::StreamEvent::Usage {
6320 input_tokens: v.get("input_tokens")?.as_u64()?,
6321 output_tokens: v.get("output_tokens")?.as_u64()?,
6322 }),
6323 "done" => Some(car_inference::StreamEvent::Done {
6324 text: v.get("text")?.as_str()?.to_string(),
6325 tool_calls: v
6326 .get("tool_calls")
6327 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
6328 .unwrap_or_default(),
6329 }),
6330 _ => None,
6331 }
6332}
6333
/// Request parameters deserialized by `handle_enroll_speaker`.
#[derive(Deserialize)]
struct EnrollSpeakerParams {
    /// Label forwarded to `enroll_speaker`.
    label: String,
    /// Audio description; the handler re-serializes it to JSON text unchanged.
    audio: Value,
}
6339
6340async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
6341 let params: EnrollSpeakerParams =
6342 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6343 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
6344 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
6345 serde_json::from_str(&json).map_err(|e| e.to_string())
6346}
6347
/// Request parameters deserialized by `handle_remove_enrollment`.
#[derive(Deserialize)]
struct RemoveEnrollmentParams {
    /// Label of the enrollment to remove.
    label: String,
}
6352
6353fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
6354 let params: RemoveEnrollmentParams =
6355 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6356 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
6357 serde_json::from_str(&json).map_err(|e| e.to_string())
6358}
6359
/// Request parameters deserialized by `handle_workflow_run`.
#[derive(Deserialize)]
struct WorkflowRunParams {
    /// Workflow definition; the handler re-serializes it to JSON text unchanged.
    workflow: Value,
}
6364
6365async fn handle_workflow_run(
6366 req: &JsonRpcMessage,
6367 session: &Arc<crate::session::ClientSession>,
6368) -> Result<Value, String> {
6369 let params: WorkflowRunParams =
6370 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6371 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
6372 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
6373 channel: session.channel.clone(),
6374 host: session.host.clone(),
6375 client_id: session.client_id.clone(),
6376 });
6377 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
6378 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
6379 persist_if_paused(&result)?;
6382 Ok(result)
6383}
6384
6385pub fn recover_workflow_checkpoints() {
6390 let dir = match workflow_runs_dir() {
6391 Ok(d) => d,
6392 Err(e) => {
6393 tracing::debug!(error = %e, "no workflow checkpoint dir; skipping recovery");
6394 return;
6395 }
6396 };
6397 match car_workflow::CheckpointStore::open(&dir).and_then(|s| s.recover_orphaned()) {
6398 Ok(0) => {}
6399 Ok(n) => tracing::info!(rearmed = n, "recovered orphaned workflow checkpoints"),
6400 Err(e) => tracing::warn!(error = %e, "workflow checkpoint recovery failed"),
6401 }
6402}
6403
/// Resolves the directory used for workflow checkpoints:
/// `<home>/.car/workflow-runs`.
///
/// `<home>` is taken from the `HOME` environment variable, falling back to
/// `USERPROFILE`.
///
/// # Errors
/// Returns an error string when neither variable can be read.
fn workflow_runs_dir() -> Result<std::path::PathBuf, String> {
    let home = match std::env::var("HOME") {
        Ok(value) => value,
        Err(_) => match std::env::var("USERPROFILE") {
            Ok(value) => value,
            Err(_) => {
                return Err("cannot resolve home directory for workflow checkpoints".to_string())
            }
        },
    };
    let mut dir = std::path::PathBuf::from(home);
    dir.push(".car");
    dir.push("workflow-runs");
    Ok(dir)
}
6413
6414fn persist_if_paused(result: &Value) -> Result<(), String> {
6416 if result.get("status").and_then(|s| s.as_str()) != Some("paused") {
6417 return Ok(());
6418 }
6419 let Some(paused_value) = result.get("paused") else {
6420 return Err("paused result missing checkpoint".to_string());
6421 };
6422 let paused: car_workflow::PausedWorkflow =
6423 serde_json::from_value(paused_value.clone()).map_err(|e| e.to_string())?;
6424 let store = car_workflow::CheckpointStore::open(workflow_runs_dir()?).map_err(|e| e.to_string())?;
6425 store.save(&paused).map_err(|e| e.to_string())
6426}
6427
/// Request parameters deserialized by `handle_workflow_resume`.
#[derive(Deserialize)]
struct WorkflowResumeParams {
    /// Identifier of the paused run to claim from the checkpoint store.
    run_id: String,
    /// Resume input; defaults to JSON null when absent, which the handler
    /// replaces with `{}`.
    #[serde(default)]
    input: Value,
}
6434
6435async fn handle_workflow_resume(
6436 req: &JsonRpcMessage,
6437 session: &Arc<crate::session::ClientSession>,
6438) -> Result<Value, String> {
6439 let params: WorkflowResumeParams =
6440 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6441
6442 let store =
6443 car_workflow::CheckpointStore::open(workflow_runs_dir()?).map_err(|e| e.to_string())?;
6444 let paused = store
6447 .claim(¶ms.run_id)
6448 .map_err(|e| e.to_string())?
6449 .ok_or_else(|| {
6450 format!(
6451 "no paused workflow run '{}' (already resumed or unknown)",
6452 params.run_id
6453 )
6454 })?;
6455 let paused_json = serde_json::to_string(&paused).map_err(|e| e.to_string())?;
6456 let input_json = if params.input.is_null() {
6457 "{}".to_string()
6458 } else {
6459 serde_json::to_string(¶ms.input).map_err(|e| e.to_string())?
6460 };
6461
6462 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
6463 channel: session.channel.clone(),
6464 host: session.host.clone(),
6465 client_id: session.client_id.clone(),
6466 });
6467
6468 let json = car_ffi_common::workflow::resume_workflow(&paused_json, &input_json, runner).await;
6469 let result = match json {
6470 Ok(j) => serde_json::from_str::<Value>(&j).map_err(|e| e.to_string())?,
6471 Err(e) => {
6472 let _ = store.save(&paused);
6475 let _ = store.complete(¶ms.run_id);
6476 return Err(e);
6477 }
6478 };
6479 persist_if_paused(&result)?;
6482 store.complete(¶ms.run_id).map_err(|e| e.to_string())?;
6483 Ok(result)
6484}
6485
/// Request parameters deserialized by `handle_builder_build`.
#[derive(Deserialize)]
struct BuilderBuildParams {
    /// Goal text; the handler rejects it when empty after trimming.
    goal: String,
    /// Optional existing workflow; JSON null (the default) means none.
    #[serde(default)]
    existing: Value,
    /// Attempt limit passed to the builder; defaults to
    /// `default_builder_attempts()` when absent.
    #[serde(default = "default_builder_attempts")]
    max_attempts: u32,
}

/// Serde default for `BuilderBuildParams::max_attempts`.
fn default_builder_attempts() -> u32 {
    3
}
6498
6499async fn handle_builder_build(
6504 req: &JsonRpcMessage,
6505 state: &Arc<ServerState>,
6506 session: &Arc<crate::session::ClientSession>,
6507) -> Result<Value, String> {
6508 let params: BuilderBuildParams =
6509 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6510 if params.goal.trim().is_empty() {
6511 return Err("missing 'goal'".to_string());
6512 }
6513
6514 let engine = get_inference_engine(state).clone();
6515
6516 let tools: Vec<car_builder::ToolInfo> = session
6517 .runtime
6518 .registry
6519 .schemas()
6520 .await
6521 .into_iter()
6522 .map(|s| car_builder::ToolInfo {
6523 name: s.name,
6524 description: s.description,
6525 })
6526 .collect();
6527 let models: Vec<String> = engine
6528 .list_models_unified()
6529 .into_iter()
6530 .map(|m| m.id)
6531 .collect();
6532 let catalog = car_builder::ToolCatalog {
6533 tools,
6534 models,
6535 agents: Vec::new(),
6536 };
6537
6538 let existing = if params.existing.is_null() {
6539 None
6540 } else {
6541 serde_json::from_value::<car_workflow::Workflow>(params.existing.clone()).ok()
6542 };
6543
6544 let build_req = car_builder::BuildRequest {
6545 goal: params.goal,
6546 catalog,
6547 existing,
6548 feedback: None,
6549 max_attempts: params.max_attempts,
6550 };
6551
6552 let result = car_builder::build_workflow(
6553 |prompt: String| {
6554 let engine = engine.clone();
6555 async move {
6556 let greq = car_inference::GenerateRequest {
6557 prompt,
6558 model: None,
6559 params: car_inference::GenerateParams {
6560 temperature: 0.2,
6561 max_tokens: 4096,
6562 ..Default::default()
6563 },
6564 context: None,
6565 tools: None,
6566 images: None,
6567 messages: None,
6568 cache_control: false,
6569 response_format: None,
6570 intent: None,
6571 };
6572 engine
6573 .generate_tracked(greq)
6574 .await
6575 .map(|r| r.text)
6576 .map_err(|e| e.to_string())
6577 }
6578 },
6579 &build_req,
6580 )
6581 .await;
6582
6583 Ok(serde_json::json!({
6584 "valid": result.valid,
6585 "workflow": result.workflow,
6586 "issues": result.issues,
6587 "warnings": result.warnings,
6588 "attempts": result.attempts,
6589 }))
6590}
6591
/// Request parameters deserialized by `handle_workflow_verify`.
#[derive(Deserialize)]
struct WorkflowVerifyParams {
    /// Workflow definition; the handler re-serializes it to JSON text unchanged.
    workflow: Value,
}
6596
6597fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
6598 let params: WorkflowVerifyParams =
6599 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6600 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
6601 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
6602 serde_json::from_str(&json).map_err(|e| e.to_string())
6603}
6604
6605async fn handle_meeting_start(
6610 req: &JsonRpcMessage,
6611 state: &Arc<ServerState>,
6612 session: &Arc<crate::session::ClientSession>,
6613) -> Result<Value, String> {
6614 let mut req_value = req.params.clone();
6620 let meeting_id = req_value
6621 .get("id")
6622 .and_then(|v| v.as_str())
6623 .map(str::to_string)
6624 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
6625 if let Some(map) = req_value.as_object_mut() {
6626 map.insert("id".into(), Value::String(meeting_id.clone()));
6627 }
6628 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
6629
6630 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
6631 Arc::new(crate::session::WsVoiceEventSink {
6632 channel: session.channel.clone(),
6633 });
6634
6635 let upstream: Arc<dyn car_voice::VoiceEventSink> =
6640 Arc::new(crate::session::WsMemgineIngestSink {
6641 meeting_id,
6642 engine: session.memgine.clone(),
6643 upstream: ws_upstream,
6644 });
6645
6646 let cwd = std::env::current_dir().ok();
6647 let json = crate::meeting::start_meeting(
6648 &request_json,
6649 state.meetings.clone(),
6650 state.voice_sessions.clone(),
6651 upstream,
6652 None,
6653 cwd,
6654 )
6655 .await?;
6656 serde_json::from_str(&json).map_err(|e| e.to_string())
6657}
6658
/// Request parameters deserialized by `handle_meeting_stop`.
#[derive(Deserialize)]
struct MeetingStopParams {
    /// Identifier of the meeting to stop.
    meeting_id: String,
    /// Summarize flag passed to `stop_meeting`; defaults to `true` when absent.
    #[serde(default = "default_summarize")]
    summarize: bool,
}

/// Serde default for `MeetingStopParams::summarize`.
fn default_summarize() -> bool {
    true
}
6669
6670async fn handle_meeting_stop(
6671 req: &JsonRpcMessage,
6672 state: &Arc<ServerState>,
6673 _session: &Arc<crate::session::ClientSession>,
6674) -> Result<Value, String> {
6675 let params: MeetingStopParams =
6676 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6677 let inference = if params.summarize {
6678 Some(state.inference.get().cloned()).flatten()
6679 } else {
6680 None
6681 };
6682 let json = crate::meeting::stop_meeting(
6683 ¶ms.meeting_id,
6684 params.summarize,
6685 state.meetings.clone(),
6686 state.voice_sessions.clone(),
6687 inference,
6688 )
6689 .await?;
6690 serde_json::from_str(&json).map_err(|e| e.to_string())
6691}
6692
/// Request parameters deserialized by `handle_meeting_list`; the handler
/// falls back to `Default` when the params do not deserialize.
#[derive(Deserialize, Default)]
struct MeetingListParams {
    /// Optional root path passed to `list_meetings`.
    #[serde(default)]
    root: Option<std::path::PathBuf>,
}
6698
6699fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
6700 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
6701 let cwd = std::env::current_dir().ok();
6702 let json = crate::meeting::list_meetings(params.root, cwd)?;
6703 serde_json::from_str(&json).map_err(|e| e.to_string())
6704}
6705
/// Request parameters deserialized by `handle_meeting_get`.
#[derive(Deserialize)]
struct MeetingGetParams {
    /// Identifier of the meeting to fetch.
    meeting_id: String,
    /// Optional root path passed to `get_meeting`.
    #[serde(default)]
    root: Option<std::path::PathBuf>,
}
6712
6713fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
6714 let params: MeetingGetParams =
6715 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6716 let cwd = std::env::current_dir().ok();
6717 let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
6718 serde_json::from_str(&json).map_err(|e| e.to_string())
6719}
6720
/// Request parameters deserialized by `handle_registry_register`.
#[derive(Deserialize, Default)]
struct RegistryRegisterParams {
    /// Registry entry; the handler re-serializes it to JSON text unchanged.
    entry: Value,
    /// Optional registry path passed to `register_agent`.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
6734
6735fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
6736 let params: RegistryRegisterParams =
6737 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6738 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
6739 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
6740 Ok(Value::Null)
6741}
6742
/// Request parameters shared by `handle_registry_heartbeat` and
/// `handle_registry_unregister`.
#[derive(Deserialize, Default)]
struct RegistryNameParams {
    /// Name of the agent the call applies to.
    name: String,
    /// Optional registry path passed to the registry call.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
6749
6750fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
6751 let params: RegistryNameParams =
6752 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6753 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
6754 serde_json::from_str(&json).map_err(|e| e.to_string())
6755}
6756
6757fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
6758 let params: RegistryNameParams =
6759 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6760 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
6761 Ok(Value::Null)
6762}
6763
/// Request parameters deserialized by `handle_registry_list`; the handler
/// falls back to `Default` when the params do not deserialize.
#[derive(Deserialize, Default)]
struct RegistryListParams {
    /// Optional registry path passed to `list_agents`.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
6769
6770fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
6771 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
6772 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
6773 serde_json::from_str(&json).map_err(|e| e.to_string())
6774}
6775
6776#[derive(Deserialize, Default)]
6777struct RegistryReapParams {
6778 #[serde(default = "default_reap_age")]
6781 max_age_secs: u64,
6782 #[serde(default)]
6783 registry_path: Option<std::path::PathBuf>,
6784}
6785
6786fn default_reap_age() -> u64 {
6787 60
6788}
6789
6790fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
6791 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
6792 let json =
6793 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
6794 serde_json::from_str(&json).map_err(|e| e.to_string())
6795}
6796
6797async fn handle_a2a_start(
6804 req: &JsonRpcMessage,
6805 session: &crate::session::ClientSession,
6806) -> Result<Value, String> {
6807 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6808 let json = crate::a2a::start_a2a(¶ms_json, Some(session.runtime.clone())).await?;
6814 serde_json::from_str(&json).map_err(|e| e.to_string())
6815}
6816
6817fn handle_a2a_stop() -> Result<Value, String> {
6818 let json = crate::a2a::stop_a2a()?;
6819 serde_json::from_str(&json).map_err(|e| e.to_string())
6820}
6821
6822fn handle_a2a_status() -> Result<Value, String> {
6823 let json = crate::a2a::a2a_status()?;
6824 serde_json::from_str(&json).map_err(|e| e.to_string())
6825}
6826
/// Request parameters deserialized by `handle_a2a_send`. Field names are
/// camelCase on the wire.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct A2aSendParams {
    /// Target endpoint; checked through `trusted_route_endpoint` before use.
    endpoint: String,
    /// Message handed to the A2A client.
    message: car_a2a::Message,
    /// Passed to `send_message`; defaults to `false`.
    #[serde(default)]
    blocking: bool,
    /// Whether A2UI envelopes in the reply are validated and applied;
    /// defaults to `true`.
    #[serde(default = "default_true")]
    ingest_a2ui: bool,
    /// Optional auth applied to the client and forwarded when applying envelopes.
    #[serde(default)]
    route_auth: Option<A2aRouteAuth>,
    /// Passed to `trusted_route_endpoint`; defaults to `false`.
    #[serde(default)]
    allow_untrusted_endpoint: bool,
}

/// Serde default helper that yields `true`.
fn default_true() -> bool {
    true
}
6845
6846async fn handle_a2a_dispatch(
6856 method: &str,
6857 req: &JsonRpcMessage,
6858 state: &Arc<ServerState>,
6859) -> Result<Value, String> {
6860 let dispatcher = state.a2a_dispatcher().await;
6861 dispatcher
6862 .dispatch(method, req.params.clone())
6863 .await
6864 .map_err(|e| e.to_string())
6865}
6866
6867async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
6868 let params: A2aSendParams =
6869 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6870 let endpoint = trusted_route_endpoint(
6871 Some(params.endpoint.clone()),
6872 params.allow_untrusted_endpoint,
6873 )
6874 .ok_or_else(|| {
6875 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
6876 })?;
6877 let client = match params.route_auth.clone() {
6878 Some(auth) => {
6879 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
6880 }
6881 None => car_a2a::A2aClient::new(endpoint.clone()),
6882 };
6883 let result = client
6884 .send_message(params.message, params.blocking)
6885 .await
6886 .map_err(|e| e.to_string())?;
6887 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
6888 let mut applied = Vec::new();
6889 if params.ingest_a2ui {
6890 state
6891 .a2ui
6892 .validate_payload(&result_value)
6893 .map_err(|e| e.to_string())?;
6894 let routed_endpoint = Some(endpoint.clone());
6895 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
6896 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
6897 if owner.endpoint.is_none() {
6898 owner.with_endpoint(routed_endpoint.clone())
6899 } else {
6900 owner
6901 }
6902 });
6903 applied.push(
6904 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
6905 );
6906 }
6907 }
6908 Ok(serde_json::json!({
6909 "result": result,
6910 "a2ui": {
6911 "applied": applied,
6912 }
6913 }))
6914}
6915
6916async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
6924 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6925 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
6926 serde_json::from_str(&json).map_err(|e| e.to_string())
6927}
6928
6929async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
6930 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6931 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
6932 serde_json::from_str(&json).map_err(|e| e.to_string())
6933}
6934
6935async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
6936 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6937 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
6938 serde_json::from_str(&json).map_err(|e| e.to_string())
6939}
6940
6941async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
6942 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6943 let json = car_ffi_common::notifications::local(&args_json).await?;
6944 serde_json::from_str(&json).map_err(|e| e.to_string())
6945}
6946
6947async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
6948 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6949 let json = car_ffi_common::vision::ocr(&args_json).await?;
6950 serde_json::from_str(&json).map_err(|e| e.to_string())
6951}
6952
6953async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
6958 let agents = match state.observer_manifest_path() {
6967 Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
6968 .map_err(|e| e.to_string())?,
6969 None => {
6970 let supervisor = state.supervisor()?;
6971 supervisor.list().await
6972 }
6973 };
6974 let attached = state.attached_agents.lock().await.clone();
6981 let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
6982 for a in agents {
6983 let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
6984 let session_id = attached.get(&a.spec.id).cloned();
6985 if let Some(map) = v.as_object_mut() {
6986 map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
6987 if let Some(sid) = session_id {
6988 map.insert("session_id".to_string(), Value::String(sid));
6989 }
6990 }
6991 decorated.push(v);
6992 }
6993 Ok(Value::Array(decorated))
6994}
6995
6996async fn handle_agents_upsert(
6997 req: &JsonRpcMessage,
6998 state: &Arc<ServerState>,
6999) -> Result<Value, String> {
7000 let mut params = req.params.clone();
7001 if let Some(name) = params
7010 .get("interpreter")
7011 .and_then(|v| v.as_str())
7012 .map(str::to_string)
7013 {
7014 let resolved =
7015 car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
7016 params["command"] = Value::String(resolved.to_string_lossy().into_owned());
7017 }
7018 let spec: car_registry::supervisor::AgentSpec =
7019 serde_json::from_value(params).map_err(|e| e.to_string())?;
7020 let supervisor = state.supervisor()?;
7021 let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
7022 serde_json::to_value(agent).map_err(|e| e.to_string())
7023}
7024
7025async fn handle_agents_install(
7039 req: &JsonRpcMessage,
7040 state: &Arc<ServerState>,
7041) -> Result<Value, String> {
7042 let manifest: car_registry::manifest::AgentManifest =
7043 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
7044 let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
7045 let supervisor = state.supervisor()?;
7046 let (report, managed) = supervisor
7047 .install_manifest(manifest, &host)
7048 .await
7049 .map_err(|e| e.to_string())?;
7050 Ok(serde_json::json!({
7051 "report": {
7052 "missingOptional": report
7053 .missing_optional
7054 .iter()
7055 .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
7056 .collect::<Vec<_>>(),
7057 },
7058 "agent": managed,
7059 }))
7060}
7061
7062async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
7063 let entries = match state.observer_manifest_path() {
7069 Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
7070 .map_err(|e| e.to_string())?,
7071 None => {
7072 let supervisor = state.supervisor()?;
7073 supervisor.health().await
7074 }
7075 };
7076 serde_json::to_value(entries).map_err(|e| e.to_string())
7077}
7078
7079fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
7080 req.params
7081 .get("id")
7082 .and_then(Value::as_str)
7083 .map(str::to_string)
7084 .ok_or_else(|| "missing required `id` parameter".to_string())
7085}
7086
7087async fn handle_agents_remove(
7088 req: &JsonRpcMessage,
7089 state: &Arc<ServerState>,
7090) -> Result<Value, String> {
7091 let id = extract_agent_id(req)?;
7092 let supervisor = state.supervisor()?;
7093 let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
7094 Ok(serde_json::json!({ "removed": removed }))
7095}
7096
7097async fn handle_agents_start(
7098 req: &JsonRpcMessage,
7099 state: &Arc<ServerState>,
7100) -> Result<Value, String> {
7101 let id = extract_agent_id(req)?;
7102 let supervisor = state.supervisor()?;
7103 let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
7104 serde_json::to_value(agent).map_err(|e| e.to_string())
7105}
7106
7107async fn handle_agents_stop(
7108 req: &JsonRpcMessage,
7109 state: &Arc<ServerState>,
7110) -> Result<Value, String> {
7111 let id = extract_agent_id(req)?;
7112 let signal: car_registry::supervisor::StopSignal = req
7113 .params
7114 .get("signal")
7115 .map(|v| serde_json::from_value(v.clone()))
7116 .transpose()
7117 .map_err(|e| e.to_string())?
7118 .unwrap_or_default();
7119 let supervisor = state.supervisor()?;
7120 let agent = supervisor
7121 .stop(&id, signal)
7122 .await
7123 .map_err(|e| e.to_string())?;
7124 serde_json::to_value(agent).map_err(|e| e.to_string())
7125}
7126
7127async fn handle_agents_restart(
7128 req: &JsonRpcMessage,
7129 state: &Arc<ServerState>,
7130) -> Result<Value, String> {
7131 let id = extract_agent_id(req)?;
7132 let supervisor = state.supervisor()?;
7133 let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
7134 serde_json::to_value(agent).map_err(|e| e.to_string())
7135}
7136
7137async fn handle_agents_tail_log(
7138 req: &JsonRpcMessage,
7139 state: &Arc<ServerState>,
7140) -> Result<Value, String> {
7141 let id = extract_agent_id(req)?;
7142 let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
7143 let supervisor = state.supervisor()?;
7144 let lines = supervisor
7145 .tail_log(&id, n)
7146 .await
7147 .map_err(|e| e.to_string())?;
7148 Ok(serde_json::json!({ "lines": lines }))
7149}
7150
7151async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
7162 let include_health = req
7163 .params
7164 .get("include_health")
7165 .and_then(Value::as_bool)
7166 .unwrap_or(false);
7167 let json = car_ffi_common::external_agents::list(include_health).await?;
7168 serde_json::from_str(&json).map_err(|e| e.to_string())
7169}
7170
7171async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
7172 let include_health = req
7173 .params
7174 .get("include_health")
7175 .and_then(Value::as_bool)
7176 .unwrap_or(false);
7177 let json = car_ffi_common::external_agents::detect(include_health).await?;
7178 serde_json::from_str(&json).map_err(|e| e.to_string())
7179}
7180
7181async fn handle_agents_invoke_external(
7199 req: &JsonRpcMessage,
7200 state: &Arc<ServerState>,
7201 host_session: &Arc<crate::session::ClientSession>,
7202) -> Result<Value, String> {
7203 let id = req
7204 .params
7205 .get("id")
7206 .and_then(Value::as_str)
7207 .ok_or_else(|| "missing required `id` parameter".to_string())?
7208 .to_string();
7209 let task = req
7210 .params
7211 .get("task")
7212 .and_then(Value::as_str)
7213 .ok_or_else(|| "missing required `task` parameter".to_string())?
7214 .to_string();
7215 let stream = req
7216 .params
7217 .get("stream")
7218 .and_then(Value::as_bool)
7219 .unwrap_or(false);
7220 let session_id = req
7221 .params
7222 .get("session_id")
7223 .and_then(Value::as_str)
7224 .map(str::to_string)
7225 .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
7226
7227 let mut options_value = req.params.clone();
7233 if let Some(obj) = options_value.as_object_mut() {
7234 obj.remove("id");
7235 obj.remove("task");
7236 obj.remove("stream");
7237 obj.remove("session_id");
7238 let has_explicit_mcp = obj.contains_key("mcp_endpoint");
7247 if !has_explicit_mcp {
7248 if let Some(url) = state.mcp_url.get() {
7249 obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
7250 }
7251 }
7252 }
7253
7254 if !stream {
7255 let options_json = options_value.to_string();
7258 let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
7259 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
7260 append_external_agent_audit(&id, &task, &options_value, &result);
7261 return Ok(result);
7262 }
7263
7264 let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
7270 .map_err(|e| format!("invalid options: {e}"))?;
7271
7272 {
7282 let mut chats = state.chat_sessions.lock().await;
7292 chats.entry(session_id.clone()).or_insert_with(|| {
7293 let created_at = std::time::SystemTime::now()
7294 .duration_since(std::time::UNIX_EPOCH)
7295 .map(|d| d.as_secs())
7296 .unwrap_or(0);
7297 crate::session::ChatSession {
7298 agent_id: id.clone(),
7299 host_client_id: host_session.client_id.clone(),
7300 created_at,
7301 }
7302 });
7303 }
7304
7305 use tokio::sync::mpsc;
7312 let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
7313
7314 let drain_state = state.clone();
7315 let drain_session_id = session_id.clone();
7316 let drain_agent_id = id.clone();
7317 tokio::spawn(async move {
7318 while let Some(event) = rx.recv().await {
7319 emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
7320 }
7321 });
7322
7323 let emitter_tx = tx.clone();
7324 let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
7325 let _ = emitter_tx.send(event);
7330 });
7331
7332 let spawn_state = state.clone();
7338 let spawn_session_id = session_id.clone();
7339 let spawn_id = id.clone();
7340 let spawn_task = task.clone();
7341 let spawn_options = options_value.clone();
7342 tokio::spawn(async move {
7343 let outcome =
7344 car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
7345 .await;
7346 drop(tx); let terminal_params: Value;
7353 let result_value: Value;
7354 match outcome {
7355 Ok(res) => {
7356 let mut parts: Vec<String> = Vec::new();
7363 if res.turns > 0 {
7364 parts.push(format!(
7365 "{} turn{}",
7366 res.turns,
7367 if res.turns == 1 { "" } else { "s" }
7368 ));
7369 }
7370 if res.tool_calls > 0 {
7371 parts.push(format!(
7372 "{} tool{}",
7373 res.tool_calls,
7374 if res.tool_calls == 1 { "" } else { "s" }
7375 ));
7376 }
7377 if res.duration_ms > 0 {
7378 parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
7379 }
7380 let summary = if parts.is_empty() {
7381 "stop".to_string()
7382 } else {
7383 parts.join(" · ")
7384 };
7385 if res.is_error {
7386 terminal_params = serde_json::json!({
7387 "session_id": spawn_session_id,
7388 "agent_id": spawn_id,
7389 "kind": "error",
7390 "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
7391 });
7392 } else {
7393 terminal_params = serde_json::json!({
7394 "session_id": spawn_session_id,
7395 "agent_id": spawn_id,
7396 "kind": "done",
7397 "finish_reason": summary,
7398 });
7399 }
7400 result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
7401 }
7402 Err(e) => {
7403 let message = format!("{e}");
7404 terminal_params = serde_json::json!({
7405 "session_id": spawn_session_id,
7406 "agent_id": spawn_id,
7407 "kind": "error",
7408 "error": message.clone(),
7409 });
7410 result_value = serde_json::json!({ "is_error": true, "error": message });
7411 }
7412 }
7413 send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
7414 spawn_state
7415 .chat_sessions
7416 .lock()
7417 .await
7418 .remove(&spawn_session_id);
7419 append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
7420 });
7421
7422 Ok(serde_json::json!({
7423 "accepted": true,
7424 "session_id": session_id,
7425 }))
7426}
7427
7428async fn emit_external_chat_event(
7445 state: &Arc<ServerState>,
7446 session_id: &str,
7447 agent_id: &str,
7448 event: car_external_agents::StreamEvent,
7449) {
7450 use car_external_agents::StreamEvent;
7451 match event {
7452 StreamEvent::Assistant(a) => {
7453 if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
7454 for block in content {
7455 let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
7456 match block_type {
7457 "text" => {
7458 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
7459 if !text.is_empty() {
7460 let params = serde_json::json!({
7461 "session_id": session_id,
7462 "agent_id": agent_id,
7463 "kind": "token",
7464 "delta": text,
7465 });
7466 send_external_chat_frame(state, session_id, params).await;
7467 }
7468 }
7469 }
7470 "tool_use" => {
7471 let name = block
7472 .get("name")
7473 .and_then(|v| v.as_str())
7474 .unwrap_or("(unknown tool)");
7475 let params = serde_json::json!({
7476 "session_id": session_id,
7477 "agent_id": agent_id,
7478 "kind": "tool_call",
7479 "detail": name,
7480 });
7481 send_external_chat_frame(state, session_id, params).await;
7482 }
7483 _ => {}
7484 }
7485 }
7486 }
7487 }
7488 _ => {
7489 }
7494 }
7495}
7496
7497async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
7502 use futures::SinkExt;
7503 use tokio_tungstenite::tungstenite::Message;
7504
7505 let host_client_id = state
7506 .chat_sessions
7507 .lock()
7508 .await
7509 .get(session_id)
7510 .map(|s| s.host_client_id.clone());
7511 let Some(host_client_id) = host_client_id else {
7512 return;
7513 };
7514 let host_channel = {
7515 let sessions = state.sessions.lock().await;
7516 sessions.get(&host_client_id).map(|s| s.channel.clone())
7517 };
7518 let Some(channel) = host_channel else {
7519 return;
7520 };
7521 let frame = serde_json::json!({
7522 "jsonrpc": "2.0",
7523 "method": "agents.chat.event",
7524 "params": params,
7525 });
7526 if let Ok(text) = serde_json::to_string(&frame) {
7527 let _ = channel
7528 .write
7529 .lock()
7530 .await
7531 .send(Message::Text(text.into()))
7532 .await;
7533 }
7534}
7535
7536fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
7542 use std::io::Write;
7543 let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
7544 Some(home) => home.join(".car"),
7545 None => return,
7546 };
7547 if std::fs::create_dir_all(&car_dir).is_err() {
7548 return;
7549 }
7550 let path = car_dir.join("external-agents.jsonl");
7551 let record = serde_json::json!({
7552 "ts": chrono::Utc::now().to_rfc3339(),
7553 "adapter_id": id,
7554 "task": task,
7555 "options": options,
7556 "result": result,
7557 });
7558 let line = match serde_json::to_string(&record) {
7559 Ok(s) => s,
7560 Err(_) => return,
7561 };
7562 if let Ok(mut f) = std::fs::OpenOptions::new()
7563 .create(true)
7564 .append(true)
7565 .open(&path)
7566 {
7567 let _ = writeln!(f, "{}", line);
7568 } else {
7569 tracing::warn!(
7570 path = %path.display(),
7571 "failed to append external-agent audit record"
7572 );
7573 }
7574}
7575
7576async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
7582 let force = req
7583 .params
7584 .get("force")
7585 .and_then(Value::as_bool)
7586 .unwrap_or(false);
7587 if let Some(id) = req.params.get("id").and_then(Value::as_str) {
7588 let json = car_ffi_common::external_agents::health_one(id, force).await?;
7589 serde_json::from_str(&json).map_err(|e| e.to_string())
7590 } else {
7591 let json = car_ffi_common::external_agents::health(force).await?;
7592 serde_json::from_str(&json).map_err(|e| e.to_string())
7593 }
7594}
7595
/// Seconds `handle_agents_chat` waits for the agent to acknowledge a
/// forwarded `agent.chat` request before giving up; the value is also quoted
/// in the timeout error message returned to the caller.
const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
7614
7615async fn handle_agents_chat(
7620 req: &JsonRpcMessage,
7621 state: &Arc<ServerState>,
7622 host_session: &Arc<crate::session::ClientSession>,
7623) -> Result<Value, String> {
7624 use futures::SinkExt;
7625 use tokio::sync::oneshot;
7626 use tokio_tungstenite::tungstenite::Message;
7627
7628 let agent_id = req
7629 .params
7630 .get("agent_id")
7631 .and_then(Value::as_str)
7632 .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
7633 .to_string();
7634 let prompt = req
7635 .params
7636 .get("prompt")
7637 .and_then(Value::as_str)
7638 .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
7639 .to_string();
7640 let session_id = req
7641 .params
7642 .get("session_id")
7643 .and_then(Value::as_str)
7644 .map(str::to_string)
7645 .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
7646 let stream = req
7647 .params
7648 .get("stream")
7649 .and_then(Value::as_bool)
7650 .unwrap_or(true);
7651 let voice_input = req
7652 .params
7653 .get("voice_input")
7654 .and_then(Value::as_bool)
7655 .unwrap_or(false);
7656
7657 let agent_client_id = state
7663 .attached_agents
7664 .lock()
7665 .await
7666 .get(&agent_id)
7667 .cloned()
7668 .ok_or_else(|| {
7669 format!(
7670 "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
7671 agent_id
7672 )
7673 })?;
7674 let agent_channel = {
7675 let sessions = state.sessions.lock().await;
7676 sessions
7677 .get(&agent_client_id)
7678 .map(|s| s.channel.clone())
7679 .ok_or_else(|| {
7680 format!(
7681 "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
7682 agent_id, agent_client_id
7683 )
7684 })?
7685 };
7686
7687 {
7693 let created_at = std::time::SystemTime::now()
7694 .duration_since(std::time::UNIX_EPOCH)
7695 .map(|d| d.as_secs())
7696 .unwrap_or(0);
7697 state.chat_sessions.lock().await.insert(
7698 session_id.clone(),
7699 crate::session::ChatSession {
7700 agent_id: agent_id.clone(),
7701 host_client_id: host_session.client_id.clone(),
7702 created_at,
7703 },
7704 );
7705 }
7706
7707 let request_id = agent_channel.next_request_id();
7714 let (tx, rx) = oneshot::channel();
7715 agent_channel
7716 .pending
7717 .lock()
7718 .await
7719 .insert(request_id.clone(), tx);
7720
7721 let rpc_request = serde_json::json!({
7722 "jsonrpc": "2.0",
7723 "method": "agent.chat",
7724 "params": {
7725 "session_id": session_id,
7726 "prompt": prompt,
7727 "stream": stream,
7728 "context": {
7729 "host_client_id": host_session.client_id,
7730 "voice_input": voice_input,
7731 },
7732 },
7733 "id": request_id,
7734 });
7735 let msg = Message::Text(
7736 serde_json::to_string(&rpc_request)
7737 .map_err(|e| e.to_string())?
7738 .into(),
7739 );
7740 if let Err(e) = agent_channel.write.lock().await.send(msg).await {
7741 agent_channel.pending.lock().await.remove(&request_id);
7745 state.chat_sessions.lock().await.remove(&session_id);
7746 return Err(format!(
7747 "failed to deliver agent.chat to `{}`: {}",
7748 agent_id, e
7749 ));
7750 }
7751
7752 let ack = match tokio::time::timeout(
7757 std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
7758 rx,
7759 )
7760 .await
7761 {
7762 Ok(Ok(resp)) => resp,
7763 Ok(Err(_)) => {
7764 state.chat_sessions.lock().await.remove(&session_id);
7766 return Err(format!(
7767 "agent `{}` disconnected before acking agents.chat",
7768 agent_id
7769 ));
7770 }
7771 Err(_) => {
7772 agent_channel.pending.lock().await.remove(&request_id);
7776 state.chat_sessions.lock().await.remove(&session_id);
7777 return Err(format!(
7778 "agent `{}` did not ack agents.chat within {}s",
7779 agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
7780 ));
7781 }
7782 };
7783
7784 if let Some(err) = ack.error {
7785 state.chat_sessions.lock().await.remove(&session_id);
7787 return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
7788 }
7789
7790 Ok(serde_json::json!({
7791 "accepted": true,
7792 "session_id": session_id,
7793 }))
7794}
7795
7796async fn handle_agents_chat_cancel(
7804 req: &JsonRpcMessage,
7805 state: &Arc<ServerState>,
7806) -> Result<Value, String> {
7807 use futures::SinkExt;
7808 use tokio_tungstenite::tungstenite::Message;
7809
7810 let session_id = req
7811 .params
7812 .get("session_id")
7813 .and_then(Value::as_str)
7814 .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
7815 .to_string();
7816
7817 let chat = state.chat_sessions.lock().await.remove(&session_id);
7818 let chat = match chat {
7819 Some(c) => c,
7820 None => {
7821 return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
7823 }
7824 };
7825
7826 let agent_client_id = state
7829 .attached_agents
7830 .lock()
7831 .await
7832 .get(&chat.agent_id)
7833 .cloned();
7834 if let Some(client_id) = agent_client_id {
7835 let channel_opt = {
7836 let sessions = state.sessions.lock().await;
7837 sessions.get(&client_id).map(|s| s.channel.clone())
7838 };
7839 if let Some(channel) = channel_opt {
7840 let notification = serde_json::json!({
7841 "jsonrpc": "2.0",
7842 "method": "agent.chat.cancel",
7843 "params": { "session_id": session_id },
7844 });
7845 if let Ok(text) = serde_json::to_string(¬ification) {
7846 let _ = channel
7847 .write
7848 .lock()
7849 .await
7850 .send(Message::Text(text.into()))
7851 .await;
7852 }
7853 }
7854 }
7855
7856 Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
7857}
7858
7859pub(crate) async fn try_forward_agent_chat_event(
7870 parsed: &JsonRpcMessage,
7871 state: &Arc<ServerState>,
7872) -> bool {
7873 use futures::SinkExt;
7874 use tokio_tungstenite::tungstenite::Message;
7875
7876 let Some(method) = parsed.method.as_deref() else {
7880 return false;
7881 };
7882 if method != "agent.chat.event" {
7883 return false;
7884 }
7885 if !parsed.id.is_null() {
7886 return false;
7889 }
7890 let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
7891 return false;
7892 };
7893 let session_id = session_id.to_string();
7894
7895 let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
7900 let Some(chat) = chat else {
7901 return true; };
7903
7904 let kind = parsed
7913 .params
7914 .get("kind")
7915 .and_then(Value::as_str)
7916 .map(str::to_string)
7917 .unwrap_or_else(|| {
7918 if parsed.params.get("error").is_some() {
7919 "error".to_string()
7920 } else if parsed.params.get("finish_reason").is_some() {
7921 "done".to_string()
7922 } else {
7923 "token".to_string()
7924 }
7925 });
7926
7927 let host_channel = {
7931 let sessions = state.sessions.lock().await;
7932 sessions
7933 .get(&chat.host_client_id)
7934 .map(|s| s.channel.clone())
7935 };
7936 if let Some(channel) = host_channel {
7937 let mut params = parsed.params.clone();
7938 if let Some(obj) = params.as_object_mut() {
7939 obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
7940 obj.entry("kind")
7945 .or_insert_with(|| Value::String(kind.clone()));
7946 }
7947 let forward = serde_json::json!({
7948 "jsonrpc": "2.0",
7949 "method": "agents.chat.event",
7950 "params": params,
7951 });
7952 if let Ok(text) = serde_json::to_string(&forward) {
7953 let send_result = channel
7954 .write
7955 .lock()
7956 .await
7957 .send(Message::Text(text.into()))
7958 .await;
7959 if let Err(e) = send_result {
7960 tracing::warn!(
7961 session_id = %session_id,
7962 agent_id = %chat.agent_id,
7963 host_client_id = %chat.host_client_id,
7964 kind = %kind,
7965 error = %e,
7966 "agent.chat.event forward to host failed at the WS send step"
7967 );
7968 }
7969 }
7970 } else {
7971 tracing::warn!(
7978 session_id = %session_id,
7979 agent_id = %chat.agent_id,
7980 host_client_id = %chat.host_client_id,
7981 kind = %kind,
7982 "agent.chat.event from supervised agent had no host channel \
7983 (host disconnected since `agents.chat`); dropping routing entry"
7984 );
7985 state.chat_sessions.lock().await.remove(&session_id);
7986 return true;
7987 }
7988
7989 if matches!(kind.as_str(), "done" | "error") {
7993 state.chat_sessions.lock().await.remove(&session_id);
7994 }
7995
7996 true
7997}
7998
#[cfg(test)]
mod fd_leak_regression {
    // Regression coverage for session cleanup after a read-side error
    // (the assertion message below references car#209).
    use super::run_dispatch;
    use futures::SinkExt;
    use std::sync::Arc;
    use tokio_tungstenite::tungstenite::{Error as WsError, Message};

    /// Feeds `run_dispatch` a read stream whose only item is an error and a
    /// write sink that discards everything, then checks that the call returns
    /// `Ok` and that no session is left in `state.sessions`.
    #[tokio::test]
    async fn abrupt_read_error_still_runs_session_cleanup() {
        let scratch = tempfile::TempDir::new().unwrap();
        let state = Arc::new(crate::session::ServerState::standalone(
            scratch.path().to_path_buf(),
        ));

        // Read half: a single `ConnectionClosed` error, then end of stream.
        let incoming: Vec<Result<Message, WsError>> =
            vec![Err::<Message, WsError>(WsError::ConnectionClosed)];
        let read = futures::stream::iter(incoming);
        // Write half: swallows every frame.
        let write: crate::session::WsSink =
            Box::pin(futures::sink::drain().sink_map_err(|_| WsError::ConnectionClosed));

        let result = run_dispatch(read, write, "test-peer".to_string(), state.clone()).await;
        assert!(
            result.is_ok(),
            "run_dispatch must return Ok after cleanup, got {result:?}"
        );

        let remaining = state.sessions.lock().await;
        assert!(
            remaining.is_empty(),
            "state.sessions must be empty after an abrupt disconnect (car#209)"
        );
    }
}
8044
#[cfg(test)]
mod a2ui_action_delivery {
    // Checks that an `a2ui.action` request is delivered to registered A2UI
    // subscribers as an `a2ui.event` frame.
    use super::{handle_a2ui_action, JsonRpcMessage};
    use crate::session::{ServerState, WsChannel, WsSink};
    use futures::{SinkExt, StreamExt};
    use std::collections::HashMap;
    use std::sync::atomic::AtomicU64;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tokio_tungstenite::tungstenite::{Error as WsError, Message};

    /// Registers one subscriber backed by an in-memory channel, dispatches an
    /// `a2ui.action` whose params use the `action` key, and asserts on the
    /// frame the subscriber receives.
    #[tokio::test]
    async fn client_action_broadcasts_to_a2ui_subscribers() {
        let scratch = tempfile::TempDir::new().unwrap();
        let state = Arc::new(ServerState::standalone(scratch.path().to_path_buf()));

        // The subscriber's "socket" is an unbounded channel the test can read.
        let (frames_tx, mut frames_rx) = futures::channel::mpsc::unbounded::<Message>();
        let sink: WsSink = Box::pin(frames_tx.sink_map_err(|_| WsError::ConnectionClosed));
        let subscriber = Arc::new(WsChannel {
            write: Mutex::new(sink),
            pending: Mutex::new(HashMap::new()),
            next_id: AtomicU64::new(0),
        });
        {
            let mut subscribers = state.a2ui_subscribers.lock().await;
            subscribers.insert("test-sub".to_string(), subscriber);
        }

        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "a2ui.action",
            "id": 1,
            "params": {
                "action": "trader:pause",
                "surfaceId": "surf-1",
                "sourceComponentId": "b1",
                "timestamp": "2026-06-03T00:00:00Z"
            }
        });
        let req: JsonRpcMessage = serde_json::from_value(request).unwrap();

        let out = handle_a2ui_action(&req, &state).await;
        assert!(out.is_ok(), "handle_a2ui_action failed: {out:?}");

        let received = frames_rx
            .next()
            .await
            .expect("subscriber received no frame");
        let text = match received {
            Message::Text(t) => t.to_string(),
            other => panic!("expected text frame, got {other:?}"),
        };
        let frame: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(frame["method"], "a2ui.event");
        assert_eq!(frame["params"]["kind"], "a2ui.action");
        assert_eq!(
            frame["params"]["result"]["action"]["name"], "trader:pause",
            "ClientAction.name should accept the `action` alias"
        );
        assert_eq!(frame["params"]["result"]["surfaceId"], "surf-1");
    }
}