1use dashmap::DashMap;
14use std::sync::Arc;
15
16use tokio::task::JoinHandle;
17use tokio_util::sync::CancellationToken;
18
19use arcp_core::ids::{JobId, MessageId, SessionId};
20pub use arcp_core::messages::JobState;
21use arcp_core::messages::{CredentialId, LeaseRequest};
22
23#[derive(Debug)]
25pub struct JobEntry {
26 pub job_id: JobId,
28 pub session_id: SessionId,
30 pub correlation_id: MessageId,
32 pub cancel: CancellationToken,
34 pub state: JobState,
36 pub agent: String,
39 pub created_at: chrono::DateTime<chrono::Utc>,
41 pub last_event_seq: u64,
43 pub parent_job_id: Option<JobId>,
45 pub credential_ids: Vec<CredentialId>,
47 pub lease: Option<LeaseRequest>,
49}
50
51#[derive(Clone, Default)]
58pub struct JobRegistry {
59 inner: Arc<DashMap<JobId, JobRecord>>,
60}
61
62impl std::fmt::Debug for JobRegistry {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("JobRegistry")
65 .field("len", &self.inner.len())
66 .finish()
67 }
68}
69
70struct JobRecord {
71 entry: JobEntry,
72 join: Option<JoinHandle<()>>,
73}
74
75impl std::fmt::Debug for JobRecord {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("JobRecord")
78 .field("entry", &self.entry)
79 .field(
80 "join_finished",
81 &self.join.as_ref().is_some_and(JoinHandle::is_finished),
82 )
83 .finish()
84 }
85}
86
87impl JobRegistry {
88 #[must_use]
90 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn insert(&self, entry: JobEntry, join: JoinHandle<()>) {
97 let id = entry.job_id.clone();
98 self.inner.insert(
99 id,
100 JobRecord {
101 entry,
102 join: Some(join),
103 },
104 );
105 }
106
107 pub fn set_state(&self, job_id: &JobId, state: JobState) {
109 if let Some(mut r) = self.inner.get_mut(job_id) {
110 r.entry.state = state;
111 }
112 }
113
114 #[must_use]
117 pub fn cancel(&self, job_id: &JobId) -> bool {
118 self.inner.get(job_id).is_some_and(|r| {
119 r.entry.cancel.cancel();
120 true
121 })
122 }
123
124 #[must_use]
126 pub fn len(&self) -> usize {
127 self.inner.len()
128 }
129
130 #[must_use]
132 pub fn is_empty(&self) -> bool {
133 self.inner.is_empty()
134 }
135
136 pub fn sweep_terminals(&self) {
139 self.inner.retain(|_, r| !r.entry.state.is_terminal());
140 }
141
142 #[must_use]
146 pub fn list_for_session(
147 &self,
148 session_id: &SessionId,
149 filter: Option<&arcp_core::messages::SessionListJobsFilter>,
150 ) -> Vec<arcp_core::messages::JobListEntry> {
151 let mut out: Vec<arcp_core::messages::JobListEntry> = self
152 .inner
153 .iter()
154 .filter_map(|r| {
155 let e = &r.entry;
156 if e.session_id != *session_id {
157 return None;
158 }
159 if let Some(f) = filter {
160 let status = e.state.wire_str();
161 if !f.status.is_empty() && !f.status.iter().any(|s| s == status) {
162 return None;
163 }
164 if let Some(agent) = f.agent.as_deref() {
165 if e.agent != agent {
166 return None;
167 }
168 }
169 if let Some(after) = f.created_after {
170 if e.created_at <= after {
171 return None;
172 }
173 }
174 if let Some(before) = f.created_before {
175 if e.created_at >= before {
176 return None;
177 }
178 }
179 }
180 Some(arcp_core::messages::JobListEntry {
181 job_id: e.job_id.clone(),
182 agent: e.agent.clone(),
183 status: e.state.wire_str().to_owned(),
184 parent_job_id: e.parent_job_id.clone(),
185 created_at: e.created_at,
186 trace_id: None,
187 last_event_seq: e.last_event_seq,
188 })
189 })
190 .collect();
191 out.sort_by_key(|e| e.created_at);
192 out
193 }
194
195 #[must_use]
199 pub fn bump_event_seq(&self, job_id: &JobId) -> Option<u64> {
200 self.inner.get_mut(job_id).map(|mut r| {
201 r.entry.last_event_seq += 1;
202 r.entry.last_event_seq
203 })
204 }
205
206 pub fn record_event_seq(&self, job_id: &JobId, seq: u64) {
212 if let Some(mut r) = self.inner.get_mut(job_id) {
213 if seq > r.entry.last_event_seq {
214 r.entry.last_event_seq = seq;
215 }
216 }
217 }
218
219 #[must_use]
224 pub fn snapshot(&self, job_id: &JobId) -> Option<JobSnapshot> {
225 self.inner.get(job_id).map(|r| {
226 let e = &r.entry;
227 JobSnapshot {
228 job_id: e.job_id.clone(),
229 session_id: e.session_id.clone(),
230 state: e.state,
231 agent: e.agent.clone(),
232 parent_job_id: e.parent_job_id.clone(),
233 last_event_seq: e.last_event_seq,
234 }
235 })
236 }
237}
238
239#[derive(Debug, Clone)]
241pub struct JobSnapshot {
242 pub job_id: JobId,
244 pub session_id: SessionId,
246 pub state: JobState,
248 pub agent: String,
250 pub parent_job_id: Option<JobId>,
252 pub last_event_seq: u64,
254}
255
// Test code is allowed to panic and unwrap freely: a panic is how a test
// reports failure.
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    clippy::panic,
    clippy::missing_panics_doc
)]
mod tests {
    use super::*;
    use arcp_core::ids::{JobId, MessageId, SessionId};

