Skip to main content

modo/job/
enqueuer.rs

1use chrono::{DateTime, Utc};
2use serde::Serialize;
3use sha2::{Digest, Sha256};
4
5use crate::db::{ConnExt, ConnQueryExt, Database};
6use crate::error::{Error, Result};
7
8/// Result of an idempotent enqueue operation.
9///
10/// Returned by [`Enqueuer::enqueue_unique`] and
11/// [`Enqueuer::enqueue_unique_with`].
12#[derive(Debug, Clone, PartialEq, Eq)]
13pub enum EnqueueResult {
14    /// A new job was inserted; contains its ID.
15    Created(String),
16    /// A job with the same name and payload is already pending or running;
17    /// contains the ID of the existing job.
18    Duplicate(String),
19}
20
21/// Options for customising a job enqueue operation.
22#[derive(Clone)]
23pub struct EnqueueOptions {
24    /// Name of the queue to place the job in. Defaults to `"default"`.
25    pub queue: String,
26    /// When to make the job eligible for execution. Defaults to now (immediate).
27    pub run_at: Option<DateTime<Utc>>,
28}
29
30impl Default for EnqueueOptions {
31    fn default() -> Self {
32        Self {
33            queue: "default".to_string(),
34            run_at: None,
35        }
36    }
37}
38
39/// Enqueues jobs into the `jobs` SQLite table.
40///
41/// Constructed via [`Enqueuer::new`]. Cheaply cloneable — the underlying
42/// database handle is `Arc`-wrapped.
43#[derive(Clone)]
44pub struct Enqueuer {
45    db: Database,
46}
47
48impl Enqueuer {
49    /// Create a new `Enqueuer` using the given database handle.
50    pub fn new(db: Database) -> Self {
51        Self { db }
52    }
53
54    /// Enqueue a job on the default queue for immediate execution.
55    ///
56    /// Returns the new job's ID on success.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if the payload cannot be serialized to JSON or if the
61    /// database insert fails.
62    pub async fn enqueue<T: Serialize>(&self, name: &str, payload: &T) -> Result<String> {
63        self.enqueue_with(name, payload, EnqueueOptions::default())
64            .await
65    }
66
67    /// Enqueue a job on the default queue to run at a specific time.
68    ///
69    /// Returns the new job's ID on success.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the payload cannot be serialized to JSON or if the
74    /// database insert fails.
75    pub async fn enqueue_at<T: Serialize>(
76        &self,
77        name: &str,
78        payload: &T,
79        run_at: DateTime<Utc>,
80    ) -> Result<String> {
81        self.enqueue_with(
82            name,
83            payload,
84            EnqueueOptions {
85                run_at: Some(run_at),
86                ..Default::default()
87            },
88        )
89        .await
90    }
91
92    /// Enqueue a job with full control over queue and schedule.
93    ///
94    /// Returns the new job's ID on success.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if the payload cannot be serialized to JSON or if the
99    /// database insert fails.
100    pub async fn enqueue_with<T: Serialize>(
101        &self,
102        name: &str,
103        payload: &T,
104        options: EnqueueOptions,
105    ) -> Result<String> {
106        let id = crate::id::ulid();
107        let payload_json = serde_json::to_string(payload)
108            .map_err(|e| Error::internal(format!("serialize job payload: {e}")))?;
109        let now = Utc::now();
110        let run_at = options.run_at.unwrap_or(now);
111        let now_str = now.to_rfc3339();
112        let run_at_str = run_at.to_rfc3339();
113
114        self.db
115            .conn()
116            .execute_raw(
117                "INSERT INTO jobs (id, name, queue, payload, status, attempt, run_at, created_at, updated_at) \
118                 VALUES (?1, ?2, ?3, ?4, 'pending', 0, ?5, ?6, ?7)",
119                libsql::params![id.as_str(), name, options.queue.as_str(), payload_json.as_str(), run_at_str.as_str(), now_str.as_str(), now_str.as_str()],
120            )
121            .await
122            .map_err(|e| Error::internal(format!("enqueue job: {e}")))?;
123
124        Ok(id)
125    }
126
127    /// Enqueue a job only if no pending or running job with the same name and
128    /// payload already exists (idempotent enqueue on the default queue).
129    ///
130    /// The uniqueness key is a SHA-256 hash of `name + "\0" + payload_json`.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the payload cannot be serialized to JSON or if a
135    /// database operation fails (other than the expected unique-constraint
136    /// violation).
137    pub async fn enqueue_unique<T: Serialize>(
138        &self,
139        name: &str,
140        payload: &T,
141    ) -> Result<EnqueueResult> {
142        self.enqueue_unique_with(name, payload, EnqueueOptions::default())
143            .await
144    }
145
146    /// Enqueue a job only if no pending or running job with the same name and
147    /// payload already exists, with full queue and schedule options.
148    ///
149    /// The uniqueness key is a SHA-256 hash of `name + "\0" + payload_json`.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the payload cannot be serialized to JSON or if a
154    /// database operation fails (other than the expected unique-constraint
155    /// violation).
156    pub async fn enqueue_unique_with<T: Serialize>(
157        &self,
158        name: &str,
159        payload: &T,
160        options: EnqueueOptions,
161    ) -> Result<EnqueueResult> {
162        let payload_json = serde_json::to_string(payload)
163            .map_err(|e| Error::internal(format!("serialize job payload: {e}")))?;
164        let hash = compute_payload_hash(name, &payload_json);
165        let id = crate::id::ulid();
166        let now = Utc::now();
167        let run_at = options.run_at.unwrap_or(now);
168        let now_str = now.to_rfc3339();
169        let run_at_str = run_at.to_rfc3339();
170
171        match self
172            .db
173            .conn()
174            .execute_raw(
175                "INSERT INTO jobs (id, name, queue, payload, payload_hash, status, attempt, run_at, created_at, updated_at) \
176                 VALUES (?1, ?2, ?3, ?4, ?5, 'pending', 0, ?6, ?7, ?8)",
177                libsql::params![id.as_str(), name, options.queue.as_str(), payload_json.as_str(), hash.as_str(), run_at_str.as_str(), now_str.as_str(), now_str.as_str()],
178            )
179            .await
180        {
181            Ok(_) => Ok(EnqueueResult::Created(id)),
182            Err(ref e) if is_unique_violation(e) => {
183                let existing_id: String = self
184                    .db
185                    .conn()
186                    .query_one_map(
187                        "SELECT id FROM jobs WHERE payload_hash = ?1 AND status IN ('pending', 'running') LIMIT 1",
188                        libsql::params![hash.as_str()],
189                        |row| {
190                            use crate::db::FromValue;
191                            let val = row.get_value(0).map_err(crate::Error::from)?;
192                            String::from_value(val)
193                        },
194                    )
195                    .await
196                    .map_err(|e| Error::internal(format!("fetch duplicate job id: {e}")))?;
197
198                Ok(EnqueueResult::Duplicate(existing_id))
199            }
200            Err(e) => Err(Error::internal(format!("enqueue unique job: {e}"))),
201        }
202    }
203
204    /// Cancel a pending job by ID.
205    ///
206    /// Returns `true` if the job was found and cancelled, `false` if it was
207    /// not found or was already past the `pending` state.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if the database update fails.
212    pub async fn cancel(&self, id: &str) -> Result<bool> {
213        let now_str = Utc::now().to_rfc3339();
214        let affected = self
215            .db
216            .conn()
217            .execute_raw(
218                "UPDATE jobs SET status = 'cancelled', updated_at = ?1 WHERE id = ?2 AND status = 'pending'",
219                libsql::params![now_str.as_str(), id],
220            )
221            .await
222            .map_err(|e| Error::internal(format!("cancel job: {e}")))?;
223
224        Ok(affected > 0)
225    }
226}
227
228/// Check if a libsql error is a unique constraint violation.
229fn is_unique_violation(err: &libsql::Error) -> bool {
230    matches!(err, libsql::Error::SqliteFailure(2067 | 1555, _))
231}
232
233fn compute_payload_hash(name: &str, payload_json: &str) -> String {
234    let mut hasher = Sha256::new();
235    hasher.update(name.as_bytes());
236    hasher.update(b"\0");
237    hasher.update(payload_json.as_bytes());
238    crate::encoding::hex::encode(&hasher.finalize())
239}
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244
245    #[test]
246    fn payload_hash_is_deterministic() {
247        let h1 = compute_payload_hash("test", r#"{"a":1}"#);
248        let h2 = compute_payload_hash("test", r#"{"a":1}"#);
249        assert_eq!(h1, h2);
250    }
251
252    #[test]
253    fn payload_hash_differs_by_name() {
254        let h1 = compute_payload_hash("job_a", r#"{"a":1}"#);
255        let h2 = compute_payload_hash("job_b", r#"{"a":1}"#);
256        assert_ne!(h1, h2);
257    }
258
259    #[test]
260    fn payload_hash_differs_by_payload() {
261        let h1 = compute_payload_hash("test", r#"{"a":1}"#);
262        let h2 = compute_payload_hash("test", r#"{"a":2}"#);
263        assert_ne!(h1, h2);
264    }
265
266    #[test]
267    fn payload_hash_no_boundary_collision() {
268        let h1 = compute_payload_hash("ab", "c");
269        let h2 = compute_payload_hash("a", "bc");
270        assert_ne!(h1, h2);
271    }
272}