1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30 #[serde(default)]
31 pub jsonrpc: String,
32 #[serde(default)]
33 pub method: Option<String>,
34 #[serde(default)]
35 pub params: Value,
36 #[serde(default)]
37 pub id: Value,
38 #[serde(default)]
40 pub result: Option<Value>,
41 #[serde(default)]
42 pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47 pub jsonrpc: &'static str,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub result: Option<Value>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<JsonRpcError>,
52 pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57 pub code: i32,
58 pub message: String,
59}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
88 name = "ws.connection",
89 skip_all,
90 fields(peer = %peer),
91)]
92pub async fn handle_connection(
93 stream: TcpStream,
94 peer: SocketAddr,
95 state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let ws_stream = accept_async(stream).await?;
98 let (write, read) = ws_stream.split();
99 run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102#[cfg(unix)]
113#[instrument(
114 name = "ws.connection",
115 skip_all,
116 fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119 stream: tokio::net::UnixStream,
120 peer: String,
121 state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123 let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124 let (write, read) = ws_stream.split();
125 run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128#[instrument(
138 name = "ws.dispatch",
139 skip_all,
140 fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143 mut read: R,
144 write: crate::session::WsSink,
145 peer: String,
146 state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149 R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150 + Unpin
151 + Send,
152{
153 let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154 tracing::Span::current().record("client_id", &client_id.as_str());
155
156 info!("New connection from {}", peer);
157
158 let channel = Arc::new(WsChannel {
159 write: Mutex::new(write),
160 pending: Mutex::new(HashMap::new()),
161 next_id: AtomicU64::new(1),
162 });
163
164 let session = state.create_session(&client_id, channel.clone()).await;
165
166 while let Some(msg) = read.next().await {
167 let msg = msg?;
168 if msg.is_text() {
169 let text = msg.to_text()?;
170 let parsed: JsonRpcMessage = match serde_json::from_str(text) {
171 Ok(m) => m,
172 Err(e) => {
173 send_response(
174 &session.channel,
175 JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
176 )
177 .await?;
178 continue;
179 }
180 };
181
182 if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
184 if let Some(id_str) = parsed.id.as_str() {
185 let mut pending = session.channel.pending.lock().await;
186 if let Some(tx) = pending.remove(id_str) {
187 let tool_resp = if let Some(result) = parsed.result {
188 ToolExecuteResponse {
189 action_id: id_str.to_string(),
190 output: Some(result),
191 error: None,
192 }
193 } else {
194 let err_msg = parsed
195 .error
196 .as_ref()
197 .and_then(|e| e.get("message"))
198 .and_then(|m| m.as_str())
199 .unwrap_or("unknown error")
200 .to_string();
201 ToolExecuteResponse {
202 action_id: id_str.to_string(),
203 output: None,
204 error: Some(err_msg),
205 }
206 };
207 let _ = tx.send(tool_resp);
208 continue;
209 }
210 }
211 }
212
213 if try_forward_agent_chat_event(&parsed, &state).await {
222 continue;
223 }
224
225 if let Some(method) = &parsed.method {
227 info!(method = %method, "dispatching JSON-RPC method");
228
229 if state.auth_token.get().is_some()
237 && !session
238 .authenticated
239 .load(std::sync::atomic::Ordering::Acquire)
240 && method != "session.auth"
241 {
242 let resp = JsonRpcResponse::error(
243 parsed.id.clone(),
244 -32001,
245 "auth required: send `session.auth` with the per-launch token \
246 from ~/Library/Application Support/ai.parslee.car/auth-token \
247 (macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
248 as the first frame on this connection",
249 );
250 send_response(&session.channel, resp).await?;
251 info!(client = %client_id, method = %method,
252 "rejecting non-auth method on unauthenticated session; closing");
253 break;
254 }
255
256 if state.approval_gate.requires_approval(method.as_str()) {
268 match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
269 Ok(()) => {}
270 Err(reason) => {
271 let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
272 send_response(&session.channel, resp).await?;
273 info!(
274 client = %client_id,
275 method = %method,
276 reason = %reason,
277 "approval gate blocked dispatch"
278 );
279 continue;
280 }
281 }
282 }
283
284 let session_task = session.clone();
299 let state_task = state.clone();
300 let method_owned = method.clone();
301 let parsed_task = parsed;
302 tokio::spawn(async move {
303 let session = session_task;
304 let state = state_task;
305 let parsed = parsed_task;
306 let result = match method_owned.as_str() {
307 "session.auth" => handle_session_auth(&parsed, &session, &state).await,
308 "parslee.auth" => handle_parslee_auth().await,
309 "session.init" => handle_session_init(&parsed, &session).await,
310 "host.subscribe" => handle_host_subscribe(&session, &state).await,
311 "host.agents" => handle_host_agents(&session).await,
312 "host.events" => handle_host_events(&parsed, &session).await,
313 "host.approvals" => handle_host_approvals(&session).await,
314 "host.register_agent" => {
315 handle_host_register_agent(&parsed, &session).await
316 }
317 "host.unregister_agent" => {
318 handle_host_unregister_agent(&parsed, &session).await
319 }
320 "host.set_status" => handle_host_set_status(&parsed, &session).await,
321 "host.notify" => handle_host_notify(&parsed, &session).await,
322 "host.request_approval" => {
323 handle_host_request_approval(&parsed, &session).await
324 }
325 "host.resolve_approval" => {
326 handle_host_resolve_approval(&parsed, &session).await
327 }
328 "tools.register" => handle_tools_register(&parsed, &session).await,
329 "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
330 "policy.register" => handle_policy_register(&parsed, &session).await,
331 "session.policy.open" => handle_session_policy_open(&session).await,
332 "session.policy.close" => {
333 handle_session_policy_close(&parsed, &session).await
334 }
335 "verify" => handle_verify(&parsed, &session).await,
336 "state.get" => handle_state_get(&parsed, &session).await,
337 "state.set" => handle_state_set(&parsed, &session).await,
338 "state.exists" => handle_state_exists(&parsed, &session).await,
339 "state.keys" => handle_state_keys(&parsed, &session).await,
340 "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
341 "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
342 "memory.query" => handle_memory_query(&parsed, &session).await,
343 "memory.build_context" => {
344 handle_memory_build_context(&parsed, &session).await
345 }
346 "memory.build_context_fast" => {
347 handle_memory_build_context_fast(&parsed, &session).await
348 }
349 "memory.consolidate" => handle_memory_consolidate(&session).await,
350 "memory.fact_count" => handle_memory_fact_count(&session).await,
351 "memory.persist" => handle_memory_persist(&parsed, &session).await,
352 "memory.load" => handle_memory_load(&parsed, &session).await,
353 "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
354 "skill.find" => handle_skill_find(&parsed, &session).await,
355 "skill.report" => handle_skill_report(&parsed, &session).await,
356 "skill.repair" => handle_skill_repair(&parsed, &session).await,
357 "skills.ingest_distilled" => {
358 handle_skills_ingest_distilled(&parsed, &session).await
359 }
360 "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
361 "skills.domains_needing_evolution" => {
362 handle_skills_domains_needing_evolution(&parsed, &session).await
363 }
364 "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
365 "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
366 "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
367 "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
368 "multi.vote" => handle_multi_vote(&parsed, &session).await,
369 "scheduler.create" => handle_scheduler_create(&parsed),
370 "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
371 "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
372 "infer" => handle_infer(&parsed, &state, &session).await,
373 "image.generate" => handle_image_generate(&parsed, &state).await,
374 "video.generate" => handle_video_generate(&parsed, &state).await,
375 "embed" => handle_embed(&parsed, &state).await,
376 "classify" => handle_classify(&parsed, &state).await,
377 "tokenize" => handle_tokenize(&parsed, &state).await,
378 "detokenize" => handle_detokenize(&parsed, &state).await,
379 "rerank" => handle_rerank(&parsed, &state).await,
380 "transcribe" => handle_transcribe(&parsed, &state).await,
381 "synthesize" => handle_synthesize(&parsed, &state).await,
382 "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
383 "speech.prepare" => handle_speech_prepare(&state).await,
384 "models.route" => handle_models_route(&parsed, &state).await,
385 "models.stats" => handle_models_stats(&state).await,
386 "outcomes.resolve_pending" => {
387 handle_outcomes_resolve_pending(&parsed, &state).await
388 }
389 "events.count" => handle_events_count(&session).await,
390 "events.stats" => handle_events_stats(&session).await,
391 "events.truncate" => handle_events_truncate(&parsed, &session).await,
392 "events.clear" => handle_events_clear(&session).await,
393 "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
394 "models.list" => handle_models_list(&state),
395 "models.register" => handle_models_register(&parsed, &state).await,
396 "models.unregister" => handle_models_unregister(&parsed, &state).await,
397 "models.list_unified" => handle_models_list_unified(&state),
398 "models.search" => handle_models_search(&parsed, &state),
399 "models.upgrades" => handle_models_upgrades(&state),
400 "models.pull" => handle_models_pull(&parsed, &state).await,
401 "models.install" => handle_models_pull(&parsed, &state).await,
402 "skills.distill" => handle_skills_distill(&parsed, &state).await,
403 "skills.list" => handle_skills_list(&parsed, &session).await,
404 "browser.run" => handle_browser_run(&parsed, &session).await,
405 "browser.close" => handle_browser_close(&session).await,
406 "secret.put" => handle_secret_put(&parsed),
407 "secret.get" => handle_secret_get(&parsed),
408 "secret.delete" => handle_secret_delete(&parsed),
409 "secret.status" => handle_secret_status(&parsed),
410 "secret.available" => Ok(car_ffi_common::secrets::is_available()),
411 "permissions.status" => handle_perm_status(&parsed),
412 "permissions.request" => handle_perm_request(&parsed),
413 "permissions.explain" => handle_perm_explain(&parsed),
414 "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
415 "accounts.list" => car_ffi_common::accounts::list(),
416 "accounts.open" => {
417 #[derive(serde::Deserialize, Default)]
418 struct OpenParams {
419 #[serde(default)]
420 account_id: Option<String>,
421 }
422 let p: OpenParams =
423 serde_json::from_value(parsed.params.clone()).unwrap_or_default();
424 car_ffi_common::accounts::open_settings(p.account_id.as_deref())
425 }
426 "calendar.list" => car_ffi_common::integrations::calendar_list(),
427 "calendar.events" => handle_calendar_events(&parsed),
428 "contacts.containers" => {
429 car_ffi_common::integrations::contacts_containers()
430 }
431 "contacts.find" => handle_contacts_find(&parsed),
432 "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
433 "mail.inbox" => handle_mail_inbox(&parsed),
434 "mail.send" => handle_mail_send(&parsed),
435 "messages.services" => car_ffi_common::integrations::messages_services(),
436 "messages.chats" => handle_messages_chats(&parsed),
437 "messages.send" => handle_messages_send(&parsed),
438 "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
439 "notes.find" => handle_notes_find(&parsed),
440 "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
441 "reminders.items" => handle_reminders_items(&parsed),
442 "photos.albums" => car_ffi_common::integrations::photos_albums(),
443 "bookmarks.list" => handle_bookmarks_list(&parsed),
444 "files.locations" => car_ffi_common::integrations::files_locations(),
445 "keychain.status" => car_ffi_common::integrations::keychain_status(),
446 "health.status" => car_ffi_common::health::status(),
447 "health.sleep" => handle_health_sleep(&parsed),
448 "health.workouts" => handle_health_workouts(&parsed),
449 "health.activity" => handle_health_activity(&parsed),
450 "voice.transcribe_stream.start" => {
451 handle_voice_transcribe_stream_start(&parsed, &state, &session).await
452 }
453 "voice.transcribe_stream.stop" => {
454 handle_voice_transcribe_stream_stop(&parsed, &state).await
455 }
456 "voice.transcribe_stream.push" => {
457 handle_voice_transcribe_stream_push(&parsed, &state).await
458 }
459 "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
460 "voice.dispatch_turn" => {
461 handle_voice_dispatch_turn(&parsed, &state, &session).await
462 }
463 "voice.cancel_turn" => handle_voice_cancel_turn().await,
464 "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
465 "inference.register_runner" => {
466 handle_inference_register_runner(&session).await
467 }
468 "inference.runner.event" => handle_inference_runner_event(&parsed).await,
469 "inference.runner.complete" => {
470 handle_inference_runner_complete(&parsed).await
471 }
472 "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
473 "voice.providers.list" => {
474 serde_json::from_str::<serde_json::Value>(
478 &car_voice::list_voice_providers_json(),
479 )
480 .map_err(|e| e.to_string())
481 }
482 "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
483 .await
484 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
485 "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
486 .await
487 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
488 "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
489 "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
490 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
491 "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
492 "workflow.run" => handle_workflow_run(&parsed, &session).await,
493 "workflow.verify" => handle_workflow_verify(&parsed),
494 "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
495 "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
496 "meeting.list" => handle_meeting_list(&parsed),
497 "meeting.get" => handle_meeting_get(&parsed),
498 "registry.register" => handle_registry_register(&parsed),
499 "registry.heartbeat" => handle_registry_heartbeat(&parsed),
500 "registry.unregister" => handle_registry_unregister(&parsed),
501 "registry.list" => handle_registry_list(&parsed),
502 "registry.reap" => handle_registry_reap(&parsed),
503 "admission.status" => handle_admission_status(&state),
504 "a2a.start" => handle_a2a_start(&parsed, &session).await,
505 "a2a.stop" => handle_a2a_stop(),
506 "a2a.status" => handle_a2a_status(),
507 "a2a.send" => handle_a2a_send(&parsed, &state).await,
508 "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
509 "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
510 "a2ui.capabilities" => handle_a2ui_capabilities(&state),
511 "a2ui.reap" => handle_a2ui_reap(&state).await,
512 "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
513 "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
514 "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
515 "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
516 "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
517 "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
518 "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
519 "automation.run_applescript" => handle_run_applescript(&parsed).await,
520 "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
521 "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
522 "notifications.local" => handle_local_notification(&parsed).await,
523 "vision.ocr" => handle_vision_ocr(&parsed).await,
524 "agents.list" => handle_agents_list(&state).await,
525 "agents.health" => handle_agents_health(&state).await,
526 "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
527 "agents.install" => handle_agents_install(&parsed, &state).await,
528 "agents.remove" => handle_agents_remove(&parsed, &state).await,
529 "agents.start" => handle_agents_start(&parsed, &state).await,
530 "agents.stop" => handle_agents_stop(&parsed, &state).await,
531 "agents.restart" => handle_agents_restart(&parsed, &state).await,
532 "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
533 "agents.list_external" => handle_agents_list_external(&parsed).await,
534 "agents.detect_external" => handle_agents_detect_external(&parsed).await,
535 "agents.health_external" => handle_agents_health_external(&parsed).await,
536 "agents.invoke_external" => {
537 handle_agents_invoke_external(&parsed, &state, &session).await
538 }
539 "agents.chat" => handle_agents_chat(&parsed, &state, &session).await,
540 "agents.chat.cancel" => handle_agents_chat_cancel(&parsed, &state).await,
541 "message/send"
548 | "SendMessage"
549 | "message/stream"
550 | "SendStreamingMessage"
551 | "tasks/get"
552 | "GetTask"
553 | "tasks/list"
554 | "ListTasks"
555 | "tasks/cancel"
556 | "CancelTask"
557 | "tasks/resubscribe"
558 | "SubscribeToTask"
559 | "tasks/pushNotificationConfig/set"
560 | "CreateTaskPushNotificationConfig"
561 | "tasks/pushNotificationConfig/get"
562 | "GetTaskPushNotificationConfig"
563 | "tasks/pushNotificationConfig/list"
564 | "ListTaskPushNotificationConfigs"
565 | "tasks/pushNotificationConfig/delete"
566 | "DeleteTaskPushNotificationConfig"
567 | "agent/getAuthenticatedExtendedCard"
568 | "GetExtendedAgentCard" => {
569 handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
570 }
571 _ => Err(format!("unknown method: {}", method_owned)),
572 };
573
574 let resp = match result {
575 Ok(value) => JsonRpcResponse::success(parsed.id, value),
576 Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
577 };
578 let _ = send_response(&session.channel, resp).await;
579 });
580 }
581 } else if msg.is_close() {
582 info!("Client {} disconnected", client_id);
583 break;
584 }
585 }
586
587 session.host.unsubscribe(&client_id).await;
588 state.a2ui_subscribers.lock().await.remove(&client_id);
589
590 let _removed = state.remove_session(&client_id).await;
601 {
602 let mut pending = session.channel.pending.lock().await;
603 pending.clear();
604 }
605
606 Ok(())
607}
608
609async fn send_response(
610 channel: &WsChannel,
611 resp: JsonRpcResponse,
612) -> Result<(), Box<dyn std::error::Error>> {
613 use futures::SinkExt;
614 let json = serde_json::to_string(&resp)?;
615 channel
616 .write
617 .lock()
618 .await
619 .send(Message::Text(json.into()))
620 .await?;
621 Ok(())
622}
623
624async fn handle_host_subscribe(
627 session: &crate::session::ClientSession,
628 state: &Arc<ServerState>,
629) -> Result<Value, String> {
630 session
631 .host
632 .subscribe(&session.client_id, session.channel.clone())
633 .await;
634 serde_json::to_value(HostSnapshot {
635 subscribed: true,
636 agents: session.host.agents().await,
637 approvals: session.host.approvals().await,
638 events: session.host.events(50).await,
639 identity: Some(daemon_identity(state)),
640 })
641 .map_err(|e| e.to_string())
642}
643
644fn daemon_identity(state: &Arc<ServerState>) -> car_proto::HostIdentity {
652 let (manifest_path, manifest_role) = if let Some(p) = state.observer_manifest_path() {
659 (
660 Some(p.to_string_lossy().into_owned()),
661 car_proto::HostManifestRole::Observer,
662 )
663 } else if let Some(s) = state.supervisor_if_installed() {
664 (
665 Some(s.manifest_path().to_string_lossy().into_owned()),
666 car_proto::HostManifestRole::Owner,
667 )
668 } else {
669 (None, car_proto::HostManifestRole::None)
670 };
671 car_proto::HostIdentity {
672 version: env!("CARGO_PKG_VERSION").to_string(),
673 pid: std::process::id(),
674 manifest_path,
675 manifest_role,
676 parslee: state
677 .parslee_session
678 .get()
679 .map(|session| session.identity.clone()),
680 }
681}
682
683async fn handle_parslee_auth() -> Result<Value, String> {
694 let session = crate::parslee_auth::load_or_refresh()
695 .await?
696 .ok_or_else(|| "Parslee account not authenticated; run `car auth login`".to_string())?;
697 Ok(serde_json::json!({
698 "authenticated": true,
699 "token_type": "Bearer",
700 "access_token": session.access_token,
701 "authorization_header": format!("Bearer {}", session.access_token),
702 "identity": session.identity,
703 }))
704}
705
706async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
707 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
708}
709
710async fn handle_host_events(
711 req: &JsonRpcMessage,
712 session: &crate::session::ClientSession,
713) -> Result<Value, String> {
714 let limit = req
715 .params
716 .get("limit")
717 .and_then(|v| v.as_u64())
718 .unwrap_or(100) as usize;
719 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
720}
721
722async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
723 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
724}
725
726async fn handle_a2ui_apply(
727 req: &JsonRpcMessage,
728 state: &Arc<ServerState>,
729) -> Result<Value, String> {
730 #[derive(Deserialize)]
731 struct Params {
732 #[serde(default)]
733 envelope: Option<car_a2ui::A2uiEnvelope>,
734 #[serde(default)]
735 message: Option<car_a2ui::A2uiEnvelope>,
736 }
737
738 let envelope = if req.params.get("createSurface").is_some()
739 || req.params.get("updateComponents").is_some()
740 || req.params.get("updateDataModel").is_some()
741 || req.params.get("deleteSurface").is_some()
742 {
743 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
744 .map_err(|e| e.to_string())?
745 } else {
746 match serde_json::from_value::<Params>(req.params.clone()) {
747 Ok(params) => params
748 .envelope
749 .or(params.message)
750 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
751 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
752 .map_err(|e| e.to_string())?,
753 }
754 };
755
756 apply_a2ui_envelope(state, envelope, None, None).await
757}
758
759async fn handle_a2ui_ingest(
760 req: &JsonRpcMessage,
761 state: &Arc<ServerState>,
762) -> Result<Value, String> {
763 #[derive(Deserialize)]
764 #[serde(rename_all = "camelCase")]
765 struct Params {
766 #[serde(default)]
767 endpoint: Option<String>,
768 #[serde(default)]
769 a2a_endpoint: Option<String>,
770 #[serde(default)]
771 owner: Option<car_a2ui::A2uiSurfaceOwner>,
772 #[serde(default)]
773 route_auth: Option<A2aRouteAuth>,
774 #[serde(default)]
775 allow_untrusted_endpoint: bool,
776 }
777
778 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
779 endpoint: None,
780 a2a_endpoint: None,
781 owner: None,
782 route_auth: None,
783 allow_untrusted_endpoint: false,
784 });
785 let payload = req.params.get("payload").unwrap_or(&req.params);
786 state
787 .a2ui
788 .validate_payload(payload)
789 .map_err(|e| e.to_string())?;
790 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
791 if envelopes.is_empty() {
792 return Err("no A2UI envelopes found in payload".into());
793 }
794 let endpoint = params.endpoint.or(params.a2a_endpoint);
795 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
796 let owner = params
797 .owner
798 .or_else(|| car_a2ui::owner_from_value(payload))
799 .map(|owner| match endpoint.clone() {
800 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
801 None => owner,
802 });
803
804 let mut results = Vec::new();
805 for envelope in envelopes {
806 let value =
807 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
808 results.push(value);
809 }
810 Ok(serde_json::json!({ "applied": results }))
811}
812
813async fn apply_a2ui_envelope(
814 state: &Arc<ServerState>,
815 envelope: car_a2ui::A2uiEnvelope,
816 owner: Option<car_a2ui::A2uiSurfaceOwner>,
817 route_auth: Option<A2aRouteAuth>,
818) -> Result<Value, String> {
819 let result = state
820 .a2ui
821 .apply_with_owner(envelope, owner)
822 .await
823 .map_err(|e| e.to_string())?;
824 update_a2ui_route_auth(state, &result, route_auth).await;
825 let kind = if result.deleted {
826 "a2ui.surface_deleted"
827 } else {
828 "a2ui.surface_updated"
829 };
830 let message = if result.deleted {
831 format!("A2UI surface {} deleted", result.surface_id)
832 } else {
833 format!("A2UI surface {} updated", result.surface_id)
834 };
835 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
836 state
837 .host
838 .record_event(kind, None, message, payload.clone())
839 .await;
840 broadcast_a2ui_event(state, kind, &payload).await;
844 serde_json::to_value(result).map_err(|e| e.to_string())
845}
846
847async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
848 use futures::SinkExt;
849 use tokio_tungstenite::tungstenite::Message;
850 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
851 .a2ui_subscribers
852 .lock()
853 .await
854 .values()
855 .cloned()
856 .collect();
857 if subscribers.is_empty() {
858 return;
859 }
860 let Ok(json) = serde_json::to_string(&serde_json::json!({
861 "jsonrpc": "2.0",
862 "method": "a2ui.event",
863 "params": {
864 "kind": kind,
865 "result": result,
866 },
867 })) else {
868 return;
869 };
870 for channel in subscribers {
871 let _ = channel
872 .write
873 .lock()
874 .await
875 .send(Message::Text(json.clone().into()))
876 .await;
877 }
878}
879
880async fn update_a2ui_route_auth(
881 state: &Arc<ServerState>,
882 result: &car_a2ui::A2uiApplyResult,
883 route_auth: Option<A2aRouteAuth>,
884) {
885 let mut auth = state.a2ui_route_auth.lock().await;
886 if result.deleted {
887 auth.remove(&result.surface_id);
888 return;
889 }
890
891 let has_route_endpoint = result
892 .surface
893 .as_ref()
894 .and_then(|surface| surface.owner.as_ref())
895 .and_then(|owner| owner.endpoint.as_ref())
896 .is_some();
897 match (has_route_endpoint, route_auth) {
898 (true, Some(route_auth)) => {
899 auth.insert(result.surface_id.clone(), route_auth);
900 }
901 _ => {
902 auth.remove(&result.surface_id);
903 }
904 }
905}
906
907fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
908 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
909}
910
911async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
912 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
913 if !removed.is_empty() {
914 let mut auth = state.a2ui_route_auth.lock().await;
915 for surface_id in &removed {
916 auth.remove(surface_id);
917 }
918 }
919 Ok(serde_json::json!({ "removed": removed }))
920}
921
922async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
923 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
924}
925
926async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
927 let surface_id = req
928 .params
929 .get("surface_id")
930 .or_else(|| req.params.get("surfaceId"))
931 .and_then(Value::as_str)
932 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
933 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
934}
935
936async fn handle_a2ui_subscribe(
942 session: &crate::session::ClientSession,
943 state: &Arc<ServerState>,
944) -> Result<Value, String> {
945 state
946 .a2ui_subscribers
947 .lock()
948 .await
949 .insert(session.client_id.clone(), session.channel.clone());
950 Ok(serde_json::json!({ "subscribed": true }))
951}
952
953async fn handle_a2ui_unsubscribe(
957 session: &crate::session::ClientSession,
958 state: &Arc<ServerState>,
959) -> Result<Value, String> {
960 state
961 .a2ui_subscribers
962 .lock()
963 .await
964 .remove(&session.client_id);
965 Ok(serde_json::json!({ "subscribed": false }))
966}
967
968async fn handle_a2ui_replay(
975 req: &JsonRpcMessage,
976 state: &Arc<ServerState>,
977) -> Result<Value, String> {
978 let surface_id = req
979 .params
980 .get("surface_id")
981 .or_else(|| req.params.get("surfaceId"))
982 .and_then(Value::as_str)
983 .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
984 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
985}
986
987async fn handle_a2ui_action(
988 req: &JsonRpcMessage,
989 state: &Arc<ServerState>,
990) -> Result<Value, String> {
991 let action: car_a2ui::ClientAction =
992 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
993 let owner = state.a2ui.owner(&action.surface_id).await;
994 let route = route_a2ui_action(state, &action, owner.clone()).await;
995 let payload = serde_json::json!({
996 "action": action,
997 "owner": owner,
998 "route": route,
999 });
1000 let event = state
1001 .host
1002 .record_event(
1003 "a2ui.action",
1004 None,
1005 format!(
1006 "A2UI action {} from {}",
1007 action.name, action.source_component_id
1008 ),
1009 payload,
1010 )
1011 .await;
1012 Ok(serde_json::json!({
1013 "event": event,
1014 "route": route,
1015 }))
1016}
1017
1018async fn handle_a2ui_render_report(
1025 req: &JsonRpcMessage,
1026 state: &Arc<ServerState>,
1027) -> Result<Value, String> {
1028 let report: car_a2ui::RenderReport =
1032 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1033 let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
1034 let kind = "a2ui.render_report";
1035 let message = format!("A2UI render report for surface {}", report.surface_id);
1036 let event = state
1037 .host
1038 .record_event(kind, None, message, payload.clone())
1039 .await;
1040 broadcast_a2ui_event(state, kind, &payload).await;
1041
1042 if let Some(surface) = state.a2ui.get(&report.surface_id).await {
1050 if !state.ui_agent_budget.try_consume(&report.surface_id) {
1056 tracing::warn!(
1057 surface_id = %report.surface_id,
1058 count = state.ui_agent_budget.count(&report.surface_id),
1059 max = state.ui_agent_budget.max(),
1060 "ui-agent iteration budget exhausted; skipping agent invocation"
1061 );
1062 return Ok(serde_json::json!({ "event": event }));
1063 }
1064 match state.ui_agent.on_render_report(&report, &surface) {
1068 car_ui_agent::Decision::Patch {
1069 envelope,
1070 strategy_id,
1071 patch_hash,
1072 elapsed_ns,
1073 } => {
1074 if !state
1082 .ui_agent_oscillation
1083 .check_and_record(&report.surface_id, patch_hash)
1084 {
1085 tracing::warn!(
1086 surface_id = %report.surface_id,
1087 strategy = %strategy_id,
1088 patch_hash,
1089 "ui-agent oscillation detected; suppressing patch"
1090 );
1091 state.ui_agent_budget.refund(&report.surface_id);
1094 return Ok(serde_json::json!({ "event": event }));
1095 }
1096 let a2ui_envelope = car_a2ui::A2uiEnvelope {
1097 patch_components: Some(envelope),
1098 ..Default::default()
1099 };
1100 if let Err(e) = apply_a2ui_envelope(state, a2ui_envelope, None, None).await {
1101 tracing::warn!(
1102 surface_id = %report.surface_id,
1103 strategy = %strategy_id,
1104 patch_hash,
1105 elapsed_ns,
1106 error = %e,
1107 "ui-agent patch apply failed",
1108 );
1109 state.ui_agent_budget.refund(&report.surface_id);
1111 } else {
1112 tracing::debug!(
1113 surface_id = %report.surface_id,
1114 strategy = %strategy_id,
1115 patch_hash,
1116 elapsed_ns,
1117 iteration = state.ui_agent_budget.count(&report.surface_id),
1118 "ui-agent patch applied",
1119 );
1120 if let Some(memgine) = state.shared_memgine.clone() {
1130 let speaker = format!("ui-agent/{}", report.surface_id);
1131 let text = format!("strategy applied: {}", strategy_id);
1132 tokio::spawn(async move {
1133 let mut guard = memgine.lock().await;
1134 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1135 });
1136 }
1137 }
1138 }
1139 car_ui_agent::Decision::StableNoChange => {
1140 state.ui_agent_budget.refund(&report.surface_id);
1142 }
1143 car_ui_agent::Decision::HardStop { reason } => {
1144 state.ui_agent_budget.refund(&report.surface_id);
1145 tracing::error!(
1151 surface_id = %report.surface_id,
1152 reason = %reason,
1153 "ui-agent hard-stopped improvement loop",
1154 );
1155 }
1156 }
1157 } else {
1158 tracing::debug!(
1159 surface_id = %report.surface_id,
1160 "ui-agent skipped — surface not found in store",
1161 );
1162 }
1163
1164 Ok(serde_json::json!({ "event": event }))
1165}
1166
1167async fn route_a2ui_action(
1168 state: &Arc<ServerState>,
1169 action: &car_a2ui::ClientAction,
1170 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1171) -> Value {
1172 let Some(owner) = owner else {
1173 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1174 };
1175 if owner.kind != "a2a" {
1176 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1177 }
1178 let Some(endpoint) = owner.endpoint.clone() else {
1179 return serde_json::json!({
1180 "delivered": false,
1181 "reason": "surface owner has no endpoint",
1182 "owner": owner
1183 });
1184 };
1185
1186 let message = car_a2a::Message {
1187 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1188 role: car_a2a::MessageRole::User,
1189 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1190 data: serde_json::json!({
1191 "a2uiAction": action,
1192 }),
1193 metadata: Default::default(),
1194 })],
1195 task_id: owner.task_id.clone(),
1196 context_id: owner.context_id.clone(),
1197 metadata: Default::default(),
1198 };
1199
1200 let auth = state
1201 .a2ui_route_auth
1202 .lock()
1203 .await
1204 .get(&action.surface_id)
1205 .cloned()
1206 .map(client_auth_from_route_auth)
1207 .unwrap_or(car_a2a::ClientAuth::None);
1208
1209 match car_a2a::A2aClient::new(endpoint.clone())
1210 .with_auth(auth)
1211 .send_message(message, false)
1212 .await
1213 {
1214 Ok(result) => serde_json::json!({
1215 "delivered": true,
1216 "owner": owner,
1217 "endpoint": endpoint,
1218 "result": result,
1219 }),
1220 Err(error) => serde_json::json!({
1221 "delivered": false,
1222 "owner": owner,
1223 "endpoint": endpoint,
1224 "error": error.to_string(),
1225 }),
1226 }
1227}
1228
1229fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1230 match auth {
1231 A2aRouteAuth::None => car_a2a::ClientAuth::None,
1232 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1233 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1234 }
1235}
1236
1237fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1238 let endpoint = endpoint?;
1239 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1240 Some(endpoint)
1241 } else {
1242 None
1243 }
1244}
1245
1246fn is_loopback_http_endpoint(endpoint: &str) -> bool {
1247 endpoint == "http://localhost"
1248 || endpoint.starts_with("http://localhost:")
1249 || endpoint.starts_with("http://localhost/")
1250 || endpoint == "http://127.0.0.1"
1251 || endpoint.starts_with("http://127.0.0.1:")
1252 || endpoint.starts_with("http://127.0.0.1/")
1253 || endpoint == "http://[::1]"
1254 || endpoint.starts_with("http://[::1]:")
1255 || endpoint.starts_with("http://[::1]/")
1256}
1257
1258async fn handle_host_register_agent(
1259 req: &JsonRpcMessage,
1260 session: &crate::session::ClientSession,
1261) -> Result<Value, String> {
1262 let request: RegisterHostAgentRequest =
1263 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1264 serde_json::to_value(
1265 session
1266 .host
1267 .register_agent(&session.client_id, request)
1268 .await?,
1269 )
1270 .map_err(|e| e.to_string())
1271}
1272
1273async fn handle_host_unregister_agent(
1274 req: &JsonRpcMessage,
1275 session: &crate::session::ClientSession,
1276) -> Result<Value, String> {
1277 let agent_id = req
1278 .params
1279 .get("agent_id")
1280 .and_then(|v| v.as_str())
1281 .ok_or("missing agent_id")?;
1282 session
1283 .host
1284 .unregister_agent(&session.client_id, agent_id)
1285 .await?;
1286 Ok(serde_json::json!({"ok": true}))
1287}
1288
1289async fn handle_host_set_status(
1290 req: &JsonRpcMessage,
1291 session: &crate::session::ClientSession,
1292) -> Result<Value, String> {
1293 let request: SetHostAgentStatusRequest =
1294 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1295 serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1296 .map_err(|e| e.to_string())
1297}
1298
1299async fn handle_host_notify(
1300 req: &JsonRpcMessage,
1301 session: &crate::session::ClientSession,
1302) -> Result<Value, String> {
1303 let kind = req
1304 .params
1305 .get("kind")
1306 .and_then(|v| v.as_str())
1307 .unwrap_or("host.notification");
1308 let agent_id = req
1309 .params
1310 .get("agent_id")
1311 .and_then(|v| v.as_str())
1312 .map(str::to_string);
1313 let message = req
1314 .params
1315 .get("message")
1316 .and_then(|v| v.as_str())
1317 .unwrap_or("");
1318 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1319 serde_json::to_value(
1320 session
1321 .host
1322 .record_event(kind, agent_id, message, payload)
1323 .await,
1324 )
1325 .map_err(|e| e.to_string())
1326}
1327
1328async fn handle_host_request_approval(
1329 req: &JsonRpcMessage,
1330 session: &crate::session::ClientSession,
1331) -> Result<Value, String> {
1332 let request: CreateHostApprovalRequest =
1333 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1334 if let Some(agent_id) = &request.agent_id {
1335 let _ = session
1340 .host
1341 .set_status(
1342 &session.client_id,
1343 SetHostAgentStatusRequest {
1344 agent_id: agent_id.clone(),
1345 status: HostAgentStatus::WaitingForApproval,
1346 current_task: None,
1347 message: Some("Waiting for approval".to_string()),
1348 payload: Value::Null,
1349 },
1350 )
1351 .await;
1352 }
1353 let owner_client_id = if request.system_level {
1360 None
1361 } else {
1362 Some(session.client_id.as_str())
1363 };
1364 serde_json::to_value(session.host.create_approval(owner_client_id, request).await?)
1365 .map_err(|e| e.to_string())
1366}
1367
1368async fn handle_host_resolve_approval(
1369 req: &JsonRpcMessage,
1370 session: &crate::session::ClientSession,
1371) -> Result<Value, String> {
1372 let request: ResolveHostApprovalRequest =
1373 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1374 serde_json::to_value(
1375 session
1376 .host
1377 .resolve_approval(&session.client_id, request)
1378 .await?,
1379 )
1380 .map_err(|e| e.to_string())
1381}
1382
1383async fn handle_session_auth(
1394 req: &JsonRpcMessage,
1395 session: &crate::session::ClientSession,
1396 state: &Arc<ServerState>,
1397) -> Result<Value, String> {
1398 let supplied = req
1399 .params
1400 .get("token")
1401 .and_then(Value::as_str)
1402 .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1403 let agent_id = req
1410 .params
1411 .get("agent_id")
1412 .and_then(Value::as_str)
1413 .map(str::to_string);
1414
1415 if let Some(id) = agent_id {
1416 let supervisor = state.supervisor()?;
1417 if !supervisor.validate_agent_token(&id, supplied).await {
1418 return Err(format!(
1419 "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1420 ));
1421 }
1422 {
1426 let mut attached = state.attached_agents.lock().await;
1427 if let Some(prior) = attached.get(&id) {
1428 if prior != &session.client_id {
1429 return Err(format!(
1430 "auth failed: agent_id `{id}` is already attached on \
1431 another connection (client_id={prior})"
1432 ));
1433 }
1434 }
1435 attached.insert(id.clone(), session.client_id.clone());
1436 }
1437 let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1442 *session.bound_memgine.lock().await = Some(agent_eng);
1443 *session.agent_id.lock().await = Some(id.clone());
1444 session
1445 .authenticated
1446 .store(true, std::sync::atomic::Ordering::Release);
1447 return Ok(serde_json::json!({
1448 "ok": true,
1449 "auth_enabled": true,
1450 "agent_id": id,
1451 }));
1452 }
1453
1454 let expected = match state.auth_token.get() {
1455 Some(t) => t,
1456 None => {
1457 session
1463 .authenticated
1464 .store(true, std::sync::atomic::Ordering::Release);
1465 return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1466 }
1467 };
1468 if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1469 return Err("auth failed: token mismatch".to_string());
1470 }
1471 session
1472 .authenticated
1473 .store(true, std::sync::atomic::Ordering::Release);
1474 Ok(serde_json::json!({
1475 "ok": true,
1476 "auth_enabled": true,
1477 "parslee": state.parslee_session.get().map(|session| session.identity.clone()),
1478 }))
1479}
1480
1481fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
1485 if a.len() != b.len() {
1486 return false;
1487 }
1488 let mut diff: u8 = 0;
1489 for (x, y) in a.iter().zip(b.iter()) {
1490 diff |= x ^ y;
1491 }
1492 diff == 0
1493}
1494
1495async fn gate_high_risk_method(
1505 method: &str,
1506 params: &Value,
1507 state: &Arc<ServerState>,
1508) -> Result<(), String> {
1509 let timeout = state.approval_gate.timeout;
1510 let req = CreateHostApprovalRequest {
1511 agent_id: None,
1512 action: format!("ws.method:{method}"),
1513 details: serde_json::json!({
1514 "method": method,
1515 "params_preview": preview_params(params, 2_000),
1519 }),
1520 options: vec!["approve".to_string(), "deny".to_string()],
1521 system_level: true,
1525 };
1526 match state
1527 .host
1528 .request_and_wait_approval(req, "approve", timeout)
1529 .await
1530 {
1531 Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1532 Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1533 "{method} denied by user (approval gate, audit 2026-05). \
1534 To call this method without an interactive prompt, start \
1535 car-server with --no-approvals on a trusted machine."
1536 )),
1537 Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1538 "{method} approval timed out after {}s with no resolution. \
1539 The approval is still visible in `host.approvals` for \
1540 forensics; resubmit the request to retry.",
1541 timeout.as_secs()
1542 )),
1543 Err(e) => Err(format!("approval gate error: {e}")),
1544 }
1545}
1546
1547fn preview_params(value: &Value, max_chars: usize) -> Value {
1548 let s = value.to_string();
1549 if s.len() <= max_chars {
1550 value.clone()
1551 } else {
1552 Value::String(format!("{}… (truncated)", &s[..max_chars]))
1553 }
1554}
1555
1556async fn handle_session_init(
1557 req: &JsonRpcMessage,
1558 session: &crate::session::ClientSession,
1559) -> Result<Value, String> {
1560 let init: SessionInitRequest =
1561 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1562
1563 for tool in &init.tools {
1564 register_from_definition(&session.runtime, tool).await;
1565 }
1566
1567 let mut policy_count = 0;
1568 {
1569 let mut policies = session.runtime.policies.write().await;
1570 for policy_def in &init.policies {
1571 if let Some(check) = build_policy_check(policy_def) {
1572 policies.register(&policy_def.name, check, "");
1573 policy_count += 1;
1574 }
1575 }
1576 }
1577
1578 serde_json::to_value(SessionInitResponse {
1579 session_id: session.client_id.clone(),
1580 tools_registered: init.tools.len(),
1581 policies_registered: policy_count,
1582 })
1583 .map_err(|e| e.to_string())
1584}
1585
1586fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1587 match def.rule.as_str() {
1588 "deny_tool" => {
1589 let target = def.target.clone();
1590 Some(Box::new(
1591 move |action: &car_ir::Action, _: &car_state::StateStore| {
1592 if action.tool.as_deref() == Some(&target) {
1593 Some(format!("tool '{}' denied", target))
1594 } else {
1595 None
1596 }
1597 },
1598 ))
1599 }
1600 "require_state" => {
1601 let key = def.key.clone();
1602 let value = def.value.clone();
1603 Some(Box::new(
1604 move |_: &car_ir::Action, state: &car_state::StateStore| {
1605 if state.get(&key).as_ref() != Some(&value) {
1606 Some(format!("state['{}'] must be {:?}", key, value))
1607 } else {
1608 None
1609 }
1610 },
1611 ))
1612 }
1613 "deny_tool_param" => {
1614 let target = def.target.clone();
1615 let param = def.key.clone();
1616 let pattern = def.pattern.clone();
1617 Some(Box::new(
1618 move |action: &car_ir::Action, _: &car_state::StateStore| {
1619 if action.tool.as_deref() != Some(&target) {
1620 return None;
1621 }
1622 if let Some(val) = action.parameters.get(¶m) {
1623 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1624 if s.contains(&pattern) {
1625 return Some(format!("param '{}' matches '{}'", param, pattern));
1626 }
1627 }
1628 None
1629 },
1630 ))
1631 }
1632 _ => None,
1633 }
1634}
1635
1636async fn handle_tools_register(
1637 req: &JsonRpcMessage,
1638 session: &crate::session::ClientSession,
1639) -> Result<Value, String> {
1640 let tools: Vec<ToolDefinition> =
1641 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1642 for tool in &tools {
1643 register_from_definition(&session.runtime, tool).await;
1644 }
1645 Ok(Value::from(tools.len()))
1646}
1647
1648async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1655 runtime
1656 .register_tool_schema(car_ir::ToolSchema {
1657 name: def.name.clone(),
1658 description: def.description.clone(),
1659 parameters: def.parameters.clone(),
1660 returns: def.returns.clone(),
1661 idempotent: def.idempotent,
1662 cache_ttl_secs: def.cache_ttl_secs,
1663 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1664 max_calls: rl.max_calls,
1665 interval_secs: rl.interval_secs,
1666 }),
1667 })
1668 .await;
1669}
1670
1671async fn handle_proposal_submit(
1672 req: &JsonRpcMessage,
1673 session: &crate::session::ClientSession,
1674) -> Result<Value, String> {
1675 let submit: ProposalSubmitRequest =
1676 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1677 let session_id = req
1683 .params
1684 .get("session_id")
1685 .and_then(|v| v.as_str())
1686 .map(str::to_string);
1687
1688 let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1697 Some(v) if !v.is_null() => {
1698 Some(serde_json::from_value(v.clone()).map_err(|e| format!("invalid scope: {e}"))?)
1699 }
1700 _ => None,
1701 };
1702
1703 let result = match (session_id, scope) {
1704 (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1709 (Some(sid), None) => {
1710 session
1711 .runtime
1712 .execute_with_session(&submit.proposal, &sid)
1713 .await
1714 }
1715 (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1716 (None, None) => session.runtime.execute(&submit.proposal).await,
1717 };
1718 serde_json::to_value(result).map_err(|e| e.to_string())
1719}
1720
1721async fn handle_session_policy_open(
1722 session: &crate::session::ClientSession,
1723) -> Result<Value, String> {
1724 let id = session.runtime.open_session().await;
1725 Ok(serde_json::json!({ "session_id": id }))
1726}
1727
1728async fn handle_session_policy_close(
1729 req: &JsonRpcMessage,
1730 session: &crate::session::ClientSession,
1731) -> Result<Value, String> {
1732 let sid = req
1733 .params
1734 .get("session_id")
1735 .and_then(|v| v.as_str())
1736 .ok_or("missing 'session_id'")?;
1737 let closed = session.runtime.close_session(sid).await;
1738 Ok(serde_json::json!({ "closed": closed }))
1739}
1740
1741async fn handle_policy_register(
1747 req: &JsonRpcMessage,
1748 session: &crate::session::ClientSession,
1749) -> Result<Value, String> {
1750 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1751 .map_err(|e| format!("invalid policy params: {e}"))?;
1752 let session_id = req
1753 .params
1754 .get("session_id")
1755 .and_then(|v| v.as_str())
1756 .map(str::to_string);
1757 let check = build_policy_check(&def)
1758 .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1759 match session_id {
1760 Some(sid) => session
1761 .runtime
1762 .register_policy_in_session(&sid, &def.name, check, "")
1763 .await
1764 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1765 None => {
1766 let mut policies = session.runtime.policies.write().await;
1767 policies.register(&def.name, check, "");
1768 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1769 }
1770 }
1771}
1772
1773async fn handle_verify(
1774 req: &JsonRpcMessage,
1775 session: &crate::session::ClientSession,
1776) -> Result<Value, String> {
1777 let vr: VerifyRequest =
1778 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1779 let tools: std::collections::HashSet<String> =
1780 session.runtime.tools.read().await.keys().cloned().collect();
1781 let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1782 serde_json::to_value(VerifyResponse {
1783 valid: result.valid,
1784 issues: result
1785 .issues
1786 .iter()
1787 .map(|i| VerifyIssueProto {
1788 action_id: i.action_id.clone(),
1789 severity: i.severity.clone(),
1790 message: i.message.clone(),
1791 })
1792 .collect(),
1793 simulated_state: result.simulated_state,
1794 })
1795 .map_err(|e| e.to_string())
1796}
1797
1798fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1805 req.params
1806 .get("tenant_id")
1807 .and_then(|v| v.as_str())
1808 .filter(|s| !s.is_empty())
1809 .map(str::to_string)
1810}
1811
1812async fn handle_state_get(
1813 req: &JsonRpcMessage,
1814 session: &crate::session::ClientSession,
1815) -> Result<Value, String> {
1816 let key = req
1817 .params
1818 .get("key")
1819 .and_then(|v| v.as_str())
1820 .ok_or("missing 'key'")?;
1821 let tenant = tenant_from_params(req);
1822 Ok(session
1823 .runtime
1824 .state
1825 .scoped(tenant.as_deref())
1826 .get(key)
1827 .unwrap_or(Value::Null))
1828}
1829
1830async fn handle_state_set(
1831 req: &JsonRpcMessage,
1832 session: &crate::session::ClientSession,
1833) -> Result<Value, String> {
1834 let key = req
1835 .params
1836 .get("key")
1837 .and_then(|v| v.as_str())
1838 .ok_or("missing 'key'")?;
1839 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1840 let tenant = tenant_from_params(req);
1841 session
1842 .runtime
1843 .state
1844 .scoped(tenant.as_deref())
1845 .set(key, value, "client");
1846 Ok(Value::from("ok"))
1847}
1848
1849async fn handle_state_exists(
1853 req: &JsonRpcMessage,
1854 session: &crate::session::ClientSession,
1855) -> Result<Value, String> {
1856 let key = req
1857 .params
1858 .get("key")
1859 .and_then(|v| v.as_str())
1860 .ok_or("missing 'key'")?;
1861 let tenant = tenant_from_params(req);
1862 Ok(Value::Bool(
1863 session.runtime.state.scoped(tenant.as_deref()).exists(key),
1864 ))
1865}
1866
1867async fn handle_state_keys(
1870 req: &JsonRpcMessage,
1871 session: &crate::session::ClientSession,
1872) -> Result<Value, String> {
1873 let tenant = tenant_from_params(req);
1874 Ok(Value::Array(
1875 session
1876 .runtime
1877 .state
1878 .scoped(tenant.as_deref())
1879 .keys()
1880 .into_iter()
1881 .map(Value::String)
1882 .collect(),
1883 ))
1884}
1885
1886async fn handle_state_snapshot(
1897 req: &JsonRpcMessage,
1898 session: &crate::session::ClientSession,
1899) -> Result<Value, String> {
1900 let tenant = tenant_from_params(req);
1901 let view = session.runtime.state.scoped(tenant.as_deref());
1902 let mut map = serde_json::Map::new();
1903 for key in view.keys() {
1904 if let Some(value) = view.get(&key) {
1905 map.insert(key, value);
1906 }
1907 }
1908 Ok(Value::Object(map))
1909}
1910
1911fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
1917 let base = car_ffi_common::memory_path::ensure_base()
1918 .map_err(|e| format!("memory base unavailable: {e}"))?;
1919 let dir = base.join("agents");
1920 std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
1921 Ok(dir.join(format!("{agent_id}.json")))
1922}
1923
1924async fn get_or_load_agent_memgine(
1931 state: &Arc<ServerState>,
1932 agent_id: &str,
1933) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
1934 {
1935 let map = state.agent_memgines.lock().await;
1936 if let Some(eng) = map.get(agent_id) {
1937 return Ok(eng.clone());
1938 }
1939 }
1940 let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
1942 None,
1943 )));
1944 let path = agent_memgine_snapshot_path(agent_id)?;
1945 if path.exists() {
1946 let content = std::fs::read_to_string(&path)
1947 .map_err(|e| format!("read {}: {}", path.display(), e))?;
1948 let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
1949 let mut g = engine.lock().await;
1950 let mut loaded: u32 = 0;
1951 for fact in &facts {
1952 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
1953 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
1954 let kind = fact
1955 .get("kind")
1956 .and_then(|v| v.as_str())
1957 .unwrap_or("pattern");
1958 let fid = format!("loaded-{loaded}");
1959 g.ingest_fact(
1960 &fid,
1961 subject,
1962 body,
1963 "user",
1964 "peer",
1965 chrono::Utc::now(),
1966 "global",
1967 None,
1968 vec![],
1969 kind == "constraint",
1970 );
1971 loaded += 1;
1972 }
1973 }
1974 let mut map = state.agent_memgines.lock().await;
1975 let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
1976 Ok(stored)
1977}
1978
1979async fn persist_agent_memgine(
1983 agent_id: &str,
1984 engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
1985) -> Result<(), String> {
1986 let path = agent_memgine_snapshot_path(agent_id)?;
1987 let g = engine.lock().await;
1988 let facts: Vec<Value> = g
1989 .graph
1990 .inner
1991 .node_indices()
1992 .filter_map(|nix| {
1993 let node = g.graph.inner.node_weight(nix)?;
1994 if !node.is_valid() {
1995 return None;
1996 }
1997 if node.kind == car_memgine::MemKind::Identity
1998 || node.kind == car_memgine::MemKind::Environment
1999 {
2000 return None;
2001 }
2002 Some(serde_json::json!({
2003 "subject": node.key,
2004 "body": node.value,
2005 "kind": match node.kind {
2006 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2007 car_memgine::MemKind::Conversation => "outcome",
2008 _ => "pattern",
2009 },
2010 "confidence": 0.5,
2011 "content_type": node.content_type.as_label(),
2012 }))
2013 })
2014 .collect();
2015 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2016 std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
2017 Ok(())
2018}
2019
2020async fn handle_memory_fact_count(
2027 session: &crate::session::ClientSession,
2028) -> Result<Value, String> {
2029 let engine_arc = session.effective_memgine().await;
2030 let engine = engine_arc.lock().await;
2031 Ok(Value::from(engine.valid_fact_count()))
2032}
2033
2034async fn handle_memory_add_fact(
2035 req: &JsonRpcMessage,
2036 session: &crate::session::ClientSession,
2037) -> Result<Value, String> {
2038 let subject = req
2039 .params
2040 .get("subject")
2041 .and_then(|v| v.as_str())
2042 .ok_or("missing subject")?;
2043 let body = req
2044 .params
2045 .get("body")
2046 .and_then(|v| v.as_str())
2047 .ok_or("missing body")?;
2048 let kind = req
2049 .params
2050 .get("kind")
2051 .and_then(|v| v.as_str())
2052 .unwrap_or("pattern");
2053 let engine_arc = session.effective_memgine().await;
2057 let count = {
2058 let mut engine = engine_arc.lock().await;
2059 let fid = format!("ws-{}", engine.valid_fact_count());
2060 engine.ingest_fact(
2061 &fid,
2062 subject,
2063 body,
2064 "user",
2065 "peer",
2066 chrono::Utc::now(),
2067 "global",
2068 None,
2069 vec![],
2070 kind == "constraint",
2071 );
2072 engine.valid_fact_count()
2073 };
2074 if let Some(id) = session.agent_id.lock().await.clone() {
2077 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
2078 tracing::warn!(agent_id = %id, error = %e,
2079 "agent memgine persist failed; in-memory state is canonical");
2080 }
2081 }
2082 Ok(Value::from(count))
2083}
2084
2085async fn handle_memory_query(
2086 req: &JsonRpcMessage,
2087 session: &crate::session::ClientSession,
2088) -> Result<Value, String> {
2089 let query = req
2090 .params
2091 .get("query")
2092 .and_then(|v| v.as_str())
2093 .ok_or("missing query")?;
2094 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2095 let engine_arc = session.effective_memgine().await;
2096 let engine = engine_arc.lock().await;
2097 let seeds = engine.graph.find_seeds(query, 5);
2098 let hits = if !seeds.is_empty() {
2103 engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2104 } else {
2105 vec![]
2106 };
2107 let results: Vec<Value> = hits
2108 .iter()
2109 .filter_map(|hit| {
2110 let node = engine.graph.inner.node_weight(hit.node_ix)?;
2111 Some(serde_json::json!({
2112 "subject": node.key,
2113 "body": node.value,
2114 "kind": format!("{:?}", node.kind).to_lowercase(),
2115 "confidence": hit.activation,
2116 }))
2117 })
2118 .collect();
2119 serde_json::to_value(results).map_err(|e| e.to_string())
2120}
2121
2122async fn handle_memory_build_context(
2123 req: &JsonRpcMessage,
2124 session: &crate::session::ClientSession,
2125) -> Result<Value, String> {
2126 let query = req
2127 .params
2128 .get("query")
2129 .and_then(|v| v.as_str())
2130 .unwrap_or("");
2131 let model_context_window = req
2135 .params
2136 .get("model_context_window")
2137 .and_then(|v| v.as_u64())
2138 .map(|w| w as usize);
2139 let mut engine = session.memgine.lock().await;
2140 Ok(Value::from(
2141 engine.build_context_for_model(query, model_context_window),
2142 ))
2143}
2144
2145async fn handle_memory_build_context_fast(
2151 req: &JsonRpcMessage,
2152 session: &crate::session::ClientSession,
2153) -> Result<Value, String> {
2154 let query = req
2155 .params
2156 .get("query")
2157 .and_then(|v| v.as_str())
2158 .unwrap_or("");
2159 let model_context_window = req
2160 .params
2161 .get("model_context_window")
2162 .and_then(|v| v.as_u64())
2163 .map(|w| w as usize);
2164 let mut engine = session.memgine.lock().await;
2165 Ok(Value::from(engine.build_context_with_options(
2166 query,
2167 model_context_window,
2168 car_memgine::ContextMode::Fast,
2169 None,
2170 )))
2171}
2172
2173async fn handle_memory_persist(
2189 req: &JsonRpcMessage,
2190 session: &crate::session::ClientSession,
2191) -> Result<Value, String> {
2192 let path = req
2193 .params
2194 .get("path")
2195 .and_then(|v| v.as_str())
2196 .ok_or("missing path")?;
2197 let resolved = car_ffi_common::memory_path::resolve(path)
2198 .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2199 let engine = session.memgine.lock().await;
2200 let facts: Vec<Value> = engine
2201 .graph
2202 .inner
2203 .node_indices()
2204 .filter_map(|nix| {
2205 let node = engine.graph.inner.node_weight(nix)?;
2206 if !node.is_valid() {
2207 return None;
2208 }
2209 if node.kind == car_memgine::MemKind::Identity
2210 || node.kind == car_memgine::MemKind::Environment
2211 {
2212 return None;
2213 }
2214 Some(serde_json::json!({
2215 "subject": node.key,
2216 "body": node.value,
2217 "kind": match node.kind {
2218 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2219 car_memgine::MemKind::Conversation => "outcome",
2220 _ => "pattern",
2221 },
2222 "confidence": 0.5,
2223 "content_type": node.content_type.as_label(),
2224 }))
2225 })
2226 .collect();
2227 let count = facts.len();
2228 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2229 std::fs::write(&resolved, json)
2230 .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2231 Ok(Value::from(count as u64))
2232}
2233
2234async fn handle_memory_load(
2240 req: &JsonRpcMessage,
2241 session: &crate::session::ClientSession,
2242) -> Result<Value, String> {
2243 let path = req
2244 .params
2245 .get("path")
2246 .and_then(|v| v.as_str())
2247 .ok_or("missing path")?;
2248 let resolved = car_ffi_common::memory_path::resolve(path)
2249 .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2250 let content = std::fs::read_to_string(&resolved)
2251 .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2252 let facts: Vec<Value> =
2253 serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2254 let mut engine = session.memgine.lock().await;
2255 engine.reset();
2256 let mut count: u32 = 0;
2257 for fact in &facts {
2258 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2259 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2260 let kind = fact
2261 .get("kind")
2262 .and_then(|v| v.as_str())
2263 .unwrap_or("pattern");
2264 let fid = format!("loaded-{}", count);
2265 engine.ingest_fact(
2266 &fid,
2267 subject,
2268 body,
2269 "user",
2270 "peer",
2271 chrono::Utc::now(),
2272 "global",
2273 None,
2274 vec![],
2275 kind == "constraint",
2276 );
2277 count += 1;
2278 }
2279 Ok(Value::from(count))
2280}
2281
2282async fn handle_skill_ingest(
2285 req: &JsonRpcMessage,
2286 session: &crate::session::ClientSession,
2287) -> Result<Value, String> {
2288 let name = req
2289 .params
2290 .get("name")
2291 .and_then(|v| v.as_str())
2292 .ok_or("missing name")?;
2293 let code = req
2294 .params
2295 .get("code")
2296 .and_then(|v| v.as_str())
2297 .ok_or("missing code")?;
2298 let platform = req
2299 .params
2300 .get("platform")
2301 .and_then(|v| v.as_str())
2302 .unwrap_or("unknown");
2303 let persona = req
2304 .params
2305 .get("persona")
2306 .and_then(|v| v.as_str())
2307 .unwrap_or("");
2308 let url_pattern = req
2309 .params
2310 .get("url_pattern")
2311 .and_then(|v| v.as_str())
2312 .unwrap_or("");
2313 let description = req
2314 .params
2315 .get("description")
2316 .and_then(|v| v.as_str())
2317 .unwrap_or("");
2318 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2319 let keywords: Vec<String> = req
2320 .params
2321 .get("task_keywords")
2322 .and_then(|v| v.as_array())
2323 .map(|arr| {
2324 arr.iter()
2325 .filter_map(|v| v.as_str().map(String::from))
2326 .collect()
2327 })
2328 .unwrap_or_default();
2329
2330 let trigger = car_memgine::SkillTrigger {
2331 persona: persona.into(),
2332 url_pattern: url_pattern.into(),
2333 task_keywords: keywords,
2334 structured: None,
2335 };
2336 let mut engine = session.memgine.lock().await;
2337 let node = engine.ingest_skill(
2338 name,
2339 code,
2340 platform,
2341 trigger,
2342 description,
2343 supersedes,
2344 vec![],
2345 vec![],
2346 );
2347 Ok(Value::from(node.index() as u64))
2348}
2349
2350async fn handle_skill_find(
2351 req: &JsonRpcMessage,
2352 session: &crate::session::ClientSession,
2353) -> Result<Value, String> {
2354 let persona = req
2355 .params
2356 .get("persona")
2357 .and_then(|v| v.as_str())
2358 .unwrap_or("");
2359 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2360 let task = req
2361 .params
2362 .get("task")
2363 .and_then(|v| v.as_str())
2364 .unwrap_or("");
2365 let max = req
2366 .params
2367 .get("max_results")
2368 .and_then(|v| v.as_u64())
2369 .unwrap_or(1) as usize;
2370 let engine = session.memgine.lock().await;
2371 let results = engine.find_skill(persona, url, task, max);
2372 let json: Vec<Value> = results
2373 .iter()
2374 .map(|(m, s)| {
2375 serde_json::json!({
2376 "name": m.name, "code": m.code, "platform": m.platform,
2377 "description": m.description, "stats": m.stats, "match_score": s,
2378 })
2379 })
2380 .collect();
2381 serde_json::to_value(json).map_err(|e| e.to_string())
2382}
2383
2384async fn handle_skill_report(
2385 req: &JsonRpcMessage,
2386 session: &crate::session::ClientSession,
2387) -> Result<Value, String> {
2388 let name = req
2389 .params
2390 .get("skill_name")
2391 .and_then(|v| v.as_str())
2392 .ok_or("missing skill_name")?;
2393 let outcome_str = req
2394 .params
2395 .get("outcome")
2396 .and_then(|v| v.as_str())
2397 .ok_or("missing outcome")?;
2398 let outcome = match outcome_str {
2399 "success" => car_memgine::SkillOutcome::Success,
2400 _ => car_memgine::SkillOutcome::Fail,
2401 };
2402 let mut engine = session.memgine.lock().await;
2403 let stats = engine
2404 .report_outcome(name, outcome)
2405 .ok_or(format!("skill '{}' not found", name))?;
2406 serde_json::to_value(stats).map_err(|e| e.to_string())
2407}
2408
2409struct WsAgentRunner {
2418 channel: Arc<WsChannel>,
2419 host: Arc<crate::host::HostState>,
2420 client_id: String,
2421}
2422
2423#[async_trait::async_trait]
2424impl car_multi::AgentRunner for WsAgentRunner {
2425 async fn run(
2426 &self,
2427 spec: &car_multi::AgentSpec,
2428 task: &str,
2429 _runtime: &car_engine::Runtime,
2430 _mailbox: &car_multi::Mailbox,
2431 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2432 use futures::SinkExt;
2433
2434 let request_id = self.channel.next_request_id();
2435 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2436 let agent = self
2437 .host
2438 .register_agent(
2439 &self.client_id,
2440 RegisterHostAgentRequest {
2441 id: Some(agent_id.clone()),
2442 name: spec.name.clone(),
2443 kind: "callback".to_string(),
2444 capabilities: spec.tools.clone(),
2445 project: spec
2446 .metadata
2447 .get("project")
2448 .and_then(|v| v.as_str())
2449 .map(str::to_string),
2450 pid: None,
2451 display: serde_json::from_value(
2452 spec.metadata
2453 .get("display")
2454 .cloned()
2455 .unwrap_or(serde_json::Value::Null),
2456 )
2457 .unwrap_or_default(),
2458 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2459 },
2460 )
2461 .await
2462 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2463 let _ = self
2464 .host
2465 .set_status(
2466 &self.client_id,
2467 SetHostAgentStatusRequest {
2468 agent_id: agent.id.clone(),
2469 status: HostAgentStatus::Running,
2470 current_task: Some(task.to_string()),
2471 message: Some(format!("{} started", spec.name)),
2472 payload: serde_json::json!({ "task": task }),
2473 },
2474 )
2475 .await;
2476
2477 let rpc_request = serde_json::json!({
2478 "jsonrpc": "2.0",
2479 "method": "multi.run_agent",
2480 "params": {
2481 "spec": spec,
2482 "task": task,
2483 },
2484 "id": request_id,
2485 });
2486
2487 let (tx, rx) = tokio::sync::oneshot::channel();
2489 self.channel
2490 .pending
2491 .lock()
2492 .await
2493 .insert(request_id.clone(), tx);
2494
2495 let msg = Message::Text(
2496 serde_json::to_string(&rpc_request)
2497 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2498 .into(),
2499 );
2500 if let Err(e) = self.channel.write.lock().await.send(msg).await {
2501 let _ = self
2502 .host
2503 .set_status(
2504 &self.client_id,
2505 SetHostAgentStatusRequest {
2506 agent_id: agent_id.clone(),
2507 status: HostAgentStatus::Errored,
2508 current_task: None,
2509 message: Some(format!("{} failed to start", spec.name)),
2510 payload: serde_json::json!({ "error": e.to_string() }),
2511 },
2512 )
2513 .await;
2514 return Err(car_multi::MultiError::AgentFailed(
2515 spec.name.clone(),
2516 format!("ws send error: {}", e),
2517 ));
2518 }
2519
2520 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2522 Ok(Ok(response)) => response,
2523 Ok(Err(_)) => {
2524 let _ = self
2525 .host
2526 .set_status(
2527 &self.client_id,
2528 SetHostAgentStatusRequest {
2529 agent_id: agent_id.clone(),
2530 status: HostAgentStatus::Errored,
2531 current_task: None,
2532 message: Some(format!("{} callback channel closed", spec.name)),
2533 payload: Value::Null,
2534 },
2535 )
2536 .await;
2537 return Err(car_multi::MultiError::AgentFailed(
2538 spec.name.clone(),
2539 "agent callback channel closed".into(),
2540 ));
2541 }
2542 Err(_) => {
2543 let _ = self
2544 .host
2545 .set_status(
2546 &self.client_id,
2547 SetHostAgentStatusRequest {
2548 agent_id: agent_id.clone(),
2549 status: HostAgentStatus::Errored,
2550 current_task: None,
2551 message: Some(format!("{} timed out", spec.name)),
2552 payload: Value::Null,
2553 },
2554 )
2555 .await;
2556 return Err(car_multi::MultiError::AgentFailed(
2557 spec.name.clone(),
2558 "agent callback timed out (300s)".into(),
2559 ));
2560 }
2561 };
2562
2563 if let Some(err) = response.error {
2564 let _ = self
2565 .host
2566 .set_status(
2567 &self.client_id,
2568 SetHostAgentStatusRequest {
2569 agent_id: agent_id.clone(),
2570 status: HostAgentStatus::Errored,
2571 current_task: None,
2572 message: Some(format!("{} errored", spec.name)),
2573 payload: serde_json::json!({ "error": err }),
2574 },
2575 )
2576 .await;
2577 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2578 }
2579
2580 let output_value = response.output.unwrap_or(Value::Null);
2581 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2582 car_multi::MultiError::AgentFailed(
2583 spec.name.clone(),
2584 format!("invalid AgentOutput: {}", e),
2585 )
2586 })?;
2587 let status = if output.error.is_some() {
2588 HostAgentStatus::Errored
2589 } else {
2590 HostAgentStatus::Completed
2591 };
2592 let message = if output.error.is_some() {
2593 format!("{} errored", spec.name)
2594 } else {
2595 format!("{} completed", spec.name)
2596 };
2597 let _ = self
2598 .host
2599 .set_status(
2600 &self.client_id,
2601 SetHostAgentStatusRequest {
2602 agent_id,
2603 status,
2604 current_task: None,
2605 message: Some(message),
2606 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2607 },
2608 )
2609 .await;
2610
2611 Ok(output)
2612 }
2613}
2614
2615fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
2616 let safe_name: String = name
2617 .chars()
2618 .map(|c| {
2619 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
2620 c
2621 } else {
2622 '-'
2623 }
2624 })
2625 .collect();
2626 format!("{}:{}:{}", client_id, safe_name, request_id)
2627}
2628
2629async fn handle_multi_swarm(
2630 req: &JsonRpcMessage,
2631 session: &crate::session::ClientSession,
2632) -> Result<Value, String> {
2633 let mode_str = req
2634 .params
2635 .get("mode")
2636 .and_then(|v| v.as_str())
2637 .ok_or("missing 'mode'")?;
2638 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2639 let task = req
2640 .params
2641 .get("task")
2642 .and_then(|v| v.as_str())
2643 .ok_or("missing 'task'")?;
2644
2645 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2646 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2647 let agent_specs: Vec<car_multi::AgentSpec> =
2648 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2649 let synth: Option<car_multi::AgentSpec> = req
2650 .params
2651 .get("synthesizer")
2652 .map(|v| serde_json::from_value(v.clone()))
2653 .transpose()
2654 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2655
2656 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2657 channel: session.channel.clone(),
2658 host: session.host.clone(),
2659 client_id: session.client_id.clone(),
2660 });
2661 let infra = car_multi::SharedInfra::new();
2662
2663 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2664 if let Some(s) = synth {
2665 swarm = swarm.with_synthesizer(s);
2666 }
2667
2668 let result = swarm
2669 .run(task, &runner, &infra)
2670 .await
2671 .map_err(|e| format!("swarm error: {}", e))?;
2672 serde_json::to_value(result).map_err(|e| e.to_string())
2673}
2674
2675async fn handle_multi_pipeline(
2676 req: &JsonRpcMessage,
2677 session: &crate::session::ClientSession,
2678) -> Result<Value, String> {
2679 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2680 let task = req
2681 .params
2682 .get("task")
2683 .and_then(|v| v.as_str())
2684 .ok_or("missing 'task'")?;
2685
2686 let stage_specs: Vec<car_multi::AgentSpec> =
2687 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2688
2689 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2690 channel: session.channel.clone(),
2691 host: session.host.clone(),
2692 client_id: session.client_id.clone(),
2693 });
2694 let infra = car_multi::SharedInfra::new();
2695
2696 let result = car_multi::Pipeline::new(stage_specs)
2697 .run(task, &runner, &infra)
2698 .await
2699 .map_err(|e| format!("pipeline error: {}", e))?;
2700 serde_json::to_value(result).map_err(|e| e.to_string())
2701}
2702
2703async fn handle_multi_supervisor(
2704 req: &JsonRpcMessage,
2705 session: &crate::session::ClientSession,
2706) -> Result<Value, String> {
2707 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2708 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2709 let task = req
2710 .params
2711 .get("task")
2712 .and_then(|v| v.as_str())
2713 .ok_or("missing 'task'")?;
2714 let max_rounds = req
2715 .params
2716 .get("max_rounds")
2717 .and_then(|v| v.as_u64())
2718 .unwrap_or(3) as u32;
2719
2720 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2721 .map_err(|e| format!("invalid workers: {}", e))?;
2722 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2723 .map_err(|e| format!("invalid supervisor: {}", e))?;
2724
2725 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2726 channel: session.channel.clone(),
2727 host: session.host.clone(),
2728 client_id: session.client_id.clone(),
2729 });
2730 let infra = car_multi::SharedInfra::new();
2731
2732 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2733 .with_max_rounds(max_rounds)
2734 .run(task, &runner, &infra)
2735 .await
2736 .map_err(|e| format!("supervisor error: {}", e))?;
2737 serde_json::to_value(result).map_err(|e| e.to_string())
2738}
2739
2740async fn handle_multi_map_reduce(
2741 req: &JsonRpcMessage,
2742 session: &crate::session::ClientSession,
2743) -> Result<Value, String> {
2744 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2745 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2746 let task = req
2747 .params
2748 .get("task")
2749 .and_then(|v| v.as_str())
2750 .ok_or("missing 'task'")?;
2751 let items_val = req.params.get("items").ok_or("missing 'items'")?;
2752
2753 let mapper_spec: car_multi::AgentSpec =
2754 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2755 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2756 .map_err(|e| format!("invalid reducer: {}", e))?;
2757 let items: Vec<String> =
2758 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2759
2760 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2761 channel: session.channel.clone(),
2762 host: session.host.clone(),
2763 client_id: session.client_id.clone(),
2764 });
2765 let infra = car_multi::SharedInfra::new();
2766
2767 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2768 .run(task, &items, &runner, &infra)
2769 .await
2770 .map_err(|e| format!("map_reduce error: {}", e))?;
2771 serde_json::to_value(result).map_err(|e| e.to_string())
2772}
2773
2774async fn handle_multi_vote(
2775 req: &JsonRpcMessage,
2776 session: &crate::session::ClientSession,
2777) -> Result<Value, String> {
2778 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2779 let task = req
2780 .params
2781 .get("task")
2782 .and_then(|v| v.as_str())
2783 .ok_or("missing 'task'")?;
2784
2785 let agent_specs: Vec<car_multi::AgentSpec> =
2786 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2787 let synth: Option<car_multi::AgentSpec> = req
2788 .params
2789 .get("synthesizer")
2790 .map(|v| serde_json::from_value(v.clone()))
2791 .transpose()
2792 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2793
2794 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2795 channel: session.channel.clone(),
2796 host: session.host.clone(),
2797 client_id: session.client_id.clone(),
2798 });
2799 let infra = car_multi::SharedInfra::new();
2800
2801 let mut vote = car_multi::Vote::new(agent_specs);
2802 if let Some(s) = synth {
2803 vote = vote.with_synthesizer(s);
2804 }
2805
2806 let result = vote
2807 .run(task, &runner, &infra)
2808 .await
2809 .map_err(|e| format!("vote error: {}", e))?;
2810 serde_json::to_value(result).map_err(|e| e.to_string())
2811}
2812
2813fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2818 let name = req
2819 .params
2820 .get("name")
2821 .and_then(|v| v.as_str())
2822 .ok_or("scheduler.create requires 'name'")?;
2823 let prompt = req
2824 .params
2825 .get("prompt")
2826 .and_then(|v| v.as_str())
2827 .ok_or("scheduler.create requires 'prompt'")?;
2828
2829 let mut task = car_scheduler::Task::new(name, prompt);
2830
2831 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
2832 let trigger = match t {
2833 "once" => car_scheduler::TaskTrigger::Once,
2834 "cron" => car_scheduler::TaskTrigger::Cron,
2835 "interval" => car_scheduler::TaskTrigger::Interval,
2836 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
2837 _ => car_scheduler::TaskTrigger::Manual,
2838 };
2839 let schedule = req
2840 .params
2841 .get("schedule")
2842 .and_then(|v| v.as_str())
2843 .unwrap_or("");
2844 task = task.with_trigger(trigger, schedule);
2845 }
2846
2847 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
2848 task = task.with_system_prompt(sp);
2849 }
2850
2851 serde_json::to_value(&task).map_err(|e| e.to_string())
2852}
2853
2854async fn handle_scheduler_run(
2855 req: &JsonRpcMessage,
2856 session: &crate::session::ClientSession,
2857) -> Result<Value, String> {
2858 let task_val = req
2859 .params
2860 .get("task")
2861 .ok_or("scheduler.run requires 'task'")?;
2862 let mut task: car_scheduler::Task =
2863 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2864
2865 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2866 channel: session.channel.clone(),
2867 host: session.host.clone(),
2868 client_id: session.client_id.clone(),
2869 });
2870 let executor = car_scheduler::Executor::new(runner);
2871 let execution = executor.run_once(&mut task).await;
2872
2873 serde_json::to_value(&execution).map_err(|e| e.to_string())
2874}
2875
2876async fn handle_scheduler_run_loop(
2877 req: &JsonRpcMessage,
2878 session: &crate::session::ClientSession,
2879) -> Result<Value, String> {
2880 let task_val = req
2881 .params
2882 .get("task")
2883 .ok_or("scheduler.run_loop requires 'task'")?;
2884 let mut task: car_scheduler::Task =
2885 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2886 let max_iterations = req
2887 .params
2888 .get("max_iterations")
2889 .and_then(|v| v.as_u64())
2890 .map(|v| v as u32);
2891
2892 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2893 channel: session.channel.clone(),
2894 host: session.host.clone(),
2895 client_id: session.client_id.clone(),
2896 });
2897 let executor = car_scheduler::Executor::new(runner);
2898 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2899 let executions = executor
2900 .run_loop(&mut task, max_iterations, cancel_rx)
2901 .await;
2902
2903 serde_json::to_value(&executions).map_err(|e| e.to_string())
2904}
2905
2906fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
2911 state.inference.get_or_init(|| {
2912 Arc::new(car_inference::InferenceEngine::new(
2913 car_inference::InferenceConfig::default(),
2914 ))
2915 })
2916}
2917
2918async fn handle_infer(
2919 msg: &JsonRpcMessage,
2920 state: &ServerState,
2921 session: &crate::session::ClientSession,
2922) -> Result<Value, String> {
2923 let engine = get_inference_engine(state);
2924 let mut req: car_inference::GenerateRequest =
2925 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2926
2927 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2929 let mut memgine = session.memgine.lock().await;
2930 let ctx = memgine.build_context(cq);
2931 if !ctx.is_empty() {
2932 req.context = Some(ctx);
2933 }
2934 }
2935
2936 let _permit = state.admission.acquire().await;
2942
2943 let result = engine
2954 .generate_tracked(req)
2955 .await
2956 .map_err(|e| e.to_string())?;
2957 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2958}
2959
2960async fn handle_image_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2990 let engine = get_inference_engine(state);
2991 let req: car_inference::GenerateImageRequest =
2992 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2993 let _permit = state.admission.acquire().await;
2996 let result = engine
2997 .generate_image(req)
2998 .await
2999 .map_err(|e| e.to_string())?;
3000 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3001}
3002
3003async fn handle_video_generate(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3004 let engine = get_inference_engine(state);
3005 let req: car_inference::GenerateVideoRequest =
3006 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3007 let _permit = state.admission.acquire().await;
3008 let result = engine
3009 .generate_video(req)
3010 .await
3011 .map_err(|e| e.to_string())?;
3012 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
3013}
3014
3015async fn handle_infer_stream(
3016 msg: &JsonRpcMessage,
3017 session: &crate::session::ClientSession,
3018 state: &ServerState,
3019) -> Result<Value, String> {
3020 use futures::SinkExt;
3021 use tokio_tungstenite::tungstenite::Message;
3022
3023 let engine = get_inference_engine(state);
3024 let mut req: car_inference::GenerateRequest =
3025 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3026
3027 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
3030 let mut memgine = session.memgine.lock().await;
3031 let ctx = memgine.build_context(cq);
3032 if !ctx.is_empty() {
3033 req.context = Some(ctx);
3034 }
3035 }
3036
3037 let _permit = state.admission.acquire().await;
3038 let mut rx = engine
3039 .generate_tracked_stream(req)
3040 .await
3041 .map_err(|e| e.to_string())?;
3042
3043 let mut accumulator = car_inference::StreamAccumulator::default();
3044 let request_id = msg.id.clone();
3045
3046 while let Some(event) = rx.recv().await {
3047 let event_payload = match &event {
3048 car_inference::StreamEvent::TextDelta(text) => {
3049 serde_json::json!({"type": "text", "data": text})
3050 }
3051 car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
3052 serde_json::json!({"type": "tool_start", "name": name, "index": index})
3053 }
3054 car_inference::StreamEvent::ToolCallDelta {
3055 index,
3056 arguments_delta,
3057 } => serde_json::json!({
3058 "type": "tool_delta",
3059 "index": index,
3060 "data": arguments_delta,
3061 }),
3062 car_inference::StreamEvent::Usage {
3063 input_tokens,
3064 output_tokens,
3065 } => serde_json::json!({
3066 "type": "usage",
3067 "input_tokens": input_tokens,
3068 "output_tokens": output_tokens,
3069 }),
3070 car_inference::StreamEvent::Done { .. } => {
3075 accumulator.push(&event);
3076 continue;
3077 }
3078 };
3079
3080 let notif = serde_json::json!({
3081 "jsonrpc": "2.0",
3082 "method": "inference.stream.event",
3083 "params": {
3084 "request_id": request_id,
3085 "event": event_payload,
3086 },
3087 });
3088 if let Ok(text) = serde_json::to_string(¬if) {
3089 let _ = session
3090 .channel
3091 .write
3092 .lock()
3093 .await
3094 .send(Message::Text(text.into()))
3095 .await;
3096 }
3097 accumulator.push(&event);
3098 }
3099
3100 let (text, tool_calls, usage) = accumulator.finish_with_usage();
3101 Ok(serde_json::json!({
3102 "text": text,
3103 "tool_calls": tool_calls,
3104 "usage": usage,
3105 }))
3106}
3107
3108async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3109 let engine = get_inference_engine(state);
3110 let req: car_inference::EmbedRequest =
3111 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3112 let _permit = state.admission.acquire().await;
3116 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3117 Ok(serde_json::json!({"embeddings": result}))
3118}
3119
3120async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3121 let engine = get_inference_engine(state);
3122 let req: car_inference::ClassifyRequest =
3123 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3124 let _permit = state.admission.acquire().await;
3125 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3126 Ok(serde_json::json!({"classifications": result}))
3127}
3128
3129fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3133 let total = state.admission.permits();
3134 let available = state.admission.permits_available();
3135 let in_use = total.saturating_sub(available);
3136 Ok(serde_json::json!({
3137 "permits_total": total,
3138 "permits_available": available,
3139 "permits_in_use": in_use,
3140 "env_override": crate::admission::ENV_MAX_CONCURRENT,
3141 }))
3142}
3143
3144async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3145 let model = msg
3146 .params
3147 .get("model")
3148 .and_then(|v| v.as_str())
3149 .ok_or("missing 'model' parameter")?;
3150 let text = msg
3151 .params
3152 .get("text")
3153 .and_then(|v| v.as_str())
3154 .ok_or("missing 'text' parameter")?;
3155 let engine = get_inference_engine(state);
3156 let ids = engine
3157 .tokenize(model, text)
3158 .await
3159 .map_err(|e| e.to_string())?;
3160 Ok(serde_json::json!({"tokens": ids}))
3161}
3162
3163async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3164 let model = msg
3165 .params
3166 .get("model")
3167 .and_then(|v| v.as_str())
3168 .ok_or("missing 'model' parameter")?;
3169 let tokens: Vec<u32> = msg
3170 .params
3171 .get("tokens")
3172 .and_then(|v| v.as_array())
3173 .ok_or("missing 'tokens' parameter")?
3174 .iter()
3175 .map(|t| {
3176 t.as_u64()
3177 .and_then(|n| u32::try_from(n).ok())
3178 .ok_or_else(|| "tokens[] must be u32 values".to_string())
3179 })
3180 .collect::<Result<Vec<_>, _>>()?;
3181 let engine = get_inference_engine(state);
3182 let text = engine
3183 .detokenize(model, &tokens)
3184 .await
3185 .map_err(|e| e.to_string())?;
3186 Ok(serde_json::json!({"text": text}))
3187}
3188
3189async fn handle_models_register(
3208 req: &JsonRpcMessage,
3209 _state: &Arc<ServerState>,
3210) -> Result<Value, String> {
3211 let schema_value = match req.params.get("schema") {
3215 Some(v) => v.clone(),
3216 None => req.params.clone(),
3217 };
3218 let schema: car_inference::ModelSchema =
3219 serde_json::from_value(schema_value).map_err(|e| format!("invalid ModelSchema: {e}"))?;
3220 let id = schema.id.clone();
3221
3222 let home = std::env::var_os("HOME")
3227 .or_else(|| std::env::var_os("USERPROFILE"))
3228 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3229 let car_dir = std::path::PathBuf::from(home).join(".car");
3230 std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3231 let path = car_dir.join("models.json");
3232
3233 let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3234 let text =
3235 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3236 if text.trim().is_empty() {
3237 Vec::new()
3238 } else {
3239 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3240 }
3241 } else {
3242 Vec::new()
3243 };
3244 if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3246 *slot = schema;
3247 } else {
3248 models.push(schema);
3249 }
3250 let json =
3251 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3252 let tmp = path.with_extension("json.tmp");
3253 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3254 std::fs::rename(&tmp, &path)
3255 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3256 Ok(serde_json::json!({
3257 "id": id,
3258 "registered": true,
3259 "path": path.to_string_lossy(),
3260 "note": "Daemon restart required for live UnifiedRegistry visibility \
3261 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3262 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3263 }))
3264}
3265
3266async fn handle_models_unregister(
3277 req: &JsonRpcMessage,
3278 _state: &Arc<ServerState>,
3279) -> Result<Value, String> {
3280 let id = match req.params.get("id") {
3284 Some(v) => v
3285 .as_str()
3286 .ok_or_else(|| "`id` must be a string".to_string())?
3287 .to_string(),
3288 None => match req.params.as_str() {
3289 Some(s) => s.to_string(),
3290 None => return Err("missing `id` parameter".to_string()),
3291 },
3292 };
3293
3294 let home = std::env::var_os("HOME")
3295 .or_else(|| std::env::var_os("USERPROFILE"))
3296 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3297 let car_dir = std::path::PathBuf::from(home).join(".car");
3298 let path = car_dir.join("models.json");
3299
3300 if !path.exists() {
3301 return Err(format!(
3302 "no models.json at {} — nothing to unregister",
3303 path.display()
3304 ));
3305 }
3306 let text =
3307 std::fs::read_to_string(&path).map_err(|e| format!("read {}: {e}", path.display()))?;
3308 let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3309 Vec::new()
3310 } else {
3311 serde_json::from_str(&text).map_err(|e| format!("parse {}: {e}", path.display()))?
3312 };
3313 let before = models.len();
3314 models.retain(|m| m.id != id);
3315 if models.len() == before {
3316 return Err(format!("model {} not found in {}", id, path.display()));
3317 }
3318 let json =
3319 serde_json::to_string_pretty(&models).map_err(|e| format!("serialize models.json: {e}"))?;
3320 let tmp = path.with_extension("json.tmp");
3321 std::fs::write(&tmp, json).map_err(|e| format!("write {}: {e}", tmp.display()))?;
3322 std::fs::rename(&tmp, &path)
3323 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3324 Ok(serde_json::json!({
3325 "id": id,
3326 "unregistered": true,
3327 "path": path.to_string_lossy(),
3328 "note": "Daemon restart required for live UnifiedRegistry visibility \
3329 (phase 1, matching models.register).",
3330 }))
3331}
3332
3333fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3334 let engine = get_inference_engine(state);
3335 let models = engine.list_models();
3336 serde_json::to_value(&models).map_err(|e| e.to_string())
3337}
3338
3339fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3340 let engine = get_inference_engine(state);
3341 let models = engine.list_models_unified();
3342 serde_json::to_value(&models).map_err(|e| e.to_string())
3343}
3344
3345#[derive(Debug, Deserialize)]
3346#[serde(rename_all = "camelCase")]
3347struct ModelSearchParams {
3348 #[serde(default)]
3349 query: Option<String>,
3350 #[serde(default)]
3351 capability: Option<car_inference::ModelCapability>,
3352 #[serde(default)]
3353 provider: Option<String>,
3354 #[serde(default)]
3355 local_only: bool,
3356 #[serde(default)]
3357 available_only: bool,
3358 #[serde(default)]
3359 limit: Option<usize>,
3360}
3361
3362#[derive(Debug, Serialize)]
3363#[serde(rename_all = "camelCase")]
3364struct ModelSearchEntry {
3365 #[serde(flatten)]
3366 info: car_inference::ModelInfo,
3367 family: String,
3368 version: String,
3369 tags: Vec<String>,
3370 pullable: bool,
3371 upgrade: Option<car_inference::ModelUpgrade>,
3372}
3373
3374#[derive(Debug, Serialize)]
3375#[serde(rename_all = "camelCase")]
3376struct ModelSearchResponse {
3377 models: Vec<ModelSearchEntry>,
3378 upgrades: Vec<car_inference::ModelUpgrade>,
3379 total: usize,
3380 available: usize,
3381 local: usize,
3382 remote: usize,
3383}
3384
3385fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3386 let params: ModelSearchParams =
3387 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3388 query: None,
3389 capability: None,
3390 provider: None,
3391 local_only: false,
3392 available_only: false,
3393 limit: None,
3394 });
3395 let engine = get_inference_engine(state);
3396 let upgrades = engine.available_model_upgrades();
3397 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3398 .iter()
3399 .cloned()
3400 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3401 .collect();
3402 let query = params
3403 .query
3404 .as_deref()
3405 .map(str::trim)
3406 .filter(|q| !q.is_empty())
3407 .map(|q| q.to_ascii_lowercase());
3408 let provider = params
3409 .provider
3410 .as_deref()
3411 .map(str::trim)
3412 .filter(|p| !p.is_empty())
3413 .map(|p| p.to_ascii_lowercase());
3414
3415 let mut entries: Vec<ModelSearchEntry> = engine
3416 .list_schemas()
3417 .into_iter()
3418 .filter(|schema| {
3419 if let Some(capability) = params.capability {
3420 if !schema.has_capability(capability) {
3421 return false;
3422 }
3423 }
3424 if let Some(provider) = provider.as_deref() {
3425 if schema.provider.to_ascii_lowercase() != provider {
3426 return false;
3427 }
3428 }
3429 if params.local_only && !schema.is_local() {
3430 return false;
3431 }
3432 if params.available_only && !schema.available {
3433 return false;
3434 }
3435 if let Some(query) = query.as_deref() {
3436 let capability_text = schema
3437 .capabilities
3438 .iter()
3439 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3440 .collect::<Vec<_>>()
3441 .join(" ");
3442 let haystack = format!(
3443 "{} {} {} {} {} {}",
3444 schema.id,
3445 schema.name,
3446 schema.provider,
3447 schema.family,
3448 schema.tags.join(" "),
3449 capability_text
3450 )
3451 .to_ascii_lowercase();
3452 if !haystack.contains(query) {
3453 return false;
3454 }
3455 }
3456 true
3457 })
3458 .map(|schema| {
3459 let pullable = !schema.available
3460 && matches!(
3461 schema.source,
3462 car_inference::ModelSource::Local { .. }
3463 | car_inference::ModelSource::Mlx { .. }
3464 );
3465 let info = car_inference::ModelInfo::from(&schema);
3466 let upgrade = upgrades_by_from.get(&schema.id).cloned();
3467 ModelSearchEntry {
3468 info,
3469 family: schema.family,
3470 version: schema.version,
3471 tags: schema.tags,
3472 pullable,
3473 upgrade,
3474 }
3475 })
3476 .collect();
3477 entries.sort_by(|a, b| {
3478 b.info
3479 .available
3480 .cmp(&a.info.available)
3481 .then(b.info.is_local.cmp(&a.info.is_local))
3482 .then(a.info.name.cmp(&b.info.name))
3483 });
3484 if let Some(limit) = params.limit {
3485 entries.truncate(limit);
3486 }
3487
3488 let total = entries.len();
3489 let available = entries.iter().filter(|entry| entry.info.available).count();
3490 let local = entries.iter().filter(|entry| entry.info.is_local).count();
3491 let response = ModelSearchResponse {
3492 models: entries,
3493 upgrades,
3494 total,
3495 available,
3496 local,
3497 remote: total.saturating_sub(local),
3498 };
3499 serde_json::to_value(response).map_err(|e| e.to_string())
3500}
3501
3502fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3503 let engine = get_inference_engine(state);
3504 serde_json::to_value(serde_json::json!({
3505 "upgrades": engine.available_model_upgrades()
3506 }))
3507 .map_err(|e| e.to_string())
3508}
3509
3510async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3511 let name = msg
3512 .params
3513 .get("name")
3514 .or_else(|| msg.params.get("id"))
3515 .or_else(|| msg.params.get("model"))
3516 .and_then(|v| v.as_str())
3517 .ok_or("missing 'name' parameter")?;
3518 let engine = get_inference_engine(state);
3519 let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3520 Ok(serde_json::json!({"path": path.display().to_string()}))
3521}
3522
3523async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3524 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3525 msg.params
3526 .get("events")
3527 .cloned()
3528 .unwrap_or(msg.params.clone()),
3529 )
3530 .map_err(|e| format!("invalid events: {}", e))?;
3531
3532 let inference = get_inference_engine(state).clone();
3533 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3534
3535 let skills = engine.distill_skills(&events).await;
3536 serde_json::to_value(&skills).map_err(|e| e.to_string())
3537}
3538
3539async fn handle_memory_consolidate(
3543 session: &crate::session::ClientSession,
3544) -> Result<Value, String> {
3545 let engine_arc = session.effective_memgine().await;
3546 let report = {
3547 let mut engine = engine_arc.lock().await;
3548 engine.consolidate().await
3549 };
3550 if let Some(id) = session.agent_id.lock().await.clone() {
3551 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3552 tracing::warn!(agent_id = %id, error = %e,
3553 "agent memgine persist after consolidate failed");
3554 }
3555 }
3556 serde_json::to_value(&report).map_err(|e| e.to_string())
3557}
3558
3559async fn handle_skill_repair(
3563 msg: &JsonRpcMessage,
3564 session: &crate::session::ClientSession,
3565) -> Result<Value, String> {
3566 let name = msg
3567 .params
3568 .get("skill_name")
3569 .and_then(|v| v.as_str())
3570 .ok_or("missing 'skill_name' parameter")?;
3571 let mut engine = session.memgine.lock().await;
3572 let code = engine.repair_skill(name).await;
3573 Ok(match code {
3574 Some(c) => serde_json::json!({ "code": c }),
3575 None => Value::Null,
3576 })
3577}
3578
3579async fn handle_skills_ingest_distilled(
3582 msg: &JsonRpcMessage,
3583 session: &crate::session::ClientSession,
3584) -> Result<Value, String> {
3585 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3586 msg.params
3587 .get("skills")
3588 .cloned()
3589 .unwrap_or(msg.params.clone()),
3590 )
3591 .map_err(|e| format!("invalid skills: {}", e))?;
3592 let mut engine = session.memgine.lock().await;
3593 let nodes = engine.ingest_distilled_skills(&skills);
3594 Ok(serde_json::json!({ "ingested": nodes.len() }))
3595}
3596
3597async fn handle_skills_evolve(
3600 msg: &JsonRpcMessage,
3601 session: &crate::session::ClientSession,
3602) -> Result<Value, String> {
3603 let domain = msg
3604 .params
3605 .get("domain")
3606 .and_then(|v| v.as_str())
3607 .ok_or("missing 'domain' parameter")?
3608 .to_string();
3609 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3610 msg.params
3611 .get("events")
3612 .cloned()
3613 .unwrap_or(Value::Array(vec![])),
3614 )
3615 .map_err(|e| format!("invalid events: {}", e))?;
3616 let mut engine = session.memgine.lock().await;
3617 let skills = engine.evolve_skills(&events, &domain).await;
3618 serde_json::to_value(&skills).map_err(|e| e.to_string())
3619}
3620
3621async fn handle_skills_domains_needing_evolution(
3623 msg: &JsonRpcMessage,
3624 session: &crate::session::ClientSession,
3625) -> Result<Value, String> {
3626 let threshold = msg
3627 .params
3628 .get("threshold")
3629 .and_then(|v| v.as_f64())
3630 .unwrap_or(0.6);
3631 let engine = session.memgine.lock().await;
3632 let domains = engine.domains_needing_evolution(threshold);
3633 serde_json::to_value(&domains).map_err(|e| e.to_string())
3634}
3635
3636async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3638 let engine = get_inference_engine(state);
3639 let req: car_inference::RerankRequest =
3640 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3641 let _permit = state.admission.acquire().await;
3642 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3643 serde_json::to_value(&result).map_err(|e| e.to_string())
3644}
3645
3646async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3652 use base64::Engine as _;
3653 let engine = get_inference_engine(state);
3654
3655 let mut params = msg.params.clone();
3662 let audio_b64 = params
3663 .as_object_mut()
3664 .and_then(|m| m.remove("audio_b64"))
3665 .and_then(|v| v.as_str().map(str::to_string));
3666 let _tmp_audio = if let Some(b64) = audio_b64 {
3667 let bytes = base64::engine::general_purpose::STANDARD
3668 .decode(b64.as_bytes())
3669 .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3670 let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3671 std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3672 let path = tmp.path().to_string_lossy().into_owned();
3673 if let Some(obj) = params.as_object_mut() {
3674 obj.insert("audio_path".to_string(), Value::String(path));
3675 }
3676 Some(tmp)
3677 } else {
3678 None
3679 };
3680
3681 let req: car_inference::TranscribeRequest =
3682 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3683 let _permit = state.admission.acquire().await;
3684 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3685 serde_json::to_value(&result).map_err(|e| e.to_string())
3686}
3687
3688async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3694 use base64::Engine as _;
3695 let engine = get_inference_engine(state);
3696
3697 let mut params = msg.params.clone();
3698 let return_b64 = params
3699 .as_object_mut()
3700 .and_then(|m| m.remove("return_b64"))
3701 .and_then(|v| v.as_bool())
3702 .unwrap_or(false);
3703 let no_output_path = params
3704 .as_object()
3705 .map(|m| !m.contains_key("output_path"))
3706 .unwrap_or(true);
3707
3708 let req: car_inference::SynthesizeRequest =
3709 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3710 let _permit = state.admission.acquire().await;
3711 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3712 let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3713
3714 if return_b64 || no_output_path {
3718 let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3719 format!(
3720 "synthesize: failed to read rendered audio at {}: {e}",
3721 result.audio_path
3722 )
3723 })?;
3724 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3725 if let Some(obj) = value.as_object_mut() {
3726 obj.insert("audio_b64".to_string(), Value::String(encoded));
3727 }
3728 }
3729 Ok(value)
3730}
3731
3732async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3736 let engine = get_inference_engine(state);
3737 let status = engine
3738 .prepare_speech_runtime()
3739 .await
3740 .map_err(|e| e.to_string())?;
3741 serde_json::to_value(&status).map_err(|e| e.to_string())
3742}
3743
3744async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3747 let prompt = msg
3748 .params
3749 .get("prompt")
3750 .and_then(|v| v.as_str())
3751 .ok_or("missing 'prompt' parameter")?;
3752 let engine = get_inference_engine(state);
3753 let decision = engine.route_adaptive(prompt).await;
3754 serde_json::to_value(&decision).map_err(|e| e.to_string())
3755}
3756
3757async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3759 let engine = get_inference_engine(state);
3760 let profiles = engine.export_profiles().await;
3761 serde_json::to_value(&profiles).map_err(|e| e.to_string())
3762}
3763
3764#[derive(Deserialize)]
3765#[serde(rename_all = "camelCase")]
3766struct OutcomesResolvePendingParams {
3767 action_results: Vec<(String, bool, f64, String)>,
3772}
3773
3774async fn handle_outcomes_resolve_pending(
3794 req: &JsonRpcMessage,
3795 state: &ServerState,
3796) -> Result<Value, String> {
3797 let params: OutcomesResolvePendingParams =
3798 serde_json::from_value(req.params.clone()).map_err(|e| format!("invalid params: {e}"))?;
3799 let engine = get_inference_engine(state);
3800 let mut tracker = engine.outcome_tracker.write().await;
3801 let inferred = tracker.infer_outcomes_from_action_sequence(¶ms.action_results);
3802 tracker.resolve_pending_from_signals(inferred);
3803 Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3804}
3805
3806async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3808 let n = session.runtime.log.lock().await.len();
3809 Ok(Value::from(n as u64))
3810}
3811
3812async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3813 let stats = session.runtime.log.lock().await.stats();
3814 serde_json::to_value(stats).map_err(|e| e.to_string())
3815}
3816
3817#[derive(Deserialize)]
3818#[serde(rename_all = "camelCase")]
3819struct EventsTruncateParams {
3820 #[serde(default)]
3821 max_events: Option<usize>,
3822 #[serde(default)]
3823 max_spans: Option<usize>,
3824}
3825
3826async fn handle_events_truncate(
3827 msg: &JsonRpcMessage,
3828 session: &crate::session::ClientSession,
3829) -> Result<Value, String> {
3830 let params: EventsTruncateParams =
3831 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
3832 max_events: None,
3833 max_spans: None,
3834 });
3835 let mut log = session.runtime.log.lock().await;
3836 let removed_events = params
3837 .max_events
3838 .map(|max| log.truncate_events_keep_last(max))
3839 .unwrap_or(0);
3840 let removed_spans = params
3841 .max_spans
3842 .map(|max| log.truncate_spans_keep_last(max))
3843 .unwrap_or(0);
3844 let stats = log.stats();
3845 Ok(serde_json::json!({
3846 "removedEvents": removed_events,
3847 "removedSpans": removed_spans,
3848 "stats": stats,
3849 }))
3850}
3851
3852async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
3853 let mut log = session.runtime.log.lock().await;
3854 let removed = log.clear();
3855 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
3856}
3857
3858async fn handle_replan_set_config(
3863 msg: &JsonRpcMessage,
3864 session: &crate::session::ClientSession,
3865) -> Result<Value, String> {
3866 let max_replans = msg
3867 .params
3868 .get("max_replans")
3869 .and_then(|v| v.as_u64())
3870 .unwrap_or(0) as u32;
3871 let delay_ms = msg
3872 .params
3873 .get("delay_ms")
3874 .and_then(|v| v.as_u64())
3875 .unwrap_or(0);
3876 let verify_before_execute = msg
3877 .params
3878 .get("verify_before_execute")
3879 .and_then(|v| v.as_bool())
3880 .unwrap_or(true);
3881 let cfg = car_engine::ReplanConfig {
3882 max_replans,
3883 delay_ms,
3884 verify_before_execute,
3885 };
3886 session.runtime.set_replan_config(cfg).await;
3887 Ok(Value::Null)
3888}
3889
3890async fn handle_skills_list(
3891 msg: &JsonRpcMessage,
3892 session: &crate::session::ClientSession,
3893) -> Result<Value, String> {
3894 let domain = msg.params.get("domain").and_then(|v| v.as_str());
3895 let engine = session.memgine.lock().await;
3896 let skills: Vec<serde_json::Value> = engine
3897 .graph
3898 .inner
3899 .node_indices()
3900 .filter_map(|nix| {
3901 let node = engine.graph.inner.node_weight(nix)?;
3902 if node.kind != car_memgine::MemKind::Skill {
3903 return None;
3904 }
3905 let meta = car_memgine::SkillMeta::from_node(node)?;
3906 if let Some(d) = domain {
3907 match &meta.scope {
3908 car_memgine::SkillScope::Global => {}
3909 car_memgine::SkillScope::Domain(sd) if sd == d => {}
3910 _ => return None,
3911 }
3912 }
3913 Some(serde_json::to_value(&meta).unwrap_or_default())
3914 })
3915 .collect();
3916 serde_json::to_value(&skills).map_err(|e| e.to_string())
3917}
3918
3919#[derive(serde::Deserialize)]
3920struct SecretParams {
3921 #[serde(default)]
3922 service: Option<String>,
3923 key: String,
3924 #[serde(default)]
3925 value: Option<String>,
3926}
3927
3928fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
3929 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3930 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
3931 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
3932}
3933
3934fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
3935 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3936 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
3937}
3938
3939fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
3940 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3941 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
3942}
3943
3944fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
3945 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3946 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
3947}
3948
3949#[derive(serde::Deserialize)]
3950struct PermParams {
3951 domain: String,
3952 #[serde(default)]
3953 target_bundle_id: Option<String>,
3954}
3955
3956fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
3957 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3958 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
3959}
3960
3961fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
3962 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3963 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
3964}
3965
3966fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
3967 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3968 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
3969}
3970
3971fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
3972 #[derive(serde::Deserialize)]
3973 struct P {
3974 start: String,
3975 end: String,
3976 #[serde(default)]
3977 calendar_ids: Vec<String>,
3978 }
3979 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3980 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
3981 .map_err(|e| format!("parse start: {}", e))?
3982 .with_timezone(&chrono::Utc);
3983 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
3984 .map_err(|e| format!("parse end: {}", e))?
3985 .with_timezone(&chrono::Utc);
3986 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
3987}
3988
3989fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
3990 #[derive(serde::Deserialize)]
3991 struct P {
3992 query: String,
3993 #[serde(default = "default_limit")]
3994 limit: usize,
3995 #[serde(default)]
3996 container_ids: Vec<String>,
3997 }
3998 fn default_limit() -> usize {
3999 50
4000 }
4001 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4002 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
4003}
4004
4005fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
4006 #[derive(serde::Deserialize, Default)]
4007 struct P {
4008 #[serde(default)]
4009 account_ids: Vec<String>,
4010 }
4011 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
4012 car_ffi_common::integrations::mail_inbox(&p.account_ids)
4013}
4014
4015fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
4016 let raw = req.params.to_string();
4017 car_ffi_common::integrations::mail_send(&raw)
4018}
4019
4020fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
4021 #[derive(serde::Deserialize)]
4022 struct P {
4023 #[serde(default = "default_limit")]
4024 limit: usize,
4025 }
4026 fn default_limit() -> usize {
4027 50
4028 }
4029 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4030 car_ffi_common::integrations::messages_chats(p.limit)
4031}
4032
4033fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
4034 let raw = req.params.to_string();
4035 car_ffi_common::integrations::messages_send(&raw)
4036}
4037
4038fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
4039 #[derive(serde::Deserialize)]
4040 struct P {
4041 query: String,
4042 #[serde(default = "default_limit")]
4043 limit: usize,
4044 }
4045 fn default_limit() -> usize {
4046 50
4047 }
4048 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4049 car_ffi_common::integrations::notes_find(&p.query, p.limit)
4050}
4051
4052fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
4053 #[derive(serde::Deserialize)]
4054 struct P {
4055 #[serde(default = "default_limit")]
4056 limit: usize,
4057 }
4058 fn default_limit() -> usize {
4059 50
4060 }
4061 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
4062 car_ffi_common::integrations::reminders_items(p.limit)
4063}
4064
4065fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
4066 #[derive(serde::Deserialize)]
4067 struct P {
4068 #[serde(default = "default_limit")]
4069 limit: usize,
4070 }
4071 fn default_limit() -> usize {
4072 100
4073 }
4074 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
4075 car_ffi_common::integrations::bookmarks_list(p.limit)
4076}
4077
4078fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
4079 #[derive(serde::Deserialize)]
4080 struct P {
4081 start: String,
4082 end: String,
4083 }
4084 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4085 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4086 .map_err(|e| format!("parse start: {}", e))?
4087 .with_timezone(&chrono::Utc);
4088 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4089 .map_err(|e| format!("parse end: {}", e))?
4090 .with_timezone(&chrono::Utc);
4091 car_ffi_common::health::sleep_windows(s, e)
4092}
4093
4094fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4095 #[derive(serde::Deserialize)]
4096 struct P {
4097 start: String,
4098 end: String,
4099 }
4100 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4101 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4102 .map_err(|e| format!("parse start: {}", e))?
4103 .with_timezone(&chrono::Utc);
4104 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4105 .map_err(|e| format!("parse end: {}", e))?
4106 .with_timezone(&chrono::Utc);
4107 car_ffi_common::health::workouts(s, e)
4108}
4109
4110fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4111 #[derive(serde::Deserialize)]
4112 struct P {
4113 start: String,
4114 end: String,
4115 }
4116 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4117 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4118 .map_err(|e| format!("parse start: {}", e))?;
4119 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4120 .map_err(|e| format!("parse end: {}", e))?;
4121 car_ffi_common::health::activity(s, e)
4122}
4123
4124async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4125 let closed = session.browser.close().await?;
4126 Ok(serde_json::json!({"closed": closed}))
4127}
4128
4129async fn handle_browser_run(
4130 req: &JsonRpcMessage,
4131 session: &crate::session::ClientSession,
4132) -> Result<Value, String> {
4133 #[derive(serde::Deserialize)]
4134 struct BrowserRunParams {
4135 script: Value,
4137 #[serde(default)]
4138 width: Option<u32>,
4139 #[serde(default)]
4140 height: Option<u32>,
4141 #[serde(default)]
4146 headed: Option<bool>,
4147 #[serde(default)]
4150 extra_args: Option<Vec<String>>,
4151 }
4152 let params: BrowserRunParams =
4153 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4154
4155 let script_json = match params.script {
4157 Value::String(s) => s,
4158 other => other.to_string(),
4159 };
4160
4161 let browser_session = session
4162 .browser
4163 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4164 width: params.width.unwrap_or(1280),
4165 height: params.height.unwrap_or(720),
4166 headless: !params.headed.unwrap_or(false),
4167 extra_args: params.extra_args.unwrap_or_default(),
4168 })
4169 .await?;
4170
4171 let trace_json = browser_session.run(&script_json).await?;
4172 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4173}
4174
4175#[derive(Deserialize)]
4188struct VoiceStartParams {
4189 session_id: String,
4190 audio_source: Value,
4191 #[serde(default)]
4192 options: Option<Value>,
4193}
4194
4195async fn handle_voice_transcribe_stream_start(
4196 req: &JsonRpcMessage,
4197 state: &Arc<ServerState>,
4198 session: &Arc<crate::session::ClientSession>,
4199) -> Result<Value, String> {
4200 let params: VoiceStartParams =
4201 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4202 let audio_source_json =
4203 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
4204 let options_json = params
4205 .options
4206 .as_ref()
4207 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4208 .transpose()?;
4209 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4210 channel: session.channel.clone(),
4211 });
4212 let json = car_ffi_common::voice::transcribe_stream_start(
4213 ¶ms.session_id,
4214 &audio_source_json,
4215 options_json.as_deref(),
4216 state.voice_sessions.clone(),
4217 sink,
4218 )
4219 .await?;
4220 serde_json::from_str(&json).map_err(|e| e.to_string())
4221}
4222
4223#[derive(Deserialize)]
4224struct VoiceStopParams {
4225 session_id: String,
4226}
4227
4228async fn handle_voice_transcribe_stream_stop(
4229 req: &JsonRpcMessage,
4230 state: &Arc<ServerState>,
4231) -> Result<Value, String> {
4232 let params: VoiceStopParams =
4233 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4234 let json = car_ffi_common::voice::transcribe_stream_stop(
4235 ¶ms.session_id,
4236 state.voice_sessions.clone(),
4237 )
4238 .await?;
4239 serde_json::from_str(&json).map_err(|e| e.to_string())
4240}
4241
4242#[derive(Deserialize)]
4243struct VoicePushParams {
4244 session_id: String,
4245 pcm_b64: String,
4249}
4250
4251async fn handle_voice_transcribe_stream_push(
4252 req: &JsonRpcMessage,
4253 state: &Arc<ServerState>,
4254) -> Result<Value, String> {
4255 use base64::Engine;
4256 let params: VoicePushParams =
4257 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4258 let pcm = base64::engine::general_purpose::STANDARD
4259 .decode(¶ms.pcm_b64)
4260 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4261 let json = car_ffi_common::voice::transcribe_stream_push(
4262 ¶ms.session_id,
4263 &pcm,
4264 state.voice_sessions.clone(),
4265 )
4266 .await?;
4267 serde_json::from_str(&json).map_err(|e| e.to_string())
4268}
4269
4270fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4271 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4272 serde_json::from_str(&json).unwrap_or(Value::Null)
4273}
4274
4275async fn handle_voice_dispatch_turn(
4276 req: &JsonRpcMessage,
4277 state: &Arc<ServerState>,
4278 session: &Arc<crate::session::ClientSession>,
4279) -> Result<Value, String> {
4280 let req_value = req.params.clone();
4281 let request: crate::voice_turn::DispatchVoiceTurnRequest =
4282 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4283 let engine = get_inference_engine(state).clone();
4284 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4285 channel: session.channel.clone(),
4286 });
4287 let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4288 serde_json::to_value(resp).map_err(|e| e.to_string())
4289}
4290
4291async fn handle_voice_cancel_turn() -> Result<Value, String> {
4292 crate::voice_turn::cancel().await;
4293 Ok(serde_json::json!({"cancelled": true}))
4294}
4295
4296async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4297 let engine = get_inference_engine(state).clone();
4298 crate::voice_turn::prewarm(engine).await;
4299 Ok(serde_json::json!({"prewarmed": true}))
4300}
4301
4302fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4321 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4322 std::sync::OnceLock::new();
4323 SLOT.get_or_init(|| std::sync::RwLock::new(None))
4324}
4325
4326fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4327 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4328 std::sync::OnceLock::new();
4329 MAP.get_or_init(dashmap::DashMap::new)
4330}
4331
4332fn ws_runner_completions() -> &'static dashmap::DashMap<
4333 String,
4334 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4335> {
4336 static MAP: std::sync::OnceLock<
4337 dashmap::DashMap<
4338 String,
4339 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4340 >,
4341 > = std::sync::OnceLock::new();
4342 MAP.get_or_init(dashmap::DashMap::new)
4343}
4344
4345struct WsInferenceRunner;
4346
4347#[async_trait::async_trait]
4348impl car_inference::InferenceRunner for WsInferenceRunner {
4349 async fn run(
4350 &self,
4351 request: car_inference::tasks::generate::GenerateRequest,
4352 emitter: car_inference::EventEmitter,
4353 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4354 let channel = ws_runner_session()
4355 .read()
4356 .map_err(|e| {
4357 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4358 })?
4359 .clone()
4360 .ok_or_else(|| {
4361 car_inference::RunnerError::Declined(
4362 "no WebSocket inference runner registered — call inference.register_runner first"
4363 .into(),
4364 )
4365 })?;
4366
4367 let call_id = uuid::Uuid::new_v4().to_string();
4368 let request_json = serde_json::to_value(&request)
4369 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4370 let (tx, rx) = tokio::sync::oneshot::channel();
4371 ws_runner_calls().insert(call_id.clone(), emitter);
4372 ws_runner_completions().insert(call_id.clone(), tx);
4373
4374 use futures::SinkExt;
4376 let notification = serde_json::json!({
4377 "jsonrpc": "2.0",
4378 "method": "inference.runner.invoke",
4379 "params": {
4380 "call_id": call_id,
4381 "request": request_json,
4382 },
4383 });
4384 let text = serde_json::to_string(¬ification)
4385 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4386 let _ = channel
4387 .write
4388 .lock()
4389 .await
4390 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4391 .await;
4392
4393 let result = rx.await.map_err(|_| {
4394 car_inference::RunnerError::Failed("runner completion channel dropped".into())
4395 })?;
4396 ws_runner_calls().remove(&call_id);
4397 result.map_err(car_inference::RunnerError::Failed)
4398 }
4399}
4400
4401async fn handle_inference_register_runner(
4402 session: &Arc<crate::session::ClientSession>,
4403) -> Result<Value, String> {
4404 let mut guard = ws_runner_session()
4405 .write()
4406 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4407 *guard = Some(session.channel.clone());
4408 drop(guard);
4409 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4410 Ok(serde_json::json!({"registered": true}))
4411}
4412
4413#[derive(serde::Deserialize)]
4414struct InferenceRunnerEventParams {
4415 call_id: String,
4416 event: Value,
4417}
4418
4419async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4420 let params: InferenceRunnerEventParams =
4421 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4422 let stream_event = match parse_runner_event_value(¶ms.event) {
4423 Some(e) => e,
4424 None => return Err("unrecognised runner event shape".into()),
4425 };
4426 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
4427 let emitter = entry.value().clone();
4428 tokio::spawn(async move { emitter.emit(stream_event).await });
4429 }
4430 Ok(serde_json::json!({"emitted": true}))
4431}
4432
4433#[derive(serde::Deserialize)]
4434struct InferenceRunnerCompleteParams {
4435 call_id: String,
4436 result: Value,
4437}
4438
4439async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4440 let params: InferenceRunnerCompleteParams =
4441 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4442 let result: std::result::Result<car_inference::RunnerResult, String> =
4443 serde_json::from_value(params.result)
4444 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4445 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4446 let _ = tx.send(result);
4447 }
4448 Ok(serde_json::json!({"completed": true}))
4449}
4450
4451#[derive(serde::Deserialize)]
4452struct InferenceRunnerFailParams {
4453 call_id: String,
4454 error: String,
4455}
4456
4457async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4458 let params: InferenceRunnerFailParams =
4459 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4460 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4461 let _ = tx.send(Err(params.error));
4462 }
4463 Ok(serde_json::json!({"failed": true}))
4464}
4465
4466fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4467 let ty = v.get("type").and_then(|t| t.as_str())?;
4468 match ty {
4469 "text" => Some(car_inference::StreamEvent::TextDelta(
4470 v.get("data")?.as_str()?.to_string(),
4471 )),
4472 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4473 name: v.get("name")?.as_str()?.to_string(),
4474 index: v.get("index")?.as_u64()? as usize,
4475 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4476 }),
4477 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4478 index: v.get("index")?.as_u64()? as usize,
4479 arguments_delta: v.get("data")?.as_str()?.to_string(),
4480 }),
4481 "usage" => Some(car_inference::StreamEvent::Usage {
4482 input_tokens: v.get("input_tokens")?.as_u64()?,
4483 output_tokens: v.get("output_tokens")?.as_u64()?,
4484 }),
4485 "done" => Some(car_inference::StreamEvent::Done {
4486 text: v.get("text")?.as_str()?.to_string(),
4487 tool_calls: v
4488 .get("tool_calls")
4489 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4490 .unwrap_or_default(),
4491 }),
4492 _ => None,
4493 }
4494}
4495
4496#[derive(Deserialize)]
4497struct EnrollSpeakerParams {
4498 label: String,
4499 audio: Value,
4500}
4501
4502async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4503 let params: EnrollSpeakerParams =
4504 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4505 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
4506 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
4507 serde_json::from_str(&json).map_err(|e| e.to_string())
4508}
4509
4510#[derive(Deserialize)]
4511struct RemoveEnrollmentParams {
4512 label: String,
4513}
4514
4515fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4516 let params: RemoveEnrollmentParams =
4517 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4518 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
4519 serde_json::from_str(&json).map_err(|e| e.to_string())
4520}
4521
4522#[derive(Deserialize)]
4523struct WorkflowRunParams {
4524 workflow: Value,
4525}
4526
4527async fn handle_workflow_run(
4528 req: &JsonRpcMessage,
4529 session: &Arc<crate::session::ClientSession>,
4530) -> Result<Value, String> {
4531 let params: WorkflowRunParams =
4532 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4533 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4534 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4535 channel: session.channel.clone(),
4536 host: session.host.clone(),
4537 client_id: session.client_id.clone(),
4538 });
4539 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4540 serde_json::from_str(&json).map_err(|e| e.to_string())
4541}
4542
4543#[derive(Deserialize)]
4544struct WorkflowVerifyParams {
4545 workflow: Value,
4546}
4547
4548fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4549 let params: WorkflowVerifyParams =
4550 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4551 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4552 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4553 serde_json::from_str(&json).map_err(|e| e.to_string())
4554}
4555
4556async fn handle_meeting_start(
4561 req: &JsonRpcMessage,
4562 state: &Arc<ServerState>,
4563 session: &Arc<crate::session::ClientSession>,
4564) -> Result<Value, String> {
4565 let mut req_value = req.params.clone();
4571 let meeting_id = req_value
4572 .get("id")
4573 .and_then(|v| v.as_str())
4574 .map(str::to_string)
4575 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4576 if let Some(map) = req_value.as_object_mut() {
4577 map.insert("id".into(), Value::String(meeting_id.clone()));
4578 }
4579 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4580
4581 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4582 Arc::new(crate::session::WsVoiceEventSink {
4583 channel: session.channel.clone(),
4584 });
4585
4586 let upstream: Arc<dyn car_voice::VoiceEventSink> =
4591 Arc::new(crate::session::WsMemgineIngestSink {
4592 meeting_id,
4593 engine: session.memgine.clone(),
4594 upstream: ws_upstream,
4595 });
4596
4597 let cwd = std::env::current_dir().ok();
4598 let json = crate::meeting::start_meeting(
4599 &request_json,
4600 state.meetings.clone(),
4601 state.voice_sessions.clone(),
4602 upstream,
4603 None,
4604 cwd,
4605 )
4606 .await?;
4607 serde_json::from_str(&json).map_err(|e| e.to_string())
4608}
4609
4610#[derive(Deserialize)]
4611struct MeetingStopParams {
4612 meeting_id: String,
4613 #[serde(default = "default_summarize")]
4614 summarize: bool,
4615}
4616
4617fn default_summarize() -> bool {
4618 true
4619}
4620
4621async fn handle_meeting_stop(
4622 req: &JsonRpcMessage,
4623 state: &Arc<ServerState>,
4624 _session: &Arc<crate::session::ClientSession>,
4625) -> Result<Value, String> {
4626 let params: MeetingStopParams =
4627 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4628 let inference = if params.summarize {
4629 Some(state.inference.get().cloned()).flatten()
4630 } else {
4631 None
4632 };
4633 let json = crate::meeting::stop_meeting(
4634 ¶ms.meeting_id,
4635 params.summarize,
4636 state.meetings.clone(),
4637 state.voice_sessions.clone(),
4638 inference,
4639 )
4640 .await?;
4641 serde_json::from_str(&json).map_err(|e| e.to_string())
4642}
4643
4644#[derive(Deserialize, Default)]
4645struct MeetingListParams {
4646 #[serde(default)]
4647 root: Option<std::path::PathBuf>,
4648}
4649
4650fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4651 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4652 let cwd = std::env::current_dir().ok();
4653 let json = crate::meeting::list_meetings(params.root, cwd)?;
4654 serde_json::from_str(&json).map_err(|e| e.to_string())
4655}
4656
4657#[derive(Deserialize)]
4658struct MeetingGetParams {
4659 meeting_id: String,
4660 #[serde(default)]
4661 root: Option<std::path::PathBuf>,
4662}
4663
4664fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4665 let params: MeetingGetParams =
4666 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4667 let cwd = std::env::current_dir().ok();
4668 let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
4669 serde_json::from_str(&json).map_err(|e| e.to_string())
4670}
4671
4672#[derive(Deserialize, Default)]
4677struct RegistryRegisterParams {
4678 entry: Value,
4682 #[serde(default)]
4683 registry_path: Option<std::path::PathBuf>,
4684}
4685
4686fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4687 let params: RegistryRegisterParams =
4688 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4689 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
4690 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4691 Ok(Value::Null)
4692}
4693
4694#[derive(Deserialize, Default)]
4695struct RegistryNameParams {
4696 name: String,
4697 #[serde(default)]
4698 registry_path: Option<std::path::PathBuf>,
4699}
4700
4701fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4702 let params: RegistryNameParams =
4703 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4704 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
4705 serde_json::from_str(&json).map_err(|e| e.to_string())
4706}
4707
4708fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4709 let params: RegistryNameParams =
4710 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4711 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
4712 Ok(Value::Null)
4713}
4714
4715#[derive(Deserialize, Default)]
4716struct RegistryListParams {
4717 #[serde(default)]
4718 registry_path: Option<std::path::PathBuf>,
4719}
4720
4721fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4722 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4723 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4724 serde_json::from_str(&json).map_err(|e| e.to_string())
4725}
4726
4727#[derive(Deserialize, Default)]
4728struct RegistryReapParams {
4729 #[serde(default = "default_reap_age")]
4732 max_age_secs: u64,
4733 #[serde(default)]
4734 registry_path: Option<std::path::PathBuf>,
4735}
4736
4737fn default_reap_age() -> u64 {
4738 60
4739}
4740
4741fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4742 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4743 let json =
4744 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4745 serde_json::from_str(&json).map_err(|e| e.to_string())
4746}
4747
4748async fn handle_a2a_start(
4755 req: &JsonRpcMessage,
4756 session: &crate::session::ClientSession,
4757) -> Result<Value, String> {
4758 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4759 let json = crate::a2a::start_a2a(¶ms_json, Some(session.runtime.clone())).await?;
4765 serde_json::from_str(&json).map_err(|e| e.to_string())
4766}
4767
4768fn handle_a2a_stop() -> Result<Value, String> {
4769 let json = crate::a2a::stop_a2a()?;
4770 serde_json::from_str(&json).map_err(|e| e.to_string())
4771}
4772
4773fn handle_a2a_status() -> Result<Value, String> {
4774 let json = crate::a2a::a2a_status()?;
4775 serde_json::from_str(&json).map_err(|e| e.to_string())
4776}
4777
4778#[derive(Deserialize)]
4779#[serde(rename_all = "camelCase")]
4780struct A2aSendParams {
4781 endpoint: String,
4782 message: car_a2a::Message,
4783 #[serde(default)]
4784 blocking: bool,
4785 #[serde(default = "default_true")]
4786 ingest_a2ui: bool,
4787 #[serde(default)]
4788 route_auth: Option<A2aRouteAuth>,
4789 #[serde(default)]
4790 allow_untrusted_endpoint: bool,
4791}
4792
4793fn default_true() -> bool {
4794 true
4795}
4796
4797async fn handle_a2a_dispatch(
4807 method: &str,
4808 req: &JsonRpcMessage,
4809 state: &Arc<ServerState>,
4810) -> Result<Value, String> {
4811 let dispatcher = state.a2a_dispatcher().await;
4812 dispatcher
4813 .dispatch(method, req.params.clone())
4814 .await
4815 .map_err(|e| e.to_string())
4816}
4817
4818async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4819 let params: A2aSendParams =
4820 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4821 let endpoint = trusted_route_endpoint(
4822 Some(params.endpoint.clone()),
4823 params.allow_untrusted_endpoint,
4824 )
4825 .ok_or_else(|| {
4826 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
4827 })?;
4828 let client = match params.route_auth.clone() {
4829 Some(auth) => {
4830 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
4831 }
4832 None => car_a2a::A2aClient::new(endpoint.clone()),
4833 };
4834 let result = client
4835 .send_message(params.message, params.blocking)
4836 .await
4837 .map_err(|e| e.to_string())?;
4838 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4839 let mut applied = Vec::new();
4840 if params.ingest_a2ui {
4841 state
4842 .a2ui
4843 .validate_payload(&result_value)
4844 .map_err(|e| e.to_string())?;
4845 let routed_endpoint = Some(endpoint.clone());
4846 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
4847 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
4848 if owner.endpoint.is_none() {
4849 owner.with_endpoint(routed_endpoint.clone())
4850 } else {
4851 owner
4852 }
4853 });
4854 applied.push(
4855 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
4856 );
4857 }
4858 }
4859 Ok(serde_json::json!({
4860 "result": result,
4861 "a2ui": {
4862 "applied": applied,
4863 }
4864 }))
4865}
4866
4867async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
4875 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4876 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
4877 serde_json::from_str(&json).map_err(|e| e.to_string())
4878}
4879
4880async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
4881 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4882 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
4883 serde_json::from_str(&json).map_err(|e| e.to_string())
4884}
4885
4886async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
4887 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4888 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
4889 serde_json::from_str(&json).map_err(|e| e.to_string())
4890}
4891
4892async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
4893 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4894 let json = car_ffi_common::notifications::local(&args_json).await?;
4895 serde_json::from_str(&json).map_err(|e| e.to_string())
4896}
4897
4898async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
4899 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4900 let json = car_ffi_common::vision::ocr(&args_json).await?;
4901 serde_json::from_str(&json).map_err(|e| e.to_string())
4902}
4903
4904async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
4909 let agents = match state.observer_manifest_path() {
4918 Some(p) => car_registry::supervisor::Supervisor::list_from_manifest(p)
4919 .map_err(|e| e.to_string())?,
4920 None => {
4921 let supervisor = state.supervisor()?;
4922 supervisor.list().await
4923 }
4924 };
4925 let attached = state.attached_agents.lock().await.clone();
4932 let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
4933 for a in agents {
4934 let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
4935 let session_id = attached.get(&a.spec.id).cloned();
4936 if let Some(map) = v.as_object_mut() {
4937 map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
4938 if let Some(sid) = session_id {
4939 map.insert("session_id".to_string(), Value::String(sid));
4940 }
4941 }
4942 decorated.push(v);
4943 }
4944 Ok(Value::Array(decorated))
4945}
4946
4947async fn handle_agents_upsert(
4948 req: &JsonRpcMessage,
4949 state: &Arc<ServerState>,
4950) -> Result<Value, String> {
4951 let mut params = req.params.clone();
4952 if let Some(name) = params
4961 .get("interpreter")
4962 .and_then(|v| v.as_str())
4963 .map(str::to_string)
4964 {
4965 let resolved =
4966 car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
4967 params["command"] = Value::String(resolved.to_string_lossy().into_owned());
4968 }
4969 let spec: car_registry::supervisor::AgentSpec =
4970 serde_json::from_value(params).map_err(|e| e.to_string())?;
4971 let supervisor = state.supervisor()?;
4972 let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
4973 serde_json::to_value(agent).map_err(|e| e.to_string())
4974}
4975
4976async fn handle_agents_install(
4990 req: &JsonRpcMessage,
4991 state: &Arc<ServerState>,
4992) -> Result<Value, String> {
4993 let manifest: car_registry::manifest::AgentManifest =
4994 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4995 let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
4996 let supervisor = state.supervisor()?;
4997 let (report, managed) = supervisor
4998 .install_manifest(manifest, &host)
4999 .await
5000 .map_err(|e| e.to_string())?;
5001 Ok(serde_json::json!({
5002 "report": {
5003 "missingOptional": report
5004 .missing_optional
5005 .iter()
5006 .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
5007 .collect::<Vec<_>>(),
5008 },
5009 "agent": managed,
5010 }))
5011}
5012
5013async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
5014 let entries = match state.observer_manifest_path() {
5020 Some(p) => car_registry::supervisor::Supervisor::health_from_manifest(p)
5021 .map_err(|e| e.to_string())?,
5022 None => {
5023 let supervisor = state.supervisor()?;
5024 supervisor.health().await
5025 }
5026 };
5027 serde_json::to_value(entries).map_err(|e| e.to_string())
5028}
5029
5030fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
5031 req.params
5032 .get("id")
5033 .and_then(Value::as_str)
5034 .map(str::to_string)
5035 .ok_or_else(|| "missing required `id` parameter".to_string())
5036}
5037
5038async fn handle_agents_remove(
5039 req: &JsonRpcMessage,
5040 state: &Arc<ServerState>,
5041) -> Result<Value, String> {
5042 let id = extract_agent_id(req)?;
5043 let supervisor = state.supervisor()?;
5044 let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
5045 Ok(serde_json::json!({ "removed": removed }))
5046}
5047
5048async fn handle_agents_start(
5049 req: &JsonRpcMessage,
5050 state: &Arc<ServerState>,
5051) -> Result<Value, String> {
5052 let id = extract_agent_id(req)?;
5053 let supervisor = state.supervisor()?;
5054 let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
5055 serde_json::to_value(agent).map_err(|e| e.to_string())
5056}
5057
5058async fn handle_agents_stop(
5059 req: &JsonRpcMessage,
5060 state: &Arc<ServerState>,
5061) -> Result<Value, String> {
5062 let id = extract_agent_id(req)?;
5063 let signal: car_registry::supervisor::StopSignal = req
5064 .params
5065 .get("signal")
5066 .map(|v| serde_json::from_value(v.clone()))
5067 .transpose()
5068 .map_err(|e| e.to_string())?
5069 .unwrap_or_default();
5070 let supervisor = state.supervisor()?;
5071 let agent = supervisor
5072 .stop(&id, signal)
5073 .await
5074 .map_err(|e| e.to_string())?;
5075 serde_json::to_value(agent).map_err(|e| e.to_string())
5076}
5077
5078async fn handle_agents_restart(
5079 req: &JsonRpcMessage,
5080 state: &Arc<ServerState>,
5081) -> Result<Value, String> {
5082 let id = extract_agent_id(req)?;
5083 let supervisor = state.supervisor()?;
5084 let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
5085 serde_json::to_value(agent).map_err(|e| e.to_string())
5086}
5087
5088async fn handle_agents_tail_log(
5089 req: &JsonRpcMessage,
5090 state: &Arc<ServerState>,
5091) -> Result<Value, String> {
5092 let id = extract_agent_id(req)?;
5093 let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
5094 let supervisor = state.supervisor()?;
5095 let lines = supervisor
5096 .tail_log(&id, n)
5097 .await
5098 .map_err(|e| e.to_string())?;
5099 Ok(serde_json::json!({ "lines": lines }))
5100}
5101
5102async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5113 let include_health = req
5114 .params
5115 .get("include_health")
5116 .and_then(Value::as_bool)
5117 .unwrap_or(false);
5118 let json = car_ffi_common::external_agents::list(include_health).await?;
5119 serde_json::from_str(&json).map_err(|e| e.to_string())
5120}
5121
5122async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5123 let include_health = req
5124 .params
5125 .get("include_health")
5126 .and_then(Value::as_bool)
5127 .unwrap_or(false);
5128 let json = car_ffi_common::external_agents::detect(include_health).await?;
5129 serde_json::from_str(&json).map_err(|e| e.to_string())
5130}
5131
5132async fn handle_agents_invoke_external(
5150 req: &JsonRpcMessage,
5151 state: &Arc<ServerState>,
5152 host_session: &Arc<crate::session::ClientSession>,
5153) -> Result<Value, String> {
5154 let id = req
5155 .params
5156 .get("id")
5157 .and_then(Value::as_str)
5158 .ok_or_else(|| "missing required `id` parameter".to_string())?
5159 .to_string();
5160 let task = req
5161 .params
5162 .get("task")
5163 .and_then(Value::as_str)
5164 .ok_or_else(|| "missing required `task` parameter".to_string())?
5165 .to_string();
5166 let stream = req
5167 .params
5168 .get("stream")
5169 .and_then(Value::as_bool)
5170 .unwrap_or(false);
5171 let session_id = req
5172 .params
5173 .get("session_id")
5174 .and_then(Value::as_str)
5175 .map(str::to_string)
5176 .unwrap_or_else(|| format!("ext-{}", uuid::Uuid::new_v4().simple()));
5177
5178 let mut options_value = req.params.clone();
5184 if let Some(obj) = options_value.as_object_mut() {
5185 obj.remove("id");
5186 obj.remove("task");
5187 obj.remove("stream");
5188 obj.remove("session_id");
5189 let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5198 if !has_explicit_mcp {
5199 if let Some(url) = state.mcp_url.get() {
5200 obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5201 }
5202 }
5203 }
5204
5205 if !stream {
5206 let options_json = options_value.to_string();
5209 let json = car_ffi_common::external_agents::invoke(&id, &task, &options_json).await?;
5210 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5211 append_external_agent_audit(&id, &task, &options_value, &result);
5212 return Ok(result);
5213 }
5214
5215 let opts: car_external_agents::InvokeOptions = serde_json::from_value(options_value.clone())
5221 .map_err(|e| format!("invalid options: {e}"))?;
5222
5223 {
5233 let mut chats = state.chat_sessions.lock().await;
5243 chats.entry(session_id.clone()).or_insert_with(|| {
5244 let created_at = std::time::SystemTime::now()
5245 .duration_since(std::time::UNIX_EPOCH)
5246 .map(|d| d.as_secs())
5247 .unwrap_or(0);
5248 crate::session::ChatSession {
5249 agent_id: id.clone(),
5250 host_client_id: host_session.client_id.clone(),
5251 created_at,
5252 }
5253 });
5254 }
5255
5256 use tokio::sync::mpsc;
5263 let (tx, mut rx) = mpsc::unbounded_channel::<car_external_agents::StreamEvent>();
5264
5265 let drain_state = state.clone();
5266 let drain_session_id = session_id.clone();
5267 let drain_agent_id = id.clone();
5268 tokio::spawn(async move {
5269 while let Some(event) = rx.recv().await {
5270 emit_external_chat_event(&drain_state, &drain_session_id, &drain_agent_id, event).await;
5271 }
5272 });
5273
5274 let emitter_tx = tx.clone();
5275 let emitter: car_external_agents::StreamEventEmitter = Arc::new(move |event| {
5276 let _ = emitter_tx.send(event);
5281 });
5282
5283 let spawn_state = state.clone();
5289 let spawn_session_id = session_id.clone();
5290 let spawn_id = id.clone();
5291 let spawn_task = task.clone();
5292 let spawn_options = options_value.clone();
5293 tokio::spawn(async move {
5294 let outcome =
5295 car_external_agents::invoke_with_emitter(&spawn_id, &spawn_task, opts, Some(emitter))
5296 .await;
5297 drop(tx); let terminal_params: Value;
5304 let result_value: Value;
5305 match outcome {
5306 Ok(res) => {
5307 let mut parts: Vec<String> = Vec::new();
5314 if res.turns > 0 {
5315 parts.push(format!(
5316 "{} turn{}",
5317 res.turns,
5318 if res.turns == 1 { "" } else { "s" }
5319 ));
5320 }
5321 if res.tool_calls > 0 {
5322 parts.push(format!(
5323 "{} tool{}",
5324 res.tool_calls,
5325 if res.tool_calls == 1 { "" } else { "s" }
5326 ));
5327 }
5328 if res.duration_ms > 0 {
5329 parts.push(format!("{:.1}s", res.duration_ms as f64 / 1000.0));
5330 }
5331 let summary = if parts.is_empty() {
5332 "stop".to_string()
5333 } else {
5334 parts.join(" · ")
5335 };
5336 if res.is_error {
5337 terminal_params = serde_json::json!({
5338 "session_id": spawn_session_id,
5339 "agent_id": spawn_id,
5340 "kind": "error",
5341 "error": res.error.clone().unwrap_or_else(|| "external agent reported error".to_string()),
5342 });
5343 } else {
5344 terminal_params = serde_json::json!({
5345 "session_id": spawn_session_id,
5346 "agent_id": spawn_id,
5347 "kind": "done",
5348 "finish_reason": summary,
5349 });
5350 }
5351 result_value = serde_json::to_value(&res).unwrap_or(Value::Null);
5352 }
5353 Err(e) => {
5354 let message = format!("{e}");
5355 terminal_params = serde_json::json!({
5356 "session_id": spawn_session_id,
5357 "agent_id": spawn_id,
5358 "kind": "error",
5359 "error": message.clone(),
5360 });
5361 result_value = serde_json::json!({ "is_error": true, "error": message });
5362 }
5363 }
5364 send_external_chat_frame(&spawn_state, &spawn_session_id, terminal_params).await;
5365 spawn_state
5366 .chat_sessions
5367 .lock()
5368 .await
5369 .remove(&spawn_session_id);
5370 append_external_agent_audit(&spawn_id, &spawn_task, &spawn_options, &result_value);
5371 });
5372
5373 Ok(serde_json::json!({
5374 "accepted": true,
5375 "session_id": session_id,
5376 }))
5377}
5378
5379async fn emit_external_chat_event(
5396 state: &Arc<ServerState>,
5397 session_id: &str,
5398 agent_id: &str,
5399 event: car_external_agents::StreamEvent,
5400) {
5401 use car_external_agents::StreamEvent;
5402 match event {
5403 StreamEvent::Assistant(a) => {
5404 if let Some(content) = a.message.get("content").and_then(|v| v.as_array()) {
5405 for block in content {
5406 let block_type = block.get("type").and_then(|v| v.as_str()).unwrap_or("");
5407 match block_type {
5408 "text" => {
5409 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
5410 if !text.is_empty() {
5411 let params = serde_json::json!({
5412 "session_id": session_id,
5413 "agent_id": agent_id,
5414 "kind": "token",
5415 "delta": text,
5416 });
5417 send_external_chat_frame(state, session_id, params).await;
5418 }
5419 }
5420 }
5421 "tool_use" => {
5422 let name = block
5423 .get("name")
5424 .and_then(|v| v.as_str())
5425 .unwrap_or("(unknown tool)");
5426 let params = serde_json::json!({
5427 "session_id": session_id,
5428 "agent_id": agent_id,
5429 "kind": "tool_call",
5430 "detail": name,
5431 });
5432 send_external_chat_frame(state, session_id, params).await;
5433 }
5434 _ => {}
5435 }
5436 }
5437 }
5438 }
5439 _ => {
5440 }
5445 }
5446}
5447
5448async fn send_external_chat_frame(state: &Arc<ServerState>, session_id: &str, params: Value) {
5453 use futures::SinkExt;
5454 use tokio_tungstenite::tungstenite::Message;
5455
5456 let host_client_id = state
5457 .chat_sessions
5458 .lock()
5459 .await
5460 .get(session_id)
5461 .map(|s| s.host_client_id.clone());
5462 let Some(host_client_id) = host_client_id else {
5463 return;
5464 };
5465 let host_channel = {
5466 let sessions = state.sessions.lock().await;
5467 sessions.get(&host_client_id).map(|s| s.channel.clone())
5468 };
5469 let Some(channel) = host_channel else {
5470 return;
5471 };
5472 let frame = serde_json::json!({
5473 "jsonrpc": "2.0",
5474 "method": "agents.chat.event",
5475 "params": params,
5476 });
5477 if let Ok(text) = serde_json::to_string(&frame) {
5478 let _ = channel
5479 .write
5480 .lock()
5481 .await
5482 .send(Message::Text(text.into()))
5483 .await;
5484 }
5485}
5486
5487fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5493 use std::io::Write;
5494 let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5495 Some(home) => home.join(".car"),
5496 None => return,
5497 };
5498 if std::fs::create_dir_all(&car_dir).is_err() {
5499 return;
5500 }
5501 let path = car_dir.join("external-agents.jsonl");
5502 let record = serde_json::json!({
5503 "ts": chrono::Utc::now().to_rfc3339(),
5504 "adapter_id": id,
5505 "task": task,
5506 "options": options,
5507 "result": result,
5508 });
5509 let line = match serde_json::to_string(&record) {
5510 Ok(s) => s,
5511 Err(_) => return,
5512 };
5513 if let Ok(mut f) = std::fs::OpenOptions::new()
5514 .create(true)
5515 .append(true)
5516 .open(&path)
5517 {
5518 let _ = writeln!(f, "{}", line);
5519 } else {
5520 tracing::warn!(
5521 path = %path.display(),
5522 "failed to append external-agent audit record"
5523 );
5524 }
5525}
5526
5527async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5533 let force = req
5534 .params
5535 .get("force")
5536 .and_then(Value::as_bool)
5537 .unwrap_or(false);
5538 if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5539 let json = car_ffi_common::external_agents::health_one(id, force).await?;
5540 serde_json::from_str(&json).map_err(|e| e.to_string())
5541 } else {
5542 let json = car_ffi_common::external_agents::health(force).await?;
5543 serde_json::from_str(&json).map_err(|e| e.to_string())
5544 }
5545}
5546
5547const AGENT_CHAT_ACK_TIMEOUT_SECS: u64 = 5;
5565
5566async fn handle_agents_chat(
5571 req: &JsonRpcMessage,
5572 state: &Arc<ServerState>,
5573 host_session: &Arc<crate::session::ClientSession>,
5574) -> Result<Value, String> {
5575 use futures::SinkExt;
5576 use tokio::sync::oneshot;
5577 use tokio_tungstenite::tungstenite::Message;
5578
5579 let agent_id = req
5580 .params
5581 .get("agent_id")
5582 .and_then(Value::as_str)
5583 .ok_or_else(|| "`agents.chat` requires `agent_id`".to_string())?
5584 .to_string();
5585 let prompt = req
5586 .params
5587 .get("prompt")
5588 .and_then(Value::as_str)
5589 .ok_or_else(|| "`agents.chat` requires `prompt`".to_string())?
5590 .to_string();
5591 let session_id = req
5592 .params
5593 .get("session_id")
5594 .and_then(Value::as_str)
5595 .map(str::to_string)
5596 .unwrap_or_else(|| format!("chat-{}", uuid::Uuid::new_v4().simple()));
5597 let stream = req
5598 .params
5599 .get("stream")
5600 .and_then(Value::as_bool)
5601 .unwrap_or(true);
5602 let voice_input = req
5603 .params
5604 .get("voice_input")
5605 .and_then(Value::as_bool)
5606 .unwrap_or(false);
5607
5608 let agent_client_id = state
5614 .attached_agents
5615 .lock()
5616 .await
5617 .get(&agent_id)
5618 .cloned()
5619 .ok_or_else(|| {
5620 format!(
5621 "agent `{}` is not attached to this daemon — supervisor may have it stopped, or it hasn't called session.auth yet",
5622 agent_id
5623 )
5624 })?;
5625 let agent_channel = {
5626 let sessions = state.sessions.lock().await;
5627 sessions
5628 .get(&agent_client_id)
5629 .map(|s| s.channel.clone())
5630 .ok_or_else(|| {
5631 format!(
5632 "agent `{}` client_id `{}` not found in session registry (raced with disconnect)",
5633 agent_id, agent_client_id
5634 )
5635 })?
5636 };
5637
5638 {
5644 let created_at = std::time::SystemTime::now()
5645 .duration_since(std::time::UNIX_EPOCH)
5646 .map(|d| d.as_secs())
5647 .unwrap_or(0);
5648 state.chat_sessions.lock().await.insert(
5649 session_id.clone(),
5650 crate::session::ChatSession {
5651 agent_id: agent_id.clone(),
5652 host_client_id: host_session.client_id.clone(),
5653 created_at,
5654 },
5655 );
5656 }
5657
5658 let request_id = agent_channel.next_request_id();
5665 let (tx, rx) = oneshot::channel();
5666 agent_channel
5667 .pending
5668 .lock()
5669 .await
5670 .insert(request_id.clone(), tx);
5671
5672 let rpc_request = serde_json::json!({
5673 "jsonrpc": "2.0",
5674 "method": "agent.chat",
5675 "params": {
5676 "session_id": session_id,
5677 "prompt": prompt,
5678 "stream": stream,
5679 "context": {
5680 "host_client_id": host_session.client_id,
5681 "voice_input": voice_input,
5682 },
5683 },
5684 "id": request_id,
5685 });
5686 let msg = Message::Text(
5687 serde_json::to_string(&rpc_request)
5688 .map_err(|e| e.to_string())?
5689 .into(),
5690 );
5691 if let Err(e) = agent_channel.write.lock().await.send(msg).await {
5692 agent_channel.pending.lock().await.remove(&request_id);
5696 state.chat_sessions.lock().await.remove(&session_id);
5697 return Err(format!(
5698 "failed to deliver agent.chat to `{}`: {}",
5699 agent_id, e
5700 ));
5701 }
5702
5703 let ack = match tokio::time::timeout(
5708 std::time::Duration::from_secs(AGENT_CHAT_ACK_TIMEOUT_SECS),
5709 rx,
5710 )
5711 .await
5712 {
5713 Ok(Ok(resp)) => resp,
5714 Ok(Err(_)) => {
5715 state.chat_sessions.lock().await.remove(&session_id);
5717 return Err(format!(
5718 "agent `{}` disconnected before acking agents.chat",
5719 agent_id
5720 ));
5721 }
5722 Err(_) => {
5723 agent_channel.pending.lock().await.remove(&request_id);
5727 state.chat_sessions.lock().await.remove(&session_id);
5728 return Err(format!(
5729 "agent `{}` did not ack agents.chat within {}s",
5730 agent_id, AGENT_CHAT_ACK_TIMEOUT_SECS
5731 ));
5732 }
5733 };
5734
5735 if let Some(err) = ack.error {
5736 state.chat_sessions.lock().await.remove(&session_id);
5738 return Err(format!("agent `{}` rejected chat: {}", agent_id, err));
5739 }
5740
5741 Ok(serde_json::json!({
5742 "accepted": true,
5743 "session_id": session_id,
5744 }))
5745}
5746
5747async fn handle_agents_chat_cancel(
5755 req: &JsonRpcMessage,
5756 state: &Arc<ServerState>,
5757) -> Result<Value, String> {
5758 use futures::SinkExt;
5759 use tokio_tungstenite::tungstenite::Message;
5760
5761 let session_id = req
5762 .params
5763 .get("session_id")
5764 .and_then(Value::as_str)
5765 .ok_or_else(|| "`agents.chat.cancel` requires `session_id`".to_string())?
5766 .to_string();
5767
5768 let chat = state.chat_sessions.lock().await.remove(&session_id);
5769 let chat = match chat {
5770 Some(c) => c,
5771 None => {
5772 return Ok(serde_json::json!({ "cancelled": false, "reason": "unknown session_id" }));
5774 }
5775 };
5776
5777 let agent_client_id = state
5780 .attached_agents
5781 .lock()
5782 .await
5783 .get(&chat.agent_id)
5784 .cloned();
5785 if let Some(client_id) = agent_client_id {
5786 let channel_opt = {
5787 let sessions = state.sessions.lock().await;
5788 sessions.get(&client_id).map(|s| s.channel.clone())
5789 };
5790 if let Some(channel) = channel_opt {
5791 let notification = serde_json::json!({
5792 "jsonrpc": "2.0",
5793 "method": "agent.chat.cancel",
5794 "params": { "session_id": session_id },
5795 });
5796 if let Ok(text) = serde_json::to_string(¬ification) {
5797 let _ = channel
5798 .write
5799 .lock()
5800 .await
5801 .send(Message::Text(text.into()))
5802 .await;
5803 }
5804 }
5805 }
5806
5807 Ok(serde_json::json!({ "cancelled": true, "session_id": session_id }))
5808}
5809
5810pub(crate) async fn try_forward_agent_chat_event(
5821 parsed: &JsonRpcMessage,
5822 state: &Arc<ServerState>,
5823) -> bool {
5824 use futures::SinkExt;
5825 use tokio_tungstenite::tungstenite::Message;
5826
5827 let Some(method) = parsed.method.as_deref() else {
5831 return false;
5832 };
5833 if method != "agent.chat.event" {
5834 return false;
5835 }
5836 if !parsed.id.is_null() {
5837 return false;
5840 }
5841 let Some(session_id) = parsed.params.get("session_id").and_then(Value::as_str) else {
5842 return false;
5843 };
5844 let session_id = session_id.to_string();
5845
5846 let chat = state.chat_sessions.lock().await.get(&session_id).cloned();
5851 let Some(chat) = chat else {
5852 return true; };
5854
5855 let kind = parsed
5858 .params
5859 .get("kind")
5860 .and_then(Value::as_str)
5861 .unwrap_or("token")
5862 .to_string();
5863
5864 let host_channel = {
5868 let sessions = state.sessions.lock().await;
5869 sessions
5870 .get(&chat.host_client_id)
5871 .map(|s| s.channel.clone())
5872 };
5873 if let Some(channel) = host_channel {
5874 let mut params = parsed.params.clone();
5875 if let Some(obj) = params.as_object_mut() {
5876 obj.insert("agent_id".to_string(), Value::String(chat.agent_id.clone()));
5877 }
5878 let forward = serde_json::json!({
5879 "jsonrpc": "2.0",
5880 "method": "agents.chat.event",
5881 "params": params,
5882 });
5883 if let Ok(text) = serde_json::to_string(&forward) {
5884 let _ = channel
5885 .write
5886 .lock()
5887 .await
5888 .send(Message::Text(text.into()))
5889 .await;
5890 }
5891 }
5892 if matches!(kind.as_str(), "done" | "error") {
5899 state.chat_sessions.lock().await.remove(&session_id);
5900 }
5901
5902 true
5903}