Skip to main content

oxigdal_workflow/scheduler/
interval.rs

1//! Interval-based workflow scheduling.
2
3use crate::error::{Result, WorkflowError};
4use crate::scheduler::SchedulerConfig;
5use chrono::{DateTime, Duration, Utc};
6use serde::{Deserialize, Serialize};
7use std::time::Duration as StdDuration;
8
/// Interval schedule definition.
///
/// Describes a schedule that fires every `interval_secs` seconds, optionally
/// bounded by a start time, an end time, and a maximum number of executions.
/// The type is serializable, and all of its fields are public.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntervalSchedule {
    /// Interval duration in seconds.
    ///
    /// `IntervalSchedule::new` rejects `0`, but the field is public and can
    /// still be assigned directly afterwards.
    pub interval_secs: u64,
    /// Start time.
    ///
    /// When `None`, `next_execution_from` falls back to its `from` argument
    /// and `executions_in_range` falls back to its `start` argument.
    pub start_time: Option<DateTime<Utc>>,
    /// End time (optional). `is_active` reports `false` once `now` is past it.
    pub end_time: Option<DateTime<Utc>>,
    /// Maximum number of executions (optional).
    pub max_executions: Option<usize>,
    /// Current execution count.
    ///
    /// Starts at `0` in `new`, is advanced by `increment_execution_count`, and
    /// is compared against `max_executions`.
    pub execution_count: usize,
    /// Description of the schedule.
    pub description: Option<String>,
}
25
26impl IntervalSchedule {
27    /// Create a new interval schedule with the given interval in seconds.
28    pub fn new(interval_secs: u64) -> Result<Self> {
29        if interval_secs == 0 {
30            return Err(WorkflowError::invalid_parameter(
31                "interval_secs",
32                "Interval must be greater than 0",
33            ));
34        }
35
36        Ok(Self {
37            interval_secs,
38            start_time: None,
39            end_time: None,
40            max_executions: None,
41            execution_count: 0,
42            description: None,
43        })
44    }
45
46    /// Create an interval schedule from a standard duration.
47    pub fn from_duration(duration: StdDuration) -> Result<Self> {
48        let secs = duration.as_secs();
49        if secs == 0 {
50            return Err(WorkflowError::invalid_parameter(
51                "duration",
52                "Duration must be greater than 0",
53            ));
54        }
55        Self::new(secs)
56    }
57
58    /// Set the start time for this schedule.
59    pub fn with_start_time(mut self, start_time: DateTime<Utc>) -> Self {
60        self.start_time = Some(start_time);
61        self
62    }
63
64    /// Set the end time for this schedule.
65    pub fn with_end_time(mut self, end_time: DateTime<Utc>) -> Self {
66        self.end_time = Some(end_time);
67        self
68    }
69
70    /// Set the maximum number of executions.
71    pub fn with_max_executions(mut self, max: usize) -> Self {
72        self.max_executions = Some(max);
73        self
74    }
75
76    /// Set the description.
77    pub fn with_description<S: Into<String>>(mut self, description: S) -> Self {
78        self.description = Some(description.into());
79        self
80    }
81
82    /// Calculate the next execution time from the given datetime.
83    pub fn next_execution_from(&self, from: DateTime<Utc>) -> Result<Option<DateTime<Utc>>> {
84        // Check if max executions reached
85        if let Some(max) = self.max_executions {
86            if self.execution_count >= max {
87                return Ok(None);
88            }
89        }
90
91        let start = self.start_time.unwrap_or(from);
92
93        // If current time is before start time, return start time
94        if from < start {
95            return Ok(Some(start));
96        }
97
98        // Calculate next execution
99        let duration = Duration::try_seconds(self.interval_secs as i64)
100            .ok_or_else(|| WorkflowError::internal("Duration overflow"))?;
101        let next = from + duration;
102
103        // Check if next execution is beyond end time
104        if let Some(end) = self.end_time {
105            if next > end {
106                return Ok(None);
107            }
108        }
109
110        Ok(Some(next))
111    }
112
113    /// Calculate all execution times within a given range.
114    pub fn executions_in_range(
115        &self,
116        start: DateTime<Utc>,
117        end: DateTime<Utc>,
118        max_count: usize,
119    ) -> Result<Vec<DateTime<Utc>>> {
120        let mut executions = Vec::new();
121        let mut current = self.start_time.unwrap_or(start);
122        let duration = Duration::try_seconds(self.interval_secs as i64)
123            .ok_or_else(|| WorkflowError::internal("Duration overflow"))?;
124
125        while current <= end && executions.len() < max_count {
126            if current >= start {
127                executions.push(current);
128            }
129            current += duration;
130
131            // Check max executions
132            if let Some(max) = self.max_executions {
133                if executions.len() >= max {
134                    break;
135                }
136            }
137
138            // Check end time
139            if let Some(end_time) = self.end_time {
140                if current > end_time {
141                    break;
142                }
143            }
144        }
145
146        Ok(executions)
147    }
148
149    /// Check if the schedule is still active.
150    pub fn is_active(&self, now: DateTime<Utc>) -> bool {
151        // Check if before start time
152        if let Some(start) = self.start_time {
153            if now < start {
154                return false;
155            }
156        }
157
158        // Check if after end time
159        if let Some(end) = self.end_time {
160            if now > end {
161                return false;
162            }
163        }
164
165        // Check if max executions reached
166        if let Some(max) = self.max_executions {
167            if self.execution_count >= max {
168                return false;
169            }
170        }
171
172        true
173    }
174
175    /// Increment the execution count.
176    pub fn increment_execution_count(&mut self) {
177        self.execution_count += 1;
178    }
179}
180
/// Interval scheduler for managing interval-based workflow executions.
///
/// Wraps a `SchedulerConfig`, which controls how missed executions are
/// reported by `calculate_missed_executions`.
pub struct IntervalScheduler {
    // Scheduler settings; `handle_missed_executions` and
    // `max_missed_executions` are the fields read in this file.
    config: SchedulerConfig,
}
185
186impl IntervalScheduler {
187    /// Create a new interval scheduler.
188    pub fn new(config: SchedulerConfig) -> Self {
189        Self { config }
190    }
191
192    /// Calculate the next execution time for an interval.
193    pub fn calculate_next_execution(
194        &self,
195        interval_secs: u64,
196        last_execution: Option<DateTime<Utc>>,
197    ) -> Result<DateTime<Utc>> {
198        let now = Utc::now();
199        let last = last_execution.unwrap_or(now);
200        let duration = Duration::try_seconds(interval_secs as i64)
201            .ok_or_else(|| WorkflowError::internal("Duration overflow"))?;
202        Ok(last + duration)
203    }
204
205    /// Calculate missed executions for an interval schedule.
206    pub fn calculate_missed_executions(
207        &self,
208        interval_secs: u64,
209        last_execution: DateTime<Utc>,
210        now: DateTime<Utc>,
211    ) -> Result<Vec<DateTime<Utc>>> {
212        if !self.config.handle_missed_executions {
213            return Ok(Vec::new());
214        }
215
216        let mut missed = Vec::new();
217        let duration = Duration::try_seconds(interval_secs as i64)
218            .ok_or_else(|| WorkflowError::internal("Duration overflow"))?;
219        let mut current = last_execution + duration;
220
221        while current < now && missed.len() < self.config.max_missed_executions {
222            missed.push(current);
223            current += duration;
224        }
225
226        Ok(missed)
227    }
228
229    /// Validate interval configuration.
230    pub fn validate_interval(interval_secs: u64) -> Result<()> {
231        if interval_secs == 0 {
232            return Err(WorkflowError::invalid_parameter(
233                "interval_secs",
234                "Interval must be greater than 0",
235            ));
236        }
237
238        // Reasonable maximum (1 year in seconds)
239        const MAX_INTERVAL: u64 = 365 * 24 * 60 * 60;
240        if interval_secs > MAX_INTERVAL {
241            return Err(WorkflowError::invalid_parameter(
242                "interval_secs",
243                format!(
244                    "Interval must be less than {} seconds (1 year)",
245                    MAX_INTERVAL
246                ),
247            ));
248        }
249
250        Ok(())
251    }
252}
253
/// Common interval patterns.
///
/// Each associated function returns the length of a frequently used interval,
/// expressed in seconds.
pub struct IntervalPatterns;

