Skip to main content

actionqueue_engine/derive/
repeat.rs

1//! Repeat derivation - creates multiple runs for tasks with Repeat policy.
2//!
3//! The Repeat derivation ensures that each task with a Repeat run policy has
4//! exactly N runs created according to the specified count and interval.
5//! It tracks derived runs to prevent duplicate creation.
6
7use actionqueue_core::ids::TaskId;
8use actionqueue_core::run::run_instance::RunInstance;
9use actionqueue_core::task::run_policy::RunPolicyError;
10
11use crate::derive::{map_run_construction_error, DerivationError, DerivationSuccess};
12use crate::time::clock::Clock;
13
/// Inputs for repeat derivation: the repeat policy fields bundled together
/// with the derivation state needed to work out which runs are still missing.
///
/// Every field is a plain integer, so the type is cheap to copy, compare and
/// print for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RepeatDerivationParams {
    /// Total number of runs the repeat policy calls for.
    count: u32,
    /// Spacing, in seconds, between successive runs.
    interval_secs: u64,
    /// Number of runs that already exist; derivation starts at this index
    /// so that earlier runs are not created a second time.
    already_derived: u32,
    /// Stable base timestamp from which every run's schedule time is computed.
    schedule_origin: u64,
}

impl RepeatDerivationParams {
    /// Bundles the given values into a new parameter set.
    ///
    /// No validation happens here; [`derive_repeat`] rejects a zero `count`
    /// or a zero `interval_secs` when the parameters are used.
    #[must_use]
    pub const fn new(
        count: u32,
        interval_secs: u64,
        already_derived: u32,
        schedule_origin: u64,
    ) -> Self {
        Self { count, interval_secs, already_derived, schedule_origin }
    }

    /// Returns the total number of runs the policy calls for.
    #[must_use]
    pub const fn count(&self) -> u32 {
        self.count
    }

    /// Returns the spacing in seconds between successive runs.
    #[must_use]
    pub const fn interval_secs(&self) -> u64 {
        self.interval_secs
    }

    /// Returns how many runs have already been derived.
    #[must_use]
    pub const fn already_derived(&self) -> u32 {
        self.already_derived
    }

