#[cfg(feature = "job_context")]
pub mod context;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration as StdDuration;
use chrono::{DateTime, Datelike, Duration as ChronoDuration, NaiveTime, Utc, Weekday};
#[cfg(feature = "cron_schedule")]
use cron::Schedule as CronSchedule;
use tracing::warn;
use uuid::Uuid;
/// Identifier type for a job; an alias for [`Uuid`].
pub type TKJobId = Uuid;
/// Identifier type for a job instance; an alias for [`Uuid`].
pub type InstanceId = Uuid;
/// Numeric type used for a job's maximum retry count.
pub type MaxRetries = u32;
/// Crate-internal identifier type for a worker (a plain `usize`).
pub(crate) type WorkerId = usize;
/// Boxed, `Send + Sync + 'static` closure that takes no arguments and returns a
/// pinned, boxed, `Send` future resolving to `bool`.
///
/// NOTE(review): the `bool` presumably signals success/failure of the job run —
/// confirm against the code that awaits it.
pub type BoxedExecFn =
Box<dyn Fn() -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> + Send + Sync + 'static>;
/// Describes when a job should run.
///
/// The next run instant for each variant is computed by
/// `Schedule::calculate_next_run`.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Schedule {
/// Run at each listed `(weekday, time-of-day)` pair; the pairs are combined
/// with a date and interpreted as UTC when the next run is computed.
WeekdayTimes(Vec<(Weekday, NaiveTime)>),
/// Run according to a cron expression, stored as text and parsed each time
/// the next run is computed (only with the `cron_schedule` feature).
#[cfg(feature = "cron_schedule")]
Cron(String),
/// Run again a fixed duration after the reference time.
FixedInterval(StdDuration),
/// Run a single time at the given instant.
Once(DateTime<Utc>),
/// Has no computed next run time.
Never,
}
impl Schedule {
    /// Returns the next instant at which this schedule fires, relative to
    /// `reference_time`, or `None` when there is no further run.
    ///
    /// * `WeekdayTimes` delegates to `calculate_next_weekday_time`.
    /// * `Cron` parses the stored expression; a parse failure is logged and
    ///   yields `None`.
    /// * `FixedInterval` adds the interval to `reference_time`; a conversion
    ///   failure is logged and yields `None`, as does an overflowing addition.
    /// * `Once` yields its instant only while `reference_time` is strictly
    ///   before it.
    /// * `Never` always yields `None`.
    pub(crate) fn calculate_next_run(&self, reference_time: DateTime<Utc>) -> Option<DateTime<Utc>> {
        match self {
            Schedule::Never => None,
            // A one-shot schedule is only pending while its instant lies ahead.
            Schedule::Once(run_at) if reference_time < *run_at => Some(*run_at),
            Schedule::Once(_) => None,
            Schedule::FixedInterval(interval) => {
                let step = match ChronoDuration::from_std(*interval) {
                    Ok(converted) => converted,
                    Err(e) => {
                        warn!("Failed to convert interval duration {:?}: {}", interval, e);
                        return None;
                    }
                };
                reference_time.checked_add_signed(step)
            }
            #[cfg(feature = "cron_schedule")]
            Schedule::Cron(expression) => match CronSchedule::from_str(expression) {
                Err(e) => {
                    warn!("Failed to parse cron expression '{}': {}", expression, e);
                    None
                }
                Ok(cron_schedule) => cron_schedule.after(&reference_time).next(),
            },
            Schedule::WeekdayTimes(times) => calculate_next_weekday_time(times, reference_time),
        }
    }
}
/// Finds the earliest `(weekday, time)` occurrence strictly after `reference_time`.
///
/// Each pair is resolved to its next occurrence on or after the reference date:
/// a slot on the reference weekday whose time-of-day has already been reached
/// (`>=`) rolls over to the following week. The resulting naive date-times are
/// interpreted as UTC, and the minimum over all pairs is returned.
///
/// Returns `None` when `weekday_times` is empty, or when every candidate date
/// would fall outside the range representable by `NaiveDate` (the date
/// arithmetic is checked instead of panicking on overflow).
fn calculate_next_weekday_time(
    weekday_times: &[(Weekday, NaiveTime)],
    reference_time: DateTime<Utc>,
) -> Option<DateTime<Utc>> {
    let today = reference_time.date_naive();
    let now_time = reference_time.time();
    let today_index = i64::from(reference_time.weekday().num_days_from_sunday());

    weekday_times
        .iter()
        .filter_map(|&(weekday, time)| {
            let target_index = i64::from(weekday.num_days_from_sunday());
            // Days until the target weekday, in 0..=6; a same-day slot that has
            // already passed (or is exactly now) moves to next week.
            let days_ahead = match (target_index - today_index).rem_euclid(7) {
                0 if now_time >= time => 7,
                offset => offset,
            };
            // Every candidate built this way is strictly after `reference_time`,
            // so no further filtering (and no fallback search) is required.
            let target_day = today.checked_add_signed(ChronoDuration::days(days_ahead))?;
            Some(DateTime::<Utc>::from_naive_utc_and_offset(
                target_day.and_time(time),
                Utc,
            ))
        })
        .min()
}
/// Describes a job to be scheduled: its name, schedule and retry settings,
/// plus crate-internal retry/next-run bookkeeping.
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TKJobRequest {
/// Name given to the job by the caller.
pub name: String,
/// Schedule used to compute the job's next run time.
pub schedule: Schedule,
/// Retry limit for the job.
/// NOTE(review): the limit is not enforced in this file — confirm where it is checked.
pub max_retries: MaxRetries,
/// Fixed delay before a retry; when `None`, `calculate_retry_time` uses
/// exponential backoff instead.
pub retry_delay: Option<StdDuration>,
/// Retry counter; starts at 0 and feeds the exponential backoff exponent.
pub(crate) retry_count: u32,
/// Next run instant, if one has been set (e.g. by `from_once` or
/// `with_initial_run_time`).
pub(crate) next_run: Option<DateTime<Utc>>,
}
impl fmt::Debug for TKJobRequest {
    /// Formats every field of the request as a standard debug struct.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without listing it here
        // becomes a compile error rather than a silent omission.
        let Self {
            name,
            schedule,
            max_retries,
            retry_delay,
            retry_count,
            next_run,
        } = self;

