Skip to main content

simple_queue/queue/
builder.rs

1use std::sync::Arc;
2
3use dashmap::DashMap;
4use sqlx::PgPool;
5use tokio::sync::Semaphore;
6
7use crate::sync::{self, BackoffStrategy};
8use crate::*;
9
10impl SimpleQueue {
11    /// Creates new Queue with default configuration:
12    /// - global semaphore with 500 permits (global limit)
13    /// - queue semaphore (per queue limit) with 100 permits
14    /// - no custrom semaphore strategy
15    /// - heartbeat interval of 5 seconds
16    /// - linear backoff strategy with 5s retry interval
17    /// - pool sleep when table is empty of 1 second
18    /// - max reprocess count (poison/loop detection) of 100
19    pub fn new(pool: PgPool) -> Self {
20        Self {
21            pool,
22            job_registry: DashMap::new(),
23            default_backoff_strategy: BackoffStrategy::default(),
24            default_queue_strategy: Arc::new(sync::InstantStrategy {}),
25            queue_backoff_strategies: DashMap::new(),
26            global_semaphore: Arc::new(Semaphore::new(500)),
27            queue_strategies: DashMap::new(),
28            queue_semaphores: DashMap::new(),
29            queue_sem_count: 100,
30            heartbeat_interval: tokio::time::Duration::from_secs(5),
31            empty_poll_sleep: tokio::time::Duration::from_secs(1),
32            max_reprocess_count: 100,
33            janitor_interval: tokio::time::Duration::from_secs(60),
34            hold_queue_semaphore: tokio::time::Duration::from_millis(500),
35        }
36    }
37    /// Create a new `SimpleQueue` with a PostgreSQL pool created from a URL.
38    /// See [`SimpleQueue::new`] for default configuration details.
39    pub async fn new_from_url(url: &str) -> Result<Self, sqlx::Error> {
40        let pool = sqlx::PgPool::connect(url).await?;
41        Ok(Self::new(pool))
42    }
43    /// Set the number of global semaphore permits
44    ///
45    /// Default value: `500`
46    pub fn with_global_semaphore(self, permits: usize) -> Self {
47        Self {
48            global_semaphore: Arc::new(Semaphore::new(permits)),
49            ..self
50        }
51    }
52    /// Set the queue waiting strategy for a specific queue
53    ///
54    /// Default value if not set is `default_queue_strategy` parameter.
55    pub fn with_queue_strategy(
56        self,
57        queue: String,
58        strategy: impl sync::JobStrategy + 'static,
59    ) -> Self {
60        self.queue_strategies
61            .insert(queue.clone(), Arc::new(strategy));
62        self
63    }
64    /// Set the number of semaphore permits for a specific queue
65    ///
66    /// Default value if not set is in `queue_sem_count` parameter set by `with_queue_default_semaphore_size`.
67    pub fn with_queue_semaphore(self, queue: String, permits: usize) -> Self {
68        self.queue_semaphores
69            .insert(queue, Arc::new(Semaphore::new(permits)));
70        self
71    }
72    /// Set the default number of permits for queues
73    ///
74    /// Default value: `100`
75    pub fn with_queue_default_semaphore_size(self, permits: usize) -> Self {
76        Self {
77            queue_sem_count: permits,
78            ..self
79        }
80    }
81    /// Set the heartbeat interval
82    ///
83    /// Default values: `5 seconds`
84    pub fn with_heartbeat_interval(self, interval: tokio::time::Duration) -> Self {
85        Self {
86            heartbeat_interval: interval,
87            ..self
88        }
89    }
90    /// Set the default backoff strategy.
91    ///
92    /// See also [`BackoffStrategy`].
93    ///
94    /// Default strategy: [`BackoffStrategy::Linear`] with delay of `5 seconds`.
95    pub fn with_default_backoff_strategy(self, strategy: BackoffStrategy) -> Self {
96        Self {
97            default_backoff_strategy: strategy,
98            ..self
99        }
100    }
101    /// Set the default queue strategy
102    ///
103    /// Default: [`sync::InstantStrategy`]
104    pub fn with_default_queue_strategy(self, strategy: Arc<dyn sync::JobStrategy>) -> Self {
105        Self {
106            default_queue_strategy: strategy,
107            ..self
108        }
109    }
110    /// Set the backoff strategy for a specific queue
111    ///
112    /// See also [`BackoffStrategy`].
113    ///
114    /// Default is taken from `default_backoff_strategy`.
115    pub fn with_queue_backoff_strategy(self, queue: String, strategy: BackoffStrategy) -> Self {
116        self.queue_backoff_strategies.insert(queue, strategy);
117        self
118    }
119
120    /// Set the duration to sleep between polls when no jobs are found
121    ///
122    /// This is used only when no jobs are found in the queue. Should be set to low value for critical
123    /// and high throughput queues.
124    ///
125    /// Default: `1 second`.
126    pub fn with_empty_poll_sleep(self, duration: tokio::time::Duration) -> Self {
127        Self {
128            empty_poll_sleep: duration,
129            ..self
130        }
131    }
132    /// Set the maximum number of times a job can be reprocessed
133    /// E.g. rescheduled without attempt consumption or recovered from stalled state (default: 100)
134    ///
135    /// Reprocess count is to prevent infinite reschedules (not consuming attempt) and stall job recovery.
136    ///
137    /// Default: `100`.
138    pub fn with_max_reprocess_count(self, count: usize) -> Self {
139        Self {
140            max_reprocess_count: count,
141            ..self
142        }
143    }
144
145    /// Set the interval between janitor runs.
146    ///
147    /// Janitor moves completed and failed jobs to the archive and dlq tables
148    ///
149    /// Default: `1 minute`.
150    pub fn with_janitor_interval(self, duration: tokio::time::Duration) -> Self {
151        Self {
152            janitor_interval: duration,
153            ..self
154        }
155    }
156
157    /// Set the duration of how logn to hold the queue semaphore acquisition.
158    ///
159    /// When queue semaphore is held, spawned tokio job will wait that much time
160    /// before giving up and releasing the semaphore. This is a hold-up mechanism
161    /// so that poller don't re-pick too quickly preventing other queues from being processed.
162    ///
163    /// Default: `500 milliseconds`.
164    pub fn with_hold_queue_semaphore(self, duration: tokio::time::Duration) -> Self {
165        Self {
166            hold_queue_semaphore: duration,
167            ..self
168        }
169    }
170}