use crate::database::universal_types::{UniversalBool, UniversalTimestamp, UniversalUuid};
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Catch-up policy attached to a schedule.
///
/// The `From` impls in this file convert it to the strings `"skip"` / `"run_all"` and back;
/// any string other than `"run_all"` converts to `Skip`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum CatchupPolicy {
/// NOTE(review): presumably missed runs are dropped rather than replayed — confirm with the scheduler.
Skip,
/// NOTE(review): presumably every missed run is executed — confirm with the scheduler.
RunAll,
}
impl From<CatchupPolicy> for String {
    /// Renders the policy as its textual name: `"skip"` or `"run_all"`.
    fn from(policy: CatchupPolicy) -> Self {
        let label = match policy {
            CatchupPolicy::Skip => "skip",
            CatchupPolicy::RunAll => "run_all",
        };
        label.to_owned()
    }
}
impl From<&str> for CatchupPolicy {
    /// Parses a policy name without allocating.
    ///
    /// `"run_all"` maps to [`CatchupPolicy::RunAll`]; every other input, including
    /// `"skip"`, `"run_once"` and unrecognized text, maps to [`CatchupPolicy::Skip`].
    /// Matching is case-sensitive.
    fn from(s: &str) -> Self {
        match s {
            "run_all" => CatchupPolicy::RunAll,
            // Spelled out even though the fallback arm yields the same result.
            // NOTE(review): presumably a legacy policy name — confirm before removing.
            "run_once" => CatchupPolicy::Skip,
            _ => CatchupPolicy::Skip,
        }
    }
}
impl From<String> for CatchupPolicy {
    /// Parses an owned policy name; see the `From<&str>` impl for the mapping.
    fn from(s: String) -> Self {
        // Borrow rather than the other way round: going from `&str` to `String`
        // would cost a heap allocation on every conversion.
        Self::from(s.as_str())
    }
}
/// Kind of schedule.
///
/// The `From<&str>` impl in this file parses `"trigger"` as `Trigger` and everything else as
/// `Cron`; `Display` prints the lowercase names `"cron"` / `"trigger"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScheduleType {
/// Schedule built by `NewSchedule::cron` (carries a cron expression).
Cron,
/// Schedule built by `NewSchedule::trigger` (carries a trigger name and poll interval).
Trigger,
}
impl From<&str> for ScheduleType {
fn from(s: &str) -> Self {
match s {
"trigger" => ScheduleType::Trigger,
_ => ScheduleType::Cron,
}
}
}
impl From<String> for ScheduleType {
    /// Parses an owned string by borrowing it and reusing the `&str` conversion.
    fn from(s: String) -> Self {
        let raw: &str = &s;
        raw.into()
    }
}
impl std::fmt::Display for ScheduleType {
    /// Writes the lowercase name of the variant (`"cron"` or `"trigger"`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ScheduleType::Cron => "cron",
            ScheduleType::Trigger => "trigger",
        };
        f.write_str(label)
    }
}
/// A schedule record covering both cron and trigger schedules.
///
/// NOTE(review): presumably mirrors a persisted row (it carries an id and created/updated
/// timestamps that `NewSchedule` lacks) — confirm against the storage layer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Schedule {
/// Identifier of this schedule.
pub id: UniversalUuid,
/// Schedule kind as text; `get_type` parses it into a `ScheduleType`.
pub schedule_type: String,
/// Name of the workflow this schedule refers to.
pub workflow_name: String,
/// Enabled flag; read through `is_enabled`.
pub enabled: UniversalBool,
/// Cron expression; `NewSchedule::cron` sets it, `NewSchedule::trigger` leaves it `None`.
pub cron_expression: Option<String>,
/// Timezone name; `NewSchedule::cron` defaults it to `"UTC"`.
pub timezone: Option<String>,
/// Catch-up policy as text; `NewSchedule::cron` defaults it to `"skip"` (see `CatchupPolicy`).
pub catchup_policy: Option<String>,
/// NOTE(review): presumably the earliest time the schedule may fire — confirm.
pub start_date: Option<UniversalTimestamp>,
/// NOTE(review): presumably the time after which the schedule stops firing — confirm.
pub end_date: Option<UniversalTimestamp>,
/// Trigger name; `NewSchedule::trigger` sets it, `NewSchedule::cron` leaves it `None`.
pub trigger_name: Option<String>,
/// Poll interval in milliseconds; exposed as a `Duration` by `poll_interval`.
pub poll_interval_ms: Option<i32>,
/// Concurrency flag; `allows_concurrent` treats `None` as `false`.
pub allow_concurrent: Option<UniversalBool>,
/// NOTE(review): presumably the next planned run time — confirm with the scheduler.
pub next_run_at: Option<UniversalTimestamp>,
/// NOTE(review): presumably the time of the most recent run — confirm.
pub last_run_at: Option<UniversalTimestamp>,
/// NOTE(review): presumably the time of the most recent trigger poll — confirm.
pub last_poll_at: Option<UniversalTimestamp>,
/// Creation timestamp of the record.
pub created_at: UniversalTimestamp,
/// Last-update timestamp of the record.
pub updated_at: UniversalTimestamp,
}
impl Schedule {
    /// Parses the textual `schedule_type` field into a [`ScheduleType`].
    ///
    /// The conversion cannot fail: the `From<&str>` impl in this file maps `"trigger"` to
    /// `Trigger` and every other string to `Cron`.
    pub fn get_type(&self) -> ScheduleType {
        ScheduleType::from(self.schedule_type.as_str())
    }

