Skip to main content

lilqueue_seaorm/
lib.rs

1use async_trait::async_trait;
2use lilqueue::{
3    BoxError, ClaimedJob, JobQueue, LockableQueue, NewJob, QueueResult, RetryableQueue,
4    dashboard::{DashboardData, DashboardJob, DashboardStats},
5};
6use sea_orm::{ConnectionTrait, Database, DatabaseConnection, DbBackend, DbErr, Statement};
7use std::{
8    collections::HashSet,
9    sync::{
10        Arc,
11        atomic::{AtomicU64, Ordering},
12    },
13    time::{Duration, SystemTime, UNIX_EPOCH},
14};
15
16const STATUS_QUEUED: &str = "queued";
17const STATUS_PROCESSING: &str = "processing";
18const STATUS_COMPLETED: &str = "completed";
19const STATUS_FAILED: &str = "failed";
20
21#[derive(Debug, Clone)]
22pub struct SeaOrmQueueOptions {
23    pub lock_timeout: Duration,
24}
25
26impl Default for SeaOrmQueueOptions {
27    fn default() -> Self {
28        Self {
29            lock_timeout: Duration::from_secs(300),
30        }
31    }
32}
33
34#[derive(Clone)]
35pub struct SeaOrmQueue {
36    db: DatabaseConnection,
37    options: SeaOrmQueueOptions,
38    worker_id: String,
39    claim_counter: Arc<AtomicU64>,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub struct SeaOrmClaim {
44    pub lock_token: String,
45    pub started_at: i64,
46}
47
48impl SeaOrmQueue {
49    pub async fn connect(database_url: &str, options: SeaOrmQueueOptions) -> Result<Self, DbErr> {
50        let db = Database::connect(database_url).await?;
51        Self::new(db, options).await
52    }
53
54    pub async fn new(db: DatabaseConnection, options: SeaOrmQueueOptions) -> Result<Self, DbErr> {
55        let queue = Self {
56            db,
57            options,
58            worker_id: make_worker_id(),
59            claim_counter: Arc::new(AtomicU64::new(1)),
60        };
61        queue.initialize_schema().await?;
62        Ok(queue)
63    }
64
65    pub fn db(&self) -> &DatabaseConnection {
66        &self.db
67    }
68
69    pub fn options(&self) -> &SeaOrmQueueOptions {
70        &self.options
71    }
72
73    async fn initialize_schema(&self) -> Result<(), DbErr> {
74        self.db
75            .execute(Statement::from_string(
76                DbBackend::Sqlite,
77                "CREATE TABLE IF NOT EXISTS jobs (
78                    id INTEGER PRIMARY KEY AUTOINCREMENT,
79                    job_type TEXT NOT NULL,
80                    payload TEXT NOT NULL,
81                    status TEXT NOT NULL,
82                    attempts INTEGER NOT NULL DEFAULT 0,
83                    max_attempts INTEGER NOT NULL,
84                    available_at INTEGER NOT NULL,
85                    locked_at INTEGER NULL,
86                    lock_token TEXT NULL,
87                    last_error TEXT NULL,
88                    created_at INTEGER NOT NULL,
89                    updated_at INTEGER NOT NULL,
90                    completed_at INTEGER NULL,
91                    first_enqueued_at INTEGER NULL,
92                    last_enqueued_at INTEGER NULL,
93                    first_started_at INTEGER NULL,
94                    last_started_at INTEGER NULL,
95                    last_finished_at INTEGER NULL,
96                    queued_ms_total INTEGER NOT NULL DEFAULT 0,
97                    queued_ms_last INTEGER NULL,
98                    processing_ms_total INTEGER NOT NULL DEFAULT 0,
99                    processing_ms_last INTEGER NULL
100                )"
101                .to_string(),
102            ))
103            .await?;
104
105        self.ensure_timing_columns().await?;
106
107        self.db
108            .execute(Statement::from_string(
109                DbBackend::Sqlite,
110                "CREATE INDEX IF NOT EXISTS idx_jobs_ready
111                    ON jobs (job_type, status, available_at, id)"
112                    .to_string(),
113            ))
114            .await?;
115
116        self.db
117            .execute(Statement::from_string(
118                DbBackend::Sqlite,
119                "CREATE INDEX IF NOT EXISTS idx_jobs_processing
120                    ON jobs (job_type, status, locked_at)"
121                    .to_string(),
122            ))
123            .await?;
124
125        Ok(())
126    }
127
128    async fn ensure_timing_columns(&self) -> Result<(), DbErr> {
129        let existing = self.job_columns().await?;
130        for (column, definition) in timing_column_definitions() {
131            if !existing.contains(column) {
132                self.db
133                    .execute(Statement::from_string(
134                        DbBackend::Sqlite,
135                        format!("ALTER TABLE jobs ADD COLUMN {column} {definition}"),
136                    ))
137                    .await?;
138            }
139        }
140
141        self.db
142            .execute(Statement::from_string(
143                DbBackend::Sqlite,
144                "UPDATE jobs
145                 SET first_enqueued_at = COALESCE(first_enqueued_at, created_at)"
146                    .to_string(),
147            ))
148            .await?;
149
150        Ok(())
151    }
152
153    async fn job_columns(&self) -> Result<HashSet<String>, DbErr> {
154        let rows = self
155            .db
156            .query_all(Statement::from_string(
157                DbBackend::Sqlite,
158                "PRAGMA table_info(jobs)".to_string(),
159            ))
160            .await?;
161
162        let mut columns = HashSet::with_capacity(rows.len());
163        for row in rows {
164            columns.insert(row.try_get_by_index::<String>(1)?);
165        }
166
167        Ok(columns)
168    }
169
170    async fn reclaim_stale_locks(&self, job_type: &str, now: i64) -> Result<(), DbErr> {
171        let stale_before = now.saturating_sub(duration_to_secs(self.options.lock_timeout));
172        self.db
173            .execute(Statement::from_sql_and_values(
174                DbBackend::Sqlite,
175                "UPDATE jobs
176                 SET status = ?,
177                     locked_at = NULL,
178                     lock_token = NULL,
179                     updated_at = ?,
180                     last_enqueued_at = ?,
181                     last_finished_at = ?,
182                     processing_ms_last = CASE
183                         WHEN ? >= COALESCE(last_started_at, locked_at, ?)
184                         THEN (? - COALESCE(last_started_at, locked_at, ?)) * 1000
185                         ELSE 0
186                     END,
187                     processing_ms_total = processing_ms_total + CASE
188                         WHEN ? >= COALESCE(last_started_at, locked_at, ?)
189                         THEN (? - COALESCE(last_started_at, locked_at, ?)) * 1000
190                         ELSE 0
191                     END
192                 WHERE job_type = ?
193                   AND status = ?
194                   AND locked_at IS NOT NULL
195                   AND locked_at <= ?"
196                    .to_string(),
197                vec![
198                    STATUS_QUEUED.into(),
199                    now.into(),
200                    now.into(),
201                    now.into(),
202                    now.into(),
203                    now.into(),
204                    now.into(),
205                    now.into(),
206                    now.into(),
207                    now.into(),
208                    now.into(),
209                    now.into(),
210                    job_type.into(),
211                    STATUS_PROCESSING.into(),
212                    stale_before.into(),
213                ],
214            ))
215            .await?;
216        Ok(())
217    }
218
219    fn next_lock_token(&self, now: i64) -> String {
220        let counter = self.claim_counter.fetch_add(1, Ordering::Relaxed);
221        format!("{}-{}-{}", self.worker_id, now, counter)
222    }
223}
224
225#[async_trait]
226impl JobQueue for SeaOrmQueue {
227    async fn enqueue(&self, job: NewJob) -> QueueResult<i64> {
228        let row = self
229            .db
230            .query_one(Statement::from_sql_and_values(
231                DbBackend::Sqlite,
232                "INSERT INTO jobs
233                 (job_type, payload, status, attempts, max_attempts, available_at, locked_at,
234                  lock_token, last_error, created_at, updated_at, completed_at,
235                  first_enqueued_at, last_enqueued_at, first_started_at, last_started_at,
236                  last_finished_at, queued_ms_total, queued_ms_last, processing_ms_total,
237                  processing_ms_last)
238                 VALUES (?, ?, ?, 0, ?, ?, NULL, NULL, NULL, ?, ?, NULL, ?, ?, NULL, NULL,
239                         NULL, 0, NULL, 0, NULL)
240                 RETURNING id"
241                    .to_string(),
242                vec![
243                    job.job_type.into(),
244                    job.payload.into(),
245                    STATUS_QUEUED.into(),
246                    i64::from(job.max_attempts).into(),
247                    job.available_at.into(),
248                    job.enqueued_at.into(),
249                    job.enqueued_at.into(),
250                    job.enqueued_at.into(),
251                    job.enqueued_at.into(),
252                ],
253            ))
254            .await?
255            .ok_or_else(|| std::io::Error::other("insert returned no row"))?;
256
257        Ok(row.try_get_by_index::<i64>(0)?)
258    }
259
260    async fn next_wakeup_at(&self, job_type: &str) -> QueueResult<Option<i64>> {
261        let lock_timeout_secs = duration_to_secs(self.options.lock_timeout);
262        let row = self
263            .db
264            .query_one(Statement::from_sql_and_values(
265                DbBackend::Sqlite,
266                "SELECT MIN(
267                    CASE
268                        WHEN status = ? THEN available_at
269                        WHEN status = ? AND locked_at IS NOT NULL THEN locked_at + ?
270                        ELSE NULL
271                    END
272                 )
273                 FROM jobs
274                 WHERE job_type = ?
275                   AND status IN (?, ?)"
276                    .to_string(),
277                vec![
278                    STATUS_QUEUED.into(),
279                    STATUS_PROCESSING.into(),
280                    lock_timeout_secs.into(),
281                    job_type.into(),
282                    STATUS_QUEUED.into(),
283                    STATUS_PROCESSING.into(),
284                ],
285            ))
286            .await?;
287
288        match row {
289            Some(row) => Ok(row.try_get_by_index::<Option<i64>>(0)?),
290            None => Ok(None),
291        }
292    }
293}
294
295#[async_trait]
296impl LockableQueue for SeaOrmQueue {
297    type Claim = SeaOrmClaim;
298
299    async fn claim(&self, job_type: &str) -> QueueResult<Option<ClaimedJob<Self::Claim>>> {
300        let now = now_epoch_seconds()?;
301        self.reclaim_stale_locks(job_type, now).await?;
302
303        let lock_token = self.next_lock_token(now);
304        let row = self
305            .db
306            .query_one(Statement::from_sql_and_values(
307                DbBackend::Sqlite,
308                "UPDATE jobs
309                 SET status = ?,
310                     attempts = attempts + 1,
311                     locked_at = ?,
312                     lock_token = ?,
313                     updated_at = ?,
314                     queued_ms_last = CASE
315                         WHEN ? >= COALESCE(last_enqueued_at, ?)
316                         THEN (? - COALESCE(last_enqueued_at, ?)) * 1000
317                         ELSE 0
318                     END,
319                     queued_ms_total = queued_ms_total + CASE
320                         WHEN ? >= COALESCE(last_enqueued_at, ?)
321                         THEN (? - COALESCE(last_enqueued_at, ?)) * 1000
322                         ELSE 0
323                     END,
324                     first_started_at = COALESCE(first_started_at, ?),
325                     last_started_at = ?
326                 WHERE id = (
327                     SELECT id
328                     FROM jobs
329                     WHERE job_type = ?
330                       AND status = ?
331                       AND available_at <= ?
332                     ORDER BY available_at ASC, id ASC
333                     LIMIT 1
334                 )
335                 AND status = ?
336                 RETURNING id, payload, attempts, max_attempts, lock_token, last_started_at"
337                    .to_string(),
338                vec![
339                    STATUS_PROCESSING.into(),
340                    now.into(),
341                    lock_token.clone().into(),
342                    now.into(),
343                    now.into(),
344                    now.into(),
345                    now.into(),
346                    now.into(),
347                    now.into(),
348                    now.into(),
349                    now.into(),
350                    now.into(),
351                    now.into(),
352                    now.into(),
353                    job_type.into(),
354                    STATUS_QUEUED.into(),
355                    now.into(),
356                    STATUS_QUEUED.into(),
357                ],
358            ))
359            .await?;
360
361        let Some(row) = row else {
362            return Ok(None);
363        };
364
365        let id = row.try_get_by_index::<i64>(0)?;
366        let payload = row.try_get_by_index::<String>(1)?;
367        let attempts = row.try_get_by_index::<i64>(2)?;
368        let max_attempts = row.try_get_by_index::<i64>(3)?;
369        let stored_lock_token = row.try_get_by_index::<Option<String>>(4)?;
370        let started_at = row.try_get_by_index::<Option<i64>>(5)?;
371
372        Ok(Some(ClaimedJob {
373            id,
374            payload,
375            attempts: u32::try_from(attempts)?,
376            max_attempts: u32::try_from(max_attempts)?,
377            claim: SeaOrmClaim {
378                lock_token: stored_lock_token.unwrap_or(lock_token),
379                started_at: started_at.unwrap_or(now),
380            },
381        }))
382    }
383
384    async fn complete(&self, job: ClaimedJob<Self::Claim>) -> QueueResult<()> {
385        let now = now_epoch_seconds()?;
386        let processing_ms = elapsed_ms(now, job.claim.started_at);
387        let result = self
388            .db
389            .execute(Statement::from_sql_and_values(
390                DbBackend::Sqlite,
391                "UPDATE jobs
392                 SET status = ?,
393                     completed_at = ?,
394                     locked_at = NULL,
395                     lock_token = NULL,
396                     last_error = NULL,
397                     updated_at = ?,
398                     last_finished_at = ?,
399                     processing_ms_last = ?,
400                     processing_ms_total = processing_ms_total + ?
401                 WHERE id = ? AND status = ? AND lock_token = ?"
402                    .to_string(),
403                vec![
404                    STATUS_COMPLETED.into(),
405                    now.into(),
406                    now.into(),
407                    now.into(),
408                    processing_ms.into(),
409                    processing_ms.into(),
410                    job.id.into(),
411                    STATUS_PROCESSING.into(),
412                    job.claim.lock_token.into(),
413                ],
414            ))
415            .await?;
416
417        ensure_lease(result.rows_affected(), job.id)?;
418        Ok(())
419    }
420}
421
422#[async_trait]
423impl RetryableQueue for SeaOrmQueue {
424    async fn retry(
425        &self,
426        job: ClaimedJob<Self::Claim>,
427        next_run_at: i64,
428        error: String,
429    ) -> QueueResult<()> {
430        let now = now_epoch_seconds()?;
431        let processing_ms = elapsed_ms(now, job.claim.started_at);
432        let result = self
433            .db
434            .execute(Statement::from_sql_and_values(
435                DbBackend::Sqlite,
436                "UPDATE jobs
437                 SET status = ?,
438                     available_at = ?,
439                     locked_at = NULL,
440                     lock_token = NULL,
441                     last_error = ?,
442                     updated_at = ?,
443                     last_enqueued_at = ?,
444                     last_finished_at = ?,
445                     processing_ms_last = ?,
446                     processing_ms_total = processing_ms_total + ?
447                 WHERE id = ? AND status = ? AND lock_token = ?"
448                    .to_string(),
449                vec![
450                    STATUS_QUEUED.into(),
451                    next_run_at.into(),
452                    error.into(),
453                    now.into(),
454                    now.into(),
455                    now.into(),
456                    processing_ms.into(),
457                    processing_ms.into(),
458                    job.id.into(),
459                    STATUS_PROCESSING.into(),
460                    job.claim.lock_token.into(),
461                ],
462            ))
463            .await?;
464
465        ensure_lease(result.rows_affected(), job.id)?;
466        Ok(())
467    }
468
469    async fn fail(&self, job: ClaimedJob<Self::Claim>, error: String) -> QueueResult<()> {
470        let now = now_epoch_seconds()?;
471        let processing_ms = elapsed_ms(now, job.claim.started_at);
472        let result = self
473            .db
474            .execute(Statement::from_sql_and_values(
475                DbBackend::Sqlite,
476                "UPDATE jobs
477                 SET status = ?,
478                     locked_at = NULL,
479                     lock_token = NULL,
480                     last_error = ?,
481                     updated_at = ?,
482                     last_finished_at = ?,
483                     processing_ms_last = ?,
484                     processing_ms_total = processing_ms_total + ?
485                 WHERE id = ? AND status = ? AND lock_token = ?"
486                    .to_string(),
487                vec![
488                    STATUS_FAILED.into(),
489                    error.into(),
490                    now.into(),
491                    now.into(),
492                    processing_ms.into(),
493                    processing_ms.into(),
494                    job.id.into(),
495                    STATUS_PROCESSING.into(),
496                    job.claim.lock_token.into(),
497                ],
498            ))
499            .await?;
500
501        ensure_lease(result.rows_affected(), job.id)?;
502        Ok(())
503    }
504}
505
506#[async_trait]
507impl DashboardData for SeaOrmQueue {
508    async fn dashboard_stats(&self) -> Result<DashboardStats, BoxError> {
509        let row = self
510            .db
511            .query_one(Statement::from_string(
512                DbBackend::Sqlite,
513                "SELECT
514                    COUNT(*) AS total,
515                    COALESCE(SUM(CASE WHEN status = 'queued' THEN 1 ELSE 0 END), 0) AS queued,
516                    COALESCE(SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END), 0) AS processing,
517                    COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
518                    COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed,
519                    COALESCE(SUM(CASE WHEN status = 'cleared' THEN 1 ELSE 0 END), 0) AS cleared
520                 FROM jobs"
521                    .to_string(),
522            ))
523            .await?
524            .ok_or_else(|| std::io::Error::other("stats query returned no row"))?;
525
526        Ok(DashboardStats {
527            total: row.try_get_by_index::<i64>(0)?,
528            queued: row.try_get_by_index::<i64>(1)?,
529            processing: row.try_get_by_index::<i64>(2)?,
530            completed: row.try_get_by_index::<i64>(3)?,
531            failed: row.try_get_by_index::<i64>(4)?,
532            cleared: row.try_get_by_index::<i64>(5)?,
533        })
534    }
535
536    async fn dashboard_jobs(&self, limit: i64) -> Result<Vec<DashboardJob>, BoxError> {
537        let rows = self
538            .db
539            .query_all(Statement::from_sql_and_values(
540                DbBackend::Sqlite,
541                dashboard_jobs_sql().to_string(),
542                vec![limit.into()],
543            ))
544            .await?;
545
546        rows.into_iter().map(dashboard_job_from_seaorm).collect()
547    }
548}
549
550fn dashboard_job_from_seaorm(row: sea_orm::QueryResult) -> Result<DashboardJob, BoxError> {
551    Ok(DashboardJob {
552        id: row.try_get_by_index::<i64>(0)?,
553        job_type: row.try_get_by_index::<String>(1)?,
554        status: row.try_get_by_index::<String>(2)?,
555        payload: row.try_get_by_index::<String>(3)?,
556        attempts: row.try_get_by_index::<i32>(4)?,
557        max_attempts: row.try_get_by_index::<i32>(5)?,
558        available_at: row.try_get_by_index::<i64>(6)?,
559        locked_at: row.try_get_by_index::<Option<i64>>(7)?,
560        last_error: row.try_get_by_index::<Option<String>>(8)?,
561        created_at: row.try_get_by_index::<i64>(9)?,
562        updated_at: row.try_get_by_index::<i64>(10)?,
563        completed_at: row.try_get_by_index::<Option<i64>>(11)?,
564        first_enqueued_at: row.try_get_by_index::<Option<i64>>(12)?,
565        last_enqueued_at: row.try_get_by_index::<Option<i64>>(13)?,
566        first_started_at: row.try_get_by_index::<Option<i64>>(14)?,
567        last_started_at: row.try_get_by_index::<Option<i64>>(15)?,
568        last_finished_at: row.try_get_by_index::<Option<i64>>(16)?,
569        queued_ms_total: row.try_get_by_index::<i64>(17)?,
570        queued_ms_last: row.try_get_by_index::<Option<i64>>(18)?,
571        processing_ms_total: row.try_get_by_index::<i64>(19)?,
572        processing_ms_last: row.try_get_by_index::<Option<i64>>(20)?,
573    })
574}
575
576fn dashboard_jobs_sql() -> &'static str {
577    "SELECT
578        id,
579        job_type,
580        status,
581        payload,
582        attempts,
583        max_attempts,
584        available_at,
585        locked_at,
586        last_error,
587        created_at,
588        updated_at,
589        completed_at,
590        first_enqueued_at,
591        last_enqueued_at,
592        first_started_at,
593        last_started_at,
594        last_finished_at,
595        COALESCE(queued_ms_total, 0) AS queued_ms_total,
596        queued_ms_last,
597        COALESCE(processing_ms_total, 0) AS processing_ms_total,
598        processing_ms_last
599     FROM jobs
600     ORDER BY id DESC
601     LIMIT ?"
602}
603
604fn timing_column_definitions() -> [(&'static str, &'static str); 9] {
605    [
606        ("first_enqueued_at", "INTEGER NULL"),
607        ("last_enqueued_at", "INTEGER NULL"),
608        ("first_started_at", "INTEGER NULL"),
609        ("last_started_at", "INTEGER NULL"),
610        ("last_finished_at", "INTEGER NULL"),
611        ("queued_ms_total", "INTEGER NOT NULL DEFAULT 0"),
612        ("queued_ms_last", "INTEGER NULL"),
613        ("processing_ms_total", "INTEGER NOT NULL DEFAULT 0"),
614        ("processing_ms_last", "INTEGER NULL"),
615    ]
616}
617
618fn ensure_lease(rows_affected: u64, job_id: i64) -> QueueResult<()> {
619    if rows_affected == 0 {
620        return Err(
621            std::io::Error::other(format!("lease was lost while processing job {job_id}")).into(),
622        );
623    }
624    Ok(())
625}
626
627fn make_worker_id() -> String {
628    format!("pid{}", std::process::id())
629}
630
631fn now_epoch_seconds() -> Result<i64, std::time::SystemTimeError> {
632    let secs = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
633    Ok(i64::try_from(secs).unwrap_or(i64::MAX))
634}
635
636fn duration_to_secs(duration: Duration) -> i64 {
637    i64::try_from(duration.as_secs()).unwrap_or(i64::MAX)
638}
639
640fn elapsed_ms(now_secs: i64, started_at_secs: i64) -> i64 {
641    now_secs
642        .saturating_sub(started_at_secs)
643        .max(0)
644        .saturating_mul(1_000)
645}