Skip to main content

actionqueue_core/run/
run_instance.rs

1//! Run instance - a specific scheduled occurrence of a task.
2
3use crate::ids::{AttemptId, RunId, TaskId};
4use crate::run::state::RunState;
5use crate::run::transitions::is_valid_transition;
6
/// Typed validation errors returned by [`RunInstance`] constructors.
///
/// Identifier checks run before any timestamp check, so a construction call with
/// several problems reports the first failing identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunInstanceConstructionError {
    /// The supplied [`RunId`] is nil and therefore invalid.
    InvalidRunId {
        /// Rejected run identifier.
        run_id: RunId,
    },
    /// The supplied [`TaskId`] is nil and therefore invalid.
    ///
    /// Only reported when the run identifier itself passed validation.
    InvalidTaskId {
        /// Rejected task identifier.
        task_id: TaskId,
    },
    /// A `Ready` run cannot be created with a future schedule instant.
    ///
    /// Returned by the `new_ready*` constructors when `scheduled_at > created_at`.
    ReadyScheduledAtAfterCreatedAt {
        /// Run identifier associated with the failed construction.
        run_id: RunId,
        /// Requested scheduled timestamp.
        scheduled_at: u64,
        /// Requested creation timestamp.
        created_at: u64,
    },
}
30
31impl std::fmt::Display for RunInstanceConstructionError {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        match self {
34            Self::InvalidRunId { run_id } => {
35                write!(f, "invalid run_id for run construction: {run_id}")
36            }
37            Self::InvalidTaskId { task_id } => {
38                write!(f, "invalid task_id for run construction: {task_id}")
39            }
40            Self::ReadyScheduledAtAfterCreatedAt { run_id, scheduled_at, created_at } => {
41                write!(
42                    f,
43                    "run {run_id} cannot be created in Ready with scheduled_at ({scheduled_at}) > \
44                     created_at ({created_at})"
45                )
46            }
47        }
48    }
49}
50
51impl std::error::Error for RunInstanceConstructionError {}
52
/// A run instance represents a specific scheduled occurrence of a task.
///
/// Each run is uniquely identified by a [`RunId`] and is associated with a [`TaskId`].
/// The lifecycle of a run progresses through canonical [`RunState`]s according to the
/// run policy; state changes are applied via `RunInstance::transition_to`, which
/// consults the canonical transition table.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[must_use]
pub struct RunInstance {
    /// Unique identifier for this run instance (constructors reject a nil id).
    id: RunId,

    /// The task that this run belongs to (constructors reject a nil id).
    task_id: TaskId,

    /// The current lifecycle state of this run.
    state: RunState,

    /// The attempt that is currently active (if any).
    ///
    /// Set by `start_attempt`, which requires the `Running` state, and cleared by
    /// `finish_attempt` or by any transition to a state other than `Running`.
    current_attempt_id: Option<AttemptId>,

    /// The number of attempts started so far.
    ///
    /// Incremented by `start_attempt` with overflow checking; constructors start at 0.
    attempt_count: u32,

    /// The time at which this run was created.
    // NOTE(review): the timestamp unit (seconds vs. millis) is not established in
    // this file — confirm against the clock source used by callers.
    created_at: u64,

    /// The time at which this run should become Ready.
    /// For "Once" policy, this is when the run is created.
    /// For "Repeat" policy, this is computed based on the start time and interval.
    scheduled_at: u64,

    /// The snapshot priority used for selection ordering.
    ///
    /// The `Scheduled` constructors initialize it to 0 and the `Ready` constructors to
    /// the supplied value. Within this module it is afterwards only changed by
    /// `promote_to_ready_with_priority` or by `set_effective_priority` (the latter
    /// only while `Ready`).
    effective_priority: i32,

