tandem_server/runtime/
runs.rs1use serde::Serialize;
2use tokio::sync::RwLock;
3
4use std::sync::Arc;
5
6use crate::now_ms;
7
8#[derive(Debug, Clone, Serialize)]
9pub struct ActiveRun {
10 #[serde(rename = "runID")]
11 pub run_id: String,
12 #[serde(rename = "startedAtMs")]
13 pub started_at_ms: u64,
14 #[serde(rename = "lastActivityAtMs")]
15 pub last_activity_at_ms: u64,
16 #[serde(rename = "clientID", skip_serializing_if = "Option::is_none")]
17 pub client_id: Option<String>,
18 #[serde(rename = "agentID", skip_serializing_if = "Option::is_none")]
19 pub agent_id: Option<String>,
20 #[serde(rename = "agentProfile", skip_serializing_if = "Option::is_none")]
21 pub agent_profile: Option<String>,
22}
23
24#[derive(Clone, Default)]
25pub struct RunRegistry {
26 active: Arc<RwLock<std::collections::HashMap<String, ActiveRun>>>,
27}
28
29impl RunRegistry {
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub async fn get(&self, session_id: &str) -> Option<ActiveRun> {
35 self.active.read().await.get(session_id).cloned()
36 }
37
38 pub async fn acquire(
39 &self,
40 session_id: &str,
41 run_id: String,
42 client_id: Option<String>,
43 agent_id: Option<String>,
44 agent_profile: Option<String>,
45 ) -> std::result::Result<ActiveRun, ActiveRun> {
46 let mut guard = self.active.write().await;
47 if let Some(existing) = guard.get(session_id).cloned() {
48 return Err(existing);
49 }
50 let now = now_ms();
51 let run = ActiveRun {
52 run_id,
53 started_at_ms: now,
54 last_activity_at_ms: now,
55 client_id,
56 agent_id,
57 agent_profile,
58 };
59 guard.insert(session_id.to_string(), run.clone());
60 Ok(run)
61 }
62
63 pub async fn touch(&self, session_id: &str, run_id: &str) {
64 let mut guard = self.active.write().await;
65 if let Some(run) = guard.get_mut(session_id) {
66 if run.run_id == run_id {
67 run.last_activity_at_ms = now_ms();
68 }
69 }
70 }
71
72 pub async fn finish_if_match(&self, session_id: &str, run_id: &str) -> Option<ActiveRun> {
73 let mut guard = self.active.write().await;
74 if let Some(run) = guard.get(session_id) {
75 if run.run_id == run_id {
76 return guard.remove(session_id);
77 }
78 }
79 None
80 }
81
82 pub async fn finish_active(&self, session_id: &str) -> Option<ActiveRun> {
83 self.active.write().await.remove(session_id)
84 }
85
86 pub async fn reap_stale(&self, stale_ms: u64) -> Vec<(String, ActiveRun)> {
87 let now = now_ms();
88 let mut guard = self.active.write().await;
89 let stale_ids = guard
90 .iter()
91 .filter_map(|(session_id, run)| {
92 if now.saturating_sub(run.last_activity_at_ms) > stale_ms {
93 Some(session_id.clone())
94 } else {
95 None
96 }
97 })
98 .collect::<Vec<_>>();
99 let mut out = Vec::with_capacity(stale_ids.len());
100 for session_id in stale_ids {
101 if let Some(run) = guard.remove(&session_id) {
102 out.push((session_id, run));
103 }
104 }
105 out
106 }
107}