use crate::schedule::Schedule;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
/// Health classification for a schedule.
///
/// `Warning` and `Unhealthy` each carry the list of issue messages attached
/// to that classification; `Healthy` carries none.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ScheduleHealth {
/// No issues are attached.
Healthy,
/// Issues are attached. Presumably the less severe of the two non-healthy
/// states (inferred from the name only; confirm with the code that builds it).
Warning { issues: Vec<String> },
/// Issues are attached. Presumably the more severe of the two non-healthy
/// states (inferred from the name only; confirm with the code that builds it).
Unhealthy { issues: Vec<String> },
}
impl ScheduleHealth {
    /// Returns `true` only for [`ScheduleHealth::Healthy`].
    pub fn is_healthy(&self) -> bool {
        match self {
            ScheduleHealth::Healthy => true,
            ScheduleHealth::Warning { .. } | ScheduleHealth::Unhealthy { .. } => false,
        }
    }

    /// Returns `true` only for [`ScheduleHealth::Warning`].
    pub fn has_warnings(&self) -> bool {
        match self {
            ScheduleHealth::Warning { .. } => true,
            ScheduleHealth::Healthy | ScheduleHealth::Unhealthy { .. } => false,
        }
    }

    /// Returns `true` only for [`ScheduleHealth::Unhealthy`].
    pub fn is_unhealthy(&self) -> bool {
        match self {
            ScheduleHealth::Unhealthy { .. } => true,
            ScheduleHealth::Healthy | ScheduleHealth::Warning { .. } => false,
        }
    }

    /// Returns an owned copy of the attached issue messages.
    ///
    /// The result is empty for `Healthy`; for the other two variants it is a
    /// clone of the carried `issues` vector, in the same order.
    pub fn get_issues(&self) -> Vec<String> {
        let recorded: &[String] = match self {
            ScheduleHealth::Healthy => &[],
            ScheduleHealth::Warning { issues } => issues.as_slice(),
            ScheduleHealth::Unhealthy { issues } => issues.as_slice(),
        };
        recorded.to_vec()
    }
}
/// Outcome of a health check for one named task.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
/// Name of the task this result refers to.
pub task_name: String,
/// Health classification, including any attached issue messages.
pub health: ScheduleHealth,
/// Optional next-run timestamp (UTC); `None` unless explicitly set.
pub next_run: Option<DateTime<Utc>>,
/// Optional duration since the last run; `None` unless explicitly set.
pub time_since_last_run: Option<chrono::Duration>,
}
impl HealthCheckResult {
    /// Builds a result for `task_name` with the given `health`.
    ///
    /// Both optional fields (`next_run`, `time_since_last_run`) start out as
    /// `None`; use the `with_*` methods to fill them in.
    pub fn new(task_name: String, health: ScheduleHealth) -> Self {
        let next_run = None;
        let time_since_last_run = None;
        Self {
            task_name,
            health,
            next_run,
            time_since_last_run,
        }
    }

    /// Shorthand for a result whose health is `ScheduleHealth::Healthy`.
    pub fn healthy(task_name: String) -> Self {
        let health = ScheduleHealth::Healthy;
        Self::new(task_name, health)
    }

    /// Shorthand for a result whose health is `ScheduleHealth::Warning`
    /// carrying `issues`.
    pub fn warning(task_name: String, issues: Vec<String>) -> Self {
        let health = ScheduleHealth::Warning { issues };
        Self::new(task_name, health)
    }

    /// Shorthand for a result whose health is `ScheduleHealth::Unhealthy`
    /// carrying `issues`.
    pub fn unhealthy(task_name: String, issues: Vec<String>) -> Self {
        let health = ScheduleHealth::Unhealthy { issues };
        Self::new(task_name, health)
    }

    /// Consumes `self` and returns it with `next_run` set to `Some(next_run)`.
    pub fn with_next_run(self, next_run: DateTime<Utc>) -> Self {
        Self {
            next_run: Some(next_run),
            ..self
        }
    }

