use car_engine::{Runtime, ToolExecutor};
use car_eventlog::EventLog;
use car_proto::{ToolCancelRequest, ToolExecuteRequest, ToolExecuteResponse};
use futures::Sink;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio_tungstenite::tungstenite::{Error as WsError, Message};
/// Boxed, pinned, type-erased sink of tungstenite `Message`s whose error type
/// is the tungstenite `Error`; the write side stored in [`WsChannel`].
pub type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send + Unpin + 'static>>;
/// Delay (250 ms) that `ServerState::sweep_runs_for_disconnect` sleeps before
/// marking a disconnected client's still-open runs as incomplete.
pub const RUN_COMPLETE_GRACE: std::time::Duration = std::time::Duration::from_millis(250);
/// Credential description stored per key in `ServerState::a2ui_route_auth`.
///
/// Serde representation: internally tagged under a `type` key, with variant
/// names rewritten to camelCase.
// NOTE(review): how each variant is applied to outgoing requests is not
// visible in this file — confirm against the code that consumes the map.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum A2aRouteAuth {
/// No credential.
None,
/// A single token string (presumably a bearer token, per the variant name).
Bearer { token: String },
/// A name/value pair (presumably an HTTP header, per the variant name).
Header { name: String, value: String },
}
/// Shared state for one WebSocket connection: the outbound sink plus the
/// bookkeeping needed to match tool-callback responses to their requests.
pub struct WsChannel {
/// Outbound message sink, guarded by an async mutex so multiple tasks can send.
pub write: Mutex<WsSink>,
/// In-flight tool callbacks keyed by request id; `WsToolExecutor` inserts a
/// oneshot sender here and waits on the matching receiver.
pub pending: Mutex<HashMap<String, oneshot::Sender<ToolExecuteResponse>>>,
/// Counter consumed by `next_request_id`.
pub next_id: AtomicU64,
}
impl WsChannel {
    /// Allocates a fresh callback request id.
    ///
    /// Atomically post-increments `next_id` and renders the value that was
    /// read as `cb-<n>`.
    pub fn next_request_id(&self) -> String {
        let seq = self.next_id.fetch_add(1, Ordering::SeqCst);
        format!("cb-{seq}")
    }

    /// Test-only constructor: a channel backed by a draining sink, with an
    /// empty pending map and the id counter at zero.
    #[cfg(test)]
    pub fn test_stub() -> Self {
        use futures::sink::SinkExt;
        let write: WsSink = Box::pin(
            futures::sink::drain()
                .sink_map_err(|_| tokio_tungstenite::tungstenite::Error::ConnectionClosed),
        );
        Self {
            write: Mutex::new(write),
            pending: Mutex::new(HashMap::new()),
            next_id: AtomicU64::new(0),
        }
    }
}
/// Record stored in `ServerState::chat_sessions`.
///
/// `remove_session` drops an entry when its `host_client_id` matches the
/// disconnecting client or its `agent_id` matches that client's bound agent.
#[derive(Debug, Clone)]
pub struct ChatSession {
/// Agent this chat is associated with.
pub agent_id: String,
/// Client id of the host side of the chat.
pub host_client_id: String,
/// Creation timestamp. NOTE(review): unit/epoch not visible here — confirm.
pub created_at: u64,
}
/// In-memory state for one run, stored in `ServerState::runs` keyed by `run_id`.
#[derive(Debug, Clone)]
pub struct RunMeta {
/// Key under which this run is stored.
pub run_id: String,
/// Agent the run belongs to; copied into every fanned-out trace event.
pub agent_id: String,
/// Client that owns the run; `sweep_runs_for_disconnect` matches on this.
pub client_id: String,
/// Copied into the `RunStarted` record by `start_run`.
pub intent: String,
/// Copied into the `RunStarted` record by `start_run`.
pub outcome_description: Option<String>,
/// Copied into the `RunStarted` record by `start_run`.
pub started_at: chrono::DateTime<chrono::Utc>,
/// `None` while the run is live; set once by `complete_run` or
/// `mark_run_incomplete`.
pub termination: Option<car_proto::RunTermination>,
/// Set together with `termination`.
pub ended_at: Option<chrono::DateTime<chrono::Utc>>,
/// Records appended by `record_run_turns`; emptied once the run is terminal.
pub turns: Vec<car_proto::RunRecord>,
}
/// Upper bound enforced by `record_run_turns`: a batch is refused when the
/// run's stored record count plus the batch's `Turn` records would exceed it.
pub const RECORD_TURNS_RUN_CEILING: usize = 2000;
/// Result of `ServerState::record_run_turns`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordRunTurnsOutcome {
/// The batch was stored; `new_total` is the run's record count afterwards.
Appended { new_total: usize },
/// The batch would have pushed the run past `RECORD_TURNS_RUN_CEILING`.
RefusedCeiling,
/// No run with that id exists, or the run already has a termination.
UnknownOrTerminal,
}
impl RunMeta {
pub fn is_terminal(&self) -> bool {
self.termination.is_some()
}
pub fn live_status(&self) -> car_proto::RunLiveStatus {
match &self.termination {
None => car_proto::RunLiveStatus::InProgress,
Some(car_proto::RunTermination::Outcome { .. }) => {
car_proto::RunLiveStatus::Completed
}
Some(car_proto::RunTermination::Incomplete) => car_proto::RunLiveStatus::Incomplete,
}
}
pub fn turn_cursor(&self) -> usize {
self.turns.len()
}
}
/// Fallback callback wait (milliseconds) when neither the action nor the
/// `CAR_TOOL_TIMEOUT` environment variable supplies one.
pub const DEFAULT_TOOL_TIMEOUT_MS: u64 = 300_000;
/// Extra slack (milliseconds) added on top of an action's own timeout budget.
const TOOL_TIMEOUT_GRACE_MS: u64 = 5_000;

