1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30 #[serde(default)]
31 pub jsonrpc: String,
32 #[serde(default)]
33 pub method: Option<String>,
34 #[serde(default)]
35 pub params: Value,
36 #[serde(default)]
37 pub id: Value,
38 #[serde(default)]
40 pub result: Option<Value>,
41 #[serde(default)]
42 pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47 pub jsonrpc: &'static str,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub result: Option<Value>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<JsonRpcError>,
52 pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57 pub code: i32,
58 pub message: String,
59}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
88 name = "ws.connection",
89 skip_all,
90 fields(peer = %peer),
91)]
92pub async fn handle_connection(
93 stream: TcpStream,
94 peer: SocketAddr,
95 state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let ws_stream = accept_async(stream).await?;
98 let (write, read) = ws_stream.split();
99 run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102#[cfg(unix)]
113#[instrument(
114 name = "ws.connection",
115 skip_all,
116 fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119 stream: tokio::net::UnixStream,
120 peer: String,
121 state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123 let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124 let (write, read) = ws_stream.split();
125 run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128#[instrument(
138 name = "ws.dispatch",
139 skip_all,
140 fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143 mut read: R,
144 write: crate::session::WsSink,
145 peer: String,
146 state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149 R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150 + Unpin
151 + Send,
152{
153 let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154 tracing::Span::current().record("client_id", &client_id.as_str());
155
156 info!("New connection from {}", peer);
157
158 let channel = Arc::new(WsChannel {
159 write: Mutex::new(write),
160 pending: Mutex::new(HashMap::new()),
161 next_id: AtomicU64::new(1),
162 });
163
164 let session = state.create_session(&client_id, channel.clone()).await;
165
166 let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
175
176 while let Some(msg) = read.next().await {
177 while conn_tasks.try_join_next().is_some() {}
180
181 let msg = match msg {
190 Ok(m) => m,
191 Err(e) => {
192 info!("read error from {}: {}; closing", client_id, e);
193 break;
194 }
195 };
196 if msg.is_text() {
197 let text = match msg.to_text() {
198 Ok(t) => t,
199 Err(e) => {
200 info!("non-text frame from {}: {}; closing", client_id, e);
201 break;
202 }
203 };
204 let parsed: JsonRpcMessage = match serde_json::from_str(text) {
205 Ok(m) => m,
206 Err(e) => {
207 send_response(
208 &session.channel,
209 JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
210 )
211 .await
212 .ok();
213 continue;
214 }
215 };
216
217 if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
219 if let Some(id_str) = parsed.id.as_str() {
220 let mut pending = session.channel.pending.lock().await;
221 if let Some(tx) = pending.remove(id_str) {
222 let tool_resp = if let Some(result) = parsed.result {
223 ToolExecuteResponse {
224 action_id: id_str.to_string(),
225 output: Some(result),
226 error: None,
227 }
228 } else {
229 let err_msg = parsed
230 .error
231 .as_ref()
232 .and_then(|e| e.get("message"))
233 .and_then(|m| m.as_str())
234 .unwrap_or("unknown error")
235 .to_string();
236 ToolExecuteResponse {
237 action_id: id_str.to_string(),
238 output: None,
239 error: Some(err_msg),
240 }
241 };
242 let _ = tx.send(tool_resp);
243 continue;
244 }
245 }
246 }
247
248 if try_forward_agent_chat_event(&parsed, &state).await {
257 continue;
258 }
259
260 if let Some(method) = &parsed.method {
262 info!(method = %method, "dispatching JSON-RPC method");
263
264 if state.auth_token.get().is_some()
272 && !session
273 .authenticated
274 .load(std::sync::atomic::Ordering::Acquire)
275 && method != "session.auth"
276 {
277 let resp = JsonRpcResponse::error(
278 parsed.id.clone(),
279 -32001,
280 "auth required: send `session.auth` with the per-launch token \
281 from ~/Library/Application Support/ai.parslee.car/auth-token \
282 (macOS), $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux), \
283 or %LOCALAPPDATA%\\ai.parslee.car\\auth-token (Windows) \
284 as the first frame on this connection",
285 );
286 let _ = send_response(&session.channel, resp).await;
287 info!(client = %client_id, method = %method,
288 "rejecting non-auth method on unauthenticated session; closing");
289 break;
290 }
291
292 if state.approval_gate.requires_approval(method.as_str()) {
304 match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
305 Ok(()) => {}
306 Err(reason) => {
307 let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
308 let _ = send_response(&session.channel, resp).await;
309 info!(
310 client = %client_id,
311 method = %method,
312 reason = %reason,
313 "approval gate blocked dispatch"
314 );
315 continue;
316 }
317 }
318 }
319
320 let session_task = session.clone();
335 let state_task = state.clone();
336 let method_owned = method.clone();
337 let parsed_task = parsed;
338 conn_tasks.spawn(async move {
341 let session = session_task;
342 let state = state_task;
343 let parsed = parsed_task;
344 let result = match method_owned.as_str() {
345 "session.auth" => handle_session_auth(&parsed, &session, &state).await,
346 "parslee.auth" => handle_parslee_auth().await,
347 "auth.start" => handle_auth_start(&parsed).await,
348 "auth.complete" => handle_auth_complete(&parsed).await,
349 "auth.status" => handle_auth_status().await,
350 "auth.logout" => handle_auth_logout().await,
351 "session.init" => handle_session_init(&parsed, &session).await,
352 "host.subscribe" => handle_host_subscribe(&session, &state).await,
353 "host.agents" => handle_host_agents(&session).await,
354 "host.events" => handle_host_events(&parsed, &session).await,
355 "host.approvals" => handle_host_approvals(&session).await,
356 "host.register_agent" => {
357 handle_host_register_agent(&parsed, &session).await
358 }
359 "host.unregister_agent" => {
360 handle_host_unregister_agent(&parsed, &session).await
361 }
362 "host.set_status" => handle_host_set_status(&parsed, &session).await,
363 "host.notify" => handle_host_notify(&parsed, &session).await,
364 "host.request_approval" => {
365 handle_host_request_approval(&parsed, &session).await
366 }
367 "host.resolve_approval" => {
368 handle_host_resolve_approval(&parsed, &session).await
369 }
370 "tools.register" => handle_tools_register(&parsed, &session).await,
371 "proposal.submit" => {
372 handle_proposal_submit(&parsed, &session, &state).await
373 }
374 "policy.register" => handle_policy_register(&parsed, &session).await,
375 "session.policy.open" => handle_session_policy_open(&session).await,
376 "session.policy.close" => {
377 handle_session_policy_close(&parsed, &session).await
378 }
379 "verify" => handle_verify(&parsed, &session).await,
380 "state.get" => handle_state_get(&parsed, &session).await,
381 "state.set" => handle_state_set(&parsed, &session).await,
382 "state.exists" => handle_state_exists(&parsed, &session).await,
383 "state.keys" => handle_state_keys(&parsed, &session).await,
384 "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
385 "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
386 "memory.query" => handle_memory_query(&parsed, &session).await,
387 "memory.build_context" => {
388 handle_memory_build_context(&parsed, &session).await
389 }
390 "memory.build_context_fast" => {
391 handle_memory_build_context_fast(&parsed, &session).await
392 }
393 "memory.consolidate" => handle_memory_consolidate(&session).await,
394 "memory.fact_count" => handle_memory_fact_count(&session).await,
395 "memory.persist" => handle_memory_persist(&parsed, &session).await,
396 "memory.load" => handle_memory_load(&parsed, &session).await,
397 "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
398 "skill.find" => handle_skill_find(&parsed, &session).await,
399 "skill.report" => handle_skill_report(&parsed, &session).await,
400 "skill.repair" => handle_skill_repair(&parsed, &session).await,
401 "skills.ingest_distilled" => {
402 handle_skills_ingest_distilled(&parsed, &session).await
403 }
404 "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
405 "skills.domains_needing_evolution" => {
406 handle_skills_domains_needing_evolution(&parsed, &session).await
407 }
408 "skills.ingest_provisional" => {
409 handle_skills_ingest_provisional(&parsed, &session).await
410 }
411 "skills.gate" => handle_skills_gate(&parsed, &session).await,
412 "skill.meta" => handle_skill_meta(&parsed, &session).await,
413 "skill.export" => handle_skill_export(&parsed, &session).await,
414 "skill.import" => handle_skill_import(&parsed, &session).await,
415 "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
416 "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
417 "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
418 "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
419 "multi.vote" => handle_multi_vote(&parsed, &session).await,
420 "multi.tournament" => handle_multi_tournament(&parsed, &session).await,
421 "multi.subtask" => handle_multi_subtask(&parsed, &session).await,
422 "scheduler.create" => handle_scheduler_create(&parsed),
423 "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
424 "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
425 "infer" => handle_infer(&parsed, &state, &session).await,
426 "image.generate" => handle_image_generate(&parsed, &state).await,
427 "video.generate" => handle_video_generate(&parsed, &state).await,
428 "embed" => handle_embed(&parsed, &state).await,
429 "classify" => handle_classify(&parsed, &state).await,
430 "tokenize" => handle_tokenize(&parsed, &state).await,
431 "detokenize" => handle_detokenize(&parsed, &state).await,
432 "rerank" => handle_rerank(&parsed, &state).await,
433 "transcribe" => handle_transcribe(&parsed, &state).await,
434 "synthesize" => handle_synthesize(&parsed, &state).await,
435 "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
436 "speech.prepare" => handle_speech_prepare(&state).await,
437 "models.route" => handle_models_route(&parsed, &state).await,
438 "models.stats" => handle_models_stats(&state).await,
439 "outcomes.resolve_pending" => {
440 handle_outcomes_resolve_pending(&parsed, &state).await
441 }
442 "events.count" => handle_events_count(&session).await,
443 "events.stats" => handle_events_stats(&session).await,
444 "events.truncate" => handle_events_truncate(&parsed, &session).await,
445 "events.clear" => handle_events_clear(&session).await,
446 "runs.start" => handle_runs_start(&parsed, &session, &state).await,
453 "runs.complete" => {
454 handle_runs_complete(&parsed, &session, &state).await
455 }
456 "runs.subscribe" => {
461 handle_runs_subscribe(&parsed, &session, &state).await
462 }
463 "runs.unsubscribe" => {
464 handle_runs_unsubscribe(&parsed, &session, &state).await
465 }
466 "runs.list" => handle_runs_list(&parsed, &session, &state).await,
472 "runs.get_trace" => {
473 handle_runs_get_trace(&parsed, &session, &state).await
474 }
475 "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
476 "models.list" => handle_models_list(&state),
477 "models.register" => handle_models_register(&parsed, &state).await,
478 "models.unregister" => handle_models_unregister(&parsed, &state).await,
479 "models.list_unified" => handle_models_list_unified(&state),
480 "models.search" => handle_models_search(&parsed, &state),
481 "models.recommend" => handle_models_recommend(&parsed, &state),
482 "models.setup_plan" => handle_models_setup_plan(&parsed, &state),
483 "models.upgrades" => handle_models_upgrades(&state),
484 "models.detect_upgrades" => handle_models_detect_upgrades(&state).await,
485 "models.check_upgrade_nudge" => {
486 handle_models_check_upgrade_nudge(&parsed, &state).await
487 }
488 "models.dismiss_upgrade" => {
489 handle_models_dismiss_upgrade(&parsed, &state)
490 }
491 "models.update_prefs_get" => handle_models_update_prefs_get(&state),
492 "models.update_prefs_set" => {
493 handle_models_update_prefs_set(&parsed, &state)
494 }
495 "models.pull" => handle_models_pull(&parsed, &state).await,
496 "models.install" => handle_models_pull(&parsed, &state).await,
497 "skills.distill" => handle_skills_distill(&parsed, &state).await,
498 "skills.list" => handle_skills_list(&parsed, &session).await,
499 "browser.run" => handle_browser_run(&parsed, &session).await,
500 "browser.close" => handle_browser_close(&session).await,
501 "secret.put" => handle_secret_put(&parsed),
502 "secret.get" => handle_secret_get(&parsed),
503 "secret.delete" => handle_secret_delete(&parsed),
504 "secret.status" => handle_secret_status(&parsed),
505 "secret.available" => Ok(car_ffi_common::secrets::is_available()),
506 "permissions.status" => handle_perm_status(&parsed),
507 "permissions.request" => handle_perm_request(&parsed),
508 "permissions.explain" => handle_perm_explain(&parsed),
509 "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
510 "accounts.list" => car_ffi_common::accounts::list(),
511 "accounts.open" => {
512 #[derive(serde::Deserialize, Default)]
513 struct OpenParams {
514 #[serde(default)]
515 account_id: Option<String>,
516 }
517 let p: OpenParams =
518 serde_json::from_value(parsed.params.clone()).unwrap_or_default();
519 car_ffi_common::accounts::open_settings(p.account_id.as_deref())
520 }
521 "calendar.list" => car_ffi_common::integrations::calendar_list(),
522 "calendar.events" => handle_calendar_events(&parsed),
523 "calendar.create_event" => handle_calendar_create_event(&parsed),
524 "calendar.update_event" => handle_calendar_update_event(&parsed),
525 "calendar.delete_event" => handle_calendar_delete_event(&parsed),
526 "contacts.containers" => {
527 car_ffi_common::integrations::contacts_containers()
528 }
529 "contacts.find" => handle_contacts_find(&parsed),
530 "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
531 "mail.inbox" => handle_mail_inbox(&parsed),
532 "mail.send" => handle_mail_send(&parsed),
533 "messages.services" => car_ffi_common::integrations::messages_services(),
534 "messages.chats" => handle_messages_chats(&parsed),
535 "messages.send" => handle_messages_send(&parsed),
536 "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
537 "notes.find" => handle_notes_find(&parsed),
538 "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
539 "reminders.items" => handle_reminders_items(&parsed),
540 "photos.albums" => car_ffi_common::integrations::photos_albums(),
541 "bookmarks.list" => handle_bookmarks_list(&parsed),
542 "files.locations" => car_ffi_common::integrations::files_locations(),
543 "keychain.status" => car_ffi_common::integrations::keychain_status(),
544 "health.status" => car_ffi_common::health::status(),
545 "health.sleep" => handle_health_sleep(&parsed),
546 "health.workouts" => handle_health_workouts(&parsed),
547 "health.activity" => handle_health_activity(&parsed),
548 "voice.transcribe_stream.start" => {
549 handle_voice_transcribe_stream_start(&parsed, &state, &session).await
550 }
551 "voice.transcribe_stream.stop" => {
552 handle_voice_transcribe_stream_stop(&parsed, &state).await
553 }
554 "voice.transcribe_stream.push" => {
555 handle_voice_transcribe_stream_push(&parsed, &state).await
556 }
557 "voice.tts_stream.start" => {
558 handle_voice_tts_stream_start(&parsed, &session).await
559 }
560 "voice.tts_stream.cancel" => handle_voice_tts_stream_cancel(&parsed).await,
561 "voice.tts_stream.list" => Ok(handle_voice_tts_stream_list()),
562 "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
563 "voice.dispatch_turn" => {
564 handle_voice_dispatch_turn(&parsed, &state, &session).await
565 }
566 "voice.cancel_turn" => handle_voice_cancel_turn().await,
567 "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
568 "inference.register_runner" => {
569 handle_inference_register_runner(&session).await
570 }
571 "inference.runner.event" => handle_inference_runner_event(&parsed).await,
572 "inference.runner.complete" => {
573 handle_inference_runner_complete(&parsed).await
574 }
575 "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
576 "voice.providers.list" => {
577 serde_json::from_str::<serde_json::Value>(
581 &car_voice::list_voice_providers_json(),
582 )
583 .map_err(|e| e.to_string())
584 }
585 "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
586 .await
587 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
588 "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
589 .await
590 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
591 "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
592 "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
593 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
594 "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
595 "workflow.run" => handle_workflow_run(&parsed, &session).await,
596 "workflow.resume" => handle_workflow_resume(&parsed, &session).await,
597 "builder.build" => handle_builder_build(&parsed, &state, &session).await,
598 "workflow.verify" => handle_workflow_verify(&parsed),
599 "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
600 "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
601 "meeting.list" => handle_meeting_list(&parsed),
602 "meeting.get" => handle_meeting_get(&parsed),
603 "registry.register" => handle_registry_register(&parsed),
604 "registry.heartbeat" => handle_registry_heartbeat(&parsed),
605 "registry.unregister" => handle_registry_unregister(&parsed),
606 "registry.list" => handle_registry_list(&parsed),
607 "registry.reap" => handle_registry_reap(&parsed),
608 "admission.status" => handle_admission_status(&state),
609 "a2a.start" => handle_a2a_start(&parsed, &session).await,
610 "a2a.stop" => handle_a2a_stop(),
611 "a2a.status" => handle_a2a_status(),
612 "a2a.send" => handle_a2a_send(&parsed, &state).await,
613 "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
614 "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
615 "a2ui.capabilities" => handle_a2ui_capabilities(&state),
616 "a2ui.reap" => handle_a2ui_reap(&state).await,
617 "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
618 "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
619 "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
620 "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
621 "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
622 "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
623 "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
624 "automation.run_applescript" => handle_run_applescript(&parsed).await,
625 "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
626 "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
627 "notifications.local" => handle_local_notification(&parsed).await,
628 "vision.ocr" => handle_vision_ocr(&parsed).await,
629 "agents.list" => handle_agents_list(&state).await,
630 "agents.health" => handle_agents_health(&state).await,
631 "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
632 "agents.install" => handle_agents_install(&parsed, &state).await,
633 "agents.remove" => handle_agents_remove(&parsed, &state).await,
634 "agents.start" => handle_agents_start(&parsed, &state).await,
635 "agents.stop" => handle_agents_stop(&parsed, &state).await,
636 "agents.restart" => handle_agents_restart(&parsed, &state).await,
637 "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
638 "agents.list_external" => handle_agents_list_external(&parsed).await,
639 "agents.detect_external" => handle_agents_detect_external(&parsed).await,
640 "agents.health_external" => handle_agents_health_external(&parsed).await,
641 "agents.invoke_external" => {
642 handle_agents_invoke_external(&parsed, &state, &session).await
643 }
644 "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
645 "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
646 "message/send"
653 | "SendMessage"
654 | "message/stream"
655 | "SendStreamingMessage"
656 | "tasks/get"
657 | "GetTask"
658 | "tasks/list"
659 | "ListTasks"
660 | "tasks/cancel"
661 | "CancelTask"
662 | "tasks/resubscribe"
663 | "SubscribeToTask"
664 | "tasks/pushNotificationConfig/set"
665 | "CreateTaskPushNotificationConfig"
666 | "tasks/pushNotificationConfig/get"
667 | "GetTaskPushNotificationConfig"
668 | "tasks/pushNotificationConfig/list"
669 | "ListTaskPushNotificationConfigs"
670 | "tasks/pushNotificationConfig/delete"
671 | "DeleteTaskPushNotificationConfig"
672 | "agent/getAuthenticatedExtendedCard"
673 | "GetExtendedAgentCard" => {
674 handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
675 }
676 _ => Err(format!("unknown method: {}", method_owned)),
677 };
678
679 let resp = match result {
680 Ok(value) => JsonRpcResponse::success(parsed.id, value),
681 Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
682 };
683 let _ = send_response(&session.channel, resp).await;
684 });
685 }
686 } else if msg.is_binary() {
687 let bytes = msg.into_data();
694 let parsed = match car_ffi_common::voice::binary::parse_frame(&bytes) {
695 Ok(p) => p,
696 Err(e) => {
697 tracing::warn!("binary frame from {} rejected: {}", client_id, e);
698 continue;
699 }
700 };
701 match parsed.frame_type {
702 car_ffi_common::voice::binary::FRAME_TYPE_INBOUND_PCM => {
703 let registry = state.voice_sessions.clone();
704 let payload_owned = parsed.payload.to_vec();
705 let session_id_owned = parsed.session_id_hex.clone();
706 conn_tasks.spawn(async move {
707 if let Err(e) = car_ffi_common::voice::transcribe_stream_push(
708 &session_id_owned,
709 &payload_owned,
710 registry,
711 )
712 .await
713 {
714 tracing::warn!(
715 "binary PCM push to session {} failed: {}",
716 session_id_owned,
717 e
718 );
719 }
720 });
721 }
722 other => {
723 tracing::debug!(
724 "binary frame type {:#04x} from {} not accepted server-side",
725 other,
726 client_id
727 );
728 }
729 }
730 } else if msg.is_close() {
731 info!("Client {} disconnected", client_id);
732 break;
733 }
734 }
735
736 conn_tasks.abort_all();
741
742 session.host.unsubscribe(&client_id).await;
743 session.host.reap_session_approvals(&client_id).await;
749 state.a2ui_subscribers.lock().await.remove(&client_id);
750
751 let _removed = state.remove_session(&client_id).await;
763 {
764 let mut pending = session.channel.pending.lock().await;
765 pending.clear();
766 }
767
768 Ok(())
769}
770
771async fn send_response(
772 channel: &WsChannel,
773 resp: JsonRpcResponse,
774) -> Result<(), Box<dyn std::error::Error>> {
775 use futures::SinkExt;
776 let json = serde_json::to_string(&resp)?;
777 channel
778 .write
779 .lock()
780 .await
781 .send(Message::Text(json.into()))
782 .await?;
783 Ok(())
784}
785
786async fn handle_host_subscribe(
789 session: &crate::session::ClientSession,
790 state: &Arc<ServerState>,
791) -> Result<Value, String> {
792 session
793 .host
794 .subscribe(&session.client_id, session.channel.clone())
795 .await;
796 serde_json::to_value(HostSnapshot {
797 subscribed: true,
798 agents: session.host.agents().await,
799 approvals: session.host.approvals().await,
800 events: session.host.events(50).await,
801 identity: Some(daemon_identity(state)),
802 })
803 .map_err(|e| e.to_string())
804}
805
806fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
814 let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
821 (
822 Some(p.to_string_lossy().into_owned()),
823 car_proto::HostManifestRole::Observer,
824 )
825 } else if let Some(s) = state.supervisor_if_installed() {
826 (
827 Some(s.manifest_path().to_string_lossy().into_owned()),
828 car_proto::HostManifestRole::Owner,
829 )
830 } else {
831 (None, car_proto::HostManifestRole::None)
832 };
833 car_proto::HostIdentity {
834 version: env!("CARGO_PKG_VERSION").to_string(),
835 pid: std::process::id(),
836 manifest_path,
837 manifest_role,
838 parslee: state
839 .parslee_session
840 .get()
841 .map(|session| session.identity.clone()),
842 }
843}
844
845async fn handle_parslee_auth() -> Result<Value, String> {
856 let session = crate::parslee_auth::load_or_refresh()
857 .await?
858 .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
859 Ok(serde_json::json!({
860 "authenticated": true,
861 "token_type": "Bearer",
862 "access_token": session.access_token,
863 "authorization_header": format!("Bearer {}", session.access_token),
864 "identity": session.identity,
865 }))
866}
867
868async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
874 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
875 let client_id = req
876 .params
877 .get("client_id")
878 .and_then(|v| v.as_str())
879 .unwrap_or("parslee-car");
880 let redirect_uri = req
881 .params
882 .get("redirect_uri")
883 .and_then(|v| v.as_str())
884 .ok_or_else(|| "redirect_uri is required".to_string())?;
885 let provider = req.params.get("provider").and_then(|v| v.as_str());
886 let state = car_auth::new_state();
887 let verifier = car_auth::pkce_verifier();
888 let challenge = car_auth::pkce_challenge(&verifier);
889 let url =
890 car_auth::authorize_url(&api_base, client_id, redirect_uri, &state, &challenge, provider)?;
891 Ok(serde_json::json!({
892 "authorize_url": url,
893 "state": state,
894 "verifier": verifier,
895 }))
896}
897
898async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
899 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
900 let client_id = req
901 .params
902 .get("client_id")
903 .and_then(|v| v.as_str())
904 .unwrap_or("parslee-car");
905 let redirect_uri = req
906 .params
907 .get("redirect_uri")
908 .and_then(|v| v.as_str())
909 .ok_or_else(|| "redirect_uri is required".to_string())?;
910 let code = req
911 .params
912 .get("code")
913 .and_then(|v| v.as_str())
914 .ok_or_else(|| "code is required".to_string())?;
915 let verifier = req
916 .params
917 .get("verifier")
918 .and_then(|v| v.as_str())
919 .ok_or_else(|| "verifier is required".to_string())?;
920 let token =
921 car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
922 car_auth::store_tokens(&api_base, &token)?;
923 Ok(serde_json::json!({ "ok": true }))
924}
925
926async fn handle_auth_status() -> Result<Value, String> {
927 match car_auth::fetch_status(None).await? {
928 Some(session_json) => {
929 let session: Value = serde_json::from_str(&session_json).unwrap_or(Value::Null);
930 Ok(serde_json::json!({ "authenticated": true, "session": session }))
931 }
932 None => Ok(serde_json::json!({ "authenticated": false })),
933 }
934}
935
936async fn handle_auth_logout() -> Result<Value, String> {
937 car_auth::clear_tokens()?;
938 Ok(serde_json::json!({ "ok": true }))
939}
940
941async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
942 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
943}
944
945async fn handle_host_events(
946 req: &JsonRpcMessage,
947 session: &crate::session::ClientSession,
948) -> Result<Value, String> {
949 let limit = req
950 .params
951 .get("limit")
952 .and_then(|v| v.as_u64())
953 .unwrap_or(100) as usize;
954 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
955}
956
957async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
958 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
959}
960
961async fn handle_a2ui_apply(
962 req: &JsonRpcMessage,
963 state: &Arc<ServerState>,
964) -> Result<Value, String> {
965 #[derive(Deserialize)]
966 struct Params {
967 #[serde(default)]
968 envelope: Option<car_a2ui::A2uiEnvelope>,
969 #[serde(default)]
970 message: Option<car_a2ui::A2uiEnvelope>,
971 }
972
973 let envelope = if req.params.get("createSurface").is_some()
974 || req.params.get("updateComponents").is_some()
975 || req.params.get("updateDataModel").is_some()
976 || req.params.get("deleteSurface").is_some()
977 {
978 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
979 .map_err(|e| e.to_string())?
980 } else {
981 match serde_json::from_value::<Params>(req.params.clone()) {
982 Ok(params) => params
983 .envelope
984 .or(params.message)
985 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
986 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
987 .map_err(|e| e.to_string())?,
988 }
989 };
990
991 apply_a2ui_envelope(state, envelope, None, None).await
992}
993
994async fn handle_a2ui_ingest(
995 req: &JsonRpcMessage,
996 state: &Arc<ServerState>,
997) -> Result<Value, String> {
998 #[derive(Deserialize)]
999 #[serde(rename_all = "camelCase")]
1000 struct Params {
1001 #[serde(default)]
1002 endpoint: Option<String>,
1003 #[serde(default)]
1004 a2a_endpoint: Option<String>,
1005 #[serde(default)]
1006 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1007 #[serde(default)]
1008 route_auth: Option<A2aRouteAuth>,
1009 #[serde(default)]
1010 allow_untrusted_endpoint: bool,
1011 }
1012
1013 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
1014 endpoint: None,
1015 a2a_endpoint: None,
1016 owner: None,
1017 route_auth: None,
1018 allow_untrusted_endpoint: false,
1019 });
1020 let payload = req.params.get("payload").unwrap_or(&req.params);
1021 state
1022 .a2ui
1023 .validate_payload(payload)
1024 .map_err(|e| e.to_string())?;
1025 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
1026 if envelopes.is_empty() {
1027 return Err("no A2UI envelopes found in payload".into());
1028 }
1029 let endpoint = params.endpoint.or(params.a2a_endpoint);
1030 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
1031 let owner = params
1032 .owner
1033 .or_else(|| car_a2ui::owner_from_value(payload))
1034 .map(|owner| match endpoint.clone() {
1035 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
1036 None => owner,
1037 });
1038
1039 let mut results = Vec::new();
1040 for envelope in envelopes {
1041 let value =
1042 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
1043 results.push(value);
1044 }
1045 Ok(serde_json::json!({ "applied": results }))
1046}
1047
1048async fn apply_a2ui_envelope(
1049 state: &Arc<ServerState>,
1050 envelope: car_a2ui::A2uiEnvelope,
1051 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1052 route_auth: Option<A2aRouteAuth>,
1053) -> Result<Value, String> {
1054 let result = state
1055 .a2ui
1056 .apply_with_owner(envelope, owner)
1057 .await
1058 .map_err(|e| e.to_string())?;
1059 update_a2ui_route_auth(state, &result, route_auth).await;
1060 let kind = if result.deleted {
1061 "a2ui.surface_deleted"
1062 } else {
1063 "a2ui.surface_updated"
1064 };
1065 let message = if result.deleted {
1066 format!("A2UI surface {} deleted", result.surface_id)
1067 } else {
1068 format!("A2UI surface {} updated", result.surface_id)
1069 };
1070 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
1071 state
1072 .host
1073 .record_event(kind, None, message, payload.clone())
1074 .await;
1075 broadcast_a2ui_event(state, kind, &payload).await;
1079 serde_json::to_value(result).map_err(|e| e.to_string())
1080}
1081
1082async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
1083 use futures::SinkExt;
1084 use tokio_tungstenite::tungstenite::Message;
1085 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
1086 .a2ui_subscribers
1087 .lock()
1088 .await
1089 .values()
1090 .cloned()
1091 .collect();
1092 if subscribers.is_empty() {
1093 return;
1094 }
1095 let Ok(json) = serde_json::to_string(&serde_json::json!({
1096 "jsonrpc": "2.0",
1097 "method": "a2ui.event",
1098 "params": {
1099 "kind": kind,
1100 "result": result,
1101 },
1102 })) else {
1103 return;
1104 };
1105 for channel in subscribers {
1106 let _ = channel
1107 .write
1108 .lock()
1109 .await
1110 .send(Message::Text(json.clone().into()))
1111 .await;
1112 }
1113}
1114
1115async fn update_a2ui_route_auth(
1116 state: &Arc<ServerState>,
1117 result: &car_a2ui::A2uiApplyResult,
1118 route_auth: Option<A2aRouteAuth>,
1119) {
1120 let mut auth = state.a2ui_route_auth.lock().await;
1121 if result.deleted {
1122 auth.remove(&result.surface_id);
1123 return;
1124 }
1125
1126 let has_route_endpoint = result
1127 .surface
1128 .as_ref()
1129 .and_then(|surface| surface.owner.as_ref())
1130 .and_then(|owner| owner.endpoint.as_ref())
1131 .is_some();
1132 match (has_route_endpoint, route_auth) {
1133 (true, Some(route_auth)) => {
1134 auth.insert(result.surface_id.clone(), route_auth);
1135 }
1136 _ => {
1137 auth.remove(&result.surface_id);
1138 }
1139 }
1140}
1141
1142fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
1143 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
1144}
1145
1146async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
1147 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
1148 if !removed.is_empty() {
1149 let mut auth = state.a2ui_route_auth.lock().await;
1150 for surface_id in &removed {
1151 auth.remove(surface_id);
1152 }
1153 }
1154 Ok(serde_json::json!({ "removed": removed }))
1155}
1156
1157async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
1158 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
1159}
1160
1161async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
1162 let surface_id = req
1163 .params
1164 .get("surface_id")
1165 .or_else(|| req.params.get("surfaceId"))
1166 .and_then(Value::as_str)
1167 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
1168 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1169}
1170
1171async fn handle_a2ui_subscribe(
1177 session: &crate::session::ClientSession,
1178 state: &Arc<ServerState>,
1179) -> Result<Value, String> {
1180 state
1181 .a2ui_subscribers
1182 .lock()
1183 .await
1184 .insert(session.client_id.clone(), session.channel.clone());
1185 Ok(serde_json::json!({ "subscribed": true }))
1186}
1187
1188async fn handle_a2ui_unsubscribe(
1192 session: &crate::session::ClientSession,
1193 state: &Arc<ServerState>,
1194) -> Result<Value, String> {
1195 state
1196 .a2ui_subscribers
1197 .lock()
1198 .await
1199 .remove(&session.client_id);
1200 Ok(serde_json::json!({ "subscribed": false }))
1201}
1202
1203async fn handle_a2ui_replay(
1210 req: &JsonRpcMessage,
1211 state: &Arc<ServerState>,
1212) -> Result<Value, String> {
1213 let surface_id = req
1214 .params
1215 .get("surface_id")
1216 .or_else(|| req.params.get("surfaceId"))
1217 .and_then(Value::as_str)
1218 .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
1219 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1220}
1221
1222async fn handle_a2ui_action(
1223 req: &JsonRpcMessage,
1224 state: &Arc<ServerState>,
1225) -> Result<Value, String> {
1226 let action: car_a2ui::ClientAction =
1227 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1228 let owner = state.a2ui.owner(&action.surface_id).await;
1229
1230 let action_result = serde_json::json!({
1240 "surfaceId": action.surface_id,
1241 "action": action,
1242 "owner": owner,
1243 });
1244 broadcast_a2ui_event(state, "a2ui.action", &action_result).await;
1245
1246 let route = route_a2ui_action(state, &action, owner.clone()).await;
1247 let payload = serde_json::json!({
1248 "action": action,
1249 "owner": owner,
1250 "route": route,
1251 });
1252 let event = state
1253 .host
1254 .record_event(
1255 "a2ui.action",
1256 None,
1257 format!(
1258 "A2UI action {} from {}",
1259 action.name, action.source_component_id
1260 ),
1261 payload,
1262 )
1263 .await;
1264 Ok(serde_json::json!({
1265 "event": event,
1266 "route": route,
1267 }))
1268}
1269
1270async fn handle_a2ui_render_report(
1277 req: &JsonRpcMessage,
1278 state: &Arc<ServerState>,
1279) -> Result<Value, String> {
1280 let report: car_a2ui::RenderReport =
1284 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1285 let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
1286 let kind = "a2ui.render_report";
1287 let message = format!("A2UI render report for surface {}", report.surface_id);
1288 let event = state
1289 .host
1290 .record_event(kind, None, message, payload.clone())
1291 .await;
1292 broadcast_a2ui_event(state, kind, &payload).await;
1293
1294 if let Some(surface) = state.a2ui.get(&report.surface_id).await {
1302 if !state.ui_agent_budget.try_consume(&report.surface_id) {
1308 tracing::warn!(
1309 surface_id = %report.surface_id,
1310 count = state.ui_agent_budget.count(&report.surface_id),
1311 max = state.ui_agent_budget.max(),
1312 "ui-agent iteration budget exhausted; skipping agent invocation"
1313 );
1314 return Ok(serde_json::json!({ "event": event }));
1315 }
1316 match state.ui_agent.on_render_report(&report, &surface) {
1320 car_ui_agent::Decision::Patch {
1321 envelope,
1322 strategy_id,
1323 patch_hash,
1324 elapsed_ns,
1325 } => {
1326 if !state
1334 .ui_agent_oscillation
1335 .check_and_record(&report.surface_id, patch_hash)
1336 {
1337 tracing::warn!(
1338 surface_id = %report.surface_id,
1339 strategy = %strategy_id,
1340 patch_hash,
1341 "ui-agent oscillation detected; suppressing patch"
1342 );
1343 state.ui_agent_budget.refund(&report.surface_id);
1346 return Ok(serde_json::json!({ "event": event }));
1347 }
1348 let a2ui_envelope = car_a2ui::A2uiEnvelope {
1349 patch_components: Some(envelope),
1350 ..Default::default()
1351 };
1352 if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
1353 tracing::warn!(
1354 surface_id = %report.surface_id,
1355 strategy = %strategy_id,
1356 patch_hash,
1357 elapsed_ns,
1358 error = %e,
1359 "ui-agent patch apply failed",
1360 );
1361 state.ui_agent_budget.refund(&report.surface_id);
1363 } else {
1364 tracing::debug!(
1365 surface_id = %report.surface_id,
1366 strategy = %strategy_id,
1367 patch_hash,
1368 elapsed_ns,
1369 iteration = state.ui_agent_budget.count(&report.surface_id),
1370 "ui-agent patch applied",
1371 );
1372 if let Some(memgine) = state.shared_memgine.clone() {
1382 let speaker = format!("ui-agent/{}", report.surface_id);
1383 let text = format!("strategy applied: {}", strategy_id);
1384 tokio::spawn(async move {
1385 let mut guard = memgine.lock().await;
1386 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1387 });
1388 }
1389 }
1390 }
1391 car_ui_agent::Decision::StableNoChange => {
1392 state.ui_agent_budget.refund(&report.surface_id);
1394 }
1395 car_ui_agent::Decision::HardStop { reason } => {
1396 state.ui_agent_budget.refund(&report.surface_id);
1397 tracing::error!(
1403 surface_id = %report.surface_id,
1404 reason = %reason,
1405 "ui-agent hard-stopped improvement loop",
1406 );
1407 }
1408 }
1409 } else {
1410 tracing::debug!(
1411 surface_id = %report.surface_id,
1412 "ui-agent skipped — surface not found in store",
1413 );
1414 }
1415
1416 Ok(serde_json::json!({ "event": event }))
1417}
1418
1419async fn route_a2ui_action(
1420 state: &Arc<ServerState>,
1421 action: &car_a2ui::ClientAction,
1422 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1423) -> Value {
1424 let Some(owner) = owner else {
1425 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1426 };
1427 if owner.kind != "a2a" {
1428 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1429 }
1430 let Some(endpoint) = owner.endpoint.clone() else {
1431 return serde_json::json!({
1432 "delivered": false,
1433 "reason": "surface owner has no endpoint",
1434 "owner": owner
1435 });
1436 };
1437
1438 let message = car_a2a::Message {
1439 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1440 role: car_a2a::MessageRole::User,
1441 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1442 data: serde_json::json!({
1443 "a2uiAction": action,
1444 }),
1445 metadata: Default::default(),
1446 })],
1447 task_id: owner.task_id.clone(),
1448 context_id: owner.context_id.clone(),
1449 metadata: Default::default(),
1450 };
1451
1452 let auth = state
1453 .a2ui_route_auth
1454 .lock()
1455 .await
1456 .get(&action.surface_id)
1457 .cloned()
1458 .map(client_auth_from_route_auth)
1459 .unwrap_or(car_a2a::ClientAuth::None);
1460
1461 match car_a2a::A2aClient::new(endpoint.clone())
1462 .with_auth(auth)
1463 .send_message(message, false)
1464 .await
1465 {
1466 Ok(result) => serde_json::json!({
1467 "delivered": true,
1468 "owner": owner,
1469 "endpoint": endpoint,
1470 "result": result,
1471 }),
1472 Err(error) => serde_json::json!({
1473 "delivered": false,
1474 "owner": owner,
1475 "endpoint": endpoint,
1476 "error": error.to_string(),
1477 }),
1478 }
1479}
1480
1481fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1482 match auth {
1483 A2aRouteAuth::None => car_a2a::ClientAuth::None,
1484 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1485 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1486 }
1487}
1488
1489fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1490 let endpoint = endpoint?;
1491 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1492 Some(endpoint)
1493 } else {
1494 None
1495 }
1496}
1497
1498fn is_loopback_http_endpoint(endpoint: &str) -> bool {
1499 endpoint == "http://localhost"
1500 || endpoint.starts_with("http://localhost:")
1501 || endpoint.starts_with("http://localhost/")
1502 || endpoint == "http://127.0.0.1"
1503 || endpoint.starts_with("http://127.0.0.1:")
1504 || endpoint.starts_with("http://127.0.0.1/")
1505 || endpoint == "http://[::1]"
1506 || endpoint.starts_with("http://[::1]:")
1507 || endpoint.starts_with("http://[::1]/")
1508}
1509
1510async fn handle_host_register_agent(
1511 req: &JsonRpcMessage,
1512 session: &crate::session::ClientSession,
1513) -> Result<Value, String> {
1514 let request: RegisterHostAgentRequest =
1515 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1516 serde_json::to_value(
1517 session
1518 .host
1519 .register_agent(&session.client_id, request)
1520 .await?,
1521 )
1522 .map_err(|e| e.to_string())
1523}
1524
1525async fn handle_host_unregister_agent(
1526 req: &JsonRpcMessage,
1527 session: &crate::session::ClientSession,
1528) -> Result<Value, String> {
1529 let agent_id = req
1530 .params
1531 .get("agent_id")
1532 .and_then(|v| v.as_str())
1533 .ok_or("missing agent_id")?;
1534 session
1535 .host
1536 .unregister_agent(&session.client_id, agent_id)
1537 .await?;
1538 Ok(serde_json::json!({"ok": true}))
1539}
1540
1541async fn handle_host_set_status(
1542 req: &JsonRpcMessage,
1543 session: &crate::session::ClientSession,
1544) -> Result<Value, String> {
1545 let request: SetHostAgentStatusRequest =
1546 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1547 serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1548 .map_err(|e| e.to_string())
1549}
1550
1551async fn handle_host_notify(
1552 req: &JsonRpcMessage,
1553 session: &crate::session::ClientSession,
1554) -> Result<Value, String> {
1555 let kind = req
1556 .params
1557 .get("kind")
1558 .and_then(|v| v.as_str())
1559 .unwrap_or("host.notification");
1560 let agent_id = req
1561 .params
1562 .get("agent_id")
1563 .and_then(|v| v.as_str())
1564 .map(str::to_string);
1565 let message = req
1566 .params
1567 .get("message")
1568 .and_then(|v| v.as_str())
1569 .unwrap_or("");
1570 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1571 serde_json::to_value(
1572 session
1573 .host
1574 .record_event(kind, agent_id, message, payload)
1575 .await,
1576 )
1577 .map_err(|e| e.to_string())
1578}
1579
1580async fn handle_host_request_approval(
1581 req: &JsonRpcMessage,
1582 session: &crate::session::ClientSession,
1583) -> Result<Value, String> {
1584 let request: CreateHostApprovalRequest =
1585 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1586 if let Some(agent_id) = &request.agent_id {
1587 let _ = session
1592 .host
1593 .set_status(
1594 &session.client_id,
1595 SetHostAgentStatusRequest {
1596 agent_id: agent_id.clone(),
1597 status: HostAgentStatus::WaitingForApproval,
1598 current_task: None,
1599 message: Some("Waiting for approval".to_string()),
1600 payload: Value::Null,
1601 },
1602 )
1603 .await;
1604 }
1605 let owner_client_id = if request.system_level {
1612 None
1613 } else {
1614 Some(session.client_id.as_str())
1615 };
1616 serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1617 .map_err(|e| e.to_string())
1618}
1619
1620async fn handle_host_resolve_approval(
1621 req: &JsonRpcMessage,
1622 session: &crate::session::ClientSession,
1623) -> Result<Value, String> {
1624 let request: ResolveHostApprovalRequest =
1625 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1626 serde_json::to_value(
1627 session
1628 .host
1629 .resolve_approval(&session.client_id, request)
1630 .await?,
1631 )
1632 .map_err(|e| e.to_string())
1633}
1634
1635async fn handle_session_auth(
1646 req: &JsonRpcMessage,
1647 session: &crate::session::ClientSession,
1648 state: &Arc<ServerState>,
1649) -> Result<Value, String> {
1650 if let Some(host_supplied) = req.params.get("host_token").and_then(Value::as_str) {
1660 let expected = state.host_token.get().ok_or_else(|| {
1661 "host auth unavailable: this daemon has no host token (started with --no-auth?)"
1662 .to_string()
1663 })?;
1664 if !constant_time_eq(host_supplied.as_bytes(), expected.as_bytes()) {
1665 return Err("auth failed: host token mismatch".to_string());
1666 }
1667 session
1668 .authenticated
1669 .store(true, std::sync::atomic::Ordering::Release);
1670 session
1671 .is_host
1672 .store(true, std::sync::atomic::Ordering::Release);
1673 return Ok(serde_json::json!({
1674 "ok": true,
1675 "auth_enabled": true,
1676 "role": "host",
1677 }));
1678 }
1679
1680 let supplied = req
1681 .params
1682 .get("token")
1683 .and_then(Value::as_str)
1684 .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1685 let agent_id = req
1692 .params
1693 .get("agent_id")
1694 .and_then(Value::as_str)
1695 .map(str::to_string);
1696
1697 if let Some(id) = agent_id {
1698 let supervisor = state.supervisor()?;
1699 if !supervisor.validate_agent_token(&id, supplied).await {
1700 return Err(format!(
1701 "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1702 ));
1703 }
1704 {
1708 let mut attached = state.attached_agents.lock().await;
1709 if let Some(prior) = attached.get(&id) {
1710 if prior != &session.client_id {
1711 return Err(format!(
1712 "auth failed: agent_id `{id}` is already attached on \
1713 another connection (client_id={prior})"
1714 ));
1715 }
1716 }
1717 attached.insert(id.clone(), session.client_id.clone());
1718 }
1719 let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1724 *session.bound_memgine.lock().await = Some(agent_eng);
1725 *session.agent_id.lock().await = Some(id.clone());
1726 session
1727 .authenticated
1728 .store(true, std::sync::atomic::Ordering::Release);
1729 return Ok(serde_json::json!({
1730 "ok": true,
1731 "auth_enabled": true,
1732 "agent_id": id,
1733 }));
1734 }
1735
1736 let expected = match state.auth_token.get() {
1737 Some(t) => t,
1738 None => {
1739 session
1745 .authenticated
1746 .store(true, std::sync::atomic::Ordering::Release);
1747 return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1748 }
1749 };
1750 if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1751 return Err("auth failed: token mismatch".to_string());
1752 }
1753 session
1754 .authenticated
1755 .store(true, std::sync::atomic::Ordering::Release);
1756 Ok(serde_json::json!({
1757 "ok": true,
1758 "auth_enabled": true,
1759 "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
1760 }))
1761}
1762
1763fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
1767 if a.len() != b.len() {
1768 return false;
1769 }
1770 let mut diff: u8 = 0;
1771 for (x, y) in a.iter().zip(b.iter()) {
1772 diff |= x ^ y;
1773 }
1774 diff == 0
1775}
1776
1777async fn gate_high_risk_method(
1787 method: &str,
1788 params: &Value,
1789 state: &Arc<ServerState>,
1790) -> Result<(), String> {
1791 let timeout = state.approval_gate.timeout;
1792 let req = CreateHostApprovalRequest {
1793 agent_id: None,
1794 action: format!("ws.method:{method}"),
1795 details: serde_json::json!({
1796 "method": method,
1797 "params_preview": preview_params(params, 2_000),
1801 }),
1802 options: vec!["approve".to_string(), "deny".to_string()],
1803 system_level: true,
1807 };
1808 match state
1809 .host
1810 .request_and_wait_approval(req, "approve", timeout)
1811 .await
1812 {
1813 Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1814 Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1815 "{method} denied by user (approval gate, audit 2026-05). \
1816 To call this method without an interactive prompt, start \
1817 car-server with --no-approvals on a trusted machine."
1818 )),
1819 Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1820 "{method} approval timed out after {}s with no resolution. \
1821 The approval is still visible in `host.approvals` for \
1822 forensics; resubmit the request to retry.",
1823 timeout.as_secs()
1824 )),
1825 Err(e) => Err(format!("approval gate error: {e}")),
1826 }
1827}
1828
1829fn preview_params(value: &Value, max_chars: usize) -> Value {
1830 let s = value.to_string();
1831 if s.len() <= max_chars {
1832 value.clone()
1833 } else {
1834 Value::String(format!("{}… (truncated)", &s[..max_chars]))
1835 }
1836}
1837
1838async fn handle_session_init(
1839 req: &JsonRpcMessage,
1840 session: &crate::session::ClientSession,
1841) -> Result<Value, String> {
1842 let init: SessionInitRequest =
1843 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1844
1845 for tool in &init.tools {
1846 register_from_definition(&session.runtime, tool).await;
1847 }
1848
1849 let mut policy_count = 0;
1850 {
1851 let mut policies = session.runtime.policies.write().await;
1852 for policy_def in &init.policies {
1853 if let Some(check) = build_policy_check(policy_def) {
1854 policies.register(&policy_def.name, check, "");
1855 policy_count += 1;
1856 }
1857 }
1858 }
1859
1860 serde_json::to_value(SessionInitResponse {
1861 session_id: session.client_id.clone(),
1862 tools_registered: init.tools.len(),
1863 policies_registered: policy_count,
1864 })
1865 .map_err(|e| e.to_string())
1866}
1867
1868fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1869 match def.rule.as_str() {
1870 "deny_tool" => {
1871 let target = def.target.clone();
1872 Some(Box::new(
1873 move |action: &car_ir::Action, _: &car_state::StateStore| {
1874 if action.tool.as_deref() == Some(&target) {
1875 Some(format!("tool '{}' denied", target))
1876 } else {
1877 None
1878 }
1879 },
1880 ))
1881 }
1882 "require_state" => {
1883 let key = def.key.clone();
1884 let value = def.value.clone();
1885 Some(Box::new(
1886 move |_: &car_ir::Action, state: &car_state::StateStore| {
1887 if state.get(&key).as_ref() != Some(&value) {
1888 Some(format!("state['{}'] must be {:?}", key, value))
1889 } else {
1890 None
1891 }
1892 },
1893 ))
1894 }
1895 "deny_tool_param" => {
1896 let target = def.target.clone();
1897 let param = def.key.clone();
1898 let pattern = def.pattern.clone();
1899 Some(Box::new(
1900 move |action: &car_ir::Action, _: &car_state::StateStore| {
1901 if action.tool.as_deref() != Some(&target) {
1902 return None;
1903 }
1904 if let Some(val) = action.parameters.get(¶m) {
1905 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1906 if s.contains(&pattern) {
1907 return Some(format!("param '{}' matches '{}'", param, pattern));
1908 }
1909 }
1910 None
1911 },
1912 ))
1913 }
1914 _ => None,
1915 }
1916}
1917
1918async fn handle_tools_register(
1919 req: &JsonRpcMessage,
1920 session: &crate::session::ClientSession,
1921) -> Result<Value, String> {
1922 let tools: Vec<ToolDefinition> =
1923 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1924 for tool in &tools {
1925 register_from_definition(&session.runtime, tool).await;
1926 }
1927 Ok(Value::from(tools.len()))
1928}
1929
1930async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1937 runtime
1938 .register_tool_schema(car_ir::ToolSchema {
1939 name: def.name.clone(),
1940 description: def.description.clone(),
1941 parameters: def.parameters.clone(),
1942 returns: def.returns.clone(),
1943 idempotent: def.idempotent,
1944 cache_ttl_secs: def.cache_ttl_secs,
1945 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1946 max_calls: rl.max_calls,
1947 interval_secs: rl.interval_secs,
1948 }),
1949 })
1950 .await;
1951}
1952
1953async fn handle_proposal_submit(
1954 req: &JsonRpcMessage,
1955 session: &crate::session::ClientSession,
1956 state: &Arc<ServerState>,
1957) -> Result<Value, String> {
1958 let submit: ProposalSubmitRequest =
1959 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1960 let session_id = req
1966 .params
1967 .get("session_id")
1968 .and_then(|v| v.as_str())
1969 .map(str::to_string);
1970
1971 let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1980 Some(v) if !v.is_null() => {
1981 Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1982 }
1983 _ => None,
1984 };
1985
1986 let result = match (session_id, scope) {
1987 (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1992 (Some(sid), None) => {
1993 session
1994 .runtime
1995 .execute_with_session(&submit.proposal, &sid)
1996 .await
1997 }
1998 (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1999 (None, None) => session.runtime.execute(&submit.proposal).await,
2000 };
2001
2002 let current_run = session.current_run_id.lock().await.clone();
2014 if let Some(run_id) = current_run {
2015 let start_index = state.run_turn_count(&run_id).await;
2016 let records =
2017 crate::run_trace::record_turns(&submit.proposal, &result.results, start_index);
2018 if !records.is_empty() {
2019 state.record_run_turns(&run_id, records).await;
2020 }
2021 }
2022
2023 serde_json::to_value(result).map_err(|e| e.to_string())
2024}
2025
2026async fn handle_session_policy_open(
2027 session: &crate::session::ClientSession,
2028) -> Result<Value, String> {
2029 let id = session.runtime.open_session().await;
2030 Ok(serde_json::json!({ "session_id": id }))
2031}
2032
2033async fn handle_session_policy_close(
2034 req: &JsonRpcMessage,
2035 session: &crate::session::ClientSession,
2036) -> Result<Value, String> {
2037 let sid = req
2038 .params
2039 .get("session_id")
2040 .and_then(|v| v.as_str())
2041 .ok_or("missing 'session_id'")?;
2042 let closed = session.runtime.close_session(sid).await;
2043 Ok(serde_json::json!({ "closed": closed }))
2044}
2045
2046async fn handle_policy_register(
2052 req: &JsonRpcMessage,
2053 session: &crate::session::ClientSession,
2054) -> Result<Value, String> {
2055 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
2056 .map_err(|e| format!("invalid policy params: {e}"))?;
2057 let session_id = req
2058 .params
2059 .get("session_id")
2060 .and_then(|v| v.as_str())
2061 .map(str::to_string);
2062 let check = build_policy_check(&def)
2063 .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
2064 match session_id {
2065 Some(sid) => session
2066 .runtime
2067 .register_policy_in_session(&sid, &def.name, check, "")
2068 .await
2069 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
2070 None => {
2071 let mut policies = session.runtime.policies.write().await;
2072 policies.register(&def.name, check, "");
2073 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
2074 }
2075 }
2076}
2077
2078async fn handle_verify(
2079 req: &JsonRpcMessage,
2080 session: &crate::session::ClientSession,
2081) -> Result<Value, String> {
2082 let vr: VerifyRequest =
2083 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2084 let tools_guard = session.runtime.tools.read().await;
2090 let result =
2091 car_verify::verify_with_schemas(&vr.proposal, Some(&vr.initial_state), Some(&tools_guard), 30);
2092 drop(tools_guard);
2093 serde_json::to_value(VerifyResponse {
2094 valid: result.valid,
2095 issues: result
2096 .issues
2097 .iter()
2098 .map(|i| VerifyIssueProto {
2099 action_id: i.action_id.clone(),
2100 severity: i.severity.clone(),
2101 message: i.message.clone(),
2102 })
2103 .collect(),
2104 simulated_state: result.simulated_state,
2105 })
2106 .map_err(|e| e.to_string())
2107}
2108
2109fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
2116 req.params
2117 .get("tenant_id")
2118 .and_then(|v| v.as_str())
2119 .filter(|s| !s.is_empty())
2120 .map(str::to_string)
2121}
2122
2123async fn handle_state_get(
2124 req: &JsonRpcMessage,
2125 session: &crate::session::ClientSession,
2126) -> Result<Value, String> {
2127 let key = req
2128 .params
2129 .get("key")
2130 .and_then(|v| v.as_str())
2131 .ok_or("missing 'key'")?;
2132 let tenant = tenant_from_params(req);
2133 Ok(session
2134 .runtime
2135 .state
2136 .scoped(tenant.as_deref())
2137 .get(key)
2138 .unwrap_or(Value::Null))
2139}
2140
2141async fn handle_state_set(
2142 req: &JsonRpcMessage,
2143 session: &crate::session::ClientSession,
2144) -> Result<Value, String> {
2145 let key = req
2146 .params
2147 .get("key")
2148 .and_then(|v| v.as_str())
2149 .ok_or("missing 'key'")?;
2150 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
2151 let tenant = tenant_from_params(req);
2152 session
2153 .runtime
2154 .state
2155 .scoped(tenant.as_deref())
2156 .set(key, value, "client");
2157 Ok(Value::from("ok"))
2158}
2159
2160async fn handle_state_exists(
2164 req: &JsonRpcMessage,
2165 session: &crate::session::ClientSession,
2166) -> Result<Value, String> {
2167 let key = req
2168 .params
2169 .get("key")
2170 .and_then(|v| v.as_str())
2171 .ok_or("missing 'key'")?;
2172 let tenant = tenant_from_params(req);
2173 Ok(Value::Bool(
2174 session.runtime.state.scoped(tenant.as_deref()).exists(key),
2175 ))
2176}
2177
2178async fn handle_state_keys(
2181 req: &JsonRpcMessage,
2182 session: &crate::session::ClientSession,
2183) -> Result<Value, String> {
2184 let tenant = tenant_from_params(req);
2185 Ok(Value::Array(
2186 session
2187 .runtime
2188 .state
2189 .scoped(tenant.as_deref())
2190 .keys()
2191 .into_iter()
2192 .map(Value::String)
2193 .collect(),
2194 ))
2195}
2196
2197async fn handle_state_snapshot(
2208 req: &JsonRpcMessage,
2209 session: &crate::session::ClientSession,
2210) -> Result<Value, String> {
2211 let tenant = tenant_from_params(req);
2212 let view = session.runtime.state.scoped(tenant.as_deref());
2213 let mut map = serde_json::Map::new();
2214 for key in view.keys() {
2215 if let Some(value) = view.get(&key) {
2216 map.insert(key, value);
2217 }
2218 }
2219 Ok(Value::Object(map))
2220}
2221
2222fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
2228 let base = car_ffi_common::memory_path::ensure_base()
2229 .map_err(|e| format!("memory base unavailable: {e}"))?;
2230 let dir = base.join("agents");
2231 std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
2232 Ok(dir.join(format!("{agent_id}.json")))
2233}
2234
2235async fn get_or_load_agent_memgine(
2242 state: &Arc<ServerState>,
2243 agent_id: &str,
2244) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
2245 {
2246 let map = state.agent_memgines.lock().await;
2247 if let Some(eng) = map.get(agent_id) {
2248 return Ok(eng.clone());
2249 }
2250 }
2251 let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
2253 None,
2254 )));
2255 let path = agent_memgine_snapshot_path(agent_id)?;
2256 if path.exists() {
2257 let content = std::fs::read_to_string(&path)
2258 .map_err(|e| format!("read {}: {}", path.display(), e))?;
2259 let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
2260 let mut g = engine.lock().await;
2261 let mut loaded: u32 = 0;
2262 for fact in &facts {
2263 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2264 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2265 let kind = fact
2266 .get("kind")
2267 .and_then(|v| v.as_str())
2268 .unwrap_or("pattern");
2269 let fid = format!("loaded-{loaded}");
2270 g.ingest_fact(
2271 &fid,
2272 subject,
2273 body,
2274 "user",
2275 "peer",
2276 chrono::Utc::now(),
2277 "global",
2278 None,
2279 vec![],
2280 kind == "constraint",
2281 );
2282 loaded += 1;
2283 }
2284 }
2285 let mut map = state.agent_memgines.lock().await;
2286 let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
2287 Ok(stored)
2288}
2289
2290async fn persist_agent_memgine(
2294 agent_id: &str,
2295 engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
2296) -> Result<(), String> {
2297 let path = agent_memgine_snapshot_path(agent_id)?;
2298 let g = engine.lock().await;
2299 let facts: Vec<Value> = g
2300 .graph
2301 .inner
2302 .node_indices()
2303 .filter_map(|nix| {
2304 let node = g.graph.inner.node_weight(nix)?;
2305 if !node.is_valid() {
2306 return None;
2307 }
2308 if node.kind == car_memgine::MemKind::Identity
2309 || node.kind == car_memgine::MemKind::Environment
2310 {
2311 return None;
2312 }
2313 Some(serde_json::json!({
2314 "subject": node.key,
2315 "body": node.value,
2316 "kind": match node.kind {
2317 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2318 car_memgine::MemKind::Conversation => "outcome",
2319 _ => "pattern",
2320 },
2321 "confidence": 0.5,
2322 "content_type": node.content_type.as_label(),
2323 }))
2324 })
2325 .collect();
2326 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2327 std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2328 Ok(())
2329}
2330
2331async fn handle_memory_fact_count(
2338 session: &crate::session::ClientSession,
2339) -> Result<Value, String> {
2340 let engine_arc = session.effective_memgine().await;
2341 let engine = engine_arc.lock().await;
2342 Ok(Value::from(engine.valid_fact_count()))
2343}
2344
2345async fn handle_memory_add_fact(
2346 req: &JsonRpcMessage,
2347 session: &crate::session::ClientSession,
2348) -> Result<Value, String> {
2349 let subject = req
2350 .params
2351 .get("subject")
2352 .and_then(|v| v.as_str())
2353 .ok_or("missing subject")?;
2354 let body = req
2355 .params
2356 .get("body")
2357 .and_then(|v| v.as_str())
2358 .ok_or("missing body")?;
2359 let kind = req
2360 .params
2361 .get("kind")
2362 .and_then(|v| v.as_str())
2363 .unwrap_or("pattern");
2364 let engine_arc = session.effective_memgine().await;
2368 let count = {
2369 let mut engine = engine_arc.lock().await;
2370 let fid = format!("ws-{}", engine.valid_fact_count());
2371 engine.ingest_fact(
2372 &fid,
2373 subject,
2374 body,
2375 "user",
2376 "peer",
2377 chrono::Utc::now(),
2378 "global",
2379 None,
2380 vec![],
2381 kind == "constraint",
2382 );
2383 engine.valid_fact_count()
2384 };
2385 if let Some(id) = session.agent_id.lock().await.clone() {
2388 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2389 tracing::warn!(agent_id = %id, error = %e,
2390 "agent memgine persist failed; in-memory state is canonical");
2391 }
2392 }
2393 Ok(Value::from(count))
2394}
2395
2396async fn handle_memory_query(
2397 req: &JsonRpcMessage,
2398 session: &crate::session::ClientSession,
2399) -> Result<Value, String> {
2400 let query = req
2401 .params
2402 .get("query")
2403 .and_then(|v| v.as_str())
2404 .ok_or("missing query")?;
2405 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2406 let engine_arc = session.effective_memgine().await;
2407 let engine = engine_arc.lock().await;
2408 let seeds = engine.graph.find_seeds(query, 5);
2409 let hits = if !seeds.is_empty() {
2414 engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2415 } else {
2416 vec![]
2417 };
2418 let results: Vec<Value> = hits
2419 .iter()
2420 .filter_map(|hit| {
2421 let node = engine.graph.inner.node_weight(hit.node_ix)?;
2422 Some(serde_json::json!({
2423 "subject": node.key,
2424 "body": node.value,
2425 "kind": format!("{:?}", node.kind).to_lowercase(),
2426 "confidence": hit.activation,
2427 }))
2428 })
2429 .collect();
2430 serde_json::to_value(results).map_err(|e| e.to_string())
2431}
2432
2433async fn handle_memory_build_context(
2434 req: &JsonRpcMessage,
2435 session: &crate::session::ClientSession,
2436) -> Result<Value, String> {
2437 let query = req
2438 .params
2439 .get("query")
2440 .and_then(|v| v.as_str())
2441 .unwrap_or("");
2442 let model_context_window = req
2446 .params
2447 .get("model_context_window")
2448 .and_then(|v| v.as_u64())
2449 .map(|w| w as usize);
2450 let mut engine = session.memgine.lock().await;
2451 Ok(Value::from(
2452 engine.build_context_for_model(query, model_context_window),
2453 ))
2454}
2455
2456async fn handle_memory_build_context_fast(
2462 req: &JsonRpcMessage,
2463 session: &crate::session::ClientSession,
2464) -> Result<Value, String> {
2465 let query = req
2466 .params
2467 .get("query")
2468 .and_then(|v| v.as_str())
2469 .unwrap_or("");
2470 let model_context_window = req
2471 .params
2472 .get("model_context_window")
2473 .and_then(|v| v.as_u64())
2474 .map(|w| w as usize);
2475 let mut engine = session.memgine.lock().await;
2476 Ok(Value::from(engine.build_context_with_options(
2477 query,
2478 model_context_window,
2479 car_memgine::ContextMode::Fast,
2480 None,
2481 )))
2482}
2483
2484async fn handle_memory_persist(
2500 req: &JsonRpcMessage,
2501 session: &crate::session::ClientSession,
2502) -> Result<Value, String> {
2503 let path = req
2504 .params
2505 .get("path")
2506 .and_then(|v| v.as_str())
2507 .ok_or("missing path")?;
2508 let resolved = car_ffi_common::memory_path::resolve(path)
2509 .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2510 let engine = session.memgine.lock().await;
2511 let facts: Vec<Value> = engine
2512 .graph
2513 .inner
2514 .node_indices()
2515 .filter_map(|nix| {
2516 let node = engine.graph.inner.node_weight(nix)?;
2517 if !node.is_valid() {
2518 return None;
2519 }
2520 if node.kind == car_memgine::MemKind::Identity
2521 || node.kind == car_memgine::MemKind::Environment
2522 {
2523 return None;
2524 }
2525 Some(serde_json::json!({
2526 "subject": node.key,
2527 "body": node.value,
2528 "kind": match node.kind {
2529 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2530 car_memgine::MemKind::Conversation => "outcome",
2531 _ => "pattern",
2532 },
2533 "confidence": 0.5,
2534 "content_type": node.content_type.as_label(),
2535 }))
2536 })
2537 .collect();
2538 let count = facts.len();
2539 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2540 std::fs::write(&resolved, json)
2541 .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2542 Ok(Value::from(count as u64))
2543}
2544
2545async fn handle_memory_load(
2551 req: &JsonRpcMessage,
2552 session: &crate::session::ClientSession,
2553) -> Result<Value, String> {
2554 let path = req
2555 .params
2556 .get("path")
2557 .and_then(|v| v.as_str())
2558 .ok_or("missing path")?;
2559 let resolved = car_ffi_common::memory_path::resolve(path)
2560 .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2561 let content = std::fs::read_to_string(&resolved)
2562 .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2563 let facts: Vec<Value> =
2564 serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2565 let mut engine = session.memgine.lock().await;
2566 engine.reset();
2567 let mut count: u32 = 0;
2568 for fact in &facts {
2569 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2570 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2571 let kind = fact
2572 .get("kind")
2573 .and_then(|v| v.as_str())
2574 .unwrap_or("pattern");
2575 let fid = format!("loaded-{}", count);
2576 engine.ingest_fact(
2577 &fid,
2578 subject,
2579 body,
2580 "user",
2581 "peer",
2582 chrono::Utc::now(),
2583 "global",
2584 None,
2585 vec![],
2586 kind == "constraint",
2587 );
2588 count += 1;
2589 }
2590 Ok(Value::from(count))
2591}
2592
2593async fn handle_skill_ingest(
2596 req: &JsonRpcMessage,
2597 session: &crate::session::ClientSession,
2598) -> Result<Value, String> {
2599 let name = req
2600 .params
2601 .get("name")
2602 .and_then(|v| v.as_str())
2603 .ok_or("missing name")?;
2604 let code = req
2605 .params
2606 .get("code")
2607 .and_then(|v| v.as_str())
2608 .ok_or("missing code")?;
2609 let platform = req
2610 .params
2611 .get("platform")
2612 .and_then(|v| v.as_str())
2613 .unwrap_or("unknown");
2614 let persona = req
2615 .params
2616 .get("persona")
2617 .and_then(|v| v.as_str())
2618 .unwrap_or("");
2619 let url_pattern = req
2620 .params
2621 .get("url_pattern")
2622 .and_then(|v| v.as_str())
2623 .unwrap_or("");
2624 let description = req
2625 .params
2626 .get("description")
2627 .and_then(|v| v.as_str())
2628 .unwrap_or("");
2629 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2630 let keywords: Vec<String> = req
2631 .params
2632 .get("task_keywords")
2633 .and_then(|v| v.as_array())
2634 .map(|arr| {
2635 arr.iter()
2636 .filter_map(|v| v.as_str().map(String::from))
2637 .collect()
2638 })
2639 .unwrap_or_default();
2640
2641 let trigger = car_memgine::SkillTrigger {
2642 persona: persona.into(),
2643 url_pattern: url_pattern.into(),
2644 task_keywords: keywords,
2645 structured: None,
2646 };
2647 let mut engine = session.memgine.lock().await;
2648 let node = engine.ingest_skill(
2649 name,
2650 code,
2651 platform,
2652 trigger,
2653 description,
2654 supersedes,
2655 vec![],
2656 vec![],
2657 );
2658 Ok(Value::from(node.index() as u64))
2659}
2660
2661async fn handle_skill_find(
2662 req: &JsonRpcMessage,
2663 session: &crate::session::ClientSession,
2664) -> Result<Value, String> {
2665 let persona = req
2666 .params
2667 .get("persona")
2668 .and_then(|v| v.as_str())
2669 .unwrap_or("");
2670 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2671 let task = req
2672 .params
2673 .get("task")
2674 .and_then(|v| v.as_str())
2675 .unwrap_or("");
2676 let max = req
2677 .params
2678 .get("max_results")
2679 .and_then(|v| v.as_u64())
2680 .unwrap_or(1) as usize;
2681 let engine = session.memgine.lock().await;
2682 let results = engine.find_skill(persona, url, task, max);
2683 let json: Vec<Value> = results
2684 .iter()
2685 .map(|(m, s)| {
2686 serde_json::json!({
2687 "name": m.name, "code": m.code, "platform": m.platform,
2688 "description": m.description, "stats": m.stats, "match_score": s,
2689 })
2690 })
2691 .collect();
2692 serde_json::to_value(json).map_err(|e| e.to_string())
2693}
2694
2695async fn handle_skill_report(
2696 req: &JsonRpcMessage,
2697 session: &crate::session::ClientSession,
2698) -> Result<Value, String> {
2699 let name = req
2700 .params
2701 .get("skill_name")
2702 .and_then(|v| v.as_str())
2703 .ok_or("missing skill_name")?;
2704 let outcome_str = req
2705 .params
2706 .get("outcome")
2707 .and_then(|v| v.as_str())
2708 .ok_or("missing outcome")?;
2709 let outcome = match outcome_str {
2710 "success" => car_memgine::SkillOutcome::Success,
2711 _ => car_memgine::SkillOutcome::Fail,
2712 };
2713 let mut engine = session.memgine.lock().await;
2714 let stats = engine
2715 .report_outcome(name, outcome)
2716 .ok_or(format!("skill '{}' not found", name))?;
2717 serde_json::to_value(stats).map_err(|e| e.to_string())
2718}
2719
2720struct WsAgentRunner {
2729 channel: Arc<WsChannel>,
2730 host: Arc<crate::host::HostState>,
2731 client_id: String,
2732}
2733
2734#[async_trait::async_trait]
2735impl car_multi::AgentRunner for WsAgentRunner {
2736 async fn run(
2737 &self,
2738 spec: &car_multi::AgentSpec,
2739 task: &str,
2740 _runtime: &car_engine::Runtime,
2741 _mailbox: &car_multi::Mailbox,
2742 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2743 use futures::SinkExt;
2744
2745 let request_id = self.channel.next_request_id();
2746 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2747 let agent = self
2748 .host
2749 .register_agent(
2750 &self.client_id,
2751 RegisterHostAgentRequest {
2752 id: Some(agent_id.clone()),
2753 name: spec.name.clone(),
2754 kind: "callback".to_string(),
2755 capabilities: spec.tools.clone(),
2756 project: spec
2757 .metadata
2758 .get("project")
2759 .and_then(|v| v.as_str())
2760 .map(str::to_string),
2761 pid: None,
2762 display: serde_json::from_value(
2763 spec.metadata
2764 .get("display")
2765 .cloned()
2766 .unwrap_or(serde_json::Value::Null),
2767 )
2768 .unwrap_or_default(),
2769 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2770 },
2771 )
2772 .await
2773 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2774 let _ = self
2775 .host
2776 .set_status(
2777 &self.client_id,
2778 SetHostAgentStatusRequest {
2779 agent_id: agent.id.clone(),
2780 status: HostAgentStatus::Running,
2781 current_task: Some(task.to_string()),
2782 message: Some(format!("{} started", spec.name)),
2783 payload: serde_json::json!({ "task": task }),
2784 },
2785 )
2786 .await;
2787
2788 let rpc_request = serde_json::json!({
2789 "jsonrpc": "2.0",
2790 "method": "multi.run_agent",
2791 "params": {
2792 "spec": spec,
2793 "task": task,
2794 },
2795 "id": request_id,
2796 });
2797
2798 let (tx, rx) = tokio::sync::oneshot::channel();
2800 self.channel
2801 .pending
2802 .lock()
2803 .await
2804 .insert(request_id.clone(), tx);
2805
2806 let msg = Message::Text(
2807 serde_json::to_string(&rpc_request)
2808 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2809 .into(),
2810 );
2811 if let Err(e) = self.channel.write.lock().await.send(msg).await {
2812 let _ = self
2813 .host
2814 .set_status(
2815 &self.client_id,
2816 SetHostAgentStatusRequest {
2817 agent_id: agent_id.clone(),
2818 status: HostAgentStatus::Errored,
2819 current_task: None,
2820 message: Some(format!("{} failed to start", spec.name)),
2821 payload: serde_json::json!({ "error": e.to_string() }),
2822 },
2823 )
2824 .await;
2825 return Err(car_multi::MultiError::AgentFailed(
2826 spec.name.clone(),
2827 format!("ws send error: {}", e),
2828 ));
2829 }
2830
2831 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2833 Ok(Ok(response)) => response,
2834 Ok(Err(_)) => {
2835 let _ = self
2836 .host
2837 .set_status(
2838 &self.client_id,
2839 SetHostAgentStatusRequest {
2840 agent_id: agent_id.clone(),
2841 status: HostAgentStatus::Errored,
2842 current_task: None,
2843 message: Some(format!("{} callback channel closed", spec.name)),
2844 payload: Value::Null,
2845 },
2846 )
2847 .await;
2848 return Err(car_multi::MultiError::AgentFailed(
2849 spec.name.clone(),
2850 "agent callback channel closed".into(),
2851 ));
2852 }
2853 Err(_) => {
2854 let _ = self
2855 .host
2856 .set_status(
2857 &self.client_id,
2858 SetHostAgentStatusRequest {
2859 agent_id: agent_id.clone(),
2860 status: HostAgentStatus::Errored,
2861 current_task: None,
2862 message: Some(format!("{} timed out", spec.name)),
2863 payload: Value::Null,
2864 },
2865 )
2866 .await;
2867 return Err(car_multi::MultiError::AgentFailed(
2868 spec.name.clone(),
2869 "agent callback timed out (300s)".into(),
2870 ));
2871 }
2872 };
2873
2874 if let Some(err) = response.error {
2875 let _ = self
2876 .host
2877 .set_status(
2878 &self.client_id,
2879 SetHostAgentStatusRequest {
2880 agent_id: agent_id.clone(),
2881 status: HostAgentStatus::Errored,
2882 current_task: None,
2883 message: Some(format!("{} errored", spec.name)),
2884 payload: serde_json::json!({ "error": err }),
2885 },
2886 )
2887 .await;
2888 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2889 }
2890
2891 let output_value = response.output.unwrap_or(Value::Null);
2892 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2893 car_multi::MultiError::AgentFailed(
2894 spec.name.clone(),
2895 format!("invalid AgentOutput: {}", e),
2896 )
2897 })?;
2898 let status = if output.error.is_some() {
2899 HostAgentStatus::Errored
2900 } else {
2901 HostAgentStatus::Completed
2902 };
2903 let message = if output.error.is_some() {
2904 format!("{} errored", spec.name)
2905 } else {
2906 format!("{} completed", spec.name)
2907 };
2908 let _ = self
2909 .host
2910 .set_status(
2911 &self.client_id,
2912 SetHostAgentStatusRequest {
2913 agent_id,
2914 status,
2915 current_task: None,
2916 message: Some(message),
2917 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2918 },
2919 )
2920 .await;
2921
2922 Ok(output)
2923 }
2924}
2925
2926fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
2927 let safe_name: String = name
2928 .chars()
2929 .map(|c| {
2930 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
2931 c
2932 } else {
2933 '-'
2934 }
2935 })
2936 .collect();
2937 format!("{}:{}:{}", client_id, safe_name, request_id)
2938}
2939
2940fn multi_infra_with_budget(req: &JsonRpcMessage) -> Result<car_multi::SharedInfra, String> {
2945 let infra = car_multi::SharedInfra::new();
2946 match req.params.get("budget") {
2947 None | Some(Value::Null) => Ok(infra),
2948 Some(v) => {
2949 let limits: car_multi::BudgetLimits =
2950 serde_json::from_value(v.clone()).map_err(|e| format!("invalid budget: {}", e))?;
2951 Ok(infra.with_budget(limits))
2952 }
2953 }
2954}
2955
2956async fn handle_multi_swarm(
2957 req: &JsonRpcMessage,
2958 session: &crate::session::ClientSession,
2959) -> Result<Value, String> {
2960 let mode_str = req
2961 .params
2962 .get("mode")
2963 .and_then(|v| v.as_str())
2964 .ok_or("missing 'mode'")?;
2965 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2966 let task = req
2967 .params
2968 .get("task")
2969 .and_then(|v| v.as_str())
2970 .ok_or("missing 'task'")?;
2971
2972 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2973 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2974 let agent_specs: Vec<car_multi::AgentSpec> =
2975 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2976 let synth: Option<car_multi::AgentSpec> = req
2977 .params
2978 .get("synthesizer")
2979 .map(|v| serde_json::from_value(v.clone()))
2980 .transpose()
2981 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2982
2983 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2984 channel: session.channel.clone(),
2985 host: session.host.clone(),
2986 client_id: session.client_id.clone(),
2987 });
2988 let infra = multi_infra_with_budget(req)?;
2989
2990 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2991 if let Some(s) = synth {
2992 swarm = swarm.with_synthesizer(s);
2993 }
2994
2995 let result = swarm
2996 .run(task, &runner, &infra)
2997 .await
2998 .map_err(|e| format!("swarm error: {}", e))?;
2999 serde_json::to_value(result).map_err(|e| e.to_string())
3000}
3001
3002async fn handle_multi_pipeline(
3003 req: &JsonRpcMessage,
3004 session: &crate::session::ClientSession,
3005) -> Result<Value, String> {
3006 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
3007 let task = req
3008 .params
3009 .get("task")
3010 .and_then(|v| v.as_str())
3011 .ok_or("missing 'task'")?;
3012
3013 let stage_specs: Vec<car_multi::AgentSpec> =
3014 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
3015
3016 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3017 channel: session.channel.clone(),
3018 host: session.host.clone(),
3019 client_id: session.client_id.clone(),
3020 });
3021 let infra = multi_infra_with_budget(req)?;
3022
3023 let result = car_multi::Pipeline::new(stage_specs)
3024 .run(task, &runner, &infra)
3025 .await
3026 .map_err(|e| format!("pipeline error: {}", e))?;
3027 serde_json::to_value(result).map_err(|e| e.to_string())
3028}
3029
3030async fn handle_multi_supervisor(
3031 req: &JsonRpcMessage,
3032 session: &crate::session::ClientSession,
3033) -> Result<Value, String> {
3034 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
3035 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
3036 let task = req
3037 .params
3038 .get("task")
3039 .and_then(|v| v.as_str())
3040 .ok_or("missing 'task'")?;
3041 let max_rounds = req
3042 .params
3043 .get("max_rounds")
3044 .and_then(|v| v.as_u64())
3045 .unwrap_or(3) as u32;
3046
3047 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
3048 .map_err(|e| format!("invalid workers: {}", e))?;
3049 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
3050 .map_err(|e| format!("invalid supervisor: {}", e))?;
3051
3052 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3053 channel: session.channel.clone(),
3054 host: session.host.clone(),
3055 client_id: session.client_id.clone(),
3056 });
3057 let infra = multi_infra_with_budget(req)?;
3058
3059 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
3060 .with_max_rounds(max_rounds)
3061 .run(task, &runner, &infra)
3062 .await
3063 .map_err(|e| format!("supervisor error: {}", e))?;
3064 serde_json::to_value(result).map_err(|e| e.to_string())
3065}
3066
3067async fn handle_multi_map_reduce(
3068 req: &JsonRpcMessage,
3069 session: &crate::session::ClientSession,
3070) -> Result<Value, String> {
3071 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
3072 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
3073 let task = req
3074 .params
3075 .get("task")
3076 .and_then(|v| v.as_str())
3077 .ok_or("missing 'task'")?;
3078 let items_val = req.params.get("items").ok_or("missing 'items'")?;
3079
3080 let mapper_spec: car_multi::AgentSpec =
3081 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
3082 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
3083 .map_err(|e| format!("invalid reducer: {}", e))?;
3084 let items: Vec<String> =
3085 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
3086
3087 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3088 channel: session.channel.clone(),
3089 host: session.host.clone(),
3090 client_id: session.client_id.clone(),
3091 });
3092 let infra = multi_infra_with_budget(req)?;
3093
3094 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
3095 .run(task, &items, &runner, &infra)
3096 .await
3097 .map_err(|e| format!("map_reduce error: {}", e))?;
3098 serde_json::to_value(result).map_err(|e| e.to_string())
3099}
3100
3101async fn handle_multi_vote(
3102 req: &JsonRpcMessage,
3103 session: &crate::session::ClientSession,
3104) -> Result<Value, String> {
3105 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
3106 let task = req
3107 .params
3108 .get("task")
3109 .and_then(|v| v.as_str())
3110 .ok_or("missing 'task'")?;
3111
3112 let agent_specs: Vec<car_multi::AgentSpec> =
3113 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
3114 let synth: Option<car_multi::AgentSpec> = req
3115 .params
3116 .get("synthesizer")
3117 .map(|v| serde_json::from_value(v.clone()))
3118 .transpose()
3119 .map_err(|e| format!("invalid synthesizer: {}", e))?;
3120
3121 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3122 channel: session.channel.clone(),
3123 host: session.host.clone(),
3124 client_id: session.client_id.clone(),
3125 });
3126 let infra = multi_infra_with_budget(req)?;
3127
3128 let mut vote = car_multi::Vote::new(agent_specs);
3129 if let Some(s) = synth {
3130 vote = vote.with_synthesizer(s);
3131 }
3132
3133 let result = vote
3134 .run(task, &runner, &infra)
3135 .await
3136 .map_err(|e| format!("vote error: {}", e))?;
3137 serde_json::to_value(result).map_err(|e| e.to_string())
3138}
3139
3140async fn handle_multi_tournament(
3141 req: &JsonRpcMessage,
3142 session: &crate::session::ClientSession,
3143) -> Result<Value, String> {
3144 let competitors_val = req.params.get("competitors").ok_or("missing 'competitors'")?;
3145 let judge_val = req.params.get("judge").ok_or("missing 'judge'")?;
3146 let task = req
3147 .params
3148 .get("task")
3149 .and_then(|v| v.as_str())
3150 .ok_or("missing 'task'")?;
3151
3152 let competitors: Vec<car_multi::AgentSpec> = serde_json::from_value(competitors_val.clone())
3153 .map_err(|e| format!("invalid competitors: {}", e))?;
3154 let judge: car_multi::AgentSpec =
3155 serde_json::from_value(judge_val.clone()).map_err(|e| format!("invalid judge: {}", e))?;
3156
3157 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3158 channel: session.channel.clone(),
3159 host: session.host.clone(),
3160 client_id: session.client_id.clone(),
3161 });
3162 let infra = multi_infra_with_budget(req)?;
3163
3164 let result = car_multi::Tournament::new(competitors, judge)
3165 .run(task, &runner, &infra)
3166 .await
3167 .map_err(|e| format!("tournament error: {}", e))?;
3168 serde_json::to_value(result).map_err(|e| e.to_string())
3169}
3170
3171async fn handle_multi_subtask(
3172 req: &JsonRpcMessage,
3173 session: &crate::session::ClientSession,
3174) -> Result<Value, String> {
3175 let main_val = req.params.get("main").ok_or("missing 'main'")?;
3176 let task = req
3177 .params
3178 .get("task")
3179 .and_then(|v| v.as_str())
3180 .ok_or("missing 'task'")?;
3181
3182 let main_spec: car_multi::AgentSpec =
3183 serde_json::from_value(main_val.clone()).map_err(|e| format!("invalid main: {}", e))?;
3184
3185 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3186 channel: session.channel.clone(),
3187 host: session.host.clone(),
3188 client_id: session.client_id.clone(),
3189 });
3190 let infra = multi_infra_with_budget(req)?;
3191
3192 let result = car_multi::SpawnSubtask::new(main_spec)
3193 .run(task, &runner, &infra)
3194 .await
3195 .map_err(|e| format!("spawn_subtask error: {}", e))?;
3196 serde_json::to_value(result).map_err(|e| e.to_string())
3197}
3198
3199fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
3204 let name = req
3205 .params
3206 .get("name")
3207 .and_then(|v| v.as_str())
3208 .ok_or("scheduler.create requires 'name'")?;
3209 let prompt = req
3210 .params
3211 .get("prompt")
3212 .and_then(|v| v.as_str())
3213 .ok_or("scheduler.create requires 'prompt'")?;
3214
3215 let mut task = car_scheduler::Task::new(name, prompt);
3216
3217 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
3218 let trigger = match t {
3219 "once" => car_scheduler::TaskTrigger::Once,
3220 "cron" => car_scheduler::TaskTrigger::Cron,
3221 "interval" => car_scheduler::TaskTrigger::Interval,
3222 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
3223 _ => car_scheduler::TaskTrigger::Manual,
3224 };
3225 let schedule = req
3226 .params
3227 .get("schedule")
3228 .and_then(|v| v.as_str())
3229 .unwrap_or("");
3230 task = task.with_trigger(trigger, schedule);
3231 }
3232
3233 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
3234 task = task.with_system_prompt(sp);
3235 }
3236
3237 serde_json::to_value(&task).map_err(|e| e.to_string())
3238}
3239
3240async fn handle_scheduler_run(
3241 req: &JsonRpcMessage,
3242 session: &crate::session::ClientSession,
3243) -> Result<Value, String> {
3244 let task_val = req
3245 .params
3246 .get("task")
3247 .ok_or("scheduler.run requires 'task'")?;
3248 let mut task: car_scheduler::Task =
3249 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3250
3251 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3252 channel: session.channel.clone(),
3253 host: session.host.clone(),
3254 client_id: session.client_id.clone(),
3255 });
3256 let executor = car_scheduler::Executor::new(runner);
3257 let execution = executor.run_once(&mut task).await;
3258
3259 serde_json::to_value(&execution).map_err(|e| e.to_string())
3260}
3261
3262async fn handle_scheduler_run_loop(
3263 req: &JsonRpcMessage,
3264 session: &crate::session::ClientSession,
3265) -> Result<Value, String> {
3266 let task_val = req
3267 .params
3268 .get("task")
3269 .ok_or("scheduler.run_loop requires 'task'")?;
3270 let mut task: car_scheduler::Task =
3271 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3272 let max_iterations = req
3273 .params
3274 .get("max_iterations")
3275 .and_then(|v| v.as_u64())
3276 .map(|v| v as u32);
3277
3278 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3279 channel: session.channel.clone(),
3280 host: session.host.clone(),
3281 client_id: session.client_id.clone(),
3282 });
3283 let executor = car_scheduler::Executor::new(runner);
3284 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
3285 let executions = executor
3286 .run_loop(&mut task, max_iterations, cancel_rx)
3287 .await;
3288
3289 serde_json::to_value(&executions).map_err(|e| e.to_string())
3290}
3291
3292fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
3297 state.inference.get_or_init(|| {
3298 Arc::new(car_inference::InferenceEngine::new(
3299 car_inference::InferenceConfig::default(),
3300 ))
3301 })
3302}
3303
3304async fn handle_infer(
3305 msg: &JsonRpcMessage,
3306 state: &ServerState,
3307 session: &crate::session::ClientSession,
3308) -> Result<Value, String> {
3309 let engine = get_inference_engine(state);
3310 let mut req: car_inference::GenerateRequest =
3311 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3312
3313 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3315 let mut memgine = session.memgine.lock().await;
3316 let ctx = memgine.build_context(cq);
3317 if !ctx.is_empty() {
3318 req.context = Some(ctx);
3319 }
3320 }
3321
3322 let _permit = state.admission.acquire().await;
3328
3329 let result = engine
3340 .generate_tracked(req)
3341 .await
3342 .map_err(|e| e.to_string())?;
3343 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3344}
3345
3346async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3376 let engine = get_inference_engine(state);
3377 let req: car_inference::GenerateImageRequest =
3378 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3379 let _permit = state.admission.acquire().await;
3382 let result = engine
3383 .generate_image(req)
3384 .await
3385 .map_err(|e| e.to_string())?;
3386 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3387}
3388
3389async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3390 let engine = get_inference_engine(state);
3391 let req: car_inference::GenerateVideoRequest =
3392 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3393 let _permit = state.admission.acquire().await;
3394 let result = engine
3395 .generate_video(req)
3396 .await
3397 .map_err(|e| e.to_string())?;
3398 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3399}
3400
3401async fn handle_infer_stream(
3402 msg: &JsonRpcMessage,
3403 session: &crate::session::ClientSession,
3404 state: &ServerState,
3405) -> Result<Value, String> {
3406 use futures::SinkExt;
3407 use tokio_tungstenite::tungstenite::Message;
3408
3409 let engine = get_inference_engine(state);
3410 let mut req: car_inference::GenerateRequest =
3411 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3412
3413 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3416 let mut memgine = session.memgine.lock().await;
3417 let ctx = memgine.build_context(cq);
3418 if !ctx.is_empty() {
3419 req.context = Some(ctx);
3420 }
3421 }
3422
3423 let _permit = state.admission.acquire().await;
3424 let mut rx = engine
3425 .generate_tracked_stream(req)
3426 .await
3427 .map_err(|e| e.to_string())?;
3428
3429 let mut accumulator = car_inference::StreamAccumulator::default();
3430 let request_id = msg.id.clone();
3431
3432 while let Some(event) = rx.recv().await {
3433 let event_payload = match &event {
3434 car_inference::StreamEvent::TextDelta(text) => {
3435 serde_json::json!({"type": "text", "data": text})
3436 }
3437 car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3438 serde_json::json!({"type": "tool_start", "name": name, "index": index})
3439 }
3440 car_inference::StreamEvent::ToolCallDelta {
3441 index,
3442 arguments_delta,
3443 } => serde_json::json!({
3444 "type": "tool_delta",
3445 "index": index,
3446 "data": arguments_delta,
3447 }),
3448 car_inference::StreamEvent::Usage {
3449 input_tokens,
3450 output_tokens,
3451 } => serde_json::json!({
3452 "type": "usage",
3453 "input_tokens": input_tokens,
3454 "output_tokens": output_tokens,
3455 }),
3456 car_inference::StreamEvent::Done { .. } => {
3461 accumulator.push(&event);
3462 continue;
3463 }
3464 };
3465
3466 let notif = serde_json::json!({
3467 "jsonrpc": "2.0",
3468 "method": "inference.stream.event",
3469 "params": {
3470 "request_id": request_id,
3471 "event": event_payload,
3472 },
3473 });
3474 if let Ok(text) = serde_json::to_string(¬if) {
3475 let _ = session
3476 .channel
3477 .write
3478 .lock()
3479 .await
3480 .send(Message::Text(text.into()))
3481 .await;
3482 }
3483 accumulator.push(&event);
3484 }
3485
3486 let (text, tool_calls, usage) = accumulator.finish_with_usage();
3487 Ok(serde_json::json!({
3488 "text": text,
3489 "tool_calls": tool_calls,
3490 "usage": usage,
3491 }))
3492}
3493
3494async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3495 let engine = get_inference_engine(state);
3496 let req: car_inference::EmbedRequest =
3497 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3498 let _permit = state.admission.acquire().await;
3502 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3503 Ok(serde_json::json!({"embeddings": result}))
3504}
3505
3506async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3507 let engine = get_inference_engine(state);
3508 let req: car_inference::ClassifyRequest =
3509 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3510 let _permit = state.admission.acquire().await;
3511 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3512 Ok(serde_json::json!({"classifications": result}))
3513}
3514
3515fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3519 let total = state.admission.permits();
3520 let available = state.admission.permits_available();
3521 let in_use = total.saturating_sub(available);
3522 Ok(serde_json::json!({
3523 "permits_total": total,
3524 "permits_available": available,
3525 "permits_in_use": in_use,
3526 "env_override": crate::admission::ENV_MAX_CONCURRENT,
3527 }))
3528}
3529
3530async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3531 let model = msg
3532 .params
3533 .get("model")
3534 .and_then(|v| v.as_str())
3535 .ok_or("missing 'model' parameter")?;
3536 let text = msg
3537 .params
3538 .get("text")
3539 .and_then(|v| v.as_str())
3540 .ok_or("missing 'text' parameter")?;
3541 let engine = get_inference_engine(state);
3542 let ids = engine
3543 .tokenize(model, text)
3544 .await
3545 .map_err(|e| e.to_string())?;
3546 Ok(serde_json::json!({"tokens": ids}))
3547}
3548
3549async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3550 let model = msg
3551 .params
3552 .get("model")
3553 .and_then(|v| v.as_str())
3554 .ok_or("missing 'model' parameter")?;
3555 let tokens: Vec<u32> = msg
3556 .params
3557 .get("tokens")
3558 .and_then(|v| v.as_array())
3559 .ok_or("missing 'tokens' parameter")?
3560 .iter()
3561 .map(|t| {
3562 t.as_u64()
3563 .and_then(|n| u32::try_from(n).ok())
3564 .ok_or_else(|| "tokens[] must be u32 values".to_string())
3565 })
3566 .collect::<Result<Vec<_>, _>>()?;
3567 let engine = get_inference_engine(state);
3568 let text = engine
3569 .detokenize(model, &tokens)
3570 .await
3571 .map_err(|e| e.to_string())?;
3572 Ok(serde_json::json!({"text": text}))
3573}
3574
3575async fn handle_models_register(
3594 req: &JsonRpcMessage,
3595 _state: &Arc<ServerState>,
3596) -> Result<Value, String> {
3597 let schema_value = match req.params.get("schema") {
3601 Some(v) => v.clone(),
3602 None => req.params.clone(),
3603 };
3604 let schema: car_inference::ModelSchema =
3605 serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3606 let id = schema.id.clone();
3607
3608 let home = std::env::var_os("HOME")
3613 .or_else(|| std::env::var_os("USERPROFILE"))
3614 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3615 let car_dir = std::path::PathBuf::from(home).join(".car");
3616 std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3617 let path = car_dir.join("models.json");
3618
3619 let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3620 let text =
3621 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3622 if text.trim().is_empty() {
3623 Vec::new()
3624 } else {
3625 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3626 }
3627 } else {
3628 Vec::new()
3629 };
3630 if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3632 *slot = schema;
3633 } else {
3634 models.push(schema);
3635 }
3636 let json =
3637 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3638 let tmp = path.with_extension("json.tmp");
3639 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3640 std::fs::rename(&tmp, &path)
3641 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3642 Ok(serde_json::json!({
3643 "id": id,
3644 "registered": true,
3645 "path": path.to_string_lossy(),
3646 "note": "Daemon restart required for live UnifiedRegistry visibility \
3647 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3648 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3649 }))
3650}
3651
3652async fn handle_models_unregister(
3663 req: &JsonRpcMessage,
3664 _state: &Arc<ServerState>,
3665) -> Result<Value, String> {
3666 let id = match req.params.get("id") {
3670 Some(v) => v
3671 .as_str()
3672 .ok_or_else(|| "`id` must be a string".to_string())?
3673 .to_string(),
3674 None => match req.params.as_str() {
3675 Some(s) => s.to_string(),
3676 None => return Err("missing `id` parameter".to_string()),
3677 },
3678 };
3679
3680 let home = std::env::var_os("HOME")
3681 .or_else(|| std::env::var_os("USERPROFILE"))
3682 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3683 let car_dir = std::path::PathBuf::from(home).join(".car");
3684 let path = car_dir.join("models.json");
3685
3686 if !path.exists() {
3687 return Err(format!(
3688 "no models.json at {} — nothing to unregister",
3689 path.display()
3690 ));
3691 }
3692 let text =
3693 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3694 let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3695 Vec::new()
3696 } else {
3697 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3698 };
3699 let before = models.len();
3700 models.retain(|m| m.id != id);
3701 if models.len() == before {
3702 return Err(format!("model {} not found in {}", id, path.display()));
3703 }
3704 let json =
3705 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3706 let tmp = path.with_extension("json.tmp");
3707 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3708 std::fs::rename(&tmp, &path)
3709 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3710 Ok(serde_json::json!({
3711 "id": id,
3712 "unregistered": true,
3713 "path": path.to_string_lossy(),
3714 "note": "Daemon restart required for live UnifiedRegistry visibility \
3715 (phase 1, matching models.register).",
3716 }))
3717}
3718
3719fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3720 let engine = get_inference_engine(state);
3721 let models = engine.list_models();
3722 serde_json::to_value(&models).map_err(|e| e.to_string())
3723}
3724
3725fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3726 let engine = get_inference_engine(state);
3727 let models = engine.list_models_unified();
3728 serde_json::to_value(&models).map_err(|e| e.to_string())
3729}
3730
3731#[derive(Debug, Deserialize)]
3732#[serde(rename_all = "camelCase")]
3733struct ModelSearchParams {
3734 #[serde(default)]
3735 query: Option<String>,
3736 #[serde(default)]
3737 capability: Option<car_inference::ModelCapability>,
3738 #[serde(default)]
3739 provider: Option<String>,
3740 #[serde(default)]
3741 local_only: bool,
3742 #[serde(default)]
3743 available_only: bool,
3744 #[serde(default)]
3745 limit: Option<usize>,
3746}
3747
3748#[derive(Debug, Serialize)]
3749#[serde(rename_all = "camelCase")]
3750struct ModelSearchEntry {
3751 #[serde(flatten)]
3752 info: car_inference::ModelInfo,
3753 family: String,
3754 version: String,
3755 tags: Vec<String>,
3756 pullable: bool,
3757 upgrade: Option<car_inference::ModelUpgrade>,
3758}
3759
3760#[derive(Debug, Serialize)]
3761#[serde(rename_all = "camelCase")]
3762struct ModelSearchResponse {
3763 models: Vec<ModelSearchEntry>,
3764 upgrades: Vec<car_inference::ModelUpgrade>,
3765 total: usize,
3766 available: usize,
3767 local: usize,
3768 remote: usize,
3769}
3770
3771fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3772 let params: ModelSearchParams =
3773 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3774 query: None,
3775 capability: None,
3776 provider: None,
3777 local_only: false,
3778 available_only: false,
3779 limit: None,
3780 });
3781 let engine = get_inference_engine(state);
3782 let upgrades = engine.available_model_upgrades();
3783 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3784 .iter()
3785 .cloned()
3786 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3787 .collect();
3788 let query = params
3789 .query
3790 .as_deref()
3791 .map(str::trim)
3792 .filter(|q| !q.is_empty())
3793 .map(|q| q.to_ascii_lowercase());
3794 let provider = params
3795 .provider
3796 .as_deref()
3797 .map(str::trim)
3798 .filter(|p| !p.is_empty())
3799 .map(|p| p.to_ascii_lowercase());
3800
3801 let mut entries: Vec<ModelSearchEntry> = engine
3802 .list_schemas()
3803 .into_iter()
3804 .filter(|schema| {
3805 if let Some(capability) = params.capability {
3806 if !schema.has_capability(capability) {
3807 return false;
3808 }
3809 }
3810 if let Some(provider) = provider.as_deref() {
3811 if schema.provider.to_ascii_lowercase() != provider {
3812 return false;
3813 }
3814 }
3815 if params.local_only && !schema.is_local() {
3816 return false;
3817 }
3818 if params.available_only && !schema.available {
3819 return false;
3820 }
3821 if let Some(query) = query.as_deref() {
3822 let capability_text = schema
3823 .capabilities
3824 .iter()
3825 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3826 .collect::<Vec<_>>()
3827 .join(" ");
3828 let haystack = format!(
3829 "{} {} {} {} {} {}",
3830 schema.id,
3831 schema.name,
3832 schema.provider,
3833 schema.family,
3834 schema.tags.join(" "),
3835 capability_text
3836 )
3837 .to_ascii_lowercase();
3838 if !haystack.contains(query) {
3839 return false;
3840 }
3841 }
3842 true
3843 })
3844 .map(|schema| {
3845 let pullable = !schema.available
3846 && matches!(
3847 schema.source,
3848 car_inference::ModelSource::Local { .. }
3849 | car_inference::ModelSource::Mlx { .. }
3850 );
3851 let info = car_inference::ModelInfo::from(&schema);
3852 let upgrade = upgrades_by_from.get(&schema.id).cloned();
3853 ModelSearchEntry {
3854 info,
3855 family: schema.family,
3856 version: schema.version,
3857 tags: schema.tags,
3858 pullable,
3859 upgrade,
3860 }
3861 })
3862 .collect();
3863 entries.sort_by(|a, b| {
3864 b.info
3865 .available
3866 .cmp(&a.info.available)
3867 .then(b.info.is_local.cmp(&a.info.is_local))
3868 .then(a.info.name.cmp(&b.info.name))
3869 });
3870 if let Some(limit) = params.limit {
3871 entries.truncate(limit);
3872 }
3873
3874 let total = entries.len();
3875 let available = entries.iter().filter(|entry| entry.info.available).count();
3876 let local = entries.iter().filter(|entry| entry.info.is_local).count();
3877 let response = ModelSearchResponse {
3878 models: entries,
3879 upgrades,
3880 total,
3881 available,
3882 local,
3883 remote: total.saturating_sub(local),
3884 };
3885 serde_json::to_value(response).map_err(|e| e.to_string())
3886}
3887
3888fn optional_enum_param<T: serde::de::DeserializeOwned>(
3892 req: &JsonRpcMessage,
3893 key: &str,
3894) -> Result<Option<T>, String> {
3895 match req.params.get(key) {
3896 None | Some(Value::Null) => Ok(None),
3897 Some(v) => serde_json::from_value(v.clone())
3898 .map(Some)
3899 .map_err(|_| format!("invalid '{key}': {v}")),
3900 }
3901}
3902
3903fn recommend_from_params(
3908 req: &JsonRpcMessage,
3909 engine: &car_inference::InferenceEngine,
3910) -> Result<car_inference::RecommendationSet, String> {
3911 let use_case = optional_enum_param::<car_inference::UseCase>(req, "use_case")?.unwrap_or_default();
3912 let tier = optional_enum_param::<car_inference::QualityTier>(req, "tier")?.unwrap_or_default();
3913 let privacy = if req
3914 .params
3915 .get("cloud_ok")
3916 .and_then(|v| v.as_bool())
3917 .unwrap_or(false)
3918 {
3919 car_inference::Privacy::CloudOk
3920 } else {
3921 car_inference::Privacy::OnDevice
3922 };
3923 let hw = car_inference::HardwareInfo::detect();
3924 let schemas = engine.list_schemas();
3925 let refs: Vec<&car_inference::ModelSchema> = schemas.iter().collect();
3926 Ok(car_inference::recommend(&refs, &hw, use_case, tier, privacy))
3927}
3928
3929fn handle_models_recommend(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3931 let engine = get_inference_engine(state);
3932 let set = recommend_from_params(req, engine)?;
3933 serde_json::to_value(set).map_err(|e| e.to_string())
3934}
3935
3936fn handle_models_setup_plan(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3939 let engine = get_inference_engine(state);
3940 let set = recommend_from_params(req, engine)?;
3941 let hw = car_inference::HardwareInfo::detect();
3942 let mut picks = set.picks.into_iter();
3943 let recommended = picks.next();
3944 let alternatives: Vec<_> = picks.collect();
3945 serde_json::to_value(serde_json::json!({
3946 "machine": describe_machine_for_plan(&hw),
3947 "recommended": recommended,
3948 "alternatives": alternatives,
3949 "needs_more_memory": set.not_enough_memory,
3950 "note": set.note,
3951 }))
3952 .map_err(|e| e.to_string())
3953}
3954
3955fn describe_machine_for_plan(hw: &car_inference::HardwareInfo) -> String {
3957 use car_inference::hardware::SupportedAcceleration::*;
3958 match hw.supported_acceleration() {
3959 Apple { unified_memory_mb } => format!(
3960 "Apple Silicon, {} GB unified memory (Metal)",
3961 unified_memory_mb / 1024
3962 ),
3963 Cuda { device_memory_mb } => match device_memory_mb {
3964 Some(mb) => format!("NVIDIA GPU, {} GB VRAM (CUDA)", mb / 1024),
3965 None => "NVIDIA GPU (CUDA)".to_string(),
3966 },
3967 UnsupportedDiscreteGpu { name, .. } => format!(
3968 "{} GB RAM, CPU inference ({name} not yet supported)",
3969 hw.total_ram_mb / 1024
3970 ),
3971 Cpu => format!("{} GB RAM, CPU inference", hw.total_ram_mb / 1024),
3972 }
3973}
3974
3975fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3976 let engine = get_inference_engine(state);
3977 serde_json::to_value(serde_json::json!({
3978 "upgrades": engine.available_model_upgrades()
3979 }))
3980 .map_err(|e| e.to_string())
3981}
3982
3983async fn handle_models_detect_upgrades(state: &ServerState) -> Result<Value, String> {
3986 let engine = get_inference_engine(state);
3987 let findings = engine.detect_upgrades().await;
3988 serde_json::to_value(serde_json::json!({ "upgrades": findings })).map_err(|e| e.to_string())
3989}
3990
3991async fn handle_models_check_upgrade_nudge(
3995 req: &JsonRpcMessage,
3996 state: &ServerState,
3997) -> Result<Value, String> {
3998 let engine = get_inference_engine(state);
3999 let inference_active = req
4000 .params
4001 .get("inference_active")
4002 .and_then(|v| v.as_bool())
4003 .unwrap_or(false);
4004 let (decision, _state) = engine.check_upgrade_nudge(inference_active).await;
4005 serde_json::to_value(decision).map_err(|e| e.to_string())
4006}
4007
4008fn handle_models_dismiss_upgrade(
4010 req: &JsonRpcMessage,
4011 state: &ServerState,
4012) -> Result<Value, String> {
4013 let key = req
4014 .params
4015 .get("dismiss_key")
4016 .and_then(|v| v.as_str())
4017 .map(str::trim)
4018 .filter(|s| !s.is_empty())
4019 .ok_or("missing or empty 'dismiss_key' parameter")?;
4020 let engine = get_inference_engine(state);
4021 engine.dismiss_upgrade_nudge(key).map_err(|e| e.to_string())?;
4022 Ok(serde_json::json!({ "dismissed": key }))
4023}
4024
4025fn handle_models_update_prefs_get(state: &ServerState) -> Result<Value, String> {
4027 let engine = get_inference_engine(state);
4028 serde_json::to_value(engine.update_prefs()).map_err(|e| e.to_string())
4029}
4030
4031fn handle_models_update_prefs_set(
4034 req: &JsonRpcMessage,
4035 state: &ServerState,
4036) -> Result<Value, String> {
4037 let prefs: car_inference::UpdatePreferences =
4038 serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid preferences: {e}"))?;
4039 let engine = get_inference_engine(state);
4040 engine.set_update_prefs(&prefs).map_err(|e| e.to_string())?;
4041 serde_json::to_value(prefs).map_err(|e| e.to_string())
4042}
4043
4044pub async fn run_upgrade_nudge_check(state: &Arc<ServerState>) {
4050 let engine = get_inference_engine(state);
4051 let inference_active = state.admission.in_flight() > 0;
4054 let (decision, mut nstate) = engine.check_upgrade_nudge(inference_active).await;
4055 if let Some(nudge) = decision.nudge {
4056 let delivered = broadcast_upgrade_nudge(state, &nudge).await;
4057 if delivered > 0 {
4061 let now = std::time::SystemTime::now()
4062 .duration_since(std::time::UNIX_EPOCH)
4063 .map(|d| d.as_secs())
4064 .unwrap_or(0);
4065 nstate.last_nudge_secs = now;
4066 let _ = nstate.save_to(&car_inference::NudgeState::default_path());
4067 }
4068 }
4069}
4070
4071pub async fn broadcast_upgrade_nudge(
4077 state: &Arc<ServerState>,
4078 nudge: &car_inference::UpgradeNudge,
4079) -> usize {
4080 use futures::SinkExt;
4081 use tokio_tungstenite::tungstenite::Message;
4082 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
4083 .a2ui_subscribers
4084 .lock()
4085 .await
4086 .values()
4087 .cloned()
4088 .collect();
4089 if subscribers.is_empty() {
4090 return 0;
4091 }
4092 let Ok(json) = serde_json::to_string(&serde_json::json!({
4093 "jsonrpc": "2.0",
4094 "method": "models.upgrade_available",
4095 "params": nudge,
4096 })) else {
4097 return 0;
4098 };
4099 let mut delivered = 0;
4100 for channel in subscribers {
4101 if channel
4102 .write
4103 .lock()
4104 .await
4105 .send(Message::Text(json.clone().into()))
4106 .await
4107 .is_ok()
4108 {
4109 delivered += 1;
4110 }
4111 }
4112 delivered
4113}
4114
4115struct PullProgressSink {
4119 tx: tokio::sync::mpsc::UnboundedSender<car_inference::DownloadEvent>,
4120}
4121
4122impl car_inference::DownloadProgress for PullProgressSink {
4123 fn on_event(&self, event: &car_inference::DownloadEvent) {
4124 let _ = self.tx.send(event.clone());
4126 }
4127}
4128
4129async fn handle_models_pull(msg: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4130 let name = msg
4131 .params
4132 .get("name")
4133 .or_else(|| msg.params.get("id"))
4134 .or_else(|| msg.params.get("model"))
4135 .and_then(|v| v.as_str())
4136 .ok_or("missing 'name' parameter")?
4137 .to_string();
4138 let engine = get_inference_engine(state);
4139
4140 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<car_inference::DownloadEvent>();
4146 let sink = car_inference::ProgressSink::new(Arc::new(PullProgressSink { tx }));
4147 let broadcaster_state = state.clone();
4148 let model_label = name.clone();
4149 let broadcaster = tokio::spawn(async move {
4150 while let Some(event) = rx.recv().await {
4151 broadcast_pull_progress(&broadcaster_state, &model_label, &event).await;
4152 }
4153 });
4154
4155 let result = engine.pull_model_with_progress(&name, &sink).await;
4156 drop(sink);
4158 if let Err(e) = broadcaster.await {
4160 tracing::warn!(error = %e, "pull-progress broadcaster task failed");
4161 }
4162
4163 let path = result.map_err(|e| e.to_string())?;
4164 Ok(serde_json::json!({"path": path.display().to_string()}))
4165}
4166
4167async fn broadcast_pull_progress(
4170 state: &Arc<ServerState>,
4171 model: &str,
4172 event: &car_inference::DownloadEvent,
4173) {
4174 use futures::SinkExt;
4175 use tokio_tungstenite::tungstenite::Message;
4176 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
4177 .a2ui_subscribers
4178 .lock()
4179 .await
4180 .values()
4181 .cloned()
4182 .collect();
4183 if subscribers.is_empty() {
4184 return;
4185 }
4186 let Ok(json) = serde_json::to_string(&serde_json::json!({
4187 "jsonrpc": "2.0",
4188 "method": "models.pull_progress",
4189 "params": { "model": model, "event": event },
4190 })) else {
4191 return;
4192 };
4193 for channel in subscribers {
4194 let _ = channel
4195 .write
4196 .lock()
4197 .await
4198 .send(Message::Text(json.clone().into()))
4199 .await;
4200 }
4201}
4202
4203async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4204 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
4205 msg.params
4206 .get("events")
4207 .cloned()
4208 .unwrap_or(msg.params.clone()),
4209 )
4210 .map_err(|e| format!("invalid events: {}", e))?;
4211
4212 let inference = get_inference_engine(state).clone();
4213 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
4214
4215 let skills = engine.distill_skills(&events).await;
4216 serde_json::to_value(&skills).map_err(|e| e.to_string())
4217}
4218
4219async fn handle_memory_consolidate(
4223 session: &crate::session::ClientSession,
4224) -> Result<Value, String> {
4225 let engine_arc = session.effective_memgine().await;
4226 let report = {
4227 let mut engine = engine_arc.lock().await;
4228 engine.consolidate().await
4229 };
4230 if let Some(id) = session.agent_id.lock().await.clone() {
4231 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
4232 tracing::warn!(agent_id = %id, error = %e,
4233 "agent memgine persist after consolidate failed");
4234 }
4235 }
4236 serde_json::to_value(&report).map_err(|e| e.to_string())
4237}
4238
4239async fn handle_skill_repair(
4243 msg: &JsonRpcMessage,
4244 session: &crate::session::ClientSession,
4245) -> Result<Value, String> {
4246 let name = msg
4247 .params
4248 .get("skill_name")
4249 .and_then(|v| v.as_str())
4250 .ok_or("missing 'skill_name' parameter")?;
4251 let mut engine = session.memgine.lock().await;
4252 let code = engine.repair_skill(name).await;
4253 Ok(match code {
4254 Some(c) => serde_json::json!({ "code": c }),
4255 None => Value::Null,
4256 })
4257}
4258
4259async fn handle_skills_ingest_distilled(
4262 msg: &JsonRpcMessage,
4263 session: &crate::session::ClientSession,
4264) -> Result<Value, String> {
4265 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
4266 msg.params
4267 .get("skills")
4268 .cloned()
4269 .unwrap_or(msg.params.clone()),
4270 )
4271 .map_err(|e| format!("invalid skills: {}", e))?;
4272 let mut engine = session.memgine.lock().await;
4273 let nodes = engine.ingest_distilled_skills(&skills);
4274 Ok(serde_json::json!({ "ingested": nodes.len() }))
4275}
4276
4277async fn handle_skills_evolve(
4280 msg: &JsonRpcMessage,
4281 session: &crate::session::ClientSession,
4282) -> Result<Value, String> {
4283 let domain = msg
4284 .params
4285 .get("domain")
4286 .and_then(|v| v.as_str())
4287 .ok_or("missing 'domain' parameter")?
4288 .to_string();
4289 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
4290 msg.params
4291 .get("events")
4292 .cloned()
4293 .unwrap_or(Value::Array(vec![])),
4294 )
4295 .map_err(|e| format!("invalid events: {}", e))?;
4296 let mut engine = session.memgine.lock().await;
4297 let skills = engine.evolve_skills(&events, &domain).await;
4298 serde_json::to_value(&skills).map_err(|e| e.to_string())
4299}
4300
4301async fn handle_skills_domains_needing_evolution(
4303 msg: &JsonRpcMessage,
4304 session: &crate::session::ClientSession,
4305) -> Result<Value, String> {
4306 let threshold = msg
4307 .params
4308 .get("threshold")
4309 .and_then(|v| v.as_f64())
4310 .unwrap_or(0.6);
4311 let engine = session.memgine.lock().await;
4312 let domains = engine.domains_needing_evolution(threshold);
4313 serde_json::to_value(&domains).map_err(|e| e.to_string())
4314}
4315
4316async fn handle_skills_ingest_provisional(
4322 msg: &JsonRpcMessage,
4323 session: &crate::session::ClientSession,
4324) -> Result<Value, String> {
4325 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
4326 msg.params
4327 .get("skills")
4328 .cloned()
4329 .unwrap_or(msg.params.clone()),
4330 )
4331 .map_err(|e| format!("invalid skills: {}", e))?;
4332 let tenant = msg.params.get("tenant").and_then(|v| v.as_str());
4333 let mut engine = session.memgine.lock().await;
4334 let ingested = engine.ingest_provisional_candidates(&skills, tenant);
4335 Ok(serde_json::json!({ "ingested": ingested }))
4336}
4337
4338async fn handle_skills_gate(
4343 _msg: &JsonRpcMessage,
4344 session: &crate::session::ClientSession,
4345) -> Result<Value, String> {
4346 let mut engine = session.memgine.lock().await;
4347 let (promoted, rejected) = engine.gate_skill_candidates();
4348 Ok(serde_json::json!({ "promoted": promoted, "rejected": rejected }))
4349}
4350
4351async fn handle_skill_meta(
4355 msg: &JsonRpcMessage,
4356 session: &crate::session::ClientSession,
4357) -> Result<Value, String> {
4358 let key = msg
4359 .params
4360 .get("key")
4361 .and_then(|v| v.as_str())
4362 .ok_or("missing 'key' parameter")?;
4363 let engine = session.memgine.lock().await;
4364 match engine.skill_meta(key) {
4365 Some(meta) => serde_json::to_value(&meta).map_err(|e| e.to_string()),
4366 None => Ok(Value::Null),
4367 }
4368}
4369
4370async fn handle_skill_export(
4374 msg: &JsonRpcMessage,
4375 session: &crate::session::ClientSession,
4376) -> Result<Value, String> {
4377 let key = msg
4378 .params
4379 .get("key")
4380 .and_then(|v| v.as_str())
4381 .ok_or("missing 'key' parameter")?;
4382 let engine = session.memgine.lock().await;
4383 Ok(engine.export_skill(key).map(Value::String).unwrap_or(Value::Null))
4384}
4385
4386async fn handle_skill_import(
4390 msg: &JsonRpcMessage,
4391 session: &crate::session::ClientSession,
4392) -> Result<Value, String> {
4393 let md = msg
4394 .params
4395 .get("markdown")
4396 .and_then(|v| v.as_str())
4397 .ok_or("missing 'markdown' parameter")?;
4398 let mut engine = session.memgine.lock().await;
4399 engine.import_skill_markdown(md)?;
4400 Ok(serde_json::json!({ "imported": true }))
4401}
4402
4403async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4405 let engine = get_inference_engine(state);
4406 let req: car_inference::RerankRequest =
4407 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
4408 let _permit = state.admission.acquire().await;
4409 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
4410 serde_json::to_value(&result).map_err(|e| e.to_string())
4411}
4412
4413async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4419 use base64::Engine as _;
4420 let engine = get_inference_engine(state);
4421
4422 let mut params = msg.params.clone();
4429 let audio_b64 = params
4430 .as_object_mut()
4431 .and_then(|m| m.remove("audio_b64"))
4432 .and_then(|v| v.as_str().map(str::to_string));
4433 let _tmp_audio = if let Some(b64) = audio_b64 {
4434 let bytes = base64::engine::general_purpose::STANDARD
4435 .decode(b64.as_bytes())
4436 .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
4437 let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
4438 std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
4439 let path = tmp.path().to_string_lossy().into_owned();
4440 if let Some(obj) = params.as_object_mut() {
4441 obj.insert("audio_path".to_string(), Value::String(path));
4442 }
4443 Some(tmp)
4444 } else {
4445 None
4446 };
4447
4448 let req: car_inference::TranscribeRequest =
4449 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
4450 let _permit = state.admission.acquire().await;
4451 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
4452 serde_json::to_value(&result).map_err(|e| e.to_string())
4453}
4454
4455async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4461 use base64::Engine as _;
4462 let engine = get_inference_engine(state);
4463
4464 let mut params = msg.params.clone();
4465 let return_b64 = params
4466 .as_object_mut()
4467 .and_then(|m| m.remove("return_b64"))
4468 .and_then(|v| v.as_bool())
4469 .unwrap_or(false);
4470 let no_output_path = params
4471 .as_object()
4472 .map(|m| !m.contains_key("output_path"))
4473 .unwrap_or(true);
4474
4475 let req: car_inference::SynthesizeRequest =
4476 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
4477 let _permit = state.admission.acquire().await;
4478 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
4479 let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4480
4481 if return_b64 || no_output_path {
4485 let bytes = std::fs::read(&result.audio_path).map_err(|e| {
4486 format!(
4487 "synthesize: failed to read rendered audio at {}: {e}",
4488 result.audio_path
4489 )
4490 })?;
4491 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
4492 if let Some(obj) = value.as_object_mut() {
4493 obj.insert("audio_b64".to_string(), Value::String(encoded));
4494 }
4495 }
4496 Ok(value)
4497}
4498
4499async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
4503 let engine = get_inference_engine(state);
4504 let status = engine
4505 .prepare_speech_runtime()
4506 .await
4507 .map_err(|e| e.to_string())?;
4508 serde_json::to_value(&status).map_err(|e| e.to_string())
4509}
4510
4511async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
4514 let prompt = msg
4515 .params
4516 .get("prompt")
4517 .and_then(|v| v.as_str())
4518 .ok_or("missing 'prompt' parameter")?;
4519 let engine = get_inference_engine(state);
4520 let decision = engine.route_adaptive(prompt).await;
4521 serde_json::to_value(&decision).map_err(|e| e.to_string())
4522}
4523
4524async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
4526 let engine = get_inference_engine(state);
4527 let profiles = engine.export_profiles().await;
4528 serde_json::to_value(&profiles).map_err(|e| e.to_string())
4529}
4530
4531#[derive(Deserialize)]
4532#[serde(rename_all = "camelCase")]
4533struct OutcomesResolvePendingParams {
4534 action_results: Vec<(String, bool, f64, String)>,
4539}
4540
4541async fn handle_outcomes_resolve_pending(
4561 req: &JsonRpcMessage,
4562 state: &ServerState,
4563) -> Result<Value, String> {
4564 let params: OutcomesResolvePendingParams =
4565 serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
4566 let engine = get_inference_engine(state);
4567 let mut tracker = engine.outcome_tracker.write().await;
4568 let inferred = tracker.infer_outcomes_from_action_sequence(¶ms.action_results);
4569 tracker.resolve_pending_from_signals(inferred);
4570 Ok(serde_json::json!({ "recorded": params.action_results.len() }))
4571}
4572
4573async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
4575 let n = session.runtime.log.lock().await.len();
4576 Ok(Value::from(n as u64))
4577}
4578
4579async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
4580 let stats = session.runtime.log.lock().await.stats();
4581 serde_json::to_value(stats).map_err(|e| e.to_string())
4582}
4583
4584#[derive(Deserialize)]
4585#[serde(rename_all = "camelCase")]
4586struct EventsTruncateParams {
4587 #[serde(default)]
4588 max_events: Option<usize>,
4589 #[serde(default)]
4590 max_spans: Option<usize>,
4591}
4592
4593async fn handle_events_truncate(
4594 msg: &JsonRpcMessage,
4595 session: &crate::session::ClientSession,
4596) -> Result<Value, String> {
4597 let params: EventsTruncateParams =
4598 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
4599 max_events: None,
4600 max_spans: None,
4601 });
4602 let mut log = session.runtime.log.lock().await;
4603 let removed_events = params
4604 .max_events
4605 .map(|max| log.truncate_events_keep_last(max))
4606 .unwrap_or(0);
4607 let removed_spans = params
4608 .max_spans
4609 .map(|max| log.truncate_spans_keep_last(max))
4610 .unwrap_or(0);
4611 let stats = log.stats();
4612 Ok(serde_json::json!({
4613 "removedEvents": removed_events,
4614 "removedSpans": removed_spans,
4615 "stats": stats,
4616 }))
4617}
4618
4619async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
4620 let mut log = session.runtime.log.lock().await;
4621 let removed = log.clear();
4622 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
4623}
4624
4625async fn resolve_run_agent_id(
4646 session: &crate::session::ClientSession,
4647 req: &car_proto::RunStartRequest,
4648) -> Result<String, String> {
4649 if let Some(bound) = session.agent_id.lock().await.clone() {
4657 if let Some(id) = req.agent_id.as_deref() {
4658 let id = id.trim();
4659 if !id.is_empty() && id != bound {
4660 return Err(format!(
4661 "runs.start `agent_id` (`{id}`) does not match this session's bound \
4662 agent: a bound connection can only record runs under its own agent"
4663 ));
4664 }
4665 }
4666 return Ok(bound);
4667 }
4668 if let Some(id) = req.agent_id.as_deref() {
4672 let id = id.trim();
4673 if !id.is_empty() {
4674 return Ok(id.to_string());
4675 }
4676 }
4677 if let Ok(env_id) = std::env::var("CAR_AGENT_ID") {
4679 let env_id = env_id.trim().to_string();
4680 if !env_id.is_empty() {
4681 return Ok(env_id);
4682 }
4683 }
4684 if let Some(name) = req.agent_name.as_deref() {
4687 if let Some(synth) = synthesize_agent_id(name) {
4688 return Ok(synth);
4689 }
4690 }
4691 Err(
4692 "runs.start could not resolve an agent_id: no `agent_id` param, no bound \
4693 session.auth {agent_id}, no CAR_AGENT_ID env, and no usable `agent_name` \
4694 to synthesize one from"
4695 .to_string(),
4696 )
4697}
4698
4699fn synthesize_agent_id(name: &str) -> Option<String> {
4707 let mut slug = String::new();
4708 let mut prev_dash = false;
4709 for ch in name.chars() {
4710 if ch.is_ascii_alphanumeric() {
4711 slug.push(ch.to_ascii_lowercase());
4712 prev_dash = false;
4713 } else if !prev_dash {
4714 slug.push('-');
4715 prev_dash = true;
4716 }
4717 }
4718 let slug = slug.trim_matches('-');
4719 if slug.is_empty() {
4720 return None;
4721 }
4722 Some(format!("name:{slug}"))
4723}
4724
4725async fn handle_runs_start(
4726 req: &JsonRpcMessage,
4727 session: &crate::session::ClientSession,
4728 state: &Arc<ServerState>,
4729) -> Result<Value, String> {
4730 let params: car_proto::RunStartRequest = serde_json::from_value(req.params.clone())
4731 .map_err(|e| format!("runs.start requires {{ intent, agent_id?, agent_name?, outcome_description? }}: {e}"))?;
4732 if params.intent.trim().is_empty() {
4733 return Err("runs.start requires a non-empty `intent`".to_string());
4734 }
4735
4736 let agent_id = resolve_run_agent_id(session, ¶ms).await?;
4737 let run_id = uuid::Uuid::new_v4().to_string();
4738 let started_at = chrono::Utc::now();
4739
4740 *session.current_run_id.lock().await = Some(run_id.clone());
4744
4745 state
4746 .start_run(crate::session::RunMeta {
4747 run_id: run_id.clone(),
4748 agent_id: agent_id.clone(),
4749 client_id: session.client_id.clone(),
4750 intent: params.intent.clone(),
4751 outcome_description: params.outcome_description.clone(),
4752 started_at,
4753 termination: None,
4754 ended_at: None,
4755 turns: Vec::new(),
4756 })
4757 .await;
4758
4759 serde_json::to_value(car_proto::RunStartResponse { run_id, agent_id })
4760 .map_err(|e| e.to_string())
4761}
4762
4763async fn handle_runs_complete(
4764 req: &JsonRpcMessage,
4765 session: &crate::session::ClientSession,
4766 state: &Arc<ServerState>,
4767) -> Result<Value, String> {
4768 let params: car_proto::RunCompleteRequest = serde_json::from_value(req.params.clone())
4769 .map_err(|e| format!("runs.complete requires {{ run_id, outcome }}: {e}"))?;
4770
4771 let termination = car_proto::RunTermination::Outcome {
4772 status: params.outcome.status,
4773 outcome: params.outcome.clone(),
4774 };
4775 state.complete_run(¶ms.run_id, termination).await?;
4776
4777 {
4781 let mut cur = session.current_run_id.lock().await;
4782 if cur.as_deref() == Some(params.run_id.as_str()) {
4783 *cur = None;
4784 }
4785 }
4786
4787 serde_json::to_value(car_proto::RunCompleteResponse {
4788 run_id: params.run_id,
4789 ok: true,
4790 })
4791 .map_err(|e| e.to_string())
4792}
4793
4794async fn authorize_run_access(
4830 session: &crate::session::ClientSession,
4831 state: &Arc<ServerState>,
4832 owning_agent_id: &str,
4833) -> Result<(), String> {
4834 if let Some(bound) = session.agent_id.lock().await.clone() {
4836 if bound == owning_agent_id {
4837 return Ok(());
4838 }
4839 }
4840 if session.is_host.load(std::sync::atomic::Ordering::Acquire) {
4848 return Ok(());
4849 }
4850 tracing::debug!(
4855 owning_agent = %owning_agent_id,
4856 client_id = %session.client_id,
4857 "run access denied: connection neither owns the agent nor is the host client"
4858 );
4859 Err("not authorized to access this run: this connection neither owns the \
4860 owning agent (via session.auth) nor is the host management client"
4861 .to_string())
4862}
4863
4864async fn handle_runs_subscribe(
4879 req: &JsonRpcMessage,
4880 session: &crate::session::ClientSession,
4881 state: &Arc<ServerState>,
4882) -> Result<Value, String> {
4883 let params: car_proto::RunSubscribeRequest = serde_json::from_value(req.params.clone())
4884 .map_err(|e| format!("runs.subscribe requires {{ run_id }}: {e}"))?;
4885
4886 let not_found = || {
4889 serde_json::to_value(serde_json::json!({
4890 "run_id": params.run_id,
4891 "not_found": true,
4892 }))
4893 .map_err(|e| e.to_string())
4894 };
4895
4896 let owning_agent = match state.run_meta(¶ms.run_id).await {
4900 Some(meta) => meta.agent_id,
4901 None => match state.run_store.agent_for_run(¶ms.run_id) {
4902 Some(a) => a,
4903 None => return not_found(),
4904 },
4905 };
4906
4907 if authorize_run_access(session, state, &owning_agent)
4911 .await
4912 .is_err()
4913 {
4914 tracing::debug!(
4915 run_id = %params.run_id,
4916 owning_agent = %owning_agent,
4917 client_id = %session.client_id,
4918 "runs.subscribe denied (unauthorized); returning uniform not-found"
4919 );
4920 return not_found();
4921 }
4922
4923 let snapshot = match state
4924 .subscribe_run(¶ms.run_id, &session.client_id, session.channel.clone())
4925 .await
4926 {
4927 Some(s) => s,
4928 None => return not_found(),
4930 };
4931
4932 serde_json::to_value(snapshot).map_err(|e| e.to_string())
4933}
4934
4935async fn handle_runs_unsubscribe(
4940 req: &JsonRpcMessage,
4941 session: &crate::session::ClientSession,
4942 state: &Arc<ServerState>,
4943) -> Result<Value, String> {
4944 let params: car_proto::RunUnsubscribeRequest = serde_json::from_value(req.params.clone())
4945 .map_err(|e| format!("runs.unsubscribe requires {{ run_id }}: {e}"))?;
4946 let removed = state
4947 .unsubscribe_run(¶ms.run_id, &session.client_id)
4948 .await;
4949 serde_json::to_value(car_proto::RunUnsubscribeResponse {
4950 run_id: params.run_id,
4951 removed,
4952 })
4953 .map_err(|e| e.to_string())
4954}
4955
4956async fn handle_runs_list(
4967 req: &JsonRpcMessage,
4968 session: &crate::session::ClientSession,
4969 state: &Arc<ServerState>,
4970) -> Result<Value, String> {
4971 let params: car_proto::RunListRequest = serde_json::from_value(req.params.clone())
4972 .map_err(|e| format!("runs.list requires {{ agent_id }}: {e}"))?;
4973
4974 authorize_run_access(session, state, ¶ms.agent_id).await?;
4977
4978 let runs = state.run_store.list_runs(¶ms.agent_id);
4979 serde_json::to_value(serde_json::json!({
4980 "agent_id": params.agent_id,
4981 "runs": runs,
4982 }))
4983 .map_err(|e| e.to_string())
4984}
4985
4986async fn handle_runs_get_trace(
5002 req: &JsonRpcMessage,
5003 session: &crate::session::ClientSession,
5004 state: &Arc<ServerState>,
5005) -> Result<Value, String> {
5006 let params: car_proto::RunGetTraceRequest = serde_json::from_value(req.params.clone())
5007 .map_err(|e| format!("runs.get_trace requires {{ run_id, cursor? }}: {e}"))?;
5008
5009 let Some(owning_agent) = state.run_store.agent_for_run(¶ms.run_id) else {
5016 return serde_json::to_value(serde_json::json!({
5017 "run_id": params.run_id,
5018 "not_found": true,
5019 }))
5020 .map_err(|e| e.to_string());
5021 };
5022
5023 authorize_run_access(session, state, &owning_agent).await?;
5025
5026 let records = state
5029 .run_store
5030 .get_run_trace_for(&owning_agent, ¶ms.run_id)
5031 .unwrap_or_default();
5032
5033 let cursor = params.cursor.unwrap_or(0);
5038 let paged: Vec<car_proto::RunRecord> = if cursor == 0 {
5039 records
5040 } else {
5041 records.into_iter().skip(cursor).collect()
5042 };
5043
5044 serde_json::to_value(serde_json::json!({
5045 "run_id": params.run_id,
5046 "agent_id": owning_agent,
5047 "records": paged,
5048 "cursor": cursor,
5049 }))
5050 .map_err(|e| e.to_string())
5051}
5052
5053async fn handle_replan_set_config(
5058 msg: &JsonRpcMessage,
5059 session: &crate::session::ClientSession,
5060) -> Result<Value, String> {
5061 let max_replans = msg
5062 .params
5063 .get("max_replans")
5064 .and_then(|v| v.as_u64())
5065 .unwrap_or(0) as u32;
5066 let delay_ms = msg
5067 .params
5068 .get("delay_ms")
5069 .and_then(|v| v.as_u64())
5070 .unwrap_or(0);
5071 let verify_before_execute = msg
5072 .params
5073 .get("verify_before_execute")
5074 .and_then(|v| v.as_bool())
5075 .unwrap_or(true);
5076 let cfg = car_engine::ReplanConfig {
5077 max_replans,
5078 delay_ms,
5079 verify_before_execute,
5080 };
5081 session.runtime.set_replan_config(cfg).await;
5082 Ok(Value::Null)
5083}
5084
5085async fn handle_skills_list(
5086 msg: &JsonRpcMessage,
5087 session: &crate::session::ClientSession,
5088) -> Result<Value, String> {
5089 let domain = msg.params.get("domain").and_then(|v| v.as_str());
5090 let engine = session.memgine.lock().await;
5091 let skills: Vec<serde_json::Value> = engine
5092 .graph
5093 .inner
5094 .node_indices()
5095 .filter_map(|nix| {
5096 let node = engine.graph.inner.node_weight(nix)?;
5097 if node.kind != car_memgine::MemKind::Skill {
5098 return None;
5099 }
5100 let meta = car_memgine::SkillMeta::from_node(node)?;
5101 if let Some(d) = domain {
5102 match &meta.scope {
5103 car_memgine::SkillScope::Global => {}
5104 car_memgine::SkillScope::Domain(sd) if sd == d => {}
5105 _ => return None,
5106 }
5107 }
5108 Some(serde_json::to_value(&meta).unwrap_or_default())
5109 })
5110 .collect();
5111 serde_json::to_value(&skills).map_err(|e| e.to_string())
5112}
5113
5114#[derive(serde::Deserialize)]
5115struct SecretParams {
5116 #[serde(default)]
5117 service: Option<String>,
5118 key: String,
5119 #[serde(default)]
5120 value: Option<String>,
5121}
5122
5123fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
5124 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5125 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
5126 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
5127}
5128
5129fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
5130 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5131 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
5132}
5133
5134fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
5135 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5136 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
5137}
5138
5139fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
5140 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5141 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
5142}
5143
5144#[derive(serde::Deserialize)]
5145struct PermParams {
5146 domain: String,
5147 #[serde(default)]
5148 target_bundle_id: Option<String>,
5149}
5150
5151fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
5152 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5153 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
5154}
5155
5156fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
5157 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5158 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
5159}
5160
5161fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
5162 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5163 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
5164}
5165
5166fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
5167 #[derive(serde::Deserialize)]
5168 struct P {
5169 start: String,
5170 end: String,
5171 #[serde(default)]
5172 calendar_ids: Vec<String>,
5173 }
5174 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5175 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
5176 .map_err(|e| format!("parse start: {}", e))?
5177 .with_timezone(&chrono::Utc);
5178 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
5179 .map_err(|e| format!("parse end: {}", e))?
5180 .with_timezone(&chrono::Utc);
5181 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
5182}
5183
5184fn handle_calendar_create_event(req: &JsonRpcMessage) -> Result<Value, String> {
5185 let raw = req.params.to_string();
5186 car_ffi_common::integrations::calendar_create_event(&raw)
5187}
5188
5189fn handle_calendar_update_event(req: &JsonRpcMessage) -> Result<Value, String> {
5190 let raw = req.params.to_string();
5191 car_ffi_common::integrations::calendar_update_event(&raw)
5192}
5193
5194fn handle_calendar_delete_event(req: &JsonRpcMessage) -> Result<Value, String> {
5195 #[derive(serde::Deserialize)]
5196 struct P {
5197 event_id: String,
5198 }
5199 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5200 car_ffi_common::integrations::calendar_delete_event(&p.event_id)
5201}
5202
5203fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
5204 #[derive(serde::Deserialize)]
5205 struct P {
5206 query: String,
5207 #[serde(default = "default_limit")]
5208 limit: usize,
5209 #[serde(default)]
5210 container_ids: Vec<String>,
5211 }
5212 fn default_limit() -> usize {
5213 50
5214 }
5215 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5216 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
5217}
5218
5219fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
5220 #[derive(serde::Deserialize, Default)]
5221 struct P {
5222 #[serde(default)]
5223 account_ids: Vec<String>,
5224 }
5225 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
5226 car_ffi_common::integrations::mail_inbox(&p.account_ids)
5227}
5228
5229fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
5230 let raw = req.params.to_string();
5231 car_ffi_common::integrations::mail_send(&raw)
5232}
5233
5234fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
5235 #[derive(serde::Deserialize)]
5236 struct P {
5237 #[serde(default = "default_limit")]
5238 limit: usize,
5239 }
5240 fn default_limit() -> usize {
5241 50
5242 }
5243 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
5244 car_ffi_common::integrations::messages_chats(p.limit)
5245}
5246
5247fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
5248 let raw = req.params.to_string();
5249 car_ffi_common::integrations::messages_send(&raw)
5250}
5251
5252fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
5253 #[derive(serde::Deserialize)]
5254 struct P {
5255 query: String,
5256 #[serde(default = "default_limit")]
5257 limit: usize,
5258 }
5259 fn default_limit() -> usize {
5260 50
5261 }
5262 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5263 car_ffi_common::integrations::notes_find(&p.query, p.limit)
5264}
5265
5266fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
5267 #[derive(serde::Deserialize)]
5268 struct P {
5269 #[serde(default = "default_limit")]
5270 limit: usize,
5271 }
5272 fn default_limit() -> usize {
5273 50
5274 }
5275 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
5276 car_ffi_common::integrations::reminders_items(p.limit)
5277}
5278
5279fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
5280 #[derive(serde::Deserialize)]
5281 struct P {
5282 #[serde(default = "default_limit")]
5283 limit: usize,
5284 }
5285 fn default_limit() -> usize {
5286 100
5287 }
5288 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
5289 car_ffi_common::integrations::bookmarks_list(p.limit)
5290}
5291
5292fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
5293 #[derive(serde::Deserialize)]
5294 struct P {
5295 start: String,
5296 end: String,
5297 }
5298 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5299 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
5300 .map_err(|e| format!("parse start: {}", e))?
5301 .with_timezone(&chrono::Utc);
5302 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
5303 .map_err(|e| format!("parse end: {}", e))?
5304 .with_timezone(&chrono::Utc);
5305 car_ffi_common::health::sleep_windows(s, e)
5306}
5307
5308fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
5309 #[derive(serde::Deserialize)]
5310 struct P {
5311 start: String,
5312 end: String,
5313 }
5314 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5315 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
5316 .map_err(|e| format!("parse start: {}", e))?
5317 .with_timezone(&chrono::Utc);
5318 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
5319 .map_err(|e| format!("parse end: {}", e))?
5320 .with_timezone(&chrono::Utc);
5321 car_ffi_common::health::workouts(s, e)
5322}
5323
5324fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
5325 #[derive(serde::Deserialize)]
5326 struct P {
5327 start: String,
5328 end: String,
5329 }
5330 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5331 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
5332 .map_err(|e| format!("parse start: {}", e))?;
5333 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
5334 .map_err(|e| format!("parse end: {}", e))?;
5335 car_ffi_common::health::activity(s, e)
5336}
5337
5338async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
5339 let closed = session.browser.close().await?;
5340 Ok(serde_json::json!({"closed": closed}))
5341}
5342
5343async fn handle_browser_run(
5344 req: &JsonRpcMessage,
5345 session: &crate::session::ClientSession,
5346) -> Result<Value, String> {
5347 #[derive(serde::Deserialize)]
5348 struct BrowserRunParams {
5349 script: Value,
5351 #[serde(default)]
5352 width: Option<u32>,
5353 #[serde(default)]
5354 height: Option<u32>,
5355 #[serde(default)]
5360 headed: Option<bool>,
5361 #[serde(default)]
5364 extra_args: Option<Vec<String>>,
5365 }
5366 let params: BrowserRunParams =
5367 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5368
5369 let script_json = match params.script {
5371 Value::String(s) => s,
5372 other => other.to_string(),
5373 };
5374
5375 let browser_session = session
5376 .browser
5377 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
5378 width: params.width.unwrap_or(1280),
5379 height: params.height.unwrap_or(720),
5380 headless: !params.headed.unwrap_or(false),
5381 extra_args: params.extra_args.unwrap_or_default(),
5382 })
5383 .await?;
5384
5385 let trace_json = browser_session.run(&script_json).await?;
5386 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
5387}
5388
5389#[derive(Deserialize)]
5402struct VoiceStartParams {
5403 session_id: String,
5404 audio_source: Value,
5405 #[serde(default)]
5406 options: Option<Value>,
5407}
5408
5409async fn handle_voice_transcribe_stream_start(
5410 req: &JsonRpcMessage,
5411 state: &Arc<ServerState>,
5412 session: &Arc<crate::session::ClientSession>,
5413) -> Result<Value, String> {
5414 let params: VoiceStartParams =
5415 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5416 let audio_source_json =
5417 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
5418 let options_json = params
5419 .options
5420 .as_ref()
5421 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
5422 .transpose()?;
5423 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
5424 channel: session.channel.clone(),
5425 });
5426 let json = car_ffi_common::voice::transcribe_stream_start(
5427 ¶ms.session_id,
5428 &audio_source_json,
5429 options_json.as_deref(),
5430 state.voice_sessions.clone(),
5431 sink,
5432 )
5433 .await?;
5434 serde_json::from_str(&json).map_err(|e| e.to_string())
5435}
5436
5437#[derive(Deserialize)]
5438struct VoiceStopParams {
5439 session_id: String,
5440}
5441
5442async fn handle_voice_transcribe_stream_stop(
5443 req: &JsonRpcMessage,
5444 state: &Arc<ServerState>,
5445) -> Result<Value, String> {
5446 let params: VoiceStopParams =
5447 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5448 let json = car_ffi_common::voice::transcribe_stream_stop(
5449 ¶ms.session_id,
5450 state.voice_sessions.clone(),
5451 )
5452 .await?;
5453 serde_json::from_str(&json).map_err(|e| e.to_string())
5454}
5455
5456#[derive(Deserialize)]
5457struct VoicePushParams {
5458 session_id: String,
5459 pcm_b64: String,
5463}
5464
5465async fn handle_voice_transcribe_stream_push(
5466 req: &JsonRpcMessage,
5467 state: &Arc<ServerState>,
5468) -> Result<Value, String> {
5469 use base64::Engine;
5470 let params: VoicePushParams =
5471 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5472 let pcm = base64::engine::general_purpose::STANDARD
5473 .decode(¶ms.pcm_b64)
5474 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
5475 let json = car_ffi_common::voice::transcribe_stream_push(
5476 ¶ms.session_id,
5477 &pcm,
5478 state.voice_sessions.clone(),
5479 )
5480 .await?;
5481 serde_json::from_str(&json).map_err(|e| e.to_string())
5482}
5483
5484fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
5485 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
5486 serde_json::from_str(&json).unwrap_or(Value::Null)
5487}
5488
5489#[derive(Deserialize)]
5490struct VoiceTtsStreamStartParams {
5491 stream_id: String,
5495 text: String,
5498 #[serde(default)]
5501 options: Option<Value>,
5502}
5503
5504async fn handle_voice_tts_stream_start(
5505 req: &JsonRpcMessage,
5506 session: &Arc<crate::session::ClientSession>,
5507) -> Result<Value, String> {
5508 let params: VoiceTtsStreamStartParams =
5509 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5510 let opts_str = params
5511 .options
5512 .as_ref()
5513 .map(|v| v.to_string())
5514 .filter(|s| !s.is_empty());
5515 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
5516 channel: session.channel.clone(),
5517 });
5518 let json = car_ffi_common::voice::tts_stream_start(
5519 ¶ms.stream_id,
5520 ¶ms.text,
5521 opts_str.as_deref(),
5522 sink,
5523 )
5524 .await?;
5525 serde_json::from_str(&json).map_err(|e| e.to_string())
5526}
5527
5528#[derive(Deserialize)]
5529struct VoiceTtsStreamCancelParams {
5530 stream_id: String,
5531}
5532
5533async fn handle_voice_tts_stream_cancel(req: &JsonRpcMessage) -> Result<Value, String> {
5534 let params: VoiceTtsStreamCancelParams =
5535 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5536 let json = car_ffi_common::voice::tts_stream_cancel(¶ms.stream_id).await?;
5537 serde_json::from_str(&json).map_err(|e| e.to_string())
5538}
5539
5540fn handle_voice_tts_stream_list() -> Value {
5541 let json = car_ffi_common::voice::list_tts_streams();
5542 serde_json::from_str(&json).unwrap_or(Value::Null)
5543}
5544
5545async fn handle_voice_dispatch_turn(
5546 req: &JsonRpcMessage,
5547 state: &Arc<ServerState>,
5548 session: &Arc<crate::session::ClientSession>,
5549) -> Result<Value, String> {
5550 let req_value = req.params.clone();
5551 let request: crate::voice_turn::DispatchVoiceTurnRequest =
5552 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
5553 let engine = get_inference_engine(state).clone();
5554 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
5555 channel: session.channel.clone(),
5556 });
5557 let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
5558 serde_json::to_value(resp).map_err(|e| e.to_string())
5559}
5560
5561async fn handle_voice_cancel_turn() -> Result<Value, String> {
5562 crate::voice_turn::cancel().await;
5563 Ok(serde_json::json!({"cancelled": true}))
5564}
5565
5566async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
5567 let engine = get_inference_engine(state).clone();
5568 crate::voice_turn::prewarm(engine).await;
5569 Ok(serde_json::json!({"prewarmed": true}))
5570}
5571
5572fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
5591 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
5592 std::sync::OnceLock::new();
5593 SLOT.get_or_init(|| std::sync::RwLock::new(None))
5594}
5595
5596fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
5597 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
5598 std::sync::OnceLock::new();
5599 MAP.get_or_init(dashmap::DashMap::new)
5600}
5601
5602fn ws_runner_completions() -> &'static dashmap::DashMap<
5603 String,
5604 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
5605> {
5606 static MAP: std::sync::OnceLock<
5607 dashmap::DashMap<
5608 String,
5609 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
5610 >,
5611 > = std::sync::OnceLock::new();
5612 MAP.get_or_init(dashmap::DashMap::new)
5613}
5614
5615struct WsInferenceRunner;
5616
5617#[async_trait::async_trait]
5618impl car_inference::InferenceRunner for WsInferenceRunner {
5619 async fn run(
5620 &self,
5621 request: car_inference::tasks::generate::GenerateRequest,
5622 emitter: car_inference::EventEmitter,
5623 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
5624 let channel = ws_runner_session()
5625 .read()
5626 .map_err(|e| {
5627 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
5628 })?
5629 .clone()
5630 .ok_or_else(|| {
5631 car_inference::RunnerError::Declined(
5632 "no WebSocket inference runner registered — call inference.register_runner first"
5633 .into(),
5634 )
5635 })?;
5636
5637 let call_id = uuid::Uuid::new_v4().to_string();
5638 let request_json = serde_json::to_value(&request)
5639 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
5640 let (tx, rx) = tokio::sync::oneshot::channel();
5641 ws_runner_calls().insert(call_id.clone(), emitter);
5642 ws_runner_completions().insert(call_id.clone(), tx);
5643
5644 use futures::SinkExt;
5646 let notification = serde_json::json!({
5647 "jsonrpc": "2.0",
5648 "method": "inference.runner.invoke",
5649 "params": {
5650 "call_id": call_id,
5651 "request": request_json,
5652 },
5653 });
5654 let text = serde_json::to_string(¬ification)
5655 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
5656 let _ = channel
5657 .write
5658 .lock()
5659 .await
5660 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
5661 .await;
5662
5663 let result = rx.await.map_err(|_| {
5664 car_inference::RunnerError::Failed("runner completion channel dropped".into())
5665 })?;
5666 ws_runner_calls().remove(&call_id);
5667 result.map_err(car_inference::RunnerError::Failed)
5668 }
5669}
5670
5671async fn handle_inference_register_runner(
5672 session: &Arc<crate::session::ClientSession>,
5673) -> Result<Value, String> {
5674 let mut guard = ws_runner_session()
5675 .write()
5676 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
5677 *guard = Some(session.channel.clone());
5678 drop(guard);
5679 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
5680 Ok(serde_json::json!({"registered": true}))
5681}
5682
5683#[derive(serde::Deserialize)]
5684struct InferenceRunnerEventParams {
5685 call_id: String,
5686 event: Value,
5687}
5688
5689async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
5690 let params: InferenceRunnerEventParams =
5691 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5692 let stream_event = match parse_runner_event_value(¶ms.event) {
5693 Some(e) => e,
5694 None => return Err("unrecognised runner event shape".into()),
5695 };
5696 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
5697 let emitter = entry.value().clone();
5698 tokio::spawn(async move { emitter.emit(stream_event).await });
5699 }
5700 Ok(serde_json::json!({"emitted": true}))
5701}
5702
5703#[derive(serde::Deserialize)]
5704struct InferenceRunnerCompleteParams {
5705 call_id: String,
5706 result: Value,
5707}
5708
5709async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
5710 let params: InferenceRunnerCompleteParams =
5711 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5712 let result: std::result::Result<car_inference::RunnerResult, String> =
5713 serde_json::from_value(params.result)
5714 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
5715 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
5716 let _ = tx.send(result);
5717 }
5718 Ok(serde_json::json!({"completed": true}))
5719}
5720
5721#[derive(serde::Deserialize)]
5722struct InferenceRunnerFailParams {
5723 call_id: String,
5724 error: String,
5725}
5726
5727async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
5728 let params: InferenceRunnerFailParams =
5729 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5730 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
5731 let _ = tx.send(Err(params.error));
5732 }
5733 Ok(serde_json::json!({"failed": true}))
5734}
5735
5736fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
5737 let ty = v.get("type").and_then(|t| t.as_str())?;
5738 match ty {
5739 "text" => Some(car_inference::StreamEvent::TextDelta(
5740 v.get("data")?.as_str()?.to_string(),
5741 )),
5742 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
5743 name: v.get("name")?.as_str()?.to_string(),
5744 index: v.get("index")?.as_u64()? as usize,
5745 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
5746 }),
5747 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
5748 index: v.get("index")?.as_u64()? as usize,
5749 arguments_delta: v.get("data")?.as_str()?.to_string(),
5750 }),
5751 "usage" => Some(car_inference::StreamEvent::Usage {
5752 input_tokens: v.get("input_tokens")?.as_u64()?,
5753 output_tokens: v.get("output_tokens")?.as_u64()?,
5754 }),
5755 "done" => Some(car_inference::StreamEvent::Done {
5756 text: v.get("text")?.as_str()?.to_string(),
5757 tool_calls: v
5758 .get("tool_calls")
5759 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
5760 .unwrap_or_default(),
5761 }),
5762 _ => None,
5763 }
5764}
5765
5766#[derive(Deserialize)]
5767struct EnrollSpeakerParams {
5768 label: String,
5769 audio: Value,
5770}
5771
5772async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
5773 let params: EnrollSpeakerParams =
5774 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5775 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
5776 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
5777 serde_json::from_str(&json).map_err(|e| e.to_string())
5778}
5779
5780#[derive(Deserialize)]
5781struct RemoveEnrollmentParams {
5782 label: String,
5783}
5784
5785fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
5786 let params: RemoveEnrollmentParams =
5787 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5788 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
5789 serde_json::from_str(&json).map_err(|e| e.to_string())
5790}
5791
5792#[derive(Deserialize)]
5793struct WorkflowRunParams {
5794 workflow: Value,
5795}
5796
5797async fn handle_workflow_run(
5798 req: &JsonRpcMessage,
5799 session: &Arc<crate::session::ClientSession>,
5800) -> Result<Value, String> {
5801 let params: WorkflowRunParams =
5802 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5803 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
5804 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
5805 channel: session.channel.clone(),
5806 host: session.host.clone(),
5807 client_id: session.client_id.clone(),
5808 });
5809 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
5810 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5811 persist_if_paused(&result)?;
5814 Ok(result)
5815}
5816
5817pub fn recover_workflow_checkpoints() {
5822 let dir = match workflow_runs_dir() {
5823 Ok(d) => d,
5824 Err(e) => {
5825 tracing::debug!(error = %e, "no workflow checkpoint dir; skipping recovery");
5826 return;
5827 }
5828 };
5829 match car_workflow::CheckpointStore::open(&dir).and_then(|s| s.recover_orphaned()) {
5830 Ok(0) => {}
5831 Ok(n) => tracing::info!(rearmed = n, "recovered orphaned workflow checkpoints"),
5832 Err(e) => tracing::warn!(error = %e, "workflow checkpoint recovery failed"),
5833 }
5834}
5835
5836fn workflow_runs_dir() -> Result<std::path::PathBuf, String> {
5838 let home = std::env::var("HOME")
5839 .or_else(|_| std::env::var("USERPROFILE"))
5840 .map_err(|_| "cannot resolve home directory for workflow checkpoints".to_string())?;
5841 Ok(std::path::PathBuf::from(home)
5842 .join(".car")
5843 .join("workflow-runs"))
5844}
5845
5846fn persist_if_paused(result: &Value) -> Result<(), String> {
5848 if result.get("status").and_then(|s| s.as_str()) != Some("paused") {
5849 return Ok(());
5850 }
5851 let Some(paused_value) = result.get("paused") else {
5852 return Err("paused result missing checkpoint".to_string());
5853 };
5854 let paused: car_workflow::PausedWorkflow =
5855 serde_json::from_value(paused_value.clone()).map_err(|e| e.to_string())?;
5856 let store = car_workflow::CheckpointStore::open(workflow_runs_dir()?).map_err(|e| e.to_string())?;
5857 store.save(&paused).map_err(|e| e.to_string())
5858}
5859
5860#[derive(Deserialize)]
5861struct WorkflowResumeParams {
5862 run_id: String,
5863 #[serde(default)]
5864 input: Value,
5865}
5866
5867async fn handle_workflow_resume(
5868 req: &JsonRpcMessage,
5869 session: &Arc<crate::session::ClientSession>,
5870) -> Result<Value, String> {
5871 let params: WorkflowResumeParams =
5872 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5873
5874 let store =
5875 car_workflow::CheckpointStore::open(workflow_runs_dir()?).map_err(|e| e.to_string())?;
5876 let paused = store
5879 .claim(¶ms.run_id)
5880 .map_err(|e| e.to_string())?
5881 .ok_or_else(|| {
5882 format!(
5883 "no paused workflow run '{}' (already resumed or unknown)",
5884 params.run_id
5885 )
5886 })?;
5887 let paused_json = serde_json::to_string(&paused).map_err(|e| e.to_string())?;
5888 let input_json = if params.input.is_null() {
5889 "{}".to_string()
5890 } else {
5891 serde_json::to_string(¶ms.input).map_err(|e| e.to_string())?
5892 };
5893
5894 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
5895 channel: session.channel.clone(),
5896 host: session.host.clone(),
5897 client_id: session.client_id.clone(),
5898 });
5899
5900 let json = car_ffi_common::workflow::resume_workflow(&paused_json, &input_json, runner).await;
5901 let result = match json {
5902 Ok(j) => serde_json::from_str::<Value>(&j).map_err(|e| e.to_string())?,
5903 Err(e) => {
5904 let _ = store.save(&paused);
5907 let _ = store.complete(¶ms.run_id);
5908 return Err(e);
5909 }
5910 };
5911 persist_if_paused(&result)?;
5914 store.complete(¶ms.run_id).map_err(|e| e.to_string())?;
5915 Ok(result)
5916}
5917
5918#[derive(Deserialize)]
5919struct BuilderBuildParams {
5920 goal: String,
5921 #[serde(default)]
5922 existing: Value,
5923 #[serde(default = "default_builder_attempts")]
5924 max_attempts: u32,
5925}
5926
5927fn default_builder_attempts() -> u32 {
5928 3
5929}
5930
5931async fn handle_builder_build(
5936 req: &JsonRpcMessage,
5937 state: &Arc<ServerState>,
5938 session: &Arc<crate::session::ClientSession>,
5939) -> Result<Value, String> {
5940 let params: BuilderBuildParams =
5941 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5942 if params.goal.trim().is_empty() {
5943 return Err("missing 'goal'".to_string());
5944 }
5945
5946 let engine = get_inference_engine(state).clone();
5947
5948 let tools: Vec<car_builder::ToolInfo> = session
5949 .runtime
5950 .registry
5951 .schemas()
5952 .await
5953 .into_iter()
5954 .map(|s| car_builder::ToolInfo {
5955 name: s.name,
5956 description: s.description,
5957 })
5958 .collect();
5959 let models: Vec<String> = engine
5960 .list_models_unified()
5961 .into_iter()
5962 .map(|m| m.id)
5963 .collect();
5964 let catalog = car_builder::ToolCatalog {
5965 tools,
5966 models,
5967 agents: Vec::new(),
5968 };
5969
5970 let existing = if params.existing.is_null() {
5971 None
5972 } else {
5973 serde_json::from_value::<car_workflow::Workflow>(params.existing.clone()).ok()
5974 };
5975
5976 let build_req = car_builder::BuildRequest {
5977 goal: params.goal,
5978 catalog,
5979 existing,
5980 feedback: None,
5981 max_attempts: params.max_attempts,
5982 };
5983
5984 let result = car_builder::build_workflow(
5985 |prompt: String| {
5986 let engine = engine.clone();
5987 async move {
5988 let greq = car_inference::GenerateRequest {
5989 prompt,
5990 model: None,
5991 params: car_inference::GenerateParams {
5992 temperature: 0.2,
5993 max_tokens: 4096,
5994 ..Default::default()
5995 },
5996 context: None,
5997 tools: None,
5998 images: None,
5999 messages: None,
6000 cache_control: false,
6001 response_format: None,
6002 intent: None,
6003 };
6004 engine
6005 .generate_tracked(greq)
6006 .await
6007 .map(|r| r.text)
6008 .map_err(|e| e.to_string())
6009 }
6010 },
6011 &build_req,
6012 )
6013 .await;
6014
6015 Ok(serde_json::json!({
6016 "valid": result.valid,
6017 "workflow": result.workflow,
6018 "issues": result.issues,
6019 "warnings": result.warnings,
6020 "attempts": result.attempts,
6021 }))
6022}
6023
6024#[derive(Deserialize)]
6025struct WorkflowVerifyParams {
6026 workflow: Value,
6027}
6028
6029fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
6030 let params: WorkflowVerifyParams =
6031 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6032 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
6033 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
6034 serde_json::from_str(&json).map_err(|e| e.to_string())
6035}
6036
6037async fn handle_meeting_start(
6042 req: &JsonRpcMessage,
6043 state: &Arc<ServerState>,
6044 session: &Arc<crate::session::ClientSession>,
6045) -> Result<Value, String> {
6046 let mut req_value = req.params.clone();
6052 let meeting_id = req_value
6053 .get("id")
6054 .and_then(|v| v.as_str())
6055 .map(str::to_string)
6056 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
6057 if let Some(map) = req_value.as_object_mut() {
6058 map.insert("id".into(), Value::String(meeting_id.clone()));
6059 }
6060 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
6061
6062 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
6063 Arc::new(crate::session::WsVoiceEventSink {
6064 channel: session.channel.clone(),
6065 });
6066
6067 let upstream: Arc<dyn car_voice::VoiceEventSink> =
6072 Arc::new(crate::session::WsMemgineIngestSink {
6073 meeting_id,
6074 engine: session.memgine.clone(),
6075 upstream: ws_upstream,
6076 });
6077
6078 let cwd = std::env::current_dir().ok();
6079 let json = crate::meeting::start_meeting(
6080 &request_json,
6081 state.meetings.clone(),
6082 state.voice_sessions.clone(),
6083 upstream,
6084 None,
6085 cwd,
6086 )
6087 .await?;
6088 serde_json::from_str(&json).map_err(|e| e.to_string())
6089}
6090
6091#[derive(Deserialize)]
6092struct MeetingStopParams {
6093 meeting_id: String,
6094 #[serde(default = "default_summarize")]
6095 summarize: bool,
6096}
6097
6098fn default_summarize() -> bool {
6099 true
6100}
6101
6102async fn handle_meeting_stop(
6103 req: &JsonRpcMessage,
6104 state: &Arc<ServerState>,
6105 _session: &Arc<crate::session::ClientSession>,
6106) -> Result<Value, String> {
6107 let params: MeetingStopParams =
6108 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6109 let inference = if params.summarize {
6110 Some(state.inference.get().cloned()).flatten()
6111 } else {
6112 None
6113 };
6114 let json = crate::meeting::stop_meeting(
6115 ¶ms.meeting_id,
6116 params.summarize,
6117 state.meetings.clone(),
6118 state.voice_sessions.clone(),
6119 inference,
6120 )
6121 .await?;
6122 serde_json::from_str(&json).map_err(|e| e.to_string())
6123}
6124
6125#[derive(Deserialize, Default)]
6126struct MeetingListParams {
6127 #[serde(default)]
6128 root: Option<std::path::PathBuf>,
6129}
6130
6131fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
6132 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
6133 let cwd = std::env::current_dir().ok();
6134 let json = crate::meeting::list_meetings(params.root, cwd)?;
6135 serde_json::from_str(&json).map_err(|e| e.to_string())
6136}
6137
6138#[derive(Deserialize)]
6139struct MeetingGetParams {
6140 meeting_id: String,
6141 #[serde(default)]
6142 root: Option<std::path::PathBuf>,
6143}
6144
6145fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
6146 let params: MeetingGetParams =
6147 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6148 let cwd = std::env::current_dir().ok();
6149 let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
6150 serde_json::from_str(&json).map_err(|e| e.to_string())
6151}
6152
6153#[derive(Deserialize, Default)]
6158struct RegistryRegisterParams {
6159 entry: Value,
6163 #[serde(default)]
6164 registry_path: Option<std::path::PathBuf>,
6165}
6166
6167fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
6168 let params: RegistryRegisterParams =
6169 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6170 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
6171 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
6172 Ok(Value::Null)
6173}
6174
6175#[derive(Deserialize, Default)]
6176struct RegistryNameParams {
6177 name: String,
6178 #[serde(default)]
6179 registry_path: Option<std::path::PathBuf>,
6180}
6181
6182fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
6183 let params: RegistryNameParams =
6184 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6185 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
6186 serde_json::from_str(&json).map_err(|e| e.to_string())
6187}
6188
6189fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
6190 let params: RegistryNameParams =
6191 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6192 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
6193 Ok(Value::Null)
6194}
6195
6196#[derive(Deserialize, Default)]
6197struct RegistryListParams {
6198 #[serde(default)]
6199 registry_path: Option<std::path::PathBuf>,
6200}
6201
6202fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
6203 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
6204 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
6205 serde_json::from_str(&json).map_err(|e| e.to_string())
6206}
6207
6208#[derive(Deserialize, Default)]
6209struct RegistryReapParams {
6210 #[serde(default = "default_reap_age")]
6213 max_age_secs: u64,
6214 #[serde(default)]
6215 registry_path: Option<std::path::PathBuf>,
6216}
6217
6218fn default_reap_age() -> u64 {
6219 60
6220}
6221
6222fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
6223 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
6224 let json =
6225 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
6226 serde_json::from_str(&json).map_err(|e| e.to_string())
6227}
6228
6229async fn handle_a2a_start(
6236 req: &JsonRpcMessage,
6237 session: &crate::session::ClientSession,
6238) -> Result<Value, String> {
6239 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6240 let json = crate::a2a::start_a2a(¶ms_json, Some(session.runtime.clone())).await?;
6246 serde_json::from_str(&json).map_err(|e| e.to_string())
6247}
6248
6249fn handle_a2a_stop() -> Result<Value, String> {
6250 let json = crate::a2a::stop_a2a()?;
6251 serde_json::from_str(&json).map_err(|e| e.to_string())
6252}
6253
6254fn handle_a2a_status() -> Result<Value, String> {
6255 let json = crate::a2a::a2a_status()?;
6256 serde_json::from_str(&json).map_err(|e| e.to_string())
6257}
6258
6259#[derive(Deserialize)]
6260#[serde(rename_all = "camelCase")]
6261struct A2aSendParams {
6262 endpoint: String,
6263 message: car_a2a::Message,
6264 #[serde(default)]
6265 blocking: bool,
6266 #[serde(default = "default_true")]
6267 ingest_a2ui: bool,
6268 #[serde(default)]
6269 route_auth: Option<A2aRouteAuth>,
6270 #[serde(default)]
6271 allow_untrusted_endpoint: bool,
6272}
6273
6274fn default_true() -> bool {
6275 true
6276}
6277
6278async fn handle_a2a_dispatch(
6288 method: &str,
6289 req: &JsonRpcMessage,
6290 state: &Arc<ServerState>,
6291) -> Result<Value, String> {
6292 let dispatcher = state.a2a_dispatcher().await;
6293 dispatcher
6294 .dispatch(method, req.params.clone())
6295 .await
6296 .map_err(|e| e.to_string())
6297}
6298
6299async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
6300 let params: A2aSendParams =
6301 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6302 let endpoint = trusted_route_endpoint(
6303 Some(params.endpoint.clone()),
6304 params.allow_untrusted_endpoint,
6305 )
6306 .ok_or_else(|| {
6307 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
6308 })?;
6309 let client = match params.route_auth.clone() {
6310 Some(auth) => {
6311 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
6312 }
6313 None => car_a2a::A2aClient::new(endpoint.clone()),
6314 };
6315 let result = client
6316 .send_message(params.message, params.blocking)
6317 .await
6318 .map_err(|e| e.to_string())?;
6319 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
6320 let mut applied = Vec::new();
6321 if params.ingest_a2ui {
6322 state
6323 .a2ui
6324 .validate_payload(&result_value)
6325 .map_err(|e| e.to_string())?;
6326 let routed_endpoint = Some(endpoint.clone());
6327 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
6328 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
6329 if owner.endpoint.is_none() {
6330 owner.with_endpoint(routed_endpoint.clone())
6331 } else {
6332 owner
6333 }
6334 });
6335 applied.push(
6336 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
6337 );
6338 }
6339 }
6340 Ok(serde_json::json!({
6341 "result": result,
6342 "a2ui": {
6343 "applied": applied,
6344 }
6345 }))
6346}
6347
6348async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
6356 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6357 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
6358 serde_json::from_str(&json).map_err(|e| e.to_string())
6359}
6360
6361async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
6362 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6363 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
6364 serde_json::from_str(&json).map_err(|e| e.to_string())
6365}
6366
6367async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
6368 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6369 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
6370 serde_json::from_str(&json).map_err(|e| e.to_string())
6371}
6372
6373async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
6374 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6375 let json = car_ffi_common::notifications::local(&args_json).await?;
6376 serde_json::from_str(&json).map_err(|e| e.to_string())
6377}
6378
6379async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
6380 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
6381 let json = car_ffi_common::vision::ocr(&args_json).await?;
6382 serde_json::from_str(&json).map_err(|e| e.to_string())
6383}
6384
6385async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
6390 let agents = match state.observer_manifest_path() {
6399 Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
6400 .map_err(|e| e.to_string())?,
6401 None => {
6402 let supervisor = state.supervisor()?;
6403 supervisor.list().await
6404 }
6405 };
6406 let attached = state.attached_agents.lock().await.clone();
6413 let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
6414 for a in agents {
6415 let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
6416 let session_id = attached.get(&a.spec.id).cloned();
6417 if let Some(map) = v.as_object_mut() {
6418 map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
6419 if let Some(sid) = session_id {
6420 map.insert("session_id".to_string(), Value::String(sid));
6421 }
6422 }
6423 decorated.push(v);
6424 }
6425 Ok(Value::Array(decorated))
6426}
6427
6428async fn handle_agents_upsert(
6429 req: &JsonRpcMessage,
6430 state: &Arc<ServerState>,
6431) -> Result<Value, String> {
6432 let mut params = req.params.clone();
6433 if let Some(name) = params
6442 .get("interpreter")
6443 .and_then(|v| v.as_str())
6444 .map(str::to_string)
6445 {
6446 let resolved =
6447 car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
6448 params["command"] = Value::String(resolved.to_string_lossy().into_owned());
6449 }
6450 let spec: car_registry::supervisor::AgentSpec =
6451 serde_json::from_value(params).map_err(|e| e.to_string())?;
6452 let supervisor = state.supervisor()?;
6453 let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
6454 serde_json::to_value(agent).map_err(|e| e.to_string())
6455}
6456
6457async fn handle_agents_install(
6471 req: &JsonRpcMessage,
6472 state: &Arc<ServerState>,
6473) -> Result<Value, String> {
6474 let manifest: car_registry::manifest::AgentManifest =
6475 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
6476 let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
6477 let supervisor = state.supervisor()?;
6478 let (report, managed) = supervisor
6479 .install_manifest(manifest, &host)
6480 .await
6481 .map_err(|e| e.to_string())?;
6482 Ok(serde_json::json!({
6483 "report": {
6484 "missingOptional": report
6485 .missing_optional
6486 .iter()
6487 .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
6488 .collect::<Vec<_>>(),
6489 },
6490 "agent": managed,
6491 }))
6492}
6493
6494async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
6495 let entries = match state.observer_manifest_path() {
6501 Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
6502 .map_err(|e| e.to_string())?,
6503 None => {
6504 let supervisor = state.supervisor()?;
6505 supervisor.health().await
6506 }
6507 };
6508 serde_json::to_value(entries).map_err(|e| e.to_string())
6509}
6510
6511fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
6512 req.params
6513 .get("id")
6514 .and_then(Value::as_str)
6515 .map(str::to_string)
6516 .ok_or_else(|| "missing required `id` parameter".to_string())
6517}
6518
6519async fn handle_agents_remove(
6520 req: &JsonRpcMessage,
6521 state: &Arc<ServerState>,
6522) -> Result<Value, String> {
6523 let id = extract_agent_id(req)?;
6524 let supervisor = state.supervisor()?;
6525 let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
6526 Ok(serde_json::json!({ "removed": removed }))
6527}
6528
6529async fn handle_agents_start(
6530 req: &JsonRpcMessage,
6531 state: &Arc<ServerState>,
6532) -> Result<Value, String> {
6533 let id = extract_agent_id(req)?;
6534 let supervisor = state.supervisor()?;
6535 let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
6536 serde_json::to_value(agent).map_err(|e| e.to_string())
6537}
6538
6539async fn handle_agents_stop(
6540 req: &JsonRpcMessage,
6541 state: &Arc<ServerState>,
6542) -> Result<Value, String> {
6543 let id = extract_agent_id(req)?;
6544 let signal: car_registry::supervisor::StopSignal = req
6545 .params
6546 .get("signal")
6547 .map(|v| serde_json::from_value(v.clone()))
6548 .transpose()
6549 .map_err(|e| e.to_string())?
6550 .unwrap_or_default();
6551 let supervisor = state.supervisor()?;
6552 let agent = supervisor
6553 .stop(&id, signal)
6554 .await
6555 .map_err(|e| e.to_string())?;
6556 serde_json::to_value(agent).map_err(|e| e.to_string())
6557}
6558
6559async fn handle_agents_restart(
6560 req: &JsonRpcMessage,
6561 state: &Arc<ServerState>,
6562) -> Result<Value, String> {
6563 let id = extract_agent_id(req)?;
6564 let supervisor = state.supervisor()?;
6565 let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
6566 serde_json::to_value(agent).map_err(|e| e.to_string())
6567}
6568
6569async fn handle_agents_tail_log(
6570 req: &JsonRpcMessage,
6571 state: &Arc<ServerState>,
6572) -> Result<Value, String> {
6573 let id = extract_agent_id(req)?;
6574 let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
6575 let supervisor = state.supervisor()?;
6576 let lines = supervisor
6577 .tail_log(&id, n)
6578 .await
6579 .map_err(|e| e.to_string())?;
6580 Ok(serde_json::json!({ "lines": lines }))
6581}
6582
6583async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
6594 let include_health = req
6595 .params
6596 .get("include_health")
6597 .and_then(Value::as_bool)
6598 .unwrap_or(false);
6599 let json = car_ffi_common::external_agents::list(include_health).await?;
6600 serde_json::from_str(&json).map_err(|e| e.to_string())
6601}
6602
6603async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
6604 let include_health = req
6605 .params
6606 .get("include_health")
6607 .and_then(Value::as_bool)
6608 .unwrap_or(false);
6609 let json = car_ffi_common::external_agents::detect(include_health).await?;
6610 serde_json::from_str(&json).map_err(|e| e.to_string())
6611}
6612
6613async fn handle_agents_invoke_external(
6631 req: &JsonRpcMessage,
6632 state: &Arc<ServerState>,
6633 host_session: &Arc<crate::session::ClientSession>,
6634) -> Result<Value, String> {
6635 let id = req
6636 .params
6637 .get("id")
6638 .and_then(Value::as_str)
6639 .ok_or_else(|| "missing required `id` parameter".to_string())?
6640 .to_string();
6641 let task = req
6642 .params
6643 .get("task")
6644 .and_then(Value::as_str)
6645 .ok_or_else(|| "missing required `task` parameter".to_string())?
6646 .to_string();
6647 let stream = req
6648 .params
6649 .get("stream")
6650 .and_then(Value::as_bool)
6651 .unwrap_or(false);
6652 let session_id = req
6653 .params
6654 .get("session_id")
6655 .and_then(Value::as_str)
6656 .map(str::to_string)
6657 .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
6658
6659 let mut options_value = req.params.clone();
6665 if let Some(obj) = options_value.as_object_mut() {
6666 obj.remove("id");
6667 obj.remove("task");
6668 obj.remove("stream");
6669 obj.remove("session_id");
6670 let has_explicit_mcp = obj.contains_key("mcp_endpoint");
6679 if !has_explicit_mcp {
6680 if let Some(url) = state.mcp_url.get() {
6681 obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
6682 }
6683 }
6684 }
6685
6686 if !stream {
6687 let options_json = options_value.to_string();
6690 let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
6691 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
6692 append_external_agent_audit(&id, &task, &options_value, &result);
6693 return Ok(result);
6694 }
6695
6696 let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
6702 .map_err(|e| format!("invalid options: {e}"))?;
6703
6704 {
6714 let mut chats = state.chat_sessions.lock().await;
6724 chats.entry(session_id.clone()).or_insert_with(|| {
6725 let created_at = std::time::SystemTime::now()
6726 .duration_since(std::time::UNIX_EPOCH)
6727 .map(|d| d.as_secs())
6728 .unwrap_or(0);
6729 crate::session::ChatSession {
6730 agent_id: id.clone(),
6731 host_client_id: host_session.client_id.clone(),
6732 created_at,
6733 }
6734 });
6735 }
6736
6737 use tokio::sync::mpsc;
6744 let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
6745
6746 let drain_state = state.clone();
6747 let drain_session_id = session_id.clone();
6748 let drain_agent_id = id.clone();
6749 tokio::spawn(async move {
6750 while let Some(event) = rx.recv().await {
6751 emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
6752 }
6753 });
6754
6755 let emitter_tx = tx.clone();
6756 let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
6757 let _ = emitter_tx.send(event);
6762 });
6763
6764 let spawn_state = state.clone();
6770 let spawn_session_id = session_id.clone();
6771 let spawn_id = id.clone();
6772 let spawn_task = task.clone();
6773 let spawn_options = options_value.clone();
6774 tokio::spawn(async move {
6775 let outcome =
6776 car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
6777 .await;
6778 drop(tx); let terminal_params: Value;
6785 let result_value: Value;
6786 match outcome {
6787 Ok(res) => {
6788 let mut parts: Vec<String> = Vec::new();
6795 if res.turns > 0 {
6796 parts.push(format!(
6797 "{} turn{}",
6798 res.turns,
6799 if res.turns == 1 { "" } else { "s" }
6800 ));
6801 }
6802 if res.tool_calls > 0 {
6803 parts.push(format!(
6804 "{} tool{}",
6805 res.tool_calls,
6806 if res.tool_calls == 1 { "" } else { "s" }
6807 ));
6808 }
6809 if res.duration_ms > 0 {
6810 parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
6811 }
6812 let summary = if parts.is_empty() {
6813 "stop".to_string()
6814 } else {
6815 parts.join(" · ")
6816 };
6817 if res.is_error {
6818 terminal_params = serde_json::json!({
6819 "session_id": spawn_session_id,
6820 "agent_id": spawn_id,
6821 "kind": "error",
6822 "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
6823 });
6824 } else {
6825 terminal_params = serde_json::json!({
6826 "session_id": spawn_session_id,
6827 "agent_id": spawn_id,
6828 "kind": "done",
6829 "finish_reason": summary,
6830 });
6831 }
6832 result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
6833 }
6834 Err(e) => {
6835 let message = format!("{e}");
6836 terminal_params = serde_json::json!({
6837 "session_id": spawn_session_id,
6838 "agent_id": spawn_id,
6839 "kind": "error",
6840 "error": message.clone(),
6841 });
6842 result_value = serde_json::json!({ "is_error": true, "error": message });
6843 }
6844 }
6845 send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
6846 spawn_state
6847 .chat_sessions
6848 .lock()
6849 .await
6850 .remove(&spawn_session_id);
6851 append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
6852 });
6853
6854 Ok(serde_json::json!({
6855 "accepted": true,
6856 "session_id": session_id,
6857 }))
6858}
6859
6860async fn emit_external_chat_event(
6877 state: &Arc<ServerState>,
6878 session_id: &str,
6879 agent_id: &str,
6880 event: car_external_agents::StreamEvent,
6881) {
6882 use car_external_agents::StreamEvent;
6883 match event {
6884 StreamEvent::Assistant(a) => {
6885 if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
6886 for block in content {
6887 let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
6888 match block_type {
6889 "text" => {
6890 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
6891 if !text.is_empty() {
6892 let params = serde_json::json!({
6893 "session_id": session_id,
6894 "agent_id": agent_id,
6895 "kind": "token",
6896 "delta": text,
6897 });
6898 send_external_chat_frame(state, session_id, params).await;
6899 }
6900 }
6901 }
6902 "tool_use" => {
6903 let name = block
6904 .get("name")
6905 .and_then(|v| v.as_str())
6906 .unwrap_or("(unknown tool)");
6907 let params = serde_json::json!({
6908 "session_id": session_id,
6909 "agent_id": agent_id,
6910 "kind": "tool_call",
6911 "detail": name,
6912 });
6913 send_external_chat_frame(state, session_id, params).await;
6914 }
6915 _ => {}
6916 }
6917 }
6918 }
6919 }
6920 _ => {
6921 }
6926 }
6927}
6928
6929async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
6934 use futures::SinkExt;
6935 use tokio_tungstenite::tungstenite::Message;
6936
6937 let host_client_id = state
6938 .chat_sessions
6939 .lock()
6940 .await
6941 .get(session_id)
6942 .map(|s| s.host_client_id.clone());
6943 let Some(host_client_id) = host_client_id else {
6944 return;
6945 };
6946 let host_channel = {
6947 let sessions = state.sessions.lock().await;
6948 sessions.get(&host_client_id).map(|s| s.channel.clone())
6949 };
6950 let Some(channel) = host_channel else {
6951 return;
6952 };
6953 let frame = serde_json::json!({
6954 "jsonrpc": "2.0",
6955 "method": "agents.chat.event",
6956 "params": params,
6957 });
6958 if let Ok(text) = serde_json::to_string(&frame) {
6959 let _ = channel
6960 .write
6961 .lock()
6962 .await
6963 .send(Message::Text(text.into()))
6964 .await;
6965 }
6966}
6967
6968fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
6974 use std::io::Write;
6975 let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
6976 Some(home) => home.join(".car"),
6977 None => return,
6978 };
6979 if std::fs::create_dir_all(&car_dir).is_err() {
6980 return;
6981 }
6982 let path = car_dir.join("external-agents.jsonl");
6983 let record = serde_json::json!({
6984 "ts": chrono::Utc::now().to_rfc3339(),
6985 "adapter_id": id,
6986 "task": task,
6987 "options": options,
6988 "result": result,
6989 });
6990 let line = match serde_json::to_string(&record) {
6991 Ok(s) => s,
6992 Err(_) => return,
6993 };
6994 if let Ok(mut f) = std::fs::OpenOptions::new()
6995 .create(true)
6996 .append(true)
6997 .open(&path)
6998 {
6999 let _ = writeln!(f, "{}", line);
7000 } else {
7001 tracing::warn!(
7002 path = %path.display(),
7003 "failed to append external-agent audit record"
7004 );
7005 }
7006}
7007
7008async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
7014 let force = req
7015 .params
7016 .get("force")
7017 .and_then(Value::as_bool)
7018 .unwrap_or(false);
7019 if let Some(id) = req.params.get("id").and_then(Value::as_str) {
7020 let json = car_ffi_common::external_agents::health_one(id, force).await?;
7021 serde_json::from_str(&json).map_err(|e| e.to_string())
7022 } else {
7023 let json = car_ffi_common::external_agents::health(force).await?;
7024 serde_json::from_str(&json).map_err(|e| e.to_string())
7025 }
7026}
7027
7028const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
7046
7047async fn handle_agents_chat(
7052 req: &JsonRpcMessage,
7053 state: &Arc<ServerState>,
7054 host_session: &Arc<crate::session::ClientSession>,
7055) -> Result<Value, String> {
7056 use futures::SinkExt;
7057 use tokio::sync::oneshot;
7058 use tokio_tungstenite::tungstenite::Message;
7059
7060 let agent_id = req
7061 .params
7062 .get("agent_id")
7063 .and_then(Value::as_str)
7064 .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
7065 .to_string();
7066 let prompt = req
7067 .params
7068 .get("prompt")
7069 .and_then(Value::as_str)
7070 .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
7071 .to_string();
7072 let session_id = req
7073 .params
7074 .get("session_id")
7075 .and_then(Value::as_str)
7076 .map(str::to_string)
7077 .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
7078 let stream = req
7079 .params
7080 .get("stream")
7081 .and_then(Value::as_bool)
7082 .unwrap_or(true);
7083 let voice_input = req
7084 .params
7085 .get("voice_input")
7086 .and_then(Value::as_bool)
7087 .unwrap_or(false);
7088
7089 let agent_client_id = state
7095 .attached_agents
7096 .lock()
7097 .await
7098 .get(&agent_id)
7099 .cloned()
7100 .ok_or_else(|| {
7101 format!(
7102 "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
7103 agent_id
7104 )
7105 })?;
7106 let agent_channel = {
7107 let sessions = state.sessions.lock().await;
7108 sessions
7109 .get(&agent_client_id)
7110 .map(|s| s.channel.clone())
7111 .ok_or_else(|| {
7112 format!(
7113 "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
7114 agent_id, agent_client_id
7115 )
7116 })?
7117 };
7118
7119 {
7125 let created_at = std::time::SystemTime::now()
7126 .duration_since(std::time::UNIX_EPOCH)
7127 .map(|d| d.as_secs())
7128 .unwrap_or(0);
7129 state.chat_sessions.lock().await.insert(
7130 session_id.clone(),
7131 crate::session::ChatSession {
7132 agent_id: agent_id.clone(),
7133 host_client_id: host_session.client_id.clone(),
7134 created_at,
7135 },
7136 );
7137 }
7138
7139 let request_id = agent_channel.next_request_id();
7146 let (tx, rx) = oneshot::channel();
7147 agent_channel
7148 .pending
7149 .lock()
7150 .await
7151 .insert(request_id.clone(), tx);
7152
7153 let rpc_request = serde_json::json!({
7154 "jsonrpc": "2.0",
7155 "method": "agent.chat",
7156 "params": {
7157 "session_id": session_id,
7158 "prompt": prompt,
7159 "stream": stream,
7160 "context": {
7161 "host_client_id": host_session.client_id,
7162 "voice_input": voice_input,
7163 },
7164 },
7165 "id": request_id,
7166 });
7167 let msg = Message::Text(
7168 serde_json::to_string(&rpc_request)
7169 .map_err(|e| e.to_string())?
7170 .into(),
7171 );
7172 if let Err(e) = agent_channel.write.lock().await.send(msg).await {
7173 agent_channel.pending.lock().await.remove(&request_id);
7177 state.chat_sessions.lock().await.remove(&session_id);
7178 return Err(format!(
7179 "failed to deliver agent.chat to `{}`: {}",
7180 agent_id, e
7181 ));
7182 }
7183
7184 let ack = match tokio::time::timeout(
7189 std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
7190 rx,
7191 )
7192 .await
7193 {
7194 Ok(Ok(resp)) => resp,
7195 Ok(Err(_)) => {
7196 state.chat_sessions.lock().await.remove(&session_id);
7198 return Err(format!(
7199 "agent `{}` disconnected before acking agents.chat",
7200 agent_id
7201 ));
7202 }
7203 Err(_) => {
7204 agent_channel.pending.lock().await.remove(&request_id);
7208 state.chat_sessions.lock().await.remove(&session_id);
7209 return Err(format!(
7210 "agent `{}` did not ack agents.chat within {}s",
7211 agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
7212 ));
7213 }
7214 };
7215
7216 if let Some(err) = ack.error {
7217 state.chat_sessions.lock().await.remove(&session_id);
7219 return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
7220 }
7221
7222 Ok(serde_json::json!({
7223 "accepted": true,
7224 "session_id": session_id,
7225 }))
7226}
7227
7228async fn handle_agents_chat_cancel(
7236 req: &JsonRpcMessage,
7237 state: &Arc<ServerState>,
7238) -> Result<Value, String> {
7239 use futures::SinkExt;
7240 use tokio_tungstenite::tungstenite::Message;
7241
7242 let session_id = req
7243 .params
7244 .get("session_id")
7245 .and_then(Value::as_str)
7246 .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
7247 .to_string();
7248
7249 let chat = state.chat_sessions.lock().await.remove(&session_id);
7250 let chat = match chat {
7251 Some(c) => c,
7252 None => {
7253 return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
7255 }
7256 };
7257
7258 let agent_client_id = state
7261 .attached_agents
7262 .lock()
7263 .await
7264 .get(&chat.agent_id)
7265 .cloned();
7266 if let Some(client_id) = agent_client_id {
7267 let channel_opt = {
7268 let sessions = state.sessions.lock().await;
7269 sessions.get(&client_id).map(|s| s.channel.clone())
7270 };
7271 if let Some(channel) = channel_opt {
7272 let notification = serde_json::json!({
7273 "jsonrpc": "2.0",
7274 "method": "agent.chat.cancel",
7275 "params": { "session_id": session_id },
7276 });
7277 if let Ok(text) = serde_json::to_string(¬ification) {
7278 let _ = channel
7279 .write
7280 .lock()
7281 .await
7282 .send(Message::Text(text.into()))
7283 .await;
7284 }
7285 }
7286 }
7287
7288 Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
7289}
7290
7291pub(crate) async fn try_forward_agent_chat_event(
7302 parsed: &JsonRpcMessage,
7303 state: &Arc<ServerState>,
7304) -> bool {
7305 use futures::SinkExt;
7306 use tokio_tungstenite::tungstenite::Message;
7307
7308 let Some(method) = parsed.method.as_deref() else {
7312 return false;
7313 };
7314 if method != "agent.chat.event" {
7315 return false;
7316 }
7317 if !parsed.id.is_null() {
7318 return false;
7321 }
7322 let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
7323 return false;
7324 };
7325 let session_id = session_id.to_string();
7326
7327 let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
7332 let Some(chat) = chat else {
7333 return true; };
7335
7336 let kind = parsed
7345 .params
7346 .get("kind")
7347 .and_then(Value::as_str)
7348 .map(str::to_string)
7349 .unwrap_or_else(|| {
7350 if parsed.params.get("error").is_some() {
7351 "error".to_string()
7352 } else if parsed.params.get("finish_reason").is_some() {
7353 "done".to_string()
7354 } else {
7355 "token".to_string()
7356 }
7357 });
7358
7359 let host_channel = {
7363 let sessions = state.sessions.lock().await;
7364 sessions
7365 .get(&chat.host_client_id)
7366 .map(|s| s.channel.clone())
7367 };
7368 if let Some(channel) = host_channel {
7369 let mut params = parsed.params.clone();
7370 if let Some(obj) = params.as_object_mut() {
7371 obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
7372 obj.entry("kind")
7377 .or_insert_with(|| Value::String(kind.clone()));
7378 }
7379 let forward = serde_json::json!({
7380 "jsonrpc": "2.0",
7381 "method": "agents.chat.event",
7382 "params": params,
7383 });
7384 if let Ok(text) = serde_json::to_string(&forward) {
7385 let send_result = channel
7386 .write
7387 .lock()
7388 .await
7389 .send(Message::Text(text.into()))
7390 .await;
7391 if let Err(e) = send_result {
7392 tracing::warn!(
7393 session_id = %session_id,
7394 agent_id = %chat.agent_id,
7395 host_client_id = %chat.host_client_id,
7396 kind = %kind,
7397 error = %e,
7398 "agent.chat.event forward to host failed at the WS send step"
7399 );
7400 }
7401 }
7402 } else {
7403 tracing::warn!(
7410 session_id = %session_id,
7411 agent_id = %chat.agent_id,
7412 host_client_id = %chat.host_client_id,
7413 kind = %kind,
7414 "agent.chat.event from supervised agent had no host channel \
7415 (host disconnected since `agents.chat`); dropping routing entry"
7416 );
7417 state.chat_sessions.lock().await.remove(&session_id);
7418 return true;
7419 }
7420
7421 if matches!(kind.as_str(), "done" | "error") {
7425 state.chat_sessions.lock().await.remove(&session_id);
7426 }
7427
7428 true
7429}
7430
7431#[cfg(test)]
7432mod fd_leak_regression {
7433 use super::run_dispatch;
7440 use futures::SinkExt;
7441 use std::sync::Arc;
7442 use tokio_tungstenite::tungstenite::{Error as WsError, Message};
7443
7444 #[tokio::test]
7445 async fn abrupt_read_error_still_runs_session_cleanup() {
7446 let tmp = tempfile::TempDir::new().unwrap();
7447 let state = Arc::new(crate::session::ServerState::standalone(
7448 tmp.path().to_path_buf(),
7449 ));
7450
7451 let read = futures::stream::iter(vec![Err::<Message, WsError>(
7455 WsError::ConnectionClosed,
7456 )]);
7457 let write: crate::session::WsSink = Box::pin(
7458 futures::sink::drain().sink_map_err(|_| WsError::ConnectionClosed),
7459 );
7460
7461 let result =
7462 run_dispatch(read, write, "test-peer".to_string(), state.clone()).await;
7463 assert!(
7464 result.is_ok(),
7465 "run_dispatch must return Ok after cleanup, got {result:?}"
7466 );
7467
7468 assert!(
7471 state.sessions.lock().await.is_empty(),
7472 "state.sessions must be empty after an abrupt disconnect (car#209)"
7473 );
7474 }
7475}
7476
7477#[cfg(test)]
7478mod a2ui_action_delivery {
7479 use super::{handle_a2ui_action, JsonRpcMessage};
7484 use crate::session::{ServerState, WsChannel, WsSink};
7485 use futures::{SinkExt, StreamExt};
7486 use std::collections::HashMap;
7487 use std::sync::atomic::AtomicU64;
7488 use std::sync::Arc;
7489 use tokio::sync::Mutex;
7490 use tokio_tungstenite::tungstenite::{Error as WsError, Message};
7491
7492 #[tokio::test]
7493 async fn client_action_broadcasts_to_a2ui_subscribers() {
7494 let tmp = tempfile::TempDir::new().unwrap();
7495 let state = Arc::new(ServerState::standalone(tmp.path().to_path_buf()));
7496
7497 let (tx, mut rx) = futures::channel::mpsc::unbounded::<Message>();
7500 let sink: WsSink = Box::pin(tx.sink_map_err(|_| WsError::ConnectionClosed));
7501 let channel = Arc::new(WsChannel {
7502 write: Mutex::new(sink),
7503 pending: Mutex::new(HashMap::new()),
7504 next_id: AtomicU64::new(0),
7505 });
7506 state
7507 .a2ui_subscribers
7508 .lock()
7509 .await
7510 .insert("test-sub".to_string(), channel);
7511
7512 let req: JsonRpcMessage = serde_json::from_value(serde_json::json!({
7516 "jsonrpc": "2.0",
7517 "method": "a2ui.action",
7518 "id": 1,
7519 "params": {
7520 "action": "trader:pause",
7521 "surfaceId": "surf-1",
7522 "sourceComponentId": "b1",
7523 "timestamp": "2026-06-03T00:00:00Z"
7524 }
7525 }))
7526 .unwrap();
7527
7528 let out = handle_a2ui_action(&req, &state).await;
7529 assert!(out.is_ok(), "handle_a2ui_action failed: {out:?}");
7530
7531 let msg = rx.next().await.expect("subscriber received no frame");
7532 let text = match msg {
7533 Message::Text(t) => t.to_string(),
7534 other => panic!("expected text frame, got {other:?}"),
7535 };
7536 let v: serde_json::Value = serde_json::from_str(&text).unwrap();
7537 assert_eq!(v["method"], "a2ui.event");
7538 assert_eq!(v["params"]["kind"], "a2ui.action");
7539 assert_eq!(
7541 v["params"]["result"]["action"]["name"], "trader:pause",
7542 "ClientAction.name should accept the `action` alias"
7543 );
7544 assert_eq!(v["params"]["result"]["surfaceId"], "surf-1");
7545 }
7546}