use car_engine::{Runtime, ToolExecutor};
use car_eventlog::EventLog;
use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
use futures::Sink;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio_tungstenite::tungstenite::{Error as WsError, Message};
pub type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send + Unpin + 'static>>;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum A2aRouteAuth {
None,
Bearer { token: String },
Header { name: String, value: String },
}
pub struct WsChannel {
pub write: Mutex<WsSink>,
pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
pub next_id: AtomicU64,
}
impl WsChannel {
pub fn next_request_id(&self) -> String {
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
format!("cb-{}", id)
}
}
pub struct WsToolExecutor {
pub channel: Arc<WsChannel>,
}
#[async_trait::async_trait]
impl ToolExecutor for WsToolExecutor {
async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
use futures::SinkExt;
let request_id = self.channel.next_request_id();
let callback = ToolExecuteRequest {
action_id: request_id.clone(),
tool: tool.to_string(),
parameters: params.clone(),
timeout_ms: None,
attempt: 1,
};
let (tx, rx) = oneshot::channel();
self.channel
.pending
.lock()
.await
.insert(request_id.clone(), tx);
let rpc_request = serde_json::json!({
"jsonrpc": "2.0",
"method": "tools.execute",
"params": callback,
"id": request_id,
});
let msg = Message::Text(
serde_json::to_string(&rpc_request)
.map_err(|e| e.to_string())?
.into(),
);
self.channel
.write
.lock()
.await
.send(msg)
.await
.map_err(|e| format!("failed to send tool callback: {}", e))?;
let response = tokio::time::timeout(std::time::Duration::from_secs(60), rx)
.await
.map_err(|_| format!("tool '{}' callback timed out (60s)", tool))?
.map_err(|_| format!("tool '{}' callback channel closed", tool))?;
if let Some(err) = response.error {
Err(err)
} else {
Ok(response.output.unwrap_or(Value::Null))
}
}
}
pub struct WsVoiceEventSink {
pub channel: Arc<WsChannel>,
}
impl car_voice::VoiceEventSink for WsVoiceEventSink {
fn send(&self, session_id: &str, event_json: String) {
use futures::SinkExt;
let channel = self.channel.clone();
let session_id = session_id.to_string();
tokio::spawn(async move {
let payload: Value = serde_json::from_str(&event_json)
.unwrap_or_else(|_| Value::String(event_json.clone()));
let notification = serde_json::json!({
"jsonrpc": "2.0",
"method": "voice.event",
"params": {
"session_id": session_id,
"event": payload,
},
});
let Ok(text) = serde_json::to_string(¬ification) else {
return;
};
let _ = channel
.write
.lock()
.await
.send(Message::Text(text.into()))
.await;
});
}
}
pub struct WsMemgineIngestSink {
pub meeting_id: String,
pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}
impl car_voice::VoiceEventSink for WsMemgineIngestSink {
fn send(&self, voice_session_id: &str, event_json: String) {
if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
&value,
&self.meeting_id,
voice_session_id,
) {
let engine = self.engine.clone();
tokio::spawn(async move {
let mut guard = engine.lock().await;
guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
});
}
}
self.upstream.send(voice_session_id, event_json);
}
}
pub struct ClientSession {
pub client_id: String,
pub runtime: Arc<Runtime>,
pub channel: Arc<WsChannel>,
pub host: Arc<crate::host::HostState>,
pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
pub browser: car_ffi_common::browser::BrowserSessionSlot,
pub authenticated: std::sync::atomic::AtomicBool,
pub agent_id: tokio::sync::Mutex<Option<String>>,
pub bound_memgine: tokio::sync::Mutex<Option<Arc<Mutex<car_memgine::MemgineEngine>>>>,
}
impl ClientSession {
pub async fn effective_memgine(&self) -> Arc<Mutex<car_memgine::MemgineEngine>> {
if let Some(eng) = self.bound_memgine.lock().await.as_ref() {
return eng.clone();
}
self.memgine.clone()
}
}
#[derive(Debug, Clone)]
pub struct ApprovalGate {
pub enabled: bool,
pub methods: std::collections::HashSet<String>,
pub timeout: std::time::Duration,
}
impl Default for ApprovalGate {
fn default() -> Self {
let methods = [
"automation.run_applescript",
"automation.shortcuts.run",
"messages.send",
"mail.send",
"vision.ocr",
]
.iter()
.map(|s| s.to_string())
.collect();
Self {
enabled: true,
methods,
timeout: std::time::Duration::from_secs(60),
}
}
}
impl ApprovalGate {
pub fn disabled() -> Self {
Self {
enabled: false,
methods: std::collections::HashSet::new(),
timeout: std::time::Duration::from_secs(60),
}
}
pub fn requires_approval(&self, method: &str) -> bool {
self.enabled && self.methods.contains(method)
}
}
pub struct ServerStateConfig {
pub journal_dir: PathBuf,
pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
pub inference: Option<Arc<car_inference::InferenceEngine>>,
pub a2a_runtime: Option<Arc<car_engine::Runtime>>,
pub a2a_store: Option<Arc<dyn car_a2a::TaskStore>>,
pub a2a_card_source: Option<Arc<car_a2a::AgentCardSource>>,
pub approval_gate: Option<ApprovalGate>,
}
impl ServerStateConfig {
pub fn new(journal_dir: PathBuf) -> Self {
Self {
journal_dir,
shared_memgine: None,
inference: None,
a2a_runtime: None,
a2a_store: None,
a2a_card_source: None,
approval_gate: None,
}
}
pub fn with_shared_memgine(mut self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
self.shared_memgine = Some(engine);
self
}
pub fn with_inference(mut self, engine: Arc<car_inference::InferenceEngine>) -> Self {
self.inference = Some(engine);
self
}
pub fn with_a2a_runtime(mut self, runtime: Arc<car_engine::Runtime>) -> Self {
self.a2a_runtime = Some(runtime);
self
}
pub fn with_a2a_store(mut self, store: Arc<dyn car_a2a::TaskStore>) -> Self {
self.a2a_store = Some(store);
self
}
pub fn with_a2a_card_source(mut self, source: Arc<car_a2a::AgentCardSource>) -> Self {
self.a2a_card_source = Some(source);
self
}
pub fn with_approval_gate(mut self, gate: ApprovalGate) -> Self {
self.approval_gate = Some(gate);
self
}
}
pub struct ServerState {
pub journal_dir: PathBuf,
pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
pub host: Arc<crate::host::HostState>,
pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
pub meetings: Arc<car_meeting::MeetingRegistry>,
pub a2ui: car_a2ui::A2uiSurfaceStore,
pub ui_agent: Arc<car_ui_agent::UIImprovementAgent>,
pub ui_agent_oscillation: Arc<crate::ui_agent_loop::OscillationDetector>,
pub ui_agent_budget: Arc<crate::ui_agent_loop::IterationBudget>,
pub admission: Arc<crate::admission::InferenceAdmission>,
pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
pub supervisor: std::sync::OnceLock<Arc<car_registry::supervisor::Supervisor>>,
pub a2a_dispatcher: std::sync::OnceLock<Arc<car_a2a::A2aDispatcher>>,
pub a2ui_subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
pub auth_token: std::sync::OnceLock<String>,
pub attached_agents: Mutex<HashMap<String, String>>,
pub agent_memgines: Mutex<HashMap<String, Arc<Mutex<car_memgine::MemgineEngine>>>>,
pub mcp_url: std::sync::OnceLock<String>,
pub mcp_sessions: std::sync::OnceLock<Arc<crate::mcp::SessionMap>>,
pub approval_gate: ApprovalGate,
pub(crate) a2a_runtime: std::sync::Mutex<Option<Arc<car_engine::Runtime>>>,
pub(crate) a2a_store: std::sync::Mutex<Option<Arc<dyn car_a2a::TaskStore>>>,
pub(crate) a2a_card_source: std::sync::Mutex<Option<Arc<car_a2a::AgentCardSource>>>,
}
impl ServerState {
pub fn standalone(journal_dir: PathBuf) -> Self {
Self::with_config(ServerStateConfig::new(journal_dir))
}
pub fn embedded(
journal_dir: PathBuf,
shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
) -> Self {
Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
}
pub fn with_config(cfg: ServerStateConfig) -> Self {
let inference = std::sync::OnceLock::new();
if let Some(eng) = cfg.inference {
let _ = inference.set(eng);
}
let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
voice_sessions.start_sweeper();
let ui_agent = Arc::new(car_ui_agent::UIImprovementAgent::with_default_strategies());
let ui_agent_oscillation =
Arc::new(crate::ui_agent_loop::OscillationDetector::new());
let ui_agent_budget = Arc::new(crate::ui_agent_loop::IterationBudget::new());
Self {
journal_dir: cfg.journal_dir,
sessions: Mutex::new(HashMap::new()),
inference,
host: Arc::new(crate::host::HostState::new()),
shared_memgine: cfg.shared_memgine,
voice_sessions,
meetings: Arc::new(car_meeting::MeetingRegistry::new()),
a2ui: car_a2ui::A2uiSurfaceStore::new(),
ui_agent,
ui_agent_oscillation,
ui_agent_budget,
admission: Arc::new(crate::admission::InferenceAdmission::new()),
a2ui_route_auth: Mutex::new(HashMap::new()),
supervisor: std::sync::OnceLock::new(),
a2a_dispatcher: std::sync::OnceLock::new(),
a2a_runtime: std::sync::Mutex::new(cfg.a2a_runtime),
a2a_store: std::sync::Mutex::new(cfg.a2a_store),
a2a_card_source: std::sync::Mutex::new(cfg.a2a_card_source),
a2ui_subscribers: Mutex::new(HashMap::new()),
auth_token: std::sync::OnceLock::new(),
attached_agents: Mutex::new(HashMap::new()),
agent_memgines: Mutex::new(HashMap::new()),
mcp_url: std::sync::OnceLock::new(),
mcp_sessions: std::sync::OnceLock::new(),
approval_gate: cfg.approval_gate.unwrap_or_default(),
}
}
pub fn install_auth_token(&self, token: String) -> Result<(), String> {
self.auth_token.set(token)
}
pub fn install_mcp_url(&self, url: String) -> Result<(), String> {
self.mcp_url.set(url)
}
pub fn install_mcp_sessions(
&self,
sessions: Arc<crate::mcp::SessionMap>,
) -> Result<(), Arc<crate::mcp::SessionMap>> {
self.mcp_sessions.set(sessions)
}
pub fn supervisor(&self) -> Result<Arc<car_registry::supervisor::Supervisor>, String> {
if let Some(s) = self.supervisor.get() {
return Ok(s.clone());
}
let s = car_registry::supervisor::Supervisor::user_default()
.map(Arc::new)
.map_err(|e| e.to_string())?;
let _ = self.supervisor.set(s);
Ok(self.supervisor.get().expect("set or pre-existing").clone())
}
pub fn install_supervisor(
&self,
supervisor: Arc<car_registry::supervisor::Supervisor>,
) -> Result<(), Arc<car_registry::supervisor::Supervisor>> {
self.supervisor.set(supervisor)
}
pub async fn a2a_dispatcher(&self) -> Arc<car_a2a::A2aDispatcher> {
if let Some(d) = self.a2a_dispatcher.get() {
return d.clone();
}
let runtime = self
.a2a_runtime
.lock()
.expect("a2a_runtime mutex poisoned")
.take();
let runtime = match runtime {
Some(r) => r,
None => {
let r = Arc::new(car_engine::Runtime::new());
r.register_agent_basics().await;
r
}
};
let store = self
.a2a_store
.lock()
.expect("a2a_store mutex poisoned")
.take()
.unwrap_or_else(|| Arc::new(car_a2a::InMemoryTaskStore::new()));
let card_source = self
.a2a_card_source
.lock()
.expect("a2a_card_source mutex poisoned")
.take();
let card_source = match card_source {
Some(c) => c,
None => {
let card = car_a2a::build_default_agent_card(
&runtime,
car_a2a::AgentCardConfig::minimal(
"Common Agent Runtime",
"Embedded CAR daemon — A2A v1.0 reachable over WebSocket JSON-RPC.",
"ws://127.0.0.1:9100/",
car_a2a::AgentProvider {
organization: "Parslee".into(),
url: Some("https://github.com/Parslee-ai/car".into()),
},
),
)
.await;
Arc::new(move || card.clone()) as Arc<car_a2a::AgentCardSource>
}
};
let dispatcher = Arc::new(car_a2a::A2aDispatcher::new(runtime, store, card_source));
let _ = self.a2a_dispatcher.set(dispatcher);
self.a2a_dispatcher
.get()
.expect("a2a_dispatcher set or pre-existing")
.clone()
}
pub async fn create_session(
&self,
client_id: &str,
channel: Arc<WsChannel>,
) -> Arc<ClientSession> {
let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
let event_log = EventLog::with_journal(journal_path);
let executor = Arc::new(WsToolExecutor {
channel: channel.clone(),
});
let runtime = Runtime::new()
.with_event_log(event_log)
.with_executor(executor);
let memgine = match &self.shared_memgine {
Some(eng) => eng.clone(),
None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
};
let session = Arc::new(ClientSession {
client_id: client_id.to_string(),
runtime: Arc::new(runtime),
channel,
host: self.host.clone(),
memgine,
browser: car_ffi_common::browser::BrowserSessionSlot::new(),
authenticated: std::sync::atomic::AtomicBool::new(false),
agent_id: tokio::sync::Mutex::new(None),
bound_memgine: tokio::sync::Mutex::new(None),
});
self.sessions
.lock()
.await
.insert(client_id.to_string(), session.clone());
session
}
pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
let removed = self.sessions.lock().await.remove(client_id);
if let Some(session) = &removed {
let bound = session.agent_id.lock().await.clone();
if let Some(id) = bound {
let mut attached = self.attached_agents.lock().await;
if attached.get(&id).map(String::as_str) == Some(client_id) {
attached.remove(&id);
}
}
}
removed
}
}