use dashmap::DashMap;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use arcp_core::ids::{JobId, MessageId, SessionId};
pub use arcp_core::messages::JobState;
use arcp_core::messages::{CredentialId, LeaseRequest};
#[derive(Debug)]
pub struct JobEntry {
pub job_id: JobId,
pub session_id: SessionId,
pub correlation_id: MessageId,
pub cancel: CancellationToken,
pub state: JobState,
pub agent: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub last_event_seq: u64,
pub parent_job_id: Option<JobId>,
pub credential_ids: Vec<CredentialId>,
pub lease: Option<LeaseRequest>,
}
#[derive(Clone, Default)]
pub struct JobRegistry {
inner: Arc<DashMap<JobId, JobRecord>>,
}
impl std::fmt::Debug for JobRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JobRegistry")
.field("len", &self.inner.len())
.finish()
}
}
struct JobRecord {
entry: JobEntry,
join: Option<JoinHandle<()>>,
}
impl std::fmt::Debug for JobRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JobRecord")
.field("entry", &self.entry)
.field(
"join_finished",
&self.join.as_ref().is_some_and(JoinHandle::is_finished),
)
.finish()
}
}
impl JobRegistry {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn insert(&self, entry: JobEntry, join: JoinHandle<()>) {
let id = entry.job_id.clone();
self.inner.insert(
id,
JobRecord {
entry,
join: Some(join),
},
);
}
pub fn set_state(&self, job_id: &JobId, state: JobState) {
if let Some(mut r) = self.inner.get_mut(job_id) {
r.entry.state = state;
}
}
#[must_use]
pub fn cancel(&self, job_id: &JobId) -> bool {
self.inner.get(job_id).is_some_and(|r| {
r.entry.cancel.cancel();
true
})
}
#[must_use]
pub fn len(&self) -> usize {
self.inner.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn sweep_terminals(&self) {
self.inner.retain(|_, r| !r.entry.state.is_terminal());
}
#[must_use]
pub fn list_for_session(
&self,
session_id: &SessionId,
filter: Option<&arcp_core::messages::SessionListJobsFilter>,
) -> Vec<arcp_core::messages::JobListEntry> {
let mut out: Vec<arcp_core::messages::JobListEntry> = self
.inner
.iter()
.filter_map(|r| {
let e = &r.entry;
if e.session_id != *session_id {
return None;
}
if let Some(f) = filter {
let status = e.state.wire_str();
if !f.status.is_empty() && !f.status.iter().any(|s| s == status) {
return None;
}
if let Some(agent) = f.agent.as_deref() {
if e.agent != agent {
return None;
}
}
if let Some(after) = f.created_after {
if e.created_at <= after {
return None;
}
}
if let Some(before) = f.created_before {
if e.created_at >= before {
return None;
}
}
}
Some(arcp_core::messages::JobListEntry {
job_id: e.job_id.clone(),
agent: e.agent.clone(),
status: e.state.wire_str().to_owned(),
parent_job_id: e.parent_job_id.clone(),
created_at: e.created_at,
trace_id: None,
last_event_seq: e.last_event_seq,
})
})
.collect();
out.sort_by_key(|e| e.created_at);
out
}
#[must_use]
pub fn bump_event_seq(&self, job_id: &JobId) -> Option<u64> {
self.inner.get_mut(job_id).map(|mut r| {
r.entry.last_event_seq += 1;
r.entry.last_event_seq
})
}
pub fn record_event_seq(&self, job_id: &JobId, seq: u64) {
if let Some(mut r) = self.inner.get_mut(job_id) {
if seq > r.entry.last_event_seq {
r.entry.last_event_seq = seq;
}
}
}
#[must_use]
pub fn snapshot(&self, job_id: &JobId) -> Option<JobSnapshot> {
self.inner.get(job_id).map(|r| {
let e = &r.entry;
JobSnapshot {
job_id: e.job_id.clone(),
session_id: e.session_id.clone(),
state: e.state,
agent: e.agent.clone(),
parent_job_id: e.parent_job_id.clone(),
last_event_seq: e.last_event_seq,
}
})
}
}
#[derive(Debug, Clone)]
pub struct JobSnapshot {
pub job_id: JobId,
pub session_id: SessionId,
pub state: JobState,
pub agent: String,
pub parent_job_id: Option<JobId>,
pub last_event_seq: u64,
}
#[cfg(test)]
#[allow(
clippy::expect_used,
clippy::unwrap_used,
clippy::panic,
clippy::missing_panics_doc
)]
mod tests {
use super::*;
use arcp_core::ids::{JobId, MessageId, SessionId};
fn make_entry(state: JobState) -> (JobEntry, tokio::task::JoinHandle<()>) {
let cancel = CancellationToken::new();
let entry = JobEntry {
job_id: JobId::new(),
session_id: SessionId::new(),
correlation_id: MessageId::new(),
cancel,
state,
agent: "test-tool".to_owned(),
created_at: chrono::Utc::now(),
last_event_seq: 0,
parent_job_id: None,
credential_ids: vec![],
lease: None,
};
let join = tokio::spawn(async {});
(entry, join)
}
#[test]
fn job_state_terminals_are_classified_correctly() {
for s in [JobState::Completed, JobState::Failed, JobState::Cancelled] {
assert!(s.is_terminal(), "{s:?} should be terminal");
}
for s in [
JobState::Accepted,
JobState::Queued,
JobState::Running,
JobState::Blocked,
JobState::Paused,
] {
assert!(!s.is_terminal(), "{s:?} should NOT be terminal");
}
}
#[tokio::test]
async fn registry_insert_and_set_state_round_trip() {
let reg = JobRegistry::new();
assert!(reg.is_empty());
let (entry, join) = make_entry(JobState::Accepted);
let id = entry.job_id.clone();
reg.insert(entry, join);
assert_eq!(reg.len(), 1);
reg.set_state(&id, JobState::Running);
}
#[tokio::test]
async fn cancel_returns_false_for_unknown_job() {
let reg = JobRegistry::new();
let id = JobId::new();
assert!(!reg.cancel(&id));
}
#[tokio::test]
async fn cancel_triggers_token_for_known_job() {
let reg = JobRegistry::new();
let (entry, join) = make_entry(JobState::Running);
let token = entry.cancel.clone();
let id = entry.job_id.clone();
reg.insert(entry, join);
assert!(reg.cancel(&id));
assert!(token.is_cancelled());
}
#[tokio::test]
async fn sweep_terminals_drops_only_terminal_jobs() {
let reg = JobRegistry::new();
let (running, jh1) = make_entry(JobState::Running);
let (done, jh2) = make_entry(JobState::Completed);
let running_id = running.job_id.clone();
let done_id = done.job_id.clone();
reg.insert(running, jh1);
reg.insert(done, jh2);
assert_eq!(reg.len(), 2);
reg.sweep_terminals();
assert_eq!(reg.len(), 1);
reg.sweep_terminals();
assert_eq!(reg.len(), 1);
assert!(reg.cancel(&running_id));
assert!(!reg.cancel(&done_id));
}
}