actionqueue_engine/derive/
mod.rs1use actionqueue_core::ids::TaskId;
7use actionqueue_core::run::run_instance::RunInstance;
8use actionqueue_core::run::RunInstanceConstructionError;
9use actionqueue_core::task::run_policy::{RunPolicy, RunPolicyError};
10
11use crate::time::clock::Clock;
12
13#[cfg(feature = "workflow")]
14pub mod cron;
15pub mod once;
16pub mod repeat;
17
18#[derive(Debug, Clone, PartialEq, Eq)]
20pub enum DerivationError {
21 InvalidPolicy {
23 source: RunPolicyError,
25 },
26 ArithmeticOverflow {
29 policy: String,
31 operation: String,
33 },
34 InvalidTaskIdForRunConstruction {
36 task_id: TaskId,
38 },
39 RunConstructionFailed {
41 task_id: TaskId,
43 source: RunInstanceConstructionError,
45 },
46}
47
48impl std::fmt::Display for DerivationError {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 DerivationError::InvalidPolicy { source } => {
52 write!(f, "invalid run policy for derivation: {source}")
53 }
54 DerivationError::ArithmeticOverflow { policy, operation } => {
55 write!(f, "arithmetic overflow in {policy} derivation: {operation}")
56 }
57 DerivationError::InvalidTaskIdForRunConstruction { task_id } => {
58 write!(f, "invalid task identifier for run derivation: {task_id}")
59 }
60 DerivationError::RunConstructionFailed { task_id, source } => {
61 write!(f, "run construction failed during derivation for task {task_id}: {source}")
62 }
63 }
64 }
65}
66
67impl std::error::Error for DerivationError {}
68
69pub(crate) fn map_run_construction_error(
71 task_id: TaskId,
72 source: RunInstanceConstructionError,
73) -> DerivationError {
74 match source {
75 RunInstanceConstructionError::InvalidTaskId { task_id } => {
76 DerivationError::InvalidTaskIdForRunConstruction { task_id }
77 }
78 _ => DerivationError::RunConstructionFailed { task_id, source },
79 }
80}
81
82pub type DerivationResult = Result<DerivationSuccess, DerivationError>;
84
85#[derive(Debug, Clone, PartialEq, Eq)]
87#[must_use]
88pub struct DerivationSuccess {
89 derived: Vec<RunInstance>,
91 already_derived: u32,
93}
94
95impl DerivationSuccess {
96 pub(crate) fn new(derived: Vec<RunInstance>, already_derived: u32) -> Self {
98 Self { derived, already_derived }
99 }
100
101 pub fn derived(&self) -> &[RunInstance] {
103 &self.derived
104 }
105
106 pub fn into_derived(self) -> Vec<RunInstance> {
108 self.derived
109 }
110
111 pub fn already_derived(&self) -> u32 {
113 self.already_derived
114 }
115}
116
117fn validate_derivation(
134 run_policy: &RunPolicy,
135 already_derived: u32,
136 schedule_origin: u64,
137) -> Result<(), DerivationError> {
138 run_policy.validate().map_err(|source| DerivationError::InvalidPolicy { source })?;
139
140 match run_policy {
141 RunPolicy::Once => {
142 if already_derived > 1 {
144 return Err(DerivationError::ArithmeticOverflow {
145 policy: "Once".to_string(),
146 operation: format!(
147 "invalid state: already_derived ({already_derived}) > max (1)"
148 ),
149 });
150 }
151 Ok(())
152 }
153 #[cfg(feature = "workflow")]
154 RunPolicy::Cron(_) => {
155 let _ = (already_derived, schedule_origin);
158 Ok(())
159 }
160 RunPolicy::Repeat(ref policy) => {
161 let count = policy.count();
162 let interval_secs = policy.interval_secs();
163 if already_derived > count {
165 return Err(DerivationError::ArithmeticOverflow {
166 policy: "Repeat".to_string(),
167 operation: format!(
168 "invalid state: already_derived ({already_derived}) > count ({count})"
169 ),
170 });
171 }
172
173 let max_index = count - 1;
176
177 let max_index_u64: u64 = max_index as u64;
179 let interval_u64: u64 = interval_secs;
180
181 if max_index_u64.checked_mul(interval_u64).is_none() {
183 return Err(DerivationError::ArithmeticOverflow {
184 policy: "Repeat".to_string(),
185 operation: format!(
186 "multiplication overflow: max_index {max_index_u64} * interval \
187 {interval_u64}"
188 ),
189 });
190 }
191
192 let max_product = max_index_u64 * interval_u64;
195 if schedule_origin.checked_add(max_product).is_none() {
196 return Err(DerivationError::ArithmeticOverflow {
197 policy: "Repeat".to_string(),
198 operation: format!(
199 "addition overflow: schedule_origin {schedule_origin} + max_product \
200 {max_product}"
201 ),
202 });
203 }
204
205 Ok(())
206 }
207 }
208}
209
210pub fn derive_runs(
218 clock: &impl Clock,
219 task_id: TaskId,
220 run_policy: &RunPolicy,
221 already_derived: u32,
222 schedule_origin: u64,
223) -> DerivationResult {
224 validate_derivation(run_policy, already_derived, schedule_origin)?;
226
227 match run_policy {
228 RunPolicy::Once => once::derive_once(clock, task_id, already_derived),
229 RunPolicy::Repeat(ref policy) => repeat::derive_repeat(
230 clock,
231 task_id,
232 repeat::RepeatDerivationParams::new(
233 policy.count(),
234 policy.interval_secs(),
235 already_derived,
236 schedule_origin,
237 ),
238 ),
239 #[cfg(feature = "workflow")]
240 RunPolicy::Cron(ref policy) => {
241 cron::derive_cron_initial(clock, task_id, policy, already_derived, schedule_origin)
246 }
247 }
248}
249
250#[cfg(test)]
251mod tests {
252 use actionqueue_core::ids::RunId;
253 use actionqueue_core::task::run_policy::RepeatPolicy;
254
255 use super::*;
256
257 #[test]
258 fn derive_runs_routes_once_policy() {
259 let clock = crate::time::clock::MockClock::new(1000);
260 let task_id = TaskId::new();
261
262 let result = derive_runs(&clock, task_id, &RunPolicy::Once, 0, 0)
263 .expect("Once derivation must succeed");
264
265 assert_eq!(result.derived().len(), 1);
266 assert_eq!(result.already_derived(), 1);
267 assert_eq!(result.derived()[0].scheduled_at(), 1000);
268 }
269
270 #[test]
271 fn derive_runs_routes_repeat_policy_with_stable_origin() {
272 let mut clock = crate::time::clock::MockClock::new(1000);
273 let task_id = TaskId::new();
274 let run_policy = RunPolicy::repeat(4, 60).expect("policy should be valid");
275
276 let first = derive_runs(&clock, task_id, &run_policy, 0, 900)
277 .expect("Repeat derivation must succeed for valid inputs");
278 assert_eq!(first.derived().len(), 4);
279 assert_eq!(first.derived()[0].scheduled_at(), 900);
280 assert_eq!(first.derived()[1].scheduled_at(), 960);
281 assert_eq!(first.derived()[2].scheduled_at(), 1020);
282 assert_eq!(first.derived()[3].scheduled_at(), 1080);
283
284 clock.advance_by(500);
285
286 let second = derive_runs(&clock, task_id, &run_policy, 2, 900)
287 .expect("Repeat derivation must succeed for valid inputs");
288 assert_eq!(second.derived().len(), 2);
289 assert_eq!(second.derived()[0].scheduled_at(), 1020);
290 assert_eq!(second.derived()[1].scheduled_at(), 1080);
291 }
292
293 #[test]
294 fn validate_derivation_once_rejects_invalid_already_derived() {
295 let result = validate_derivation(&RunPolicy::Once, 2, 0);
297 assert!(result.is_err());
298 }
299
300 #[test]
301 fn validate_derivation_repeat_rejects_already_derived_exceeds_count() {
302 let policy = RunPolicy::repeat(3, 60).expect("policy should be valid");
304 let result = validate_derivation(&policy, 5, 0);
305 assert!(result.is_err());
306 }
307
308 #[test]
309 fn validate_derivation_repeat_rejects_multiplication_overflow() {
310 let policy = RunPolicy::repeat(u32::MAX, u64::MAX).expect("policy should be valid");
312 let result = validate_derivation(&policy, 0, 0);
313 assert!(result.is_err());
314 }
315
316 #[test]
317 fn validate_derivation_repeat_rejects_addition_overflow() {
318 let large_origin = u64::MAX - 100;
320 let policy = RunPolicy::repeat(100, u64::MAX / 50).expect("policy should be valid");
321 let result = validate_derivation(&policy, 0, large_origin);
322 assert!(result.is_err());
323 }
324
325 #[test]
326 fn validate_derivation_repeat_accepts_valid_values() {
327 let result = validate_derivation(
329 &RunPolicy::repeat(10, 60).expect("policy should be valid"),
330 0,
331 1000,
332 );
333 assert!(result.is_ok());
334
335 let result = validate_derivation(
337 &RunPolicy::repeat(10, 60).expect("policy should be valid"),
338 5,
339 1000,
340 );
341 assert!(result.is_ok());
342 }
343
344 #[test]
345 fn validate_derivation_repeat_rejects_zero_count_as_invalid_policy() {
346 let result = RepeatPolicy::new(0, 60);
347 assert_eq!(result, Err(RunPolicyError::InvalidRepeatCount { count: 0 }));
348 }
349
350 #[test]
351 fn validate_derivation_repeat_rejects_zero_interval_as_invalid_policy() {
352 let result = RepeatPolicy::new(3, 0);
353 assert_eq!(result, Err(RunPolicyError::InvalidRepeatIntervalSecs { interval_secs: 0 }));
354 }
355
356 #[test]
357 fn map_run_construction_error_returns_dedicated_invalid_task_id_variant() {
358 let task_id = "00000000-0000-0000-0000-000000000000"
359 .parse::<TaskId>()
360 .expect("nil task id literal must parse");
361
362 let mapped = map_run_construction_error(
363 task_id,
364 RunInstanceConstructionError::InvalidTaskId { task_id },
365 );
366
367 assert_eq!(mapped, DerivationError::InvalidTaskIdForRunConstruction { task_id });
368 }
369
370 #[test]
371 fn map_run_construction_error_preserves_non_task_id_failures() {
372 let task_id = "00000000-0000-0000-0000-000000000002"
373 .parse::<TaskId>()
374 .expect("task id literal must parse");
375 let run_id = "00000000-0000-0000-0000-000000000001"
376 .parse::<RunId>()
377 .expect("run id literal must parse");
378 let source = RunInstanceConstructionError::ReadyScheduledAtAfterCreatedAt {
379 run_id,
380 scheduled_at: 2_000,
381 created_at: 1_000,
382 };
383
384 let mapped = map_run_construction_error(task_id, source);
385
386 assert_eq!(mapped, DerivationError::RunConstructionFailed { task_id, source });
387 }
388}