simple_queue/queue/builder.rs
1use std::sync::Arc;
2
3use dashmap::DashMap;
4use sqlx::PgPool;
5use tokio::sync::Semaphore;
6
7use crate::sync::{self, BackoffStrategy};
8use crate::*;
9
10impl SimpleQueue {
11 /// Creates new Queue with default configuration:
12 /// - global semaphore with 500 permits (global limit)
13 /// - queue semaphore (per queue limit) with 100 permits
14 /// - no custrom semaphore strategy
15 /// - heartbeat interval of 5 seconds
16 /// - linear backoff strategy with 5s retry interval
17 /// - pool sleep when table is empty of 1 second
18 /// - max reprocess count (poison/loop detection) of 100
19 pub fn new(pool: PgPool) -> Self {
20 Self {
21 pool,
22 job_registry: DashMap::new(),
23 default_backoff_strategy: BackoffStrategy::default(),
24 default_queue_strategy: Arc::new(sync::InstantStrategy {}),
25 queue_backoff_strategies: DashMap::new(),
26 global_semaphore: Arc::new(Semaphore::new(500)),
27 queue_strategies: DashMap::new(),
28 queue_semaphores: DashMap::new(),
29 queue_sem_count: 100,
30 heartbeat_interval: tokio::time::Duration::from_secs(5),
31 empty_poll_sleep: tokio::time::Duration::from_secs(1),
32 max_reprocess_count: 100,
33 janitor_interval: tokio::time::Duration::from_secs(60),
34 hold_queue_semaphore: tokio::time::Duration::from_millis(500),
35 }
36 }
37 /// Create a new `SimpleQueue` with a PostgreSQL pool created from a URL.
38 /// See [`SimpleQueue::new`] for default configuration details.
39 pub async fn new_from_url(url: &str) -> Result<Self, sqlx::Error> {
40 let pool = sqlx::PgPool::connect(url).await?;
41 Ok(Self::new(pool))
42 }
43 /// Set the number of global semaphore permits
44 ///
45 /// Default value: `500`
46 pub fn with_global_semaphore(self, permits: usize) -> Self {
47 Self {
48 global_semaphore: Arc::new(Semaphore::new(permits)),
49 ..self
50 }
51 }
52 /// Set the queue waiting strategy for a specific queue
53 ///
54 /// Default value if not set is `default_queue_strategy` parameter.
55 pub fn with_queue_strategy(
56 self,
57 queue: String,
58 strategy: impl sync::JobStrategy + 'static,
59 ) -> Self {
60 self.queue_strategies
61 .insert(queue.clone(), Arc::new(strategy));
62 self
63 }
64 /// Set the number of semaphore permits for a specific queue
65 ///
66 /// Default value if not set is in `queue_sem_count` parameter set by `with_queue_default_semaphore_size`.
67 pub fn with_queue_semaphore(self, queue: String, permits: usize) -> Self {
68 self.queue_semaphores
69 .insert(queue, Arc::new(Semaphore::new(permits)));
70 self
71 }
72 /// Set the default number of permits for queues
73 ///
74 /// Default value: `100`
75 pub fn with_queue_default_semaphore_size(self, permits: usize) -> Self {
76 Self {
77 queue_sem_count: permits,
78 ..self
79 }
80 }
81 /// Set the heartbeat interval
82 ///
83 /// Default values: `5 seconds`
84 pub fn with_heartbeat_interval(self, interval: tokio::time::Duration) -> Self {
85 Self {
86 heartbeat_interval: interval,
87 ..self
88 }
89 }
90 /// Set the default backoff strategy.
91 ///
92 /// See also [`BackoffStrategy`].
93 ///
94 /// Default strategy: [`BackoffStrategy::Linear`] with delay of `5 seconds`.
95 pub fn with_default_backoff_strategy(self, strategy: BackoffStrategy) -> Self {
96 Self {
97 default_backoff_strategy: strategy,
98 ..self
99 }
100 }
101 /// Set the default queue strategy
102 ///
103 /// Default: [`sync::InstantStrategy`]
104 pub fn with_default_queue_strategy(self, strategy: Arc<dyn sync::JobStrategy>) -> Self {
105 Self {
106 default_queue_strategy: strategy,
107 ..self
108 }
109 }
110 /// Set the backoff strategy for a specific queue
111 ///
112 /// See also [`BackoffStrategy`].
113 ///
114 /// Default is taken from `default_backoff_strategy`.
115 pub fn with_queue_backoff_strategy(self, queue: String, strategy: BackoffStrategy) -> Self {
116 self.queue_backoff_strategies.insert(queue, strategy);
117 self
118 }
119
120 /// Set the duration to sleep between polls when no jobs are found
121 ///
122 /// This is used only when no jobs are found in the queue. Should be set to low value for critical
123 /// and high throughput queues.
124 ///
125 /// Default: `1 second`.
126 pub fn with_empty_poll_sleep(self, duration: tokio::time::Duration) -> Self {
127 Self {
128 empty_poll_sleep: duration,
129 ..self
130 }
131 }
132 /// Set the maximum number of times a job can be reprocessed
133 /// E.g. rescheduled without attempt consumption or recovered from stalled state (default: 100)
134 ///
135 /// Reprocess count is to prevent infinite reschedules (not consuming attempt) and stall job recovery.
136 ///
137 /// Default: `100`.
138 pub fn with_max_reprocess_count(self, count: usize) -> Self {
139 Self {
140 max_reprocess_count: count,
141 ..self
142 }
143 }
144
145 /// Set the interval between janitor runs.
146 ///
147 /// Janitor moves completed and failed jobs to the archive and dlq tables
148 ///
149 /// Default: `1 minute`.
150 pub fn with_janitor_interval(self, duration: tokio::time::Duration) -> Self {
151 Self {
152 janitor_interval: duration,
153 ..self
154 }
155 }
156
157 /// Set the duration of how logn to hold the queue semaphore acquisition.
158 ///
159 /// When queue semaphore is held, spawned tokio job will wait that much time
160 /// before giving up and releasing the semaphore. This is a hold-up mechanism
161 /// so that poller don't re-pick too quickly preventing other queues from being processed.
162 ///
163 /// Default: `500 milliseconds`.
164 pub fn with_hold_queue_semaphore(self, duration: tokio::time::Duration) -> Self {
165 Self {
166 hold_queue_semaphore: duration,
167 ..self
168 }
169 }
170}