use chrono::Datelike;
use chrono::{DateTime, Duration, Timelike, Utc};
#[cfg(feature = "cron")]
use chrono::{Offset, TimeZone};
#[cfg(feature = "cron")]
use chrono_tz::Tz;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
/// Shared, thread-safe callback taking two string slices.
///
/// NOTE(review): the name suggests it is invoked when a task fails; the meaning of the two
/// `&str` arguments (presumably task name and error message) is not visible here — confirm
/// against the call site.
pub type FailureCallback = Arc<dyn Fn(&str, &str) + Send + Sync>;
/// Describes when a scheduled task should fire.
///
/// Serialized by serde as an internally tagged enum: the variant name is stored under the
/// `type` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Schedule {
/// Fixed-period schedule.
Interval {
/// Period between runs, in seconds.
every: u64,
},
/// Cron-style schedule (only available with the `cron` feature).
#[cfg(feature = "cron")]
Crontab {
/// Minute field of the cron expression.
minute: String,
/// Hour field of the cron expression.
hour: String,
/// Day-of-week field of the cron expression.
day_of_week: String,
/// Day-of-month field of the cron expression.
day_of_month: String,
/// Month field of the cron expression.
month_of_year: String,
/// Optional timezone name parsed as a `chrono_tz::Tz`; `None` means the expression is
/// evaluated in UTC.
#[serde(default)]
timezone: Option<String>,
},
/// Schedule tied to a solar event at a location (only available with the `solar` feature).
#[cfg(feature = "solar")]
Solar {
/// Event name such as `"sunrise"` or `"sunset"`; matched case-insensitively.
event: String,
/// Latitude handed to the solar calculation.
latitude: f64,
/// Longitude handed to the solar calculation.
longitude: f64,
},
/// Schedule that fires a single time.
OneTime {
/// Instant (UTC) at which the task should run.
run_at: DateTime<Utc>,
},
}
impl Schedule {
    /// Creates a schedule that fires every `seconds` seconds.
    pub fn interval(seconds: u64) -> Self {
        Self::Interval { every: seconds }
    }

    /// Creates a cron-style schedule evaluated in UTC.
    ///
    /// Note the argument order: `day_of_week` comes *before* `day_of_month`.
    #[cfg(feature = "cron")]
    pub fn crontab(
        minute: &str,
        hour: &str,
        day_of_week: &str,
        day_of_month: &str,
        month_of_year: &str,
    ) -> Self {
        Self::Crontab {
            minute: minute.to_string(),
            hour: hour.to_string(),
            day_of_week: day_of_week.to_string(),
            day_of_month: day_of_month.to_string(),
            month_of_year: month_of_year.to_string(),
            timezone: None,
        }
    }

    /// Creates a cron-style schedule evaluated in the named timezone.
    ///
    /// The timezone string is stored as-is; it is only parsed (and may be rejected) when
    /// [`Schedule::next_run`] is called.
    #[cfg(feature = "cron")]
    pub fn crontab_tz(
        minute: &str,
        hour: &str,
        day_of_week: &str,
        day_of_month: &str,
        month_of_year: &str,
        timezone: &str,
    ) -> Self {
        Self::Crontab {
            minute: minute.to_string(),
            hour: hour.to_string(),
            day_of_week: day_of_week.to_string(),
            day_of_month: day_of_month.to_string(),
            month_of_year: month_of_year.to_string(),
            timezone: Some(timezone.to_string()),
        }
    }

    /// Creates a schedule that fires at the solar `event` for the given coordinates.
    ///
    /// The event name is validated lazily by [`Schedule::next_run`].
    #[cfg(feature = "solar")]
    pub fn solar(event: &str, latitude: f64, longitude: f64) -> Self {
        Self::Solar {
            event: event.to_string(),
            latitude,
            longitude,
        }
    }

    /// Creates a schedule that fires exactly once, at `run_at`.
    pub fn onetime(run_at: DateTime<Utc>) -> Self {
        Self::OneTime { run_at }
    }

    /// Computes the next time this schedule should fire.
    ///
    /// `last_run` is the previous execution time; when it is `None` the current time is used
    /// as the reference point (except for `OneTime`, where `None` means "not run yet").
    ///
    /// # Errors
    ///
    /// * `ScheduleError::Parse` for an invalid cron expression or timezone name.
    /// * `ScheduleError::Invalid` when no future time exists, the interval is too large to
    ///   represent, the solar event name is unknown, or a one-time schedule already ran.
    pub fn next_run(
        &self,
        last_run: Option<DateTime<Utc>>,
    ) -> Result<DateTime<Utc>, ScheduleError> {
        match self {
            Schedule::Interval { every } => {
                let base = last_run.unwrap_or_else(Utc::now);
                // `Duration::seconds` panics above `i64::MAX / 1000` seconds and a plain `as`
                // cast would wrap, so out-of-range intervals are reported as errors instead.
                let secs = i64::try_from(*every)
                    .ok()
                    .filter(|secs| *secs <= i64::MAX / 1_000)
                    .ok_or_else(|| {
                        ScheduleError::Invalid(format!(
                            "Interval of {} seconds is too large",
                            every
                        ))
                    })?;
                base.checked_add_signed(Duration::seconds(secs))
                    .ok_or_else(|| {
                        ScheduleError::Invalid("Next run time is out of range".to_string())
                    })
            }
            #[cfg(feature = "cron")]
            Schedule::Crontab {
                minute,
                hour,
                day_of_week,
                day_of_month,
                month_of_year,
                timezone,
            } => {
                use cron::Schedule as CronSchedule;
                use std::str::FromStr;

                // Seven-field expression: seconds pinned to 0 and year left as a wildcard.
                let cron_expr = format!(
                    "0 {} {} {} {} {} *",
                    minute, hour, day_of_month, month_of_year, day_of_week
                );
                let cron_schedule = CronSchedule::from_str(&cron_expr)
                    .map_err(|e| ScheduleError::Parse(format!("Invalid cron expression: {}", e)))?;
                let after = last_run.unwrap_or_else(Utc::now);
                let no_future_run =
                    || ScheduleError::Invalid("No future execution time".to_string());

                match timezone {
                    Some(tz_str) => {
                        let tz: Tz = tz_str.parse().map_err(|_| {
                            ScheduleError::Parse(format!("Invalid timezone: {}", tz_str))
                        })?;
                        // Evaluate the expression in local time, then convert back to UTC.
                        let after_tz = after.with_timezone(&tz);
                        let next_tz = cron_schedule
                            .after(&after_tz)
                            .next()
                            .ok_or_else(no_future_run)?;
                        Ok(next_tz.with_timezone(&Utc))
                    }
                    None => cron_schedule
                        .after(&after)
                        .next()
                        .ok_or_else(no_future_run),
                }
            }
            #[cfg(feature = "solar")]
            Schedule::Solar {
                event,
                latitude,
                longitude,
            } => {
                #[allow(deprecated)]
                use sunrise::sunrise_sunset;

                // Resolve (and validate) the event name once instead of on every iteration.
                let (relative_to_sunrise, offset_minutes) = Self::solar_event_offset(event)?;
                let start_time = last_run.unwrap_or_else(Utc::now);
                let mut current_date = start_time.date_naive();

                // Walk forward day by day until the event lies strictly after `start_time`.
                for _ in 0..365 {
                    #[allow(deprecated)]
                    let (sunrise_ts, sunset_ts) = sunrise_sunset(
                        *latitude,
                        *longitude,
                        current_date.year(),
                        current_date.month(),
                        current_date.day(),
                    );
                    let (label, timestamp) = if relative_to_sunrise {
                        ("sunrise", sunrise_ts)
                    } else {
                        ("sunset", sunset_ts)
                    };
                    // `sunrise_sunset` yields UNIX timestamps in seconds, not minutes after
                    // midnight, so they are converted directly to UTC instants.
                    let anchor = chrono::TimeZone::timestamp_opt(&Utc, timestamp, 0)
                        .single()
                        .ok_or_else(|| {
                            ScheduleError::Invalid(format!(
                                "Invalid {} time: timestamp {}",
                                label, timestamp
                            ))
                        })?;
                    let event_time = anchor + Duration::minutes(offset_minutes);
                    if event_time > start_time {
                        return Ok(event_time);
                    }
                    current_date = current_date
                        .checked_add_days(chrono::Days::new(1))
                        .ok_or_else(|| ScheduleError::Invalid("Date overflow".to_string()))?;
                }
                Err(ScheduleError::Invalid(
                    "Could not find solar event in next 365 days".to_string(),
                ))
            }
            Schedule::OneTime { run_at } => {
                if last_run.is_some() {
                    Err(ScheduleError::Invalid(
                        "One-time schedule has already been executed".to_string(),
                    ))
                } else {
                    Ok(*run_at)
                }
            }
        }
    }

    /// Maps a solar event name to `(relative_to_sunrise, offset_minutes)`.
    ///
    /// The twilight and golden-hour events are approximated as fixed offsets from sunrise or
    /// sunset. Matching is case-insensitive; unknown names yield `ScheduleError::Invalid`.
    #[cfg(feature = "solar")]
    fn solar_event_offset(event: &str) -> Result<(bool, i64), ScheduleError> {
        let resolved = match event.to_lowercase().as_str() {
            "sunrise" | "golden_hour_begin" => (true, 0),
            "sunset" => (false, 0),
            "civil_twilight_begin" | "dawn" => (true, -30),
            "civil_twilight_end" | "dusk" => (false, 30),
            "nautical_twilight_begin" => (true, -60),
            "nautical_twilight_end" => (false, 60),
            "astronomical_twilight_begin" => (true, -90),
            "astronomical_twilight_end" => (false, 90),
            "golden_hour_end" => (false, -30),
            _ => {
                return Err(ScheduleError::Invalid(format!(
                    "Unknown solar event: {}. Supported events: sunrise, sunset, civil_twilight_begin/end, nautical_twilight_begin/end, astronomical_twilight_begin/end, golden_hour_begin/end, dawn, dusk",
                    event
                )))
            }
        };
        Ok(resolved)
    }

    /// Returns `true` for [`Schedule::Interval`].
    pub fn is_interval(&self) -> bool {
        matches!(self, Schedule::Interval { .. })
    }

    /// Returns `true` for `Schedule::Crontab`.
    #[cfg(feature = "cron")]
    pub fn is_crontab(&self) -> bool {
        matches!(self, Schedule::Crontab { .. })
    }

    /// Returns `true` for `Schedule::Solar`.
    #[cfg(feature = "solar")]
    pub fn is_solar(&self) -> bool {
        matches!(self, Schedule::Solar { .. })
    }

    /// Returns `true` for [`Schedule::OneTime`].
    pub fn is_onetime(&self) -> bool {
        matches!(self, Schedule::OneTime { .. })
    }
}
impl std::fmt::Display for Schedule {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Schedule::Interval { every } => write!(f, "Interval[every {}s]", every),
#[cfg(feature = "cron")]
Schedule::Crontab {
minute,
hour,
day_of_week,
day_of_month,
month_of_year,
timezone,
} => {
if let Some(tz) = timezone {
write!(
f,
"Crontab[{} {} {} {} {} ({})]",
minute, hour, day_of_month, day_of_week, month_of_year, tz
)
} else {
write!(
f,
"Crontab[{} {} {} {} {} (UTC)]",
minute, hour, day_of_month, day_of_week, month_of_year
)
}
}
#[cfg(feature = "solar")]
Schedule::Solar {
event,
latitude,
longitude,
} => write!(f, "Solar[{} at ({:.4}, {:.4})]", event, latitude, longitude),
Schedule::OneTime { run_at } => {
write!(f, "OneTime[at {}]", run_at.format("%Y-%m-%d %H:%M:%S UTC"))
}
}
}
}
/// A time-limited lock on a task, held by a single named owner.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleLock {
/// Name of the task the lock protects.
pub task_name: String,
/// Identifier of the lock holder.
pub owner: String,
/// When the lock was created.
pub acquired_at: DateTime<Utc>,
/// Instant after which the lock counts as expired.
pub expires_at: DateTime<Utc>,
/// Number of successful renewals since creation.
pub renewal_count: u32,
}
impl ScheduleLock {
    /// Creates a lock for `task_name` held by `owner` that expires `ttl_seconds` from now.
    pub fn new(task_name: String, owner: String, ttl_seconds: u64) -> Self {
        let acquired_at = Utc::now();
        let expires_at = acquired_at + Duration::seconds(ttl_seconds as i64);
        Self {
            task_name,
            owner,
            acquired_at,
            expires_at,
            renewal_count: 0,
        }
    }

    /// Returns `true` once the current time is strictly past `expires_at`.
    pub fn is_expired(&self) -> bool {
        self.expires_at < Utc::now()
    }

    /// Returns `true` when `owner` is the holder of this lock.
    pub fn is_owned_by(&self, owner: &str) -> bool {
        self.owner.as_str() == owner
    }

    /// Extends the lock so that it expires `ttl_seconds` from now and bumps `renewal_count`.
    ///
    /// # Errors
    ///
    /// Returns `ScheduleError::Invalid` if the lock has already expired.
    pub fn renew(&mut self, ttl_seconds: u64) -> Result<(), ScheduleError> {
        if self.is_expired() {
            let reason = "Cannot renew expired lock".to_string();
            return Err(ScheduleError::Invalid(reason));
        }
        let extension = Duration::seconds(ttl_seconds as i64);
        self.expires_at = Utc::now() + extension;
        self.renewal_count += 1;
        Ok(())
    }

    /// Time remaining until expiry; negative once the lock has expired.
    pub fn ttl(&self) -> Duration {
        let now = Utc::now();
        self.expires_at - now
    }

    /// Time elapsed since the lock was acquired.
    pub fn age(&self) -> Duration {
        let now = Utc::now();
        now - self.acquired_at
    }
}
/// In-memory registry of task locks, keyed by task name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockManager {
// Current lock per task name; may contain expired entries until they are cleaned up.
locks: HashMap<String, ScheduleLock>,
// TTL in seconds applied when a caller does not supply one.
default_ttl: u64,
}
impl LockManager {
    /// Creates an empty manager whose locks default to `default_ttl` seconds.
    pub fn new(default_ttl: u64) -> Self {
        let locks = HashMap::new();
        Self { locks, default_ttl }
    }

    /// Attempts to take the lock on `task_name` for `owner`.
    ///
    /// Expired locks are purged first. Returns `Ok(true)` when the lock is newly acquired or
    /// already held by `owner` (the existing lock is left untouched in that case), and
    /// `Ok(false)` when another owner holds a live lock.
    pub fn try_acquire(
        &mut self,
        task_name: &str,
        owner: &str,
        ttl: Option<u64>,
    ) -> Result<bool, ScheduleError> {
        self.cleanup_expired();

        match self.locks.get(task_name) {
            Some(held) if !held.is_expired() => return Ok(held.is_owned_by(owner)),
            _ => {}
        }

        let fresh = ScheduleLock::new(
            task_name.to_string(),
            owner.to_string(),
            ttl.unwrap_or(self.default_ttl),
        );
        self.locks.insert(task_name.to_string(), fresh);
        Ok(true)
    }

    /// Removes the lock on `task_name` if `owner` holds it; returns whether it was removed.
    pub fn release(&mut self, task_name: &str, owner: &str) -> Result<bool, ScheduleError> {
        let held_by_owner = self
            .locks
            .get(task_name)
            .map_or(false, |lock| lock.is_owned_by(owner));
        if held_by_owner {
            self.locks.remove(task_name);
        }
        Ok(held_by_owner)
    }

    /// Renews the lock on `task_name` when `owner` holds it and it has not expired.
    ///
    /// Returns `Ok(false)` when there is no such live lock for `owner`.
    pub fn renew(
        &mut self,
        task_name: &str,
        owner: &str,
        ttl: Option<u64>,
    ) -> Result<bool, ScheduleError> {
        let ttl_seconds = ttl.unwrap_or(self.default_ttl);
        let lock = match self.locks.get_mut(task_name) {
            Some(lock) => lock,
            None => return Ok(false),
        };
        if !lock.is_owned_by(owner) || lock.is_expired() {
            return Ok(false);
        }
        lock.renew(ttl_seconds)?;
        Ok(true)
    }

    /// Returns `true` when a non-expired lock exists for `task_name`.
    pub fn is_locked(&self, task_name: &str) -> bool {
        self.locks
            .get(task_name)
            .map_or(false, |lock| !lock.is_expired())
    }

    /// Returns the stored lock for `task_name`, expired or not.
    pub fn get_lock(&self, task_name: &str) -> Option<&ScheduleLock> {
        self.locks.get(task_name)
    }

    /// Drops every lock that has expired.
    pub fn cleanup_expired(&mut self) {
        self.locks.retain(|_name, held| !held.is_expired());
    }

    /// Returns all locks that have not yet expired.
    pub fn get_active_locks(&self) -> Vec<&ScheduleLock> {
        let mut active = Vec::new();
        for lock in self.locks.values() {
            if !lock.is_expired() {
                active.push(lock);
            }
        }
        active
    }

    /// Drops every lock regardless of owner or expiry.
    pub fn release_all(&mut self) {
        self.locks.clear();
    }
}
/// The default manager uses a lock TTL of 300 seconds.
impl Default for LockManager {
fn default() -> Self {
Self::new(300) }
}
/// Severity classification attached to a [`ScheduleConflict`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictSeverity {
/// Lowest severity level.
Low,
/// Intermediate severity level.
Medium,
/// Highest severity level.
High,
}
/// Describes an overlap between the schedules of two tasks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleConflict {
/// Name of the first task involved.
pub task1: String,
/// Name of the second task involved.
pub task2: String,
/// How serious the conflict is.
pub severity: ConflictSeverity,
/// Size of the overlap, in seconds.
pub overlap_seconds: u64,
/// Free-form explanation of the conflict.
pub description: String,
/// Optional suggested resolution; `None` until set via `with_resolution`.
pub resolution: Option<String>,
}
impl ScheduleConflict {
    /// Creates a conflict record without a suggested resolution.
    pub fn new(
        task1: String,
        task2: String,
        severity: ConflictSeverity,
        overlap_seconds: u64,
        description: String,
    ) -> Self {
        let resolution = None;
        Self {
            task1,
            task2,
            severity,
            overlap_seconds,
            description,
            resolution,
        }
    }

    /// Attaches a suggested resolution, replacing any previous one.
    pub fn with_resolution(self, resolution: String) -> Self {
        Self {
            resolution: Some(resolution),
            ..self
        }
    }

    /// Returns `true` when the severity is `High`.
    pub fn is_high_severity(&self) -> bool {
        matches!(self.severity, ConflictSeverity::High)
    }

    /// Returns `true` when the severity is `Medium`.
    pub fn is_medium_severity(&self) -> bool {
        matches!(self.severity, ConflictSeverity::Medium)
    }

    /// Returns `true` when the severity is `Low`.
    pub fn is_low_severity(&self) -> bool {
        matches!(self.severity, ConflictSeverity::Low)
    }
}
impl std::fmt::Display for ScheduleConflict {
    /// Formats the conflict as a single line; the resolution is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            task1,
            task2,
            severity,
            overlap_seconds,
            description,
            ..
        } = self;
        write!(
            f,
            "Conflict[{:?}]: {} <-> {} (overlap: {}s) - {}",
            severity, task1, task2, overlap_seconds, description
        )
    }
}
/// Aggregate counters describing a scheduler, built by `SchedulerMetrics::from_scheduler`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerMetrics {
/// Number of registered tasks.
pub total_tasks: usize,
/// Tasks whose `enabled` flag is set.
pub enabled_tasks: usize,
/// `total_tasks - enabled_tasks`.
pub disabled_tasks: usize,
/// Tasks for which `has_run()` is true.
pub tasks_with_executions: usize,
/// Sum of per-task history success counts.
pub total_successes: u64,
/// Sum of per-task history failure counts.
pub total_failures: u64,
/// Sum of per-task history timeout counts.
pub total_timeouts: u64,
/// `total_successes + total_failures + total_timeouts`.
pub total_executions: u64,
/// `total_successes / total_executions`, or `0.0` when nothing has executed.
pub overall_success_rate: f64,
/// Tasks with a non-zero `retry_count`.
pub tasks_in_retry: usize,
/// Tasks whose health check reports warnings.
pub tasks_with_warnings: usize,
/// Tasks whose health check reports them unhealthy.
pub unhealthy_tasks: usize,
/// Number of tasks returned by the scheduler's `get_stuck_tasks()`.
pub stuck_tasks: usize,
}
impl SchedulerMetrics {
    /// Computes a metrics snapshot from the scheduler's current tasks.
    ///
    /// All per-task counters are gathered in a single pass and each task's health check is
    /// evaluated once. `overall_success_rate` is `0.0` when no executions are recorded.
    pub fn from_scheduler(scheduler: &BeatScheduler) -> Self {
        let total_tasks = scheduler.tasks.len();

        let mut enabled_tasks = 0usize;
        let mut tasks_with_executions = 0usize;
        let mut tasks_in_retry = 0usize;
        let mut tasks_with_warnings = 0usize;
        let mut unhealthy_tasks = 0usize;
        let mut total_successes = 0u64;
        let mut total_failures = 0u64;
        let mut total_timeouts = 0u64;

        for task in scheduler.tasks.values() {
            if task.enabled {
                enabled_tasks += 1;
            }
            if task.has_run() {
                tasks_with_executions += 1;
            }
            if task.retry_count > 0 {
                tasks_in_retry += 1;
            }

            total_successes += task.history_success_count() as u64;
            total_failures += task.history_failure_count() as u64;
            total_timeouts += task.history_timeout_count() as u64;

            // One health check per task feeds both the warning and the unhealthy counters.
            let report = task.check_health();
            if report.health.has_warnings() {
                tasks_with_warnings += 1;
            }
            if report.health.is_unhealthy() {
                unhealthy_tasks += 1;
            }
        }

        let total_executions = total_successes + total_failures + total_timeouts;
        let overall_success_rate = if total_executions == 0 {
            0.0
        } else {
            total_successes as f64 / total_executions as f64
        };

        Self {
            total_tasks,
            enabled_tasks,
            disabled_tasks: total_tasks - enabled_tasks,
            tasks_with_executions,
            total_successes,
            total_failures,
            total_timeouts,
            total_executions,
            overall_success_rate,
            tasks_in_retry,
            tasks_with_warnings,
            unhealthy_tasks,
            stuck_tasks: scheduler.get_stuck_tasks().len(),
        }
    }
}
/// Per-task execution statistics, built by `TaskStatistics::from_task`.
#[derive(Debug, Clone)]
pub struct TaskStatistics {
/// Task name.
pub name: String,
/// Successes counted from the task's history.
pub success_count: usize,
/// Failures counted from the task's history.
pub failure_count: usize,
/// Timeouts counted from the task's history.
pub timeout_count: usize,
/// Average execution duration in milliseconds, if available.
pub average_duration_ms: Option<u64>,
/// Shortest execution duration in milliseconds, if available.
pub min_duration_ms: Option<u64>,
/// Longest execution duration in milliseconds, if available.
pub max_duration_ms: Option<u64>,
/// Value of the task's `history_success_rate()`.
pub success_rate: f64,
/// Value of the task's `failure_rate()`.
pub failure_rate: f64,
/// The task's current `retry_count`.
pub retry_count: u32,
/// Whether the task's `is_stuck()` check returned a value.
pub is_stuck: bool,
}
impl TaskStatistics {
    /// Collects the statistics of a single task into a snapshot.
    pub fn from_task(task: &ScheduledTask) -> Self {
        let name = task.name.clone();
        let success_count = task.history_success_count();
        let failure_count = task.history_failure_count();
        let timeout_count = task.history_timeout_count();
        let average_duration_ms = task.average_duration_ms();
        let min_duration_ms = task.min_duration_ms();
        let max_duration_ms = task.max_duration_ms();
        let success_rate = task.history_success_rate();
        let failure_rate = task.failure_rate();
        let retry_count = task.retry_count;
        let is_stuck = task.is_stuck().is_some();

        Self {
            name,
            success_count,
            failure_count,
            timeout_count,
            average_duration_ms,
            min_duration_ms,
            max_duration_ms,
            success_rate,
            failure_rate,
            retry_count,
            is_stuck,
        }
    }
}
/// Alert severity. The derived ordering follows declaration order:
/// `Info < Warning < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
pub enum AlertLevel {
/// Informational alert.
Info,
/// Warning-level alert.
Warning,
/// Critical alert.
Critical,
}
impl std::fmt::Display for AlertLevel {
    /// Writes the level as an upper-case label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            AlertLevel::Info => "INFO",
            AlertLevel::Warning => "WARNING",
            AlertLevel::Critical => "CRITICAL",
        };
        f.write_str(label)
    }
}
/// The condition that triggered an alert.
///
/// Serialized by serde as an internally tagged enum under the `type` key.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
#[serde(tag = "type")]
pub enum AlertCondition {
/// A scheduled run did not happen when expected.
MissedSchedule {
/// When the run should have happened.
expected_at: DateTime<Utc>,
/// When the miss was noticed.
detected_at: DateTime<Utc>,
},
/// The task failed several times in a row.
ConsecutiveFailures {
/// Observed number of consecutive failures.
count: u32,
/// Configured threshold.
threshold: u32,
},
/// The task's failure rate is too high.
HighFailureRate {
// Both values are stored as strings. NOTE(review): presumably pre-formatted numbers so
// the enum can derive `Eq`/`Hash` — confirm against the producer.
rate: String, threshold: String,
},
/// An execution took longer than allowed.
SlowExecution {
/// Observed duration in milliseconds.
duration_ms: u64,
/// Configured threshold in milliseconds.
threshold_ms: u64,
},
/// The task appears to be stuck.
TaskStuck {
/// How long the task has been idle, in seconds.
idle_duration_seconds: i64,
/// Expected interval between runs, in seconds.
expected_interval_seconds: u64,
},
/// The task's health check reported problems.
TaskUnhealthy {
/// Descriptions of the individual problems.
issues: Vec<String>,
},
}
impl std::fmt::Display for AlertCondition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AlertCondition::MissedSchedule {
expected_at,
detected_at,
} => {
let delay = detected_at
.signed_duration_since(*expected_at)
.num_seconds();
write!(
f,
"Missed schedule ({}s late, expected at {})",
delay,
expected_at.format("%Y-%m-%d %H:%M:%S UTC")
)
}
AlertCondition::ConsecutiveFailures { count, threshold } => {
write!(
f,
"Consecutive failures ({} failures, threshold: {})",
count, threshold
)
}
AlertCondition::HighFailureRate { rate, threshold } => {
write!(
f,
"High failure rate (rate: {}, threshold: {})",
rate, threshold
)
}
AlertCondition::SlowExecution {
duration_ms,
threshold_ms,
} => {
write!(
f,
"Slow execution ({}ms, threshold: {}ms)",
duration_ms, threshold_ms
)
}
AlertCondition::TaskStuck {
idle_duration_seconds,
expected_interval_seconds,
} => {
write!(
f,
"Task stuck (idle: {}s, expected interval: {}s)",
idle_duration_seconds, expected_interval_seconds
)
}
AlertCondition::TaskUnhealthy { issues } => {
write!(f, "Task unhealthy: {}", issues.join(", "))
}
}
}
}
/// A single alert raised for a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
/// When the alert was created.
pub timestamp: DateTime<Utc>,
/// Task the alert refers to.
pub task_name: String,
/// Severity of the alert.
pub level: AlertLevel,
/// The condition that triggered the alert.
pub condition: AlertCondition,
/// Human-readable message.
pub message: String,
/// Arbitrary key/value annotations; defaults to empty when absent from serialized data.
#[serde(default)]
pub metadata: HashMap<String, String>,
}
impl Alert {
    /// Creates an alert stamped with the current time and no metadata.
    pub fn new(
        task_name: String,
        level: AlertLevel,
        condition: AlertCondition,
        message: String,
    ) -> Self {
        let timestamp = Utc::now();
        let metadata = HashMap::new();
        Self {
            timestamp,
            task_name,
            level,
            condition,
            message,
            metadata,
        }
    }

    /// Adds (or overwrites) one metadata entry and returns the alert.
    pub fn with_metadata(self, key: String, value: String) -> Self {
        let mut alert = self;
        alert.metadata.insert(key, value);
        alert
    }

    /// Returns `true` for `AlertLevel::Critical`.
    pub fn is_critical(&self) -> bool {
        matches!(self.level, AlertLevel::Critical)
    }

    /// Returns `true` for `AlertLevel::Warning`.
    pub fn is_warning(&self) -> bool {
        matches!(self.level, AlertLevel::Warning)
    }

    /// Returns `true` for `AlertLevel::Info`.
    pub fn is_info(&self) -> bool {
        matches!(self.level, AlertLevel::Info)
    }

    /// Key used for de-duplication: the task name joined with the `Debug` rendering of the
    /// condition, so two alerts share a key only when every condition field matches.
    fn dedup_key(&self) -> String {
        let Self {
            task_name,
            condition,
            ..
        } = self;
        format!("{}::{:?}", task_name, condition)
    }
}
impl std::fmt::Display for Alert {
    /// Formats the alert as `[timestamp] LEVEL - task - message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stamp = self.timestamp.format("%Y-%m-%d %H:%M:%S UTC");
        let Self {
            level,
            task_name,
            message,
            ..
        } = self;
        write!(f, "[{}] {} - {} - {}", stamp, level, task_name, message)
    }
}
/// Shared, thread-safe callback that receives a reference to an [`Alert`].
pub type AlertCallback = Arc<dyn Fn(&Alert) + Send + Sync>;
/// Stores alerts with bounded history, time-based de-duplication and notification callbacks.
#[derive(Clone, Serialize, Deserialize)]
pub struct AlertManager {
// Recorded alerts, oldest first.
alerts: Vec<Alert>,
// Maximum number of alerts kept in `alerts`.
max_history: usize,
// Window, in seconds, during which an alert with the same dedup key is suppressed.
dedup_window_seconds: i64,
// Time of the last recorded alert per dedup key.
last_alert_time: HashMap<String, DateTime<Utc>>,
// Callbacks are not serialized; a deserialized manager starts with none.
#[serde(skip)]
callbacks: Vec<AlertCallback>,
}
impl AlertManager {
    /// Creates a manager keeping at most `max_history` alerts and suppressing duplicates
    /// that arrive within `dedup_window_seconds` of the previous one.
    pub fn new(max_history: usize, dedup_window_seconds: i64) -> Self {
        Self {
            alerts: Vec::new(),
            max_history,
            dedup_window_seconds,
            last_alert_time: HashMap::new(),
            callbacks: Vec::new(),
        }
    }

    /// Registers a callback invoked for every alert that is actually recorded.
    pub fn add_callback(&mut self, callback: AlertCallback) {
        self.callbacks.push(callback);
    }

    /// Records `alert` unless an alert with the same dedup key was recorded within the
    /// de-duplication window.
    ///
    /// Returns `true` when the alert was recorded (callbacks run, history updated) and
    /// `false` when it was suppressed as a duplicate.
    pub fn record_alert(&mut self, alert: Alert) -> bool {
        let dedup_key = alert.dedup_key();
        let now = Utc::now();
        let window = self.dedup_window_seconds;

        if let Some(last_time) = self.last_alert_time.get(&dedup_key) {
            if now.signed_duration_since(*last_time).num_seconds() < window {
                return false;
            }
        }
        self.last_alert_time.insert(dedup_key, now);

        // Callbacks see the alert before it is moved into the history.
        for callback in &self.callbacks {
            callback(&alert);
        }
        self.alerts.push(alert);

        // Trim the oldest entries so the history never exceeds `max_history`.
        if self.alerts.len() > self.max_history {
            let excess = self.alerts.len() - self.max_history;
            self.alerts.drain(..excess);
        }

        // Forget dedup entries older than twice the window; saturate so an extreme window
        // cannot overflow the multiplication.
        let retention = window.saturating_mul(2);
        self.last_alert_time
            .retain(|_, last_time| now.signed_duration_since(*last_time).num_seconds() < retention);
        true
    }

    /// Returns all stored alerts, oldest first.
    pub fn get_alerts(&self) -> &[Alert] {
        &self.alerts
    }

    /// Returns the stored alerts whose level is critical.
    pub fn get_critical_alerts(&self) -> Vec<&Alert> {
        self.alerts.iter().filter(|a| a.is_critical()).collect()
    }

    /// Returns the stored alerts whose level is warning.
    pub fn get_warning_alerts(&self) -> Vec<&Alert> {
        self.alerts.iter().filter(|a| a.is_warning()).collect()
    }

    /// Returns the stored alerts belonging to `task_name`.
    pub fn get_task_alerts(&self, task_name: &str) -> Vec<&Alert> {
        self.alerts
            .iter()
            .filter(|a| a.task_name == task_name)
            .collect()
    }

    /// Returns the alerts whose timestamp is newer than `seconds` ago.
    pub fn get_recent_alerts(&self, seconds: i64) -> Vec<&Alert> {
        let cutoff = Utc::now() - Duration::seconds(seconds);
        self.alerts
            .iter()
            .filter(|a| a.timestamp > cutoff)
            .collect()
    }

    /// Removes all alerts and all de-duplication state.
    pub fn clear(&mut self) {
        self.alerts.clear();
        self.last_alert_time.clear();
    }

    /// Removes the alerts and de-duplication state of one task.
    pub fn clear_task_alerts(&mut self, task_name: &str) {
        self.alerts.retain(|a| a.task_name != task_name);
        // Dedup keys start with "<task>::"; build the prefix once rather than per entry.
        let prefix = format!("{}::", task_name);
        self.last_alert_time
            .retain(|key, _| !key.starts_with(prefix.as_str()));
    }

    /// Number of stored alerts.
    pub fn alert_count(&self) -> usize {
        self.alerts.len()
    }

    /// Number of stored critical alerts.
    pub fn critical_alert_count(&self) -> usize {
        self.alerts.iter().filter(|a| a.is_critical()).count()
    }

    /// Number of stored warning alerts.
    pub fn warning_alert_count(&self) -> usize {
        self.alerts.iter().filter(|a| a.is_warning()).count()
    }
}
/// The default manager keeps up to 1000 alerts and uses a 300-second de-duplication window.
impl Default for AlertManager {
fn default() -> Self {
Self::new(1000, 300) }
}
impl std::fmt::Debug for AlertManager {
    /// Hand-written because the stored callbacks are not `Debug`; only counts and settings
    /// are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let alerts_count = self.alerts.len();
        let callbacks_count = self.callbacks.len();

        let mut out = f.debug_struct("AlertManager");
        out.field("alerts_count", &alerts_count);
        out.field("max_history", &self.max_history);
        out.field("dedup_window_seconds", &self.dedup_window_seconds);
        out.field("callbacks_count", &callbacks_count);
        out.finish()
    }
}
/// Per-task alerting configuration.
///
/// Most fields carry serde defaults, so they may be omitted from serialized data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
/// Master switch for alerting; defaults to `true`.
#[serde(default = "default_true")]
pub enabled: bool,
/// Consecutive-failure threshold; defaults to 3.
#[serde(default = "default_consecutive_failures_threshold")]
pub consecutive_failures_threshold: u32,
/// Failure-rate threshold; defaults to 0.5.
#[serde(default = "default_failure_rate_threshold")]
pub failure_rate_threshold: f64,
/// Slow-execution threshold in milliseconds; `None` when no threshold is configured.
pub slow_execution_threshold_ms: Option<u64>,
/// Whether missed schedules raise alerts; defaults to `true`.
#[serde(default = "default_true")]
pub alert_on_missed_schedule: bool,
/// Whether stuck tasks raise alerts; defaults to `true`.
#[serde(default = "default_true")]
pub alert_on_stuck: bool,
}
/// Serde default helper for boolean fields that should default to `true`.
#[allow(dead_code)]
fn default_true() -> bool {
true
}
/// Serde default for `AlertConfig::consecutive_failures_threshold`.
#[allow(dead_code)]
fn default_consecutive_failures_threshold() -> u32 {
3
}
/// Serde default for `AlertConfig::failure_rate_threshold`.
#[allow(dead_code)]
fn default_failure_rate_threshold() -> f64 {
0.5
}
impl Default for AlertConfig {
fn default() -> Self {
Self {
enabled: true,
consecutive_failures_threshold: 3,
failure_rate_threshold: 0.5,
slow_execution_threshold_ms: None,
alert_on_missed_schedule: true,
alert_on_stuck: true,
}
}
}
impl AlertConfig {
    /// Same as [`AlertConfig::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Default configuration with alerting switched off.
    pub fn disabled() -> Self {
        let mut config = Self::default();
        config.enabled = false;
        config
    }

    /// Sets the consecutive-failure threshold.
    pub fn with_consecutive_failures_threshold(self, threshold: u32) -> Self {
        Self {
            consecutive_failures_threshold: threshold,
            ..self
        }
    }

    /// Sets the failure-rate threshold.
    pub fn with_failure_rate_threshold(self, threshold: f64) -> Self {
        Self {
            failure_rate_threshold: threshold,
            ..self
        }
    }

    /// Sets the slow-execution threshold, in milliseconds.
    pub fn with_slow_execution_threshold_ms(self, threshold_ms: u64) -> Self {
        Self {
            slow_execution_threshold_ms: Some(threshold_ms),
            ..self
        }
    }

    /// Turns off alerts for missed schedules.
    pub fn without_missed_schedule_alerts(self) -> Self {
        Self {
            alert_on_missed_schedule: false,
            ..self
        }
    }

    /// Turns off alerts for stuck tasks.
    pub fn without_stuck_alerts(self) -> Self {
        Self {
            alert_on_stuck: false,
            ..self
        }
    }
}
/// Outcome of a task health check.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ScheduleHealth {
/// No issues were found.
Healthy,
/// Issues were found at warning level.
Warning { issues: Vec<String> },
/// Issues were found that make the task unhealthy.
Unhealthy { issues: Vec<String> },
}
impl ScheduleHealth {
pub fn is_healthy(&self) -> bool {
matches!(self, ScheduleHealth::Healthy)
}
pub fn has_warnings(&self) -> bool {
matches!(self, ScheduleHealth::Warning { .. })
}
pub fn is_unhealthy(&self) -> bool {
matches!(self, ScheduleHealth::Unhealthy { .. })
}
pub fn get_issues(&self) -> Vec<String> {
match self {
ScheduleHealth::Healthy => Vec::new(),
ScheduleHealth::Warning { issues } | ScheduleHealth::Unhealthy { issues } => {
issues.clone()
}
}
}
}
/// Result of checking one task's health.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
/// Name of the checked task.
pub task_name: String,
/// Health verdict.
pub health: ScheduleHealth,
/// Next scheduled run, when attached via `with_next_run`.
pub next_run: Option<DateTime<Utc>>,
/// Time since the last run, when attached via `with_time_since_last_run`.
pub time_since_last_run: Option<chrono::Duration>,
}
impl HealthCheckResult {
    /// Creates a result with the given verdict and no timing information.
    pub fn new(task_name: String, health: ScheduleHealth) -> Self {
        let (next_run, time_since_last_run) = (None, None);
        Self {
            task_name,
            health,
            next_run,
            time_since_last_run,
        }
    }

    /// Shorthand for a `Healthy` result.
    pub fn healthy(task_name: String) -> Self {
        let verdict = ScheduleHealth::Healthy;
        Self::new(task_name, verdict)
    }

    /// Shorthand for a `Warning` result carrying `issues`.
    pub fn warning(task_name: String, issues: Vec<String>) -> Self {
        let verdict = ScheduleHealth::Warning { issues };
        Self::new(task_name, verdict)
    }

    /// Shorthand for an `Unhealthy` result carrying `issues`.
    pub fn unhealthy(task_name: String, issues: Vec<String>) -> Self {
        let verdict = ScheduleHealth::Unhealthy { issues };
        Self::new(task_name, verdict)
    }

    /// Attaches the next scheduled run time.
    pub fn with_next_run(self, next_run: DateTime<Utc>) -> Self {
        Self {
            next_run: Some(next_run),
            ..self
        }
    }

    /// Attaches the time elapsed since the last run.
    pub fn with_time_since_last_run(self, duration: chrono::Duration) -> Self {
        Self {
            time_since_last_run: Some(duration),
            ..self
        }
    }
}
/// Final outcome of one task execution.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ExecutionResult {
/// The execution succeeded.
Success,
/// The execution failed with the given error text.
Failure { error: String },
/// The execution timed out.
Timeout,
/// The execution was interrupted.
Interrupted,
}
/// Whether a task is currently executing. Defaults to `Idle`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub enum ExecutionState {
/// The task is not running.
#[default]
Idle,
/// The task is running.
Running {
/// When the current execution started.
started_at: DateTime<Utc>,
/// Deadline after which the execution counts as timed out; `None` means no deadline.
timeout_after: Option<DateTime<Utc>>,
},
}
impl ExecutionState {
    /// Returns `true` while the task is running.
    pub fn is_running(&self) -> bool {
        match self {
            Self::Running { .. } => true,
            Self::Idle => false,
        }
    }

    /// Returns `true` while the task is idle.
    pub fn is_idle(&self) -> bool {
        match self {
            Self::Idle => true,
            Self::Running { .. } => false,
        }
    }

    /// Returns `true` when the task is running with a deadline that has already passed.
    pub fn has_timed_out(&self) -> bool {
        if let Self::Running {
            timeout_after: Some(deadline),
            ..
        } = self
        {
            *deadline < Utc::now()
        } else {
            false
        }
    }

    /// Time elapsed since the current execution started, or `None` when idle.
    pub fn running_duration(&self) -> Option<Duration> {
        if let Self::Running { started_at, .. } = self {
            let now = Utc::now();
            Some(now - *started_at)
        } else {
            None
        }
    }
}
/// History entry describing one execution of a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionRecord {
/// When the execution started.
pub started_at: DateTime<Utc>,
/// When the execution finished; `None` while it has not completed.
pub completed_at: Option<DateTime<Utc>>,
/// Outcome of the execution.
pub result: ExecutionResult,
/// Duration in milliseconds; `None` while the execution has not completed.
pub duration_ms: Option<u64>,
}
impl ExecutionRecord {
    /// Creates a record for an execution that has started but not yet completed.
    ///
    /// The `result` field is initialised to `Success` as a placeholder.
    pub fn new(started_at: DateTime<Utc>) -> Self {
        let result = ExecutionResult::Success;
        Self {
            started_at,
            completed_at: None,
            result,
            duration_ms: None,
        }
    }

    /// Creates a record for an execution that finishes now with `result`.
    ///
    /// The duration is the time from `started_at` to now, clamped to zero if negative.
    pub fn completed(started_at: DateTime<Utc>, result: ExecutionResult) -> Self {
        let finished_at = Utc::now();
        let elapsed_ms = (finished_at - started_at).num_milliseconds();
        let duration_ms = if elapsed_ms < 0 { 0 } else { elapsed_ms as u64 };
        Self {
            started_at,
            completed_at: Some(finished_at),
            result,
            duration_ms: Some(duration_ms),
        }
    }

    /// Returns `true` when the result is `Success`.
    pub fn is_success(&self) -> bool {
        self.result == ExecutionResult::Success
    }

    /// Returns `true` when the result is `Failure`.
    pub fn is_failure(&self) -> bool {
        if let ExecutionResult::Failure { .. } = self.result {
            true
        } else {
            false
        }
    }

    /// Returns `true` when the result is `Timeout`.
    pub fn is_timeout(&self) -> bool {
        self.result == ExecutionResult::Timeout
    }

    /// Returns `true` once a completion time has been recorded.
    pub fn is_completed(&self) -> bool {
        !self.completed_at.is_none()
    }

    /// Returns `true` when the result is `Interrupted`.
    pub fn is_interrupted(&self) -> bool {
        self.result == ExecutionResult::Interrupted
    }
}
/// How a failed task is retried. Defaults to `NoRetry`.
///
/// Serialized by serde as an internally tagged enum under the `type` key.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum RetryPolicy {
/// Never retry.
#[default]
NoRetry,
/// Retry after a constant delay.
FixedDelay {
/// Delay before each retry, in seconds.
delay_seconds: u64,
/// Retries are allowed while the attempt number is below this value.
max_retries: u32,
},
/// Retry with a delay that grows geometrically with the attempt number.
ExponentialBackoff {
/// Delay used for attempt 0, in seconds.
initial_delay_seconds: u64,
/// Factor the delay is multiplied by for each further attempt.
multiplier: f64,
/// Upper bound on the computed delay, in seconds.
max_delay_seconds: u64,
/// Retries are allowed while the attempt number is below this value.
max_retries: u32,
},
}
impl RetryPolicy {
    /// Returns the delay in seconds before retry number `attempt`, or `None` when the
    /// policy does not allow another retry.
    ///
    /// For exponential backoff the delay is `initial * multiplier^attempt`, capped at
    /// `max_delay_seconds`.
    pub fn next_retry_delay(&self, attempt: u32) -> Option<u64> {
        let (limit, delay) = match *self {
            RetryPolicy::NoRetry => return None,
            RetryPolicy::FixedDelay {
                delay_seconds,
                max_retries,
            } => (max_retries, delay_seconds),
            RetryPolicy::ExponentialBackoff {
                initial_delay_seconds,
                multiplier,
                max_delay_seconds,
                max_retries,
            } => {
                let uncapped = (initial_delay_seconds as f64) * multiplier.powi(attempt as i32);
                let capped = uncapped.min(max_delay_seconds as f64);
                (max_retries, capped as u64)
            }
        };

        if attempt < limit {
            Some(delay)
        } else {
            None
        }
    }

    /// Returns `true` when retry number `attempt` is still permitted.
    pub fn should_retry(&self, attempt: u32) -> bool {
        match self.next_retry_delay(attempt) {
            Some(_) => true,
            None => false,
        }
    }
}
/// What to do about runs that were missed. Defaults to `Skip`.
///
/// Serialized by serde as an internally tagged enum under the `type` key.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum CatchupPolicy {
/// Never catch up on missed runs.
#[default]
Skip,
/// Catch up with a single run.
RunOnce,
/// Catch up with several runs, at most `max_catchup`.
RunMultiple { max_catchup: u32 },
/// Catch up only when the run was missed by at most `window_seconds`.
TimeWindow { window_seconds: u64 },
}
impl CatchupPolicy {
    /// Decides whether a missed run should be caught up.
    ///
    /// A task that has never run (`last_run_at == None`) is never caught up. Otherwise:
    /// `Skip` is always `false`; `RunOnce` and `RunMultiple` are `true` once `now` is past
    /// `next_scheduled_run`; `TimeWindow` additionally requires the run to be late by no
    /// more than `window_seconds`.
    pub fn should_catchup(
        &self,
        last_run_at: Option<DateTime<Utc>>,
        next_scheduled_run: DateTime<Utc>,
        now: DateTime<Utc>,
    ) -> bool {
        if last_run_at.is_none() {
            return false;
        }
        match self {
            CatchupPolicy::Skip => false,
            CatchupPolicy::RunOnce | CatchupPolicy::RunMultiple { .. } => now > next_scheduled_run,
            CatchupPolicy::TimeWindow { window_seconds } => {
                // A window above `i64::MAX` would wrap negative with `as`; saturate instead.
                let window = i64::try_from(*window_seconds).unwrap_or(i64::MAX);
                let missed_by = now.signed_duration_since(next_scheduled_run).num_seconds();
                missed_by > 0 && missed_by <= window
            }
        }
    }

    /// Returns how many catch-up runs to perform for a task with the given interval.
    ///
    /// Returns 0 when the task has never run. `RunOnce` yields 1. `RunMultiple` yields the
    /// number of whole intervals elapsed since `last_run_at` minus one, capped at
    /// `max_catchup`; it yields 0 when `interval_seconds` is 0 or `last_run_at` lies in the
    /// future. `TimeWindow` yields 1 when the first run after `last_run_at` is still within
    /// the window, otherwise 0.
    pub fn catchup_count(
        &self,
        last_run_at: Option<DateTime<Utc>>,
        interval_seconds: u64,
        now: DateTime<Utc>,
    ) -> u32 {
        let last_run = match last_run_at {
            Some(last_run) => last_run,
            None => return 0,
        };
        match self {
            CatchupPolicy::Skip => 0,
            CatchupPolicy::RunOnce => 1,
            CatchupPolicy::RunMultiple { max_catchup } => {
                // A zero interval would divide by zero; nothing sensible can be caught up.
                if interval_seconds == 0 {
                    return 0;
                }
                // A negative elapsed time (clock skew, future `last_run`) means no run was
                // missed; casting it with `as u64` would turn it into a huge count.
                let elapsed = now.signed_duration_since(last_run).num_seconds();
                let elapsed = match u64::try_from(elapsed) {
                    Ok(elapsed) => elapsed,
                    Err(_) => return 0,
                };
                let missed_runs = elapsed / interval_seconds;
                // Cap in u64 first so narrowing to u32 can never truncate.
                let capped = missed_runs
                    .saturating_sub(1)
                    .min(u64::from(*max_catchup));
                u32::try_from(capped).unwrap_or(*max_catchup)
            }
            CatchupPolicy::TimeWindow { .. } => {
                // `Duration::seconds` panics above `i64::MAX / 1000`; treat an
                // unrepresentable interval as "no catch-up".
                let next_run = i64::try_from(interval_seconds)
                    .ok()
                    .filter(|secs| *secs <= i64::MAX / 1_000)
                    .and_then(|secs| last_run.checked_add_signed(Duration::seconds(secs)));
                match next_run {
                    Some(next_run) if self.should_catchup(last_run_at, next_run, now) => 1,
                    _ => 0,
                }
            }
        }
    }
}
/// Range of time offsets, in seconds, that can be added to a scheduled run time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Jitter {
/// Smallest offset in seconds; may be negative.
pub min_seconds: i64,
/// Upper bound of the offset range, in seconds.
pub max_seconds: i64,
}
impl Jitter {
    /// Creates a jitter range from `min_seconds` to `max_seconds`.
    pub fn new(min_seconds: i64, max_seconds: i64) -> Self {
        Self {
            min_seconds,
            max_seconds,
        }
    }

    /// Creates a jitter range that only delays: from 0 up to `max_seconds`.
    pub fn positive(max_seconds: i64) -> Self {
        Self {
            min_seconds: 0,
            max_seconds,
        }
    }

    /// Creates a jitter range from `-seconds` to `seconds`.
    pub fn symmetric(seconds: i64) -> Self {
        Self {
            min_seconds: -seconds,
            max_seconds: seconds,
        }
    }

    /// Applies a pseudo-random offset to `dt`.
    ///
    /// The offset is derived from a hash of `task_name` and the timestamp of `dt`, so the
    /// same inputs always produce the same result. It lies in
    /// `min_seconds..max_seconds` (upper bound excluded). When the range is empty or
    /// inverted (`max_seconds <= min_seconds`) the offset is exactly `min_seconds`. If the
    /// offset cannot be represented as a duration or would overflow the date range, `dt` is
    /// returned unchanged.
    pub fn apply(&self, dt: DateTime<Utc>, task_name: &str) -> DateTime<Utc> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        task_name.hash(&mut hasher);
        dt.timestamp().hash(&mut hasher);
        let hash = hasher.finish();

        // Only a strictly positive span is usable; casting a negative span to u64 would
        // produce an enormous modulus and therefore arbitrary offsets.
        let offset = match self.max_seconds.checked_sub(self.min_seconds) {
            Some(span) if span > 0 => (hash % span as u64) as i64 + self.min_seconds,
            _ => self.min_seconds,
        };

        // `Duration::seconds` panics beyond `i64::MAX / 1000` seconds in either direction.
        if offset.unsigned_abs() > (i64::MAX / 1_000) as u64 {
            return dt;
        }
        dt.checked_add_signed(Duration::seconds(offset))
            .unwrap_or(dt)
    }
}
/// Snapshot of a task's scheduling configuration at one version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleVersion {
/// Version number of this snapshot.
pub version: u32,
/// Schedule in effect for this version.
pub schedule: Schedule,
/// When the snapshot was taken.
pub created_at: DateTime<Utc>,
/// Optional explanation for the change; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub change_reason: Option<String>,
/// Whether the task was enabled; deserializes to `false` when absent.
#[serde(default)]
pub enabled: bool,
/// Jitter in effect; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub jitter: Option<Jitter>,
/// Catch-up policy in effect; deserializes to the default policy when absent.
#[serde(default)]
pub catchup_policy: CatchupPolicy,
}
impl ScheduleVersion {
    /// Captures the task's current schedule, enabled flag, jitter and catch-up policy as
    /// version `version`, stamped with the current time.
    pub fn from_task(task: &ScheduledTask, version: u32, change_reason: Option<String>) -> Self {
        let schedule = task.schedule.clone();
        let created_at = Utc::now();
        let enabled = task.enabled;
        let jitter = task.jitter.clone();
        let catchup_policy = task.catchup_policy.clone();
        Self {
            version,
            schedule,
            created_at,
            change_reason,
            enabled,
            jitter,
            catchup_policy,
        }
    }
}
/// State of a task's dependencies.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum DependencyStatus {
/// All dependencies are satisfied.
Satisfied,
/// Some dependencies are still pending; `pending` lists them.
Waiting { pending: Vec<String> },
/// Some dependencies failed; `failed` lists them.
Failed { failed: Vec<String> },
}
impl DependencyStatus {
pub fn is_satisfied(&self) -> bool {
matches!(self, DependencyStatus::Satisfied)
}
pub fn has_failures(&self) -> bool {
matches!(self, DependencyStatus::Failed { .. })
}
pub fn pending_tasks(&self) -> Vec<String> {
match self {
DependencyStatus::Waiting { pending } => pending.clone(),
_ => Vec::new(),
}
}
pub fn failed_tasks(&self) -> Vec<String> {
match self {
DependencyStatus::Failed { failed } => failed.clone(),
_ => Vec::new(),
}
}
}
/// A task registered with the scheduler, together with its configuration and run state.
///
/// Most fields have serde defaults so that older or partial serialized data still loads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledTask {
/// Task name.
pub name: String,
/// When the task should run.
pub schedule: Schedule,
/// Positional arguments, as JSON values.
#[serde(default)]
pub args: Vec<serde_json::Value>,
/// Keyword arguments, as JSON values keyed by name.
#[serde(default)]
pub kwargs: HashMap<String, serde_json::Value>,
/// Additional task options (queue, priority, expiry).
#[serde(default)]
pub options: TaskOptions,
/// Time of the last run, if any; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_at: Option<DateTime<Utc>>,
/// Total number of runs.
#[serde(default)]
pub total_run_count: u64,
/// Whether the task is enabled; defaults to `true`.
#[serde(default = "default_true")]
pub enabled: bool,
/// Optional jitter configuration.
#[serde(skip_serializing_if = "Option::is_none")]
pub jitter: Option<Jitter>,
/// Policy for missed runs.
#[serde(default)]
pub catchup_policy: CatchupPolicy,
/// Optional group name.
#[serde(skip_serializing_if = "Option::is_none")]
pub group: Option<String>,
/// Tags attached to the task.
#[serde(default)]
pub tags: HashSet<String>,
/// Policy for retrying failures.
#[serde(default)]
pub retry_policy: RetryPolicy,
/// Current retry counter.
#[serde(default)]
pub retry_count: u32,
/// Time of the last failure, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_failure_at: Option<DateTime<Utc>>,
/// Total number of failures.
#[serde(default)]
pub total_failure_count: u64,
/// Recorded executions.
#[serde(default)]
pub execution_history: Vec<ExecutionRecord>,
// NOTE(review): `new` initialises this to 0; whether 0 means "unlimited" or "keep nothing"
// is not visible here — confirm where the history is trimmed.
#[serde(default)]
pub max_history_size: usize,
/// Snapshots of previous schedule configurations.
#[serde(default)]
pub version_history: Vec<ScheduleVersion>,
/// Current configuration version; defaults to 1.
#[serde(default = "default_version")]
pub current_version: u32,
/// Names of the tasks this task depends on.
#[serde(default)]
pub dependencies: HashSet<String>,
/// Whether the task waits for its dependencies; defaults to `true`.
#[serde(default = "default_true")]
pub wait_for_dependencies: bool,
// Never serialized; a deserialized task starts with `None` here.
#[serde(skip)]
cached_next_run: Option<DateTime<Utc>>,
/// Alerting configuration.
#[serde(default)]
pub alert_config: AlertConfig,
/// Whether the task is currently running.
#[serde(default)]
pub execution_state: ExecutionState,
/// Optional `WFQState`; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub wfq_state: Option<WFQState>,
}
/// Serde default for `ScheduledTask::current_version`: versions start at 1.
fn default_version() -> u32 {
    const INITIAL_VERSION: u32 = 1;
    INITIAL_VERSION
}
impl ScheduledTask {
/// Creates an enabled task with default options, no run history, and version 1.
///
/// The returned task already holds one `ScheduleVersion` entry ("Initial creation")
/// in `version_history`. The next-run cache starts empty.
pub fn new(name: String, schedule: Schedule) -> Self {
    let mut task = Self {
        name,
        // `schedule` is owned and not used again, so it is moved instead of cloned.
        schedule,
        args: Vec::new(),
        kwargs: HashMap::new(),
        options: TaskOptions::default(),
        last_run_at: None,
        total_run_count: 0,
        enabled: true,
        jitter: None,
        catchup_policy: CatchupPolicy::default(),
        group: None,
        tags: HashSet::new(),
        retry_policy: RetryPolicy::default(),
        retry_count: 0,
        last_failure_at: None,
        total_failure_count: 0,
        execution_history: Vec::new(),
        // 0 means the history is never trimmed.
        max_history_size: 0,
        version_history: Vec::new(),
        current_version: 1,
        dependencies: HashSet::new(),
        wait_for_dependencies: true,
        cached_next_run: None,
        alert_config: AlertConfig::default(),
        execution_state: ExecutionState::default(),
        wfq_state: None,
    };
    let initial_version =
        ScheduleVersion::from_task(&task, 1, Some("Initial creation".to_string()));
    task.version_history.push(initial_version);
    task
}
/// Builder: replaces `args` with the given list.
pub fn with_args(mut self, args: Vec<serde_json::Value>) -> Self {
self.args = args;
self
}
/// Builder: replaces `kwargs` with the given map.
pub fn with_kwargs(mut self, kwargs: HashMap<String, serde_json::Value>) -> Self {
self.kwargs = kwargs;
self
}
/// Builder: marks the task as disabled.
pub fn disabled(mut self) -> Self {
self.enabled = false;
self
}
/// Builder: sets `options.queue`.
pub fn with_queue(mut self, queue: String) -> Self {
self.options.queue = Some(queue);
self
}
/// Builder: sets `options.priority`.
pub fn with_priority(mut self, priority: u8) -> Self {
self.options.priority = Some(priority);
self
}
/// Builder: sets `options.expires`.
pub fn with_expires(mut self, expires: u64) -> Self {
self.options.expires = Some(expires);
self
}
/// Builder: sets the jitter applied to computed run times.
///
/// NOTE(review): this does not touch `cached_next_run`; a previously cached
/// next-run time stays in effect until the cache is refreshed or invalidated.
pub fn with_jitter(mut self, jitter: Jitter) -> Self {
self.jitter = Some(jitter);
self
}
/// Builder: sets the catch-up policy.
pub fn with_catchup_policy(mut self, policy: CatchupPolicy) -> Self {
self.catchup_policy = policy;
self
}
/// Builder: sets the retry policy.
pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
self.retry_policy = policy;
self
}
/// Builder: assigns the task to `group`.
pub fn with_group(mut self, group: String) -> Self {
self.group = Some(group);
self
}
/// Builder: adds a single tag, keeping existing tags.
pub fn with_tag(mut self, tag: String) -> Self {
self.tags.insert(tag);
self
}
/// Builder: replaces the whole tag set.
pub fn with_tags(mut self, tags: HashSet<String>) -> Self {
self.tags = tags;
self
}
/// Builder: sets the alert configuration.
pub fn with_alert_config(mut self, config: AlertConfig) -> Self {
self.alert_config = config;
self
}
/// Adds `tag` to the tag set; adding an existing tag changes nothing.
pub fn add_tag(&mut self, tag: String) {
    let _newly_added = self.tags.insert(tag);
}

/// Removes `tag`; returns `true` when it was present.
pub fn remove_tag(&mut self, tag: &str) -> bool {
    self.tags.take(tag).is_some()
}

/// Returns `true` when the task carries `tag`.
pub fn has_tag(&self, tag: &str) -> bool {
    self.tags.get(tag).is_some()
}

/// Returns `true` when the task's group is exactly `group`.
pub fn is_in_group(&self, group: &str) -> bool {
    matches!(self.group.as_deref(), Some(current) if current == group)
}
pub fn is_due(&self) -> Result<bool, ScheduleError> {
if self.last_run_at.is_none() {
return Ok(true);
}
let mut next_run = self.schedule.next_run(self.last_run_at)?;
if let Some(ref jitter) = self.jitter {
next_run = jitter.apply(next_run, &self.name);
}
Ok(Utc::now() >= next_run)
}
pub fn next_run_time(&self) -> Result<DateTime<Utc>, ScheduleError> {
if let Some(cached) = self.cached_next_run {
return Ok(cached);
}
let mut next_run = self.schedule.next_run(self.last_run_at)?;
if let Some(ref jitter) = self.jitter {
next_run = jitter.apply(next_run, &self.name);
}
Ok(next_run)
}
/// Recomputes the cached next run time; the cache is cleared when the
/// computation fails.
pub fn update_next_run_cache(&mut self) {
    self.cached_next_run = self.next_run_time_uncached().ok();
}
fn next_run_time_uncached(&self) -> Result<DateTime<Utc>, ScheduleError> {
let mut next_run = self.schedule.next_run(self.last_run_at)?;
if let Some(ref jitter) = self.jitter {
next_run = jitter.apply(next_run, &self.name);
}
Ok(next_run)
}
/// Drops the cached next run time so the next query recomputes it.
pub fn invalidate_next_run_cache(&mut self) {
    let _ = self.cached_next_run.take();
}

/// Returns the `enabled` flag.
pub fn is_enabled(&self) -> bool {
    self.enabled
}

/// Returns `true` once a run has been recorded (`last_run_at` is set).
pub fn has_run(&self) -> bool {
    matches!(self.last_run_at, Some(_))
}

/// Returns `true` when any of queue, priority, or expires is set.
pub fn has_options(&self) -> bool {
    let opts = &self.options;
    opts.queue.is_some() || opts.priority.is_some() || opts.expires.is_some()
}

/// Time elapsed since the last recorded run, or `None` if the task never ran.
pub fn age_since_last_run(&self) -> Option<chrono::Duration> {
    match self.last_run_at {
        Some(last_run) => Some(Utc::now() - last_run),
        None => None,
    }
}
/// Resets the retry counter after a successful run.
pub fn mark_success(&mut self) {
    self.retry_count = 0;
}

/// Records a failure: bumps the failure and retry counters and stamps
/// `last_failure_at` with the current time.
pub fn mark_failure(&mut self) {
    let failed_at = Utc::now();
    self.last_failure_at = Some(failed_at);
    self.total_failure_count += 1;
    self.retry_count += 1;
}

/// Asks the retry policy whether another attempt is allowed at the current
/// retry count.
pub fn should_retry(&self) -> bool {
    let attempts_so_far = self.retry_count;
    self.retry_policy.should_retry(attempts_so_far)
}
/// Marks the task as running, starting now.
///
/// `timeout_seconds`, when given, sets a deadline measured from the same
/// instant that is stored as `started_at`. A timeout too large to represent
/// is treated as "no deadline" rather than wrapping negative or panicking.
pub fn begin_execution(&mut self, timeout_seconds: Option<u64>) {
    // Take the clock once so `started_at` and the deadline share one origin.
    let started_at = Utc::now();
    let timeout_after = timeout_seconds.and_then(|secs| {
        // `from_std` rejects out-of-range values and `checked_add_signed`
        // rejects overflow; both fall back to `None`.
        Duration::from_std(std::time::Duration::from_secs(secs))
            .ok()
            .and_then(|limit| started_at.checked_add_signed(limit))
    });
    self.execution_state = ExecutionState::Running {
        started_at,
        timeout_after,
    };
}
/// Marks the task as idle again.
pub fn complete_execution(&mut self) {
    self.execution_state = ExecutionState::Idle;
}

/// Heuristically detects an execution that never finished.
///
/// A running task counts as interrupted when its deadline (if any) has passed,
/// or when it has been running for more than 24 hours. Idle tasks are never
/// considered interrupted.
pub fn detect_interrupted_execution(&self) -> bool {
    let (started_at, timeout_after) = match &self.execution_state {
        ExecutionState::Idle => return false,
        ExecutionState::Running {
            started_at,
            timeout_after,
        } => (started_at, timeout_after),
    };
    if matches!(timeout_after, Some(deadline) if Utc::now() > *deadline) {
        return true;
    }
    // Without a passed deadline, fall back to a fixed upper bound on runtime.
    let max_reasonable_duration = Duration::hours(24);
    Utc::now() - *started_at > max_reasonable_duration
}
/// Closes out a dangling `Running` state.
///
/// Appends an `Interrupted` execution record (trimming the history to
/// `max_history_size`), returns the task to `Idle`, and increments the retry
/// counter. Returns how long the task had been running, or `None` when it was
/// already idle.
pub fn recover_from_interruption(&mut self) -> Option<Duration> {
    let started_at = match &self.execution_state {
        ExecutionState::Running { started_at, .. } => *started_at,
        ExecutionState::Idle => return None,
    };
    let elapsed = Utc::now() - started_at;
    self.add_execution_record(ExecutionRecord::completed(
        started_at,
        ExecutionResult::Interrupted,
    ));
    self.execution_state = ExecutionState::Idle;
    self.retry_count += 1;
    Some(elapsed)
}
/// Returns `true` when the most recent history record is an interruption and
/// the retry policy still allows another attempt.
pub fn is_ready_for_retry_after_crash(&self) -> bool {
    match self.execution_history.last() {
        Some(latest) => latest.is_interrupted() && self.should_retry(),
        None => false,
    }
}
/// Delay before the next retry as reported by the retry policy for the
/// current retry count.
pub fn next_retry_delay(&self) -> Option<u64> {
    let attempts_so_far = self.retry_count;
    self.retry_policy.next_retry_delay(attempts_so_far)
}

/// Earliest time the next retry may start: last failure time plus the policy
/// delay (interpreted as seconds). `None` when either piece is missing.
pub fn next_retry_time(&self) -> Option<DateTime<Utc>> {
    self.last_failure_at
        .zip(self.next_retry_delay())
        .map(|(failed_at, delay)| failed_at + Duration::seconds(delay as i64))
}

/// Returns `true` when the policy allows a retry and the retry time has arrived.
pub fn is_ready_for_retry(&self) -> bool {
    self.should_retry()
        && self
            .next_retry_time()
            .map_or(false, |retry_at| Utc::now() >= retry_at)
}
/// Fraction of attempts that failed, where attempts is
/// `total_run_count + total_failure_count`. Returns `0.0` with no attempts.
pub fn failure_rate(&self) -> f64 {
    match self.total_run_count + self.total_failure_count {
        0 => 0.0,
        attempts => self.total_failure_count as f64 / attempts as f64,
    }
}
/// Builder: sets the history capacity (`0` keeps everything). Existing
/// records are not trimmed here.
pub fn with_max_history(mut self, max_size: usize) -> Self {
    self.max_history_size = max_size;
    self
}

/// Appends `record` and, when a non-zero capacity is set, drops the oldest
/// records so at most `max_history_size` remain.
pub fn add_execution_record(&mut self, record: ExecutionRecord) {
    self.execution_history.push(record);
    let capacity = self.max_history_size;
    if capacity == 0 {
        return;
    }
    let excess = self.execution_history.len().saturating_sub(capacity);
    if excess > 0 {
        self.execution_history.drain(..excess);
    }
}

/// Returns the newest `n` records (fewer when the history is shorter), oldest first.
pub fn get_last_executions(&self, n: usize) -> &[ExecutionRecord] {
    let history = &self.execution_history;
    let keep = n.min(history.len());
    &history[history.len() - keep..]
}

/// Returns the whole execution history, oldest first.
pub fn get_all_executions(&self) -> &[ExecutionRecord] {
    self.execution_history.as_slice()
}
/// Number of history records for which `is_success()` holds.
pub fn history_success_count(&self) -> usize {
    let mut successes = 0;
    for record in &self.execution_history {
        if record.is_success() {
            successes += 1;
        }
    }
    successes
}

/// Number of history records for which `is_failure()` holds.
pub fn history_failure_count(&self) -> usize {
    let mut failures = 0;
    for record in &self.execution_history {
        if record.is_failure() {
            failures += 1;
        }
    }
    failures
}

/// Length of the unbroken run of `is_failure()` records at the end of the history.
pub fn consecutive_failure_count(&self) -> u32 {
    self.execution_history
        .iter()
        .rev()
        .take_while(|record| record.is_failure())
        .fold(0u32, |streak, _| streak + 1)
}

/// Number of history records for which `is_timeout()` holds.
pub fn history_timeout_count(&self) -> usize {
    let mut timeouts = 0;
    for record in &self.execution_history {
        if record.is_timeout() {
            timeouts += 1;
        }
    }
    timeouts
}
/// Integer mean of the recorded durations in milliseconds; records without a
/// duration are ignored. `None` when no record has a duration.
pub fn average_duration_ms(&self) -> Option<u64> {
    let (total_ms, samples) = self
        .execution_history
        .iter()
        .filter_map(|record| record.duration_ms)
        .fold((0u64, 0u64), |(sum, count), ms| (sum + ms, count + 1));
    if samples == 0 {
        None
    } else {
        Some(total_ms / samples)
    }
}

/// Shortest recorded duration in milliseconds, if any record has one.
pub fn min_duration_ms(&self) -> Option<u64> {
    self.execution_history
        .iter()
        .flat_map(|record| record.duration_ms)
        .min()
}

/// Longest recorded duration in milliseconds, if any record has one.
pub fn max_duration_ms(&self) -> Option<u64> {
    self.execution_history
        .iter()
        .flat_map(|record| record.duration_ms)
        .max()
}
/// Fraction of history records that are successes; `0.0` for an empty history.
pub fn history_success_rate(&self) -> f64 {
    match self.execution_history.len() {
        0 => 0.0,
        total => self.history_success_count() as f64 / total as f64,
    }
}

/// Removes every execution record.
pub fn clear_history(&mut self) {
    self.execution_history.truncate(0);
}
/// Builds a health report for this task.
///
/// Collected issues: next run time cannot be computed, task looks stuck,
/// failure rate above 50% over at least 10 attempts, and the last three
/// history records all being failures or timeouts.
///
/// Classification: a disabled task is always a single-issue `Warning`
/// ("Task is disabled"); otherwise no issues is `Healthy`, an uncomputable
/// next run is `Unhealthy`, anything else is `Warning`. The result also
/// carries the next run time and time since the last run when available.
pub fn check_health(&self) -> HealthCheckResult {
    let mut issues = Vec::new();
    // Tracks the one issue kind that escalates the report to `Unhealthy`.
    let mut schedule_broken = false;

    let next_run = match self.next_run_time() {
        Ok(next) => Some(next),
        Err(e) => {
            issues.push(format!("Cannot calculate next run: {}", e));
            schedule_broken = true;
            None
        }
    };

    if let Some(stuck_duration) = self.is_stuck() {
        issues.push(format!(
            "Task may be stuck (no execution in {} hours)",
            stuck_duration.num_hours()
        ));
    }

    let failure_rate = self.failure_rate();
    let total_attempts = self.total_run_count + self.total_failure_count;
    if failure_rate > 0.5 && total_attempts >= 10 {
        issues.push(format!(
            "High failure rate: {:.1}% ({} failures out of {} total)",
            failure_rate * 100.0,
            self.total_failure_count,
            total_attempts
        ));
    }

    let recent_all_failed = self.execution_history.len() >= 3
        && self
            .execution_history
            .iter()
            .rev()
            .take(3)
            .all(|record| record.is_failure() || record.is_timeout());
    if recent_all_failed {
        issues.push("Last 3 executions failed".to_string());
    }

    let health = if !self.enabled {
        ScheduleHealth::Warning {
            issues: vec!["Task is disabled".to_string()],
        }
    } else if issues.is_empty() {
        ScheduleHealth::Healthy
    } else if schedule_broken {
        ScheduleHealth::Unhealthy { issues }
    } else {
        ScheduleHealth::Warning { issues }
    };

    let time_since_last_run = self.age_since_last_run();
    let mut result = HealthCheckResult::new(self.name.clone(), health);
    if let Some(next) = next_run {
        result = result.with_next_run(next);
    }
    if let Some(duration) = time_since_last_run {
        result = result.with_time_since_last_run(duration);
    }
    result
}
/// Returns the time since the last run when it exceeds ten expected intervals.
///
/// The expected interval is `every` seconds for interval schedules and 24
/// hours for crontab and solar schedules. Disabled tasks, tasks that never
/// ran, and one-time tasks are never reported as stuck.
pub fn is_stuck(&self) -> Option<chrono::Duration> {
    if !self.enabled {
        return None;
    }
    let last_run = self.last_run_at?;
    let age = Utc::now() - last_run;
    let expected_interval = match &self.schedule {
        Schedule::Interval { every } => Duration::seconds(*every as i64),
        #[cfg(feature = "cron")]
        Schedule::Crontab { .. } => Duration::hours(24),
        #[cfg(feature = "solar")]
        Schedule::Solar { .. } => Duration::hours(24),
        Schedule::OneTime { .. } => return None,
    };
    if age > expected_interval * 10 {
        Some(age)
    } else {
        None
    }
}
/// Validates the schedule by attempting to compute its next run time.
///
/// # Errors
/// Returns whatever `Schedule::next_run` reports.
pub fn validate_schedule(&self) -> Result<(), ScheduleError> {
    self.schedule.next_run(self.last_run_at).map(|_| ())
}
/// Replaces the schedule, refreshes the next-run cache, and records the change
/// as a new version.
pub fn update_schedule(&mut self, new_schedule: Schedule, change_reason: Option<String>) {
    self.schedule = new_schedule;
    self.current_version += 1;
    self.update_next_run_cache();
    let snapshot = ScheduleVersion::from_task(self, self.current_version, change_reason);
    self.version_history.push(snapshot);
}

/// Applies the provided configuration changes and records a new version.
///
/// Each `None` argument leaves the corresponding setting untouched;
/// `jitter: Some(None)` removes the jitter. The next-run cache is refreshed
/// only when the jitter argument was provided. A version is recorded even if
/// every argument is `None`.
pub fn update_config(
    &mut self,
    enabled: Option<bool>,
    jitter: Option<Option<Jitter>>,
    catchup_policy: Option<CatchupPolicy>,
    change_reason: Option<String>,
) {
    let jitter_changed = jitter.is_some();
    if let Some(flag) = enabled {
        self.enabled = flag;
    }
    if let Some(new_jitter) = jitter {
        self.jitter = new_jitter;
    }
    if let Some(policy) = catchup_policy {
        self.catchup_policy = policy;
    }
    if jitter_changed {
        self.update_next_run_cache();
    }
    self.current_version += 1;
    let snapshot = ScheduleVersion::from_task(self, self.current_version, change_reason);
    self.version_history.push(snapshot);
}
/// Restores schedule, enabled flag, jitter, and catch-up policy from a
/// recorded version and records the rollback itself as a new version.
///
/// The next-run cache is refreshed after the restore, so `next_run_time`
/// reflects the restored schedule and jitter instead of a stale cached value.
///
/// # Errors
/// Returns `ScheduleError::Invalid` when `version_number` is not in the
/// version history; the task is left unchanged in that case.
pub fn rollback_to_version(&mut self, version_number: u32) -> Result<(), ScheduleError> {
    let version = self
        .version_history
        .iter()
        .find(|v| v.version == version_number)
        .ok_or_else(|| {
            ScheduleError::Invalid(format!("Version {} not found", version_number))
        })?;
    self.schedule = version.schedule.clone();
    self.enabled = version.enabled;
    self.jitter = version.jitter.clone();
    self.catchup_policy = version.catchup_policy.clone();
    // Schedule and jitter both feed the cached next run time; recompute it,
    // as `update_schedule` and `update_config` do.
    self.update_next_run_cache();
    self.current_version += 1;
    let rollback_version = ScheduleVersion::from_task(
        self,
        self.current_version,
        Some(format!("Rolled back to version {}", version_number)),
    );
    self.version_history.push(rollback_version);
    Ok(())
}
/// Returns all recorded versions, oldest first.
pub fn get_version_history(&self) -> &[ScheduleVersion] {
    self.version_history.as_slice()
}

/// Returns the first recorded version whose number equals `version_number`.
pub fn get_version(&self, version_number: u32) -> Option<&ScheduleVersion> {
    for entry in &self.version_history {
        if entry.version == version_number {
            return Some(entry);
        }
    }
    None
}

/// Returns the second-newest history entry, or `None` while the task is still
/// at version 1 or the history has fewer than two entries.
pub fn get_previous_version(&self) -> Option<&ScheduleVersion> {
    if self.current_version <= 1 {
        return None;
    }
    let history = &self.version_history;
    history.len().checked_sub(2).and_then(|idx| history.get(idx))
}
/// Adds `task_name` to the dependency set (no effect if already present).
pub fn add_dependency(&mut self, task_name: String) {
self.dependencies.insert(task_name);
}
/// Removes `task_name` from the dependency set; returns `true` if it was present.
pub fn remove_dependency(&mut self, task_name: &str) -> bool {
self.dependencies.remove(task_name)
}
/// Removes every dependency.
pub fn clear_dependencies(&mut self) {
self.dependencies.clear();
}
/// Returns `true` when at least one dependency is declared.
pub fn has_dependencies(&self) -> bool {
!self.dependencies.is_empty()
}
/// Returns `true` when `task_name` is a declared dependency.
pub fn depends_on(&self, task_name: &str) -> bool {
self.dependencies.contains(task_name)
}
/// Builder: replaces the whole dependency set.
pub fn with_dependencies(mut self, dependencies: HashSet<String>) -> Self {
self.dependencies = dependencies;
self
}
/// Builder: sets the `wait_for_dependencies` flag.
pub fn with_wait_for_dependencies(mut self, wait: bool) -> Self {
self.wait_for_dependencies = wait;
self
}
/// Compares the declared dependencies against `completed_tasks`.
///
/// Returns `Satisfied` when every dependency is completed (or none are
/// declared), otherwise `Waiting` with the names still outstanding.
pub fn check_dependencies(&self, completed_tasks: &HashSet<String>) -> DependencyStatus {
    let pending: Vec<String> = self
        .dependencies
        .difference(completed_tasks)
        .cloned()
        .collect();
    if pending.is_empty() {
        DependencyStatus::Satisfied
    } else {
        DependencyStatus::Waiting { pending }
    }
}

/// Like `check_dependencies`, but a dependency listed in `failed_tasks` takes
/// precedence: if any dependency failed, `Failed` is returned with those names.
pub fn check_dependencies_with_failures(
    &self,
    completed_tasks: &HashSet<String>,
    failed_tasks: &HashSet<String>,
) -> DependencyStatus {
    let failed: Vec<String> = self
        .dependencies
        .iter()
        .filter(|dep| failed_tasks.contains(*dep))
        .cloned()
        .collect();
    if failed.is_empty() {
        self.check_dependencies(completed_tasks)
    } else {
        DependencyStatus::Failed { failed }
    }
}
}
impl std::fmt::Display for ScheduledTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ScheduledTask[{}] schedule={}", self.name, self.schedule)?;
if !self.enabled {
write!(f, " (disabled)")?;
}
if let Some(last_run) = self.last_run_at {
let age = Utc::now() - last_run;
write!(f, " last_run={}s ago", age.num_seconds())?;
}
write!(f, " runs={}", self.total_run_count)?;
Ok(())
}
}
/// Optional per-task settings; every field defaults to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TaskOptions {
/// Optional queue name.
pub queue: Option<String>,
/// Optional priority; the scheduler's priority ordering treats a missing
/// value as 5 and sorts higher values first.
pub priority: Option<u8>,
/// Optional expiry; `Display` renders it with an `s` suffix, so presumably seconds.
pub expires: Option<u64>,
}
impl TaskOptions {
/// Returns `true` when a queue is set.
pub fn has_queue(&self) -> bool {
self.queue.is_some()
}
/// Returns `true` when a priority is set.
pub fn has_priority(&self) -> bool {
self.priority.is_some()
}
/// Returns `true` when an expiry is set.
pub fn has_expires(&self) -> bool {
self.expires.is_some()
}
}
/// Renders the set options as `TaskOptions[queue=…, priority=…, expires=…s]`,
/// or `TaskOptions[default]` when nothing is set.
impl std::fmt::Display for TaskOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fixed order: queue, priority, expires; unset fields are skipped.
        let rendered: Vec<String> = vec![
            self.queue.as_ref().map(|queue| format!("queue={}", queue)),
            self.priority.map(|priority| format!("priority={}", priority)),
            self.expires.map(|expires| format!("expires={}s", expires)),
        ]
        .into_iter()
        .flatten()
        .collect();

        if rendered.is_empty() {
            return write!(f, "TaskOptions[default]");
        }
        write!(f, "TaskOptions[{}]", rendered.join(", "))
    }
}
/// Registry of scheduled tasks together with persistence, failure callbacks,
/// locking, and alerting state.
#[derive(Serialize, Deserialize)]
pub struct BeatScheduler {
/// Tasks keyed by task name.
tasks: HashMap<String, ScheduledTask>,
/// File that `save_state` writes to; `None` disables persistence. Never serialized.
#[serde(skip)]
state_file: Option<PathBuf>,
/// Callbacks invoked with `(task_name, error)` when a failure is recorded. Never serialized.
#[serde(skip)]
failure_callbacks: Vec<FailureCallback>,
/// Lock bookkeeping; persisted with the state.
#[serde(default)]
lock_manager: LockManager,
/// Identifier of the form `scheduler-N` assigned at construction. Never
/// serialized, so it is empty right after deserialization.
#[serde(skip)]
instance_id: String,
/// Recorded alerts and alert callbacks.
#[serde(default)]
alert_manager: AlertManager,
}
impl BeatScheduler {
/// Creates an empty scheduler without persistence.
///
/// The instance id is `scheduler-N`, where `N` comes from a counter local to
/// this function.
pub fn new() -> Self {
    static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
    let sequence = COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    let instance_id = format!("scheduler-{}", sequence);
    Self {
        tasks: HashMap::new(),
        state_file: None,
        failure_callbacks: Vec::new(),
        lock_manager: LockManager::default(),
        instance_id,
        alert_manager: AlertManager::default(),
    }
}
pub fn with_persistence<P: Into<PathBuf>>(state_file: P) -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::SeqCst);
Self {
tasks: HashMap::new(),
state_file: Some(state_file.into()),
failure_callbacks: Vec::new(),
lock_manager: LockManager::default(),
instance_id: format!("scheduler-{}", id),
alert_manager: AlertManager::default(),
}
}
pub fn load_from_file<P: Into<PathBuf>>(path: P) -> Result<Self, ScheduleError> {
let path = path.into();
if !path.exists() {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::SeqCst);
return Ok(Self {
tasks: HashMap::new(),
state_file: Some(path),
failure_callbacks: Vec::new(),
lock_manager: LockManager::default(),
instance_id: format!("scheduler-{}", id),
alert_manager: AlertManager::default(),
});
}
let content = std::fs::read_to_string(&path)
.map_err(|e| ScheduleError::Persistence(format!("Failed to read state file: {}", e)))?;
let mut scheduler: BeatScheduler = serde_json::from_str(&content).map_err(|e| {
ScheduleError::Persistence(format!("Failed to parse state file: {}", e))
})?;
scheduler.state_file = Some(path);
if scheduler.instance_id.is_empty() {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::SeqCst);
scheduler.instance_id = format!("scheduler-{}", id);
}
Ok(scheduler)
}
/// Writes the scheduler as pretty-printed JSON to the configured state file.
/// Does nothing (and succeeds) when no state file is configured.
///
/// # Errors
/// Returns `ScheduleError::Persistence` on serialization or write failure.
pub fn save_state(&self) -> Result<(), ScheduleError> {
    let path = match self.state_file.as_ref() {
        Some(path) => path,
        None => return Ok(()),
    };
    let json = serde_json::to_string_pretty(self)
        .map_err(|e| ScheduleError::Persistence(format!("Failed to serialize state: {}", e)))?;
    std::fs::write(path, json)
        .map_err(|e| ScheduleError::Persistence(format!("Failed to write state file: {}", e)))
}

/// Returns the scheduler state as pretty-printed JSON without touching disk.
///
/// # Errors
/// Returns `ScheduleError::Persistence` when serialization fails.
pub fn export_state(&self) -> Result<String, ScheduleError> {
    match serde_json::to_string_pretty(self) {
        Ok(json) => Ok(json),
        Err(e) => Err(ScheduleError::Persistence(format!(
            "Failed to serialize state: {}",
            e
        ))),
    }
}
/// Returns the full task map, keyed by task name.
pub fn list_tasks(&self) -> &HashMap<String, ScheduledTask> {
&self.tasks
}
/// Looks up a task by name.
pub fn get_task(&self, name: &str) -> Option<&ScheduledTask> {
self.tasks.get(name)
}
/// Registers `task` under its name (replacing any task with the same name),
/// after priming its next-run cache, then persists the state.
///
/// # Errors
/// Propagates the error from `save_state`; the task stays registered in
/// memory in that case.
pub fn add_task(&mut self, mut task: ScheduledTask) -> Result<(), ScheduleError> {
    task.update_next_run_cache();
    let key = task.name.clone();
    self.tasks.insert(key, task);
    self.save_state()
}

/// Registers every task in `tasks` and persists once at the end.
///
/// Returns the number of tasks processed, which includes tasks that replaced
/// an existing entry of the same name.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn add_tasks_batch(&mut self, tasks: Vec<ScheduledTask>) -> Result<usize, ScheduleError> {
    let added_count = tasks.len();
    for mut task in tasks {
        task.update_next_run_cache();
        let key = task.name.clone();
        self.tasks.insert(key, task);
    }
    if added_count > 0 {
        self.save_state()?;
    }
    Ok(added_count)
}
/// Removes the task called `name` and persists the state (even when no such
/// task existed). Returns the removed task, if any.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn remove_task(&mut self, name: &str) -> Result<Option<ScheduledTask>, ScheduleError> {
    let removed = self.tasks.remove(name);
    self.save_state().map(|()| removed)
}

/// Removes every named task that exists and persists once if anything was
/// removed. Returns the number of tasks actually removed.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn remove_tasks_batch(&mut self, names: &[&str]) -> Result<usize, ScheduleError> {
    let removed_count = names
        .iter()
        .filter(|name| self.tasks.remove(**name).is_some())
        .count();
    if removed_count > 0 {
        self.save_state()?;
    }
    Ok(removed_count)
}
/// Records that the task called `name` ran just now: stamps `last_run_at`,
/// bumps the run counter, refreshes the next-run cache, and persists.
/// Unknown task names are ignored.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn mark_task_run(&mut self, name: &str) -> Result<(), ScheduleError> {
    if let Some(task) = self.tasks.get_mut(name) {
        task.last_run_at = Some(Utc::now());
        task.total_run_count += 1;
        // The cached next run was derived from the previous `last_run_at`;
        // recompute it so `next_run_time` does not keep returning a stale value
        // (`mark_task_success` does the same).
        task.update_next_run_cache();
        self.save_state()?;
    }
    Ok(())
}
/// Records a successful run that started and finished "now".
///
/// Updates `last_run_at` and the run counter, resets the retry counter,
/// appends a `Success` record, and refreshes the next-run cache. One-time
/// tasks are removed afterwards. State is persisted even when `name` is unknown.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn mark_task_success(&mut self, name: &str) -> Result<(), ScheduleError> {
    let mut finished_onetime = false;
    if let Some(task) = self.tasks.get_mut(name) {
        let now = Utc::now();
        task.last_run_at = Some(now);
        task.total_run_count += 1;
        task.mark_success();
        task.add_execution_record(ExecutionRecord::completed(now, ExecutionResult::Success));
        task.update_next_run_cache();
        finished_onetime = task.schedule.is_onetime();
    }
    if finished_onetime {
        self.tasks.remove(name);
    }
    self.save_state()
}

/// Same as `mark_task_success`, but the execution record is created from the
/// caller-supplied `started_at` instead of the current time.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn mark_task_success_with_start(
    &mut self,
    name: &str,
    started_at: DateTime<Utc>,
) -> Result<(), ScheduleError> {
    let mut finished_onetime = false;
    if let Some(task) = self.tasks.get_mut(name) {
        let now = Utc::now();
        task.last_run_at = Some(now);
        task.total_run_count += 1;
        task.mark_success();
        task.add_execution_record(ExecutionRecord::completed(
            started_at,
            ExecutionResult::Success,
        ));
        task.update_next_run_cache();
        finished_onetime = task.schedule.is_onetime();
    }
    if finished_onetime {
        self.tasks.remove(name);
    }
    self.save_state()
}
/// Records a failure with the generic message "Unknown error".
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn mark_task_failure(&mut self, name: &str) -> Result<(), ScheduleError> {
    let error = "Unknown error".to_string();
    self.mark_task_failure_with_error(name, error)
}

/// Records a failure that started "now" with the given error message:
/// updates the failure counters, appends a `Failure` record, invokes the
/// failure callbacks, and persists. Unknown task names are ignored.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn mark_task_failure_with_error(
    &mut self,
    name: &str,
    error: String,
) -> Result<(), ScheduleError> {
    let task = match self.tasks.get_mut(name) {
        Some(task) => task,
        None => return Ok(()),
    };
    let now = Utc::now();
    task.mark_failure();
    task.add_execution_record(ExecutionRecord::completed(
        now,
        ExecutionResult::Failure {
            error: error.clone(),
        },
    ));
    self.invoke_failure_callbacks(name, &error);
    self.save_state()
}

/// Same as `mark_task_failure_with_error`, but the execution record is created
/// from the caller-supplied `started_at`.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn mark_task_failure_with_start(
    &mut self,
    name: &str,
    started_at: DateTime<Utc>,
    error: String,
) -> Result<(), ScheduleError> {
    let task = match self.tasks.get_mut(name) {
        Some(task) => task,
        None => return Ok(()),
    };
    task.mark_failure();
    task.add_execution_record(ExecutionRecord::completed(
        started_at,
        ExecutionResult::Failure {
            error: error.clone(),
        },
    ));
    self.invoke_failure_callbacks(name, &error);
    self.save_state()
}

/// Records a timed-out execution: counts as a failure and appends a `Timeout`
/// record. Failure callbacks are not invoked for timeouts. Unknown task names
/// are ignored.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn mark_task_timeout(
    &mut self,
    name: &str,
    started_at: DateTime<Utc>,
) -> Result<(), ScheduleError> {
    let task = match self.tasks.get_mut(name) {
        Some(task) => task,
        None => return Ok(()),
    };
    task.mark_failure();
    task.add_execution_record(ExecutionRecord::completed(
        started_at,
        ExecutionResult::Timeout,
    ));
    self.save_state()
}
/// Registers a callback to be invoked with `(task_name, error)` on failures.
pub fn on_failure(&mut self, callback: FailureCallback) {
    self.failure_callbacks.extend(std::iter::once(callback));
}

/// Removes every registered failure callback.
pub fn clear_failure_callbacks(&mut self) {
    self.failure_callbacks.truncate(0);
}

/// Calls each registered failure callback in registration order.
fn invoke_failure_callbacks(&self, task_name: &str, error: &str) {
    self.failure_callbacks
        .iter()
        .for_each(|notify| notify(task_name, error));
}
/// Registers an alert callback with the alert manager.
pub fn on_alert(&mut self, callback: AlertCallback) {
self.alert_manager.add_callback(callback);
}
/// Returns the alerts held by the alert manager.
pub fn get_alerts(&self) -> &[Alert] {
self.alert_manager.get_alerts()
}
/// Delegates to `AlertManager::get_critical_alerts`.
pub fn get_critical_alerts(&self) -> Vec<&Alert> {
self.alert_manager.get_critical_alerts()
}
/// Delegates to `AlertManager::get_warning_alerts`.
pub fn get_warning_alerts(&self) -> Vec<&Alert> {
self.alert_manager.get_warning_alerts()
}
/// Delegates to `AlertManager::get_task_alerts` for `task_name`.
pub fn get_task_alerts(&self, task_name: &str) -> Vec<&Alert> {
self.alert_manager.get_task_alerts(task_name)
}
/// Delegates to `AlertManager::get_recent_alerts`; presumably the alerts from
/// the last `seconds` seconds — confirm against `AlertManager`.
pub fn get_recent_alerts(&self, seconds: i64) -> Vec<&Alert> {
self.alert_manager.get_recent_alerts(seconds)
}
/// Delegates to `AlertManager::clear`.
pub fn clear_alerts(&mut self) {
self.alert_manager.clear();
}
/// Delegates to `AlertManager::clear_task_alerts` for `task_name`.
pub fn clear_task_alerts(&mut self, task_name: &str) {
self.alert_manager.clear_task_alerts(task_name);
}
/// Evaluates every alert condition for one task and records the alerts that fire.
///
/// Conditions checked, in order: consecutive failures at or above the
/// threshold (critical), failure rate above the threshold (warning), average
/// duration above the slow-execution threshold (warning), task stuck
/// (critical, only when `alert_on_stuck` is set), and an unhealthy health
/// check (critical).
///
/// Returns the number of alerts for which `AlertManager::record_alert`
/// returned `true`. Returns 0 when the task is unknown or its alerting is
/// disabled.
pub fn check_task_alerts(&mut self, task_name: &str) -> usize {
// `task` borrows `self.tasks` immutably while `self.alert_manager` is mutated
// below; the two are distinct fields.
let task = match self.tasks.get(task_name) {
Some(t) => t,
None => return 0,
};
if !task.alert_config.enabled {
return 0;
}
let mut alerts_triggered = 0;
// 1. Unbroken run of failures at the end of the execution history.
let consecutive_failures = task.consecutive_failure_count();
if consecutive_failures >= task.alert_config.consecutive_failures_threshold {
let alert = Alert::new(
task_name.to_string(),
AlertLevel::Critical,
AlertCondition::ConsecutiveFailures {
count: consecutive_failures,
threshold: task.alert_config.consecutive_failures_threshold,
},
format!(
"Task has {} consecutive failures (threshold: {})",
consecutive_failures, task.alert_config.consecutive_failures_threshold
),
);
if self.alert_manager.record_alert(alert) {
alerts_triggered += 1;
}
}
// 2. Overall failure rate, computed from the task's lifetime counters.
let failure_rate = task.failure_rate();
if failure_rate > task.alert_config.failure_rate_threshold {
let alert = Alert::new(
task_name.to_string(),
AlertLevel::Warning,
AlertCondition::HighFailureRate {
rate: format!("{:.2}", failure_rate),
threshold: format!("{:.2}", task.alert_config.failure_rate_threshold),
},
format!(
"Task has high failure rate: {:.1}% (threshold: {:.1}%)",
failure_rate * 100.0,
task.alert_config.failure_rate_threshold * 100.0
),
);
if self.alert_manager.record_alert(alert) {
alerts_triggered += 1;
}
}
// 3. Average recorded duration against the optional slow-execution threshold.
if let Some(threshold_ms) = task.alert_config.slow_execution_threshold_ms {
if let Some(avg_duration_ms) = task.average_duration_ms() {
if avg_duration_ms > threshold_ms {
let alert = Alert::new(
task_name.to_string(),
AlertLevel::Warning,
AlertCondition::SlowExecution {
duration_ms: avg_duration_ms,
threshold_ms,
},
format!(
"Task execution is slow: {}ms average (threshold: {}ms)",
avg_duration_ms, threshold_ms
),
);
if self.alert_manager.record_alert(alert) {
alerts_triggered += 1;
}
}
}
}
// 4. Stuck detection; the expected interval mirrors the values used by
// `ScheduledTask::is_stuck` (24h = 86400s for crontab and solar schedules).
if task.alert_config.alert_on_stuck {
if let Some(stuck_duration) = task.is_stuck() {
let expected_interval_secs = match &task.schedule {
Schedule::Interval { every } => *every,
#[cfg(feature = "cron")]
Schedule::Crontab { .. } => 86400, #[cfg(feature = "solar")]
Schedule::Solar { .. } => 86400, Schedule::OneTime { .. } => 0, };
let alert = Alert::new(
task_name.to_string(),
AlertLevel::Critical,
AlertCondition::TaskStuck {
idle_duration_seconds: stuck_duration.num_seconds(),
expected_interval_seconds: expected_interval_secs,
},
format!(
"Task is stuck: no execution for {}s (expected interval: {}s)",
stuck_duration.num_seconds(),
expected_interval_secs
),
);
if self.alert_manager.record_alert(alert) {
alerts_triggered += 1;
}
}
}
// 5. Health check; only the `Unhealthy` classification raises an alert.
let health_result = task.check_health();
if health_result.health.is_unhealthy() {
let issues = health_result.health.get_issues();
let alert = Alert::new(
task_name.to_string(),
AlertLevel::Critical,
AlertCondition::TaskUnhealthy {
issues: issues.clone(),
},
format!("Task is unhealthy: {}", issues.join(", ")),
);
if self.alert_manager.record_alert(alert) {
alerts_triggered += 1;
}
}
alerts_triggered
}
/// Runs `check_task_alerts` for every task that is enabled and has alerting
/// enabled; returns the total number of alerts recorded.
pub fn check_all_alerts(&mut self) -> usize {
    // Names are copied out first so the task map is not borrowed while the
    // per-task check mutates the alert manager.
    let candidates: Vec<String> = self
        .tasks
        .iter()
        .filter(|(_, task)| task.enabled && task.alert_config.enabled)
        .map(|(name, _)| name.clone())
        .collect();
    let mut total_alerts = 0;
    for name in &candidates {
        total_alerts += self.check_task_alerts(name);
    }
    total_alerts
}
/// Enabled tasks whose retry policy allows a retry and whose retry time has arrived.
pub fn get_retry_tasks(&self) -> Vec<&ScheduledTask> {
    let mut ready = Vec::new();
    for task in self.tasks.values() {
        if task.enabled && task.is_ready_for_retry() {
            ready.push(task);
        }
    }
    ready
}

/// Names of tasks whose execution state looks interrupted
/// (see `ScheduledTask::detect_interrupted_execution`).
pub fn detect_crashed_tasks(&self) -> Vec<String> {
    self.tasks
        .iter()
        .filter_map(|(name, task)| {
            if task.detect_interrupted_execution() {
                Some(name.clone())
            } else {
                None
            }
        })
        .collect()
}
/// Recovers every task whose execution looks interrupted and persists the result.
///
/// Each recovered task is logged to stderr. Returns the number of tasks
/// recovered. A failure to persist is also logged to stderr instead of being
/// silently discarded; the in-memory recovery still stands and the count is
/// returned either way.
pub fn recover_from_crash(&mut self) -> usize {
    let crashed_task_names = self.detect_crashed_tasks();
    let mut recovered_count = 0;
    for task_name in crashed_task_names {
        if let Some(task) = self.tasks.get_mut(&task_name) {
            if let Some(duration) = task.recover_from_interruption() {
                eprintln!(
                    "Recovered task '{}' from interrupted execution (was running for {}s)",
                    task_name,
                    duration.num_seconds()
                );
                recovered_count += 1;
            }
        }
    }
    // The signature cannot carry an error, so report it rather than drop it.
    if let Err(err) = self.save_state() {
        eprintln!("Failed to persist state after crash recovery: {}", err);
    }
    recovered_count
}
/// Enabled tasks whose latest history record is an interruption and that may
/// still be retried.
pub fn get_tasks_ready_for_crash_retry(&self) -> Vec<&ScheduledTask> {
    let mut ready = Vec::new();
    for task in self.tasks.values() {
        if task.enabled && task.is_ready_for_retry_after_crash() {
            ready.push(task);
        }
    }
    ready
}

/// Enabled tasks that are currently due. Tasks whose due check fails with an
/// error are treated as not due.
pub fn get_due_tasks(&self) -> Vec<&ScheduledTask> {
    let mut due = Vec::new();
    for task in self.tasks.values() {
        if task.enabled && task.is_due().unwrap_or(false) {
            due.push(task);
        }
    }
    due
}
/// Enabled, due tasks ordered by priority (highest first; missing priority
/// counts as 5), with ties broken by earliest scheduled next run.
///
/// The sort key is computed once per task. Recomputing the next run inside
/// the comparator, with a fresh `Utc::now()` fallback on every comparison,
/// could make the same task compare differently from one comparison to the
/// next, which violates the total-order requirement of slice sorting. Tasks
/// whose next run cannot be computed all share a single fallback timestamp.
pub fn get_due_tasks_by_priority(&self) -> Vec<&ScheduledTask> {
    let fallback = Utc::now();
    let mut tasks: Vec<&ScheduledTask> = self
        .tasks
        .values()
        .filter(|task| task.enabled && task.is_due().unwrap_or(false))
        .collect();
    // `sort_by_cached_key` is stable, like the `sort_by` it replaces.
    tasks.sort_by_cached_key(|task| {
        let priority = task.options.priority.unwrap_or(5);
        let next_run = task
            .schedule
            .next_run(task.last_run_at)
            .unwrap_or(fallback);
        (std::cmp::Reverse(priority), next_run)
    });
    tasks
}
/// All enabled tasks ordered by priority, highest first; a missing priority
/// counts as 5. Tasks of equal priority keep their iteration order.
pub fn get_tasks_by_priority(&self) -> Vec<&ScheduledTask> {
    let mut enabled: Vec<&ScheduledTask> = Vec::new();
    for task in self.tasks.values() {
        if task.enabled {
            enabled.push(task);
        }
    }
    enabled.sort_by_key(|task| std::cmp::Reverse(task.options.priority.unwrap_or(5)));
    enabled
}
/// Tasks whose group equals `group`.
pub fn get_tasks_by_group(&self, group: &str) -> Vec<&ScheduledTask> {
    self.tasks
        .values()
        .filter(|task| task.group.as_deref() == Some(group))
        .collect()
}

/// Tasks that carry `tag`.
pub fn get_tasks_by_tag(&self, tag: &str) -> Vec<&ScheduledTask> {
    self.tasks
        .values()
        .filter(|task| task.tags.contains(tag))
        .collect()
}

/// Tasks that carry at least one of `tags` (none when `tags` is empty).
pub fn get_tasks_by_tags(&self, tags: &[&str]) -> Vec<&ScheduledTask> {
    self.tasks
        .values()
        .filter(|task| tags.iter().any(|tag| task.tags.contains(*tag)))
        .collect()
}

/// Tasks that carry every one of `tags` (all tasks when `tags` is empty).
pub fn get_tasks_with_all_tags(&self, tags: &[&str]) -> Vec<&ScheduledTask> {
    self.tasks
        .values()
        .filter(|task| tags.iter().all(|tag| task.tags.contains(*tag)))
        .collect()
}

/// Distinct group names used by any task.
pub fn get_all_groups(&self) -> HashSet<String> {
    self.tasks
        .values()
        .filter_map(|task| task.group.as_ref())
        .cloned()
        .collect()
}

/// Distinct tags used by any task.
pub fn get_all_tags(&self) -> HashSet<String> {
    self.tasks
        .values()
        .flat_map(|task| task.tags.iter())
        .cloned()
        .collect()
}
/// Enables every disabled task in `group`; persists when anything changed.
/// Returns the number of tasks switched.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn enable_group(&mut self, group: &str) -> Result<usize, ScheduleError> {
    let mut switched = 0;
    for task in self
        .tasks
        .values_mut()
        .filter(|task| task.is_in_group(group) && !task.enabled)
    {
        task.enabled = true;
        switched += 1;
    }
    if switched > 0 {
        self.save_state()?;
    }
    Ok(switched)
}

/// Disables every enabled task in `group`; persists when anything changed.
/// Returns the number of tasks switched.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn disable_group(&mut self, group: &str) -> Result<usize, ScheduleError> {
    let mut switched = 0;
    for task in self
        .tasks
        .values_mut()
        .filter(|task| task.is_in_group(group) && task.enabled)
    {
        task.enabled = false;
        switched += 1;
    }
    if switched > 0 {
        self.save_state()?;
    }
    Ok(switched)
}

/// Enables every disabled task carrying `tag`; persists when anything changed.
/// Returns the number of tasks switched.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn enable_tag(&mut self, tag: &str) -> Result<usize, ScheduleError> {
    let mut switched = 0;
    for task in self
        .tasks
        .values_mut()
        .filter(|task| task.has_tag(tag) && !task.enabled)
    {
        task.enabled = true;
        switched += 1;
    }
    if switched > 0 {
        self.save_state()?;
    }
    Ok(switched)
}

/// Disables every enabled task carrying `tag`; persists when anything changed.
/// Returns the number of tasks switched.
///
/// # Errors
/// Propagates the error from `save_state`.
pub fn disable_tag(&mut self, tag: &str) -> Result<usize, ScheduleError> {
    let mut switched = 0;
    for task in self
        .tasks
        .values_mut()
        .filter(|task| task.has_tag(tag) && task.enabled)
    {
        task.enabled = false;
        switched += 1;
    }
    if switched > 0 {
        self.save_state()?;
    }
    Ok(switched)
}
/// Health report for every task.
pub fn check_all_tasks_health(&self) -> Vec<HealthCheckResult> {
    let mut reports = Vec::with_capacity(self.tasks.len());
    for task in self.tasks.values() {
        reports.push(task.check_health());
    }
    reports
}

/// Health reports for every task whose health is not `is_healthy()`.
///
/// NOTE(review): despite the name, this presumably includes warning-level
/// reports as well as unhealthy ones — confirm against `ScheduleHealth`.
pub fn get_unhealthy_tasks(&self) -> Vec<HealthCheckResult> {
    let mut reports = Vec::new();
    for task in self.tasks.values() {
        let report = task.check_health();
        if !report.health.is_healthy() {
            reports.push(report);
        }
    }
    reports
}
/// Finds enabled tasks whose scheduled next run (without jitter) plus the
/// grace period is already in the past.
///
/// Returns `(task name, how far past the scheduled run we are)` pairs. Tasks
/// whose next run cannot be computed are skipped.
pub fn detect_missed_schedules(&self, grace_period_seconds: u64) -> Vec<(String, Duration)> {
    let now = Utc::now();
    let grace_period = Duration::seconds(grace_period_seconds as i64);
    self.tasks
        .iter()
        .filter(|(_, task)| task.enabled)
        .filter_map(|(name, task)| {
            let next_run = task.schedule.next_run(task.last_run_at).ok()?;
            if now > next_run + grace_period {
                Some((name.clone(), now - next_run))
            } else {
                None
            }
        })
        .collect()
}
/// Records a warning alert for every missed schedule (grace period defaults
/// to 60 seconds). Returns the number of alerts for which
/// `AlertManager::record_alert` returned `true`.
pub fn check_missed_schedules(&mut self, grace_period_seconds: Option<u64>) -> usize {
    let grace = grace_period_seconds.unwrap_or(60);
    let missed = self.detect_missed_schedules(grace);
    let now = Utc::now();
    let mut recorded = 0;
    for (task_name, missed_by) in missed {
        let message = format!(
            "Task missed its schedule by {} seconds",
            missed_by.num_seconds()
        );
        // The expected time is reconstructed from "now" and the overdue amount.
        let condition = AlertCondition::MissedSchedule {
            expected_at: now - missed_by,
            detected_at: now,
        };
        let alert = Alert {
            timestamp: now,
            task_name,
            level: AlertLevel::Warning,
            condition,
            message,
            metadata: HashMap::new(),
        };
        if self.alert_manager.record_alert(alert) {
            recorded += 1;
        }
    }
    recorded
}
/// Lists missed schedules as `(task name, seconds overdue, schedule text)`,
/// most overdue first. The grace period defaults to 60 seconds.
pub fn get_missed_schedule_stats(
    &self,
    grace_period_seconds: Option<u64>,
) -> Vec<(String, i64, String)> {
    let grace = grace_period_seconds.unwrap_or(60);
    let mut stats = Vec::new();
    for (name, missed_by) in self.detect_missed_schedules(grace) {
        let schedule_type = self
            .tasks
            .get(&name)
            .map_or_else(|| "Unknown".to_string(), |task| task.schedule.to_string());
        stats.push((name, missed_by.num_seconds(), schedule_type));
    }
    stats.sort_by_key(|entry| std::cmp::Reverse(entry.1));
    stats
}
/// Returns the health results of all tasks whose health reports warnings.
pub fn get_tasks_with_warnings(&self) -> Vec<HealthCheckResult> {
    let mut flagged = Vec::new();
    for task in self.tasks.values() {
        let result = task.check_health();
        if result.health.has_warnings() {
            flagged.push(result);
        }
    }
    flagged
}
/// Returns the health results of all tasks whose health is reported as
/// unhealthy (`health.is_unhealthy()`).
pub fn get_tasks_with_errors(&self) -> Vec<HealthCheckResult> {
    let mut failing = Vec::new();
    for task in self.tasks.values() {
        let result = task.check_health();
        if result.health.is_unhealthy() {
            failing.push(result);
        }
    }
    failing
}
/// Returns references to all tasks for which `is_stuck()` yields a value.
pub fn get_stuck_tasks(&self) -> Vec<&ScheduledTask> {
    let mut stuck = Vec::new();
    for task in self.tasks.values() {
        if task.is_stuck().is_some() {
            stuck.push(task);
        }
    }
    stuck
}
/// Validates the schedule of every task.
///
/// Returns one `(task name, validation result)` pair per registered task,
/// including the tasks that validated successfully.
pub fn validate_all_schedules(&self) -> Vec<(String, Result<(), ScheduleError>)> {
    let mut outcomes = Vec::with_capacity(self.tasks.len());
    for (name, task) in &self.tasks {
        outcomes.push((name.clone(), task.validate_schedule()));
    }
    outcomes
}
/// Builds a `SchedulerMetrics` snapshot from this scheduler via
/// `SchedulerMetrics::from_scheduler`.
pub fn get_metrics(&self) -> SchedulerMetrics {
SchedulerMetrics::from_scheduler(self)
}
/// Returns a `TaskStatistics` entry for every registered task.
pub fn get_all_task_statistics(&self) -> Vec<TaskStatistics> {
    let mut stats = Vec::with_capacity(self.tasks.len());
    for task in self.tasks.values() {
        stats.push(TaskStatistics::from_task(task));
    }
    stats
}
/// Returns the statistics for the task called `name`, or `None` when no
/// such task is registered.
pub fn get_task_statistics(&self, name: &str) -> Option<TaskStatistics> {
    let task = self.tasks.get(name)?;
    Some(TaskStatistics::from_task(task))
}
/// Returns statistics for every task that belongs to `group`.
pub fn get_group_statistics(&self, group: &str) -> Vec<TaskStatistics> {
    let mut stats = Vec::new();
    for task in self.tasks.values() {
        if task.is_in_group(group) {
            stats.push(TaskStatistics::from_task(task));
        }
    }
    stats
}
/// Returns statistics for every task that carries `tag`.
pub fn get_tag_statistics(&self, tag: &str) -> Vec<TaskStatistics> {
    let mut stats = Vec::new();
    for task in self.tasks.values() {
        if task.has_tag(tag) {
            stats.push(TaskStatistics::from_task(task));
        }
    }
    stats
}
/// Reports whether a dependency cycle is reachable from `task_name`.
///
/// The cycle does not have to pass through `task_name` itself; any cycle
/// found while walking its dependencies counts.
pub fn has_circular_dependency(&self, task_name: &str) -> bool {
    // `seen` holds every node ever entered; `on_path` holds only the nodes
    // on the current depth-first path.
    let (mut seen, mut on_path) = (HashSet::new(), HashSet::new());
    self.has_circular_dependency_helper(task_name, &mut seen, &mut on_path)
}
/// Depth-first step of the cycle search.
///
/// `visited` collects every task name entered so far. `stack` holds the
/// names on the current recursion path; meeting one of those again means a
/// cycle. Dependencies that name unknown tasks are treated as leaves.
fn has_circular_dependency_helper(
    &self,
    task_name: &str,
    visited: &mut HashSet<String>,
    stack: &mut HashSet<String>,
) -> bool {
    // Must be tested before `visited`: everything on the stack is also visited.
    if stack.contains(task_name) {
        return true;
    }
    // `insert` returns false when the name was already explored.
    if !visited.insert(task_name.to_string()) {
        return false;
    }
    stack.insert(task_name.to_string());
    let found_cycle = match self.tasks.get(task_name) {
        Some(task) => task
            .dependencies
            .iter()
            .any(|dep| self.has_circular_dependency_helper(dep, visited, stack)),
        None => false,
    };
    if found_cycle {
        return true;
    }
    stack.remove(task_name);
    false
}
/// Returns `task_name` and everything it transitively depends on, with
/// dependencies listed before the tasks that need them (`task_name` last).
///
/// # Errors
/// Returns `ScheduleError::Invalid` when a circular dependency is reachable
/// from `task_name`.
pub fn get_dependency_chain(&self, task_name: &str) -> Result<Vec<String>, ScheduleError> {
    if self.has_circular_dependency(task_name) {
        let message = format!("Circular dependency detected for task '{}'", task_name);
        return Err(ScheduleError::Invalid(message));
    }
    let mut ordered = Vec::new();
    let mut seen = HashSet::new();
    self.get_dependency_chain_helper(task_name, &mut ordered, &mut seen);
    Ok(ordered)
}
/// Post-order walk used by `get_dependency_chain`.
///
/// Appends the dependencies of `task_name` to `chain` first, then
/// `task_name` itself. `visited` ensures each name is emitted once.
fn get_dependency_chain_helper(
    &self,
    task_name: &str,
    chain: &mut Vec<String>,
    visited: &mut HashSet<String>,
) {
    // `insert` returns false if this name has already been handled.
    if !visited.insert(task_name.to_string()) {
        return;
    }
    if let Some(task) = self.tasks.get(task_name) {
        task.dependencies
            .iter()
            .for_each(|dep| self.get_dependency_chain_helper(dep, chain, visited));
    }
    chain.push(task_name.to_string());
}
/// Returns the tasks that may run now, taking dependencies into account.
///
/// A task qualifies when it is enabled, is due (an error from `is_due`
/// counts as "not due"), and either does not wait for dependencies or has
/// its dependencies satisfied given `completed_tasks` and `failed_tasks`.
pub fn get_tasks_ready_with_dependencies(
    &self,
    completed_tasks: &HashSet<String>,
    failed_tasks: &HashSet<String>,
) -> Vec<&ScheduledTask> {
    self.tasks
        .values()
        .filter(|task| {
            task.enabled
                && task.is_due().unwrap_or(false)
                && (!task.wait_for_dependencies
                    || task
                        .check_dependencies_with_failures(completed_tasks, failed_tasks)
                        .is_satisfied())
        })
        .collect()
}
/// Returns enabled tasks that have dependencies which are not yet satisfied
/// by `completed_tasks`, paired with their current dependency status.
pub fn get_tasks_waiting_for_dependencies(
    &self,
    completed_tasks: &HashSet<String>,
) -> Vec<(&ScheduledTask, DependencyStatus)> {
    let mut waiting = Vec::new();
    for task in self.tasks.values() {
        if !(task.enabled && task.has_dependencies()) {
            continue;
        }
        let status = task.check_dependencies(completed_tasks);
        if !status.is_satisfied() {
            waiting.push((task, status));
        }
    }
    waiting
}
/// Returns enabled tasks that have at least one failed dependency, paired
/// with the dependency status that reports the failure.
pub fn get_tasks_with_failed_dependencies(
    &self,
    completed_tasks: &HashSet<String>,
    failed_tasks: &HashSet<String>,
) -> Vec<(&ScheduledTask, DependencyStatus)> {
    let mut blocked = Vec::new();
    for task in self.tasks.values() {
        if !(task.enabled && task.has_dependencies()) {
            continue;
        }
        let status = task.check_dependencies_with_failures(completed_tasks, failed_tasks);
        if status.has_failures() {
            blocked.push((task, status));
        }
    }
    blocked
}
/// Checks the dependency graph of all registered tasks.
///
/// # Errors
/// Returns `ScheduleError::Invalid` for the first task found that either
/// can reach a dependency cycle or depends on a task that is not registered.
pub fn validate_dependencies(&self) -> Result<(), ScheduleError> {
    for (task_name, task) in &self.tasks {
        if self.has_circular_dependency(task_name) {
            let message = format!("Circular dependency detected for task '{}'", task_name);
            return Err(ScheduleError::Invalid(message));
        }
        let missing = task
            .dependencies
            .iter()
            .find(|dep| !self.tasks.contains_key(*dep));
        if let Some(dep) = missing {
            let message = format!(
                "Task '{}' depends on non-existent task '{}'",
                task_name, dep
            );
            return Err(ScheduleError::Invalid(message));
        }
    }
    Ok(())
}
/// Attempts to acquire the lock for `task_name` on behalf of this scheduler.
///
/// Delegates to `lock_manager.try_acquire`, passing this scheduler's
/// `instance_id` and the optional `ttl`. The returned `bool` and any error
/// come straight from the lock manager.
pub fn try_acquire_lock(
&mut self,
task_name: &str,
ttl: Option<u64>,
) -> Result<bool, ScheduleError> {
self.lock_manager
.try_acquire(task_name, &self.instance_id, ttl)
}
/// Releases the lock for `task_name` by delegating to `lock_manager.release`
/// with this scheduler's `instance_id`.
pub fn release_lock(&mut self, task_name: &str) -> Result<bool, ScheduleError> {
self.lock_manager.release(task_name, &self.instance_id)
}
/// Renews the lock for `task_name` by delegating to `lock_manager.renew`
/// with this scheduler's `instance_id` and the optional `ttl`.
pub fn renew_lock(&mut self, task_name: &str, ttl: Option<u64>) -> Result<bool, ScheduleError> {
self.lock_manager.renew(task_name, &self.instance_id, ttl)
}
/// Reports whether the lock manager considers `task_name` locked.
pub fn is_task_locked(&self, task_name: &str) -> bool {
self.lock_manager.is_locked(task_name)
}
/// Returns the lock the lock manager holds for `task_name`, if any.
pub fn get_task_lock(&self, task_name: &str) -> Option<&ScheduleLock> {
self.lock_manager.get_lock(task_name)
}
/// Asks the lock manager to clean up expired locks (`cleanup_expired`).
pub fn cleanup_expired_locks(&mut self) {
self.lock_manager.cleanup_expired();
}
/// Returns the locks the lock manager reports as active.
pub fn get_active_locks(&self) -> Vec<&ScheduleLock> {
self.lock_manager.get_active_locks()
}
/// Returns the identifier this scheduler passes to the lock manager when
/// acquiring, renewing and releasing locks.
pub fn instance_id(&self) -> &str {
&self.instance_id
}
/// Replaces this scheduler's instance identifier.
// NOTE(review): locks already held were acquired under the previous id;
// presumably they can no longer be released or renewed by this instance
// after the change. Confirm against the lock manager.
pub fn set_instance_id(&mut self, id: String) {
self.instance_id = id;
}
/// Runs `f` while holding the lock for `task_name`.
///
/// Returns `Ok(false)` without calling `f` when the lock could not be
/// acquired, `Ok(true)` when `f` succeeded, and `f`'s error otherwise. The
/// lock is released after `f` returns in either case; a failure to release
/// is deliberately ignored so it cannot mask `f`'s own result.
///
/// # Errors
/// Propagates errors from acquiring the lock and from `f`.
pub fn execute_with_lock<F>(
    &mut self,
    task_name: &str,
    ttl: Option<u64>,
    mut f: F,
) -> Result<bool, ScheduleError>
where
    F: FnMut() -> Result<(), ScheduleError>,
{
    let acquired = self.try_acquire_lock(task_name, ttl)?;
    if !acquired {
        return Ok(false);
    }
    let outcome = f();
    // Best-effort release: the closure's result takes precedence.
    let _ = self.release_lock(task_name);
    match outcome {
        Ok(()) => Ok(true),
        Err(err) => Err(err),
    }
}
#[allow(clippy::too_many_arguments)]
pub fn detect_conflicts(
&self,
window_seconds: u64,
estimated_duration: u64,
) -> Vec<ScheduleConflict> {
let mut conflicts = Vec::new();
let now = Utc::now();
let window_end = now + Duration::seconds(window_seconds as i64);
let task_names: Vec<String> = self.tasks.keys().cloned().collect();
for i in 0..task_names.len() {
for j in (i + 1)..task_names.len() {
let task1_name = &task_names[i];
let task2_name = &task_names[j];
if let (Some(task1), Some(task2)) =
(self.tasks.get(task1_name), self.tasks.get(task2_name))
{
if !task1.enabled || !task2.enabled {
continue;
}
let next1 = match task1.schedule.next_run(task1.last_run_at) {
Ok(time) => time,
Err(_) => continue,
};
let next2 = match task2.schedule.next_run(task2.last_run_at) {
Ok(time) => time,
Err(_) => continue,
};
if next1 > window_end || next2 > window_end {
continue;
}
let task1_start = next1;
let task1_end = next1 + Duration::seconds(estimated_duration as i64);
let task2_start = next2;
let task2_end = next2 + Duration::seconds(estimated_duration as i64);
if task1_start < task2_end && task2_start < task1_end {
let overlap_start = if task1_start > task2_start {
task1_start
} else {
task2_start
};
let overlap_end = if task1_end < task2_end {
task1_end
} else {
task2_end
};
let overlap_seconds = (overlap_end - overlap_start).num_seconds() as u64;
let severity = if overlap_seconds >= estimated_duration {
ConflictSeverity::High
} else if overlap_seconds >= estimated_duration / 2 {
ConflictSeverity::Medium
} else {
ConflictSeverity::Low
};
let description = format!(
"Tasks will run at overlapping times: {} at {}, {} at {}",
task1_name,
next1.format("%Y-%m-%d %H:%M:%S"),
task2_name,
next2.format("%Y-%m-%d %H:%M:%S")
);
let resolution =
"Consider adjusting schedules or using jitter to avoid overlap"
.to_string();
conflicts.push(
ScheduleConflict::new(
task1_name.clone(),
task2_name.clone(),
severity,
overlap_seconds,
description,
)
.with_resolution(resolution),
);
}
}
}
}
conflicts
}
/// Returns only the detected conflicts that are of high severity.
///
/// Parameters are forwarded unchanged to `detect_conflicts`.
pub fn get_high_severity_conflicts(
    &self,
    window_seconds: u64,
    estimated_duration: u64,
) -> Vec<ScheduleConflict> {
    let mut conflicts = self.detect_conflicts(window_seconds, estimated_duration);
    conflicts.retain(|conflict| conflict.is_high_severity());
    conflicts
}
/// Returns only the detected conflicts that are of medium severity.
///
/// Parameters are forwarded unchanged to `detect_conflicts`.
pub fn get_medium_severity_conflicts(
    &self,
    window_seconds: u64,
    estimated_duration: u64,
) -> Vec<ScheduleConflict> {
    let mut conflicts = self.detect_conflicts(window_seconds, estimated_duration);
    conflicts.retain(|conflict| conflict.is_medium_severity());
    conflicts
}
/// Reports whether `detect_conflicts` finds at least one conflict.
pub fn has_conflicts(&self, window_seconds: u64, estimated_duration: u64) -> bool {
    let conflicts = self.detect_conflicts(window_seconds, estimated_duration);
    !conflicts.is_empty()
}
/// Returns how many conflicts `detect_conflicts` finds.
pub fn conflict_count(&self, window_seconds: u64, estimated_duration: u64) -> usize {
    let conflicts = self.detect_conflicts(window_seconds, estimated_duration);
    conflicts.len()
}
/// Tries to resolve detected conflicts by assigning jitter to tasks.
///
/// Priorities default to 5 when a task has none set. For each conflict:
/// * different priorities: the task with the numerically lower priority
///   gets positive jitter of `jitter_seconds` (default 30), but only if it
///   has no jitter yet; nothing is reported when it already has one;
/// * equal priorities, high severity: nothing is changed and a
///   manual-review entry is reported;
/// * equal priorities otherwise: both tasks get symmetric jitter of half the
///   amount where they have none, and one entry is reported for the pair
///   (the entry is reported even if both tasks already had jitter).
///
/// Returns `(task or pair, description)` entries. When any entry was
/// produced the state is saved; a save failure is ignored.
pub fn auto_resolve_conflicts(
    &mut self,
    window_seconds: u64,
    estimated_duration: u64,
    jitter_seconds: Option<u64>,
) -> Vec<(String, String)> {
    use std::cmp::Ordering;

    let jitter = jitter_seconds.unwrap_or(30);
    let mut resolutions = Vec::new();
    for conflict in self.detect_conflicts(window_seconds, estimated_duration) {
        let ordering = match (
            self.tasks.get(&conflict.task1),
            self.tasks.get(&conflict.task2),
        ) {
            (Some(first), Some(second)) => first
                .options
                .priority
                .unwrap_or(5)
                .cmp(&second.options.priority.unwrap_or(5)),
            _ => continue,
        };

        // Decide which task yields; the equal-priority case is handled here.
        let (demoted, favored) = match ordering {
            Ordering::Greater => (&conflict.task2, &conflict.task1),
            Ordering::Less => (&conflict.task1, &conflict.task2),
            Ordering::Equal => {
                let pair = format!("{} & {}", conflict.task1, conflict.task2);
                if conflict.severity == ConflictSeverity::High {
                    resolutions.push((
                        pair,
                        "HIGH SEVERITY: Manual review recommended - tasks have equal priority and significant overlap".to_string(),
                    ));
                } else {
                    let half = jitter / 2;
                    for name in &[&conflict.task1, &conflict.task2] {
                        if let Some(task) = self.tasks.get_mut(*name) {
                            if task.jitter.is_none() {
                                task.jitter = Some(Jitter::symmetric(half as i64));
                            }
                        }
                    }
                    resolutions.push((
                        pair,
                        format!(
                            "Applied ±{}s symmetric jitter to both (equal priority)",
                            half
                        ),
                    ));
                }
                continue;
            }
        };

        if let Some(task) = self.tasks.get_mut(demoted) {
            if task.jitter.is_none() {
                task.jitter = Some(Jitter::positive(jitter as i64));
                resolutions.push((
                    demoted.clone(),
                    format!(
                        "Applied +{}s jitter (lower priority than {})",
                        jitter, favored
                    ),
                ));
            }
        }
    }
    if !resolutions.is_empty() {
        // Persistence is best-effort here; the return type carries no error.
        let _ = self.save_state();
    }
    resolutions
}
/// Describes what `auto_resolve_conflicts` would propose, without changing
/// any task.
///
/// Priorities default to 5 and `jitter_seconds` to 30. Unlike the applying
/// variant, an entry is produced for every conflict regardless of whether
/// the affected task already has jitter configured.
pub fn preview_conflict_resolutions(
    &self,
    window_seconds: u64,
    estimated_duration: u64,
    jitter_seconds: Option<u64>,
) -> Vec<(String, String)> {
    use std::cmp::Ordering;

    let jitter = jitter_seconds.unwrap_or(30);
    self.detect_conflicts(window_seconds, estimated_duration)
        .into_iter()
        .filter_map(|conflict| {
            // Conflicts naming a task that is no longer registered are skipped.
            let priority1 = self.tasks.get(&conflict.task1)?.options.priority.unwrap_or(5);
            let priority2 = self.tasks.get(&conflict.task2)?.options.priority.unwrap_or(5);
            let proposal = match priority1.cmp(&priority2) {
                Ordering::Greater => (
                    conflict.task2.clone(),
                    format!(
                        "Would apply +{}s jitter (priority {} < {})",
                        jitter, priority2, priority1
                    ),
                ),
                Ordering::Less => (
                    conflict.task1.clone(),
                    format!(
                        "Would apply +{}s jitter (priority {} < {})",
                        jitter, priority1, priority2
                    ),
                ),
                Ordering::Equal => {
                    let pair = format!("{} & {}", conflict.task1, conflict.task2);
                    let note = if conflict.severity == ConflictSeverity::High {
                        "Would recommend manual review (high severity, equal priority)"
                            .to_string()
                    } else {
                        format!(
                            "Would apply ±{}s symmetric jitter to both (priority {})",
                            jitter / 2,
                            priority1
                        )
                    };
                    (pair, note)
                }
            };
            Some(proposal)
        })
        .collect()
}
/// Removes the jitter setting from every task and saves the state.
///
/// All jitter is cleared, not only jitter that was assigned by
/// `auto_resolve_conflicts`. A failure to save is ignored.
pub fn clear_conflict_jitter(&mut self) {
    self.tasks.values_mut().for_each(|task| task.jitter = None);
    // Best-effort persistence; this method has no way to report an error.
    let _ = self.save_state();
}
}
/// `BeatScheduler::default()` is equivalent to `BeatScheduler::new()`.
impl Default for BeatScheduler {
fn default() -> Self {
Self::new()
}
}
/// Errors produced by schedules and the scheduler.
///
/// Each variant carries a human-readable detail message that is appended to
/// the variant's fixed prefix when the error is displayed.
#[derive(Debug, Error)]
pub enum ScheduleError {
/// The schedule or request is invalid.
#[error("Invalid schedule: {0}")]
Invalid(String),
/// The requested functionality is not implemented.
#[error("Not implemented: {0}")]
NotImplemented(String),
/// Something could not be parsed.
#[error("Parsing error: {0}")]
Parse(String),
/// Persisting or loading state failed.
#[error("Persistence error: {0}")]
Persistence(String),
}
impl ScheduleError {
    /// Returns `true` for `ScheduleError::Invalid`.
    pub fn is_invalid(&self) -> bool {
        match self {
            ScheduleError::Invalid(_) => true,
            ScheduleError::NotImplemented(_)
            | ScheduleError::Parse(_)
            | ScheduleError::Persistence(_) => false,
        }
    }

    /// Returns `true` for `ScheduleError::NotImplemented`.
    pub fn is_not_implemented(&self) -> bool {
        match self {
            ScheduleError::NotImplemented(_) => true,
            ScheduleError::Invalid(_)
            | ScheduleError::Parse(_)
            | ScheduleError::Persistence(_) => false,
        }
    }

    /// Returns `true` for `ScheduleError::Parse`.
    pub fn is_parse(&self) -> bool {
        match self {
            ScheduleError::Parse(_) => true,
            ScheduleError::Invalid(_)
            | ScheduleError::NotImplemented(_)
            | ScheduleError::Persistence(_) => false,
        }
    }

    /// Returns `true` for `ScheduleError::Persistence`.
    pub fn is_persistence(&self) -> bool {
        match self {
            ScheduleError::Persistence(_) => true,
            ScheduleError::Invalid(_)
            | ScheduleError::NotImplemented(_)
            | ScheduleError::Parse(_) => false,
        }
    }

    /// Returns `true` when retrying the operation may succeed.
    ///
    /// Only persistence errors are classified as retryable.
    pub fn is_retryable(&self) -> bool {
        match self {
            ScheduleError::Persistence(_) => true,
            ScheduleError::Invalid(_)
            | ScheduleError::NotImplemented(_)
            | ScheduleError::Parse(_) => false,
        }
    }

    /// Returns a stable, lowercase identifier for the error variant.
    pub fn category(&self) -> &'static str {
        match *self {
            ScheduleError::Invalid(_) => "invalid",
            ScheduleError::NotImplemented(_) => "not_implemented",
            ScheduleError::Parse(_) => "parse",
            ScheduleError::Persistence(_) => "persistence",
        }
    }
}
/// Configuration of a webhook that receives alert notifications.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
/// Target URL of the webhook.
pub url: String,
/// Extra headers for the request; empty when omitted during deserialization.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Timeout in seconds; defaults to `default_webhook_timeout()` when omitted.
#[serde(default = "default_webhook_timeout")]
pub timeout_seconds: u64,
/// Alert levels to deliver. An empty list means every level is delivered
/// (see `should_send`).
#[serde(default)]
pub alert_levels: Vec<AlertLevel>,
}
/// Default webhook timeout in seconds, used by serde and `WebhookConfig::new`.
#[allow(dead_code)]
fn default_webhook_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECONDS: u64 = 30;
    DEFAULT_TIMEOUT_SECONDS
}
impl WebhookConfig {
    /// Creates a configuration for `url` with no headers, the default
    /// timeout and no alert-level filter.
    pub fn new(url: impl Into<String>) -> Self {
        let url = url.into();
        Self {
            url,
            headers: HashMap::new(),
            timeout_seconds: default_webhook_timeout(),
            alert_levels: Vec::new(),
        }
    }

    /// Adds (or replaces) a header and returns the updated configuration.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, content) = (key.into(), value.into());
        self.headers.insert(name, content);
        self
    }

    /// Sets the timeout in seconds and returns the updated configuration.
    pub fn with_timeout(self, timeout_seconds: u64) -> Self {
        Self {
            timeout_seconds,
            ..self
        }
    }

    /// Restricts delivery to `levels` and returns the updated configuration.
    pub fn with_alert_levels(self, levels: Vec<AlertLevel>) -> Self {
        Self {
            alert_levels: levels,
            ..self
        }
    }

    /// Decides whether `alert` should be delivered to this webhook.
    ///
    /// With no configured levels every alert is delivered; otherwise the
    /// alert's level must be in the configured list.
    pub fn should_send(&self, alert: &Alert) -> bool {
        self.alert_levels.is_empty() || self.alert_levels.contains(&alert.level)
    }

    /// Builds the JSON payload for `alert`.
    ///
    /// `level` and `condition` are rendered with their `Debug` formatting;
    /// the timestamp is RFC 3339.
    pub fn create_payload(&self, alert: &Alert) -> serde_json::Value {
        let timestamp = alert.timestamp.to_rfc3339();
        let level = format!("{:?}", alert.level);
        let condition = format!("{:?}", alert.condition);
        serde_json::json!({
            "timestamp": timestamp,
            "task_name": alert.task_name,
            "level": level,
            "condition": condition,
            "message": alert.message,
            "metadata": alert.metadata,
        })
    }
}
/// Day of the week, serializable counterpart of `chrono::Weekday`
/// (see `DayOfWeek::from_chrono`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DayOfWeek {
Monday,
Tuesday,
Wednesday,
Thursday,
Friday,
Saturday,
Sunday,
}
impl DayOfWeek {
pub fn is_weekend(&self) -> bool {
matches!(self, DayOfWeek::Saturday | DayOfWeek::Sunday)
}
pub fn is_weekday(&self) -> bool {
!self.is_weekend()
}
pub fn from_chrono(weekday: chrono::Weekday) -> Self {
match weekday {
chrono::Weekday::Mon => DayOfWeek::Monday,
chrono::Weekday::Tue => DayOfWeek::Tuesday,
chrono::Weekday::Wed => DayOfWeek::Wednesday,
chrono::Weekday::Thu => DayOfWeek::Thursday,
chrono::Weekday::Fri => DayOfWeek::Friday,
chrono::Weekday::Sat => DayOfWeek::Saturday,
chrono::Weekday::Sun => DayOfWeek::Sunday,
}
}
}
/// A daily window of business hours, expressed in whole hours.
///
/// The window is half-open: `start_hour` is included, `end_hour` is not
/// (see `is_business_hour`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusinessHours {
/// First hour of the window (inclusive).
pub start_hour: u32,
/// Hour at which the window ends (exclusive).
pub end_hour: u32,
}
impl BusinessHours {
pub fn new(start_hour: u32, end_hour: u32) -> Self {
Self {
start_hour,
end_hour,
}
}
pub fn standard() -> Self {
Self {
start_hour: 9,
end_hour: 17,
}
}
pub fn is_business_hour(&self, hour: u32) -> bool {
hour >= self.start_hour && hour < self.end_hour
}
pub fn is_within(&self, time: &DateTime<Utc>) -> bool {
self.is_business_hour(time.hour())
}
}
/// Combines daily business hours with the set of working days.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusinessCalendar {
/// Daily window during which business is open.
pub business_hours: BusinessHours,
/// Days on which business is open; defaults to `default_working_days()`
/// when omitted during deserialization.
#[serde(default = "default_working_days")]
pub working_days: Vec<DayOfWeek>,
}
/// Default working days: Monday through Friday.
#[allow(dead_code)]
fn default_working_days() -> Vec<DayOfWeek> {
    let monday_to_friday = [
        DayOfWeek::Monday,
        DayOfWeek::Tuesday,
        DayOfWeek::Wednesday,
        DayOfWeek::Thursday,
        DayOfWeek::Friday,
    ];
    monday_to_friday.to_vec()
}
impl BusinessCalendar {
    /// Monday–Friday, 09:00–17:00.
    pub fn standard() -> Self {
        Self {
            business_hours: BusinessHours::standard(),
            working_days: default_working_days(),
        }
    }

    /// Creates a calendar from explicit business hours and working days.
    pub fn new(business_hours: BusinessHours, working_days: Vec<DayOfWeek>) -> Self {
        Self {
            business_hours,
            working_days,
        }
    }

    /// Returns `true` when `day` is one of the configured working days.
    pub fn is_working_day(&self, day: DayOfWeek) -> bool {
        self.working_days.contains(&day)
    }

    /// Returns `true` when `time` (in UTC) falls on a working day and inside
    /// the business hours.
    pub fn is_business_time(&self, time: &DateTime<Utc>) -> bool {
        let day = DayOfWeek::from_chrono(time.weekday());
        self.is_working_day(day) && self.business_hours.is_within(time)
    }

    /// Returns the first business time at or after `time`.
    ///
    /// If `time` is already inside business hours on a working day it is
    /// returned unchanged. Otherwise the result is the opening instant
    /// (`start_hour:00:00`) of the next working day, or of the same day when
    /// `time` is before opening.
    ///
    /// The search looks at most 14 days ahead; if no working day is found in
    /// that span, the opening instant 14 days after `time` is returned. If
    /// `start_hour` is not a valid hour of day (24 or more) no opening
    /// instant exists and `time` is returned unchanged instead of panicking.
    pub fn next_business_time(&self, time: DateTime<Utc>) -> DateTime<Utc> {
        let mut current = time;
        for _ in 0..14 {
            let day = DayOfWeek::from_chrono(current.weekday());
            if self.is_working_day(day) {
                let hour = current.hour();
                if hour < self.business_hours.start_hour {
                    // Before opening on a working day: wait for today's opening.
                    return self.opening_on(current).unwrap_or(time);
                } else if hour < self.business_hours.end_hour {
                    return current;
                }
            }
            // Closed for the rest of this day: try the next day's opening.
            current = match self.opening_on(current + Duration::days(1)) {
                Some(opening) => opening,
                None => return time,
            };
        }
        current
    }

    /// Opening instant on the calendar day of `day`, or `None` when the
    /// configured `start_hour` is not a valid hour.
    fn opening_on(&self, day: DateTime<Utc>) -> Option<DateTime<Utc>> {
        day.with_hour(self.business_hours.start_hour)?
            .with_minute(0)?
            .with_second(0)?
            .with_nanosecond(0)
    }
}
/// A named holiday on one specific calendar date.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Holiday {
/// Display name of the holiday.
pub name: String,
/// The date as `(year, month, day)`.
pub date: (i32, u32, u32),
}
impl Holiday {
    /// Creates a holiday called `name` on the given date. The date is stored
    /// as given, without validation.
    pub fn new(name: impl Into<String>, year: i32, month: u32, day: u32) -> Self {
        let date = (year, month, day);
        Self {
            name: name.into(),
            date,
        }
    }

    /// Returns `true` when `date` (in UTC) falls on this holiday's
    /// year, month and day.
    pub fn matches(&self, date: &DateTime<Utc>) -> bool {
        (date.year(), date.month(), date.day()) == self.date
    }
}
/// A collection of holidays, kept in insertion order.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct HolidayCalendar {
// Holidays in the order they were added; duplicates are not filtered.
holidays: Vec<Holiday>,
}
impl HolidayCalendar {
pub fn new() -> Self {
Self {
holidays: Vec::new(),
}
}
pub fn add_holiday(&mut self, holiday: Holiday) {
self.holidays.push(holiday);
}
pub fn add(&mut self, name: impl Into<String>, year: i32, month: u32, day: u32) {
self.holidays.push(Holiday::new(name, year, month, day));
}
pub fn is_holiday(&self, date: &DateTime<Utc>) -> bool {
self.holidays.iter().any(|h| h.matches(date))
}
pub fn get_holiday(&self, date: &DateTime<Utc>) -> Option<&Holiday> {
self.holidays.iter().find(|h| h.matches(date))
}
pub fn len(&self) -> usize {
self.holidays.len()
}
pub fn is_empty(&self) -> bool {
self.holidays.is_empty()
}
pub fn next_non_holiday(&self, time: DateTime<Utc>) -> DateTime<Utc> {
let mut current = time;
for _ in 0..365 {
if !self.is_holiday(¤t) {
return current;
}
current += Duration::days(1);
}
current
}
pub fn us_federal(year: i32) -> Self {
let mut calendar = Self::new();
calendar.add("New Year's Day", year, 1, 1);
if let Some((_, day)) = Self::nth_weekday(year, 1, 1, 3) {
calendar.add("Martin Luther King Jr Day", year, 1, day);
}
if let Some((_, day)) = Self::nth_weekday(year, 2, 1, 3) {
calendar.add("Presidents Day", year, 2, day);
}
if let Some((_, day)) = Self::last_weekday(year, 5, 1) {
calendar.add("Memorial Day", year, 5, day);
}
calendar.add("Juneteenth", year, 6, 19);
calendar.add("Independence Day", year, 7, 4);
if let Some((_, day)) = Self::nth_weekday(year, 9, 1, 1) {
calendar.add("Labor Day", year, 9, day);
}
if let Some((_, day)) = Self::nth_weekday(year, 10, 1, 2) {
calendar.add("Columbus Day", year, 10, day);
}
calendar.add("Veterans Day", year, 11, 11);
if let Some((_, day)) = Self::nth_weekday(year, 11, 4, 4) {
calendar.add("Thanksgiving Day", year, 11, day);
}
calendar.add("Christmas Day", year, 12, 25);
calendar
}
fn nth_weekday(year: i32, month: u32, weekday: u32, nth: u32) -> Option<(u32, u32)> {
use chrono::NaiveDate;
let first_day = NaiveDate::from_ymd_opt(year, month, 1)?;
let first_weekday = first_day.weekday().num_days_from_sunday();
let days_until_first = if weekday >= first_weekday {
weekday - first_weekday
} else {
7 - (first_weekday - weekday)
};
let target_day = 1 + days_until_first + (nth - 1) * 7;
if NaiveDate::from_ymd_opt(year, month, target_day).is_some() {
Some((month, target_day))
} else {
None
}
}
fn last_weekday(year: i32, month: u32, weekday: u32) -> Option<(u32, u32)> {
use chrono::NaiveDate;
let next_month = if month == 12 { 1 } else { month + 1 };
let next_year = if month == 12 { year + 1 } else { year };
let last_day = NaiveDate::from_ymd_opt(next_year, next_month, 1)?.pred_opt()?;
for days_back in 0..7 {
if let Some(date) = last_day.checked_sub_signed(Duration::days(days_back)) {
if date.weekday().num_days_from_sunday() == weekday {
return Some((month, date.day()));
}
}
}
None
}
pub fn japan(year: i32) -> Self {
let mut calendar = Self::new();
calendar.add("New Year's Day", year, 1, 1);
if let Some((_, day)) = Self::nth_weekday(year, 1, 1, 2) {
calendar.add("Coming of Age Day", year, 1, day);
}
calendar.add("National Foundation Day", year, 2, 11);
calendar.add("Emperor's Birthday", year, 2, 23);
calendar.add("Vernal Equinox Day", year, 3, 20);
calendar.add("Showa Day", year, 4, 29);
calendar.add("Constitution Memorial Day", year, 5, 3);
calendar.add("Greenery Day", year, 5, 4);
calendar.add("Children's Day", year, 5, 5);
if let Some((_, day)) = Self::nth_weekday(year, 7, 1, 3) {
calendar.add("Marine Day", year, 7, day);
}
calendar.add("Mountain Day", year, 8, 11);
if let Some((_, day)) = Self::nth_weekday(year, 9, 1, 3) {
calendar.add("Respect for the Aged Day", year, 9, day);
}
calendar.add("Autumnal Equinox Day", year, 9, 23);
if let Some((_, day)) = Self::nth_weekday(year, 10, 1, 2) {
calendar.add("Sports Day", year, 10, day);
}
calendar.add("Culture Day", year, 11, 3);
calendar.add("Labor Thanksgiving Day", year, 11, 23);
calendar
}
pub fn uk(year: i32) -> Self {
let mut calendar = Self::new();
calendar.add("New Year's Day", year, 1, 1);
calendar.add("Good Friday", year, 4, 15);
calendar.add("Easter Monday", year, 4, 18);
if let Some((_, day)) = Self::nth_weekday(year, 5, 1, 1) {
calendar.add("Early May Bank Holiday", year, 5, day);
}
if let Some((_, day)) = Self::last_weekday(year, 5, 1) {
calendar.add("Spring Bank Holiday", year, 5, day);
}
if let Some((_, day)) = Self::last_weekday(year, 8, 1) {
calendar.add("Summer Bank Holiday", year, 8, day);
}
calendar.add("Christmas Day", year, 12, 25);
calendar.add("Boxing Day", year, 12, 26);
calendar
}
pub fn canada(year: i32) -> Self {
let mut calendar = Self::new();
calendar.add("New Year's Day", year, 1, 1);
calendar.add("Good Friday", year, 4, 15);
if let Some((_, day)) = Self::monday_on_or_before(year, 5, 24) {
calendar.add("Victoria Day", year, 5, day);
}
calendar.add("Canada Day", year, 7, 1);
if let Some((_, day)) = Self::nth_weekday(year, 9, 1, 1) {
calendar.add("Labour Day", year, 9, day);
}
if let Some((_, day)) = Self::nth_weekday(year, 10, 1, 2) {
calendar.add("Thanksgiving", year, 10, day);
}
calendar.add("Remembrance Day", year, 11, 11);
calendar.add("Christmas Day", year, 12, 25);
calendar.add("Boxing Day", year, 12, 26);
calendar
}
fn monday_on_or_before(year: i32, month: u32, day: u32) -> Option<(u32, u32)> {
use chrono::NaiveDate;
let date = NaiveDate::from_ymd_opt(year, month, day)?;
let weekday = date.weekday().num_days_from_sunday();
let days_back = if weekday >= 1 {
weekday - 1
} else {
6 };
let target = date.checked_sub_signed(Duration::days(days_back as i64))?;
Some((target.month(), target.day()))
}
}
impl ScheduledTask {
/// Builder-style hook for attaching a business calendar.
///
/// The calendar argument is currently ignored and the task is returned
/// unchanged.
pub fn with_business_calendar(self, _calendar: BusinessCalendar) -> Self {
self
}
/// Builder-style hook for attaching a holiday calendar.
///
/// The calendar argument is currently ignored and the task is returned
/// unchanged.
pub fn with_holiday_calendar(self, _calendar: HolidayCalendar) -> Self {
self
}
}
impl BeatScheduler {
    /// Registers `webhook` as an alert callback on the alert manager.
    ///
    /// For every alert that passes `WebhookConfig::should_send`, the payload
    /// is built and written to standard error together with the webhook URL.
    /// No HTTP request is made by this callback.
    pub fn register_webhook(&mut self, webhook: WebhookConfig) {
        let shared = Arc::new(webhook);
        self.alert_manager
            .add_callback(Arc::new(move |alert: &Alert| {
                if !shared.should_send(alert) {
                    return;
                }
                eprintln!(
                    "Webhook alert to {}: {}",
                    shared.url,
                    shared.create_payload(alert)
                );
            }));
    }
}
/// Lookup index over scheduled tasks, by schedule type and by next run time.
#[derive(Debug, Clone, Default)]
pub struct ScheduleIndex {
// Task names grouped by schedule type key (see `schedule_type_key`).
by_type: HashMap<String, HashSet<String>>,
// (task name, next run) pairs; sorted by time only while `dirty` is false.
by_next_run: Vec<(String, DateTime<Utc>)>,
// Set when `by_next_run` may be out of order and needs re-sorting.
dirty: bool,
}
impl ScheduleIndex {
    /// Creates an empty index.
    pub fn new() -> Self {
        Self::default()
    }

    /// Indexes `task` by schedule type and, when its next run time can be
    /// computed, by next run.
    ///
    /// No duplicate check is performed on the next-run list.
    pub fn add_task(&mut self, task: &ScheduledTask) {
        let names = self
            .by_type
            .entry(Self::schedule_type_key(&task.schedule))
            .or_default();
        names.insert(task.name.clone());
        if let Ok(next_run) = task.next_run_time() {
            self.by_next_run.push((task.name.clone(), next_run));
            self.dirty = true;
        }
    }

    /// Removes `task` from both indexes and marks the index dirty.
    pub fn remove_task(&mut self, task: &ScheduledTask) {
        let type_key = Self::schedule_type_key(&task.schedule);
        if let Some(names) = self.by_type.get_mut(&type_key) {
            names.remove(&task.name);
        }
        self.by_next_run
            .retain(|(indexed_name, _)| indexed_name != &task.name);
        self.dirty = true;
    }

    /// Replaces the entry for `old_task` with one for `new_task`.
    pub fn update_task(&mut self, old_task: &ScheduledTask, new_task: &ScheduledTask) {
        self.remove_task(old_task);
        self.add_task(new_task);
    }

    /// Discards the current contents and re-indexes all enabled tasks.
    /// The index is sorted and clean afterwards.
    pub fn rebuild(&mut self, tasks: &HashMap<String, ScheduledTask>) {
        self.by_type.clear();
        self.by_next_run.clear();
        for task in tasks.values().filter(|task| task.enabled) {
            self.add_task(task);
        }
        self.sort_by_next_run();
        self.dirty = false;
    }

    /// Re-sorts the next-run list by time if it is marked dirty.
    fn sort_by_next_run(&mut self) {
        if !self.dirty {
            return;
        }
        self.by_next_run.sort_by_key(|(_, time)| *time);
        self.dirty = false;
    }

    /// Returns the names of all indexed tasks with the given schedule type
    /// key, or `None` when that key has never been indexed.
    pub fn get_by_type(&self, schedule_type: &str) -> Option<&HashSet<String>> {
        self.by_type.get(schedule_type)
    }

    /// Returns up to `limit` task names whose next run is at or before
    /// `now`, earliest first.
    pub fn get_next_due(&mut self, limit: usize, now: DateTime<Utc>) -> Vec<String> {
        self.sort_by_next_run();
        let mut due = Vec::new();
        for (name, time) in &self.by_next_run {
            if due.len() >= limit {
                break;
            }
            if *time <= now {
                due.push(name.clone());
            }
        }
        due
    }

    /// Returns the earliest indexed next-run time, if any task is indexed.
    pub fn earliest_next_run(&mut self) -> Option<DateTime<Utc>> {
        self.sort_by_next_run();
        let (_, time) = self.by_next_run.first()?;
        Some(*time)
    }

    /// Maps a schedule to the key used in the by-type index.
    fn schedule_type_key(schedule: &Schedule) -> String {
        let key = match schedule {
            Schedule::Interval { .. } => "interval",
            #[cfg(feature = "cron")]
            Schedule::Crontab { .. } => "crontab",
            #[cfg(feature = "solar")]
            Schedule::Solar { .. } => "solar",
            Schedule::OneTime { .. } => "onetime",
        };
        key.to_string()
    }

    /// Returns `true` when the next-run list may need re-sorting.
    pub fn is_dirty(&self) -> bool {
        self.dirty
    }

    /// Forces a re-sort before the next ordered query.
    pub fn mark_dirty(&mut self) {
        self.dirty = true;
    }

    /// Number of indexed tasks with the given schedule type key.
    pub fn count_by_type(&self, schedule_type: &str) -> usize {
        match self.by_type.get(schedule_type) {
            Some(names) => names.len(),
            None => 0,
        }
    }

    /// Number of entries in the next-run list. Tasks whose next run could
    /// not be computed are not included.
    pub fn total_count(&self) -> usize {
        self.by_next_run.len()
    }
}
/// A named time window during which tasks should not run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlackoutPeriod {
/// Display name of the blackout.
pub name: String,
/// Start of the window (inclusive).
pub start: DateTime<Utc>,
/// End of the window (inclusive, see `is_blackout`).
pub end: DateTime<Utc>,
/// Optional recurrence; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub recurring: Option<BlackoutRecurrence>,
}
/// How a `BlackoutPeriod` repeats (see `BlackoutPeriod::is_blackout`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BlackoutRecurrence {
/// Same time-of-day window every day.
Daily,
/// Same time-of-day window on the weekday of the period's start.
Weekly,
/// Same time-of-day window on the day of month of the period's start.
Monthly,
}
impl BlackoutPeriod {
    /// Creates a one-off blackout from `start` to `end` (both inclusive).
    pub fn new(name: impl Into<String>, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
        Self {
            name: name.into(),
            start,
            end,
            recurring: None,
        }
    }

    /// Makes the blackout repeat with the given recurrence.
    pub fn with_recurrence(mut self, recurrence: BlackoutRecurrence) -> Self {
        self.recurring = Some(recurrence);
        self
    }

    /// Returns `true` when `time` falls inside the blackout.
    ///
    /// The original `start..=end` window always counts. For recurring
    /// periods, `time` is also blacked out when its hour and minute lie
    /// between the hour and minute of `start` and `end` (inclusive) on a
    /// matching day: every day for `Daily`, the weekday of `start` for
    /// `Weekly`, the day of month of `start` for `Monthly`.
    pub fn is_blackout(&self, time: DateTime<Utc>) -> bool {
        if self.in_original_window(time) {
            return true;
        }
        match &self.recurring {
            None => false,
            Some(BlackoutRecurrence::Daily) => self.in_time_of_day_window(time),
            Some(BlackoutRecurrence::Weekly) => {
                time.weekday() == self.start.weekday() && self.in_time_of_day_window(time)
            }
            Some(BlackoutRecurrence::Monthly) => {
                time.day() == self.start.day() && self.in_time_of_day_window(time)
            }
        }
    }

    /// Returns the first time at or after `time` that is not blacked out.
    ///
    /// `time` itself is returned when it is not in a blackout. Otherwise the
    /// search steps past each blackout window it is in: one second past
    /// `end` for the original window, and the minute following the window's
    /// end time of day for a recurring occurrence.
    ///
    /// The search is bounded to 366 windows. If the recurrence never leaves
    /// a gap (for example a daily blackout covering the whole day), the last
    /// candidate is returned even though it is still blacked out.
    pub fn next_available_time(&self, time: DateTime<Utc>) -> DateTime<Utc> {
        const MAX_WINDOWS: usize = 366;

        let mut current = time;
        for _ in 0..MAX_WINDOWS {
            if !self.is_blackout(current) {
                return current;
            }
            current = if self.in_original_window(current) {
                // `end` is inclusive, so the first free instant lies after it.
                self.end + Duration::seconds(1)
            } else {
                // Inside a recurring occurrence on `current`'s date. The
                // window is inclusive at minute granularity, so jump to the
                // minute after its end time of day.
                current
                    .with_hour(self.end.hour())
                    .and_then(|t| t.with_minute(self.end.minute()))
                    .and_then(|t| t.with_second(0))
                    .and_then(|t| t.with_nanosecond(0))
                    .map(|t| t + Duration::minutes(1))
                    .unwrap_or_else(|| current + Duration::minutes(1))
            };
        }
        current
    }

    /// `true` when `time` is inside the original `start..=end` window.
    fn in_original_window(&self, time: DateTime<Utc>) -> bool {
        time >= self.start && time <= self.end
    }

    /// `true` when the hour and minute of `time` lie between those of
    /// `start` and `end`, inclusive.
    fn in_time_of_day_window(&self, time: DateTime<Utc>) -> bool {
        let start_time = (self.start.hour(), self.start.minute());
        let end_time = (self.end.hour(), self.end.minute());
        let current_time = (time.hour(), time.minute());
        current_time >= start_time && current_time <= end_time
    }
}
/// Combines a holiday calendar, an optional business calendar and a list of
/// blackout periods into one "may a task run at this time" check.
#[derive(Debug, Clone, Default)]
pub struct CalendarWithBlackout {
/// Holidays consulted when `holidays_only` is set.
pub holiday_calendar: HolidayCalendar,
/// When present, times must also be business times.
pub business_calendar: Option<BusinessCalendar>,
/// Periods during which no time is valid.
pub blackout_periods: Vec<BlackoutPeriod>,
/// When `true`, only times that fall on a holiday are valid.
pub holidays_only: bool,
}
impl CalendarWithBlackout {
pub fn new() -> Self {
Self {
holiday_calendar: HolidayCalendar::new(),
business_calendar: None,
blackout_periods: Vec::new(),
holidays_only: false,
}
}
pub fn add_blackout(&mut self, blackout: BlackoutPeriod) {
self.blackout_periods.push(blackout);
}
pub fn set_holidays_only(&mut self, enabled: bool) {
self.holidays_only = enabled;
}
pub fn set_business_calendar(&mut self, calendar: BusinessCalendar) {
self.business_calendar = Some(calendar);
}
pub fn is_valid_time(&self, time: DateTime<Utc>) -> bool {
for blackout in &self.blackout_periods {
if blackout.is_blackout(time) {
return false;
}
}
if self.holidays_only && !self.holiday_calendar.is_holiday(&time) {
return false;
}
if let Some(ref business) = self.business_calendar {
if !business.is_business_time(&time) {
return false;
}
}
true
}
pub fn next_valid_time(&self, mut time: DateTime<Utc>) -> DateTime<Utc> {
for _ in 0..365 {
let mut in_blackout = false;
for blackout in &self.blackout_periods {
if blackout.is_blackout(time) {
time = blackout.next_available_time(time);
in_blackout = true;
break;
}
}
if in_blackout {
continue;
}
if self.holidays_only && !self.holiday_calendar.is_holiday(&time) {
time += Duration::days(1);
continue;
}
if let Some(ref business) = self.business_calendar {
if !business.is_business_time(&time) {
time = business.next_business_time(time);
continue;
}
}
return time;
}
time
}
}
/// A schedule built from several sub-schedules combined with a
/// `CompositeMode`.
#[derive(Debug, Clone)]
pub struct CompositeSchedule {
// The sub-schedules being combined.
schedules: Vec<Schedule>,
// How the sub-schedules' next run times are combined.
mode: CompositeMode,
}
/// How a `CompositeSchedule` combines its sub-schedules.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompositeMode {
/// Every sub-schedule must succeed; the latest next-run time is used.
And,
/// Failing sub-schedules are ignored; the earliest next-run time is used.
Or,
}
impl CompositeSchedule {
    /// Combines `schedules` in `CompositeMode::And`.
    pub fn and(schedules: Vec<Schedule>) -> Self {
        let mode = CompositeMode::And;
        Self { schedules, mode }
    }

    /// Combines `schedules` in `CompositeMode::Or`.
    pub fn or(schedules: Vec<Schedule>) -> Self {
        let mode = CompositeMode::Or;
        Self { schedules, mode }
    }

    /// Computes the next run time of the composite.
    ///
    /// In `And` mode the latest of the sub-schedules' next runs is returned;
    /// in `Or` mode the earliest.
    ///
    /// # Errors
    /// * `ScheduleError::Invalid` when there are no sub-schedules, or when
    ///   no sub-schedule produced a next run.
    /// * In `And` mode, the first error returned by any sub-schedule.
    pub fn next_run(
        &self,
        last_run: Option<DateTime<Utc>>,
    ) -> Result<DateTime<Utc>, ScheduleError> {
        if self.schedules.is_empty() {
            return Err(ScheduleError::Invalid(
                "Composite schedule has no sub-schedules".to_string(),
            ));
        }
        let mut candidates = Vec::with_capacity(self.schedules.len());
        for schedule in &self.schedules {
            match (schedule.next_run(last_run), self.mode) {
                (Ok(next), _) => candidates.push(next),
                (Err(err), CompositeMode::And) => return Err(err),
                // In `Or` mode a failing sub-schedule simply does not take part.
                (Err(_), CompositeMode::Or) => {}
            }
        }
        let chosen = match self.mode {
            CompositeMode::And => candidates.into_iter().max(),
            CompositeMode::Or => candidates.into_iter().min(),
        };
        chosen.ok_or_else(|| {
            ScheduleError::Invalid("No valid next run time from any sub-schedule".to_string())
        })
    }

    /// Returns `true` when the composite's next run time has been reached.
    ///
    /// # Errors
    /// Propagates errors from `next_run`.
    pub fn is_due(&self, last_run: Option<DateTime<Utc>>) -> Result<bool, ScheduleError> {
        self.next_run(last_run).map(|next| Utc::now() >= next)
    }

    /// Number of sub-schedules.
    pub fn schedule_count(&self) -> usize {
        self.schedules.len()
    }

    /// The combination mode.
    pub fn mode(&self) -> CompositeMode {
        self.mode
    }

    /// The sub-schedules, in the order they were supplied.
    pub fn schedules(&self) -> &[Schedule] {
        self.schedules.as_slice()
    }
}
/// Renders as `Composite[MODE](a MODE b MODE c)`, where `MODE` is `AND` or
/// `OR` and each element is the `Display` form of a sub-schedule.
impl std::fmt::Display for CompositeSchedule {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self.mode {
            CompositeMode::And => "AND",
            CompositeMode::Or => "OR",
        };
        write!(f, "Composite[{}](", label)?;
        let mut parts = self.schedules.iter();
        if let Some(first) = parts.next() {
            write!(f, "{}", first)?;
            for schedule in parts {
                write!(f, " {} {}", label, schedule)?;
            }
        }
        f.write_str(")")
    }
}
/// Shared, thread-safe callback that maps an optional last-run time to the
/// next run time (or a [`ScheduleError`]).
type CustomScheduleFn =
Arc<dyn Fn(Option<DateTime<Utc>>) -> Result<DateTime<Utc>, ScheduleError> + Send + Sync>;
/// A schedule whose next run time is computed by a user-supplied callback.
pub struct CustomSchedule {
/// Name of the schedule; shown by its `Debug` and `Display` output.
pub name: String,
/// Callback invoked with the last run time to compute the next run time.
next_run_fn: CustomScheduleFn,
}
impl CustomSchedule {
pub fn new<F>(name: impl Into<String>, next_run_fn: F) -> Self
where
F: Fn(Option<DateTime<Utc>>) -> Result<DateTime<Utc>, ScheduleError>
+ Send
+ Sync
+ 'static,
{
Self {
name: name.into(),
next_run_fn: Arc::new(next_run_fn),
}
}
pub fn next_run(
&self,
last_run: Option<DateTime<Utc>>,
) -> Result<DateTime<Utc>, ScheduleError> {
(self.next_run_fn)(last_run)
}
pub fn is_due(&self, last_run: Option<DateTime<Utc>>) -> Result<bool, ScheduleError> {
let next_run = self.next_run(last_run)?;
Ok(Utc::now() >= next_run)
}
}
/// Manual `Debug` impl: only the name is printed because the callback is an opaque closure.
impl std::fmt::Debug for CustomSchedule {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("CustomSchedule");
        builder.field("name", &self.name);
        builder.finish()
    }
}
/// Renders as `Custom[<name>]`.
impl std::fmt::Display for CustomSchedule {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { name, .. } = self;
        write!(f, "Custom[{}]", name)
    }
}
/// Per-task result of the starvation-prevention analysis produced by
/// `BeatScheduler::get_tasks_with_starvation_prevention`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskWaitingInfo {
/// Name of the task this entry describes.
pub task_name: String,
/// Priority configured in the task options, if any.
pub original_priority: Option<u8>,
/// Priority after an optional starvation boost was applied.
pub effective_priority: u8,
/// Time since the task last ran (a fixed large duration if it never ran).
pub waiting_duration: chrono::Duration,
/// Whether the effective priority was raised above the base priority.
pub priority_boosted: bool,
/// Human-readable explanation of the boost; `None` when no boost was applied.
pub boost_reason: Option<String>,
}
impl BeatScheduler {
/// Reports waiting time and effective priority for every enabled task.
///
/// A task is boosted when it has waited at least the threshold and its base
/// priority is below 7. The boosted priority is capped at 9.
///
/// # Parameters
///
/// * `starvation_threshold_minutes` - minimum waiting time before a boost
///   applies; defaults to 60.
/// * `boost_amount` - amount added to the base priority; defaults to 2.
///
/// Tasks without a configured priority use a base priority of 5. Tasks that
/// never ran are treated as having waited 365 days.
pub fn get_tasks_with_starvation_prevention(
    &self,
    starvation_threshold_minutes: Option<i64>,
    boost_amount: Option<u8>,
) -> Vec<TaskWaitingInfo> {
    let threshold_minutes = starvation_threshold_minutes.unwrap_or(60);
    let boost = boost_amount.unwrap_or(2);
    let now = Utc::now();

    self.tasks
        .values()
        .filter(|task| task.enabled)
        .map(|task| {
            let original_priority = task.options.priority;
            let base_priority = original_priority.unwrap_or(5);
            let waiting_duration = match task.last_run_at {
                Some(last_run) => now - last_run,
                None => Duration::hours(24 * 365),
            };
            let waiting_minutes = waiting_duration.num_minutes();
            let priority_boosted = waiting_minutes >= threshold_minutes && base_priority < 7;

            let (effective_priority, boost_reason) = if priority_boosted {
                // Saturating add: a large caller-supplied boost must not
                // overflow `u8` (panic in debug builds, wrap-around in release).
                let boosted = base_priority.saturating_add(boost).min(9);
                let reason = format!(
                    "Waited {} minutes (threshold: {} min)",
                    waiting_minutes, threshold_minutes
                );
                (boosted, Some(reason))
            } else {
                (base_priority, None)
            };

            TaskWaitingInfo {
                task_name: task.name.clone(),
                original_priority,
                effective_priority,
                waiting_duration,
                priority_boosted,
                boost_reason,
            }
        })
        .collect()
}
/// Returns the enabled, due tasks ordered by starvation-adjusted priority.
///
/// Effective priorities come from `get_tasks_with_starvation_prevention`
/// with a fixed boost of 2. Tasks are sorted by descending effective
/// priority; ties are broken by ascending next run time when both are
/// available. A task whose due check fails is treated as not due.
pub fn get_due_tasks_with_starvation_prevention(
    &self,
    starvation_threshold_minutes: Option<i64>,
) -> Vec<&ScheduledTask> {
    let priority_by_name: HashMap<String, u8> = self
        .get_tasks_with_starvation_prevention(starvation_threshold_minutes, Some(2))
        .into_iter()
        .map(|info| (info.task_name, info.effective_priority))
        .collect();
    // Falls back to priority 5 for names missing from the lookup table.
    let priority_of =
        |task: &ScheduledTask| priority_by_name.get(&task.name).copied().unwrap_or(5);

    let mut ready: Vec<&ScheduledTask> = Vec::new();
    for task in self.tasks.values() {
        if task.enabled && task.is_due().unwrap_or(false) {
            ready.push(task);
        }
    }

    ready.sort_by(|a, b| {
        let a_priority = priority_of(*a);
        let b_priority = priority_of(*b);
        // Reversed comparison: higher priority sorts first.
        b_priority.cmp(&a_priority).then_with(|| {
            match (a.next_run_time(), b.next_run_time()) {
                (Ok(a_time), Ok(b_time)) => a_time.cmp(&b_time),
                _ => std::cmp::Ordering::Equal,
            }
        })
    });
    ready
}
/// Previews the next run times of one task or of all tasks.
///
/// # Parameters
///
/// * `count` - maximum number of run times per task; defaults to 10.
/// * `task_name` - restricts the preview to that task; an unknown name
///   yields an empty map.
///
/// Each predicted run time is fed back as the "last run" for the next
/// prediction. Prediction for a task stops at the first schedule error, and
/// tasks with no predicted run are left out of the result.
pub fn preview_upcoming_executions(
    &self,
    count: Option<usize>,
    task_name: Option<&str>,
) -> HashMap<String, Vec<DateTime<Utc>>> {
    let limit = count.unwrap_or(10);
    let selected: Vec<&ScheduledTask> = match task_name {
        Some(name) => self.tasks.get(name).into_iter().collect(),
        None => self.tasks.values().collect(),
    };

    let mut preview = HashMap::new();
    for task in selected {
        let mut cursor = task.last_run_at;
        let mut upcoming = Vec::new();
        while upcoming.len() < limit {
            let Ok(next_time) = task.schedule.next_run(cursor) else {
                break;
            };
            upcoming.push(next_time);
            cursor = Some(next_time);
        }
        if !upcoming.is_empty() {
            preview.insert(task.name.clone(), upcoming);
        }
    }
    preview
}
/// Simulates the scheduler over a time window without running anything.
///
/// The simulated clock starts at the current UTC time and advances in fixed
/// ticks until `duration_seconds` have elapsed. At each tick, every enabled
/// task whose next run time is at or before the simulated time is recorded
/// and its simulated last-run time is updated.
///
/// # Parameters
///
/// * `duration_seconds` - length of the window; zero or negative yields an
///   empty result.
/// * `tick_interval_seconds` - step of the simulated clock; defaults to 1.
///   Values below 1 are treated as 1, because a non-positive step would never
///   reach the end of the window.
///
/// # Returns
///
/// `(simulated_time, task_name)` pairs ordered by time. Real task state is
/// not modified.
pub fn dry_run(
    &self,
    duration_seconds: i64,
    tick_interval_seconds: Option<i64>,
) -> Vec<(DateTime<Utc>, String)> {
    // Clamp the step so the loop below always makes progress.
    let tick_interval = Duration::seconds(tick_interval_seconds.unwrap_or(1).max(1));
    // One clock reading keeps the window exactly `duration_seconds` long.
    let start_time = Utc::now();
    let end_time = start_time + Duration::seconds(duration_seconds);
    let mut current_time = start_time;
    let mut executions = Vec::new();

    // Simulated last-run time per task, seeded from the real state.
    let mut task_state: HashMap<String, Option<DateTime<Utc>>> = self
        .tasks
        .iter()
        .map(|(name, task)| (name.clone(), task.last_run_at))
        .collect();

    while current_time < end_time {
        for (name, task) in &self.tasks {
            if !task.enabled {
                continue;
            }
            let last_run = task_state.get(name).copied().flatten();
            if let Ok(next_run) = task.schedule.next_run(last_run) {
                // Requiring `last_run < next_run` prevents re-recording a run
                // time that does not move past the previous one.
                if next_run <= current_time && last_run.is_none_or(|lr| lr < next_run) {
                    executions.push((current_time, name.clone()));
                    task_state.insert(name.clone(), Some(current_time));
                }
            }
        }
        current_time += tick_interval;
    }

    executions.sort_by_key(|(time, _)| *time);
    executions
}
/// Aggregates counters, health and timing information across all tasks.
///
/// Run and failure totals come from each task's counters. Timeouts,
/// durations and oldest/newest completion times come from each task's
/// execution history. `success_rate` is
/// `total_executions / (total_executions + total_failures)`, or `0.0` when
/// both are zero. `avg_duration_ms` is an integer mean over history records
/// that carry a duration.
pub fn get_comprehensive_stats(&self) -> SchedulerStatistics {
    let total_tasks = self.tasks.len();
    let enabled_tasks = self.tasks.values().filter(|task| task.enabled).count();
    let disabled_tasks = total_tasks - enabled_tasks;

    let mut total_executions = 0u64;
    let mut total_failures = 0u64;
    let mut total_timeouts = 0u64;
    let mut duration_sum_ms = 0u64;
    let mut timed_records = 0u64;
    let mut tasks_in_retry = 0usize;
    let mut tasks_with_failures = 0usize;
    let mut healthy_tasks = 0usize;
    let mut warning_tasks = 0usize;
    let mut unhealthy_tasks = 0usize;
    let now = Utc::now();
    let mut oldest_execution: Option<DateTime<Utc>> = None;
    let mut newest_execution: Option<DateTime<Utc>> = None;

    for task in self.tasks.values() {
        total_executions += task.total_run_count;
        total_failures += task.total_failure_count;

        for record in &task.execution_history {
            if record.is_timeout() {
                total_timeouts += 1;
            }
            if let Some(ms) = record.duration_ms {
                duration_sum_ms += ms;
                timed_records += 1;
            }
            if let Some(done_at) = record.completed_at {
                oldest_execution =
                    Some(oldest_execution.map_or(done_at, |seen| seen.min(done_at)));
                newest_execution =
                    Some(newest_execution.map_or(done_at, |seen| seen.max(done_at)));
            }
        }

        if task.should_retry() {
            tasks_in_retry += 1;
        }
        if task.total_failure_count > 0 {
            tasks_with_failures += 1;
        }
        match task.check_health().health {
            ScheduleHealth::Healthy => healthy_tasks += 1,
            ScheduleHealth::Warning { .. } => warning_tasks += 1,
            ScheduleHealth::Unhealthy { .. } => unhealthy_tasks += 1,
        }
    }

    let attempts = total_executions + total_failures;
    let success_rate = if attempts == 0 {
        0.0
    } else {
        total_executions as f64 / attempts as f64
    };
    let avg_duration_ms = (timed_records > 0).then(|| duration_sum_ms / timed_records);
    let uptime = oldest_execution.map(|oldest| now - oldest);

    SchedulerStatistics {
        total_tasks,
        enabled_tasks,
        disabled_tasks,
        total_executions,
        total_failures,
        total_timeouts,
        success_rate,
        tasks_in_retry,
        tasks_with_failures,
        healthy_tasks,
        warning_tasks,
        unhealthy_tasks,
        avg_duration_ms,
        uptime,
        oldest_execution,
        newest_execution,
    }
}
}
/// Aggregate scheduler statistics as produced by
/// `BeatScheduler::get_comprehensive_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerStatistics {
/// Number of registered tasks.
pub total_tasks: usize,
/// Number of tasks with `enabled` set.
pub enabled_tasks: usize,
/// `total_tasks - enabled_tasks`.
pub disabled_tasks: usize,
/// Sum of every task's `total_run_count`.
pub total_executions: u64,
/// Sum of every task's `total_failure_count`.
pub total_failures: u64,
/// Number of execution-history records reporting a timeout.
pub total_timeouts: u64,
/// `total_executions / (total_executions + total_failures)` as a fraction; `0.0` when both are zero.
pub success_rate: f64,
/// Number of tasks for which `should_retry()` returned `true`.
pub tasks_in_retry: usize,
/// Number of tasks with a non-zero failure count.
pub tasks_with_failures: usize,
/// Number of tasks whose health check reported `Healthy`.
pub healthy_tasks: usize,
/// Number of tasks whose health check reported `Warning`.
pub warning_tasks: usize,
/// Number of tasks whose health check reported `Unhealthy`.
pub unhealthy_tasks: usize,
/// Integer mean duration over history records that carry a duration; `None` if there are none.
pub avg_duration_ms: Option<u64>,
/// Time elapsed between the oldest completed execution in history and the moment the statistics were computed.
pub uptime: Option<chrono::Duration>,
/// Earliest `completed_at` found in any task's execution history.
pub oldest_execution: Option<DateTime<Utc>>,
/// Latest `completed_at` found in any task's execution history.
pub newest_execution: Option<DateTime<Utc>>,
}
/// Multi-line, human-readable summary. The average-duration and uptime lines
/// are printed only when those values are present.
impl std::fmt::Display for SchedulerStatistics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_tasks,
            enabled_tasks,
            disabled_tasks,
            total_executions,
            total_failures,
            total_timeouts,
            healthy_tasks,
            warning_tasks,
            unhealthy_tasks,
            ..
        } = self;
        let success_percent = self.success_rate * 100.0;

        writeln!(f, "Scheduler Statistics:")?;
        writeln!(
            f,
            " Tasks: {} total ({} enabled, {} disabled)",
            total_tasks, enabled_tasks, disabled_tasks
        )?;
        writeln!(
            f,
            " Executions: {} successful, {} failed, {} timed out",
            total_executions, total_failures, total_timeouts
        )?;
        writeln!(f, " Success Rate: {:.1}%", success_percent)?;
        writeln!(
            f,
            " Health: {} healthy, {} warnings, {} unhealthy",
            healthy_tasks, warning_tasks, unhealthy_tasks
        )?;
        if let Some(avg_ms) = self.avg_duration_ms {
            writeln!(f, " Avg Duration: {}ms", avg_ms)?;
        }
        match self.uptime {
            Some(uptime) => {
                // Whole days plus the remaining hours of the last partial day.
                let days = uptime.num_days();
                let hours = uptime.num_hours() % 24;
                writeln!(f, " Uptime: {} days {} hours", days, hours)
            }
            None => Ok(()),
        }
    }
}
/// Namespace type grouping timezone helper functions; it carries no data.
pub struct TimezoneUtils;
impl TimezoneUtils {
/// Formats `utc_time` as `YYYY-MM-DD HH:MM:SS <zone name>` in the given zone.
///
/// When `timezone` cannot be parsed, the time is formatted in UTC and the
/// invalid zone name is appended in parentheses instead of returning an error.
#[cfg(feature = "cron")]
pub fn format_in_timezone(utc_time: DateTime<Utc>, timezone: &str) -> String {
    use chrono_tz::Tz;
    match timezone.parse::<Tz>() {
        Ok(zone) => {
            let localized = utc_time.with_timezone(&zone);
            format!("{} {}", localized.format("%Y-%m-%d %H:%M:%S"), zone.name())
        }
        Err(_) => format!(
            "{} (invalid timezone: {})",
            utc_time.format("%Y-%m-%d %H:%M:%S UTC"),
            timezone
        ),
    }
}
/// Formats one reading of the current time in each requested zone.
///
/// The map is keyed by the zone names as given; values come from
/// `format_in_timezone`, so invalid names yield its fallback text.
#[cfg(feature = "cron")]
pub fn current_time_in_zones(timezones: &[&str]) -> HashMap<String, String> {
    let now = Utc::now();
    let mut formatted_by_zone = HashMap::new();
    for zone in timezones {
        let formatted = Self::format_in_timezone(now, zone);
        formatted_by_zone.insert(zone.to_string(), formatted);
    }
    formatted_by_zone
}
/// Computes how long until the wall-clock time `target_hour:target_minute`
/// next occurs in `timezone`.
///
/// Today's occurrence is used if it is still in the future; otherwise
/// tomorrow's occurrence is used.
///
/// # Errors
///
/// Returns an error string when the timezone is invalid, when the hour/minute
/// pair is not a valid time, or when the local time does not map to exactly
/// one instant (reported as a DST problem).
#[cfg(feature = "cron")]
pub fn time_until_next_occurrence(
    target_hour: u32,
    target_minute: u32,
    timezone: &str,
) -> Result<chrono::Duration, String> {
    use chrono_tz::Tz;
    let zone: Tz = timezone
        .parse()
        .map_err(|_| format!("Invalid timezone: {}", timezone))?;
    let now_utc = Utc::now();
    let today = now_utc.with_timezone(&zone).date_naive();

    // Resolves the target wall-clock time on `date` to a UTC instant.
    let occurrence_on = |date: chrono::NaiveDate| -> Result<DateTime<Utc>, String> {
        let local = date
            .and_hms_opt(target_hour, target_minute, 0)
            .ok_or("Invalid time")?
            .and_local_timezone(zone)
            .single()
            .ok_or("Ambiguous or invalid time due to DST")?;
        Ok(local.with_timezone(&Utc))
    };

    let today_target = occurrence_on(today)?;
    let final_target = if today_target > now_utc {
        today_target
    } else {
        occurrence_on(today + chrono::Days::new(1))?
    };
    Ok(final_target - now_utc)
}
/// Best-effort detection of the system timezone name.
///
/// Sources are tried in order and the first valid zone name wins:
/// 1. the `TZ` environment variable;
/// 2. the trimmed contents of `/etc/timezone` (Linux only);
/// 3. the part after `zoneinfo/` in the `/etc/localtime` symlink target
///    (Linux and macOS).
///
/// Falls back to `"UTC"` when none of them yields a valid name.
#[cfg(feature = "cron")]
pub fn detect_system_timezone() -> String {
    match std::env::var("TZ") {
        Ok(from_env) if Self::is_valid_timezone(&from_env) => return from_env,
        _ => {}
    }

    #[cfg(target_os = "linux")]
    {
        if let Ok(contents) = std::fs::read_to_string("/etc/timezone") {
            let candidate = contents.trim();
            if Self::is_valid_timezone(candidate) {
                return candidate.to_string();
            }
        }
    }

    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        let linked_zone = std::fs::read_link("/etc/localtime")
            .ok()
            .and_then(|target| {
                let path = target.to_str()?;
                // Everything after the first "zoneinfo/" is the zone name.
                let (_, zone) = path.split_once("zoneinfo/")?;
                Self::is_valid_timezone(zone).then(|| zone.to_string())
            });
        if let Some(zone) = linked_zone {
            return zone;
        }
    }

    "UTC".to_string()
}
/// Returns `true` when `timezone` parses as a `chrono_tz::Tz` zone name.
#[cfg(feature = "cron")]
pub fn is_valid_timezone(timezone: &str) -> bool {
    use chrono_tz::Tz;
    let parsed: Result<Tz, _> = timezone.parse();
    parsed.is_ok()
}
/// Returns the name of every zone in `chrono_tz::TZ_VARIANTS`, in that order.
#[cfg(feature = "cron")]
pub fn list_all_timezones() -> Vec<String> {
    use chrono_tz::TZ_VARIANTS;
    let mut names = Vec::with_capacity(TZ_VARIANTS.len());
    for zone in TZ_VARIANTS.iter() {
        names.push(zone.name().to_string());
    }
    names
}
/// Returns all zone names containing `pattern`, compared case-insensitively.
#[cfg(feature = "cron")]
pub fn search_timezones(pattern: &str) -> Vec<String> {
    let needle = pattern.to_lowercase();
    let mut names = Self::list_all_timezones();
    names.retain(|name| name.to_lowercase().contains(&needle));
    names
}
/// Reports whether daylight saving time is in effect in `timezone`.
///
/// # Parameters
///
/// * `timezone` - IANA zone name parseable by `chrono_tz`.
/// * `at_time` - instant to inspect; defaults to the current time.
///
/// # Errors
///
/// Returns an error string when `timezone` is not a valid zone name.
#[cfg(feature = "cron")]
pub fn is_dst_active(timezone: &str, at_time: Option<DateTime<Utc>>) -> Result<bool, String> {
    use chrono_tz::{OffsetComponents, Tz};
    let tz: Tz = timezone
        .parse()
        .map_err(|_| format!("Invalid timezone: {}", timezone))?;
    let time = at_time.unwrap_or_else(Utc::now);
    let local_time = time.with_timezone(&tz);
    // Inspect the DST component of the offset. Comparing the total UTC offset
    // with itself at the same instant can never differ, so it would always
    // report `false`.
    Ok(!local_time.offset().dst_offset().is_zero())
}
/// Returns the zone's offset from UTC in seconds (local time minus UTC).
///
/// `at_time` defaults to the current time.
///
/// # Errors
///
/// Returns an error string when `timezone` is not a valid zone name.
#[cfg(feature = "cron")]
pub fn get_utc_offset(timezone: &str, at_time: Option<DateTime<Utc>>) -> Result<i32, String> {
    use chrono_tz::Tz;
    let zone = timezone
        .parse::<Tz>()
        .map_err(|_| format!("Invalid timezone: {}", timezone))?;
    let instant = match at_time {
        Some(given) => given,
        None => Utc::now(),
    };
    let localized = instant.with_timezone(&zone);
    let seconds_east = localized.offset().fix().local_minus_utc();
    Ok(seconds_east)
}
/// Collects offset, DST flag, abbreviation and formatted local time for a zone.
///
/// `at_time` defaults to the current time. The DST flag is obtained from
/// `is_dst_active` for the same instant.
///
/// # Errors
///
/// Returns an error string when `timezone` is not a valid zone name.
#[cfg(feature = "cron")]
pub fn get_timezone_info(
    timezone: &str,
    at_time: Option<DateTime<Utc>>,
) -> Result<TimezoneInfo, String> {
    use chrono_tz::Tz;
    let zone = timezone
        .parse::<Tz>()
        .map_err(|_| format!("Invalid timezone: {}", timezone))?;
    let instant = at_time.unwrap_or_else(Utc::now);
    let localized = instant.with_timezone(&zone);
    let utc_offset_seconds = localized.offset().fix().local_minus_utc();
    let is_dst = Self::is_dst_active(timezone, Some(instant))?;
    let current_time = localized.format("%Y-%m-%d %H:%M:%S %Z").to_string();
    let abbreviation = localized.format("%Z").to_string();

    Ok(TimezoneInfo {
        name: timezone.to_string(),
        utc_offset_seconds,
        utc_offset_hours: utc_offset_seconds as f32 / 3600.0,
        is_dst,
        current_time,
        abbreviation,
    })
}
/// Formats the UTC instant `time` as `YYYY-MM-DD HH:MM:SS <zone name>` in `to_tz`.
///
/// `from_tz` is only validated: the input is already a UTC instant, so the
/// source zone does not affect the result.
///
/// # Errors
///
/// Returns an error string when either zone name is invalid; the source zone
/// is checked first.
#[cfg(feature = "cron")]
pub fn convert_between_timezones(
    time: DateTime<Utc>,
    from_tz: &str,
    to_tz: &str,
) -> Result<String, String> {
    use chrono_tz::Tz;
    if from_tz.parse::<Tz>().is_err() {
        return Err(format!("Invalid source timezone: {}", from_tz));
    }
    let Ok(destination) = to_tz.parse::<Tz>() else {
        return Err(format!("Invalid target timezone: {}", to_tz));
    };
    let converted = time.with_timezone(&destination);
    Ok(format!(
        "{} {}",
        converted.format("%Y-%m-%d %H:%M:%S"),
        destination.name()
    ))
}
/// Returns a lookup table from common timezone abbreviations to one
/// representative IANA zone name each.
///
/// China Standard Time is stored under the key `CST_CHINA`, because `CST` is
/// mapped to `America/Chicago`.
#[cfg(feature = "cron")]
pub fn get_common_timezone_abbreviations() -> HashMap<String, String> {
    const ABBREVIATIONS: &[(&str, &str)] = &[
        ("EST", "America/New_York"),
        ("EDT", "America/New_York"),
        ("CST", "America/Chicago"),
        ("CDT", "America/Chicago"),
        ("MST", "America/Denver"),
        ("MDT", "America/Denver"),
        ("PST", "America/Los_Angeles"),
        ("PDT", "America/Los_Angeles"),
        ("AKST", "America/Anchorage"),
        ("AKDT", "America/Anchorage"),
        ("HST", "Pacific/Honolulu"),
        ("GMT", "Europe/London"),
        ("BST", "Europe/London"),
        ("CET", "Europe/Paris"),
        ("CEST", "Europe/Paris"),
        ("EET", "Europe/Athens"),
        ("EEST", "Europe/Athens"),
        ("JST", "Asia/Tokyo"),
        ("KST", "Asia/Seoul"),
        ("CST_CHINA", "Asia/Shanghai"),
        ("IST", "Asia/Kolkata"),
        ("SGT", "Asia/Singapore"),
        ("HKT", "Asia/Hong_Kong"),
        ("AEST", "Australia/Sydney"),
        ("AEDT", "Australia/Sydney"),
        ("ACST", "Australia/Adelaide"),
        ("ACDT", "Australia/Adelaide"),
        ("AWST", "Australia/Perth"),
    ];
    ABBREVIATIONS
        .iter()
        .map(|&(abbreviation, zone)| (abbreviation.to_string(), zone.to_string()))
        .collect()
}
}
/// Snapshot of a timezone's properties at one instant, as built by
/// `TimezoneUtils::get_timezone_info`.
#[cfg(feature = "cron")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimezoneInfo {
/// Zone name exactly as passed by the caller.
pub name: String,
/// Offset from UTC in seconds (local time minus UTC).
pub utc_offset_seconds: i32,
/// `utc_offset_seconds / 3600` as a floating-point number of hours.
pub utc_offset_hours: f32,
/// DST flag as reported by `TimezoneUtils::is_dst_active`.
pub is_dst: bool,
/// Local time formatted as `%Y-%m-%d %H:%M:%S %Z`.
pub current_time: String,
/// Local time formatted with `%Z` (the zone abbreviation).
pub abbreviation: String,
}
/// Renders as `<name> (UTC<+/-hours>h, <abbreviation>, DST: Yes|No): <current time>`.
#[cfg(feature = "cron")]
impl std::fmt::Display for TimezoneInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            name,
            utc_offset_hours,
            abbreviation,
            current_time,
            ..
        } = self;
        let dst_label = match self.is_dst {
            true => "Yes",
            false => "No",
        };
        write!(
            f,
            "{} (UTC{:+.1}h, {}, DST: {}): {}",
            name, utc_offset_hours, abbreviation, dst_label, current_time
        )
    }
}
/// Fluent builder that produces a [`Schedule`] from an interval plus optional
/// calendar constraints (the constraints require the `cron` feature).
#[derive(Debug, Clone)]
pub struct ScheduleBuilder {
/// Requested interval in seconds; `build` falls back to 3600 when unset.
interval_seconds: Option<u64>,
/// Timezone name passed to the crontab schedule, if any.
#[cfg(feature = "cron")]
timezone: Option<String>,
/// Restrict the hour field to `9-17`.
#[cfg(feature = "cron")]
business_hours: bool,
/// Restrict the day-of-week field to `0,6`; takes precedence over `weekdays` in `build`.
#[cfg(feature = "cron")]
weekends: bool,
/// Restrict the day-of-week field to `1-5`.
#[cfg(feature = "cron")]
weekdays: bool,
}
impl ScheduleBuilder {
    /// Interval used by `build` when none was configured (one hour).
    const DEFAULT_INTERVAL_SECS: u64 = 3600;

    /// Creates a builder with no interval and no constraints.
    pub fn new() -> Self {
        Self {
            interval_seconds: None,
            #[cfg(feature = "cron")]
            timezone: None,
            #[cfg(feature = "cron")]
            business_hours: false,
            #[cfg(feature = "cron")]
            weekends: false,
            #[cfg(feature = "cron")]
            weekdays: false,
        }
    }

    /// Sets the interval in seconds.
    pub fn every_n_seconds(mut self, seconds: u64) -> Self {
        self.interval_seconds = Some(seconds);
        self
    }

    /// Sets the interval in minutes (saturating at `u64::MAX` seconds).
    pub fn every_n_minutes(mut self, minutes: u64) -> Self {
        self.interval_seconds = Some(minutes.saturating_mul(60));
        self
    }

    /// Sets the interval in hours (saturating at `u64::MAX` seconds).
    pub fn every_n_hours(mut self, hours: u64) -> Self {
        self.interval_seconds = Some(hours.saturating_mul(3600));
        self
    }

    /// Sets the interval in days (saturating at `u64::MAX` seconds).
    pub fn every_n_days(mut self, days: u64) -> Self {
        self.interval_seconds = Some(days.saturating_mul(86400));
        self
    }

    /// Restricts the schedule to hours `9-17` and also enables the weekday restriction.
    #[cfg(feature = "cron")]
    pub fn business_hours_only(mut self) -> Self {
        self.business_hours = true;
        self.weekdays = true;
        self
    }

    /// Restricts the schedule to day-of-week `0,6`.
    #[cfg(feature = "cron")]
    pub fn weekends_only(mut self) -> Self {
        self.weekends = true;
        self
    }

    /// Restricts the schedule to day-of-week `1-5`.
    #[cfg(feature = "cron")]
    pub fn weekdays_only(mut self) -> Self {
        self.weekdays = true;
        self
    }

    /// Evaluates the resulting crontab schedule in the given timezone.
    #[cfg(feature = "cron")]
    pub fn in_timezone(mut self, timezone: &str) -> Self {
        self.timezone = Some(timezone.to_string());
        self
    }

    /// Builds the schedule.
    ///
    /// Without constraints (or without the `cron` feature) this is a plain
    /// interval schedule, defaulting to one hour. With any constraint or
    /// timezone set, a crontab schedule is produced:
    ///
    /// * minute field: `*/N` for intervals under an hour, otherwise `0`;
    /// * hour field: `9-17` for business hours, otherwise `*`;
    /// * day-of-week field: `0,6` for weekends, `1-5` for weekdays or
    ///   business hours, otherwise `*`.
    ///
    /// Intervals of an hour or more all become "at minute 0 of every matching
    /// hour"; the crontab form does not preserve multi-hour spacing.
    pub fn build(self) -> Schedule {
        #[cfg(feature = "cron")]
        {
            if self.business_hours || self.weekends || self.weekdays || self.timezone.is_some() {
                // A cron step must be at least 1. Sub-minute intervals are
                // rounded up to one minute instead of producing "*/0".
                let interval_minutes =
                    (self.interval_seconds.unwrap_or(Self::DEFAULT_INTERVAL_SECS) / 60).max(1);
                let minute_expr = if interval_minutes < 60 {
                    format!("*/{}", interval_minutes)
                } else {
                    "0".to_string()
                };
                let hour_expr = if self.business_hours { "9-17" } else { "*" };
                // The weekend restriction wins when both flags are set.
                let dow_expr = if self.weekends {
                    "0,6"
                } else if self.weekdays || self.business_hours {
                    "1-5"
                } else {
                    "*"
                };
                return match self.timezone.as_deref() {
                    Some(tz) => {
                        Schedule::crontab_tz(&minute_expr, hour_expr, dow_expr, "*", "*", tz)
                    }
                    None => Schedule::crontab(&minute_expr, hour_expr, dow_expr, "*", "*"),
                };
            }
        }
        Schedule::interval(self.interval_seconds.unwrap_or(Self::DEFAULT_INTERVAL_SECS))
    }
}
impl Default for ScheduleBuilder {
fn default() -> Self {
Self::new()
}
}
/// Namespace type offering ready-made [`Schedule`] values; it carries no data.
pub struct ScheduleTemplates;
impl ScheduleTemplates {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 60 * Self::SECS_PER_MINUTE;

    /// Interval schedule of 60 seconds.
    pub fn every_minute() -> Schedule {
        Schedule::interval(Self::SECS_PER_MINUTE)
    }

    /// Interval schedule of 5 minutes.
    pub fn every_5_minutes() -> Schedule {
        Schedule::interval(5 * Self::SECS_PER_MINUTE)
    }

    /// Interval schedule of 15 minutes.
    pub fn every_15_minutes() -> Schedule {
        Schedule::interval(15 * Self::SECS_PER_MINUTE)
    }

    /// Interval schedule of 30 minutes.
    pub fn every_30_minutes() -> Schedule {
        Schedule::interval(30 * Self::SECS_PER_MINUTE)
    }

    /// Interval schedule of one hour.
    pub fn hourly() -> Schedule {
        Schedule::interval(Self::SECS_PER_HOUR)
    }

    /// Crontab schedule for 00:00 every day.
    #[cfg(feature = "cron")]
    pub fn daily_at_midnight() -> Schedule {
        Schedule::crontab("0", "0", "*", "*", "*")
    }

    /// Crontab schedule for minute 0 of `hour` every day; `hour` is not validated here.
    #[cfg(feature = "cron")]
    pub fn daily_at_hour(hour: u32) -> Schedule {
        let hour_field = hour.to_string();
        Schedule::crontab("0", &hour_field, "*", "*", "*")
    }

    /// Crontab schedule for `hour:minute` with day-of-week `1-5`.
    #[cfg(feature = "cron")]
    pub fn weekdays_at(hour: u32, minute: u32) -> Schedule {
        let (minute_field, hour_field) = (minute.to_string(), hour.to_string());
        Schedule::crontab(&minute_field, &hour_field, "1-5", "*", "*")
    }

    /// Crontab schedule for `hour:minute` with day-of-week `1`.
    #[cfg(feature = "cron")]
    pub fn weekly_on_monday(hour: u32, minute: u32) -> Schedule {
        let (minute_field, hour_field) = (minute.to_string(), hour.to_string());
        Schedule::crontab(&minute_field, &hour_field, "1", "*", "*")
    }

    /// Crontab schedule for 00:00 on day 1 of every month.
    #[cfg(feature = "cron")]
    pub fn monthly_first_day() -> Schedule {
        Schedule::crontab("0", "0", "*", "1", "*")
    }

    /// Crontab schedule for 00:00 with day-of-month `28-31`.
    ///
    /// NOTE(review): the range presumably matches every existing day from 28
    /// to 31, not only the last day of the month; confirm against the parser.
    #[cfg(feature = "cron")]
    pub fn monthly_last_day() -> Schedule {
        Schedule::crontab("0", "0", "*", "28-31", "*")
    }

    /// Crontab schedule for minute 0 of hours `9-17`, day-of-week `1-5`.
    #[cfg(feature = "cron")]
    pub fn business_hours_hourly() -> Schedule {
        Schedule::crontab("0", "9-17", "1-5", "*", "*")
    }

    /// Crontab schedule for every 15 minutes during hours `9-17`, day-of-week `1-5`.
    #[cfg(feature = "cron")]
    pub fn business_hours_every_15_minutes() -> Schedule {
        Schedule::crontab("*/15", "9-17", "1-5", "*", "*")
    }

    /// Crontab schedule for 08:00 with day-of-week `0,6`.
    #[cfg(feature = "cron")]
    pub fn weekend_mornings() -> Schedule {
        Schedule::crontab("0", "8", "0,6", "*", "*")
    }

    /// Crontab schedule for 00:00 on day 1 of months `1,4,7,10`.
    #[cfg(feature = "cron")]
    pub fn quarterly() -> Schedule {
        Schedule::crontab("0", "0", "*", "1", "1,4,7,10")
    }

    /// Interval schedule of 2 hours.
    pub fn every_2_hours() -> Schedule {
        Schedule::interval(2 * Self::SECS_PER_HOUR)
    }

    /// Interval schedule of 6 hours.
    pub fn every_6_hours() -> Schedule {
        Schedule::interval(6 * Self::SECS_PER_HOUR)
    }

    /// Interval schedule of 12 hours.
    pub fn every_12_hours() -> Schedule {
        Schedule::interval(12 * Self::SECS_PER_HOUR)
    }
}
/// Configuration values for weighted fair queuing (WFQ).
///
/// NOTE(review): how these values are consumed is not visible in this part of
/// the file; the field descriptions follow the names and defaults only.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WFQConfig {
/// Whether WFQ is enabled; `false` by default.
pub enabled: bool,
/// Weight for tasks without an explicit weight; `1.0` by default.
pub default_weight: f64,
/// Lower weight bound; `0.1` by default.
pub min_weight: f64,
/// Upper weight bound; `10.0` by default.
pub max_weight: f64,
}
impl Default for WFQConfig {
fn default() -> Self {
Self {
enabled: false,
default_weight: 1.0,
min_weight: 0.1,
max_weight: 10.0,
}
}
}
/// A WFQ task weight; `TaskWeight::new` only accepts values in `0.1..=10.0`.
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub struct TaskWeight(f64);
impl TaskWeight {
pub fn new(weight: f64) -> Result<Self, String> {
if !(0.1..=10.0).contains(&weight) {
return Err(format!(
"Weight must be between 0.1 and 10.0, got {}",
weight
));
}
Ok(Self(weight))
}
pub fn value(&self) -> f64 {
self.0
}
}
impl Default for TaskWeight {
fn default() -> Self {
Self(1.0)
}
}
/// Per-task weighted-fair-queuing bookkeeping.
///
/// Every field falls back to its default when missing from serialized data.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WFQState {
/// Weight used to scale execution time into virtual time.
#[serde(default)]
pub weight: TaskWeight,
/// Virtual time at which the most recently recorded execution started.
#[serde(default)]
pub virtual_start_time: f64,
/// Virtual start time plus the weighted duration of the most recently recorded execution.
#[serde(default)]
pub virtual_finish_time: f64,
/// Sum of all recorded execution durations, in seconds.
#[serde(default)]
pub total_execution_time: f64,
}
impl Default for WFQState {
fn default() -> Self {
Self {
weight: TaskWeight::default(),
virtual_start_time: 0.0,
virtual_finish_time: 0.0,
total_execution_time: 0.0,
}
}
}
impl WFQState {
    /// Creates a fresh state with the given weight.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`TaskWeight::new`] for an invalid weight.
    pub fn with_weight(weight: f64) -> Result<Self, String> {
        TaskWeight::new(weight).map(|weight| Self {
            weight,
            ..Default::default()
        })
    }

    /// Records one execution.
    ///
    /// The new virtual start time is the larger of `global_virtual_time` and
    /// the previous virtual finish time. The new virtual finish time adds the
    /// duration divided by the weight, so heavier tasks advance more slowly.
    pub fn update_after_execution(
        &mut self,
        execution_duration_secs: f64,
        global_virtual_time: f64,
    ) {
        let weighted_cost = execution_duration_secs / self.weight.value();
        let start = global_virtual_time.max(self.virtual_finish_time);
        self.virtual_start_time = start;
        self.virtual_finish_time = start + weighted_cost;
        self.total_execution_time += execution_duration_secs;
    }

    /// Returns the current virtual finish time.
    pub fn finish_time(&self) -> f64 {
        self.virtual_finish_time
    }
}
/// Entry describing a due task in WFQ order, as returned by
/// `BeatScheduler::get_due_tasks_wfq`.
#[derive(Debug, Clone)]
pub struct WFQTaskInfo {
/// Key of the task in the scheduler's task map.
pub name: String,
/// The task's WFQ virtual finish time (`0.0` without WFQ state).
pub virtual_finish_time: f64,
/// The task's WFQ weight (`1.0` without WFQ state).
pub weight: f64,
/// The next run time that made the task due.
pub next_run_time: DateTime<Utc>,
}
impl ScheduledTask {
    /// Sets the WFQ weight, creating default WFQ state if the task has none.
    ///
    /// # Errors
    ///
    /// Returns the error from [`TaskWeight::new`] for an invalid weight; the
    /// task is consumed in that case.
    pub fn with_wfq_weight(mut self, weight: f64) -> Result<Self, String> {
        let validated = TaskWeight::new(weight)?;
        self.wfq_state
            .get_or_insert_with(WFQState::default)
            .weight = validated;
        Ok(self)
    }

    /// Returns the WFQ weight, or `1.0` when the task has no WFQ state.
    pub fn wfq_weight(&self) -> f64 {
        match &self.wfq_state {
            Some(state) => state.weight.value(),
            None => 1.0,
        }
    }

    /// Returns the WFQ virtual finish time, or `0.0` when the task has no WFQ state.
    pub fn wfq_finish_time(&self) -> f64 {
        match &self.wfq_state {
            Some(state) => state.finish_time(),
            None => 0.0,
        }
    }
}
impl BeatScheduler {
pub fn get_due_tasks_wfq(&self) -> Vec<WFQTaskInfo> {
let now = Utc::now();
let mut wfq_tasks: Vec<WFQTaskInfo> = Vec::new();
for (name, task) in &self.tasks {
if !task.enabled {
continue;
}
match task.next_run_time() {
Ok(next_run) if next_run <= now => {
wfq_tasks.push(WFQTaskInfo {
name: name.clone(),
virtual_finish_time: task.wfq_finish_time(),
weight: task.wfq_weight(),
next_run_time: next_run,
});
}
_ => {}
}
}
wfq_tasks.sort_by(|a, b| {
a.virtual_finish_time
.partial_cmp(&b.virtual_finish_time)
.unwrap_or(std::cmp::Ordering::Equal)
.then_with(|| a.next_run_time.cmp(&b.next_run_time))
});
wfq_tasks
}
pub fn update_wfq_after_execution(
&mut self,
task_name: &str,
execution_duration_secs: f64,
) -> Result<(), ScheduleError> {
let global_virtual_time = self.calculate_global_virtual_time();
let task = self
.tasks
.get_mut(task_name)
.ok_or_else(|| ScheduleError::Invalid(format!("Task not found: {}", task_name)))?;
if task.wfq_state.is_none() {
task.wfq_state = Some(WFQState::default());
}
if let Some(wfq_state) = task.wfq_state.as_mut() {
wfq_state.update_after_execution(execution_duration_secs, global_virtual_time);
}
Ok(())
}
fn calculate_global_virtual_time(&self) -> f64 {
self.tasks
.values()
.filter_map(|task| task.wfq_state.as_ref())
.map(|state| state.virtual_finish_time)
.max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
.unwrap_or(0.0)
}
pub fn get_wfq_stats(&self) -> WFQStats {
let tasks_with_wfq: Vec<_> = self
.tasks
.values()
.filter(|t| t.wfq_state.is_some())
.collect();
let total_weight: f64 = tasks_with_wfq.iter().map(|t| t.wfq_weight()).sum();
let avg_weight = if !tasks_with_wfq.is_empty() {
total_weight / tasks_with_wfq.len() as f64
} else {
0.0
};
WFQStats {
total_tasks: self.tasks.len(),
tasks_with_wfq_config: tasks_with_wfq.len(),
total_weight,
average_weight: avg_weight,
global_virtual_time: self.calculate_global_virtual_time(),
}
}
}
/// Summary of WFQ usage as produced by `BeatScheduler::get_wfq_stats`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WFQStats {
/// Number of registered tasks.
pub total_tasks: usize,
/// Number of tasks that carry WFQ state.
pub tasks_with_wfq_config: usize,
/// Sum of the weights of tasks that carry WFQ state.
pub total_weight: f64,
/// `total_weight` divided by `tasks_with_wfq_config`; `0.0` when that count is zero.
pub average_weight: f64,
/// Largest virtual finish time over tasks with WFQ state; `0.0` when there are none.
pub global_virtual_time: f64,
}
/// Single-line summary with weights and virtual time printed to two decimals.
impl std::fmt::Display for WFQStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            total_tasks,
            tasks_with_wfq_config,
            total_weight,
            average_weight,
            global_virtual_time,
        } = self;
        write!(
            f,
            "WFQ Stats: {}/{} tasks configured, total_weight={:.2}, avg_weight={:.2}, global_vtime={:.2}",
            tasks_with_wfq_config,
            total_tasks,
            total_weight,
            average_weight,
            global_virtual_time
        )
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "cron")]
use chrono::Timelike;
use tempfile::NamedTempFile;
// An interval schedule reports itself as an interval.
#[test]
fn test_interval_schedule_basic() {
let schedule = Schedule::interval(60);
assert!(schedule.is_interval());
}
// Without a last run, the next run is about one interval after the current time.
#[test]
fn test_interval_schedule_next_run_no_last_run() {
let schedule = Schedule::interval(60);
let before = Utc::now();
let next_run = schedule.next_run(None).unwrap();
let after = Utc::now();
// Bounds are one second wider than the interval on each side.
assert!(next_run > before + Duration::seconds(59));
assert!(next_run < after + Duration::seconds(61));
}
// With a last run, the next run is exactly last run plus the interval.
#[test]
fn test_interval_schedule_next_run_with_last_run() {
let schedule = Schedule::interval(60);
let last_run = Utc::now();
let next_run = schedule.next_run(Some(last_run)).unwrap();
let expected = last_run + Duration::seconds(60);
assert_eq!(next_run, expected);
}
// The last-run-plus-interval rule holds for a range of interval lengths.
#[test]
fn test_interval_schedule_multiple_intervals() {
for interval in [1, 5, 10, 30, 60, 120, 300, 3600] {
let schedule = Schedule::interval(interval);
let last_run = Utc::now();
let next_run = schedule.next_run(Some(last_run)).unwrap();
assert_eq!(
next_run,
last_run + Duration::seconds(interval as i64),
"Failed for interval {}",
interval
);
}
}
// Pins the exact `Display` text of an interval schedule.
#[test]
fn test_interval_schedule_display() {
let schedule = Schedule::interval(60);
let display = format!("{}", schedule);
assert_eq!(display, "Interval[every 60s]");
}
// A one-time schedule reports itself as one-time.
#[test]
fn test_onetime_schedule_basic() {
let run_at = Utc::now() + Duration::hours(1);
let schedule = Schedule::onetime(run_at);
assert!(schedule.is_onetime());
}
// Before its first run, a one-time schedule's next run is its configured time.
#[test]
fn test_onetime_schedule_next_run_no_last_run() {
let run_at = Utc::now() + Duration::hours(1);
let schedule = Schedule::onetime(run_at);
let next_run = schedule.next_run(None).unwrap();
assert_eq!(next_run, run_at);
}
// Once a last run exists, a one-time schedule has no next run and returns an error.
#[test]
fn test_onetime_schedule_next_run_with_last_run() {
let run_at = Utc::now() + Duration::hours(1);
let schedule = Schedule::onetime(run_at);
let last_run = Utc::now();
let result = schedule.next_run(Some(last_run));
assert!(result.is_err());
// The message is only checked when the error is the `Invalid` variant.
if let Err(ScheduleError::Invalid(msg)) = result {
assert_eq!(msg, "One-time schedule has already been executed");
}
}
// Checks the prefix and suffix of the one-time schedule's `Display` text.
#[test]
fn test_onetime_schedule_display() {
let run_at = Utc::now() + Duration::hours(1);
let schedule = Schedule::onetime(run_at);
let display = format!("{}", schedule);
assert!(display.starts_with("OneTime[at "));
assert!(display.ends_with(" UTC]"));
}
// A run time a week ahead is returned unchanged.
#[test]
fn test_onetime_schedule_in_future() {
let run_at = Utc::now() + Duration::days(7);
let schedule = Schedule::onetime(run_at);
let next_run = schedule.next_run(None).unwrap();
assert_eq!(next_run, run_at);
}
// A run time in the past is still returned unchanged, not rejected.
#[test]
fn test_onetime_schedule_in_past() {
let run_at = Utc::now() - Duration::hours(1);
let schedule = Schedule::onetime(run_at);
let next_run = schedule.next_run(None).unwrap();
assert_eq!(next_run, run_at);
}
// Marking a one-time task as successful removes it from the scheduler.
#[test]
fn test_onetime_task_auto_cleanup() {
let mut scheduler = BeatScheduler::new();
let run_at = Utc::now() - Duration::hours(1); let task = ScheduledTask::new("test_onetime".to_string(), Schedule::onetime(run_at));
scheduler.add_task(task).unwrap();
assert_eq!(scheduler.tasks.len(), 1);
scheduler.mark_task_success("test_onetime").unwrap();
assert_eq!(scheduler.tasks.len(), 0);
}
// The same removal happens when success is reported with an explicit start time.
#[test]
fn test_onetime_task_auto_cleanup_with_start_time() {
let mut scheduler = BeatScheduler::new();
let run_at = Utc::now() - Duration::hours(1);
let task = ScheduledTask::new("test_onetime".to_string(), Schedule::onetime(run_at));
scheduler.add_task(task).unwrap();
assert_eq!(scheduler.tasks.len(), 1);
let started_at = Utc::now() - Duration::seconds(5);
scheduler
.mark_task_success_with_start("test_onetime", started_at)
.unwrap();
assert_eq!(scheduler.tasks.len(), 0);
}
// A failed one-time task stays registered.
#[test]
fn test_onetime_task_not_removed_on_failure() {
let mut scheduler = BeatScheduler::new();
let run_at = Utc::now() - Duration::hours(1);
let task = ScheduledTask::new("test_onetime".to_string(), Schedule::onetime(run_at));
scheduler.add_task(task).unwrap();
assert_eq!(scheduler.tasks.len(), 1);
scheduler.mark_task_failure("test_onetime").unwrap();
assert_eq!(scheduler.tasks.len(), 1);
}
// A one-time schedule survives a JSON round trip with its run time intact.
#[test]
fn test_onetime_serialization() {
let run_at = Utc::now() + Duration::hours(2);
let schedule = Schedule::onetime(run_at);
let json = serde_json::to_string(&schedule).unwrap();
let deserialized: Schedule = serde_json::from_str(&json).unwrap();
assert!(deserialized.is_onetime());
let next_run = deserialized.next_run(None).unwrap();
assert_eq!(next_run, run_at);
}
// A crontab schedule reports itself as crontab.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_basic() {
let schedule = Schedule::crontab("0", "0", "*", "*", "*");
assert!(schedule.is_crontab());
}
// An all-wildcard crontab fires after the last run and within the next two minutes.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_every_minute() {
let schedule = Schedule::crontab("*", "*", "*", "*", "*");
let now = Utc::now();
let next_run = schedule.next_run(Some(now)).unwrap();
assert!(next_run > now);
assert!(next_run < now + Duration::minutes(2));
}
// A crontab fixed at 10:30 yields a future run time at hour 10, minute 30 (UTC).
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_specific_time() {
let schedule = Schedule::crontab("30", "10", "*", "*", "*");
let now = Utc::now();
let next_run = schedule.next_run(Some(now)).unwrap();
assert!(next_run > now);
assert_eq!(next_run.hour(), 10);
assert_eq!(next_run.minute(), 30);
}
// An unparseable minute field surfaces as a parse error from `next_run`.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_invalid() {
let schedule = Schedule::crontab("invalid", "0", "*", "*", "*");
let result = schedule.next_run(None);
assert!(result.is_err());
assert!(result.unwrap_err().is_parse());
}
// Pins the exact `Display` text of a crontab schedule without a timezone.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_display() {
let schedule = Schedule::crontab("0", "12", "*", "*", "1");
let display = format!("{}", schedule);
assert_eq!(display, "Crontab[0 12 * * 1 (UTC)]");
}
// A timezone-aware crontab is still a crontab and shows its zone in `Display`.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_with_timezone() {
let schedule = Schedule::crontab_tz("0", "9", "1-5", "*", "*", "America/New_York");
assert!(schedule.is_crontab());
let display = format!("{}", schedule);
assert!(display.contains("America/New_York"));
}
// A timezone-aware crontab produces a next run in the future.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_timezone_next_run() {
let schedule = Schedule::crontab_tz("0", "9", "1-5", "*", "*", "America/New_York");
let next_run = schedule.next_run(None).unwrap();
assert!(next_run > Utc::now());
}
// An unknown timezone name surfaces as a parse error from `next_run`.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_invalid_timezone() {
let schedule = Schedule::crontab_tz("0", "9", "*", "*", "*", "Invalid/Timezone");
let result = schedule.next_run(None);
assert!(result.is_err());
assert!(result.unwrap_err().is_parse());
}
// The timezone of a crontab schedule survives a JSON round trip.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_schedule_timezone_serialization() {
let schedule = Schedule::crontab_tz("30", "14", "*", "*", "*", "Europe/London");
let json = serde_json::to_string(&schedule).unwrap();
let deserialized: Schedule = serde_json::from_str(&json).unwrap();
let display = format!("{}", deserialized);
assert!(display.contains("Europe/London"));
}
// A solar schedule reports itself as solar.
#[cfg(feature = "solar")]
#[test]
fn test_solar_schedule_basic() {
let schedule = Schedule::solar("sunrise", 35.6762, 139.6503); assert!(schedule.is_solar());
}
/// The next sunrise must fall after the reference instant and within 48 hours
/// of it. Ignored by default.
#[cfg(feature = "solar")]
#[test]
#[ignore]
fn test_solar_schedule_sunrise() {
    let sunrise = Schedule::solar("sunrise", 35.6762, 139.6503);
    let reference = Utc::now();
    let upcoming = sunrise.next_run(Some(reference)).unwrap();
    let horizon = reference + Duration::hours(48);
    assert!(upcoming > reference);
    assert!(upcoming < horizon);
}
/// The next sunset must fall after the reference instant and within 48 hours
/// of it. Ignored by default.
#[cfg(feature = "solar")]
#[test]
#[ignore]
fn test_solar_schedule_sunset() {
    let sunset = Schedule::solar("sunset", 35.6762, 139.6503);
    let reference = Utc::now();
    let upcoming = sunset.next_run(Some(reference)).unwrap();
    let horizon = reference + Duration::hours(48);
    assert!(upcoming > reference);
    assert!(upcoming < horizon);
}
/// An unrecognised solar event name must make `next_run` fail with an
/// "invalid" error.
#[cfg(feature = "solar")]
#[test]
fn test_solar_schedule_invalid_event() {
    let outcome = Schedule::solar("invalid", 35.6762, 139.6503).next_run(None);
    assert!(outcome.is_err());
    let failure = outcome.unwrap_err();
    assert!(failure.is_invalid());
}
/// The `Display` output of a solar schedule must mention the event and both
/// coordinates.
#[cfg(feature = "solar")]
#[test]
fn test_solar_schedule_display() {
    let rendered = Schedule::solar("sunrise", 35.6762, 139.6503).to_string();
    assert!(rendered.contains("Solar[sunrise"));
    assert!(rendered.contains("35.6762"));
    assert!(rendered.contains("139.6503"));
}
/// A freshly constructed task is enabled, carries its name, and has no run
/// history.
#[test]
fn test_scheduled_task_basic() {
    let fresh = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    assert_eq!(fresh.name, "test_task");
    assert!(fresh.enabled);
    assert!(!fresh.has_run());
    assert_eq!(fresh.total_run_count, 0);
    assert!(fresh.last_run_at.is_none());
}
/// `with_args` stores the positional arguments in the order given.
#[test]
fn test_scheduled_task_with_args() {
    let positional = vec![serde_json::json!(1), serde_json::json!("test")];
    let task =
        ScheduledTask::new("test_task".to_string(), Schedule::interval(60)).with_args(positional);
    assert_eq!(task.args.len(), 2);
    assert_eq!(task.args[0], serde_json::json!(1));
    assert_eq!(task.args[1], serde_json::json!("test"));
}
/// `with_kwargs` stores every keyword argument under its key.
#[test]
fn test_scheduled_task_with_kwargs() {
    let named: HashMap<String, serde_json::Value> = vec![
        ("key1".to_string(), serde_json::json!("value1")),
        ("key2".to_string(), serde_json::json!(42)),
    ]
    .into_iter()
    .collect();
    let task =
        ScheduledTask::new("test_task".to_string(), Schedule::interval(60)).with_kwargs(named);
    assert_eq!(task.kwargs.len(), 2);
    let first = task.kwargs.get("key1").unwrap();
    assert_eq!(first, &serde_json::json!("value1"));
    let second = task.kwargs.get("key2").unwrap();
    assert_eq!(second, &serde_json::json!(42));
}
/// The queue, priority and expiry builder methods populate `task.options`.
#[test]
fn test_scheduled_task_with_options() {
    let configured = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_queue("high_priority".to_string())
        .with_priority(9)
        .with_expires(3600);
    assert!(configured.has_options());
    let opts = &configured.options;
    assert_eq!(opts.queue, Some("high_priority".to_string()));
    assert_eq!(opts.priority, Some(9));
    assert_eq!(opts.expires, Some(3600));
}
/// `disabled()` clears the flag; both the accessor and the field must agree.
#[test]
fn test_scheduled_task_disabled() {
    let switched_off =
        ScheduledTask::new("test_task".to_string(), Schedule::interval(60)).disabled();
    assert!(!switched_off.is_enabled());
    assert!(!switched_off.enabled);
}
/// A task that has never run is reported as due.
#[test]
fn test_scheduled_task_is_due_never_run() {
    let never_run = ScheduledTask::new("test_task".to_string(), Schedule::interval(60));
    let due = never_run.is_due().unwrap();
    assert!(due);
}
/// `age_since_last_run` is `None` before any run and afterwards reflects the
/// time elapsed since `last_run_at`.
#[test]
fn test_scheduled_task_age_since_last_run() {
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60));
    assert!(task.age_since_last_run().is_none());
    task.last_run_at = Some(Utc::now() - Duration::seconds(30));
    let elapsed = task.age_since_last_run().unwrap().num_seconds();
    // One second of slack either side of the 30 s offset.
    assert!((29..=31).contains(&elapsed));
}
/// The `Display` output of a task includes its name, its schedule and its
/// run count.
#[test]
fn test_scheduled_task_display() {
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60));
    task.total_run_count = 5;
    let rendered = task.to_string();
    assert!(rendered.contains("test_task"));
    assert!(rendered.contains("Interval[every 60s]"));
    assert!(rendered.contains("runs=5"));
}
/// A disabled task is marked "(disabled)" in its `Display` output.
#[test]
fn test_scheduled_task_display_disabled() {
    let rendered = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .disabled()
        .to_string();
    assert!(rendered.contains("(disabled)"));
}
/// Default options have no queue, priority or expiry set.
#[test]
fn test_task_options_default() {
    let blank = TaskOptions::default();
    assert!(!blank.has_queue());
    assert!(!blank.has_priority());
    assert!(!blank.has_expires());
}
/// `has_queue` is true once a queue name is set.
#[test]
fn test_task_options_has_queue() {
    let with_queue = TaskOptions {
        queue: Some(String::from("test_queue")),
        ..Default::default()
    };
    assert!(with_queue.has_queue());
}
/// `has_priority` is true once a priority is set.
#[test]
fn test_task_options_has_priority() {
    let with_priority = TaskOptions {
        priority: Some(5),
        ..Default::default()
    };
    assert!(with_priority.has_priority());
}
/// `has_expires` is true once an expiry is set.
#[test]
fn test_task_options_has_expires() {
    let with_expiry = TaskOptions {
        expires: Some(3600),
        ..Default::default()
    };
    assert!(with_expiry.has_expires());
}
/// The `Display` output of fully populated options lists all three settings.
#[test]
fn test_task_options_display() {
    let rendered = TaskOptions {
        queue: Some("test".to_string()),
        priority: Some(5),
        expires: Some(3600),
    }
    .to_string();
    assert!(rendered.contains("queue=test"));
    assert!(rendered.contains("priority=5"));
    assert!(rendered.contains("expires=3600s"));
}
/// A new scheduler holds no tasks and has no state file configured.
#[test]
fn test_beat_scheduler_new() {
    let empty = BeatScheduler::new();
    assert!(empty.state_file.is_none());
    assert_eq!(empty.tasks.len(), 0);
}
/// Adding a task registers it under its own name.
#[test]
fn test_beat_scheduler_add_task() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            "test_task".to_string(),
            Schedule::interval(60),
        ))
        .unwrap();
    assert_eq!(scheduler.tasks.len(), 1);
    assert!(scheduler.tasks.contains_key("test_task"));
}
/// Removing a registered task hands the task back and empties the scheduler.
#[test]
fn test_beat_scheduler_remove_task() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            "test_task".to_string(),
            Schedule::interval(60),
        ))
        .unwrap();
    assert_eq!(scheduler.tasks.len(), 1);
    let evicted = scheduler.remove_task("test_task").unwrap();
    assert!(evicted.is_some());
    let evicted = evicted.unwrap();
    assert_eq!(evicted.name, "test_task");
    assert_eq!(scheduler.tasks.len(), 0);
}
/// Removing an unknown task succeeds and yields `None`.
#[test]
fn test_beat_scheduler_remove_nonexistent_task() {
    let mut scheduler = BeatScheduler::new();
    let outcome = scheduler.remove_task("nonexistent").unwrap();
    assert!(outcome.is_none());
}
/// `mark_task_run` records the run on the stored task: run flag, counter and
/// timestamp.
#[test]
fn test_beat_scheduler_mark_task_run() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            "test_task".to_string(),
            Schedule::interval(60),
        ))
        .unwrap();
    scheduler.mark_task_run("test_task").unwrap();
    let stored = scheduler.tasks.get("test_task").unwrap();
    assert!(stored.has_run());
    assert_eq!(stored.total_run_count, 1);
    assert!(stored.last_run_at.is_some());
}
/// A scheduler with no tasks reports nothing as due.
#[test]
fn test_beat_scheduler_get_due_tasks_empty() {
    let due = BeatScheduler::new().get_due_tasks().len();
    assert_eq!(due, 0);
}
/// A never-run, enabled task shows up in the due list.
#[test]
fn test_beat_scheduler_get_due_tasks() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            "test_task".to_string(),
            Schedule::interval(60),
        ))
        .unwrap();
    let due = scheduler.get_due_tasks();
    assert_eq!(due.len(), 1);
    assert_eq!(due[0].name, "test_task");
}
/// A disabled task is never reported as due.
#[test]
fn test_beat_scheduler_get_due_tasks_disabled() {
    let mut scheduler = BeatScheduler::new();
    let dormant = ScheduledTask::new("test_task".to_string(), Schedule::interval(60)).disabled();
    scheduler.add_task(dormant).unwrap();
    let due = scheduler.get_due_tasks();
    assert_eq!(due.len(), 0);
}
/// `with_persistence` remembers the given path as the scheduler's state file.
#[test]
fn test_beat_scheduler_persistence_path() {
    let backing = NamedTempFile::new().unwrap();
    let state_path = backing.path().to_str().unwrap().to_string();
    let scheduler = BeatScheduler::with_persistence(&state_path);
    assert!(scheduler.state_file.is_some());
    let recorded = scheduler.state_file.unwrap();
    assert_eq!(recorded, PathBuf::from(&state_path));
}
/// State produced by a persistent scheduler can be read back with
/// `load_from_file`, including args, options and run history.
#[test]
fn test_persistence_save_and_load() {
    let backing = NamedTempFile::new().unwrap();
    let state_path = backing.path().to_str().unwrap().to_string();

    // Writer side: register one task and record a run.
    let mut writer = BeatScheduler::with_persistence(&state_path);
    let original = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_args(vec![serde_json::json!(1)])
        .with_queue("test_queue".to_string());
    writer.add_task(original).unwrap();
    writer.mark_task_run("test_task").unwrap();

    // Reader side: a second scheduler loaded from the same path.
    let reader = BeatScheduler::load_from_file(&state_path).unwrap();
    assert_eq!(reader.tasks.len(), 1);
    let restored = reader.tasks.get("test_task").unwrap();
    assert_eq!(restored.name, "test_task");
    assert_eq!(restored.args.len(), 1);
    assert!(restored.has_run());
    assert_eq!(restored.total_run_count, 1);
    assert_eq!(restored.options.queue, Some("test_queue".to_string()));
}
/// Loading from a path that does not exist yields an empty scheduler that
/// still has a state file configured.
#[test]
fn test_persistence_load_nonexistent_file() {
    let missing = std::env::temp_dir().join("nonexistent_test_file_celers.json");
    // Best effort: clear any leftover file so the path really is absent.
    let _ = std::fs::remove_file(&missing);
    let scheduler = BeatScheduler::load_from_file(missing.to_str().unwrap()).unwrap();
    assert_eq!(scheduler.tasks.len(), 0);
    assert!(scheduler.state_file.is_some());
}
/// The run counter and last-run timestamp survive dropping the scheduler and
/// loading a new one from the same file.
#[test]
fn test_persistence_preserves_run_history() {
    let backing = NamedTempFile::new().unwrap();
    let state_path = backing.path().to_str().unwrap().to_string();
    {
        // First scheduler: record two runs, then drop it at the end of the scope.
        let mut writer = BeatScheduler::with_persistence(&state_path);
        writer
            .add_task(ScheduledTask::new(
                "test_task".to_string(),
                Schedule::interval(60),
            ))
            .unwrap();
        for _ in 0..2 {
            writer.mark_task_run("test_task").unwrap();
        }
    }
    {
        let reader = BeatScheduler::load_from_file(&state_path).unwrap();
        let restored = reader.tasks.get("test_task").unwrap();
        assert_eq!(restored.total_run_count, 2);
        assert!(restored.last_run_at.is_some());
    }
}
/// `ScheduleError::Invalid` answers true only to `is_invalid`.
#[test]
fn test_schedule_error_is_invalid() {
    let failure = ScheduleError::Invalid(String::from("test"));
    assert!(failure.is_invalid());
    let other_kinds = [
        failure.is_parse(),
        failure.is_persistence(),
        failure.is_not_implemented(),
    ];
    assert!(other_kinds.iter().all(|&flag| !flag));
}
/// `ScheduleError::Parse` answers true only to `is_parse`.
#[test]
fn test_schedule_error_is_parse() {
    let failure = ScheduleError::Parse(String::from("test"));
    assert!(failure.is_parse());
    let other_kinds = [
        failure.is_invalid(),
        failure.is_persistence(),
        failure.is_not_implemented(),
    ];
    assert!(other_kinds.iter().all(|&flag| !flag));
}
/// `ScheduleError::Persistence` answers true only to `is_persistence`.
#[test]
fn test_schedule_error_is_persistence() {
    let failure = ScheduleError::Persistence(String::from("test"));
    assert!(failure.is_persistence());
    let other_kinds = [
        failure.is_invalid(),
        failure.is_parse(),
        failure.is_not_implemented(),
    ];
    assert!(other_kinds.iter().all(|&flag| !flag));
}
/// `ScheduleError::NotImplemented` answers true only to `is_not_implemented`.
#[test]
fn test_schedule_error_is_not_implemented() {
    let failure = ScheduleError::NotImplemented(String::from("test"));
    assert!(failure.is_not_implemented());
    let other_kinds = [
        failure.is_invalid(),
        failure.is_parse(),
        failure.is_persistence(),
    ];
    assert!(other_kinds.iter().all(|&flag| !flag));
}
/// Only persistence errors are retryable; every other variant is not.
#[test]
fn test_schedule_error_is_retryable() {
    assert!(ScheduleError::Persistence("test".to_string()).is_retryable());

    let permanent = vec![
        ScheduleError::Invalid("test".to_string()),
        ScheduleError::Parse("test".to_string()),
        ScheduleError::NotImplemented("test".to_string()),
    ];
    for failure in &permanent {
        assert!(!failure.is_retryable());
    }
}
/// Each error variant maps to its own category label.
#[test]
fn test_schedule_error_category() {
    let expectations = vec![
        (ScheduleError::Invalid("test".to_string()), "invalid"),
        (ScheduleError::Parse("test".to_string()), "parse"),
        (
            ScheduleError::Persistence("test".to_string()),
            "persistence",
        ),
        (
            ScheduleError::NotImplemented("test".to_string()),
            "not_implemented",
        ),
    ];
    for (failure, label) in &expectations {
        assert_eq!(failure.category(), *label);
    }
}
/// `Jitter::new` stores the bounds exactly as given.
#[test]
fn test_jitter_new() {
    let window = Jitter::new(-10, 10);
    assert_eq!((window.min_seconds, window.max_seconds), (-10, 10));
}
/// `Jitter::positive(n)` spans 0 to `n` seconds.
#[test]
fn test_jitter_positive() {
    let window = Jitter::positive(30);
    assert_eq!((window.min_seconds, window.max_seconds), (0, 30));
}
/// `Jitter::symmetric(n)` spans `-n` to `n` seconds.
#[test]
fn test_jitter_symmetric() {
    let window = Jitter::symmetric(15);
    assert_eq!((window.min_seconds, window.max_seconds), (-15, 15));
}
/// Applying the same jitter to the same instant and task name twice gives the
/// same result.
#[test]
fn test_jitter_apply_deterministic() {
    let window = Jitter::symmetric(60);
    let base = Utc::now();
    let first = window.apply(base, "test_task");
    let second = window.apply(base, "test_task");
    assert_eq!(first, second);
}
/// These two task names must receive different offsets from the same base
/// instant.
#[test]
fn test_jitter_apply_different_tasks() {
    let window = Jitter::symmetric(60);
    let base = Utc::now();
    let for_first = window.apply(base, "task1");
    let for_second = window.apply(base, "task2");
    assert_ne!(for_first, for_second);
}
/// The applied offset always stays within the configured bounds.
#[test]
fn test_jitter_apply_range() {
    let window = Jitter::new(10, 50);
    let base = Utc::now();
    let shifted = window.apply(base, "test_task");
    let offset = (shifted - base).num_seconds();
    assert!((10..=50).contains(&offset));
}
/// `with_jitter` attaches the jitter configuration to the task unchanged.
#[test]
fn test_scheduled_task_with_jitter() {
    let task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_jitter(Jitter::positive(10));
    assert!(task.jitter.is_some());
    let attached = task.jitter.unwrap();
    assert_eq!((attached.min_seconds, attached.max_seconds), (0, 10));
}
/// With positive jitter of up to 10 s, the next run time lands between the
/// plain interval result and 10 s after it.
#[test]
fn test_scheduled_task_next_run_time_with_jitter() {
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_jitter(Jitter::positive(10));
    let previous = Utc::now() - Duration::seconds(70);
    task.last_run_at = Some(previous);
    let jittered = task.next_run_time().unwrap();
    // Un-jittered expectation: last run plus the 60 s interval.
    let unjittered = previous + Duration::seconds(60);
    let offset = (jittered - unjittered).num_seconds();
    assert!((0..=10).contains(&offset));
}
/// Jitter bounds survive a JSON round trip.
#[test]
fn test_jitter_serialization() {
    let encoded = serde_json::to_string(&Jitter::symmetric(30)).unwrap();
    let restored: Jitter = serde_json::from_str(&encoded).unwrap();
    assert_eq!((restored.min_seconds, restored.max_seconds), (-30, 30));
}
/// A task's jitter configuration survives a JSON round trip of the task.
#[test]
fn test_scheduled_task_with_jitter_serialization() {
    let original = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_jitter(Jitter::positive(15));
    let encoded = serde_json::to_string(&original).unwrap();
    let restored: ScheduledTask = serde_json::from_str(&encoded).unwrap();
    assert!(restored.jitter.is_some());
    let attached = restored.jitter.unwrap();
    assert_eq!((attached.min_seconds, attached.max_seconds), (0, 15));
}
/// `Skip` never catches up, even when a run was clearly missed.
#[test]
fn test_catchup_policy_skip() {
    let previous = Utc::now() - Duration::seconds(200);
    let missed = Utc::now() - Duration::seconds(50);
    let present = Utc::now();
    let policy = CatchupPolicy::Skip;
    assert!(!policy.should_catchup(Some(previous), missed, present));
    assert_eq!(policy.catchup_count(Some(previous), 60, present), 0);
}
/// `RunOnce` catches up exactly once when the scheduled run is in the past.
#[test]
fn test_catchup_policy_run_once() {
    let previous = Utc::now() - Duration::seconds(200);
    let missed = Utc::now() - Duration::seconds(50);
    let present = Utc::now();
    let policy = CatchupPolicy::RunOnce;
    assert!(policy.should_catchup(Some(previous), missed, present));
    assert_eq!(policy.catchup_count(Some(previous), 60, present), 1);
}
/// `RunOnce` does not catch up when the next run is still in the future.
#[test]
fn test_catchup_policy_run_once_not_missed() {
    let previous = Utc::now() - Duration::seconds(30);
    let pending = Utc::now() + Duration::seconds(30);
    let present = Utc::now();
    let wants_catchup = CatchupPolicy::RunOnce.should_catchup(Some(previous), pending, present);
    assert!(!wants_catchup);
}
/// `RunMultiple` catches up several times for a 250 s gap at a 60 s interval;
/// the test accepts any count from 2 to 4.
#[test]
fn test_catchup_policy_run_multiple() {
    let previous = Utc::now() - Duration::seconds(250);
    let missed = Utc::now() - Duration::seconds(50);
    let present = Utc::now();
    let policy = CatchupPolicy::RunMultiple { max_catchup: 5 };
    assert!(policy.should_catchup(Some(previous), missed, present));
    let replays = policy.catchup_count(Some(previous), 60, present);
    assert!((2..=4).contains(&replays));
}
/// `RunMultiple` caps the catch-up count at `max_catchup`, here 2 for a
/// 600 s gap at a 60 s interval.
#[test]
fn test_catchup_policy_run_multiple_max_limit() {
    let previous = Utc::now() - Duration::seconds(600);
    let present = Utc::now();
    let replays =
        CatchupPolicy::RunMultiple { max_catchup: 2 }.catchup_count(Some(previous), 60, present);
    assert_eq!(replays, 2);
}
/// `TimeWindow` catches up once when the run was missed 50 s ago and the
/// window is 120 s.
#[test]
fn test_catchup_policy_time_window_within() {
    let policy = CatchupPolicy::TimeWindow {
        window_seconds: 120,
    };
    let previous = Utc::now() - Duration::seconds(150);
    let missed = Utc::now() - Duration::seconds(50);
    let present = Utc::now();
    assert!(policy.should_catchup(Some(previous), missed, present));
    let replays = policy.catchup_count(Some(previous), 60, present);
    assert_eq!(replays, 1);
}
/// `TimeWindow` does not catch up when the run was missed 100 s ago and the
/// window is only 30 s.
#[test]
fn test_catchup_policy_time_window_outside() {
    let policy = CatchupPolicy::TimeWindow { window_seconds: 30 };
    let previous = Utc::now() - Duration::seconds(200);
    let missed = Utc::now() - Duration::seconds(100);
    let present = Utc::now();
    assert!(!policy.should_catchup(Some(previous), missed, present));
    let replays = policy.catchup_count(Some(previous), 60, present);
    assert_eq!(replays, 0);
}
/// With no previous run and the next run still in the future, nothing is
/// caught up.
#[test]
fn test_catchup_policy_never_run() {
    let present = Utc::now();
    let pending = present + Duration::seconds(60);
    let policy = CatchupPolicy::RunOnce;
    assert!(!policy.should_catchup(None, pending, present));
    assert_eq!(policy.catchup_count(None, 60, present), 0);
}
/// The default catch-up policy is `Skip`.
#[test]
fn test_catchup_policy_default() {
    assert_eq!(CatchupPolicy::default(), CatchupPolicy::Skip);
}
/// Every catch-up policy variant survives a JSON round trip unchanged.
#[test]
fn test_catchup_policy_serialization() {
    let variants = vec![
        CatchupPolicy::Skip,
        CatchupPolicy::RunOnce,
        CatchupPolicy::RunMultiple { max_catchup: 5 },
        CatchupPolicy::TimeWindow {
            window_seconds: 300,
        },
    ];
    for original in &variants {
        let encoded = serde_json::to_string(original).unwrap();
        let restored: CatchupPolicy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(&restored, original);
    }
}
/// `with_catchup_policy` stores the given policy on the task.
#[test]
fn test_scheduled_task_with_catchup_policy() {
    let wanted = CatchupPolicy::RunOnce;
    let task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_catchup_policy(wanted.clone());
    assert_eq!(task.catchup_policy, wanted);
}
/// A task's catch-up policy survives a JSON round trip of the task.
#[test]
fn test_scheduled_task_catchup_policy_serialization() {
    let wanted = CatchupPolicy::RunMultiple { max_catchup: 3 };
    let original = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_catchup_policy(wanted.clone());
    let encoded = serde_json::to_string(&original).unwrap();
    let restored: ScheduledTask = serde_json::from_str(&encoded).unwrap();
    assert_eq!(restored.catchup_policy, wanted);
}
/// `with_group` sets the group, and `is_in_group` matches only that group.
#[test]
fn test_scheduled_task_with_group() {
    let grouped = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_group("reports".to_string());
    assert_eq!(grouped.group, Some("reports".to_string()));
    assert!(grouped.is_in_group("reports"));
    assert!(!grouped.is_in_group("other"));
}
/// `with_tag` adds a single tag, and `has_tag` matches only that tag.
#[test]
fn test_scheduled_task_with_tag() {
    let tagged = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_tag("daily".to_string());
    assert_eq!(tagged.tags.len(), 1);
    assert!(tagged.has_tag("daily"));
    assert!(!tagged.has_tag("weekly"));
}
/// `with_tags` installs a whole tag set at once.
#[test]
fn test_scheduled_task_with_tags() {
    let labels = ["daily", "reports", "critical"];
    let tag_set: HashSet<String> = labels.iter().map(|label| label.to_string()).collect();
    let tagged =
        ScheduledTask::new("test_task".to_string(), Schedule::interval(60)).with_tags(tag_set);
    assert_eq!(tagged.tags.len(), 3);
    for label in labels.iter() {
        assert!(tagged.has_tag(label));
    }
}
/// Tags can be added and removed in place; `remove_tag` returns whether the
/// tag was present.
#[test]
fn test_scheduled_task_add_remove_tag() {
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60));

    task.add_tag("tag1".to_string());
    assert!(task.has_tag("tag1"));
    task.add_tag("tag2".to_string());
    assert!(task.has_tag("tag2"));
    assert_eq!(task.tags.len(), 2);

    // Removing an existing tag returns true and shrinks the set.
    assert!(task.remove_tag("tag1"));
    assert!(!task.has_tag("tag1"));
    assert_eq!(task.tags.len(), 1);

    // Removing an unknown tag returns false.
    assert!(!task.remove_tag("nonexistent"));
}
/// `get_tasks_by_group` returns exactly the tasks that belong to the named
/// group.
#[test]
fn test_beat_scheduler_get_tasks_by_group() {
    let mut scheduler = BeatScheduler::new();
    let fixtures = vec![
        ("task1", "reports"),
        ("task2", "reports"),
        ("task3", "alerts"),
    ];
    for (name, group) in fixtures {
        let task = ScheduledTask::new(name.to_string(), Schedule::interval(60))
            .with_group(group.to_string());
        scheduler.add_task(task).unwrap();
    }
    assert_eq!(scheduler.get_tasks_by_group("reports").len(), 2);
    assert_eq!(scheduler.get_tasks_by_group("alerts").len(), 1);
    assert_eq!(scheduler.get_tasks_by_group("nonexistent").len(), 0);
}
/// `get_tasks_by_tag` returns every task that carries the given tag.
#[test]
fn test_beat_scheduler_get_tasks_by_tag() {
    let mut scheduler = BeatScheduler::new();
    let fixtures = vec![
        ("task1", vec!["daily"]),
        ("task2", vec!["daily", "critical"]),
        ("task3", vec!["weekly"]),
    ];
    for (name, labels) in fixtures {
        // Chain `with_tag` once per label, in the order listed.
        let task = labels.into_iter().fold(
            ScheduledTask::new(name.to_string(), Schedule::interval(60)),
            |task, label| task.with_tag(label.to_string()),
        );
        scheduler.add_task(task).unwrap();
    }
    assert_eq!(scheduler.get_tasks_by_tag("daily").len(), 2);
    assert_eq!(scheduler.get_tasks_by_tag("critical").len(), 1);
    assert_eq!(scheduler.get_tasks_by_tag("weekly").len(), 1);
}
/// `get_tasks_by_tags` returns tasks that carry any of the listed tags.
#[test]
fn test_beat_scheduler_get_tasks_by_tags() {
    let mut scheduler = BeatScheduler::new();
    let fixtures = vec![
        ("task1", "daily"),
        ("task2", "weekly"),
        ("task3", "monthly"),
    ];
    for (name, label) in fixtures {
        let task = ScheduledTask::new(name.to_string(), Schedule::interval(60))
            .with_tag(label.to_string());
        scheduler.add_task(task).unwrap();
    }
    let matched = scheduler.get_tasks_by_tags(&["daily", "weekly"]);
    assert_eq!(matched.len(), 2);
}
/// `get_tasks_with_all_tags` returns only tasks that carry every listed tag.
#[test]
fn test_beat_scheduler_get_tasks_with_all_tags() {
    let mut scheduler = BeatScheduler::new();
    let fixtures = vec![
        ("task1", vec!["daily", "critical"]),
        ("task2", vec!["daily"]),
        ("task3", vec!["critical"]),
    ];
    for (name, labels) in fixtures {
        // Chain `with_tag` once per label, in the order listed.
        let task = labels.into_iter().fold(
            ScheduledTask::new(name.to_string(), Schedule::interval(60)),
            |task, label| task.with_tag(label.to_string()),
        );
        scheduler.add_task(task).unwrap();
    }
    let matched = scheduler.get_tasks_with_all_tags(&["daily", "critical"]);
    assert_eq!(matched.len(), 1);
    assert_eq!(matched[0].name, "task1");
}
/// `get_all_groups` returns each distinct group name once.
#[test]
fn test_beat_scheduler_get_all_groups() {
    let mut scheduler = BeatScheduler::new();
    let fixtures = vec![
        ("task1", "reports"),
        ("task2", "alerts"),
        ("task3", "reports"),
    ];
    for (name, group) in fixtures {
        let task = ScheduledTask::new(name.to_string(), Schedule::interval(60))
            .with_group(group.to_string());
        scheduler.add_task(task).unwrap();
    }
    let distinct = scheduler.get_all_groups();
    assert_eq!(distinct.len(), 2);
    assert!(distinct.contains("reports"));
    assert!(distinct.contains("alerts"));
}
/// `get_all_tags` returns each distinct tag once across all tasks.
#[test]
fn test_beat_scheduler_get_all_tags() {
    let mut scheduler = BeatScheduler::new();
    let fixtures = vec![
        ("task1", vec!["daily", "critical"]),
        ("task2", vec!["weekly"]),
        ("task3", vec!["daily"]),
    ];
    for (name, labels) in fixtures {
        // Chain `with_tag` once per label, in the order listed.
        let task = labels.into_iter().fold(
            ScheduledTask::new(name.to_string(), Schedule::interval(60)),
            |task, label| task.with_tag(label.to_string()),
        );
        scheduler.add_task(task).unwrap();
    }
    let distinct = scheduler.get_all_tags();
    assert_eq!(distinct.len(), 3);
    assert!(distinct.contains("daily"));
    assert!(distinct.contains("weekly"));
    assert!(distinct.contains("critical"));
}
/// Disabling and re-enabling a group toggles every task in it and reports how
/// many tasks were affected.
#[test]
fn test_beat_scheduler_enable_disable_group() {
    let mut scheduler = BeatScheduler::new();
    let fixtures = vec![
        ("task1", "reports"),
        ("task2", "reports"),
        ("task3", "alerts"),
    ];
    for (name, group) in fixtures {
        let task = ScheduledTask::new(name.to_string(), Schedule::interval(60))
            .with_group(group.to_string());
        scheduler.add_task(task).unwrap();
    }

    let switched_off = scheduler.disable_group("reports").unwrap();
    assert_eq!(switched_off, 2);
    for member in scheduler.get_tasks_by_group("reports") {
        assert!(!member.enabled);
    }

    let switched_on = scheduler.enable_group("reports").unwrap();
    assert_eq!(switched_on, 2);
    for member in scheduler.get_tasks_by_group("reports") {
        assert!(member.enabled);
    }
}
/// Disabling and re-enabling a tag toggles every task carrying it and reports
/// how many tasks were affected.
#[test]
fn test_beat_scheduler_enable_disable_tag() {
    let mut scheduler = BeatScheduler::new();
    let fixtures = vec![("task1", "daily"), ("task2", "daily"), ("task3", "weekly")];
    for (name, label) in fixtures {
        let task = ScheduledTask::new(name.to_string(), Schedule::interval(60))
            .with_tag(label.to_string());
        scheduler.add_task(task).unwrap();
    }

    let switched_off = scheduler.disable_tag("daily").unwrap();
    assert_eq!(switched_off, 2);
    for member in scheduler.get_tasks_by_tag("daily") {
        assert!(!member.enabled);
    }

    let switched_on = scheduler.enable_tag("daily").unwrap();
    assert_eq!(switched_on, 2);
    for member in scheduler.get_tasks_by_tag("daily") {
        assert!(member.enabled);
    }
}
/// A task's group and tag set survive a JSON round trip.
#[test]
fn test_groups_tags_serialization() {
    let tag_set: HashSet<String> = ["daily", "critical"]
        .iter()
        .map(|label| label.to_string())
        .collect();
    let original = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_group("reports".to_string())
        .with_tags(tag_set);
    let encoded = serde_json::to_string(&original).unwrap();
    let restored: ScheduledTask = serde_json::from_str(&encoded).unwrap();
    assert_eq!(restored.group, Some("reports".to_string()));
    assert_eq!(restored.tags.len(), 2);
    assert!(restored.has_tag("daily"));
    assert!(restored.has_tag("critical"));
}
/// `NoRetry` never allows a retry and never yields a delay.
#[test]
fn test_retry_policy_no_retry() {
    let never = RetryPolicy::NoRetry;
    for attempt in 0..2 {
        assert!(!never.should_retry(attempt));
    }
    assert_eq!(never.next_retry_delay(0), None);
}
/// `FixedDelay` offers the same delay for each attempt below `max_retries`
/// and stops once the limit is reached.
#[test]
fn test_retry_policy_fixed_delay() {
    let fixed = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    // Attempts 0, 1 and 2 are still within the limit of 3.
    for attempt in 0..3 {
        assert!(fixed.should_retry(attempt));
        assert_eq!(fixed.next_retry_delay(attempt), Some(30));
    }
    assert!(!fixed.should_retry(3));
    assert_eq!(fixed.next_retry_delay(3), None);
}
/// `ExponentialBackoff` doubles the delay on each attempt (10, 20, 40, 80,
/// 160) and stops at `max_retries`.
#[test]
fn test_retry_policy_exponential_backoff() {
    let backoff = RetryPolicy::ExponentialBackoff {
        initial_delay_seconds: 10,
        multiplier: 2.0,
        max_delay_seconds: 300,
        max_retries: 5,
    };
    let expected = [
        (0, Some(10)),
        (1, Some(20)),
        (2, Some(40)),
        (3, Some(80)),
        (4, Some(160)),
        (5, None),
    ];
    for &(attempt, delay) in expected.iter() {
        assert_eq!(backoff.next_retry_delay(attempt), delay);
    }
    assert!(!backoff.should_retry(5));
}
/// The exponential delay is capped at `max_delay_seconds`: attempt 6 yields
/// 100 rather than the uncapped 10 * 2^6.
#[test]
fn test_retry_policy_exponential_backoff_max_delay() {
    let capped = RetryPolicy::ExponentialBackoff {
        initial_delay_seconds: 10,
        multiplier: 2.0,
        max_delay_seconds: 100,
        max_retries: 10,
    };
    let delay = capped.next_retry_delay(6);
    assert_eq!(delay, Some(100));
}
/// The default retry policy is `NoRetry`.
#[test]
fn test_retry_policy_default() {
    assert_eq!(RetryPolicy::default(), RetryPolicy::NoRetry);
}
/// Every retry policy variant survives a JSON round trip unchanged.
#[test]
fn test_retry_policy_serialization() {
    let variants = vec![
        RetryPolicy::NoRetry,
        RetryPolicy::FixedDelay {
            delay_seconds: 30,
            max_retries: 3,
        },
        RetryPolicy::ExponentialBackoff {
            initial_delay_seconds: 10,
            multiplier: 2.0,
            max_delay_seconds: 300,
            max_retries: 5,
        },
    ];
    for original in &variants {
        let encoded = serde_json::to_string(original).unwrap();
        let restored: RetryPolicy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(&restored, original);
    }
}
/// `with_retry_policy` stores the policy and leaves the failure counters at
/// zero.
#[test]
fn test_scheduled_task_with_retry_policy() {
    let wanted = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    let task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_retry_policy(wanted.clone());
    assert_eq!(task.retry_policy, wanted);
    assert_eq!((task.retry_count, task.total_failure_count), (0, 0));
}
/// One `mark_failure` call bumps both failure counters and stamps the
/// failure time.
#[test]
fn test_scheduled_task_mark_failure() {
    let fixed = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_retry_policy(fixed);
    task.mark_failure();
    assert_eq!(task.retry_count, 1);
    assert_eq!(task.total_failure_count, 1);
    assert!(task.last_failure_at.is_some());
}
/// `mark_success` resets the retry counter accumulated by earlier failures.
#[test]
fn test_scheduled_task_mark_success() {
    let fixed = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_retry_policy(fixed);
    for _ in 0..2 {
        task.mark_failure();
    }
    assert_eq!(task.retry_count, 2);
    task.mark_success();
    // Success clears the retry counter.
    assert_eq!(task.retry_count, 0);
}
/// With `max_retries: 2`, a task may retry until it has failed twice.
#[test]
fn test_scheduled_task_should_retry() {
    let fixed = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 2,
    };
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_retry_policy(fixed);
    assert!(task.should_retry());
    task.mark_failure();
    assert!(task.should_retry());
    task.mark_failure();
    // Two failures recorded: the limit of 2 is exhausted.
    assert!(!task.should_retry());
}
/// `next_retry_time` is `None` before any failure and, after one, equals the
/// failure timestamp plus the fixed 30 s delay.
#[test]
fn test_scheduled_task_next_retry_time() {
    let fixed = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_retry_policy(fixed);
    assert!(task.next_retry_time().is_none());
    task.mark_failure();
    let scheduled = task.next_retry_time().unwrap();
    let failed_at = task.last_failure_at.unwrap();
    assert_eq!(scheduled, failed_at + Duration::seconds(30));
}
/// `failure_rate` is zero for a task with no history and about 0.3 when the
/// counters read 7 runs and 3 failures.
///
/// NOTE(review): the expected 0.3 suggests the rate is
/// failures / (runs + failures) — confirm against `failure_rate`.
#[test]
fn test_scheduled_task_failure_rate() {
    let schedule = Schedule::interval(60);
    let mut task = ScheduledTask::new("test_task".to_string(), schedule);
    // With no recorded history an exact zero is expected, so compare exactly.
    assert_eq!(task.failure_rate(), 0.0);
    task.total_run_count = 7;
    task.total_failure_count = 3;
    // Compare within a tolerance: 0.3 is not exactly representable in binary
    // floating point, so exact equality would depend on how the rate is
    // computed internally.
    let rate = task.failure_rate();
    assert!(
        (rate - 0.3).abs() < 1e-9,
        "expected a failure rate of about 0.3, got {}",
        rate
    );
}
/// After a failure, `mark_task_success` resets the retry counter on the
/// stored task; the run counter then reads 1.
#[test]
fn test_beat_scheduler_mark_task_success() {
    let mut scheduler = BeatScheduler::new();
    let fixed = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    scheduler
        .add_task(
            ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
                .with_retry_policy(fixed),
        )
        .unwrap();

    scheduler.mark_task_failure("test_task").unwrap();
    let after_failure = scheduler.tasks.get("test_task").unwrap();
    assert_eq!(after_failure.retry_count, 1);

    scheduler.mark_task_success("test_task").unwrap();
    let after_success = scheduler.tasks.get("test_task").unwrap();
    assert_eq!(after_success.retry_count, 0);
    assert_eq!(after_success.total_run_count, 1);
}
/// Two `mark_task_failure` calls accumulate on the stored task's counters and
/// stamp the failure time.
#[test]
fn test_beat_scheduler_mark_task_failure() {
    let mut scheduler = BeatScheduler::new();
    let fixed = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    scheduler
        .add_task(
            ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
                .with_retry_policy(fixed),
        )
        .unwrap();
    for _ in 0..2 {
        scheduler.mark_task_failure("test_task").unwrap();
    }
    let stored = scheduler.tasks.get("test_task").unwrap();
    assert_eq!(stored.retry_count, 2);
    assert_eq!(stored.total_failure_count, 2);
    assert!(stored.last_failure_at.is_some());
}
/// After both tasks fail, only the task configured with a fixed-delay retry
/// policy is returned by `get_retry_tasks`.
#[test]
fn test_beat_scheduler_get_retry_tasks() {
    let mut scheduler = BeatScheduler::new();
    let retrying = ScheduledTask::new(String::from("task1"), Schedule::interval(60))
        .with_retry_policy(RetryPolicy::FixedDelay {
            delay_seconds: 1,
            max_retries: 3,
        });
    let plain = ScheduledTask::new(String::from("task2"), Schedule::interval(60));
    scheduler.add_task(retrying).unwrap();
    scheduler.add_task(plain).unwrap();

    for name in ["task1", "task2"] {
        scheduler.mark_task_failure(name).unwrap();
    }

    // Wait longer than task1's 1-second retry delay before querying.
    std::thread::sleep(std::time::Duration::from_secs(2));

    let due = scheduler.get_retry_tasks();
    assert_eq!(due.len(), 1);
    assert_eq!(due[0].name, "task1");
}
/// A task's exponential-backoff retry policy survives a JSON round trip.
#[test]
fn test_retry_policy_serialization_in_task() {
    let policy = RetryPolicy::ExponentialBackoff {
        initial_delay_seconds: 10,
        multiplier: 2.0,
        max_delay_seconds: 300,
        max_retries: 5,
    };
    let original = ScheduledTask::new(String::from("test_task"), Schedule::interval(60))
        .with_retry_policy(policy.clone());

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ScheduledTask = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.retry_policy, policy);
}
/// The `Success` variant can be constructed and pattern-matched.
#[test]
fn test_execution_result_success() {
    let outcome = ExecutionResult::Success;
    let is_success = matches!(outcome, ExecutionResult::Success);
    assert!(is_success);
}
/// The `Failure` variant carries an error string and can be pattern-matched.
#[test]
fn test_execution_result_failure() {
    let error = String::from("Test error");
    let outcome = ExecutionResult::Failure { error };
    let is_failure = matches!(outcome, ExecutionResult::Failure { .. });
    assert!(is_failure);
}
/// The `Timeout` variant can be constructed and pattern-matched.
#[test]
fn test_execution_result_timeout() {
    let outcome = ExecutionResult::Timeout;
    let is_timeout = matches!(outcome, ExecutionResult::Timeout);
    assert!(is_timeout);
}
/// A freshly created record keeps its start time, has no completion time or
/// duration, and holds the `Success` result.
#[test]
fn test_execution_record_new() {
    let now = Utc::now();
    let fresh = ExecutionRecord::new(now);

    assert_eq!(fresh.started_at, now);
    assert!(fresh.completed_at.is_none());
    assert!(matches!(fresh.result, ExecutionResult::Success));
    assert!(fresh.duration_ms.is_none());
}
/// A completed record started 100 ms in the past has a completion time and a
/// duration of at least 100 ms.
#[test]
fn test_execution_record_completed() {
    let began = Utc::now() - Duration::milliseconds(100);
    let finished = ExecutionRecord::completed(began, ExecutionResult::Success);

    assert_eq!(finished.started_at, began);
    assert!(finished.completed_at.is_some());
    assert!(finished.duration_ms.is_some());
    let elapsed_ms = finished.duration_ms.unwrap();
    assert!(elapsed_ms >= 100);
}
/// A record completed with `Success` reports success and neither failure nor
/// timeout.
#[test]
fn test_execution_record_is_success() {
    let outcome = ExecutionResult::Success;
    let rec = ExecutionRecord::completed(Utc::now(), outcome);

    assert!(rec.is_success());
    assert!(!rec.is_failure());
    assert!(!rec.is_timeout());
}
/// A record completed with `Failure` reports failure and neither success nor
/// timeout.
#[test]
fn test_execution_record_is_failure() {
    let started_at = Utc::now();
    let outcome = ExecutionResult::Failure {
        error: String::from("Test error"),
    };
    let rec = ExecutionRecord::completed(started_at, outcome);

    assert!(rec.is_failure());
    assert!(!rec.is_success());
    assert!(!rec.is_timeout());
}
/// A record completed with `Timeout` reports timeout and neither success nor
/// failure.
#[test]
fn test_execution_record_is_timeout() {
    let outcome = ExecutionResult::Timeout;
    let rec = ExecutionRecord::completed(Utc::now(), outcome);

    assert!(rec.is_timeout());
    assert!(!rec.is_success());
    assert!(!rec.is_failure());
}
/// Records built with `completed` count as completed; records built with
/// `new` do not.
#[test]
fn test_execution_record_is_completed() {
    let done = ExecutionRecord::completed(Utc::now(), ExecutionResult::Success);
    assert!(done.is_completed());

    let pending = ExecutionRecord::new(Utc::now());
    assert!(!pending.is_completed());
}
/// Adding one successful record to a task stores exactly that record in its
/// execution history.
#[test]
fn test_scheduled_task_add_execution_record() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    task.add_execution_record(ExecutionRecord::completed(
        Utc::now(),
        ExecutionResult::Success,
    ));

    assert_eq!(task.execution_history.len(), 1);
    assert!(task.execution_history[0].is_success());
}
/// `with_max_history` stores the requested limit in `max_history_size`.
#[test]
fn test_scheduled_task_with_max_history() {
    let limited = ScheduledTask::new(String::from("test_task"), Schedule::interval(60))
        .with_max_history(3);

    assert_eq!(limited.max_history_size, 3);
}
/// With a history limit of 3, adding 5 records leaves only 3 in the history.
#[test]
fn test_scheduled_task_history_trimming() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60))
        .with_max_history(3);

    (0..5).for_each(|_| {
        task.add_execution_record(ExecutionRecord::completed(
            Utc::now(),
            ExecutionResult::Success,
        ));
        // Short pause between records, as in the original scenario.
        std::thread::sleep(std::time::Duration::from_millis(1));
    });

    assert_eq!(task.execution_history.len(), 3);
}
/// `get_last_executions(n)` returns `n` records when enough exist and all 5
/// records when more are requested than were stored.
#[test]
fn test_scheduled_task_get_last_executions() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    (0..5).for_each(|_| {
        task.add_execution_record(ExecutionRecord::completed(
            Utc::now(),
            ExecutionResult::Success,
        ));
    });

    let recent = task.get_last_executions(3);
    assert_eq!(recent.len(), 3);

    // Asking for more than is stored yields everything that is stored.
    let everything = task.get_last_executions(10);
    assert_eq!(everything.len(), 5);
}
/// `get_all_executions` returns every record that was added.
#[test]
fn test_scheduled_task_get_all_executions() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    (0..3).for_each(|_| {
        task.add_execution_record(ExecutionRecord::completed(
            Utc::now(),
            ExecutionResult::Success,
        ));
    });

    let recorded = task.get_all_executions();
    assert_eq!(recorded.len(), 3);
}
/// With success, failure, success recorded, the history success count is 2.
#[test]
fn test_scheduled_task_history_success_count() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    let outcomes = [
        ExecutionResult::Success,
        ExecutionResult::Failure {
            error: String::from("Error"),
        },
        ExecutionResult::Success,
    ];

    for outcome in outcomes {
        task.add_execution_record(ExecutionRecord::completed(Utc::now(), outcome));
    }

    assert_eq!(task.history_success_count(), 2);
}
/// With success, failure, failure recorded, the history failure count is 2.
#[test]
fn test_scheduled_task_history_failure_count() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    let outcomes = [
        ExecutionResult::Success,
        ExecutionResult::Failure {
            error: String::from("Error 1"),
        },
        ExecutionResult::Failure {
            error: String::from("Error 2"),
        },
    ];

    for outcome in outcomes {
        task.add_execution_record(ExecutionRecord::completed(Utc::now(), outcome));
    }

    assert_eq!(task.history_failure_count(), 2);
}
/// With success, timeout, timeout recorded, the history timeout count is 2.
#[test]
fn test_scheduled_task_history_timeout_count() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    let outcomes = [
        ExecutionResult::Success,
        ExecutionResult::Timeout,
        ExecutionResult::Timeout,
    ];

    for outcome in outcomes {
        task.add_execution_record(ExecutionRecord::completed(Utc::now(), outcome));
    }

    assert_eq!(task.history_timeout_count(), 2);
}
/// The average duration is `None` with no history and at least 100 ms after
/// recording executions that started 100 ms and 200 ms in the past.
#[test]
fn test_scheduled_task_average_duration_ms() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    assert!(task.average_duration_ms().is_none());

    let first_start = Utc::now() - Duration::milliseconds(100);
    let first = ExecutionRecord::completed(first_start, ExecutionResult::Success);
    task.add_execution_record(first);

    std::thread::sleep(std::time::Duration::from_millis(10));

    let second_start = Utc::now() - Duration::milliseconds(200);
    let second = ExecutionRecord::completed(second_start, ExecutionResult::Success);
    task.add_execution_record(second);

    let average = task.average_duration_ms().unwrap();
    assert!(average >= 100);
}
/// After recording executions that started 100 ms and 200 ms in the past, the
/// minimum duration is at least 100 ms and the maximum at least 200 ms.
#[test]
fn test_scheduled_task_min_max_duration() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    let first_start = Utc::now() - Duration::milliseconds(100);
    let first = ExecutionRecord::completed(first_start, ExecutionResult::Success);
    task.add_execution_record(first);

    std::thread::sleep(std::time::Duration::from_millis(10));

    let second_start = Utc::now() - Duration::milliseconds(200);
    let second = ExecutionRecord::completed(second_start, ExecutionResult::Success);
    task.add_execution_record(second);

    let shortest = task.min_duration_ms().unwrap();
    let longest = task.max_duration_ms().unwrap();
    assert!(shortest >= 100);
    assert!(longest >= 200);
    assert!(longest >= shortest);
}
/// The history success rate is 0.0 with no records and 0.75 after three
/// successes and one failure.
#[test]
fn test_scheduled_task_history_success_rate() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    assert_eq!(task.history_success_rate(), 0.0);

    let outcomes = [
        ExecutionResult::Success,
        ExecutionResult::Success,
        ExecutionResult::Failure {
            error: String::from("Error"),
        },
        ExecutionResult::Success,
    ];
    for outcome in outcomes {
        task.add_execution_record(ExecutionRecord::completed(Utc::now(), outcome));
    }

    assert_eq!(task.history_success_rate(), 0.75);
}
/// `clear_history` empties a history that previously held three records.
#[test]
fn test_scheduled_task_clear_history() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    (0..3).for_each(|_| {
        let rec = ExecutionRecord::completed(Utc::now(), ExecutionResult::Success);
        task.add_execution_record(rec);
    });
    assert_eq!(task.execution_history.len(), 3);

    task.clear_history();

    assert_eq!(task.execution_history.len(), 0);
}
/// Marking a task successful through the scheduler appends one successful
/// record to its execution history.
#[test]
fn test_beat_scheduler_mark_task_success_with_history() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    scheduler.mark_task_success("test_task").unwrap();

    let stored = scheduler.tasks.get("test_task").unwrap();
    assert_eq!(stored.execution_history.len(), 1);
    assert!(stored.execution_history[0].is_success());
}
/// Marking a task failed with an error message appends one failure record to
/// its execution history.
#[test]
fn test_beat_scheduler_mark_task_failure_with_history() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let message = String::from("Test error");
    scheduler
        .mark_task_failure_with_error("test_task", message)
        .unwrap();

    let stored = scheduler.tasks.get("test_task").unwrap();
    assert_eq!(stored.execution_history.len(), 1);
    assert!(stored.execution_history[0].is_failure());
}
/// Marking a task as timed out appends one timeout record to its execution
/// history.
#[test]
fn test_beat_scheduler_mark_task_timeout_with_history() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let began = Utc::now() - Duration::seconds(5);
    scheduler.mark_task_timeout("test_task", began).unwrap();

    let stored = scheduler.tasks.get("test_task").unwrap();
    assert_eq!(stored.execution_history.len(), 1);
    assert!(stored.execution_history[0].is_timeout());
}
/// Marking success with a start time 150 ms in the past records a successful
/// execution whose duration is at least 150 ms.
#[test]
fn test_beat_scheduler_mark_task_success_with_start_time() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let began = Utc::now() - Duration::milliseconds(150);
    scheduler
        .mark_task_success_with_start("test_task", began)
        .unwrap();

    let stored = scheduler.tasks.get("test_task").unwrap();
    assert_eq!(stored.execution_history.len(), 1);
    let only = &stored.execution_history[0];
    assert!(only.is_success());
    assert!(only.duration_ms.unwrap() >= 150);
}
/// A task's execution history (one success, one failure) keeps its length and
/// order through a JSON round trip.
#[test]
fn test_execution_history_serialization() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    let outcomes = [
        ExecutionResult::Success,
        ExecutionResult::Failure {
            error: String::from("Test error"),
        },
    ];
    for outcome in outcomes {
        task.add_execution_record(ExecutionRecord::completed(Utc::now(), outcome));
    }

    let encoded = serde_json::to_string(&task).unwrap();
    let decoded: ScheduledTask = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.execution_history.len(), 2);
    assert!(decoded.execution_history[0].is_success());
    assert!(decoded.execution_history[1].is_failure());
}
/// Execution history written by a persistent scheduler (success, failure,
/// success) is present, in order, when the state file is loaded again.
#[test]
fn test_execution_history_persistence() {
    let state_file = NamedTempFile::new().unwrap();
    let state_path = state_file.path().to_str().unwrap().to_string();

    // First scope: record three outcomes with a scheduler bound to the file.
    {
        let mut writer = BeatScheduler::with_persistence(&state_path);
        writer
            .add_task(ScheduledTask::new(
                String::from("test_task"),
                Schedule::interval(60),
            ))
            .unwrap();
        writer.mark_task_success("test_task").unwrap();
        writer
            .mark_task_failure_with_error("test_task", String::from("Test error"))
            .unwrap();
        writer.mark_task_success("test_task").unwrap();
    }

    // Second scope: load the file and inspect what was stored.
    {
        let reader = BeatScheduler::load_from_file(&state_path).unwrap();
        let restored = reader.tasks.get("test_task").unwrap();
        let history = &restored.execution_history;
        assert_eq!(history.len(), 3);
        assert!(history[0].is_success());
        assert!(history[1].is_failure());
        assert!(history[2].is_success());
    }
}
/// `Healthy` reports as healthy, with no warnings, not unhealthy, and no
/// issues.
#[test]
fn test_schedule_health_healthy() {
    let status = ScheduleHealth::Healthy;

    assert!(status.is_healthy());
    assert!(!status.has_warnings());
    assert!(!status.is_unhealthy());
    let issue_count = status.get_issues().len();
    assert_eq!(issue_count, 0);
}
/// `Warning` reports warnings only and exposes both of its issues.
#[test]
fn test_schedule_health_warning() {
    let issues = vec![String::from("Warning 1"), String::from("Warning 2")];
    let status = ScheduleHealth::Warning { issues };

    assert!(!status.is_healthy());
    assert!(status.has_warnings());
    assert!(!status.is_unhealthy());
    let issue_count = status.get_issues().len();
    assert_eq!(issue_count, 2);
}
/// `Unhealthy` reports as unhealthy only and exposes its single issue.
#[test]
fn test_schedule_health_unhealthy() {
    let issues = vec![String::from("Error 1")];
    let status = ScheduleHealth::Unhealthy { issues };

    assert!(!status.is_healthy());
    assert!(!status.has_warnings());
    assert!(status.is_unhealthy());
    let issue_count = status.get_issues().len();
    assert_eq!(issue_count, 1);
}
/// `HealthCheckResult::healthy` keeps the task name, is healthy, and leaves
/// the optional timing fields unset.
#[test]
fn test_health_check_result_creation() {
    let report = HealthCheckResult::healthy(String::from("test_task"));

    assert_eq!(report.task_name, "test_task");
    assert!(report.health.is_healthy());
    assert!(report.next_run.is_none());
    assert!(report.time_since_last_run.is_none());
}
/// The `with_next_run` and `with_time_since_last_run` builders store the
/// values they are given.
#[test]
fn test_health_check_result_with_details() {
    let upcoming = Utc::now() + Duration::seconds(60);
    let since_last = Duration::seconds(30);

    let report = HealthCheckResult::healthy(String::from("test_task"))
        .with_next_run(upcoming)
        .with_time_since_last_run(since_last);

    assert_eq!(report.next_run, Some(upcoming));
    assert_eq!(report.time_since_last_run, Some(since_last));
}
/// A freshly created interval task is healthy and has a next run time.
#[test]
fn test_scheduled_task_check_health_healthy() {
    let task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    let report = task.check_health();

    assert!(report.health.is_healthy());
    assert!(report.next_run.is_some());
}
/// A disabled task yields a warning whose issues mention "disabled".
#[test]
fn test_scheduled_task_check_health_disabled() {
    let task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60)).disabled();

    let report = task.check_health();
    assert!(report.health.has_warnings());

    let issues = report.health.get_issues();
    let mentions_disabled = issues.iter().any(|issue| issue.contains("disabled"));
    assert!(mentions_disabled);
}
/// With counters of 5 runs and 10 failures, the health check warns about a
/// "High failure rate".
#[test]
fn test_scheduled_task_check_health_high_failure_rate() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    task.total_run_count = 5;
    task.total_failure_count = 10;

    let report = task.check_health();
    assert!(report.health.has_warnings());

    let issues = report.health.get_issues();
    let flags_failure_rate = issues
        .iter()
        .any(|issue| issue.contains("High failure rate"));
    assert!(flags_failure_rate);
}
/// Three failed executions in a row produce a warning or unhealthy status
/// with an issue containing "Last 3 executions failed".
#[test]
fn test_scheduled_task_check_health_consecutive_failures() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    (0..3).for_each(|_| {
        let failure = ExecutionResult::Failure {
            error: String::from("Test error"),
        };
        task.add_execution_record(ExecutionRecord::completed(Utc::now(), failure));
    });

    let report = task.check_health();
    let degraded = report.health.has_warnings() || report.health.is_unhealthy();
    assert!(degraded);

    let issues = report.health.get_issues();
    let flags_streak = issues
        .iter()
        .any(|issue| issue.contains("Last 3 executions failed"));
    assert!(flags_streak);
}
/// A 60-second interval task that last ran 30 seconds ago is not stuck.
#[test]
fn test_scheduled_task_is_stuck_not_stuck() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    let recent_run = Utc::now() - Duration::seconds(30);
    task.last_run_at = Some(recent_run);

    assert!(task.is_stuck().is_none());
}
/// A 60-second interval task that last ran 6000 seconds ago is reported as
/// stuck for at least 6000 seconds.
#[test]
fn test_scheduled_task_is_stuck() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    let stale_run = Utc::now() - Duration::seconds(6000);
    task.last_run_at = Some(stale_run);

    let stuck_for = task.is_stuck();
    assert!(stuck_for.is_some());
    let stuck_seconds = stuck_for.unwrap().num_seconds();
    assert!(stuck_seconds >= 6000);
}
/// A disabled task is never reported as stuck, even when its last run was
/// 10000 seconds ago.
#[test]
fn test_scheduled_task_is_stuck_disabled() {
    let mut task =
        ScheduledTask::new(String::from("test_task"), Schedule::interval(60)).disabled();
    let stale_run = Utc::now() - Duration::seconds(10000);
    task.last_run_at = Some(stale_run);

    assert!(task.is_stuck().is_none());
}
/// A plain 60-second interval schedule validates successfully.
#[test]
fn test_scheduled_task_validate_schedule_valid() {
    let task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    let verdict = task.validate_schedule();

    assert!(verdict.is_ok());
}
/// A crontab schedule whose minute field is "invalid" fails validation.
#[test]
#[cfg(feature = "cron")]
fn test_scheduled_task_validate_schedule_invalid_cron() {
    let broken = Schedule::crontab("invalid", "0", "*", "*", "*");
    let task = ScheduledTask::new(String::from("test_task"), broken);

    let verdict = task.validate_schedule();

    assert!(verdict.is_err());
}
/// `check_all_tasks_health` returns one result per registered task.
#[test]
fn test_beat_scheduler_check_all_tasks_health() {
    let mut scheduler = BeatScheduler::new();
    let first = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let second = ScheduledTask::new(String::from("task2"), Schedule::interval(120));
    scheduler.add_task(first).unwrap();
    scheduler.add_task(second).unwrap();

    let reports = scheduler.check_all_tasks_health();

    assert_eq!(reports.len(), 2);
}
/// With one enabled and one disabled task, `get_unhealthy_tasks` returns only
/// the disabled "task2".
#[test]
fn test_beat_scheduler_get_unhealthy_tasks() {
    let mut scheduler = BeatScheduler::new();
    let enabled = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let disabled = ScheduledTask::new(String::from("task2"), Schedule::interval(60)).disabled();
    scheduler.add_task(enabled).unwrap();
    scheduler.add_task(disabled).unwrap();

    let flagged = scheduler.get_unhealthy_tasks();

    assert_eq!(flagged.len(), 1);
    assert_eq!(flagged[0].task_name, "task2");
}
/// With one enabled and one disabled task, `get_tasks_with_warnings` returns
/// only the disabled "task2".
#[test]
fn test_beat_scheduler_get_tasks_with_warnings() {
    let mut scheduler = BeatScheduler::new();
    let enabled = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let disabled = ScheduledTask::new(String::from("task2"), Schedule::interval(60)).disabled();
    scheduler.add_task(enabled).unwrap();
    scheduler.add_task(disabled).unwrap();

    let flagged = scheduler.get_tasks_with_warnings();

    assert_eq!(flagged.len(), 1);
    assert_eq!(flagged[0].task_name, "task2");
}
/// Of two 60-second interval tasks, only the one whose last run was 10000
/// seconds ago is returned by `get_stuck_tasks`.
#[test]
fn test_beat_scheduler_get_stuck_tasks() {
    // Builds a 60-second interval task whose last run lies `secs_ago` in the past.
    let task_last_run = |name: &str, secs_ago: i64| {
        let mut task = ScheduledTask::new(name.to_string(), Schedule::interval(60));
        task.last_run_at = Some(Utc::now() - Duration::seconds(secs_ago));
        task
    };

    let mut scheduler = BeatScheduler::new();
    let fresh = task_last_run("task1", 30);
    let stale = task_last_run("task2", 10000);
    scheduler.add_task(fresh).unwrap();
    scheduler.add_task(stale).unwrap();

    let stuck = scheduler.get_stuck_tasks();

    assert_eq!(stuck.len(), 1);
    assert_eq!(stuck[0].name, "task2");
}
/// `validate_all_schedules` returns one passing result for each of the two
/// interval tasks.
#[test]
fn test_beat_scheduler_validate_all_schedules() {
    let mut scheduler = BeatScheduler::new();
    let first = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let second = ScheduledTask::new(String::from("task2"), Schedule::interval(120));
    scheduler.add_task(first).unwrap();
    scheduler.add_task(second).unwrap();

    let verdicts = scheduler.validate_all_schedules();

    assert_eq!(verdicts.len(), 2);
    for (_, verdict) in verdicts {
        let passed = verdict.is_ok();
        assert!(passed);
    }
}
/// Every `ScheduleHealth` variant is unchanged by a JSON round trip.
#[test]
fn test_schedule_health_serialization() {
    let variants = vec![
        ScheduleHealth::Healthy,
        ScheduleHealth::Warning {
            issues: vec![String::from("Warning")],
        },
        ScheduleHealth::Unhealthy {
            issues: vec![String::from("Error")],
        },
    ];

    variants.into_iter().for_each(|original| {
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ScheduleHealth = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    });
}
/// Metrics of a scheduler without tasks are all zero.
#[test]
fn test_scheduler_metrics_empty_scheduler() {
    let snapshot = BeatScheduler::new().get_metrics();

    assert_eq!(snapshot.total_tasks, 0);
    assert_eq!(snapshot.enabled_tasks, 0);
    assert_eq!(snapshot.disabled_tasks, 0);
    assert_eq!(snapshot.total_executions, 0);
    assert_eq!(snapshot.overall_success_rate, 0.0);
}
/// With one enabled and one disabled task, the metrics report 2 total, 1
/// enabled and 1 disabled.
#[test]
fn test_scheduler_metrics_basic() {
    let mut scheduler = BeatScheduler::new();
    let enabled = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let disabled = ScheduledTask::new(String::from("task2"), Schedule::interval(60)).disabled();
    scheduler.add_task(enabled).unwrap();
    scheduler.add_task(disabled).unwrap();

    let snapshot = scheduler.get_metrics();

    assert_eq!(snapshot.total_tasks, 2);
    assert_eq!(snapshot.enabled_tasks, 1);
    assert_eq!(snapshot.disabled_tasks, 1);
}
/// After two successes and one failure on a single task, the metrics report
/// 1 task with executions, 2 successes, 1 failure, 3 executions and a success
/// rate of two thirds.
#[test]
fn test_scheduler_metrics_with_executions() {
    let mut scheduler = BeatScheduler::new();
    let task1 = ScheduledTask::new("task1".to_string(), Schedule::interval(60));
    scheduler.add_task(task1).unwrap();
    scheduler.mark_task_success("task1").unwrap();
    scheduler.mark_task_success("task1").unwrap();
    scheduler
        .mark_task_failure_with_error("task1", "Error".to_string())
        .unwrap();

    let metrics = scheduler.get_metrics();
    assert_eq!(metrics.tasks_with_executions, 1);
    assert_eq!(metrics.total_successes, 2);
    assert_eq!(metrics.total_failures, 1);
    assert_eq!(metrics.total_executions, 3);

    // Two thirds is not exactly representable in binary floating point, so
    // compare with a tolerance instead of requiring bit-identical results.
    let rate = metrics.overall_success_rate;
    assert!(
        (rate - 2.0 / 3.0).abs() < 1e-9,
        "expected an overall success rate of 2/3, got {}",
        rate
    );
}
/// A task with a retry policy that has failed once is counted in
/// `tasks_in_retry`.
#[test]
fn test_scheduler_metrics_retry_state() {
    let retry_policy = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    let mut scheduler = BeatScheduler::new();
    let retrying = ScheduledTask::new(String::from("task1"), Schedule::interval(60))
        .with_retry_policy(retry_policy);
    scheduler.add_task(retrying).unwrap();

    scheduler
        .mark_task_failure_with_error("task1", String::from("Error"))
        .unwrap();

    let snapshot = scheduler.get_metrics();
    assert_eq!(snapshot.tasks_in_retry, 1);
}
/// With one enabled and one disabled task, the metrics count exactly one task
/// with warnings.
#[test]
fn test_scheduler_metrics_health_status() {
    let mut scheduler = BeatScheduler::new();
    let enabled = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let disabled = ScheduledTask::new(String::from("task2"), Schedule::interval(60)).disabled();
    scheduler.add_task(enabled).unwrap();
    scheduler.add_task(disabled).unwrap();

    let snapshot = scheduler.get_metrics();

    assert_eq!(snapshot.tasks_with_warnings, 1);
}
/// A 60-second interval task that last ran 10000 seconds ago is counted in
/// `stuck_tasks`.
#[test]
fn test_scheduler_metrics_stuck_tasks() {
    let mut scheduler = BeatScheduler::new();
    let mut stale = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let long_ago = Utc::now() - Duration::seconds(10000);
    stale.last_run_at = Some(long_ago);
    scheduler.add_task(stale).unwrap();

    let snapshot = scheduler.get_metrics();

    assert_eq!(snapshot.stuck_tasks, 1);
}
/// The task counters of `SchedulerMetrics` are unchanged by a JSON round
/// trip.
#[test]
fn test_scheduler_metrics_serialization() {
    let original = BeatScheduler::new().get_metrics();

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: SchedulerMetrics = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.total_tasks, original.total_tasks);
    assert_eq!(decoded.enabled_tasks, original.enabled_tasks);
}
/// Statistics for a task without history carry its name and zeroed counters
/// and success rate.
#[test]
fn test_task_statistics_basic() {
    let task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    let summary = TaskStatistics::from_task(&task);

    assert_eq!(summary.name, "test_task");
    assert_eq!(summary.success_count, 0);
    assert_eq!(summary.failure_count, 0);
    assert_eq!(summary.success_rate, 0.0);
}
/// Statistics built from two successes and one failure report 2 successes,
/// 1 failure, a success rate of two thirds and an average duration.
#[test]
fn test_task_statistics_with_history() {
    let schedule = Schedule::interval(60);
    let mut task = ScheduledTask::new("test_task".to_string(), schedule);
    task.add_execution_record(ExecutionRecord::completed(
        Utc::now() - Duration::milliseconds(100),
        ExecutionResult::Success,
    ));
    task.add_execution_record(ExecutionRecord::completed(
        Utc::now() - Duration::milliseconds(200),
        ExecutionResult::Success,
    ));
    task.add_execution_record(ExecutionRecord::completed(
        Utc::now(),
        ExecutionResult::Failure {
            error: "Error".to_string(),
        },
    ));

    let stats = TaskStatistics::from_task(&task);
    assert_eq!(stats.success_count, 2);
    assert_eq!(stats.failure_count, 1);

    // Two thirds is not exactly representable in binary floating point, so
    // compare with a tolerance instead of requiring bit-identical results.
    let rate = stats.success_rate;
    assert!(
        (rate - 2.0 / 3.0).abs() < 1e-9,
        "expected a success rate of 2/3, got {}",
        rate
    );
    assert!(stats.average_duration_ms.is_some());
}
/// `get_all_task_statistics` returns one entry per registered task.
#[test]
fn test_beat_scheduler_get_all_task_statistics() {
    let mut scheduler = BeatScheduler::new();
    let first = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let second = ScheduledTask::new(String::from("task2"), Schedule::interval(120));
    scheduler.add_task(first).unwrap();
    scheduler.add_task(second).unwrap();

    let summaries = scheduler.get_all_task_statistics();

    assert_eq!(summaries.len(), 2);
}
/// `get_task_statistics` returns statistics for a known task name and `None`
/// for an unknown one.
#[test]
fn test_beat_scheduler_get_task_statistics() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let found = scheduler.get_task_statistics("test_task");
    assert!(found.is_some());
    assert_eq!(found.unwrap().name, "test_task");

    let absent = scheduler.get_task_statistics("nonexistent");
    assert!(absent.is_none());
}
/// With two tasks in group "reports" and one in "alerts", the statistics for
/// "reports" contain two entries.
#[test]
fn test_beat_scheduler_get_group_statistics() {
    let mut scheduler = BeatScheduler::new();
    let memberships = [
        ("task1", "reports"),
        ("task2", "reports"),
        ("task3", "alerts"),
    ];
    let grouped: Vec<ScheduledTask> = memberships
        .iter()
        .map(|(name, group)| {
            ScheduledTask::new(name.to_string(), Schedule::interval(60))
                .with_group(group.to_string())
        })
        .collect();
    for task in grouped {
        scheduler.add_task(task).unwrap();
    }

    let summaries = scheduler.get_group_statistics("reports");

    assert_eq!(summaries.len(), 2);
}
/// With two tasks tagged "daily" and one tagged "weekly", the statistics for
/// "daily" contain two entries.
#[test]
fn test_beat_scheduler_get_tag_statistics() {
    let mut scheduler = BeatScheduler::new();
    let taggings = [
        ("task1", "daily"),
        ("task2", "daily"),
        ("task3", "weekly"),
    ];
    let tagged: Vec<ScheduledTask> = taggings
        .iter()
        .map(|(name, tag)| {
            ScheduledTask::new(name.to_string(), Schedule::interval(60)).with_tag(tag.to_string())
        })
        .collect();
    for task in tagged {
        scheduler.add_task(task).unwrap();
    }

    let summaries = scheduler.get_tag_statistics("daily");

    assert_eq!(summaries.len(), 2);
}
/// Statistics built after two failures report a retry count of 2.
#[test]
fn test_task_statistics_retry_count() {
    let retry_policy = RetryPolicy::FixedDelay {
        delay_seconds: 30,
        max_retries: 3,
    };
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60))
        .with_retry_policy(retry_policy);

    for _ in 0..2 {
        task.mark_failure();
    }

    let summary = TaskStatistics::from_task(&task);
    assert_eq!(summary.retry_count, 2);
}
/// Statistics flag a task as stuck only after its last run is moved 10000
/// seconds into the past.
#[test]
fn test_task_statistics_stuck_detection() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    let before = TaskStatistics::from_task(&task);
    assert!(!before.is_stuck);

    let long_ago = Utc::now() - Duration::seconds(10000);
    task.last_run_at = Some(long_ago);

    let after = TaskStatistics::from_task(&task);
    assert!(after.is_stuck);
}
/// A new task starts at version 1 with a single history entry for its
/// interval schedule, labelled "Initial creation".
#[test]
fn test_version_initial_creation() {
    let task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    assert_eq!(task.current_version, 1);
    assert_eq!(task.version_history.len(), 1);

    let first = &task.version_history[0];
    assert_eq!(first.version, 1);
    assert!(first.schedule.is_interval());
    let expected_reason = Some(String::from("Initial creation"));
    assert_eq!(first.change_reason, expected_reason);
}
/// Updating the schedule bumps the task to version 2, applies the new
/// interval, and records the change reason in the history.
#[test]
fn test_version_update_schedule() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    let reason = Some(String::from("Changed interval"));
    task.update_schedule(Schedule::interval(120), reason);

    assert_eq!(task.current_version, 2);
    assert_eq!(task.version_history.len(), 2);
    match task.schedule {
        Schedule::Interval { every } => assert_eq!(every, 120),
        _ => panic!("Expected interval schedule"),
    }

    let latest = &task.version_history[1];
    assert_eq!(latest.version, 2);
    assert_eq!(
        latest.change_reason,
        Some(String::from("Changed interval"))
    );
}
/// Updating the configuration bumps the task to version 2 and applies the
/// new enabled flag, jitter and catch-up policy.
#[test]
fn test_version_update_config() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    let enabled = Some(false);
    let jitter = Some(Some(Jitter::positive(30)));
    let catchup = Some(CatchupPolicy::RunOnce);
    let reason = Some(String::from("Changed config"));
    task.update_config(enabled, jitter, catchup, reason);

    assert_eq!(task.current_version, 2);
    assert!(!task.enabled);
    assert!(task.jitter.is_some());
    assert!(matches!(task.catchup_policy, CatchupPolicy::RunOnce));
}
/// Rolling back to version 1 restores the 60-second interval and is itself
/// recorded as version 4 with a "Rolled back to version 1" reason.
#[test]
fn test_version_rollback() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    task.update_schedule(Schedule::interval(120), Some(String::from("Change 1")));
    task.update_schedule(Schedule::interval(180), Some(String::from("Change 2")));
    assert_eq!(task.current_version, 3);

    task.rollback_to_version(1).unwrap();

    match task.schedule {
        Schedule::Interval { every } => assert_eq!(every, 60),
        _ => panic!("Expected interval schedule"),
    }
    // The rollback is stored as a new version rather than rewinding the counter.
    assert_eq!(task.current_version, 4);
    let rollback_entry = &task.version_history[3];
    let reason = rollback_entry.change_reason.as_ref().unwrap();
    assert!(reason.contains("Rolled back to version 1"));
}
/// Rolling back to a version that was never recorded must fail with
/// `ScheduleError::Invalid` carrying the message "Version 999 not found".
#[test]
fn test_version_rollback_invalid() {
    let mut task = ScheduledTask::new("test_task".to_string(), Schedule::interval(60));
    let result = task.rollback_to_version(999);

    // Match every outcome so that an unexpected error variant fails the test
    // instead of silently skipping the message check.
    match result {
        Err(ScheduleError::Invalid(msg)) => assert_eq!(msg, "Version 999 not found"),
        Err(other) => panic!("Expected ScheduleError::Invalid, got {:?}", other),
        Ok(_) => panic!("Expected rollback to version 999 to fail"),
    }
}
/// After two schedule updates the version history lists versions 1, 2 and 3
/// in order.
#[test]
fn test_version_get_history() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    task.update_schedule(Schedule::interval(120), Some(String::from("Change 1")));
    task.update_schedule(Schedule::interval(180), Some(String::from("Change 2")));

    let entries = task.get_version_history();

    assert_eq!(entries.len(), 3);
    let first = &entries[0];
    let second = &entries[1];
    let third = &entries[2];
    assert_eq!(first.version, 1);
    assert_eq!(second.version, 2);
    assert_eq!(third.version, 3);
}
/// `get_version` returns the entry for versions 1 and 2 and `None` for a
/// version that does not exist.
#[test]
fn test_version_get_specific() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    task.update_schedule(Schedule::interval(120), Some(String::from("Change 1")));

    let original = task.get_version(1).unwrap();
    assert_eq!(original.version, 1);

    let updated = task.get_version(2).unwrap();
    assert_eq!(updated.version, 2);

    let missing = task.get_version(999);
    assert!(missing.is_none());
}
/// A new task has no previous version; after one update the previous version
/// is version 1.
#[test]
fn test_version_get_previous() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    assert!(task.get_previous_version().is_none());

    task.update_schedule(Schedule::interval(120), Some(String::from("Change 1")));

    let previous = task.get_previous_version().unwrap();
    assert_eq!(previous.version, 1);
}
/// The current version and the version history survive a JSON round trip.
#[test]
fn test_version_serialization() {
    let mut original = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    original.update_schedule(Schedule::interval(120), Some(String::from("Change 1")));

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ScheduledTask = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.current_version, original.current_version);
    let decoded_len = decoded.version_history.len();
    let original_len = original.version_history.len();
    assert_eq!(decoded_len, original_len);
    assert_eq!(decoded.version_history[0].version, 1);
    assert_eq!(decoded.version_history[1].version, 2);
}
/// Each rollback adds a new version: two updates followed by two rollbacks
/// end at version 5 with five history entries.
#[test]
fn test_version_multiple_rollbacks() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    task.update_schedule(Schedule::interval(120), Some(String::from("Change 1")));
    task.update_schedule(Schedule::interval(180), Some(String::from("Change 2")));

    task.rollback_to_version(1).unwrap();
    assert_eq!(task.current_version, 4);

    task.rollback_to_version(2).unwrap();
    assert_eq!(task.current_version, 5);

    let recorded = task.version_history.len();
    assert_eq!(recorded, 5);
}
/// A task has no dependencies until one is added; afterwards `depends_on` is
/// true only for the added name.
#[test]
fn test_dependency_basic() {
    let mut downstream = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    assert!(!downstream.has_dependencies());

    downstream.add_dependency(String::from("task_a"));

    assert!(downstream.has_dependencies());
    assert!(downstream.depends_on("task_a"));
    assert!(!downstream.depends_on("task_c"));
}
/// Dependencies can be added one by one and removed by name; removing an
/// unknown name returns false.
#[test]
fn test_dependency_add_remove() {
    let mut downstream = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));

    downstream.add_dependency(String::from("task_a"));
    assert_eq!(downstream.dependencies.len(), 1);
    downstream.add_dependency(String::from("task_c"));
    assert_eq!(downstream.dependencies.len(), 2);

    let removed = downstream.remove_dependency("task_a");
    assert!(removed);
    assert_eq!(downstream.dependencies.len(), 1);
    assert!(!downstream.depends_on("task_a"));

    let removed_unknown = downstream.remove_dependency("nonexistent");
    assert!(!removed_unknown);
}
/// `clear_dependencies` drops every dependency previously added.
#[test]
fn test_dependency_clear() {
    let mut dependent = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    dependent.add_dependency(String::from("task_a"));
    dependent.add_dependency(String::from("task_c"));
    let before_clear = dependent.dependencies.len();
    assert_eq!(before_clear, 2);

    dependent.clear_dependencies();

    let after_clear = dependent.dependencies.len();
    assert_eq!(after_clear, 0);
    assert!(!dependent.has_dependencies());
}
/// The `with_dependencies` builder installs the whole dependency set at once.
#[test]
fn test_dependency_with_dependencies() {
    let mut upstream = HashSet::new();
    for name in &["task_a", "task_b"] {
        upstream.insert(name.to_string());
    }

    let base = ScheduledTask::new(String::from("task_c"), Schedule::interval(60));
    let task = base.with_dependencies(upstream);

    assert_eq!(task.dependencies.len(), 2);
    assert!(task.depends_on("task_a"));
    assert!(task.depends_on("task_b"));
}
/// When the only dependency is in the completed set, the dependency status
/// is expected to be satisfied.
#[test]
fn test_dependency_status_satisfied() {
    let mut dependent = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    dependent.add_dependency(String::from("task_a"));

    let mut finished = HashSet::new();
    finished.insert(String::from("task_a"));

    let status = dependent.check_dependencies(&finished);
    assert!(status.is_satisfied());
}
/// With nothing completed, the status is expected to be `Waiting` and to
/// list the single outstanding dependency.
#[test]
fn test_dependency_status_waiting() {
    let mut dependent = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    dependent.add_dependency(String::from("task_a"));

    let nothing_done = HashSet::new();
    let status = dependent.check_dependencies(&nothing_done);
    assert!(!status.is_satisfied());

    match status {
        DependencyStatus::Waiting { pending } => {
            assert_eq!(pending.len(), 1);
            assert_eq!(pending[0], "task_a");
        }
        _ => panic!("Expected Waiting status"),
    }
}
/// One dependency completed and one failed: the status is expected to be
/// `Failed` and to name only the failed dependency.
#[test]
fn test_dependency_status_with_failures() {
    let mut dependent = ScheduledTask::new(String::from("task_c"), Schedule::interval(60));
    dependent.add_dependency(String::from("task_a"));
    dependent.add_dependency(String::from("task_b"));

    let mut succeeded = HashSet::new();
    succeeded.insert(String::from("task_a"));
    let mut broken = HashSet::new();
    broken.insert(String::from("task_b"));

    let status = dependent.check_dependencies_with_failures(&succeeded, &broken);
    assert!(status.has_failures());

    match status {
        DependencyStatus::Failed {
            failed: failed_tasks,
        } => {
            assert_eq!(failed_tasks.len(), 1);
            assert_eq!(failed_tasks[0], "task_b");
        }
        _ => panic!("Expected Failed status"),
    }
}
/// Two tasks depending on each other must both be reported as part of a
/// circular dependency.
#[test]
fn test_circular_dependency_simple() {
    // Builds a 60-second interval task that depends on exactly one other task.
    let dependent = |name: &str, upstream: &str| {
        let mut task = ScheduledTask::new(name.to_string(), Schedule::interval(60));
        task.add_dependency(upstream.to_string());
        task
    };

    let mut scheduler = BeatScheduler::new();
    scheduler.add_task(dependent("task_a", "task_b")).unwrap();
    scheduler.add_task(dependent("task_b", "task_a")).unwrap();

    for &name in &["task_a", "task_b"] {
        assert!(scheduler.has_circular_dependency(name));
    }
}
/// A three-task cycle (a -> b -> c -> a) must be detected from every member.
#[test]
fn test_circular_dependency_complex() {
    // Builds a 60-second interval task that depends on exactly one other task.
    let dependent = |name: &str, upstream: &str| {
        let mut task = ScheduledTask::new(name.to_string(), Schedule::interval(60));
        task.add_dependency(upstream.to_string());
        task
    };

    let mut scheduler = BeatScheduler::new();
    let edges = [("task_a", "task_b"), ("task_b", "task_c"), ("task_c", "task_a")];
    for &(name, upstream) in &edges {
        scheduler.add_task(dependent(name, upstream)).unwrap();
    }

    for &(name, _) in &edges {
        assert!(scheduler.has_circular_dependency(name));
    }
}
/// A linear chain (c depends on b, b depends on a) contains no cycle, so no
/// task may be reported as circular.
#[test]
fn test_no_circular_dependency() {
    let mut scheduler = BeatScheduler::new();

    let root = ScheduledTask::new(String::from("task_a"), Schedule::interval(60));
    let mut middle = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    middle.add_dependency(String::from("task_a"));
    let mut leaf = ScheduledTask::new(String::from("task_c"), Schedule::interval(60));
    leaf.add_dependency(String::from("task_b"));

    scheduler.add_task(root).unwrap();
    scheduler.add_task(middle).unwrap();
    scheduler.add_task(leaf).unwrap();

    for &name in &["task_a", "task_b", "task_c"] {
        let circular = scheduler.has_circular_dependency(name);
        assert!(!circular);
    }
}
/// The dependency chain of the last task in a linear chain is expected to
/// list the tasks upstream-first: a, b, c.
#[test]
fn test_dependency_chain() {
    let mut scheduler = BeatScheduler::new();

    let root = ScheduledTask::new(String::from("task_a"), Schedule::interval(60));
    let mut middle = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    middle.add_dependency(String::from("task_a"));
    let mut leaf = ScheduledTask::new(String::from("task_c"), Schedule::interval(60));
    leaf.add_dependency(String::from("task_b"));

    scheduler.add_task(root).unwrap();
    scheduler.add_task(middle).unwrap();
    scheduler.add_task(leaf).unwrap();

    let chain = scheduler.get_dependency_chain("task_c").unwrap();
    assert_eq!(chain.len(), 3);
    let expected_order = ["task_a", "task_b", "task_c"];
    for (position, expected) in expected_order.iter().enumerate() {
        assert_eq!(chain[position], *expected);
    }
}
/// Asking for the dependency chain of a task inside a cycle must fail.
///
/// NOTE(review): the message check only runs when the error is
/// `ScheduleError::Invalid`; any other error variant passes without it.
#[test]
fn test_dependency_chain_circular() {
    let mut scheduler = BeatScheduler::new();

    let mut first = ScheduledTask::new(String::from("task_a"), Schedule::interval(60));
    first.add_dependency(String::from("task_b"));
    let mut second = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    second.add_dependency(String::from("task_a"));

    scheduler.add_task(first).unwrap();
    scheduler.add_task(second).unwrap();

    let failure = scheduler.get_dependency_chain("task_a").err();
    assert!(failure.is_some());
    if let Some(ScheduleError::Invalid(msg)) = failure {
        assert!(msg.contains("Circular dependency"));
    }
}
/// Validation succeeds when every dependency refers to a registered task.
#[test]
fn test_validate_dependencies_success() {
    let mut scheduler = BeatScheduler::new();

    let upstream = ScheduledTask::new(String::from("task_a"), Schedule::interval(60));
    let mut downstream = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    downstream.add_dependency(String::from("task_a"));

    scheduler.add_task(upstream).unwrap();
    scheduler.add_task(downstream).unwrap();

    let validation = scheduler.validate_dependencies();
    assert!(validation.is_ok());
}
/// Validation must fail when a task depends on a name that is not registered.
///
/// NOTE(review): the message check only runs when the error is
/// `ScheduleError::Invalid`; any other error variant passes without it.
#[test]
fn test_validate_dependencies_missing_task() {
    let mut scheduler = BeatScheduler::new();

    let mut orphan = ScheduledTask::new(String::from("task_b"), Schedule::interval(60));
    orphan.add_dependency(String::from("nonexistent_task"));
    scheduler.add_task(orphan).unwrap();

    let failure = scheduler.validate_dependencies().err();
    assert!(failure.is_some());
    if let Some(ScheduleError::Invalid(msg)) = failure {
        assert!(msg.contains("non-existent task"));
    }
}
/// Two overdue one-time tasks where `task_b` depends on `task_a`: only
/// `task_a` is ready at first, and both are ready once `task_a` is completed.
#[test]
fn test_tasks_ready_with_dependencies() {
    let mut scheduler = BeatScheduler::new();

    let upstream = ScheduledTask::new(
        String::from("task_a"),
        Schedule::onetime(Utc::now() - Duration::hours(1)),
    );
    let mut downstream = ScheduledTask::new(
        String::from("task_b"),
        Schedule::onetime(Utc::now() - Duration::hours(1)),
    );
    downstream.add_dependency(String::from("task_a"));

    scheduler.add_task(upstream).unwrap();
    scheduler.add_task(downstream).unwrap();

    // Phase 1: nothing has completed or failed yet.
    let none_completed = HashSet::new();
    let none_failed = HashSet::new();
    let ready_before = scheduler.get_tasks_ready_with_dependencies(&none_completed, &none_failed);
    assert_eq!(ready_before.len(), 1);
    assert_eq!(ready_before[0].name, "task_a");

    // Phase 2: task_a is reported as completed.
    let mut a_completed = HashSet::new();
    a_completed.insert(String::from("task_a"));
    let ready_after = scheduler.get_tasks_ready_with_dependencies(&a_completed, &none_failed);
    assert_eq!(ready_after.len(), 2);

    let task_b_ready = ready_after.iter().any(|candidate| candidate.name == "task_b");
    assert!(
        task_b_ready,
        "task_b should be ready after task_a completes"
    );
}
/// Dependencies and the `wait_for_dependencies` flag survive a JSON round trip.
#[test]
fn test_dependency_serialization() {
    let mut upstream = HashSet::new();
    for name in &["task_a", "task_b"] {
        upstream.insert(name.to_string());
    }
    let original = ScheduledTask::new(String::from("task_c"), Schedule::interval(60))
        .with_dependencies(upstream);

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: ScheduledTask = serde_json::from_str(&encoded).unwrap();

    assert_eq!(restored.dependencies.len(), 2);
    assert!(restored.depends_on("task_a"));
    assert!(restored.depends_on("task_b"));
    assert!(restored.wait_for_dependencies);
}
/// A registered failure callback is invoked once, with the task name and the
/// error text, when a task is marked as failed.
#[test]
fn test_failure_notification_callback() {
    use std::sync::{Arc, Mutex};

    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    // Shared log of (task name, error) pairs written by the callback.
    let recorded = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&recorded);
    scheduler.on_failure(Arc::new(move |task_name, error| {
        let mut log = sink.lock().unwrap();
        log.push((task_name.to_string(), error.to_string()));
    }));

    scheduler
        .mark_task_failure_with_error("test_task", String::from("Test error"))
        .unwrap();

    let log = recorded.lock().unwrap();
    assert_eq!(log.len(), 1);
    assert_eq!(log[0].0, "test_task");
    assert_eq!(log[0].1, "Test error");
}
/// Every registered failure callback fires exactly once per failure.
#[test]
fn test_failure_notification_multiple_callbacks() {
    use std::sync::{Arc, Mutex};

    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let first_hits = Arc::new(Mutex::new(0));
    let second_hits = Arc::new(Mutex::new(0));

    let first_counter = Arc::clone(&first_hits);
    scheduler.on_failure(Arc::new(move |_, _| {
        let mut hits = first_counter.lock().unwrap();
        *hits += 1;
    }));
    let second_counter = Arc::clone(&second_hits);
    scheduler.on_failure(Arc::new(move |_, _| {
        let mut hits = second_counter.lock().unwrap();
        *hits += 1;
    }));

    scheduler
        .mark_task_failure_with_error("test_task", String::from("Test error"))
        .unwrap();

    assert_eq!(*first_hits.lock().unwrap(), 1);
    assert_eq!(*second_hits.lock().unwrap(), 1);
}
/// After `clear_failure_callbacks`, a previously registered callback must not
/// be invoked on failure.
#[test]
fn test_failure_notification_clear_callbacks() {
    use std::sync::{Arc, Mutex};

    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let hit_count = Arc::new(Mutex::new(0));
    let counter = Arc::clone(&hit_count);
    scheduler.on_failure(Arc::new(move |_, _| {
        let mut hits = counter.lock().unwrap();
        *hits += 1;
    }));

    scheduler.clear_failure_callbacks();
    scheduler
        .mark_task_failure_with_error("test_task", String::from("Test error"))
        .unwrap();

    assert_eq!(*hit_count.lock().unwrap(), 0);
}
/// The failure callback also fires when the failure is reported through
/// `mark_task_failure_with_start`.
#[test]
fn test_failure_notification_with_start_time() {
    use std::sync::{Arc, Mutex};

    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    // Shared log of (task name, error) pairs written by the callback.
    let recorded = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&recorded);
    scheduler.on_failure(Arc::new(move |task_name, error| {
        let mut log = sink.lock().unwrap();
        log.push((task_name.to_string(), error.to_string()));
    }));

    let started_at = Utc::now();
    scheduler
        .mark_task_failure_with_start("test_task", started_at, String::from("Test error"))
        .unwrap();

    let log = recorded.lock().unwrap();
    assert_eq!(log.len(), 1);
    assert_eq!(log[0].0, "test_task");
    assert_eq!(log[0].1, "Test error");
}
/// Three consecutive failures produce three callback invocations, in the
/// order the failures were reported.
#[test]
fn test_failure_notification_multiple_failures() {
    use std::sync::{Arc, Mutex};

    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    // Shared log of (task name, error) pairs written by the callback.
    let recorded = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&recorded);
    scheduler.on_failure(Arc::new(move |task_name, error| {
        let mut log = sink.lock().unwrap();
        log.push((task_name.to_string(), error.to_string()));
    }));

    let errors = ["Error 1", "Error 2", "Error 3"];
    for message in errors.iter() {
        scheduler
            .mark_task_failure_with_error("test_task", message.to_string())
            .unwrap();
    }

    let log = recorded.lock().unwrap();
    assert_eq!(log.len(), 3);
    for (position, message) in errors.iter().enumerate() {
        assert_eq!(log[position].1, *message);
    }
}
/// The next-run cache starts empty, is filled by `update_next_run_cache`, and
/// then agrees with `next_run_time`.
#[test]
fn test_schedule_cache_basic() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    let cache_before = task.cached_next_run;
    assert!(cache_before.is_none());

    task.update_next_run_cache();
    assert!(task.cached_next_run.is_some());

    let cached = task.cached_next_run.unwrap();
    let computed = task.next_run_time().unwrap();
    assert_eq!(computed, cached);
}
/// `invalidate_next_run_cache` empties a previously populated cache.
#[test]
fn test_schedule_cache_invalidation() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));

    task.update_next_run_cache();
    let populated = task.cached_next_run.is_some();
    assert!(populated);

    task.invalidate_next_run_cache();
    let cleared = task.cached_next_run.is_none();
    assert!(cleared);
}
/// A task added to the scheduler has a populated next-run cache, and the
/// cache is still populated after the task is marked successful.
#[test]
fn test_schedule_cache_on_execution() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let cached_after_add = scheduler.tasks.get("test_task").unwrap().cached_next_run;
    assert!(cached_after_add.is_some());

    scheduler.mark_task_success("test_task").unwrap();

    let cached_after_run = scheduler.tasks.get("test_task").unwrap().cached_next_run;
    assert!(cached_after_run.is_some());
}
/// Updating the schedule keeps the cache populated; lengthening the interval
/// must not move the cached next run earlier.
#[test]
fn test_schedule_cache_on_schedule_update() {
    let mut task = ScheduledTask::new(String::from("test_task"), Schedule::interval(60));
    task.update_next_run_cache();
    let cached_before = task.cached_next_run.unwrap();

    let reason = Some(String::from("Changed interval"));
    task.update_schedule(Schedule::interval(120), reason);

    assert!(task.cached_next_run.is_some());
    let cached_after = task.cached_next_run.unwrap();
    assert!(cached_after >= cached_before);
}
/// Adding three tasks in one batch reports three additions, registers every
/// task, and populates each task's next-run cache.
#[test]
fn test_add_tasks_batch() {
    let mut scheduler = BeatScheduler::new();
    let batch = vec![
        ScheduledTask::new(String::from("task1"), Schedule::interval(60)),
        ScheduledTask::new(String::from("task2"), Schedule::interval(120)),
        ScheduledTask::new(String::from("task3"), Schedule::interval(180)),
    ];

    let added = scheduler.add_tasks_batch(batch).unwrap();
    assert_eq!(added, 3);
    assert_eq!(scheduler.tasks.len(), 3);

    let names = ["task1", "task2", "task3"];
    for &name in &names {
        assert!(scheduler.tasks.contains_key(name));
    }
    for &name in &names {
        let stored = scheduler.tasks.get(name).unwrap();
        assert!(stored.cached_next_run.is_some());
    }
}
/// An empty batch adds nothing and reports a count of zero.
#[test]
fn test_add_tasks_batch_empty() {
    let mut scheduler = BeatScheduler::new();
    let nothing = vec![];

    let added = scheduler.add_tasks_batch(nothing).unwrap();

    assert_eq!(added, 0);
    let registered = scheduler.tasks.len();
    assert_eq!(registered, 0);
}
/// Removing three of four registered tasks in one batch reports three
/// removals and leaves only the task that was not named.
#[test]
fn test_remove_tasks_batch() {
    let mut scheduler = BeatScheduler::new();
    let specs: [(&str, u64); 4] = [
        ("task1", 60),
        ("task2", 120),
        ("task3", 180),
        ("task4", 240),
    ];
    for &(name, every) in &specs {
        scheduler
            .add_task(ScheduledTask::new(
                name.to_string(),
                Schedule::interval(every),
            ))
            .unwrap();
    }
    assert_eq!(scheduler.tasks.len(), 4);

    let removed = scheduler
        .remove_tasks_batch(&["task1", "task2", "task3"])
        .unwrap();
    assert_eq!(removed, 3);
    assert_eq!(scheduler.tasks.len(), 1);

    for &gone in &["task1", "task2", "task3"] {
        assert!(!scheduler.tasks.contains_key(gone));
    }
    assert!(scheduler.tasks.contains_key("task4"));
}
/// Unknown names in a removal batch are skipped: only the two existing tasks
/// are counted and removed.
#[test]
fn test_remove_tasks_batch_nonexistent() {
    let mut scheduler = BeatScheduler::new();
    let specs: [(&str, u64); 2] = [("task1", 60), ("task2", 120)];
    for &(name, every) in &specs {
        scheduler
            .add_task(ScheduledTask::new(
                name.to_string(),
                Schedule::interval(every),
            ))
            .unwrap();
    }
    assert_eq!(scheduler.tasks.len(), 2);

    let removed = scheduler
        .remove_tasks_batch(&["task1", "nonexistent", "task2"])
        .unwrap();

    assert_eq!(removed, 2);
    assert_eq!(scheduler.tasks.len(), 0);
}
/// Removing an empty batch succeeds and reports zero removals.
#[test]
fn test_remove_tasks_batch_empty() {
    let mut scheduler = BeatScheduler::new();

    let removed = scheduler.remove_tasks_batch(&[]).unwrap();

    assert_eq!(removed, 0);
}
/// A new lock records its task and owner, is not expired, recognises only its
/// owner, and starts with a renewal count of zero.
#[test]
fn test_schedule_lock_basic() {
    let lock = ScheduleLock::new(String::from("task1"), String::from("owner1"), 300);

    assert_eq!(lock.task_name, "task1");
    assert_eq!(lock.owner, "owner1");
    assert_eq!(lock.renewal_count, 0);

    let expired = lock.is_expired();
    assert!(!expired);
    assert!(lock.is_owned_by("owner1"));
    let owned_by_other = lock.is_owned_by("owner2");
    assert!(!owned_by_other);
}
/// The remaining TTL of a fresh 300-second lock lies in the range (290, 300].
#[test]
fn test_schedule_lock_ttl() {
    let lock = ScheduleLock::new(String::from("task1"), String::from("owner1"), 300);

    let remaining = lock.ttl();

    let above_lower_bound = remaining.num_seconds() > 290;
    assert!(above_lower_bound);
    let within_upper_bound = remaining.num_seconds() <= 300;
    assert!(within_upper_bound);
}
/// Renewing a short-lived lock before it expires succeeds, bumps the renewal
/// count, and leaves the lock unexpired.
#[test]
fn test_schedule_lock_renew() {
    let mut lock = ScheduleLock::new(String::from("task1"), String::from("owner1"), 1);

    // Let part of the one-second TTL elapse before renewing.
    let pause = std::time::Duration::from_millis(100);
    std::thread::sleep(pause);

    let renewal = lock.renew(300);
    assert!(renewal.is_ok());
    assert_eq!(lock.renewal_count, 1);
    let expired = lock.is_expired();
    assert!(!expired);
}
/// A held lock blocks a second owner until the first owner releases it.
#[test]
fn test_lock_manager_acquire_release() {
    let mut manager = LockManager::new(300);

    let first_attempt = manager.try_acquire("task1", "owner1", None).unwrap();
    assert!(first_attempt);
    assert!(manager.is_locked("task1"));

    let rival_attempt = manager.try_acquire("task1", "owner2", None).unwrap();
    assert!(!rival_attempt);

    let was_released = manager.release("task1", "owner1").unwrap();
    assert!(was_released);
    let still_locked = manager.is_locked("task1");
    assert!(!still_locked);
}
/// The owner of a lock can acquire the same lock again.
#[test]
fn test_lock_manager_acquire_same_owner() {
    let mut manager = LockManager::new(300);

    let first_attempt = manager.try_acquire("task1", "owner1", None).unwrap();
    assert!(first_attempt);

    let repeat_attempt = manager.try_acquire("task1", "owner1", None).unwrap();
    assert!(repeat_attempt);
}
/// Renewing through the manager succeeds for the owner and is reflected in
/// the stored lock's renewal count.
#[test]
fn test_lock_manager_renew() {
    let mut manager = LockManager::new(300);
    manager.try_acquire("task1", "owner1", None).unwrap();

    let was_renewed = manager.renew("task1", "owner1", Some(600)).unwrap();
    assert!(was_renewed);

    let stored = manager.get_lock("task1").unwrap();
    assert_eq!(stored.renewal_count, 1);
}
/// A one-second lock is no longer reported as locked after it has expired
/// and `cleanup_expired` has run.
#[test]
fn test_lock_manager_cleanup_expired() {
    let mut manager = LockManager::new(1);
    manager.try_acquire("task1", "owner1", Some(1)).unwrap();
    assert!(manager.is_locked("task1"));

    // Wait slightly longer than the one-second TTL.
    let past_ttl = std::time::Duration::from_millis(1100);
    std::thread::sleep(past_ttl);

    manager.cleanup_expired();
    let still_locked = manager.is_locked("task1");
    assert!(!still_locked);
}
/// Two locks on different tasks are both reported as active.
#[test]
fn test_lock_manager_get_active_locks() {
    let mut manager = LockManager::new(300);
    for &(task, owner) in &[("task1", "owner1"), ("task2", "owner2")] {
        manager.try_acquire(task, owner, None).unwrap();
    }

    let active = manager.get_active_locks();
    assert_eq!(active.len(), 2);
}
/// `release_all` leaves no active locks behind.
#[test]
fn test_lock_manager_release_all() {
    let mut manager = LockManager::new(300);
    for &(task, owner) in &[("task1", "owner1"), ("task2", "owner2")] {
        manager.try_acquire(task, owner, None).unwrap();
    }
    let held_before = manager.get_active_locks().len();
    assert_eq!(held_before, 2);

    manager.release_all();

    let held_after = manager.get_active_locks().len();
    assert_eq!(held_after, 0);
}
/// The scheduler can acquire and then release a lock for a task name.
#[test]
fn test_scheduler_lock_acquire_release() {
    let mut scheduler = BeatScheduler::new();

    let got_lock = scheduler.try_acquire_lock("task1", None).unwrap();
    assert!(got_lock);
    assert!(scheduler.is_task_locked("task1"));

    let gave_up_lock = scheduler.release_lock("task1").unwrap();
    assert!(gave_up_lock);
    let still_locked = scheduler.is_task_locked("task1");
    assert!(!still_locked);
}
/// Two separately constructed schedulers can each lock the same task name,
/// and releasing on one does not affect the other.
#[test]
fn test_scheduler_lock_multiple_instances() {
    let mut first = BeatScheduler::new();
    let mut second = BeatScheduler::new();

    let first_got_lock = first.try_acquire_lock("task1", None).unwrap();
    assert!(first_got_lock);
    let second_got_lock = second.try_acquire_lock("task1", None).unwrap();
    assert!(second_got_lock);

    assert!(first.is_task_locked("task1"));
    assert!(second.is_task_locked("task1"));

    first.release_lock("task1").unwrap();

    let first_still_locked = first.is_task_locked("task1");
    assert!(!first_still_locked);
    assert!(second.is_task_locked("task1"));
}
/// `execute_with_lock` runs the closure, returns `Ok(true)`, and leaves the
/// task unlocked afterwards.
#[test]
fn test_scheduler_execute_with_lock() {
    let mut scheduler = BeatScheduler::new();
    let mut closure_ran = false;

    let outcome = scheduler.execute_with_lock("task1", None, || {
        closure_ran = true;
        Ok(())
    });

    assert!(outcome.is_ok());
    assert!(outcome.unwrap());
    assert!(closure_ran);
    let still_locked = scheduler.is_task_locked("task1");
    assert!(!still_locked);
}
/// Two separately constructed schedulers get different instance ids.
#[test]
fn test_scheduler_instance_id() {
    let first = BeatScheduler::new();
    let second = BeatScheduler::new();

    assert_ne!(first.instance_id(), second.instance_id());
}
/// An explicitly set instance id is returned by `instance_id`.
#[test]
fn test_scheduler_set_custom_instance_id() {
    let mut scheduler = BeatScheduler::new();
    let custom_id = String::from("custom-id-123");

    scheduler.set_instance_id(custom_id);

    assert_eq!(scheduler.instance_id(), "custom-id-123");
}
/// The default TTL and a held lock survive a JSON round trip of the manager.
#[test]
fn test_lock_manager_serialization() {
    let mut original = LockManager::new(300);
    original.try_acquire("task1", "owner1", None).unwrap();

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: LockManager = serde_json::from_str(&encoded).unwrap();

    assert_eq!(restored.default_ttl, 300);
    assert!(restored.is_locked("task1"));
}
/// A high-severity conflict stores its constructor arguments and answers the
/// severity predicates accordingly.
#[test]
fn test_schedule_conflict_basic() {
    let first_task = String::from("task1");
    let second_task = String::from("task2");
    let description = String::from("Overlapping execution");
    let conflict = ScheduleConflict::new(
        first_task,
        second_task,
        ConflictSeverity::High,
        120,
        description,
    );

    assert_eq!(conflict.task1, "task1");
    assert_eq!(conflict.task2, "task2");
    assert_eq!(conflict.severity, ConflictSeverity::High);
    assert_eq!(conflict.overlap_seconds, 120);

    assert!(conflict.is_high_severity());
    let reported_medium = conflict.is_medium_severity();
    assert!(!reported_medium);
    let reported_low = conflict.is_low_severity();
    assert!(!reported_low);
}
/// `with_resolution` attaches the given resolution text to the conflict.
#[test]
fn test_schedule_conflict_with_resolution() {
    let unresolved = ScheduleConflict::new(
        String::from("task1"),
        String::from("task2"),
        ConflictSeverity::Medium,
        60,
        String::from("Partial overlap"),
    );
    let conflict = unresolved.with_resolution(String::from("Add jitter"));

    let resolution = conflict.resolution;
    assert!(resolution.is_some());
    assert_eq!(resolution.unwrap(), "Add jitter");
}
/// Each severity level is recognised by its matching predicate.
#[test]
fn test_schedule_conflict_severity() {
    // Builds a conflict between "t1" and "t2" that varies only in severity,
    // overlap, and description.
    let conflict_between_t1_t2 = |severity: ConflictSeverity, overlap, description: &str| {
        ScheduleConflict::new(
            "t1".to_string(),
            "t2".to_string(),
            severity,
            overlap,
            description.to_string(),
        )
    };

    let low = conflict_between_t1_t2(ConflictSeverity::Low, 10, "Low conflict");
    let medium = conflict_between_t1_t2(ConflictSeverity::Medium, 30, "Medium conflict");
    let high = conflict_between_t1_t2(ConflictSeverity::High, 60, "High conflict");

    assert!(low.is_low_severity());
    assert!(medium.is_medium_severity());
    assert!(high.is_high_severity());
}
/// Hourly and two-hourly tasks checked with `detect_conflicts(60, 30)` are
/// expected to yield no conflicts.
#[test]
fn test_detect_conflicts_no_conflict() {
    let mut scheduler = BeatScheduler::new();
    let specs: [(&str, u64); 2] = [("task1", 3600), ("task2", 7200)];
    for &(name, every) in &specs {
        scheduler
            .add_task(ScheduledTask::new(
                name.to_string(),
                Schedule::interval(every),
            ))
            .unwrap();
    }

    let found = scheduler.detect_conflicts(60, 30);
    assert_eq!(found.len(), 0);
}
/// Two tasks on the same 60-second interval are expected to produce at least
/// one conflict from `detect_conflicts(3600, 30)`.
#[test]
fn test_detect_conflicts_with_overlap() {
    let mut scheduler = BeatScheduler::new();
    for &name in &["task1", "task2"] {
        scheduler
            .add_task(ScheduledTask::new(
                name.to_string(),
                Schedule::interval(60),
            ))
            .unwrap();
    }

    let found = scheduler.detect_conflicts(3600, 30);
    let none_found = found.is_empty();
    assert!(!none_found);
}
/// A disabled task does not take part in conflict detection.
#[test]
fn test_detect_conflicts_disabled_tasks() {
    let mut scheduler = BeatScheduler::new();

    let active = ScheduledTask::new(String::from("task1"), Schedule::interval(60));
    let inactive = ScheduledTask::new(String::from("task2"), Schedule::interval(60)).disabled();
    scheduler.add_task(active).unwrap();
    scheduler.add_task(inactive).unwrap();

    let found = scheduler.detect_conflicts(3600, 30);
    assert_eq!(found.len(), 0);
}
/// The high-severity conflicts can never outnumber the total conflict count
/// computed with the same arguments.
#[test]
fn test_get_high_severity_conflicts() {
    let mut scheduler = BeatScheduler::new();
    let specs: [(&str, u64); 3] = [("task1", 60), ("task2", 60), ("task3", 120)];
    for &(name, every) in &specs {
        scheduler
            .add_task(ScheduledTask::new(
                name.to_string(),
                Schedule::interval(every),
            ))
            .unwrap();
    }

    let severe = scheduler.get_high_severity_conflicts(3600, 60);
    assert!(severe.len() <= scheduler.conflict_count(3600, 60));
}
/// An empty scheduler has no conflicts; two tasks on the same 60-second
/// interval do.
#[test]
fn test_has_conflicts() {
    let mut scheduler = BeatScheduler::new();
    let conflicts_when_empty = scheduler.has_conflicts(3600, 30);
    assert!(!conflicts_when_empty);

    for &name in &["task1", "task2"] {
        scheduler
            .add_task(ScheduledTask::new(
                name.to_string(),
                Schedule::interval(60),
            ))
            .unwrap();
    }

    assert!(scheduler.has_conflicts(3600, 30));
}
/// The conflict count is zero for an empty scheduler and positive once three
/// tasks share the same 60-second interval.
#[test]
fn test_conflict_count() {
    let mut scheduler = BeatScheduler::new();
    assert_eq!(scheduler.conflict_count(3600, 30), 0);

    for &name in &["task1", "task2", "task3"] {
        scheduler
            .add_task(ScheduledTask::new(
                name.to_string(),
                Schedule::interval(60),
            ))
            .unwrap();
    }

    let total = scheduler.conflict_count(3600, 30);
    assert!(total > 0);
}
/// The `Display` output of a conflict mentions both task names and the
/// overlap in seconds.
#[test]
fn test_schedule_conflict_display() {
    let conflict = ScheduleConflict::new(
        String::from("task1"),
        String::from("task2"),
        ConflictSeverity::High,
        120,
        String::from("Test conflict"),
    );

    let rendered = format!("{}", conflict);

    for fragment in &["task1", "task2", "120s"] {
        assert!(rendered.contains(fragment));
    }
}
/// Task names, severity, and overlap survive a JSON round trip of a conflict.
#[test]
fn test_schedule_conflict_serialization() {
    let original = ScheduleConflict::new(
        String::from("task1"),
        String::from("task2"),
        ConflictSeverity::Medium,
        60,
        String::from("Test"),
    );

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: ScheduleConflict = serde_json::from_str(&encoded).unwrap();

    assert_eq!(restored.task1, "task1");
    assert_eq!(restored.task2, "task2");
    assert_eq!(restored.severity, ConflictSeverity::Medium);
    assert_eq!(restored.overlap_seconds, 60);
}
/// A crontab without an explicit timezone set to "0 12" is expected to fire
/// at 12:00 when the result is read as UTC.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_timezone_utc() {
    let noon_daily = Schedule::crontab("0", "12", "*", "*", "*");
    let reference = Utc::now();

    let upcoming = noon_daily.next_run(Some(reference)).unwrap();

    assert_eq!(upcoming.hour(), 12);
    assert_eq!(upcoming.minute(), 0);
}
/// A crontab in the America/New_York timezone yields a next run strictly
/// after the reference time.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_timezone_new_york() {
    let nine_am_ny = Schedule::crontab_tz("0", "9", "*", "*", "*", "America/New_York");
    let reference = Utc::now();

    let upcoming = nine_am_ny.next_run(Some(reference)).unwrap();

    assert!(upcoming > reference);
}
/// A crontab in the Europe/London timezone yields a next run strictly after
/// the reference time.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_timezone_london() {
    let afternoon_london = Schedule::crontab_tz("30", "14", "*", "*", "*", "Europe/London");
    let reference = Utc::now();

    let upcoming = afternoon_london.next_run(Some(reference)).unwrap();

    assert!(upcoming > reference);
}
/// A crontab in the Asia/Tokyo timezone yields a next run strictly after the
/// reference time.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_timezone_tokyo() {
    let evening_tokyo = Schedule::crontab_tz("0", "18", "*", "*", "*", "Asia/Tokyo");
    let reference = Utc::now();

    let upcoming = evening_tokyo.next_run(Some(reference)).unwrap();

    assert!(upcoming > reference);
}
/// An unknown timezone name must make `next_run` fail.
///
/// NOTE(review): the message check only runs when the error is
/// `ScheduleError::Parse`; any other error variant passes without it.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_timezone_invalid() {
    let bad_zone = Schedule::crontab_tz("0", "12", "*", "*", "*", "Invalid/Timezone");
    let reference = Utc::now();

    let failure = bad_zone.next_run(Some(reference)).err();

    assert!(failure.is_some());
    if let Some(ScheduleError::Parse(msg)) = failure {
        assert!(msg.contains("Invalid timezone"));
    }
}
/// The timezone of a crontab schedule survives a JSON round trip.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_timezone_serialization() {
    let original = Schedule::crontab_tz("0", "9", "1-5", "*", "*", "America/New_York");

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: Schedule = serde_json::from_str(&encoded).unwrap();

    match restored {
        Schedule::Crontab { timezone, .. } => {
            assert_eq!(timezone.as_deref(), Some("America/New_York"));
        }
        _ => panic!("Expected Crontab schedule"),
    }
}
/// The `Display` output of a timezone-aware crontab mentions both the
/// timezone name and the word "Crontab".
#[cfg(feature = "cron")]
#[test]
fn test_crontab_timezone_display() {
    let weekday_ny = Schedule::crontab_tz("0", "9", "1-5", "*", "*", "America/New_York");

    let rendered = format!("{}", weekday_ny);

    for fragment in &["America/New_York", "Crontab"] {
        assert!(rendered.contains(fragment));
    }
}
/// The same crontab fields (minute "9", hour "0") evaluated in New York and
/// London must resolve to different UTC hours.
#[cfg(feature = "cron")]
#[test]
fn test_crontab_timezone_consistency() {
    let in_new_york = Schedule::crontab_tz("9", "0", "*", "*", "*", "America/New_York");
    let in_london = Schedule::crontab_tz("9", "0", "*", "*", "*", "Europe/London");
    let reference = Utc::now();

    let new_york_run = in_new_york.next_run(Some(reference)).unwrap();
    let london_run = in_london.next_run(Some(reference)).unwrap();

    assert_ne!(new_york_run.hour(), london_run.hour());
}
/// The timezone of a crontab schedule survives a JSON round trip of the
/// task that owns it.
#[cfg(feature = "cron")]
#[test]
fn test_scheduled_task_timezone_persistence() {
    let weekday_ny = Schedule::crontab_tz("0", "9", "1-5", "*", "*", "America/New_York");
    let original = ScheduledTask::new(String::from("timezone_task"), weekday_ny);

    let encoded = serde_json::to_string(&original).unwrap();
    let restored: ScheduledTask = serde_json::from_str(&encoded).unwrap();

    match restored.schedule {
        Schedule::Crontab { timezone, .. } => {
            assert_eq!(timezone.as_deref(), Some("America/New_York"));
        }
        _ => panic!("Expected Crontab schedule"),
    }
}
/// A scheduler with no tasks reports no due tasks.
#[test]
fn test_scheduler_get_due_tasks_empty() {
    let scheduler = BeatScheduler::new();

    let due_count = scheduler.get_due_tasks().len();

    assert_eq!(due_count, 0);
}
/// A one-second interval task whose last run was ten seconds ago is reported
/// as due.
#[test]
fn test_scheduler_get_due_tasks_with_due_task() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("due_task"),
            Schedule::interval(1),
        ))
        .unwrap();

    // Backdate the last run so the interval has already elapsed.
    if let Some(stored) = scheduler.tasks.get_mut("due_task") {
        let ten_seconds_ago = Utc::now() - Duration::seconds(10);
        stored.last_run_at = Some(ten_seconds_ago);
    }

    let due = scheduler.get_due_tasks();
    assert_eq!(due.len(), 1);
    assert_eq!(due[0].name, "due_task");
}
#[test]
fn test_scheduler_get_due_tasks_with_future_task() {
let mut scheduler = BeatScheduler::new();
let task = ScheduledTask::new("future_task".to_string(), Schedule::interval(3600));
scheduler.add_task(task).unwrap();
scheduler.mark_task_run("future_task").unwrap();
let due_tasks = scheduler.get_due_tasks();
assert_eq!(due_tasks.len(), 0);
}
/// With one overdue task and one task that just ran, only the overdue task
/// is reported as due.
#[test]
fn test_scheduler_get_due_tasks_mixed() {
    let mut scheduler = BeatScheduler::new();

    // Overdue: one-second interval, last run backdated by ten seconds.
    scheduler
        .add_task(ScheduledTask::new(
            String::from("due_task"),
            Schedule::interval(1),
        ))
        .unwrap();
    if let Some(stored) = scheduler.tasks.get_mut("due_task") {
        let ten_seconds_ago = Utc::now() - Duration::seconds(10);
        stored.last_run_at = Some(ten_seconds_ago);
    }

    // Not due: hourly interval, marked as run just now.
    scheduler
        .add_task(ScheduledTask::new(
            String::from("future_task"),
            Schedule::interval(3600),
        ))
        .unwrap();
    scheduler.mark_task_run("future_task").unwrap();

    let due = scheduler.get_due_tasks();
    assert_eq!(due.len(), 1);
    assert_eq!(due[0].name, "due_task");
}
/// A disabled task is never reported as due, even when its interval has
/// already elapsed.
#[test]
fn test_scheduler_get_due_tasks_disabled() {
    let mut scheduler = BeatScheduler::new();
    let inactive = ScheduledTask::new(String::from("disabled_task"), Schedule::interval(1));
    scheduler.add_task(inactive.disabled()).unwrap();

    // Backdate the last run so only the disabled flag keeps the task from being due.
    if let Some(stored) = scheduler.tasks.get_mut("disabled_task") {
        let ten_seconds_ago = Utc::now() - Duration::seconds(10);
        stored.last_run_at = Some(ten_seconds_ago);
    }

    let due_count = scheduler.get_due_tasks().len();
    assert_eq!(due_count, 0);
}
/// `mark_task_run` sets `last_run_at` to a time not earlier than a timestamp
/// captured before the call.
#[test]
fn test_scheduler_mark_task_run_updates_timestamp() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let captured_before = Utc::now();
    let pause = std::time::Duration::from_millis(10);
    std::thread::sleep(pause);
    scheduler.mark_task_run("test_task").unwrap();

    let last_run = scheduler.tasks.get("test_task").unwrap().last_run_at;
    assert!(last_run.is_some());
    assert!(last_run.unwrap() >= captured_before);
}
/// Every `mark_task_run` call increases `total_run_count` by exactly one.
#[test]
fn test_scheduler_mark_task_run_increments_count() {
    let mut scheduler = BeatScheduler::new();
    scheduler
        .add_task(ScheduledTask::new(
            String::from("test_task"),
            Schedule::interval(60),
        ))
        .unwrap();

    let baseline = scheduler.tasks.get("test_task").unwrap().total_run_count;

    for runs_so_far in 1..=2 {
        scheduler.mark_task_run("test_task").unwrap();
        let observed = scheduler.tasks.get("test_task").unwrap().total_run_count;
        assert_eq!(observed, baseline + runs_so_far);
    }
}
/// Five one-second interval tasks with a backdated last run are all reported
/// as due.
#[test]
fn test_scheduler_multiple_due_tasks() {
    let mut scheduler = BeatScheduler::new();

    for index in 0..5 {
        let name = format!("task_{}", index);
        scheduler
            .add_task(ScheduledTask::new(name.clone(), Schedule::interval(1)))
            .unwrap();
        // Backdate the last run so the interval has already elapsed.
        if let Some(stored) = scheduler.tasks.get_mut(&name) {
            stored.last_run_at = Some(Utc::now() - Duration::seconds(10));
        }
    }

    let due_count = scheduler.get_due_tasks().len();
    assert_eq!(due_count, 5);
}
/// Three never-run interval tasks (60s, 120s, 30s) are all stored and all
/// reported as due.
///
/// Only the counts are checked; the relative order of the due tasks is not
/// asserted here.
#[test]
fn test_scheduler_task_prioritization() {
    let mut beat = BeatScheduler::new();

    for (name, every) in [("task_60", 60), ("task_120", 120), ("task_30", 30)] {
        beat.add_task(ScheduledTask::new(
            name.to_string(),
            Schedule::interval(every),
        ))
        .unwrap();
    }

    assert_eq!(beat.tasks.len(), 3);
    assert_eq!(beat.get_due_tasks().len(), 3);
}
/// Walks one task through add -> due -> run -> not due -> disable ->
/// re-enable -> remove, checking the scheduler state after every step.
#[test]
fn test_scheduler_task_lifecycle() {
    const NAME: &str = "lifecycle_task";

    let mut beat = BeatScheduler::new();
    beat.add_task(ScheduledTask::new(NAME.to_string(), Schedule::interval(1)))
        .unwrap();
    assert_eq!(beat.tasks.len(), 1);

    // A task that has never run is due right away; once marked as run it is not.
    assert_eq!(beat.get_due_tasks().len(), 1);
    beat.mark_task_run(NAME).unwrap();
    assert_eq!(beat.get_due_tasks().len(), 0);

    // Switch the task off and back on, verifying the flag after each change.
    for flag in [false, true] {
        if let Some(entry) = beat.tasks.get_mut(NAME) {
            entry.enabled = flag;
        }
        let entry = beat.tasks.get(NAME).unwrap();
        assert_eq!(entry.enabled, flag);
    }

    beat.remove_task(NAME).unwrap();
    assert_eq!(beat.tasks.len(), 0);
}
/// State saved by one scheduler can be loaded by another: the task name,
/// the presence of a last-run timestamp and the run count all survive.
#[test]
fn test_scheduler_persistence_preserves_state() {
    // The temp file handle must stay alive for the whole test so the path remains valid.
    let backing = NamedTempFile::new().unwrap();
    let state_path = backing.path().to_str().unwrap().to_string();

    let mut writer = BeatScheduler::with_persistence(state_path.clone());
    writer
        .add_task(ScheduledTask::new(
            "persistent_task".to_string(),
            Schedule::interval(60),
        ))
        .unwrap();
    writer.mark_task_run("persistent_task").unwrap();
    writer.save_state().unwrap();

    let reader = BeatScheduler::load_from_file(&state_path).unwrap();
    assert_eq!(reader.tasks.len(), 1);

    let restored = reader.tasks.get("persistent_task").unwrap();
    assert_eq!(restored.name, "persistent_task");
    assert!(restored.last_run_at.is_some());
    assert_eq!(restored.total_run_count, 1);
}
/// Simulates five scheduler ticks 100 ms apart: every due task is marked as
/// run, and the task's `total_run_count` must equal the number of marks.
#[test]
fn test_scheduler_loop_simulation() {
    let mut beat = BeatScheduler::new();
    beat.add_task(ScheduledTask::new(
        "loop_task".to_string(),
        Schedule::interval(1),
    ))
    .unwrap();

    let mut run_total = 0;
    for _tick in 0..5 {
        // Copy the names out first so the scheduler can be mutated afterwards.
        let mut due_names: Vec<String> = Vec::new();
        for due in beat.get_due_tasks() {
            due_names.push(due.name.clone());
        }

        for name in &due_names {
            beat.mark_task_run(name).unwrap();
            run_total += 1;
        }

        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    assert!(run_total > 0);
    let entry = beat.tasks.get("loop_task").unwrap();
    assert_eq!(entry.total_run_count as usize, run_total);
}
/// `with_wfq_weight` accepts 5.0 and rejects both 0.05 and 15.0.
#[test]
fn test_wfq_task_weight_validation() {
    // Builds an identical, unweighted task for every case.
    let fresh_task = || ScheduledTask::new("test".to_string(), Schedule::interval(60));

    let accepted = fresh_task().with_wfq_weight(5.0).unwrap();
    assert_eq!(accepted.wfq_weight(), 5.0);

    let too_small = fresh_task().with_wfq_weight(0.05);
    assert!(too_small.is_err());

    let too_large = fresh_task().with_wfq_weight(15.0);
    assert!(too_large.is_err());
}
/// Two weighted one-second tasks both appear in the WFQ due list after the
/// interval has elapsed, each with a virtual finish time of 0.0.
#[test]
fn test_wfq_basic_scheduling() {
    let mut beat = BeatScheduler::new();

    for (name, weight) in [("low_weight", 0.5), ("high_weight", 5.0)] {
        let weighted = ScheduledTask::new(name.to_string(), Schedule::interval(1))
            .with_wfq_weight(weight)
            .unwrap();
        beat.add_task(weighted).unwrap();
    }

    // Wait slightly longer than the one-second interval.
    std::thread::sleep(std::time::Duration::from_millis(1100));

    let queued = beat.get_due_tasks_wfq();
    assert_eq!(queued.len(), 2);
    assert!(queued.iter().all(|info| info.virtual_finish_time == 0.0));
}
/// Charging 10.0 of execution time to a task weighted 2.0 yields a virtual
/// finish time of 5.0 and a total execution time of 10.0.
#[test]
fn test_wfq_virtual_time_update() {
    let mut beat = BeatScheduler::new();
    let weighted = ScheduledTask::new("test_task".to_string(), Schedule::interval(60))
        .with_wfq_weight(2.0)
        .unwrap();
    beat.add_task(weighted).unwrap();

    beat.update_wfq_after_execution("test_task", 10.0).unwrap();

    let state = beat.tasks.get("test_task").unwrap().wfq_state.as_ref();
    assert!(state.is_some());

    let state = state.unwrap();
    assert_eq!(state.virtual_finish_time, 5.0);
    assert_eq!(state.total_execution_time, 10.0);
}
/// Charging the same execution time (10.0) to tasks weighted 1.0, 2.0 and
/// 5.0 advances their virtual time by 10.0, 5.0 and 2.0 respectively.
#[test]
fn test_wfq_fairness() {
    // (task name, WFQ weight, expected virtual-time increment)
    let cases = [
        ("task1", 1.0, 10.0),
        ("task2", 2.0, 5.0),
        ("task3", 5.0, 2.0),
    ];

    let mut beat = BeatScheduler::new();
    for (name, weight, _) in cases {
        let weighted = ScheduledTask::new(name.to_string(), Schedule::interval(1))
            .with_wfq_weight(weight)
            .unwrap();
        beat.add_task(weighted).unwrap();
    }

    // Register all tasks before charging any of them, in declaration order.
    for (name, _, _) in cases {
        beat.update_wfq_after_execution(name, 10.0).unwrap();
    }

    for (name, _, expected_increment) in cases {
        let entry = beat.tasks.get(name).unwrap();
        assert!(entry.wfq_finish_time() > 0.0);

        let state = entry.wfq_state.as_ref().unwrap();
        assert_eq!(
            state.virtual_finish_time - state.virtual_start_time,
            expected_increment
        );
    }
}
/// After both tasks have been charged 10.0 of execution time and their
/// interval has elapsed, both appear in the WFQ due list with the weights
/// they were configured with.
///
/// The relative position of the two entries is not asserted here.
#[test]
fn test_wfq_task_ordering() {
    let specs = [("heavy_task", 1.0), ("light_task", 5.0)];

    let mut beat = BeatScheduler::new();
    for (name, weight) in specs {
        let weighted = ScheduledTask::new(name.to_string(), Schedule::interval(1))
            .with_wfq_weight(weight)
            .unwrap();
        beat.add_task(weighted).unwrap();
    }
    for (name, _) in specs {
        beat.update_wfq_after_execution(name, 10.0).unwrap();
    }

    // Wait slightly longer than the one-second interval.
    std::thread::sleep(std::time::Duration::from_millis(1100));

    let queued = beat.get_due_tasks_wfq();
    assert_eq!(queued.len(), 2);

    // Looks up the reported weight of the entry with the given name.
    let weight_of = |name: &str| queued.iter().find(|info| info.name == name).unwrap().weight;
    assert_eq!(weight_of("heavy_task"), 1.0);
    assert_eq!(weight_of("light_task"), 5.0);
}
/// WFQ statistics for two tasks weighted 2.0 and 3.0: two tasks, both
/// configured, total weight 5.0, average 2.5, global virtual time 0.0.
#[test]
fn test_wfq_stats() {
    let mut beat = BeatScheduler::new();

    for (name, weight) in [("task1", 2.0), ("task2", 3.0)] {
        let weighted = ScheduledTask::new(name.to_string(), Schedule::interval(60))
            .with_wfq_weight(weight)
            .unwrap();
        beat.add_task(weighted).unwrap();
    }

    let summary = beat.get_wfq_stats();
    assert_eq!(summary.total_tasks, 2);
    assert_eq!(summary.tasks_with_wfq_config, 2);
    assert_eq!(summary.total_weight, 5.0);
    assert_eq!(summary.average_weight, 2.5);
    assert_eq!(summary.global_virtual_time, 0.0);
}
/// The `Display` output of the WFQ statistics contains the header text and
/// the configured-task ratio.
#[test]
fn test_wfq_stats_display() {
    let mut beat = BeatScheduler::new();
    let weighted = ScheduledTask::new("task1".to_string(), Schedule::interval(60))
        .with_wfq_weight(2.0)
        .unwrap();
    beat.add_task(weighted).unwrap();

    let rendered = beat.get_wfq_stats().to_string();
    for fragment in ["WFQ Stats", "1/1 tasks configured"] {
        assert!(rendered.contains(fragment));
    }
}
/// The WFQ due list contains only the enabled task when a second,
/// higher-weighted task has been disabled.
#[test]
fn test_wfq_with_disabled_tasks() {
    let active = ScheduledTask::new("enabled_task".to_string(), Schedule::interval(1))
        .with_wfq_weight(2.0)
        .unwrap();
    let mut inactive = ScheduledTask::new("disabled_task".to_string(), Schedule::interval(1))
        .with_wfq_weight(5.0)
        .unwrap();
    inactive.enabled = false;

    let mut beat = BeatScheduler::new();
    beat.add_task(active).unwrap();
    beat.add_task(inactive).unwrap();

    // Wait slightly longer than the one-second interval.
    std::thread::sleep(std::time::Duration::from_millis(1100));

    let queued = beat.get_due_tasks_wfq();
    let queued_names: Vec<&str> = queued.iter().map(|info| info.name.as_str()).collect();
    assert_eq!(queued_names, ["enabled_task"]);
}
/// After charging 10.0 of execution time to a task weighted 1.0 and then to
/// a task weighted 2.0, the statistics report a global virtual time of 15.0.
#[test]
fn test_wfq_global_virtual_time() {
    let specs = [("task1", 1.0), ("task2", 2.0)];

    let mut beat = BeatScheduler::new();
    for (name, weight) in specs {
        let weighted = ScheduledTask::new(name.to_string(), Schedule::interval(60))
            .with_wfq_weight(weight)
            .unwrap();
        beat.add_task(weighted).unwrap();
    }

    // Charge the tasks only after both are registered, in declaration order.
    for (name, _) in specs {
        beat.update_wfq_after_execution(name, 10.0).unwrap();
    }

    assert_eq!(beat.get_wfq_stats().global_virtual_time, 15.0);
}
/// Repeated executions accumulate: for a task weighted 2.0, charging 10.0
/// gives a finish time of 5.0, and charging a further 6.0 gives 8.0 with a
/// total execution time of 16.0.
#[test]
fn test_wfq_multiple_executions() {
    let mut beat = BeatScheduler::new();
    let weighted = ScheduledTask::new("task".to_string(), Schedule::interval(60))
        .with_wfq_weight(2.0)
        .unwrap();
    beat.add_task(weighted).unwrap();

    // (execution time charged, expected virtual finish time afterwards)
    for (charged, expected_finish) in [(10.0, 5.0), (6.0, 8.0)] {
        beat.update_wfq_after_execution("task", charged).unwrap();
        let entry = beat.tasks.get("task").unwrap();
        assert_eq!(entry.wfq_finish_time(), expected_finish);
    }

    let state = beat.tasks.get("task").unwrap().wfq_state.as_ref().unwrap();
    assert_eq!(state.total_execution_time, 16.0);
}
/// A task built without an explicit WFQ weight reports a weight of 1.0.
#[test]
fn test_wfq_task_weight_default() {
    let default_weight =
        ScheduledTask::new("task".to_string(), Schedule::interval(60)).wfq_weight();
    assert_eq!(default_weight, 1.0);
}
/// The WFQ weight survives a JSON serialize/deserialize round trip.
#[test]
fn test_wfq_serialization() {
    let original = ScheduledTask::new("task".to_string(), Schedule::interval(60))
        .with_wfq_weight(3.5)
        .unwrap();

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<ScheduledTask>(&encoded).unwrap();

    assert_eq!(decoded.wfq_weight(), 3.5);
}
}