/// Computes how long to wait for a tool callback response.
///
/// * `Some(budget)` — the action's budget in milliseconds plus
///   `TOOL_TIMEOUT_GRACE_MS`, saturating at `u64::MAX`.
/// * `None` — `CAR_TOOL_TIMEOUT` parsed as whole seconds (surrounding
///   whitespace ignored); if unset or unparsable, `DEFAULT_TOOL_TIMEOUT_MS`.
fn tool_callback_timeout(action_timeout_ms: Option<u64>) -> std::time::Duration {
    let millis = if let Some(budget) = action_timeout_ms {
        budget.saturating_add(TOOL_TIMEOUT_GRACE_MS)
    } else {
        match std::env::var("CAR_TOOL_TIMEOUT") {
            Ok(raw) => match raw.trim().parse::<u64>() {
                Ok(secs) => secs.saturating_mul(1000),
                Err(_) => DEFAULT_TOOL_TIMEOUT_MS,
            },
            Err(_) => DEFAULT_TOOL_TIMEOUT_MS,
        }
    };
    std::time::Duration::from_millis(millis)
}
/// `ToolExecutor` that forwards tool calls to the connected client as
/// `tools.execute` JSON-RPC requests over its WebSocket channel.
pub struct WsToolExecutor {
/// Connection used to send requests and to register pending callbacks.
pub channel: Arc<WsChannel>,
}
#[async_trait::async_trait]
impl ToolExecutor for WsToolExecutor {
    /// Executes `tool` with an empty action id and no explicit timeout.
    async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
        self.execute_with_action(tool, params, "", None).await
    }

    /// Sends a `tools.execute` JSON-RPC request to the client and waits for
    /// the matching response.
    ///
    /// The wait is bounded by `tool_callback_timeout(timeout_ms)`. On timeout
    /// the pending entry is removed and, if this call was the one that removed
    /// it, a `tools.cancel` notification is sent in the background.
    ///
    /// # Errors
    /// Returns `Err` with a message when the request cannot be serialized or
    /// sent, when the callback channel is closed before a response arrives,
    /// when the wait times out, or when the response carries an `error`.
    async fn execute_with_action(
        &self,
        tool: &str,
        params: &Value,
        action_id: &str,
        timeout_ms: Option<u64>,
    ) -> Result<Value, String> {
        use futures::SinkExt;

        let request_id = self.channel.next_request_id();
        let callback = ToolExecuteRequest {
            action_id: action_id.to_string(),
            tool: tool.to_string(),
            parameters: params.clone(),
            timeout_ms,
            attempt: 1,
            request_id: request_id.clone(),
        };
        let rpc_request = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "tools.execute",
            "params": callback,
            "id": request_id,
        });
        // Serialize before registering the callback so a serialization failure
        // cannot leave a stale entry in `pending`.
        let text = serde_json::to_string(&rpc_request).map_err(|e| e.to_string())?;
        let msg = Message::Text(text.into());

        // Register the callback before sending so a fast response always finds
        // its sender.
        let (tx, rx) = oneshot::channel();
        self.channel
            .pending
            .lock()
            .await
            .insert(request_id.clone(), tx);

        let sent = self
            .channel
            .write
            .lock()
            .await
            .send(msg)
            .await
            .map_err(|e| format!("failed to send tool callback: {}", e));
        if let Err(reason) = sent {
            // Nobody will ever answer this request; drop the registration
            // instead of leaking it.
            self.channel.pending.lock().await.remove(&request_id);
            return Err(reason);
        }

        let wait = tool_callback_timeout(timeout_ms);
        let response = match tokio::time::timeout(wait, rx).await {
            Ok(inner) => {
                inner.map_err(|_| format!("tool '{}' callback channel closed", tool))?
            }
            Err(_) => {
                let reason = format!(
                    "tool '{}' callback timed out ({}s)",
                    tool,
                    wait.as_secs()
                );
                // Only cancel when we removed the entry ourselves; otherwise a
                // response is already being delivered.
                let claimed = self
                    .channel
                    .pending
                    .lock()
                    .await
                    .remove(&request_id)
                    .is_some();
                if claimed {
                    self.spawn_tool_cancel(
                        request_id.clone(),
                        action_id.to_string(),
                        reason.clone(),
                    );
                }
                return Err(reason);
            }
        };

        if let Some(err) = response.error {
            Err(err)
        } else {
            Ok(response.output.unwrap_or(Value::Null))
        }
    }
}
impl WsToolExecutor {
    /// Fire-and-forget `tools.cancel` notification for a timed-out callback.
    ///
    /// Runs on a spawned task so the caller never waits on the write lock;
    /// serialization and send failures are deliberately ignored (best effort).
    fn spawn_tool_cancel(&self, request_id: String, action_id: String, reason: String) {
        use futures::SinkExt;
        let channel = self.channel.clone();
        tokio::spawn(async move {
            let cancel = ToolCancelRequest {
                request_id,
                action_id,
                reason,
            };
            // No "id" member: this is a JSON-RPC notification, no reply expected.
            let notification = serde_json::json!({
                "jsonrpc": "2.0",
                "method": "tools.cancel",
                "params": cancel,
            });
            let Ok(text) = serde_json::to_string(&notification) else {
                return;
            };
            let _ = channel
                .write
                .lock()
                .await
                .send(Message::Text(text.into()))
                .await;
        });
    }
}
/// `VoiceEventSink` that forwards voice events to a client over its
/// WebSocket channel.
pub struct WsVoiceEventSink {
/// Connection the events are written to.
pub channel: Arc<WsChannel>,
}
impl car_voice::VoiceEventSink for WsVoiceEventSink {
    /// Forwards one voice event as a `voice.event` JSON-RPC notification.
    ///
    /// `event_json` is embedded as parsed JSON when it parses, otherwise as a
    /// plain JSON string. The send happens on a spawned task and failures are
    /// ignored (best effort).
    fn send(&self, session_id: &str, event_json: String) {
        use futures::SinkExt;
        let channel = self.channel.clone();
        let session_id = session_id.to_string();
        tokio::spawn(async move {
            // Fall back to the raw text when the payload is not valid JSON.
            let payload: Value = serde_json::from_str(&event_json)
                .unwrap_or_else(|_| Value::String(event_json));
            let notification = serde_json::json!({
                "jsonrpc": "2.0",
                "method": "voice.event",
                "params": {
                    "session_id": session_id,
                    "event": payload,
                },
            });
            let Ok(text) = serde_json::to_string(&notification) else {
                return;
            };
            let _ = channel
                .write
                .lock()
                .await
                .send(Message::Text(text.into()))
                .await;
        });
    }

    /// Forwards a raw frame as a WebSocket binary message on a spawned task;
    /// send failures are ignored (best effort).
    fn send_binary(&self, frame: Vec<u8>) {
        use futures::SinkExt;
        let channel = self.channel.clone();
        tokio::spawn(async move {
            let _ = channel
                .write
                .lock()
                .await
                .send(Message::Binary(frame.into()))
                .await;
        });
    }
}
/// `VoiceEventSink` decorator: ingests extractable transcripts into a memgine
/// engine, then forwards every event unchanged to `upstream`.
pub struct WsMemgineIngestSink {
/// Meeting id passed to `car_meeting::extract_transcript_for_ingest`.
pub meeting_id: String,
/// Engine that receives ingested conversation lines.
pub engine: Arc<Mutex<car_memgine::MemgineEngine>>,
/// Sink that receives every event after the ingest step.
pub upstream: Arc<dyn car_voice::VoiceEventSink>,
}
impl car_voice::VoiceEventSink for WsMemgineIngestSink {
fn send(&self, voice_session_id: &str, event_json: String) {
if let Ok(value) = serde_json::from_str::<Value>(&event_json) {
if let Some((speaker, text)) = car_meeting::extract_transcript_for_ingest(
&value,
&self.meeting_id,
voice_session_id,
) {
let engine = self.engine.clone();
tokio::spawn(async move {
let mut guard = engine.lock().await;
guard.ingest_conversation(&speaker, &text, chrono::Utc::now());
});
}
}
self.upstream.send(voice_session_id, event_json);
}
}
/// Per-connection state created by `ServerState::create_session`.
pub struct ClientSession {
/// Id the session is registered under in `ServerState::sessions`.
pub client_id: String,
/// Runtime built for this client (own event log and tool executor).
pub runtime: Arc<Runtime>,
/// WebSocket channel of this client.
pub channel: Arc<WsChannel>,
/// Shared host state, cloned from `ServerState::host`.
pub host: Arc<crate::host::HostState>,
/// Default memgine: the server's shared engine when configured, otherwise a
/// fresh engine private to this session.
pub memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
/// Browser session slot; starts out freshly constructed.
pub browser: car_ffi_common::browser::BrowserSessionSlot,
/// Starts `false`. NOTE(review): set elsewhere — presumably after auth.
pub authenticated: std::sync::atomic::AtomicBool,
/// Starts `false`. NOTE(review): set elsewhere — presumably for host clients.
pub is_host: std::sync::atomic::AtomicBool,
/// Agent bound to this session, if any; consulted by `remove_session`.
pub agent_id: tokio::sync::Mutex<Option<String>>,
/// Overrides `memgine` in `effective_memgine` when set.
pub bound_memgine: tokio::sync::Mutex<Option<Arc<Mutex<car_memgine::MemgineEngine>>>>,
/// Starts `None`. NOTE(review): maintained elsewhere — confirm semantics.
pub current_run_id: tokio::sync::Mutex<Option<String>>,
}
impl ClientSession {
    /// Returns the memgine this session should use: the bound engine when one
    /// is set, otherwise the session's default `memgine`.
    pub async fn effective_memgine(&self) -> Arc<Mutex<car_memgine::MemgineEngine>> {
        let bound = self.bound_memgine.lock().await;
        match bound.as_ref() {
            Some(engine) => Arc::clone(engine),
            None => Arc::clone(&self.memgine),
        }
    }
}
/// Policy describing which RPC methods need approval before they run.
#[derive(Debug, Clone)]
pub struct ApprovalGate {
    /// Master switch; when `false` nothing requires approval.
    pub enabled: bool,
    /// Exact method names that are gated.
    pub methods: std::collections::HashSet<String>,
    /// Timeout associated with the gate (60 s in both built-in constructors).
    pub timeout: std::time::Duration,
}

