app_queue/
lib.rs

1//! In-app persistent queue
2//!
3//! This crate implements a job queue for monolithic applications that persists across application restarts.
4//!
5//! Thanks to the [`typetag`](https://crates.io/crates/typetag) crate, your [`Job`](trait.Job.html)s can use any serializable data type.
6//!
7//! ```
8//! # use serde::{Deserialize, Serialize};
9//! # use anyhow::Result;
10//! # use std::sync::Arc;
11//! # use app_queue::{AppQueue, Job};
12//! # static NOTIFIER: tokio::sync::Notify = tokio::sync::Notify::const_new();
13//! #[derive(Clone, Debug, Serialize, Deserialize)]
14//! pub struct MyJob {
15//!     message: String
16//! }
17//!
18//! #[typetag::serde]
19//! #[async_trait::async_trait]
20//! impl Job for MyJob {
21//!   async fn run(&mut self, _: Arc<AppQueue>) -> Result<()> {
22//!     println!("{}", self.message);
23//! #    NOTIFIER.notify_one();
24//!     Ok(())
25//!   }
26//! }
27//!
28//! #[tokio::main]
29//! async fn main() -> Result<()> {
30//! # tracing_subscriber::fmt::init();
31//!   let queue = AppQueue::new("/tmp/queue.db").await?;
32//!   let job = MyJob {
33//!     message: "Hello, world!".into()
34//!   };
35//!   queue.add_job(Box::new(job)).await?;
36//!   queue.run_job_workers_default();
37//! # NOTIFIER.notified().await;
38//!   Ok(())
39//! }
40//! ```
41
42use std::{convert::Infallible, path::Path, sync::Arc};
43
44use anyhow::{Context, Result};
45use async_trait::async_trait;
46use chrono::{DateTime, Duration, Utc};
47use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool};
48use tokio::sync::Notify;
49use tracing::{debug, error, info};
50use uuid::Uuid;
51
/// Stand-in for the never type `!`.
///
/// This is an alias for [`Infallible`], which has no values and therefore can’t be created.
/// A function returning `Result<Never>` can never return successfully; it either runs forever
/// or returns an error.
pub type Never = Infallible;
54
55/// Queued Job interface
56#[typetag::serde(tag = "type")]
57#[async_trait]
58pub trait Job: Send + Sync {
59    /// Attempt to run the job.
60    ///
61    /// # Errors
62    ///
63    /// In addition to any errors caused by the job itself, the job may return an error to indicate that the job should be requeued.
64    async fn run(&mut self, queue: Arc<AppQueue>) -> Result<()>;
65
66    /// Check if an error is fatal.
67    ///
68    /// Jobs that return a fatal error will not be requeued.
69    ///
70    /// The default implementation treats all errors as non-fatal.
71    fn is_fatal_error(&self, _: &anyhow::Error) -> bool {
72        false
73    }
74
75    /// Calculates the next time the job should be retried.
76    ///
77    /// It receives the number of times the job has already been retried.
78    ///
79    /// The default behavior is an exponential backoff, with a maximum retry period of 10 minutes.
80    fn get_next_retry(&self, retries: i64) -> DateTime<Utc> {
81        let retries = retries.min(10) as u32;
82        let duration_secs = 2i64.pow(retries).min(600); // clamped to 10 min.
83
84        Utc::now() + Duration::seconds(duration_secs)
85    }
86}
87
/// Handle to a persistent job queue.
///
/// Create one with [`AppQueue::new`], which returns the queue wrapped in an [`Arc`].
pub struct AppQueue {
    /// Connection pool for the SQLite database that stores the queued jobs.
    db_conn: SqlitePool,
    /// Signalled when a job is added; job workers wait on it.
    notifier: Notify,
}
92
93/// Central queue interface
94///
95/// See the crate documentation to see how to use this crate.
96impl AppQueue {
97    /// Opens or creates a new queue.
98    ///
99    /// # Errors
100    /// This function returns an error if the database cannot be opened, such as when the database is inaccessible or corrupt.
101    pub async fn new(db_path: impl AsRef<Path>) -> Result<Arc<Self>> {
102        let db_conn = SqlitePool::connect_with(
103            SqliteConnectOptions::new()
104                .filename(db_path)
105                .create_if_missing(true),
106        )
107        .await
108        .context("Opening sqlite database")?;
109        let notifier = Notify::new();
110
111        let db = Self { db_conn, notifier };
112
113        db.initialize_db()
114            .await
115            .context("initializing sqlite datababse")?;
116
117        Ok(Arc::new(db))
118    }
119
120    /// Initializes the database if it is uninitialized.
121    async fn initialize_db(&self) -> Result<()> {
122        sqlx::migrate!("./migrations")
123            .run(&self.db_conn)
124            .await
125            .context("Migrating database")?;
126        debug!("Database initialized, rescheduling aborted jobs");
127        query!("UPDATE jobs SET is_running = 0, retries = retries + 1 WHERE is_running = 1")
128            .execute(&self.db_conn)
129            .await
130            .context("Requeuing aborted jobs")?;
131        Ok(())
132    }
133
134    /// Runs a single job
135    ///
136    /// # Errors
137    /// This function can fail if there is a database error, or if the format of the event is invalid (due to a missing structure or a deleted event type).
138    ///
139    /// # Return value
140    ///
141    /// Returns `false` if there are currently no jobs to run.
142    async fn run_job(self: &Arc<Self>) -> Result<bool> {
143        let job_info = query!(
144            r#"
145UPDATE jobs
146    SET is_running = 1
147    WHERE id IN (
148        SELECT id FROM jobs
149        WHERE is_running = 0
150        AND run_after <= datetime('now')
151        ORDER BY run_after ASC
152        LIMIT 1)
153    RETURNING id, job_data, retries
154        "#
155        )
156        .fetch_optional(&self.db_conn)
157        .await
158        .context("Loading idle queue entry from database")?;
159
160        let job_info = match job_info {
161            Some(job_info) => job_info,
162            None => return Ok(false),
163        };
164        debug!("Fetched job ID: {}", job_info.id);
165
166        let mut de: Box<dyn Job> = ciborium::de::from_reader(job_info.job_data.as_slice())
167            .context("Deserializing the job data")?;
168
169        match de.run(Arc::clone(self)).await {
170            Ok(()) => {
171                debug!("Job {} completed successfully", job_info.id);
172                query!("DELETE FROM jobs WHERE id =?", job_info.id)
173                    .execute(&self.db_conn)
174                    .await?;
175            }
176            Err(e) => {
177                error!("Job {} failed: {:#?}", job_info.id, e);
178                if de.is_fatal_error(&e) {
179                    error!(
180                        "Job {} failed due to fatal error. Aborting retries.",
181                        job_info.id
182                    );
183                    query!("DELETE FROM jobs WHERE id =?", job_info.id)
184                        .execute(&self.db_conn)
185                        .await?;
186                    return Ok(true);
187                }
188                let next_retry = de.get_next_retry(job_info.retries);
189                debug!("Job {} failed. Retrying at {}", job_info.id, next_retry);
190                let new_retry_count = job_info.retries + 1;
191                let mut job_data = Vec::new();
192                ciborium::into_writer(&de, &mut job_data)?;
193                query!(
194                    "UPDATE jobs SET is_running = 0, run_after =?, retries =?, job_data =? WHERE id =?",
195                    next_retry,
196                    new_retry_count,
197                    job_data,
198                    job_info.id,
199                )
200                .execute(&self.db_conn)
201                .await?;
202            }
203        }
204
205        Ok(true)
206    }
207
208    /// Uses the current task to serve as a job runner.
209    ///
210    /// This future will never resolve, unless an error occurs.
211    pub async fn run_job_loop(self: Arc<Self>) -> Result<Never> {
212        self.notifier.notify_one();
213        info!("Starting job worker.");
214
215        loop {
216            self.notifier.notified().await;
217            debug!("Received queue notification.");
218            while self.run_job().await? {}
219            debug!("No more jobs to run for now. Sleeping.")
220        }
221    }
222
223    /// Spawns a number of worker tasks for running jobs.
224    pub fn run_job_workers(self: Arc<Self>, num_workers: usize) {
225        for _ in 0..num_workers {
226            tokio::spawn(Arc::clone(&self).run_job_loop());
227        }
228    }
229
230    /// Spawns a default number of worker tasks for running jobs.
231    pub fn run_job_workers_default(self: Arc<Self>) {
232        self.run_job_workers(num_cpus::get());
233    }
234
235    /// Adds a job with a specific opaque ID to the queue.
236    ///
237    /// This will not do anything if the job is already in the queue.
238    pub async fn add_unique_job(&self, id: impl AsRef<str>, job: Box<dyn Job>) -> Result<()> {
239        let id = id.as_ref();
240        let mut job_data = Vec::new();
241        ciborium::into_writer(&job, &mut job_data)?;
242        query!(
243            "INSERT INTO jobs (unique_job_id, run_after, job_data) VALUES (?,datetime('now'),?)",
244            id,
245            job_data
246        )
247        .execute(&self.db_conn)
248        .await?;
249
250        self.notifier.notify_one();
251
252        Ok(())
253    }
254
255    /// Adds a job to the queue.
256    ///
257    /// unlike [`add_unique_job`], this will use a random ID.
258    pub async fn add_job(&self, job: Box<dyn Job>) -> Result<()> {
259        let id = Uuid::new_v4();
260        self.add_unique_job(id.to_string(), job).await
261    }
262}