1use async_trait::async_trait;
2use lilqueue::{
3 BoxError, ClaimedJob, JobQueue, LockableQueue, NewJob, QueueResult, RetryableQueue,
4 dashboard::{DashboardData, DashboardJob, DashboardStats},
5};
6use sea_orm::{ConnectionTrait, Database, DatabaseConnection, DbBackend, DbErr, Statement};
7use std::{
8 collections::HashSet,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13 time::{Duration, SystemTime, UNIX_EPOCH},
14};
15
16const STATUS_QUEUED: &str = "queued";
17const STATUS_PROCESSING: &str = "processing";
18const STATUS_COMPLETED: &str = "completed";
19const STATUS_FAILED: &str = "failed";
20
21#[derive(Debug, Clone)]
22pub struct SeaOrmQueueOptions {
23 pub lock_timeout: Duration,
24}
25
26impl Default for SeaOrmQueueOptions {
27 fn default() -> Self {
28 Self {
29 lock_timeout: Duration::from_secs(300),
30 }
31 }
32}
33
34#[derive(Clone)]
35pub struct SeaOrmQueue {
36 db: DatabaseConnection,
37 options: SeaOrmQueueOptions,
38 worker_id: String,
39 claim_counter: Arc<AtomicU64>,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub struct SeaOrmClaim {
44 pub lock_token: String,
45 pub started_at: i64,
46}
47
48impl SeaOrmQueue {
49 pub async fn connect(database_url: &str, options: SeaOrmQueueOptions) -> Result<Self, DbErr> {
50 let db = Database::connect(database_url).await?;
51 Self::new(db, options).await
52 }
53
54 pub async fn new(db: DatabaseConnection, options: SeaOrmQueueOptions) -> Result<Self, DbErr> {
55 let queue = Self {
56 db,
57 options,
58 worker_id: make_worker_id(),
59 claim_counter: Arc::new(AtomicU64::new(1)),
60 };
61 queue.initialize_schema().await?;
62 Ok(queue)
63 }
64
65 pub fn db(&self) -> &DatabaseConnection {
66 &self.db
67 }
68
69 pub fn options(&self) -> &SeaOrmQueueOptions {
70 &self.options
71 }
72
73 async fn initialize_schema(&self) -> Result<(), DbErr> {
74 self.db
75 .execute(Statement::from_string(
76 DbBackend::Sqlite,
77 "CREATE TABLE IF NOT EXISTS jobs (
78 id INTEGER PRIMARY KEY AUTOINCREMENT,
79 job_type TEXT NOT NULL,
80 payload TEXT NOT NULL,
81 status TEXT NOT NULL,
82 attempts INTEGER NOT NULL DEFAULT 0,
83 max_attempts INTEGER NOT NULL,
84 available_at INTEGER NOT NULL,
85 locked_at INTEGER NULL,
86 lock_token TEXT NULL,
87 last_error TEXT NULL,
88 created_at INTEGER NOT NULL,
89 updated_at INTEGER NOT NULL,
90 completed_at INTEGER NULL,
91 first_enqueued_at INTEGER NULL,
92 last_enqueued_at INTEGER NULL,
93 first_started_at INTEGER NULL,
94 last_started_at INTEGER NULL,
95 last_finished_at INTEGER NULL,
96 queued_ms_total INTEGER NOT NULL DEFAULT 0,
97 queued_ms_last INTEGER NULL,
98 processing_ms_total INTEGER NOT NULL DEFAULT 0,
99 processing_ms_last INTEGER NULL
100 )"
101 .to_string(),
102 ))
103 .await?;
104
105 self.ensure_timing_columns().await?;
106
107 self.db
108 .execute(Statement::from_string(
109 DbBackend::Sqlite,
110 "CREATE INDEX IF NOT EXISTS idx_jobs_ready
111 ON jobs (job_type, status, available_at, id)"
112 .to_string(),
113 ))
114 .await?;
115
116 self.db
117 .execute(Statement::from_string(
118 DbBackend::Sqlite,
119 "CREATE INDEX IF NOT EXISTS idx_jobs_processing
120 ON jobs (job_type, status, locked_at)"
121 .to_string(),
122 ))
123 .await?;
124
125 Ok(())
126 }
127
128 async fn ensure_timing_columns(&self) -> Result<(), DbErr> {
129 let existing = self.job_columns().await?;
130 for (column, definition) in timing_column_definitions() {
131 if !existing.contains(column) {
132 self.db
133 .execute(Statement::from_string(
134 DbBackend::Sqlite,
135 format!("ALTER TABLE jobs ADD COLUMN {column} {definition}"),
136 ))
137 .await?;
138 }
139 }
140
141 self.db
142 .execute(Statement::from_string(
143 DbBackend::Sqlite,
144 "UPDATE jobs
145 SET first_enqueued_at = COALESCE(first_enqueued_at, created_at)"
146 .to_string(),
147 ))
148 .await?;
149
150 Ok(())
151 }
152
153 async fn job_columns(&self) -> Result<HashSet<String>, DbErr> {
154 let rows = self
155 .db
156 .query_all(Statement::from_string(
157 DbBackend::Sqlite,
158 "PRAGMA table_info(jobs)".to_string(),
159 ))
160 .await?;
161
162 let mut columns = HashSet::with_capacity(rows.len());
163 for row in rows {
164 columns.insert(row.try_get_by_index::<String>(1)?);
165 }
166
167 Ok(columns)
168 }
169
170 async fn reclaim_stale_locks(&self, job_type: &str, now: i64) -> Result<(), DbErr> {
171 let stale_before = now.saturating_sub(duration_to_secs(self.options.lock_timeout));
172 self.db
173 .execute(Statement::from_sql_and_values(
174 DbBackend::Sqlite,
175 "UPDATE jobs
176 SET status = ?,
177 locked_at = NULL,
178 lock_token = NULL,
179 updated_at = ?,
180 last_enqueued_at = ?,
181 last_finished_at = ?,
182 processing_ms_last = CASE
183 WHEN ? >= COALESCE(last_started_at, locked_at, ?)
184 THEN (? - COALESCE(last_started_at, locked_at, ?)) * 1000
185 ELSE 0
186 END,
187 processing_ms_total = processing_ms_total + CASE
188 WHEN ? >= COALESCE(last_started_at, locked_at, ?)
189 THEN (? - COALESCE(last_started_at, locked_at, ?)) * 1000
190 ELSE 0
191 END
192 WHERE job_type = ?
193 AND status = ?
194 AND locked_at IS NOT NULL
195 AND locked_at <= ?"
196 .to_string(),
197 vec![
198 STATUS_QUEUED.into(),
199 now.into(),
200 now.into(),
201 now.into(),
202 now.into(),
203 now.into(),
204 now.into(),
205 now.into(),
206 now.into(),
207 now.into(),
208 now.into(),
209 now.into(),
210 job_type.into(),
211 STATUS_PROCESSING.into(),
212 stale_before.into(),
213 ],
214 ))
215 .await?;
216 Ok(())
217 }
218
219 fn next_lock_token(&self, now: i64) -> String {
220 let counter = self.claim_counter.fetch_add(1, Ordering::Relaxed);
221 format!("{}-{}-{}", self.worker_id, now, counter)
222 }
223}
224
225#[async_trait]
226impl JobQueue for SeaOrmQueue {
227 async fn enqueue(&self, job: NewJob) -> QueueResult<i64> {
228 let row = self
229 .db
230 .query_one(Statement::from_sql_and_values(
231 DbBackend::Sqlite,
232 "INSERT INTO jobs
233 (job_type, payload, status, attempts, max_attempts, available_at, locked_at,
234 lock_token, last_error, created_at, updated_at, completed_at,
235 first_enqueued_at, last_enqueued_at, first_started_at, last_started_at,
236 last_finished_at, queued_ms_total, queued_ms_last, processing_ms_total,
237 processing_ms_last)
238 VALUES (?, ?, ?, 0, ?, ?, NULL, NULL, NULL, ?, ?, NULL, ?, ?, NULL, NULL,
239 NULL, 0, NULL, 0, NULL)
240 RETURNING id"
241 .to_string(),
242 vec![
243 job.job_type.into(),
244 job.payload.into(),
245 STATUS_QUEUED.into(),
246 i64::from(job.max_attempts).into(),
247 job.available_at.into(),
248 job.enqueued_at.into(),
249 job.enqueued_at.into(),
250 job.enqueued_at.into(),
251 job.enqueued_at.into(),
252 ],
253 ))
254 .await?
255 .ok_or_else(|| std::io::Error::other("insert returned no row"))?;
256
257 Ok(row.try_get_by_index::<i64>(0)?)
258 }
259
260 async fn next_wakeup_at(&self, job_type: &str) -> QueueResult<Option<i64>> {
261 let lock_timeout_secs = duration_to_secs(self.options.lock_timeout);
262 let row = self
263 .db
264 .query_one(Statement::from_sql_and_values(
265 DbBackend::Sqlite,
266 "SELECT MIN(
267 CASE
268 WHEN status = ? THEN available_at
269 WHEN status = ? AND locked_at IS NOT NULL THEN locked_at + ?
270 ELSE NULL
271 END
272 )
273 FROM jobs
274 WHERE job_type = ?
275 AND status IN (?, ?)"
276 .to_string(),
277 vec![
278 STATUS_QUEUED.into(),
279 STATUS_PROCESSING.into(),
280 lock_timeout_secs.into(),
281 job_type.into(),
282 STATUS_QUEUED.into(),
283 STATUS_PROCESSING.into(),
284 ],
285 ))
286 .await?;
287
288 match row {
289 Some(row) => Ok(row.try_get_by_index::<Option<i64>>(0)?),
290 None => Ok(None),
291 }
292 }
293}
294
295#[async_trait]
296impl LockableQueue for SeaOrmQueue {
297 type Claim = SeaOrmClaim;
298
299 async fn claim(&self, job_type: &str) -> QueueResult<Option<ClaimedJob<Self::Claim>>> {
300 let now = now_epoch_seconds()?;
301 self.reclaim_stale_locks(job_type, now).await?;
302
303 let lock_token = self.next_lock_token(now);
304 let row = self
305 .db
306 .query_one(Statement::from_sql_and_values(
307 DbBackend::Sqlite,
308 "UPDATE jobs
309 SET status = ?,
310 attempts = attempts + 1,
311 locked_at = ?,
312 lock_token = ?,
313 updated_at = ?,
314 queued_ms_last = CASE
315 WHEN ? >= COALESCE(last_enqueued_at, ?)
316 THEN (? - COALESCE(last_enqueued_at, ?)) * 1000
317 ELSE 0
318 END,
319 queued_ms_total = queued_ms_total + CASE
320 WHEN ? >= COALESCE(last_enqueued_at, ?)
321 THEN (? - COALESCE(last_enqueued_at, ?)) * 1000
322 ELSE 0
323 END,
324 first_started_at = COALESCE(first_started_at, ?),
325 last_started_at = ?
326 WHERE id = (
327 SELECT id
328 FROM jobs
329 WHERE job_type = ?
330 AND status = ?
331 AND available_at <= ?
332 ORDER BY available_at ASC, id ASC
333 LIMIT 1
334 )
335 AND status = ?
336 RETURNING id, payload, attempts, max_attempts, lock_token, last_started_at"
337 .to_string(),
338 vec![
339 STATUS_PROCESSING.into(),
340 now.into(),
341 lock_token.clone().into(),
342 now.into(),
343 now.into(),
344 now.into(),
345 now.into(),
346 now.into(),
347 now.into(),
348 now.into(),
349 now.into(),
350 now.into(),
351 now.into(),
352 now.into(),
353 job_type.into(),
354 STATUS_QUEUED.into(),
355 now.into(),
356 STATUS_QUEUED.into(),
357 ],
358 ))
359 .await?;
360
361 let Some(row) = row else {
362 return Ok(None);
363 };
364
365 let id = row.try_get_by_index::<i64>(0)?;
366 let payload = row.try_get_by_index::<String>(1)?;
367 let attempts = row.try_get_by_index::<i64>(2)?;
368 let max_attempts = row.try_get_by_index::<i64>(3)?;
369 let stored_lock_token = row.try_get_by_index::<Option<String>>(4)?;
370 let started_at = row.try_get_by_index::<Option<i64>>(5)?;
371
372 Ok(Some(ClaimedJob {
373 id,
374 payload,
375 attempts: u32::try_from(attempts)?,
376 max_attempts: u32::try_from(max_attempts)?,
377 claim: SeaOrmClaim {
378 lock_token: stored_lock_token.unwrap_or(lock_token),
379 started_at: started_at.unwrap_or(now),
380 },
381 }))
382 }
383
384 async fn complete(&self, job: ClaimedJob<Self::Claim>) -> QueueResult<()> {
385 let now = now_epoch_seconds()?;
386 let processing_ms = elapsed_ms(now, job.claim.started_at);
387 let result = self
388 .db
389 .execute(Statement::from_sql_and_values(
390 DbBackend::Sqlite,
391 "UPDATE jobs
392 SET status = ?,
393 completed_at = ?,
394 locked_at = NULL,
395 lock_token = NULL,
396 last_error = NULL,
397 updated_at = ?,
398 last_finished_at = ?,
399 processing_ms_last = ?,
400 processing_ms_total = processing_ms_total + ?
401 WHERE id = ? AND status = ? AND lock_token = ?"
402 .to_string(),
403 vec![
404 STATUS_COMPLETED.into(),
405 now.into(),
406 now.into(),
407 now.into(),
408 processing_ms.into(),
409 processing_ms.into(),
410 job.id.into(),
411 STATUS_PROCESSING.into(),
412 job.claim.lock_token.into(),
413 ],
414 ))
415 .await?;
416
417 ensure_lease(result.rows_affected(), job.id)?;
418 Ok(())
419 }
420}
421
422#[async_trait]
423impl RetryableQueue for SeaOrmQueue {
424 async fn retry(
425 &self,
426 job: ClaimedJob<Self::Claim>,
427 next_run_at: i64,
428 error: String,
429 ) -> QueueResult<()> {
430 let now = now_epoch_seconds()?;
431 let processing_ms = elapsed_ms(now, job.claim.started_at);
432 let result = self
433 .db
434 .execute(Statement::from_sql_and_values(
435 DbBackend::Sqlite,
436 "UPDATE jobs
437 SET status = ?,
438 available_at = ?,
439 locked_at = NULL,
440 lock_token = NULL,
441 last_error = ?,
442 updated_at = ?,
443 last_enqueued_at = ?,
444 last_finished_at = ?,
445 processing_ms_last = ?,
446 processing_ms_total = processing_ms_total + ?
447 WHERE id = ? AND status = ? AND lock_token = ?"
448 .to_string(),
449 vec![
450 STATUS_QUEUED.into(),
451 next_run_at.into(),
452 error.into(),
453 now.into(),
454 now.into(),
455 now.into(),
456 processing_ms.into(),
457 processing_ms.into(),
458 job.id.into(),
459 STATUS_PROCESSING.into(),
460 job.claim.lock_token.into(),
461 ],
462 ))
463 .await?;
464
465 ensure_lease(result.rows_affected(), job.id)?;
466 Ok(())
467 }
468
469 async fn fail(&self, job: ClaimedJob<Self::Claim>, error: String) -> QueueResult<()> {
470 let now = now_epoch_seconds()?;
471 let processing_ms = elapsed_ms(now, job.claim.started_at);
472 let result = self
473 .db
474 .execute(Statement::from_sql_and_values(
475 DbBackend::Sqlite,
476 "UPDATE jobs
477 SET status = ?,
478 locked_at = NULL,
479 lock_token = NULL,
480 last_error = ?,
481 updated_at = ?,
482 last_finished_at = ?,
483 processing_ms_last = ?,
484 processing_ms_total = processing_ms_total + ?
485 WHERE id = ? AND status = ? AND lock_token = ?"
486 .to_string(),
487 vec![
488 STATUS_FAILED.into(),
489 error.into(),
490 now.into(),
491 now.into(),
492 processing_ms.into(),
493 processing_ms.into(),
494 job.id.into(),
495 STATUS_PROCESSING.into(),
496 job.claim.lock_token.into(),
497 ],
498 ))
499 .await?;
500
501 ensure_lease(result.rows_affected(), job.id)?;
502 Ok(())
503 }
504}
505
506#[async_trait]
507impl DashboardData for SeaOrmQueue {
508 async fn dashboard_stats(&self) -> Result<DashboardStats, BoxError> {
509 let row = self
510 .db
511 .query_one(Statement::from_string(
512 DbBackend::Sqlite,
513 "SELECT
514 COUNT(*) AS total,
515 COALESCE(SUM(CASE WHEN status = 'queued' THEN 1 ELSE 0 END), 0) AS queued,
516 COALESCE(SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END), 0) AS processing,
517 COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0) AS completed,
518 COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0) AS failed,
519 COALESCE(SUM(CASE WHEN status = 'cleared' THEN 1 ELSE 0 END), 0) AS cleared
520 FROM jobs"
521 .to_string(),
522 ))
523 .await?
524 .ok_or_else(|| std::io::Error::other("stats query returned no row"))?;
525
526 Ok(DashboardStats {
527 total: row.try_get_by_index::<i64>(0)?,
528 queued: row.try_get_by_index::<i64>(1)?,
529 processing: row.try_get_by_index::<i64>(2)?,
530 completed: row.try_get_by_index::<i64>(3)?,
531 failed: row.try_get_by_index::<i64>(4)?,
532 cleared: row.try_get_by_index::<i64>(5)?,
533 })
534 }
535
536 async fn dashboard_jobs(&self, limit: i64) -> Result<Vec<DashboardJob>, BoxError> {
537 let rows = self
538 .db
539 .query_all(Statement::from_sql_and_values(
540 DbBackend::Sqlite,
541 dashboard_jobs_sql().to_string(),
542 vec![limit.into()],
543 ))
544 .await?;
545
546 rows.into_iter().map(dashboard_job_from_seaorm).collect()
547 }
548}
549
550fn dashboard_job_from_seaorm(row: sea_orm::QueryResult) -> Result<DashboardJob, BoxError> {
551 Ok(DashboardJob {
552 id: row.try_get_by_index::<i64>(0)?,
553 job_type: row.try_get_by_index::<String>(1)?,
554 status: row.try_get_by_index::<String>(2)?,
555 payload: row.try_get_by_index::<String>(3)?,
556 attempts: row.try_get_by_index::<i32>(4)?,
557 max_attempts: row.try_get_by_index::<i32>(5)?,
558 available_at: row.try_get_by_index::<i64>(6)?,
559 locked_at: row.try_get_by_index::<Option<i64>>(7)?,
560 last_error: row.try_get_by_index::<Option<String>>(8)?,
561 created_at: row.try_get_by_index::<i64>(9)?,
562 updated_at: row.try_get_by_index::<i64>(10)?,
563 completed_at: row.try_get_by_index::<Option<i64>>(11)?,
564 first_enqueued_at: row.try_get_by_index::<Option<i64>>(12)?,
565 last_enqueued_at: row.try_get_by_index::<Option<i64>>(13)?,
566 first_started_at: row.try_get_by_index::<Option<i64>>(14)?,
567 last_started_at: row.try_get_by_index::<Option<i64>>(15)?,
568 last_finished_at: row.try_get_by_index::<Option<i64>>(16)?,
569 queued_ms_total: row.try_get_by_index::<i64>(17)?,
570 queued_ms_last: row.try_get_by_index::<Option<i64>>(18)?,
571 processing_ms_total: row.try_get_by_index::<i64>(19)?,
572 processing_ms_last: row.try_get_by_index::<Option<i64>>(20)?,
573 })
574}
575
576fn dashboard_jobs_sql() -> &'static str {
577 "SELECT
578 id,
579 job_type,
580 status,
581 payload,
582 attempts,
583 max_attempts,
584 available_at,
585 locked_at,
586 last_error,
587 created_at,
588 updated_at,
589 completed_at,
590 first_enqueued_at,
591 last_enqueued_at,
592 first_started_at,
593 last_started_at,
594 last_finished_at,
595 COALESCE(queued_ms_total, 0) AS queued_ms_total,
596 queued_ms_last,
597 COALESCE(processing_ms_total, 0) AS processing_ms_total,
598 processing_ms_last
599 FROM jobs
600 ORDER BY id DESC
601 LIMIT ?"
602}
603
604fn timing_column_definitions() -> [(&'static str, &'static str); 9] {
605 [
606 ("first_enqueued_at", "INTEGER NULL"),
607 ("last_enqueued_at", "INTEGER NULL"),
608 ("first_started_at", "INTEGER NULL"),
609 ("last_started_at", "INTEGER NULL"),
610 ("last_finished_at", "INTEGER NULL"),
611 ("queued_ms_total", "INTEGER NOT NULL DEFAULT 0"),
612 ("queued_ms_last", "INTEGER NULL"),
613 ("processing_ms_total", "INTEGER NOT NULL DEFAULT 0"),
614 ("processing_ms_last", "INTEGER NULL"),
615 ]
616}
617
618fn ensure_lease(rows_affected: u64, job_id: i64) -> QueueResult<()> {
619 if rows_affected == 0 {
620 return Err(
621 std::io::Error::other(format!("lease was lost while processing job {job_id}")).into(),
622 );
623 }
624 Ok(())
625}
626
627fn make_worker_id() -> String {
628 format!("pid{}", std::process::id())
629}
630
631fn now_epoch_seconds() -> Result<i64, std::time::SystemTimeError> {
632 let secs = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
633 Ok(i64::try_from(secs).unwrap_or(i64::MAX))
634}
635
636fn duration_to_secs(duration: Duration) -> i64 {
637 i64::try_from(duration.as_secs()).unwrap_or(i64::MAX)
638}
639
640fn elapsed_ms(now_secs: i64, started_at_secs: i64) -> i64 {
641 now_secs
642 .saturating_sub(started_at_secs)
643 .max(0)
644 .saturating_mul(1_000)
645}