impl Default for ApprovalGate {
    /// Enabled gate covering the built-in list of gated methods, with a
    /// 60-second timeout.
    fn default() -> Self {
        const GATED_METHODS: [&str; 5] = [
            "automation.run_applescript",
            "automation.shortcuts.run",
            "messages.send",
            "mail.send",
            "vision.ocr",
        ];
        Self {
            enabled: true,
            methods: GATED_METHODS.iter().copied().map(String::from).collect(),
            timeout: std::time::Duration::from_secs(60),
        }
    }
}

impl ApprovalGate {
    /// Gate that never asks for approval: disabled, with an empty method set.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            methods: std::collections::HashSet::new(),
            timeout: std::time::Duration::from_secs(60),
        }
    }

    /// `true` only when the gate is enabled and `method` is in the gated set.
    pub fn requires_approval(&self, method: &str) -> bool {
        if !self.enabled {
            return false;
        }
        self.methods.contains(method)
    }
}
/// Inputs consumed by `ServerState::with_config`. Everything except
/// `journal_dir` is optional.
pub struct ServerStateConfig {
/// Directory for per-client journals; also passed to the run store.
pub journal_dir: PathBuf,
/// Engine shared by all sessions; when `None` each session gets its own.
pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
/// Pre-built inference engine used to seed `ServerState::inference`.
pub inference: Option<Arc<car_inference::InferenceEngine>>,
/// Runtime for the A2A dispatcher; a default one is built when `None`.
pub a2a_runtime: Option<Arc<car_engine::Runtime>>,
/// Task store for the A2A dispatcher; in-memory store when `None`.
pub a2a_store: Option<Arc<dyn car_a2a::TaskStore>>,
/// Agent-card source for the A2A dispatcher; default card when `None`.
pub a2a_card_source: Option<Arc<car_a2a::AgentCardSource>>,
/// Approval policy; `ApprovalGate::default()` when `None`.
pub approval_gate: Option<ApprovalGate>,
}
impl ServerStateConfig {
    /// Starts a configuration for `journal_dir` with every optional component
    /// left unset.
    pub fn new(journal_dir: PathBuf) -> Self {
        Self {
            journal_dir,
            shared_memgine: None,
            inference: None,
            a2a_runtime: None,
            a2a_store: None,
            a2a_card_source: None,
            approval_gate: None,
        }
    }

    /// Sets the memgine engine shared by all sessions.
    pub fn with_shared_memgine(self, engine: Arc<Mutex<car_memgine::MemgineEngine>>) -> Self {
        Self {
            shared_memgine: Some(engine),
            ..self
        }
    }

    /// Sets the pre-built inference engine.
    pub fn with_inference(self, engine: Arc<car_inference::InferenceEngine>) -> Self {
        Self {
            inference: Some(engine),
            ..self
        }
    }

    /// Sets the runtime used by the A2A dispatcher.
    pub fn with_a2a_runtime(self, runtime: Arc<car_engine::Runtime>) -> Self {
        Self {
            a2a_runtime: Some(runtime),
            ..self
        }
    }

    /// Sets the task store used by the A2A dispatcher.
    pub fn with_a2a_store(self, store: Arc<dyn car_a2a::TaskStore>) -> Self {
        Self {
            a2a_store: Some(store),
            ..self
        }
    }

    /// Sets the agent-card source used by the A2A dispatcher.
    pub fn with_a2a_card_source(self, source: Arc<car_a2a::AgentCardSource>) -> Self {
        Self {
            a2a_card_source: Some(source),
            ..self
        }
    }

