use std::borrow::Cow;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use chrono::{DateTime, Utc};
use cron::Schedule as CronSchedule;
use crate::error::SchedulerError;
#[must_use]
pub fn normalize_cron_expr(expr: &str) -> Cow<'_, str> {
if expr.split_whitespace().count() == 5 {
Cow::Owned(format!("0 {expr}"))
} else {
Cow::Borrowed(expr)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskKind {
MemoryCleanup,
SkillRefresh,
HealthCheck,
UpdateCheck,
Experiment,
Custom(String),
}
impl TaskKind {
#[must_use]
pub fn from_str_kind(s: &str) -> Self {
match s {
"memory_cleanup" => Self::MemoryCleanup,
"skill_refresh" => Self::SkillRefresh,
"health_check" => Self::HealthCheck,
"update_check" => Self::UpdateCheck,
"experiment" => Self::Experiment,
other => Self::Custom(other.to_owned()),
}
}
#[must_use]
pub fn as_str(&self) -> &str {
match self {
Self::MemoryCleanup => "memory_cleanup",
Self::SkillRefresh => "skill_refresh",
Self::HealthCheck => "health_check",
Self::UpdateCheck => "update_check",
Self::Experiment => "experiment",
Self::Custom(s) => s,
}
}
}
pub enum TaskMode {
Periodic {
schedule: Box<CronSchedule>,
},
OneShot {
run_at: DateTime<Utc>,
},
}
pub struct TaskDescriptor {
pub name: String,
pub mode: TaskMode,
pub kind: TaskKind,
pub config: serde_json::Value,
}
pub struct ScheduledTask {
pub name: String,
pub mode: TaskMode,
pub kind: TaskKind,
pub config: serde_json::Value,
}
impl ScheduledTask {
pub fn new(
name: impl Into<String>,
cron_expr: &str,
kind: TaskKind,
config: serde_json::Value,
) -> Result<Self, SchedulerError> {
Self::periodic(name, cron_expr, kind, config)
}
pub fn periodic(
name: impl Into<String>,
cron_expr: &str,
kind: TaskKind,
config: serde_json::Value,
) -> Result<Self, SchedulerError> {
let normalized = normalize_cron_expr(cron_expr);
let schedule = CronSchedule::from_str(&normalized)
.map_err(|e| SchedulerError::InvalidCron(format!("{cron_expr}: {e}")))?;
Ok(Self {
name: name.into(),
mode: TaskMode::Periodic {
schedule: Box::new(schedule),
},
kind,
config,
})
}
#[must_use]
pub fn oneshot(
name: impl Into<String>,
run_at: DateTime<Utc>,
kind: TaskKind,
config: serde_json::Value,
) -> Self {
Self {
name: name.into(),
mode: TaskMode::OneShot { run_at },
kind,
config,
}
}
#[must_use]
pub fn cron_schedule(&self) -> Option<&CronSchedule> {
if let TaskMode::Periodic { schedule } = &self.mode {
Some(schedule.as_ref())
} else {
None
}
}
#[must_use]
pub fn cron_expr_string(&self) -> String {
match &self.mode {
TaskMode::Periodic { schedule } => schedule.to_string(),
TaskMode::OneShot { .. } => String::new(),
}
}
#[must_use]
pub fn task_mode_str(&self) -> &'static str {
match &self.mode {
TaskMode::Periodic { .. } => "periodic",
TaskMode::OneShot { .. } => "oneshot",
}
}
}
pub trait TaskHandler: Send + Sync {
fn execute(
&self,
config: &serde_json::Value,
) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>>;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn task_kind_roundtrip() {
assert_eq!(
TaskKind::from_str_kind("memory_cleanup"),
TaskKind::MemoryCleanup
);
assert_eq!(TaskKind::MemoryCleanup.as_str(), "memory_cleanup");
assert_eq!(
TaskKind::from_str_kind("skill_refresh"),
TaskKind::SkillRefresh
);
assert_eq!(TaskKind::SkillRefresh.as_str(), "skill_refresh");
assert_eq!(
TaskKind::from_str_kind("health_check"),
TaskKind::HealthCheck
);
assert_eq!(
TaskKind::from_str_kind("update_check"),
TaskKind::UpdateCheck
);
assert_eq!(TaskKind::UpdateCheck.as_str(), "update_check");
assert_eq!(
TaskKind::from_str_kind("custom_job"),
TaskKind::Custom("custom_job".into())
);
assert_eq!(TaskKind::Custom("x".into()).as_str(), "x");
}
#[test]
fn task_kind_experiment_roundtrip() {
assert_eq!(
TaskKind::from_str_kind("experiment"),
TaskKind::Experiment,
"from_str_kind must map 'experiment' to Experiment variant, not Custom"
);
assert_eq!(TaskKind::Experiment.as_str(), "experiment");
}
#[test]
fn normalize_five_field_prepends_zero() {
assert_eq!(normalize_cron_expr("*/5 * * * *"), "0 */5 * * * *");
assert_eq!(normalize_cron_expr("0 3 * * *"), "0 0 3 * * *");
}
#[test]
fn normalize_six_field_passthrough() {
assert_eq!(normalize_cron_expr("0 0 3 * * *"), "0 0 3 * * *");
assert_eq!(normalize_cron_expr("* * * * * *"), "* * * * * *");
}
#[test]
fn normalize_other_field_count_passthrough() {
assert_eq!(normalize_cron_expr("not_cron"), "not_cron");
assert_eq!(normalize_cron_expr("0 0 0 0"), "0 0 0 0");
}
#[test]
fn normalize_empty_string_passthrough() {
assert_eq!(normalize_cron_expr(""), "");
}
#[test]
fn normalize_whitespace_only_passthrough() {
assert_eq!(normalize_cron_expr(" "), " ");
}
#[test]
fn valid_cron_creates_task() {
let task = ScheduledTask::new(
"test",
"0 0 * * * *",
TaskKind::HealthCheck,
serde_json::Value::Null,
);
assert!(task.is_ok());
}
#[test]
fn five_field_cron_creates_task() {
let task = ScheduledTask::new(
"five-field",
"*/5 * * * *",
TaskKind::HealthCheck,
serde_json::Value::Null,
);
assert!(task.is_ok(), "5-field cron must be accepted");
}
#[test]
fn invalid_cron_returns_error() {
let task = ScheduledTask::new(
"test",
"not_cron",
TaskKind::HealthCheck,
serde_json::Value::Null,
);
assert!(task.is_err());
}
#[test]
fn oneshot_task_creates_correctly() {
let run_at = Utc::now() + chrono::Duration::hours(1);
let task =
ScheduledTask::oneshot("t", run_at, TaskKind::HealthCheck, serde_json::Value::Null);
assert_eq!(task.task_mode_str(), "oneshot");
assert!(task.cron_schedule().is_none());
}
#[test]
fn periodic_task_mode_str() {
let task = ScheduledTask::periodic(
"p",
"0 * * * * *",
TaskKind::HealthCheck,
serde_json::Value::Null,
)
.unwrap();
assert_eq!(task.task_mode_str(), "periodic");
assert!(task.cron_schedule().is_some());
}
}