1use std::{convert::Infallible, path::Path, sync::Arc};
43
44use anyhow::{Context, Result};
45use async_trait::async_trait;
46use chrono::{DateTime, Duration, Utc};
47use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool};
48use tokio::sync::Notify;
49use tracing::{debug, error, info};
50use uuid::Uuid;
51
/// Alias for [`Infallible`], used as the success type of functions that only
/// ever return by failing (see [`AppQueue::run_job_loop`]).
pub type Never = Infallible;
54
55#[typetag::serde(tag = "type")]
57#[async_trait]
58pub trait Job: Send + Sync {
59 async fn run(&mut self, queue: Arc<AppQueue>) -> Result<()>;
65
66 fn is_fatal_error(&self, _: &anyhow::Error) -> bool {
72 false
73 }
74
75 fn get_next_retry(&self, retries: i64) -> DateTime<Utc> {
81 let retries = retries.min(10) as u32;
82 let duration_secs = 2i64.pow(retries).min(600); Utc::now() + Duration::seconds(duration_secs)
85 }
86}
87
/// A persistent job queue backed by a SQLite database.
pub struct AppQueue {
    /// Connection pool for the SQLite database that stores the `jobs` table.
    db_conn: SqlitePool,
    /// Signalled when a job is added so that a waiting worker wakes up.
    notifier: Notify,
}
92
93impl AppQueue {
97 pub async fn new(db_path: impl AsRef<Path>) -> Result<Arc<Self>> {
102 let db_conn = SqlitePool::connect_with(
103 SqliteConnectOptions::new()
104 .filename(db_path)
105 .create_if_missing(true),
106 )
107 .await
108 .context("Opening sqlite database")?;
109 let notifier = Notify::new();
110
111 let db = Self { db_conn, notifier };
112
113 db.initialize_db()
114 .await
115 .context("initializing sqlite datababse")?;
116
117 Ok(Arc::new(db))
118 }
119
120 async fn initialize_db(&self) -> Result<()> {
122 sqlx::migrate!("./migrations")
123 .run(&self.db_conn)
124 .await
125 .context("Migrating database")?;
126 debug!("Database initialized, rescheduling aborted jobs");
127 query!("UPDATE jobs SET is_running = 0, retries = retries + 1 WHERE is_running = 1")
128 .execute(&self.db_conn)
129 .await
130 .context("Requeuing aborted jobs")?;
131 Ok(())
132 }
133
134 async fn run_job(self: &Arc<Self>) -> Result<bool> {
143 let job_info = query!(
144 r#"
145UPDATE jobs
146 SET is_running = 1
147 WHERE id IN (
148 SELECT id FROM jobs
149 WHERE is_running = 0
150 AND run_after <= datetime('now')
151 ORDER BY run_after ASC
152 LIMIT 1)
153 RETURNING id, job_data, retries
154 "#
155 )
156 .fetch_optional(&self.db_conn)
157 .await
158 .context("Loading idle queue entry from database")?;
159
160 let job_info = match job_info {
161 Some(job_info) => job_info,
162 None => return Ok(false),
163 };
164 debug!("Fetched job ID: {}", job_info.id);
165
166 let mut de: Box<dyn Job> = ciborium::de::from_reader(job_info.job_data.as_slice())
167 .context("Deserializing the job data")?;
168
169 match de.run(Arc::clone(self)).await {
170 Ok(()) => {
171 debug!("Job {} completed successfully", job_info.id);
172 query!("DELETE FROM jobs WHERE id =?", job_info.id)
173 .execute(&self.db_conn)
174 .await?;
175 }
176 Err(e) => {
177 error!("Job {} failed: {:#?}", job_info.id, e);
178 if de.is_fatal_error(&e) {
179 error!(
180 "Job {} failed due to fatal error. Aborting retries.",
181 job_info.id
182 );
183 query!("DELETE FROM jobs WHERE id =?", job_info.id)
184 .execute(&self.db_conn)
185 .await?;
186 return Ok(true);
187 }
188 let next_retry = de.get_next_retry(job_info.retries);
189 debug!("Job {} failed. Retrying at {}", job_info.id, next_retry);
190 let new_retry_count = job_info.retries + 1;
191 let mut job_data = Vec::new();
192 ciborium::into_writer(&de, &mut job_data)?;
193 query!(
194 "UPDATE jobs SET is_running = 0, run_after =?, retries =?, job_data =? WHERE id =?",
195 next_retry,
196 new_retry_count,
197 job_data,
198 job_info.id,
199 )
200 .execute(&self.db_conn)
201 .await?;
202 }
203 }
204
205 Ok(true)
206 }
207
208 pub async fn run_job_loop(self: Arc<Self>) -> Result<Never> {
212 self.notifier.notify_one();
213 info!("Starting job worker.");
214
215 loop {
216 self.notifier.notified().await;
217 debug!("Received queue notification.");
218 while self.run_job().await? {}
219 debug!("No more jobs to run for now. Sleeping.")
220 }
221 }
222
223 pub fn run_job_workers(self: Arc<Self>, num_workers: usize) {
225 for _ in 0..num_workers {
226 tokio::spawn(Arc::clone(&self).run_job_loop());
227 }
228 }
229
230 pub fn run_job_workers_default(self: Arc<Self>) {
232 self.run_job_workers(num_cpus::get());
233 }
234
235 pub async fn add_unique_job(&self, id: impl AsRef<str>, job: Box<dyn Job>) -> Result<()> {
239 let id = id.as_ref();
240 let mut job_data = Vec::new();
241 ciborium::into_writer(&job, &mut job_data)?;
242 query!(
243 "INSERT INTO jobs (unique_job_id, run_after, job_data) VALUES (?,datetime('now'),?)",
244 id,
245 job_data
246 )
247 .execute(&self.db_conn)
248 .await?;
249
250 self.notifier.notify_one();
251
252 Ok(())
253 }
254
255 pub async fn add_job(&self, job: Box<dyn Job>) -> Result<()> {
259 let id = Uuid::new_v4();
260 self.add_unique_job(id.to_string(), job).await
261 }
262}