oxigdal_workflow/scheduler/
mod.rs1pub mod cron;
10pub mod dependency;
11pub mod event;
12pub mod interval;
13
14use crate::engine::WorkflowDefinition;
15use crate::error::{Result, WorkflowError};
16use chrono::{DateTime, Utc};
17use dashmap::DashMap;
18use serde::{Deserialize, Serialize};
19use std::sync::Arc;
20use tokio::sync::RwLock;
21use uuid::Uuid;
22
23pub use self::cron::{CronSchedule, CronScheduler};
24pub use self::dependency::{DependencyScheduler, WorkflowDependency};
25pub use self::event::{EventScheduler, EventTrigger};
26pub use self::interval::{IntervalSchedule, IntervalScheduler};
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct SchedulerConfig {
31 pub max_concurrent_executions: usize,
33 pub handle_missed_executions: bool,
35 pub max_missed_executions: usize,
37 pub execution_timeout_secs: u64,
39 pub enable_persistence: bool,
41 pub persistence_path: Option<String>,
43 pub tick_interval_ms: u64,
45 pub timezone: String,
47}
48
49impl Default for SchedulerConfig {
50 fn default() -> Self {
51 Self {
52 max_concurrent_executions: 100,
53 handle_missed_executions: true,
54 max_missed_executions: 10,
55 execution_timeout_secs: 3600,
56 enable_persistence: true,
57 persistence_path: None,
58 tick_interval_ms: 100,
59 timezone: "UTC".to_string(),
60 }
61 }
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ScheduledWorkflow {
67 pub schedule_id: String,
69 pub workflow: WorkflowDefinition,
71 pub schedule_type: ScheduleType,
73 pub enabled: bool,
75 pub last_execution: Option<DateTime<Utc>>,
77 pub next_execution: Option<DateTime<Utc>>,
79 pub execution_history: Vec<ScheduleExecution>,
81 pub max_history: usize,
83 pub metadata: ScheduleMetadata,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "type")]
90pub enum ScheduleType {
91 Cron {
93 expression: String,
95 },
96 Interval {
98 interval_secs: u64,
100 },
101 Event {
103 event_pattern: String,
105 },
106 Manual,
108 Dependency {
110 dependencies: Vec<String>,
112 },
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct ScheduleExecution {
118 pub execution_id: String,
120 pub start_time: DateTime<Utc>,
122 pub end_time: Option<DateTime<Utc>>,
124 pub status: ExecutionStatus,
126 pub error_message: Option<String>,
128}
129
130#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
132pub enum ExecutionStatus {
133 Pending,
135 Running,
137 Success,
139 Failed,
141 Cancelled,
143 TimedOut,
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct ScheduleMetadata {
150 pub created_at: DateTime<Utc>,
152 pub updated_at: DateTime<Utc>,
154 pub created_by: String,
156 pub description: Option<String>,
158 pub tags: Vec<String>,
160}
161
162pub struct Scheduler {
164 config: SchedulerConfig,
165 schedules: Arc<DashMap<String, ScheduledWorkflow>>,
166 cron_scheduler: Arc<RwLock<CronScheduler>>,
167 interval_scheduler: Arc<RwLock<IntervalScheduler>>,
168 event_scheduler: Arc<RwLock<EventScheduler>>,
169 dependency_scheduler: Arc<RwLock<DependencyScheduler>>,
170 running: Arc<RwLock<bool>>,
171}
172
173impl Scheduler {
174 pub fn new(config: SchedulerConfig) -> Self {
176 Self {
177 config: config.clone(),
178 schedules: Arc::new(DashMap::new()),
179 cron_scheduler: Arc::new(RwLock::new(CronScheduler::new(config.clone()))),
180 interval_scheduler: Arc::new(RwLock::new(IntervalScheduler::new(config.clone()))),
181 event_scheduler: Arc::new(RwLock::new(EventScheduler::new(config.clone()))),
182 dependency_scheduler: Arc::new(RwLock::new(DependencyScheduler::new(config.clone()))),
183 running: Arc::new(RwLock::new(false)),
184 }
185 }
186
187 pub fn with_defaults() -> Self {
189 Self::new(SchedulerConfig::default())
190 }
191
192 pub async fn add_schedule(
194 &self,
195 workflow: WorkflowDefinition,
196 schedule_type: ScheduleType,
197 ) -> Result<String> {
198 let schedule_id = Uuid::new_v4().to_string();
199 let now = Utc::now();
200
201 let next_execution = match &schedule_type {
202 ScheduleType::Cron { expression } => {
203 let scheduler = self.cron_scheduler.write().await;
204 scheduler.calculate_next_execution(expression, now)?
205 }
206 ScheduleType::Interval { interval_secs } => Some(
207 now + chrono::Duration::try_seconds(*interval_secs as i64)
208 .ok_or_else(|| WorkflowError::scheduling("Invalid interval"))?,
209 ),
210 ScheduleType::Event { .. } | ScheduleType::Dependency { .. } => None,
211 ScheduleType::Manual => None,
212 };
213
214 let scheduled = ScheduledWorkflow {
215 schedule_id: schedule_id.clone(),
216 workflow,
217 schedule_type,
218 enabled: true,
219 last_execution: None,
220 next_execution,
221 execution_history: Vec::new(),
222 max_history: 100,
223 metadata: ScheduleMetadata {
224 created_at: now,
225 updated_at: now,
226 created_by: "system".to_string(),
227 description: None,
228 tags: Vec::new(),
229 },
230 };
231
232 self.schedules.insert(schedule_id.clone(), scheduled);
233
234 if self.config.enable_persistence {
235 self.persist_state().await?;
236 }
237
238 Ok(schedule_id)
239 }
240
241 pub async fn remove_schedule(&self, schedule_id: &str) -> Result<()> {
243 self.schedules
244 .remove(schedule_id)
245 .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
246
247 if self.config.enable_persistence {
248 self.persist_state().await?;
249 }
250
251 Ok(())
252 }
253
254 pub async fn enable_schedule(&self, schedule_id: &str) -> Result<()> {
256 let mut schedule = self
257 .schedules
258 .get_mut(schedule_id)
259 .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
260 schedule.enabled = true;
261 schedule.metadata.updated_at = Utc::now();
262 Ok(())
263 }
264
265 pub async fn disable_schedule(&self, schedule_id: &str) -> Result<()> {
267 let mut schedule = self
268 .schedules
269 .get_mut(schedule_id)
270 .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
271 schedule.enabled = false;
272 schedule.metadata.updated_at = Utc::now();
273 Ok(())
274 }
275
276 pub async fn start(&self) -> Result<()> {
278 let mut running = self.running.write().await;
279 if *running {
280 return Err(WorkflowError::scheduling("Scheduler already running"));
281 }
282 *running = true;
283 drop(running);
284
285 let cron_scheduler = self.cron_scheduler.clone();
287 let interval_scheduler = self.interval_scheduler.clone();
288 let event_scheduler = self.event_scheduler.clone();
289 let dependency_scheduler = self.dependency_scheduler.clone();
290
291 tokio::spawn(async move {
292 let _ = cron_scheduler.write().await;
293 });
295
296 tokio::spawn(async move {
297 let _ = interval_scheduler.write().await;
298 });
300
301 tokio::spawn(async move {
302 let _ = event_scheduler.write().await;
303 });
305
306 tokio::spawn(async move {
307 let _ = dependency_scheduler.write().await;
308 });
310
311 Ok(())
312 }
313
314 pub async fn stop(&self) -> Result<()> {
316 let mut running = self.running.write().await;
317 if !*running {
318 return Err(WorkflowError::scheduling("Scheduler not running"));
319 }
320 *running = false;
321 Ok(())
322 }
323
324 pub async fn is_running(&self) -> bool {
326 *self.running.read().await
327 }
328
329 pub fn get_schedules(&self) -> Vec<ScheduledWorkflow> {
331 self.schedules
332 .iter()
333 .map(|entry| entry.value().clone())
334 .collect()
335 }
336
337 pub fn get_schedule(&self, schedule_id: &str) -> Option<ScheduledWorkflow> {
339 self.schedules.get(schedule_id).map(|entry| entry.clone())
340 }
341
342 pub async fn trigger_manual(&self, schedule_id: &str) -> Result<String> {
344 let schedule = self
345 .schedules
346 .get(schedule_id)
347 .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
348
349 if !schedule.enabled {
350 return Err(WorkflowError::scheduling("Schedule is disabled"));
351 }
352
353 let execution_id = Uuid::new_v4().to_string();
354
355 let execution = ScheduleExecution {
357 execution_id: execution_id.clone(),
358 start_time: Utc::now(),
359 end_time: None,
360 status: ExecutionStatus::Pending,
361 error_message: None,
362 };
363
364 drop(schedule);
365
366 let mut schedule_mut = self
367 .schedules
368 .get_mut(schedule_id)
369 .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
370 schedule_mut.execution_history.push(execution);
371 if schedule_mut.execution_history.len() > schedule_mut.max_history {
372 schedule_mut.execution_history.remove(0);
373 }
374
375 Ok(execution_id)
376 }
377
378 pub async fn update_execution_status(
380 &self,
381 schedule_id: &str,
382 execution_id: &str,
383 status: ExecutionStatus,
384 error_message: Option<String>,
385 ) -> Result<()> {
386 let mut schedule = self
387 .schedules
388 .get_mut(schedule_id)
389 .ok_or_else(|| WorkflowError::not_found(schedule_id))?;
390
391 if let Some(execution) = schedule
392 .execution_history
393 .iter_mut()
394 .find(|e| e.execution_id == execution_id)
395 {
396 execution.status = status;
397 execution.error_message = error_message;
398 if matches!(
399 status,
400 ExecutionStatus::Success
401 | ExecutionStatus::Failed
402 | ExecutionStatus::Cancelled
403 | ExecutionStatus::TimedOut
404 ) {
405 execution.end_time = Some(Utc::now());
406 }
407 }
408
409 Ok(())
410 }
411
412 async fn persist_state(&self) -> Result<()> {
414 if let Some(_path) = &self.config.persistence_path {
415 }
418 Ok(())
419 }
420
421 pub async fn load_state(&self) -> Result<()> {
423 if let Some(_path) = &self.config.persistence_path {
424 }
427 Ok(())
428 }
429}
430
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Builds the minimal workflow definition shared by the tests below.
    fn sample_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            id: "test-workflow".to_string(),
            name: "Test Workflow".to_string(),
            description: None,
            version: "1.0.0".to_string(),
            dag: WorkflowDag::new(),
        }
    }

    #[tokio::test]
    async fn test_scheduler_creation() {
        let scheduler = Scheduler::with_defaults();
        assert!(!scheduler.is_running().await);
    }

    #[tokio::test]
    async fn test_add_remove_schedule() {
        let scheduler = Scheduler::with_defaults();

        let schedule_id = scheduler
            .add_schedule(sample_workflow(), ScheduleType::Manual)
            .await
            .expect("Failed to add schedule");

        assert!(scheduler.get_schedule(&schedule_id).is_some());

        scheduler
            .remove_schedule(&schedule_id)
            .await
            .expect("Failed to remove schedule");

        assert!(scheduler.get_schedule(&schedule_id).is_none());
    }

    #[tokio::test]
    async fn test_enable_disable_schedule() {
        let scheduler = Scheduler::with_defaults();

        let schedule_id = scheduler
            .add_schedule(sample_workflow(), ScheduleType::Manual)
            .await
            .expect("Failed to add schedule");

        scheduler
            .disable_schedule(&schedule_id)
            .await
            .expect("Failed to disable");
        // Compare against `Some(false)` so a missing schedule fails the test
        // instead of passing vacuously.
        assert_eq!(
            scheduler.get_schedule(&schedule_id).map(|s| s.enabled),
            Some(false)
        );

        scheduler
            .enable_schedule(&schedule_id)
            .await
            .expect("Failed to enable");
        assert_eq!(
            scheduler.get_schedule(&schedule_id).map(|s| s.enabled),
            Some(true)
        );
    }

    #[tokio::test]
    async fn test_start_stop_lifecycle() {
        let scheduler = Scheduler::with_defaults();

        scheduler.start().await.expect("Failed to start");
        assert!(scheduler.is_running().await);
        assert!(scheduler.start().await.is_err());

        scheduler.stop().await.expect("Failed to stop");
        assert!(!scheduler.is_running().await);
        assert!(scheduler.stop().await.is_err());
    }

    #[tokio::test]
    async fn test_trigger_manual_records_execution() {
        let scheduler = Scheduler::with_defaults();

        let schedule_id = scheduler
            .add_schedule(sample_workflow(), ScheduleType::Manual)
            .await
            .expect("Failed to add schedule");

        let execution_id = scheduler
            .trigger_manual(&schedule_id)
            .await
            .expect("Failed to trigger");

        let schedule = scheduler
            .get_schedule(&schedule_id)
            .expect("Schedule should exist");
        assert_eq!(schedule.execution_history.len(), 1);
        assert_eq!(schedule.execution_history[0].execution_id, execution_id);
        assert_eq!(
            schedule.execution_history[0].status,
            ExecutionStatus::Pending
        );

        scheduler
            .disable_schedule(&schedule_id)
            .await
            .expect("Failed to disable");
        assert!(scheduler.trigger_manual(&schedule_id).await.is_err());
    }
}