use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// A stored cron execution: one scheduled firing of a schedule together with
/// the time it was claimed and its bookkeeping timestamps.
///
/// Every timestamp is mandatory here; see [`NewCronExecution`] for the variant
/// in which they may be left unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronExecution {
    /// Identifier of this execution record.
    pub id: UniversalUuid,
    /// Identifier of the schedule this execution refers to.
    pub schedule_id: UniversalUuid,
    /// Identifier of the associated pipeline execution, when there is one.
    pub pipeline_execution_id: Option<UniversalUuid>,
    /// Time at which the execution was scheduled to happen.
    pub scheduled_time: UniversalTimestamp,
    /// Time at which the execution was claimed.
    pub claimed_at: UniversalTimestamp,
    /// Record creation time.
    pub created_at: UniversalTimestamp,
    /// Time of the most recent update to the record.
    pub updated_at: UniversalTimestamp,
}
/// Input shape for creating a [`CronExecution`].
///
/// `id` and the three timestamps are optional. NOTE(review): unset values are
/// presumably filled in by whatever persists the record — confirm against the
/// storage layer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewCronExecution {
    /// Identifier for the new record; the constructors in this file always set it.
    pub id: Option<UniversalUuid>,
    /// Identifier of the schedule this execution refers to.
    pub schedule_id: UniversalUuid,
    /// Identifier of the associated pipeline execution, when there is one.
    pub pipeline_execution_id: Option<UniversalUuid>,
    /// Time at which the execution was scheduled to happen.
    pub scheduled_time: UniversalTimestamp,
    /// Claim time, if already known.
    pub claimed_at: Option<UniversalTimestamp>,
    /// Creation time, if already known.
    pub created_at: Option<UniversalTimestamp>,
    /// Last-update time, if already known.
    pub updated_at: Option<UniversalTimestamp>,
}
impl NewCronExecution {
    /// Creates a new execution for `schedule_id` at `scheduled_time`.
    ///
    /// A fresh v4 UUID is generated for `id`; the pipeline execution and all
    /// three timestamps are left as `None`.
    pub fn new(schedule_id: UniversalUuid, scheduled_time: UniversalTimestamp) -> Self {
        Self {
            id: Some(UniversalUuid::new_v4()),
            schedule_id,
            scheduled_time,
            pipeline_execution_id: None,
            claimed_at: None,
            created_at: None,
            updated_at: None,
        }
    }

    /// Same as [`NewCronExecution::new`], but with `pipeline_execution_id`
    /// already attached.
    pub fn with_pipeline_execution(
        schedule_id: UniversalUuid,
        pipeline_execution_id: UniversalUuid,
        scheduled_time: UniversalTimestamp,
    ) -> Self {
        Self {
            pipeline_execution_id: Some(pipeline_execution_id),
            // Everything else (including the freshly generated id) comes from `new`.
            ..Self::new(schedule_id, scheduled_time)
        }
    }

    /// Creates a new execution whose claim time is supplied by the caller.
    ///
    /// `claimed_at` is used for `claimed_at`, `created_at` and `updated_at`
    /// alike, so all three carry the same instant.
    pub fn with_claimed_at(
        schedule_id: UniversalUuid,
        pipeline_execution_id: Option<UniversalUuid>,
        scheduled_time: UniversalTimestamp,
        claimed_at: DateTime<Utc>,
    ) -> Self {
        let stamp = Some(UniversalTimestamp(claimed_at));
        Self {
            pipeline_execution_id,
            claimed_at: stamp,
            created_at: stamp,
            updated_at: stamp,
            ..Self::new(schedule_id, scheduled_time)
        }
    }
}
impl CronExecution {
    /// Returns the scheduled time as a `DateTime<Utc>`.
    pub fn scheduled_time(&self) -> DateTime<Utc> {
        self.scheduled_time.0
    }

    /// Returns the claim time as a `DateTime<Utc>`.
    pub fn claimed_at(&self) -> DateTime<Utc> {
        self.claimed_at.0
    }

    /// Returns the creation time as a `DateTime<Utc>`.
    pub fn created_at(&self) -> DateTime<Utc> {
        self.created_at.0
    }

    /// Returns the last-update time as a `DateTime<Utc>`.
    pub fn updated_at(&self) -> DateTime<Utc> {
        self.updated_at.0
    }

    /// Returns how long after the scheduled time the execution was claimed
    /// (`claimed_at - scheduled_time`).
    ///
    /// The result is negative when the claim time precedes the scheduled time.
    pub fn execution_delay(&self) -> chrono::Duration {
        let claimed = self.claimed_at();
        let scheduled = self.scheduled_time();
        claimed - scheduled
    }

    /// Reports whether the execution delay lies within `[0, tolerance]`.
    ///
    /// Both bounds are inclusive. A negative delay (claimed before the
    /// scheduled time) is never timely, and a negative `tolerance` therefore
    /// always yields `false`.
    pub fn is_timely(&self, tolerance: chrono::Duration) -> bool {
        let accepted = chrono::Duration::zero()..=tolerance;
        accepted.contains(&self.execution_delay())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::universal_types::current_timestamp;
    use chrono::Duration;

    /// `NewCronExecution::new` keeps its arguments and leaves the optional
    /// pipeline execution and claim time unset.
    #[test]
    fn test_new_cron_execution() {
        let sched_id = UniversalUuid::new_v4();
        let when = current_timestamp();

        let fresh = NewCronExecution::new(sched_id, when);

        assert_eq!(fresh.schedule_id, sched_id);
        assert_eq!(fresh.pipeline_execution_id, None);
        assert_eq!(fresh.scheduled_time, when);
        assert!(fresh.claimed_at.is_none());
    }

    /// An execution claimed one minute after its scheduled time reports a
    /// one-minute delay and is timely only for tolerances of at least that.
    #[test]
    fn test_cron_execution_delays() {
        let lag = Duration::minutes(1);
        let now = Utc::now();
        let claimed = UniversalTimestamp(now);

        let execution = CronExecution {
            id: UniversalUuid::new_v4(),
            schedule_id: UniversalUuid::new_v4(),
            pipeline_execution_id: Some(UniversalUuid::new_v4()),
            scheduled_time: UniversalTimestamp(now - lag),
            claimed_at: claimed,
            created_at: claimed,
            updated_at: claimed,
        };

        assert_eq!(execution.execution_delay(), lag);
        assert!(execution.is_timely(Duration::minutes(2)));
        assert!(!execution.is_timely(Duration::seconds(30)));
    }
}