    /// Consumes `self` and returns it with `time_since_last_run` set to
    /// `Some(duration)`.
    pub fn with_time_since_last_run(self, duration: chrono::Duration) -> Self {
        Self {
            time_since_last_run: Some(duration),
            ..self
        }
    }
}
/// Final outcome recorded for one execution of a task.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ExecutionResult {
/// The execution succeeded.
Success,
/// The execution failed; `error` holds the associated message.
Failure { error: String },
/// The execution was recorded as timed out.
Timeout,
/// The execution was recorded as interrupted.
Interrupted,
}
/// Whether a task is currently executing.
///
/// The default state is `Idle`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub enum ExecutionState {
/// Not currently executing.
#[default]
Idle,
/// Currently executing.
Running {
/// When the execution started (UTC).
started_at: DateTime<Utc>,
/// Optional deadline (UTC); `None` means no timeout is tracked.
timeout_after: Option<DateTime<Utc>>,
},
}
impl ExecutionState {
    /// Returns `true` when the state is `Running`.
    pub fn is_running(&self) -> bool {
        match self {
            ExecutionState::Running { .. } => true,
            ExecutionState::Idle => false,
        }
    }

    /// Returns `true` when the state is `Idle`.
    pub fn is_idle(&self) -> bool {
        match self {
            ExecutionState::Idle => true,
            ExecutionState::Running { .. } => false,
        }
    }

    /// Returns `true` when the state is `Running` with a deadline that is
    /// strictly earlier than the current UTC time.
    ///
    /// `Idle`, and `Running` without a deadline, never count as timed out.
    pub fn has_timed_out(&self) -> bool {
        if let ExecutionState::Running {
            timeout_after: Some(deadline),
            ..
        } = self
        {
            *deadline < Utc::now()
        } else {
            false
        }
    }

    /// Returns how long the task has been running, measured against the
    /// current UTC time, or `None` when the state is `Idle`.
    ///
    /// The duration is signed: it is negative if `started_at` lies in the
    /// future relative to the clock reading taken here.
    pub fn running_duration(&self) -> Option<Duration> {
        match self {
            ExecutionState::Idle => None,
            ExecutionState::Running { started_at, .. } => {
                let elapsed = Utc::now().signed_duration_since(*started_at);
                Some(elapsed)
            }
        }
    }
}
/// Record of a single task execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionRecord {
/// When the execution started (UTC).
pub started_at: DateTime<Utc>,
/// When the execution completed (UTC); `None` while no completion is recorded.
pub completed_at: Option<DateTime<Utc>>,
/// Recorded outcome of the execution.
pub result: ExecutionResult,
/// Elapsed time between start and completion in milliseconds, when recorded.
pub duration_ms: Option<u64>,
}
impl ExecutionRecord {
    /// Creates a record for an execution that started at `started_at` and has
    /// no completion time or duration yet.
    ///
    /// Note that `result` is pre-set to `ExecutionResult::Success`, so
    /// `is_success()` already returns `true` for such a record; combine it
    /// with `is_completed()` when that distinction matters.
    pub fn new(started_at: DateTime<Utc>) -> Self {
        Self {
            completed_at: None,
            duration_ms: None,
            result: ExecutionResult::Success,
            started_at,
        }
    }

    /// Creates a record that completes "now" (current UTC time) with `result`.
    ///
    /// `duration_ms` is the number of whole milliseconds between `started_at`
    /// and now; a `started_at` in the future yields `0` rather than a
    /// negative value.
    pub fn completed(started_at: DateTime<Utc>, result: ExecutionResult) -> Self {
        let finished_at = Utc::now();
        let elapsed_ms = finished_at
            .signed_duration_since(started_at)
            .num_milliseconds();
        // Clamp negative spans (start time ahead of the clock) to zero.
        let duration_ms = if elapsed_ms > 0 { elapsed_ms as u64 } else { 0 };
        Self {
            started_at,
            completed_at: Some(finished_at),
            result,
            duration_ms: Some(duration_ms),
        }
    }

    /// Returns `true` when the recorded result is `Success`.
    pub fn is_success(&self) -> bool {
        match self.result {
            ExecutionResult::Success => true,
            ExecutionResult::Failure { .. }
            | ExecutionResult::Timeout
            | ExecutionResult::Interrupted => false,
        }
    }

    /// Returns `true` when the recorded result is `Failure`.
    pub fn is_failure(&self) -> bool {
        match self.result {
            ExecutionResult::Failure { .. } => true,
            ExecutionResult::Success
            | ExecutionResult::Timeout
            | ExecutionResult::Interrupted => false,
        }
    }