    /// Timestamp of the most recent state change, used by retry backoff
    /// to compute when a run entered RetryWait.
    ///
    /// Constructors initialize it to `created_at`. It is NOT updated by
    /// `transition_to`; callers must use `record_state_change_at`.
    // NOTE(review): this `serde(default)` has no effect — only `Serialize` is derived
    // for this struct; deserialization goes through the manual `Deserialize` impl.
    #[cfg_attr(feature = "serde", serde(default))]
    last_state_change_at: u64,
}
94
95impl RunInstance {
96    /// Creates a new run instance in Scheduled state.
97    pub fn new_scheduled(
98        task_id: TaskId,
99        scheduled_at: u64,
100        created_at: u64,
101    ) -> Result<Self, RunInstanceConstructionError> {
102        Self::new_scheduled_with_id(RunId::new(), task_id, scheduled_at, created_at)
103    }
104
105    /// Creates a new run instance in Scheduled state with an explicit identifier.
106    ///
107    /// This constructor is primarily intended for deterministic replay/bootstrap
108    /// flows that must preserve a durable [`RunId`].
109    pub fn new_scheduled_with_id(
110        id: RunId,
111        task_id: TaskId,
112        scheduled_at: u64,
113        created_at: u64,
114    ) -> Result<Self, RunInstanceConstructionError> {
115        Self::validate_identifiers(id, task_id)?;
116
117        Ok(Self {
118            id,
119            task_id,
120            state: RunState::Scheduled,
121            current_attempt_id: None,
122            attempt_count: 0,
123            created_at,
124            scheduled_at,
125            effective_priority: 0,
126            last_state_change_at: created_at,
127        })
128    }
129
130    /// Creates a new ready run with the specified effective priority.
131    pub fn new_ready(
132        task_id: TaskId,
133        scheduled_at: u64,
134        created_at: u64,
135        effective_priority: i32,
136    ) -> Result<Self, RunInstanceConstructionError> {
137        Self::new_ready_with_id(RunId::new(), task_id, scheduled_at, created_at, effective_priority)
138    }
139
140    /// Creates a new ready run with an explicit identifier and effective priority.
141    pub fn new_ready_with_id(
142        id: RunId,
143        task_id: TaskId,
144        scheduled_at: u64,
145        created_at: u64,
146        effective_priority: i32,
147    ) -> Result<Self, RunInstanceConstructionError> {
148        Self::validate_identifiers(id, task_id)?;
149
150        if scheduled_at > created_at {
151            return Err(RunInstanceConstructionError::ReadyScheduledAtAfterCreatedAt {
152                run_id: id,
153                scheduled_at,
154                created_at,
155            });
156        }
157
158        Ok(Self {
159            id,
160            task_id,
161            state: RunState::Ready,
162            current_attempt_id: None,
163            attempt_count: 0,
164            created_at,
165            scheduled_at,
166            effective_priority,
167            last_state_change_at: created_at,
168        })
169    }
170
171    /// Validates identifier-level construction invariants.
172    fn validate_identifiers(
173        id: RunId,
174        task_id: TaskId,
175    ) -> Result<(), RunInstanceConstructionError> {
176        if id.as_uuid().is_nil() {
177            return Err(RunInstanceConstructionError::InvalidRunId { run_id: id });
178        }
179
180        if task_id.as_uuid().is_nil() {
181            return Err(RunInstanceConstructionError::InvalidTaskId { task_id });
182        }
183
184        Ok(())
185    }
186
187    /// Returns true if this run is in a terminal state.
188    pub fn is_terminal(&self) -> bool {
189        self.state.is_terminal()
190    }
191
192    /// Returns this run's identifier.
193    pub fn id(&self) -> RunId {
194        self.id
195    }
196
197    /// Returns the owning task identifier for this run.
198    pub fn task_id(&self) -> TaskId {
199        self.task_id
200    }
201
202    /// Returns the current lifecycle state for this run.
203    pub fn state(&self) -> RunState {
204        self.state
205    }
206
207    /// Returns the currently active attempt identifier, if one exists.
208    pub fn current_attempt_id(&self) -> Option<AttemptId> {
209        self.current_attempt_id
210    }
211
212    /// Returns the number of started attempts for this run.
213    pub fn attempt_count(&self) -> u32 {
214        self.attempt_count
215    }
216
217    /// Returns the run creation timestamp.
218    pub fn created_at(&self) -> u64 {
219        self.created_at
220    }
221
222    /// Returns the timestamp at which this run becomes eligible for readiness.
223    pub fn scheduled_at(&self) -> u64 {
224        self.scheduled_at
225    }
226
227    /// Returns the effective priority snapshot currently associated with this run.
228    pub fn effective_priority(&self) -> i32 {
229        self.effective_priority
230    }
231
232    /// Returns the timestamp of the most recent state change.
233    pub fn last_state_change_at(&self) -> u64 {
234        self.last_state_change_at
235    }
236
237    /// Records the timestamp of a state change (called by the reducer after transitions).
238    ///
239    /// # Invariants
240    ///
241    /// In production, the supplied `timestamp` must be monotonically non-decreasing
242    /// relative to the current `last_state_change_at`. The reducer is the sole caller
243    /// and is expected to feed wall-clock timestamps from durably ordered WAL events.
244    /// Violation of this invariant will not cause incorrect behavior (the field is
245    /// simply overwritten), but it indicates a clock or sequencing anomaly that should
246    /// be investigated.
247    pub fn record_state_change_at(&mut self, timestamp: u64) {
248        self.last_state_change_at = timestamp;
249    }
250
251    /// Applies a validated lifecycle transition.
252    ///
253    /// Transition legality is enforced via the canonical transition table.
254    /// Leaving `Running` while an active attempt is still open is rejected.
255    pub fn transition_to(&mut self, new_state: RunState) -> Result<(), RunInstanceError> {
256        if !is_valid_transition(self.state, new_state) {
257            return Err(RunInstanceError::InvalidTransition {
258                run_id: self.id,
259                from: self.state,
260                to: new_state,
261            });
262        }
263
264        if self.state == RunState::Running
265            && new_state != RunState::Running
266            && new_state != RunState::Canceled
267            && self.current_attempt_id.is_some()
268        {
269            return Err(RunInstanceError::AttemptInProgress {
270                run_id: self.id,
271                active_attempt_id: self.current_attempt_id.expect("checked is_some above"),
272            });
273        }
274
275        if new_state != RunState::Running {
276            self.current_attempt_id = None;
277        }
278        self.state = new_state;
279        Ok(())
280    }
281
282    /// Promotes this run into the `Ready` state with transition validation.
283    pub fn promote_to_ready(&mut self) -> Result<(), RunInstanceError> {
284        self.transition_to(RunState::Ready)
285    }
286
287    /// Promotes this run into the `Ready` state and snapshots its effective priority.
288    pub fn promote_to_ready_with_priority(
289        &mut self,
290        effective_priority: i32,
291    ) -> Result<(), RunInstanceError> {
292        self.transition_to(RunState::Ready)?;
293        self.effective_priority = effective_priority;
294        Ok(())
295    }
296
297    /// Updates the ready-state effective priority snapshot.
298    pub fn set_effective_priority(
299        &mut self,
300        effective_priority: i32,
301    ) -> Result<(), RunInstanceError> {
302        if self.state != RunState::Ready {
303            return Err(RunInstanceError::PriorityMutationRequiresReady {
304                run_id: self.id,
305                current_state: self.state,
306            });
307        }
308
309        self.effective_priority = effective_priority;
310        Ok(())
311    }
312
313    /// Records the start of a new attempt for a run currently in `Running`.
314    pub fn start_attempt(&mut self, attempt_id: AttemptId) -> Result<(), RunInstanceError> {
315        if self.state != RunState::Running {
316            return Err(RunInstanceError::AttemptStartRequiresRunning {
317                run_id: self.id,
318                current_state: self.state,
319            });
320        }
321
322        if self.current_attempt_id.is_some() {
323            return Err(RunInstanceError::AttemptAlreadyActive {
324                run_id: self.id,
325                active_attempt_id: self.current_attempt_id.expect("checked is_some above"),
326            });
327        }
328
329        self.attempt_count = self
330            .attempt_count
331            .checked_add(1)
332            .ok_or(RunInstanceError::AttemptCountOverflow { run_id: self.id })?;
333        self.current_attempt_id = Some(attempt_id);
334        Ok(())
335    }
336
337    /// Records completion of the current active attempt.
338    pub fn finish_attempt(&mut self, attempt_id: AttemptId) -> Result<(), RunInstanceError> {
339        if self.state != RunState::Running {
340            return Err(RunInstanceError::AttemptFinishRequiresRunning {
341                run_id: self.id,
342                current_state: self.state,
343            });
344        }
345
346        match self.current_attempt_id {
347            Some(active_attempt_id) if active_attempt_id == attempt_id => {
348                self.current_attempt_id = None;
349                Ok(())
350            }
351            Some(active_attempt_id) => Err(RunInstanceError::AttemptOwnershipMismatch {
352                run_id: self.id,
353                expected_attempt_id: active_attempt_id,
354                actual_attempt_id: attempt_id,
355            }),
356            None => Err(RunInstanceError::NoActiveAttempt { run_id: self.id }),
357        }
358    }
359}
360
/// Typed lifecycle and mutation errors for [`RunInstance`].
///
/// Every variant carries the affected run's identifier so errors can be reported
/// without access to the run itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunInstanceError {
    /// The requested transition is not allowed by the canonical transition table.
    ///
    /// Returned by `transition_to` and the `promote_to_ready*` helpers.
    InvalidTransition {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
        /// Current source state.
        from: RunState,
        /// Requested target state.
        to: RunState,
    },
    /// A transition out of `Running` was blocked because an attempt is still active.
    ///
    /// Returned by `transition_to` for every target except `Running` and `Canceled`.
    AttemptInProgress {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
        /// Active attempt that must be finished before state exit.
        active_attempt_id: AttemptId,
    },
    /// Attempt start was requested while not in the `Running` state (`start_attempt`).
    AttemptStartRequiresRunning {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
        /// Current state that rejected attempt start.
        current_state: RunState,
    },
    /// Attempt finish was requested while not in the `Running` state (`finish_attempt`).
    AttemptFinishRequiresRunning {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
        /// Current state that rejected attempt finish.
        current_state: RunState,
    },
    /// Attempt start was requested while an attempt is already active (`start_attempt`).
    ///
    /// Raised for any open attempt, even one with the same id as the requested start.
    AttemptAlreadyActive {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
        /// Currently active attempt.
        active_attempt_id: AttemptId,
    },
    /// Attempt finish did not match the currently active attempt (`finish_attempt`).
    AttemptOwnershipMismatch {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
        /// Expected active attempt id.
        expected_attempt_id: AttemptId,
        /// Supplied attempt id.
        actual_attempt_id: AttemptId,
    },
    /// Attempt finish was requested without any active attempt (`finish_attempt`).
    NoActiveAttempt {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
    },
    /// Attempt counter would overflow `u32` (`start_attempt`); the run is left unchanged.
    AttemptCountOverflow {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
    },
    /// Priority snapshots can only be mutated while the run is in `Ready`
    /// (`set_effective_priority`).
    PriorityMutationRequiresReady {
        /// Run identifier associated with the failed mutation.
        run_id: RunId,
        /// Current state that rejected priority mutation.
        current_state: RunState,
    },
}
428
429impl std::fmt::Display for RunInstanceError {
430    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431        match self {
432            RunInstanceError::InvalidTransition { run_id, from, to } => {
433                write!(f, "invalid run transition for {run_id}: {from:?} -> {to:?}")
434            }
435            RunInstanceError::AttemptInProgress { run_id, active_attempt_id } => {
436                write!(
437                    f,
438                    "run {run_id} cannot leave Running while attempt {active_attempt_id} is active"
439                )
440            }
441            RunInstanceError::AttemptStartRequiresRunning { run_id, current_state } => {
442                write!(f, "run {run_id} cannot start attempt in state {current_state:?}")
443            }
444            RunInstanceError::AttemptFinishRequiresRunning { run_id, current_state } => {
445                write!(f, "run {run_id} cannot finish attempt in state {current_state:?}")
446            }
447            RunInstanceError::AttemptAlreadyActive { run_id, active_attempt_id } => {
448                write!(f, "run {run_id} already has active attempt {active_attempt_id}")
449            }
450            RunInstanceError::AttemptOwnershipMismatch {
451                run_id,
452                expected_attempt_id,
453                actual_attempt_id,
454            } => {
455                write!(
456                    f,
457                    "run {run_id} attempt mismatch: expected {expected_attempt_id}, got \
458                     {actual_attempt_id}"
459                )
460            }
461            RunInstanceError::NoActiveAttempt { run_id } => {
462                write!(f, "run {run_id} has no active attempt")
463            }
464            RunInstanceError::AttemptCountOverflow { run_id } => {
465                write!(f, "run {run_id} attempt counter overflow")
466            }
467            RunInstanceError::PriorityMutationRequiresReady { run_id, current_state } => {
468                write!(
469                    f,
470                    "run {run_id} priority can only be updated in Ready state (current: \
471                     {current_state:?})"
472                )
473            }
474        }
475    }
476}
477
478impl std::error::Error for RunInstanceError {}
479
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for RunInstance {
    /// Deserializes a run and re-checks identifier, attempt, and schedule invariants
    /// before building the [`RunInstance`].
    ///
    /// `last_state_change_at` may be absent (payloads written before the field
    /// existed). In that case it falls back to `created_at`, the same value every
    /// constructor uses.
    ///
    /// # Errors
    ///
    /// Fails through `serde::de::Error::custom` when either identifier is nil, when an
    /// active attempt is present outside `Running`, or when a `Ready` run has
    /// `scheduled_at > created_at`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Unvalidated mirror of the serialized field layout.
        #[derive(serde::Deserialize)]
        struct RunInstanceWire {
            id: RunId,
            task_id: TaskId,
            state: RunState,
            current_attempt_id: Option<AttemptId>,
            attempt_count: u32,
            created_at: u64,
            scheduled_at: u64,
            effective_priority: i32,
            // Read as `Option` rather than a bare `u64` with `#[serde(default)]`. The
            // latter silently produced 0 for payloads lacking the field, placing the
            // last state change at timestamp 0 instead of at `created_at`.
            #[serde(default)]
            last_state_change_at: Option<u64>,
        }

        let wire = RunInstanceWire::deserialize(deserializer)?;

        // Validate identifiers
        if wire.id.as_uuid().is_nil() {
            return Err(serde::de::Error::custom("run_id must not be nil"));
        }
        if wire.task_id.as_uuid().is_nil() {
            return Err(serde::de::Error::custom("task_id must not be nil"));
        }

        // Validate state/attempt consistency
        if wire.state != RunState::Running && wire.current_attempt_id.is_some() {
            return Err(serde::de::Error::custom(
                "active attempt_id is only valid in Running state",
            ));
        }
        // Defensive: subsumed by the check above as long as `Running` is not a
        // terminal state.
        if wire.state.is_terminal() && wire.current_attempt_id.is_some() {
            return Err(serde::de::Error::custom("terminal state cannot have an active attempt"));
        }

        // Validate schedule causality for Ready state
        // NOTE(review): a run built with `new_scheduled*` using a future `scheduled_at`
        // and later moved to Ready via `promote_to_ready` holds
        // `scheduled_at > created_at`. Such a run serializes fine but is rejected
        // here. Confirm whether this check should apply only to runs created
        // directly in Ready.
        if wire.state == RunState::Ready && wire.scheduled_at > wire.created_at {
            return Err(serde::de::Error::custom(format!(
                "Ready state requires scheduled_at ({}) <= created_at ({})",
                wire.scheduled_at, wire.created_at,
            )));
        }

        let last_state_change_at = wire.last_state_change_at.unwrap_or(wire.created_at);

        Ok(RunInstance {
            id: wire.id,
            task_id: wire.task_id,
            state: wire.state,
            current_attempt_id: wire.current_attempt_id,
            attempt_count: wire.attempt_count,
            created_at: wire.created_at,
            scheduled_at: wire.scheduled_at,
            effective_priority: wire.effective_priority,
            last_state_change_at,
        })
    }
}
541
542impl RunInstance {
543    /// Restores attempt state during snapshot bootstrap.
544    ///
545    /// During snapshot bootstrap, runs are created via `new_scheduled_with_id`
546    /// (which initializes `attempt_count=0` and `current_attempt_id=None`) then
547    /// replayed through state transitions. However, `start_attempt()`/`finish_attempt()`
548    /// are never called during bootstrap replay. This method directly sets the
549    /// attempt count and active attempt ID from the snapshot's attempt history.
550    ///
551    /// This is a bootstrap-only method — normal operation uses `start_attempt()`
552    /// and `finish_attempt()` which enforce state-machine invariants.
553    pub fn restore_attempt_state_for_bootstrap(
554        &mut self,
555        count: u32,
556        active_attempt: Option<AttemptId>,
557    ) {
558        self.attempt_count = count;
559        self.current_attempt_id = active_attempt;
560    }
561}
562
#[cfg(test)]
impl RunInstance {
    /// Test-only hook that forces `attempt_count` to `count`. This lets overflow
    /// handling in `start_attempt` be exercised without performing real attempts.
    pub(crate) fn set_attempt_count_for_testing(&mut self, count: u32) {
        let Self { attempt_count, .. } = self;
        *attempt_count = count;
    }
}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{AttemptId, TaskId};

    /// Builds a scheduled run and walks it through Ready -> Leased -> Running.
    fn running_run() -> RunInstance {
        let mut run =
            RunInstance::new_scheduled(TaskId::new(), 1_000, 1_000).expect("valid scheduled run");
        for next_state in [RunState::Ready, RunState::Leased, RunState::Running] {
            run.transition_to(next_state).unwrap();
        }
        run
    }

    #[test]
    fn start_attempt_at_u32_max_returns_overflow_error() {
        let mut run = running_run();
        run.set_attempt_count_for_testing(u32::MAX);

        let error = run
            .start_attempt(AttemptId::new())
            .expect_err("start_attempt at u32::MAX should fail");

        assert_eq!(error, RunInstanceError::AttemptCountOverflow { run_id: run.id() });
    }
}