simple_queue/queue.rs
1use std::sync::Arc;
2
3mod builder;
4mod handler_api;
5mod job_api;
6mod logic;
7#[cfg(feature = "wait-for-job")]
8mod wait_for_job;
9mod workers_api;
10
11use crate::handler;
12use crate::sync::{self, BackoffStrategy};
13use dashmap::DashMap;
14use sqlx::PgPool;
15use tokio::sync::Semaphore;
16
17// Implementations split across modules
18
19/// Queue engine struct
20/// Use [`SimpleQueue::new(pool: PgPool)`] to create an instance.
21///
22/// Queue is configurable by builder-style methods.
23///
24/// Following parameters are configuratble:
25/// - `heartbeat_interval` - how often running job will be touched (thus indicating the job is actually running)
26/// when job stops being updated it's recognized as stalled and is requeued by a reaper worker
27/// - `empty_poll_sleep` - how long to sleep between between polling when queue is empty -
28/// should be low for high throughput queues, high for low throughput queues
29/// - `max_reprocess_count` - how many times a job will be reprocessed before being discarded - poison job discovery
30/// (count increases only on job reschedule and stalled job recovery)
31/// - `janitor_interval` - how often the janitor task will run (i.e. archive completed jobs, move failed jobs to dead queue)
32/// - `hold_queue_semaphore` - how long to hold a queue semaphore before releasing it
33/// (when queue is congested releasing immediately might result in polling repicking same jobs over and over, starving other queues)
34pub struct SimpleQueue {
35 pool: PgPool,
36 job_registry: DashMap<&'static str, Arc<dyn handler::DynJobHandler>>,
37 global_semaphore: Arc<Semaphore>,
38 queue_strategies: DashMap<String, Arc<dyn sync::JobStrategy>>,
39 queue_semaphores: DashMap<String, Arc<Semaphore>>,
40 queue_sem_count: usize,
41 heartbeat_interval: tokio::time::Duration,
42 default_backoff_strategy: BackoffStrategy,
43 default_queue_strategy: Arc<dyn sync::JobStrategy>,
44 queue_backoff_strategies: DashMap<String, BackoffStrategy>,
45 empty_poll_sleep: tokio::time::Duration,
46 max_reprocess_count: usize,
47 janitor_interval: tokio::time::Duration,
48 hold_queue_semaphore: tokio::time::Duration,
49}