Skip to main content

actionqueue_engine/derive/
mod.rs

//! Derivation logic for task run creation.
//!
//! This module handles deriving runs from task specifications according
//! to their run policies.
5
6use actionqueue_core::ids::TaskId;
7use actionqueue_core::run::run_instance::RunInstance;
8use actionqueue_core::run::RunInstanceConstructionError;
9use actionqueue_core::task::run_policy::{RunPolicy, RunPolicyError};
10
11use crate::time::clock::Clock;
12
13#[cfg(feature = "workflow")]
14pub mod cron;
15pub mod once;
16pub mod repeat;
17
18/// Error returned when run derivation fails.
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub enum DerivationError {
21    /// The run policy payload is invalid for derivation.
22    InvalidPolicy {
23        /// The typed policy validation failure.
24        source: RunPolicyError,
25    },
26    /// Arithmetic overflow would occur during run derivation.
27    /// This indicates the derivation request cannot be fulfilled with representable values.
28    ArithmeticOverflow {
29        /// The run policy that caused the overflow.
30        policy: String,
31        /// Description of the overflowed calculation.
32        operation: String,
33    },
34    /// Run construction failed because the task identifier is nil/invalid.
35    InvalidTaskIdForRunConstruction {
36        /// Rejected task identifier.
37        task_id: TaskId,
38    },
39    /// Run construction failed for a reason other than task identifier shape.
40    RunConstructionFailed {
41        /// Task identifier associated with the failed construction.
42        task_id: TaskId,
43        /// Typed construction failure returned by core run constructors.
44        source: RunInstanceConstructionError,
45    },
46}
47
48impl std::fmt::Display for DerivationError {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            DerivationError::InvalidPolicy { source } => {
52                write!(f, "invalid run policy for derivation: {source}")
53            }
54            DerivationError::ArithmeticOverflow { policy, operation } => {
55                write!(f, "arithmetic overflow in {policy} derivation: {operation}")
56            }
57            DerivationError::InvalidTaskIdForRunConstruction { task_id } => {
58                write!(f, "invalid task identifier for run derivation: {task_id}")
59            }
60            DerivationError::RunConstructionFailed { task_id, source } => {
61                write!(f, "run construction failed during derivation for task {task_id}: {source}")
62            }
63        }
64    }
65}
66
67impl std::error::Error for DerivationError {}
68
69/// Maps a typed run-construction failure into a deterministic derivation error.
70pub(crate) fn map_run_construction_error(
71    task_id: TaskId,
72    source: RunInstanceConstructionError,
73) -> DerivationError {
74    match source {
75        RunInstanceConstructionError::InvalidTaskId { task_id } => {
76            DerivationError::InvalidTaskIdForRunConstruction { task_id }
77        }
78        _ => DerivationError::RunConstructionFailed { task_id, source },
79    }
80}
81
82/// Result of deriving runs for a task from its run policy.
83pub type DerivationResult = Result<DerivationSuccess, DerivationError>;
84
85/// Success result from run derivation.
86#[derive(Debug, Clone, PartialEq, Eq)]
87#[must_use]
88pub struct DerivationSuccess {
89    /// Newly derived run instances.
90    derived: Vec<RunInstance>,
91    /// Total number of runs considered derived for the task after this call.
92    already_derived: u32,
93}
94
95impl DerivationSuccess {
96    /// Creates a new derivation success result.
97    pub(crate) fn new(derived: Vec<RunInstance>, already_derived: u32) -> Self {
98        Self { derived, already_derived }
99    }
100
101    /// Returns the newly derived run instances.
102    pub fn derived(&self) -> &[RunInstance] {
103        &self.derived
104    }
105
106    /// Consumes self and returns the newly derived run instances.
107    pub fn into_derived(self) -> Vec<RunInstance> {
108        self.derived
109    }
110
111    /// Returns the total number of runs considered derived for the task after this call.
112    pub fn already_derived(&self) -> u32 {
113        self.already_derived
114    }
115}
116
117/// Pre-derivation validation for run policies.
118///
119/// This function validates that a run policy can be derived without
120/// arithmetic overflow or other impossible conditions before any
121/// derivation work is performed.
122///
123/// # Arguments
124///
125/// * `run_policy` - The run policy to validate
126/// * `already_derived` - The number of runs already derived
127/// * `schedule_origin` - The base timestamp for repeat policy scheduling
128///
129/// # Returns
130///
131/// * `Ok(())` if the policy is valid for derivation
132/// * `Err(DerivationError)` if the policy cannot be derived
133fn validate_derivation(
134    run_policy: &RunPolicy,
135    already_derived: u32,
136    schedule_origin: u64,
137) -> Result<(), DerivationError> {
138    run_policy.validate().map_err(|source| DerivationError::InvalidPolicy { source })?;
139
140    match run_policy {
141        RunPolicy::Once => {
142            // Once policy: already_derived should not exceed 1
143            if already_derived > 1 {
144                return Err(DerivationError::ArithmeticOverflow {
145                    policy: "Once".to_string(),
146                    operation: format!(
147                        "invalid state: already_derived ({already_derived}) > max (1)"
148                    ),
149                });
150            }
151            Ok(())
152        }
153        #[cfg(feature = "workflow")]
154        RunPolicy::Cron(_) => {
155            // Cron expression is validated at CronPolicy::new(); no arithmetic
156            // overflow is possible since occurrences are computed on-demand.
157            let _ = (already_derived, schedule_origin);
158            Ok(())
159        }
160        RunPolicy::Repeat(ref policy) => {
161            let count = policy.count();
162            let interval_secs = policy.interval_secs();
163            // Validate count values - count should be >= already_derived
164            if already_derived > count {
165                return Err(DerivationError::ArithmeticOverflow {
166                    policy: "Repeat".to_string(),
167                    operation: format!(
168                        "invalid state: already_derived ({already_derived}) > count ({count})"
169                    ),
170                });
171            }
172
173            // Check that count and interval don't cause overflow
174            // The maximum index we'll compute is (count - 1)
175            let max_index = count - 1;
176
177            // Convert to u64 for multiplication check
178            let max_index_u64: u64 = max_index as u64;
179            let interval_u64: u64 = interval_secs;
180
181            // Check multiplication: max_index * interval_secs
182            if max_index_u64.checked_mul(interval_u64).is_none() {
183                return Err(DerivationError::ArithmeticOverflow {
184                    policy: "Repeat".to_string(),
185                    operation: format!(
186                        "multiplication overflow: max_index {max_index_u64} * interval \
187                         {interval_u64}"
188                    ),
189                });
190            }
191
192            // Check that schedule_origin + (count - 1) * interval_secs doesn't overflow
193            // This is the maximum scheduled_at value we'll compute
194            let max_product = max_index_u64 * interval_u64;
195            if schedule_origin.checked_add(max_product).is_none() {
196                return Err(DerivationError::ArithmeticOverflow {
197                    policy: "Repeat".to_string(),
198                    operation: format!(
199                        "addition overflow: schedule_origin {schedule_origin} + max_product \
200                         {max_product}"
201                    ),
202                });
203            }
204
205            Ok(())
206        }
207    }
208}
209
210/// Derives runs from a task run policy.
211///
212/// This function is the policy-routing boundary for run derivation. Concrete
213/// policy behavior is implemented in policy-specific modules.
214///
215/// `schedule_origin` is the stable base timestamp used by repeat policies to
216/// compute deterministic scheduled times.
217pub fn derive_runs(
218    clock: &impl Clock,
219    task_id: TaskId,
220    run_policy: &RunPolicy,
221    already_derived: u32,
222    schedule_origin: u64,
223) -> DerivationResult {
224    // Validate the policy before derivation to catch impossible cases early
225    validate_derivation(run_policy, already_derived, schedule_origin)?;
226
227    match run_policy {
228        RunPolicy::Once => once::derive_once(clock, task_id, already_derived),
229        RunPolicy::Repeat(ref policy) => repeat::derive_repeat(
230            clock,
231            task_id,
232            repeat::RepeatDerivationParams::new(
233                policy.count(),
234                policy.interval_secs(),
235                already_derived,
236                schedule_origin,
237            ),
238        ),
239        #[cfg(feature = "workflow")]
240        RunPolicy::Cron(ref policy) => {
241            // `schedule_origin` serves as the "start looking from" timestamp for
242            // cron. The initial window of CRON_WINDOW_SIZE runs is derived here.
243            // Subsequent rolling-window derivation is handled by `derive_cron_runs`
244            // in the dispatch loop, which bypasses this path.
245            cron::derive_cron_initial(clock, task_id, policy, already_derived, schedule_origin)
246        }
247    }
248}
249
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::RunId;
    use actionqueue_core::task::run_policy::RepeatPolicy;

    use super::*;

    /// Unwraps an `ArithmeticOverflow` rejection and returns its `operation` text.
    ///
    /// Panics on any other outcome, so a test cannot pass because of an
    /// unrelated failure.
    fn expect_overflow(result: Result<(), DerivationError>) -> String {
        match result {
            Err(DerivationError::ArithmeticOverflow { operation, .. }) => operation,
            other => panic!("expected ArithmeticOverflow, got {:?}", other),
        }
    }

    #[test]
    fn derive_runs_routes_once_policy() {
        let clock = crate::time::clock::MockClock::new(1000);
        let task_id = TaskId::new();

        let result = derive_runs(&clock, task_id, &RunPolicy::Once, 0, 0)
            .expect("Once derivation must succeed");

        assert_eq!(result.derived().len(), 1);
        assert_eq!(result.already_derived(), 1);
        assert_eq!(result.derived()[0].scheduled_at(), 1000);
    }

    #[test]
    fn derive_runs_routes_repeat_policy_with_stable_origin() {
        let mut clock = crate::time::clock::MockClock::new(1000);
        let task_id = TaskId::new();
        let run_policy = RunPolicy::repeat(4, 60).expect("policy should be valid");

        let first = derive_runs(&clock, task_id, &run_policy, 0, 900)
            .expect("Repeat derivation must succeed for valid inputs");
        assert_eq!(first.derived().len(), 4);
        assert_eq!(first.derived()[0].scheduled_at(), 900);
        assert_eq!(first.derived()[1].scheduled_at(), 960);
        assert_eq!(first.derived()[2].scheduled_at(), 1020);
        assert_eq!(first.derived()[3].scheduled_at(), 1080);

        // Scheduled times must depend on the origin, not on the current clock.
        clock.advance_by(500);

        let second = derive_runs(&clock, task_id, &run_policy, 2, 900)
            .expect("Repeat derivation must succeed for valid inputs");
        assert_eq!(second.derived().len(), 2);
        assert_eq!(second.derived()[0].scheduled_at(), 1020);
        assert_eq!(second.derived()[1].scheduled_at(), 1080);
    }

    #[test]
    fn validate_derivation_once_rejects_invalid_already_derived() {
        // already_derived > 1 should be rejected
        let operation = expect_overflow(validate_derivation(&RunPolicy::Once, 2, 0));
        assert!(operation.starts_with("invalid state"), "unexpected operation: {}", operation);
    }

    #[test]
    fn validate_derivation_repeat_rejects_already_derived_exceeds_count() {
        // already_derived > count should be rejected
        let policy = RunPolicy::repeat(3, 60).expect("policy should be valid");
        let operation = expect_overflow(validate_derivation(&policy, 5, 0));
        assert!(operation.starts_with("invalid state"), "unexpected operation: {}", operation);
    }

    #[test]
    fn validate_derivation_repeat_rejects_multiplication_overflow() {
        // (count - 1) * interval_secs would overflow u64
        let policy = RunPolicy::repeat(u32::MAX, u64::MAX).expect("policy should be valid");
        let operation = expect_overflow(validate_derivation(&policy, 0, 0));
        assert!(
            operation.starts_with("multiplication overflow"),
            "unexpected operation: {}",
            operation
        );
    }

    #[test]
    fn validate_derivation_repeat_rejects_addition_overflow() {
        // (count - 1) * interval_secs = 2 * 100 = 200 fits in u64, so the
        // multiplication check passes; schedule_origin + 200 then exceeds
        // u64::MAX and must be caught by the addition check.
        let large_origin = u64::MAX - 100;
        let policy = RunPolicy::repeat(3, 100).expect("policy should be valid");
        let operation = expect_overflow(validate_derivation(&policy, 0, large_origin));
        assert!(operation.starts_with("addition overflow"), "unexpected operation: {}", operation);
    }

    #[test]
    fn validate_derivation_repeat_accepts_sum_exactly_at_u64_max() {
        // (count - 1) * interval_secs = 1 * 100; origin + 100 == u64::MAX is
        // still representable and must be accepted.
        let policy = RunPolicy::repeat(2, 100).expect("policy should be valid");
        let result = validate_derivation(&policy, 0, u64::MAX - 100);
        assert!(result.is_ok());
    }

    #[test]
    fn validate_derivation_repeat_accepts_valid_values() {
        // Valid values should pass validation
        let result = validate_derivation(
            &RunPolicy::repeat(10, 60).expect("policy should be valid"),
            0,
            1000,
        );
        assert!(result.is_ok());

        // With already_derived
        let result = validate_derivation(
            &RunPolicy::repeat(10, 60).expect("policy should be valid"),
            5,
            1000,
        );
        assert!(result.is_ok());
    }

    #[test]
    fn validate_derivation_repeat_rejects_zero_count_as_invalid_policy() {
        // A zero count is rejected when the policy is constructed.
        let result = RepeatPolicy::new(0, 60);
        assert_eq!(result, Err(RunPolicyError::InvalidRepeatCount { count: 0 }));
    }

    #[test]
    fn validate_derivation_repeat_rejects_zero_interval_as_invalid_policy() {
        // A zero interval is rejected when the policy is constructed.
        let result = RepeatPolicy::new(3, 0);
        assert_eq!(result, Err(RunPolicyError::InvalidRepeatIntervalSecs { interval_secs: 0 }));
    }

    #[test]
    fn map_run_construction_error_returns_dedicated_invalid_task_id_variant() {
        let task_id = "00000000-0000-0000-0000-000000000000"
            .parse::<TaskId>()
            .expect("nil task id literal must parse");

        let mapped = map_run_construction_error(
            task_id,
            RunInstanceConstructionError::InvalidTaskId { task_id },
        );

        assert_eq!(mapped, DerivationError::InvalidTaskIdForRunConstruction { task_id });
    }

    #[test]
    fn map_run_construction_error_preserves_non_task_id_failures() {
        let task_id = "00000000-0000-0000-0000-000000000002"
            .parse::<TaskId>()
            .expect("task id literal must parse");
        let run_id = "00000000-0000-0000-0000-000000000001"
            .parse::<RunId>()
            .expect("run id literal must parse");
        let source = RunInstanceConstructionError::ReadyScheduledAtAfterCreatedAt {
            run_id,
            scheduled_at: 2_000,
            created_at: 1_000,
        };

        let mapped = map_run_construction_error(task_id, source);

        assert_eq!(mapped, DerivationError::RunConstructionFailed { task_id, source });
    }
}