    /// Returns `true` when the schedule type parses as [`ScheduleType::Cron`].
    pub fn is_cron(&self) -> bool {
        self.get_type() == ScheduleType::Cron
    }

    /// Returns `true` when the schedule type parses as [`ScheduleType::Trigger`].
    pub fn is_trigger(&self) -> bool {
        self.get_type() == ScheduleType::Trigger
    }

    /// Returns `true` when the `enabled` flag is set.
    pub fn is_enabled(&self) -> bool {
        self.enabled.is_true()
    }

    /// Returns the poll interval as a [`Duration`].
    ///
    /// Yields `None` when `poll_interval_ms` is unset or negative. A negative value cannot be
    /// expressed as a `Duration`; casting it with `as u64` would sign-extend it into an
    /// interval of hundreds of millions of years, so it is treated as "no interval" instead.
    pub fn poll_interval(&self) -> Option<Duration> {
        self.poll_interval_ms
            .filter(|ms| *ms >= 0)
            // A non-negative i32 always fits in u64, so this cast is lossless.
            .map(|ms| Duration::from_millis(ms as u64))
    }

    /// Returns `true` only when `allow_concurrent` is present and set; `None` counts as `false`.
    pub fn allows_concurrent(&self) -> bool {
        self.allow_concurrent
            .as_ref()
            .map_or(false, |flag| flag.is_true())
    }
}
/// Values for creating a schedule.
///
/// Has the same fields as `Schedule` except `id`, `last_run_at`, `last_poll_at`, `created_at`
/// and `updated_at`, and `enabled` is optional here. Use `NewSchedule::cron` or
/// `NewSchedule::trigger` to build one with consistent defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewSchedule {
/// Schedule kind as text: `"cron"` or `"trigger"` when built by the constructors.
pub schedule_type: String,
/// Name of the workflow this schedule refers to.
pub workflow_name: String,
/// Enabled flag; both constructors set it to `true`.
pub enabled: Option<UniversalBool>,
/// Cron expression; set only by `NewSchedule::cron`.
pub cron_expression: Option<String>,
/// Timezone name; `NewSchedule::cron` defaults it to `"UTC"`.
pub timezone: Option<String>,
/// Catch-up policy as text; `NewSchedule::cron` defaults it to `"skip"`.
pub catchup_policy: Option<String>,
/// Left `None` by both constructors.
pub start_date: Option<UniversalTimestamp>,
/// Left `None` by both constructors.
pub end_date: Option<UniversalTimestamp>,
/// Trigger name; set only by `NewSchedule::trigger`.
pub trigger_name: Option<String>,
/// Poll interval in milliseconds; set only by `NewSchedule::trigger`.
pub poll_interval_ms: Option<i32>,
/// Concurrency flag; `NewSchedule::trigger` sets it to `false`, `NewSchedule::cron` leaves it `None`.
pub allow_concurrent: Option<UniversalBool>,
/// Next run time; set only by `NewSchedule::cron`.
pub next_run_at: Option<UniversalTimestamp>,
}
impl NewSchedule {
    /// Builds an enabled cron schedule for `workflow_name`.
    ///
    /// The timezone defaults to `"UTC"` and the catch-up policy to `"skip"`. The start/end
    /// dates and all trigger-specific fields (`trigger_name`, `poll_interval_ms`,
    /// `allow_concurrent`) are left unset.
    pub fn cron(
        workflow_name: &str,
        cron_expression: &str,
        next_run_at: UniversalTimestamp,
    ) -> Self {
        Self {
            schedule_type: "cron".to_string(),
            workflow_name: workflow_name.to_string(),
            enabled: Some(UniversalBool::new(true)),
            cron_expression: Some(cron_expression.to_string()),
            timezone: Some("UTC".to_string()),
            catchup_policy: Some("skip".to_string()),
            start_date: None,
            end_date: None,
            trigger_name: None,
            poll_interval_ms: None,
            allow_concurrent: None,
            next_run_at: Some(next_run_at),
        }
    }