    /// Returns `true` when the recorded result is `Timeout`.
    pub fn is_timeout(&self) -> bool {
        match self.result {
            ExecutionResult::Timeout => true,
            ExecutionResult::Success
            | ExecutionResult::Failure { .. }
            | ExecutionResult::Interrupted => false,
        }
    }

    /// Returns `true` when a completion time has been recorded.
    pub fn is_completed(&self) -> bool {
        match self.completed_at {
            Some(_) => true,
            None => false,
        }
    }

    /// Returns `true` when the recorded result is `Interrupted`.
    pub fn is_interrupted(&self) -> bool {
        match self.result {
            ExecutionResult::Interrupted => true,
            ExecutionResult::Success
            | ExecutionResult::Failure { .. }
            | ExecutionResult::Timeout => false,
        }
    }
}
/// Policy deciding whether, and after what delay, a failed attempt is retried.
///
/// The default is `NoRetry`. With serde the enum is internally tagged: the
/// variant name is stored in a `"type"` field next to the variant's own fields.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum RetryPolicy {
/// Never retry.
#[default]
NoRetry,
/// Retry after the same delay each time.
FixedDelay {
/// Delay before each retry, in seconds.
delay_seconds: u64,
/// A retry is granted only while the attempt index is below this value.
max_retries: u32,
},
/// Retry with a delay that is scaled by `multiplier` per attempt and capped.
ExponentialBackoff {
/// Delay for attempt index 0, in seconds.
initial_delay_seconds: u64,
/// Factor raised to the attempt index and applied to the initial delay.
multiplier: f64,
/// Upper bound applied to the computed delay, in seconds.
max_delay_seconds: u64,
/// A retry is granted only while the attempt index is below this value.
max_retries: u32,
},
}
impl RetryPolicy {
    /// Returns the delay in seconds before the retry that follows attempt
    /// index `attempt`, or `None` when no retry should happen.
    ///
    /// * `NoRetry` always yields `None`.
    /// * `FixedDelay` yields `delay_seconds` while `attempt < max_retries`.
    /// * `ExponentialBackoff` yields
    ///   `initial_delay_seconds * multiplier^attempt`, truncated to whole
    ///   seconds and capped at `max_delay_seconds`, while
    ///   `attempt < max_retries`.
    ///
    /// `attempt` is zero-based: attempt `0` receives the initial delay.
    pub fn next_retry_delay(&self, attempt: u32) -> Option<u64> {
        match self {
            RetryPolicy::NoRetry => None,
            RetryPolicy::FixedDelay {
                delay_seconds,
                max_retries,
            } => {
                if attempt < *max_retries {
                    Some(*delay_seconds)
                } else {
                    None
                }
            }
            RetryPolicy::ExponentialBackoff {
                initial_delay_seconds,
                multiplier,
                max_delay_seconds,
                max_retries,
            } => {
                if attempt >= *max_retries {
                    return None;
                }
                // Clamp instead of `attempt as i32`: a plain cast wraps
                // attempts above `i32::MAX` to a negative exponent, which
                // would shrink the delay towards zero instead of growing it.
                let exponent = attempt.min(i32::MAX as u32) as i32;
                let uncapped = (*initial_delay_seconds as f64) * multiplier.powi(exponent);
                let capped = uncapped.min(*max_delay_seconds as f64);
                // The float-to-int cast saturates (negative/NaN become 0).
                // The final integer `min` keeps the cap exact even when
                // `max_delay_seconds` is not exactly representable as `f64`.
                Some((capped as u64).min(*max_delay_seconds))
            }
        }
    }

