use serde::Serialize;
use tokio::sync::RwLock;
use std::sync::Arc;
use crate::now_ms;
#[derive(Debug, Clone, Serialize)]
pub struct ActiveRun {
#[serde(rename = "runID")]
pub run_id: String,
#[serde(rename = "startedAtMs")]
pub started_at_ms: u64,
#[serde(rename = "lastActivityAtMs")]
pub last_activity_at_ms: u64,
#[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
pub client_id: Option<String>,
#[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
pub agent_id: Option<String>,
#[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
pub agent_profile: Option<String>,
}
#[derive(Clone, Default)]
pub struct RunRegistry {
active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
}
impl RunRegistry {
pub fn new() -> Self {
Self::default()
}
pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
self.active.read().await.get(session_id).cloned()
}
pub async fn acquire(
&self,
session_id: &str,
run_id: String,
client_id: Option<String>,
agent_id: Option<String>,
agent_profile: Option<String>,
) -> std::result::Result<ActiveRun, ActiveRun> {
let mut guard = self.active.write().await;
if let Some(existing) = guard.get(session_id).cloned() {
return Err(existing);
}
let now = now_ms();
let run = ActiveRun {
run_id,
started_at_ms: now,
last_activity_at_ms: now,
client_id,
agent_id,
agent_profile,
};
guard.insert(session_id.to_string(), run.clone());
Ok(run)
}
pub async fn touch(&self, session_id: &str, run_id: &str) {
let mut guard = self.active.write().await;
if let Some(run) = guard.get_mut(session_id) {
if run.run_id == run_id {
run.last_activity_at_ms = now_ms();
}
}
}
pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
let mut guard = self.active.write().await;
if let Some(run) = guard.get(session_id) {
if run.run_id == run_id {
return guard.remove(session_id);
}
}
None
}
pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
self.active.write().await.remove(session_id)
}
pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
let now = now_ms();
let mut guard = self.active.write().await;
let stale_ids = guard
.iter()
.filter_map(|(session_id, run)| {
if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
Some(session_id.clone())
} else {
None
}
})
.collect::<Vec<_>>();
let mut out = Vec::with_capacity(stale_ids.len());
for session_id in stale_ids {
if let Some(run) = guard.remove(&session_id) {
out.push((session_id, run));
}
}
out
}
}