    /// Builds an enabled trigger schedule that runs `workflow_name` for `trigger_name`.
    ///
    /// `poll_interval` is stored in whole milliseconds and saturates at `i32::MAX`
    /// (about 24.8 days). Concurrent runs are disallowed by default, and all cron-specific
    /// fields (`cron_expression`, `timezone`, `catchup_policy`, `next_run_at`) are left unset.
    pub fn trigger(trigger_name: &str, workflow_name: &str, poll_interval: Duration) -> Self {
        // A bare `as i32` on the u128 millisecond count silently truncates, so a long
        // interval could wrap to an arbitrary (even negative) value. Clamp first so the
        // final cast is lossless.
        let poll_interval_ms = poll_interval.as_millis().min(i32::MAX as u128) as i32;

        Self {
            schedule_type: "trigger".to_string(),
            workflow_name: workflow_name.to_string(),
            enabled: Some(UniversalBool::new(true)),
            cron_expression: None,
            timezone: None,
            catchup_policy: None,
            start_date: None,
            end_date: None,
            trigger_name: Some(trigger_name.to_string()),
            poll_interval_ms: Some(poll_interval_ms),
            allow_concurrent: Some(UniversalBool::new(false)),
            next_run_at: None,
        }
    }
}
/// A record linking a schedule to one execution.
///
/// NOTE(review): field meanings below are inferred from names only; nothing in this file reads
/// or writes them — confirm against the code that populates this record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleExecution {
/// Identifier of this execution record.
pub id: UniversalUuid,
/// Identifier of the schedule this record belongs to.
pub schedule_id: UniversalUuid,
/// Optional identifier of an associated pipeline execution.
pub pipeline_execution_id: Option<UniversalUuid>,
/// NOTE(review): presumably the time the run was scheduled for — confirm.
pub scheduled_time: Option<UniversalTimestamp>,
/// NOTE(review): presumably when a worker claimed the run — confirm.
pub claimed_at: Option<UniversalTimestamp>,
/// NOTE(review): presumably a hash of the trigger/run context — confirm how it is used.
pub context_hash: Option<String>,
/// Start timestamp of the execution.
pub started_at: UniversalTimestamp,
/// Completion timestamp; `None` while not completed.
pub completed_at: Option<UniversalTimestamp>,
/// Creation timestamp of the record.
pub created_at: UniversalTimestamp,
/// Last-update timestamp of the record.
pub updated_at: UniversalTimestamp,
}
/// Values for creating a `ScheduleExecution`.
///
/// Carries the same fields as `ScheduleExecution` except `id`, `started_at`, `completed_at`,
/// `created_at` and `updated_at`.
/// NOTE(review): presumably those are assigned by the storage layer — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewScheduleExecution {
/// Identifier of the schedule this execution belongs to.
pub schedule_id: UniversalUuid,
/// Optional identifier of an associated pipeline execution.
pub pipeline_execution_id: Option<UniversalUuid>,
/// NOTE(review): presumably the time the run was scheduled for — confirm.
pub scheduled_time: Option<UniversalTimestamp>,
/// NOTE(review): presumably when a worker claimed the run — confirm.
pub claimed_at: Option<UniversalTimestamp>,
/// NOTE(review): presumably a hash of the trigger/run context — confirm how it is used.
pub context_hash: Option<String>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::universal_types::current_timestamp;

    /// `ScheduleType` parses from text (unknown input falls back to `Cron`) and prints back.
    #[test]
    fn test_schedule_type_conversions() {
        let from_cron = ScheduleType::from("cron");
        let from_trigger = ScheduleType::from("trigger");
        let from_unknown = ScheduleType::from("unknown");
        assert_eq!(from_cron, ScheduleType::Cron);
        assert_eq!(from_trigger, ScheduleType::Trigger);
        assert_eq!(from_unknown, ScheduleType::Cron);

        let cron_text = ScheduleType::Cron.to_string();
        let trigger_text = ScheduleType::Trigger.to_string();
        assert_eq!(cron_text, "cron");
        assert_eq!(trigger_text, "trigger");
    }

    /// The cron constructor fills the cron fields and leaves the trigger name unset.
    #[test]
    fn test_new_cron_schedule() {
        let next_run = current_timestamp();
        let new_schedule = NewSchedule::cron("my_workflow", "0 2 * * *", next_run);

        assert_eq!(new_schedule.schedule_type, "cron");
        assert_eq!(new_schedule.workflow_name, "my_workflow");
        assert_eq!(new_schedule.cron_expression.as_deref(), Some("0 2 * * *"));
        assert!(new_schedule.trigger_name.is_none());
    }

    /// The trigger constructor stores the interval in milliseconds and leaves cron fields unset.
    #[test]
    fn test_new_trigger_schedule() {
        let interval = Duration::from_secs(5);
        let new_schedule = NewSchedule::trigger("file_watcher", "process_files", interval);

        assert_eq!(new_schedule.schedule_type, "trigger");
        assert_eq!(new_schedule.workflow_name, "process_files");
        assert_eq!(new_schedule.trigger_name.as_deref(), Some("file_watcher"));
        assert_eq!(new_schedule.poll_interval_ms, Some(5000));
        assert!(new_schedule.cron_expression.is_none());
    }

    /// Accessor helpers on a hand-built trigger schedule.
    #[test]
    fn test_schedule_helpers() {
        let stamp = current_timestamp();
        let record = Schedule {
            id: UniversalUuid::new_v4(),
            schedule_type: "trigger".to_string(),
            workflow_name: "test".to_string(),
            enabled: UniversalBool::new(true),
            cron_expression: None,
            timezone: None,
            catchup_policy: None,
            start_date: None,
            end_date: None,
            trigger_name: Some("my_trigger".to_string()),
            poll_interval_ms: Some(5000),
            allow_concurrent: Some(UniversalBool::new(false)),
            next_run_at: None,
            last_run_at: None,
            last_poll_at: None,
            created_at: stamp,
            updated_at: stamp,
        };

        assert!(record.is_trigger());
        assert!(!record.is_cron());
        assert!(record.is_enabled());
        assert_eq!(record.poll_interval(), Some(Duration::from_secs(5)));
        assert!(!record.allows_concurrent());
    }
}