1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30 #[serde(default)]
31 pub jsonrpc: String,
32 #[serde(default)]
33 pub method: Option<String>,
34 #[serde(default)]
35 pub params: Value,
36 #[serde(default)]
37 pub id: Value,
38 #[serde(default)]
40 pub result: Option<Value>,
41 #[serde(default)]
42 pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47 pub jsonrpc: &'static str,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub result: Option<Value>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<JsonRpcError>,
52 pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57 pub code: i32,
58 pub message: String,
59}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
88 name = "ws.connection",
89 skip_all,
90 fields(peer = %peer),
91)]
92pub async fn handle_connection(
93 stream: TcpStream,
94 peer: SocketAddr,
95 state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let ws_stream = accept_async(stream).await?;
98 let (write, read) = ws_stream.split();
99 run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102#[cfg(unix)]
113#[instrument(
114 name = "ws.connection",
115 skip_all,
116 fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119 stream: tokio::net::UnixStream,
120 peer: String,
121 state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123 let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124 let (write, read) = ws_stream.split();
125 run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128#[instrument(
138 name = "ws.dispatch",
139 skip_all,
140 fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143 mut read: R,
144 write: crate::session::WsSink,
145 peer: String,
146 state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149 R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150 + Unpin
151 + Send,
152{
153 let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154 tracing::Span::current().record("client_id", &client_id.as_str());
155
156 info!("New connection from {}", peer);
157
158 let channel = Arc::new(WsChannel {
159 write: Mutex::new(write),
160 pending: Mutex::new(HashMap::new()),
161 next_id: AtomicU64::new(1),
162 });
163
164 let session = state.create_session(&client_id, channel.clone()).await;
165
166 let mut conn_tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
175
176 while let Some(msg) = read.next().await {
177 while conn_tasks.try_join_next().is_some() {}
180
181 let msg = match msg {
190 Ok(m) => m,
191 Err(e) => {
192 info!("read error from {}: {}; closing", client_id, e);
193 break;
194 }
195 };
196 if msg.is_text() {
197 let text = match msg.to_text() {
198 Ok(t) => t,
199 Err(e) => {
200 info!("non-text frame from {}: {}; closing", client_id, e);
201 break;
202 }
203 };
204 let parsed: JsonRpcMessage = match serde_json::from_str(text) {
205 Ok(m) => m,
206 Err(e) => {
207 send_response(
208 &session.channel,
209 JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
210 )
211 .await
212 .ok();
213 continue;
214 }
215 };
216
217 if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
219 if let Some(id_str) = parsed.id.as_str() {
220 let mut pending = session.channel.pending.lock().await;
221 if let Some(tx) = pending.remove(id_str) {
222 let tool_resp = if let Some(result) = parsed.result {
223 ToolExecuteResponse {
224 action_id: id_str.to_string(),
225 output: Some(result),
226 error: None,
227 }
228 } else {
229 let err_msg = parsed
230 .error
231 .as_ref()
232 .and_then(|e| e.get("message"))
233 .and_then(|m| m.as_str())
234 .unwrap_or("unknown error")
235 .to_string();
236 ToolExecuteResponse {
237 action_id: id_str.to_string(),
238 output: None,
239 error: Some(err_msg),
240 }
241 };
242 let _ = tx.send(tool_resp);
243 continue;
244 }
245 }
246 }
247
248 if try_forward_agent_chat_event(&parsed, &state).await {
257 continue;
258 }
259
260 if let Some(method) = &parsed.method {
262 info!(method = %method, "dispatching JSON-RPC method");
263
264 if state.auth_token.get().is_some()
272 && !session
273 .authenticated
274 .load(std::sync::atomic::Ordering::Acquire)
275 && method != "session.auth"
276 {
277 let resp = JsonRpcResponse::error(
278 parsed.id.clone(),
279 -32001,
280 "auth required: send `session.auth` with the per-launch token \
281 from ~/Library/Application Support/ai.parslee.car/auth-token \
282 (macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
283 as the first frame on this connection",
284 );
285 let _ = send_response(&session.channel, resp).await;
286 info!(client = %client_id, method = %method,
287 "rejecting non-auth method on unauthenticated session; closing");
288 break;
289 }
290
291 if state.approval_gate.requires_approval(method.as_str()) {
303 match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
304 Ok(()) => {}
305 Err(reason) => {
306 let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
307 let _ = send_response(&session.channel, resp).await;
308 info!(
309 client = %client_id,
310 method = %method,
311 reason = %reason,
312 "approval gate blocked dispatch"
313 );
314 continue;
315 }
316 }
317 }
318
319 let session_task = session.clone();
334 let state_task = state.clone();
335 let method_owned = method.clone();
336 let parsed_task = parsed;
337 conn_tasks.spawn(async move {
340 let session = session_task;
341 let state = state_task;
342 let parsed = parsed_task;
343 let result = match method_owned.as_str() {
344 "session.auth" => handle_session_auth(&parsed, &session, &state).await,
345 "parslee.auth" => handle_parslee_auth().await,
346 "auth.start" => handle_auth_start(&parsed).await,
347 "auth.complete" => handle_auth_complete(&parsed).await,
348 "auth.status" => handle_auth_status().await,
349 "auth.logout" => handle_auth_logout().await,
350 "session.init" => handle_session_init(&parsed, &session).await,
351 "host.subscribe" => handle_host_subscribe(&session, &state).await,
352 "host.agents" => handle_host_agents(&session).await,
353 "host.events" => handle_host_events(&parsed, &session).await,
354 "host.approvals" => handle_host_approvals(&session).await,
355 "host.register_agent" => {
356 handle_host_register_agent(&parsed, &session).await
357 }
358 "host.unregister_agent" => {
359 handle_host_unregister_agent(&parsed, &session).await
360 }
361 "host.set_status" => handle_host_set_status(&parsed, &session).await,
362 "host.notify" => handle_host_notify(&parsed, &session).await,
363 "host.request_approval" => {
364 handle_host_request_approval(&parsed, &session).await
365 }
366 "host.resolve_approval" => {
367 handle_host_resolve_approval(&parsed, &session).await
368 }
369 "tools.register" => handle_tools_register(&parsed, &session).await,
370 "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
371 "policy.register" => handle_policy_register(&parsed, &session).await,
372 "session.policy.open" => handle_session_policy_open(&session).await,
373 "session.policy.close" => {
374 handle_session_policy_close(&parsed, &session).await
375 }
376 "verify" => handle_verify(&parsed, &session).await,
377 "state.get" => handle_state_get(&parsed, &session).await,
378 "state.set" => handle_state_set(&parsed, &session).await,
379 "state.exists" => handle_state_exists(&parsed, &session).await,
380 "state.keys" => handle_state_keys(&parsed, &session).await,
381 "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
382 "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
383 "memory.query" => handle_memory_query(&parsed, &session).await,
384 "memory.build_context" => {
385 handle_memory_build_context(&parsed, &session).await
386 }
387 "memory.build_context_fast" => {
388 handle_memory_build_context_fast(&parsed, &session).await
389 }
390 "memory.consolidate" => handle_memory_consolidate(&session).await,
391 "memory.fact_count" => handle_memory_fact_count(&session).await,
392 "memory.persist" => handle_memory_persist(&parsed, &session).await,
393 "memory.load" => handle_memory_load(&parsed, &session).await,
394 "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
395 "skill.find" => handle_skill_find(&parsed, &session).await,
396 "skill.report" => handle_skill_report(&parsed, &session).await,
397 "skill.repair" => handle_skill_repair(&parsed, &session).await,
398 "skills.ingest_distilled" => {
399 handle_skills_ingest_distilled(&parsed, &session).await
400 }
401 "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
402 "skills.domains_needing_evolution" => {
403 handle_skills_domains_needing_evolution(&parsed, &session).await
404 }
405 "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
406 "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
407 "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
408 "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
409 "multi.vote" => handle_multi_vote(&parsed, &session).await,
410 "scheduler.create" => handle_scheduler_create(&parsed),
411 "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
412 "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
413 "infer" => handle_infer(&parsed, &state, &session).await,
414 "image.generate" => handle_image_generate(&parsed, &state).await,
415 "video.generate" => handle_video_generate(&parsed, &state).await,
416 "embed" => handle_embed(&parsed, &state).await,
417 "classify" => handle_classify(&parsed, &state).await,
418 "tokenize" => handle_tokenize(&parsed, &state).await,
419 "detokenize" => handle_detokenize(&parsed, &state).await,
420 "rerank" => handle_rerank(&parsed, &state).await,
421 "transcribe" => handle_transcribe(&parsed, &state).await,
422 "synthesize" => handle_synthesize(&parsed, &state).await,
423 "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
424 "speech.prepare" => handle_speech_prepare(&state).await,
425 "models.route" => handle_models_route(&parsed, &state).await,
426 "models.stats" => handle_models_stats(&state).await,
427 "outcomes.resolve_pending" => {
428 handle_outcomes_resolve_pending(&parsed, &state).await
429 }
430 "events.count" => handle_events_count(&session).await,
431 "events.stats" => handle_events_stats(&session).await,
432 "events.truncate" => handle_events_truncate(&parsed, &session).await,
433 "events.clear" => handle_events_clear(&session).await,
434 "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
435 "models.list" => handle_models_list(&state),
436 "models.register" => handle_models_register(&parsed, &state).await,
437 "models.unregister" => handle_models_unregister(&parsed, &state).await,
438 "models.list_unified" => handle_models_list_unified(&state),
439 "models.search" => handle_models_search(&parsed, &state),
440 "models.upgrades" => handle_models_upgrades(&state),
441 "models.pull" => handle_models_pull(&parsed, &state).await,
442 "models.install" => handle_models_pull(&parsed, &state).await,
443 "skills.distill" => handle_skills_distill(&parsed, &state).await,
444 "skills.list" => handle_skills_list(&parsed, &session).await,
445 "browser.run" => handle_browser_run(&parsed, &session).await,
446 "browser.close" => handle_browser_close(&session).await,
447 "secret.put" => handle_secret_put(&parsed),
448 "secret.get" => handle_secret_get(&parsed),
449 "secret.delete" => handle_secret_delete(&parsed),
450 "secret.status" => handle_secret_status(&parsed),
451 "secret.available" => Ok(car_ffi_common::secrets::is_available()),
452 "permissions.status" => handle_perm_status(&parsed),
453 "permissions.request" => handle_perm_request(&parsed),
454 "permissions.explain" => handle_perm_explain(&parsed),
455 "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
456 "accounts.list" => car_ffi_common::accounts::list(),
457 "accounts.open" => {
458 #[derive(serde::Deserialize, Default)]
459 struct OpenParams {
460 #[serde(default)]
461 account_id: Option<String>,
462 }
463 let p: OpenParams =
464 serde_json::from_value(parsed.params.clone()).unwrap_or_default();
465 car_ffi_common::accounts::open_settings(p.account_id.as_deref())
466 }
467 "calendar.list" => car_ffi_common::integrations::calendar_list(),
468 "calendar.events" => handle_calendar_events(&parsed),
469 "contacts.containers" => {
470 car_ffi_common::integrations::contacts_containers()
471 }
472 "contacts.find" => handle_contacts_find(&parsed),
473 "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
474 "mail.inbox" => handle_mail_inbox(&parsed),
475 "mail.send" => handle_mail_send(&parsed),
476 "messages.services" => car_ffi_common::integrations::messages_services(),
477 "messages.chats" => handle_messages_chats(&parsed),
478 "messages.send" => handle_messages_send(&parsed),
479 "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
480 "notes.find" => handle_notes_find(&parsed),
481 "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
482 "reminders.items" => handle_reminders_items(&parsed),
483 "photos.albums" => car_ffi_common::integrations::photos_albums(),
484 "bookmarks.list" => handle_bookmarks_list(&parsed),
485 "files.locations" => car_ffi_common::integrations::files_locations(),
486 "keychain.status" => car_ffi_common::integrations::keychain_status(),
487 "health.status" => car_ffi_common::health::status(),
488 "health.sleep" => handle_health_sleep(&parsed),
489 "health.workouts" => handle_health_workouts(&parsed),
490 "health.activity" => handle_health_activity(&parsed),
491 "voice.transcribe_stream.start" => {
492 handle_voice_transcribe_stream_start(&parsed, &state, &session).await
493 }
494 "voice.transcribe_stream.stop" => {
495 handle_voice_transcribe_stream_stop(&parsed, &state).await
496 }
497 "voice.transcribe_stream.push" => {
498 handle_voice_transcribe_stream_push(&parsed, &state).await
499 }
500 "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
501 "voice.dispatch_turn" => {
502 handle_voice_dispatch_turn(&parsed, &state, &session).await
503 }
504 "voice.cancel_turn" => handle_voice_cancel_turn().await,
505 "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
506 "inference.register_runner" => {
507 handle_inference_register_runner(&session).await
508 }
509 "inference.runner.event" => handle_inference_runner_event(&parsed).await,
510 "inference.runner.complete" => {
511 handle_inference_runner_complete(&parsed).await
512 }
513 "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
514 "voice.providers.list" => {
515 serde_json::from_str::<serde_json::Value>(
519 &car_voice::list_voice_providers_json(),
520 )
521 .map_err(|e| e.to_string())
522 }
523 "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
524 .await
525 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
526 "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
527 .await
528 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
529 "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
530 "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
531 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
532 "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
533 "workflow.run" => handle_workflow_run(&parsed, &session).await,
534 "workflow.verify" => handle_workflow_verify(&parsed),
535 "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
536 "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
537 "meeting.list" => handle_meeting_list(&parsed),
538 "meeting.get" => handle_meeting_get(&parsed),
539 "registry.register" => handle_registry_register(&parsed),
540 "registry.heartbeat" => handle_registry_heartbeat(&parsed),
541 "registry.unregister" => handle_registry_unregister(&parsed),
542 "registry.list" => handle_registry_list(&parsed),
543 "registry.reap" => handle_registry_reap(&parsed),
544 "admission.status" => handle_admission_status(&state),
545 "a2a.start" => handle_a2a_start(&parsed, &session).await,
546 "a2a.stop" => handle_a2a_stop(),
547 "a2a.status" => handle_a2a_status(),
548 "a2a.send" => handle_a2a_send(&parsed, &state).await,
549 "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
550 "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
551 "a2ui.capabilities" => handle_a2ui_capabilities(&state),
552 "a2ui.reap" => handle_a2ui_reap(&state).await,
553 "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
554 "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
555 "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
556 "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
557 "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
558 "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
559 "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
560 "automation.run_applescript" => handle_run_applescript(&parsed).await,
561 "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
562 "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
563 "notifications.local" => handle_local_notification(&parsed).await,
564 "vision.ocr" => handle_vision_ocr(&parsed).await,
565 "agents.list" => handle_agents_list(&state).await,
566 "agents.health" => handle_agents_health(&state).await,
567 "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
568 "agents.install" => handle_agents_install(&parsed, &state).await,
569 "agents.remove" => handle_agents_remove(&parsed, &state).await,
570 "agents.start" => handle_agents_start(&parsed, &state).await,
571 "agents.stop" => handle_agents_stop(&parsed, &state).await,
572 "agents.restart" => handle_agents_restart(&parsed, &state).await,
573 "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
574 "agents.list_external" => handle_agents_list_external(&parsed).await,
575 "agents.detect_external" => handle_agents_detect_external(&parsed).await,
576 "agents.health_external" => handle_agents_health_external(&parsed).await,
577 "agents.invoke_external" => {
578 handle_agents_invoke_external(&parsed, &state, &session).await
579 }
580 "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
581 "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
582 "message/send"
589 | "SendMessage"
590 | "message/stream"
591 | "SendStreamingMessage"
592 | "tasks/get"
593 | "GetTask"
594 | "tasks/list"
595 | "ListTasks"
596 | "tasks/cancel"
597 | "CancelTask"
598 | "tasks/resubscribe"
599 | "SubscribeToTask"
600 | "tasks/pushNotificationConfig/set"
601 | "CreateTaskPushNotificationConfig"
602 | "tasks/pushNotificationConfig/get"
603 | "GetTaskPushNotificationConfig"
604 | "tasks/pushNotificationConfig/list"
605 | "ListTaskPushNotificationConfigs"
606 | "tasks/pushNotificationConfig/delete"
607 | "DeleteTaskPushNotificationConfig"
608 | "agent/getAuthenticatedExtendedCard"
609 | "GetExtendedAgentCard" => {
610 handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
611 }
612 _ => Err(format!("unknown method: {}", method_owned)),
613 };
614
615 let resp = match result {
616 Ok(value) => JsonRpcResponse::success(parsed.id, value),
617 Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
618 };
619 let _ = send_response(&session.channel, resp).await;
620 });
621 }
622 } else if msg.is_close() {
623 info!("Client {} disconnected", client_id);
624 break;
625 }
626 }
627
628 conn_tasks.abort_all();
633
634 session.host.unsubscribe(&client_id).await;
635 session.host.reap_session_approvals(&client_id).await;
641 state.a2ui_subscribers.lock().await.remove(&client_id);
642
643 let _removed = state.remove_session(&client_id).await;
654 {
655 let mut pending = session.channel.pending.lock().await;
656 pending.clear();
657 }
658
659 Ok(())
660}
661
662async fn send_response(
663 channel: &WsChannel,
664 resp: JsonRpcResponse,
665) -> Result<(), Box<dyn std::error::Error>> {
666 use futures::SinkExt;
667 let json = serde_json::to_string(&resp)?;
668 channel
669 .write
670 .lock()
671 .await
672 .send(Message::Text(json.into()))
673 .await?;
674 Ok(())
675}
676
677async fn handle_host_subscribe(
680 session: &crate::session::ClientSession,
681 state: &Arc<ServerState>,
682) -> Result<Value, String> {
683 session
684 .host
685 .subscribe(&session.client_id, session.channel.clone())
686 .await;
687 serde_json::to_value(HostSnapshot {
688 subscribed: true,
689 agents: session.host.agents().await,
690 approvals: session.host.approvals().await,
691 events: session.host.events(50).await,
692 identity: Some(daemon_identity(state)),
693 })
694 .map_err(|e| e.to_string())
695}
696
697fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
705 let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
712 (
713 Some(p.to_string_lossy().into_owned()),
714 car_proto::HostManifestRole::Observer,
715 )
716 } else if let Some(s) = state.supervisor_if_installed() {
717 (
718 Some(s.manifest_path().to_string_lossy().into_owned()),
719 car_proto::HostManifestRole::Owner,
720 )
721 } else {
722 (None, car_proto::HostManifestRole::None)
723 };
724 car_proto::HostIdentity {
725 version: env!("CARGO_PKG_VERSION").to_string(),
726 pid: std::process::id(),
727 manifest_path,
728 manifest_role,
729 parslee: state
730 .parslee_session
731 .get()
732 .map(|session| session.identity.clone()),
733 }
734}
735
736async fn handle_parslee_auth() -> Result<Value, String> {
747 let session = crate::parslee_auth::load_or_refresh()
748 .await?
749 .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
750 Ok(serde_json::json!({
751 "authenticated": true,
752 "token_type": "Bearer",
753 "access_token": session.access_token,
754 "authorization_header": format!("Bearer {}", session.access_token),
755 "identity": session.identity,
756 }))
757}
758
759async fn handle_auth_start(req: &JsonRpcMessage) -> Result<Value, String> {
765 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
766 let client_id = req
767 .params
768 .get("client_id")
769 .and_then(|v| v.as_str())
770 .unwrap_or("parslee-car");
771 let redirect_uri = req
772 .params
773 .get("redirect_uri")
774 .and_then(|v| v.as_str())
775 .ok_or_else(|| "redirect_uri is required".to_string())?;
776 let provider = req.params.get("provider").and_then(|v| v.as_str());
777 let state = car_auth::new_state();
778 let verifier = car_auth::pkce_verifier();
779 let challenge = car_auth::pkce_challenge(&verifier);
780 let url =
781 car_auth::authorize_url(&api_base, client_id, redirect_uri, &state, &challenge, provider)?;
782 Ok(serde_json::json!({
783 "authorize_url": url,
784 "state": state,
785 "verifier": verifier,
786 }))
787}
788
789async fn handle_auth_complete(req: &JsonRpcMessage) -> Result<Value, String> {
790 let api_base = car_auth::api_base(req.params.get("api_base").and_then(|v| v.as_str()));
791 let client_id = req
792 .params
793 .get("client_id")
794 .and_then(|v| v.as_str())
795 .unwrap_or("parslee-car");
796 let redirect_uri = req
797 .params
798 .get("redirect_uri")
799 .and_then(|v| v.as_str())
800 .ok_or_else(|| "redirect_uri is required".to_string())?;
801 let code = req
802 .params
803 .get("code")
804 .and_then(|v| v.as_str())
805 .ok_or_else(|| "code is required".to_string())?;
806 let verifier = req
807 .params
808 .get("verifier")
809 .and_then(|v| v.as_str())
810 .ok_or_else(|| "verifier is required".to_string())?;
811 let token =
812 car_auth::exchange_code(&api_base, client_id, redirect_uri, code, verifier).await?;
813 car_auth::store_tokens(&api_base, &token)?;
814 Ok(serde_json::json!({ "ok": true }))
815}
816
817async fn handle_auth_status() -> Result<Value, String> {
818 match car_auth::fetch_status(None).await? {
819 Some(session_json) => {
820 let session: Value = serde_json::from_str(&session_json).unwrap_or(Value::Null);
821 Ok(serde_json::json!({ "authenticated": true, "session": session }))
822 }
823 None => Ok(serde_json::json!({ "authenticated": false })),
824 }
825}
826
827async fn handle_auth_logout() -> Result<Value, String> {
828 car_auth::clear_tokens()?;
829 Ok(serde_json::json!({ "ok": true }))
830}
831
832async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
833 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
834}
835
836async fn handle_host_events(
837 req: &JsonRpcMessage,
838 session: &crate::session::ClientSession,
839) -> Result<Value, String> {
840 let limit = req
841 .params
842 .get("limit")
843 .and_then(|v| v.as_u64())
844 .unwrap_or(100) as usize;
845 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
846}
847
848async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
849 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
850}
851
852async fn handle_a2ui_apply(
853 req: &JsonRpcMessage,
854 state: &Arc<ServerState>,
855) -> Result<Value, String> {
856 #[derive(Deserialize)]
857 struct Params {
858 #[serde(default)]
859 envelope: Option<car_a2ui::A2uiEnvelope>,
860 #[serde(default)]
861 message: Option<car_a2ui::A2uiEnvelope>,
862 }
863
864 let envelope = if req.params.get("createSurface").is_some()
865 || req.params.get("updateComponents").is_some()
866 || req.params.get("updateDataModel").is_some()
867 || req.params.get("deleteSurface").is_some()
868 {
869 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
870 .map_err(|e| e.to_string())?
871 } else {
872 match serde_json::from_value::<Params>(req.params.clone()) {
873 Ok(params) => params
874 .envelope
875 .or(params.message)
876 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
877 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
878 .map_err(|e| e.to_string())?,
879 }
880 };
881
882 apply_a2ui_envelope(state, envelope, None, None).await
883}
884
885async fn handle_a2ui_ingest(
886 req: &JsonRpcMessage,
887 state: &Arc<ServerState>,
888) -> Result<Value, String> {
889 #[derive(Deserialize)]
890 #[serde(rename_all = "camelCase")]
891 struct Params {
892 #[serde(default)]
893 endpoint: Option<String>,
894 #[serde(default)]
895 a2a_endpoint: Option<String>,
896 #[serde(default)]
897 owner: Option<car_a2ui::A2uiSurfaceOwner>,
898 #[serde(default)]
899 route_auth: Option<A2aRouteAuth>,
900 #[serde(default)]
901 allow_untrusted_endpoint: bool,
902 }
903
904 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
905 endpoint: None,
906 a2a_endpoint: None,
907 owner: None,
908 route_auth: None,
909 allow_untrusted_endpoint: false,
910 });
911 let payload = req.params.get("payload").unwrap_or(&req.params);
912 state
913 .a2ui
914 .validate_payload(payload)
915 .map_err(|e| e.to_string())?;
916 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
917 if envelopes.is_empty() {
918 return Err("no A2UI envelopes found in payload".into());
919 }
920 let endpoint = params.endpoint.or(params.a2a_endpoint);
921 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
922 let owner = params
923 .owner
924 .or_else(|| car_a2ui::owner_from_value(payload))
925 .map(|owner| match endpoint.clone() {
926 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
927 None => owner,
928 });
929
930 let mut results = Vec::new();
931 for envelope in envelopes {
932 let value =
933 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
934 results.push(value);
935 }
936 Ok(serde_json::json!({ "applied": results }))
937}
938
939async fn apply_a2ui_envelope(
940 state: &Arc<ServerState>,
941 envelope: car_a2ui::A2uiEnvelope,
942 owner: Option<car_a2ui::A2uiSurfaceOwner>,
943 route_auth: Option<A2aRouteAuth>,
944) -> Result<Value, String> {
945 let result = state
946 .a2ui
947 .apply_with_owner(envelope, owner)
948 .await
949 .map_err(|e| e.to_string())?;
950 update_a2ui_route_auth(state, &result, route_auth).await;
951 let kind = if result.deleted {
952 "a2ui.surface_deleted"
953 } else {
954 "a2ui.surface_updated"
955 };
956 let message = if result.deleted {
957 format!("A2UI surface {} deleted", result.surface_id)
958 } else {
959 format!("A2UI surface {} updated", result.surface_id)
960 };
961 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
962 state
963 .host
964 .record_event(kind, None, message, payload.clone())
965 .await;
966 broadcast_a2ui_event(state, kind, &payload).await;
970 serde_json::to_value(result).map_err(|e| e.to_string())
971}
972
973async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
974 use futures::SinkExt;
975 use tokio_tungstenite::tungstenite::Message;
976 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
977 .a2ui_subscribers
978 .lock()
979 .await
980 .values()
981 .cloned()
982 .collect();
983 if subscribers.is_empty() {
984 return;
985 }
986 let Ok(json) = serde_json::to_string(&serde_json::json!({
987 "jsonrpc": "2.0",
988 "method": "a2ui.event",
989 "params": {
990 "kind": kind,
991 "result": result,
992 },
993 })) else {
994 return;
995 };
996 for channel in subscribers {
997 let _ = channel
998 .write
999 .lock()
1000 .await
1001 .send(Message::Text(json.clone().into()))
1002 .await;
1003 }
1004}
1005
1006async fn update_a2ui_route_auth(
1007 state: &Arc<ServerState>,
1008 result: &car_a2ui::A2uiApplyResult,
1009 route_auth: Option<A2aRouteAuth>,
1010) {
1011 let mut auth = state.a2ui_route_auth.lock().await;
1012 if result.deleted {
1013 auth.remove(&result.surface_id);
1014 return;
1015 }
1016
1017 let has_route_endpoint = result
1018 .surface
1019 .as_ref()
1020 .and_then(|surface| surface.owner.as_ref())
1021 .and_then(|owner| owner.endpoint.as_ref())
1022 .is_some();
1023 match (has_route_endpoint, route_auth) {
1024 (true, Some(route_auth)) => {
1025 auth.insert(result.surface_id.clone(), route_auth);
1026 }
1027 _ => {
1028 auth.remove(&result.surface_id);
1029 }
1030 }
1031}
1032
1033fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
1034 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
1035}
1036
1037async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
1038 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
1039 if !removed.is_empty() {
1040 let mut auth = state.a2ui_route_auth.lock().await;
1041 for surface_id in &removed {
1042 auth.remove(surface_id);
1043 }
1044 }
1045 Ok(serde_json::json!({ "removed": removed }))
1046}
1047
1048async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
1049 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
1050}
1051
1052async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
1053 let surface_id = req
1054 .params
1055 .get("surface_id")
1056 .or_else(|| req.params.get("surfaceId"))
1057 .and_then(Value::as_str)
1058 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
1059 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1060}
1061
1062async fn handle_a2ui_subscribe(
1068 session: &crate::session::ClientSession,
1069 state: &Arc<ServerState>,
1070) -> Result<Value, String> {
1071 state
1072 .a2ui_subscribers
1073 .lock()
1074 .await
1075 .insert(session.client_id.clone(), session.channel.clone());
1076 Ok(serde_json::json!({ "subscribed": true }))
1077}
1078
1079async fn handle_a2ui_unsubscribe(
1083 session: &crate::session::ClientSession,
1084 state: &Arc<ServerState>,
1085) -> Result<Value, String> {
1086 state
1087 .a2ui_subscribers
1088 .lock()
1089 .await
1090 .remove(&session.client_id);
1091 Ok(serde_json::json!({ "subscribed": false }))
1092}
1093
1094async fn handle_a2ui_replay(
1101 req: &JsonRpcMessage,
1102 state: &Arc<ServerState>,
1103) -> Result<Value, String> {
1104 let surface_id = req
1105 .params
1106 .get("surface_id")
1107 .or_else(|| req.params.get("surfaceId"))
1108 .and_then(Value::as_str)
1109 .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
1110 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
1111}
1112
1113async fn handle_a2ui_action(
1114 req: &JsonRpcMessage,
1115 state: &Arc<ServerState>,
1116) -> Result<Value, String> {
1117 let action: car_a2ui::ClientAction =
1118 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1119 let owner = state.a2ui.owner(&action.surface_id).await;
1120 let route = route_a2ui_action(state, &action, owner.clone()).await;
1121 let payload = serde_json::json!({
1122 "action": action,
1123 "owner": owner,
1124 "route": route,
1125 });
1126 let event = state
1127 .host
1128 .record_event(
1129 "a2ui.action",
1130 None,
1131 format!(
1132 "A2UI action {} from {}",
1133 action.name, action.source_component_id
1134 ),
1135 payload,
1136 )
1137 .await;
1138 Ok(serde_json::json!({
1139 "event": event,
1140 "route": route,
1141 }))
1142}
1143
1144async fn handle_a2ui_render_report(
1151 req: &JsonRpcMessage,
1152 state: &Arc<ServerState>,
1153) -> Result<Value, String> {
1154 let report: car_a2ui::RenderReport =
1158 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1159 let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
1160 let kind = "a2ui.render_report";
1161 let message = format!("A2UI render report for surface {}", report.surface_id);
1162 let event = state
1163 .host
1164 .record_event(kind, None, message, payload.clone())
1165 .await;
1166 broadcast_a2ui_event(state, kind, &payload).await;
1167
1168 if let Some(surface) = state.a2ui.get(&report.surface_id).await {
1176 if !state.ui_agent_budget.try_consume(&report.surface_id) {
1182 tracing::warn!(
1183 surface_id = %report.surface_id,
1184 count = state.ui_agent_budget.count(&report.surface_id),
1185 max = state.ui_agent_budget.max(),
1186 "ui-agent iteration budget exhausted; skipping agent invocation"
1187 );
1188 return Ok(serde_json::json!({ "event": event }));
1189 }
1190 match state.ui_agent.on_render_report(&report, &surface) {
1194 car_ui_agent::Decision::Patch {
1195 envelope,
1196 strategy_id,
1197 patch_hash,
1198 elapsed_ns,
1199 } => {
1200 if !state
1208 .ui_agent_oscillation
1209 .check_and_record(&report.surface_id, patch_hash)
1210 {
1211 tracing::warn!(
1212 surface_id = %report.surface_id,
1213 strategy = %strategy_id,
1214 patch_hash,
1215 "ui-agent oscillation detected; suppressing patch"
1216 );
1217 state.ui_agent_budget.refund(&report.surface_id);
1220 return Ok(serde_json::json!({ "event": event }));
1221 }
1222 let a2ui_envelope = car_a2ui::A2uiEnvelope {
1223 patch_components: Some(envelope),
1224 ..Default::default()
1225 };
1226 if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
1227 tracing::warn!(
1228 surface_id = %report.surface_id,
1229 strategy = %strategy_id,
1230 patch_hash,
1231 elapsed_ns,
1232 error = %e,
1233 "ui-agent patch apply failed",
1234 );
1235 state.ui_agent_budget.refund(&report.surface_id);
1237 } else {
1238 tracing::debug!(
1239 surface_id = %report.surface_id,
1240 strategy = %strategy_id,
1241 patch_hash,
1242 elapsed_ns,
1243 iteration = state.ui_agent_budget.count(&report.surface_id),
1244 "ui-agent patch applied",
1245 );
1246 if let Some(memgine) = state.shared_memgine.clone() {
1256 let speaker = format!("ui-agent/{}", report.surface_id);
1257 let text = format!("strategy applied: {}", strategy_id);
1258 tokio::spawn(async move {
1259 let mut guard = memgine.lock().await;
1260 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1261 });
1262 }
1263 }
1264 }
1265 car_ui_agent::Decision::StableNoChange => {
1266 state.ui_agent_budget.refund(&report.surface_id);
1268 }
1269 car_ui_agent::Decision::HardStop { reason } => {
1270 state.ui_agent_budget.refund(&report.surface_id);
1271 tracing::error!(
1277 surface_id = %report.surface_id,
1278 reason = %reason,
1279 "ui-agent hard-stopped improvement loop",
1280 );
1281 }
1282 }
1283 } else {
1284 tracing::debug!(
1285 surface_id = %report.surface_id,
1286 "ui-agent skipped — surface not found in store",
1287 );
1288 }
1289
1290 Ok(serde_json::json!({ "event": event }))
1291}
1292
1293async fn route_a2ui_action(
1294 state: &Arc<ServerState>,
1295 action: &car_a2ui::ClientAction,
1296 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1297) -> Value {
1298 let Some(owner) = owner else {
1299 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1300 };
1301 if owner.kind != "a2a" {
1302 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1303 }
1304 let Some(endpoint) = owner.endpoint.clone() else {
1305 return serde_json::json!({
1306 "delivered": false,
1307 "reason": "surface owner has no endpoint",
1308 "owner": owner
1309 });
1310 };
1311
1312 let message = car_a2a::Message {
1313 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1314 role: car_a2a::MessageRole::User,
1315 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1316 data: serde_json::json!({
1317 "a2uiAction": action,
1318 }),
1319 metadata: Default::default(),
1320 })],
1321 task_id: owner.task_id.clone(),
1322 context_id: owner.context_id.clone(),
1323 metadata: Default::default(),
1324 };
1325
1326 let auth = state
1327 .a2ui_route_auth
1328 .lock()
1329 .await
1330 .get(&action.surface_id)
1331 .cloned()
1332 .map(client_auth_from_route_auth)
1333 .unwrap_or(car_a2a::ClientAuth::None);
1334
1335 match car_a2a::A2aClient::new(endpoint.clone())
1336 .with_auth(auth)
1337 .send_message(message, false)
1338 .await
1339 {
1340 Ok(result) => serde_json::json!({
1341 "delivered": true,
1342 "owner": owner,
1343 "endpoint": endpoint,
1344 "result": result,
1345 }),
1346 Err(error) => serde_json::json!({
1347 "delivered": false,
1348 "owner": owner,
1349 "endpoint": endpoint,
1350 "error": error.to_string(),
1351 }),
1352 }
1353}
1354
1355fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1356 match auth {
1357 A2aRouteAuth::None => car_a2a::ClientAuth::None,
1358 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1359 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1360 }
1361}
1362
1363fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1364 let endpoint = endpoint?;
1365 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1366 Some(endpoint)
1367 } else {
1368 None
1369 }
1370}
1371
1372fn is_loopback_http_endpoint(endpoint: &str) -> bool {
1373 endpoint == "http://localhost"
1374 || endpoint.starts_with("http://localhost:")
1375 || endpoint.starts_with("http://localhost/")
1376 || endpoint == "http://127.0.0.1"
1377 || endpoint.starts_with("http://127.0.0.1:")
1378 || endpoint.starts_with("http://127.0.0.1/")
1379 || endpoint == "http://[::1]"
1380 || endpoint.starts_with("http://[::1]:")
1381 || endpoint.starts_with("http://[::1]/")
1382}
1383
1384async fn handle_host_register_agent(
1385 req: &JsonRpcMessage,
1386 session: &crate::session::ClientSession,
1387) -> Result<Value, String> {
1388 let request: RegisterHostAgentRequest =
1389 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1390 serde_json::to_value(
1391 session
1392 .host
1393 .register_agent(&session.client_id, request)
1394 .await?,
1395 )
1396 .map_err(|e| e.to_string())
1397}
1398
1399async fn handle_host_unregister_agent(
1400 req: &JsonRpcMessage,
1401 session: &crate::session::ClientSession,
1402) -> Result<Value, String> {
1403 let agent_id = req
1404 .params
1405 .get("agent_id")
1406 .and_then(|v| v.as_str())
1407 .ok_or("missing agent_id")?;
1408 session
1409 .host
1410 .unregister_agent(&session.client_id, agent_id)
1411 .await?;
1412 Ok(serde_json::json!({"ok": true}))
1413}
1414
1415async fn handle_host_set_status(
1416 req: &JsonRpcMessage,
1417 session: &crate::session::ClientSession,
1418) -> Result<Value, String> {
1419 let request: SetHostAgentStatusRequest =
1420 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1421 serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1422 .map_err(|e| e.to_string())
1423}
1424
1425async fn handle_host_notify(
1426 req: &JsonRpcMessage,
1427 session: &crate::session::ClientSession,
1428) -> Result<Value, String> {
1429 let kind = req
1430 .params
1431 .get("kind")
1432 .and_then(|v| v.as_str())
1433 .unwrap_or("host.notification");
1434 let agent_id = req
1435 .params
1436 .get("agent_id")
1437 .and_then(|v| v.as_str())
1438 .map(str::to_string);
1439 let message = req
1440 .params
1441 .get("message")
1442 .and_then(|v| v.as_str())
1443 .unwrap_or("");
1444 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1445 serde_json::to_value(
1446 session
1447 .host
1448 .record_event(kind, agent_id, message, payload)
1449 .await,
1450 )
1451 .map_err(|e| e.to_string())
1452}
1453
1454async fn handle_host_request_approval(
1455 req: &JsonRpcMessage,
1456 session: &crate::session::ClientSession,
1457) -> Result<Value, String> {
1458 let request: CreateHostApprovalRequest =
1459 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1460 if let Some(agent_id) = &request.agent_id {
1461 let _ = session
1466 .host
1467 .set_status(
1468 &session.client_id,
1469 SetHostAgentStatusRequest {
1470 agent_id: agent_id.clone(),
1471 status: HostAgentStatus::WaitingForApproval,
1472 current_task: None,
1473 message: Some("Waiting for approval".to_string()),
1474 payload: Value::Null,
1475 },
1476 )
1477 .await;
1478 }
1479 let owner_client_id = if request.system_level {
1486 None
1487 } else {
1488 Some(session.client_id.as_str())
1489 };
1490 serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1491 .map_err(|e| e.to_string())
1492}
1493
1494async fn handle_host_resolve_approval(
1495 req: &JsonRpcMessage,
1496 session: &crate::session::ClientSession,
1497) -> Result<Value, String> {
1498 let request: ResolveHostApprovalRequest =
1499 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1500 serde_json::to_value(
1501 session
1502 .host
1503 .resolve_approval(&session.client_id, request)
1504 .await?,
1505 )
1506 .map_err(|e| e.to_string())
1507}
1508
1509async fn handle_session_auth(
1520 req: &JsonRpcMessage,
1521 session: &crate::session::ClientSession,
1522 state: &Arc<ServerState>,
1523) -> Result<Value, String> {
1524 let supplied = req
1525 .params
1526 .get("token")
1527 .and_then(Value::as_str)
1528 .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1529 let agent_id = req
1536 .params
1537 .get("agent_id")
1538 .and_then(Value::as_str)
1539 .map(str::to_string);
1540
1541 if let Some(id) = agent_id {
1542 let supervisor = state.supervisor()?;
1543 if !supervisor.validate_agent_token(&id, supplied).await {
1544 return Err(format!(
1545 "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1546 ));
1547 }
1548 {
1552 let mut attached = state.attached_agents.lock().await;
1553 if let Some(prior) = attached.get(&id) {
1554 if prior != &session.client_id {
1555 return Err(format!(
1556 "auth failed: agent_id `{id}` is already attached on \
1557 another connection (client_id={prior})"
1558 ));
1559 }
1560 }
1561 attached.insert(id.clone(), session.client_id.clone());
1562 }
1563 let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1568 *session.bound_memgine.lock().await = Some(agent_eng);
1569 *session.agent_id.lock().await = Some(id.clone());
1570 session
1571 .authenticated
1572 .store(true, std::sync::atomic::Ordering::Release);
1573 return Ok(serde_json::json!({
1574 "ok": true,
1575 "auth_enabled": true,
1576 "agent_id": id,
1577 }));
1578 }
1579
1580 let expected = match state.auth_token.get() {
1581 Some(t) => t,
1582 None => {
1583 session
1589 .authenticated
1590 .store(true, std::sync::atomic::Ordering::Release);
1591 return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1592 }
1593 };
1594 if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1595 return Err("auth failed: token mismatch".to_string());
1596 }
1597 session
1598 .authenticated
1599 .store(true, std::sync::atomic::Ordering::Release);
1600 Ok(serde_json::json!({
1601 "ok": true,
1602 "auth_enabled": true,
1603 "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
1604 }))
1605}
1606
1607fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
1611 if a.len() != b.len() {
1612 return false;
1613 }
1614 let mut diff: u8 = 0;
1615 for (x, y) in a.iter().zip(b.iter()) {
1616 diff |= x ^ y;
1617 }
1618 diff == 0
1619}
1620
1621async fn gate_high_risk_method(
1631 method: &str,
1632 params: &Value,
1633 state: &Arc<ServerState>,
1634) -> Result<(), String> {
1635 let timeout = state.approval_gate.timeout;
1636 let req = CreateHostApprovalRequest {
1637 agent_id: None,
1638 action: format!("ws.method:{method}"),
1639 details: serde_json::json!({
1640 "method": method,
1641 "params_preview": preview_params(params, 2_000),
1645 }),
1646 options: vec!["approve".to_string(), "deny".to_string()],
1647 system_level: true,
1651 };
1652 match state
1653 .host
1654 .request_and_wait_approval(req, "approve", timeout)
1655 .await
1656 {
1657 Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1658 Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1659 "{method} denied by user (approval gate, audit 2026-05). \
1660 To call this method without an interactive prompt, start \
1661 car-server with --no-approvals on a trusted machine."
1662 )),
1663 Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1664 "{method} approval timed out after {}s with no resolution. \
1665 The approval is still visible in `host.approvals` for \
1666 forensics; resubmit the request to retry.",
1667 timeout.as_secs()
1668 )),
1669 Err(e) => Err(format!("approval gate error: {e}")),
1670 }
1671}
1672
1673fn preview_params(value: &Value, max_chars: usize) -> Value {
1674 let s = value.to_string();
1675 if s.len() <= max_chars {
1676 value.clone()
1677 } else {
1678 Value::String(format!("{}… (truncated)", &s[..max_chars]))
1679 }
1680}
1681
1682async fn handle_session_init(
1683 req: &JsonRpcMessage,
1684 session: &crate::session::ClientSession,
1685) -> Result<Value, String> {
1686 let init: SessionInitRequest =
1687 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1688
1689 for tool in &init.tools {
1690 register_from_definition(&session.runtime, tool).await;
1691 }
1692
1693 let mut policy_count = 0;
1694 {
1695 let mut policies = session.runtime.policies.write().await;
1696 for policy_def in &init.policies {
1697 if let Some(check) = build_policy_check(policy_def) {
1698 policies.register(&policy_def.name, check, "");
1699 policy_count += 1;
1700 }
1701 }
1702 }
1703
1704 serde_json::to_value(SessionInitResponse {
1705 session_id: session.client_id.clone(),
1706 tools_registered: init.tools.len(),
1707 policies_registered: policy_count,
1708 })
1709 .map_err(|e| e.to_string())
1710}
1711
1712fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1713 match def.rule.as_str() {
1714 "deny_tool" => {
1715 let target = def.target.clone();
1716 Some(Box::new(
1717 move |action: &car_ir::Action, _: &car_state::StateStore| {
1718 if action.tool.as_deref() == Some(&target) {
1719 Some(format!("tool '{}' denied", target))
1720 } else {
1721 None
1722 }
1723 },
1724 ))
1725 }
1726 "require_state" => {
1727 let key = def.key.clone();
1728 let value = def.value.clone();
1729 Some(Box::new(
1730 move |_: &car_ir::Action, state: &car_state::StateStore| {
1731 if state.get(&key).as_ref() != Some(&value) {
1732 Some(format!("state['{}'] must be {:?}", key, value))
1733 } else {
1734 None
1735 }
1736 },
1737 ))
1738 }
1739 "deny_tool_param" => {
1740 let target = def.target.clone();
1741 let param = def.key.clone();
1742 let pattern = def.pattern.clone();
1743 Some(Box::new(
1744 move |action: &car_ir::Action, _: &car_state::StateStore| {
1745 if action.tool.as_deref() != Some(&target) {
1746 return None;
1747 }
1748 if let Some(val) = action.parameters.get(¶m) {
1749 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1750 if s.contains(&pattern) {
1751 return Some(format!("param '{}' matches '{}'", param, pattern));
1752 }
1753 }
1754 None
1755 },
1756 ))
1757 }
1758 _ => None,
1759 }
1760}
1761
1762async fn handle_tools_register(
1763 req: &JsonRpcMessage,
1764 session: &crate::session::ClientSession,
1765) -> Result<Value, String> {
1766 let tools: Vec<ToolDefinition> =
1767 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1768 for tool in &tools {
1769 register_from_definition(&session.runtime, tool).await;
1770 }
1771 Ok(Value::from(tools.len()))
1772}
1773
1774async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1781 runtime
1782 .register_tool_schema(car_ir::ToolSchema {
1783 name: def.name.clone(),
1784 description: def.description.clone(),
1785 parameters: def.parameters.clone(),
1786 returns: def.returns.clone(),
1787 idempotent: def.idempotent,
1788 cache_ttl_secs: def.cache_ttl_secs,
1789 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1790 max_calls: rl.max_calls,
1791 interval_secs: rl.interval_secs,
1792 }),
1793 })
1794 .await;
1795}
1796
1797async fn handle_proposal_submit(
1798 req: &JsonRpcMessage,
1799 session: &crate::session::ClientSession,
1800) -> Result<Value, String> {
1801 let submit: ProposalSubmitRequest =
1802 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1803 let session_id = req
1809 .params
1810 .get("session_id")
1811 .and_then(|v| v.as_str())
1812 .map(str::to_string);
1813
1814 let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1823 Some(v) if !v.is_null() => {
1824 Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1825 }
1826 _ => None,
1827 };
1828
1829 let result = match (session_id, scope) {
1830 (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1835 (Some(sid), None) => {
1836 session
1837 .runtime
1838 .execute_with_session(&submit.proposal, &sid)
1839 .await
1840 }
1841 (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1842 (None, None) => session.runtime.execute(&submit.proposal).await,
1843 };
1844 serde_json::to_value(result).map_err(|e| e.to_string())
1845}
1846
1847async fn handle_session_policy_open(
1848 session: &crate::session::ClientSession,
1849) -> Result<Value, String> {
1850 let id = session.runtime.open_session().await;
1851 Ok(serde_json::json!({ "session_id": id }))
1852}
1853
1854async fn handle_session_policy_close(
1855 req: &JsonRpcMessage,
1856 session: &crate::session::ClientSession,
1857) -> Result<Value, String> {
1858 let sid = req
1859 .params
1860 .get("session_id")
1861 .and_then(|v| v.as_str())
1862 .ok_or("missing 'session_id'")?;
1863 let closed = session.runtime.close_session(sid).await;
1864 Ok(serde_json::json!({ "closed": closed }))
1865}
1866
1867async fn handle_policy_register(
1873 req: &JsonRpcMessage,
1874 session: &crate::session::ClientSession,
1875) -> Result<Value, String> {
1876 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1877 .map_err(|e| format!("invalid policy params: {e}"))?;
1878 let session_id = req
1879 .params
1880 .get("session_id")
1881 .and_then(|v| v.as_str())
1882 .map(str::to_string);
1883 let check = build_policy_check(&def)
1884 .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1885 match session_id {
1886 Some(sid) => session
1887 .runtime
1888 .register_policy_in_session(&sid, &def.name, check, "")
1889 .await
1890 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1891 None => {
1892 let mut policies = session.runtime.policies.write().await;
1893 policies.register(&def.name, check, "");
1894 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1895 }
1896 }
1897}
1898
1899async fn handle_verify(
1900 req: &JsonRpcMessage,
1901 session: &crate::session::ClientSession,
1902) -> Result<Value, String> {
1903 let vr: VerifyRequest =
1904 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1905 let tools: std::collections::HashSet<String> =
1906 session.runtime.tools.read().await.keys().cloned().collect();
1907 let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1908 serde_json::to_value(VerifyResponse {
1909 valid: result.valid,
1910 issues: result
1911 .issues
1912 .iter()
1913 .map(|i| VerifyIssueProto {
1914 action_id: i.action_id.clone(),
1915 severity: i.severity.clone(),
1916 message: i.message.clone(),
1917 })
1918 .collect(),
1919 simulated_state: result.simulated_state,
1920 })
1921 .map_err(|e| e.to_string())
1922}
1923
1924fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1931 req.params
1932 .get("tenant_id")
1933 .and_then(|v| v.as_str())
1934 .filter(|s| !s.is_empty())
1935 .map(str::to_string)
1936}
1937
1938async fn handle_state_get(
1939 req: &JsonRpcMessage,
1940 session: &crate::session::ClientSession,
1941) -> Result<Value, String> {
1942 let key = req
1943 .params
1944 .get("key")
1945 .and_then(|v| v.as_str())
1946 .ok_or("missing 'key'")?;
1947 let tenant = tenant_from_params(req);
1948 Ok(session
1949 .runtime
1950 .state
1951 .scoped(tenant.as_deref())
1952 .get(key)
1953 .unwrap_or(Value::Null))
1954}
1955
1956async fn handle_state_set(
1957 req: &JsonRpcMessage,
1958 session: &crate::session::ClientSession,
1959) -> Result<Value, String> {
1960 let key = req
1961 .params
1962 .get("key")
1963 .and_then(|v| v.as_str())
1964 .ok_or("missing 'key'")?;
1965 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1966 let tenant = tenant_from_params(req);
1967 session
1968 .runtime
1969 .state
1970 .scoped(tenant.as_deref())
1971 .set(key, value, "client");
1972 Ok(Value::from("ok"))
1973}
1974
1975async fn handle_state_exists(
1979 req: &JsonRpcMessage,
1980 session: &crate::session::ClientSession,
1981) -> Result<Value, String> {
1982 let key = req
1983 .params
1984 .get("key")
1985 .and_then(|v| v.as_str())
1986 .ok_or("missing 'key'")?;
1987 let tenant = tenant_from_params(req);
1988 Ok(Value::Bool(
1989 session.runtime.state.scoped(tenant.as_deref()).exists(key),
1990 ))
1991}
1992
1993async fn handle_state_keys(
1996 req: &JsonRpcMessage,
1997 session: &crate::session::ClientSession,
1998) -> Result<Value, String> {
1999 let tenant = tenant_from_params(req);
2000 Ok(Value::Array(
2001 session
2002 .runtime
2003 .state
2004 .scoped(tenant.as_deref())
2005 .keys()
2006 .into_iter()
2007 .map(Value::String)
2008 .collect(),
2009 ))
2010}
2011
2012async fn handle_state_snapshot(
2023 req: &JsonRpcMessage,
2024 session: &crate::session::ClientSession,
2025) -> Result<Value, String> {
2026 let tenant = tenant_from_params(req);
2027 let view = session.runtime.state.scoped(tenant.as_deref());
2028 let mut map = serde_json::Map::new();
2029 for key in view.keys() {
2030 if let Some(value) = view.get(&key) {
2031 map.insert(key, value);
2032 }
2033 }
2034 Ok(Value::Object(map))
2035}
2036
2037fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
2043 let base = car_ffi_common::memory_path::ensure_base()
2044 .map_err(|e| format!("memory base unavailable: {e}"))?;
2045 let dir = base.join("agents");
2046 std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
2047 Ok(dir.join(format!("{agent_id}.json")))
2048}
2049
2050async fn get_or_load_agent_memgine(
2057 state: &Arc<ServerState>,
2058 agent_id: &str,
2059) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
2060 {
2061 let map = state.agent_memgines.lock().await;
2062 if let Some(eng) = map.get(agent_id) {
2063 return Ok(eng.clone());
2064 }
2065 }
2066 let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
2068 None,
2069 )));
2070 let path = agent_memgine_snapshot_path(agent_id)?;
2071 if path.exists() {
2072 let content = std::fs::read_to_string(&path)
2073 .map_err(|e| format!("read {}: {}", path.display(), e))?;
2074 let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
2075 let mut g = engine.lock().await;
2076 let mut loaded: u32 = 0;
2077 for fact in &facts {
2078 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2079 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2080 let kind = fact
2081 .get("kind")
2082 .and_then(|v| v.as_str())
2083 .unwrap_or("pattern");
2084 let fid = format!("loaded-{loaded}");
2085 g.ingest_fact(
2086 &fid,
2087 subject,
2088 body,
2089 "user",
2090 "peer",
2091 chrono::Utc::now(),
2092 "global",
2093 None,
2094 vec![],
2095 kind == "constraint",
2096 );
2097 loaded += 1;
2098 }
2099 }
2100 let mut map = state.agent_memgines.lock().await;
2101 let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
2102 Ok(stored)
2103}
2104
2105async fn persist_agent_memgine(
2109 agent_id: &str,
2110 engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
2111) -> Result<(), String> {
2112 let path = agent_memgine_snapshot_path(agent_id)?;
2113 let g = engine.lock().await;
2114 let facts: Vec<Value> = g
2115 .graph
2116 .inner
2117 .node_indices()
2118 .filter_map(|nix| {
2119 let node = g.graph.inner.node_weight(nix)?;
2120 if !node.is_valid() {
2121 return None;
2122 }
2123 if node.kind == car_memgine::MemKind::Identity
2124 || node.kind == car_memgine::MemKind::Environment
2125 {
2126 return None;
2127 }
2128 Some(serde_json::json!({
2129 "subject": node.key,
2130 "body": node.value,
2131 "kind": match node.kind {
2132 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2133 car_memgine::MemKind::Conversation => "outcome",
2134 _ => "pattern",
2135 },
2136 "confidence": 0.5,
2137 "content_type": node.content_type.as_label(),
2138 }))
2139 })
2140 .collect();
2141 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2142 std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2143 Ok(())
2144}
2145
2146async fn handle_memory_fact_count(
2153 session: &crate::session::ClientSession,
2154) -> Result<Value, String> {
2155 let engine_arc = session.effective_memgine().await;
2156 let engine = engine_arc.lock().await;
2157 Ok(Value::from(engine.valid_fact_count()))
2158}
2159
2160async fn handle_memory_add_fact(
2161 req: &JsonRpcMessage,
2162 session: &crate::session::ClientSession,
2163) -> Result<Value, String> {
2164 let subject = req
2165 .params
2166 .get("subject")
2167 .and_then(|v| v.as_str())
2168 .ok_or("missing subject")?;
2169 let body = req
2170 .params
2171 .get("body")
2172 .and_then(|v| v.as_str())
2173 .ok_or("missing body")?;
2174 let kind = req
2175 .params
2176 .get("kind")
2177 .and_then(|v| v.as_str())
2178 .unwrap_or("pattern");
2179 let engine_arc = session.effective_memgine().await;
2183 let count = {
2184 let mut engine = engine_arc.lock().await;
2185 let fid = format!("ws-{}", engine.valid_fact_count());
2186 engine.ingest_fact(
2187 &fid,
2188 subject,
2189 body,
2190 "user",
2191 "peer",
2192 chrono::Utc::now(),
2193 "global",
2194 None,
2195 vec![],
2196 kind == "constraint",
2197 );
2198 engine.valid_fact_count()
2199 };
2200 if let Some(id) = session.agent_id.lock().await.clone() {
2203 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2204 tracing::warn!(agent_id = %id, error = %e,
2205 "agent memgine persist failed; in-memory state is canonical");
2206 }
2207 }
2208 Ok(Value::from(count))
2209}
2210
2211async fn handle_memory_query(
2212 req: &JsonRpcMessage,
2213 session: &crate::session::ClientSession,
2214) -> Result<Value, String> {
2215 let query = req
2216 .params
2217 .get("query")
2218 .and_then(|v| v.as_str())
2219 .ok_or("missing query")?;
2220 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2221 let engine_arc = session.effective_memgine().await;
2222 let engine = engine_arc.lock().await;
2223 let seeds = engine.graph.find_seeds(query, 5);
2224 let hits = if !seeds.is_empty() {
2229 engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2230 } else {
2231 vec![]
2232 };
2233 let results: Vec<Value> = hits
2234 .iter()
2235 .filter_map(|hit| {
2236 let node = engine.graph.inner.node_weight(hit.node_ix)?;
2237 Some(serde_json::json!({
2238 "subject": node.key,
2239 "body": node.value,
2240 "kind": format!("{:?}", node.kind).to_lowercase(),
2241 "confidence": hit.activation,
2242 }))
2243 })
2244 .collect();
2245 serde_json::to_value(results).map_err(|e| e.to_string())
2246}
2247
2248async fn handle_memory_build_context(
2249 req: &JsonRpcMessage,
2250 session: &crate::session::ClientSession,
2251) -> Result<Value, String> {
2252 let query = req
2253 .params
2254 .get("query")
2255 .and_then(|v| v.as_str())
2256 .unwrap_or("");
2257 let model_context_window = req
2261 .params
2262 .get("model_context_window")
2263 .and_then(|v| v.as_u64())
2264 .map(|w| w as usize);
2265 let mut engine = session.memgine.lock().await;
2266 Ok(Value::from(
2267 engine.build_context_for_model(query, model_context_window),
2268 ))
2269}
2270
2271async fn handle_memory_build_context_fast(
2277 req: &JsonRpcMessage,
2278 session: &crate::session::ClientSession,
2279) -> Result<Value, String> {
2280 let query = req
2281 .params
2282 .get("query")
2283 .and_then(|v| v.as_str())
2284 .unwrap_or("");
2285 let model_context_window = req
2286 .params
2287 .get("model_context_window")
2288 .and_then(|v| v.as_u64())
2289 .map(|w| w as usize);
2290 let mut engine = session.memgine.lock().await;
2291 Ok(Value::from(engine.build_context_with_options(
2292 query,
2293 model_context_window,
2294 car_memgine::ContextMode::Fast,
2295 None,
2296 )))
2297}
2298
2299async fn handle_memory_persist(
2315 req: &JsonRpcMessage,
2316 session: &crate::session::ClientSession,
2317) -> Result<Value, String> {
2318 let path = req
2319 .params
2320 .get("path")
2321 .and_then(|v| v.as_str())
2322 .ok_or("missing path")?;
2323 let resolved = car_ffi_common::memory_path::resolve(path)
2324 .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2325 let engine = session.memgine.lock().await;
2326 let facts: Vec<Value> = engine
2327 .graph
2328 .inner
2329 .node_indices()
2330 .filter_map(|nix| {
2331 let node = engine.graph.inner.node_weight(nix)?;
2332 if !node.is_valid() {
2333 return None;
2334 }
2335 if node.kind == car_memgine::MemKind::Identity
2336 || node.kind == car_memgine::MemKind::Environment
2337 {
2338 return None;
2339 }
2340 Some(serde_json::json!({
2341 "subject": node.key,
2342 "body": node.value,
2343 "kind": match node.kind {
2344 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2345 car_memgine::MemKind::Conversation => "outcome",
2346 _ => "pattern",
2347 },
2348 "confidence": 0.5,
2349 "content_type": node.content_type.as_label(),
2350 }))
2351 })
2352 .collect();
2353 let count = facts.len();
2354 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2355 std::fs::write(&resolved, json)
2356 .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2357 Ok(Value::from(count as u64))
2358}
2359
2360async fn handle_memory_load(
2366 req: &JsonRpcMessage,
2367 session: &crate::session::ClientSession,
2368) -> Result<Value, String> {
2369 let path = req
2370 .params
2371 .get("path")
2372 .and_then(|v| v.as_str())
2373 .ok_or("missing path")?;
2374 let resolved = car_ffi_common::memory_path::resolve(path)
2375 .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2376 let content = std::fs::read_to_string(&resolved)
2377 .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2378 let facts: Vec<Value> =
2379 serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2380 let mut engine = session.memgine.lock().await;
2381 engine.reset();
2382 let mut count: u32 = 0;
2383 for fact in &facts {
2384 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2385 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2386 let kind = fact
2387 .get("kind")
2388 .and_then(|v| v.as_str())
2389 .unwrap_or("pattern");
2390 let fid = format!("loaded-{}", count);
2391 engine.ingest_fact(
2392 &fid,
2393 subject,
2394 body,
2395 "user",
2396 "peer",
2397 chrono::Utc::now(),
2398 "global",
2399 None,
2400 vec![],
2401 kind == "constraint",
2402 );
2403 count += 1;
2404 }
2405 Ok(Value::from(count))
2406}
2407
2408async fn handle_skill_ingest(
2411 req: &JsonRpcMessage,
2412 session: &crate::session::ClientSession,
2413) -> Result<Value, String> {
2414 let name = req
2415 .params
2416 .get("name")
2417 .and_then(|v| v.as_str())
2418 .ok_or("missing name")?;
2419 let code = req
2420 .params
2421 .get("code")
2422 .and_then(|v| v.as_str())
2423 .ok_or("missing code")?;
2424 let platform = req
2425 .params
2426 .get("platform")
2427 .and_then(|v| v.as_str())
2428 .unwrap_or("unknown");
2429 let persona = req
2430 .params
2431 .get("persona")
2432 .and_then(|v| v.as_str())
2433 .unwrap_or("");
2434 let url_pattern = req
2435 .params
2436 .get("url_pattern")
2437 .and_then(|v| v.as_str())
2438 .unwrap_or("");
2439 let description = req
2440 .params
2441 .get("description")
2442 .and_then(|v| v.as_str())
2443 .unwrap_or("");
2444 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2445 let keywords: Vec<String> = req
2446 .params
2447 .get("task_keywords")
2448 .and_then(|v| v.as_array())
2449 .map(|arr| {
2450 arr.iter()
2451 .filter_map(|v| v.as_str().map(String::from))
2452 .collect()
2453 })
2454 .unwrap_or_default();
2455
2456 let trigger = car_memgine::SkillTrigger {
2457 persona: persona.into(),
2458 url_pattern: url_pattern.into(),
2459 task_keywords: keywords,
2460 structured: None,
2461 };
2462 let mut engine = session.memgine.lock().await;
2463 let node = engine.ingest_skill(
2464 name,
2465 code,
2466 platform,
2467 trigger,
2468 description,
2469 supersedes,
2470 vec![],
2471 vec![],
2472 );
2473 Ok(Value::from(node.index() as u64))
2474}
2475
2476async fn handle_skill_find(
2477 req: &JsonRpcMessage,
2478 session: &crate::session::ClientSession,
2479) -> Result<Value, String> {
2480 let persona = req
2481 .params
2482 .get("persona")
2483 .and_then(|v| v.as_str())
2484 .unwrap_or("");
2485 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2486 let task = req
2487 .params
2488 .get("task")
2489 .and_then(|v| v.as_str())
2490 .unwrap_or("");
2491 let max = req
2492 .params
2493 .get("max_results")
2494 .and_then(|v| v.as_u64())
2495 .unwrap_or(1) as usize;
2496 let engine = session.memgine.lock().await;
2497 let results = engine.find_skill(persona, url, task, max);
2498 let json: Vec<Value> = results
2499 .iter()
2500 .map(|(m, s)| {
2501 serde_json::json!({
2502 "name": m.name, "code": m.code, "platform": m.platform,
2503 "description": m.description, "stats": m.stats, "match_score": s,
2504 })
2505 })
2506 .collect();
2507 serde_json::to_value(json).map_err(|e| e.to_string())
2508}
2509
2510async fn handle_skill_report(
2511 req: &JsonRpcMessage,
2512 session: &crate::session::ClientSession,
2513) -> Result<Value, String> {
2514 let name = req
2515 .params
2516 .get("skill_name")
2517 .and_then(|v| v.as_str())
2518 .ok_or("missing skill_name")?;
2519 let outcome_str = req
2520 .params
2521 .get("outcome")
2522 .and_then(|v| v.as_str())
2523 .ok_or("missing outcome")?;
2524 let outcome = match outcome_str {
2525 "success" => car_memgine::SkillOutcome::Success,
2526 _ => car_memgine::SkillOutcome::Fail,
2527 };
2528 let mut engine = session.memgine.lock().await;
2529 let stats = engine
2530 .report_outcome(name, outcome)
2531 .ok_or(format!("skill '{}' not found", name))?;
2532 serde_json::to_value(stats).map_err(|e| e.to_string())
2533}
2534
2535struct WsAgentRunner {
2544 channel: Arc<WsChannel>,
2545 host: Arc<crate::host::HostState>,
2546 client_id: String,
2547}
2548
2549#[async_trait::async_trait]
2550impl car_multi::AgentRunner for WsAgentRunner {
2551 async fn run(
2552 &self,
2553 spec: &car_multi::AgentSpec,
2554 task: &str,
2555 _runtime: &car_engine::Runtime,
2556 _mailbox: &car_multi::Mailbox,
2557 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2558 use futures::SinkExt;
2559
2560 let request_id = self.channel.next_request_id();
2561 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2562 let agent = self
2563 .host
2564 .register_agent(
2565 &self.client_id,
2566 RegisterHostAgentRequest {
2567 id: Some(agent_id.clone()),
2568 name: spec.name.clone(),
2569 kind: "callback".to_string(),
2570 capabilities: spec.tools.clone(),
2571 project: spec
2572 .metadata
2573 .get("project")
2574 .and_then(|v| v.as_str())
2575 .map(str::to_string),
2576 pid: None,
2577 display: serde_json::from_value(
2578 spec.metadata
2579 .get("display")
2580 .cloned()
2581 .unwrap_or(serde_json::Value::Null),
2582 )
2583 .unwrap_or_default(),
2584 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2585 },
2586 )
2587 .await
2588 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2589 let _ = self
2590 .host
2591 .set_status(
2592 &self.client_id,
2593 SetHostAgentStatusRequest {
2594 agent_id: agent.id.clone(),
2595 status: HostAgentStatus::Running,
2596 current_task: Some(task.to_string()),
2597 message: Some(format!("{} started", spec.name)),
2598 payload: serde_json::json!({ "task": task }),
2599 },
2600 )
2601 .await;
2602
2603 let rpc_request = serde_json::json!({
2604 "jsonrpc": "2.0",
2605 "method": "multi.run_agent",
2606 "params": {
2607 "spec": spec,
2608 "task": task,
2609 },
2610 "id": request_id,
2611 });
2612
2613 let (tx, rx) = tokio::sync::oneshot::channel();
2615 self.channel
2616 .pending
2617 .lock()
2618 .await
2619 .insert(request_id.clone(), tx);
2620
2621 let msg = Message::Text(
2622 serde_json::to_string(&rpc_request)
2623 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2624 .into(),
2625 );
2626 if let Err(e) = self.channel.write.lock().await.send(msg).await {
2627 let _ = self
2628 .host
2629 .set_status(
2630 &self.client_id,
2631 SetHostAgentStatusRequest {
2632 agent_id: agent_id.clone(),
2633 status: HostAgentStatus::Errored,
2634 current_task: None,
2635 message: Some(format!("{} failed to start", spec.name)),
2636 payload: serde_json::json!({ "error": e.to_string() }),
2637 },
2638 )
2639 .await;
2640 return Err(car_multi::MultiError::AgentFailed(
2641 spec.name.clone(),
2642 format!("ws send error: {}", e),
2643 ));
2644 }
2645
2646 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2648 Ok(Ok(response)) => response,
2649 Ok(Err(_)) => {
2650 let _ = self
2651 .host
2652 .set_status(
2653 &self.client_id,
2654 SetHostAgentStatusRequest {
2655 agent_id: agent_id.clone(),
2656 status: HostAgentStatus::Errored,
2657 current_task: None,
2658 message: Some(format!("{} callback channel closed", spec.name)),
2659 payload: Value::Null,
2660 },
2661 )
2662 .await;
2663 return Err(car_multi::MultiError::AgentFailed(
2664 spec.name.clone(),
2665 "agent callback channel closed".into(),
2666 ));
2667 }
2668 Err(_) => {
2669 let _ = self
2670 .host
2671 .set_status(
2672 &self.client_id,
2673 SetHostAgentStatusRequest {
2674 agent_id: agent_id.clone(),
2675 status: HostAgentStatus::Errored,
2676 current_task: None,
2677 message: Some(format!("{} timed out", spec.name)),
2678 payload: Value::Null,
2679 },
2680 )
2681 .await;
2682 return Err(car_multi::MultiError::AgentFailed(
2683 spec.name.clone(),
2684 "agent callback timed out (300s)".into(),
2685 ));
2686 }
2687 };
2688
2689 if let Some(err) = response.error {
2690 let _ = self
2691 .host
2692 .set_status(
2693 &self.client_id,
2694 SetHostAgentStatusRequest {
2695 agent_id: agent_id.clone(),
2696 status: HostAgentStatus::Errored,
2697 current_task: None,
2698 message: Some(format!("{} errored", spec.name)),
2699 payload: serde_json::json!({ "error": err }),
2700 },
2701 )
2702 .await;
2703 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2704 }
2705
2706 let output_value = response.output.unwrap_or(Value::Null);
2707 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2708 car_multi::MultiError::AgentFailed(
2709 spec.name.clone(),
2710 format!("invalid AgentOutput: {}", e),
2711 )
2712 })?;
2713 let status = if output.error.is_some() {
2714 HostAgentStatus::Errored
2715 } else {
2716 HostAgentStatus::Completed
2717 };
2718 let message = if output.error.is_some() {
2719 format!("{} errored", spec.name)
2720 } else {
2721 format!("{} completed", spec.name)
2722 };
2723 let _ = self
2724 .host
2725 .set_status(
2726 &self.client_id,
2727 SetHostAgentStatusRequest {
2728 agent_id,
2729 status,
2730 current_task: None,
2731 message: Some(message),
2732 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2733 },
2734 )
2735 .await;
2736
2737 Ok(output)
2738 }
2739}
2740
2741fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
2742 let safe_name: String = name
2743 .chars()
2744 .map(|c| {
2745 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
2746 c
2747 } else {
2748 '-'
2749 }
2750 })
2751 .collect();
2752 format!("{}:{}:{}", client_id, safe_name, request_id)
2753}
2754
2755async fn handle_multi_swarm(
2756 req: &JsonRpcMessage,
2757 session: &crate::session::ClientSession,
2758) -> Result<Value, String> {
2759 let mode_str = req
2760 .params
2761 .get("mode")
2762 .and_then(|v| v.as_str())
2763 .ok_or("missing 'mode'")?;
2764 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2765 let task = req
2766 .params
2767 .get("task")
2768 .and_then(|v| v.as_str())
2769 .ok_or("missing 'task'")?;
2770
2771 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2772 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2773 let agent_specs: Vec<car_multi::AgentSpec> =
2774 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2775 let synth: Option<car_multi::AgentSpec> = req
2776 .params
2777 .get("synthesizer")
2778 .map(|v| serde_json::from_value(v.clone()))
2779 .transpose()
2780 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2781
2782 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2783 channel: session.channel.clone(),
2784 host: session.host.clone(),
2785 client_id: session.client_id.clone(),
2786 });
2787 let infra = car_multi::SharedInfra::new();
2788
2789 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2790 if let Some(s) = synth {
2791 swarm = swarm.with_synthesizer(s);
2792 }
2793
2794 let result = swarm
2795 .run(task, &runner, &infra)
2796 .await
2797 .map_err(|e| format!("swarm error: {}", e))?;
2798 serde_json::to_value(result).map_err(|e| e.to_string())
2799}
2800
2801async fn handle_multi_pipeline(
2802 req: &JsonRpcMessage,
2803 session: &crate::session::ClientSession,
2804) -> Result<Value, String> {
2805 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2806 let task = req
2807 .params
2808 .get("task")
2809 .and_then(|v| v.as_str())
2810 .ok_or("missing 'task'")?;
2811
2812 let stage_specs: Vec<car_multi::AgentSpec> =
2813 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2814
2815 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2816 channel: session.channel.clone(),
2817 host: session.host.clone(),
2818 client_id: session.client_id.clone(),
2819 });
2820 let infra = car_multi::SharedInfra::new();
2821
2822 let result = car_multi::Pipeline::new(stage_specs)
2823 .run(task, &runner, &infra)
2824 .await
2825 .map_err(|e| format!("pipeline error: {}", e))?;
2826 serde_json::to_value(result).map_err(|e| e.to_string())
2827}
2828
2829async fn handle_multi_supervisor(
2830 req: &JsonRpcMessage,
2831 session: &crate::session::ClientSession,
2832) -> Result<Value, String> {
2833 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2834 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2835 let task = req
2836 .params
2837 .get("task")
2838 .and_then(|v| v.as_str())
2839 .ok_or("missing 'task'")?;
2840 let max_rounds = req
2841 .params
2842 .get("max_rounds")
2843 .and_then(|v| v.as_u64())
2844 .unwrap_or(3) as u32;
2845
2846 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2847 .map_err(|e| format!("invalid workers: {}", e))?;
2848 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2849 .map_err(|e| format!("invalid supervisor: {}", e))?;
2850
2851 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2852 channel: session.channel.clone(),
2853 host: session.host.clone(),
2854 client_id: session.client_id.clone(),
2855 });
2856 let infra = car_multi::SharedInfra::new();
2857
2858 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2859 .with_max_rounds(max_rounds)
2860 .run(task, &runner, &infra)
2861 .await
2862 .map_err(|e| format!("supervisor error: {}", e))?;
2863 serde_json::to_value(result).map_err(|e| e.to_string())
2864}
2865
2866async fn handle_multi_map_reduce(
2867 req: &JsonRpcMessage,
2868 session: &crate::session::ClientSession,
2869) -> Result<Value, String> {
2870 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2871 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2872 let task = req
2873 .params
2874 .get("task")
2875 .and_then(|v| v.as_str())
2876 .ok_or("missing 'task'")?;
2877 let items_val = req.params.get("items").ok_or("missing 'items'")?;
2878
2879 let mapper_spec: car_multi::AgentSpec =
2880 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2881 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2882 .map_err(|e| format!("invalid reducer: {}", e))?;
2883 let items: Vec<String> =
2884 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2885
2886 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2887 channel: session.channel.clone(),
2888 host: session.host.clone(),
2889 client_id: session.client_id.clone(),
2890 });
2891 let infra = car_multi::SharedInfra::new();
2892
2893 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2894 .run(task, &items, &runner, &infra)
2895 .await
2896 .map_err(|e| format!("map_reduce error: {}", e))?;
2897 serde_json::to_value(result).map_err(|e| e.to_string())
2898}
2899
2900async fn handle_multi_vote(
2901 req: &JsonRpcMessage,
2902 session: &crate::session::ClientSession,
2903) -> Result<Value, String> {
2904 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2905 let task = req
2906 .params
2907 .get("task")
2908 .and_then(|v| v.as_str())
2909 .ok_or("missing 'task'")?;
2910
2911 let agent_specs: Vec<car_multi::AgentSpec> =
2912 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2913 let synth: Option<car_multi::AgentSpec> = req
2914 .params
2915 .get("synthesizer")
2916 .map(|v| serde_json::from_value(v.clone()))
2917 .transpose()
2918 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2919
2920 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2921 channel: session.channel.clone(),
2922 host: session.host.clone(),
2923 client_id: session.client_id.clone(),
2924 });
2925 let infra = car_multi::SharedInfra::new();
2926
2927 let mut vote = car_multi::Vote::new(agent_specs);
2928 if let Some(s) = synth {
2929 vote = vote.with_synthesizer(s);
2930 }
2931
2932 let result = vote
2933 .run(task, &runner, &infra)
2934 .await
2935 .map_err(|e| format!("vote error: {}", e))?;
2936 serde_json::to_value(result).map_err(|e| e.to_string())
2937}
2938
2939fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2944 let name = req
2945 .params
2946 .get("name")
2947 .and_then(|v| v.as_str())
2948 .ok_or("scheduler.create requires 'name'")?;
2949 let prompt = req
2950 .params
2951 .get("prompt")
2952 .and_then(|v| v.as_str())
2953 .ok_or("scheduler.create requires 'prompt'")?;
2954
2955 let mut task = car_scheduler::Task::new(name, prompt);
2956
2957 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
2958 let trigger = match t {
2959 "once" => car_scheduler::TaskTrigger::Once,
2960 "cron" => car_scheduler::TaskTrigger::Cron,
2961 "interval" => car_scheduler::TaskTrigger::Interval,
2962 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
2963 _ => car_scheduler::TaskTrigger::Manual,
2964 };
2965 let schedule = req
2966 .params
2967 .get("schedule")
2968 .and_then(|v| v.as_str())
2969 .unwrap_or("");
2970 task = task.with_trigger(trigger, schedule);
2971 }
2972
2973 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
2974 task = task.with_system_prompt(sp);
2975 }
2976
2977 serde_json::to_value(&task).map_err(|e| e.to_string())
2978}
2979
2980async fn handle_scheduler_run(
2981 req: &JsonRpcMessage,
2982 session: &crate::session::ClientSession,
2983) -> Result<Value, String> {
2984 let task_val = req
2985 .params
2986 .get("task")
2987 .ok_or("scheduler.run requires 'task'")?;
2988 let mut task: car_scheduler::Task =
2989 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2990
2991 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2992 channel: session.channel.clone(),
2993 host: session.host.clone(),
2994 client_id: session.client_id.clone(),
2995 });
2996 let executor = car_scheduler::Executor::new(runner);
2997 let execution = executor.run_once(&mut task).await;
2998
2999 serde_json::to_value(&execution).map_err(|e| e.to_string())
3000}
3001
3002async fn handle_scheduler_run_loop(
3003 req: &JsonRpcMessage,
3004 session: &crate::session::ClientSession,
3005) -> Result<Value, String> {
3006 let task_val = req
3007 .params
3008 .get("task")
3009 .ok_or("scheduler.run_loop requires 'task'")?;
3010 let mut task: car_scheduler::Task =
3011 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
3012 let max_iterations = req
3013 .params
3014 .get("max_iterations")
3015 .and_then(|v| v.as_u64())
3016 .map(|v| v as u32);
3017
3018 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3019 channel: session.channel.clone(),
3020 host: session.host.clone(),
3021 client_id: session.client_id.clone(),
3022 });
3023 let executor = car_scheduler::Executor::new(runner);
3024 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
3025 let executions = executor
3026 .run_loop(&mut task, max_iterations, cancel_rx)
3027 .await;
3028
3029 serde_json::to_value(&executions).map_err(|e| e.to_string())
3030}
3031
3032fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
3037 state.inference.get_or_init(|| {
3038 Arc::new(car_inference::InferenceEngine::new(
3039 car_inference::InferenceConfig::default(),
3040 ))
3041 })
3042}
3043
3044async fn handle_infer(
3045 msg: &JsonRpcMessage,
3046 state: &ServerState,
3047 session: &crate::session::ClientSession,
3048) -> Result<Value, String> {
3049 let engine = get_inference_engine(state);
3050 let mut req: car_inference::GenerateRequest =
3051 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3052
3053 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3055 let mut memgine = session.memgine.lock().await;
3056 let ctx = memgine.build_context(cq);
3057 if !ctx.is_empty() {
3058 req.context = Some(ctx);
3059 }
3060 }
3061
3062 let _permit = state.admission.acquire().await;
3068
3069 let result = engine
3080 .generate_tracked(req)
3081 .await
3082 .map_err(|e| e.to_string())?;
3083 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3084}
3085
3086async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3116 let engine = get_inference_engine(state);
3117 let req: car_inference::GenerateImageRequest =
3118 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3119 let _permit = state.admission.acquire().await;
3122 let result = engine
3123 .generate_image(req)
3124 .await
3125 .map_err(|e| e.to_string())?;
3126 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3127}
3128
3129async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3130 let engine = get_inference_engine(state);
3131 let req: car_inference::GenerateVideoRequest =
3132 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3133 let _permit = state.admission.acquire().await;
3134 let result = engine
3135 .generate_video(req)
3136 .await
3137 .map_err(|e| e.to_string())?;
3138 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3139}
3140
3141async fn handle_infer_stream(
3142 msg: &JsonRpcMessage,
3143 session: &crate::session::ClientSession,
3144 state: &ServerState,
3145) -> Result<Value, String> {
3146 use futures::SinkExt;
3147 use tokio_tungstenite::tungstenite::Message;
3148
3149 let engine = get_inference_engine(state);
3150 let mut req: car_inference::GenerateRequest =
3151 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3152
3153 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3156 let mut memgine = session.memgine.lock().await;
3157 let ctx = memgine.build_context(cq);
3158 if !ctx.is_empty() {
3159 req.context = Some(ctx);
3160 }
3161 }
3162
3163 let _permit = state.admission.acquire().await;
3164 let mut rx = engine
3165 .generate_tracked_stream(req)
3166 .await
3167 .map_err(|e| e.to_string())?;
3168
3169 let mut accumulator = car_inference::StreamAccumulator::default();
3170 let request_id = msg.id.clone();
3171
3172 while let Some(event) = rx.recv().await {
3173 let event_payload = match &event {
3174 car_inference::StreamEvent::TextDelta(text) => {
3175 serde_json::json!({"type": "text", "data": text})
3176 }
3177 car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3178 serde_json::json!({"type": "tool_start", "name": name, "index": index})
3179 }
3180 car_inference::StreamEvent::ToolCallDelta {
3181 index,
3182 arguments_delta,
3183 } => serde_json::json!({
3184 "type": "tool_delta",
3185 "index": index,
3186 "data": arguments_delta,
3187 }),
3188 car_inference::StreamEvent::Usage {
3189 input_tokens,
3190 output_tokens,
3191 } => serde_json::json!({
3192 "type": "usage",
3193 "input_tokens": input_tokens,
3194 "output_tokens": output_tokens,
3195 }),
3196 car_inference::StreamEvent::Done { .. } => {
3201 accumulator.push(&event);
3202 continue;
3203 }
3204 };
3205
3206 let notif = serde_json::json!({
3207 "jsonrpc": "2.0",
3208 "method": "inference.stream.event",
3209 "params": {
3210 "request_id": request_id,
3211 "event": event_payload,
3212 },
3213 });
3214 if let Ok(text) = serde_json::to_string(¬if) {
3215 let _ = session
3216 .channel
3217 .write
3218 .lock()
3219 .await
3220 .send(Message::Text(text.into()))
3221 .await;
3222 }
3223 accumulator.push(&event);
3224 }
3225
3226 let (text, tool_calls, usage) = accumulator.finish_with_usage();
3227 Ok(serde_json::json!({
3228 "text": text,
3229 "tool_calls": tool_calls,
3230 "usage": usage,
3231 }))
3232}
3233
3234async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3235 let engine = get_inference_engine(state);
3236 let req: car_inference::EmbedRequest =
3237 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3238 let _permit = state.admission.acquire().await;
3242 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3243 Ok(serde_json::json!({"embeddings": result}))
3244}
3245
3246async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3247 let engine = get_inference_engine(state);
3248 let req: car_inference::ClassifyRequest =
3249 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3250 let _permit = state.admission.acquire().await;
3251 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3252 Ok(serde_json::json!({"classifications": result}))
3253}
3254
3255fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3259 let total = state.admission.permits();
3260 let available = state.admission.permits_available();
3261 let in_use = total.saturating_sub(available);
3262 Ok(serde_json::json!({
3263 "permits_total": total,
3264 "permits_available": available,
3265 "permits_in_use": in_use,
3266 "env_override": crate::admission::ENV_MAX_CONCURRENT,
3267 }))
3268}
3269
3270async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3271 let model = msg
3272 .params
3273 .get("model")
3274 .and_then(|v| v.as_str())
3275 .ok_or("missing 'model' parameter")?;
3276 let text = msg
3277 .params
3278 .get("text")
3279 .and_then(|v| v.as_str())
3280 .ok_or("missing 'text' parameter")?;
3281 let engine = get_inference_engine(state);
3282 let ids = engine
3283 .tokenize(model, text)
3284 .await
3285 .map_err(|e| e.to_string())?;
3286 Ok(serde_json::json!({"tokens": ids}))
3287}
3288
3289async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3290 let model = msg
3291 .params
3292 .get("model")
3293 .and_then(|v| v.as_str())
3294 .ok_or("missing 'model' parameter")?;
3295 let tokens: Vec<u32> = msg
3296 .params
3297 .get("tokens")
3298 .and_then(|v| v.as_array())
3299 .ok_or("missing 'tokens' parameter")?
3300 .iter()
3301 .map(|t| {
3302 t.as_u64()
3303 .and_then(|n| u32::try_from(n).ok())
3304 .ok_or_else(|| "tokens[] must be u32 values".to_string())
3305 })
3306 .collect::<Result<Vec<_>, _>>()?;
3307 let engine = get_inference_engine(state);
3308 let text = engine
3309 .detokenize(model, &tokens)
3310 .await
3311 .map_err(|e| e.to_string())?;
3312 Ok(serde_json::json!({"text": text}))
3313}
3314
3315async fn handle_models_register(
3334 req: &JsonRpcMessage,
3335 _state: &Arc<ServerState>,
3336) -> Result<Value, String> {
3337 let schema_value = match req.params.get("schema") {
3341 Some(v) => v.clone(),
3342 None => req.params.clone(),
3343 };
3344 let schema: car_inference::ModelSchema =
3345 serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3346 let id = schema.id.clone();
3347
3348 let home = std::env::var_os("HOME")
3353 .or_else(|| std::env::var_os("USERPROFILE"))
3354 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3355 let car_dir = std::path::PathBuf::from(home).join(".car");
3356 std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3357 let path = car_dir.join("models.json");
3358
3359 let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3360 let text =
3361 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3362 if text.trim().is_empty() {
3363 Vec::new()
3364 } else {
3365 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3366 }
3367 } else {
3368 Vec::new()
3369 };
3370 if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3372 *slot = schema;
3373 } else {
3374 models.push(schema);
3375 }
3376 let json =
3377 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3378 let tmp = path.with_extension("json.tmp");
3379 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3380 std::fs::rename(&tmp, &path)
3381 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3382 Ok(serde_json::json!({
3383 "id": id,
3384 "registered": true,
3385 "path": path.to_string_lossy(),
3386 "note": "Daemon restart required for live UnifiedRegistry visibility \
3387 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3388 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3389 }))
3390}
3391
3392async fn handle_models_unregister(
3403 req: &JsonRpcMessage,
3404 _state: &Arc<ServerState>,
3405) -> Result<Value, String> {
3406 let id = match req.params.get("id") {
3410 Some(v) => v
3411 .as_str()
3412 .ok_or_else(|| "`id` must be a string".to_string())?
3413 .to_string(),
3414 None => match req.params.as_str() {
3415 Some(s) => s.to_string(),
3416 None => return Err("missing `id` parameter".to_string()),
3417 },
3418 };
3419
3420 let home = std::env::var_os("HOME")
3421 .or_else(|| std::env::var_os("USERPROFILE"))
3422 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3423 let car_dir = std::path::PathBuf::from(home).join(".car");
3424 let path = car_dir.join("models.json");
3425
3426 if !path.exists() {
3427 return Err(format!(
3428 "no models.json at {} — nothing to unregister",
3429 path.display()
3430 ));
3431 }
3432 let text =
3433 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3434 let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3435 Vec::new()
3436 } else {
3437 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3438 };
3439 let before = models.len();
3440 models.retain(|m| m.id != id);
3441 if models.len() == before {
3442 return Err(format!("model {} not found in {}", id, path.display()));
3443 }
3444 let json =
3445 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3446 let tmp = path.with_extension("json.tmp");
3447 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3448 std::fs::rename(&tmp, &path)
3449 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3450 Ok(serde_json::json!({
3451 "id": id,
3452 "unregistered": true,
3453 "path": path.to_string_lossy(),
3454 "note": "Daemon restart required for live UnifiedRegistry visibility \
3455 (phase 1, matching models.register).",
3456 }))
3457}
3458
3459fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3460 let engine = get_inference_engine(state);
3461 let models = engine.list_models();
3462 serde_json::to_value(&models).map_err(|e| e.to_string())
3463}
3464
3465fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3466 let engine = get_inference_engine(state);
3467 let models = engine.list_models_unified();
3468 serde_json::to_value(&models).map_err(|e| e.to_string())
3469}
3470
3471#[derive(Debug, Deserialize)]
3472#[serde(rename_all = "camelCase")]
3473struct ModelSearchParams {
3474 #[serde(default)]
3475 query: Option<String>,
3476 #[serde(default)]
3477 capability: Option<car_inference::ModelCapability>,
3478 #[serde(default)]
3479 provider: Option<String>,
3480 #[serde(default)]
3481 local_only: bool,
3482 #[serde(default)]
3483 available_only: bool,
3484 #[serde(default)]
3485 limit: Option<usize>,
3486}
3487
3488#[derive(Debug, Serialize)]
3489#[serde(rename_all = "camelCase")]
3490struct ModelSearchEntry {
3491 #[serde(flatten)]
3492 info: car_inference::ModelInfo,
3493 family: String,
3494 version: String,
3495 tags: Vec<String>,
3496 pullable: bool,
3497 upgrade: Option<car_inference::ModelUpgrade>,
3498}
3499
3500#[derive(Debug, Serialize)]
3501#[serde(rename_all = "camelCase")]
3502struct ModelSearchResponse {
3503 models: Vec<ModelSearchEntry>,
3504 upgrades: Vec<car_inference::ModelUpgrade>,
3505 total: usize,
3506 available: usize,
3507 local: usize,
3508 remote: usize,
3509}
3510
3511fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3512 let params: ModelSearchParams =
3513 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3514 query: None,
3515 capability: None,
3516 provider: None,
3517 local_only: false,
3518 available_only: false,
3519 limit: None,
3520 });
3521 let engine = get_inference_engine(state);
3522 let upgrades = engine.available_model_upgrades();
3523 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3524 .iter()
3525 .cloned()
3526 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3527 .collect();
3528 let query = params
3529 .query
3530 .as_deref()
3531 .map(str::trim)
3532 .filter(|q| !q.is_empty())
3533 .map(|q| q.to_ascii_lowercase());
3534 let provider = params
3535 .provider
3536 .as_deref()
3537 .map(str::trim)
3538 .filter(|p| !p.is_empty())
3539 .map(|p| p.to_ascii_lowercase());
3540
3541 let mut entries: Vec<ModelSearchEntry> = engine
3542 .list_schemas()
3543 .into_iter()
3544 .filter(|schema| {
3545 if let Some(capability) = params.capability {
3546 if !schema.has_capability(capability) {
3547 return false;
3548 }
3549 }
3550 if let Some(provider) = provider.as_deref() {
3551 if schema.provider.to_ascii_lowercase() != provider {
3552 return false;
3553 }
3554 }
3555 if params.local_only && !schema.is_local() {
3556 return false;
3557 }
3558 if params.available_only && !schema.available {
3559 return false;
3560 }
3561 if let Some(query) = query.as_deref() {
3562 let capability_text = schema
3563 .capabilities
3564 .iter()
3565 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3566 .collect::<Vec<_>>()
3567 .join(" ");
3568 let haystack = format!(
3569 "{} {} {} {} {} {}",
3570 schema.id,
3571 schema.name,
3572 schema.provider,
3573 schema.family,
3574 schema.tags.join(" "),
3575 capability_text
3576 )
3577 .to_ascii_lowercase();
3578 if !haystack.contains(query) {
3579 return false;
3580 }
3581 }
3582 true
3583 })
3584 .map(|schema| {
3585 let pullable = !schema.available
3586 && matches!(
3587 schema.source,
3588 car_inference::ModelSource::Local { .. }
3589 | car_inference::ModelSource::Mlx { .. }
3590 );
3591 let info = car_inference::ModelInfo::from(&schema);
3592 let upgrade = upgrades_by_from.get(&schema.id).cloned();
3593 ModelSearchEntry {
3594 info,
3595 family: schema.family,
3596 version: schema.version,
3597 tags: schema.tags,
3598 pullable,
3599 upgrade,
3600 }
3601 })
3602 .collect();
3603 entries.sort_by(|a, b| {
3604 b.info
3605 .available
3606 .cmp(&a.info.available)
3607 .then(b.info.is_local.cmp(&a.info.is_local))
3608 .then(a.info.name.cmp(&b.info.name))
3609 });
3610 if let Some(limit) = params.limit {
3611 entries.truncate(limit);
3612 }
3613
3614 let total = entries.len();
3615 let available = entries.iter().filter(|entry| entry.info.available).count();
3616 let local = entries.iter().filter(|entry| entry.info.is_local).count();
3617 let response = ModelSearchResponse {
3618 models: entries,
3619 upgrades,
3620 total,
3621 available,
3622 local,
3623 remote: total.saturating_sub(local),
3624 };
3625 serde_json::to_value(response).map_err(|e| e.to_string())
3626}
3627
3628fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3629 let engine = get_inference_engine(state);
3630 serde_json::to_value(serde_json::json!({
3631 "upgrades": engine.available_model_upgrades()
3632 }))
3633 .map_err(|e| e.to_string())
3634}
3635
3636async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3637 let name = msg
3638 .params
3639 .get("name")
3640 .or_else(|| msg.params.get("id"))
3641 .or_else(|| msg.params.get("model"))
3642 .and_then(|v| v.as_str())
3643 .ok_or("missing 'name' parameter")?;
3644 let engine = get_inference_engine(state);
3645 let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3646 Ok(serde_json::json!({"path": path.display().to_string()}))
3647}
3648
3649async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3650 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3651 msg.params
3652 .get("events")
3653 .cloned()
3654 .unwrap_or(msg.params.clone()),
3655 )
3656 .map_err(|e| format!("invalid events: {}", e))?;
3657
3658 let inference = get_inference_engine(state).clone();
3659 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3660
3661 let skills = engine.distill_skills(&events).await;
3662 serde_json::to_value(&skills).map_err(|e| e.to_string())
3663}
3664
3665async fn handle_memory_consolidate(
3669 session: &crate::session::ClientSession,
3670) -> Result<Value, String> {
3671 let engine_arc = session.effective_memgine().await;
3672 let report = {
3673 let mut engine = engine_arc.lock().await;
3674 engine.consolidate().await
3675 };
3676 if let Some(id) = session.agent_id.lock().await.clone() {
3677 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3678 tracing::warn!(agent_id = %id, error = %e,
3679 "agent memgine persist after consolidate failed");
3680 }
3681 }
3682 serde_json::to_value(&report).map_err(|e| e.to_string())
3683}
3684
3685async fn handle_skill_repair(
3689 msg: &JsonRpcMessage,
3690 session: &crate::session::ClientSession,
3691) -> Result<Value, String> {
3692 let name = msg
3693 .params
3694 .get("skill_name")
3695 .and_then(|v| v.as_str())
3696 .ok_or("missing 'skill_name' parameter")?;
3697 let mut engine = session.memgine.lock().await;
3698 let code = engine.repair_skill(name).await;
3699 Ok(match code {
3700 Some(c) => serde_json::json!({ "code": c }),
3701 None => Value::Null,
3702 })
3703}
3704
3705async fn handle_skills_ingest_distilled(
3708 msg: &JsonRpcMessage,
3709 session: &crate::session::ClientSession,
3710) -> Result<Value, String> {
3711 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3712 msg.params
3713 .get("skills")
3714 .cloned()
3715 .unwrap_or(msg.params.clone()),
3716 )
3717 .map_err(|e| format!("invalid skills: {}", e))?;
3718 let mut engine = session.memgine.lock().await;
3719 let nodes = engine.ingest_distilled_skills(&skills);
3720 Ok(serde_json::json!({ "ingested": nodes.len() }))
3721}
3722
3723async fn handle_skills_evolve(
3726 msg: &JsonRpcMessage,
3727 session: &crate::session::ClientSession,
3728) -> Result<Value, String> {
3729 let domain = msg
3730 .params
3731 .get("domain")
3732 .and_then(|v| v.as_str())
3733 .ok_or("missing 'domain' parameter")?
3734 .to_string();
3735 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3736 msg.params
3737 .get("events")
3738 .cloned()
3739 .unwrap_or(Value::Array(vec![])),
3740 )
3741 .map_err(|e| format!("invalid events: {}", e))?;
3742 let mut engine = session.memgine.lock().await;
3743 let skills = engine.evolve_skills(&events, &domain).await;
3744 serde_json::to_value(&skills).map_err(|e| e.to_string())
3745}
3746
3747async fn handle_skills_domains_needing_evolution(
3749 msg: &JsonRpcMessage,
3750 session: &crate::session::ClientSession,
3751) -> Result<Value, String> {
3752 let threshold = msg
3753 .params
3754 .get("threshold")
3755 .and_then(|v| v.as_f64())
3756 .unwrap_or(0.6);
3757 let engine = session.memgine.lock().await;
3758 let domains = engine.domains_needing_evolution(threshold);
3759 serde_json::to_value(&domains).map_err(|e| e.to_string())
3760}
3761
3762async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3764 let engine = get_inference_engine(state);
3765 let req: car_inference::RerankRequest =
3766 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3767 let _permit = state.admission.acquire().await;
3768 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3769 serde_json::to_value(&result).map_err(|e| e.to_string())
3770}
3771
3772async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3778 use base64::Engine as _;
3779 let engine = get_inference_engine(state);
3780
3781 let mut params = msg.params.clone();
3788 let audio_b64 = params
3789 .as_object_mut()
3790 .and_then(|m| m.remove("audio_b64"))
3791 .and_then(|v| v.as_str().map(str::to_string));
3792 let _tmp_audio = if let Some(b64) = audio_b64 {
3793 let bytes = base64::engine::general_purpose::STANDARD
3794 .decode(b64.as_bytes())
3795 .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3796 let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3797 std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3798 let path = tmp.path().to_string_lossy().into_owned();
3799 if let Some(obj) = params.as_object_mut() {
3800 obj.insert("audio_path".to_string(), Value::String(path));
3801 }
3802 Some(tmp)
3803 } else {
3804 None
3805 };
3806
3807 let req: car_inference::TranscribeRequest =
3808 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3809 let _permit = state.admission.acquire().await;
3810 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3811 serde_json::to_value(&result).map_err(|e| e.to_string())
3812}
3813
3814async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3820 use base64::Engine as _;
3821 let engine = get_inference_engine(state);
3822
3823 let mut params = msg.params.clone();
3824 let return_b64 = params
3825 .as_object_mut()
3826 .and_then(|m| m.remove("return_b64"))
3827 .and_then(|v| v.as_bool())
3828 .unwrap_or(false);
3829 let no_output_path = params
3830 .as_object()
3831 .map(|m| !m.contains_key("output_path"))
3832 .unwrap_or(true);
3833
3834 let req: car_inference::SynthesizeRequest =
3835 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3836 let _permit = state.admission.acquire().await;
3837 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3838 let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3839
3840 if return_b64 || no_output_path {
3844 let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3845 format!(
3846 "synthesize: failed to read rendered audio at {}: {e}",
3847 result.audio_path
3848 )
3849 })?;
3850 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3851 if let Some(obj) = value.as_object_mut() {
3852 obj.insert("audio_b64".to_string(), Value::String(encoded));
3853 }
3854 }
3855 Ok(value)
3856}
3857
3858async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3862 let engine = get_inference_engine(state);
3863 let status = engine
3864 .prepare_speech_runtime()
3865 .await
3866 .map_err(|e| e.to_string())?;
3867 serde_json::to_value(&status).map_err(|e| e.to_string())
3868}
3869
3870async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3873 let prompt = msg
3874 .params
3875 .get("prompt")
3876 .and_then(|v| v.as_str())
3877 .ok_or("missing 'prompt' parameter")?;
3878 let engine = get_inference_engine(state);
3879 let decision = engine.route_adaptive(prompt).await;
3880 serde_json::to_value(&decision).map_err(|e| e.to_string())
3881}
3882
3883async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3885 let engine = get_inference_engine(state);
3886 let profiles = engine.export_profiles().await;
3887 serde_json::to_value(&profiles).map_err(|e| e.to_string())
3888}
3889
3890#[derive(Deserialize)]
3891#[serde(rename_all = "camelCase")]
3892struct OutcomesResolvePendingParams {
3893 action_results: Vec<(String, bool, f64, String)>,
3898}
3899
3900async fn handle_outcomes_resolve_pending(
3920 req: &JsonRpcMessage,
3921 state: &ServerState,
3922) -> Result<Value, String> {
3923 let params: OutcomesResolvePendingParams =
3924 serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
3925 let engine = get_inference_engine(state);
3926 let mut tracker = engine.outcome_tracker.write().await;
3927 let inferred = tracker.infer_outcomes_from_action_sequence(¶ms.action_results);
3928 tracker.resolve_pending_from_signals(inferred);
3929 Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3930}
3931
3932async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3934 let n = session.runtime.log.lock().await.len();
3935 Ok(Value::from(n as u64))
3936}
3937
3938async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3939 let stats = session.runtime.log.lock().await.stats();
3940 serde_json::to_value(stats).map_err(|e| e.to_string())
3941}
3942
3943#[derive(Deserialize)]
3944#[serde(rename_all = "camelCase")]
3945struct EventsTruncateParams {
3946 #[serde(default)]
3947 max_events: Option<usize>,
3948 #[serde(default)]
3949 max_spans: Option<usize>,
3950}
3951
3952async fn handle_events_truncate(
3953 msg: &JsonRpcMessage,
3954 session: &crate::session::ClientSession,
3955) -> Result<Value, String> {
3956 let params: EventsTruncateParams =
3957 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
3958 max_events: None,
3959 max_spans: None,
3960 });
3961 let mut log = session.runtime.log.lock().await;
3962 let removed_events = params
3963 .max_events
3964 .map(|max| log.truncate_events_keep_last(max))
3965 .unwrap_or(0);
3966 let removed_spans = params
3967 .max_spans
3968 .map(|max| log.truncate_spans_keep_last(max))
3969 .unwrap_or(0);
3970 let stats = log.stats();
3971 Ok(serde_json::json!({
3972 "removedEvents": removed_events,
3973 "removedSpans": removed_spans,
3974 "stats": stats,
3975 }))
3976}
3977
3978async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
3979 let mut log = session.runtime.log.lock().await;
3980 let removed = log.clear();
3981 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
3982}
3983
3984async fn handle_replan_set_config(
3989 msg: &JsonRpcMessage,
3990 session: &crate::session::ClientSession,
3991) -> Result<Value, String> {
3992 let max_replans = msg
3993 .params
3994 .get("max_replans")
3995 .and_then(|v| v.as_u64())
3996 .unwrap_or(0) as u32;
3997 let delay_ms = msg
3998 .params
3999 .get("delay_ms")
4000 .and_then(|v| v.as_u64())
4001 .unwrap_or(0);
4002 let verify_before_execute = msg
4003 .params
4004 .get("verify_before_execute")
4005 .and_then(|v| v.as_bool())
4006 .unwrap_or(true);
4007 let cfg = car_engine::ReplanConfig {
4008 max_replans,
4009 delay_ms,
4010 verify_before_execute,
4011 };
4012 session.runtime.set_replan_config(cfg).await;
4013 Ok(Value::Null)
4014}
4015
4016async fn handle_skills_list(
4017 msg: &JsonRpcMessage,
4018 session: &crate::session::ClientSession,
4019) -> Result<Value, String> {
4020 let domain = msg.params.get("domain").and_then(|v| v.as_str());
4021 let engine = session.memgine.lock().await;
4022 let skills: Vec<serde_json::Value> = engine
4023 .graph
4024 .inner
4025 .node_indices()
4026 .filter_map(|nix| {
4027 let node = engine.graph.inner.node_weight(nix)?;
4028 if node.kind != car_memgine::MemKind::Skill {
4029 return None;
4030 }
4031 let meta = car_memgine::SkillMeta::from_node(node)?;
4032 if let Some(d) = domain {
4033 match &meta.scope {
4034 car_memgine::SkillScope::Global => {}
4035 car_memgine::SkillScope::Domain(sd) if sd == d => {}
4036 _ => return None,
4037 }
4038 }
4039 Some(serde_json::to_value(&meta).unwrap_or_default())
4040 })
4041 .collect();
4042 serde_json::to_value(&skills).map_err(|e| e.to_string())
4043}
4044
4045#[derive(serde::Deserialize)]
4046struct SecretParams {
4047 #[serde(default)]
4048 service: Option<String>,
4049 key: String,
4050 #[serde(default)]
4051 value: Option<String>,
4052}
4053
4054fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
4055 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4056 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
4057 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
4058}
4059
4060fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
4061 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4062 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
4063}
4064
4065fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
4066 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4067 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
4068}
4069
4070fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
4071 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4072 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
4073}
4074
4075#[derive(serde::Deserialize)]
4076struct PermParams {
4077 domain: String,
4078 #[serde(default)]
4079 target_bundle_id: Option<String>,
4080}
4081
4082fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
4083 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4084 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
4085}
4086
4087fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
4088 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4089 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
4090}
4091
4092fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
4093 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4094 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
4095}
4096
4097fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
4098 #[derive(serde::Deserialize)]
4099 struct P {
4100 start: String,
4101 end: String,
4102 #[serde(default)]
4103 calendar_ids: Vec<String>,
4104 }
4105 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4106 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
4107 .map_err(|e| format!("parse start: {}", e))?
4108 .with_timezone(&chrono::Utc);
4109 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
4110 .map_err(|e| format!("parse end: {}", e))?
4111 .with_timezone(&chrono::Utc);
4112 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
4113}
4114
4115fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
4116 #[derive(serde::Deserialize)]
4117 struct P {
4118 query: String,
4119 #[serde(default = "default_limit")]
4120 limit: usize,
4121 #[serde(default)]
4122 container_ids: Vec<String>,
4123 }
4124 fn default_limit() -> usize {
4125 50
4126 }
4127 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4128 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
4129}
4130
4131fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
4132 #[derive(serde::Deserialize, Default)]
4133 struct P {
4134 #[serde(default)]
4135 account_ids: Vec<String>,
4136 }
4137 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
4138 car_ffi_common::integrations::mail_inbox(&p.account_ids)
4139}
4140
4141fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
4142 let raw = req.params.to_string();
4143 car_ffi_common::integrations::mail_send(&raw)
4144}
4145
4146fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
4147 #[derive(serde::Deserialize)]
4148 struct P {
4149 #[serde(default = "default_limit")]
4150 limit: usize,
4151 }
4152 fn default_limit() -> usize {
4153 50
4154 }
4155 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4156 car_ffi_common::integrations::messages_chats(p.limit)
4157}
4158
4159fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
4160 let raw = req.params.to_string();
4161 car_ffi_common::integrations::messages_send(&raw)
4162}
4163
4164fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
4165 #[derive(serde::Deserialize)]
4166 struct P {
4167 query: String,
4168 #[serde(default = "default_limit")]
4169 limit: usize,
4170 }
4171 fn default_limit() -> usize {
4172 50
4173 }
4174 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4175 car_ffi_common::integrations::notes_find(&p.query, p.limit)
4176}
4177
4178fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
4179 #[derive(serde::Deserialize)]
4180 struct P {
4181 #[serde(default = "default_limit")]
4182 limit: usize,
4183 }
4184 fn default_limit() -> usize {
4185 50
4186 }
4187 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4188 car_ffi_common::integrations::reminders_items(p.limit)
4189}
4190
4191fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
4192 #[derive(serde::Deserialize)]
4193 struct P {
4194 #[serde(default = "default_limit")]
4195 limit: usize,
4196 }
4197 fn default_limit() -> usize {
4198 100
4199 }
4200 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
4201 car_ffi_common::integrations::bookmarks_list(p.limit)
4202}
4203
4204fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
4205 #[derive(serde::Deserialize)]
4206 struct P {
4207 start: String,
4208 end: String,
4209 }
4210 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4211 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4212 .map_err(|e| format!("parse start: {}", e))?
4213 .with_timezone(&chrono::Utc);
4214 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4215 .map_err(|e| format!("parse end: {}", e))?
4216 .with_timezone(&chrono::Utc);
4217 car_ffi_common::health::sleep_windows(s, e)
4218}
4219
4220fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4221 #[derive(serde::Deserialize)]
4222 struct P {
4223 start: String,
4224 end: String,
4225 }
4226 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4227 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4228 .map_err(|e| format!("parse start: {}", e))?
4229 .with_timezone(&chrono::Utc);
4230 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4231 .map_err(|e| format!("parse end: {}", e))?
4232 .with_timezone(&chrono::Utc);
4233 car_ffi_common::health::workouts(s, e)
4234}
4235
4236fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4237 #[derive(serde::Deserialize)]
4238 struct P {
4239 start: String,
4240 end: String,
4241 }
4242 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4243 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4244 .map_err(|e| format!("parse start: {}", e))?;
4245 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4246 .map_err(|e| format!("parse end: {}", e))?;
4247 car_ffi_common::health::activity(s, e)
4248}
4249
4250async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4251 let closed = session.browser.close().await?;
4252 Ok(serde_json::json!({"closed": closed}))
4253}
4254
4255async fn handle_browser_run(
4256 req: &JsonRpcMessage,
4257 session: &crate::session::ClientSession,
4258) -> Result<Value, String> {
4259 #[derive(serde::Deserialize)]
4260 struct BrowserRunParams {
4261 script: Value,
4263 #[serde(default)]
4264 width: Option<u32>,
4265 #[serde(default)]
4266 height: Option<u32>,
4267 #[serde(default)]
4272 headed: Option<bool>,
4273 #[serde(default)]
4276 extra_args: Option<Vec<String>>,
4277 }
4278 let params: BrowserRunParams =
4279 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4280
4281 let script_json = match params.script {
4283 Value::String(s) => s,
4284 other => other.to_string(),
4285 };
4286
4287 let browser_session = session
4288 .browser
4289 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4290 width: params.width.unwrap_or(1280),
4291 height: params.height.unwrap_or(720),
4292 headless: !params.headed.unwrap_or(false),
4293 extra_args: params.extra_args.unwrap_or_default(),
4294 })
4295 .await?;
4296
4297 let trace_json = browser_session.run(&script_json).await?;
4298 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4299}
4300
4301#[derive(Deserialize)]
4314struct VoiceStartParams {
4315 session_id: String,
4316 audio_source: Value,
4317 #[serde(default)]
4318 options: Option<Value>,
4319}
4320
4321async fn handle_voice_transcribe_stream_start(
4322 req: &JsonRpcMessage,
4323 state: &Arc<ServerState>,
4324 session: &Arc<crate::session::ClientSession>,
4325) -> Result<Value, String> {
4326 let params: VoiceStartParams =
4327 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4328 let audio_source_json =
4329 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
4330 let options_json = params
4331 .options
4332 .as_ref()
4333 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4334 .transpose()?;
4335 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4336 channel: session.channel.clone(),
4337 });
4338 let json = car_ffi_common::voice::transcribe_stream_start(
4339 ¶ms.session_id,
4340 &audio_source_json,
4341 options_json.as_deref(),
4342 state.voice_sessions.clone(),
4343 sink,
4344 )
4345 .await?;
4346 serde_json::from_str(&json).map_err(|e| e.to_string())
4347}
4348
4349#[derive(Deserialize)]
4350struct VoiceStopParams {
4351 session_id: String,
4352}
4353
4354async fn handle_voice_transcribe_stream_stop(
4355 req: &JsonRpcMessage,
4356 state: &Arc<ServerState>,
4357) -> Result<Value, String> {
4358 let params: VoiceStopParams =
4359 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4360 let json = car_ffi_common::voice::transcribe_stream_stop(
4361 ¶ms.session_id,
4362 state.voice_sessions.clone(),
4363 )
4364 .await?;
4365 serde_json::from_str(&json).map_err(|e| e.to_string())
4366}
4367
4368#[derive(Deserialize)]
4369struct VoicePushParams {
4370 session_id: String,
4371 pcm_b64: String,
4375}
4376
4377async fn handle_voice_transcribe_stream_push(
4378 req: &JsonRpcMessage,
4379 state: &Arc<ServerState>,
4380) -> Result<Value, String> {
4381 use base64::Engine;
4382 let params: VoicePushParams =
4383 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4384 let pcm = base64::engine::general_purpose::STANDARD
4385 .decode(¶ms.pcm_b64)
4386 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4387 let json = car_ffi_common::voice::transcribe_stream_push(
4388 ¶ms.session_id,
4389 &pcm,
4390 state.voice_sessions.clone(),
4391 )
4392 .await?;
4393 serde_json::from_str(&json).map_err(|e| e.to_string())
4394}
4395
4396fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4397 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4398 serde_json::from_str(&json).unwrap_or(Value::Null)
4399}
4400
4401async fn handle_voice_dispatch_turn(
4402 req: &JsonRpcMessage,
4403 state: &Arc<ServerState>,
4404 session: &Arc<crate::session::ClientSession>,
4405) -> Result<Value, String> {
4406 let req_value = req.params.clone();
4407 let request: crate::voice_turn::DispatchVoiceTurnRequest =
4408 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4409 let engine = get_inference_engine(state).clone();
4410 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4411 channel: session.channel.clone(),
4412 });
4413 let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4414 serde_json::to_value(resp).map_err(|e| e.to_string())
4415}
4416
4417async fn handle_voice_cancel_turn() -> Result<Value, String> {
4418 crate::voice_turn::cancel().await;
4419 Ok(serde_json::json!({"cancelled": true}))
4420}
4421
4422async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4423 let engine = get_inference_engine(state).clone();
4424 crate::voice_turn::prewarm(engine).await;
4425 Ok(serde_json::json!({"prewarmed": true}))
4426}
4427
4428fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4447 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4448 std::sync::OnceLock::new();
4449 SLOT.get_or_init(|| std::sync::RwLock::new(None))
4450}
4451
4452fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4453 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4454 std::sync::OnceLock::new();
4455 MAP.get_or_init(dashmap::DashMap::new)
4456}
4457
4458fn ws_runner_completions() -> &'static dashmap::DashMap<
4459 String,
4460 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4461> {
4462 static MAP: std::sync::OnceLock<
4463 dashmap::DashMap<
4464 String,
4465 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4466 >,
4467 > = std::sync::OnceLock::new();
4468 MAP.get_or_init(dashmap::DashMap::new)
4469}
4470
4471struct WsInferenceRunner;
4472
4473#[async_trait::async_trait]
4474impl car_inference::InferenceRunner for WsInferenceRunner {
4475 async fn run(
4476 &self,
4477 request: car_inference::tasks::generate::GenerateRequest,
4478 emitter: car_inference::EventEmitter,
4479 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4480 let channel = ws_runner_session()
4481 .read()
4482 .map_err(|e| {
4483 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4484 })?
4485 .clone()
4486 .ok_or_else(|| {
4487 car_inference::RunnerError::Declined(
4488 "no WebSocket inference runner registered — call inference.register_runner first"
4489 .into(),
4490 )
4491 })?;
4492
4493 let call_id = uuid::Uuid::new_v4().to_string();
4494 let request_json = serde_json::to_value(&request)
4495 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4496 let (tx, rx) = tokio::sync::oneshot::channel();
4497 ws_runner_calls().insert(call_id.clone(), emitter);
4498 ws_runner_completions().insert(call_id.clone(), tx);
4499
4500 use futures::SinkExt;
4502 let notification = serde_json::json!({
4503 "jsonrpc": "2.0",
4504 "method": "inference.runner.invoke",
4505 "params": {
4506 "call_id": call_id,
4507 "request": request_json,
4508 },
4509 });
4510 let text = serde_json::to_string(¬ification)
4511 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4512 let _ = channel
4513 .write
4514 .lock()
4515 .await
4516 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4517 .await;
4518
4519 let result = rx.await.map_err(|_| {
4520 car_inference::RunnerError::Failed("runner completion channel dropped".into())
4521 })?;
4522 ws_runner_calls().remove(&call_id);
4523 result.map_err(car_inference::RunnerError::Failed)
4524 }
4525}
4526
4527async fn handle_inference_register_runner(
4528 session: &Arc<crate::session::ClientSession>,
4529) -> Result<Value, String> {
4530 let mut guard = ws_runner_session()
4531 .write()
4532 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4533 *guard = Some(session.channel.clone());
4534 drop(guard);
4535 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4536 Ok(serde_json::json!({"registered": true}))
4537}
4538
4539#[derive(serde::Deserialize)]
4540struct InferenceRunnerEventParams {
4541 call_id: String,
4542 event: Value,
4543}
4544
4545async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4546 let params: InferenceRunnerEventParams =
4547 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4548 let stream_event = match parse_runner_event_value(¶ms.event) {
4549 Some(e) => e,
4550 None => return Err("unrecognised runner event shape".into()),
4551 };
4552 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
4553 let emitter = entry.value().clone();
4554 tokio::spawn(async move { emitter.emit(stream_event).await });
4555 }
4556 Ok(serde_json::json!({"emitted": true}))
4557}
4558
4559#[derive(serde::Deserialize)]
4560struct InferenceRunnerCompleteParams {
4561 call_id: String,
4562 result: Value,
4563}
4564
4565async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4566 let params: InferenceRunnerCompleteParams =
4567 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4568 let result: std::result::Result<car_inference::RunnerResult, String> =
4569 serde_json::from_value(params.result)
4570 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4571 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4572 let _ = tx.send(result);
4573 }
4574 Ok(serde_json::json!({"completed": true}))
4575}
4576
4577#[derive(serde::Deserialize)]
4578struct InferenceRunnerFailParams {
4579 call_id: String,
4580 error: String,
4581}
4582
4583async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4584 let params: InferenceRunnerFailParams =
4585 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4586 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4587 let _ = tx.send(Err(params.error));
4588 }
4589 Ok(serde_json::json!({"failed": true}))
4590}
4591
4592fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4593 let ty = v.get("type").and_then(|t| t.as_str())?;
4594 match ty {
4595 "text" => Some(car_inference::StreamEvent::TextDelta(
4596 v.get("data")?.as_str()?.to_string(),
4597 )),
4598 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4599 name: v.get("name")?.as_str()?.to_string(),
4600 index: v.get("index")?.as_u64()? as usize,
4601 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4602 }),
4603 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4604 index: v.get("index")?.as_u64()? as usize,
4605 arguments_delta: v.get("data")?.as_str()?.to_string(),
4606 }),
4607 "usage" => Some(car_inference::StreamEvent::Usage {
4608 input_tokens: v.get("input_tokens")?.as_u64()?,
4609 output_tokens: v.get("output_tokens")?.as_u64()?,
4610 }),
4611 "done" => Some(car_inference::StreamEvent::Done {
4612 text: v.get("text")?.as_str()?.to_string(),
4613 tool_calls: v
4614 .get("tool_calls")
4615 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4616 .unwrap_or_default(),
4617 }),
4618 _ => None,
4619 }
4620}
4621
4622#[derive(Deserialize)]
4623struct EnrollSpeakerParams {
4624 label: String,
4625 audio: Value,
4626}
4627
4628async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4629 let params: EnrollSpeakerParams =
4630 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4631 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
4632 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
4633 serde_json::from_str(&json).map_err(|e| e.to_string())
4634}
4635
4636#[derive(Deserialize)]
4637struct RemoveEnrollmentParams {
4638 label: String,
4639}
4640
4641fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4642 let params: RemoveEnrollmentParams =
4643 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4644 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
4645 serde_json::from_str(&json).map_err(|e| e.to_string())
4646}
4647
4648#[derive(Deserialize)]
4649struct WorkflowRunParams {
4650 workflow: Value,
4651}
4652
4653async fn handle_workflow_run(
4654 req: &JsonRpcMessage,
4655 session: &Arc<crate::session::ClientSession>,
4656) -> Result<Value, String> {
4657 let params: WorkflowRunParams =
4658 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4659 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4660 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4661 channel: session.channel.clone(),
4662 host: session.host.clone(),
4663 client_id: session.client_id.clone(),
4664 });
4665 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4666 serde_json::from_str(&json).map_err(|e| e.to_string())
4667}
4668
4669#[derive(Deserialize)]
4670struct WorkflowVerifyParams {
4671 workflow: Value,
4672}
4673
4674fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4675 let params: WorkflowVerifyParams =
4676 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4677 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4678 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4679 serde_json::from_str(&json).map_err(|e| e.to_string())
4680}
4681
4682async fn handle_meeting_start(
4687 req: &JsonRpcMessage,
4688 state: &Arc<ServerState>,
4689 session: &Arc<crate::session::ClientSession>,
4690) -> Result<Value, String> {
4691 let mut req_value = req.params.clone();
4697 let meeting_id = req_value
4698 .get("id")
4699 .and_then(|v| v.as_str())
4700 .map(str::to_string)
4701 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4702 if let Some(map) = req_value.as_object_mut() {
4703 map.insert("id".into(), Value::String(meeting_id.clone()));
4704 }
4705 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4706
4707 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4708 Arc::new(crate::session::WsVoiceEventSink {
4709 channel: session.channel.clone(),
4710 });
4711
4712 let upstream: Arc<dyn car_voice::VoiceEventSink> =
4717 Arc::new(crate::session::WsMemgineIngestSink {
4718 meeting_id,
4719 engine: session.memgine.clone(),
4720 upstream: ws_upstream,
4721 });
4722
4723 let cwd = std::env::current_dir().ok();
4724 let json = crate::meeting::start_meeting(
4725 &request_json,
4726 state.meetings.clone(),
4727 state.voice_sessions.clone(),
4728 upstream,
4729 None,
4730 cwd,
4731 )
4732 .await?;
4733 serde_json::from_str(&json).map_err(|e| e.to_string())
4734}
4735
4736#[derive(Deserialize)]
4737struct MeetingStopParams {
4738 meeting_id: String,
4739 #[serde(default = "default_summarize")]
4740 summarize: bool,
4741}
4742
4743fn default_summarize() -> bool {
4744 true
4745}
4746
4747async fn handle_meeting_stop(
4748 req: &JsonRpcMessage,
4749 state: &Arc<ServerState>,
4750 _session: &Arc<crate::session::ClientSession>,
4751) -> Result<Value, String> {
4752 let params: MeetingStopParams =
4753 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4754 let inference = if params.summarize {
4755 Some(state.inference.get().cloned()).flatten()
4756 } else {
4757 None
4758 };
4759 let json = crate::meeting::stop_meeting(
4760 ¶ms.meeting_id,
4761 params.summarize,
4762 state.meetings.clone(),
4763 state.voice_sessions.clone(),
4764 inference,
4765 )
4766 .await?;
4767 serde_json::from_str(&json).map_err(|e| e.to_string())
4768}
4769
4770#[derive(Deserialize, Default)]
4771struct MeetingListParams {
4772 #[serde(default)]
4773 root: Option<std::path::PathBuf>,
4774}
4775
4776fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4777 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4778 let cwd = std::env::current_dir().ok();
4779 let json = crate::meeting::list_meetings(params.root, cwd)?;
4780 serde_json::from_str(&json).map_err(|e| e.to_string())
4781}
4782
4783#[derive(Deserialize)]
4784struct MeetingGetParams {
4785 meeting_id: String,
4786 #[serde(default)]
4787 root: Option<std::path::PathBuf>,
4788}
4789
4790fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4791 let params: MeetingGetParams =
4792 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4793 let cwd = std::env::current_dir().ok();
4794 let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
4795 serde_json::from_str(&json).map_err(|e| e.to_string())
4796}
4797
4798#[derive(Deserialize, Default)]
4803struct RegistryRegisterParams {
4804 entry: Value,
4808 #[serde(default)]
4809 registry_path: Option<std::path::PathBuf>,
4810}
4811
4812fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4813 let params: RegistryRegisterParams =
4814 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4815 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
4816 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4817 Ok(Value::Null)
4818}
4819
4820#[derive(Deserialize, Default)]
4821struct RegistryNameParams {
4822 name: String,
4823 #[serde(default)]
4824 registry_path: Option<std::path::PathBuf>,
4825}
4826
4827fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4828 let params: RegistryNameParams =
4829 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4830 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
4831 serde_json::from_str(&json).map_err(|e| e.to_string())
4832}
4833
4834fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4835 let params: RegistryNameParams =
4836 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4837 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
4838 Ok(Value::Null)
4839}
4840
4841#[derive(Deserialize, Default)]
4842struct RegistryListParams {
4843 #[serde(default)]
4844 registry_path: Option<std::path::PathBuf>,
4845}
4846
4847fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4848 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4849 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4850 serde_json::from_str(&json).map_err(|e| e.to_string())
4851}
4852
4853#[derive(Deserialize, Default)]
4854struct RegistryReapParams {
4855 #[serde(default = "default_reap_age")]
4858 max_age_secs: u64,
4859 #[serde(default)]
4860 registry_path: Option<std::path::PathBuf>,
4861}
4862
4863fn default_reap_age() -> u64 {
4864 60
4865}
4866
4867fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4868 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4869 let json =
4870 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4871 serde_json::from_str(&json).map_err(|e| e.to_string())
4872}
4873
4874async fn handle_a2a_start(
4881 req: &JsonRpcMessage,
4882 session: &crate::session::ClientSession,
4883) -> Result<Value, String> {
4884 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4885 let json = crate::a2a::start_a2a(¶ms_json, Some(session.runtime.clone())).await?;
4891 serde_json::from_str(&json).map_err(|e| e.to_string())
4892}
4893
4894fn handle_a2a_stop() -> Result<Value, String> {
4895 let json = crate::a2a::stop_a2a()?;
4896 serde_json::from_str(&json).map_err(|e| e.to_string())
4897}
4898
4899fn handle_a2a_status() -> Result<Value, String> {
4900 let json = crate::a2a::a2a_status()?;
4901 serde_json::from_str(&json).map_err(|e| e.to_string())
4902}
4903
4904#[derive(Deserialize)]
4905#[serde(rename_all = "camelCase")]
4906struct A2aSendParams {
4907 endpoint: String,
4908 message: car_a2a::Message,
4909 #[serde(default)]
4910 blocking: bool,
4911 #[serde(default = "default_true")]
4912 ingest_a2ui: bool,
4913 #[serde(default)]
4914 route_auth: Option<A2aRouteAuth>,
4915 #[serde(default)]
4916 allow_untrusted_endpoint: bool,
4917}
4918
4919fn default_true() -> bool {
4920 true
4921}
4922
4923async fn handle_a2a_dispatch(
4933 method: &str,
4934 req: &JsonRpcMessage,
4935 state: &Arc<ServerState>,
4936) -> Result<Value, String> {
4937 let dispatcher = state.a2a_dispatcher().await;
4938 dispatcher
4939 .dispatch(method, req.params.clone())
4940 .await
4941 .map_err(|e| e.to_string())
4942}
4943
4944async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4945 let params: A2aSendParams =
4946 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4947 let endpoint = trusted_route_endpoint(
4948 Some(params.endpoint.clone()),
4949 params.allow_untrusted_endpoint,
4950 )
4951 .ok_or_else(|| {
4952 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
4953 })?;
4954 let client = match params.route_auth.clone() {
4955 Some(auth) => {
4956 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
4957 }
4958 None => car_a2a::A2aClient::new(endpoint.clone()),
4959 };
4960 let result = client
4961 .send_message(params.message, params.blocking)
4962 .await
4963 .map_err(|e| e.to_string())?;
4964 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4965 let mut applied = Vec::new();
4966 if params.ingest_a2ui {
4967 state
4968 .a2ui
4969 .validate_payload(&result_value)
4970 .map_err(|e| e.to_string())?;
4971 let routed_endpoint = Some(endpoint.clone());
4972 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
4973 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
4974 if owner.endpoint.is_none() {
4975 owner.with_endpoint(routed_endpoint.clone())
4976 } else {
4977 owner
4978 }
4979 });
4980 applied.push(
4981 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
4982 );
4983 }
4984 }
4985 Ok(serde_json::json!({
4986 "result": result,
4987 "a2ui": {
4988 "applied": applied,
4989 }
4990 }))
4991}
4992
4993async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
5001 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5002 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
5003 serde_json::from_str(&json).map_err(|e| e.to_string())
5004}
5005
5006async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
5007 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5008 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
5009 serde_json::from_str(&json).map_err(|e| e.to_string())
5010}
5011
5012async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
5013 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5014 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
5015 serde_json::from_str(&json).map_err(|e| e.to_string())
5016}
5017
5018async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
5019 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5020 let json = car_ffi_common::notifications::local(&args_json).await?;
5021 serde_json::from_str(&json).map_err(|e| e.to_string())
5022}
5023
5024async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
5025 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
5026 let json = car_ffi_common::vision::ocr(&args_json).await?;
5027 serde_json::from_str(&json).map_err(|e| e.to_string())
5028}
5029
5030async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
5035 let agents = match state.observer_manifest_path() {
5044 Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
5045 .map_err(|e| e.to_string())?,
5046 None => {
5047 let supervisor = state.supervisor()?;
5048 supervisor.list().await
5049 }
5050 };
5051 let attached = state.attached_agents.lock().await.clone();
5058 let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
5059 for a in agents {
5060 let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
5061 let session_id = attached.get(&a.spec.id).cloned();
5062 if let Some(map) = v.as_object_mut() {
5063 map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
5064 if let Some(sid) = session_id {
5065 map.insert("session_id".to_string(), Value::String(sid));
5066 }
5067 }
5068 decorated.push(v);
5069 }
5070 Ok(Value::Array(decorated))
5071}
5072
5073async fn handle_agents_upsert(
5074 req: &JsonRpcMessage,
5075 state: &Arc<ServerState>,
5076) -> Result<Value, String> {
5077 let mut params = req.params.clone();
5078 if let Some(name) = params
5087 .get("interpreter")
5088 .and_then(|v| v.as_str())
5089 .map(str::to_string)
5090 {
5091 let resolved =
5092 car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
5093 params["command"] = Value::String(resolved.to_string_lossy().into_owned());
5094 }
5095 let spec: car_registry::supervisor::AgentSpec =
5096 serde_json::from_value(params).map_err(|e| e.to_string())?;
5097 let supervisor = state.supervisor()?;
5098 let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
5099 serde_json::to_value(agent).map_err(|e| e.to_string())
5100}
5101
5102async fn handle_agents_install(
5116 req: &JsonRpcMessage,
5117 state: &Arc<ServerState>,
5118) -> Result<Value, String> {
5119 let manifest: car_registry::manifest::AgentManifest =
5120 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
5121 let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
5122 let supervisor = state.supervisor()?;
5123 let (report, managed) = supervisor
5124 .install_manifest(manifest, &host)
5125 .await
5126 .map_err(|e| e.to_string())?;
5127 Ok(serde_json::json!({
5128 "report": {
5129 "missingOptional": report
5130 .missing_optional
5131 .iter()
5132 .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
5133 .collect::<Vec<_>>(),
5134 },
5135 "agent": managed,
5136 }))
5137}
5138
5139async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
5140 let entries = match state.observer_manifest_path() {
5146 Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
5147 .map_err(|e| e.to_string())?,
5148 None => {
5149 let supervisor = state.supervisor()?;
5150 supervisor.health().await
5151 }
5152 };
5153 serde_json::to_value(entries).map_err(|e| e.to_string())
5154}
5155
5156fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
5157 req.params
5158 .get("id")
5159 .and_then(Value::as_str)
5160 .map(str::to_string)
5161 .ok_or_else(|| "missing required `id` parameter".to_string())
5162}
5163
5164async fn handle_agents_remove(
5165 req: &JsonRpcMessage,
5166 state: &Arc<ServerState>,
5167) -> Result<Value, String> {
5168 let id = extract_agent_id(req)?;
5169 let supervisor = state.supervisor()?;
5170 let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
5171 Ok(serde_json::json!({ "removed": removed }))
5172}
5173
5174async fn handle_agents_start(
5175 req: &JsonRpcMessage,
5176 state: &Arc<ServerState>,
5177) -> Result<Value, String> {
5178 let id = extract_agent_id(req)?;
5179 let supervisor = state.supervisor()?;
5180 let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
5181 serde_json::to_value(agent).map_err(|e| e.to_string())
5182}
5183
5184async fn handle_agents_stop(
5185 req: &JsonRpcMessage,
5186 state: &Arc<ServerState>,
5187) -> Result<Value, String> {
5188 let id = extract_agent_id(req)?;
5189 let signal: car_registry::supervisor::StopSignal = req
5190 .params
5191 .get("signal")
5192 .map(|v| serde_json::from_value(v.clone()))
5193 .transpose()
5194 .map_err(|e| e.to_string())?
5195 .unwrap_or_default();
5196 let supervisor = state.supervisor()?;
5197 let agent = supervisor
5198 .stop(&id, signal)
5199 .await
5200 .map_err(|e| e.to_string())?;
5201 serde_json::to_value(agent).map_err(|e| e.to_string())
5202}
5203
5204async fn handle_agents_restart(
5205 req: &JsonRpcMessage,
5206 state: &Arc<ServerState>,
5207) -> Result<Value, String> {
5208 let id = extract_agent_id(req)?;
5209 let supervisor = state.supervisor()?;
5210 let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
5211 serde_json::to_value(agent).map_err(|e| e.to_string())
5212}
5213
5214async fn handle_agents_tail_log(
5215 req: &JsonRpcMessage,
5216 state: &Arc<ServerState>,
5217) -> Result<Value, String> {
5218 let id = extract_agent_id(req)?;
5219 let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
5220 let supervisor = state.supervisor()?;
5221 let lines = supervisor
5222 .tail_log(&id, n)
5223 .await
5224 .map_err(|e| e.to_string())?;
5225 Ok(serde_json::json!({ "lines": lines }))
5226}
5227
5228async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5239 let include_health = req
5240 .params
5241 .get("include_health")
5242 .and_then(Value::as_bool)
5243 .unwrap_or(false);
5244 let json = car_ffi_common::external_agents::list(include_health).await?;
5245 serde_json::from_str(&json).map_err(|e| e.to_string())
5246}
5247
5248async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5249 let include_health = req
5250 .params
5251 .get("include_health")
5252 .and_then(Value::as_bool)
5253 .unwrap_or(false);
5254 let json = car_ffi_common::external_agents::detect(include_health).await?;
5255 serde_json::from_str(&json).map_err(|e| e.to_string())
5256}
5257
5258async fn handle_agents_invoke_external(
5276 req: &JsonRpcMessage,
5277 state: &Arc<ServerState>,
5278 host_session: &Arc<crate::session::ClientSession>,
5279) -> Result<Value, String> {
5280 let id = req
5281 .params
5282 .get("id")
5283 .and_then(Value::as_str)
5284 .ok_or_else(|| "missing required `id` parameter".to_string())?
5285 .to_string();
5286 let task = req
5287 .params
5288 .get("task")
5289 .and_then(Value::as_str)
5290 .ok_or_else(|| "missing required `task` parameter".to_string())?
5291 .to_string();
5292 let stream = req
5293 .params
5294 .get("stream")
5295 .and_then(Value::as_bool)
5296 .unwrap_or(false);
5297 let session_id = req
5298 .params
5299 .get("session_id")
5300 .and_then(Value::as_str)
5301 .map(str::to_string)
5302 .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
5303
5304 let mut options_value = req.params.clone();
5310 if let Some(obj) = options_value.as_object_mut() {
5311 obj.remove("id");
5312 obj.remove("task");
5313 obj.remove("stream");
5314 obj.remove("session_id");
5315 let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5324 if !has_explicit_mcp {
5325 if let Some(url) = state.mcp_url.get() {
5326 obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5327 }
5328 }
5329 }
5330
5331 if !stream {
5332 let options_json = options_value.to_string();
5335 let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
5336 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5337 append_external_agent_audit(&id, &task, &options_value, &result);
5338 return Ok(result);
5339 }
5340
5341 let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
5347 .map_err(|e| format!("invalid options: {e}"))?;
5348
5349 {
5359 let mut chats = state.chat_sessions.lock().await;
5369 chats.entry(session_id.clone()).or_insert_with(|| {
5370 let created_at = std::time::SystemTime::now()
5371 .duration_since(std::time::UNIX_EPOCH)
5372 .map(|d| d.as_secs())
5373 .unwrap_or(0);
5374 crate::session::ChatSession {
5375 agent_id: id.clone(),
5376 host_client_id: host_session.client_id.clone(),
5377 created_at,
5378 }
5379 });
5380 }
5381
5382 use tokio::sync::mpsc;
5389 let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
5390
5391 let drain_state = state.clone();
5392 let drain_session_id = session_id.clone();
5393 let drain_agent_id = id.clone();
5394 tokio::spawn(async move {
5395 while let Some(event) = rx.recv().await {
5396 emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
5397 }
5398 });
5399
5400 let emitter_tx = tx.clone();
5401 let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
5402 let _ = emitter_tx.send(event);
5407 });
5408
5409 let spawn_state = state.clone();
5415 let spawn_session_id = session_id.clone();
5416 let spawn_id = id.clone();
5417 let spawn_task = task.clone();
5418 let spawn_options = options_value.clone();
5419 tokio::spawn(async move {
5420 let outcome =
5421 car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
5422 .await;
5423 drop(tx); let terminal_params: Value;
5430 let result_value: Value;
5431 match outcome {
5432 Ok(res) => {
5433 let mut parts: Vec<String> = Vec::new();
5440 if res.turns > 0 {
5441 parts.push(format!(
5442 "{} turn{}",
5443 res.turns,
5444 if res.turns == 1 { "" } else { "s" }
5445 ));
5446 }
5447 if res.tool_calls > 0 {
5448 parts.push(format!(
5449 "{} tool{}",
5450 res.tool_calls,
5451 if res.tool_calls == 1 { "" } else { "s" }
5452 ));
5453 }
5454 if res.duration_ms > 0 {
5455 parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
5456 }
5457 let summary = if parts.is_empty() {
5458 "stop".to_string()
5459 } else {
5460 parts.join(" · ")
5461 };
5462 if res.is_error {
5463 terminal_params = serde_json::json!({
5464 "session_id": spawn_session_id,
5465 "agent_id": spawn_id,
5466 "kind": "error",
5467 "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
5468 });
5469 } else {
5470 terminal_params = serde_json::json!({
5471 "session_id": spawn_session_id,
5472 "agent_id": spawn_id,
5473 "kind": "done",
5474 "finish_reason": summary,
5475 });
5476 }
5477 result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
5478 }
5479 Err(e) => {
5480 let message = format!("{e}");
5481 terminal_params = serde_json::json!({
5482 "session_id": spawn_session_id,
5483 "agent_id": spawn_id,
5484 "kind": "error",
5485 "error": message.clone(),
5486 });
5487 result_value = serde_json::json!({ "is_error": true, "error": message });
5488 }
5489 }
5490 send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
5491 spawn_state
5492 .chat_sessions
5493 .lock()
5494 .await
5495 .remove(&spawn_session_id);
5496 append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
5497 });
5498
5499 Ok(serde_json::json!({
5500 "accepted": true,
5501 "session_id": session_id,
5502 }))
5503}
5504
5505async fn emit_external_chat_event(
5522 state: &Arc<ServerState>,
5523 session_id: &str,
5524 agent_id: &str,
5525 event: car_external_agents::StreamEvent,
5526) {
5527 use car_external_agents::StreamEvent;
5528 match event {
5529 StreamEvent::Assistant(a) => {
5530 if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
5531 for block in content {
5532 let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
5533 match block_type {
5534 "text" => {
5535 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
5536 if !text.is_empty() {
5537 let params = serde_json::json!({
5538 "session_id": session_id,
5539 "agent_id": agent_id,
5540 "kind": "token",
5541 "delta": text,
5542 });
5543 send_external_chat_frame(state, session_id, params).await;
5544 }
5545 }
5546 }
5547 "tool_use" => {
5548 let name = block
5549 .get("name")
5550 .and_then(|v| v.as_str())
5551 .unwrap_or("(unknown tool)");
5552 let params = serde_json::json!({
5553 "session_id": session_id,
5554 "agent_id": agent_id,
5555 "kind": "tool_call",
5556 "detail": name,
5557 });
5558 send_external_chat_frame(state, session_id, params).await;
5559 }
5560 _ => {}
5561 }
5562 }
5563 }
5564 }
5565 _ => {
5566 }
5571 }
5572}
5573
5574async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
5579 use futures::SinkExt;
5580 use tokio_tungstenite::tungstenite::Message;
5581
5582 let host_client_id = state
5583 .chat_sessions
5584 .lock()
5585 .await
5586 .get(session_id)
5587 .map(|s| s.host_client_id.clone());
5588 let Some(host_client_id) = host_client_id else {
5589 return;
5590 };
5591 let host_channel = {
5592 let sessions = state.sessions.lock().await;
5593 sessions.get(&host_client_id).map(|s| s.channel.clone())
5594 };
5595 let Some(channel) = host_channel else {
5596 return;
5597 };
5598 let frame = serde_json::json!({
5599 "jsonrpc": "2.0",
5600 "method": "agents.chat.event",
5601 "params": params,
5602 });
5603 if let Ok(text) = serde_json::to_string(&frame) {
5604 let _ = channel
5605 .write
5606 .lock()
5607 .await
5608 .send(Message::Text(text.into()))
5609 .await;
5610 }
5611}
5612
5613fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5619 use std::io::Write;
5620 let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5621 Some(home) => home.join(".car"),
5622 None => return,
5623 };
5624 if std::fs::create_dir_all(&car_dir).is_err() {
5625 return;
5626 }
5627 let path = car_dir.join("external-agents.jsonl");
5628 let record = serde_json::json!({
5629 "ts": chrono::Utc::now().to_rfc3339(),
5630 "adapter_id": id,
5631 "task": task,
5632 "options": options,
5633 "result": result,
5634 });
5635 let line = match serde_json::to_string(&record) {
5636 Ok(s) => s,
5637 Err(_) => return,
5638 };
5639 if let Ok(mut f) = std::fs::OpenOptions::new()
5640 .create(true)
5641 .append(true)
5642 .open(&path)
5643 {
5644 let _ = writeln!(f, "{}", line);
5645 } else {
5646 tracing::warn!(
5647 path = %path.display(),
5648 "failed to append external-agent audit record"
5649 );
5650 }
5651}
5652
5653async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5659 let force = req
5660 .params
5661 .get("force")
5662 .and_then(Value::as_bool)
5663 .unwrap_or(false);
5664 if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5665 let json = car_ffi_common::external_agents::health_one(id, force).await?;
5666 serde_json::from_str(&json).map_err(|e| e.to_string())
5667 } else {
5668 let json = car_ffi_common::external_agents::health(force).await?;
5669 serde_json::from_str(&json).map_err(|e| e.to_string())
5670 }
5671}
5672
5673const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
5691
5692async fn handle_agents_chat(
5697 req: &JsonRpcMessage,
5698 state: &Arc<ServerState>,
5699 host_session: &Arc<crate::session::ClientSession>,
5700) -> Result<Value, String> {
5701 use futures::SinkExt;
5702 use tokio::sync::oneshot;
5703 use tokio_tungstenite::tungstenite::Message;
5704
5705 let agent_id = req
5706 .params
5707 .get("agent_id")
5708 .and_then(Value::as_str)
5709 .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
5710 .to_string();
5711 let prompt = req
5712 .params
5713 .get("prompt")
5714 .and_then(Value::as_str)
5715 .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
5716 .to_string();
5717 let session_id = req
5718 .params
5719 .get("session_id")
5720 .and_then(Value::as_str)
5721 .map(str::to_string)
5722 .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
5723 let stream = req
5724 .params
5725 .get("stream")
5726 .and_then(Value::as_bool)
5727 .unwrap_or(true);
5728 let voice_input = req
5729 .params
5730 .get("voice_input")
5731 .and_then(Value::as_bool)
5732 .unwrap_or(false);
5733
5734 let agent_client_id = state
5740 .attached_agents
5741 .lock()
5742 .await
5743 .get(&agent_id)
5744 .cloned()
5745 .ok_or_else(|| {
5746 format!(
5747 "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
5748 agent_id
5749 )
5750 })?;
5751 let agent_channel = {
5752 let sessions = state.sessions.lock().await;
5753 sessions
5754 .get(&agent_client_id)
5755 .map(|s| s.channel.clone())
5756 .ok_or_else(|| {
5757 format!(
5758 "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
5759 agent_id, agent_client_id
5760 )
5761 })?
5762 };
5763
5764 {
5770 let created_at = std::time::SystemTime::now()
5771 .duration_since(std::time::UNIX_EPOCH)
5772 .map(|d| d.as_secs())
5773 .unwrap_or(0);
5774 state.chat_sessions.lock().await.insert(
5775 session_id.clone(),
5776 crate::session::ChatSession {
5777 agent_id: agent_id.clone(),
5778 host_client_id: host_session.client_id.clone(),
5779 created_at,
5780 },
5781 );
5782 }
5783
5784 let request_id = agent_channel.next_request_id();
5791 let (tx, rx) = oneshot::channel();
5792 agent_channel
5793 .pending
5794 .lock()
5795 .await
5796 .insert(request_id.clone(), tx);
5797
5798 let rpc_request = serde_json::json!({
5799 "jsonrpc": "2.0",
5800 "method": "agent.chat",
5801 "params": {
5802 "session_id": session_id,
5803 "prompt": prompt,
5804 "stream": stream,
5805 "context": {
5806 "host_client_id": host_session.client_id,
5807 "voice_input": voice_input,
5808 },
5809 },
5810 "id": request_id,
5811 });
5812 let msg = Message::Text(
5813 serde_json::to_string(&rpc_request)
5814 .map_err(|e| e.to_string())?
5815 .into(),
5816 );
5817 if let Err(e) = agent_channel.write.lock().await.send(msg).await {
5818 agent_channel.pending.lock().await.remove(&request_id);
5822 state.chat_sessions.lock().await.remove(&session_id);
5823 return Err(format!(
5824 "failed to deliver agent.chat to `{}`: {}",
5825 agent_id, e
5826 ));
5827 }
5828
5829 let ack = match tokio::time::timeout(
5834 std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
5835 rx,
5836 )
5837 .await
5838 {
5839 Ok(Ok(resp)) => resp,
5840 Ok(Err(_)) => {
5841 state.chat_sessions.lock().await.remove(&session_id);
5843 return Err(format!(
5844 "agent `{}` disconnected before acking agents.chat",
5845 agent_id
5846 ));
5847 }
5848 Err(_) => {
5849 agent_channel.pending.lock().await.remove(&request_id);
5853 state.chat_sessions.lock().await.remove(&session_id);
5854 return Err(format!(
5855 "agent `{}` did not ack agents.chat within {}s",
5856 agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
5857 ));
5858 }
5859 };
5860
5861 if let Some(err) = ack.error {
5862 state.chat_sessions.lock().await.remove(&session_id);
5864 return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
5865 }
5866
5867 Ok(serde_json::json!({
5868 "accepted": true,
5869 "session_id": session_id,
5870 }))
5871}
5872
5873async fn handle_agents_chat_cancel(
5881 req: &JsonRpcMessage,
5882 state: &Arc<ServerState>,
5883) -> Result<Value, String> {
5884 use futures::SinkExt;
5885 use tokio_tungstenite::tungstenite::Message;
5886
5887 let session_id = req
5888 .params
5889 .get("session_id")
5890 .and_then(Value::as_str)
5891 .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
5892 .to_string();
5893
5894 let chat = state.chat_sessions.lock().await.remove(&session_id);
5895 let chat = match chat {
5896 Some(c) => c,
5897 None => {
5898 return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
5900 }
5901 };
5902
5903 let agent_client_id = state
5906 .attached_agents
5907 .lock()
5908 .await
5909 .get(&chat.agent_id)
5910 .cloned();
5911 if let Some(client_id) = agent_client_id {
5912 let channel_opt = {
5913 let sessions = state.sessions.lock().await;
5914 sessions.get(&client_id).map(|s| s.channel.clone())
5915 };
5916 if let Some(channel) = channel_opt {
5917 let notification = serde_json::json!({
5918 "jsonrpc": "2.0",
5919 "method": "agent.chat.cancel",
5920 "params": { "session_id": session_id },
5921 });
5922 if let Ok(text) = serde_json::to_string(¬ification) {
5923 let _ = channel
5924 .write
5925 .lock()
5926 .await
5927 .send(Message::Text(text.into()))
5928 .await;
5929 }
5930 }
5931 }
5932
5933 Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
5934}
5935
5936pub(crate) async fn try_forward_agent_chat_event(
5947 parsed: &JsonRpcMessage,
5948 state: &Arc<ServerState>,
5949) -> bool {
5950 use futures::SinkExt;
5951 use tokio_tungstenite::tungstenite::Message;
5952
5953 let Some(method) = parsed.method.as_deref() else {
5957 return false;
5958 };
5959 if method != "agent.chat.event" {
5960 return false;
5961 }
5962 if !parsed.id.is_null() {
5963 return false;
5966 }
5967 let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
5968 return false;
5969 };
5970 let session_id = session_id.to_string();
5971
5972 let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
5977 let Some(chat) = chat else {
5978 return true; };
5980
5981 let kind = parsed
5984 .params
5985 .get("kind")
5986 .and_then(Value::as_str)
5987 .unwrap_or("token")
5988 .to_string();
5989
5990 let host_channel = {
5994 let sessions = state.sessions.lock().await;
5995 sessions
5996 .get(&chat.host_client_id)
5997 .map(|s| s.channel.clone())
5998 };
5999 if let Some(channel) = host_channel {
6000 let mut params = parsed.params.clone();
6001 if let Some(obj) = params.as_object_mut() {
6002 obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
6003 }
6004 let forward = serde_json::json!({
6005 "jsonrpc": "2.0",
6006 "method": "agents.chat.event",
6007 "params": params,
6008 });
6009 if let Ok(text) = serde_json::to_string(&forward) {
6010 let _ = channel
6011 .write
6012 .lock()
6013 .await
6014 .send(Message::Text(text.into()))
6015 .await;
6016 }
6017 }
6018 if matches!(kind.as_str(), "done" | "error") {
6025 state.chat_sessions.lock().await.remove(&session_id);
6026 }
6027
6028 true
6029}
6030
6031#[cfg(test)]
6032mod fd_leak_regression {
6033 use super::run_dispatch;
6040 use futures::SinkExt;
6041 use std::sync::Arc;
6042 use tokio_tungstenite::tungstenite::{Error as WsError, Message};
6043
6044 #[tokio::test]
6045 async fn abrupt_read_error_still_runs_session_cleanup() {
6046 let tmp = tempfile::TempDir::new().unwrap();
6047 let state = Arc::new(crate::session::ServerState::standalone(
6048 tmp.path().to_path_buf(),
6049 ));
6050
6051 let read = futures::stream::iter(vec![Err::<Message, WsError>(
6055 WsError::ConnectionClosed,
6056 )]);
6057 let write: crate::session::WsSink = Box::pin(
6058 futures::sink::drain().sink_map_err(|_| WsError::ConnectionClosed),
6059 );
6060
6061 let result =
6062 run_dispatch(read, write, "test-peer".to_string(), state.clone()).await;
6063 assert!(
6064 result.is_ok(),
6065 "run_dispatch must return Ok after cleanup, got {result:?}"
6066 );
6067
6068 assert!(
6071 state.sessions.lock().await.is_empty(),
6072 "state.sessions must be empty after an abrupt disconnect (car#209)"
6073 );
6074 }
6075}