    /// Returns `true` when [`RetryPolicy::next_retry_delay`] would grant a
    /// retry for attempt index `attempt`.
    pub fn should_retry(&self, attempt: u32) -> bool {
        self.next_retry_delay(attempt).is_some()
    }
}
/// Policy for handling scheduled runs that were missed.
///
/// The default is `Skip`. With serde the enum is internally tagged: the
/// variant name is stored in a `"type"` field next to the variant's own fields.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum CatchupPolicy {
/// Never catch up on missed runs.
#[default]
Skip,
/// Catch up with a single run, however many were missed.
RunOnce,
/// Catch up with several runs, at most `max_catchup`.
RunMultiple { max_catchup: u32 },
/// Catch up only when the scheduled run is overdue by at most
/// `window_seconds` seconds.
TimeWindow { window_seconds: u64 },
}
impl CatchupPolicy {
    /// Decides whether a missed run should be caught up.
    ///
    /// Returns `false` for every policy when `last_run_at` is `None`.
    /// Otherwise:
    ///
    /// * `Skip`: always `false`.
    /// * `RunOnce` / `RunMultiple`: `true` when `now` is strictly later than
    ///   `next_scheduled_run`.
    /// * `TimeWindow`: `true` when the run is overdue by at least one whole
    ///   second and by no more than `window_seconds` seconds.
    pub fn should_catchup(
        &self,
        last_run_at: Option<DateTime<Utc>>,
        next_scheduled_run: DateTime<Utc>,
        now: DateTime<Utc>,
    ) -> bool {
        // Only the presence of a previous run matters here, not its value.
        if last_run_at.is_none() {
            return false;
        }
        match self {
            CatchupPolicy::Skip => false,
            CatchupPolicy::RunOnce | CatchupPolicy::RunMultiple { .. } => now > next_scheduled_run,
            CatchupPolicy::TimeWindow { window_seconds } => {
                let overdue_seconds = now.signed_duration_since(next_scheduled_run).num_seconds();
                // Compare in `u64` once the value is known to be positive.
                // Casting the window to `i64` instead would wrap windows
                // above `i64::MAX` to a negative bound that can never match.
                overdue_seconds > 0 && (overdue_seconds as u64) <= *window_seconds
            }
        }
    }

    /// Returns how many catch-up runs to perform.
    ///
    /// Returns `0` for every policy when `last_run_at` is `None`. Otherwise:
    ///
    /// * `Skip`: `0`.
    /// * `RunOnce`: `1`.
    /// * `RunMultiple`: the number of whole intervals elapsed since the last
    ///   run, minus one, capped at `max_catchup`. Yields `0` when
    ///   `interval_seconds` is `0` or when `now` is not later than the last
    ///   run.
    /// * `TimeWindow`: `1` when [`CatchupPolicy::should_catchup`] accepts the
    ///   run scheduled one interval after the last run, else `0`.
    pub fn catchup_count(
        &self,
        last_run_at: Option<DateTime<Utc>>,
        interval_seconds: u64,
        now: DateTime<Utc>,
    ) -> u32 {
        let last_run = match last_run_at {
            Some(at) => at,
            None => return 0,
        };
        match self {
            CatchupPolicy::Skip => 0,
            CatchupPolicy::RunOnce => 1,
            CatchupPolicy::RunMultiple { max_catchup } => {
                // A zero interval would divide by zero below; there is no
                // meaningful count of missed runs for it.
                if interval_seconds == 0 {
                    return 0;
                }
                let elapsed_seconds = now.signed_duration_since(last_run).num_seconds();
                // A last run in the future (clock skew) must not wrap to a
                // huge unsigned value.
                if elapsed_seconds <= 0 {
                    return 0;
                }
                let missed_runs = (elapsed_seconds as u64) / interval_seconds;
                // Cap in `u64` first so the narrowing cast is lossless.
                // Casting before the cap could truncate large counts.
                missed_runs
                    .saturating_sub(1)
                    .min(u64::from(*max_catchup)) as u32
            }
            CatchupPolicy::TimeWindow { .. } => {
                // NOTE(review): extremely large `interval_seconds` values can
                // wrap in this cast or exceed what `Duration::seconds` and the
                // addition accept; confirm callers validate the interval.
                let next_run = last_run + Duration::seconds(interval_seconds as i64);
                u32::from(self.should_catchup(last_run_at, next_run, now))
            }
        }
    }
}
/// Range, in seconds, from which a per-task time offset is derived.
///
/// When `max_seconds > min_seconds`, `apply` picks an offset in the
/// half-open range `[min_seconds, max_seconds)`; when both are equal the
/// offset is exactly `min_seconds`. Negative values shift a time earlier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Jitter {
/// Lower bound of the offset in seconds (inclusive).
pub min_seconds: i64,
/// Upper bound of the offset in seconds (exclusive when above `min_seconds`).
pub max_seconds: i64,
}
impl Jitter {
    /// Creates a jitter range from `min_seconds` to `max_seconds`.
    ///
    /// The bounds are stored as given; see [`Jitter::apply`] for how an
    /// empty or inverted range is treated.
    pub fn new(min_seconds: i64, max_seconds: i64) -> Self {
        Self {
            min_seconds,
            max_seconds,
        }
    }