impl IntervalPatterns {
    /// Seconds in one minute.
    const MINUTE: u64 = 60;
    /// Seconds in one hour.
    const HOUR: u64 = 60 * Self::MINUTE;
    /// Seconds in one day.
    const DAY: u64 = 24 * Self::HOUR;

    /// Interval of 10 seconds.
    pub fn every_10_seconds() -> u64 {
        10
    }

    /// Interval of 30 seconds.
    pub fn every_30_seconds() -> u64 {
        30
    }

    /// Interval of one minute, in seconds.
    pub fn every_minute() -> u64 {
        Self::MINUTE
    }

    /// Interval of 5 minutes, in seconds.
    pub fn every_5_minutes() -> u64 {
        5 * Self::MINUTE
    }

    /// Interval of 15 minutes, in seconds.
    pub fn every_15_minutes() -> u64 {
        15 * Self::MINUTE
    }

    /// Interval of 30 minutes, in seconds.
    pub fn every_30_minutes() -> u64 {
        30 * Self::MINUTE
    }

    /// Interval of one hour, in seconds.
    pub fn every_hour() -> u64 {
        Self::HOUR
    }

    /// Interval of 6 hours, in seconds.
    pub fn every_6_hours() -> u64 {
        6 * Self::HOUR
    }

    /// Interval of 12 hours, in seconds.
    pub fn every_12_hours() -> u64 {
        12 * Self::HOUR
    }

