1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30 #[serde(default)]
31 pub jsonrpc: String,
32 #[serde(default)]
33 pub method: Option<String>,
34 #[serde(default)]
35 pub params: Value,
36 #[serde(default)]
37 pub id: Value,
38 #[serde(default)]
40 pub result: Option<Value>,
41 #[serde(default)]
42 pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47 pub jsonrpc: &'static str,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub result: Option<Value>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<JsonRpcError>,
52 pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57 pub code: i32,
58 pub message: String,
59}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
88 name = "ws.connection",
89 skip_all,
90 fields(peer = %peer),
91)]
92pub async fn handle_connection(
93 stream: TcpStream,
94 peer: SocketAddr,
95 state: Arc<ServerState>,
96) -> Result<(), Box<dyn std::error::Error>> {
97 let ws_stream = accept_async(stream).await?;
98 let (write, read) = ws_stream.split();
99 run_dispatch(read, Box::pin(write), peer.to_string(), state).await
100}
101
102#[cfg(unix)]
113#[instrument(
114 name = "ws.connection",
115 skip_all,
116 fields(peer = %peer),
117)]
118pub async fn handle_connection_unix(
119 stream: tokio::net::UnixStream,
120 peer: String,
121 state: Arc<ServerState>,
122) -> Result<(), Box<dyn std::error::Error>> {
123 let ws_stream = tokio_tungstenite::accept_async(stream).await?;
124 let (write, read) = ws_stream.split();
125 run_dispatch(read, Box::pin(write), peer, state).await
126}
127
128#[instrument(
138 name = "ws.dispatch",
139 skip_all,
140 fields(client_id = tracing::field::Empty, peer = %peer),
141)]
142pub async fn run_dispatch<R>(
143 mut read: R,
144 write: crate::session::WsSink,
145 peer: String,
146 state: Arc<ServerState>,
147) -> Result<(), Box<dyn std::error::Error>>
148where
149 R: futures::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
150 + Unpin
151 + Send,
152{
153 let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
154 tracing::Span::current().record("client_id", &client_id.as_str());
155
156 info!("New connection from {}", peer);
157
158 let channel = Arc::new(WsChannel {
159 write: Mutex::new(write),
160 pending: Mutex::new(HashMap::new()),
161 next_id: AtomicU64::new(1),
162 });
163
164 let session = state.create_session(&client_id, channel.clone()).await;
165
166 while let Some(msg) = read.next().await {
167 let msg = msg?;
168 if msg.is_text() {
169 let text = msg.to_text()?;
170 let parsed: JsonRpcMessage = match serde_json::from_str(text) {
171 Ok(m) => m,
172 Err(e) => {
173 send_response(
174 &session.channel,
175 JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
176 )
177 .await?;
178 continue;
179 }
180 };
181
182 if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
184 if let Some(id_str) = parsed.id.as_str() {
185 let mut pending = session.channel.pending.lock().await;
186 if let Some(tx) = pending.remove(id_str) {
187 let tool_resp = if let Some(result) = parsed.result {
188 ToolExecuteResponse {
189 action_id: id_str.to_string(),
190 output: Some(result),
191 error: None,
192 }
193 } else {
194 let err_msg = parsed
195 .error
196 .as_ref()
197 .and_then(|e| e.get("message"))
198 .and_then(|m| m.as_str())
199 .unwrap_or("unknown error")
200 .to_string();
201 ToolExecuteResponse {
202 action_id: id_str.to_string(),
203 output: None,
204 error: Some(err_msg),
205 }
206 };
207 let _ = tx.send(tool_resp);
208 continue;
209 }
210 }
211 }
212
213 if let Some(method) = &parsed.method {
215 info!(method = %method, "dispatching JSON-RPC method");
216
217 if state.auth_token.get().is_some()
225 && !session
226 .authenticated
227 .load(std::sync::atomic::Ordering::Acquire)
228 && method != "session.auth"
229 {
230 let resp = JsonRpcResponse::error(
231 parsed.id.clone(),
232 -32001,
233 "auth required: send `session.auth` with the per-launch token \
234 from ~/Library/Application Support/ai.parslee.car/auth-token \
235 (macOS) or $XDG_RUNTIME_DIR/ai.parslee.car/auth-token (Linux) \
236 as the first frame on this connection",
237 );
238 send_response(&session.channel, resp).await?;
239 info!(client = %client_id, method = %method,
240 "rejecting non-auth method on unauthenticated session; closing");
241 break;
242 }
243
244 if state.approval_gate.requires_approval(method.as_str()) {
256 match gate_high_risk_method(method.as_str(), &parsed.params, &state).await {
257 Ok(()) => {}
258 Err(reason) => {
259 let resp = JsonRpcResponse::error(parsed.id.clone(), -32003, &reason);
260 send_response(&session.channel, resp).await?;
261 info!(
262 client = %client_id,
263 method = %method,
264 reason = %reason,
265 "approval gate blocked dispatch"
266 );
267 continue;
268 }
269 }
270 }
271
272 let session_task = session.clone();
287 let state_task = state.clone();
288 let method_owned = method.clone();
289 let parsed_task = parsed;
290 tokio::spawn(async move {
291 let session = session_task;
292 let state = state_task;
293 let parsed = parsed_task;
294 let result = match method_owned.as_str() {
295 "session.auth" => handle_session_auth(&parsed, &session, &state).await,
296 "session.init" => handle_session_init(&parsed, &session).await,
297 "host.subscribe" => handle_host_subscribe(&session).await,
298 "host.agents" => handle_host_agents(&session).await,
299 "host.events" => handle_host_events(&parsed, &session).await,
300 "host.approvals" => handle_host_approvals(&session).await,
301 "host.register_agent" => {
302 handle_host_register_agent(&parsed, &session).await
303 }
304 "host.unregister_agent" => {
305 handle_host_unregister_agent(&parsed, &session).await
306 }
307 "host.set_status" => handle_host_set_status(&parsed, &session).await,
308 "host.notify" => handle_host_notify(&parsed, &session).await,
309 "host.request_approval" => {
310 handle_host_request_approval(&parsed, &session).await
311 }
312 "host.resolve_approval" => {
313 handle_host_resolve_approval(&parsed, &session).await
314 }
315 "tools.register" => handle_tools_register(&parsed, &session).await,
316 "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
317 "policy.register" => handle_policy_register(&parsed, &session).await,
318 "session.policy.open" => handle_session_policy_open(&session).await,
319 "session.policy.close" => {
320 handle_session_policy_close(&parsed, &session).await
321 }
322 "verify" => handle_verify(&parsed, &session).await,
323 "state.get" => handle_state_get(&parsed, &session).await,
324 "state.set" => handle_state_set(&parsed, &session).await,
325 "state.exists" => handle_state_exists(&parsed, &session).await,
326 "state.keys" => handle_state_keys(&parsed, &session).await,
327 "state.snapshot" => handle_state_snapshot(&parsed, &session).await,
328 "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
329 "memory.query" => handle_memory_query(&parsed, &session).await,
330 "memory.build_context" => {
331 handle_memory_build_context(&parsed, &session).await
332 }
333 "memory.build_context_fast" => {
334 handle_memory_build_context_fast(&parsed, &session).await
335 }
336 "memory.consolidate" => handle_memory_consolidate(&session).await,
337 "memory.fact_count" => handle_memory_fact_count(&session).await,
338 "memory.persist" => handle_memory_persist(&parsed, &session).await,
339 "memory.load" => handle_memory_load(&parsed, &session).await,
340 "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
341 "skill.find" => handle_skill_find(&parsed, &session).await,
342 "skill.report" => handle_skill_report(&parsed, &session).await,
343 "skill.repair" => handle_skill_repair(&parsed, &session).await,
344 "skills.ingest_distilled" => {
345 handle_skills_ingest_distilled(&parsed, &session).await
346 }
347 "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
348 "skills.domains_needing_evolution" => {
349 handle_skills_domains_needing_evolution(&parsed, &session).await
350 }
351 "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
352 "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
353 "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
354 "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
355 "multi.vote" => handle_multi_vote(&parsed, &session).await,
356 "scheduler.create" => handle_scheduler_create(&parsed),
357 "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
358 "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
359 "infer" => handle_infer(&parsed, &state, &session).await,
360 "image.generate" => handle_image_generate(&parsed, &state).await,
361 "video.generate" => handle_video_generate(&parsed, &state).await,
362 "embed" => handle_embed(&parsed, &state).await,
363 "classify" => handle_classify(&parsed, &state).await,
364 "tokenize" => handle_tokenize(&parsed, &state).await,
365 "detokenize" => handle_detokenize(&parsed, &state).await,
366 "rerank" => handle_rerank(&parsed, &state).await,
367 "transcribe" => handle_transcribe(&parsed, &state).await,
368 "synthesize" => handle_synthesize(&parsed, &state).await,
369 "infer_stream" => handle_infer_stream(&parsed, &session, &state).await,
370 "speech.prepare" => handle_speech_prepare(&state).await,
371 "models.route" => handle_models_route(&parsed, &state).await,
372 "models.stats" => handle_models_stats(&state).await,
373 "outcomes.resolve_pending" => {
374 handle_outcomes_resolve_pending(&parsed, &state).await
375 }
376 "events.count" => handle_events_count(&session).await,
377 "events.stats" => handle_events_stats(&session).await,
378 "events.truncate" => handle_events_truncate(&parsed, &session).await,
379 "events.clear" => handle_events_clear(&session).await,
380 "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
381 "models.list" => handle_models_list(&state),
382 "models.register" => handle_models_register(&parsed, &state).await,
383 "models.unregister" => handle_models_unregister(&parsed, &state).await,
384 "models.list_unified" => handle_models_list_unified(&state),
385 "models.search" => handle_models_search(&parsed, &state),
386 "models.upgrades" => handle_models_upgrades(&state),
387 "models.pull" => handle_models_pull(&parsed, &state).await,
388 "models.install" => handle_models_pull(&parsed, &state).await,
389 "skills.distill" => handle_skills_distill(&parsed, &state).await,
390 "skills.list" => handle_skills_list(&parsed, &session).await,
391 "browser.run" => handle_browser_run(&parsed, &session).await,
392 "browser.close" => handle_browser_close(&session).await,
393 "secret.put" => handle_secret_put(&parsed),
394 "secret.get" => handle_secret_get(&parsed),
395 "secret.delete" => handle_secret_delete(&parsed),
396 "secret.status" => handle_secret_status(&parsed),
397 "secret.available" => Ok(car_ffi_common::secrets::is_available()),
398 "permissions.status" => handle_perm_status(&parsed),
399 "permissions.request" => handle_perm_request(&parsed),
400 "permissions.explain" => handle_perm_explain(&parsed),
401 "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
402 "accounts.list" => car_ffi_common::accounts::list(),
403 "accounts.open" => {
404 #[derive(serde::Deserialize, Default)]
405 struct OpenParams {
406 #[serde(default)]
407 account_id: Option<String>,
408 }
409 let p: OpenParams =
410 serde_json::from_value(parsed.params.clone()).unwrap_or_default();
411 car_ffi_common::accounts::open_settings(p.account_id.as_deref())
412 }
413 "calendar.list" => car_ffi_common::integrations::calendar_list(),
414 "calendar.events" => handle_calendar_events(&parsed),
415 "contacts.containers" => {
416 car_ffi_common::integrations::contacts_containers()
417 }
418 "contacts.find" => handle_contacts_find(&parsed),
419 "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
420 "mail.inbox" => handle_mail_inbox(&parsed),
421 "mail.send" => handle_mail_send(&parsed),
422 "messages.services" => car_ffi_common::integrations::messages_services(),
423 "messages.chats" => handle_messages_chats(&parsed),
424 "messages.send" => handle_messages_send(&parsed),
425 "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
426 "notes.find" => handle_notes_find(&parsed),
427 "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
428 "reminders.items" => handle_reminders_items(&parsed),
429 "photos.albums" => car_ffi_common::integrations::photos_albums(),
430 "bookmarks.list" => handle_bookmarks_list(&parsed),
431 "files.locations" => car_ffi_common::integrations::files_locations(),
432 "keychain.status" => car_ffi_common::integrations::keychain_status(),
433 "health.status" => car_ffi_common::health::status(),
434 "health.sleep" => handle_health_sleep(&parsed),
435 "health.workouts" => handle_health_workouts(&parsed),
436 "health.activity" => handle_health_activity(&parsed),
437 "voice.transcribe_stream.start" => {
438 handle_voice_transcribe_stream_start(&parsed, &state, &session).await
439 }
440 "voice.transcribe_stream.stop" => {
441 handle_voice_transcribe_stream_stop(&parsed, &state).await
442 }
443 "voice.transcribe_stream.push" => {
444 handle_voice_transcribe_stream_push(&parsed, &state).await
445 }
446 "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
447 "voice.dispatch_turn" => {
448 handle_voice_dispatch_turn(&parsed, &state, &session).await
449 }
450 "voice.cancel_turn" => handle_voice_cancel_turn().await,
451 "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
452 "inference.register_runner" => {
453 handle_inference_register_runner(&session).await
454 }
455 "inference.runner.event" => handle_inference_runner_event(&parsed).await,
456 "inference.runner.complete" => {
457 handle_inference_runner_complete(&parsed).await
458 }
459 "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
460 "voice.providers.list" => {
461 serde_json::from_str::<serde_json::Value>(
465 &car_voice::list_voice_providers_json(),
466 )
467 .map_err(|e| e.to_string())
468 }
469 "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
470 .await
471 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
472 "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
473 .await
474 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
475 "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
476 "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
477 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
478 "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
479 "workflow.run" => handle_workflow_run(&parsed, &session).await,
480 "workflow.verify" => handle_workflow_verify(&parsed),
481 "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
482 "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
483 "meeting.list" => handle_meeting_list(&parsed),
484 "meeting.get" => handle_meeting_get(&parsed),
485 "registry.register" => handle_registry_register(&parsed),
486 "registry.heartbeat" => handle_registry_heartbeat(&parsed),
487 "registry.unregister" => handle_registry_unregister(&parsed),
488 "registry.list" => handle_registry_list(&parsed),
489 "registry.reap" => handle_registry_reap(&parsed),
490 "admission.status" => handle_admission_status(&state),
491 "a2a.start" => handle_a2a_start(&parsed, &session).await,
492 "a2a.stop" => handle_a2a_stop(),
493 "a2a.status" => handle_a2a_status(),
494 "a2a.send" => handle_a2a_send(&parsed, &state).await,
495 "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
496 "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
497 "a2ui.capabilities" => handle_a2ui_capabilities(&state),
498 "a2ui.reap" => handle_a2ui_reap(&state).await,
499 "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
500 "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
501 "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
502 "a2ui.render_report" => handle_a2ui_render_report(&parsed, &state).await,
503 "a2ui/subscribe" => handle_a2ui_subscribe(&session, &state).await,
504 "a2ui/unsubscribe" => handle_a2ui_unsubscribe(&session, &state).await,
505 "a2ui/replay" => handle_a2ui_replay(&parsed, &state).await,
506 "automation.run_applescript" => handle_run_applescript(&parsed).await,
507 "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
508 "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
509 "notifications.local" => handle_local_notification(&parsed).await,
510 "vision.ocr" => handle_vision_ocr(&parsed).await,
511 "agents.list" => handle_agents_list(&state).await,
512 "agents.health" => handle_agents_health(&state).await,
513 "agents.upsert" => handle_agents_upsert(&parsed, &state).await,
514 "agents.install" => handle_agents_install(&parsed, &state).await,
515 "agents.remove" => handle_agents_remove(&parsed, &state).await,
516 "agents.start" => handle_agents_start(&parsed, &state).await,
517 "agents.stop" => handle_agents_stop(&parsed, &state).await,
518 "agents.restart" => handle_agents_restart(&parsed, &state).await,
519 "agents.tail_log" => handle_agents_tail_log(&parsed, &state).await,
520 "agents.list_external" => handle_agents_list_external(&parsed).await,
521 "agents.detect_external" => handle_agents_detect_external(&parsed).await,
522 "agents.health_external" => handle_agents_health_external(&parsed).await,
523 "agents.invoke_external" => {
524 handle_agents_invoke_external(&parsed, &state).await
525 }
526 "message/send"
533 | "SendMessage"
534 | "message/stream"
535 | "SendStreamingMessage"
536 | "tasks/get"
537 | "GetTask"
538 | "tasks/list"
539 | "ListTasks"
540 | "tasks/cancel"
541 | "CancelTask"
542 | "tasks/resubscribe"
543 | "SubscribeToTask"
544 | "tasks/pushNotificationConfig/set"
545 | "CreateTaskPushNotificationConfig"
546 | "tasks/pushNotificationConfig/get"
547 | "GetTaskPushNotificationConfig"
548 | "tasks/pushNotificationConfig/list"
549 | "ListTaskPushNotificationConfigs"
550 | "tasks/pushNotificationConfig/delete"
551 | "DeleteTaskPushNotificationConfig"
552 | "agent/getAuthenticatedExtendedCard"
553 | "GetExtendedAgentCard" => {
554 handle_a2a_dispatch(method_owned.as_str(), &parsed, &state).await
555 }
556 _ => Err(format!("unknown method: {}", method_owned)),
557 };
558
559 let resp = match result {
560 Ok(value) => JsonRpcResponse::success(parsed.id, value),
561 Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
562 };
563 let _ = send_response(&session.channel, resp).await;
564 });
565 }
566 } else if msg.is_close() {
567 info!("Client {} disconnected", client_id);
568 break;
569 }
570 }
571
572 session.host.unsubscribe(&client_id).await;
573 state.a2ui_subscribers.lock().await.remove(&client_id);
574
575 let _removed = state.remove_session(&client_id).await;
586 {
587 let mut pending = session.channel.pending.lock().await;
588 pending.clear();
589 }
590
591 Ok(())
592}
593
594async fn send_response(
595 channel: &WsChannel,
596 resp: JsonRpcResponse,
597) -> Result<(), Box<dyn std::error::Error>> {
598 use futures::SinkExt;
599 let json = serde_json::to_string(&resp)?;
600 channel
601 .write
602 .lock()
603 .await
604 .send(Message::Text(json.into()))
605 .await?;
606 Ok(())
607}
608
609async fn handle_host_subscribe(session: &crate::session::ClientSession) -> Result<Value, String> {
612 session
613 .host
614 .subscribe(&session.client_id, session.channel.clone())
615 .await;
616 serde_json::to_value(HostSnapshot {
617 subscribed: true,
618 agents: session.host.agents().await,
619 approvals: session.host.approvals().await,
620 events: session.host.events(50).await,
621 })
622 .map_err(|e| e.to_string())
623}
624
625async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
626 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
627}
628
629async fn handle_host_events(
630 req: &JsonRpcMessage,
631 session: &crate::session::ClientSession,
632) -> Result<Value, String> {
633 let limit = req
634 .params
635 .get("limit")
636 .and_then(|v| v.as_u64())
637 .unwrap_or(100) as usize;
638 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
639}
640
641async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
642 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
643}
644
645async fn handle_a2ui_apply(
646 req: &JsonRpcMessage,
647 state: &Arc<ServerState>,
648) -> Result<Value, String> {
649 #[derive(Deserialize)]
650 struct Params {
651 #[serde(default)]
652 envelope: Option<car_a2ui::A2uiEnvelope>,
653 #[serde(default)]
654 message: Option<car_a2ui::A2uiEnvelope>,
655 }
656
657 let envelope = if req.params.get("createSurface").is_some()
658 || req.params.get("updateComponents").is_some()
659 || req.params.get("updateDataModel").is_some()
660 || req.params.get("deleteSurface").is_some()
661 {
662 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
663 .map_err(|e| e.to_string())?
664 } else {
665 match serde_json::from_value::<Params>(req.params.clone()) {
666 Ok(params) => params
667 .envelope
668 .or(params.message)
669 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
670 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
671 .map_err(|e| e.to_string())?,
672 }
673 };
674
675 apply_a2ui_envelope(state, envelope, None, None).await
676}
677
678async fn handle_a2ui_ingest(
679 req: &JsonRpcMessage,
680 state: &Arc<ServerState>,
681) -> Result<Value, String> {
682 #[derive(Deserialize)]
683 #[serde(rename_all = "camelCase")]
684 struct Params {
685 #[serde(default)]
686 endpoint: Option<String>,
687 #[serde(default)]
688 a2a_endpoint: Option<String>,
689 #[serde(default)]
690 owner: Option<car_a2ui::A2uiSurfaceOwner>,
691 #[serde(default)]
692 route_auth: Option<A2aRouteAuth>,
693 #[serde(default)]
694 allow_untrusted_endpoint: bool,
695 }
696
697 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
698 endpoint: None,
699 a2a_endpoint: None,
700 owner: None,
701 route_auth: None,
702 allow_untrusted_endpoint: false,
703 });
704 let payload = req.params.get("payload").unwrap_or(&req.params);
705 state
706 .a2ui
707 .validate_payload(payload)
708 .map_err(|e| e.to_string())?;
709 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
710 if envelopes.is_empty() {
711 return Err("no A2UI envelopes found in payload".into());
712 }
713 let endpoint = params.endpoint.or(params.a2a_endpoint);
714 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
715 let owner = params
716 .owner
717 .or_else(|| car_a2ui::owner_from_value(payload))
718 .map(|owner| match endpoint.clone() {
719 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
720 None => owner,
721 });
722
723 let mut results = Vec::new();
724 for envelope in envelopes {
725 let value =
726 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
727 results.push(value);
728 }
729 Ok(serde_json::json!({ "applied": results }))
730}
731
732async fn apply_a2ui_envelope(
733 state: &Arc<ServerState>,
734 envelope: car_a2ui::A2uiEnvelope,
735 owner: Option<car_a2ui::A2uiSurfaceOwner>,
736 route_auth: Option<A2aRouteAuth>,
737) -> Result<Value, String> {
738 let result = state
739 .a2ui
740 .apply_with_owner(envelope, owner)
741 .await
742 .map_err(|e| e.to_string())?;
743 update_a2ui_route_auth(state, &result, route_auth).await;
744 let kind = if result.deleted {
745 "a2ui.surface_deleted"
746 } else {
747 "a2ui.surface_updated"
748 };
749 let message = if result.deleted {
750 format!("A2UI surface {} deleted", result.surface_id)
751 } else {
752 format!("A2UI surface {} updated", result.surface_id)
753 };
754 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
755 state
756 .host
757 .record_event(kind, None, message, payload.clone())
758 .await;
759 broadcast_a2ui_event(state, kind, &payload).await;
763 serde_json::to_value(result).map_err(|e| e.to_string())
764}
765
766async fn broadcast_a2ui_event(state: &Arc<ServerState>, kind: &str, result: &Value) {
767 use futures::SinkExt;
768 use tokio_tungstenite::tungstenite::Message;
769 let subscribers: Vec<Arc<crate::session::WsChannel>> = state
770 .a2ui_subscribers
771 .lock()
772 .await
773 .values()
774 .cloned()
775 .collect();
776 if subscribers.is_empty() {
777 return;
778 }
779 let Ok(json) = serde_json::to_string(&serde_json::json!({
780 "jsonrpc": "2.0",
781 "method": "a2ui.event",
782 "params": {
783 "kind": kind,
784 "result": result,
785 },
786 })) else {
787 return;
788 };
789 for channel in subscribers {
790 let _ = channel
791 .write
792 .lock()
793 .await
794 .send(Message::Text(json.clone().into()))
795 .await;
796 }
797}
798
799async fn update_a2ui_route_auth(
800 state: &Arc<ServerState>,
801 result: &car_a2ui::A2uiApplyResult,
802 route_auth: Option<A2aRouteAuth>,
803) {
804 let mut auth = state.a2ui_route_auth.lock().await;
805 if result.deleted {
806 auth.remove(&result.surface_id);
807 return;
808 }
809
810 let has_route_endpoint = result
811 .surface
812 .as_ref()
813 .and_then(|surface| surface.owner.as_ref())
814 .and_then(|owner| owner.endpoint.as_ref())
815 .is_some();
816 match (has_route_endpoint, route_auth) {
817 (true, Some(route_auth)) => {
818 auth.insert(result.surface_id.clone(), route_auth);
819 }
820 _ => {
821 auth.remove(&result.surface_id);
822 }
823 }
824}
825
826fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
827 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
828}
829
830async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
831 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
832 if !removed.is_empty() {
833 let mut auth = state.a2ui_route_auth.lock().await;
834 for surface_id in &removed {
835 auth.remove(surface_id);
836 }
837 }
838 Ok(serde_json::json!({ "removed": removed }))
839}
840
841async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
842 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
843}
844
845async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
846 let surface_id = req
847 .params
848 .get("surface_id")
849 .or_else(|| req.params.get("surfaceId"))
850 .and_then(Value::as_str)
851 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
852 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
853}
854
855async fn handle_a2ui_subscribe(
861 session: &crate::session::ClientSession,
862 state: &Arc<ServerState>,
863) -> Result<Value, String> {
864 state
865 .a2ui_subscribers
866 .lock()
867 .await
868 .insert(session.client_id.clone(), session.channel.clone());
869 Ok(serde_json::json!({ "subscribed": true }))
870}
871
872async fn handle_a2ui_unsubscribe(
876 session: &crate::session::ClientSession,
877 state: &Arc<ServerState>,
878) -> Result<Value, String> {
879 state
880 .a2ui_subscribers
881 .lock()
882 .await
883 .remove(&session.client_id);
884 Ok(serde_json::json!({ "subscribed": false }))
885}
886
887async fn handle_a2ui_replay(
894 req: &JsonRpcMessage,
895 state: &Arc<ServerState>,
896) -> Result<Value, String> {
897 let surface_id = req
898 .params
899 .get("surface_id")
900 .or_else(|| req.params.get("surfaceId"))
901 .and_then(Value::as_str)
902 .ok_or_else(|| "`a2ui/replay` requires surface_id".to_string())?;
903 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
904}
905
906async fn handle_a2ui_action(
907 req: &JsonRpcMessage,
908 state: &Arc<ServerState>,
909) -> Result<Value, String> {
910 let action: car_a2ui::ClientAction =
911 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
912 let owner = state.a2ui.owner(&action.surface_id).await;
913 let route = route_a2ui_action(state, &action, owner.clone()).await;
914 let payload = serde_json::json!({
915 "action": action,
916 "owner": owner,
917 "route": route,
918 });
919 let event = state
920 .host
921 .record_event(
922 "a2ui.action",
923 None,
924 format!(
925 "A2UI action {} from {}",
926 action.name, action.source_component_id
927 ),
928 payload,
929 )
930 .await;
931 Ok(serde_json::json!({
932 "event": event,
933 "route": route,
934 }))
935}
936
937async fn handle_a2ui_render_report(
944 req: &JsonRpcMessage,
945 state: &Arc<ServerState>,
946) -> Result<Value, String> {
947 let report: car_a2ui::RenderReport =
951 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
952 let payload = serde_json::to_value(&report).map_err(|e| e.to_string())?;
953 let kind = "a2ui.render_report";
954 let message = format!("A2UI render report for surface {}", report.surface_id);
955 let event = state
956 .host
957 .record_event(kind, None, message, payload.clone())
958 .await;
959 broadcast_a2ui_event(state, kind, &payload).await;
960
961 if let Some(surface) = state.a2ui.get(&report.surface_id).await {
969 if !state.ui_agent_budget.try_consume(&report.surface_id) {
975 tracing::warn!(
976 surface_id = %report.surface_id,
977 count = state.ui_agent_budget.count(&report.surface_id),
978 max = state.ui_agent_budget.max(),
979 "ui-agent iteration budget exhausted; skipping agent invocation"
980 );
981 return Ok(serde_json::json!({ "event": event }));
982 }
983 match state.ui_agent.on_render_report(&report, &surface) {
987 car_ui_agent::Decision::Patch {
988 envelope,
989 strategy_id,
990 patch_hash,
991 elapsed_ns,
992 } => {
993 if !state
1001 .ui_agent_oscillation
1002 .check_and_record(&report.surface_id, patch_hash)
1003 {
1004 tracing::warn!(
1005 surface_id = %report.surface_id,
1006 strategy = %strategy_id,
1007 patch_hash,
1008 "ui-agent oscillation detected; suppressing patch"
1009 );
1010 state.ui_agent_budget.refund(&report.surface_id);
1013 return Ok(serde_json::json!({ "event": event }));
1014 }
1015 let a2ui_envelope = car_a2ui::A2uiEnvelope {
1016 patch_components: Some(envelope),
1017 ..Default::default()
1018 };
1019 if let Err(e) =
1020 apply_a2ui_envelope(state, a2ui_envelope, None, None).await
1021 {
1022 tracing::warn!(
1023 surface_id = %report.surface_id,
1024 strategy = %strategy_id,
1025 patch_hash,
1026 elapsed_ns,
1027 error = %e,
1028 "ui-agent patch apply failed",
1029 );
1030 state.ui_agent_budget.refund(&report.surface_id);
1032 } else {
1033 tracing::debug!(
1034 surface_id = %report.surface_id,
1035 strategy = %strategy_id,
1036 patch_hash,
1037 elapsed_ns,
1038 iteration = state.ui_agent_budget.count(&report.surface_id),
1039 "ui-agent patch applied",
1040 );
1041 if let Some(memgine) = state.shared_memgine.clone() {
1051 let speaker = format!("ui-agent/{}", report.surface_id);
1052 let text = format!("strategy applied: {}", strategy_id);
1053 tokio::spawn(async move {
1054 let mut guard = memgine.lock().await;
1055 guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
1056 });
1057 }
1058 }
1059 }
1060 car_ui_agent::Decision::StableNoChange => {
1061 state.ui_agent_budget.refund(&report.surface_id);
1063 }
1064 car_ui_agent::Decision::HardStop { reason } => {
1065 state.ui_agent_budget.refund(&report.surface_id);
1066 tracing::error!(
1072 surface_id = %report.surface_id,
1073 reason = %reason,
1074 "ui-agent hard-stopped improvement loop",
1075 );
1076 }
1077 }
1078 } else {
1079 tracing::debug!(
1080 surface_id = %report.surface_id,
1081 "ui-agent skipped — surface not found in store",
1082 );
1083 }
1084
1085 Ok(serde_json::json!({ "event": event }))
1086}
1087
1088async fn route_a2ui_action(
1089 state: &Arc<ServerState>,
1090 action: &car_a2ui::ClientAction,
1091 owner: Option<car_a2ui::A2uiSurfaceOwner>,
1092) -> Value {
1093 let Some(owner) = owner else {
1094 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
1095 };
1096 if owner.kind != "a2a" {
1097 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
1098 }
1099 let Some(endpoint) = owner.endpoint.clone() else {
1100 return serde_json::json!({
1101 "delivered": false,
1102 "reason": "surface owner has no endpoint",
1103 "owner": owner
1104 });
1105 };
1106
1107 let message = car_a2a::Message {
1108 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
1109 role: car_a2a::MessageRole::User,
1110 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
1111 data: serde_json::json!({
1112 "a2uiAction": action,
1113 }),
1114 metadata: Default::default(),
1115 })],
1116 task_id: owner.task_id.clone(),
1117 context_id: owner.context_id.clone(),
1118 metadata: Default::default(),
1119 };
1120
1121 let auth = state
1122 .a2ui_route_auth
1123 .lock()
1124 .await
1125 .get(&action.surface_id)
1126 .cloned()
1127 .map(client_auth_from_route_auth)
1128 .unwrap_or(car_a2a::ClientAuth::None);
1129
1130 match car_a2a::A2aClient::new(endpoint.clone())
1131 .with_auth(auth)
1132 .send_message(message, false)
1133 .await
1134 {
1135 Ok(result) => serde_json::json!({
1136 "delivered": true,
1137 "owner": owner,
1138 "endpoint": endpoint,
1139 "result": result,
1140 }),
1141 Err(error) => serde_json::json!({
1142 "delivered": false,
1143 "owner": owner,
1144 "endpoint": endpoint,
1145 "error": error.to_string(),
1146 }),
1147 }
1148}
1149
1150fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
1151 match auth {
1152 A2aRouteAuth::None => car_a2a::ClientAuth::None,
1153 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
1154 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
1155 }
1156}
1157
1158fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
1159 let endpoint = endpoint?;
1160 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
1161 Some(endpoint)
1162 } else {
1163 None
1164 }
1165}
1166
1167fn is_loopback_http_endpoint(endpoint: &str) -> bool {
1168 endpoint == "http://localhost"
1169 || endpoint.starts_with("http://localhost:")
1170 || endpoint.starts_with("http://localhost/")
1171 || endpoint == "http://127.0.0.1"
1172 || endpoint.starts_with("http://127.0.0.1:")
1173 || endpoint.starts_with("http://127.0.0.1/")
1174 || endpoint == "http://[::1]"
1175 || endpoint.starts_with("http://[::1]:")
1176 || endpoint.starts_with("http://[::1]/")
1177}
1178
1179async fn handle_host_register_agent(
1180 req: &JsonRpcMessage,
1181 session: &crate::session::ClientSession,
1182) -> Result<Value, String> {
1183 let request: RegisterHostAgentRequest =
1184 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1185 serde_json::to_value(
1186 session
1187 .host
1188 .register_agent(&session.client_id, request)
1189 .await?,
1190 )
1191 .map_err(|e| e.to_string())
1192}
1193
1194async fn handle_host_unregister_agent(
1195 req: &JsonRpcMessage,
1196 session: &crate::session::ClientSession,
1197) -> Result<Value, String> {
1198 let agent_id = req
1199 .params
1200 .get("agent_id")
1201 .and_then(|v| v.as_str())
1202 .ok_or("missing agent_id")?;
1203 session
1204 .host
1205 .unregister_agent(&session.client_id, agent_id)
1206 .await?;
1207 Ok(serde_json::json!({"ok": true}))
1208}
1209
1210async fn handle_host_set_status(
1211 req: &JsonRpcMessage,
1212 session: &crate::session::ClientSession,
1213) -> Result<Value, String> {
1214 let request: SetHostAgentStatusRequest =
1215 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1216 serde_json::to_value(session.host.set_status(&session.client_id, request).await?)
1217 .map_err(|e| e.to_string())
1218}
1219
1220async fn handle_host_notify(
1221 req: &JsonRpcMessage,
1222 session: &crate::session::ClientSession,
1223) -> Result<Value, String> {
1224 let kind = req
1225 .params
1226 .get("kind")
1227 .and_then(|v| v.as_str())
1228 .unwrap_or("host.notification");
1229 let agent_id = req
1230 .params
1231 .get("agent_id")
1232 .and_then(|v| v.as_str())
1233 .map(str::to_string);
1234 let message = req
1235 .params
1236 .get("message")
1237 .and_then(|v| v.as_str())
1238 .unwrap_or("");
1239 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
1240 serde_json::to_value(
1241 session
1242 .host
1243 .record_event(kind, agent_id, message, payload)
1244 .await,
1245 )
1246 .map_err(|e| e.to_string())
1247}
1248
1249async fn handle_host_request_approval(
1250 req: &JsonRpcMessage,
1251 session: &crate::session::ClientSession,
1252) -> Result<Value, String> {
1253 let request: CreateHostApprovalRequest =
1254 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1255 if let Some(agent_id) = &request.agent_id {
1256 let _ = session
1261 .host
1262 .set_status(
1263 &session.client_id,
1264 SetHostAgentStatusRequest {
1265 agent_id: agent_id.clone(),
1266 status: HostAgentStatus::WaitingForApproval,
1267 current_task: None,
1268 message: Some("Waiting for approval".to_string()),
1269 payload: Value::Null,
1270 },
1271 )
1272 .await;
1273 }
1274 serde_json::to_value(
1275 session
1276 .host
1277 .create_approval(Some(&session.client_id), request)
1278 .await?,
1279 )
1280 .map_err(|e| e.to_string())
1281}
1282
1283async fn handle_host_resolve_approval(
1284 req: &JsonRpcMessage,
1285 session: &crate::session::ClientSession,
1286) -> Result<Value, String> {
1287 let request: ResolveHostApprovalRequest =
1288 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1289 serde_json::to_value(
1290 session
1291 .host
1292 .resolve_approval(&session.client_id, request)
1293 .await?,
1294 )
1295 .map_err(|e| e.to_string())
1296}
1297
1298async fn handle_session_auth(
1309 req: &JsonRpcMessage,
1310 session: &crate::session::ClientSession,
1311 state: &Arc<ServerState>,
1312) -> Result<Value, String> {
1313 let supplied = req
1314 .params
1315 .get("token")
1316 .and_then(Value::as_str)
1317 .ok_or_else(|| "session.auth requires { token: string }".to_string())?;
1318 let agent_id = req
1325 .params
1326 .get("agent_id")
1327 .and_then(Value::as_str)
1328 .map(str::to_string);
1329
1330 if let Some(id) = agent_id {
1331 let supervisor = state.supervisor()?;
1332 if !supervisor.validate_agent_token(&id, supplied).await {
1333 return Err(format!(
1334 "auth failed: agent_id `{id}` is not supervised, or token mismatch"
1335 ));
1336 }
1337 {
1341 let mut attached = state.attached_agents.lock().await;
1342 if let Some(prior) = attached.get(&id) {
1343 if prior != &session.client_id {
1344 return Err(format!(
1345 "auth failed: agent_id `{id}` is already attached on \
1346 another connection (client_id={prior})"
1347 ));
1348 }
1349 }
1350 attached.insert(id.clone(), session.client_id.clone());
1351 }
1352 let agent_eng = get_or_load_agent_memgine(state, &id).await?;
1357 *session.bound_memgine.lock().await = Some(agent_eng);
1358 *session.agent_id.lock().await = Some(id.clone());
1359 session
1360 .authenticated
1361 .store(true, std::sync::atomic::Ordering::Release);
1362 return Ok(serde_json::json!({
1363 "ok": true,
1364 "auth_enabled": true,
1365 "agent_id": id,
1366 }));
1367 }
1368
1369 let expected = match state.auth_token.get() {
1370 Some(t) => t,
1371 None => {
1372 session
1378 .authenticated
1379 .store(true, std::sync::atomic::Ordering::Release);
1380 return Ok(serde_json::json!({ "ok": true, "auth_enabled": false }));
1381 }
1382 };
1383 if !constant_time_eq(supplied.as_bytes(), expected.as_bytes()) {
1384 return Err("auth failed: token mismatch".to_string());
1385 }
1386 session
1387 .authenticated
1388 .store(true, std::sync::atomic::Ordering::Release);
1389 Ok(serde_json::json!({ "ok": true, "auth_enabled": true }))
1390}
1391
1392fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
1396 if a.len() != b.len() {
1397 return false;
1398 }
1399 let mut diff: u8 = 0;
1400 for (x, y) in a.iter().zip(b.iter()) {
1401 diff |= x ^ y;
1402 }
1403 diff == 0
1404}
1405
1406async fn gate_high_risk_method(
1416 method: &str,
1417 params: &Value,
1418 state: &Arc<ServerState>,
1419) -> Result<(), String> {
1420 let timeout = state.approval_gate.timeout;
1421 let req = CreateHostApprovalRequest {
1422 agent_id: None,
1423 action: format!("ws.method:{method}"),
1424 details: serde_json::json!({
1425 "method": method,
1426 "params_preview": preview_params(params, 2_000),
1430 }),
1431 options: vec!["approve".to_string(), "deny".to_string()],
1432 };
1433 match state
1434 .host
1435 .request_and_wait_approval(req, "approve", timeout)
1436 .await
1437 {
1438 Ok(crate::host::ApprovalOutcome::Approved) => Ok(()),
1439 Ok(crate::host::ApprovalOutcome::Denied) => Err(format!(
1440 "{method} denied by user (approval gate, audit 2026-05). \
1441 To call this method without an interactive prompt, start \
1442 car-server with --no-approvals on a trusted machine."
1443 )),
1444 Ok(crate::host::ApprovalOutcome::TimedOut) => Err(format!(
1445 "{method} approval timed out after {}s with no resolution. \
1446 The approval is still visible in `host.approvals` for \
1447 forensics; resubmit the request to retry.",
1448 timeout.as_secs()
1449 )),
1450 Err(e) => Err(format!("approval gate error: {e}")),
1451 }
1452}
1453
1454fn preview_params(value: &Value, max_chars: usize) -> Value {
1455 let s = value.to_string();
1456 if s.len() <= max_chars {
1457 value.clone()
1458 } else {
1459 Value::String(format!("{}… (truncated)", &s[..max_chars]))
1460 }
1461}
1462
1463async fn handle_session_init(
1464 req: &JsonRpcMessage,
1465 session: &crate::session::ClientSession,
1466) -> Result<Value, String> {
1467 let init: SessionInitRequest =
1468 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1469
1470 for tool in &init.tools {
1471 register_from_definition(&session.runtime, tool).await;
1472 }
1473
1474 let mut policy_count = 0;
1475 {
1476 let mut policies = session.runtime.policies.write().await;
1477 for policy_def in &init.policies {
1478 if let Some(check) = build_policy_check(policy_def) {
1479 policies.register(&policy_def.name, check, "");
1480 policy_count += 1;
1481 }
1482 }
1483 }
1484
1485 serde_json::to_value(SessionInitResponse {
1486 session_id: session.client_id.clone(),
1487 tools_registered: init.tools.len(),
1488 policies_registered: policy_count,
1489 })
1490 .map_err(|e| e.to_string())
1491}
1492
1493fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
1494 match def.rule.as_str() {
1495 "deny_tool" => {
1496 let target = def.target.clone();
1497 Some(Box::new(
1498 move |action: &car_ir::Action, _: &car_state::StateStore| {
1499 if action.tool.as_deref() == Some(&target) {
1500 Some(format!("tool '{}' denied", target))
1501 } else {
1502 None
1503 }
1504 },
1505 ))
1506 }
1507 "require_state" => {
1508 let key = def.key.clone();
1509 let value = def.value.clone();
1510 Some(Box::new(
1511 move |_: &car_ir::Action, state: &car_state::StateStore| {
1512 if state.get(&key).as_ref() != Some(&value) {
1513 Some(format!("state['{}'] must be {:?}", key, value))
1514 } else {
1515 None
1516 }
1517 },
1518 ))
1519 }
1520 "deny_tool_param" => {
1521 let target = def.target.clone();
1522 let param = def.key.clone();
1523 let pattern = def.pattern.clone();
1524 Some(Box::new(
1525 move |action: &car_ir::Action, _: &car_state::StateStore| {
1526 if action.tool.as_deref() != Some(&target) {
1527 return None;
1528 }
1529 if let Some(val) = action.parameters.get(¶m) {
1530 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
1531 if s.contains(&pattern) {
1532 return Some(format!("param '{}' matches '{}'", param, pattern));
1533 }
1534 }
1535 None
1536 },
1537 ))
1538 }
1539 _ => None,
1540 }
1541}
1542
1543async fn handle_tools_register(
1544 req: &JsonRpcMessage,
1545 session: &crate::session::ClientSession,
1546) -> Result<Value, String> {
1547 let tools: Vec<ToolDefinition> =
1548 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1549 for tool in &tools {
1550 register_from_definition(&session.runtime, tool).await;
1551 }
1552 Ok(Value::from(tools.len()))
1553}
1554
1555async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
1562 runtime
1563 .register_tool_schema(car_ir::ToolSchema {
1564 name: def.name.clone(),
1565 description: def.description.clone(),
1566 parameters: def.parameters.clone(),
1567 returns: def.returns.clone(),
1568 idempotent: def.idempotent,
1569 cache_ttl_secs: def.cache_ttl_secs,
1570 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
1571 max_calls: rl.max_calls,
1572 interval_secs: rl.interval_secs,
1573 }),
1574 })
1575 .await;
1576}
1577
1578async fn handle_proposal_submit(
1579 req: &JsonRpcMessage,
1580 session: &crate::session::ClientSession,
1581) -> Result<Value, String> {
1582 let submit: ProposalSubmitRequest =
1583 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1584 let session_id = req
1590 .params
1591 .get("session_id")
1592 .and_then(|v| v.as_str())
1593 .map(str::to_string);
1594
1595 let scope: Option<car_engine::RuntimeScope> = match req.params.get("scope") {
1604 Some(v) if !v.is_null() => Some(
1605 serde_json::from_value(v.clone())
1606 .map_err(|e| format!("invalid scope: {e}"))?,
1607 ),
1608 _ => None,
1609 };
1610
1611 let result = match (session_id, scope) {
1612 (Some(_sid), Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1617 (Some(sid), None) => {
1618 session
1619 .runtime
1620 .execute_with_session(&submit.proposal, &sid)
1621 .await
1622 }
1623 (None, Some(s)) => session.runtime.execute_scoped(&submit.proposal, &s).await,
1624 (None, None) => session.runtime.execute(&submit.proposal).await,
1625 };
1626 serde_json::to_value(result).map_err(|e| e.to_string())
1627}
1628
1629async fn handle_session_policy_open(
1630 session: &crate::session::ClientSession,
1631) -> Result<Value, String> {
1632 let id = session.runtime.open_session().await;
1633 Ok(serde_json::json!({ "session_id": id }))
1634}
1635
1636async fn handle_session_policy_close(
1637 req: &JsonRpcMessage,
1638 session: &crate::session::ClientSession,
1639) -> Result<Value, String> {
1640 let sid = req
1641 .params
1642 .get("session_id")
1643 .and_then(|v| v.as_str())
1644 .ok_or("missing 'session_id'")?;
1645 let closed = session.runtime.close_session(sid).await;
1646 Ok(serde_json::json!({ "closed": closed }))
1647}
1648
1649async fn handle_policy_register(
1655 req: &JsonRpcMessage,
1656 session: &crate::session::ClientSession,
1657) -> Result<Value, String> {
1658 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1659 .map_err(|e| format!("invalid policy params: {e}"))?;
1660 let session_id = req
1661 .params
1662 .get("session_id")
1663 .and_then(|v| v.as_str())
1664 .map(str::to_string);
1665 let check = build_policy_check(&def)
1666 .ok_or_else(|| format!("unsupported policy rule '{}'", def.rule))?;
1667 match session_id {
1668 Some(sid) => session
1669 .runtime
1670 .register_policy_in_session(&sid, &def.name, check, "")
1671 .await
1672 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1673 None => {
1674 let mut policies = session.runtime.policies.write().await;
1675 policies.register(&def.name, check, "");
1676 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1677 }
1678 }
1679}
1680
1681async fn handle_verify(
1682 req: &JsonRpcMessage,
1683 session: &crate::session::ClientSession,
1684) -> Result<Value, String> {
1685 let vr: VerifyRequest =
1686 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1687 let tools: std::collections::HashSet<String> =
1688 session.runtime.tools.read().await.keys().cloned().collect();
1689 let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1690 serde_json::to_value(VerifyResponse {
1691 valid: result.valid,
1692 issues: result
1693 .issues
1694 .iter()
1695 .map(|i| VerifyIssueProto {
1696 action_id: i.action_id.clone(),
1697 severity: i.severity.clone(),
1698 message: i.message.clone(),
1699 })
1700 .collect(),
1701 simulated_state: result.simulated_state,
1702 })
1703 .map_err(|e| e.to_string())
1704}
1705
1706fn tenant_from_params(req: &JsonRpcMessage) -> Option<String> {
1713 req.params
1714 .get("tenant_id")
1715 .and_then(|v| v.as_str())
1716 .filter(|s| !s.is_empty())
1717 .map(str::to_string)
1718}
1719
1720async fn handle_state_get(
1721 req: &JsonRpcMessage,
1722 session: &crate::session::ClientSession,
1723) -> Result<Value, String> {
1724 let key = req
1725 .params
1726 .get("key")
1727 .and_then(|v| v.as_str())
1728 .ok_or("missing 'key'")?;
1729 let tenant = tenant_from_params(req);
1730 Ok(session
1731 .runtime
1732 .state
1733 .scoped(tenant.as_deref())
1734 .get(key)
1735 .unwrap_or(Value::Null))
1736}
1737
1738async fn handle_state_set(
1739 req: &JsonRpcMessage,
1740 session: &crate::session::ClientSession,
1741) -> Result<Value, String> {
1742 let key = req
1743 .params
1744 .get("key")
1745 .and_then(|v| v.as_str())
1746 .ok_or("missing 'key'")?;
1747 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1748 let tenant = tenant_from_params(req);
1749 session
1750 .runtime
1751 .state
1752 .scoped(tenant.as_deref())
1753 .set(key, value, "client");
1754 Ok(Value::from("ok"))
1755}
1756
1757async fn handle_state_exists(
1761 req: &JsonRpcMessage,
1762 session: &crate::session::ClientSession,
1763) -> Result<Value, String> {
1764 let key = req
1765 .params
1766 .get("key")
1767 .and_then(|v| v.as_str())
1768 .ok_or("missing 'key'")?;
1769 let tenant = tenant_from_params(req);
1770 Ok(Value::Bool(
1771 session
1772 .runtime
1773 .state
1774 .scoped(tenant.as_deref())
1775 .exists(key),
1776 ))
1777}
1778
1779async fn handle_state_keys(
1782 req: &JsonRpcMessage,
1783 session: &crate::session::ClientSession,
1784) -> Result<Value, String> {
1785 let tenant = tenant_from_params(req);
1786 Ok(Value::Array(
1787 session
1788 .runtime
1789 .state
1790 .scoped(tenant.as_deref())
1791 .keys()
1792 .into_iter()
1793 .map(Value::String)
1794 .collect(),
1795 ))
1796}
1797
1798async fn handle_state_snapshot(
1809 req: &JsonRpcMessage,
1810 session: &crate::session::ClientSession,
1811) -> Result<Value, String> {
1812 let tenant = tenant_from_params(req);
1813 let view = session.runtime.state.scoped(tenant.as_deref());
1814 let mut map = serde_json::Map::new();
1815 for key in view.keys() {
1816 if let Some(value) = view.get(&key) {
1817 map.insert(key, value);
1818 }
1819 }
1820 Ok(Value::Object(map))
1821}
1822
1823fn agent_memgine_snapshot_path(agent_id: &str) -> Result<std::path::PathBuf, String> {
1829 let base = car_ffi_common::memory_path::ensure_base()
1830 .map_err(|e| format!("memory base unavailable: {e}"))?;
1831 let dir = base.join("agents");
1832 std::fs::create_dir_all(&dir).map_err(|e| format!("create agents dir: {e}"))?;
1833 Ok(dir.join(format!("{agent_id}.json")))
1834}
1835
1836async fn get_or_load_agent_memgine(
1843 state: &Arc<ServerState>,
1844 agent_id: &str,
1845) -> Result<Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>, String> {
1846 {
1847 let map = state.agent_memgines.lock().await;
1848 if let Some(eng) = map.get(agent_id) {
1849 return Ok(eng.clone());
1850 }
1851 }
1852 let engine = Arc::new(tokio::sync::Mutex::new(car_memgine::MemgineEngine::new(
1854 None,
1855 )));
1856 let path = agent_memgine_snapshot_path(agent_id)?;
1857 if path.exists() {
1858 let content = std::fs::read_to_string(&path)
1859 .map_err(|e| format!("read {}: {}", path.display(), e))?;
1860 let facts: Vec<Value> = serde_json::from_str(&content).unwrap_or_default();
1861 let mut g = engine.lock().await;
1862 let mut loaded: u32 = 0;
1863 for fact in &facts {
1864 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
1865 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
1866 let kind = fact
1867 .get("kind")
1868 .and_then(|v| v.as_str())
1869 .unwrap_or("pattern");
1870 let fid = format!("loaded-{loaded}");
1871 g.ingest_fact(
1872 &fid,
1873 subject,
1874 body,
1875 "user",
1876 "peer",
1877 chrono::Utc::now(),
1878 "global",
1879 None,
1880 vec![],
1881 kind == "constraint",
1882 );
1883 loaded += 1;
1884 }
1885 }
1886 let mut map = state.agent_memgines.lock().await;
1887 let stored = map.entry(agent_id.to_string()).or_insert(engine).clone();
1888 Ok(stored)
1889}
1890
1891async fn persist_agent_memgine(
1895 agent_id: &str,
1896 engine: &Arc<tokio::sync::Mutex<car_memgine::MemgineEngine>>,
1897) -> Result<(), String> {
1898 let path = agent_memgine_snapshot_path(agent_id)?;
1899 let g = engine.lock().await;
1900 let facts: Vec<Value> = g
1901 .graph
1902 .inner
1903 .node_indices()
1904 .filter_map(|nix| {
1905 let node = g.graph.inner.node_weight(nix)?;
1906 if !node.is_valid() {
1907 return None;
1908 }
1909 if node.kind == car_memgine::MemKind::Identity
1910 || node.kind == car_memgine::MemKind::Environment
1911 {
1912 return None;
1913 }
1914 Some(serde_json::json!({
1915 "subject": node.key,
1916 "body": node.value,
1917 "kind": match node.kind {
1918 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
1919 car_memgine::MemKind::Conversation => "outcome",
1920 _ => "pattern",
1921 },
1922 "confidence": 0.5,
1923 "content_type": node.content_type.as_label(),
1924 }))
1925 })
1926 .collect();
1927 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
1928 std::fs::write(&path, json).map_err(|e| format!("write {}: {}", path.display(), e))?;
1929 Ok(())
1930}
1931
1932async fn handle_memory_fact_count(
1939 session: &crate::session::ClientSession,
1940) -> Result<Value, String> {
1941 let engine_arc = session.effective_memgine().await;
1942 let engine = engine_arc.lock().await;
1943 Ok(Value::from(engine.valid_fact_count()))
1944}
1945
1946async fn handle_memory_add_fact(
1947 req: &JsonRpcMessage,
1948 session: &crate::session::ClientSession,
1949) -> Result<Value, String> {
1950 let subject = req
1951 .params
1952 .get("subject")
1953 .and_then(|v| v.as_str())
1954 .ok_or("missing subject")?;
1955 let body = req
1956 .params
1957 .get("body")
1958 .and_then(|v| v.as_str())
1959 .ok_or("missing body")?;
1960 let kind = req
1961 .params
1962 .get("kind")
1963 .and_then(|v| v.as_str())
1964 .unwrap_or("pattern");
1965 let engine_arc = session.effective_memgine().await;
1969 let count = {
1970 let mut engine = engine_arc.lock().await;
1971 let fid = format!("ws-{}", engine.valid_fact_count());
1972 engine.ingest_fact(
1973 &fid,
1974 subject,
1975 body,
1976 "user",
1977 "peer",
1978 chrono::Utc::now(),
1979 "global",
1980 None,
1981 vec![],
1982 kind == "constraint",
1983 );
1984 engine.valid_fact_count()
1985 };
1986 if let Some(id) = session.agent_id.lock().await.clone() {
1989 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
1990 tracing::warn!(agent_id = %id, error = %e,
1991 "agent memgine persist failed; in-memory state is canonical");
1992 }
1993 }
1994 Ok(Value::from(count))
1995}
1996
1997async fn handle_memory_query(
1998 req: &JsonRpcMessage,
1999 session: &crate::session::ClientSession,
2000) -> Result<Value, String> {
2001 let query = req
2002 .params
2003 .get("query")
2004 .and_then(|v| v.as_str())
2005 .ok_or("missing query")?;
2006 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
2007 let engine_arc = session.effective_memgine().await;
2008 let engine = engine_arc.lock().await;
2009 let seeds = engine.graph.find_seeds(query, 5);
2010 let hits = if !seeds.is_empty() {
2015 engine.graph.retrieve_ppr(&seeds, None, 0.5, k)
2016 } else {
2017 vec![]
2018 };
2019 let results: Vec<Value> = hits
2020 .iter()
2021 .filter_map(|hit| {
2022 let node = engine.graph.inner.node_weight(hit.node_ix)?;
2023 Some(serde_json::json!({
2024 "subject": node.key,
2025 "body": node.value,
2026 "kind": format!("{:?}", node.kind).to_lowercase(),
2027 "confidence": hit.activation,
2028 }))
2029 })
2030 .collect();
2031 serde_json::to_value(results).map_err(|e| e.to_string())
2032}
2033
2034async fn handle_memory_build_context(
2035 req: &JsonRpcMessage,
2036 session: &crate::session::ClientSession,
2037) -> Result<Value, String> {
2038 let query = req
2039 .params
2040 .get("query")
2041 .and_then(|v| v.as_str())
2042 .unwrap_or("");
2043 let model_context_window = req
2047 .params
2048 .get("model_context_window")
2049 .and_then(|v| v.as_u64())
2050 .map(|w| w as usize);
2051 let mut engine = session.memgine.lock().await;
2052 Ok(Value::from(
2053 engine.build_context_for_model(query, model_context_window),
2054 ))
2055}
2056
2057async fn handle_memory_build_context_fast(
2063 req: &JsonRpcMessage,
2064 session: &crate::session::ClientSession,
2065) -> Result<Value, String> {
2066 let query = req
2067 .params
2068 .get("query")
2069 .and_then(|v| v.as_str())
2070 .unwrap_or("");
2071 let model_context_window = req
2072 .params
2073 .get("model_context_window")
2074 .and_then(|v| v.as_u64())
2075 .map(|w| w as usize);
2076 let mut engine = session.memgine.lock().await;
2077 Ok(Value::from(engine.build_context_with_options(
2078 query,
2079 model_context_window,
2080 car_memgine::ContextMode::Fast,
2081 None,
2082 )))
2083}
2084
2085async fn handle_memory_persist(
2101 req: &JsonRpcMessage,
2102 session: &crate::session::ClientSession,
2103) -> Result<Value, String> {
2104 let path = req
2105 .params
2106 .get("path")
2107 .and_then(|v| v.as_str())
2108 .ok_or("missing path")?;
2109 let resolved = car_ffi_common::memory_path::resolve(path)
2110 .map_err(|e| format!("memory.persist rejected path {path:?}: {e}"))?;
2111 let engine = session.memgine.lock().await;
2112 let facts: Vec<Value> = engine
2113 .graph
2114 .inner
2115 .node_indices()
2116 .filter_map(|nix| {
2117 let node = engine.graph.inner.node_weight(nix)?;
2118 if !node.is_valid() {
2119 return None;
2120 }
2121 if node.kind == car_memgine::MemKind::Identity
2122 || node.kind == car_memgine::MemKind::Environment
2123 {
2124 return None;
2125 }
2126 Some(serde_json::json!({
2127 "subject": node.key,
2128 "body": node.value,
2129 "kind": match node.kind {
2130 car_memgine::MemKind::Fact => if node.is_constraint { "constraint" } else { "pattern" },
2131 car_memgine::MemKind::Conversation => "outcome",
2132 _ => "pattern",
2133 },
2134 "confidence": 0.5,
2135 "content_type": node.content_type.as_label(),
2136 }))
2137 })
2138 .collect();
2139 let count = facts.len();
2140 let json = serde_json::to_string(&facts).map_err(|e| e.to_string())?;
2141 std::fs::write(&resolved, json)
2142 .map_err(|e| format!("failed to write {}: {}", resolved.display(), e))?;
2143 Ok(Value::from(count as u64))
2144}
2145
2146async fn handle_memory_load(
2152 req: &JsonRpcMessage,
2153 session: &crate::session::ClientSession,
2154) -> Result<Value, String> {
2155 let path = req
2156 .params
2157 .get("path")
2158 .and_then(|v| v.as_str())
2159 .ok_or("missing path")?;
2160 let resolved = car_ffi_common::memory_path::resolve(path)
2161 .map_err(|e| format!("memory.load rejected path {path:?}: {e}"))?;
2162 let content = std::fs::read_to_string(&resolved)
2163 .map_err(|e| format!("failed to read {}: {}", resolved.display(), e))?;
2164 let facts: Vec<Value> =
2165 serde_json::from_str(&content).map_err(|e| format!("invalid JSON: {}", e))?;
2166 let mut engine = session.memgine.lock().await;
2167 engine.reset();
2168 let mut count: u32 = 0;
2169 for fact in &facts {
2170 let subject = fact.get("subject").and_then(|v| v.as_str()).unwrap_or("");
2171 let body = fact.get("body").and_then(|v| v.as_str()).unwrap_or("");
2172 let kind = fact
2173 .get("kind")
2174 .and_then(|v| v.as_str())
2175 .unwrap_or("pattern");
2176 let fid = format!("loaded-{}", count);
2177 engine.ingest_fact(
2178 &fid,
2179 subject,
2180 body,
2181 "user",
2182 "peer",
2183 chrono::Utc::now(),
2184 "global",
2185 None,
2186 vec![],
2187 kind == "constraint",
2188 );
2189 count += 1;
2190 }
2191 Ok(Value::from(count))
2192}
2193
2194async fn handle_skill_ingest(
2197 req: &JsonRpcMessage,
2198 session: &crate::session::ClientSession,
2199) -> Result<Value, String> {
2200 let name = req
2201 .params
2202 .get("name")
2203 .and_then(|v| v.as_str())
2204 .ok_or("missing name")?;
2205 let code = req
2206 .params
2207 .get("code")
2208 .and_then(|v| v.as_str())
2209 .ok_or("missing code")?;
2210 let platform = req
2211 .params
2212 .get("platform")
2213 .and_then(|v| v.as_str())
2214 .unwrap_or("unknown");
2215 let persona = req
2216 .params
2217 .get("persona")
2218 .and_then(|v| v.as_str())
2219 .unwrap_or("");
2220 let url_pattern = req
2221 .params
2222 .get("url_pattern")
2223 .and_then(|v| v.as_str())
2224 .unwrap_or("");
2225 let description = req
2226 .params
2227 .get("description")
2228 .and_then(|v| v.as_str())
2229 .unwrap_or("");
2230 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
2231 let keywords: Vec<String> = req
2232 .params
2233 .get("task_keywords")
2234 .and_then(|v| v.as_array())
2235 .map(|arr| {
2236 arr.iter()
2237 .filter_map(|v| v.as_str().map(String::from))
2238 .collect()
2239 })
2240 .unwrap_or_default();
2241
2242 let trigger = car_memgine::SkillTrigger {
2243 persona: persona.into(),
2244 url_pattern: url_pattern.into(),
2245 task_keywords: keywords,
2246 structured: None,
2247 };
2248 let mut engine = session.memgine.lock().await;
2249 let node = engine.ingest_skill(
2250 name,
2251 code,
2252 platform,
2253 trigger,
2254 description,
2255 supersedes,
2256 vec![],
2257 vec![],
2258 );
2259 Ok(Value::from(node.index() as u64))
2260}
2261
2262async fn handle_skill_find(
2263 req: &JsonRpcMessage,
2264 session: &crate::session::ClientSession,
2265) -> Result<Value, String> {
2266 let persona = req
2267 .params
2268 .get("persona")
2269 .and_then(|v| v.as_str())
2270 .unwrap_or("");
2271 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
2272 let task = req
2273 .params
2274 .get("task")
2275 .and_then(|v| v.as_str())
2276 .unwrap_or("");
2277 let max = req
2278 .params
2279 .get("max_results")
2280 .and_then(|v| v.as_u64())
2281 .unwrap_or(1) as usize;
2282 let engine = session.memgine.lock().await;
2283 let results = engine.find_skill(persona, url, task, max);
2284 let json: Vec<Value> = results
2285 .iter()
2286 .map(|(m, s)| {
2287 serde_json::json!({
2288 "name": m.name, "code": m.code, "platform": m.platform,
2289 "description": m.description, "stats": m.stats, "match_score": s,
2290 })
2291 })
2292 .collect();
2293 serde_json::to_value(json).map_err(|e| e.to_string())
2294}
2295
2296async fn handle_skill_report(
2297 req: &JsonRpcMessage,
2298 session: &crate::session::ClientSession,
2299) -> Result<Value, String> {
2300 let name = req
2301 .params
2302 .get("skill_name")
2303 .and_then(|v| v.as_str())
2304 .ok_or("missing skill_name")?;
2305 let outcome_str = req
2306 .params
2307 .get("outcome")
2308 .and_then(|v| v.as_str())
2309 .ok_or("missing outcome")?;
2310 let outcome = match outcome_str {
2311 "success" => car_memgine::SkillOutcome::Success,
2312 _ => car_memgine::SkillOutcome::Fail,
2313 };
2314 let mut engine = session.memgine.lock().await;
2315 let stats = engine
2316 .report_outcome(name, outcome)
2317 .ok_or(format!("skill '{}' not found", name))?;
2318 serde_json::to_value(stats).map_err(|e| e.to_string())
2319}
2320
2321struct WsAgentRunner {
2330 channel: Arc<WsChannel>,
2331 host: Arc<crate::host::HostState>,
2332 client_id: String,
2333}
2334
2335#[async_trait::async_trait]
2336impl car_multi::AgentRunner for WsAgentRunner {
2337 async fn run(
2338 &self,
2339 spec: &car_multi::AgentSpec,
2340 task: &str,
2341 _runtime: &car_engine::Runtime,
2342 _mailbox: &car_multi::Mailbox,
2343 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
2344 use futures::SinkExt;
2345
2346 let request_id = self.channel.next_request_id();
2347 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
2348 let agent = self
2349 .host
2350 .register_agent(
2351 &self.client_id,
2352 RegisterHostAgentRequest {
2353 id: Some(agent_id.clone()),
2354 name: spec.name.clone(),
2355 kind: "callback".to_string(),
2356 capabilities: spec.tools.clone(),
2357 project: spec
2358 .metadata
2359 .get("project")
2360 .and_then(|v| v.as_str())
2361 .map(str::to_string),
2362 pid: None,
2363 display: serde_json::from_value(
2364 spec.metadata
2365 .get("display")
2366 .cloned()
2367 .unwrap_or(serde_json::Value::Null),
2368 )
2369 .unwrap_or_default(),
2370 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
2371 },
2372 )
2373 .await
2374 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
2375 let _ = self
2376 .host
2377 .set_status(
2378 &self.client_id,
2379 SetHostAgentStatusRequest {
2380 agent_id: agent.id.clone(),
2381 status: HostAgentStatus::Running,
2382 current_task: Some(task.to_string()),
2383 message: Some(format!("{} started", spec.name)),
2384 payload: serde_json::json!({ "task": task }),
2385 },
2386 )
2387 .await;
2388
2389 let rpc_request = serde_json::json!({
2390 "jsonrpc": "2.0",
2391 "method": "multi.run_agent",
2392 "params": {
2393 "spec": spec,
2394 "task": task,
2395 },
2396 "id": request_id,
2397 });
2398
2399 let (tx, rx) = tokio::sync::oneshot::channel();
2401 self.channel
2402 .pending
2403 .lock()
2404 .await
2405 .insert(request_id.clone(), tx);
2406
2407 let msg = Message::Text(
2408 serde_json::to_string(&rpc_request)
2409 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
2410 .into(),
2411 );
2412 if let Err(e) = self.channel.write.lock().await.send(msg).await {
2413 let _ = self
2414 .host
2415 .set_status(
2416 &self.client_id,
2417 SetHostAgentStatusRequest {
2418 agent_id: agent_id.clone(),
2419 status: HostAgentStatus::Errored,
2420 current_task: None,
2421 message: Some(format!("{} failed to start", spec.name)),
2422 payload: serde_json::json!({ "error": e.to_string() }),
2423 },
2424 )
2425 .await;
2426 return Err(car_multi::MultiError::AgentFailed(
2427 spec.name.clone(),
2428 format!("ws send error: {}", e),
2429 ));
2430 }
2431
2432 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
2434 Ok(Ok(response)) => response,
2435 Ok(Err(_)) => {
2436 let _ = self
2437 .host
2438 .set_status(
2439 &self.client_id,
2440 SetHostAgentStatusRequest {
2441 agent_id: agent_id.clone(),
2442 status: HostAgentStatus::Errored,
2443 current_task: None,
2444 message: Some(format!("{} callback channel closed", spec.name)),
2445 payload: Value::Null,
2446 },
2447 )
2448 .await;
2449 return Err(car_multi::MultiError::AgentFailed(
2450 spec.name.clone(),
2451 "agent callback channel closed".into(),
2452 ));
2453 }
2454 Err(_) => {
2455 let _ = self
2456 .host
2457 .set_status(
2458 &self.client_id,
2459 SetHostAgentStatusRequest {
2460 agent_id: agent_id.clone(),
2461 status: HostAgentStatus::Errored,
2462 current_task: None,
2463 message: Some(format!("{} timed out", spec.name)),
2464 payload: Value::Null,
2465 },
2466 )
2467 .await;
2468 return Err(car_multi::MultiError::AgentFailed(
2469 spec.name.clone(),
2470 "agent callback timed out (300s)".into(),
2471 ));
2472 }
2473 };
2474
2475 if let Some(err) = response.error {
2476 let _ = self
2477 .host
2478 .set_status(
2479 &self.client_id,
2480 SetHostAgentStatusRequest {
2481 agent_id: agent_id.clone(),
2482 status: HostAgentStatus::Errored,
2483 current_task: None,
2484 message: Some(format!("{} errored", spec.name)),
2485 payload: serde_json::json!({ "error": err }),
2486 },
2487 )
2488 .await;
2489 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
2490 }
2491
2492 let output_value = response.output.unwrap_or(Value::Null);
2493 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
2494 car_multi::MultiError::AgentFailed(
2495 spec.name.clone(),
2496 format!("invalid AgentOutput: {}", e),
2497 )
2498 })?;
2499 let status = if output.error.is_some() {
2500 HostAgentStatus::Errored
2501 } else {
2502 HostAgentStatus::Completed
2503 };
2504 let message = if output.error.is_some() {
2505 format!("{} errored", spec.name)
2506 } else {
2507 format!("{} completed", spec.name)
2508 };
2509 let _ = self
2510 .host
2511 .set_status(
2512 &self.client_id,
2513 SetHostAgentStatusRequest {
2514 agent_id,
2515 status,
2516 current_task: None,
2517 message: Some(message),
2518 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
2519 },
2520 )
2521 .await;
2522
2523 Ok(output)
2524 }
2525}
2526
2527fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
2528 let safe_name: String = name
2529 .chars()
2530 .map(|c| {
2531 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
2532 c
2533 } else {
2534 '-'
2535 }
2536 })
2537 .collect();
2538 format!("{}:{}:{}", client_id, safe_name, request_id)
2539}
2540
2541async fn handle_multi_swarm(
2542 req: &JsonRpcMessage,
2543 session: &crate::session::ClientSession,
2544) -> Result<Value, String> {
2545 let mode_str = req
2546 .params
2547 .get("mode")
2548 .and_then(|v| v.as_str())
2549 .ok_or("missing 'mode'")?;
2550 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2551 let task = req
2552 .params
2553 .get("task")
2554 .and_then(|v| v.as_str())
2555 .ok_or("missing 'task'")?;
2556
2557 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
2558 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
2559 let agent_specs: Vec<car_multi::AgentSpec> =
2560 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2561 let synth: Option<car_multi::AgentSpec> = req
2562 .params
2563 .get("synthesizer")
2564 .map(|v| serde_json::from_value(v.clone()))
2565 .transpose()
2566 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2567
2568 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2569 channel: session.channel.clone(),
2570 host: session.host.clone(),
2571 client_id: session.client_id.clone(),
2572 });
2573 let infra = car_multi::SharedInfra::new();
2574
2575 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
2576 if let Some(s) = synth {
2577 swarm = swarm.with_synthesizer(s);
2578 }
2579
2580 let result = swarm
2581 .run(task, &runner, &infra)
2582 .await
2583 .map_err(|e| format!("swarm error: {}", e))?;
2584 serde_json::to_value(result).map_err(|e| e.to_string())
2585}
2586
2587async fn handle_multi_pipeline(
2588 req: &JsonRpcMessage,
2589 session: &crate::session::ClientSession,
2590) -> Result<Value, String> {
2591 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
2592 let task = req
2593 .params
2594 .get("task")
2595 .and_then(|v| v.as_str())
2596 .ok_or("missing 'task'")?;
2597
2598 let stage_specs: Vec<car_multi::AgentSpec> =
2599 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
2600
2601 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2602 channel: session.channel.clone(),
2603 host: session.host.clone(),
2604 client_id: session.client_id.clone(),
2605 });
2606 let infra = car_multi::SharedInfra::new();
2607
2608 let result = car_multi::Pipeline::new(stage_specs)
2609 .run(task, &runner, &infra)
2610 .await
2611 .map_err(|e| format!("pipeline error: {}", e))?;
2612 serde_json::to_value(result).map_err(|e| e.to_string())
2613}
2614
2615async fn handle_multi_supervisor(
2616 req: &JsonRpcMessage,
2617 session: &crate::session::ClientSession,
2618) -> Result<Value, String> {
2619 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
2620 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
2621 let task = req
2622 .params
2623 .get("task")
2624 .and_then(|v| v.as_str())
2625 .ok_or("missing 'task'")?;
2626 let max_rounds = req
2627 .params
2628 .get("max_rounds")
2629 .and_then(|v| v.as_u64())
2630 .unwrap_or(3) as u32;
2631
2632 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
2633 .map_err(|e| format!("invalid workers: {}", e))?;
2634 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
2635 .map_err(|e| format!("invalid supervisor: {}", e))?;
2636
2637 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2638 channel: session.channel.clone(),
2639 host: session.host.clone(),
2640 client_id: session.client_id.clone(),
2641 });
2642 let infra = car_multi::SharedInfra::new();
2643
2644 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
2645 .with_max_rounds(max_rounds)
2646 .run(task, &runner, &infra)
2647 .await
2648 .map_err(|e| format!("supervisor error: {}", e))?;
2649 serde_json::to_value(result).map_err(|e| e.to_string())
2650}
2651
2652async fn handle_multi_map_reduce(
2653 req: &JsonRpcMessage,
2654 session: &crate::session::ClientSession,
2655) -> Result<Value, String> {
2656 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
2657 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
2658 let task = req
2659 .params
2660 .get("task")
2661 .and_then(|v| v.as_str())
2662 .ok_or("missing 'task'")?;
2663 let items_val = req.params.get("items").ok_or("missing 'items'")?;
2664
2665 let mapper_spec: car_multi::AgentSpec =
2666 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
2667 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
2668 .map_err(|e| format!("invalid reducer: {}", e))?;
2669 let items: Vec<String> =
2670 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
2671
2672 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2673 channel: session.channel.clone(),
2674 host: session.host.clone(),
2675 client_id: session.client_id.clone(),
2676 });
2677 let infra = car_multi::SharedInfra::new();
2678
2679 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
2680 .run(task, &items, &runner, &infra)
2681 .await
2682 .map_err(|e| format!("map_reduce error: {}", e))?;
2683 serde_json::to_value(result).map_err(|e| e.to_string())
2684}
2685
2686async fn handle_multi_vote(
2687 req: &JsonRpcMessage,
2688 session: &crate::session::ClientSession,
2689) -> Result<Value, String> {
2690 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
2691 let task = req
2692 .params
2693 .get("task")
2694 .and_then(|v| v.as_str())
2695 .ok_or("missing 'task'")?;
2696
2697 let agent_specs: Vec<car_multi::AgentSpec> =
2698 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
2699 let synth: Option<car_multi::AgentSpec> = req
2700 .params
2701 .get("synthesizer")
2702 .map(|v| serde_json::from_value(v.clone()))
2703 .transpose()
2704 .map_err(|e| format!("invalid synthesizer: {}", e))?;
2705
2706 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2707 channel: session.channel.clone(),
2708 host: session.host.clone(),
2709 client_id: session.client_id.clone(),
2710 });
2711 let infra = car_multi::SharedInfra::new();
2712
2713 let mut vote = car_multi::Vote::new(agent_specs);
2714 if let Some(s) = synth {
2715 vote = vote.with_synthesizer(s);
2716 }
2717
2718 let result = vote
2719 .run(task, &runner, &infra)
2720 .await
2721 .map_err(|e| format!("vote error: {}", e))?;
2722 serde_json::to_value(result).map_err(|e| e.to_string())
2723}
2724
2725fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
2730 let name = req
2731 .params
2732 .get("name")
2733 .and_then(|v| v.as_str())
2734 .ok_or("scheduler.create requires 'name'")?;
2735 let prompt = req
2736 .params
2737 .get("prompt")
2738 .and_then(|v| v.as_str())
2739 .ok_or("scheduler.create requires 'prompt'")?;
2740
2741 let mut task = car_scheduler::Task::new(name, prompt);
2742
2743 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
2744 let trigger = match t {
2745 "once" => car_scheduler::TaskTrigger::Once,
2746 "cron" => car_scheduler::TaskTrigger::Cron,
2747 "interval" => car_scheduler::TaskTrigger::Interval,
2748 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
2749 _ => car_scheduler::TaskTrigger::Manual,
2750 };
2751 let schedule = req
2752 .params
2753 .get("schedule")
2754 .and_then(|v| v.as_str())
2755 .unwrap_or("");
2756 task = task.with_trigger(trigger, schedule);
2757 }
2758
2759 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
2760 task = task.with_system_prompt(sp);
2761 }
2762
2763 serde_json::to_value(&task).map_err(|e| e.to_string())
2764}
2765
2766async fn handle_scheduler_run(
2767 req: &JsonRpcMessage,
2768 session: &crate::session::ClientSession,
2769) -> Result<Value, String> {
2770 let task_val = req
2771 .params
2772 .get("task")
2773 .ok_or("scheduler.run requires 'task'")?;
2774 let mut task: car_scheduler::Task =
2775 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2776
2777 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2778 channel: session.channel.clone(),
2779 host: session.host.clone(),
2780 client_id: session.client_id.clone(),
2781 });
2782 let executor = car_scheduler::Executor::new(runner);
2783 let execution = executor.run_once(&mut task).await;
2784
2785 serde_json::to_value(&execution).map_err(|e| e.to_string())
2786}
2787
2788async fn handle_scheduler_run_loop(
2789 req: &JsonRpcMessage,
2790 session: &crate::session::ClientSession,
2791) -> Result<Value, String> {
2792 let task_val = req
2793 .params
2794 .get("task")
2795 .ok_or("scheduler.run_loop requires 'task'")?;
2796 let mut task: car_scheduler::Task =
2797 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
2798 let max_iterations = req
2799 .params
2800 .get("max_iterations")
2801 .and_then(|v| v.as_u64())
2802 .map(|v| v as u32);
2803
2804 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2805 channel: session.channel.clone(),
2806 host: session.host.clone(),
2807 client_id: session.client_id.clone(),
2808 });
2809 let executor = car_scheduler::Executor::new(runner);
2810 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
2811 let executions = executor
2812 .run_loop(&mut task, max_iterations, cancel_rx)
2813 .await;
2814
2815 serde_json::to_value(&executions).map_err(|e| e.to_string())
2816}
2817
2818fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
2823 state.inference.get_or_init(|| {
2824 Arc::new(car_inference::InferenceEngine::new(
2825 car_inference::InferenceConfig::default(),
2826 ))
2827 })
2828}
2829
2830async fn handle_infer(
2831 msg: &JsonRpcMessage,
2832 state: &ServerState,
2833 session: &crate::session::ClientSession,
2834) -> Result<Value, String> {
2835 let engine = get_inference_engine(state);
2836 let mut req: car_inference::GenerateRequest =
2837 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2838
2839 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2841 let mut memgine = session.memgine.lock().await;
2842 let ctx = memgine.build_context(cq);
2843 if !ctx.is_empty() {
2844 req.context = Some(ctx);
2845 }
2846 }
2847
2848 let _permit = state.admission.acquire().await;
2854
2855 let result = engine
2866 .generate_tracked(req)
2867 .await
2868 .map_err(|e| e.to_string())?;
2869 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2870}
2871
2872async fn handle_image_generate(
2902 msg: &JsonRpcMessage,
2903 state: &ServerState,
2904) -> Result<Value, String> {
2905 let engine = get_inference_engine(state);
2906 let req: car_inference::GenerateImageRequest = serde_json::from_value(msg.params.clone())
2907 .map_err(|e| format!("invalid params: {}", e))?;
2908 let _permit = state.admission.acquire().await;
2911 let result = engine
2912 .generate_image(req)
2913 .await
2914 .map_err(|e| e.to_string())?;
2915 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2916}
2917
2918async fn handle_video_generate(
2919 msg: &JsonRpcMessage,
2920 state: &ServerState,
2921) -> Result<Value, String> {
2922 let engine = get_inference_engine(state);
2923 let req: car_inference::GenerateVideoRequest = serde_json::from_value(msg.params.clone())
2924 .map_err(|e| format!("invalid params: {}", e))?;
2925 let _permit = state.admission.acquire().await;
2926 let result = engine
2927 .generate_video(req)
2928 .await
2929 .map_err(|e| e.to_string())?;
2930 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
2931}
2932
2933async fn handle_infer_stream(
2934 msg: &JsonRpcMessage,
2935 session: &crate::session::ClientSession,
2936 state: &ServerState,
2937) -> Result<Value, String> {
2938 use futures::SinkExt;
2939 use tokio_tungstenite::tungstenite::Message;
2940
2941 let engine = get_inference_engine(state);
2942 let mut req: car_inference::GenerateRequest =
2943 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2944
2945 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
2948 let mut memgine = session.memgine.lock().await;
2949 let ctx = memgine.build_context(cq);
2950 if !ctx.is_empty() {
2951 req.context = Some(ctx);
2952 }
2953 }
2954
2955 let _permit = state.admission.acquire().await;
2956 let mut rx = engine
2957 .generate_tracked_stream(req)
2958 .await
2959 .map_err(|e| e.to_string())?;
2960
2961 let mut accumulator = car_inference::StreamAccumulator::default();
2962 let request_id = msg.id.clone();
2963
2964 while let Some(event) = rx.recv().await {
2965 let event_payload = match &event {
2966 car_inference::StreamEvent::TextDelta(text) => {
2967 serde_json::json!({"type": "text", "data": text})
2968 }
2969 car_inference::StreamEvent::ToolCallStart { name, index, .. } => {
2970 serde_json::json!({"type": "tool_start", "name": name, "index": index})
2971 }
2972 car_inference::StreamEvent::ToolCallDelta {
2973 index,
2974 arguments_delta,
2975 } => serde_json::json!({
2976 "type": "tool_delta",
2977 "index": index,
2978 "data": arguments_delta,
2979 }),
2980 car_inference::StreamEvent::Usage {
2981 input_tokens,
2982 output_tokens,
2983 } => serde_json::json!({
2984 "type": "usage",
2985 "input_tokens": input_tokens,
2986 "output_tokens": output_tokens,
2987 }),
2988 car_inference::StreamEvent::Done { .. } => {
2993 accumulator.push(&event);
2994 continue;
2995 }
2996 };
2997
2998 let notif = serde_json::json!({
2999 "jsonrpc": "2.0",
3000 "method": "inference.stream.event",
3001 "params": {
3002 "request_id": request_id,
3003 "event": event_payload,
3004 },
3005 });
3006 if let Ok(text) = serde_json::to_string(¬if) {
3007 let _ = session
3008 .channel
3009 .write
3010 .lock()
3011 .await
3012 .send(Message::Text(text.into()))
3013 .await;
3014 }
3015 accumulator.push(&event);
3016 }
3017
3018 let (text, tool_calls, usage) = accumulator.finish_with_usage();
3019 Ok(serde_json::json!({
3020 "text": text,
3021 "tool_calls": tool_calls,
3022 "usage": usage,
3023 }))
3024}
3025
3026async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3027 let engine = get_inference_engine(state);
3028 let req: car_inference::EmbedRequest =
3029 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3030 let _permit = state.admission.acquire().await;
3034 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
3035 Ok(serde_json::json!({"embeddings": result}))
3036}
3037
3038async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3039 let engine = get_inference_engine(state);
3040 let req: car_inference::ClassifyRequest =
3041 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3042 let _permit = state.admission.acquire().await;
3043 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
3044 Ok(serde_json::json!({"classifications": result}))
3045}
3046
3047fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
3051 let total = state.admission.permits();
3052 let available = state.admission.permits_available();
3053 let in_use = total.saturating_sub(available);
3054 Ok(serde_json::json!({
3055 "permits_total": total,
3056 "permits_available": available,
3057 "permits_in_use": in_use,
3058 "env_override": crate::admission::ENV_MAX_CONCURRENT,
3059 }))
3060}
3061
3062async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3063 let model = msg
3064 .params
3065 .get("model")
3066 .and_then(|v| v.as_str())
3067 .ok_or("missing 'model' parameter")?;
3068 let text = msg
3069 .params
3070 .get("text")
3071 .and_then(|v| v.as_str())
3072 .ok_or("missing 'text' parameter")?;
3073 let engine = get_inference_engine(state);
3074 let ids = engine
3075 .tokenize(model, text)
3076 .await
3077 .map_err(|e| e.to_string())?;
3078 Ok(serde_json::json!({"tokens": ids}))
3079}
3080
3081async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3082 let model = msg
3083 .params
3084 .get("model")
3085 .and_then(|v| v.as_str())
3086 .ok_or("missing 'model' parameter")?;
3087 let tokens: Vec<u32> = msg
3088 .params
3089 .get("tokens")
3090 .and_then(|v| v.as_array())
3091 .ok_or("missing 'tokens' parameter")?
3092 .iter()
3093 .map(|t| {
3094 t.as_u64()
3095 .and_then(|n| u32::try_from(n).ok())
3096 .ok_or_else(|| "tokens[] must be u32 values".to_string())
3097 })
3098 .collect::<Result<Vec<_>, _>>()?;
3099 let engine = get_inference_engine(state);
3100 let text = engine
3101 .detokenize(model, &tokens)
3102 .await
3103 .map_err(|e| e.to_string())?;
3104 Ok(serde_json::json!({"text": text}))
3105}
3106
3107async fn handle_models_register(
3126 req: &JsonRpcMessage,
3127 _state: &Arc<ServerState>,
3128) -> Result<Value, String> {
3129 let schema_value = match req.params.get("schema") {
3133 Some(v) => v.clone(),
3134 None => req.params.clone(),
3135 };
3136 let schema: car_inference::ModelSchema = serde_json::from_value(schema_value)
3137 .map_err(|e| format!("invalid ModelSchema: {e}"))?;
3138 let id = schema.id.clone();
3139
3140 let home = std::env::var_os("HOME")
3145 .or_else(|| std::env::var_os("USERPROFILE"))
3146 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3147 let car_dir = std::path::PathBuf::from(home).join(".car");
3148 std::fs::create_dir_all(&car_dir).map_err(|e| format!("create {}: {e}", car_dir.display()))?;
3149 let path = car_dir.join("models.json");
3150
3151 let mut models: Vec<car_inference::ModelSchema> = if path.exists() {
3152 let text = std::fs::read_to_string(&path)
3153 .map_err(|e| format!("read {}: {e}", path.display()))?;
3154 if text.trim().is_empty() {
3155 Vec::new()
3156 } else {
3157 serde_json::from_str(&text)
3158 .map_err(|e| format!("parse {}: {e}", path.display()))?
3159 }
3160 } else {
3161 Vec::new()
3162 };
3163 if let Some(slot) = models.iter_mut().find(|m| m.id == id) {
3165 *slot = schema;
3166 } else {
3167 models.push(schema);
3168 }
3169 let json = serde_json::to_string_pretty(&models)
3170 .map_err(|e| format!("serialize models.json: {e}"))?;
3171 let tmp = path.with_extension("json.tmp");
3172 std::fs::write(&tmp, json)
3173 .map_err(|e| format!("write {}: {e}", tmp.display()))?;
3174 std::fs::rename(&tmp, &path)
3175 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3176 Ok(serde_json::json!({
3177 "id": id,
3178 "registered": true,
3179 "path": path.to_string_lossy(),
3180 "note": "Daemon restart required for live UnifiedRegistry visibility \
3181 (Parslee-ai/car-releases#39 phase 1). The model is persisted; \
3182 next car-server boot loads it via UnifiedRegistry::load_user_config.",
3183 }))
3184}
3185
3186async fn handle_models_unregister(
3197 req: &JsonRpcMessage,
3198 _state: &Arc<ServerState>,
3199) -> Result<Value, String> {
3200 let id = match req.params.get("id") {
3204 Some(v) => v
3205 .as_str()
3206 .ok_or_else(|| "`id` must be a string".to_string())?
3207 .to_string(),
3208 None => match req.params.as_str() {
3209 Some(s) => s.to_string(),
3210 None => return Err("missing `id` parameter".to_string()),
3211 },
3212 };
3213
3214 let home = std::env::var_os("HOME")
3215 .or_else(|| std::env::var_os("USERPROFILE"))
3216 .ok_or_else(|| "no HOME / USERPROFILE in env".to_string())?;
3217 let car_dir = std::path::PathBuf::from(home).join(".car");
3218 let path = car_dir.join("models.json");
3219
3220 if !path.exists() {
3221 return Err(format!("no models.json at {} — nothing to unregister", path.display()));
3222 }
3223 let text = std::fs::read_to_string(&path)
3224 .map_err(|e| format!("read {}: {e}", path.display()))?;
3225 let mut models: Vec<car_inference::ModelSchema> = if text.trim().is_empty() {
3226 Vec::new()
3227 } else {
3228 serde_json::from_str(&text)
3229 .map_err(|e| format!("parse {}: {e}", path.display()))?
3230 };
3231 let before = models.len();
3232 models.retain(|m| m.id != id);
3233 if models.len() == before {
3234 return Err(format!("model {} not found in {}", id, path.display()));
3235 }
3236 let json = serde_json::to_string_pretty(&models)
3237 .map_err(|e| format!("serialize models.json: {e}"))?;
3238 let tmp = path.with_extension("json.tmp");
3239 std::fs::write(&tmp, json)
3240 .map_err(|e| format!("write {}: {e}", tmp.display()))?;
3241 std::fs::rename(&tmp, &path)
3242 .map_err(|e| format!("rename {} -> {}: {e}", tmp.display(), path.display()))?;
3243 Ok(serde_json::json!({
3244 "id": id,
3245 "unregistered": true,
3246 "path": path.to_string_lossy(),
3247 "note": "Daemon restart required for live UnifiedRegistry visibility \
3248 (phase 1, matching models.register).",
3249 }))
3250}
3251
3252fn handle_models_list(state: &ServerState) -> Result<Value, String> {
3253 let engine = get_inference_engine(state);
3254 let models = engine.list_models();
3255 serde_json::to_value(&models).map_err(|e| e.to_string())
3256}
3257
3258fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
3259 let engine = get_inference_engine(state);
3260 let models = engine.list_models_unified();
3261 serde_json::to_value(&models).map_err(|e| e.to_string())
3262}
3263
3264#[derive(Debug, Deserialize)]
3265#[serde(rename_all = "camelCase")]
3266struct ModelSearchParams {
3267 #[serde(default)]
3268 query: Option<String>,
3269 #[serde(default)]
3270 capability: Option<car_inference::ModelCapability>,
3271 #[serde(default)]
3272 provider: Option<String>,
3273 #[serde(default)]
3274 local_only: bool,
3275 #[serde(default)]
3276 available_only: bool,
3277 #[serde(default)]
3278 limit: Option<usize>,
3279}
3280
3281#[derive(Debug, Serialize)]
3282#[serde(rename_all = "camelCase")]
3283struct ModelSearchEntry {
3284 #[serde(flatten)]
3285 info: car_inference::ModelInfo,
3286 family: String,
3287 version: String,
3288 tags: Vec<String>,
3289 pullable: bool,
3290 upgrade: Option<car_inference::ModelUpgrade>,
3291}
3292
3293#[derive(Debug, Serialize)]
3294#[serde(rename_all = "camelCase")]
3295struct ModelSearchResponse {
3296 models: Vec<ModelSearchEntry>,
3297 upgrades: Vec<car_inference::ModelUpgrade>,
3298 total: usize,
3299 available: usize,
3300 local: usize,
3301 remote: usize,
3302}
3303
3304fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3305 let params: ModelSearchParams =
3306 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
3307 query: None,
3308 capability: None,
3309 provider: None,
3310 local_only: false,
3311 available_only: false,
3312 limit: None,
3313 });
3314 let engine = get_inference_engine(state);
3315 let upgrades = engine.available_model_upgrades();
3316 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
3317 .iter()
3318 .cloned()
3319 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
3320 .collect();
3321 let query = params
3322 .query
3323 .as_deref()
3324 .map(str::trim)
3325 .filter(|q| !q.is_empty())
3326 .map(|q| q.to_ascii_lowercase());
3327 let provider = params
3328 .provider
3329 .as_deref()
3330 .map(str::trim)
3331 .filter(|p| !p.is_empty())
3332 .map(|p| p.to_ascii_lowercase());
3333
3334 let mut entries: Vec<ModelSearchEntry> = engine
3335 .list_schemas()
3336 .into_iter()
3337 .filter(|schema| {
3338 if let Some(capability) = params.capability {
3339 if !schema.has_capability(capability) {
3340 return false;
3341 }
3342 }
3343 if let Some(provider) = provider.as_deref() {
3344 if schema.provider.to_ascii_lowercase() != provider {
3345 return false;
3346 }
3347 }
3348 if params.local_only && !schema.is_local() {
3349 return false;
3350 }
3351 if params.available_only && !schema.available {
3352 return false;
3353 }
3354 if let Some(query) = query.as_deref() {
3355 let capability_text = schema
3356 .capabilities
3357 .iter()
3358 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
3359 .collect::<Vec<_>>()
3360 .join(" ");
3361 let haystack = format!(
3362 "{} {} {} {} {} {}",
3363 schema.id,
3364 schema.name,
3365 schema.provider,
3366 schema.family,
3367 schema.tags.join(" "),
3368 capability_text
3369 )
3370 .to_ascii_lowercase();
3371 if !haystack.contains(query) {
3372 return false;
3373 }
3374 }
3375 true
3376 })
3377 .map(|schema| {
3378 let pullable = !schema.available
3379 && matches!(
3380 schema.source,
3381 car_inference::ModelSource::Local { .. }
3382 | car_inference::ModelSource::Mlx { .. }
3383 );
3384 let info = car_inference::ModelInfo::from(&schema);
3385 let upgrade = upgrades_by_from.get(&schema.id).cloned();
3386 ModelSearchEntry {
3387 info,
3388 family: schema.family,
3389 version: schema.version,
3390 tags: schema.tags,
3391 pullable,
3392 upgrade,
3393 }
3394 })
3395 .collect();
3396 entries.sort_by(|a, b| {
3397 b.info
3398 .available
3399 .cmp(&a.info.available)
3400 .then(b.info.is_local.cmp(&a.info.is_local))
3401 .then(a.info.name.cmp(&b.info.name))
3402 });
3403 if let Some(limit) = params.limit {
3404 entries.truncate(limit);
3405 }
3406
3407 let total = entries.len();
3408 let available = entries.iter().filter(|entry| entry.info.available).count();
3409 let local = entries.iter().filter(|entry| entry.info.is_local).count();
3410 let response = ModelSearchResponse {
3411 models: entries,
3412 upgrades,
3413 total,
3414 available,
3415 local,
3416 remote: total.saturating_sub(local),
3417 };
3418 serde_json::to_value(response).map_err(|e| e.to_string())
3419}
3420
3421fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
3422 let engine = get_inference_engine(state);
3423 serde_json::to_value(serde_json::json!({
3424 "upgrades": engine.available_model_upgrades()
3425 }))
3426 .map_err(|e| e.to_string())
3427}
3428
3429async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3430 let name = msg
3431 .params
3432 .get("name")
3433 .or_else(|| msg.params.get("id"))
3434 .or_else(|| msg.params.get("model"))
3435 .and_then(|v| v.as_str())
3436 .ok_or("missing 'name' parameter")?;
3437 let engine = get_inference_engine(state);
3438 let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
3439 Ok(serde_json::json!({"path": path.display().to_string()}))
3440}
3441
3442async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3443 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3444 msg.params
3445 .get("events")
3446 .cloned()
3447 .unwrap_or(msg.params.clone()),
3448 )
3449 .map_err(|e| format!("invalid events: {}", e))?;
3450
3451 let inference = get_inference_engine(state).clone();
3452 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
3453
3454 let skills = engine.distill_skills(&events).await;
3455 serde_json::to_value(&skills).map_err(|e| e.to_string())
3456}
3457
3458async fn handle_memory_consolidate(
3462 session: &crate::session::ClientSession,
3463) -> Result<Value, String> {
3464 let engine_arc = session.effective_memgine().await;
3465 let report = {
3466 let mut engine = engine_arc.lock().await;
3467 engine.consolidate().await
3468 };
3469 if let Some(id) = session.agent_id.lock().await.clone() {
3470 if let Err(e) = persist_agent_memgine(&id, &engine_arc).await {
3471 tracing::warn!(agent_id = %id, error = %e,
3472 "agent memgine persist after consolidate failed");
3473 }
3474 }
3475 serde_json::to_value(&report).map_err(|e| e.to_string())
3476}
3477
3478async fn handle_skill_repair(
3482 msg: &JsonRpcMessage,
3483 session: &crate::session::ClientSession,
3484) -> Result<Value, String> {
3485 let name = msg
3486 .params
3487 .get("skill_name")
3488 .and_then(|v| v.as_str())
3489 .ok_or("missing 'skill_name' parameter")?;
3490 let mut engine = session.memgine.lock().await;
3491 let code = engine.repair_skill(name).await;
3492 Ok(match code {
3493 Some(c) => serde_json::json!({ "code": c }),
3494 None => Value::Null,
3495 })
3496}
3497
3498async fn handle_skills_ingest_distilled(
3501 msg: &JsonRpcMessage,
3502 session: &crate::session::ClientSession,
3503) -> Result<Value, String> {
3504 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
3505 msg.params
3506 .get("skills")
3507 .cloned()
3508 .unwrap_or(msg.params.clone()),
3509 )
3510 .map_err(|e| format!("invalid skills: {}", e))?;
3511 let mut engine = session.memgine.lock().await;
3512 let nodes = engine.ingest_distilled_skills(&skills);
3513 Ok(serde_json::json!({ "ingested": nodes.len() }))
3514}
3515
3516async fn handle_skills_evolve(
3519 msg: &JsonRpcMessage,
3520 session: &crate::session::ClientSession,
3521) -> Result<Value, String> {
3522 let domain = msg
3523 .params
3524 .get("domain")
3525 .and_then(|v| v.as_str())
3526 .ok_or("missing 'domain' parameter")?
3527 .to_string();
3528 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
3529 msg.params
3530 .get("events")
3531 .cloned()
3532 .unwrap_or(Value::Array(vec![])),
3533 )
3534 .map_err(|e| format!("invalid events: {}", e))?;
3535 let mut engine = session.memgine.lock().await;
3536 let skills = engine.evolve_skills(&events, &domain).await;
3537 serde_json::to_value(&skills).map_err(|e| e.to_string())
3538}
3539
3540async fn handle_skills_domains_needing_evolution(
3542 msg: &JsonRpcMessage,
3543 session: &crate::session::ClientSession,
3544) -> Result<Value, String> {
3545 let threshold = msg
3546 .params
3547 .get("threshold")
3548 .and_then(|v| v.as_f64())
3549 .unwrap_or(0.6);
3550 let engine = session.memgine.lock().await;
3551 let domains = engine.domains_needing_evolution(threshold);
3552 serde_json::to_value(&domains).map_err(|e| e.to_string())
3553}
3554
3555async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3557 let engine = get_inference_engine(state);
3558 let req: car_inference::RerankRequest =
3559 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
3560 let _permit = state.admission.acquire().await;
3561 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
3562 serde_json::to_value(&result).map_err(|e| e.to_string())
3563}
3564
3565async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3571 use base64::Engine as _;
3572 let engine = get_inference_engine(state);
3573
3574 let mut params = msg.params.clone();
3581 let audio_b64 = params
3582 .as_object_mut()
3583 .and_then(|m| m.remove("audio_b64"))
3584 .and_then(|v| v.as_str().map(str::to_string));
3585 let _tmp_audio = if let Some(b64) = audio_b64 {
3586 let bytes = base64::engine::general_purpose::STANDARD
3587 .decode(b64.as_bytes())
3588 .map_err(|e| format!("audio_b64 decode failed: {e}"))?;
3589 let tmp = tempfile::NamedTempFile::new().map_err(|e| e.to_string())?;
3590 std::fs::write(tmp.path(), &bytes).map_err(|e| e.to_string())?;
3591 let path = tmp.path().to_string_lossy().into_owned();
3592 if let Some(obj) = params.as_object_mut() {
3593 obj.insert("audio_path".to_string(), Value::String(path));
3594 }
3595 Some(tmp)
3596 } else {
3597 None
3598 };
3599
3600 let req: car_inference::TranscribeRequest =
3601 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3602 let _permit = state.admission.acquire().await;
3603 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
3604 serde_json::to_value(&result).map_err(|e| e.to_string())
3605}
3606
3607async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3613 use base64::Engine as _;
3614 let engine = get_inference_engine(state);
3615
3616 let mut params = msg.params.clone();
3617 let return_b64 = params
3618 .as_object_mut()
3619 .and_then(|m| m.remove("return_b64"))
3620 .and_then(|v| v.as_bool())
3621 .unwrap_or(false);
3622 let no_output_path = params
3623 .as_object()
3624 .map(|m| !m.contains_key("output_path"))
3625 .unwrap_or(true);
3626
3627 let req: car_inference::SynthesizeRequest =
3628 serde_json::from_value(params).map_err(|e| format!("invalid params: {}", e))?;
3629 let _permit = state.admission.acquire().await;
3630 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
3631 let mut value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3632
3633 if return_b64 || no_output_path {
3637 let bytes = std::fs::read(&result.audio_path).map_err(|e| {
3638 format!(
3639 "synthesize: failed to read rendered audio at {}: {e}",
3640 result.audio_path
3641 )
3642 })?;
3643 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
3644 if let Some(obj) = value.as_object_mut() {
3645 obj.insert("audio_b64".to_string(), Value::String(encoded));
3646 }
3647 }
3648 Ok(value)
3649}
3650
3651async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
3655 let engine = get_inference_engine(state);
3656 let status = engine
3657 .prepare_speech_runtime()
3658 .await
3659 .map_err(|e| e.to_string())?;
3660 serde_json::to_value(&status).map_err(|e| e.to_string())
3661}
3662
3663async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
3666 let prompt = msg
3667 .params
3668 .get("prompt")
3669 .and_then(|v| v.as_str())
3670 .ok_or("missing 'prompt' parameter")?;
3671 let engine = get_inference_engine(state);
3672 let decision = engine.route_adaptive(prompt).await;
3673 serde_json::to_value(&decision).map_err(|e| e.to_string())
3674}
3675
3676async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
3678 let engine = get_inference_engine(state);
3679 let profiles = engine.export_profiles().await;
3680 serde_json::to_value(&profiles).map_err(|e| e.to_string())
3681}
3682
3683#[derive(Deserialize)]
3684#[serde(rename_all = "camelCase")]
3685struct OutcomesResolvePendingParams {
3686 action_results: Vec<(String, bool, f64, String)>,
3691}
3692
3693async fn handle_outcomes_resolve_pending(
3713 req: &JsonRpcMessage,
3714 state: &ServerState,
3715) -> Result<Value, String> {
3716 let params: OutcomesResolvePendingParams = serde_json::from_value(req.params.clone())
3717 .map_err(|e| format!("invalid params: {e}"))?;
3718 let engine = get_inference_engine(state);
3719 let mut tracker = engine.outcome_tracker.write().await;
3720 let inferred = tracker.infer_outcomes_from_action_sequence(¶ms.action_results);
3721 tracker.resolve_pending_from_signals(inferred);
3722 Ok(serde_json::json!({ "recorded": params.action_results.len() }))
3723}
3724
3725async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
3727 let n = session.runtime.log.lock().await.len();
3728 Ok(Value::from(n as u64))
3729}
3730
3731async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
3732 let stats = session.runtime.log.lock().await.stats();
3733 serde_json::to_value(stats).map_err(|e| e.to_string())
3734}
3735
3736#[derive(Deserialize)]
3737#[serde(rename_all = "camelCase")]
3738struct EventsTruncateParams {
3739 #[serde(default)]
3740 max_events: Option<usize>,
3741 #[serde(default)]
3742 max_spans: Option<usize>,
3743}
3744
3745async fn handle_events_truncate(
3746 msg: &JsonRpcMessage,
3747 session: &crate::session::ClientSession,
3748) -> Result<Value, String> {
3749 let params: EventsTruncateParams =
3750 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
3751 max_events: None,
3752 max_spans: None,
3753 });
3754 let mut log = session.runtime.log.lock().await;
3755 let removed_events = params
3756 .max_events
3757 .map(|max| log.truncate_events_keep_last(max))
3758 .unwrap_or(0);
3759 let removed_spans = params
3760 .max_spans
3761 .map(|max| log.truncate_spans_keep_last(max))
3762 .unwrap_or(0);
3763 let stats = log.stats();
3764 Ok(serde_json::json!({
3765 "removedEvents": removed_events,
3766 "removedSpans": removed_spans,
3767 "stats": stats,
3768 }))
3769}
3770
3771async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
3772 let mut log = session.runtime.log.lock().await;
3773 let removed = log.clear();
3774 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
3775}
3776
3777async fn handle_replan_set_config(
3782 msg: &JsonRpcMessage,
3783 session: &crate::session::ClientSession,
3784) -> Result<Value, String> {
3785 let max_replans = msg
3786 .params
3787 .get("max_replans")
3788 .and_then(|v| v.as_u64())
3789 .unwrap_or(0) as u32;
3790 let delay_ms = msg
3791 .params
3792 .get("delay_ms")
3793 .and_then(|v| v.as_u64())
3794 .unwrap_or(0);
3795 let verify_before_execute = msg
3796 .params
3797 .get("verify_before_execute")
3798 .and_then(|v| v.as_bool())
3799 .unwrap_or(true);
3800 let cfg = car_engine::ReplanConfig {
3801 max_replans,
3802 delay_ms,
3803 verify_before_execute,
3804 };
3805 session.runtime.set_replan_config(cfg).await;
3806 Ok(Value::Null)
3807}
3808
3809async fn handle_skills_list(
3810 msg: &JsonRpcMessage,
3811 session: &crate::session::ClientSession,
3812) -> Result<Value, String> {
3813 let domain = msg.params.get("domain").and_then(|v| v.as_str());
3814 let engine = session.memgine.lock().await;
3815 let skills: Vec<serde_json::Value> = engine
3816 .graph
3817 .inner
3818 .node_indices()
3819 .filter_map(|nix| {
3820 let node = engine.graph.inner.node_weight(nix)?;
3821 if node.kind != car_memgine::MemKind::Skill {
3822 return None;
3823 }
3824 let meta = car_memgine::SkillMeta::from_node(node)?;
3825 if let Some(d) = domain {
3826 match &meta.scope {
3827 car_memgine::SkillScope::Global => {}
3828 car_memgine::SkillScope::Domain(sd) if sd == d => {}
3829 _ => return None,
3830 }
3831 }
3832 Some(serde_json::to_value(&meta).unwrap_or_default())
3833 })
3834 .collect();
3835 serde_json::to_value(&skills).map_err(|e| e.to_string())
3836}
3837
3838#[derive(serde::Deserialize)]
3839struct SecretParams {
3840 #[serde(default)]
3841 service: Option<String>,
3842 key: String,
3843 #[serde(default)]
3844 value: Option<String>,
3845}
3846
3847fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
3848 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3849 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
3850 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
3851}
3852
3853fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
3854 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3855 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
3856}
3857
3858fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
3859 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3860 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
3861}
3862
3863fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
3864 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3865 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
3866}
3867
3868#[derive(serde::Deserialize)]
3869struct PermParams {
3870 domain: String,
3871 #[serde(default)]
3872 target_bundle_id: Option<String>,
3873}
3874
3875fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
3876 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3877 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
3878}
3879
3880fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
3881 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3882 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
3883}
3884
3885fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
3886 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3887 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
3888}
3889
3890fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
3891 #[derive(serde::Deserialize)]
3892 struct P {
3893 start: String,
3894 end: String,
3895 #[serde(default)]
3896 calendar_ids: Vec<String>,
3897 }
3898 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3899 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
3900 .map_err(|e| format!("parse start: {}", e))?
3901 .with_timezone(&chrono::Utc);
3902 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
3903 .map_err(|e| format!("parse end: {}", e))?
3904 .with_timezone(&chrono::Utc);
3905 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
3906}
3907
3908fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
3909 #[derive(serde::Deserialize)]
3910 struct P {
3911 query: String,
3912 #[serde(default = "default_limit")]
3913 limit: usize,
3914 #[serde(default)]
3915 container_ids: Vec<String>,
3916 }
3917 fn default_limit() -> usize {
3918 50
3919 }
3920 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3921 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
3922}
3923
3924fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
3925 #[derive(serde::Deserialize, Default)]
3926 struct P {
3927 #[serde(default)]
3928 account_ids: Vec<String>,
3929 }
3930 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
3931 car_ffi_common::integrations::mail_inbox(&p.account_ids)
3932}
3933
3934fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
3935 let raw = req.params.to_string();
3936 car_ffi_common::integrations::mail_send(&raw)
3937}
3938
3939fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
3940 #[derive(serde::Deserialize)]
3941 struct P {
3942 #[serde(default = "default_limit")]
3943 limit: usize,
3944 }
3945 fn default_limit() -> usize {
3946 50
3947 }
3948 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
3949 car_ffi_common::integrations::messages_chats(p.limit)
3950}
3951
3952fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
3953 let raw = req.params.to_string();
3954 car_ffi_common::integrations::messages_send(&raw)
3955}
3956
3957fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
3958 #[derive(serde::Deserialize)]
3959 struct P {
3960 query: String,
3961 #[serde(default = "default_limit")]
3962 limit: usize,
3963 }
3964 fn default_limit() -> usize {
3965 50
3966 }
3967 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3968 car_ffi_common::integrations::notes_find(&p.query, p.limit)
3969}
3970
3971fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
3972 #[derive(serde::Deserialize)]
3973 struct P {
3974 #[serde(default = "default_limit")]
3975 limit: usize,
3976 }
3977 fn default_limit() -> usize {
3978 50
3979 }
3980 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
3981 car_ffi_common::integrations::reminders_items(p.limit)
3982}
3983
3984fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
3985 #[derive(serde::Deserialize)]
3986 struct P {
3987 #[serde(default = "default_limit")]
3988 limit: usize,
3989 }
3990 fn default_limit() -> usize {
3991 100
3992 }
3993 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
3994 car_ffi_common::integrations::bookmarks_list(p.limit)
3995}
3996
3997fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
3998 #[derive(serde::Deserialize)]
3999 struct P {
4000 start: String,
4001 end: String,
4002 }
4003 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4004 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4005 .map_err(|e| format!("parse start: {}", e))?
4006 .with_timezone(&chrono::Utc);
4007 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4008 .map_err(|e| format!("parse end: {}", e))?
4009 .with_timezone(&chrono::Utc);
4010 car_ffi_common::health::sleep_windows(s, e)
4011}
4012
4013fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
4014 #[derive(serde::Deserialize)]
4015 struct P {
4016 start: String,
4017 end: String,
4018 }
4019 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4020 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
4021 .map_err(|e| format!("parse start: {}", e))?
4022 .with_timezone(&chrono::Utc);
4023 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
4024 .map_err(|e| format!("parse end: {}", e))?
4025 .with_timezone(&chrono::Utc);
4026 car_ffi_common::health::workouts(s, e)
4027}
4028
4029fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
4030 #[derive(serde::Deserialize)]
4031 struct P {
4032 start: String,
4033 end: String,
4034 }
4035 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4036 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
4037 .map_err(|e| format!("parse start: {}", e))?;
4038 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
4039 .map_err(|e| format!("parse end: {}", e))?;
4040 car_ffi_common::health::activity(s, e)
4041}
4042
4043async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
4044 let closed = session.browser.close().await?;
4045 Ok(serde_json::json!({"closed": closed}))
4046}
4047
4048async fn handle_browser_run(
4049 req: &JsonRpcMessage,
4050 session: &crate::session::ClientSession,
4051) -> Result<Value, String> {
4052 #[derive(serde::Deserialize)]
4053 struct BrowserRunParams {
4054 script: Value,
4056 #[serde(default)]
4057 width: Option<u32>,
4058 #[serde(default)]
4059 height: Option<u32>,
4060 #[serde(default)]
4065 headed: Option<bool>,
4066 #[serde(default)]
4069 extra_args: Option<Vec<String>>,
4070 }
4071 let params: BrowserRunParams =
4072 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4073
4074 let script_json = match params.script {
4076 Value::String(s) => s,
4077 other => other.to_string(),
4078 };
4079
4080 let browser_session = session
4081 .browser
4082 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
4083 width: params.width.unwrap_or(1280),
4084 height: params.height.unwrap_or(720),
4085 headless: !params.headed.unwrap_or(false),
4086 extra_args: params.extra_args.unwrap_or_default(),
4087 })
4088 .await?;
4089
4090 let trace_json = browser_session.run(&script_json).await?;
4091 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
4092}
4093
4094#[derive(Deserialize)]
4107struct VoiceStartParams {
4108 session_id: String,
4109 audio_source: Value,
4110 #[serde(default)]
4111 options: Option<Value>,
4112}
4113
4114async fn handle_voice_transcribe_stream_start(
4115 req: &JsonRpcMessage,
4116 state: &Arc<ServerState>,
4117 session: &Arc<crate::session::ClientSession>,
4118) -> Result<Value, String> {
4119 let params: VoiceStartParams =
4120 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4121 let audio_source_json =
4122 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
4123 let options_json = params
4124 .options
4125 .as_ref()
4126 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
4127 .transpose()?;
4128 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4129 channel: session.channel.clone(),
4130 });
4131 let json = car_ffi_common::voice::transcribe_stream_start(
4132 ¶ms.session_id,
4133 &audio_source_json,
4134 options_json.as_deref(),
4135 state.voice_sessions.clone(),
4136 sink,
4137 )
4138 .await?;
4139 serde_json::from_str(&json).map_err(|e| e.to_string())
4140}
4141
4142#[derive(Deserialize)]
4143struct VoiceStopParams {
4144 session_id: String,
4145}
4146
4147async fn handle_voice_transcribe_stream_stop(
4148 req: &JsonRpcMessage,
4149 state: &Arc<ServerState>,
4150) -> Result<Value, String> {
4151 let params: VoiceStopParams =
4152 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4153 let json = car_ffi_common::voice::transcribe_stream_stop(
4154 ¶ms.session_id,
4155 state.voice_sessions.clone(),
4156 )
4157 .await?;
4158 serde_json::from_str(&json).map_err(|e| e.to_string())
4159}
4160
4161#[derive(Deserialize)]
4162struct VoicePushParams {
4163 session_id: String,
4164 pcm_b64: String,
4168}
4169
4170async fn handle_voice_transcribe_stream_push(
4171 req: &JsonRpcMessage,
4172 state: &Arc<ServerState>,
4173) -> Result<Value, String> {
4174 use base64::Engine;
4175 let params: VoicePushParams =
4176 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4177 let pcm = base64::engine::general_purpose::STANDARD
4178 .decode(¶ms.pcm_b64)
4179 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
4180 let json = car_ffi_common::voice::transcribe_stream_push(
4181 ¶ms.session_id,
4182 &pcm,
4183 state.voice_sessions.clone(),
4184 )
4185 .await?;
4186 serde_json::from_str(&json).map_err(|e| e.to_string())
4187}
4188
4189fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
4190 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
4191 serde_json::from_str(&json).unwrap_or(Value::Null)
4192}
4193
4194async fn handle_voice_dispatch_turn(
4195 req: &JsonRpcMessage,
4196 state: &Arc<ServerState>,
4197 session: &Arc<crate::session::ClientSession>,
4198) -> Result<Value, String> {
4199 let req_value = req.params.clone();
4200 let request: crate::voice_turn::DispatchVoiceTurnRequest =
4201 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
4202 let engine = get_inference_engine(state).clone();
4203 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
4204 channel: session.channel.clone(),
4205 });
4206 let resp = crate::voice_turn::dispatch(engine, request, sink).await?;
4207 serde_json::to_value(resp).map_err(|e| e.to_string())
4208}
4209
4210async fn handle_voice_cancel_turn() -> Result<Value, String> {
4211 crate::voice_turn::cancel().await;
4212 Ok(serde_json::json!({"cancelled": true}))
4213}
4214
4215async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
4216 let engine = get_inference_engine(state).clone();
4217 crate::voice_turn::prewarm(engine).await;
4218 Ok(serde_json::json!({"prewarmed": true}))
4219}
4220
4221fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
4240 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
4241 std::sync::OnceLock::new();
4242 SLOT.get_or_init(|| std::sync::RwLock::new(None))
4243}
4244
4245fn ws_runner_calls() -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
4246 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
4247 std::sync::OnceLock::new();
4248 MAP.get_or_init(dashmap::DashMap::new)
4249}
4250
4251fn ws_runner_completions() -> &'static dashmap::DashMap<
4252 String,
4253 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4254> {
4255 static MAP: std::sync::OnceLock<
4256 dashmap::DashMap<
4257 String,
4258 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
4259 >,
4260 > = std::sync::OnceLock::new();
4261 MAP.get_or_init(dashmap::DashMap::new)
4262}
4263
4264struct WsInferenceRunner;
4265
4266#[async_trait::async_trait]
4267impl car_inference::InferenceRunner for WsInferenceRunner {
4268 async fn run(
4269 &self,
4270 request: car_inference::tasks::generate::GenerateRequest,
4271 emitter: car_inference::EventEmitter,
4272 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
4273 let channel = ws_runner_session()
4274 .read()
4275 .map_err(|e| {
4276 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
4277 })?
4278 .clone()
4279 .ok_or_else(|| {
4280 car_inference::RunnerError::Declined(
4281 "no WebSocket inference runner registered — call inference.register_runner first"
4282 .into(),
4283 )
4284 })?;
4285
4286 let call_id = uuid::Uuid::new_v4().to_string();
4287 let request_json = serde_json::to_value(&request)
4288 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4289 let (tx, rx) = tokio::sync::oneshot::channel();
4290 ws_runner_calls().insert(call_id.clone(), emitter);
4291 ws_runner_completions().insert(call_id.clone(), tx);
4292
4293 use futures::SinkExt;
4295 let notification = serde_json::json!({
4296 "jsonrpc": "2.0",
4297 "method": "inference.runner.invoke",
4298 "params": {
4299 "call_id": call_id,
4300 "request": request_json,
4301 },
4302 });
4303 let text = serde_json::to_string(¬ification)
4304 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
4305 let _ = channel
4306 .write
4307 .lock()
4308 .await
4309 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
4310 .await;
4311
4312 let result = rx.await.map_err(|_| {
4313 car_inference::RunnerError::Failed("runner completion channel dropped".into())
4314 })?;
4315 ws_runner_calls().remove(&call_id);
4316 result.map_err(car_inference::RunnerError::Failed)
4317 }
4318}
4319
4320async fn handle_inference_register_runner(
4321 session: &Arc<crate::session::ClientSession>,
4322) -> Result<Value, String> {
4323 let mut guard = ws_runner_session()
4324 .write()
4325 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
4326 *guard = Some(session.channel.clone());
4327 drop(guard);
4328 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
4329 Ok(serde_json::json!({"registered": true}))
4330}
4331
4332#[derive(serde::Deserialize)]
4333struct InferenceRunnerEventParams {
4334 call_id: String,
4335 event: Value,
4336}
4337
4338async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
4339 let params: InferenceRunnerEventParams =
4340 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4341 let stream_event = match parse_runner_event_value(¶ms.event) {
4342 Some(e) => e,
4343 None => return Err("unrecognised runner event shape".into()),
4344 };
4345 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
4346 let emitter = entry.value().clone();
4347 tokio::spawn(async move { emitter.emit(stream_event).await });
4348 }
4349 Ok(serde_json::json!({"emitted": true}))
4350}
4351
4352#[derive(serde::Deserialize)]
4353struct InferenceRunnerCompleteParams {
4354 call_id: String,
4355 result: Value,
4356}
4357
4358async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
4359 let params: InferenceRunnerCompleteParams =
4360 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4361 let result: std::result::Result<car_inference::RunnerResult, String> =
4362 serde_json::from_value(params.result)
4363 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
4364 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4365 let _ = tx.send(result);
4366 }
4367 Ok(serde_json::json!({"completed": true}))
4368}
4369
4370#[derive(serde::Deserialize)]
4371struct InferenceRunnerFailParams {
4372 call_id: String,
4373 error: String,
4374}
4375
4376async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
4377 let params: InferenceRunnerFailParams =
4378 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4379 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
4380 let _ = tx.send(Err(params.error));
4381 }
4382 Ok(serde_json::json!({"failed": true}))
4383}
4384
4385fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
4386 let ty = v.get("type").and_then(|t| t.as_str())?;
4387 match ty {
4388 "text" => Some(car_inference::StreamEvent::TextDelta(
4389 v.get("data")?.as_str()?.to_string(),
4390 )),
4391 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
4392 name: v.get("name")?.as_str()?.to_string(),
4393 index: v.get("index")?.as_u64()? as usize,
4394 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
4395 }),
4396 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
4397 index: v.get("index")?.as_u64()? as usize,
4398 arguments_delta: v.get("data")?.as_str()?.to_string(),
4399 }),
4400 "usage" => Some(car_inference::StreamEvent::Usage {
4401 input_tokens: v.get("input_tokens")?.as_u64()?,
4402 output_tokens: v.get("output_tokens")?.as_u64()?,
4403 }),
4404 "done" => Some(car_inference::StreamEvent::Done {
4405 text: v.get("text")?.as_str()?.to_string(),
4406 tool_calls: v
4407 .get("tool_calls")
4408 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
4409 .unwrap_or_default(),
4410 }),
4411 _ => None,
4412 }
4413}
4414
4415#[derive(Deserialize)]
4416struct EnrollSpeakerParams {
4417 label: String,
4418 audio: Value,
4419}
4420
4421async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
4422 let params: EnrollSpeakerParams =
4423 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4424 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
4425 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
4426 serde_json::from_str(&json).map_err(|e| e.to_string())
4427}
4428
4429#[derive(Deserialize)]
4430struct RemoveEnrollmentParams {
4431 label: String,
4432}
4433
4434fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
4435 let params: RemoveEnrollmentParams =
4436 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4437 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
4438 serde_json::from_str(&json).map_err(|e| e.to_string())
4439}
4440
4441#[derive(Deserialize)]
4442struct WorkflowRunParams {
4443 workflow: Value,
4444}
4445
4446async fn handle_workflow_run(
4447 req: &JsonRpcMessage,
4448 session: &Arc<crate::session::ClientSession>,
4449) -> Result<Value, String> {
4450 let params: WorkflowRunParams =
4451 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4452 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4453 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
4454 channel: session.channel.clone(),
4455 host: session.host.clone(),
4456 client_id: session.client_id.clone(),
4457 });
4458 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
4459 serde_json::from_str(&json).map_err(|e| e.to_string())
4460}
4461
4462#[derive(Deserialize)]
4463struct WorkflowVerifyParams {
4464 workflow: Value,
4465}
4466
4467fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
4468 let params: WorkflowVerifyParams =
4469 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4470 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
4471 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
4472 serde_json::from_str(&json).map_err(|e| e.to_string())
4473}
4474
4475async fn handle_meeting_start(
4480 req: &JsonRpcMessage,
4481 state: &Arc<ServerState>,
4482 session: &Arc<crate::session::ClientSession>,
4483) -> Result<Value, String> {
4484 let mut req_value = req.params.clone();
4490 let meeting_id = req_value
4491 .get("id")
4492 .and_then(|v| v.as_str())
4493 .map(str::to_string)
4494 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
4495 if let Some(map) = req_value.as_object_mut() {
4496 map.insert("id".into(), Value::String(meeting_id.clone()));
4497 }
4498 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
4499
4500 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
4501 Arc::new(crate::session::WsVoiceEventSink {
4502 channel: session.channel.clone(),
4503 });
4504
4505 let upstream: Arc<dyn car_voice::VoiceEventSink> =
4510 Arc::new(crate::session::WsMemgineIngestSink {
4511 meeting_id,
4512 engine: session.memgine.clone(),
4513 upstream: ws_upstream,
4514 });
4515
4516 let cwd = std::env::current_dir().ok();
4517 let json = crate::meeting::start_meeting(
4518 &request_json,
4519 state.meetings.clone(),
4520 state.voice_sessions.clone(),
4521 upstream,
4522 None,
4523 cwd,
4524 )
4525 .await?;
4526 serde_json::from_str(&json).map_err(|e| e.to_string())
4527}
4528
4529#[derive(Deserialize)]
4530struct MeetingStopParams {
4531 meeting_id: String,
4532 #[serde(default = "default_summarize")]
4533 summarize: bool,
4534}
4535
4536fn default_summarize() -> bool {
4537 true
4538}
4539
4540async fn handle_meeting_stop(
4541 req: &JsonRpcMessage,
4542 state: &Arc<ServerState>,
4543 _session: &Arc<crate::session::ClientSession>,
4544) -> Result<Value, String> {
4545 let params: MeetingStopParams =
4546 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4547 let inference = if params.summarize {
4548 Some(state.inference.get().cloned()).flatten()
4549 } else {
4550 None
4551 };
4552 let json = crate::meeting::stop_meeting(
4553 ¶ms.meeting_id,
4554 params.summarize,
4555 state.meetings.clone(),
4556 state.voice_sessions.clone(),
4557 inference,
4558 )
4559 .await?;
4560 serde_json::from_str(&json).map_err(|e| e.to_string())
4561}
4562
4563#[derive(Deserialize, Default)]
4564struct MeetingListParams {
4565 #[serde(default)]
4566 root: Option<std::path::PathBuf>,
4567}
4568
4569fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
4570 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4571 let cwd = std::env::current_dir().ok();
4572 let json = crate::meeting::list_meetings(params.root, cwd)?;
4573 serde_json::from_str(&json).map_err(|e| e.to_string())
4574}
4575
4576#[derive(Deserialize)]
4577struct MeetingGetParams {
4578 meeting_id: String,
4579 #[serde(default)]
4580 root: Option<std::path::PathBuf>,
4581}
4582
4583fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
4584 let params: MeetingGetParams =
4585 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4586 let cwd = std::env::current_dir().ok();
4587 let json = crate::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
4588 serde_json::from_str(&json).map_err(|e| e.to_string())
4589}
4590
4591#[derive(Deserialize, Default)]
4596struct RegistryRegisterParams {
4597 entry: Value,
4601 #[serde(default)]
4602 registry_path: Option<std::path::PathBuf>,
4603}
4604
4605fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
4606 let params: RegistryRegisterParams =
4607 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4608 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
4609 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
4610 Ok(Value::Null)
4611}
4612
4613#[derive(Deserialize, Default)]
4614struct RegistryNameParams {
4615 name: String,
4616 #[serde(default)]
4617 registry_path: Option<std::path::PathBuf>,
4618}
4619
4620fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
4621 let params: RegistryNameParams =
4622 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4623 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
4624 serde_json::from_str(&json).map_err(|e| e.to_string())
4625}
4626
4627fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
4628 let params: RegistryNameParams =
4629 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4630 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
4631 Ok(Value::Null)
4632}
4633
4634#[derive(Deserialize, Default)]
4635struct RegistryListParams {
4636 #[serde(default)]
4637 registry_path: Option<std::path::PathBuf>,
4638}
4639
4640fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
4641 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4642 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
4643 serde_json::from_str(&json).map_err(|e| e.to_string())
4644}
4645
4646#[derive(Deserialize, Default)]
4647struct RegistryReapParams {
4648 #[serde(default = "default_reap_age")]
4651 max_age_secs: u64,
4652 #[serde(default)]
4653 registry_path: Option<std::path::PathBuf>,
4654}
4655
4656fn default_reap_age() -> u64 {
4657 60
4658}
4659
4660fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
4661 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
4662 let json =
4663 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
4664 serde_json::from_str(&json).map_err(|e| e.to_string())
4665}
4666
4667async fn handle_a2a_start(
4674 req: &JsonRpcMessage,
4675 session: &crate::session::ClientSession,
4676) -> Result<Value, String> {
4677 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4678 let json =
4684 crate::a2a::start_a2a(¶ms_json, Some(session.runtime.clone())).await?;
4685 serde_json::from_str(&json).map_err(|e| e.to_string())
4686}
4687
4688fn handle_a2a_stop() -> Result<Value, String> {
4689 let json = crate::a2a::stop_a2a()?;
4690 serde_json::from_str(&json).map_err(|e| e.to_string())
4691}
4692
4693fn handle_a2a_status() -> Result<Value, String> {
4694 let json = crate::a2a::a2a_status()?;
4695 serde_json::from_str(&json).map_err(|e| e.to_string())
4696}
4697
4698#[derive(Deserialize)]
4699#[serde(rename_all = "camelCase")]
4700struct A2aSendParams {
4701 endpoint: String,
4702 message: car_a2a::Message,
4703 #[serde(default)]
4704 blocking: bool,
4705 #[serde(default = "default_true")]
4706 ingest_a2ui: bool,
4707 #[serde(default)]
4708 route_auth: Option<A2aRouteAuth>,
4709 #[serde(default)]
4710 allow_untrusted_endpoint: bool,
4711}
4712
4713fn default_true() -> bool {
4714 true
4715}
4716
4717async fn handle_a2a_dispatch(
4727 method: &str,
4728 req: &JsonRpcMessage,
4729 state: &Arc<ServerState>,
4730) -> Result<Value, String> {
4731 let dispatcher = state.a2a_dispatcher().await;
4732 dispatcher
4733 .dispatch(method, req.params.clone())
4734 .await
4735 .map_err(|e| e.to_string())
4736}
4737
4738async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
4739 let params: A2aSendParams =
4740 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4741 let endpoint = trusted_route_endpoint(
4742 Some(params.endpoint.clone()),
4743 params.allow_untrusted_endpoint,
4744 )
4745 .ok_or_else(|| {
4746 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
4747 })?;
4748 let client = match params.route_auth.clone() {
4749 Some(auth) => {
4750 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
4751 }
4752 None => car_a2a::A2aClient::new(endpoint.clone()),
4753 };
4754 let result = client
4755 .send_message(params.message, params.blocking)
4756 .await
4757 .map_err(|e| e.to_string())?;
4758 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
4759 let mut applied = Vec::new();
4760 if params.ingest_a2ui {
4761 state
4762 .a2ui
4763 .validate_payload(&result_value)
4764 .map_err(|e| e.to_string())?;
4765 let routed_endpoint = Some(endpoint.clone());
4766 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
4767 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
4768 if owner.endpoint.is_none() {
4769 owner.with_endpoint(routed_endpoint.clone())
4770 } else {
4771 owner
4772 }
4773 });
4774 applied.push(
4775 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
4776 );
4777 }
4778 }
4779 Ok(serde_json::json!({
4780 "result": result,
4781 "a2ui": {
4782 "applied": applied,
4783 }
4784 }))
4785}
4786
4787async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
4795 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4796 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
4797 serde_json::from_str(&json).map_err(|e| e.to_string())
4798}
4799
4800async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
4801 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4802 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
4803 serde_json::from_str(&json).map_err(|e| e.to_string())
4804}
4805
4806async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
4807 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4808 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
4809 serde_json::from_str(&json).map_err(|e| e.to_string())
4810}
4811
4812async fn handle_local_notification(req: &JsonRpcMessage) -> Result<Value, String> {
4813 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4814 let json = car_ffi_common::notifications::local(&args_json).await?;
4815 serde_json::from_str(&json).map_err(|e| e.to_string())
4816}
4817
4818async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
4819 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
4820 let json = car_ffi_common::vision::ocr(&args_json).await?;
4821 serde_json::from_str(&json).map_err(|e| e.to_string())
4822}
4823
4824async fn handle_agents_list(state: &Arc<ServerState>) -> Result<Value, String> {
4829 let supervisor = state.supervisor()?;
4830 let agents = supervisor.list().await;
4831 let attached = state.attached_agents.lock().await.clone();
4838 let mut decorated: Vec<Value> = Vec::with_capacity(agents.len());
4839 for a in agents {
4840 let mut v = serde_json::to_value(&a).map_err(|e| e.to_string())?;
4841 let session_id = attached.get(&a.spec.id).cloned();
4842 if let Some(map) = v.as_object_mut() {
4843 map.insert("attached".to_string(), Value::Bool(session_id.is_some()));
4844 if let Some(sid) = session_id {
4845 map.insert("session_id".to_string(), Value::String(sid));
4846 }
4847 }
4848 decorated.push(v);
4849 }
4850 Ok(Value::Array(decorated))
4851}
4852
4853async fn handle_agents_upsert(
4854 req: &JsonRpcMessage,
4855 state: &Arc<ServerState>,
4856) -> Result<Value, String> {
4857 let mut params = req.params.clone();
4858 if let Some(name) = params
4867 .get("interpreter")
4868 .and_then(|v| v.as_str())
4869 .map(str::to_string)
4870 {
4871 let resolved =
4872 car_registry::supervisor::resolve_interpreter(&name).map_err(|e| e.to_string())?;
4873 params["command"] = Value::String(resolved.to_string_lossy().into_owned());
4874 }
4875 let spec: car_registry::supervisor::AgentSpec =
4876 serde_json::from_value(params).map_err(|e| e.to_string())?;
4877 let supervisor = state.supervisor()?;
4878 let agent = supervisor.upsert(spec).await.map_err(|e| e.to_string())?;
4879 serde_json::to_value(agent).map_err(|e| e.to_string())
4880}
4881
4882async fn handle_agents_install(
4896 req: &JsonRpcMessage,
4897 state: &Arc<ServerState>,
4898) -> Result<Value, String> {
4899 let manifest: car_registry::manifest::AgentManifest =
4900 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
4901 let host = car_registry::install::HostCapabilities::daemon_default(env!("CARGO_PKG_VERSION"));
4902 let supervisor = state.supervisor()?;
4903 let (report, managed) = supervisor
4904 .install_manifest(manifest, &host)
4905 .await
4906 .map_err(|e| e.to_string())?;
4907 Ok(serde_json::json!({
4908 "report": {
4909 "missingOptional": report
4910 .missing_optional
4911 .iter()
4912 .map(|(ns, feat)| serde_json::json!({ "namespace": ns, "feature": feat }))
4913 .collect::<Vec<_>>(),
4914 },
4915 "agent": managed,
4916 }))
4917}
4918
4919async fn handle_agents_health(state: &Arc<ServerState>) -> Result<Value, String> {
4920 let supervisor = state.supervisor()?;
4921 let entries = supervisor.health().await;
4922 serde_json::to_value(entries).map_err(|e| e.to_string())
4923}
4924
4925fn extract_agent_id(req: &JsonRpcMessage) -> Result<String, String> {
4926 req.params
4927 .get("id")
4928 .and_then(Value::as_str)
4929 .map(str::to_string)
4930 .ok_or_else(|| "missing required `id` parameter".to_string())
4931}
4932
4933async fn handle_agents_remove(
4934 req: &JsonRpcMessage,
4935 state: &Arc<ServerState>,
4936) -> Result<Value, String> {
4937 let id = extract_agent_id(req)?;
4938 let supervisor = state.supervisor()?;
4939 let removed = supervisor.remove(&id).await.map_err(|e| e.to_string())?;
4940 Ok(serde_json::json!({ "removed": removed }))
4941}
4942
4943async fn handle_agents_start(
4944 req: &JsonRpcMessage,
4945 state: &Arc<ServerState>,
4946) -> Result<Value, String> {
4947 let id = extract_agent_id(req)?;
4948 let supervisor = state.supervisor()?;
4949 let agent = supervisor.start(&id).await.map_err(|e| e.to_string())?;
4950 serde_json::to_value(agent).map_err(|e| e.to_string())
4951}
4952
4953async fn handle_agents_stop(
4954 req: &JsonRpcMessage,
4955 state: &Arc<ServerState>,
4956) -> Result<Value, String> {
4957 let id = extract_agent_id(req)?;
4958 let signal: car_registry::supervisor::StopSignal = req
4959 .params
4960 .get("signal")
4961 .map(|v| serde_json::from_value(v.clone()))
4962 .transpose()
4963 .map_err(|e| e.to_string())?
4964 .unwrap_or_default();
4965 let supervisor = state.supervisor()?;
4966 let agent = supervisor
4967 .stop(&id, signal)
4968 .await
4969 .map_err(|e| e.to_string())?;
4970 serde_json::to_value(agent).map_err(|e| e.to_string())
4971}
4972
4973async fn handle_agents_restart(
4974 req: &JsonRpcMessage,
4975 state: &Arc<ServerState>,
4976) -> Result<Value, String> {
4977 let id = extract_agent_id(req)?;
4978 let supervisor = state.supervisor()?;
4979 let agent = supervisor.restart(&id).await.map_err(|e| e.to_string())?;
4980 serde_json::to_value(agent).map_err(|e| e.to_string())
4981}
4982
4983async fn handle_agents_tail_log(
4984 req: &JsonRpcMessage,
4985 state: &Arc<ServerState>,
4986) -> Result<Value, String> {
4987 let id = extract_agent_id(req)?;
4988 let n = req.params.get("n").and_then(Value::as_u64).unwrap_or(100) as usize;
4989 let supervisor = state.supervisor()?;
4990 let lines = supervisor
4991 .tail_log(&id, n)
4992 .await
4993 .map_err(|e| e.to_string())?;
4994 Ok(serde_json::json!({ "lines": lines }))
4995}
4996
4997async fn handle_agents_list_external(req: &JsonRpcMessage) -> Result<Value, String> {
5008 let include_health = req
5009 .params
5010 .get("include_health")
5011 .and_then(Value::as_bool)
5012 .unwrap_or(false);
5013 let json = car_ffi_common::external_agents::list(include_health).await?;
5014 serde_json::from_str(&json).map_err(|e| e.to_string())
5015}
5016
5017async fn handle_agents_detect_external(req: &JsonRpcMessage) -> Result<Value, String> {
5018 let include_health = req
5019 .params
5020 .get("include_health")
5021 .and_then(Value::as_bool)
5022 .unwrap_or(false);
5023 let json = car_ffi_common::external_agents::detect(include_health).await?;
5024 serde_json::from_str(&json).map_err(|e| e.to_string())
5025}
5026
5027async fn handle_agents_invoke_external(
5045 req: &JsonRpcMessage,
5046 state: &Arc<ServerState>,
5047) -> Result<Value, String> {
5048 let id = req
5049 .params
5050 .get("id")
5051 .and_then(Value::as_str)
5052 .ok_or_else(|| "missing required `id` parameter".to_string())?;
5053 let task = req
5054 .params
5055 .get("task")
5056 .and_then(Value::as_str)
5057 .ok_or_else(|| "missing required `task` parameter".to_string())?;
5058 let mut options_value = req.params.clone();
5063 if let Some(obj) = options_value.as_object_mut() {
5064 obj.remove("id");
5065 obj.remove("task");
5066 let has_explicit_mcp = obj.contains_key("mcp_endpoint");
5075 if !has_explicit_mcp {
5076 if let Some(url) = state.mcp_url.get() {
5077 obj.insert("mcp_endpoint".to_string(), Value::String(url.clone()));
5078 }
5079 }
5080 }
5081 let options_json = options_value.to_string();
5082 let json = car_ffi_common::external_agents::invoke(id, task, &options_json).await?;
5083 let result: Value = serde_json::from_str(&json).map_err(|e| e.to_string())?;
5084 append_external_agent_audit(id, task, &options_value, &result);
5085 Ok(result)
5086}
5087
5088fn append_external_agent_audit(id: &str, task: &str, options: &Value, result: &Value) {
5094 use std::io::Write;
5095 let car_dir = match std::env::var_os("HOME").map(std::path::PathBuf::from) {
5096 Some(home) => home.join(".car"),
5097 None => return,
5098 };
5099 if std::fs::create_dir_all(&car_dir).is_err() {
5100 return;
5101 }
5102 let path = car_dir.join("external-agents.jsonl");
5103 let record = serde_json::json!({
5104 "ts": chrono::Utc::now().to_rfc3339(),
5105 "adapter_id": id,
5106 "task": task,
5107 "options": options,
5108 "result": result,
5109 });
5110 let line = match serde_json::to_string(&record) {
5111 Ok(s) => s,
5112 Err(_) => return,
5113 };
5114 if let Ok(mut f) = std::fs::OpenOptions::new()
5115 .create(true)
5116 .append(true)
5117 .open(&path)
5118 {
5119 let _ = writeln!(f, "{}", line);
5120 } else {
5121 tracing::warn!(
5122 path = %path.display(),
5123 "failed to append external-agent audit record"
5124 );
5125 }
5126}
5127
5128async fn handle_agents_health_external(req: &JsonRpcMessage) -> Result<Value, String> {
5134 let force = req
5135 .params
5136 .get("force")
5137 .and_then(Value::as_bool)
5138 .unwrap_or(false);
5139 if let Some(id) = req.params.get("id").and_then(Value::as_str) {
5140 let json = car_ffi_common::external_agents::health_one(id, force).await?;
5141 serde_json::from_str(&json).map_err(|e| e.to_string())
5142 } else {
5143 let json = car_ffi_common::external_agents::health(force).await?;
5144 serde_json::from_str(&json).map_err(|e| e.to_string())
5145 }
5146}