use std::{convert::Infallible, path::Path, sync::Arc};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool};
use tokio::sync::Notify;
use tracing::{debug, error, info};
use uuid::Uuid;
/// Alias for [`Infallible`]: used as the `Ok` type of functions that never
/// return normally (e.g. `run_job_loop`, which loops forever and can only
/// exit via an error).
pub type Never = Infallible;
/// A unit of background work that can be serialized into the queue database.
///
/// Implementations are (de)serialized type-tagged via `typetag`, so a
/// `Box<dyn Job>` round-trips to the concrete type.
#[typetag::serde(tag = "type")]
#[async_trait]
pub trait Job: Send + Sync {
    /// Execute the job. `&mut self` state mutated here is persisted by the
    /// queue if the job fails and is rescheduled.
    async fn run(&mut self, queue: Arc<AppQueue>) -> Result<()>;

    /// Whether `run`'s error should abort all retries. Defaults to `false`
    /// (every error is retried).
    fn is_fatal_error(&self, _: &anyhow::Error) -> bool {
        false
    }

    /// Next retry time: exponential backoff of `2^retries` seconds, capped
    /// at 600 s.
    fn get_next_retry(&self, retries: i64) -> DateTime<Utc> {
        // Clamp below as well as above: a negative retry count would wrap in
        // the `as u32` cast and make `pow` overflow. (`min(10)` alone only
        // bounds the top end.)
        let retries = retries.clamp(0, 10) as u32;
        let duration_secs = 2i64.pow(retries).min(600);
        Utc::now() + Duration::seconds(duration_secs)
    }
}
/// Persistent job queue backed by a SQLite database.
pub struct AppQueue {
    // Connection pool to the on-disk queue database.
    db_conn: SqlitePool,
    // Wakes sleeping worker loops when new jobs are enqueued.
    notifier: Notify,
}
impl AppQueue {
    /// Open (or create) the SQLite database at `db_path`, run migrations,
    /// and requeue any jobs that were left marked running by a previous
    /// process.
    pub async fn new(db_path: impl AsRef<Path>) -> Result<Arc<Self>> {
        let db_conn = SqlitePool::connect_with(
            SqliteConnectOptions::new()
                .filename(db_path)
                .create_if_missing(true),
        )
        .await
        .context("Opening sqlite database")?;
        let notifier = Notify::new();
        let db = Self { db_conn, notifier };
        db.initialize_db()
            .await
            .context("initializing sqlite database")?;
        Ok(Arc::new(db))
    }

    /// Apply pending migrations, then clear `is_running` on jobs that were
    /// interrupted mid-run (e.g. by a crash), counting the interruption as
    /// one retry.
    async fn initialize_db(&self) -> Result<()> {
        sqlx::migrate!("./migrations")
            .run(&self.db_conn)
            .await
            .context("Migrating database")?;
        debug!("Database initialized, rescheduling aborted jobs");
        query!("UPDATE jobs SET is_running = 0, retries = retries + 1 WHERE is_running = 1")
            .execute(&self.db_conn)
            .await
            .context("Requeuing aborted jobs")?;
        Ok(())
    }

