// oxigdal_workflow/scheduler/mod.rs


//! Workflow scheduler for managing workflow executions.
//!
//! Provides multiple scheduling strategies:
//! - Cron-based scheduling
//! - Event-driven triggers
//! - Interval-based scheduling
//! - Cross-workflow dependencies

9pub mod cron;
10pub mod dependency;
11pub mod event;
12pub mod interval;
13
14use crate::engine::WorkflowDefinition;
15use crate::error::{Result, WorkflowError};
16use chrono::{DateTime, Utc};
17use dashmap::DashMap;
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use tokio::sync::RwLock;
21use uuid::Uuid;
22
23pub use self::cron::{CronSchedule, CronScheduler};
24pub use self::dependency::{DependencyScheduler, WorkflowDependency};
25pub use self::event::{EventScheduler, EventTrigger};
26pub use self::interval::{IntervalSchedule, IntervalScheduler};
27
28/// Scheduler configuration.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct SchedulerConfig {
31    /// Maximum number of concurrent workflow executions.
32    pub max_concurrent_executions: usize,
33    /// Enable missed execution handling.
34    pub handle_missed_executions: bool,
35    /// Maximum number of missed executions to handle.
36    pub max_missed_executions: usize,
37    /// Execution timeout in seconds.
38    pub execution_timeout_secs: u64,
39    /// Enable scheduler state persistence.
40    pub enable_persistence: bool,
41    /// Persistence directory path.
42    pub persistence_path: Option<String>,
43    /// Scheduler tick interval in milliseconds.
44    pub tick_interval_ms: u64,
45    /// Time zone for scheduling (IANA timezone name).
46    pub timezone: String,
47}
48
49impl Default for SchedulerConfig {
50    fn default() -> Self {
51        Self {
52            max_concurrent_executions: 100,
53            handle_missed_executions: true,
54            max_missed_executions: 10,
55            execution_timeout_secs: 3600,
56            enable_persistence: true,
57            persistence_path: None,
58            tick_interval_ms: 100,
59            timezone: "UTC".to_string(),
60        }
61    }
62}
63
64/// Scheduled workflow entry.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ScheduledWorkflow {
67    /// Unique schedule ID.
68    pub schedule_id: String,
69    /// Workflow definition.
70    pub workflow: WorkflowDefinition,
71    /// Schedule type.
72    pub schedule_type: ScheduleType,
73    /// Whether the schedule is enabled.
74    pub enabled: bool,
75    /// Last execution time.
76    pub last_execution: Option<DateTime<Utc>>,
77    /// Next scheduled execution time.
78    pub next_execution: Option<DateTime<Utc>>,
79    /// Execution history (last N executions).
80    pub execution_history: Vec<ScheduleExecution>,
81    /// Maximum number of history entries to keep.
82    pub max_history: usize,
83    /// Schedule metadata.
84    pub metadata: ScheduleMetadata,
85}
86
87/// Schedule type enumeration.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "type")]
90pub enum ScheduleType {
91    /// Cron-based scheduling.
92    Cron {
93        /// Cron expression.
94        expression: String,
95    },
96    /// Interval-based scheduling.
97    Interval {
98        /// Interval in seconds.
99        interval_secs: u64,
100    },
101    /// Event-driven trigger.
102    Event {
103        /// Event pattern to match.
104        event_pattern: String,
105    },
106    /// Manual trigger only.
107    Manual,
108    /// Dependency-based trigger.
109    Dependency {
110        /// Workflow dependencies.
111        dependencies: Vec<String>,
112    },
113}
114
115/// Schedule execution record.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct ScheduleExecution {
118    /// Execution ID.
119    pub execution_id: String,
120    /// Execution start time.
121    pub start_time: DateTime<Utc>,
122    /// Execution end time.
123    pub end_time: Option<DateTime<Utc>>,
124    /// Execution status.
125    pub status: ExecutionStatus,
126    /// Error message if failed.
127    pub error_message: Option<String>,
128}
129
130/// Execution status.
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
132pub enum ExecutionStatus {
133    /// Execution is pending.
134    Pending,
135    /// Execution is running.
136    Running,
137    /// Execution completed successfully.
138    Success,
139    /// Execution failed.
140    Failed,
141    /// Execution was cancelled.
142    Cancelled,
143    /// Execution timed out.
144    TimedOut,
145}
146
147/// Schedule metadata.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct ScheduleMetadata {
150    /// Schedule creation time.
151    pub created_at: DateTime<Utc>,
152    /// Schedule last update time.
153    pub updated_at: DateTime<Utc>,
154    /// Schedule creator.
155    pub created_by: String,
156    /// Schedule description.
157    pub description: Option<String>,
158    /// Schedule tags.
159    pub tags: Vec<String>,
160}
161
162/// Main workflow scheduler.
163pub struct Scheduler {
164    config: SchedulerConfig,
165    schedules: Arc<DashMap<String, ScheduledWorkflow>>,
166    cron_scheduler: Arc<RwLock<CronScheduler>>,
167    interval_scheduler: Arc<RwLock<IntervalScheduler>>,
168    event_scheduler: Arc<RwLock<EventScheduler>>,
169    dependency_scheduler: Arc<RwLock<DependencyScheduler>>,
170    running: Arc<RwLock<bool>>,
171}
172
173impl Scheduler {
174    /// Create a new scheduler with the given configuration.
175    pub fn new(config: SchedulerConfig) -> Self {
176        Self {
177            config: config.clone(),
178            schedules: Arc::new(DashMap::new()),
179            cron_scheduler: Arc::new(RwLock::new(CronScheduler::new(config.clone()))),
180            interval_scheduler: Arc::new(RwLock::new(IntervalScheduler::new(config.clone()))),
181            event_scheduler: Arc::new(RwLock::new(EventScheduler::new(config.clone()))),
182            dependency_scheduler: Arc::new(RwLock::new(DependencyScheduler::new(config.clone()))),
183            running: Arc::new(RwLock::new(false)),
184        }
185    }
186
187    /// Create a new scheduler with default configuration.
188    pub fn with_defaults() -> Self {
189        Self::new(SchedulerConfig::default())
190    }
191
192    /// Add a scheduled workflow.
193    pub async fn add_schedule(
194        &self,
195        workflow: WorkflowDefinition,
196        schedule_type: ScheduleType,
197    ) -> Result<String> {
198        let schedule_id = Uuid::new_v4().to_string();
199        let now = Utc::now();
200
201        let next_execution = match &schedule_type {
202            ScheduleType::Cron { expression } => {
203                let scheduler = self.cron_scheduler.write().await;
204                scheduler.calculate_next_execution(expression, now)?
205            }
206            ScheduleType::Interval { interval_secs } => Some(
207                now + chrono::Duration::try_seconds(*interval_secs as i64)
208                    .ok_or_else(|| WorkflowError::scheduling("Invalid interval"))?,
209            ),
210            ScheduleType::Event { .. } | ScheduleType::Dependency { .. } => None,
211            ScheduleType::Manual => None,
212        };
213
214        let scheduled = ScheduledWorkflow {
215            schedule_id: schedule_id.clone(),
216            workflow,
217            schedule_type,
218            enabled: true,
219            last_execution: None,
220            next_execution,
221            execution_history: Vec::new(),
222            max_history: 100,
223            metadata: ScheduleMetadata {
224                created_at: now,
225                updated_at: now,
226                created_by: "system".to_string(),
227                description: None,
228                tags: Vec::new(),
229            },
230        };
231
232        self.schedules.insert(schedule_id.clone(), scheduled);
233
234        if self.config.enable_persistence {
235            self.persist_state().await?;
236        }
237
238        Ok(schedule_id)
239    }
240
241    /// Remove a scheduled workflow.
242    pub async fn remove_schedule(&self, schedule_id: &str) -> Result<()> {
243        self.schedules
244            .remove(schedule_id)
245            .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
246
247        if self.config.enable_persistence {
248            self.persist_state().await?;
249        }
250
251        Ok(())
252    }
253
254    /// Enable a schedule.
255    pub async fn enable_schedule(&self, schedule_id: &str) -> Result<()> {
256        let mut schedule = self
257            .schedules
258            .get_mut(schedule_id)
259            .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
260        schedule.enabled = true;
261        schedule.metadata.updated_at = Utc::now();
262        Ok(())
263    }
264
265    /// Disable a schedule.
266    pub async fn disable_schedule(&self, schedule_id: &str) -> Result<()> {
267        let mut schedule = self
268            .schedules
269            .get_mut(schedule_id)
270            .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
271        schedule.enabled = false;
272        schedule.metadata.updated_at = Utc::now();
273        Ok(())
274    }
275
276    /// Start the scheduler.
277    pub async fn start(&self) -> Result<()> {
278        let mut running = self.running.write().await;
279        if *running {
280            return Err(WorkflowError::scheduling("Scheduler already running"));
281        }
282        *running = true;
283        drop(running);
284
285        // Start all sub-schedulers
286        let cron_scheduler = self.cron_scheduler.clone();
287        let interval_scheduler = self.interval_scheduler.clone();
288        let event_scheduler = self.event_scheduler.clone();
289        let dependency_scheduler = self.dependency_scheduler.clone();
290
291        tokio::spawn(async move {
292            let _ = cron_scheduler.write().await;
293            // Scheduler loop would go here
294        });
295
296        tokio::spawn(async move {
297            let _ = interval_scheduler.write().await;
298            // Scheduler loop would go here
299        });
300
301        tokio::spawn(async move {
302            let _ = event_scheduler.write().await;
303            // Scheduler loop would go here
304        });
305
306        tokio::spawn(async move {
307            let _ = dependency_scheduler.write().await;
308            // Scheduler loop would go here
309        });
310
311        Ok(())
312    }
313
314    /// Stop the scheduler.
315    pub async fn stop(&self) -> Result<()> {
316        let mut running = self.running.write().await;
317        if !*running {
318            return Err(WorkflowError::scheduling("Scheduler not running"));
319        }
320        *running = false;
321        Ok(())
322    }
323
324    /// Check if the scheduler is running.
325    pub async fn is_running(&self) -> bool {
326        *self.running.read().await
327    }
328
329    /// Get all schedules.
330    pub fn get_schedules(&self) -> Vec<ScheduledWorkflow> {
331        self.schedules
332            .iter()
333            .map(|entry| entry.value().clone())
334            .collect()
335    }
336
337    /// Get a specific schedule.
338    pub fn get_schedule(&self, schedule_id: &str) -> Option<ScheduledWorkflow> {
339        self.schedules.get(schedule_id).map(|entry| entry.clone())
340    }
341
342    /// Trigger a manual execution.
343    pub async fn trigger_manual(&self, schedule_id: &str) -> Result<String> {
344        let schedule = self
345            .schedules
346            .get(schedule_id)
347            .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
348
349        if !schedule.enabled {
350            return Err(WorkflowError::scheduling("Schedule is disabled"));
351        }
352
353        let execution_id = Uuid::new_v4().to_string();
354
355        // Record execution start
356        let execution = ScheduleExecution {
357            execution_id: execution_id.clone(),
358            start_time: Utc::now(),
359            end_time: None,
360            status: ExecutionStatus::Pending,
361            error_message: None,
362        };
363
364        drop(schedule);
365
366        let mut schedule_mut = self
367            .schedules
368            .get_mut(schedule_id)
369            .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
370        schedule_mut.execution_history.push(execution);
371        if schedule_mut.execution_history.len() > schedule_mut.max_history {
372            schedule_mut.execution_history.remove(0);
373        }
374
375        Ok(execution_id)
376    }
377
378    /// Update execution status.
379    pub async fn update_execution_status(
380        &self,
381        schedule_id: &str,
382        execution_id: &str,
383        status: ExecutionStatus,
384        error_message: Option<String>,
385    ) -> Result<()> {
386        let mut schedule = self
387            .schedules
388            .get_mut(schedule_id)
389            .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
390
391        if let Some(execution) = schedule
392            .execution_history
393            .iter_mut()
394            .find(|e| e.execution_id == execution_id)
395        {
396            execution.status = status;
397            execution.error_message = error_message;
398            if matches!(
399                status,
400                ExecutionStatus::Success
401                    | ExecutionStatus::Failed
402                    | ExecutionStatus::Cancelled
403                    | ExecutionStatus::TimedOut
404            ) {
405                execution.end_time = Some(Utc::now());
406            }
407        }
408
409        Ok(())
410    }
411
412    /// Persist scheduler state.
413    async fn persist_state(&self) -> Result<()> {
414        if let Some(_path) = &self.config.persistence_path {
415            // Persistence implementation would go here
416            // For now, this is a placeholder
417        }
418        Ok(())
419    }
420
421    /// Load scheduler state from persistence.
422    pub async fn load_state(&self) -> Result<()> {
423        if let Some(_path) = &self.config.persistence_path {
424            // Load implementation would go here
425            // For now, this is a placeholder
426        }
427        Ok(())
428    }
429}
430
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Minimal workflow definition shared by the tests.
    fn sample_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            id: "test-workflow".to_string(),
            name: "Test Workflow".to_string(),
            description: None,
            version: "1.0.0".to_string(),
            dag: WorkflowDag::new(),
        }
    }

    /// Registers a manual-only schedule and returns its ID.
    async fn add_manual_schedule(scheduler: &Scheduler) -> String {
        scheduler
            .add_schedule(sample_workflow(), ScheduleType::Manual)
            .await
            .expect("Failed to add schedule")
    }

    #[tokio::test]
    async fn test_scheduler_creation() {
        let scheduler = Scheduler::with_defaults();
        assert!(!scheduler.is_running().await);
        assert!(scheduler.get_schedules().is_empty());
    }

    #[tokio::test]
    async fn test_add_remove_schedule() {
        let scheduler = Scheduler::with_defaults();
        let schedule_id = add_manual_schedule(&scheduler).await;

        assert!(scheduler.get_schedule(&schedule_id).is_some());

        scheduler
            .remove_schedule(&schedule_id)
            .await
            .expect("Failed to remove schedule");

        assert!(scheduler.get_schedule(&schedule_id).is_none());
        // Removing the same schedule twice must report that it no longer exists.
        assert!(scheduler.remove_schedule(&schedule_id).await.is_err());
    }

    #[tokio::test]
    async fn test_enable_disable_schedule() {
        let scheduler = Scheduler::with_defaults();
        let schedule_id = add_manual_schedule(&scheduler).await;

        scheduler
            .disable_schedule(&schedule_id)
            .await
            .expect("Failed to disable");
        // Assert on the flag of an existing schedule; a missing schedule must not pass.
        assert!(
            scheduler
                .get_schedule(&schedule_id)
                .is_some_and(|s| !s.enabled)
        );

        scheduler
            .enable_schedule(&schedule_id)
            .await
            .expect("Failed to enable");
        assert!(
            scheduler
                .get_schedule(&schedule_id)
                .is_some_and(|s| s.enabled)
        );
    }

    #[tokio::test]
    async fn test_interval_schedule_sets_next_execution() {
        let scheduler = Scheduler::with_defaults();
        let before = Utc::now();

        let schedule_id = scheduler
            .add_schedule(
                sample_workflow(),
                ScheduleType::Interval { interval_secs: 60 },
            )
            .await
            .expect("Failed to add schedule");

        let schedule = scheduler
            .get_schedule(&schedule_id)
            .expect("Schedule should exist");
        assert!(schedule.next_execution.is_some_and(|next| next > before));
    }

    #[tokio::test]
    async fn test_start_stop_lifecycle() {
        let scheduler = Scheduler::with_defaults();

        assert!(scheduler.stop().await.is_err());

        scheduler.start().await.expect("Failed to start");
        assert!(scheduler.is_running().await);
        assert!(scheduler.start().await.is_err());

        scheduler.stop().await.expect("Failed to stop");
        assert!(!scheduler.is_running().await);
    }

    #[tokio::test]
    async fn test_trigger_manual_records_execution() {
        let scheduler = Scheduler::with_defaults();
        let schedule_id = add_manual_schedule(&scheduler).await;

        let execution_id = scheduler
            .trigger_manual(&schedule_id)
            .await
            .expect("Failed to trigger");

        let schedule = scheduler
            .get_schedule(&schedule_id)
            .expect("Schedule should exist");
        assert_eq!(schedule.execution_history.len(), 1);
        let execution = &schedule.execution_history[0];
        assert_eq!(execution.execution_id, execution_id);
        assert_eq!(execution.status, ExecutionStatus::Pending);
        assert!(execution.end_time.is_none());

        scheduler
            .update_execution_status(&schedule_id, &execution_id, ExecutionStatus::Success, None)
            .await
            .expect("Failed to update status");

        let schedule = scheduler
            .get_schedule(&schedule_id)
            .expect("Schedule should exist");
        let execution = &schedule.execution_history[0];
        assert_eq!(execution.status, ExecutionStatus::Success);
        assert!(execution.end_time.is_some());
    }

    #[tokio::test]
    async fn test_trigger_manual_rejects_disabled_schedule() {
        let scheduler = Scheduler::with_defaults();
        let schedule_id = add_manual_schedule(&scheduler).await;

        scheduler
            .disable_schedule(&schedule_id)
            .await
            .expect("Failed to disable");

        assert!(scheduler.trigger_manual(&schedule_id).await.is_err());
        assert!(
            scheduler
                .get_schedule(&schedule_id)
                .is_some_and(|s| s.execution_history.is_empty())
        );
    }
}