    /// Builds an entry in `state` with fresh ids, plus the handle of a task
    /// that does nothing.
    ///
    /// Calls `tokio::spawn`, so it must be used from within a Tokio runtime
    /// (i.e. from `#[tokio::test]` functions).
    fn make_entry(state: JobState) -> (JobEntry, tokio::task::JoinHandle<()>) {
        let cancel = CancellationToken::new();
        let entry = JobEntry {
            job_id: JobId::new(),
            session_id: SessionId::new(),
            correlation_id: MessageId::new(),
            cancel,
            state,
            agent: "test-tool".to_owned(),
            created_at: chrono::Utc::now(),
            last_event_seq: 0,
            parent_job_id: None,
            credential_ids: vec![],
            lease: None,
        };
        let join = tokio::spawn(async {});
        (entry, join)
    }

    /// Completed, failed and cancelled are terminal; every other state is not.
    #[test]
    fn job_state_terminals_are_classified_correctly() {
        for s in [JobState::Completed, JobState::Failed, JobState::Cancelled] {
            assert!(s.is_terminal(), "{s:?} should be terminal");
        }
        for s in [
            JobState::Accepted,
            JobState::Queued,
            JobState::Running,
            JobState::Blocked,
            JobState::Paused,
        ] {
            assert!(!s.is_terminal(), "{s:?} should NOT be terminal");
        }
    }

    /// Inserting a job makes it visible, and `set_state` changes the state
    /// that is subsequently read back.
    #[tokio::test]
    async fn registry_insert_and_set_state_round_trip() {
        let reg = JobRegistry::new();
        assert!(reg.is_empty());
        let (entry, join) = make_entry(JobState::Accepted);
        let id = entry.job_id.clone();
        reg.insert(entry, join);
        assert_eq!(reg.len(), 1);
        assert!(!reg.is_empty());

        let before = reg.snapshot(&id).expect("inserted job must be present");
        assert!(
            matches!(before.state, JobState::Accepted),
            "unexpected initial state {:?}",
            before.state
        );

        reg.set_state(&id, JobState::Running);

        // Read the state back; without this the test would pass even if
        // `set_state` were a no-op.
        let after = reg.snapshot(&id).expect("job must survive set_state");
        assert!(
            matches!(after.state, JobState::Running),
            "set_state did not take effect, state is {:?}",
            after.state
        );
    }

    /// Cancelling an id that was never inserted reports `false`.
    #[tokio::test]
    async fn cancel_returns_false_for_unknown_job() {
        let reg = JobRegistry::new();
        let id = JobId::new();
        assert!(!reg.cancel(&id));
    }

    /// Cancelling a registered job reports `true` and fires its token.
    #[tokio::test]
    async fn cancel_triggers_token_for_known_job() {
        let reg = JobRegistry::new();
        let (entry, join) = make_entry(JobState::Running);
        let token = entry.cancel.clone();
        let id = entry.job_id.clone();
        reg.insert(entry, join);
        assert!(reg.cancel(&id));
        assert!(token.is_cancelled());
    }

    /// A sweep removes the completed job, keeps the running one, and is
    /// idempotent.
    #[tokio::test]
    async fn sweep_terminals_drops_only_terminal_jobs() {
        let reg = JobRegistry::new();
        let (running, jh1) = make_entry(JobState::Running);
        let (done, jh2) = make_entry(JobState::Completed);
        let running_id = running.job_id.clone();
        let done_id = done.job_id.clone();
        reg.insert(running, jh1);
        reg.insert(done, jh2);
        assert_eq!(reg.len(), 2);
        reg.sweep_terminals();
        assert_eq!(reg.len(), 1);
        // A second sweep has nothing left to remove.
        reg.sweep_terminals();
        assert_eq!(reg.len(), 1);
        // The running job is still registered, so it can be cancelled...
        assert!(reg.cancel(&running_id));
        // ...while the completed job is gone and is reported as unknown.
        assert!(!reg.cancel(&done_id));
    }
}