use crate::alert::AlertConfig;
use crate::config::ScheduleError;
use crate::history::{
CatchupPolicy, DependencyStatus, ExecutionRecord, ExecutionResult, ExecutionState,
HealthCheckResult, Jitter, RetryPolicy, ScheduleHealth, ScheduleVersion,
};
use crate::schedule::{BusinessCalendar, HolidayCalendar, Schedule};
use crate::wfq::{TaskWeight, WFQState};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
/// A named task together with its schedule, dispatch options, retry and
/// dependency settings, and the runtime bookkeeping kept for it.
///
/// Most fields fall back to a default when missing from serialized input
/// (see the `#[serde(default ...)]` attributes on each field).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledTask {
/// Task name; also handed to `Jitter::apply` when a jitter is configured.
pub name: String,
/// Schedule used to compute the next run time.
pub schedule: Schedule,
/// JSON positional arguments stored with the task.
#[serde(default)]
pub args: Vec<serde_json::Value>,
/// JSON keyword arguments stored with the task.
#[serde(default)]
pub kwargs: HashMap<String, serde_json::Value>,
/// Optional queue / priority / expiry settings.
#[serde(default)]
pub options: TaskOptions,
/// Time of the last run; `None` makes `is_due` report the task as due.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_at: Option<DateTime<Utc>>,
/// Run counter, added to `total_failure_count` to form the attempt total.
// NOTE(review): no method in this file increments it — presumably the scheduler does.
#[serde(default)]
pub total_run_count: u64,
/// Whether the task is enabled (defaults to `true` when deserializing).
#[serde(default = "default_true")]
pub enabled: bool,
/// Optional jitter applied on top of the scheduled next run time.
#[serde(skip_serializing_if = "Option::is_none")]
pub jitter: Option<Jitter>,
/// Catch-up policy; stored and versioned here, interpreted elsewhere.
#[serde(default)]
pub catchup_policy: CatchupPolicy,
/// Optional group name (see `is_in_group`).
#[serde(skip_serializing_if = "Option::is_none")]
pub group: Option<String>,
/// Free-form tags (see `has_tag`).
#[serde(default)]
pub tags: HashSet<String>,
/// Policy consulted by `should_retry` and `next_retry_delay`.
#[serde(default)]
pub retry_policy: RetryPolicy,
/// Failures/interruptions since the last `mark_success`.
#[serde(default)]
pub retry_count: u32,
/// Time of the most recent `mark_failure`.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_failure_at: Option<DateTime<Utc>>,
/// Lifetime number of `mark_failure` calls.
#[serde(default)]
pub total_failure_count: u64,
/// Execution records, oldest first (new records are pushed at the end).
#[serde(default)]
pub execution_history: Vec<ExecutionRecord>,
/// Maximum number of history records kept; `0` disables trimming.
#[serde(default)]
pub max_history_size: usize,
/// Snapshots of schedule/enabled/jitter/catch-up settings, one per version.
#[serde(default)]
pub version_history: Vec<ScheduleVersion>,
/// Version number of the current configuration (starts at 1).
#[serde(default = "default_version")]
pub current_version: u32,
/// Names of tasks this task depends on.
#[serde(default)]
pub dependencies: HashSet<String>,
/// Dependency-wait flag (defaults to `true`).
// NOTE(review): not read by any method in this file — confirm how the scheduler uses it.
#[serde(default = "default_true")]
pub wait_for_dependencies: bool,
/// In-memory cache of the next run time; never serialized.
#[serde(skip)]
pub(crate) cached_next_run: Option<DateTime<Utc>>,
/// Alerting configuration for this task.
#[serde(default)]
pub alert_config: AlertConfig,
/// Whether the task is idle or currently running (with start time and optional deadline).
#[serde(default)]
pub execution_state: ExecutionState,
/// Optional WFQ state holding the task's weight and finish time.
#[serde(skip_serializing_if = "Option::is_none")]
pub wfq_state: Option<WFQState>,
}
/// Serde default helper: yields `true` for boolean fields missing from the input.
// NOTE(review): only referenced via `#[serde(default = "default_true")]`; the lint
// allowance is presumably a guard in case that use is not seen — confirm it is still needed.
#[allow(dead_code)]
fn default_true() -> bool {
true
}
/// Serde default helper: version number used when `current_version` is missing (1).
fn default_version() -> u32 {
1
}
impl ScheduledTask {
/// Creates an enabled task named `name` that runs on `schedule`.
///
/// Every other setting starts at its default: no arguments, no jitter, no
/// group/tags/dependencies, zeroed counters, unlimited history
/// (`max_history_size == 0`) and an idle execution state. The version
/// history is seeded with version 1, labelled "Initial creation".
pub fn new(name: String, schedule: Schedule) -> Self {
    let mut task = Self {
        name,
        // `schedule` is owned and not used again, so it is moved, not cloned.
        schedule,
        args: Vec::new(),
        kwargs: HashMap::new(),
        options: TaskOptions::default(),
        last_run_at: None,
        total_run_count: 0,
        enabled: true,
        jitter: None,
        catchup_policy: CatchupPolicy::default(),
        group: None,
        tags: HashSet::new(),
        retry_policy: RetryPolicy::default(),
        retry_count: 0,
        last_failure_at: None,
        total_failure_count: 0,
        execution_history: Vec::new(),
        // Zero means "never trim" (see `add_execution_record`).
        max_history_size: 0,
        version_history: Vec::new(),
        current_version: 1,
        dependencies: HashSet::new(),
        wait_for_dependencies: true,
        cached_next_run: None,
        alert_config: AlertConfig::default(),
        execution_state: ExecutionState::default(),
        wfq_state: None,
    };
    // The snapshot is taken from the fully built task so that version 1
    // mirrors the initial configuration.
    let initial_version =
        ScheduleVersion::from_task(&task, 1, Some("Initial creation".to_string()));
    task.version_history.push(initial_version);
    task
}
pub fn with_args(mut self, args: Vec<serde_json::Value>) -> Self {
self.args = args;
self
}
pub fn with_kwargs(mut self, kwargs: HashMap<String, serde_json::Value>) -> Self {
self.kwargs = kwargs;
self
}
pub fn disabled(mut self) -> Self {
self.enabled = false;
self
}
pub fn with_queue(mut self, queue: String) -> Self {
self.options.queue = Some(queue);
self
}
pub fn with_priority(mut self, priority: u8) -> Self {
self.options.priority = Some(priority);
self
}
pub fn with_expires(mut self, expires: u64) -> Self {
self.options.expires = Some(expires);
self
}
pub fn with_jitter(mut self, jitter: Jitter) -> Self {
self.jitter = Some(jitter);
self
}
pub fn with_catchup_policy(mut self, policy: CatchupPolicy) -> Self {
self.catchup_policy = policy;
self
}
pub fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
self.retry_policy = policy;
self
}
pub fn with_group(mut self, group: String) -> Self {
self.group = Some(group);
self
}
/// Builder: adds `tag` to the task's tag set (no effect if already present).
pub fn with_tag(mut self, tag: String) -> Self {
    self.add_tag(tag);
    self
}
pub fn with_tags(mut self, tags: HashSet<String>) -> Self {
self.tags = tags;
self
}
pub fn with_alert_config(mut self, config: AlertConfig) -> Self {
self.alert_config = config;
self
}
/// Adds `tag` to the tag set; adding an existing tag is a no-op.
pub fn add_tag(&mut self, tag: String) {
self.tags.insert(tag);
}
/// Removes `tag`; returns `true` if it was present.
pub fn remove_tag(&mut self, tag: &str) -> bool {
self.tags.remove(tag)
}
/// Returns `true` if the task carries `tag`.
pub fn has_tag(&self, tag: &str) -> bool {
self.tags.contains(tag)
}
/// Returns `true` when a group is set and equals `group`.
pub fn is_in_group(&self, group: &str) -> bool {
self.group.as_deref() == Some(group)
}
pub fn is_due(&self) -> Result<bool, ScheduleError> {
if self.last_run_at.is_none() {
return Ok(true);
}
let mut next_run = self.schedule.next_run(self.last_run_at)?;
if let Some(ref jitter) = self.jitter {
next_run = jitter.apply(next_run, &self.name);
}
Ok(Utc::now() >= next_run)
}
pub fn next_run_time(&self) -> Result<DateTime<Utc>, ScheduleError> {
if let Some(cached) = self.cached_next_run {
return Ok(cached);
}
let mut next_run = self.schedule.next_run(self.last_run_at)?;
if let Some(ref jitter) = self.jitter {
next_run = jitter.apply(next_run, &self.name);
}
Ok(next_run)
}
/// Recomputes the next run time and stores it in the cache.
///
/// If the computation fails the cache is cleared instead.
pub fn update_next_run_cache(&mut self) {
    self.cached_next_run = self.next_run_time_uncached().ok();
}
fn next_run_time_uncached(&self) -> Result<DateTime<Utc>, ScheduleError> {
let mut next_run = self.schedule.next_run(self.last_run_at)?;
if let Some(ref jitter) = self.jitter {
next_run = jitter.apply(next_run, &self.name);
}
Ok(next_run)
}
/// Drops the cached next run time so the next lookup recomputes it.
pub fn invalidate_next_run_cache(&mut self) {
self.cached_next_run = None;
}
/// Returns the `enabled` flag.
pub fn is_enabled(&self) -> bool {
self.enabled
}
/// Returns `true` once `last_run_at` has been set.
pub fn has_run(&self) -> bool {
self.last_run_at.is_some()
}
/// Returns `true` if any of queue, priority or expiry is set in `options`.
pub fn has_options(&self) -> bool {
self.options.has_queue() || self.options.has_priority() || self.options.has_expires()
}
/// Time elapsed since `last_run_at`, or `None` if the task has never run.
pub fn age_since_last_run(&self) -> Option<chrono::Duration> {
    let last_run = self.last_run_at?;
    Some(Utc::now() - last_run)
}
/// Records a success by resetting the retry counter.
// NOTE(review): does not touch `last_run_at` or `total_run_count` — presumably the caller does.
pub fn mark_success(&mut self) {
self.retry_count = 0; }
/// Records a failure: stamps `last_failure_at` with the current time and
/// increments both the lifetime failure counter and the retry counter.
pub fn mark_failure(&mut self) {
self.last_failure_at = Some(Utc::now());
self.total_failure_count += 1;
self.retry_count += 1;
}
/// Asks the retry policy whether another retry is allowed at the current `retry_count`.
pub fn should_retry(&self) -> bool {
self.retry_policy.should_retry(self.retry_count)
}
/// Marks the task as running, starting now.
///
/// When `timeout_seconds` is given, the deadline is `started_at + timeout`.
/// Timeouts too large for `chrono::Duration` are clamped to its maximum, and
/// if the resulting deadline is not representable as a date the task simply
/// gets no deadline (`timeout_after == None`) instead of panicking.
pub fn begin_execution(&mut self, timeout_seconds: Option<u64>) {
    // Largest whole-second count `chrono::Duration::seconds` accepts.
    const MAX_TIMEOUT_SECS: u64 = (i64::MAX / 1_000) as u64;

    // Read the clock once so the deadline is measured from `started_at`.
    let started_at = Utc::now();
    let timeout_after = timeout_seconds.and_then(|secs| {
        // Lossless after clamping: the bound is far below `i64::MAX`.
        let secs = secs.min(MAX_TIMEOUT_SECS) as i64;
        started_at.checked_add_signed(Duration::seconds(secs))
    });
    self.execution_state = ExecutionState::Running {
        started_at,
        timeout_after,
    };
}
/// Returns the execution state to `Idle`.
pub fn complete_execution(&mut self) {
self.execution_state = ExecutionState::Idle;
}
/// Heuristically detects a run that was interrupted.
///
/// An idle task is never interrupted. A running task counts as interrupted
/// when its deadline (if any) has passed, or when it has been running for
/// more than 24 hours.
pub fn detect_interrupted_execution(&self) -> bool {
    let (started_at, timeout_after) = match &self.execution_state {
        ExecutionState::Idle => return false,
        ExecutionState::Running {
            started_at,
            timeout_after,
        } => (*started_at, *timeout_after),
    };
    let past_deadline = timeout_after.map_or(false, |deadline| Utc::now() > deadline);
    // Fallback for runs without a deadline (or whose deadline is still ahead).
    past_deadline || Utc::now() - started_at > Duration::hours(24)
}
/// Closes out a run that was left in the `Running` state.
///
/// Appends an `Interrupted` record to the history (trimmed to
/// `max_history_size`), returns the task to `Idle` and bumps `retry_count`.
///
/// Returns how long the run had been going, or `None` if the task was idle.
pub fn recover_from_interruption(&mut self) -> Option<Duration> {
    let started_at = match &self.execution_state {
        ExecutionState::Running { started_at, .. } => *started_at,
        ExecutionState::Idle => return None,
    };
    let elapsed = Utc::now() - started_at;
    let interrupted = ExecutionRecord::completed(started_at, ExecutionResult::Interrupted);
    // Same push-then-trim bookkeeping as any other record.
    self.add_execution_record(interrupted);
    self.execution_state = ExecutionState::Idle;
    self.retry_count += 1;
    Some(elapsed)
}
/// Returns `true` when the most recent history record is an interruption
/// and the retry policy still allows another attempt.
pub fn is_ready_for_retry_after_crash(&self) -> bool {
    self.execution_history
        .last()
        .map_or(false, |latest| latest.is_interrupted() && self.should_retry())
}
/// Delay before the next retry as reported by the retry policy for the
/// current `retry_count`; `None` when the policy yields no delay.
// NOTE(review): the value is added as seconds in `next_retry_time`.
pub fn next_retry_delay(&self) -> Option<u64> {
self.retry_policy.next_retry_delay(self.retry_count)
}
/// Earliest time at which the next retry may run: `last_failure_at` plus the
/// policy's retry delay in seconds.
///
/// Returns `None` when there is no recorded failure, when the policy yields
/// no delay, or when the resulting instant is not representable. Oversized
/// delays are clamped to the largest `chrono::Duration` rather than wrapping
/// to a negative value or panicking.
pub fn next_retry_time(&self) -> Option<DateTime<Utc>> {
    // Largest whole-second count `chrono::Duration::seconds` accepts.
    const MAX_DELAY_SECS: u64 = (i64::MAX / 1_000) as u64;

    let last_failure = self.last_failure_at?;
    let delay_secs = self.next_retry_delay()?;
    // Lossless after clamping: the bound is far below `i64::MAX`.
    let delay = Duration::seconds(delay_secs.min(MAX_DELAY_SECS) as i64);
    last_failure.checked_add_signed(delay)
}
/// Returns `true` when the retry policy allows a retry and the computed
/// retry time has been reached. Without a retry time the answer is `false`.
pub fn is_ready_for_retry(&self) -> bool {
    self.should_retry()
        && self
            .next_retry_time()
            .map_or(false, |retry_at| Utc::now() >= retry_at)
}
/// Share of attempts that failed.
///
/// Attempts are counted as `total_run_count + total_failure_count`; with no
/// attempts the rate is `0.0`.
pub fn failure_rate(&self) -> f64 {
    match self.total_run_count + self.total_failure_count {
        0 => 0.0,
        attempts => self.total_failure_count as f64 / attempts as f64,
    }
}
pub fn with_max_history(mut self, max_size: usize) -> Self {
self.max_history_size = max_size;
self
}
/// Appends `record` to the history, then discards the oldest records so that
/// at most `max_history_size` remain. A limit of `0` disables trimming.
pub fn add_execution_record(&mut self, record: ExecutionRecord) {
    self.execution_history.push(record);

    let limit = self.max_history_size;
    if limit == 0 {
        return;
    }
    let excess = self.execution_history.len().saturating_sub(limit);
    if excess > 0 {
        self.execution_history.drain(..excess);
    }
}
/// Returns the most recent `n` history records (oldest first), or the whole
/// history when it holds fewer than `n`.
pub fn get_last_executions(&self, n: usize) -> &[ExecutionRecord] {
    let history = self.execution_history.as_slice();
    let keep = n.min(history.len());
    &history[history.len() - keep..]
}
/// Returns the full execution history, oldest record first.
pub fn get_all_executions(&self) -> &[ExecutionRecord] {
&self.execution_history
}
/// Number of history records for which `is_success()` holds.
pub fn history_success_count(&self) -> usize {
self.execution_history
.iter()
.filter(|r| r.is_success())
.count()
}
/// Number of history records for which `is_failure()` holds.
pub fn history_failure_count(&self) -> usize {
self.execution_history
.iter()
.filter(|r| r.is_failure())
.count()
}
/// Length of the run of failures at the end of the history, i.e. how many of
/// the most recent records are failures before the first non-failure.
pub fn consecutive_failure_count(&self) -> u32 {
    self.execution_history
        .iter()
        .rev()
        .take_while(|record| record.is_failure())
        .fold(0u32, |streak, _| streak + 1)
}
/// Number of history records for which `is_timeout()` holds.
pub fn history_timeout_count(&self) -> usize {
self.execution_history
.iter()
.filter(|r| r.is_timeout())
.count()
}
/// Mean of the recorded durations (milliseconds, integer division).
///
/// Records without a duration are ignored; returns `None` when no record has
/// one. The sum is accumulated in 128 bits, so it cannot overflow, and no
/// intermediate collection is allocated.
pub fn average_duration_ms(&self) -> Option<u64> {
    let (total, samples) = self
        .execution_history
        .iter()
        .filter_map(|r| r.duration_ms)
        .fold((0u128, 0u64), |(total, samples), ms| {
            (total + u128::from(ms), samples + 1)
        });
    if samples == 0 {
        None
    } else {
        // A mean never exceeds the largest sample, so it fits back into `u64`.
        Some((total / u128::from(samples)) as u64)
    }
}
/// Shortest recorded duration in the history, or `None` if no record has one.
pub fn min_duration_ms(&self) -> Option<u64> {
self.execution_history
.iter()
.filter_map(|r| r.duration_ms)
.min()
}
/// Longest recorded duration in the history, or `None` if no record has one.
pub fn max_duration_ms(&self) -> Option<u64> {
self.execution_history
.iter()
.filter_map(|r| r.duration_ms)
.max()
}
/// Share of history records that are successes; `0.0` for an empty history.
pub fn history_success_rate(&self) -> f64 {
    let recorded = self.execution_history.len();
    if recorded > 0 {
        self.history_success_count() as f64 / recorded as f64
    } else {
        0.0
    }
}
/// Removes every record from the execution history.
pub fn clear_history(&mut self) {
self.execution_history.clear();
}
/// Evaluates the task's health and returns a report.
///
/// Checks performed:
/// * the next run time can be computed (failure makes the task `Unhealthy`);
/// * the task is not stuck (see `is_stuck`);
/// * the lifetime failure rate is at most 50% once 10 attempts exist;
/// * the last three history records are not all failures/timeouts.
///
/// A disabled task always reports a single "Task is disabled" warning,
/// regardless of the other findings. The report also carries the next run
/// time and the time since the last run when those are available.
pub fn check_health(&self) -> HealthCheckResult {
    let mut issues = Vec::new();

    let next_run = match self.next_run_time() {
        Ok(next) => Some(next),
        Err(e) => {
            issues.push(format!("Cannot calculate next run: {}", e));
            None
        }
    };
    // Severity is decided from this flag rather than by searching the
    // human-readable issue text, so rewording a message cannot change it.
    let schedule_broken = next_run.is_none();

    if let Some(stuck_duration) = self.is_stuck() {
        issues.push(format!(
            "Task may be stuck (no execution in {} hours)",
            stuck_duration.num_hours()
        ));
    }

    let failure_rate = self.failure_rate();
    let total_attempts = self.total_run_count + self.total_failure_count;
    // Require a minimum sample so a couple of early failures do not alarm.
    if failure_rate > 0.5 && total_attempts >= 10 {
        issues.push(format!(
            "High failure rate: {:.1}% ({} failures out of {} total)",
            failure_rate * 100.0,
            self.total_failure_count,
            total_attempts
        ));
    }

    if self.execution_history.len() >= 3 {
        let last_3 = &self.execution_history[self.execution_history.len() - 3..];
        if last_3.iter().all(|r| r.is_failure() || r.is_timeout()) {
            issues.push("Last 3 executions failed".to_string());
        }
    }

    let health = if !self.enabled {
        ScheduleHealth::Warning {
            issues: vec!["Task is disabled".to_string()],
        }
    } else if issues.is_empty() {
        ScheduleHealth::Healthy
    } else if schedule_broken {
        ScheduleHealth::Unhealthy { issues }
    } else {
        ScheduleHealth::Warning { issues }
    };

    let time_since_last_run = self.age_since_last_run();
    let mut result = HealthCheckResult::new(self.name.clone(), health);
    if let Some(next) = next_run {
        result = result.with_next_run(next);
    }
    if let Some(duration) = time_since_last_run {
        result = result.with_time_since_last_run(duration);
    }
    result
}
/// Detects a task that has not run for far longer than its schedule implies.
///
/// Returns the time since the last run when it exceeds ten times the
/// expected interval. Interval schedules use their own period; cron and
/// solar schedules are treated as daily. Disabled tasks, tasks that have
/// never run and one-time schedules are never reported as stuck.
pub fn is_stuck(&self) -> Option<chrono::Duration> {
    if !self.enabled {
        return None;
    }
    let last_run = self.last_run_at?;
    let age = Utc::now() - last_run;
    let expected_interval = match &self.schedule {
        Schedule::Interval { every } => Duration::seconds(*every as i64),
        #[cfg(feature = "cron")]
        Schedule::Crontab { .. } => Duration::hours(24),
        #[cfg(feature = "solar")]
        Schedule::Solar { .. } => Duration::hours(24),
        Schedule::OneTime { .. } => return None,
    };
    if age > expected_interval * 10 {
        Some(age)
    } else {
        None
    }
}
/// Checks that the schedule can produce a next run from `last_run_at`.
///
/// # Errors
/// Returns the schedule's error when it cannot.
pub fn validate_schedule(&self) -> Result<(), ScheduleError> {
    self.schedule.next_run(self.last_run_at).map(|_| ())
}
/// Replaces the schedule and records the change as a new version.
///
/// The version number is incremented, the next-run cache is recomputed for
/// the new schedule, and a snapshot tagged with `change_reason` is appended
/// to the version history.
pub fn update_schedule(&mut self, new_schedule: Schedule, change_reason: Option<String>) {
self.current_version += 1;
self.schedule = new_schedule;
// Refresh the cache before snapshotting so it never reflects the old schedule.
self.update_next_run_cache();
let version = ScheduleVersion::from_task(self, self.current_version, change_reason);
self.version_history.push(version);
}
/// Applies the provided settings and records the result as a new version.
///
/// Each `None` argument leaves the corresponding setting unchanged. For
/// `jitter`, `Some(None)` removes the jitter and `Some(Some(j))` sets it;
/// either way the next-run cache is recomputed. A new version is recorded
/// even when every argument is `None`.
pub fn update_config(
    &mut self,
    enabled: Option<bool>,
    jitter: Option<Option<Jitter>>,
    catchup_policy: Option<CatchupPolicy>,
    change_reason: Option<String>,
) {
    if let Some(flag) = enabled {
        self.enabled = flag;
    }
    let jitter_changed = match jitter {
        Some(new_jitter) => {
            self.jitter = new_jitter;
            true
        }
        None => false,
    };
    if let Some(policy) = catchup_policy {
        self.catchup_policy = policy;
    }
    // Only jitter influences the computed next run time.
    if jitter_changed {
        self.update_next_run_cache();
    }

    self.current_version += 1;
    let snapshot = ScheduleVersion::from_task(self, self.current_version, change_reason);
    self.version_history.push(snapshot);
}
/// Restores schedule, enabled flag, jitter and catch-up policy from the
/// snapshot numbered `version_number`.
///
/// The rollback is itself recorded as a new version (the version counter
/// keeps increasing), and the next-run cache is recomputed so it reflects
/// the restored schedule and jitter.
///
/// # Errors
/// Returns `ScheduleError::Invalid` when no snapshot has that number; the
/// task is left unchanged in that case.
pub fn rollback_to_version(&mut self, version_number: u32) -> Result<(), ScheduleError> {
    let target = self
        .version_history
        .iter()
        .find(|v| v.version == version_number)
        .ok_or_else(|| {
            ScheduleError::Invalid(format!("Version {} not found", version_number))
        })?;
    self.schedule = target.schedule.clone();
    self.enabled = target.enabled;
    self.jitter = target.jitter.clone();
    self.catchup_policy = target.catchup_policy.clone();

    // Schedule and jitter feed the cached next run time; without this the
    // cache would keep serving the value computed before the rollback.
    self.update_next_run_cache();

    self.current_version += 1;
    let rollback_version = ScheduleVersion::from_task(
        self,
        self.current_version,
        Some(format!("Rolled back to version {}", version_number)),
    );
    self.version_history.push(rollback_version);
    Ok(())
}
/// Returns all recorded version snapshots, oldest first.
pub fn get_version_history(&self) -> &[ScheduleVersion] {
&self.version_history
}
/// Looks up the first snapshot whose number equals `version_number`.
pub fn get_version(&self, version_number: u32) -> Option<&ScheduleVersion> {
self.version_history
.iter()
.find(|v| v.version == version_number)
}
/// Returns the second-most-recent snapshot in the version history.
///
/// Yields `None` while the task is still at version 1 or when the history
/// holds fewer than two snapshots.
pub fn get_previous_version(&self) -> Option<&ScheduleVersion> {
    if self.current_version <= 1 {
        return None;
    }
    let snapshots = self.version_history.len();
    snapshots
        .checked_sub(2)
        .and_then(|index| self.version_history.get(index))
}
/// Adds `task_name` to the dependency set; duplicates are ignored.
pub fn add_dependency(&mut self, task_name: String) {
self.dependencies.insert(task_name);
}
/// Removes `task_name` from the dependency set; returns `true` if it was present.
pub fn remove_dependency(&mut self, task_name: &str) -> bool {
self.dependencies.remove(task_name)
}
/// Removes every dependency.
pub fn clear_dependencies(&mut self) {
self.dependencies.clear();
}
/// Returns `true` if the task depends on at least one other task.
pub fn has_dependencies(&self) -> bool {
!self.dependencies.is_empty()
}
/// Returns `true` if `task_name` is among this task's dependencies.
pub fn depends_on(&self, task_name: &str) -> bool {
self.dependencies.contains(task_name)
}
pub fn with_dependencies(mut self, dependencies: HashSet<String>) -> Self {
self.dependencies = dependencies;
self
}
pub fn with_wait_for_dependencies(mut self, wait: bool) -> Self {
self.wait_for_dependencies = wait;
self
}
/// Compares the task's dependencies against `completed_tasks`.
///
/// Returns `Satisfied` when every dependency has completed (trivially so
/// when there are none), otherwise `Waiting` with the names still pending.
pub fn check_dependencies(&self, completed_tasks: &HashSet<String>) -> DependencyStatus {
    // Dependencies that do not appear in the completed set.
    let pending: Vec<String> = self
        .dependencies
        .difference(completed_tasks)
        .cloned()
        .collect();
    match pending.is_empty() {
        true => DependencyStatus::Satisfied,
        false => DependencyStatus::Waiting { pending },
    }
}
/// Like `check_dependencies`, but failed dependencies take precedence.
///
/// Returns `Failed` listing every dependency found in `failed_tasks`; if
/// none failed, the result is `Satisfied` or `Waiting` depending on
/// `completed_tasks`.
pub fn check_dependencies_with_failures(
    &self,
    completed_tasks: &HashSet<String>,
    failed_tasks: &HashSet<String>,
) -> DependencyStatus {
    let failed: Vec<String> = self
        .dependencies
        .iter()
        .filter(|dep| failed_tasks.contains(dep.as_str()))
        .cloned()
        .collect();
    if failed.is_empty() {
        // No failures: fall back to the plain completed/pending check.
        self.check_dependencies(completed_tasks)
    } else {
        DependencyStatus::Failed { failed }
    }
}
}
/// One-line summary: name, schedule, a `(disabled)` marker when applicable,
/// seconds since the last run (if any) and the total run count.
impl std::fmt::Display for ScheduledTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ScheduledTask[{}] schedule={}", self.name, self.schedule)?;
        if !self.enabled {
            f.write_str(" (disabled)")?;
        }
        let seconds_since_run = self
            .last_run_at
            .map(|last_run| (Utc::now() - last_run).num_seconds());
        if let Some(seconds) = seconds_since_run {
            write!(f, " last_run={}s ago", seconds)?;
        }
        write!(f, " runs={}", self.total_run_count)
    }
}
/// Optional per-task dispatch settings; every field defaults to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TaskOptions {
/// Queue name, if one was chosen.
pub queue: Option<String>,
/// Priority value, if one was chosen.
pub priority: Option<u8>,
/// Expiry, rendered with an `s` suffix by `Display` (presumably seconds).
pub expires: Option<u64>,
}
impl TaskOptions {
/// Returns `true` if a queue is set.
pub fn has_queue(&self) -> bool {
self.queue.is_some()
}
/// Returns `true` if a priority is set.
pub fn has_priority(&self) -> bool {
self.priority.is_some()
}
/// Returns `true` if an expiry is set.
pub fn has_expires(&self) -> bool {
self.expires.is_some()
}
}
/// Renders the options that are set as `TaskOptions[k=v, ...]`, in the order
/// queue, priority, expires; renders `TaskOptions[default]` when none is set.
impl std::fmt::Display for TaskOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered: Vec<String> = vec![
            self.queue.as_ref().map(|queue| format!("queue={}", queue)),
            self.priority.map(|priority| format!("priority={}", priority)),
            self.expires.map(|expires| format!("expires={}s", expires)),
        ]
        .into_iter()
        .flatten()
        .collect();

        if rendered.is_empty() {
            return write!(f, "TaskOptions[default]");
        }
        write!(f, "TaskOptions[{}]", rendered.join(", "))
    }
}
impl ScheduleVersion {
/// Snapshots `task`'s schedule, enabled flag, jitter and catch-up policy as
/// version `version`, stamped with the current time and `change_reason`.
pub fn from_task(task: &ScheduledTask, version: u32, change_reason: Option<String>) -> Self {
Self {
version,
schedule: task.schedule.clone(),
created_at: Utc::now(),
change_reason,
enabled: task.enabled,
jitter: task.jitter.clone(),
catchup_policy: task.catchup_policy.clone(),
}
}
}
impl ScheduledTask {
/// Builder placeholder: accepts a business calendar but currently ignores
/// it and returns the task unchanged.
// NOTE(review): the calendar is dropped without being stored — confirm whether this is intended.
pub fn with_business_calendar(self, _calendar: BusinessCalendar) -> Self {
self
}
/// Builder placeholder: accepts a holiday calendar but currently ignores it
/// and returns the task unchanged.
// NOTE(review): the calendar is dropped without being stored — confirm whether this is intended.
pub fn with_holiday_calendar(self, _calendar: HolidayCalendar) -> Self {
self
}
}
impl ScheduledTask {
    /// Builder: sets the task's WFQ weight.
    ///
    /// The weight is validated first; a default `WFQState` is created when
    /// the task has none yet.
    ///
    /// # Errors
    /// Returns the error reported by `TaskWeight::new` when it rejects `weight`.
    pub fn with_wfq_weight(mut self, weight: f64) -> Result<Self, String> {
        let weight = TaskWeight::new(weight)?;
        // `get_or_insert_with` yields the existing or freshly created state,
        // so no `unwrap` is needed.
        self.wfq_state.get_or_insert_with(WFQState::default).weight = weight;
        Ok(self)
    }

    /// The task's WFQ weight, or `1.0` when no WFQ state is attached.
    pub fn wfq_weight(&self) -> f64 {
        self.wfq_state
            .as_ref()
            .map_or(1.0, |state| state.weight.value())
    }

    /// The task's WFQ finish time, or `0.0` when no WFQ state is attached.
    pub fn wfq_finish_time(&self) -> f64 {
        self.wfq_state
            .as_ref()
            .map_or(0.0, |state| state.finish_time())
    }
}