Skip to main content

arcp_runtime/runtime/
job.rs

1//! Job state machine and dispatch (RFC §10).
2//!
3//! Phase 3 implements the core lifecycle: a `tool.invoke` envelope is
4//! turned into a `Job` that runs in its own tokio task with a
5//! [`CancellationToken`]. The runtime emits `job.accepted`,
6//! `job.started`, then a terminal `job.completed` / `job.failed` /
7//! `job.cancelled`.
8//!
9//! The heartbeat watchdog and hard-kill escalation that the RFC describes
10//! in §10.3 / §10.4 land in a follow-up phase; the cooperative cancel
11//! path is in place via `CancellationToken`.
12
13use dashmap::DashMap;
14use std::sync::Arc;
15
16use tokio::task::JoinHandle;
17use tokio_util::sync::CancellationToken;
18
19use arcp_core::ids::{JobId, MessageId, SessionId};
20pub use arcp_core::messages::JobState;
21use arcp_core::messages::{CredentialId, LeaseRequest};
22
/// Per-job runtime bookkeeping, stored (together with the task's
/// `JoinHandle`) inside a [`JobRegistry`].
#[derive(Debug)]
pub struct JobEntry {
    /// Job identifier; also the key under which the job is registered.
    pub job_id: JobId,
    /// Owning session. `JobRegistry::list_for_session` scopes on this.
    pub session_id: SessionId,
    /// Correlation back to the originating `tool.invoke` envelope.
    pub correlation_id: MessageId,
    /// Cancellation token; child of the session token.
    /// `JobRegistry::cancel` triggers it (cooperative cancel only).
    pub cancel: CancellationToken,
    /// Current lifecycle state, overwritten by `JobRegistry::set_state`.
    pub state: JobState,
    /// Agent reference (`name` or `name@version`) the job is running.
    /// For v1.0-style `tool.invoke` submissions this is the tool name.
    pub agent: String,
    /// Submission timestamp (UTC). Listings sort and filter on it.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Highest event sequence recorded for this job (ARCP v1.1 §6.6).
    /// Advanced both by `JobRegistry::bump_event_seq` (+1) and by
    /// `JobRegistry::record_event_seq` (monotonic max), which share it.
    pub last_event_seq: u64,
    /// Parent job id for delegated / child jobs.
    pub parent_job_id: Option<JobId>,
    /// Provisioned credential ids issued for this job.
    pub credential_ids: Vec<CredentialId>,
    /// Accepted lease constraints for this job.
    pub lease: Option<LeaseRequest>,
}
50
/// Map of registered jobs, keyed by [`JobId`].
///
/// Cheap to clone: every clone shares the same internal
/// `Arc<DashMap<JobId, _>>`, so all clones observe the same jobs. Each
/// record pairs the job's [`JobEntry`] with the [`JoinHandle`] of the task
/// running it, so the runtime can issue a hard kill (Phase 4+ surface) and
/// cancellation can be driven from outside the task.
#[derive(Clone, Default)]
pub struct JobRegistry {
    // Shared, concurrently-accessible map; private so all access goes
    // through `JobRegistry`'s methods.
    inner: Arc<DashMap<JobId, JobRecord>>,
}
61
62impl std::fmt::Debug for JobRegistry {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("JobRegistry")
65            .field("len", &self.inner.len())
66            .finish()
67    }
68}
69
/// Internal registry record: the job's bookkeeping plus its task handle.
struct JobRecord {
    /// Bookkeeping surfaced through snapshots and listings.
    entry: JobEntry,
    /// Handle of the task running the job. `JobRegistry::insert` always
    /// stores `Some`; the `Option` presumably leaves room for taking the
    /// handle out later (e.g. to abort it) — TODO confirm.
    join: Option<JoinHandle<()>>,
}
74
75impl std::fmt::Debug for JobRecord {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("JobRecord")
78            .field("entry", &self.entry)
79            .field(
80                "join_finished",
81                &self.join.as_ref().is_some_and(JoinHandle::is_finished),
82            )
83            .finish()
84    }
85}
86
87impl JobRegistry {
88    /// Construct empty.
89    #[must_use]
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Insert a new job; the registry takes ownership of the
95    /// [`JoinHandle`] so it can be aborted (Phase 4+).
96    pub fn insert(&self, entry: JobEntry, join: JoinHandle<()>) {
97        let id = entry.job_id.clone();
98        self.inner.insert(
99            id,
100            JobRecord {
101                entry,
102                join: Some(join),
103            },
104        );
105    }
106
107    /// Update the state for `job_id`.
108    pub fn set_state(&self, job_id: &JobId, state: JobState) {
109        if let Some(mut r) = self.inner.get_mut(job_id) {
110            r.entry.state = state;
111        }
112    }
113
114    /// Trigger cooperative cancellation for `job_id`. Returns `true` if
115    /// the job was found and the token was triggered.
116    #[must_use]
117    pub fn cancel(&self, job_id: &JobId) -> bool {
118        self.inner.get(job_id).is_some_and(|r| {
119            r.entry.cancel.cancel();
120            true
121        })
122    }
123
124    /// Number of in-flight (or recently terminal) jobs.
125    #[must_use]
126    pub fn len(&self) -> usize {
127        self.inner.len()
128    }
129
130    /// True if the registry is empty.
131    #[must_use]
132    pub fn is_empty(&self) -> bool {
133        self.inner.is_empty()
134    }
135
136    /// Drop terminal jobs from the registry. Should be called periodically
137    /// (Phase 5+) to cap memory.
138    pub fn sweep_terminals(&self) {
139        self.inner.retain(|_, r| !r.entry.state.is_terminal());
140    }
141
142    /// Snapshot of all jobs scoped to `session_id`, applying an optional
143    /// filter (ARCP v1.1 §6.6). Results are sorted by `created_at`
144    /// ascending so pagination cursors are stable.
145    #[must_use]
146    pub fn list_for_session(
147        &self,
148        session_id: &SessionId,
149        filter: Option<&arcp_core::messages::SessionListJobsFilter>,
150    ) -> Vec<arcp_core::messages::JobListEntry> {
151        let mut out: Vec<arcp_core::messages::JobListEntry> = self
152            .inner
153            .iter()
154            .filter_map(|r| {
155                let e = &r.entry;
156                if e.session_id != *session_id {
157                    return None;
158                }
159                if let Some(f) = filter {
160                    let status = e.state.wire_str();
161                    if !f.status.is_empty() && !f.status.iter().any(|s| s == status) {
162                        return None;
163                    }
164                    if let Some(agent) = f.agent.as_deref() {
165                        if e.agent != agent {
166                            return None;
167                        }
168                    }
169                    if let Some(after) = f.created_after {
170                        if e.created_at <= after {
171                            return None;
172                        }
173                    }
174                    if let Some(before) = f.created_before {
175                        if e.created_at >= before {
176                            return None;
177                        }
178                    }
179                }
180                Some(arcp_core::messages::JobListEntry {
181                    job_id: e.job_id.clone(),
182                    agent: e.agent.clone(),
183                    status: e.state.wire_str().to_owned(),
184                    parent_job_id: e.parent_job_id.clone(),
185                    created_at: e.created_at,
186                    trace_id: None,
187                    last_event_seq: e.last_event_seq,
188                })
189            })
190            .collect();
191        out.sort_by_key(|e| e.created_at);
192        out
193    }
194
195    /// Increment and return the new `last_event_seq` for `job_id`.
196    ///
197    /// Returns `None` if the job is not registered.
198    #[must_use]
199    pub fn bump_event_seq(&self, job_id: &JobId) -> Option<u64> {
200        self.inner.get_mut(job_id).map(|mut r| {
201            r.entry.last_event_seq += 1;
202            r.entry.last_event_seq
203        })
204    }
205
206    /// Record the session-scoped sequence number of the most recent
207    /// event the runtime emitted for `job_id`. Used by the connection
208    /// writer to keep `JobRegistry`'s high-water mark in lockstep with
209    /// the session sequence so `job.subscribed.subscribed_from` reflects
210    /// what the subscriber can actually ack from.
211    pub fn record_event_seq(&self, job_id: &JobId, seq: u64) {
212        if let Some(mut r) = self.inner.get_mut(job_id) {
213            if seq > r.entry.last_event_seq {
214                r.entry.last_event_seq = seq;
215            }
216        }
217    }
218
219    /// Snapshot the public-facing fields of a job, if registered.
220    ///
221    /// Used by `job.subscribe` (ARCP v1.1 §7.6) to populate the
222    /// acknowledgement.
223    #[must_use]
224    pub fn snapshot(&self, job_id: &JobId) -> Option<JobSnapshot> {
225        self.inner.get(job_id).map(|r| {
226            let e = &r.entry;
227            JobSnapshot {
228                job_id: e.job_id.clone(),
229                session_id: e.session_id.clone(),
230                state: e.state,
231                agent: e.agent.clone(),
232                parent_job_id: e.parent_job_id.clone(),
233                last_event_seq: e.last_event_seq,
234            }
235        })
236    }
237}
238
/// Public, owned projection of [`JobEntry`] returned by
/// [`JobRegistry::snapshot`].
///
/// All fields are copied out of the registry, so a snapshot holds no map
/// lock and does not reflect changes made to the job after it was taken.
#[derive(Debug, Clone)]
pub struct JobSnapshot {
    /// Job identifier.
    pub job_id: JobId,
    /// Originating session.
    pub session_id: SessionId,
    /// State at the time the snapshot was taken.
    pub state: JobState,
    /// Agent reference (`name` or `name@version`) the job is running.
    pub agent: String,
    /// Parent job id for delegated / child jobs.
    pub parent_job_id: Option<JobId>,
    /// Highest event sequence recorded for this job when the snapshot was
    /// taken.
    pub last_event_seq: u64,
}
255
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    clippy::panic,
    clippy::missing_panics_doc
)]
mod tests {
    use super::*;
    use arcp_core::ids::{JobId, MessageId, SessionId};

