1use crate::session::{A2aRouteAuth, ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::sync::atomic::AtomicU64;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30 #[serde(default)]
31 pub jsonrpc: String,
32 #[serde(default)]
33 pub method: Option<String>,
34 #[serde(default)]
35 pub params: Value,
36 #[serde(default)]
37 pub id: Value,
38 #[serde(default)]
40 pub result: Option<Value>,
41 #[serde(default)]
42 pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47 pub jsonrpc: &'static str,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub result: Option<Value>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<JsonRpcError>,
52 pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57 pub code: i32,
58 pub message: String,
59}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
89 name = "ws.connection",
90 skip_all,
91 fields(peer = %peer),
92)]
93pub async fn handle_connection(
94 stream: TcpStream,
95 peer: SocketAddr,
96 state: Arc<ServerState>,
97) -> Result<(), Box<dyn std::error::Error>> {
98 let ws_stream = accept_async(stream).await?;
99 let (write, read) = ws_stream.split();
100 run_dispatch(read, write, peer, state).await
101}
102
103#[instrument(
114 name = "ws.dispatch",
115 skip_all,
116 fields(client_id = tracing::field::Empty, peer = %peer),
117)]
118pub async fn run_dispatch(
119 mut read: futures::stream::SplitStream<
120 tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
121 >,
122 write: futures::stream::SplitSink<
123 tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
124 Message,
125 >,
126 peer: SocketAddr,
127 state: Arc<ServerState>,
128) -> Result<(), Box<dyn std::error::Error>> {
129 let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
130 tracing::Span::current().record("client_id", &client_id.as_str());
131
132 info!("New connection from {}", peer);
133
134 let channel = Arc::new(WsChannel {
135 write: Mutex::new(write),
136 pending: Mutex::new(HashMap::new()),
137 next_id: AtomicU64::new(1),
138 });
139
140 let session = state.create_session(&client_id, channel.clone()).await;
141
142 while let Some(msg) = read.next().await {
143 let msg = msg?;
144 if msg.is_text() {
145 let text = msg.to_text()?;
146 let parsed: JsonRpcMessage = match serde_json::from_str(text) {
147 Ok(m) => m,
148 Err(e) => {
149 send_response(
150 &session.channel,
151 JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
152 )
153 .await?;
154 continue;
155 }
156 };
157
158 if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
160 if let Some(id_str) = parsed.id.as_str() {
161 let mut pending = session.channel.pending.lock().await;
162 if let Some(tx) = pending.remove(id_str) {
163 let tool_resp = if let Some(result) = parsed.result {
164 ToolExecuteResponse {
165 action_id: id_str.to_string(),
166 output: Some(result),
167 error: None,
168 }
169 } else {
170 let err_msg = parsed
171 .error
172 .as_ref()
173 .and_then(|e| e.get("message"))
174 .and_then(|m| m.as_str())
175 .unwrap_or("unknown error")
176 .to_string();
177 ToolExecuteResponse {
178 action_id: id_str.to_string(),
179 output: None,
180 error: Some(err_msg),
181 }
182 };
183 let _ = tx.send(tool_resp);
184 continue;
185 }
186 }
187 }
188
189 if let Some(method) = &parsed.method {
191 info!(method = %method, "dispatching JSON-RPC method");
192 let result = match method.as_str() {
193 "session.init" => handle_session_init(&parsed, &session).await,
194 "host.subscribe" => handle_host_subscribe(&session).await,
195 "host.agents" => handle_host_agents(&session).await,
196 "host.events" => handle_host_events(&parsed, &session).await,
197 "host.approvals" => handle_host_approvals(&session).await,
198 "host.register_agent" => handle_host_register_agent(&parsed, &session).await,
199 "host.unregister_agent" => {
200 handle_host_unregister_agent(&parsed, &session).await
201 }
202 "host.set_status" => handle_host_set_status(&parsed, &session).await,
203 "host.notify" => handle_host_notify(&parsed, &session).await,
204 "host.request_approval" => {
205 handle_host_request_approval(&parsed, &session).await
206 }
207 "host.resolve_approval" => {
208 handle_host_resolve_approval(&parsed, &session).await
209 }
210 "tools.register" => handle_tools_register(&parsed, &session).await,
211 "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
212 "policy.register" => handle_policy_register(&parsed, &session).await,
213 "session.policy.open" => handle_session_policy_open(&session).await,
214 "session.policy.close" => {
215 handle_session_policy_close(&parsed, &session).await
216 }
217 "verify" => handle_verify(&parsed, &session).await,
218 "state.get" => handle_state_get(&parsed, &session).await,
219 "state.set" => handle_state_set(&parsed, &session).await,
220 "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
221 "memory.query" => handle_memory_query(&parsed, &session).await,
222 "memory.build_context" => handle_memory_build_context(&parsed, &session).await,
223 "memory.consolidate" => handle_memory_consolidate(&session).await,
224 "memory.fact_count" => handle_memory_fact_count(&session).await,
225 "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
226 "skill.find" => handle_skill_find(&parsed, &session).await,
227 "skill.report" => handle_skill_report(&parsed, &session).await,
228 "skill.repair" => handle_skill_repair(&parsed, &session).await,
229 "skills.ingest_distilled" => {
230 handle_skills_ingest_distilled(&parsed, &session).await
231 }
232 "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
233 "skills.domains_needing_evolution" => {
234 handle_skills_domains_needing_evolution(&parsed, &session).await
235 }
236 "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
237 "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
238 "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
239 "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
240 "multi.vote" => handle_multi_vote(&parsed, &session).await,
241 "scheduler.create" => handle_scheduler_create(&parsed),
242 "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
243 "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
244 "infer" => handle_infer(&parsed, &state, &session).await,
245 "embed" => handle_embed(&parsed, &state).await,
246 "classify" => handle_classify(&parsed, &state).await,
247 "tokenize" => handle_tokenize(&parsed, &state).await,
248 "detokenize" => handle_detokenize(&parsed, &state).await,
249 "rerank" => handle_rerank(&parsed, &state).await,
250 "transcribe" => handle_transcribe(&parsed, &state).await,
251 "synthesize" => handle_synthesize(&parsed, &state).await,
252 "speech.prepare" => handle_speech_prepare(&state).await,
253 "models.route" => handle_models_route(&parsed, &state).await,
254 "models.stats" => handle_models_stats(&state).await,
255 "events.count" => handle_events_count(&session).await,
256 "events.stats" => handle_events_stats(&session).await,
257 "events.truncate" => handle_events_truncate(&parsed, &session).await,
258 "events.clear" => handle_events_clear(&session).await,
259 "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
260 "models.list" => handle_models_list(&state),
261 "models.list_unified" => handle_models_list_unified(&state),
262 "models.search" => handle_models_search(&parsed, &state),
263 "models.upgrades" => handle_models_upgrades(&state),
264 "models.pull" => handle_models_pull(&parsed, &state).await,
265 "models.install" => handle_models_pull(&parsed, &state).await,
266 "skills.distill" => handle_skills_distill(&parsed, &state).await,
267 "skills.list" => handle_skills_list(&parsed, &session).await,
268 "browser.run" => handle_browser_run(&parsed, &session).await,
269 "browser.close" => handle_browser_close(&session).await,
270 "secret.put" => handle_secret_put(&parsed),
271 "secret.get" => handle_secret_get(&parsed),
272 "secret.delete" => handle_secret_delete(&parsed),
273 "secret.status" => handle_secret_status(&parsed),
274 "secret.available" => Ok(car_ffi_common::secrets::is_available()),
275 "permissions.status" => handle_perm_status(&parsed),
276 "permissions.request" => handle_perm_request(&parsed),
277 "permissions.explain" => handle_perm_explain(&parsed),
278 "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
279 "accounts.list" => car_ffi_common::accounts::list(),
280 "accounts.open" => {
281 #[derive(serde::Deserialize, Default)]
282 struct OpenParams {
283 #[serde(default)]
284 account_id: Option<String>,
285 }
286 let p: OpenParams =
287 serde_json::from_value(parsed.params.clone()).unwrap_or_default();
288 car_ffi_common::accounts::open_settings(p.account_id.as_deref())
289 }
290 "calendar.list" => car_ffi_common::integrations::calendar_list(),
291 "calendar.events" => handle_calendar_events(&parsed),
292 "contacts.containers" => car_ffi_common::integrations::contacts_containers(),
293 "contacts.find" => handle_contacts_find(&parsed),
294 "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
295 "mail.inbox" => handle_mail_inbox(&parsed),
296 "mail.send" => handle_mail_send(&parsed),
297 "messages.services" => car_ffi_common::integrations::messages_services(),
298 "messages.chats" => handle_messages_chats(&parsed),
299 "messages.send" => handle_messages_send(&parsed),
300 "notes.accounts" => car_ffi_common::integrations::notes_accounts(),
301 "notes.find" => handle_notes_find(&parsed),
302 "reminders.lists" => car_ffi_common::integrations::reminders_lists(),
303 "reminders.items" => handle_reminders_items(&parsed),
304 "photos.albums" => car_ffi_common::integrations::photos_albums(),
305 "bookmarks.list" => handle_bookmarks_list(&parsed),
306 "files.locations" => car_ffi_common::integrations::files_locations(),
307 "keychain.status" => car_ffi_common::integrations::keychain_status(),
308 "health.status" => car_ffi_common::health::status(),
309 "health.sleep" => handle_health_sleep(&parsed),
310 "health.workouts" => handle_health_workouts(&parsed),
311 "health.activity" => handle_health_activity(&parsed),
312 "voice.transcribe_stream.start" => {
313 handle_voice_transcribe_stream_start(&parsed, &state, &session).await
314 }
315 "voice.transcribe_stream.stop" => {
316 handle_voice_transcribe_stream_stop(&parsed, &state).await
317 }
318 "voice.transcribe_stream.push" => {
319 handle_voice_transcribe_stream_push(&parsed, &state).await
320 }
321 "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
322 "voice.dispatch_turn" => {
323 handle_voice_dispatch_turn(&parsed, &state, &session).await
324 }
325 "voice.cancel_turn" => handle_voice_cancel_turn().await,
326 "voice.prewarm_turn" => handle_voice_prewarm_turn(&state).await,
327 "inference.register_runner" => {
328 handle_inference_register_runner(&session).await
329 }
330 "inference.runner.event" => {
331 handle_inference_runner_event(&parsed).await
332 }
333 "inference.runner.complete" => {
334 handle_inference_runner_complete(&parsed).await
335 }
336 "inference.runner.fail" => handle_inference_runner_fail(&parsed).await,
337 "voice.providers.list" => {
338 serde_json::from_str::<serde_json::Value>(
342 &car_voice::list_voice_providers_json(),
343 )
344 .map_err(|e| e.to_string())
345 }
346 "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
347 .await
348 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
349 "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
350 .await
351 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
352 "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
353 "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
354 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
355 "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
356 "workflow.run" => handle_workflow_run(&parsed, &session).await,
357 "workflow.verify" => handle_workflow_verify(&parsed),
358 "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
359 "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
360 "meeting.list" => handle_meeting_list(&parsed),
361 "meeting.get" => handle_meeting_get(&parsed),
362 "registry.register" => handle_registry_register(&parsed),
363 "registry.heartbeat" => handle_registry_heartbeat(&parsed),
364 "registry.unregister" => handle_registry_unregister(&parsed),
365 "registry.list" => handle_registry_list(&parsed),
366 "registry.reap" => handle_registry_reap(&parsed),
367 "admission.status" => handle_admission_status(&state),
368 "a2a.start" => handle_a2a_start(&parsed).await,
369 "a2a.stop" => handle_a2a_stop(),
370 "a2a.status" => handle_a2a_status(),
371 "a2a.send" => handle_a2a_send(&parsed, &state).await,
372 "a2ui.apply" => handle_a2ui_apply(&parsed, &state).await,
373 "a2ui.ingest" => handle_a2ui_ingest(&parsed, &state).await,
374 "a2ui.capabilities" => handle_a2ui_capabilities(&state),
375 "a2ui.reap" => handle_a2ui_reap(&state).await,
376 "a2ui.surfaces" => handle_a2ui_surfaces(&state).await,
377 "a2ui.get" => handle_a2ui_get(&parsed, &state).await,
378 "a2ui.action" => handle_a2ui_action(&parsed, &state).await,
379 "automation.run_applescript" => handle_run_applescript(&parsed).await,
380 "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
381 "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
382 "vision.ocr" => handle_vision_ocr(&parsed).await,
383 _ => Err(format!("unknown method: {}", method)),
384 };
385
386 let resp = match result {
387 Ok(value) => JsonRpcResponse::success(parsed.id, value),
388 Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
389 };
390 send_response(&session.channel, resp).await?;
391 }
392 } else if msg.is_close() {
393 info!("Client {} disconnected", client_id);
394 break;
395 }
396 }
397
398 session.host.unsubscribe(&client_id).await;
399
400 let _removed = state.remove_session(&client_id).await;
411 {
412 let mut pending = session.channel.pending.lock().await;
413 pending.clear();
414 }
415
416 Ok(())
417}
418
419async fn send_response(
420 channel: &WsChannel,
421 resp: JsonRpcResponse,
422) -> Result<(), Box<dyn std::error::Error>> {
423 use futures::SinkExt;
424 let json = serde_json::to_string(&resp)?;
425 channel
426 .write
427 .lock()
428 .await
429 .send(Message::Text(json.into()))
430 .await?;
431 Ok(())
432}
433
434async fn handle_host_subscribe(session: &crate::session::ClientSession) -> Result<Value, String> {
437 session
438 .host
439 .subscribe(&session.client_id, session.channel.clone())
440 .await;
441 serde_json::to_value(HostSnapshot {
442 subscribed: true,
443 agents: session.host.agents().await,
444 approvals: session.host.approvals().await,
445 events: session.host.events(50).await,
446 })
447 .map_err(|e| e.to_string())
448}
449
450async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
451 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
452}
453
454async fn handle_host_events(
455 req: &JsonRpcMessage,
456 session: &crate::session::ClientSession,
457) -> Result<Value, String> {
458 let limit = req
459 .params
460 .get("limit")
461 .and_then(|v| v.as_u64())
462 .unwrap_or(100) as usize;
463 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
464}
465
466async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
467 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
468}
469
470async fn handle_a2ui_apply(
471 req: &JsonRpcMessage,
472 state: &Arc<ServerState>,
473) -> Result<Value, String> {
474 #[derive(Deserialize)]
475 struct Params {
476 #[serde(default)]
477 envelope: Option<car_a2ui::A2uiEnvelope>,
478 #[serde(default)]
479 message: Option<car_a2ui::A2uiEnvelope>,
480 }
481
482 let envelope = if req.params.get("createSurface").is_some()
483 || req.params.get("updateComponents").is_some()
484 || req.params.get("updateDataModel").is_some()
485 || req.params.get("deleteSurface").is_some()
486 {
487 serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
488 .map_err(|e| e.to_string())?
489 } else {
490 match serde_json::from_value::<Params>(req.params.clone()) {
491 Ok(params) => params
492 .envelope
493 .or(params.message)
494 .ok_or_else(|| "`a2ui.apply` requires an A2UI envelope".to_string())?,
495 Err(_) => serde_json::from_value::<car_a2ui::A2uiEnvelope>(req.params.clone())
496 .map_err(|e| e.to_string())?,
497 }
498 };
499
500 apply_a2ui_envelope(state, envelope, None, None).await
501}
502
503async fn handle_a2ui_ingest(
504 req: &JsonRpcMessage,
505 state: &Arc<ServerState>,
506) -> Result<Value, String> {
507 #[derive(Deserialize)]
508 #[serde(rename_all = "camelCase")]
509 struct Params {
510 #[serde(default)]
511 endpoint: Option<String>,
512 #[serde(default)]
513 a2a_endpoint: Option<String>,
514 #[serde(default)]
515 owner: Option<car_a2ui::A2uiSurfaceOwner>,
516 #[serde(default)]
517 route_auth: Option<A2aRouteAuth>,
518 #[serde(default)]
519 allow_untrusted_endpoint: bool,
520 }
521
522 let params = serde_json::from_value::<Params>(req.params.clone()).unwrap_or(Params {
523 endpoint: None,
524 a2a_endpoint: None,
525 owner: None,
526 route_auth: None,
527 allow_untrusted_endpoint: false,
528 });
529 let payload = req.params.get("payload").unwrap_or(&req.params);
530 state
531 .a2ui
532 .validate_payload(payload)
533 .map_err(|e| e.to_string())?;
534 let envelopes = car_a2ui::envelopes_from_value(payload).map_err(|e| e.to_string())?;
535 if envelopes.is_empty() {
536 return Err("no A2UI envelopes found in payload".into());
537 }
538 let endpoint = params.endpoint.or(params.a2a_endpoint);
539 let endpoint = trusted_route_endpoint(endpoint, params.allow_untrusted_endpoint);
540 let owner = params
541 .owner
542 .or_else(|| car_a2ui::owner_from_value(payload))
543 .map(|owner| match endpoint.clone() {
544 Some(endpoint) => owner.with_endpoint(Some(endpoint)),
545 None => owner,
546 });
547
548 let mut results = Vec::new();
549 for envelope in envelopes {
550 let value =
551 apply_a2ui_envelope(state, envelope, owner.clone(), params.route_auth.clone()).await?;
552 results.push(value);
553 }
554 Ok(serde_json::json!({ "applied": results }))
555}
556
557async fn apply_a2ui_envelope(
558 state: &Arc<ServerState>,
559 envelope: car_a2ui::A2uiEnvelope,
560 owner: Option<car_a2ui::A2uiSurfaceOwner>,
561 route_auth: Option<A2aRouteAuth>,
562) -> Result<Value, String> {
563 let result = state
564 .a2ui
565 .apply_with_owner(envelope, owner)
566 .await
567 .map_err(|e| e.to_string())?;
568 update_a2ui_route_auth(state, &result, route_auth).await;
569 let kind = if result.deleted {
570 "a2ui.surface_deleted"
571 } else {
572 "a2ui.surface_updated"
573 };
574 let message = if result.deleted {
575 format!("A2UI surface {} deleted", result.surface_id)
576 } else {
577 format!("A2UI surface {} updated", result.surface_id)
578 };
579 let payload = serde_json::to_value(&result).map_err(|e| e.to_string())?;
580 state.host.record_event(kind, None, message, payload).await;
581 serde_json::to_value(result).map_err(|e| e.to_string())
582}
583
584async fn update_a2ui_route_auth(
585 state: &Arc<ServerState>,
586 result: &car_a2ui::A2uiApplyResult,
587 route_auth: Option<A2aRouteAuth>,
588) {
589 let mut auth = state.a2ui_route_auth.lock().await;
590 if result.deleted {
591 auth.remove(&result.surface_id);
592 return;
593 }
594
595 let has_route_endpoint = result
596 .surface
597 .as_ref()
598 .and_then(|surface| surface.owner.as_ref())
599 .and_then(|owner| owner.endpoint.as_ref())
600 .is_some();
601 match (has_route_endpoint, route_auth) {
602 (true, Some(route_auth)) => {
603 auth.insert(result.surface_id.clone(), route_auth);
604 }
605 _ => {
606 auth.remove(&result.surface_id);
607 }
608 }
609}
610
611fn handle_a2ui_capabilities(state: &Arc<ServerState>) -> Result<Value, String> {
612 serde_json::to_value(state.a2ui.capabilities()).map_err(|e| e.to_string())
613}
614
615async fn handle_a2ui_reap(state: &Arc<ServerState>) -> Result<Value, String> {
616 let removed = state.a2ui.reap_expired(chrono::Utc::now()).await;
617 if !removed.is_empty() {
618 let mut auth = state.a2ui_route_auth.lock().await;
619 for surface_id in &removed {
620 auth.remove(surface_id);
621 }
622 }
623 Ok(serde_json::json!({ "removed": removed }))
624}
625
626async fn handle_a2ui_surfaces(state: &Arc<ServerState>) -> Result<Value, String> {
627 serde_json::to_value(state.a2ui.list().await).map_err(|e| e.to_string())
628}
629
630async fn handle_a2ui_get(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
631 let surface_id = req
632 .params
633 .get("surface_id")
634 .or_else(|| req.params.get("surfaceId"))
635 .and_then(Value::as_str)
636 .ok_or_else(|| "`a2ui.get` requires surface_id".to_string())?;
637 serde_json::to_value(state.a2ui.get(surface_id).await).map_err(|e| e.to_string())
638}
639
640async fn handle_a2ui_action(
641 req: &JsonRpcMessage,
642 state: &Arc<ServerState>,
643) -> Result<Value, String> {
644 let action: car_a2ui::ClientAction =
645 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
646 let owner = state.a2ui.owner(&action.surface_id).await;
647 let route = route_a2ui_action(state, &action, owner.clone()).await;
648 let payload = serde_json::json!({
649 "action": action,
650 "owner": owner,
651 "route": route,
652 });
653 let event = state
654 .host
655 .record_event(
656 "a2ui.action",
657 None,
658 format!(
659 "A2UI action {} from {}",
660 action.name, action.source_component_id
661 ),
662 payload,
663 )
664 .await;
665 Ok(serde_json::json!({
666 "event": event,
667 "route": route,
668 }))
669}
670
671async fn route_a2ui_action(
672 state: &Arc<ServerState>,
673 action: &car_a2ui::ClientAction,
674 owner: Option<car_a2ui::A2uiSurfaceOwner>,
675) -> Value {
676 let Some(owner) = owner else {
677 return serde_json::json!({ "delivered": false, "reason": "surface has no owner" });
678 };
679 if owner.kind != "a2a" {
680 return serde_json::json!({ "delivered": false, "reason": "unsupported owner kind", "owner": owner });
681 }
682 let Some(endpoint) = owner.endpoint.clone() else {
683 return serde_json::json!({
684 "delivered": false,
685 "reason": "surface owner has no endpoint",
686 "owner": owner
687 });
688 };
689
690 let message = car_a2a::Message {
691 message_id: format!("a2ui-action-{}", uuid::Uuid::new_v4().simple()),
692 role: car_a2a::MessageRole::User,
693 parts: vec![car_a2a::Part::Data(car_a2a::types::DataPart {
694 data: serde_json::json!({
695 "a2uiAction": action,
696 }),
697 metadata: Default::default(),
698 })],
699 task_id: owner.task_id.clone(),
700 context_id: owner.context_id.clone(),
701 metadata: Default::default(),
702 };
703
704 let auth = state
705 .a2ui_route_auth
706 .lock()
707 .await
708 .get(&action.surface_id)
709 .cloned()
710 .map(client_auth_from_route_auth)
711 .unwrap_or(car_a2a::ClientAuth::None);
712
713 match car_a2a::A2aClient::new(endpoint.clone())
714 .with_auth(auth)
715 .send_message(message, false)
716 .await
717 {
718 Ok(result) => serde_json::json!({
719 "delivered": true,
720 "owner": owner,
721 "endpoint": endpoint,
722 "result": result,
723 }),
724 Err(error) => serde_json::json!({
725 "delivered": false,
726 "owner": owner,
727 "endpoint": endpoint,
728 "error": error.to_string(),
729 }),
730 }
731}
732
733fn client_auth_from_route_auth(auth: A2aRouteAuth) -> car_a2a::ClientAuth {
734 match auth {
735 A2aRouteAuth::None => car_a2a::ClientAuth::None,
736 A2aRouteAuth::Bearer { token } => car_a2a::ClientAuth::Bearer(token),
737 A2aRouteAuth::Header { name, value } => car_a2a::ClientAuth::Header { name, value },
738 }
739}
740
741fn trusted_route_endpoint(endpoint: Option<String>, allow_untrusted: bool) -> Option<String> {
742 let endpoint = endpoint?;
743 if allow_untrusted || is_loopback_http_endpoint(&endpoint) {
744 Some(endpoint)
745 } else {
746 None
747 }
748}
749
/// Report whether `endpoint` is a plain-HTTP URL whose host is a loopback
/// name or address (`localhost`, `127.0.0.1` or `[::1]`).
///
/// The check is made on the URL's authority component, which is the text
/// between `http://` and the first `/`, `?` or `#`. That component must be
/// exactly one of the loopback hosts, optionally followed by `:` and a port
/// made only of ASCII digits (an empty port is tolerated).
///
/// A bare prefix test is not enough here: `http://localhost:pw@evil.example/`
/// starts with `http://localhost:` but its real host is `evil.example`,
/// because everything before `@` is userinfo. Requiring a numeric port closes
/// that hole.
///
/// Scheme and host are compared case-sensitively.
fn is_loopback_http_endpoint(endpoint: &str) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["localhost", "127.0.0.1", "[::1]"];

    let Some(rest) = endpoint.strip_prefix("http://") else {
        return false;
    };

    // The authority ends where the path, query or fragment begins.
    let authority_len = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_len];

    LOOPBACK_HOSTS
        .iter()
        .any(|host| match authority.strip_prefix(*host) {
            // Host only, no port.
            Some("") => true,
            // Anything after the host must be `:<digits>`; this rejects
            // userinfo (`:pw@other`) and look-alike hosts (`localhost.evil`).
            Some(tail) => tail
                .strip_prefix(':')
                .map_or(false, |port| port.bytes().all(|b| b.is_ascii_digit())),
            None => false,
        })
}
761
762async fn handle_host_register_agent(
763 req: &JsonRpcMessage,
764 session: &crate::session::ClientSession,
765) -> Result<Value, String> {
766 let request: RegisterHostAgentRequest =
767 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
768 serde_json::to_value(
769 session
770 .host
771 .register_agent(&session.client_id, request)
772 .await?,
773 )
774 .map_err(|e| e.to_string())
775}
776
777async fn handle_host_unregister_agent(
778 req: &JsonRpcMessage,
779 session: &crate::session::ClientSession,
780) -> Result<Value, String> {
781 let agent_id = req
782 .params
783 .get("agent_id")
784 .and_then(|v| v.as_str())
785 .ok_or("missing agent_id")?;
786 session.host.unregister_agent(agent_id).await?;
787 Ok(serde_json::json!({"ok": true}))
788}
789
790async fn handle_host_set_status(
791 req: &JsonRpcMessage,
792 session: &crate::session::ClientSession,
793) -> Result<Value, String> {
794 let request: SetHostAgentStatusRequest =
795 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
796 serde_json::to_value(session.host.set_status(request).await?).map_err(|e| e.to_string())
797}
798
799async fn handle_host_notify(
800 req: &JsonRpcMessage,
801 session: &crate::session::ClientSession,
802) -> Result<Value, String> {
803 let kind = req
804 .params
805 .get("kind")
806 .and_then(|v| v.as_str())
807 .unwrap_or("host.notification");
808 let agent_id = req
809 .params
810 .get("agent_id")
811 .and_then(|v| v.as_str())
812 .map(str::to_string);
813 let message = req
814 .params
815 .get("message")
816 .and_then(|v| v.as_str())
817 .unwrap_or("");
818 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
819 serde_json::to_value(
820 session
821 .host
822 .record_event(kind, agent_id, message, payload)
823 .await,
824 )
825 .map_err(|e| e.to_string())
826}
827
828async fn handle_host_request_approval(
829 req: &JsonRpcMessage,
830 session: &crate::session::ClientSession,
831) -> Result<Value, String> {
832 let request: CreateHostApprovalRequest =
833 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
834 if let Some(agent_id) = &request.agent_id {
835 let _ = session
836 .host
837 .set_status(SetHostAgentStatusRequest {
838 agent_id: agent_id.clone(),
839 status: HostAgentStatus::WaitingForApproval,
840 current_task: None,
841 message: Some("Waiting for approval".to_string()),
842 payload: Value::Null,
843 })
844 .await;
845 }
846 serde_json::to_value(session.host.create_approval(request).await?).map_err(|e| e.to_string())
847}
848
849async fn handle_host_resolve_approval(
850 req: &JsonRpcMessage,
851 session: &crate::session::ClientSession,
852) -> Result<Value, String> {
853 let request: ResolveHostApprovalRequest =
854 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
855 serde_json::to_value(session.host.resolve_approval(request).await?).map_err(|e| e.to_string())
856}
857
858async fn handle_session_init(
859 req: &JsonRpcMessage,
860 session: &crate::session::ClientSession,
861) -> Result<Value, String> {
862 let init: SessionInitRequest =
863 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
864
865 for tool in &init.tools {
866 register_from_definition(&session.runtime, tool).await;
867 }
868
869 let mut policy_count = 0;
870 {
871 let mut policies = session.runtime.policies.write().await;
872 for policy_def in &init.policies {
873 if let Some(check) = build_policy_check(policy_def) {
874 policies.register(&policy_def.name, check, "");
875 policy_count += 1;
876 }
877 }
878 }
879
880 serde_json::to_value(SessionInitResponse {
881 session_id: session.client_id.clone(),
882 tools_registered: init.tools.len(),
883 policies_registered: policy_count,
884 })
885 .map_err(|e| e.to_string())
886}
887
888fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
889 match def.rule.as_str() {
890 "deny_tool" => {
891 let target = def.target.clone();
892 Some(Box::new(
893 move |action: &car_ir::Action, _: &car_state::StateStore| {
894 if action.tool.as_deref() == Some(&target) {
895 Some(format!("tool '{}' denied", target))
896 } else {
897 None
898 }
899 },
900 ))
901 }
902 "require_state" => {
903 let key = def.key.clone();
904 let value = def.value.clone();
905 Some(Box::new(
906 move |_: &car_ir::Action, state: &car_state::StateStore| {
907 if state.get(&key).as_ref() != Some(&value) {
908 Some(format!("state['{}'] must be {:?}", key, value))
909 } else {
910 None
911 }
912 },
913 ))
914 }
915 "deny_tool_param" => {
916 let target = def.target.clone();
917 let param = def.key.clone();
918 let pattern = def.pattern.clone();
919 Some(Box::new(
920 move |action: &car_ir::Action, _: &car_state::StateStore| {
921 if action.tool.as_deref() != Some(&target) {
922 return None;
923 }
924 if let Some(val) = action.parameters.get(¶m) {
925 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
926 if s.contains(&pattern) {
927 return Some(format!("param '{}' matches '{}'", param, pattern));
928 }
929 }
930 None
931 },
932 ))
933 }
934 _ => None,
935 }
936}
937
938async fn handle_tools_register(
939 req: &JsonRpcMessage,
940 session: &crate::session::ClientSession,
941) -> Result<Value, String> {
942 let tools: Vec<ToolDefinition> =
943 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
944 for tool in &tools {
945 register_from_definition(&session.runtime, tool).await;
946 }
947 Ok(Value::from(tools.len()))
948}
949
950async fn register_from_definition(runtime: &car_engine::Runtime, def: &ToolDefinition) {
957 runtime
958 .register_tool_schema(car_ir::ToolSchema {
959 name: def.name.clone(),
960 description: def.description.clone(),
961 parameters: def.parameters.clone(),
962 returns: def.returns.clone(),
963 idempotent: def.idempotent,
964 cache_ttl_secs: def.cache_ttl_secs,
965 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
966 max_calls: rl.max_calls,
967 interval_secs: rl.interval_secs,
968 }),
969 })
970 .await;
971}
972
973async fn handle_proposal_submit(
974 req: &JsonRpcMessage,
975 session: &crate::session::ClientSession,
976) -> Result<Value, String> {
977 let submit: ProposalSubmitRequest =
978 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
979 let session_id = req
985 .params
986 .get("session_id")
987 .and_then(|v| v.as_str())
988 .map(str::to_string);
989 let result = match session_id {
990 Some(sid) => session.runtime.execute_with_session(&submit.proposal, &sid).await,
991 None => session.runtime.execute(&submit.proposal).await,
992 };
993 serde_json::to_value(result).map_err(|e| e.to_string())
994}
995
996async fn handle_session_policy_open(
997 session: &crate::session::ClientSession,
998) -> Result<Value, String> {
999 let id = session.runtime.open_session().await;
1000 Ok(serde_json::json!({ "session_id": id }))
1001}
1002
1003async fn handle_session_policy_close(
1004 req: &JsonRpcMessage,
1005 session: &crate::session::ClientSession,
1006) -> Result<Value, String> {
1007 let sid = req
1008 .params
1009 .get("session_id")
1010 .and_then(|v| v.as_str())
1011 .ok_or("missing 'session_id'")?;
1012 let closed = session.runtime.close_session(sid).await;
1013 Ok(serde_json::json!({ "closed": closed }))
1014}
1015
1016async fn handle_policy_register(
1022 req: &JsonRpcMessage,
1023 session: &crate::session::ClientSession,
1024) -> Result<Value, String> {
1025 let def: PolicyDefinition = serde_json::from_value(req.params.clone())
1026 .map_err(|e| format!("invalid policy params: {e}"))?;
1027 let session_id = req
1028 .params
1029 .get("session_id")
1030 .and_then(|v| v.as_str())
1031 .map(str::to_string);
1032 let check = build_policy_check(&def).ok_or_else(|| {
1033 format!("unsupported policy rule '{}'", def.rule)
1034 })?;
1035 match session_id {
1036 Some(sid) => session
1037 .runtime
1038 .register_policy_in_session(&sid, &def.name, check, "")
1039 .await
1040 .map(|_| serde_json::json!({ "registered": def.name, "scope": { "session_id": sid } })),
1041 None => {
1042 let mut policies = session.runtime.policies.write().await;
1043 policies.register(&def.name, check, "");
1044 Ok(serde_json::json!({ "registered": def.name, "scope": "global" }))
1045 }
1046 }
1047}
1048
1049async fn handle_verify(
1050 req: &JsonRpcMessage,
1051 session: &crate::session::ClientSession,
1052) -> Result<Value, String> {
1053 let vr: VerifyRequest =
1054 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1055 let tools: std::collections::HashSet<String> =
1056 session.runtime.tools.read().await.keys().cloned().collect();
1057 let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
1058 serde_json::to_value(VerifyResponse {
1059 valid: result.valid,
1060 issues: result
1061 .issues
1062 .iter()
1063 .map(|i| VerifyIssueProto {
1064 action_id: i.action_id.clone(),
1065 severity: i.severity.clone(),
1066 message: i.message.clone(),
1067 })
1068 .collect(),
1069 simulated_state: result.simulated_state,
1070 })
1071 .map_err(|e| e.to_string())
1072}
1073
1074async fn handle_state_get(
1075 req: &JsonRpcMessage,
1076 session: &crate::session::ClientSession,
1077) -> Result<Value, String> {
1078 let key = req
1079 .params
1080 .get("key")
1081 .and_then(|v| v.as_str())
1082 .ok_or("missing 'key'")?;
1083 Ok(session.runtime.state.get(key).unwrap_or(Value::Null))
1084}
1085
1086async fn handle_state_set(
1087 req: &JsonRpcMessage,
1088 session: &crate::session::ClientSession,
1089) -> Result<Value, String> {
1090 let key = req
1091 .params
1092 .get("key")
1093 .and_then(|v| v.as_str())
1094 .ok_or("missing 'key'")?;
1095 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
1096 session.runtime.state.set(key, value, "client");
1097 Ok(Value::from("ok"))
1098}
1099
1100async fn handle_memory_fact_count(
1107 session: &crate::session::ClientSession,
1108) -> Result<Value, String> {
1109 let engine = session.memgine.lock().await;
1110 Ok(Value::from(engine.valid_fact_count()))
1111}
1112
1113async fn handle_memory_add_fact(
1114 req: &JsonRpcMessage,
1115 session: &crate::session::ClientSession,
1116) -> Result<Value, String> {
1117 let subject = req
1118 .params
1119 .get("subject")
1120 .and_then(|v| v.as_str())
1121 .ok_or("missing subject")?;
1122 let body = req
1123 .params
1124 .get("body")
1125 .and_then(|v| v.as_str())
1126 .ok_or("missing body")?;
1127 let kind = req
1128 .params
1129 .get("kind")
1130 .and_then(|v| v.as_str())
1131 .unwrap_or("pattern");
1132 let mut engine = session.memgine.lock().await;
1133 let fid = format!("ws-{}", engine.valid_fact_count());
1134 engine.ingest_fact(
1135 &fid,
1136 subject,
1137 body,
1138 "user",
1139 "peer",
1140 chrono::Utc::now(),
1141 "global",
1142 None,
1143 vec![],
1144 kind == "constraint",
1145 );
1146 Ok(Value::from(engine.valid_fact_count()))
1147}
1148
1149async fn handle_memory_query(
1150 req: &JsonRpcMessage,
1151 session: &crate::session::ClientSession,
1152) -> Result<Value, String> {
1153 let query = req
1154 .params
1155 .get("query")
1156 .and_then(|v| v.as_str())
1157 .ok_or("missing query")?;
1158 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
1159 let engine = session.memgine.lock().await;
1160 let seeds = engine.graph.find_seeds(query, 5);
1161 let hits = if !seeds.is_empty() {
1162 engine.graph.retrieve(&seeds, 3, k, 0.6, 0.05)
1163 } else {
1164 vec![]
1165 };
1166 let results: Vec<Value> = hits.iter().filter_map(|hit| {
1167 let node = engine.graph.inner.node_weight(hit.node_ix)?;
1168 Some(serde_json::json!({"subject": node.key, "body": node.value, "activation": hit.activation}))
1169 }).collect();
1170 serde_json::to_value(results).map_err(|e| e.to_string())
1171}
1172
1173async fn handle_memory_build_context(
1174 req: &JsonRpcMessage,
1175 session: &crate::session::ClientSession,
1176) -> Result<Value, String> {
1177 let query = req
1178 .params
1179 .get("query")
1180 .and_then(|v| v.as_str())
1181 .unwrap_or("");
1182 let mut engine = session.memgine.lock().await;
1183 Ok(Value::from(engine.build_context(query)))
1184}
1185
1186async fn handle_skill_ingest(
1189 req: &JsonRpcMessage,
1190 session: &crate::session::ClientSession,
1191) -> Result<Value, String> {
1192 let name = req
1193 .params
1194 .get("name")
1195 .and_then(|v| v.as_str())
1196 .ok_or("missing name")?;
1197 let code = req
1198 .params
1199 .get("code")
1200 .and_then(|v| v.as_str())
1201 .ok_or("missing code")?;
1202 let platform = req
1203 .params
1204 .get("platform")
1205 .and_then(|v| v.as_str())
1206 .unwrap_or("unknown");
1207 let persona = req
1208 .params
1209 .get("persona")
1210 .and_then(|v| v.as_str())
1211 .unwrap_or("");
1212 let url_pattern = req
1213 .params
1214 .get("url_pattern")
1215 .and_then(|v| v.as_str())
1216 .unwrap_or("");
1217 let description = req
1218 .params
1219 .get("description")
1220 .and_then(|v| v.as_str())
1221 .unwrap_or("");
1222 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
1223 let keywords: Vec<String> = req
1224 .params
1225 .get("task_keywords")
1226 .and_then(|v| v.as_array())
1227 .map(|arr| {
1228 arr.iter()
1229 .filter_map(|v| v.as_str().map(String::from))
1230 .collect()
1231 })
1232 .unwrap_or_default();
1233
1234 let trigger = car_memgine::SkillTrigger {
1235 persona: persona.into(),
1236 url_pattern: url_pattern.into(),
1237 task_keywords: keywords,
1238 };
1239 let mut engine = session.memgine.lock().await;
1240 engine.ingest_skill(
1241 name,
1242 code,
1243 platform,
1244 trigger,
1245 description,
1246 supersedes,
1247 vec![],
1248 vec![],
1249 );
1250 Ok(Value::from("ok"))
1251}
1252
1253async fn handle_skill_find(
1254 req: &JsonRpcMessage,
1255 session: &crate::session::ClientSession,
1256) -> Result<Value, String> {
1257 let persona = req
1258 .params
1259 .get("persona")
1260 .and_then(|v| v.as_str())
1261 .unwrap_or("");
1262 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
1263 let task = req
1264 .params
1265 .get("task")
1266 .and_then(|v| v.as_str())
1267 .unwrap_or("");
1268 let max = req
1269 .params
1270 .get("max_results")
1271 .and_then(|v| v.as_u64())
1272 .unwrap_or(1) as usize;
1273 let engine = session.memgine.lock().await;
1274 let results = engine.find_skill(persona, url, task, max);
1275 let json: Vec<Value> = results
1276 .iter()
1277 .map(|(m, s)| {
1278 serde_json::json!({
1279 "name": m.name, "code": m.code, "platform": m.platform,
1280 "description": m.description, "stats": m.stats, "match_score": s,
1281 })
1282 })
1283 .collect();
1284 serde_json::to_value(json).map_err(|e| e.to_string())
1285}
1286
1287async fn handle_skill_report(
1288 req: &JsonRpcMessage,
1289 session: &crate::session::ClientSession,
1290) -> Result<Value, String> {
1291 let name = req
1292 .params
1293 .get("skill_name")
1294 .and_then(|v| v.as_str())
1295 .ok_or("missing skill_name")?;
1296 let outcome_str = req
1297 .params
1298 .get("outcome")
1299 .and_then(|v| v.as_str())
1300 .ok_or("missing outcome")?;
1301 let outcome = match outcome_str {
1302 "success" => car_memgine::SkillOutcome::Success,
1303 _ => car_memgine::SkillOutcome::Fail,
1304 };
1305 let mut engine = session.memgine.lock().await;
1306 let stats = engine
1307 .report_outcome(name, outcome)
1308 .ok_or(format!("skill '{}' not found", name))?;
1309 serde_json::to_value(stats).map_err(|e| e.to_string())
1310}
1311
/// `car_multi::AgentRunner` implementation that runs each agent by sending a
/// `multi.run_agent` JSON-RPC request to the connected WebSocket client and
/// waiting for its reply.
struct WsAgentRunner {
    /// Client channel: supplies request ids, holds the pending-reply map, and
    /// owns the write half used to send the request.
    channel: Arc<WsChannel>,
    /// Host state used to register each run as an agent and to publish its
    /// status changes.
    host: Arc<crate::host::HostState>,
    /// Id of the owning client; passed to host registration and embedded in
    /// the per-run agent id.
    client_id: String,
}
1325
1326#[async_trait::async_trait]
1327impl car_multi::AgentRunner for WsAgentRunner {
1328 async fn run(
1329 &self,
1330 spec: &car_multi::AgentSpec,
1331 task: &str,
1332 _runtime: &car_engine::Runtime,
1333 _mailbox: &car_multi::Mailbox,
1334 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
1335 use futures::SinkExt;
1336
1337 let request_id = self.channel.next_request_id();
1338 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
1339 let agent = self
1340 .host
1341 .register_agent(
1342 &self.client_id,
1343 RegisterHostAgentRequest {
1344 id: Some(agent_id.clone()),
1345 name: spec.name.clone(),
1346 kind: "callback".to_string(),
1347 capabilities: spec.tools.clone(),
1348 project: spec
1349 .metadata
1350 .get("project")
1351 .and_then(|v| v.as_str())
1352 .map(str::to_string),
1353 pid: None,
1354 display: serde_json::from_value(
1355 spec.metadata
1356 .get("display")
1357 .cloned()
1358 .unwrap_or(serde_json::Value::Null),
1359 )
1360 .unwrap_or_default(),
1361 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
1362 },
1363 )
1364 .await
1365 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
1366 let _ = self
1367 .host
1368 .set_status(SetHostAgentStatusRequest {
1369 agent_id: agent.id.clone(),
1370 status: HostAgentStatus::Running,
1371 current_task: Some(task.to_string()),
1372 message: Some(format!("{} started", spec.name)),
1373 payload: serde_json::json!({ "task": task }),
1374 })
1375 .await;
1376
1377 let rpc_request = serde_json::json!({
1378 "jsonrpc": "2.0",
1379 "method": "multi.run_agent",
1380 "params": {
1381 "spec": spec,
1382 "task": task,
1383 },
1384 "id": request_id,
1385 });
1386
1387 let (tx, rx) = tokio::sync::oneshot::channel();
1389 self.channel
1390 .pending
1391 .lock()
1392 .await
1393 .insert(request_id.clone(), tx);
1394
1395 let msg = Message::Text(
1396 serde_json::to_string(&rpc_request)
1397 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
1398 .into(),
1399 );
1400 if let Err(e) = self.channel.write.lock().await.send(msg).await {
1401 let _ = self
1402 .host
1403 .set_status(SetHostAgentStatusRequest {
1404 agent_id: agent_id.clone(),
1405 status: HostAgentStatus::Errored,
1406 current_task: None,
1407 message: Some(format!("{} failed to start", spec.name)),
1408 payload: serde_json::json!({ "error": e.to_string() }),
1409 })
1410 .await;
1411 return Err(car_multi::MultiError::AgentFailed(
1412 spec.name.clone(),
1413 format!("ws send error: {}", e),
1414 ));
1415 }
1416
1417 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
1419 Ok(Ok(response)) => response,
1420 Ok(Err(_)) => {
1421 let _ = self
1422 .host
1423 .set_status(SetHostAgentStatusRequest {
1424 agent_id: agent_id.clone(),
1425 status: HostAgentStatus::Errored,
1426 current_task: None,
1427 message: Some(format!("{} callback channel closed", spec.name)),
1428 payload: Value::Null,
1429 })
1430 .await;
1431 return Err(car_multi::MultiError::AgentFailed(
1432 spec.name.clone(),
1433 "agent callback channel closed".into(),
1434 ));
1435 }
1436 Err(_) => {
1437 let _ = self
1438 .host
1439 .set_status(SetHostAgentStatusRequest {
1440 agent_id: agent_id.clone(),
1441 status: HostAgentStatus::Errored,
1442 current_task: None,
1443 message: Some(format!("{} timed out", spec.name)),
1444 payload: Value::Null,
1445 })
1446 .await;
1447 return Err(car_multi::MultiError::AgentFailed(
1448 spec.name.clone(),
1449 "agent callback timed out (300s)".into(),
1450 ));
1451 }
1452 };
1453
1454 if let Some(err) = response.error {
1455 let _ = self
1456 .host
1457 .set_status(SetHostAgentStatusRequest {
1458 agent_id: agent_id.clone(),
1459 status: HostAgentStatus::Errored,
1460 current_task: None,
1461 message: Some(format!("{} errored", spec.name)),
1462 payload: serde_json::json!({ "error": err }),
1463 })
1464 .await;
1465 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
1466 }
1467
1468 let output_value = response.output.unwrap_or(Value::Null);
1469 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
1470 car_multi::MultiError::AgentFailed(
1471 spec.name.clone(),
1472 format!("invalid AgentOutput: {}", e),
1473 )
1474 })?;
1475 let status = if output.error.is_some() {
1476 HostAgentStatus::Errored
1477 } else {
1478 HostAgentStatus::Completed
1479 };
1480 let message = if output.error.is_some() {
1481 format!("{} errored", spec.name)
1482 } else {
1483 format!("{} completed", spec.name)
1484 };
1485 let _ = self
1486 .host
1487 .set_status(SetHostAgentStatusRequest {
1488 agent_id,
1489 status,
1490 current_task: None,
1491 message: Some(message),
1492 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
1493 })
1494 .await;
1495
1496 Ok(output)
1497 }
1498}
1499
/// Builds the per-run agent id `<client_id>:<name>:<request_id>`.
///
/// Every character of `name` that is not an ASCII letter, ASCII digit, `-`
/// or `_` is replaced by a single `-`. `client_id` and `request_id` are used
/// verbatim.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '-' });
    }
    [client_id, sanitized.as_str(), request_id].join(":")
}
1513
1514async fn handle_multi_swarm(
1515 req: &JsonRpcMessage,
1516 session: &crate::session::ClientSession,
1517) -> Result<Value, String> {
1518 let mode_str = req
1519 .params
1520 .get("mode")
1521 .and_then(|v| v.as_str())
1522 .ok_or("missing 'mode'")?;
1523 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
1524 let task = req
1525 .params
1526 .get("task")
1527 .and_then(|v| v.as_str())
1528 .ok_or("missing 'task'")?;
1529
1530 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
1531 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
1532 let agent_specs: Vec<car_multi::AgentSpec> =
1533 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
1534 let synth: Option<car_multi::AgentSpec> = req
1535 .params
1536 .get("synthesizer")
1537 .map(|v| serde_json::from_value(v.clone()))
1538 .transpose()
1539 .map_err(|e| format!("invalid synthesizer: {}", e))?;
1540
1541 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1542 channel: session.channel.clone(),
1543 host: session.host.clone(),
1544 client_id: session.client_id.clone(),
1545 });
1546 let infra = car_multi::SharedInfra::new();
1547
1548 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
1549 if let Some(s) = synth {
1550 swarm = swarm.with_synthesizer(s);
1551 }
1552
1553 let result = swarm
1554 .run(task, &runner, &infra)
1555 .await
1556 .map_err(|e| format!("swarm error: {}", e))?;
1557 serde_json::to_value(result).map_err(|e| e.to_string())
1558}
1559
1560async fn handle_multi_pipeline(
1561 req: &JsonRpcMessage,
1562 session: &crate::session::ClientSession,
1563) -> Result<Value, String> {
1564 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
1565 let task = req
1566 .params
1567 .get("task")
1568 .and_then(|v| v.as_str())
1569 .ok_or("missing 'task'")?;
1570
1571 let stage_specs: Vec<car_multi::AgentSpec> =
1572 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
1573
1574 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1575 channel: session.channel.clone(),
1576 host: session.host.clone(),
1577 client_id: session.client_id.clone(),
1578 });
1579 let infra = car_multi::SharedInfra::new();
1580
1581 let result = car_multi::Pipeline::new(stage_specs)
1582 .run(task, &runner, &infra)
1583 .await
1584 .map_err(|e| format!("pipeline error: {}", e))?;
1585 serde_json::to_value(result).map_err(|e| e.to_string())
1586}
1587
1588async fn handle_multi_supervisor(
1589 req: &JsonRpcMessage,
1590 session: &crate::session::ClientSession,
1591) -> Result<Value, String> {
1592 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
1593 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
1594 let task = req
1595 .params
1596 .get("task")
1597 .and_then(|v| v.as_str())
1598 .ok_or("missing 'task'")?;
1599 let max_rounds = req
1600 .params
1601 .get("max_rounds")
1602 .and_then(|v| v.as_u64())
1603 .unwrap_or(3) as u32;
1604
1605 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
1606 .map_err(|e| format!("invalid workers: {}", e))?;
1607 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
1608 .map_err(|e| format!("invalid supervisor: {}", e))?;
1609
1610 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1611 channel: session.channel.clone(),
1612 host: session.host.clone(),
1613 client_id: session.client_id.clone(),
1614 });
1615 let infra = car_multi::SharedInfra::new();
1616
1617 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
1618 .with_max_rounds(max_rounds)
1619 .run(task, &runner, &infra)
1620 .await
1621 .map_err(|e| format!("supervisor error: {}", e))?;
1622 serde_json::to_value(result).map_err(|e| e.to_string())
1623}
1624
1625async fn handle_multi_map_reduce(
1626 req: &JsonRpcMessage,
1627 session: &crate::session::ClientSession,
1628) -> Result<Value, String> {
1629 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
1630 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
1631 let task = req
1632 .params
1633 .get("task")
1634 .and_then(|v| v.as_str())
1635 .ok_or("missing 'task'")?;
1636 let items_val = req.params.get("items").ok_or("missing 'items'")?;
1637
1638 let mapper_spec: car_multi::AgentSpec =
1639 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
1640 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
1641 .map_err(|e| format!("invalid reducer: {}", e))?;
1642 let items: Vec<String> =
1643 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
1644
1645 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1646 channel: session.channel.clone(),
1647 host: session.host.clone(),
1648 client_id: session.client_id.clone(),
1649 });
1650 let infra = car_multi::SharedInfra::new();
1651
1652 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
1653 .run(task, &items, &runner, &infra)
1654 .await
1655 .map_err(|e| format!("map_reduce error: {}", e))?;
1656 serde_json::to_value(result).map_err(|e| e.to_string())
1657}
1658
1659async fn handle_multi_vote(
1660 req: &JsonRpcMessage,
1661 session: &crate::session::ClientSession,
1662) -> Result<Value, String> {
1663 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
1664 let task = req
1665 .params
1666 .get("task")
1667 .and_then(|v| v.as_str())
1668 .ok_or("missing 'task'")?;
1669
1670 let agent_specs: Vec<car_multi::AgentSpec> =
1671 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
1672 let synth: Option<car_multi::AgentSpec> = req
1673 .params
1674 .get("synthesizer")
1675 .map(|v| serde_json::from_value(v.clone()))
1676 .transpose()
1677 .map_err(|e| format!("invalid synthesizer: {}", e))?;
1678
1679 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1680 channel: session.channel.clone(),
1681 host: session.host.clone(),
1682 client_id: session.client_id.clone(),
1683 });
1684 let infra = car_multi::SharedInfra::new();
1685
1686 let mut vote = car_multi::Vote::new(agent_specs);
1687 if let Some(s) = synth {
1688 vote = vote.with_synthesizer(s);
1689 }
1690
1691 let result = vote
1692 .run(task, &runner, &infra)
1693 .await
1694 .map_err(|e| format!("vote error: {}", e))?;
1695 serde_json::to_value(result).map_err(|e| e.to_string())
1696}
1697
1698fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
1703 let name = req
1704 .params
1705 .get("name")
1706 .and_then(|v| v.as_str())
1707 .ok_or("scheduler.create requires 'name'")?;
1708 let prompt = req
1709 .params
1710 .get("prompt")
1711 .and_then(|v| v.as_str())
1712 .ok_or("scheduler.create requires 'prompt'")?;
1713
1714 let mut task = car_scheduler::Task::new(name, prompt);
1715
1716 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
1717 let trigger = match t {
1718 "once" => car_scheduler::TaskTrigger::Once,
1719 "cron" => car_scheduler::TaskTrigger::Cron,
1720 "interval" => car_scheduler::TaskTrigger::Interval,
1721 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
1722 _ => car_scheduler::TaskTrigger::Manual,
1723 };
1724 let schedule = req
1725 .params
1726 .get("schedule")
1727 .and_then(|v| v.as_str())
1728 .unwrap_or("");
1729 task = task.with_trigger(trigger, schedule);
1730 }
1731
1732 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
1733 task = task.with_system_prompt(sp);
1734 }
1735
1736 serde_json::to_value(&task).map_err(|e| e.to_string())
1737}
1738
1739async fn handle_scheduler_run(
1740 req: &JsonRpcMessage,
1741 session: &crate::session::ClientSession,
1742) -> Result<Value, String> {
1743 let task_val = req
1744 .params
1745 .get("task")
1746 .ok_or("scheduler.run requires 'task'")?;
1747 let mut task: car_scheduler::Task =
1748 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
1749
1750 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1751 channel: session.channel.clone(),
1752 host: session.host.clone(),
1753 client_id: session.client_id.clone(),
1754 });
1755 let executor = car_scheduler::Executor::new(runner);
1756 let execution = executor.run_once(&mut task).await;
1757
1758 serde_json::to_value(&execution).map_err(|e| e.to_string())
1759}
1760
1761async fn handle_scheduler_run_loop(
1762 req: &JsonRpcMessage,
1763 session: &crate::session::ClientSession,
1764) -> Result<Value, String> {
1765 let task_val = req
1766 .params
1767 .get("task")
1768 .ok_or("scheduler.run_loop requires 'task'")?;
1769 let mut task: car_scheduler::Task =
1770 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
1771 let max_iterations = req
1772 .params
1773 .get("max_iterations")
1774 .and_then(|v| v.as_u64())
1775 .map(|v| v as u32);
1776
1777 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1778 channel: session.channel.clone(),
1779 host: session.host.clone(),
1780 client_id: session.client_id.clone(),
1781 });
1782 let executor = car_scheduler::Executor::new(runner);
1783 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1784 let executions = executor
1785 .run_loop(&mut task, max_iterations, cancel_rx)
1786 .await;
1787
1788 serde_json::to_value(&executions).map_err(|e| e.to_string())
1789}
1790
1791fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
1796 state.inference.get_or_init(|| {
1797 Arc::new(car_inference::InferenceEngine::new(
1798 car_inference::InferenceConfig::default(),
1799 ))
1800 })
1801}
1802
1803async fn handle_infer(
1804 msg: &JsonRpcMessage,
1805 state: &ServerState,
1806 session: &crate::session::ClientSession,
1807) -> Result<Value, String> {
1808 let engine = get_inference_engine(state);
1809 let mut req: car_inference::GenerateRequest =
1810 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1811
1812 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
1814 let mut memgine = session.memgine.lock().await;
1815 let ctx = memgine.build_context(cq);
1816 if !ctx.is_empty() {
1817 req.context = Some(ctx);
1818 }
1819 }
1820
1821 let _permit = state.admission.acquire().await;
1827
1828 let result = engine
1839 .generate_tracked(req)
1840 .await
1841 .map_err(|e| e.to_string())?;
1842 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
1843}
1844
1845async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1846 let engine = get_inference_engine(state);
1847 let req: car_inference::EmbedRequest =
1848 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1849 let _permit = state.admission.acquire().await;
1853 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
1854 Ok(serde_json::json!({"embeddings": result}))
1855}
1856
1857async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1858 let engine = get_inference_engine(state);
1859 let req: car_inference::ClassifyRequest =
1860 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1861 let _permit = state.admission.acquire().await;
1862 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
1863 Ok(serde_json::json!({"classifications": result}))
1864}
1865
1866fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
1870 let total = state.admission.permits();
1871 let available = state.admission.permits_available();
1872 let in_use = total.saturating_sub(available);
1873 Ok(serde_json::json!({
1874 "permits_total": total,
1875 "permits_available": available,
1876 "permits_in_use": in_use,
1877 "env_override": crate::admission::ENV_MAX_CONCURRENT,
1878 }))
1879}
1880
1881async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1882 let model = msg
1883 .params
1884 .get("model")
1885 .and_then(|v| v.as_str())
1886 .ok_or("missing 'model' parameter")?;
1887 let text = msg
1888 .params
1889 .get("text")
1890 .and_then(|v| v.as_str())
1891 .ok_or("missing 'text' parameter")?;
1892 let engine = get_inference_engine(state);
1893 let ids = engine
1894 .tokenize(model, text)
1895 .await
1896 .map_err(|e| e.to_string())?;
1897 Ok(serde_json::json!({"tokens": ids}))
1898}
1899
1900async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1901 let model = msg
1902 .params
1903 .get("model")
1904 .and_then(|v| v.as_str())
1905 .ok_or("missing 'model' parameter")?;
1906 let tokens: Vec<u32> = msg
1907 .params
1908 .get("tokens")
1909 .and_then(|v| v.as_array())
1910 .ok_or("missing 'tokens' parameter")?
1911 .iter()
1912 .map(|t| {
1913 t.as_u64()
1914 .and_then(|n| u32::try_from(n).ok())
1915 .ok_or_else(|| "tokens[] must be u32 values".to_string())
1916 })
1917 .collect::<Result<Vec<_>, _>>()?;
1918 let engine = get_inference_engine(state);
1919 let text = engine
1920 .detokenize(model, &tokens)
1921 .await
1922 .map_err(|e| e.to_string())?;
1923 Ok(serde_json::json!({"text": text}))
1924}
1925
1926fn handle_models_list(state: &ServerState) -> Result<Value, String> {
1927 let engine = get_inference_engine(state);
1928 let models = engine.list_models();
1929 serde_json::to_value(&models).map_err(|e| e.to_string())
1930}
1931
1932fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
1933 let engine = get_inference_engine(state);
1934 let models = engine.list_models_unified();
1935 serde_json::to_value(&models).map_err(|e| e.to_string())
1936}
1937
/// Filters accepted by `handle_models_search`. Field names are camelCase on
/// the wire (e.g. `localOnly`), and every field is optional.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchParams {
    /// Free-text filter; trimmed and lowercased before matching, ignored when
    /// blank.
    #[serde(default)]
    query: Option<String>,
    /// Keep only models that have this capability.
    #[serde(default)]
    capability: Option<car_inference::ModelCapability>,
    /// Keep only models from this provider (ASCII case-insensitive; ignored
    /// when blank).
    #[serde(default)]
    provider: Option<String>,
    /// Keep only models for which `is_local()` is true.
    #[serde(default)]
    local_only: bool,
    /// Keep only models whose `available` flag is set.
    #[serde(default)]
    available_only: bool,
    /// Maximum number of entries to return, applied after sorting.
    #[serde(default)]
    limit: Option<usize>,
}
1954
/// One model in a search response. Serialized with camelCase keys; the
/// `info` fields are flattened into the same JSON object.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchEntry {
    /// Model info derived from the schema (`ModelInfo::from(&schema)`).
    #[serde(flatten)]
    info: car_inference::ModelInfo,
    /// Copied from the schema's `family`.
    family: String,
    /// Copied from the schema's `version`.
    version: String,
    /// Copied from the schema's `tags`.
    tags: Vec<String>,
    /// True when the model is not available and its source is `Local` or
    /// `Mlx`.
    pullable: bool,
    /// Upgrade whose `from_id` equals this model's id, if any.
    upgrade: Option<car_inference::ModelUpgrade>,
}
1966
/// Response of `handle_models_search`, serialized with camelCase keys. All
/// counts are computed over `models` as returned (after any `limit`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ModelSearchResponse {
    /// Matching models, sorted and truncated to the requested limit.
    models: Vec<ModelSearchEntry>,
    /// All upgrades reported by the engine, independent of the filters.
    upgrades: Vec<car_inference::ModelUpgrade>,
    /// Number of entries in `models`.
    total: usize,
    /// Entries in `models` whose `available` flag is set.
    available: usize,
    /// Entries in `models` whose `is_local` flag is set.
    local: usize,
    /// `total` minus `local`.
    remote: usize,
}
1977
1978fn handle_models_search(req: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1979 let params: ModelSearchParams =
1980 serde_json::from_value(req.params.clone()).unwrap_or(ModelSearchParams {
1981 query: None,
1982 capability: None,
1983 provider: None,
1984 local_only: false,
1985 available_only: false,
1986 limit: None,
1987 });
1988 let engine = get_inference_engine(state);
1989 let upgrades = engine.available_model_upgrades();
1990 let upgrades_by_from: HashMap<String, car_inference::ModelUpgrade> = upgrades
1991 .iter()
1992 .cloned()
1993 .map(|upgrade| (upgrade.from_id.clone(), upgrade))
1994 .collect();
1995 let query = params
1996 .query
1997 .as_deref()
1998 .map(str::trim)
1999 .filter(|q| !q.is_empty())
2000 .map(|q| q.to_ascii_lowercase());
2001 let provider = params
2002 .provider
2003 .as_deref()
2004 .map(str::trim)
2005 .filter(|p| !p.is_empty())
2006 .map(|p| p.to_ascii_lowercase());
2007
2008 let mut entries: Vec<ModelSearchEntry> = engine
2009 .list_schemas()
2010 .into_iter()
2011 .filter(|schema| {
2012 if let Some(capability) = params.capability {
2013 if !schema.has_capability(capability) {
2014 return false;
2015 }
2016 }
2017 if let Some(provider) = provider.as_deref() {
2018 if schema.provider.to_ascii_lowercase() != provider {
2019 return false;
2020 }
2021 }
2022 if params.local_only && !schema.is_local() {
2023 return false;
2024 }
2025 if params.available_only && !schema.available {
2026 return false;
2027 }
2028 if let Some(query) = query.as_deref() {
2029 let capability_text = schema
2030 .capabilities
2031 .iter()
2032 .map(|cap| format!("{cap:?}").to_ascii_lowercase())
2033 .collect::<Vec<_>>()
2034 .join(" ");
2035 let haystack = format!(
2036 "{} {} {} {} {} {}",
2037 schema.id,
2038 schema.name,
2039 schema.provider,
2040 schema.family,
2041 schema.tags.join(" "),
2042 capability_text
2043 )
2044 .to_ascii_lowercase();
2045 if !haystack.contains(query) {
2046 return false;
2047 }
2048 }
2049 true
2050 })
2051 .map(|schema| {
2052 let pullable = !schema.available
2053 && matches!(
2054 schema.source,
2055 car_inference::ModelSource::Local { .. } | car_inference::ModelSource::Mlx { .. }
2056 );
2057 let info = car_inference::ModelInfo::from(&schema);
2058 let upgrade = upgrades_by_from.get(&schema.id).cloned();
2059 ModelSearchEntry {
2060 info,
2061 family: schema.family,
2062 version: schema.version,
2063 tags: schema.tags,
2064 pullable,
2065 upgrade,
2066 }
2067 })
2068 .collect();
2069 entries.sort_by(|a, b| {
2070 b.info
2071 .available
2072 .cmp(&a.info.available)
2073 .then(b.info.is_local.cmp(&a.info.is_local))
2074 .then(a.info.name.cmp(&b.info.name))
2075 });
2076 if let Some(limit) = params.limit {
2077 entries.truncate(limit);
2078 }
2079
2080 let total = entries.len();
2081 let available = entries.iter().filter(|entry| entry.info.available).count();
2082 let local = entries.iter().filter(|entry| entry.info.is_local).count();
2083 let response = ModelSearchResponse {
2084 models: entries,
2085 upgrades,
2086 total,
2087 available,
2088 local,
2089 remote: total.saturating_sub(local),
2090 };
2091 serde_json::to_value(response).map_err(|e| e.to_string())
2092}
2093
2094fn handle_models_upgrades(state: &ServerState) -> Result<Value, String> {
2095 let engine = get_inference_engine(state);
2096 serde_json::to_value(serde_json::json!({
2097 "upgrades": engine.available_model_upgrades()
2098 }))
2099 .map_err(|e| e.to_string())
2100}
2101
2102async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2103 let name = msg
2104 .params
2105 .get("name")
2106 .or_else(|| msg.params.get("id"))
2107 .or_else(|| msg.params.get("model"))
2108 .and_then(|v| v.as_str())
2109 .ok_or("missing 'name' parameter")?;
2110 let engine = get_inference_engine(state);
2111 let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
2112 Ok(serde_json::json!({"path": path.display().to_string()}))
2113}
2114
2115async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2116 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
2117 msg.params
2118 .get("events")
2119 .cloned()
2120 .unwrap_or(msg.params.clone()),
2121 )
2122 .map_err(|e| format!("invalid events: {}", e))?;
2123
2124 let inference = get_inference_engine(state).clone();
2125 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
2126
2127 let skills = engine.distill_skills(&events).await;
2128 serde_json::to_value(&skills).map_err(|e| e.to_string())
2129}
2130
2131async fn handle_memory_consolidate(
2134 session: &crate::session::ClientSession,
2135) -> Result<Value, String> {
2136 let mut engine = session.memgine.lock().await;
2137 let report = engine.consolidate().await;
2138 serde_json::to_value(&report).map_err(|e| e.to_string())
2139}
2140
2141async fn handle_skill_repair(
2145 msg: &JsonRpcMessage,
2146 session: &crate::session::ClientSession,
2147) -> Result<Value, String> {
2148 let name = msg
2149 .params
2150 .get("skill_name")
2151 .and_then(|v| v.as_str())
2152 .ok_or("missing 'skill_name' parameter")?;
2153 let mut engine = session.memgine.lock().await;
2154 let code = engine.repair_skill(name).await;
2155 Ok(match code {
2156 Some(c) => serde_json::json!({ "code": c }),
2157 None => Value::Null,
2158 })
2159}
2160
2161async fn handle_skills_ingest_distilled(
2164 msg: &JsonRpcMessage,
2165 session: &crate::session::ClientSession,
2166) -> Result<Value, String> {
2167 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
2168 msg.params
2169 .get("skills")
2170 .cloned()
2171 .unwrap_or(msg.params.clone()),
2172 )
2173 .map_err(|e| format!("invalid skills: {}", e))?;
2174 let mut engine = session.memgine.lock().await;
2175 let nodes = engine.ingest_distilled_skills(&skills);
2176 Ok(serde_json::json!({ "ingested": nodes.len() }))
2177}
2178
2179async fn handle_skills_evolve(
2182 msg: &JsonRpcMessage,
2183 session: &crate::session::ClientSession,
2184) -> Result<Value, String> {
2185 let domain = msg
2186 .params
2187 .get("domain")
2188 .and_then(|v| v.as_str())
2189 .ok_or("missing 'domain' parameter")?
2190 .to_string();
2191 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
2192 msg.params
2193 .get("events")
2194 .cloned()
2195 .unwrap_or(Value::Array(vec![])),
2196 )
2197 .map_err(|e| format!("invalid events: {}", e))?;
2198 let mut engine = session.memgine.lock().await;
2199 let skills = engine.evolve_skills(&events, &domain).await;
2200 serde_json::to_value(&skills).map_err(|e| e.to_string())
2201}
2202
2203async fn handle_skills_domains_needing_evolution(
2205 msg: &JsonRpcMessage,
2206 session: &crate::session::ClientSession,
2207) -> Result<Value, String> {
2208 let threshold = msg
2209 .params
2210 .get("threshold")
2211 .and_then(|v| v.as_f64())
2212 .unwrap_or(0.6);
2213 let engine = session.memgine.lock().await;
2214 let domains = engine.domains_needing_evolution(threshold);
2215 serde_json::to_value(&domains).map_err(|e| e.to_string())
2216}
2217
2218async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2220 let engine = get_inference_engine(state);
2221 let req: car_inference::RerankRequest =
2222 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2223 let _permit = state.admission.acquire().await;
2224 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
2225 serde_json::to_value(&result).map_err(|e| e.to_string())
2226}
2227
2228async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2234 let engine = get_inference_engine(state);
2235 let req: car_inference::TranscribeRequest =
2236 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2237 let _permit = state.admission.acquire().await;
2238 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
2239 serde_json::to_value(&result).map_err(|e| e.to_string())
2240}
2241
2242async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2245 let engine = get_inference_engine(state);
2246 let req: car_inference::SynthesizeRequest =
2247 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
2248 let _permit = state.admission.acquire().await;
2249 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
2250 serde_json::to_value(&result).map_err(|e| e.to_string())
2251}
2252
2253async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
2257 let engine = get_inference_engine(state);
2258 let status = engine
2259 .prepare_speech_runtime()
2260 .await
2261 .map_err(|e| e.to_string())?;
2262 serde_json::to_value(&status).map_err(|e| e.to_string())
2263}
2264
2265async fn handle_models_route(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
2268 let prompt = msg
2269 .params
2270 .get("prompt")
2271 .and_then(|v| v.as_str())
2272 .ok_or("missing 'prompt' parameter")?;
2273 let engine = get_inference_engine(state);
2274 let decision = engine.route_adaptive(prompt).await;
2275 serde_json::to_value(&decision).map_err(|e| e.to_string())
2276}
2277
2278async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
2280 let engine = get_inference_engine(state);
2281 let profiles = engine.export_profiles().await;
2282 serde_json::to_value(&profiles).map_err(|e| e.to_string())
2283}
2284
2285async fn handle_events_count(session: &crate::session::ClientSession) -> Result<Value, String> {
2287 let n = session.runtime.log.lock().await.len();
2288 Ok(Value::from(n as u64))
2289}
2290
2291async fn handle_events_stats(session: &crate::session::ClientSession) -> Result<Value, String> {
2292 let stats = session.runtime.log.lock().await.stats();
2293 serde_json::to_value(stats).map_err(|e| e.to_string())
2294}
2295
/// Parameters accepted by `handle_events_truncate`.
///
/// Field names are deserialized in camelCase (`maxEvents`, `maxSpans`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsTruncateParams {
    /// Passed to `truncate_events_keep_last` when present; events are left
    /// untouched when absent.
    #[serde(default)]
    max_events: Option<usize>,
    /// Passed to `truncate_spans_keep_last` when present; spans are left
    /// untouched when absent.
    #[serde(default)]
    max_spans: Option<usize>,
}
2304
2305async fn handle_events_truncate(
2306 msg: &JsonRpcMessage,
2307 session: &crate::session::ClientSession,
2308) -> Result<Value, String> {
2309 let params: EventsTruncateParams =
2310 serde_json::from_value(msg.params.clone()).unwrap_or(EventsTruncateParams {
2311 max_events: None,
2312 max_spans: None,
2313 });
2314 let mut log = session.runtime.log.lock().await;
2315 let removed_events = params
2316 .max_events
2317 .map(|max| log.truncate_events_keep_last(max))
2318 .unwrap_or(0);
2319 let removed_spans = params
2320 .max_spans
2321 .map(|max| log.truncate_spans_keep_last(max))
2322 .unwrap_or(0);
2323 let stats = log.stats();
2324 Ok(serde_json::json!({
2325 "removedEvents": removed_events,
2326 "removedSpans": removed_spans,
2327 "stats": stats,
2328 }))
2329}
2330
2331async fn handle_events_clear(session: &crate::session::ClientSession) -> Result<Value, String> {
2332 let mut log = session.runtime.log.lock().await;
2333 let removed = log.clear();
2334 Ok(serde_json::json!({ "removed": removed, "stats": log.stats() }))
2335}
2336
2337async fn handle_replan_set_config(
2342 msg: &JsonRpcMessage,
2343 session: &crate::session::ClientSession,
2344) -> Result<Value, String> {
2345 let max_replans = msg
2346 .params
2347 .get("max_replans")
2348 .and_then(|v| v.as_u64())
2349 .unwrap_or(0) as u32;
2350 let delay_ms = msg
2351 .params
2352 .get("delay_ms")
2353 .and_then(|v| v.as_u64())
2354 .unwrap_or(0);
2355 let verify_before_execute = msg
2356 .params
2357 .get("verify_before_execute")
2358 .and_then(|v| v.as_bool())
2359 .unwrap_or(true);
2360 let cfg = car_engine::ReplanConfig {
2361 max_replans,
2362 delay_ms,
2363 verify_before_execute,
2364 };
2365 session.runtime.set_replan_config(cfg).await;
2366 Ok(Value::Null)
2367}
2368
2369async fn handle_skills_list(
2370 msg: &JsonRpcMessage,
2371 session: &crate::session::ClientSession,
2372) -> Result<Value, String> {
2373 let domain = msg.params.get("domain").and_then(|v| v.as_str());
2374 let engine = session.memgine.lock().await;
2375 let skills: Vec<serde_json::Value> = engine
2376 .graph
2377 .inner
2378 .node_indices()
2379 .filter_map(|nix| {
2380 let node = engine.graph.inner.node_weight(nix)?;
2381 if node.kind != car_memgine::MemKind::Skill {
2382 return None;
2383 }
2384 let meta = car_memgine::SkillMeta::from_node(node)?;
2385 if let Some(d) = domain {
2386 match &meta.scope {
2387 car_memgine::SkillScope::Global => {}
2388 car_memgine::SkillScope::Domain(sd) if sd == d => {}
2389 _ => return None,
2390 }
2391 }
2392 Some(serde_json::to_value(&meta).unwrap_or_default())
2393 })
2394 .collect();
2395 serde_json::to_value(&skills).map_err(|e| e.to_string())
2396}
2397
/// Parameters shared by the `handle_secret_*` handlers.
#[derive(serde::Deserialize)]
struct SecretParams {
    /// Optional service name, forwarded as `Option<&str>` to the secrets API.
    #[serde(default)]
    service: Option<String>,
    /// Key of the secret; required.
    key: String,
    /// Secret value; only `handle_secret_put` reads it and requires it.
    #[serde(default)]
    value: Option<String>,
}
2406
2407fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
2408 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2409 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
2410 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
2411}
2412
2413fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
2414 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2415 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
2416}
2417
2418fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
2419 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2420 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
2421}
2422
2423fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
2424 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2425 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
2426}
2427
/// Parameters shared by the `handle_perm_*` handlers.
#[derive(serde::Deserialize)]
struct PermParams {
    /// Permission domain, forwarded verbatim to `car_ffi_common::permissions`.
    domain: String,
    /// Optional target bundle identifier, forwarded as `Option<&str>`.
    #[serde(default)]
    target_bundle_id: Option<String>,
}
2434
2435fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
2436 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2437 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
2438}
2439
2440fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
2441 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2442 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
2443}
2444
2445fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
2446 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2447 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
2448}
2449
2450fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
2451 #[derive(serde::Deserialize)]
2452 struct P {
2453 start: String,
2454 end: String,
2455 #[serde(default)]
2456 calendar_ids: Vec<String>,
2457 }
2458 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2459 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
2460 .map_err(|e| format!("parse start: {}", e))?
2461 .with_timezone(&chrono::Utc);
2462 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
2463 .map_err(|e| format!("parse end: {}", e))?
2464 .with_timezone(&chrono::Utc);
2465 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
2466}
2467
2468fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
2469 #[derive(serde::Deserialize)]
2470 struct P {
2471 query: String,
2472 #[serde(default = "default_limit")]
2473 limit: usize,
2474 #[serde(default)]
2475 container_ids: Vec<String>,
2476 }
2477 fn default_limit() -> usize {
2478 50
2479 }
2480 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2481 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
2482}
2483
2484fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
2485 #[derive(serde::Deserialize, Default)]
2486 struct P {
2487 #[serde(default)]
2488 account_ids: Vec<String>,
2489 }
2490 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
2491 car_ffi_common::integrations::mail_inbox(&p.account_ids)
2492}
2493
2494fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
2495 let raw = req.params.to_string();
2496 car_ffi_common::integrations::mail_send(&raw)
2497}
2498
2499fn handle_messages_chats(req: &JsonRpcMessage) -> Result<Value, String> {
2500 #[derive(serde::Deserialize)]
2501 struct P {
2502 #[serde(default = "default_limit")]
2503 limit: usize,
2504 }
2505 fn default_limit() -> usize {
2506 50
2507 }
2508 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
2509 car_ffi_common::integrations::messages_chats(p.limit)
2510}
2511
2512fn handle_messages_send(req: &JsonRpcMessage) -> Result<Value, String> {
2513 let raw = req.params.to_string();
2514 car_ffi_common::integrations::messages_send(&raw)
2515}
2516
2517fn handle_notes_find(req: &JsonRpcMessage) -> Result<Value, String> {
2518 #[derive(serde::Deserialize)]
2519 struct P {
2520 query: String,
2521 #[serde(default = "default_limit")]
2522 limit: usize,
2523 }
2524 fn default_limit() -> usize {
2525 50
2526 }
2527 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2528 car_ffi_common::integrations::notes_find(&p.query, p.limit)
2529}
2530
2531fn handle_reminders_items(req: &JsonRpcMessage) -> Result<Value, String> {
2532 #[derive(serde::Deserialize)]
2533 struct P {
2534 #[serde(default = "default_limit")]
2535 limit: usize,
2536 }
2537 fn default_limit() -> usize {
2538 50
2539 }
2540 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 50 });
2541 car_ffi_common::integrations::reminders_items(p.limit)
2542}
2543
2544fn handle_bookmarks_list(req: &JsonRpcMessage) -> Result<Value, String> {
2545 #[derive(serde::Deserialize)]
2546 struct P {
2547 #[serde(default = "default_limit")]
2548 limit: usize,
2549 }
2550 fn default_limit() -> usize {
2551 100
2552 }
2553 let p: P = serde_json::from_value(req.params.clone()).unwrap_or(P { limit: 100 });
2554 car_ffi_common::integrations::bookmarks_list(p.limit)
2555}
2556
2557fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
2558 #[derive(serde::Deserialize)]
2559 struct P {
2560 start: String,
2561 end: String,
2562 }
2563 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2564 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
2565 .map_err(|e| format!("parse start: {}", e))?
2566 .with_timezone(&chrono::Utc);
2567 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
2568 .map_err(|e| format!("parse end: {}", e))?
2569 .with_timezone(&chrono::Utc);
2570 car_ffi_common::health::sleep_windows(s, e)
2571}
2572
2573fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
2574 #[derive(serde::Deserialize)]
2575 struct P {
2576 start: String,
2577 end: String,
2578 }
2579 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2580 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
2581 .map_err(|e| format!("parse start: {}", e))?
2582 .with_timezone(&chrono::Utc);
2583 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
2584 .map_err(|e| format!("parse end: {}", e))?
2585 .with_timezone(&chrono::Utc);
2586 car_ffi_common::health::workouts(s, e)
2587}
2588
2589fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
2590 #[derive(serde::Deserialize)]
2591 struct P {
2592 start: String,
2593 end: String,
2594 }
2595 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2596 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
2597 .map_err(|e| format!("parse start: {}", e))?;
2598 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
2599 .map_err(|e| format!("parse end: {}", e))?;
2600 car_ffi_common::health::activity(s, e)
2601}
2602
2603async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
2604 let closed = session.browser.close().await?;
2605 Ok(serde_json::json!({"closed": closed}))
2606}
2607
2608async fn handle_browser_run(
2609 req: &JsonRpcMessage,
2610 session: &crate::session::ClientSession,
2611) -> Result<Value, String> {
2612 #[derive(serde::Deserialize)]
2613 struct BrowserRunParams {
2614 script: Value,
2616 #[serde(default)]
2617 width: Option<u32>,
2618 #[serde(default)]
2619 height: Option<u32>,
2620 #[serde(default)]
2625 headed: Option<bool>,
2626 #[serde(default)]
2629 extra_args: Option<Vec<String>>,
2630 }
2631 let params: BrowserRunParams =
2632 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2633
2634 let script_json = match params.script {
2636 Value::String(s) => s,
2637 other => other.to_string(),
2638 };
2639
2640 let browser_session = session
2641 .browser
2642 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
2643 width: params.width.unwrap_or(1280),
2644 height: params.height.unwrap_or(720),
2645 headless: !params.headed.unwrap_or(false),
2646 extra_args: params.extra_args.unwrap_or_default(),
2647 })
2648 .await?;
2649
2650 let trace_json = browser_session.run(&script_json).await?;
2651 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
2652}
2653
/// Parameters for `handle_voice_transcribe_stream_start`.
#[derive(Deserialize)]
struct VoiceStartParams {
    /// Identifier of the voice session to start.
    session_id: String,
    /// Audio source description; re-serialized to a JSON string and forwarded
    /// without interpretation at this layer.
    audio_source: Value,
    /// Optional options object; forwarded as a JSON string when present.
    #[serde(default)]
    options: Option<Value>,
}
2673
2674async fn handle_voice_transcribe_stream_start(
2675 req: &JsonRpcMessage,
2676 state: &Arc<ServerState>,
2677 session: &Arc<crate::session::ClientSession>,
2678) -> Result<Value, String> {
2679 let params: VoiceStartParams =
2680 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2681 let audio_source_json =
2682 serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
2683 let options_json = params
2684 .options
2685 .as_ref()
2686 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
2687 .transpose()?;
2688 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
2689 channel: session.channel.clone(),
2690 });
2691 let json = car_ffi_common::voice::transcribe_stream_start(
2692 ¶ms.session_id,
2693 &audio_source_json,
2694 options_json.as_deref(),
2695 state.voice_sessions.clone(),
2696 sink,
2697 )
2698 .await?;
2699 serde_json::from_str(&json).map_err(|e| e.to_string())
2700}
2701
/// Parameters for `handle_voice_transcribe_stream_stop`.
#[derive(Deserialize)]
struct VoiceStopParams {
    /// Identifier of the voice session to stop.
    session_id: String,
}
2706
2707async fn handle_voice_transcribe_stream_stop(
2708 req: &JsonRpcMessage,
2709 state: &Arc<ServerState>,
2710) -> Result<Value, String> {
2711 let params: VoiceStopParams =
2712 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2713 let json = car_ffi_common::voice::transcribe_stream_stop(
2714 ¶ms.session_id,
2715 state.voice_sessions.clone(),
2716 )
2717 .await?;
2718 serde_json::from_str(&json).map_err(|e| e.to_string())
2719}
2720
/// Parameters for `handle_voice_transcribe_stream_push`.
#[derive(Deserialize)]
struct VoicePushParams {
    /// Identifier of the voice session receiving the audio.
    session_id: String,
    /// Audio payload encoded with the standard base64 alphabet; the handler
    /// decodes it before forwarding. The sample format is not visible here.
    pcm_b64: String,
}
2729
2730async fn handle_voice_transcribe_stream_push(
2731 req: &JsonRpcMessage,
2732 state: &Arc<ServerState>,
2733) -> Result<Value, String> {
2734 use base64::Engine;
2735 let params: VoicePushParams =
2736 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2737 let pcm = base64::engine::general_purpose::STANDARD
2738 .decode(¶ms.pcm_b64)
2739 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
2740 let json = car_ffi_common::voice::transcribe_stream_push(
2741 ¶ms.session_id,
2742 &pcm,
2743 state.voice_sessions.clone(),
2744 )
2745 .await?;
2746 serde_json::from_str(&json).map_err(|e| e.to_string())
2747}
2748
2749fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
2750 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
2751 serde_json::from_str(&json).unwrap_or(Value::Null)
2752}
2753
2754async fn handle_voice_dispatch_turn(
2755 req: &JsonRpcMessage,
2756 state: &Arc<ServerState>,
2757 session: &Arc<crate::session::ClientSession>,
2758) -> Result<Value, String> {
2759 let req_value = req.params.clone();
2760 let request: car_ffi_common::voice_turn::DispatchVoiceTurnRequest =
2761 serde_json::from_value(req_value).map_err(|e| e.to_string())?;
2762 let engine = get_inference_engine(state).clone();
2763 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
2764 channel: session.channel.clone(),
2765 });
2766 let resp = car_ffi_common::voice_turn::dispatch(engine, request, sink).await?;
2767 serde_json::to_value(resp).map_err(|e| e.to_string())
2768}
2769
2770async fn handle_voice_cancel_turn() -> Result<Value, String> {
2771 car_ffi_common::voice_turn::cancel().await;
2772 Ok(serde_json::json!({"cancelled": true}))
2773}
2774
2775async fn handle_voice_prewarm_turn(state: &Arc<ServerState>) -> Result<Value, String> {
2776 let engine = get_inference_engine(state).clone();
2777 car_ffi_common::voice_turn::prewarm(engine).await;
2778 Ok(serde_json::json!({"prewarmed": true}))
2779}
2780
2781fn ws_runner_session() -> &'static std::sync::RwLock<Option<Arc<crate::session::WsChannel>>> {
2800 static SLOT: std::sync::OnceLock<std::sync::RwLock<Option<Arc<crate::session::WsChannel>>>> =
2801 std::sync::OnceLock::new();
2802 SLOT.get_or_init(|| std::sync::RwLock::new(None))
2803}
2804
2805fn ws_runner_calls(
2806) -> &'static dashmap::DashMap<String, car_inference::EventEmitter> {
2807 static MAP: std::sync::OnceLock<dashmap::DashMap<String, car_inference::EventEmitter>> =
2808 std::sync::OnceLock::new();
2809 MAP.get_or_init(dashmap::DashMap::new)
2810}
2811
2812fn ws_runner_completions() -> &'static dashmap::DashMap<
2813 String,
2814 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
2815> {
2816 static MAP: std::sync::OnceLock<
2817 dashmap::DashMap<
2818 String,
2819 tokio::sync::oneshot::Sender<std::result::Result<car_inference::RunnerResult, String>>,
2820 >,
2821 > = std::sync::OnceLock::new();
2822 MAP.get_or_init(dashmap::DashMap::new)
2823}
2824
2825struct WsInferenceRunner;
2826
2827#[async_trait::async_trait]
2828impl car_inference::InferenceRunner for WsInferenceRunner {
2829 async fn run(
2830 &self,
2831 request: car_inference::tasks::generate::GenerateRequest,
2832 emitter: car_inference::EventEmitter,
2833 ) -> std::result::Result<car_inference::RunnerResult, car_inference::RunnerError> {
2834 let channel = ws_runner_session()
2835 .read()
2836 .map_err(|e| {
2837 car_inference::RunnerError::Failed(format!("ws runner slot poisoned: {e}"))
2838 })?
2839 .clone()
2840 .ok_or_else(|| {
2841 car_inference::RunnerError::Declined(
2842 "no WebSocket inference runner registered — call inference.register_runner first"
2843 .into(),
2844 )
2845 })?;
2846
2847 let call_id = uuid::Uuid::new_v4().to_string();
2848 let request_json = serde_json::to_value(&request)
2849 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
2850 let (tx, rx) = tokio::sync::oneshot::channel();
2851 ws_runner_calls().insert(call_id.clone(), emitter);
2852 ws_runner_completions().insert(call_id.clone(), tx);
2853
2854 use futures::SinkExt;
2856 let notification = serde_json::json!({
2857 "jsonrpc": "2.0",
2858 "method": "inference.runner.invoke",
2859 "params": {
2860 "call_id": call_id,
2861 "request": request_json,
2862 },
2863 });
2864 let text = serde_json::to_string(¬ification)
2865 .map_err(|e| car_inference::RunnerError::Failed(e.to_string()))?;
2866 let _ = channel
2867 .write
2868 .lock()
2869 .await
2870 .send(tokio_tungstenite::tungstenite::Message::Text(text.into()))
2871 .await;
2872
2873 let result = rx.await.map_err(|_| {
2874 car_inference::RunnerError::Failed("runner completion channel dropped".into())
2875 })?;
2876 ws_runner_calls().remove(&call_id);
2877 result.map_err(car_inference::RunnerError::Failed)
2878 }
2879}
2880
2881async fn handle_inference_register_runner(
2882 session: &Arc<crate::session::ClientSession>,
2883) -> Result<Value, String> {
2884 let mut guard = ws_runner_session()
2885 .write()
2886 .map_err(|e| format!("ws runner slot poisoned: {e}"))?;
2887 *guard = Some(session.channel.clone());
2888 drop(guard);
2889 car_inference::set_inference_runner(Some(Arc::new(WsInferenceRunner)));
2890 Ok(serde_json::json!({"registered": true}))
2891}
2892
/// Parameters for `handle_inference_runner_event`.
#[derive(serde::Deserialize)]
struct InferenceRunnerEventParams {
    /// Identifier of the in-flight runner call the event belongs to.
    call_id: String,
    /// Raw event object; decoded by `parse_runner_event_value`.
    event: Value,
}
2898
2899async fn handle_inference_runner_event(req: &JsonRpcMessage) -> Result<Value, String> {
2900 let params: InferenceRunnerEventParams =
2901 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2902 let stream_event = match parse_runner_event_value(¶ms.event) {
2903 Some(e) => e,
2904 None => return Err("unrecognised runner event shape".into()),
2905 };
2906 if let Some(entry) = ws_runner_calls().get(¶ms.call_id) {
2907 let emitter = entry.value().clone();
2908 tokio::spawn(async move { emitter.emit(stream_event).await });
2909 }
2910 Ok(serde_json::json!({"emitted": true}))
2911}
2912
/// Parameters for `handle_inference_runner_complete`.
#[derive(serde::Deserialize)]
struct InferenceRunnerCompleteParams {
    /// Identifier of the runner call being completed.
    call_id: String,
    /// Raw result; the handler deserializes it as a `RunnerResult`.
    result: Value,
}
2918
2919async fn handle_inference_runner_complete(req: &JsonRpcMessage) -> Result<Value, String> {
2920 let params: InferenceRunnerCompleteParams =
2921 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2922 let result: std::result::Result<car_inference::RunnerResult, String> =
2923 serde_json::from_value(params.result)
2924 .map_err(|e| format!("invalid RunnerResult JSON: {e}"));
2925 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
2926 let _ = tx.send(result);
2927 }
2928 Ok(serde_json::json!({"completed": true}))
2929}
2930
/// Parameters for `handle_inference_runner_fail`.
#[derive(serde::Deserialize)]
struct InferenceRunnerFailParams {
    /// Identifier of the runner call that failed.
    call_id: String,
    /// Error message delivered to the waiting call.
    error: String,
}
2936
2937async fn handle_inference_runner_fail(req: &JsonRpcMessage) -> Result<Value, String> {
2938 let params: InferenceRunnerFailParams =
2939 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2940 if let Some((_, tx)) = ws_runner_completions().remove(¶ms.call_id) {
2941 let _ = tx.send(Err(params.error));
2942 }
2943 Ok(serde_json::json!({"failed": true}))
2944}
2945
2946fn parse_runner_event_value(v: &Value) -> Option<car_inference::StreamEvent> {
2947 let ty = v.get("type").and_then(|t| t.as_str())?;
2948 match ty {
2949 "text" => Some(car_inference::StreamEvent::TextDelta(
2950 v.get("data")?.as_str()?.to_string(),
2951 )),
2952 "tool_start" => Some(car_inference::StreamEvent::ToolCallStart {
2953 name: v.get("name")?.as_str()?.to_string(),
2954 index: v.get("index")?.as_u64()? as usize,
2955 id: v.get("id").and_then(|i| i.as_str()).map(str::to_string),
2956 }),
2957 "tool_delta" => Some(car_inference::StreamEvent::ToolCallDelta {
2958 index: v.get("index")?.as_u64()? as usize,
2959 arguments_delta: v.get("data")?.as_str()?.to_string(),
2960 }),
2961 "usage" => Some(car_inference::StreamEvent::Usage {
2962 input_tokens: v.get("input_tokens")?.as_u64()?,
2963 output_tokens: v.get("output_tokens")?.as_u64()?,
2964 }),
2965 "done" => Some(car_inference::StreamEvent::Done {
2966 text: v.get("text")?.as_str()?.to_string(),
2967 tool_calls: v
2968 .get("tool_calls")
2969 .and_then(|tc| serde_json::from_value(tc.clone()).ok())
2970 .unwrap_or_default(),
2971 }),
2972 _ => None,
2973 }
2974}
2975
/// Parameters for `handle_enroll_speaker`.
#[derive(Deserialize)]
struct EnrollSpeakerParams {
    /// Label under which the speaker is enrolled.
    label: String,
    /// Audio description; re-serialized to a JSON string and forwarded
    /// without interpretation at this layer.
    audio: Value,
}
2981
2982async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
2983 let params: EnrollSpeakerParams =
2984 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2985 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
2986 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
2987 serde_json::from_str(&json).map_err(|e| e.to_string())
2988}
2989
/// Parameters for `handle_remove_enrollment`.
#[derive(Deserialize)]
struct RemoveEnrollmentParams {
    /// Label of the enrollment to remove.
    label: String,
}
2994
2995fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
2996 let params: RemoveEnrollmentParams =
2997 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2998 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
2999 serde_json::from_str(&json).map_err(|e| e.to_string())
3000}
3001
/// Parameters for `handle_workflow_run`.
#[derive(Deserialize)]
struct WorkflowRunParams {
    /// Workflow definition; re-serialized to a JSON string and forwarded
    /// without interpretation at this layer.
    workflow: Value,
}
3006
3007async fn handle_workflow_run(
3008 req: &JsonRpcMessage,
3009 session: &Arc<crate::session::ClientSession>,
3010) -> Result<Value, String> {
3011 let params: WorkflowRunParams =
3012 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3013 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
3014 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
3015 channel: session.channel.clone(),
3016 host: session.host.clone(),
3017 client_id: session.client_id.clone(),
3018 });
3019 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
3020 serde_json::from_str(&json).map_err(|e| e.to_string())
3021}
3022
/// Parameters for `handle_workflow_verify`.
#[derive(Deserialize)]
struct WorkflowVerifyParams {
    /// Workflow definition; re-serialized to a JSON string and forwarded
    /// without interpretation at this layer.
    workflow: Value,
}
3027
3028fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
3029 let params: WorkflowVerifyParams =
3030 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3031 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
3032 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
3033 serde_json::from_str(&json).map_err(|e| e.to_string())
3034}
3035
3036async fn handle_meeting_start(
3041 req: &JsonRpcMessage,
3042 state: &Arc<ServerState>,
3043 session: &Arc<crate::session::ClientSession>,
3044) -> Result<Value, String> {
3045 let mut req_value = req.params.clone();
3051 let meeting_id = req_value
3052 .get("id")
3053 .and_then(|v| v.as_str())
3054 .map(str::to_string)
3055 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
3056 if let Some(map) = req_value.as_object_mut() {
3057 map.insert("id".into(), Value::String(meeting_id.clone()));
3058 }
3059 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
3060
3061 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
3062 Arc::new(crate::session::WsVoiceEventSink {
3063 channel: session.channel.clone(),
3064 });
3065
3066 let upstream: Arc<dyn car_voice::VoiceEventSink> =
3071 Arc::new(crate::session::WsMemgineIngestSink {
3072 meeting_id,
3073 engine: session.memgine.clone(),
3074 upstream: ws_upstream,
3075 });
3076
3077 let cwd = std::env::current_dir().ok();
3078 let json = car_ffi_common::meeting::start_meeting(
3079 &request_json,
3080 state.meetings.clone(),
3081 state.voice_sessions.clone(),
3082 upstream,
3083 None,
3084 cwd,
3085 )
3086 .await?;
3087 serde_json::from_str(&json).map_err(|e| e.to_string())
3088}
3089
/// Parameters accepted by `handle_meeting_stop`.
#[derive(Deserialize)]
struct MeetingStopParams {
    /// Identifier of the meeting to stop; required.
    meeting_id: String,
    /// Forwarded to `stop_meeting` as the summarize flag; `true` when omitted
    /// (see `default_summarize`).
    #[serde(default = "default_summarize")]
    summarize: bool,
}
3096
/// Serde default for `MeetingStopParams::summarize`: summarize unless told otherwise.
fn default_summarize() -> bool {
    const SUMMARIZE_BY_DEFAULT: bool = true;
    SUMMARIZE_BY_DEFAULT
}
3100
3101async fn handle_meeting_stop(
3102 req: &JsonRpcMessage,
3103 state: &Arc<ServerState>,
3104 _session: &Arc<crate::session::ClientSession>,
3105) -> Result<Value, String> {
3106 let params: MeetingStopParams =
3107 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3108 let inference = if params.summarize {
3109 Some(state.inference.get().cloned()).flatten()
3110 } else {
3111 None
3112 };
3113 let json = car_ffi_common::meeting::stop_meeting(
3114 ¶ms.meeting_id,
3115 params.summarize,
3116 state.meetings.clone(),
3117 state.voice_sessions.clone(),
3118 inference,
3119 )
3120 .await?;
3121 serde_json::from_str(&json).map_err(|e| e.to_string())
3122}
3123
/// Parameters accepted by `handle_meeting_list`; every field is optional.
#[derive(Deserialize, Default)]
struct MeetingListParams {
    /// Optional path forwarded to `list_meetings` as its root argument.
    #[serde(default)]
    root: Option<std::path::PathBuf>,
}
3129
3130fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
3131 let params: MeetingListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
3132 let cwd = std::env::current_dir().ok();
3133 let json = car_ffi_common::meeting::list_meetings(params.root, cwd)?;
3134 serde_json::from_str(&json).map_err(|e| e.to_string())
3135}
3136
/// Parameters accepted by `handle_meeting_get`.
#[derive(Deserialize)]
struct MeetingGetParams {
    /// Identifier of the meeting to fetch; required.
    meeting_id: String,
    /// Optional path forwarded to `get_meeting` as its root argument.
    #[serde(default)]
    root: Option<std::path::PathBuf>,
}
3143
3144fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
3145 let params: MeetingGetParams =
3146 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3147 let cwd = std::env::current_dir().ok();
3148 let json = car_ffi_common::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
3149 serde_json::from_str(&json).map_err(|e| e.to_string())
3150}
3151
/// Parameters accepted by `handle_registry_register`.
#[derive(Deserialize, Default)]
struct RegistryRegisterParams {
    /// Registry entry kept as raw JSON; the handler re-serializes it to a
    /// string before passing it to `car_ffi_common::registry::register_agent`.
    entry: Value,
    /// Optional path forwarded to `register_agent` as the registry path.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
3165
3166fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
3167 let params: RegistryRegisterParams =
3168 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3169 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
3170 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
3171 Ok(Value::Null)
3172}
3173
/// Parameters shared by `handle_registry_heartbeat` and
/// `handle_registry_unregister`, which both address an agent by name.
#[derive(Deserialize, Default)]
struct RegistryNameParams {
    /// Name of the agent to act on; required.
    name: String,
    /// Optional path forwarded to the registry call as the registry path.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
3180
3181fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
3182 let params: RegistryNameParams =
3183 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3184 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
3185 serde_json::from_str(&json).map_err(|e| e.to_string())
3186}
3187
3188fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
3189 let params: RegistryNameParams =
3190 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3191 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
3192 Ok(Value::Null)
3193}
3194
/// Parameters accepted by `handle_registry_list`; every field is optional.
#[derive(Deserialize, Default)]
struct RegistryListParams {
    /// Optional path forwarded to `list_agents` as the registry path.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
3200
3201fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
3202 let params: RegistryListParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
3203 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
3204 serde_json::from_str(&json).map_err(|e| e.to_string())
3205}
3206
3207#[derive(Deserialize, Default)]
3208struct RegistryReapParams {
3209 #[serde(default = "default_reap_age")]
3212 max_age_secs: u64,
3213 #[serde(default)]
3214 registry_path: Option<std::path::PathBuf>,
3215}
3216
/// Serde default for `RegistryReapParams::max_age_secs`, in seconds.
fn default_reap_age() -> u64 {
    const DEFAULT_MAX_AGE_SECS: u64 = 60;
    DEFAULT_MAX_AGE_SECS
}
3220
3221fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
3222 let params: RegistryReapParams = serde_json::from_value(req.params.clone()).unwrap_or_default();
3223 let json =
3224 car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
3225 serde_json::from_str(&json).map_err(|e| e.to_string())
3226}
3227
3228async fn handle_a2a_start(req: &JsonRpcMessage) -> Result<Value, String> {
3235 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3236 let json = car_ffi_common::a2a::start_a2a(¶ms_json).await?;
3237 serde_json::from_str(&json).map_err(|e| e.to_string())
3238}
3239
3240fn handle_a2a_stop() -> Result<Value, String> {
3241 let json = car_ffi_common::a2a::stop_a2a()?;
3242 serde_json::from_str(&json).map_err(|e| e.to_string())
3243}
3244
3245fn handle_a2a_status() -> Result<Value, String> {
3246 let json = car_ffi_common::a2a::a2a_status()?;
3247 serde_json::from_str(&json).map_err(|e| e.to_string())
3248}
3249
/// Parameters accepted by `handle_a2a_send`.
///
/// Field names are camelCase on the wire (`rename_all = "camelCase"`), e.g.
/// `ingestA2ui`, `routeAuth` and `allowUntrustedEndpoint`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct A2aSendParams {
    /// Requested endpoint; it is checked through `trusted_route_endpoint`
    /// before any client is created.
    endpoint: String,
    /// Message handed to `A2aClient::send_message`.
    message: car_a2a::Message,
    /// Forwarded to `send_message` as its second argument; `false` when omitted.
    #[serde(default)]
    blocking: bool,
    /// When set, the send result is validated and its A2UI envelopes are
    /// applied; `true` when omitted (see `default_true`).
    #[serde(default = "default_true")]
    ingest_a2ui: bool,
    /// Optional auth attached to the client and passed along with each
    /// applied envelope.
    #[serde(default)]
    route_auth: Option<A2aRouteAuth>,
    /// Passed to `trusted_route_endpoint` alongside the endpoint; `false`
    /// when omitted.
    #[serde(default)]
    allow_untrusted_endpoint: bool,
}
3264
/// Serde default helper for boolean fields that should be on unless the
/// caller opts out (used by `A2aSendParams::ingest_a2ui`).
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
3268
3269async fn handle_a2a_send(req: &JsonRpcMessage, state: &Arc<ServerState>) -> Result<Value, String> {
3270 let params: A2aSendParams =
3271 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
3272 let endpoint = trusted_route_endpoint(
3273 Some(params.endpoint.clone()),
3274 params.allow_untrusted_endpoint,
3275 )
3276 .ok_or_else(|| {
3277 "`a2a.send` endpoint must be loopback unless allowUntrustedEndpoint is true".to_string()
3278 })?;
3279 let client = match params.route_auth.clone() {
3280 Some(auth) => {
3281 car_a2a::A2aClient::new(endpoint.clone()).with_auth(client_auth_from_route_auth(auth))
3282 }
3283 None => car_a2a::A2aClient::new(endpoint.clone()),
3284 };
3285 let result = client
3286 .send_message(params.message, params.blocking)
3287 .await
3288 .map_err(|e| e.to_string())?;
3289 let result_value = serde_json::to_value(&result).map_err(|e| e.to_string())?;
3290 let mut applied = Vec::new();
3291 if params.ingest_a2ui {
3292 state
3293 .a2ui
3294 .validate_payload(&result_value)
3295 .map_err(|e| e.to_string())?;
3296 let routed_endpoint = Some(endpoint.clone());
3297 for envelope in car_a2ui::envelopes_from_value(&result_value).map_err(|e| e.to_string())? {
3298 let owner = car_a2ui::owner_from_value(&result_value).map(|owner| {
3299 if owner.endpoint.is_none() {
3300 owner.with_endpoint(routed_endpoint.clone())
3301 } else {
3302 owner
3303 }
3304 });
3305 applied.push(
3306 apply_a2ui_envelope(state, envelope, owner, params.route_auth.clone()).await?,
3307 );
3308 }
3309 }
3310 Ok(serde_json::json!({
3311 "result": result,
3312 "a2ui": {
3313 "applied": applied,
3314 }
3315 }))
3316}
3317
3318async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
3326 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3327 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
3328 serde_json::from_str(&json).map_err(|e| e.to_string())
3329}
3330
3331async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
3332 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3333 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
3334 serde_json::from_str(&json).map_err(|e| e.to_string())
3335}
3336
3337async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
3338 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3339 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
3340 serde_json::from_str(&json).map_err(|e| e.to_string())
3341}
3342
3343async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
3344 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
3345 let json = car_ffi_common::vision::ocr(&args_json).await?;
3346 serde_json::from_str(&json).map_err(|e| e.to_string())
3347}