use std::sync::Arc;
use std::time::Duration;
use slatedb::{Db, IsolationLevel};
use tokio::sync::{Notify, watch};
use tracing::{debug, warn};
use crate::error::Result;
use crate::job::{JobRecord, JobStatus};
use crate::queue::{job_index_key, now_ms, pending_key};
use crate::stats::update_stats;
pub(crate) async fn schedule_loop(
db: Arc<Db>,
interval: Duration,
job_available: Arc<Notify>,
mut shutdown: watch::Receiver<bool>,
) {
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {
match promote_due_jobs(&db).await {
Ok(0) => {}
Ok(_) => job_available.notify_waiters(),
Err(e) => warn!("scheduled job promoter error: {e}"),
}
}
_ = shutdown.changed() => break,
}
}
debug!("scheduled job promoter stopped");
}
pub(crate) async fn promote_due_jobs(db: &Db) -> Result<usize> {
let now = now_ms();
let mut due_keys = Vec::new();
let mut iter = db.scan_prefix(b"scheduled:").await?;
while let Some(kv) = iter.next().await? {
let key_str = match std::str::from_utf8(&kv.key) {
Ok(s) => s,
Err(_) => continue,
};
let after = match key_str.strip_prefix("scheduled:") {
Some(s) => s,
None => continue,
};
let ts_str = match after.split(':').next() {
Some(s) => s,
None => continue,
};
let run_at = match ts_str.parse::<u64>() {
Ok(v) => v,
Err(_) => continue,
};
if run_at > now {
break;
}
due_keys.push(kv.key.clone());
}
drop(iter);
let count = due_keys.len();
for key_bytes in due_keys {
promote_job(db, &key_bytes).await?;
}
Ok(count)
}
async fn promote_job(db: &Db, scheduled_key_bytes: &[u8]) -> Result<()> {
loop {
let txn = db.begin(IsolationLevel::Snapshot).await?;
let raw = match txn.get(scheduled_key_bytes).await? {
None => {
txn.rollback();
return Ok(());
}
Some(raw) => raw,
};
let mut job: JobRecord = rmp_serde::from_slice(&raw)?;
txn.delete(scheduled_key_bytes)?;
job.status = JobStatus::Pending;
job.run_at = None;
let priority = job.priority;
let pending = pending_key(&job.queue, priority, &job.id);
let value = rmp_serde::to_vec_named(&job)?;
txn.put(pending.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), pending.as_bytes())?;
update_stats(
&txn,
&job.queue,
&[(JobStatus::Pending, 1), (JobStatus::Scheduled, -1)],
)?;
match txn.commit().await {
Ok(_) => {
debug!(
queue = %job.queue,
job_id = %job.id,
"scheduled job promoted to pending"
);
return Ok(());
}
Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
Err(e) => return Err(e.into()),
}
}
}