use car_engine::{Runtime, ToolExecutor};
use car_eventlog::EventLog;
use car_proto::{ToolExecuteRequest, ToolExecuteResponse};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio_tungstenite::tungstenite::Message;
/// Authentication setting attached to a route; values of this type are stored
/// in `ServerState::a2ui_route_auth`.
///
/// The serde representation is internally tagged: the variant name is written
/// to a `type` field, renamed to camelCase (`"none"`, `"bearer"`, `"header"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum A2aRouteAuth {
/// No credentials are configured.
None,
/// A bearer token.
/// NOTE(review): presumably sent as `Authorization: Bearer <token>` — confirm at the call site.
Bearer { token: String },
/// A single header name/value pair.
/// NOTE(review): presumably attached verbatim to outgoing requests — confirm at the call site.
Header { name: String, value: String },
}
/// Shared state for one client WebSocket connection.
///
/// Holds the write half of the socket plus the bookkeeping used to match
/// `tools.execute` responses to the callers awaiting them.
pub struct WsChannel {
/// Write half of the split WebSocket stream. Writers take this async mutex
/// and hold it across the `send(...).await`, so frames are written one at a time.
pub write: Mutex<
futures::stream::SplitSink<
tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
Message,
>,
>,
/// In-flight tool callbacks keyed by request id. `WsToolExecutor::execute`
/// inserts a sender here and then awaits the paired receiver.
/// NOTE(review): the code that removes an entry and fires its sender is not
/// visible in this part of the file — presumably the socket read loop; confirm.
pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
/// Counter backing `next_request_id`.
pub next_id: AtomicU64,
}
impl WsChannel {
    /// Allocates a request id for this channel, formatted as `cb-<n>`.
    ///
    /// `<n>` is the value `next_id` held before it was atomically incremented,
    /// so concurrent callers each observe a different number.
    pub fn next_request_id(&self) -> String {
        let seq = self.next_id.fetch_add(1, Ordering::SeqCst);
        format!("cb-{seq}")
    }
}
/// `ToolExecutor` implementation that forwards each tool call to the connected
/// client over its WebSocket channel and waits for the reply.
pub struct WsToolExecutor {
/// Connection used to send `tools.execute` requests and to register the
/// pending-response slot.
pub channel: Arc<WsChannel>,
}
#[async_trait::async_trait]
impl ToolExecutor for WsToolExecutor {
    /// Executes `tool` on the connected client and returns its output.
    ///
    /// A JSON-RPC 2.0 `tools.execute` request carrying a [`ToolExecuteRequest`]
    /// is written to the WebSocket, and the call then waits up to 60 seconds
    /// for the matching response to be delivered through `channel.pending`.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a message when:
    /// - the request cannot be serialized,
    /// - the WebSocket write fails,
    /// - no response arrives within 60 seconds,
    /// - the pending sender is dropped without a response, or
    /// - the client's response carries an `error`.
    ///
    /// On serialization failure, send failure, and timeout, no entry for this
    /// request is left behind in `channel.pending`.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        use futures::SinkExt;