        let mut out = f.debug_struct("TKJobRequest");
        out.field("name", name);
        out.field("schedule", schedule);
        out.field("max_retries", max_retries);
        out.field("retry_delay", retry_delay);
        out.field("retry_count", retry_count);
        out.field("next_run", next_run);
        out.finish()
    }
}
impl TKJobRequest {
pub fn new(name: &str, schedule: Schedule, max_retries: u32) -> Self {
Self {
name: name.to_string(),
schedule,
max_retries,
retry_delay: None,
retry_count: 0, next_run: None, }
}
pub fn with_fixed_retry_delay(
name: &str,
schedule: Schedule,
max_retries: MaxRetries,
retry_delay: StdDuration,
) -> Self {
Self {
name: name.to_string(),
schedule,
max_retries,
retry_delay: Some(retry_delay),
retry_count: 0,
next_run: None,
}
}
pub fn from_week_day(
name: &str,
weekday_times: Vec<(Weekday, NaiveTime)>,
max_retries: u32,
) -> Self {
Self::new(name, Schedule::WeekdayTimes(weekday_times), max_retries)
}
#[cfg(feature = "cron_schedule")]
pub fn from_cron(name: &str, cron_expression: &str, max_retries: u32) -> Self {
Self::new(
name,
Schedule::Cron(cron_expression.to_string()),
max_retries,
)
}
pub fn from_interval(name: &str, interval: StdDuration, max_retries: u32) -> Self {
Self::new(name, Schedule::FixedInterval(interval), max_retries)
}
pub fn from_once(name: &str, run_at: DateTime<Utc>, max_retries: u32) -> Self {
let mut req = Self::new(name, Schedule::Once(run_at), max_retries);
req.next_run = Some(run_at); req
}
pub fn never(name: &str, max_retries: u32) -> Self {
Self::new(name, Schedule::Never, max_retries)
}
pub fn with_initial_run_time(&mut self, run_time: DateTime<Utc>) {
self.next_run = Some(run_time);
}
pub(crate) fn calculate_next_run(&self) -> Option<DateTime<Utc>> {
self.schedule.calculate_next_run(Utc::now())
}
pub(crate) fn calculate_retry_time(&self) -> DateTime<Utc> {
let now = Utc::now();
if let Some(fixed_delay) = self.retry_delay {
match ChronoDuration::from_std(fixed_delay) {
Ok(chrono_delay) => {
now.checked_add_signed(chrono_delay).unwrap_or_else(|| {
warn!(?fixed_delay, "Fixed retry delay addition overflowed.");
now + ChronoDuration::seconds(i64::MAX / 2) })
}
Err(e) => {
warn!(?fixed_delay, error=%e, "Failed to convert fixed retry delay.");
now + ChronoDuration::seconds(60) }
}
} else {
let attempt_number = self.retry_count.saturating_add(1);
let base_delay_secs: u64 = 60; let factor: u64 = 3;
let max_exponent: u32 = 5;
let exponent = std::cmp::min(attempt_number.saturating_sub(1), max_exponent);
let factor_pow = factor.checked_pow(exponent).unwrap_or(u64::MAX);
let backoff_seconds = base_delay_secs.checked_mul(factor_pow).unwrap_or(u64::MAX);
if let Ok(backoff_i64) = backoff_seconds.try_into() {
now
.checked_add_signed(ChronoDuration::seconds(backoff_i64))
.unwrap_or_else(|| {
warn!(
attempt = attempt_number,
"Exponential backoff duration overflowed."
);
now + ChronoDuration::seconds(i64::MAX / 2) })
} else {
warn!(
attempt = attempt_number,
"Exponential backoff duration exceeds i64::MAX seconds."
);
now + ChronoDuration::seconds(i64::MAX / 2) }
}
}
}
/// Crate-internal pairing of a job request with the function that executes it
/// and its identifiers.
pub(crate) struct JobDefinition {
/// The request describing the job's name, schedule and retry settings.
pub request: TKJobRequest,
/// Shared handle to the closure that produces the job's future.
pub exec_fn: Arc<BoxedExecFn>,
/// Job identifier.
/// NOTE(review): presumably stable across all instances of the same job — confirm.
pub lineage_id: TKJobId,
/// Identifier of an instance of this job, if any.
/// NOTE(review): presumably the currently scheduled instance — confirm.
pub current_instance_id: Option<InstanceId>,
}
impl fmt::Debug for JobDefinition {
    /// Formats the definition as a debug struct; the executor closure cannot be
    /// inspected, so a fixed placeholder is printed in its place.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            request,
            exec_fn: _,
            lineage_id,
            current_instance_id,
        } = self;

        let mut out = f.debug_struct("JobDefinition");
        out.field("request", request);
        out.field("exec_fn", &format_args!("Arc<BoxedExecFn>"));
        out.field("lineage_id", lineage_id);
        out.field("current_instance_id", current_instance_id);
        out.finish()
    }
}
/// Compact, comparable snapshot of a job's state.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct JobSummary {
/// Identifier of the job.
pub id: TKJobId,
/// Name of the job.
pub name: String,
/// Next run instant, if any.
pub next_run: Option<DateTime<Utc>>,
/// Retry counter value at the time of the snapshot.
pub retry_count: u32,
/// Whether the job has been cancelled.
pub is_cancelled: bool,
}
/// Detailed snapshot of a job: its configuration (schedule and retry settings)
/// together with its current retry and next-run state.
#[derive(Debug, Clone, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct JobDetails {
/// Identifier of the job.
pub id: TKJobId,
/// Name of the job.
pub name: String,
/// Schedule the job runs on.
pub schedule: Schedule,
/// Retry limit configured for the job.
pub max_retries: MaxRetries,
/// Retry counter value at the time of the snapshot.
pub retry_count: u32,
/// Fixed retry delay, if one is configured.
pub retry_delay: Option<StdDuration>,
/// Identifier of the instance for the next run, if any.
pub next_run_instance: Option<InstanceId>,
/// Instant of the next run, if any.
pub next_run_time: Option<DateTime<Utc>>,
/// Whether the job has been cancelled.
pub is_cancelled: bool,
}