Skip to main content

tandem_server/runtime/
runs.rs

1use serde::Serialize;
2use tokio::sync::RwLock;
3
4use std::sync::Arc;
5
6use crate::now_ms;
7
8#[derive(Debug, Clone, Serialize)]
9pub struct ActiveRun {
10    #[serde(rename = "runID")]
11    pub run_id: String,
12    #[serde(rename = "startedAtMs")]
13    pub started_at_ms: u64,
14    #[serde(rename = "lastActivityAtMs")]
15    pub last_activity_at_ms: u64,
16    #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
17    pub client_id: Option<String>,
18    #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
19    pub agent_id: Option<String>,
20    #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
21    pub agent_profile: Option<String>,
22}
23
24#[derive(Clone, Default)]
25pub struct RunRegistry {
26    active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
27}
28
29impl RunRegistry {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
35        self.active.read().await.get(session_id).cloned()
36    }
37
38    pub async fn acquire(
39        &self,
40        session_id: &str,
41        run_id: String,
42        client_id: Option<String>,
43        agent_id: Option<String>,
44        agent_profile: Option<String>,
45    ) -> std::result::Result<ActiveRun, ActiveRun> {
46        let mut guard = self.active.write().await;
47        if let Some(existing) = guard.get(session_id).cloned() {
48            return Err(existing);
49        }
50        let now = now_ms();
51        let run = ActiveRun {
52            run_id,
53            started_at_ms: now,
54            last_activity_at_ms: now,
55            client_id,
56            agent_id,
57            agent_profile,
58        };
59        guard.insert(session_id.to_string(), run.clone());
60        Ok(run)
61    }
62
63    pub async fn touch(&self, session_id: &str, run_id: &str) {
64        let mut guard = self.active.write().await;
65        if let Some(run) = guard.get_mut(session_id) {
66            if run.run_id == run_id {
67                run.last_activity_at_ms = now_ms();
68            }
69        }
70    }
71
72    pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
73        let mut guard = self.active.write().await;
74        if let Some(run) = guard.get(session_id) {
75            if run.run_id == run_id {
76                return guard.remove(session_id);
77            }
78        }
79        None
80    }
81
82    pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
83        self.active.write().await.remove(session_id)
84    }
85
86    pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
87        let now = now_ms();
88        let mut guard = self.active.write().await;
89        let stale_ids = guard
90            .iter()
91            .filter_map(|(session_id, run)| {
92                if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
93                    Some(session_id.clone())
94                } else {
95                    None
96                }
97            })
98            .collect::<Vec<_>>();
99        let mut out = Vec::with_capacity(stale_ids.len());
100        for session_id in stale_ids {
101            if let Some(run) = guard.remove(&session_id) {
102                out.push((session_id, run));
103            }
104        }
105        out
106    }
107}