    /// Returns the stable base timestamp used for schedule calculation.
    #[must_use]
    pub const fn schedule_origin(&self) -> u64 {
        self.schedule_origin
    }
}
53
54/// Derives runs for a Repeat policy task.
55///
56/// Creates runs up to the count if they don't exist, spaced by the interval.
57/// Returns a DerivationSuccess with newly created runs and the total policy count.
58///
59/// # Errors
60///
61/// Returns [`DerivationError::ArithmeticOverflow`] if the schedule time calculation
62/// `schedule_origin + (index * interval_secs)` would overflow `u64::MAX` for any index.
63/// This ensures strict exact-N accounting - derivation either produces exactly N runs
64/// or fails with a typed error.
65///
66/// Returns [`DerivationError::InvalidPolicy`] if `count == 0` or
67/// `interval_secs == 0`, preserving defensive contract enforcement for callers
68/// that bypass the primary run-policy boundary.
69///
70/// `schedule_origin` is the stable base timestamp for repeat scheduling. Derived
71/// run times are always `schedule_origin + index * interval_secs`, independent
72/// of the current clock time, ensuring deterministic re-derivation.
73pub fn derive_repeat(
74    clock: &impl Clock,
75    task_id: TaskId,
76    params: RepeatDerivationParams,
77) -> Result<DerivationSuccess, DerivationError> {
78    let RepeatDerivationParams { count, interval_secs, already_derived, schedule_origin } = params;
79
80    if count == 0 {
81        return Err(DerivationError::InvalidPolicy {
82            source: RunPolicyError::InvalidRepeatCount { count },
83        });
84    }
85
86    if interval_secs == 0 {
87        return Err(DerivationError::InvalidPolicy {
88            source: RunPolicyError::InvalidRepeatIntervalSecs { interval_secs },
89        });
90    }
91
92    let now = clock.now();
93    let mut derived = Vec::new();
94
95    // Derive only missing runs, with deterministic schedule positions.
96    // Use checked arithmetic to detect overflow.
97    for i in already_derived..count {
98        // Calculate: schedule_origin + (i as u64 * interval_secs)
99        // Fail with typed error if overflow would occur.
100        let idx: u64 = i as u64;
101        let product = match idx.checked_mul(interval_secs) {
102            Some(p) => p,
103            None => {
104                // Overflow in multiplication: cannot derive remaining runs safely.
105                // Return typed error instead of partial result.
106                return Err(DerivationError::ArithmeticOverflow {
107                    policy: "Repeat".to_string(),
108                    operation: format!("multiplication: index {idx} * interval {interval_secs}"),
109                });
110            }
111        };
112
113        let scheduled_at = match schedule_origin.checked_add(product) {
114            Some(sa) => sa,
115            None => {
116                // Overflow in addition: cannot derive remaining runs safely.
117                // Return typed error instead of partial result.
118                return Err(DerivationError::ArithmeticOverflow {
119                    policy: "Repeat".to_string(),
120                    operation: format!(
121                        "addition: schedule_origin {schedule_origin} + product {product}"
122                    ),
123                });
124            }
125        };
126
127        let run = RunInstance::new_scheduled(task_id, scheduled_at, now)
128            .map_err(|source| map_run_construction_error(task_id, source))?;
129
130        derived.push(run);
131    }
132
133    Ok(DerivationSuccess::new(derived, count))
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use crate::time::clock::MockClock;

    /// Shorthand for building [`RepeatDerivationParams`] in the cases below.
    fn repeat_params(
        count: u32,
        interval_secs: u64,
        already_derived: u32,
        schedule_origin: u64,
    ) -> RepeatDerivationParams {
        RepeatDerivationParams::new(count, interval_secs, already_derived, schedule_origin)
    }

    /// A fresh derivation yields `count` runs spaced `interval_secs` apart.
    #[test]
    fn derive_repeat_creates_correct_count() {
        let clock = MockClock::new(1000);
        let task_id = TaskId::new();

        let outcome = derive_repeat(&clock, task_id, repeat_params(3, 60, 0, 1000)).unwrap();

        assert_eq!(outcome.derived().len(), 3);
        assert_eq!(outcome.already_derived(), 3);

        // Expected schedule: origin, origin + 60, origin + 120.
        let times: Vec<_> = outcome.derived().iter().map(|run| run.scheduled_at()).collect();
        assert_eq!(times, [1000, 1060, 1120]);
    }

    /// Only the runs beyond `already_derived` are produced on a later call.
    #[test]
    fn derive_repeat_does_not_duplicate() {
        let clock = MockClock::new(1000);
        let task_id = TaskId::new();

        // Starting from nothing, all five runs are created.
        let initial = derive_repeat(&clock, task_id, repeat_params(5, 60, 0, 1000)).unwrap();
        assert_eq!(initial.derived().len(), 5);

        // With two runs already present, only the remaining three are created.
        let resumed = derive_repeat(&clock, task_id, repeat_params(5, 60, 2, 1000)).unwrap();
        assert_eq!(resumed.derived().len(), 3);
        assert_eq!(resumed.already_derived(), 5);
    }

    /// A zero `count` is rejected with a typed policy error.
    #[test]
    fn derive_repeat_rejects_zero_count() {
        let clock = MockClock::new(1000);
        let task_id = TaskId::new();

        let outcome = derive_repeat(&clock, task_id, repeat_params(0, 60, 0, 1000));

        let expected = DerivationError::InvalidPolicy {
            source: RunPolicyError::InvalidRepeatCount { count: 0 },
        };
        assert_eq!(outcome, Err(expected));
    }

    /// A zero `interval_secs` is rejected with a typed policy error.
    #[test]
    fn derive_repeat_rejects_zero_interval() {
        let clock = MockClock::new(1000);
        let task_id = TaskId::new();

        let outcome = derive_repeat(&clock, task_id, repeat_params(3, 0, 0, 1000));

        let expected = DerivationError::InvalidPolicy {
            source: RunPolicyError::InvalidRepeatIntervalSecs { interval_secs: 0 },
        };
        assert_eq!(outcome, Err(expected));
    }

    /// Schedule times come from `schedule_origin`, not from the clock, so
    /// advancing the clock between calls leaves them unchanged.
    #[test]
    fn derive_repeat_remains_stable_when_clock_advances() {
        let mut clock = MockClock::new(1000);
        let task_id = TaskId::new();

        let before = derive_repeat(&clock, task_id, repeat_params(4, 60, 0, 900)).unwrap();
        assert_eq!(before.derived().len(), 4);
        let before_times: Vec<_> =
            before.derived().iter().map(|run| run.scheduled_at()).collect();
        assert_eq!(before_times, [900, 960, 1020, 1080]);

        clock.advance_by(600);

        let after = derive_repeat(&clock, task_id, repeat_params(4, 60, 2, 900)).unwrap();
        assert_eq!(after.derived().len(), 2);
        let after_times: Vec<_> = after.derived().iter().map(|run| run.scheduled_at()).collect();
        assert_eq!(after_times, [1020, 1080]);
    }

    /// A nil task id fails derivation as a whole with a typed error.
    #[test]
    fn derive_repeat_returns_typed_error_for_nil_task_id_without_partial_derivation() {
        let clock = MockClock::new(1000);
        let nil_literal = "00000000-0000-0000-0000-000000000000";
        let task_id = nil_literal.parse::<TaskId>().expect("nil task id literal must parse");

        let outcome = derive_repeat(&clock, task_id, repeat_params(4, 60, 0, 1000));

        let expected = DerivationError::InvalidTaskIdForRunConstruction { task_id };
        assert_eq!(outcome, Err(expected));
    }
}