        let request_id = self.channel.next_request_id();
        let callback = ToolExecuteRequest {
            action_id: request_id.clone(),
            tool: tool.to_string(),
            parameters: params.clone(),
            timeout_ms: None,
            attempt: 1,
        };
        let rpc_request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "tools.execute",
            "params": callback,
            "id": request_id,
        });
        // Serialize before registering the pending slot, so a serialization
        // failure cannot strand an entry in the map.
        let text = serde_json::to_string(&rpc_request).map_err(|e| e.to_string())?;

        // Register the slot before sending, so a fast reply always finds it.
        let (tx, rx) = oneshot::channel();
        self.channel
            .pending
            .lock()
            .await
            .insert(request_id.clone(), tx);

        let sent = self
            .channel
            .write
            .lock()
            .await
            .send(Message::Text(text.into()))
            .await;
        if let Err(e) = sent {
            // Nothing was sent, so no reply will ever claim this slot.
            self.channel.pending.lock().await.remove(&request_id);
            return Err(format!("failed to send tool callback: {}", e));
        }

        let response = match tokio::time::timeout(std::time::Duration::from_secs(60), rx).await {
            Ok(Ok(response)) => response,
            // The sender was dropped, which means the entry already left the map.
            Ok(Err(_)) => return Err(format!("tool '{}' callback channel closed", tool)),
            Err(_) => {
                // Give up on this request; drop the slot so it does not accumulate.
                self.channel.pending.lock().await.remove(&request_id);
                return Err(format!("tool '{}' callback timed out (60s)", tool));
            }
        };

        match response.error {
            Some(err) => Err(err),
            None => Ok(response.output.unwrap_or(Value::Null)),
        }
    }
}
/// `VoiceEventSink` that forwards voice events to the connected client as
/// `voice.event` JSON-RPC notifications.
pub struct WsVoiceEventSink {
/// Connection the notifications are written to.
pub channel: Arc<WsChannel>,
}
impl car_voice::VoiceEventSink for WsVoiceEventSink {
    /// Forwards one voice event to the client as a `voice.event` JSON-RPC
    /// notification (no `id`, so no reply is expected).
    ///
    /// The write happens on a spawned task, so this method returns immediately.
    /// Each call spawns its own task; this code does not enforce delivery order
    /// between consecutive calls. Serialization and send failures are ignored,
    /// making delivery best-effort.
    ///
    /// `event_json` is embedded as structured JSON when it parses; otherwise the
    /// raw text is sent as a JSON string.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside a Tokio runtime.
    fn send(&self, session_id: &str, event_json: String) {
        use futures::SinkExt;

        let channel = self.channel.clone();
        let session_id = session_id.to_string();
        tokio::spawn(async move {
            // Fall back to the raw text, moving the string instead of cloning it.
            let payload = match serde_json::from_str::<Value>(&event_json) {
                Ok(parsed) => parsed,
                Err(_) => Value::String(event_json),
            };
            let notification = serde_json::json!({
                "jsonrpc": "2.0",
                "method": "voice.event",
                "params": {
                    "session_id": session_id,
                    "event": payload,
                },
            });
            let Ok(text) = serde_json::to_string(&notification) else {
                return;
            };
            // Best-effort: a closed socket is not an error worth surfacing here.
            let _ = channel
                .write
                .lock()
                .await
                .send(Message::Text(text.into()))
                .await;
        });
    }
}
/// `VoiceEventSink` wrapper that feeds transcripts extracted from voice events
/// into a memgine engine and then passes every event on to `upstream`.
pub struct WsMemgineIngestSink {
/// Meeting id handed to `car_meeting::extract_transcript_for_ingest`.
pub meeting_id: String,
/// Engine that receives `ingest_conversation` calls for extracted transcripts.
pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
/// Sink that receives every event unchanged after the ingest step.
pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}
impl car_voice::VoiceEventSink for WsMemgineIngestSink {
fn send(&self, voice_session_id: &str, event_json: String) {
if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
&value,
&self.meeting_id,
voice_session_id,
) {
let engine = self.engine.clone();
tokio::spawn(async move {
let mut guard = engine.lock().await;
guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
});
}
}
self.upstream.send(voice_session_id, event_json);
}
}
/// Per-client state created by `ServerState::create_session`.
pub struct ClientSession {
/// Identifier the session is registered under in `ServerState::sessions`.
pub client_id: String,
/// Engine runtime, built with a journal-backed event log and a
/// `WsToolExecutor` bound to `channel`.
pub runtime: Arc<Runtime>,
/// WebSocket channel for this client.
pub channel: Arc<WsChannel>,
/// Host state shared with the owning `ServerState` (cloned `Arc`).
pub host: Arc<crate::host::HostState>,
/// Memgine engine: the server's shared engine when one is configured,
/// otherwise an engine created for this session.
pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
/// Browser session slot, created via `BrowserSessionSlot::new()`.
pub browser: car_ffi_common::browser::BrowserSessionSlot,
}
/// Construction options consumed by `ServerState::with_config`.
pub struct ServerStateConfig {
/// Directory in which per-client `<client_id>.jsonl` journals are placed.
pub journal_dir: PathBuf,
/// When set, every session uses this engine instead of creating its own.
pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
/// When set, pre-populates `ServerState::inference` at construction.
pub inference: Option<Arc<car_inference::InferenceEngine>>,
}
impl ServerStateConfig {
    /// Creates a configuration with the given journal directory and neither a
    /// shared memgine nor a preset inference engine.
    pub fn new(journal_dir: PathBuf) -> Self {
        let shared_memgine = None;
        let inference = None;
        Self {
            journal_dir,
            shared_memgine,
            inference,
        }
    }

