use std::collections::HashMap;
use tokio::time::{Duration, interval, MissedTickBehavior};
use redis::{AsyncCommands, Script};
use chrono::{Utc, DateTime};
use anyhow::Result;
use tracing::{info, error, warn};
use futures::FutureExt;
use crate::config::get_shutdown_notify;
use crate::utils::rdconfig::get_redis_connection;
use crate::cron::cron_parser::CronParser;
use crate::cron::cron_job::CronJobMeta;
use crate::cron::cron_job::CronJob;
/// Prefix of the per-job Redis hash keys; a job's hash lives at `snm:cron:jobs:<id>`.
const CRON_JOBS_KEY: &str = "snm:cron:jobs";
/// Redis sorted set holding job ids scored by their next-run Unix timestamp (seconds).
const CRON_SCHEDULE_KEY: &str = "snm:cron:schedule";
/// Maximum number of due job ids claimed from the schedule in one polling pass.
const CLAIM_BATCH_LIMIT: i64 = 100;
/// Redis-backed cron scheduler; a unit struct whose operations are all associated functions.
pub struct CronScheduler;
impl CronScheduler {
/// Spawns the background task that polls Redis for due cron jobs.
///
/// The task runs [`Self::process_cron_jobs`] in a loop, waiting on a 5-second
/// interval between passes, until the shutdown notifier fires. This function
/// only spawns the task; it does not wait for it to finish.
pub async fn start() {
    let shutdown = get_shutdown_notify();
    tokio::spawn(async move {
        info!("🕐 Cron scheduler started");

        let mut tick = interval(Duration::from_secs(5));
        tick.set_missed_tick_behavior(MissedTickBehavior::Burst);

        // Keep ONE `Notified` future alive for the whole loop. A fresh future
        // polled once only observes a stored `notify_one` permit; it never sees
        // `notify_waiters()`, which stores no permit. A long-lived future is
        // registered as a waiter on its first poll and therefore receives either
        // kind of notification, even one sent while a pass is running.
        let mut shutdown_signal = Box::pin(shutdown.notified());

        loop {
            if shutdown_signal.as_mut().now_or_never().is_some() {
                info!("🕐 Cron scheduler shutting down");
                break;
            }
            if let Err(e) = Self::process_cron_jobs().await {
                error!("Cron scheduler error: {:?}", e);
            }
            tick.tick().await;
        }
    });
}
/// Registers a new cron job in Redis and schedules its first run.
///
/// Writes the job's metadata and serialized payload to the hash
/// `snm:cron:jobs:<id>` and adds the id to the schedule sorted set, scored by
/// the Unix timestamp of the first execution after now.
///
/// # Errors
///
/// Fails if the cron expression cannot be evaluated, if a job with the same id
/// already exists, if the job cannot be serialized to JSON, or if any Redis
/// command fails.
pub async fn register_cron_job<T>(job: T) -> Result<()>
where
    T: CronJob + serde::Serialize + 'static,
{
    let mut conn = get_redis_connection().await?;
    let now = Utc::now();
    let next_run = CronParser::next_execution(job.cron_expression(), now, job.timezone())?;
    let job_key = format!("{}:{}", CRON_JOBS_KEY, job.cron_id());

    // Propagate Redis failures instead of treating them as "does not exist":
    // otherwise a transient error here would let us overwrite an existing job.
    let already_registered = conn.exists::<_, bool>(&job_key).await?;
    if already_registered {
        anyhow::bail!("Cron job with id '{}' already exists", job.cron_id());
    }

    let meta = CronJobMeta {
        id: job.cron_id().to_string(),
        name: job.name().to_string(),
        queue: job.queue().to_string(),
        cron_expression: job.cron_expression().to_string(),
        timezone: job.timezone().to_string(),
        enabled: job.enabled(),
        last_run: None,
        next_run: next_run.to_rfc3339(),
        created_at: now.to_rfc3339(),
        payload: serde_json::to_string(&job)?,
    };

    // `last_run` is intentionally not written: the job has never run yet.
    conn.hset_multiple::<_, _, _, ()>(&job_key, &[
        ("name", &meta.name),
        ("queue", &meta.queue),
        ("cron_expression", &meta.cron_expression),
        ("timezone", &meta.timezone),
        ("enabled", &meta.enabled.to_string()),
        ("next_run", &meta.next_run),
        ("created_at", &meta.created_at),
        ("payload", &meta.payload),
    ]).await?;
    conn.zadd::<_, _, _, ()>(
        CRON_SCHEDULE_KEY,
        &meta.id,
        next_run.timestamp()
    ).await?;

    info!("📅 Registered cron job: {} ({})", meta.name, meta.cron_expression);
    Ok(())
}
async fn process_cron_jobs() -> Result<()> {
let mut conn = get_redis_connection().await?;
let now_ts = Utc::now().timestamp();
const CLAIM_SCRIPT: &str = r#"
local key = KEYS[1]
local now = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local ids = redis.call('ZRANGEBYSCORE', key, '-inf', now, 'LIMIT', 0, limit)
for i, id in ipairs(ids) do
redis.call('ZREM', key, id)
end
return ids
"#;
let script = Script::new(CLAIM_SCRIPT);
let due_jobs: Vec<String> = script.key(CRON_SCHEDULE_KEY)
.arg(now_ts)
.arg(CLAIM_BATCH_LIMIT)
.invoke_async(&mut conn).await?;
for job_id in due_jobs {
if let Err(e) = Self::execute_cron_job(&job_id).await {
error!("Failed to execute cron job {}: {:?}", job_id, e);
}
}
Ok(())
}
/// Runs one claimed cron job: enqueues its payload and schedules the next run.
///
/// Returns `Ok(())` without doing anything when the job hash no longer exists
/// or the job is disabled (a missing or unparsable `enabled` field counts as
/// disabled). In both cases the job is not put back into the schedule.
///
/// # Errors
///
/// Fails if `cron_expression` or `payload` is missing from the hash, if the
/// next execution time cannot be computed, or if any Redis command fails.
async fn execute_cron_job(job_id: &str) -> Result<()> {
    let mut conn = get_redis_connection().await?;
    let job_key = format!("{}:{}", CRON_JOBS_KEY, job_id);
    let job_data: HashMap<String, String> = conn.hgetall(&job_key).await?;
    if job_data.is_empty() {
        warn!("Cron job {} not found", job_id);
        return Ok(());
    }

    let enabled: bool = job_data
        .get("enabled")
        .and_then(|v| v.parse().ok())
        .unwrap_or(false);
    if !enabled {
        info!("Skipping disabled cron job: {}", job_id);
        return Ok(());
    }

    let cron_expr = job_data.get("cron_expression")
        .ok_or_else(|| anyhow::anyhow!("Missing cron_expression"))?;
    let timezone = job_data.get("timezone")
        .map(String::as_str)
        .unwrap_or("UTC");
    let payload = job_data.get("payload")
        .ok_or_else(|| anyhow::anyhow!("Missing payload"))?;
    let queue = job_data.get("queue")
        .map(String::as_str)
        .unwrap_or("default");

    // Work out the next run BEFORE enqueueing, so a job whose schedule can no
    // longer be evaluated fails without producing work.
    let now = Utc::now();
    let anchor: DateTime<Utc> = job_data.get("next_run")
        .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
        .map(|dt| dt.with_timezone(&Utc))
        .unwrap_or(now);
    // Starting from the later of the stored run time and now yields the first
    // occurrence in the future in a single call. Stepping occurrence by
    // occurrence from an old anchor gives the same result but costs one call
    // per missed run and never terminates if the parser fails to advance.
    let next_run = CronParser::next_execution(cron_expr, std::cmp::max(anchor, now), timezone)?;
    if next_run <= now {
        anyhow::bail!(
            "Next execution {} computed for cron job {} is not in the future",
            next_run,
            job_id
        );
    }

    let enqueued_job_id = Self::enqueue_raw_job(payload, queue).await?;
    info!("🚀 Cron job {} enqueued as {}", job_id, enqueued_job_id);

    let last_run = now.to_rfc3339();
    let next_run_str = next_run.to_rfc3339();
    conn.hset_multiple::<_, _, _, ()>(&job_key, &[
        ("last_run", &last_run),
        ("next_run", &next_run_str),
    ]).await?;
    conn.zadd::<_, _, _, ()>(
        CRON_SCHEDULE_KEY,
        job_id,
        next_run.timestamp()
    ).await?;

    info!("📅 Cron job {} rescheduled for {}", job_id, next_run);
    Ok(())
}
/// Creates a pending job record for `payload` and appends it to `queue`.
///
/// Three Redis writes are issued in order: the job hash (queue, status,
/// payload, creation time), an `RPUSH` of the new id onto the queue list, and
/// an `SADD` of the queue name into the `snm:queues` set.
///
/// Returns the freshly generated 10-character job id.
async fn enqueue_raw_job(payload: &str, queue: &str) -> Result<String> {
    use crate::utils::constants::{PREFIX_JOB, PREFIX_QUEUE};
    use nanoid::nanoid;

    let mut connection = get_redis_connection().await?;

    let new_id = nanoid!(10);
    let created_at = Utc::now().to_rfc3339();

    // 1. Job record.
    let record_key = format!("{PREFIX_JOB}:{new_id}");
    let record_fields: [(&str, &str); 4] = [
        ("queue", queue),
        ("status", "pending"),
        ("payload", payload),
        ("created_at", created_at.as_str()),
    ];
    connection
        .hset_multiple::<_, _, _, ()>(&record_key, &record_fields)
        .await?;

    // 2. Queue entry.
    let list_key = format!("{PREFIX_QUEUE}:{queue}");
    connection.rpush::<_, _, ()>(&list_key, &new_id).await?;

    // 3. Queue registry.
    connection.sadd::<_, _, ()>("snm:queues", queue).await?;

    Ok(new_id)
}
/// Lists every registered cron job, ordered by job key.
///
/// Job hashes are discovered with `SCAN ... MATCH snm:cron:jobs:*` and each one
/// is loaded with `HGETALL`. Fields missing from a hash fall back to defaults
/// (`"Unknown"` name, `"default"` queue, `"UTC"` timezone, disabled, empty
/// strings otherwise).
///
/// # Errors
///
/// Fails if the Redis connection cannot be obtained or any command fails.
pub async fn list_cron_jobs() -> Result<Vec<CronJobMeta>> {
    let mut conn = get_redis_connection().await?;
    let pattern = format!("{}:*", CRON_JOBS_KEY);
    let prefix = format!("{}:", CRON_JOBS_KEY);

    let mut cursor: u64 = 0;
    let mut keys: Vec<String> = Vec::new();
    loop {
        let (next, batch): (u64, Vec<String>) = redis::cmd("SCAN")
            .cursor_arg(cursor)
            .arg("MATCH").arg(&pattern)
            .arg("COUNT").arg(1000)
            .query_async(&mut conn).await?;
        keys.extend(batch);
        if next == 0 {
            break;
        }
        cursor = next;
    }

    // SCAN may return the same key more than once during a full iteration;
    // without this step such a job would be listed twice. Sorting also gives
    // callers a stable order.
    keys.sort_unstable();
    keys.dedup();

    let mut jobs = Vec::with_capacity(keys.len());
    for key in &keys {
        let id = match key.strip_prefix(prefix.as_str()) {
            Some(id) => id,
            None => continue,
        };
        let job_data: HashMap<String, String> = conn.hgetall(key).await?;
        if job_data.is_empty() {
            // The key was deleted between SCAN and HGETALL; do not report a
            // phantom job made up entirely of default values.
            continue;
        }

        let field = |name: &str, default: &str| -> String {
            job_data
                .get(name)
                .cloned()
                .unwrap_or_else(|| default.to_string())
        };
        jobs.push(CronJobMeta {
            id: id.to_string(),
            name: field("name", "Unknown"),
            queue: field("queue", "default"),
            cron_expression: field("cron_expression", ""),
            timezone: field("timezone", "UTC"),
            enabled: job_data.get("enabled").and_then(|v| v.parse().ok()).unwrap_or(false),
            last_run: job_data.get("last_run").cloned(),
            next_run: field("next_run", ""),
            created_at: field("created_at", ""),
            payload: field("payload", ""),
        });
    }
    Ok(jobs)
}
pub async fn toggle_cron_job(job_id: &str, enabled: bool) -> Result<()> {
let mut conn = get_redis_connection().await?;
let job_key = format!("{}:{}", CRON_JOBS_KEY, job_id);
conn.hset::<_, _, _, ()>(&job_key, "enabled", enabled.to_string()).await?;
if enabled {
info!("✅ Enabled cron job: {}", job_id);
let job_data: HashMap<String, String> = conn.hgetall(&job_key).await?;
if let (Some(cron_expr), Some(tz)) = (job_data.get("cron_expression"), job_data.get("timezone")) {
let now = Utc::now();
let anchor = job_data.get("next_run")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or(now);
let mut next_run = CronParser::next_execution(cron_expr, anchor, tz).unwrap_or(now);
while next_run <= now {
next_run = CronParser::next_execution(cron_expr, next_run, tz).unwrap_or(now + chrono::Duration::minutes(1));
}
conn.hset::<_, _, _, ()>(&job_key, "next_run", next_run.to_rfc3339()).await?;
conn.zadd::<_, _, _, ()>(CRON_SCHEDULE_KEY, job_id, next_run.timestamp()).await?;
}
} else {
info!("❌ Disabled cron job: {}", job_id);
conn.zrem::<_, _, ()>(CRON_SCHEDULE_KEY, job_id).await?;
}
Ok(())
}
/// Removes a cron job: deletes its hash, then drops its id from the schedule.
///
/// # Errors
///
/// Fails if the Redis connection cannot be obtained or either command fails.
pub async fn delete_cron_job(job_id: &str) -> Result<()> {
    let mut connection = get_redis_connection().await?;
    let hash_key = [CRON_JOBS_KEY, job_id].join(":");

    // Hash first, schedule entry second.
    connection.del::<_, ()>(&hash_key).await?;
    connection
        .zrem::<_, _, ()>(CRON_SCHEDULE_KEY, job_id)
        .await?;

    info!("🗑️ Deleted cron job: {}", job_id);
    Ok(())
}
/// Enqueues a cron job's payload immediately, leaving its schedule untouched.
///
/// The job's `enabled` flag is not consulted. The queue falls back to
/// `"default"` when the hash has no `queue` field.
///
/// Returns the id of the newly enqueued job.
///
/// # Errors
///
/// Fails if the job does not exist, its `payload` field is missing, or a Redis
/// command fails.
pub async fn run_now(job_id: &str) -> Result<String> {
    let mut connection = get_redis_connection().await?;
    let hash_key = [CRON_JOBS_KEY, job_id].join(":");
    let fields: HashMap<String, String> = connection.hgetall(&hash_key).await?;

    if fields.is_empty() {
        anyhow::bail!("Cron job {} not found", job_id);
    }

    let payload = match fields.get("payload") {
        Some(raw) => raw,
        None => anyhow::bail!("Missing payload"),
    };
    let target_queue = match fields.get("queue") {
        Some(name) => name.as_str(),
        None => "default",
    };

    let new_job_id = Self::enqueue_raw_job(payload, target_queue).await?;
    info!("🚀 Manually triggered cron job {} as {}", job_id, new_job_id);
    Ok(new_job_id)
}
}