1use crate::session::{ServerState, WsChannel};
13use car_proto::*;
14use car_verify;
15use futures::StreamExt;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::collections::HashMap;
19use std::net::SocketAddr;
20use std::sync::atomic::AtomicU64;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::{accept_async, tungstenite::Message};
25use tracing::{info, instrument};
26
27#[derive(Debug, Deserialize)]
28#[allow(dead_code)]
29pub struct JsonRpcMessage {
30 #[serde(default)]
31 pub jsonrpc: String,
32 #[serde(default)]
33 pub method: Option<String>,
34 #[serde(default)]
35 pub params: Value,
36 #[serde(default)]
37 pub id: Value,
38 #[serde(default)]
40 pub result: Option<Value>,
41 #[serde(default)]
42 pub error: Option<Value>,
43}
44
45#[derive(Debug, Serialize)]
46pub struct JsonRpcResponse {
47 pub jsonrpc: &'static str,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub result: Option<Value>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<JsonRpcError>,
52 pub id: Value,
53}
54
55#[derive(Debug, Serialize)]
56pub struct JsonRpcError {
57 pub code: i32,
58 pub message: String,
59}
60
61impl JsonRpcResponse {
62 pub fn success(id: Value, result: Value) -> Self {
63 Self {
64 jsonrpc: "2.0",
65 result: Some(result),
66 error: None,
67 id,
68 }
69 }
70 pub fn error(id: Value, code: i32, message: &str) -> Self {
71 Self {
72 jsonrpc: "2.0",
73 result: None,
74 error: Some(JsonRpcError {
75 code,
76 message: message.to_string(),
77 }),
78 id,
79 }
80 }
81}
82
83#[instrument(
89 name = "ws.connection",
90 skip_all,
91 fields(peer = %peer),
92)]
93pub async fn handle_connection(
94 stream: TcpStream,
95 peer: SocketAddr,
96 state: Arc<ServerState>,
97) -> Result<(), Box<dyn std::error::Error>> {
98 let ws_stream = accept_async(stream).await?;
99 let (write, read) = ws_stream.split();
100 run_dispatch(read, write, peer, state).await
101}
102
103#[instrument(
114 name = "ws.dispatch",
115 skip_all,
116 fields(client_id = tracing::field::Empty, peer = %peer),
117)]
118pub async fn run_dispatch(
119 mut read: futures::stream::SplitStream<
120 tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
121 >,
122 write: futures::stream::SplitSink<
123 tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
124 Message,
125 >,
126 peer: SocketAddr,
127 state: Arc<ServerState>,
128) -> Result<(), Box<dyn std::error::Error>> {
129 let client_id = uuid::Uuid::new_v4().simple().to_string()[..12].to_string();
130 tracing::Span::current().record("client_id", &client_id.as_str());
131
132 info!("New connection from {}", peer);
133
134 let channel = Arc::new(WsChannel {
135 write: Mutex::new(write),
136 pending: Mutex::new(HashMap::new()),
137 next_id: AtomicU64::new(1),
138 });
139
140 let session = state.create_session(&client_id, channel.clone()).await;
141
142 while let Some(msg) = read.next().await {
143 let msg = msg?;
144 if msg.is_text() {
145 let text = msg.to_text()?;
146 let parsed: JsonRpcMessage = match serde_json::from_str(text) {
147 Ok(m) => m,
148 Err(e) => {
149 send_response(
150 &session.channel,
151 JsonRpcResponse::error(Value::Null, -32700, &format!("Parse error: {}", e)),
152 )
153 .await?;
154 continue;
155 }
156 };
157
158 if parsed.method.is_none() && (parsed.result.is_some() || parsed.error.is_some()) {
160 if let Some(id_str) = parsed.id.as_str() {
161 let mut pending = session.channel.pending.lock().await;
162 if let Some(tx) = pending.remove(id_str) {
163 let tool_resp = if let Some(result) = parsed.result {
164 ToolExecuteResponse {
165 action_id: id_str.to_string(),
166 output: Some(result),
167 error: None,
168 }
169 } else {
170 let err_msg = parsed
171 .error
172 .as_ref()
173 .and_then(|e| e.get("message"))
174 .and_then(|m| m.as_str())
175 .unwrap_or("unknown error")
176 .to_string();
177 ToolExecuteResponse {
178 action_id: id_str.to_string(),
179 output: None,
180 error: Some(err_msg),
181 }
182 };
183 let _ = tx.send(tool_resp);
184 continue;
185 }
186 }
187 }
188
189 if let Some(method) = &parsed.method {
191 info!(method = %method, "dispatching JSON-RPC method");
192 let result = match method.as_str() {
193 "session.init" => handle_session_init(&parsed, &session).await,
194 "host.subscribe" => handle_host_subscribe(&session).await,
195 "host.agents" => handle_host_agents(&session).await,
196 "host.events" => handle_host_events(&parsed, &session).await,
197 "host.approvals" => handle_host_approvals(&session).await,
198 "host.register_agent" => handle_host_register_agent(&parsed, &session).await,
199 "host.unregister_agent" => {
200 handle_host_unregister_agent(&parsed, &session).await
201 }
202 "host.set_status" => handle_host_set_status(&parsed, &session).await,
203 "host.notify" => handle_host_notify(&parsed, &session).await,
204 "host.request_approval" => {
205 handle_host_request_approval(&parsed, &session).await
206 }
207 "host.resolve_approval" => {
208 handle_host_resolve_approval(&parsed, &session).await
209 }
210 "tools.register" => handle_tools_register(&parsed, &session).await,
211 "proposal.submit" => handle_proposal_submit(&parsed, &session).await,
212 "verify" => handle_verify(&parsed, &session).await,
213 "state.get" => handle_state_get(&parsed, &session).await,
214 "state.set" => handle_state_set(&parsed, &session).await,
215 "memory.add_fact" => handle_memory_add_fact(&parsed, &session).await,
216 "memory.query" => handle_memory_query(&parsed, &session).await,
217 "memory.build_context" => handle_memory_build_context(&parsed, &session).await,
218 "memory.consolidate" => handle_memory_consolidate(&session).await,
219 "memory.fact_count" => handle_memory_fact_count(&session).await,
220 "skill.ingest" => handle_skill_ingest(&parsed, &session).await,
221 "skill.find" => handle_skill_find(&parsed, &session).await,
222 "skill.report" => handle_skill_report(&parsed, &session).await,
223 "skill.repair" => handle_skill_repair(&parsed, &session).await,
224 "skills.ingest_distilled" => {
225 handle_skills_ingest_distilled(&parsed, &session).await
226 }
227 "skills.evolve" => handle_skills_evolve(&parsed, &session).await,
228 "skills.domains_needing_evolution" => {
229 handle_skills_domains_needing_evolution(&parsed, &session).await
230 }
231 "multi.swarm" => handle_multi_swarm(&parsed, &session).await,
232 "multi.pipeline" => handle_multi_pipeline(&parsed, &session).await,
233 "multi.supervisor" => handle_multi_supervisor(&parsed, &session).await,
234 "multi.map_reduce" => handle_multi_map_reduce(&parsed, &session).await,
235 "multi.vote" => handle_multi_vote(&parsed, &session).await,
236 "scheduler.create" => handle_scheduler_create(&parsed),
237 "scheduler.run" => handle_scheduler_run(&parsed, &session).await,
238 "scheduler.run_loop" => handle_scheduler_run_loop(&parsed, &session).await,
239 "infer" => handle_infer(&parsed, &state, &session).await,
240 "embed" => handle_embed(&parsed, &state).await,
241 "classify" => handle_classify(&parsed, &state).await,
242 "tokenize" => handle_tokenize(&parsed, &state).await,
243 "detokenize" => handle_detokenize(&parsed, &state).await,
244 "rerank" => handle_rerank(&parsed, &state).await,
245 "transcribe" => handle_transcribe(&parsed, &state).await,
246 "synthesize" => handle_synthesize(&parsed, &state).await,
247 "speech.prepare" => handle_speech_prepare(&state).await,
248 "models.route" => handle_models_route(&parsed, &state).await,
249 "models.stats" => handle_models_stats(&state).await,
250 "events.count" => handle_events_count(&session).await,
251 "replan.set_config" => handle_replan_set_config(&parsed, &session).await,
252 "models.list" => handle_models_list(&state),
253 "models.list_unified" => handle_models_list_unified(&state),
254 "models.pull" => handle_models_pull(&parsed, &state).await,
255 "skills.distill" => handle_skills_distill(&parsed, &state).await,
256 "skills.list" => handle_skills_list(&parsed, &session).await,
257 "browser.run" => handle_browser_run(&parsed, &session).await,
258 "browser.close" => handle_browser_close(&session).await,
259 "secret.put" => handle_secret_put(&parsed),
260 "secret.get" => handle_secret_get(&parsed),
261 "secret.delete" => handle_secret_delete(&parsed),
262 "secret.status" => handle_secret_status(&parsed),
263 "secret.available" => Ok(car_ffi_common::secrets::is_available()),
264 "permissions.status" => handle_perm_status(&parsed),
265 "permissions.request" => handle_perm_request(&parsed),
266 "permissions.explain" => handle_perm_explain(&parsed),
267 "permissions.domains" => Ok(car_ffi_common::permissions::domains()),
268 "accounts.list" => car_ffi_common::accounts::list(),
269 "accounts.open" => {
270 #[derive(serde::Deserialize, Default)]
271 struct OpenParams {
272 #[serde(default)]
273 account_id: Option<String>,
274 }
275 let p: OpenParams =
276 serde_json::from_value(parsed.params.clone()).unwrap_or_default();
277 car_ffi_common::accounts::open_settings(p.account_id.as_deref())
278 }
279 "calendar.list" => car_ffi_common::integrations::calendar_list(),
280 "calendar.events" => handle_calendar_events(&parsed),
281 "contacts.containers" => car_ffi_common::integrations::contacts_containers(),
282 "contacts.find" => handle_contacts_find(&parsed),
283 "mail.accounts" => car_ffi_common::integrations::mail_accounts(),
284 "mail.inbox" => handle_mail_inbox(&parsed),
285 "mail.send" => handle_mail_send(&parsed),
286 "health.status" => car_ffi_common::health::status(),
287 "health.sleep" => handle_health_sleep(&parsed),
288 "health.workouts" => handle_health_workouts(&parsed),
289 "health.activity" => handle_health_activity(&parsed),
290 "voice.transcribe_stream.start" => {
291 handle_voice_transcribe_stream_start(&parsed, &state, &session).await
292 }
293 "voice.transcribe_stream.stop" => {
294 handle_voice_transcribe_stream_stop(&parsed, &state).await
295 }
296 "voice.transcribe_stream.push" => {
297 handle_voice_transcribe_stream_push(&parsed, &state).await
298 }
299 "voice.sessions.list" => Ok(handle_voice_sessions_list(&state)),
300 "voice.providers.list" => {
301 serde_json::from_str::<serde_json::Value>(
305 &car_voice::list_voice_providers_json(),
306 )
307 .map_err(|e| e.to_string())
308 }
309 "voice.prepare_parakeet" => car_ffi_common::voice::prepare_parakeet()
310 .await
311 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
312 "voice.prepare_diarizer" => car_ffi_common::voice::prepare_diarizer()
313 .await
314 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
315 "voice.enroll_speaker" => handle_enroll_speaker(&parsed).await,
316 "voice.list_enrollments" => car_ffi_common::voice::list_enrollments()
317 .and_then(|j| serde_json::from_str(&j).map_err(|e| e.to_string())),
318 "voice.remove_enrollment" => handle_remove_enrollment(&parsed),
319 "workflow.run" => handle_workflow_run(&parsed, &session).await,
320 "workflow.verify" => handle_workflow_verify(&parsed),
321 "meeting.start" => handle_meeting_start(&parsed, &state, &session).await,
322 "meeting.stop" => handle_meeting_stop(&parsed, &state, &session).await,
323 "meeting.list" => handle_meeting_list(&parsed),
324 "meeting.get" => handle_meeting_get(&parsed),
325 "registry.register" => handle_registry_register(&parsed),
326 "registry.heartbeat" => handle_registry_heartbeat(&parsed),
327 "registry.unregister" => handle_registry_unregister(&parsed),
328 "registry.list" => handle_registry_list(&parsed),
329 "registry.reap" => handle_registry_reap(&parsed),
330 "admission.status" => handle_admission_status(&state),
331 "a2a.start" => handle_a2a_start(&parsed).await,
332 "a2a.stop" => handle_a2a_stop(),
333 "a2a.status" => handle_a2a_status(),
334 "automation.run_applescript" => handle_run_applescript(&parsed).await,
335 "automation.shortcuts.list" => handle_list_shortcuts(&parsed).await,
336 "automation.shortcuts.run" => handle_run_shortcut(&parsed).await,
337 "vision.ocr" => handle_vision_ocr(&parsed).await,
338 _ => Err(format!("unknown method: {}", method)),
339 };
340
341 let resp = match result {
342 Ok(value) => JsonRpcResponse::success(parsed.id, value),
343 Err(e) => JsonRpcResponse::error(parsed.id, -32603, &e),
344 };
345 send_response(&session.channel, resp).await?;
346 }
347 } else if msg.is_close() {
348 info!("Client {} disconnected", client_id);
349 break;
350 }
351 }
352
353 session.host.unsubscribe(&client_id).await;
354
355 let _removed = state.remove_session(&client_id).await;
366 {
367 let mut pending = session.channel.pending.lock().await;
368 pending.clear();
369 }
370
371 Ok(())
372}
373
374async fn send_response(
375 channel: &WsChannel,
376 resp: JsonRpcResponse,
377) -> Result<(), Box<dyn std::error::Error>> {
378 use futures::SinkExt;
379 let json = serde_json::to_string(&resp)?;
380 channel
381 .write
382 .lock()
383 .await
384 .send(Message::Text(json.into()))
385 .await?;
386 Ok(())
387}
388
389async fn handle_host_subscribe(session: &crate::session::ClientSession) -> Result<Value, String> {
392 session
393 .host
394 .subscribe(&session.client_id, session.channel.clone())
395 .await;
396 serde_json::to_value(HostSnapshot {
397 subscribed: true,
398 agents: session.host.agents().await,
399 approvals: session.host.approvals().await,
400 events: session.host.events(50).await,
401 })
402 .map_err(|e| e.to_string())
403}
404
405async fn handle_host_agents(session: &crate::session::ClientSession) -> Result<Value, String> {
406 serde_json::to_value(session.host.agents().await).map_err(|e| e.to_string())
407}
408
409async fn handle_host_events(
410 req: &JsonRpcMessage,
411 session: &crate::session::ClientSession,
412) -> Result<Value, String> {
413 let limit = req
414 .params
415 .get("limit")
416 .and_then(|v| v.as_u64())
417 .unwrap_or(100) as usize;
418 serde_json::to_value(session.host.events(limit).await).map_err(|e| e.to_string())
419}
420
421async fn handle_host_approvals(session: &crate::session::ClientSession) -> Result<Value, String> {
422 serde_json::to_value(session.host.approvals().await).map_err(|e| e.to_string())
423}
424
425async fn handle_host_register_agent(
426 req: &JsonRpcMessage,
427 session: &crate::session::ClientSession,
428) -> Result<Value, String> {
429 let request: RegisterHostAgentRequest =
430 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
431 serde_json::to_value(
432 session
433 .host
434 .register_agent(&session.client_id, request)
435 .await?,
436 )
437 .map_err(|e| e.to_string())
438}
439
440async fn handle_host_unregister_agent(
441 req: &JsonRpcMessage,
442 session: &crate::session::ClientSession,
443) -> Result<Value, String> {
444 let agent_id = req
445 .params
446 .get("agent_id")
447 .and_then(|v| v.as_str())
448 .ok_or("missing agent_id")?;
449 session.host.unregister_agent(agent_id).await?;
450 Ok(serde_json::json!({"ok": true}))
451}
452
453async fn handle_host_set_status(
454 req: &JsonRpcMessage,
455 session: &crate::session::ClientSession,
456) -> Result<Value, String> {
457 let request: SetHostAgentStatusRequest =
458 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
459 serde_json::to_value(session.host.set_status(request).await?).map_err(|e| e.to_string())
460}
461
462async fn handle_host_notify(
463 req: &JsonRpcMessage,
464 session: &crate::session::ClientSession,
465) -> Result<Value, String> {
466 let kind = req
467 .params
468 .get("kind")
469 .and_then(|v| v.as_str())
470 .unwrap_or("host.notification");
471 let agent_id = req
472 .params
473 .get("agent_id")
474 .and_then(|v| v.as_str())
475 .map(str::to_string);
476 let message = req
477 .params
478 .get("message")
479 .and_then(|v| v.as_str())
480 .unwrap_or("");
481 let payload = req.params.get("payload").cloned().unwrap_or(Value::Null);
482 serde_json::to_value(
483 session
484 .host
485 .record_event(kind, agent_id, message, payload)
486 .await,
487 )
488 .map_err(|e| e.to_string())
489}
490
491async fn handle_host_request_approval(
492 req: &JsonRpcMessage,
493 session: &crate::session::ClientSession,
494) -> Result<Value, String> {
495 let request: CreateHostApprovalRequest =
496 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
497 if let Some(agent_id) = &request.agent_id {
498 let _ = session
499 .host
500 .set_status(SetHostAgentStatusRequest {
501 agent_id: agent_id.clone(),
502 status: HostAgentStatus::WaitingForApproval,
503 current_task: None,
504 message: Some("Waiting for approval".to_string()),
505 payload: Value::Null,
506 })
507 .await;
508 }
509 serde_json::to_value(session.host.create_approval(request).await?).map_err(|e| e.to_string())
510}
511
512async fn handle_host_resolve_approval(
513 req: &JsonRpcMessage,
514 session: &crate::session::ClientSession,
515) -> Result<Value, String> {
516 let request: ResolveHostApprovalRequest =
517 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
518 serde_json::to_value(session.host.resolve_approval(request).await?).map_err(|e| e.to_string())
519}
520
521async fn handle_session_init(
522 req: &JsonRpcMessage,
523 session: &crate::session::ClientSession,
524) -> Result<Value, String> {
525 let init: SessionInitRequest =
526 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
527
528 for tool in &init.tools {
529 register_from_definition(&session.runtime, tool).await;
530 }
531
532 let mut policy_count = 0;
533 {
534 let mut policies = session.runtime.policies.write().await;
535 for policy_def in &init.policies {
536 if let Some(check) = build_policy_check(policy_def) {
537 policies.register(&policy_def.name, check, "");
538 policy_count += 1;
539 }
540 }
541 }
542
543 serde_json::to_value(SessionInitResponse {
544 session_id: session.client_id.clone(),
545 tools_registered: init.tools.len(),
546 policies_registered: policy_count,
547 })
548 .map_err(|e| e.to_string())
549}
550
551fn build_policy_check(def: &PolicyDefinition) -> Option<car_policy::PolicyCheck> {
552 match def.rule.as_str() {
553 "deny_tool" => {
554 let target = def.target.clone();
555 Some(Box::new(
556 move |action: &car_ir::Action, _: &car_state::StateStore| {
557 if action.tool.as_deref() == Some(&target) {
558 Some(format!("tool '{}' denied", target))
559 } else {
560 None
561 }
562 },
563 ))
564 }
565 "require_state" => {
566 let key = def.key.clone();
567 let value = def.value.clone();
568 Some(Box::new(
569 move |_: &car_ir::Action, state: &car_state::StateStore| {
570 if state.get(&key).as_ref() != Some(&value) {
571 Some(format!("state['{}'] must be {:?}", key, value))
572 } else {
573 None
574 }
575 },
576 ))
577 }
578 "deny_tool_param" => {
579 let target = def.target.clone();
580 let param = def.key.clone();
581 let pattern = def.pattern.clone();
582 Some(Box::new(
583 move |action: &car_ir::Action, _: &car_state::StateStore| {
584 if action.tool.as_deref() != Some(&target) {
585 return None;
586 }
587 if let Some(val) = action.parameters.get(¶m) {
588 let s = val.as_str().unwrap_or(&val.to_string()).to_string();
589 if s.contains(&pattern) {
590 return Some(format!("param '{}' matches '{}'", param, pattern));
591 }
592 }
593 None
594 },
595 ))
596 }
597 _ => None,
598 }
599}
600
601async fn handle_tools_register(
602 req: &JsonRpcMessage,
603 session: &crate::session::ClientSession,
604) -> Result<Value, String> {
605 let tools: Vec<ToolDefinition> =
606 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
607 for tool in &tools {
608 register_from_definition(&session.runtime, tool).await;
609 }
610 Ok(Value::from(tools.len()))
611}
612
613async fn register_from_definition(
620 runtime: &car_engine::Runtime,
621 def: &ToolDefinition,
622) {
623 runtime
624 .register_tool_schema(car_ir::ToolSchema {
625 name: def.name.clone(),
626 description: def.description.clone(),
627 parameters: def.parameters.clone(),
628 returns: def.returns.clone(),
629 idempotent: def.idempotent,
630 cache_ttl_secs: def.cache_ttl_secs,
631 rate_limit: def.rate_limit.as_ref().map(|rl| car_ir::ToolRateLimit {
632 max_calls: rl.max_calls,
633 interval_secs: rl.interval_secs,
634 }),
635 })
636 .await;
637}
638
639async fn handle_proposal_submit(
640 req: &JsonRpcMessage,
641 session: &crate::session::ClientSession,
642) -> Result<Value, String> {
643 let submit: ProposalSubmitRequest =
644 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
645 let result = session.runtime.execute(&submit.proposal).await;
646 serde_json::to_value(result).map_err(|e| e.to_string())
647}
648
649async fn handle_verify(
650 req: &JsonRpcMessage,
651 session: &crate::session::ClientSession,
652) -> Result<Value, String> {
653 let vr: VerifyRequest =
654 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
655 let tools: std::collections::HashSet<String> =
656 session.runtime.tools.read().await.keys().cloned().collect();
657 let result = car_verify::verify(&vr.proposal, Some(&vr.initial_state), Some(&tools), 30);
658 serde_json::to_value(VerifyResponse {
659 valid: result.valid,
660 issues: result
661 .issues
662 .iter()
663 .map(|i| VerifyIssueProto {
664 action_id: i.action_id.clone(),
665 severity: i.severity.clone(),
666 message: i.message.clone(),
667 })
668 .collect(),
669 simulated_state: result.simulated_state,
670 })
671 .map_err(|e| e.to_string())
672}
673
674async fn handle_state_get(
675 req: &JsonRpcMessage,
676 session: &crate::session::ClientSession,
677) -> Result<Value, String> {
678 let key = req
679 .params
680 .get("key")
681 .and_then(|v| v.as_str())
682 .ok_or("missing 'key'")?;
683 Ok(session.runtime.state.get(key).unwrap_or(Value::Null))
684}
685
686async fn handle_state_set(
687 req: &JsonRpcMessage,
688 session: &crate::session::ClientSession,
689) -> Result<Value, String> {
690 let key = req
691 .params
692 .get("key")
693 .and_then(|v| v.as_str())
694 .ok_or("missing 'key'")?;
695 let value = req.params.get("value").cloned().unwrap_or(Value::Null);
696 session.runtime.state.set(key, value, "client");
697 Ok(Value::from("ok"))
698}
699
700async fn handle_memory_fact_count(
707 session: &crate::session::ClientSession,
708) -> Result<Value, String> {
709 let engine = session.memgine.lock().await;
710 Ok(Value::from(engine.valid_fact_count()))
711}
712
713async fn handle_memory_add_fact(
714 req: &JsonRpcMessage,
715 session: &crate::session::ClientSession,
716) -> Result<Value, String> {
717 let subject = req
718 .params
719 .get("subject")
720 .and_then(|v| v.as_str())
721 .ok_or("missing subject")?;
722 let body = req
723 .params
724 .get("body")
725 .and_then(|v| v.as_str())
726 .ok_or("missing body")?;
727 let kind = req
728 .params
729 .get("kind")
730 .and_then(|v| v.as_str())
731 .unwrap_or("pattern");
732 let mut engine = session.memgine.lock().await;
733 let fid = format!("ws-{}", engine.valid_fact_count());
734 engine.ingest_fact(
735 &fid,
736 subject,
737 body,
738 "user",
739 "peer",
740 chrono::Utc::now(),
741 "global",
742 None,
743 vec![],
744 kind == "constraint",
745 );
746 Ok(Value::from(engine.valid_fact_count()))
747}
748
749async fn handle_memory_query(
750 req: &JsonRpcMessage,
751 session: &crate::session::ClientSession,
752) -> Result<Value, String> {
753 let query = req
754 .params
755 .get("query")
756 .and_then(|v| v.as_str())
757 .ok_or("missing query")?;
758 let k = req.params.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
759 let engine = session.memgine.lock().await;
760 let seeds = engine.graph.find_seeds(query, 5);
761 let hits = if !seeds.is_empty() {
762 engine.graph.retrieve(&seeds, 3, k, 0.6, 0.05)
763 } else {
764 vec![]
765 };
766 let results: Vec<Value> = hits.iter().filter_map(|hit| {
767 let node = engine.graph.inner.node_weight(hit.node_ix)?;
768 Some(serde_json::json!({"subject": node.key, "body": node.value, "activation": hit.activation}))
769 }).collect();
770 serde_json::to_value(results).map_err(|e| e.to_string())
771}
772
773async fn handle_memory_build_context(
774 req: &JsonRpcMessage,
775 session: &crate::session::ClientSession,
776) -> Result<Value, String> {
777 let query = req
778 .params
779 .get("query")
780 .and_then(|v| v.as_str())
781 .unwrap_or("");
782 let mut engine = session.memgine.lock().await;
783 Ok(Value::from(engine.build_context(query)))
784}
785
786async fn handle_skill_ingest(
789 req: &JsonRpcMessage,
790 session: &crate::session::ClientSession,
791) -> Result<Value, String> {
792 let name = req
793 .params
794 .get("name")
795 .and_then(|v| v.as_str())
796 .ok_or("missing name")?;
797 let code = req
798 .params
799 .get("code")
800 .and_then(|v| v.as_str())
801 .ok_or("missing code")?;
802 let platform = req
803 .params
804 .get("platform")
805 .and_then(|v| v.as_str())
806 .unwrap_or("unknown");
807 let persona = req
808 .params
809 .get("persona")
810 .and_then(|v| v.as_str())
811 .unwrap_or("");
812 let url_pattern = req
813 .params
814 .get("url_pattern")
815 .and_then(|v| v.as_str())
816 .unwrap_or("");
817 let description = req
818 .params
819 .get("description")
820 .and_then(|v| v.as_str())
821 .unwrap_or("");
822 let supersedes = req.params.get("supersedes").and_then(|v| v.as_str());
823 let keywords: Vec<String> = req
824 .params
825 .get("task_keywords")
826 .and_then(|v| v.as_array())
827 .map(|arr| {
828 arr.iter()
829 .filter_map(|v| v.as_str().map(String::from))
830 .collect()
831 })
832 .unwrap_or_default();
833
834 let trigger = car_memgine::SkillTrigger {
835 persona: persona.into(),
836 url_pattern: url_pattern.into(),
837 task_keywords: keywords,
838 };
839 let mut engine = session.memgine.lock().await;
840 engine.ingest_skill(
841 name,
842 code,
843 platform,
844 trigger,
845 description,
846 supersedes,
847 vec![],
848 vec![],
849 );
850 Ok(Value::from("ok"))
851}
852
853async fn handle_skill_find(
854 req: &JsonRpcMessage,
855 session: &crate::session::ClientSession,
856) -> Result<Value, String> {
857 let persona = req
858 .params
859 .get("persona")
860 .and_then(|v| v.as_str())
861 .unwrap_or("");
862 let url = req.params.get("url").and_then(|v| v.as_str()).unwrap_or("");
863 let task = req
864 .params
865 .get("task")
866 .and_then(|v| v.as_str())
867 .unwrap_or("");
868 let max = req
869 .params
870 .get("max_results")
871 .and_then(|v| v.as_u64())
872 .unwrap_or(1) as usize;
873 let engine = session.memgine.lock().await;
874 let results = engine.find_skill(persona, url, task, max);
875 let json: Vec<Value> = results
876 .iter()
877 .map(|(m, s)| {
878 serde_json::json!({
879 "name": m.name, "code": m.code, "platform": m.platform,
880 "description": m.description, "stats": m.stats, "match_score": s,
881 })
882 })
883 .collect();
884 serde_json::to_value(json).map_err(|e| e.to_string())
885}
886
887async fn handle_skill_report(
888 req: &JsonRpcMessage,
889 session: &crate::session::ClientSession,
890) -> Result<Value, String> {
891 let name = req
892 .params
893 .get("skill_name")
894 .and_then(|v| v.as_str())
895 .ok_or("missing skill_name")?;
896 let outcome_str = req
897 .params
898 .get("outcome")
899 .and_then(|v| v.as_str())
900 .ok_or("missing outcome")?;
901 let outcome = match outcome_str {
902 "success" => car_memgine::SkillOutcome::Success,
903 _ => car_memgine::SkillOutcome::Fail,
904 };
905 let mut engine = session.memgine.lock().await;
906 let stats = engine
907 .report_outcome(name, outcome)
908 .ok_or(format!("skill '{}' not found", name))?;
909 serde_json::to_value(stats).map_err(|e| e.to_string())
910}
911
912struct WsAgentRunner {
921 channel: Arc<WsChannel>,
922 host: Arc<crate::host::HostState>,
923 client_id: String,
924}
925
926#[async_trait::async_trait]
927impl car_multi::AgentRunner for WsAgentRunner {
928 async fn run(
929 &self,
930 spec: &car_multi::AgentSpec,
931 task: &str,
932 _runtime: &car_engine::Runtime,
933 _mailbox: &car_multi::Mailbox,
934 ) -> std::result::Result<car_multi::AgentOutput, car_multi::MultiError> {
935 use futures::SinkExt;
936
937 let request_id = self.channel.next_request_id();
938 let agent_id = agent_id_for_run(&self.client_id, &spec.name, &request_id);
939 let agent = self
940 .host
941 .register_agent(
942 &self.client_id,
943 RegisterHostAgentRequest {
944 id: Some(agent_id.clone()),
945 name: spec.name.clone(),
946 kind: "callback".to_string(),
947 capabilities: spec.tools.clone(),
948 project: spec
949 .metadata
950 .get("project")
951 .and_then(|v| v.as_str())
952 .map(str::to_string),
953 pid: None,
954 display: serde_json::from_value(
955 spec.metadata
956 .get("display")
957 .cloned()
958 .unwrap_or(serde_json::Value::Null),
959 )
960 .unwrap_or_default(),
961 metadata: serde_json::to_value(&spec.metadata).unwrap_or(Value::Null),
962 },
963 )
964 .await
965 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e))?;
966 let _ = self
967 .host
968 .set_status(SetHostAgentStatusRequest {
969 agent_id: agent.id.clone(),
970 status: HostAgentStatus::Running,
971 current_task: Some(task.to_string()),
972 message: Some(format!("{} started", spec.name)),
973 payload: serde_json::json!({ "task": task }),
974 })
975 .await;
976
977 let rpc_request = serde_json::json!({
978 "jsonrpc": "2.0",
979 "method": "multi.run_agent",
980 "params": {
981 "spec": spec,
982 "task": task,
983 },
984 "id": request_id,
985 });
986
987 let (tx, rx) = tokio::sync::oneshot::channel();
989 self.channel
990 .pending
991 .lock()
992 .await
993 .insert(request_id.clone(), tx);
994
995 let msg = Message::Text(
996 serde_json::to_string(&rpc_request)
997 .map_err(|e| car_multi::MultiError::AgentFailed(spec.name.clone(), e.to_string()))?
998 .into(),
999 );
1000 if let Err(e) = self.channel.write.lock().await.send(msg).await {
1001 let _ = self
1002 .host
1003 .set_status(SetHostAgentStatusRequest {
1004 agent_id: agent_id.clone(),
1005 status: HostAgentStatus::Errored,
1006 current_task: None,
1007 message: Some(format!("{} failed to start", spec.name)),
1008 payload: serde_json::json!({ "error": e.to_string() }),
1009 })
1010 .await;
1011 return Err(car_multi::MultiError::AgentFailed(
1012 spec.name.clone(),
1013 format!("ws send error: {}", e),
1014 ));
1015 }
1016
1017 let response = match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
1019 Ok(Ok(response)) => response,
1020 Ok(Err(_)) => {
1021 let _ = self
1022 .host
1023 .set_status(SetHostAgentStatusRequest {
1024 agent_id: agent_id.clone(),
1025 status: HostAgentStatus::Errored,
1026 current_task: None,
1027 message: Some(format!("{} callback channel closed", spec.name)),
1028 payload: Value::Null,
1029 })
1030 .await;
1031 return Err(car_multi::MultiError::AgentFailed(
1032 spec.name.clone(),
1033 "agent callback channel closed".into(),
1034 ));
1035 }
1036 Err(_) => {
1037 let _ = self
1038 .host
1039 .set_status(SetHostAgentStatusRequest {
1040 agent_id: agent_id.clone(),
1041 status: HostAgentStatus::Errored,
1042 current_task: None,
1043 message: Some(format!("{} timed out", spec.name)),
1044 payload: Value::Null,
1045 })
1046 .await;
1047 return Err(car_multi::MultiError::AgentFailed(
1048 spec.name.clone(),
1049 "agent callback timed out (300s)".into(),
1050 ));
1051 }
1052 };
1053
1054 if let Some(err) = response.error {
1055 let _ = self
1056 .host
1057 .set_status(SetHostAgentStatusRequest {
1058 agent_id: agent_id.clone(),
1059 status: HostAgentStatus::Errored,
1060 current_task: None,
1061 message: Some(format!("{} errored", spec.name)),
1062 payload: serde_json::json!({ "error": err }),
1063 })
1064 .await;
1065 return Err(car_multi::MultiError::AgentFailed(spec.name.clone(), err));
1066 }
1067
1068 let output_value = response.output.unwrap_or(Value::Null);
1069 let output: car_multi::AgentOutput = serde_json::from_value(output_value).map_err(|e| {
1070 car_multi::MultiError::AgentFailed(
1071 spec.name.clone(),
1072 format!("invalid AgentOutput: {}", e),
1073 )
1074 })?;
1075 let status = if output.error.is_some() {
1076 HostAgentStatus::Errored
1077 } else {
1078 HostAgentStatus::Completed
1079 };
1080 let message = if output.error.is_some() {
1081 format!("{} errored", spec.name)
1082 } else {
1083 format!("{} completed", spec.name)
1084 };
1085 let _ = self
1086 .host
1087 .set_status(SetHostAgentStatusRequest {
1088 agent_id,
1089 status,
1090 current_task: None,
1091 message: Some(message),
1092 payload: serde_json::to_value(&output).unwrap_or(Value::Null),
1093 })
1094 .await;
1095
1096 Ok(output)
1097 }
1098}
1099
/// Builds the agent identifier `"<client_id>:<name>:<request_id>"` for one run.
///
/// Only the `name` segment is sanitized: every character that is not ASCII
/// alphanumeric, `-`, or `_` is replaced by `-`. `client_id` and `request_id`
/// are inserted unchanged.
fn agent_id_for_run(client_id: &str, name: &str, request_id: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_');
        sanitized.push(if keep { ch } else { '-' });
    }
    [client_id, sanitized.as_str(), request_id].join(":")
}
1113
1114async fn handle_multi_swarm(
1115 req: &JsonRpcMessage,
1116 session: &crate::session::ClientSession,
1117) -> Result<Value, String> {
1118 let mode_str = req
1119 .params
1120 .get("mode")
1121 .and_then(|v| v.as_str())
1122 .ok_or("missing 'mode'")?;
1123 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
1124 let task = req
1125 .params
1126 .get("task")
1127 .and_then(|v| v.as_str())
1128 .ok_or("missing 'task'")?;
1129
1130 let swarm_mode: car_multi::SwarmMode = serde_json::from_str(&format!("\"{}\"", mode_str))
1131 .map_err(|e| format!("invalid mode '{}': {}", mode_str, e))?;
1132 let agent_specs: Vec<car_multi::AgentSpec> =
1133 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
1134 let synth: Option<car_multi::AgentSpec> = req
1135 .params
1136 .get("synthesizer")
1137 .map(|v| serde_json::from_value(v.clone()))
1138 .transpose()
1139 .map_err(|e| format!("invalid synthesizer: {}", e))?;
1140
1141 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1142 channel: session.channel.clone(),
1143 host: session.host.clone(),
1144 client_id: session.client_id.clone(),
1145 });
1146 let infra = car_multi::SharedInfra::new();
1147
1148 let mut swarm = car_multi::Swarm::new(agent_specs, swarm_mode);
1149 if let Some(s) = synth {
1150 swarm = swarm.with_synthesizer(s);
1151 }
1152
1153 let result = swarm
1154 .run(task, &runner, &infra)
1155 .await
1156 .map_err(|e| format!("swarm error: {}", e))?;
1157 serde_json::to_value(result).map_err(|e| e.to_string())
1158}
1159
1160async fn handle_multi_pipeline(
1161 req: &JsonRpcMessage,
1162 session: &crate::session::ClientSession,
1163) -> Result<Value, String> {
1164 let stages_val = req.params.get("stages").ok_or("missing 'stages'")?;
1165 let task = req
1166 .params
1167 .get("task")
1168 .and_then(|v| v.as_str())
1169 .ok_or("missing 'task'")?;
1170
1171 let stage_specs: Vec<car_multi::AgentSpec> =
1172 serde_json::from_value(stages_val.clone()).map_err(|e| format!("invalid stages: {}", e))?;
1173
1174 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1175 channel: session.channel.clone(),
1176 host: session.host.clone(),
1177 client_id: session.client_id.clone(),
1178 });
1179 let infra = car_multi::SharedInfra::new();
1180
1181 let result = car_multi::Pipeline::new(stage_specs)
1182 .run(task, &runner, &infra)
1183 .await
1184 .map_err(|e| format!("pipeline error: {}", e))?;
1185 serde_json::to_value(result).map_err(|e| e.to_string())
1186}
1187
1188async fn handle_multi_supervisor(
1189 req: &JsonRpcMessage,
1190 session: &crate::session::ClientSession,
1191) -> Result<Value, String> {
1192 let workers_val = req.params.get("workers").ok_or("missing 'workers'")?;
1193 let supervisor_val = req.params.get("supervisor").ok_or("missing 'supervisor'")?;
1194 let task = req
1195 .params
1196 .get("task")
1197 .and_then(|v| v.as_str())
1198 .ok_or("missing 'task'")?;
1199 let max_rounds = req
1200 .params
1201 .get("max_rounds")
1202 .and_then(|v| v.as_u64())
1203 .unwrap_or(3) as u32;
1204
1205 let worker_specs: Vec<car_multi::AgentSpec> = serde_json::from_value(workers_val.clone())
1206 .map_err(|e| format!("invalid workers: {}", e))?;
1207 let supervisor_spec: car_multi::AgentSpec = serde_json::from_value(supervisor_val.clone())
1208 .map_err(|e| format!("invalid supervisor: {}", e))?;
1209
1210 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1211 channel: session.channel.clone(),
1212 host: session.host.clone(),
1213 client_id: session.client_id.clone(),
1214 });
1215 let infra = car_multi::SharedInfra::new();
1216
1217 let result = car_multi::Supervisor::new(worker_specs, supervisor_spec)
1218 .with_max_rounds(max_rounds)
1219 .run(task, &runner, &infra)
1220 .await
1221 .map_err(|e| format!("supervisor error: {}", e))?;
1222 serde_json::to_value(result).map_err(|e| e.to_string())
1223}
1224
1225async fn handle_multi_map_reduce(
1226 req: &JsonRpcMessage,
1227 session: &crate::session::ClientSession,
1228) -> Result<Value, String> {
1229 let mapper_val = req.params.get("mapper").ok_or("missing 'mapper'")?;
1230 let reducer_val = req.params.get("reducer").ok_or("missing 'reducer'")?;
1231 let task = req
1232 .params
1233 .get("task")
1234 .and_then(|v| v.as_str())
1235 .ok_or("missing 'task'")?;
1236 let items_val = req.params.get("items").ok_or("missing 'items'")?;
1237
1238 let mapper_spec: car_multi::AgentSpec =
1239 serde_json::from_value(mapper_val.clone()).map_err(|e| format!("invalid mapper: {}", e))?;
1240 let reducer_spec: car_multi::AgentSpec = serde_json::from_value(reducer_val.clone())
1241 .map_err(|e| format!("invalid reducer: {}", e))?;
1242 let items: Vec<String> =
1243 serde_json::from_value(items_val.clone()).map_err(|e| format!("invalid items: {}", e))?;
1244
1245 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1246 channel: session.channel.clone(),
1247 host: session.host.clone(),
1248 client_id: session.client_id.clone(),
1249 });
1250 let infra = car_multi::SharedInfra::new();
1251
1252 let result = car_multi::MapReduce::new(mapper_spec, reducer_spec)
1253 .run(task, &items, &runner, &infra)
1254 .await
1255 .map_err(|e| format!("map_reduce error: {}", e))?;
1256 serde_json::to_value(result).map_err(|e| e.to_string())
1257}
1258
1259async fn handle_multi_vote(
1260 req: &JsonRpcMessage,
1261 session: &crate::session::ClientSession,
1262) -> Result<Value, String> {
1263 let agents_val = req.params.get("agents").ok_or("missing 'agents'")?;
1264 let task = req
1265 .params
1266 .get("task")
1267 .and_then(|v| v.as_str())
1268 .ok_or("missing 'task'")?;
1269
1270 let agent_specs: Vec<car_multi::AgentSpec> =
1271 serde_json::from_value(agents_val.clone()).map_err(|e| format!("invalid agents: {}", e))?;
1272 let synth: Option<car_multi::AgentSpec> = req
1273 .params
1274 .get("synthesizer")
1275 .map(|v| serde_json::from_value(v.clone()))
1276 .transpose()
1277 .map_err(|e| format!("invalid synthesizer: {}", e))?;
1278
1279 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1280 channel: session.channel.clone(),
1281 host: session.host.clone(),
1282 client_id: session.client_id.clone(),
1283 });
1284 let infra = car_multi::SharedInfra::new();
1285
1286 let mut vote = car_multi::Vote::new(agent_specs);
1287 if let Some(s) = synth {
1288 vote = vote.with_synthesizer(s);
1289 }
1290
1291 let result = vote
1292 .run(task, &runner, &infra)
1293 .await
1294 .map_err(|e| format!("vote error: {}", e))?;
1295 serde_json::to_value(result).map_err(|e| e.to_string())
1296}
1297
1298fn handle_scheduler_create(req: &JsonRpcMessage) -> Result<Value, String> {
1303 let name = req
1304 .params
1305 .get("name")
1306 .and_then(|v| v.as_str())
1307 .ok_or("scheduler.create requires 'name'")?;
1308 let prompt = req
1309 .params
1310 .get("prompt")
1311 .and_then(|v| v.as_str())
1312 .ok_or("scheduler.create requires 'prompt'")?;
1313
1314 let mut task = car_scheduler::Task::new(name, prompt);
1315
1316 if let Some(t) = req.params.get("trigger").and_then(|v| v.as_str()) {
1317 let trigger = match t {
1318 "once" => car_scheduler::TaskTrigger::Once,
1319 "cron" => car_scheduler::TaskTrigger::Cron,
1320 "interval" => car_scheduler::TaskTrigger::Interval,
1321 "file_watch" => car_scheduler::TaskTrigger::FileWatch,
1322 _ => car_scheduler::TaskTrigger::Manual,
1323 };
1324 let schedule = req
1325 .params
1326 .get("schedule")
1327 .and_then(|v| v.as_str())
1328 .unwrap_or("");
1329 task = task.with_trigger(trigger, schedule);
1330 }
1331
1332 if let Some(sp) = req.params.get("system_prompt").and_then(|v| v.as_str()) {
1333 task = task.with_system_prompt(sp);
1334 }
1335
1336 serde_json::to_value(&task).map_err(|e| e.to_string())
1337}
1338
1339async fn handle_scheduler_run(
1340 req: &JsonRpcMessage,
1341 session: &crate::session::ClientSession,
1342) -> Result<Value, String> {
1343 let task_val = req
1344 .params
1345 .get("task")
1346 .ok_or("scheduler.run requires 'task'")?;
1347 let mut task: car_scheduler::Task =
1348 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
1349
1350 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1351 channel: session.channel.clone(),
1352 host: session.host.clone(),
1353 client_id: session.client_id.clone(),
1354 });
1355 let executor = car_scheduler::Executor::new(runner);
1356 let execution = executor.run_once(&mut task).await;
1357
1358 serde_json::to_value(&execution).map_err(|e| e.to_string())
1359}
1360
1361async fn handle_scheduler_run_loop(
1362 req: &JsonRpcMessage,
1363 session: &crate::session::ClientSession,
1364) -> Result<Value, String> {
1365 let task_val = req
1366 .params
1367 .get("task")
1368 .ok_or("scheduler.run_loop requires 'task'")?;
1369 let mut task: car_scheduler::Task =
1370 serde_json::from_value(task_val.clone()).map_err(|e| format!("invalid task: {}", e))?;
1371 let max_iterations = req
1372 .params
1373 .get("max_iterations")
1374 .and_then(|v| v.as_u64())
1375 .map(|v| v as u32);
1376
1377 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
1378 channel: session.channel.clone(),
1379 host: session.host.clone(),
1380 client_id: session.client_id.clone(),
1381 });
1382 let executor = car_scheduler::Executor::new(runner);
1383 let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1384 let executions = executor
1385 .run_loop(&mut task, max_iterations, cancel_rx)
1386 .await;
1387
1388 serde_json::to_value(&executions).map_err(|e| e.to_string())
1389}
1390
1391fn get_inference_engine(state: &ServerState) -> &Arc<car_inference::InferenceEngine> {
1396 state.inference.get_or_init(|| {
1397 Arc::new(car_inference::InferenceEngine::new(
1398 car_inference::InferenceConfig::default(),
1399 ))
1400 })
1401}
1402
1403async fn handle_infer(
1404 msg: &JsonRpcMessage,
1405 state: &ServerState,
1406 session: &crate::session::ClientSession,
1407) -> Result<Value, String> {
1408 let engine = get_inference_engine(state);
1409 let mut req: car_inference::GenerateRequest =
1410 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1411
1412 if let Some(cq) = msg.params.get("context_query").and_then(|v| v.as_str()) {
1414 let mut memgine = session.memgine.lock().await;
1415 let ctx = memgine.build_context(cq);
1416 if !ctx.is_empty() {
1417 req.context = Some(ctx);
1418 }
1419 }
1420
1421 let _permit = state.admission.acquire().await;
1427
1428 let result = engine
1439 .generate_tracked(req)
1440 .await
1441 .map_err(|e| e.to_string())?;
1442 serde_json::to_value(&result).map_err(|e| format!("serialize result: {}", e))
1443}
1444
1445async fn handle_embed(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1446 let engine = get_inference_engine(state);
1447 let req: car_inference::EmbedRequest =
1448 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1449 let _permit = state.admission.acquire().await;
1453 let result = engine.embed(req).await.map_err(|e| e.to_string())?;
1454 Ok(serde_json::json!({"embeddings": result}))
1455}
1456
1457async fn handle_classify(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1458 let engine = get_inference_engine(state);
1459 let req: car_inference::ClassifyRequest =
1460 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1461 let _permit = state.admission.acquire().await;
1462 let result = engine.classify(req).await.map_err(|e| e.to_string())?;
1463 Ok(serde_json::json!({"classifications": result}))
1464}
1465
1466fn handle_admission_status(state: &ServerState) -> Result<Value, String> {
1470 let total = state.admission.permits();
1471 let available = state.admission.permits_available();
1472 let in_use = total.saturating_sub(available);
1473 Ok(serde_json::json!({
1474 "permits_total": total,
1475 "permits_available": available,
1476 "permits_in_use": in_use,
1477 "env_override": crate::admission::ENV_MAX_CONCURRENT,
1478 }))
1479}
1480
1481async fn handle_tokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1482 let model = msg
1483 .params
1484 .get("model")
1485 .and_then(|v| v.as_str())
1486 .ok_or("missing 'model' parameter")?;
1487 let text = msg
1488 .params
1489 .get("text")
1490 .and_then(|v| v.as_str())
1491 .ok_or("missing 'text' parameter")?;
1492 let engine = get_inference_engine(state);
1493 let ids = engine
1494 .tokenize(model, text)
1495 .await
1496 .map_err(|e| e.to_string())?;
1497 Ok(serde_json::json!({"tokens": ids}))
1498}
1499
1500async fn handle_detokenize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1501 let model = msg
1502 .params
1503 .get("model")
1504 .and_then(|v| v.as_str())
1505 .ok_or("missing 'model' parameter")?;
1506 let tokens: Vec<u32> = msg
1507 .params
1508 .get("tokens")
1509 .and_then(|v| v.as_array())
1510 .ok_or("missing 'tokens' parameter")?
1511 .iter()
1512 .map(|t| {
1513 t.as_u64()
1514 .and_then(|n| u32::try_from(n).ok())
1515 .ok_or_else(|| "tokens[] must be u32 values".to_string())
1516 })
1517 .collect::<Result<Vec<_>, _>>()?;
1518 let engine = get_inference_engine(state);
1519 let text = engine
1520 .detokenize(model, &tokens)
1521 .await
1522 .map_err(|e| e.to_string())?;
1523 Ok(serde_json::json!({"text": text}))
1524}
1525
1526fn handle_models_list(state: &ServerState) -> Result<Value, String> {
1527 let engine = get_inference_engine(state);
1528 let models = engine.list_models();
1529 serde_json::to_value(&models).map_err(|e| e.to_string())
1530}
1531
1532fn handle_models_list_unified(state: &ServerState) -> Result<Value, String> {
1533 let engine = get_inference_engine(state);
1534 let models = engine.list_models_unified();
1535 serde_json::to_value(&models).map_err(|e| e.to_string())
1536}
1537
1538async fn handle_models_pull(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1539 let name = msg
1540 .params
1541 .get("name")
1542 .and_then(|v| v.as_str())
1543 .ok_or("missing 'name' parameter")?;
1544 let engine = get_inference_engine(state);
1545 let path = engine.pull_model(name).await.map_err(|e| e.to_string())?;
1546 Ok(serde_json::json!({"path": path.display().to_string()}))
1547}
1548
1549async fn handle_skills_distill(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1550 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
1551 msg.params
1552 .get("events")
1553 .cloned()
1554 .unwrap_or(msg.params.clone()),
1555 )
1556 .map_err(|e| format!("invalid events: {}", e))?;
1557
1558 let inference = get_inference_engine(state).clone();
1559 let engine = car_memgine::MemgineEngine::new(None).with_inference(inference);
1560
1561 let skills = engine.distill_skills(&events).await;
1562 serde_json::to_value(&skills).map_err(|e| e.to_string())
1563}
1564
1565async fn handle_memory_consolidate(
1568 session: &crate::session::ClientSession,
1569) -> Result<Value, String> {
1570 let mut engine = session.memgine.lock().await;
1571 let report = engine.consolidate().await;
1572 serde_json::to_value(&report).map_err(|e| e.to_string())
1573}
1574
1575async fn handle_skill_repair(
1579 msg: &JsonRpcMessage,
1580 session: &crate::session::ClientSession,
1581) -> Result<Value, String> {
1582 let name = msg
1583 .params
1584 .get("skill_name")
1585 .and_then(|v| v.as_str())
1586 .ok_or("missing 'skill_name' parameter")?;
1587 let mut engine = session.memgine.lock().await;
1588 let code = engine.repair_skill(name).await;
1589 Ok(match code {
1590 Some(c) => serde_json::json!({ "code": c }),
1591 None => Value::Null,
1592 })
1593}
1594
1595async fn handle_skills_ingest_distilled(
1598 msg: &JsonRpcMessage,
1599 session: &crate::session::ClientSession,
1600) -> Result<Value, String> {
1601 let skills: Vec<car_memgine::DistilledSkill> = serde_json::from_value(
1602 msg.params
1603 .get("skills")
1604 .cloned()
1605 .unwrap_or(msg.params.clone()),
1606 )
1607 .map_err(|e| format!("invalid skills: {}", e))?;
1608 let mut engine = session.memgine.lock().await;
1609 let nodes = engine.ingest_distilled_skills(&skills);
1610 Ok(serde_json::json!({ "ingested": nodes.len() }))
1611}
1612
1613async fn handle_skills_evolve(
1616 msg: &JsonRpcMessage,
1617 session: &crate::session::ClientSession,
1618) -> Result<Value, String> {
1619 let domain = msg
1620 .params
1621 .get("domain")
1622 .and_then(|v| v.as_str())
1623 .ok_or("missing 'domain' parameter")?
1624 .to_string();
1625 let events: Vec<car_memgine::TraceEvent> = serde_json::from_value(
1626 msg.params
1627 .get("events")
1628 .cloned()
1629 .unwrap_or(Value::Array(vec![])),
1630 )
1631 .map_err(|e| format!("invalid events: {}", e))?;
1632 let mut engine = session.memgine.lock().await;
1633 let skills = engine.evolve_skills(&events, &domain).await;
1634 serde_json::to_value(&skills).map_err(|e| e.to_string())
1635}
1636
1637async fn handle_skills_domains_needing_evolution(
1639 msg: &JsonRpcMessage,
1640 session: &crate::session::ClientSession,
1641) -> Result<Value, String> {
1642 let threshold = msg
1643 .params
1644 .get("threshold")
1645 .and_then(|v| v.as_f64())
1646 .unwrap_or(0.6);
1647 let engine = session.memgine.lock().await;
1648 let domains = engine.domains_needing_evolution(threshold);
1649 serde_json::to_value(&domains).map_err(|e| e.to_string())
1650}
1651
1652async fn handle_rerank(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1654 let engine = get_inference_engine(state);
1655 let req: car_inference::RerankRequest =
1656 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1657 let _permit = state.admission.acquire().await;
1658 let result = engine.rerank(req).await.map_err(|e| e.to_string())?;
1659 serde_json::to_value(&result).map_err(|e| e.to_string())
1660}
1661
1662async fn handle_transcribe(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1668 let engine = get_inference_engine(state);
1669 let req: car_inference::TranscribeRequest =
1670 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1671 let _permit = state.admission.acquire().await;
1672 let result = engine.transcribe(req).await.map_err(|e| e.to_string())?;
1673 serde_json::to_value(&result).map_err(|e| e.to_string())
1674}
1675
1676async fn handle_synthesize(msg: &JsonRpcMessage, state: &ServerState) -> Result<Value, String> {
1679 let engine = get_inference_engine(state);
1680 let req: car_inference::SynthesizeRequest =
1681 serde_json::from_value(msg.params.clone()).map_err(|e| format!("invalid params: {}", e))?;
1682 let _permit = state.admission.acquire().await;
1683 let result = engine.synthesize(req).await.map_err(|e| e.to_string())?;
1684 serde_json::to_value(&result).map_err(|e| e.to_string())
1685}
1686
1687async fn handle_speech_prepare(state: &ServerState) -> Result<Value, String> {
1691 let engine = get_inference_engine(state);
1692 let status = engine
1693 .prepare_speech_runtime()
1694 .await
1695 .map_err(|e| e.to_string())?;
1696 serde_json::to_value(&status).map_err(|e| e.to_string())
1697}
1698
1699async fn handle_models_route(
1702 msg: &JsonRpcMessage,
1703 state: &ServerState,
1704) -> Result<Value, String> {
1705 let prompt = msg
1706 .params
1707 .get("prompt")
1708 .and_then(|v| v.as_str())
1709 .ok_or("missing 'prompt' parameter")?;
1710 let engine = get_inference_engine(state);
1711 let decision = engine.route_adaptive(prompt).await;
1712 serde_json::to_value(&decision).map_err(|e| e.to_string())
1713}
1714
1715async fn handle_models_stats(state: &ServerState) -> Result<Value, String> {
1717 let engine = get_inference_engine(state);
1718 let profiles = engine.export_profiles().await;
1719 serde_json::to_value(&profiles).map_err(|e| e.to_string())
1720}
1721
1722async fn handle_events_count(
1724 session: &crate::session::ClientSession,
1725) -> Result<Value, String> {
1726 let n = session.runtime.log.lock().await.len();
1727 Ok(Value::from(n as u64))
1728}
1729
1730async fn handle_replan_set_config(
1735 msg: &JsonRpcMessage,
1736 session: &crate::session::ClientSession,
1737) -> Result<Value, String> {
1738 let max_replans = msg
1739 .params
1740 .get("max_replans")
1741 .and_then(|v| v.as_u64())
1742 .unwrap_or(0) as u32;
1743 let delay_ms = msg
1744 .params
1745 .get("delay_ms")
1746 .and_then(|v| v.as_u64())
1747 .unwrap_or(0);
1748 let verify_before_execute = msg
1749 .params
1750 .get("verify_before_execute")
1751 .and_then(|v| v.as_bool())
1752 .unwrap_or(true);
1753 let cfg = car_engine::ReplanConfig {
1754 max_replans,
1755 delay_ms,
1756 verify_before_execute,
1757 };
1758 session.runtime.set_replan_config(cfg).await;
1759 Ok(Value::Null)
1760}
1761
1762async fn handle_skills_list(
1763 msg: &JsonRpcMessage,
1764 session: &crate::session::ClientSession,
1765) -> Result<Value, String> {
1766 let domain = msg.params.get("domain").and_then(|v| v.as_str());
1767 let engine = session.memgine.lock().await;
1768 let skills: Vec<serde_json::Value> = engine
1769 .graph
1770 .inner
1771 .node_indices()
1772 .filter_map(|nix| {
1773 let node = engine.graph.inner.node_weight(nix)?;
1774 if node.kind != car_memgine::MemKind::Skill {
1775 return None;
1776 }
1777 let meta = car_memgine::SkillMeta::from_node(node)?;
1778 if let Some(d) = domain {
1779 match &meta.scope {
1780 car_memgine::SkillScope::Global => {}
1781 car_memgine::SkillScope::Domain(sd) if sd == d => {}
1782 _ => return None,
1783 }
1784 }
1785 Some(serde_json::to_value(&meta).unwrap_or_default())
1786 })
1787 .collect();
1788 serde_json::to_value(&skills).map_err(|e| e.to_string())
1789}
1790
1791#[derive(serde::Deserialize)]
1792struct SecretParams {
1793 #[serde(default)]
1794 service: Option<String>,
1795 key: String,
1796 #[serde(default)]
1797 value: Option<String>,
1798}
1799
1800fn handle_secret_put(req: &JsonRpcMessage) -> Result<Value, String> {
1801 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1802 let value = p.value.ok_or_else(|| "missing 'value'".to_string())?;
1803 car_ffi_common::secrets::put(p.service.as_deref(), &p.key, &value)
1804}
1805
1806fn handle_secret_get(req: &JsonRpcMessage) -> Result<Value, String> {
1807 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1808 car_ffi_common::secrets::get(p.service.as_deref(), &p.key)
1809}
1810
1811fn handle_secret_delete(req: &JsonRpcMessage) -> Result<Value, String> {
1812 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1813 car_ffi_common::secrets::delete(p.service.as_deref(), &p.key)
1814}
1815
1816fn handle_secret_status(req: &JsonRpcMessage) -> Result<Value, String> {
1817 let p: SecretParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1818 car_ffi_common::secrets::status(p.service.as_deref(), &p.key)
1819}
1820
1821#[derive(serde::Deserialize)]
1822struct PermParams {
1823 domain: String,
1824 #[serde(default)]
1825 target_bundle_id: Option<String>,
1826}
1827
1828fn handle_perm_status(req: &JsonRpcMessage) -> Result<Value, String> {
1829 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1830 car_ffi_common::permissions::status(&p.domain, p.target_bundle_id.as_deref())
1831}
1832
1833fn handle_perm_request(req: &JsonRpcMessage) -> Result<Value, String> {
1834 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1835 car_ffi_common::permissions::request(&p.domain, p.target_bundle_id.as_deref())
1836}
1837
1838fn handle_perm_explain(req: &JsonRpcMessage) -> Result<Value, String> {
1839 let p: PermParams = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1840 car_ffi_common::permissions::explain(&p.domain, p.target_bundle_id.as_deref())
1841}
1842
1843fn handle_calendar_events(req: &JsonRpcMessage) -> Result<Value, String> {
1844 #[derive(serde::Deserialize)]
1845 struct P {
1846 start: String,
1847 end: String,
1848 #[serde(default)]
1849 calendar_ids: Vec<String>,
1850 }
1851 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1852 let start = chrono::DateTime::parse_from_rfc3339(&p.start)
1853 .map_err(|e| format!("parse start: {}", e))?
1854 .with_timezone(&chrono::Utc);
1855 let end = chrono::DateTime::parse_from_rfc3339(&p.end)
1856 .map_err(|e| format!("parse end: {}", e))?
1857 .with_timezone(&chrono::Utc);
1858 car_ffi_common::integrations::calendar_events(start, end, &p.calendar_ids)
1859}
1860
1861fn handle_contacts_find(req: &JsonRpcMessage) -> Result<Value, String> {
1862 #[derive(serde::Deserialize)]
1863 struct P {
1864 query: String,
1865 #[serde(default = "default_limit")]
1866 limit: usize,
1867 #[serde(default)]
1868 container_ids: Vec<String>,
1869 }
1870 fn default_limit() -> usize {
1871 50
1872 }
1873 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1874 car_ffi_common::integrations::contacts_list(&p.query, &p.container_ids, p.limit)
1875}
1876
1877fn handle_mail_inbox(req: &JsonRpcMessage) -> Result<Value, String> {
1878 #[derive(serde::Deserialize, Default)]
1879 struct P {
1880 #[serde(default)]
1881 account_ids: Vec<String>,
1882 }
1883 let p: P = serde_json::from_value(req.params.clone()).unwrap_or_default();
1884 car_ffi_common::integrations::mail_inbox(&p.account_ids)
1885}
1886
1887fn handle_mail_send(req: &JsonRpcMessage) -> Result<Value, String> {
1888 let raw = req.params.to_string();
1889 car_ffi_common::integrations::mail_send(&raw)
1890}
1891
1892fn handle_health_sleep(req: &JsonRpcMessage) -> Result<Value, String> {
1893 #[derive(serde::Deserialize)]
1894 struct P {
1895 start: String,
1896 end: String,
1897 }
1898 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1899 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
1900 .map_err(|e| format!("parse start: {}", e))?
1901 .with_timezone(&chrono::Utc);
1902 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
1903 .map_err(|e| format!("parse end: {}", e))?
1904 .with_timezone(&chrono::Utc);
1905 car_ffi_common::health::sleep_windows(s, e)
1906}
1907
1908fn handle_health_workouts(req: &JsonRpcMessage) -> Result<Value, String> {
1909 #[derive(serde::Deserialize)]
1910 struct P {
1911 start: String,
1912 end: String,
1913 }
1914 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1915 let s = chrono::DateTime::parse_from_rfc3339(&p.start)
1916 .map_err(|e| format!("parse start: {}", e))?
1917 .with_timezone(&chrono::Utc);
1918 let e = chrono::DateTime::parse_from_rfc3339(&p.end)
1919 .map_err(|e| format!("parse end: {}", e))?
1920 .with_timezone(&chrono::Utc);
1921 car_ffi_common::health::workouts(s, e)
1922}
1923
1924fn handle_health_activity(req: &JsonRpcMessage) -> Result<Value, String> {
1925 #[derive(serde::Deserialize)]
1926 struct P {
1927 start: String,
1928 end: String,
1929 }
1930 let p: P = serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1931 let s = chrono::NaiveDate::parse_from_str(&p.start, "%Y-%m-%d")
1932 .map_err(|e| format!("parse start: {}", e))?;
1933 let e = chrono::NaiveDate::parse_from_str(&p.end, "%Y-%m-%d")
1934 .map_err(|e| format!("parse end: {}", e))?;
1935 car_ffi_common::health::activity(s, e)
1936}
1937
1938async fn handle_browser_close(session: &crate::session::ClientSession) -> Result<Value, String> {
1939 let closed = session.browser.close().await?;
1940 Ok(serde_json::json!({"closed": closed}))
1941}
1942
1943async fn handle_browser_run(
1944 req: &JsonRpcMessage,
1945 session: &crate::session::ClientSession,
1946) -> Result<Value, String> {
1947 #[derive(serde::Deserialize)]
1948 struct BrowserRunParams {
1949 script: Value,
1951 #[serde(default)]
1952 width: Option<u32>,
1953 #[serde(default)]
1954 height: Option<u32>,
1955 #[serde(default)]
1960 headed: Option<bool>,
1961 #[serde(default)]
1964 extra_args: Option<Vec<String>>,
1965 }
1966 let params: BrowserRunParams =
1967 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
1968
1969 let script_json = match params.script {
1971 Value::String(s) => s,
1972 other => other.to_string(),
1973 };
1974
1975 let browser_session = session
1976 .browser
1977 .get_or_launch(car_ffi_common::browser::BrowserLaunchOptions {
1978 width: params.width.unwrap_or(1280),
1979 height: params.height.unwrap_or(720),
1980 headless: !params.headed.unwrap_or(false),
1981 extra_args: params.extra_args.unwrap_or_default(),
1982 })
1983 .await?;
1984
1985 let trace_json = browser_session.run(&script_json).await?;
1986 serde_json::from_str(&trace_json).map_err(|e| e.to_string())
1987}
1988
1989#[derive(Deserialize)]
2002struct VoiceStartParams {
2003 session_id: String,
2004 audio_source: Value,
2005 #[serde(default)]
2006 options: Option<Value>,
2007}
2008
2009async fn handle_voice_transcribe_stream_start(
2010 req: &JsonRpcMessage,
2011 state: &Arc<ServerState>,
2012 session: &Arc<crate::session::ClientSession>,
2013) -> Result<Value, String> {
2014 let params: VoiceStartParams =
2015 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2016 let audio_source_json = serde_json::to_string(¶ms.audio_source).map_err(|e| e.to_string())?;
2017 let options_json = params
2018 .options
2019 .as_ref()
2020 .map(|v| serde_json::to_string(v).map_err(|e| e.to_string()))
2021 .transpose()?;
2022 let sink: Arc<dyn car_voice::VoiceEventSink> = Arc::new(crate::session::WsVoiceEventSink {
2023 channel: session.channel.clone(),
2024 });
2025 let json = car_ffi_common::voice::transcribe_stream_start(
2026 ¶ms.session_id,
2027 &audio_source_json,
2028 options_json.as_deref(),
2029 state.voice_sessions.clone(),
2030 sink,
2031 )
2032 .await?;
2033 serde_json::from_str(&json).map_err(|e| e.to_string())
2034}
2035
2036#[derive(Deserialize)]
2037struct VoiceStopParams {
2038 session_id: String,
2039}
2040
2041async fn handle_voice_transcribe_stream_stop(
2042 req: &JsonRpcMessage,
2043 state: &Arc<ServerState>,
2044) -> Result<Value, String> {
2045 let params: VoiceStopParams =
2046 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2047 let json =
2048 car_ffi_common::voice::transcribe_stream_stop(¶ms.session_id, state.voice_sessions.clone())
2049 .await?;
2050 serde_json::from_str(&json).map_err(|e| e.to_string())
2051}
2052
2053#[derive(Deserialize)]
2054struct VoicePushParams {
2055 session_id: String,
2056 pcm_b64: String,
2060}
2061
2062async fn handle_voice_transcribe_stream_push(
2063 req: &JsonRpcMessage,
2064 state: &Arc<ServerState>,
2065) -> Result<Value, String> {
2066 use base64::Engine;
2067 let params: VoicePushParams =
2068 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2069 let pcm = base64::engine::general_purpose::STANDARD
2070 .decode(¶ms.pcm_b64)
2071 .map_err(|e| format!("invalid pcm_b64: {}", e))?;
2072 let json = car_ffi_common::voice::transcribe_stream_push(
2073 ¶ms.session_id,
2074 &pcm,
2075 state.voice_sessions.clone(),
2076 )
2077 .await?;
2078 serde_json::from_str(&json).map_err(|e| e.to_string())
2079}
2080
2081fn handle_voice_sessions_list(state: &Arc<ServerState>) -> Value {
2082 let json = car_ffi_common::voice::list_voice_sessions(state.voice_sessions.clone());
2083 serde_json::from_str(&json).unwrap_or(Value::Null)
2084}
2085
/// Request parameters deserialized by `handle_enroll_speaker`.
#[derive(Deserialize)]
struct EnrollSpeakerParams {
    /// Label forwarded to `car_ffi_common::voice::enroll_speaker`.
    label: String,
    /// Arbitrary JSON value; the handler re-serializes it to a string and
    /// forwards it unchanged, so its schema is defined by the callee.
    audio: Value,
}
2091
2092async fn handle_enroll_speaker(req: &JsonRpcMessage) -> Result<Value, String> {
2093 let params: EnrollSpeakerParams =
2094 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2095 let audio_json = serde_json::to_string(¶ms.audio).map_err(|e| e.to_string())?;
2096 let json = car_ffi_common::voice::enroll_speaker(¶ms.label, &audio_json).await?;
2097 serde_json::from_str(&json).map_err(|e| e.to_string())
2098}
2099
/// Request parameters deserialized by `handle_remove_enrollment`.
#[derive(Deserialize)]
struct RemoveEnrollmentParams {
    /// Label forwarded to `car_ffi_common::voice::remove_enrollment`.
    label: String,
}
2104
2105fn handle_remove_enrollment(req: &JsonRpcMessage) -> Result<Value, String> {
2106 let params: RemoveEnrollmentParams =
2107 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2108 let json = car_ffi_common::voice::remove_enrollment(¶ms.label)?;
2109 serde_json::from_str(&json).map_err(|e| e.to_string())
2110}
2111
/// Request parameters deserialized by `handle_workflow_run`.
#[derive(Deserialize)]
struct WorkflowRunParams {
    /// Workflow definition as arbitrary JSON; the handler re-serializes it to
    /// a string for `car_ffi_common::workflow::run_workflow`.
    workflow: Value,
}
2116
2117async fn handle_workflow_run(
2118 req: &JsonRpcMessage,
2119 session: &Arc<crate::session::ClientSession>,
2120) -> Result<Value, String> {
2121 let params: WorkflowRunParams =
2122 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2123 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
2124 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(WsAgentRunner {
2125 channel: session.channel.clone(),
2126 host: session.host.clone(),
2127 client_id: session.client_id.clone(),
2128 });
2129 let json = car_ffi_common::workflow::run_workflow(&workflow_json, runner).await?;
2130 serde_json::from_str(&json).map_err(|e| e.to_string())
2131}
2132
/// Request parameters deserialized by `handle_workflow_verify`.
#[derive(Deserialize)]
struct WorkflowVerifyParams {
    /// Workflow definition as arbitrary JSON; the handler re-serializes it to
    /// a string for `car_ffi_common::workflow::verify_workflow`.
    workflow: Value,
}
2137
2138fn handle_workflow_verify(req: &JsonRpcMessage) -> Result<Value, String> {
2139 let params: WorkflowVerifyParams =
2140 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2141 let workflow_json = serde_json::to_string(¶ms.workflow).map_err(|e| e.to_string())?;
2142 let json = car_ffi_common::workflow::verify_workflow(&workflow_json)?;
2143 serde_json::from_str(&json).map_err(|e| e.to_string())
2144}
2145
2146async fn handle_meeting_start(
2151 req: &JsonRpcMessage,
2152 state: &Arc<ServerState>,
2153 session: &Arc<crate::session::ClientSession>,
2154) -> Result<Value, String> {
2155 let mut req_value = req.params.clone();
2161 let meeting_id = req_value
2162 .get("id")
2163 .and_then(|v| v.as_str())
2164 .map(str::to_string)
2165 .unwrap_or_else(|| uuid::Uuid::new_v4().simple().to_string());
2166 if let Some(map) = req_value.as_object_mut() {
2167 map.insert("id".into(), Value::String(meeting_id.clone()));
2168 }
2169 let request_json = serde_json::to_string(&req_value).map_err(|e| e.to_string())?;
2170
2171 let ws_upstream: Arc<dyn car_voice::VoiceEventSink> =
2172 Arc::new(crate::session::WsVoiceEventSink {
2173 channel: session.channel.clone(),
2174 });
2175
2176 let upstream: Arc<dyn car_voice::VoiceEventSink> =
2181 Arc::new(crate::session::WsMemgineIngestSink {
2182 meeting_id,
2183 engine: session.memgine.clone(),
2184 upstream: ws_upstream,
2185 });
2186
2187 let cwd = std::env::current_dir().ok();
2188 let json = car_ffi_common::meeting::start_meeting(
2189 &request_json,
2190 state.meetings.clone(),
2191 state.voice_sessions.clone(),
2192 upstream,
2193 None,
2194 cwd,
2195 )
2196 .await?;
2197 serde_json::from_str(&json).map_err(|e| e.to_string())
2198}
2199
/// Request parameters deserialized by `handle_meeting_stop`.
#[derive(Deserialize)]
struct MeetingStopParams {
    /// Meeting identifier forwarded to `car_ffi_common::meeting::stop_meeting`.
    meeting_id: String,
    /// Forwarded to `stop_meeting`; also decides whether the handler passes
    /// the server's inference handle. Filled from `default_summarize` when the
    /// field is omitted.
    #[serde(default = "default_summarize")]
    summarize: bool,
}
2206
/// Serde default for `MeetingStopParams::summarize`: an omitted field means
/// `true`.
fn default_summarize() -> bool {
    true
}
2210
2211async fn handle_meeting_stop(
2212 req: &JsonRpcMessage,
2213 state: &Arc<ServerState>,
2214 _session: &Arc<crate::session::ClientSession>,
2215) -> Result<Value, String> {
2216 let params: MeetingStopParams =
2217 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2218 let inference = if params.summarize {
2219 Some(state.inference.get().cloned()).flatten()
2220 } else {
2221 None
2222 };
2223 let json = car_ffi_common::meeting::stop_meeting(
2224 ¶ms.meeting_id,
2225 params.summarize,
2226 state.meetings.clone(),
2227 state.voice_sessions.clone(),
2228 inference,
2229 )
2230 .await?;
2231 serde_json::from_str(&json).map_err(|e| e.to_string())
2232}
2233
/// Optional request parameters deserialized by `handle_meeting_list`.
#[derive(Deserialize, Default)]
struct MeetingListParams {
    /// Optional path forwarded as the first argument of
    /// `car_ffi_common::meeting::list_meetings`; `None` when omitted.
    #[serde(default)]
    root: Option<std::path::PathBuf>,
}
2239
2240fn handle_meeting_list(req: &JsonRpcMessage) -> Result<Value, String> {
2241 let params: MeetingListParams =
2242 serde_json::from_value(req.params.clone()).unwrap_or_default();
2243 let cwd = std::env::current_dir().ok();
2244 let json = car_ffi_common::meeting::list_meetings(params.root, cwd)?;
2245 serde_json::from_str(&json).map_err(|e| e.to_string())
2246}
2247
/// Request parameters deserialized by `handle_meeting_get`.
#[derive(Deserialize)]
struct MeetingGetParams {
    /// Meeting identifier forwarded to `car_ffi_common::meeting::get_meeting`.
    meeting_id: String,
    /// Optional path forwarded to `get_meeting`; `None` when omitted.
    #[serde(default)]
    root: Option<std::path::PathBuf>,
}
2254
2255fn handle_meeting_get(req: &JsonRpcMessage) -> Result<Value, String> {
2256 let params: MeetingGetParams =
2257 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2258 let cwd = std::env::current_dir().ok();
2259 let json = car_ffi_common::meeting::get_meeting(¶ms.meeting_id, params.root, cwd)?;
2260 serde_json::from_str(&json).map_err(|e| e.to_string())
2261}
2262
/// Request parameters deserialized by `handle_registry_register`.
#[derive(Deserialize, Default)]
struct RegistryRegisterParams {
    /// Registry entry as arbitrary JSON; the handler re-serializes it to a
    /// string for `car_ffi_common::registry::register_agent`. Required: the
    /// field has no serde default.
    entry: Value,
    /// Optional path forwarded to `register_agent`; `None` when omitted.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
2276
2277fn handle_registry_register(req: &JsonRpcMessage) -> Result<Value, String> {
2278 let params: RegistryRegisterParams =
2279 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2280 let entry_json = serde_json::to_string(¶ms.entry).map_err(|e| e.to_string())?;
2281 car_ffi_common::registry::register_agent(&entry_json, params.registry_path)?;
2282 Ok(Value::Null)
2283}
2284
/// Request parameters shared by `handle_registry_heartbeat` and
/// `handle_registry_unregister`.
#[derive(Deserialize, Default)]
struct RegistryNameParams {
    /// Agent name forwarded to the registry call. Required: the field has no
    /// serde default.
    name: String,
    /// Optional path forwarded to the registry call; `None` when omitted.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
2291
2292fn handle_registry_heartbeat(req: &JsonRpcMessage) -> Result<Value, String> {
2293 let params: RegistryNameParams =
2294 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2295 let json = car_ffi_common::registry::agent_heartbeat(¶ms.name, params.registry_path)?;
2296 serde_json::from_str(&json).map_err(|e| e.to_string())
2297}
2298
2299fn handle_registry_unregister(req: &JsonRpcMessage) -> Result<Value, String> {
2300 let params: RegistryNameParams =
2301 serde_json::from_value(req.params.clone()).map_err(|e| e.to_string())?;
2302 car_ffi_common::registry::unregister_agent(¶ms.name, params.registry_path)?;
2303 Ok(Value::Null)
2304}
2305
/// Optional request parameters deserialized by `handle_registry_list`.
#[derive(Deserialize, Default)]
struct RegistryListParams {
    /// Optional path forwarded to `car_ffi_common::registry::list_agents`;
    /// `None` when omitted.
    #[serde(default)]
    registry_path: Option<std::path::PathBuf>,
}
2311
2312fn handle_registry_list(req: &JsonRpcMessage) -> Result<Value, String> {
2313 let params: RegistryListParams =
2314 serde_json::from_value(req.params.clone()).unwrap_or_default();
2315 let json = car_ffi_common::registry::list_agents(params.registry_path)?;
2316 serde_json::from_str(&json).map_err(|e| e.to_string())
2317}
2318
2319#[derive(Deserialize, Default)]
2320struct RegistryReapParams {
2321 #[serde(default = "default_reap_age")]
2324 max_age_secs: u64,
2325 #[serde(default)]
2326 registry_path: Option<std::path::PathBuf>,
2327}
2328
2329fn default_reap_age() -> u64 {
2330 60
2331}
2332
2333fn handle_registry_reap(req: &JsonRpcMessage) -> Result<Value, String> {
2334 let params: RegistryReapParams =
2335 serde_json::from_value(req.params.clone()).unwrap_or_default();
2336 let json = car_ffi_common::registry::reap_stale_agents(params.max_age_secs, params.registry_path)?;
2337 serde_json::from_str(&json).map_err(|e| e.to_string())
2338}
2339
2340async fn handle_a2a_start(req: &JsonRpcMessage) -> Result<Value, String> {
2347 let params_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2348 let json = car_ffi_common::a2a::start_a2a(¶ms_json).await?;
2349 serde_json::from_str(&json).map_err(|e| e.to_string())
2350}
2351
2352fn handle_a2a_stop() -> Result<Value, String> {
2353 let json = car_ffi_common::a2a::stop_a2a()?;
2354 serde_json::from_str(&json).map_err(|e| e.to_string())
2355}
2356
2357fn handle_a2a_status() -> Result<Value, String> {
2358 let json = car_ffi_common::a2a::a2a_status()?;
2359 serde_json::from_str(&json).map_err(|e| e.to_string())
2360}
2361
2362async fn handle_run_applescript(req: &JsonRpcMessage) -> Result<Value, String> {
2370 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2371 let json = car_ffi_common::automation::run_applescript(&args_json).await?;
2372 serde_json::from_str(&json).map_err(|e| e.to_string())
2373}
2374
2375async fn handle_list_shortcuts(req: &JsonRpcMessage) -> Result<Value, String> {
2376 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2377 let json = car_ffi_common::automation::list_shortcuts(&args_json).await?;
2378 serde_json::from_str(&json).map_err(|e| e.to_string())
2379}
2380
2381async fn handle_run_shortcut(req: &JsonRpcMessage) -> Result<Value, String> {
2382 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2383 let json = car_ffi_common::automation::run_shortcut(&args_json).await?;
2384 serde_json::from_str(&json).map_err(|e| e.to_string())
2385}
2386
2387async fn handle_vision_ocr(req: &JsonRpcMessage) -> Result<Value, String> {
2388 let args_json = serde_json::to_string(&req.params).map_err(|e| e.to_string())?;
2389 let json = car_ffi_common::vision::ocr(&args_json).await?;
2390 serde_json::from_str(&json).map_err(|e| e.to_string())
2391}