    /// Returns the configuration with `engine` installed as the shared memgine.
    pub fn with_shared_memgine(self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
        Self {
            shared_memgine: Some(engine),
            ..self
        }
    }

    /// Returns the configuration with `engine` installed as the preset
    /// inference engine.
    pub fn with_inference(self, engine: Arc<car_inference::InferenceEngine>) -> Self {
        Self {
            inference: Some(engine),
            ..self
        }
    }
}
/// Process-wide server state shared by all client connections.
pub struct ServerState {
/// Directory in which per-client `<client_id>.jsonl` journals are placed.
pub journal_dir: PathBuf,
/// Live sessions keyed by client id.
pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
/// Inference engine slot; filled at construction when the config supplies one.
/// NOTE(review): when left empty it is presumably initialized lazily elsewhere — confirm.
pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
/// Host state; each session receives a clone of this `Arc`.
pub host: Arc<crate::host::HostState>,
/// Optional engine shared by every session instead of per-session engines.
pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
/// Voice session registry; its sweeper is started in `with_config`.
pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
/// Meeting registry.
pub meetings: Arc<car_meeting::MeetingRegistry>,
/// A2UI surface store.
pub a2ui: car_a2ui::A2uiSurfaceStore,
/// Inference admission state.
pub admission: Arc<crate::admission::InferenceAdmission>,
/// Route authentication settings by string key.
/// NOTE(review): the meaning of the key (route id? URL?) is not visible here — confirm.
pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
}
impl ServerState {
pub fn standalone(journal_dir: PathBuf) -> Self {
Self::with_config(ServerStateConfig::new(journal_dir))
}
pub fn embedded(
journal_dir: PathBuf,
shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
) -> Self {
Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
}
pub fn with_config(cfg: ServerStateConfig) -> Self {
let inference = std::sync::OnceLock::new();
if let Some(eng) = cfg.inference {
let _ = inference.set(eng);
}
let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
voice_sessions.start_sweeper();
Self {
journal_dir: cfg.journal_dir,
sessions: Mutex::new(HashMap::new()),
inference,
host: Arc::new(crate::host::HostState::new()),
shared_memgine: cfg.shared_memgine,
voice_sessions,
meetings: Arc::new(car_meeting::MeetingRegistry::new()),
a2ui: car_a2ui::A2uiSurfaceStore::new(),
admission: Arc::new(crate::admission::InferenceAdmission::new()),
a2ui_route_auth: Mutex::new(HashMap::new()),
}
}
pub async fn create_session(
&self,
client_id: &str,
channel: Arc<WsChannel>,
) -> Arc<ClientSession> {
let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
let event_log = EventLog::with_journal(journal_path);
let executor = Arc::new(WsToolExecutor {
channel: channel.clone(),
});
let runtime = Runtime::new()
.with_event_log(event_log)
.with_executor(executor);
let memgine = match &self.shared_memgine {
Some(eng) => eng.clone(),
None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
};
let session = Arc::new(ClientSession {
client_id: client_id.to_string(),
runtime: Arc::new(runtime),
channel,
host: self.host.clone(),
memgine,
browser: car_ffi_common::browser::BrowserSessionSlot::new(),
});
self.sessions
.lock()
.await
.insert(client_id.to_string(), session.clone());
session
}
pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
self.sessions.lock().await.remove(client_id)
}
}