    /// Build a `JobEntry` in `state` with fresh ids, plus the handle of a
    /// no-op task. Spawns, so it must run inside a tokio runtime.
    fn make_entry(state: JobState) -> (JobEntry, tokio::task::JoinHandle<()>) {
        let cancel = CancellationToken::new();
        let entry = JobEntry {
            job_id: JobId::new(),
            session_id: SessionId::new(),
            correlation_id: MessageId::new(),
            cancel,
            state,
            agent: "test-tool".to_owned(),
            created_at: chrono::Utc::now(),
            last_event_seq: 0,
            parent_job_id: None,
            credential_ids: vec![],
            lease: None,
        };
        // A no-op task so the JoinHandle is well-formed.
        let join = tokio::spawn(async {});
        (entry, join)
    }

    /// Registered state of `id`, read back through the public snapshot API.
    fn state_of(reg: &JobRegistry, id: &JobId) -> Option<JobState> {
        reg.snapshot(id).map(|s| s.state)
    }

    #[test]
    fn job_state_terminals_are_classified_correctly() {
        for s in [JobState::Completed, JobState::Failed, JobState::Cancelled] {
            assert!(s.is_terminal(), "{s:?} should be terminal");
        }
        for s in [
            JobState::Accepted,
            JobState::Queued,
            JobState::Running,
            JobState::Blocked,
            JobState::Paused,
        ] {
            assert!(!s.is_terminal(), "{s:?} should NOT be terminal");
        }
    }