    /// Claim and execute at most one due job.
    ///
    /// Returns `Ok(true)` when a job was claimed (whether or not it
    /// succeeded) and `Ok(false)` when nothing was ready. `Err` is reserved
    /// for queue-level failures (DB access, serialization); job failures are
    /// handled internally via the retry machinery.
    async fn run_job(self: &Arc<Self>) -> Result<bool> {
        // Atomically claim the oldest due job: flipping `is_running` inside a
        // single UPDATE ... RETURNING prevents two workers from claiming the
        // same row.
        let job_info = query!(
            r#"
            UPDATE jobs
            SET is_running = 1
            WHERE id IN (
                SELECT id FROM jobs
                WHERE is_running = 0
                AND run_after <= datetime('now')
                ORDER BY run_after ASC
                LIMIT 1)
            RETURNING id, job_data, retries
            "#
        )
        .fetch_optional(&self.db_conn)
        .await
        .context("Loading idle queue entry from database")?;
        let job_info = match job_info {
            Some(job_info) => job_info,
            None => return Ok(false),
        };
        debug!("Fetched job ID: {}", job_info.id);
        // Jobs are stored type-tagged (typetag), so CBOR deserialization
        // yields the concrete Job implementation behind the trait object.
        let mut de: Box<dyn Job> = ciborium::de::from_reader(job_info.job_data.as_slice())
            .context("Deserializing the job data")?;
        match de.run(Arc::clone(self)).await {
            Ok(()) => {
                debug!("Job {} completed successfully", job_info.id);
                query!("DELETE FROM jobs WHERE id =?", job_info.id)
                    .execute(&self.db_conn)
                    .await?;
            }
            Err(e) => {
                error!("Job {} failed: {:#?}", job_info.id, e);
                if de.is_fatal_error(&e) {
                    error!(
                        "Job {} failed due to fatal error. Aborting retries.",
                        job_info.id
                    );
                    query!("DELETE FROM jobs WHERE id =?", job_info.id)
                        .execute(&self.db_conn)
                        .await?;
                    return Ok(true);
                }
                let next_retry = de.get_next_retry(job_info.retries);
                debug!("Job {} failed. Retrying at {}", job_info.id, next_retry);
                let new_retry_count = job_info.retries + 1;
                // Re-serialize the job so any state it mutated during the
                // failed run is preserved for the retry.
                let mut job_data = Vec::new();
                ciborium::into_writer(&de, &mut job_data)?;
                query!(
                    "UPDATE jobs SET is_running = 0, run_after =?, retries =?, job_data =? WHERE id =?",
                    next_retry,
                    new_retry_count,
                    job_data,
                    job_info.id,
                )
                .execute(&self.db_conn)
                .await?;
            }
        }
        Ok(true)
    }

    /// Worker loop: drain all due jobs, then sleep until notified of new
    /// work. Only returns on a queue-level error (the `Never` Ok type rules
    /// out a normal return).
    pub async fn run_job_loop(self: Arc<Self>) -> Result<Never> {
        info!("Starting job worker.");
        loop {
            // Drain first, wait second. The previous "notify_one() on entry,
            // then wait" pattern loses wakeups when several workers start at
            // once, because `Notify` stores at most one permit — all but one
            // worker would sleep through the startup backlog. Draining before
            // the first wait needs no stored permit, and a `notify_one` that
            // races with the drain is retained as a permit, so the following
            // `notified()` returns immediately instead of missing it.
            while self.run_job().await? {}
            debug!("No more jobs to run for now. Sleeping.");
            // NOTE(review): a job rescheduled into the future is only picked
            // up on the next notification — there is no timer-based wakeup
            // here. Confirm that is acceptable or add a periodic poll.
            self.notifier.notified().await;
            debug!("Received queue notification.");
        }
    }

    /// Spawn `num_workers` detached worker tasks running [`Self::run_job_loop`].
    ///
    /// NOTE(review): the JoinHandles are dropped, so a worker that exits with
    /// an error terminates silently (beyond whatever it logged itself).
    pub fn run_job_workers(self: Arc<Self>, num_workers: usize) {
        for _ in 0..num_workers {
            tokio::spawn(Arc::clone(&self).run_job_loop());
        }
    }

    /// Spawn one worker per logical CPU.
    pub fn run_job_workers_default(self: Arc<Self>) {
        self.run_job_workers(num_cpus::get());
    }

    /// Enqueue `job` under the caller-supplied identifier `id`, scheduled to
    /// run immediately, and wake one worker.
    ///
    /// NOTE(review): if `unique_job_id` carries a UNIQUE constraint in the
    /// (unseen) migrations, inserting a duplicate id surfaces as an error
    /// here rather than deduplicating — confirm against the schema.
    pub async fn add_unique_job(&self, id: impl AsRef<str>, job: Box<dyn Job>) -> Result<()> {
        let id = id.as_ref();
        let mut job_data = Vec::new();
        ciborium::into_writer(&job, &mut job_data)?;
        query!(
            "INSERT INTO jobs (unique_job_id, run_after, job_data) VALUES (?,datetime('now'),?)",
            id,
            job_data
        )
        .execute(&self.db_conn)
        .await?;
        self.notifier.notify_one();
        Ok(())
    }

    /// Enqueue `job` under a fresh random UUID identifier.
    pub async fn add_job(&self, job: Box<dyn Job>) -> Result<()> {
        let id = Uuid::new_v4();
        self.add_unique_job(id.to_string(), job).await
    }
}