    /// Sets the approval policy.
    pub fn with_approval_gate(self, gate: ApprovalGate) -> Self {
        Self {
            approval_gate: Some(gate),
            ..self
        }
    }
}
/// Process-wide server state shared by every connection.
///
/// Built by `ServerState::with_config`. Fields wrapped in `OnceLock` are
/// installed or lazily initialised at most once; fields wrapped in `Mutex`
/// are mutable registries.
pub struct ServerState {
/// Directory holding per-client journals (`<client_id>.jsonl`).
pub journal_dir: PathBuf,
/// Live sessions keyed by client id.
pub sessions: Mutex<HashMap<String, Arc<ClientSession>>>,
/// Inference engine; pre-seeded from the config when one was supplied.
pub inference: std::sync::OnceLock<Arc<car_inference::InferenceEngine>>,
pub host: Arc<crate::host::HostState>,
/// Engine shared by all sessions when set (see `create_session`).
pub shared_memgine: Option<Arc<Mutex<car_memgine::MemgineEngine>>>,
/// Voice session registry; its sweeper is started in `with_config`.
pub voice_sessions: Arc<car_voice::VoiceSessionRegistry>,
pub meetings: Arc<car_meeting::MeetingRegistry>,
pub a2ui: car_a2ui::A2uiSurfaceStore,
pub ui_agent: Arc<car_ui_agent::UIImprovementAgent>,
pub ui_agent_oscillation: Arc<crate::ui_agent_loop::OscillationDetector>,
pub ui_agent_budget: Arc<crate::ui_agent_loop::IterationBudget>,
pub admission: Arc<crate::admission::InferenceAdmission>,
pub a2ui_route_auth: Mutex<HashMap<String, A2aRouteAuth>>,
/// Lazily created by `supervisor()` or installed via `install_supervisor`.
pub supervisor: std::sync::OnceLock<Arc<car_registry::supervisor::Supervisor>>,
/// Lazily created by `declagents()`.
pub declagents: std::sync::OnceLock<Arc<car_registry::declarative::DeclRegistry>>,
/// Lazily created by `routing()`.
pub routing: std::sync::OnceLock<Arc<car_registry::routing::RoutingStore>>,
/// When set, `supervisor()` refuses to lazily create a supervisor.
pub observer_manifest_path: std::sync::OnceLock<PathBuf>,
/// Lazily created by `a2a_dispatcher()`.
pub a2a_dispatcher: std::sync::OnceLock<Arc<car_a2a::A2aDispatcher>>,
pub a2ui_subscribers: Mutex<HashMap<String, Arc<WsChannel>>>,
/// MCP executor shared with every session's runtime and the connector manager.
pub mcp_executor: Arc<car_engine::McpToolExecutor>,
/// Lazily created by `connectors()`.
pub connectors: std::sync::OnceLock<Arc<car_connectors::ConnectorManager>>,
/// Flipped by `ensure_connectors_loaded` so the load is attempted only once.
connectors_loaded: std::sync::atomic::AtomicBool,
pub auth_token: std::sync::OnceLock<String>,
pub host_token: std::sync::OnceLock<String>,
pub parslee_session: std::sync::OnceLock<crate::parslee_auth::ParsleeSession>,
/// Agent id -> client id; `remove_session` clears entries owned by the
/// departing client.
pub attached_agents: Mutex<HashMap<String, String>>,
pub agent_memgines: Mutex<HashMap<String, Arc<Mutex<car_memgine::MemgineEngine>>>>,
pub coder_sessions: Mutex<crate::coder::rpc::CoderSessionMap>,
pub coder_subscribers: Mutex<HashMap<(String, String), Arc<WsChannel>>>,
pub chat_sessions: Mutex<HashMap<String, ChatSession>>,
/// Run metadata keyed by run id.
pub runs: Mutex<HashMap<String, RunMeta>>,
/// Run-trace subscribers keyed by `(run_id, host_client_id)`.
pub run_subscribers: Mutex<HashMap<(String, String), crate::host::RunTraceSubscriber>>,
/// Per-run mutexes handed out by `run_write_lock`; held while turns are
/// appended to the run store and removed once a run is terminal.
run_write_locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
/// On-disk run persistence rooted at the journal directory.
pub run_store: crate::run_store::RunStore,
pub mcp_url: std::sync::OnceLock<String>,
pub mcp_sessions: std::sync::OnceLock<Arc<crate::mcp::SessionMap>>,
/// Approval policy; defaults to `ApprovalGate::default()`.
pub approval_gate: ApprovalGate,
/// Config-supplied A2A parts; each is taken (left `None`) the first time
/// `a2a_dispatcher()` builds the dispatcher.
pub(crate) a2a_runtime: std::sync::Mutex<Option<Arc<car_engine::Runtime>>>,
pub(crate) a2a_store: std::sync::Mutex<Option<Arc<dyn car_a2a::TaskStore>>>,
pub(crate) a2a_card_source: std::sync::Mutex<Option<Arc<car_a2a::AgentCardSource>>>,
}
impl ServerState {
/// Builds a server state from a default configuration for `journal_dir`
/// (no shared memgine, no pre-built components).
pub fn standalone(journal_dir: PathBuf) -> Self {
Self::with_config(ServerStateConfig::new(journal_dir))
}
/// Builds a server state whose sessions all share `shared_memgine`; every
/// other component uses the configuration defaults.
pub fn embedded(
journal_dir: PathBuf,
shared_memgine: Arc<Mutex<car_memgine::MemgineEngine>>,
) -> Self {
Self::with_config(ServerStateConfig::new(journal_dir).with_shared_memgine(shared_memgine))
}
/// Builds the server state from `cfg`.
///
/// Besides constructing the fields, this performs start-up side effects:
/// it starts the voice-session sweeper, asks the run store to adopt orphans
/// and garbage-collect, and — when the coder state directory resolves —
/// adopts orphaned coder sessions. The results of those calls are discarded.
pub fn with_config(cfg: ServerStateConfig) -> Self {
// Seed the inference slot only when the embedder supplied an engine.
let inference = std::sync::OnceLock::new();
if let Some(eng) = cfg.inference {
let _ = inference.set(eng);
}
let voice_sessions = Arc::new(car_voice::VoiceSessionRegistry::new());
voice_sessions.start_sweeper();
let ui_agent = Arc::new(car_ui_agent::UIImprovementAgent::with_default_strategies());
let ui_agent_oscillation = Arc::new(crate::ui_agent_loop::OscillationDetector::new());
let ui_agent_budget = Arc::new(crate::ui_agent_loop::IterationBudget::new());
// Run-store housekeeping happens once, before any session exists.
let run_store = crate::run_store::RunStore::from_journal_dir(&cfg.journal_dir);
let _adopted = run_store.adopt_orphans();
let _evicted = run_store.gc();
// Coder-session adoption is skipped when the state dir cannot be resolved.
if let Ok(coder_dir) = crate::coder::rpc::coder_state_dir() {
let _coder_adopted = crate::coder::session::adopt_orphaned_sessions(&coder_dir);
}
Self {
journal_dir: cfg.journal_dir,
sessions: Mutex::new(HashMap::new()),
inference,
host: Arc::new(crate::host::HostState::new()),
shared_memgine: cfg.shared_memgine,
voice_sessions,
meetings: Arc::new(car_meeting::MeetingRegistry::new()),
a2ui: car_a2ui::A2uiSurfaceStore::new(),
ui_agent,
ui_agent_oscillation,
ui_agent_budget,
admission: Arc::new(crate::admission::InferenceAdmission::new()),
a2ui_route_auth: Mutex::new(HashMap::new()),
supervisor: std::sync::OnceLock::new(),
declagents: std::sync::OnceLock::new(),
routing: std::sync::OnceLock::new(),
observer_manifest_path: std::sync::OnceLock::new(),
a2a_dispatcher: std::sync::OnceLock::new(),
// Held until `a2a_dispatcher()` first builds the dispatcher.
a2a_runtime: std::sync::Mutex::new(cfg.a2a_runtime),
a2a_store: std::sync::Mutex::new(cfg.a2a_store),
a2a_card_source: std::sync::Mutex::new(cfg.a2a_card_source),
a2ui_subscribers: Mutex::new(HashMap::new()),
mcp_executor: Arc::new(car_engine::McpToolExecutor::new()),
connectors: std::sync::OnceLock::new(),
connectors_loaded: std::sync::atomic::AtomicBool::new(false),
auth_token: std::sync::OnceLock::new(),
host_token: std::sync::OnceLock::new(),
parslee_session: std::sync::OnceLock::new(),
attached_agents: Mutex::new(HashMap::new()),
agent_memgines: Mutex::new(HashMap::new()),
coder_sessions: Mutex::new(HashMap::new()),
coder_subscribers: Mutex::new(HashMap::new()),
chat_sessions: Mutex::new(HashMap::new()),
runs: Mutex::new(HashMap::new()),
run_subscribers: Mutex::new(HashMap::new()),
run_write_locks: Mutex::new(HashMap::new()),
run_store,
mcp_url: std::sync::OnceLock::new(),
mcp_sessions: std::sync::OnceLock::new(),
approval_gate: cfg.approval_gate.unwrap_or_default(),
}
}
/// Installs the auth token. Succeeds only the first time; afterwards the
/// rejected `token` is handed back in `Err`.
pub fn install_auth_token(&self, token: String) -> Result<(), String> {
self.auth_token.set(token)
}
/// Installs the host token. Succeeds only the first time; afterwards the
/// rejected `token` is handed back in `Err`.
pub fn install_host_token(&self, token: String) -> Result<(), String> {
self.host_token.set(token)
}
/// Installs the Parslee session. Succeeds only the first time; afterwards
/// the rejected `session` is handed back in `Err`.
pub fn install_parslee_session(
&self,
session: crate::parslee_auth::ParsleeSession,
) -> Result<(), crate::parslee_auth::ParsleeSession> {
self.parslee_session.set(session)
}
/// Installs the MCP URL. Succeeds only the first time; afterwards the
/// rejected `url` is handed back in `Err`.
pub fn install_mcp_url(&self, url: String) -> Result<(), String> {
self.mcp_url.set(url)
}
/// Installs the MCP session map. Succeeds only the first time; afterwards
/// the rejected `sessions` handle is handed back in `Err`.
pub fn install_mcp_sessions(
&self,
sessions: Arc<crate::mcp::SessionMap>,
) -> Result<(), Arc<crate::mcp::SessionMap>> {
self.mcp_sessions.set(sessions)
}
/// Returns the connector manager, creating it on first use.
///
/// Construction first tries `ConnectorManager::new`; if that fails it falls
/// back to a manager using the relative path `connectors.json`. Both share
/// this server's MCP executor.
pub fn connectors(&self) -> Arc<car_connectors::ConnectorManager> {
    let build = || {
        let manager = match car_connectors::ConnectorManager::new(self.mcp_executor.clone()) {
            Ok(manager) => manager,
            Err(_) => car_connectors::ConnectorManager::with_path(
                self.mcp_executor.clone(),
                std::path::PathBuf::from("connectors.json"),
            ),
        };
        Arc::new(manager)
    };
    Arc::clone(self.connectors.get_or_init(build))
}
/// Loads and connects the configured connectors the first time it is called;
/// every later call returns immediately.
///
/// On success the resulting tool entries are registered on all current
/// sessions; a load failure is logged as a warning and not retried.
pub async fn ensure_connectors_loaded(&self) {
    use std::sync::atomic::Ordering;

    // `swap` returns the previous flag, so only the first caller proceeds.
    let already_attempted = self.connectors_loaded.swap(true, Ordering::SeqCst);
    if already_attempted {
        return;
    }

    let manager = self.connectors();
    let outcome = manager.load_and_connect().await;
    match outcome {
        Err(e) => tracing::warn!("connector load failed: {e}"),
        Ok(entries) => self.register_connector_entries(&entries).await,
    }
}
/// Registers every entry in `entries` on the runtime of each currently
/// connected session. Does nothing for an empty slice.
pub async fn register_connector_entries(&self, entries: &[car_engine::ToolEntry]) {
    if !entries.is_empty() {
        // Snapshot the sessions so the map lock is not held while registering.
        let snapshot: Vec<Arc<ClientSession>> = {
            let sessions = self.sessions.lock().await;
            sessions.values().cloned().collect()
        };
        for session in snapshot {
            for entry in entries.iter().cloned() {
                session.runtime.register_tool_entry(entry).await;
            }
        }
    }
}
/// Unregisters each named tool from the runtime of every currently
/// connected session. Does nothing for an empty slice.
pub async fn unregister_connector_tools(&self, canonical_names: &[String]) {
    if !canonical_names.is_empty() {
        // Snapshot the sessions so the map lock is not held while unregistering.
        let snapshot: Vec<Arc<ClientSession>> = {
            let sessions = self.sessions.lock().await;
            sessions.values().cloned().collect()
        };
        for session in snapshot {
            for tool_name in canonical_names.iter() {
                session.runtime.unregister_tool(tool_name).await;
            }
        }
    }
}
/// Returns the declarative-agent registry, creating the user-default one on
/// first use.
///
/// # Errors
/// Propagates the failure of `DeclRegistry::user_default()`; nothing is
/// cached in that case, so a later call retries.
pub fn declagents(&self) -> Result<Arc<car_registry::declarative::DeclRegistry>, String> {
    match self.declagents.get() {
        Some(existing) => Ok(Arc::clone(existing)),
        None => {
            let fresh = Arc::new(car_registry::declarative::DeclRegistry::user_default()?);
            // If another caller won the race, its value stays and ours is dropped.
            let _ = self.declagents.set(fresh);
            let stored = self.declagents.get().expect("set or pre-existing");
            Ok(Arc::clone(stored))
        }
    }
}
/// Returns the routing store, creating the user-default one on first use.
///
/// # Errors
/// Propagates the failure of `RoutingStore::user_default()`; nothing is
/// cached in that case, so a later call retries.
pub fn routing(&self) -> Result<Arc<car_registry::routing::RoutingStore>, String> {
    match self.routing.get() {
        Some(existing) => Ok(Arc::clone(existing)),
        None => {
            let fresh = Arc::new(car_registry::routing::RoutingStore::user_default()?);
            // If another caller won the race, its value stays and ours is dropped.
            let _ = self.routing.set(fresh);
            let stored = self.routing.get().expect("set or pre-existing");
            Ok(Arc::clone(stored))
        }
    }
}
/// Returns the supervisor, lazily creating the user-default one.
///
/// # Errors
/// * When an observer manifest path has been installed and no supervisor
///   exists, creation is refused with an "observe-only" message naming that
///   path.
/// * Otherwise, the stringified failure of `Supervisor::user_default()`.
pub fn supervisor(&self) -> Result<Arc<car_registry::supervisor::Supervisor>, String> {
    if let Some(existing) = self.supervisor.get() {
        return Ok(Arc::clone(existing));
    }

    match self.observer_manifest_path.get() {
        Some(manifest) => Err(format!(
            "this car-server is observe-only — another car-server process \
             holds the supervisor lock for {}. Mutations refuse here; route \
             them to the primary daemon, or stop the other car-server first.",
            manifest.display()
        )),
        None => {
            let created = match car_registry::supervisor::Supervisor::user_default() {
                Ok(sup) => Arc::new(sup),
                Err(e) => return Err(e.to_string()),
            };
            // If another caller won the race, its value stays and ours is dropped.
            let _ = self.supervisor.set(created);
            let stored = self.supervisor.get().expect("set or pre-existing");
            Ok(Arc::clone(stored))
        }
    }
}
/// Installs an externally built supervisor. Succeeds only when none is set
/// yet; otherwise the rejected handle is returned in `Err`.
pub fn install_supervisor(
&self,
supervisor: Arc<car_registry::supervisor::Supervisor>,
) -> Result<(), Arc<car_registry::supervisor::Supervisor>> {
self.supervisor.set(supervisor)
}
/// Returns the supervisor if one is already set, without creating one.
pub fn supervisor_if_installed(&self) -> Option<Arc<car_registry::supervisor::Supervisor>> {
self.supervisor.get().cloned()
}
/// Records the manifest path that puts this server in observe-only mode
/// (see `supervisor`). Succeeds only the first time; a later call returns
/// the rejected `path` in `Err` and leaves the original in place.
pub fn install_observer_manifest(&self, path: PathBuf) -> Result<(), PathBuf> {
self.observer_manifest_path.set(path)
}
/// Returns the installed observer manifest path, if any.
pub fn observer_manifest_path(&self) -> Option<&PathBuf> {
self.observer_manifest_path.get()
}
/// Returns the A2A dispatcher, building it on first use.
///
/// The runtime, task store and agent-card source supplied through the
/// configuration are each taken out of their slot; any that is missing is
/// replaced by a default (a fresh runtime with agent basics registered, an
/// in-memory task store, a default agent card).
///
/// # Panics
/// Panics if one of the `a2a_*` std mutexes is poisoned.
// NOTE(review): the build awaits between the initial `get()` and the final
// `set()`. Two concurrent first callers could both build; the second would
// find the config slots already emptied and fall back to defaults, and
// whichever `set()` lands first wins. Confirm callers serialise first use.
pub async fn a2a_dispatcher(&self) -> Arc<car_a2a::A2aDispatcher> {
if let Some(d) = self.a2a_dispatcher.get() {
return d.clone();
}
// The std mutex guard is a temporary dropped at the end of this statement,
// so it is not held across the awaits below.
let runtime = self
.a2a_runtime
.lock()
.expect("a2a_runtime mutex poisoned")
.take();
let runtime = match runtime {
Some(r) => r,
None => {
let r = Arc::new(car_engine::Runtime::new());
r.register_agent_basics().await;
r
}
};
let store = self
.a2a_store
.lock()
.expect("a2a_store mutex poisoned")
.take()
.unwrap_or_else(|| Arc::new(car_a2a::InMemoryTaskStore::new()));
let card_source = self
.a2a_card_source
.lock()
.expect("a2a_card_source mutex poisoned")
.take();
let card_source = match card_source {
Some(c) => c,
None => {
// Default card: built once here, then cloned on every request.
let card = car_a2a::build_default_agent_card(
&runtime,
car_a2a::AgentCardConfig::minimal(
"Common Agent Runtime",
"Embedded CAR daemon — A2A v1.0 reachable over WebSocket JSON-RPC.",
"ws://127.0.0.1:9100/",
car_a2a::AgentProvider {
organization: "Parslee".into(),
url: Some("https://github.com/Parslee-ai/car".into()),
},
),
)
.await;
Arc::new(move || card.clone()) as Arc<car_a2a::AgentCardSource>
}
};
let dispatcher = Arc::new(car_a2a::A2aDispatcher::new(runtime, store, card_source));
// If another caller set the slot first, ours is dropped and theirs returned.
let _ = self.a2a_dispatcher.set(dispatcher);
self.a2a_dispatcher
.get()
.expect("a2a_dispatcher set or pre-existing")
.clone()
}
/// Pushes one trace event to every subscriber of `run_id`.
///
/// The caller passes the already-locked subscriber map. A subscriber whose
/// `push` reports failure only produces a debug log line; the event is not
/// retried.
fn fanout_locked(
    subscribers: &HashMap<(String, String), crate::host::RunTraceSubscriber>,
    run_id: &str,
    agent_id: &str,
    record: car_proto::RunRecord,
    cursor: usize,
    status: car_proto::RunLiveStatus,
) {
    let interested = subscribers
        .iter()
        .filter(|((sub_run, _), _)| sub_run.as_str() == run_id)
        .map(|(_, sub)| sub);

    for sub in interested {
        let delivered = sub.push(car_proto::RunTraceEvent {
            run_id: run_id.to_string(),
            agent_id: agent_id.to_string(),
            record: record.clone(),
            cursor,
            status,
        });
        if !delivered {
            tracing::debug!(
                run_id,
                "run-trace: dropped event for slow subscriber (channel full)"
            );
        }
    }
}
/// Registers a new run, notifies subscribers, and persists its start record.
///
/// `meta` is inserted into `runs` under its `run_id` (replacing any existing
/// entry with that id). A persistence failure is logged and otherwise ignored.
pub async fn start_run(&self, meta: RunMeta) {
// Build the record first: `meta` is moved into the map below.
let started = car_proto::RunStarted {
run_id: meta.run_id.clone(),
agent_id: meta.agent_id.clone(),
intent: meta.intent.clone(),
outcome_description: meta.outcome_description.clone(),
started_at: meta.started_at,
};
{
// Lock order: `runs` first, then `run_subscribers`.
let mut runs = self.runs.lock().await;
runs.insert(meta.run_id.clone(), meta);
let subs = self.run_subscribers.lock().await;
Self::fanout_locked(
&subs,
&started.run_id,
&started.agent_id,
car_proto::RunRecord::Started(started.clone()),
0,
car_proto::RunLiveStatus::InProgress,
);
}
// Disk write happens after both locks are released.
if let Err(e) = self.run_store.write_started(&started) {
tracing::warn!(run_id = %started.run_id, error = %e, "run-store: failed to persist RunStarted");
}
}
/// Records `termination` for a live run, notifies subscribers, persists the
/// end record, and releases the run's in-memory turns.
///
/// # Errors
/// * `run_id` is not known.
/// * The run already has a termination.
///
/// A persistence failure is logged and does not make the call fail.
pub async fn complete_run(
&self,
run_id: &str,
termination: car_proto::RunTermination,
) -> Result<(), String> {
let ended = {
// Lock order: `runs` first, then `run_subscribers`.
let mut runs = self.runs.lock().await;
let meta = runs
.get_mut(run_id)
.ok_or_else(|| format!("unknown run_id `{run_id}`"))?;
if meta.is_terminal() {
return Err(format!(
"run `{run_id}` is already terminal — cannot record another outcome"
));
}
let ended_at = chrono::Utc::now();
meta.termination = Some(termination.clone());
meta.ended_at = Some(ended_at);
let ended = car_proto::RunEnded {
run_id: run_id.to_string(),
agent_id: meta.agent_id.clone(),
termination,
ended_at,
};
// Status is computed after `termination` was set, so it is terminal.
let cursor = meta.turn_cursor();
let status = meta.live_status();
let agent_id = meta.agent_id.clone();
let subs = self.run_subscribers.lock().await;
Self::fanout_locked(
&subs,
run_id,
&agent_id,
car_proto::RunRecord::Ended(ended.clone()),
cursor,
status,
);
ended
};
// Disk write happens after both locks are released.
if let Err(e) = self.run_store.write_ended(&ended) {
tracing::warn!(run_id, error = %e, "run-store: failed to persist RunEnded");
}
self.clear_terminal_run_turns(run_id).await;
Ok(())
}
/// Terminates a live run as `Incomplete`.
///
/// Returns `true` when the run existed and was still live; `false` when it
/// is unknown or already terminal (in which case nothing changes). A
/// persistence failure is logged and still yields `true`.
pub async fn mark_run_incomplete(&self, run_id: &str) -> bool {
let ended = {
// Lock order: `runs` first, then `run_subscribers`.
let mut runs = self.runs.lock().await;
match runs.get_mut(run_id) {
Some(meta) if !meta.is_terminal() => {
let ended_at = chrono::Utc::now();
meta.termination = Some(car_proto::RunTermination::Incomplete);
meta.ended_at = Some(ended_at);
let ended = car_proto::RunEnded {
run_id: run_id.to_string(),
agent_id: meta.agent_id.clone(),
termination: car_proto::RunTermination::Incomplete,
ended_at,
};
let cursor = meta.turn_cursor();
let agent_id = meta.agent_id.clone();
let subs = self.run_subscribers.lock().await;
Self::fanout_locked(
&subs,
run_id,
&agent_id,
car_proto::RunRecord::Ended(ended.clone()),
cursor,
car_proto::RunLiveStatus::Incomplete,
);
Some(ended)
}
_ => None,
}
};
match ended {
Some(ended) => {
// Disk write happens after both locks are released.
if let Err(e) = self.run_store.write_ended(&ended) {
tracing::warn!(run_id, error = %e, "run-store: failed to persist Incomplete");
}
self.clear_terminal_run_turns(run_id).await;
true
}
None => false,
}
}
/// Frees per-run state that is no longer needed once a run is terminal:
/// replaces the in-memory turns with an empty vector (only if the run is
/// terminal) and removes the run's write lock entry (unconditionally).
async fn clear_terminal_run_turns(&self, run_id: &str) {
    {
        let mut runs = self.runs.lock().await;
        let terminal_meta = runs.get_mut(run_id).filter(|meta| meta.is_terminal());
        if let Some(meta) = terminal_meta {
            meta.turns = Vec::new();
        }
    }
    // The `runs` guard is released before touching the write-lock map.
    let mut write_locks = self.run_write_locks.lock().await;
    write_locks.remove(run_id);
}
/// Returns a clone of the run's metadata, or `None` for an unknown run id.
pub async fn run_meta(&self, run_id: &str) -> Option<RunMeta> {
self.runs.lock().await.get(run_id).cloned()
}
/// Returns `(agent_id, is_terminal, stored_record_count)` for the run, or
/// `None` for an unknown run id.
pub async fn run_header(&self, run_id: &str) -> Option<(String, bool, usize)> {
    let runs = self.runs.lock().await;
    let meta = runs.get(run_id)?;
    Some((meta.agent_id.clone(), meta.is_terminal(), meta.turns.len()))
}
/// Appends `records` to a live run, fans them out to subscribers, and
/// persists them.
///
/// Returns:
/// * `Appended { new_total }` — records stored; `new_total` is the run's
///   in-memory record count afterwards. A persistence failure is only logged.
/// * `RefusedCeiling` — stored count plus incoming `Turn` records would
///   exceed `RECORD_TURNS_RUN_CEILING`; nothing is stored.
/// * `UnknownOrTerminal` — no such run, or it already terminated.
pub async fn record_run_turns(
&self,
run_id: &str,
mut records: Vec<car_proto::RunRecord>,
) -> RecordRunTurnsOutcome {
// Decision made under the `runs` lock, acted on after it is released.
enum Locked {
Persist(String, Vec<car_proto::RunRecord>, usize),
RefusedCeiling,
UnknownOrTerminal,
}
let locked = {
let mut runs = self.runs.lock().await;
match runs.get_mut(run_id) {
Some(meta) if !meta.is_terminal() => {
// NOTE(review): only `Turn` records count toward the ceiling, but every
// record in the batch is appended below — confirm that is intended.
let incoming_turns = records
.iter()
.filter(|r| matches!(r, car_proto::RunRecord::Turn(_)))
.count();
if meta.turns.len() + incoming_turns > RECORD_TURNS_RUN_CEILING {
Locked::RefusedCeiling
} else {
let base = meta.turns.len();
// Server assigns indices, overwriting whatever the caller sent.
// NOTE(review): `offset` advances for non-`Turn` records too, so a mixed
// batch would leave gaps in turn indices — confirm batches are Turn-only.
for (offset, record) in records.iter_mut().enumerate() {
if let car_proto::RunRecord::Turn(turn) = record {
turn.index = base + offset;
}
}
meta.turns.extend(records.iter().cloned());
let len = meta.turns.len();
let agent_id = meta.agent_id.clone();
let status = meta.live_status();
// Lock order: `runs` first, then `run_subscribers`.
let subs = self.run_subscribers.lock().await;
for (offset, record) in records.iter().enumerate() {
// Cursor is the record count after this record has been appended.
Self::fanout_locked(
&subs,
run_id,
&agent_id,
record.clone(),
base + offset + 1,
status,
);
}
drop(subs);
Locked::Persist(agent_id, records, len)
}
}
_ => Locked::UnknownOrTerminal,
}
};
match locked {
Locked::Persist(agent_id, records, len) => {
// Serialise disk appends per run; the `runs` lock is no longer held.
let write_guard = self.run_write_lock(run_id).await;
let _held = write_guard.lock().await;
if let Err(e) = self.run_store.append_turns(&agent_id, run_id, &records) {
tracing::warn!(run_id, error = %e, "run-store: failed to persist turns");
}
RecordRunTurnsOutcome::Appended { new_total: len }
}
Locked::RefusedCeiling => RecordRunTurnsOutcome::RefusedCeiling,
Locked::UnknownOrTerminal => RecordRunTurnsOutcome::UnknownOrTerminal,
}
}
/// Returns the write mutex for `run_id`, creating and registering it when
/// the run has none yet.
async fn run_write_lock(&self, run_id: &str) -> Arc<Mutex<()>> {
    let mut write_locks = self.run_write_locks.lock().await;
    let slot = write_locks
        .entry(run_id.to_string())
        .or_insert_with(|| Arc::new(Mutex::new(())));
    Arc::clone(slot)
}
/// Number of records currently held in memory for the run; `0` when the run
/// id is unknown.
pub async fn run_turn_count(&self, run_id: &str) -> usize {
    let runs = self.runs.lock().await;
    match runs.get(run_id) {
        Some(meta) => meta.turns.len(),
        None => 0,
    }
}
/// Clone of the records currently held in memory for the run; empty when the
/// run id is unknown.
pub async fn run_turns(&self, run_id: &str) -> Vec<car_proto::RunRecord> {
    let runs = self.runs.lock().await;
    match runs.get(run_id) {
        Some(meta) => meta.turns.clone(),
        None => Vec::new(),
    }
}
/// Subscribes `host_client_id` to the trace of `run_id` and returns a
/// snapshot of the records so far.
///
/// Returns `None` when the run id is unknown. An existing subscription for
/// the same `(run_id, host_client_id)` pair is replaced. For a terminal run
/// whose in-memory records are empty, the snapshot is read from the run
/// store instead, keeping only `Turn` records.
// NOTE(review): the subscriber is spawned before the run is looked up, so an
// unknown run id still spawns (and then drops) one — confirm that is cheap.
pub async fn subscribe_run(
&self,
run_id: &str,
host_client_id: &str,
channel: Arc<WsChannel>,
) -> Option<car_proto::RunSubscribeResponse> {
let subscriber = crate::host::RunTraceSubscriber::spawn(
host_client_id.to_string(),
channel,
);
// Snapshot and registration both happen while `runs` is locked, the same
// lock the recording paths hold while fanning out.
let runs = self.runs.lock().await;
let meta = runs.get(run_id)?;
let mut snapshot = meta.turns.clone();
let agent_id = meta.agent_id.clone();
let status = meta.live_status();
let needs_disk_snapshot = meta.is_terminal() && snapshot.is_empty();
{
let mut subs = self.run_subscribers.lock().await;
subs.insert((run_id.to_string(), host_client_id.to_string()), subscriber);
}
drop(runs);
// Disk read happens after the locks are released.
if needs_disk_snapshot {
if let Some(records) = self.run_store.get_run_trace(run_id) {
snapshot = records
.into_iter()
.filter(|r| matches!(r, car_proto::RunRecord::Turn(_)))
.collect();
}
}
let cursor = snapshot.len();
Some(car_proto::RunSubscribeResponse {
run_id: run_id.to_string(),
agent_id,
turns_so_far: snapshot,
cursor,
status,
})
}
/// Removes the `(run_id, host_client_id)` subscription. Returns `true` when
/// one existed.
pub async fn unsubscribe_run(&self, run_id: &str, host_client_id: &str) -> bool {
    let key = (run_id.to_string(), host_client_id.to_string());
    let mut subscribers = self.run_subscribers.lock().await;
    subscribers.remove(&key).is_some()
}
/// Removes every run subscription owned by `host_client_id`, across all runs.
pub async fn drop_run_subscribers_for_client(&self, host_client_id: &str) {
    let mut subscribers = self.run_subscribers.lock().await;
    subscribers.retain(|key, _| key.1.as_str() != host_client_id);
}
/// Marks the disconnected client's still-live runs as incomplete.
///
/// The live runs are collected first; if there are any, the method sleeps
/// for `RUN_COMPLETE_GRACE` and then calls `mark_run_incomplete` on each
/// (which leaves runs that terminated in the meantime untouched).
async fn sweep_runs_for_disconnect(&self, client_id: &str) {
    let mut open_runs: Vec<String> = Vec::new();
    {
        let runs = self.runs.lock().await;
        for meta in runs.values() {
            if meta.client_id == client_id && !meta.is_terminal() {
                open_runs.push(meta.run_id.clone());
            }
        }
    }

    if !open_runs.is_empty() {
        tokio::time::sleep(RUN_COMPLETE_GRACE).await;
        for run_id in open_runs {
            self.mark_run_incomplete(&run_id).await;
        }
    }
}
/// Creates and registers the session for a newly connected client.
///
/// The session gets its own runtime (journaled to
/// `<journal_dir>/<client_id>.jsonl`) whose executor is the shared MCP
/// executor with a WebSocket tool executor as fallback. An existing session
/// under the same `client_id` is replaced in the map.
pub async fn create_session(
&self,
client_id: &str,
channel: Arc<WsChannel>,
) -> Arc<ClientSession> {
let journal_path = self.journal_dir.join(format!("{}.jsonl", client_id));
let event_log = EventLog::with_journal(journal_path);
let ws_executor = Arc::new(WsToolExecutor {
channel: channel.clone(),
});
let executor: Arc<dyn ToolExecutor> =
Arc::new(self.mcp_executor.share_with_fallback(ws_executor));
let runtime = Runtime::new()
.with_event_log(event_log)
.with_executor(executor);
// Shared engine when the server was configured with one, otherwise private.
let memgine = match &self.shared_memgine {
Some(eng) => eng.clone(),
None => Arc::new(Mutex::new(car_memgine::MemgineEngine::new(None))),
};
let session = Arc::new(ClientSession {
client_id: client_id.to_string(),
runtime: Arc::new(runtime),
channel,
host: self.host.clone(),
memgine,
browser: car_ffi_common::browser::BrowserSessionSlot::new(),
authenticated: std::sync::atomic::AtomicBool::new(false),
is_host: std::sync::atomic::AtomicBool::new(false),
agent_id: tokio::sync::Mutex::new(None),
bound_memgine: tokio::sync::Mutex::new(None),
current_run_id: tokio::sync::Mutex::new(None),
});
// Only when the connector manager already exists: checking the slot first
// avoids creating the manager as a side effect of a new connection.
if self.connectors.get().is_some() {
for entry in self.connectors().enabled_tool_entries().await {
session.runtime.register_tool_entry(entry).await;
}
}
self.sessions
.lock()
.await
.insert(client_id.to_string(), session.clone());
session
}
/// Unregisters a client's session and cleans up everything tied to it.
///
/// When a session existed for `client_id`, this:
/// 1. detaches the session's bound agent, if `attached_agents` still maps
///    that agent to this client;
/// 2. drops chat sessions hosted by this client or targeting its bound agent;
/// 3. drops the client's run-trace and coder subscriptions;
/// 4. marks the client's still-live runs incomplete after the grace delay.
///
/// Returns the removed session, or `None` when the client id was unknown
/// (in which case nothing else is touched).
pub async fn remove_session(&self, client_id: &str) -> Option<Arc<ClientSession>> {
    let removed = self.sessions.lock().await.remove(client_id);
    if let Some(session) = &removed {
        // Read the bound agent once so both cleanup steps below act on the
        // same snapshot.
        let bound_agent = session.agent_id.lock().await.clone();

        if let Some(id) = &bound_agent {
            let mut attached = self.attached_agents.lock().await;
            // Leave the mapping alone if another client has since attached.
            if attached.get(id).map(String::as_str) == Some(client_id) {
                attached.remove(id);
            }
        }

        {
            let mut chats = self.chat_sessions.lock().await;
            chats.retain(|_, chat| {
                let hosted_here = chat.host_client_id == client_id;
                let targets_agent = bound_agent.as_deref() == Some(chat.agent_id.as_str());
                !(hosted_here || targets_agent)
            });
        }

        self.drop_run_subscribers_for_client(client_id).await;
        crate::coder::rpc::drop_subscriptions_for_client(self, client_id).await;
        self.sweep_runs_for_disconnect(client_id).await;
    }
    removed
}
}
/// Unit tests for `tool_callback_timeout`.
#[cfg(test)]
mod tool_timeout_tests {
use super::*;
/// An explicit action budget wins over the default and gets the grace added.
#[test]
fn honors_action_timeout_over_default() {
assert_eq!(
tool_callback_timeout(Some(180_000)),
std::time::Duration::from_millis(180_000 + TOOL_TIMEOUT_GRACE_MS)
);
assert!(tool_callback_timeout(Some(600_000)) > std::time::Duration::from_secs(600));
assert!(tool_callback_timeout(Some(180_000)) >= std::time::Duration::from_millis(180_000));
}
/// Without a budget the default applies and must exceed 60 s. Skipped when
/// `CAR_TOOL_TIMEOUT` is set, since the variable overrides the default.
#[test]
fn default_is_not_the_old_60s() {
if std::env::var_os("CAR_TOOL_TIMEOUT").is_none() {
assert_eq!(
tool_callback_timeout(None),
std::time::Duration::from_millis(DEFAULT_TOOL_TIMEOUT_MS)
);
assert!(DEFAULT_TOOL_TIMEOUT_MS > 60_000, "default must exceed the old 60s");
}
}
}
/// Unit tests for observe-only mode (`observer_manifest_path` / `supervisor`).
#[cfg(test)]
mod observer_mode_tests {
use super::*;
/// Creates a fresh journal directory under the cargo target directory
/// (`CARGO_TARGET_DIR`, or `<manifest>/../../target` when unset).
fn journal_dir() -> PathBuf {
let target = std::env::var_os("CARGO_TARGET_DIR")
.map(std::path::PathBuf::from)
.unwrap_or_else(|| {
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("..")
.join("..")
.join("target")
});
std::fs::create_dir_all(&target).ok();
let target = std::fs::canonicalize(&target).unwrap_or(target);
let tmp = tempfile::TempDir::new_in(&target).unwrap();
let p = tmp.path().to_path_buf();
// Forget the guard so its destructor never runs and the directory stays
// available to the state under test.
std::mem::forget(tmp); p
}
/// With an observer manifest installed, `supervisor()` must refuse and name
/// both the mode and the manifest path.
#[test]
fn supervisor_returns_observer_error_when_marker_set() {
let state = ServerState::standalone(journal_dir());
let fake_manifest = PathBuf::from("/tmp/fake-manifest-for-test.json");
state
.install_observer_manifest(fake_manifest.clone())
.expect("install_observer_manifest succeeds on fresh state");
assert_eq!(state.observer_manifest_path(), Some(&fake_manifest));
let err = state.supervisor().map(|_| ()).unwrap_err();
assert!(
err.contains("observe-only"),
"error must mention observe-only mode: {err}"
);
assert!(
err.contains("fake-manifest-for-test.json"),
"error must surface the manifest path so operators know which daemon owns it: {err}"
);
}
/// A second install is rejected, hands back the rejected path, and leaves
/// the first path in place.
#[test]
fn install_observer_manifest_is_idempotent_per_path_collision() {
let state = ServerState::standalone(journal_dir());
let p = PathBuf::from("/tmp/manifest-a.json");
let q = PathBuf::from("/tmp/manifest-b.json");
state.install_observer_manifest(p.clone()).unwrap();
let err = state.install_observer_manifest(q.clone()).unwrap_err();
assert_eq!(err, q);
assert_eq!(state.observer_manifest_path(), Some(&p));
}
/// `supervisor_if_installed` only reads the slot; a fresh state has neither
/// a supervisor nor an observer manifest.
#[test]
fn supervisor_if_installed_does_not_lazy_init() {
let state = ServerState::standalone(journal_dir());
assert!(state.supervisor_if_installed().is_none());
assert!(state.observer_manifest_path().is_none());
}
}