    #[tokio::test]
    async fn registry_insert_and_set_state_round_trip() {
        let reg = JobRegistry::new();
        assert!(reg.is_empty());
        let (entry, join) = make_entry(JobState::Accepted);
        let id = entry.job_id.clone();
        reg.insert(entry, join);
        assert_eq!(reg.len(), 1);
        assert!(matches!(state_of(&reg, &id), Some(JobState::Accepted)));
        reg.set_state(&id, JobState::Running);
        // The round trip must actually be observable.
        assert!(matches!(state_of(&reg, &id), Some(JobState::Running)));
        let snap = reg.snapshot(&id).expect("job is registered");
        assert_eq!(snap.agent, "test-tool");
        assert_eq!(snap.last_event_seq, 0);
    }

    #[test]
    fn set_state_on_unknown_job_is_a_noop() {
        let reg = JobRegistry::new();
        let id = JobId::new();
        reg.set_state(&id, JobState::Running);
        assert!(reg.is_empty());
        assert!(reg.snapshot(&id).is_none());
    }

    #[tokio::test]
    async fn cancel_returns_false_for_unknown_job() {
        let reg = JobRegistry::new();
        let id = JobId::new();
        assert!(!reg.cancel(&id));
    }

    #[tokio::test]
    async fn cancel_triggers_token_for_known_job() {
        let reg = JobRegistry::new();
        let (entry, join) = make_entry(JobState::Running);
        let token = entry.cancel.clone();
        let id = entry.job_id.clone();
        reg.insert(entry, join);
        assert!(reg.cancel(&id));
        assert!(token.is_cancelled());
    }