    /// Interval of one day, in seconds.
    pub fn every_day() -> u64 {
        Self::DAY
    }

    /// Interval of one week, in seconds.
    pub fn every_week() -> u64 {
        7 * Self::DAY
    }
}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built schedule keeps the interval it was given.
    #[test]
    fn test_interval_schedule_creation() {
        let every_minute = IntervalSchedule::new(60).expect("Failed to create schedule");
        assert_eq!(every_minute.interval_secs, 60);
    }

    /// A zero-second interval is rejected.
    #[test]
    fn test_invalid_interval() {
        assert!(IntervalSchedule::new(0).is_err());
    }

    /// An unbounded schedule always has a next execution.
    #[test]
    fn test_next_execution() {
        let every_minute = IntervalSchedule::new(60).expect("Failed to create schedule");
        let upcoming = every_minute
            .next_execution_from(Utc::now())
            .expect("Failed to calculate next execution");
        assert!(upcoming.is_some());
    }

    /// The schedule turns inactive once the execution cap is reached.
    #[test]
    fn test_max_executions() {
        let mut capped = IntervalSchedule::new(60)
            .expect("Failed to create schedule")
            .with_max_executions(3);
        assert!(capped.is_active(Utc::now()));

        for _ in 0..3 {
            capped.increment_execution_count();
        }
        assert!(!capped.is_active(Utc::now()));
    }

    /// An hourly schedule yields a non-empty, bounded list over a five-hour window.
    #[test]
    fn test_executions_in_range() {
        let hourly = IntervalSchedule::new(3600).expect("Failed to create schedule");
        let window_start = Utc::now();
        let window_end = window_start + Duration::try_hours(5).expect("Duration overflow");

        let planned = hourly
            .executions_in_range(window_start, window_end, 10)
            .expect("Failed to get executions");

        assert!(!planned.is_empty());
        assert!(planned.len() <= 10);
    }

    /// The pattern helpers return the expected number of seconds.
    #[test]
    fn test_interval_patterns() {
        assert_eq!(IntervalPatterns::every_minute(), 60);
        assert_eq!(IntervalPatterns::every_hour(), 3600);
        assert_eq!(IntervalPatterns::every_day(), 86400);
    }

    /// Validation accepts a one-minute interval and rejects zero.
    #[test]
    fn test_validate_interval() {
        assert!(IntervalScheduler::validate_interval(60).is_ok());
        assert!(IntervalScheduler::validate_interval(0).is_err());
    }

    /// A standard-library duration converts to the same number of whole seconds.
    #[test]
    fn test_from_duration() {
        let two_minutes = StdDuration::from_secs(120);
        let built = IntervalSchedule::from_duration(two_minutes).expect("Failed to create");
        assert_eq!(built.interval_secs, 120);
    }
}