    /// Creates a range starting at `0` and ending at `max_seconds`.
    pub fn positive(max_seconds: i64) -> Self {
        Self {
            min_seconds: 0,
            max_seconds,
        }
    }

    /// Creates a range from `-seconds` to `seconds`.
    ///
    /// A negative `seconds` produces an inverted range, which
    /// [`Jitter::apply`] treats as a fixed offset of `min_seconds`.
    pub fn symmetric(seconds: i64) -> Self {
        Self {
            min_seconds: -seconds,
            max_seconds: seconds,
        }
    }

    /// Returns `dt` shifted by an offset derived from `task_name` and `dt`.
    ///
    /// The offset is computed by hashing `task_name` together with the Unix
    /// timestamp of `dt` in whole seconds, so the same inputs give the same
    /// offset. `DefaultHasher`'s algorithm is not specified by the standard
    /// library, so offsets are not guaranteed to stay the same across Rust
    /// releases.
    ///
    /// For `max_seconds > min_seconds` the offset lies in
    /// `[min_seconds, max_seconds)`. For an empty or inverted range
    /// (`max_seconds <= min_seconds`), or one whose width overflows `i64`,
    /// the offset is exactly `min_seconds`.
    pub fn apply(&self, dt: DateTime<Utc>, task_name: &str) -> DateTime<Utc> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        task_name.hash(&mut hasher);
        dt.timestamp().hash(&mut hasher);
        let hash = hasher.finish();

        // `checked_sub` plus the `span > 0` guard keep an inverted range from
        // being cast to a huge unsigned modulus.
        let offset = match self.max_seconds.checked_sub(self.min_seconds) {
            Some(span) if span > 0 => {
                // `hash % span` is below `span <= i64::MAX`, so the cast is
                // lossless, and the sum stays below `max_seconds`.
                (hash % (span as u64)) as i64 + self.min_seconds
            }
            _ => self.min_seconds,
        };
        dt + Duration::seconds(offset)
    }
}
/// One versioned snapshot of a schedule together with its settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleVersion {
/// Version number of this snapshot.
pub version: u32,
/// The schedule definition itself.
pub schedule: Schedule,
/// When this version was created (UTC).
pub created_at: DateTime<Utc>,
/// Optional reason for the change; left out of serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub change_reason: Option<String>,
/// Enabled flag. When the field is missing during deserialization it
/// defaults to `false` (`bool::default()`).
#[serde(default)]
pub enabled: bool,
/// Optional jitter settings; left out of serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub jitter: Option<Jitter>,
/// Catch-up policy. When the field is missing during deserialization it
/// defaults to `CatchupPolicy::Skip`.
#[serde(default)]
pub catchup_policy: CatchupPolicy,
}
/// Status of a task's dependencies.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum DependencyStatus {
/// Nothing is pending or failed.
Satisfied,
/// Some dependencies are still pending; `pending` lists them (presumably
/// by task name; confirm with the producer).
Waiting { pending: Vec<String> },
/// Some dependencies failed; `failed` lists them (presumably by task
/// name; confirm with the producer).
Failed { failed: Vec<String> },
}
impl DependencyStatus {
    /// Returns `true` only for [`DependencyStatus::Satisfied`].
    pub fn is_satisfied(&self) -> bool {
        match self {
            DependencyStatus::Satisfied => true,
            DependencyStatus::Waiting { .. } | DependencyStatus::Failed { .. } => false,
        }
    }

    /// Returns `true` only for [`DependencyStatus::Failed`].
    pub fn has_failures(&self) -> bool {
        match self {
            DependencyStatus::Failed { .. } => true,
            DependencyStatus::Satisfied | DependencyStatus::Waiting { .. } => false,
        }
    }

    /// Returns an owned copy of the pending entries.
    ///
    /// The result is empty unless the status is `Waiting`.
    pub fn pending_tasks(&self) -> Vec<String> {
        if let DependencyStatus::Waiting { pending } = self {
            pending.to_vec()
        } else {
            Vec::new()
        }
    }

    /// Returns an owned copy of the failed entries.
    ///
    /// The result is empty unless the status is `Failed`.
    pub fn failed_tasks(&self) -> Vec<String> {
        if let DependencyStatus::Failed { failed } = self {
            failed.to_vec()
        } else {
            Vec::new()
        }
    }
}