    #[tokio::test]
    async fn sweep_terminals_drops_only_terminal_jobs() {
        let reg = JobRegistry::new();
        let (running, jh1) = make_entry(JobState::Running);
        let (done, jh2) = make_entry(JobState::Completed);
        let running_id = running.job_id.clone();
        let done_id = done.job_id.clone();
        reg.insert(running, jh1);
        reg.insert(done, jh2);
        assert_eq!(reg.len(), 2);
        reg.sweep_terminals();
        assert_eq!(reg.len(), 1);
        // Sweep is idempotent.
        reg.sweep_terminals();
        assert_eq!(reg.len(), 1);
        // Running job is the survivor; cancel still finds it.
        assert!(reg.cancel(&running_id));
        // Terminal job was already swept.
        assert!(!reg.cancel(&done_id));
    }

    #[tokio::test]
    async fn bump_event_seq_increments_per_job() {
        let reg = JobRegistry::new();
        let (entry, join) = make_entry(JobState::Running);
        let id = entry.job_id.clone();
        reg.insert(entry, join);
        assert_eq!(reg.bump_event_seq(&id), Some(1));
        assert_eq!(reg.bump_event_seq(&id), Some(2));
        assert_eq!(reg.snapshot(&id).map(|s| s.last_event_seq), Some(2));
        assert_eq!(reg.bump_event_seq(&JobId::new()), None);
    }

    #[tokio::test]
    async fn record_event_seq_only_moves_forward() {
        let reg = JobRegistry::new();
        let (entry, join) = make_entry(JobState::Running);
        let id = entry.job_id.clone();
        reg.insert(entry, join);
        reg.record_event_seq(&id, 10);
        reg.record_event_seq(&id, 4);
        assert_eq!(reg.snapshot(&id).map(|s| s.last_event_seq), Some(10));
        // Unknown job: ignored, nothing is inserted.
        reg.record_event_seq(&JobId::new(), 99);
        assert_eq!(reg.len(), 1);
    }

    #[tokio::test]
    async fn list_for_session_scopes_and_sorts() {
        let reg = JobRegistry::new();
        let session = SessionId::new();
        let (mut first, jh1) = make_entry(JobState::Running);
        let (mut second, jh2) = make_entry(JobState::Completed);
        let (other, jh3) = make_entry(JobState::Running);
        first.session_id = session.clone();
        second.session_id = session.clone();
        let first_id = first.job_id.clone();
        let second_id = second.job_id.clone();
        let other_id = other.job_id.clone();
        // Insert out of creation order; the listing must still be sorted.
        reg.insert(second, jh2);
        reg.insert(other, jh3);
        reg.insert(first, jh1);

        let listed = reg.list_for_session(&session, None);
        assert_eq!(listed.len(), 2);
        assert!(listed
            .iter()
            .all(|e| e.job_id == first_id || e.job_id == second_id));
        assert!(listed.iter().all(|e| e.job_id != other_id));
        assert!(listed
            .windows(2)
            .all(|w| w[0].created_at <= w[1].created_at));
    }
}