1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
use std::sync::Arc;
use dashmap::DashMap;
use sqlx::PgPool;
use tokio::sync::Semaphore;
use crate::sync::{self, BackoffStrategy};
use crate::*;
impl SimpleQueue {
    /// Creates a new queue with the default configuration:
    /// - global semaphore with 500 permits (global limit)
    /// - per-queue semaphore with 100 permits (per-queue limit)
    /// - no custom per-queue strategies; [`sync::InstantStrategy`] is the default queue strategy
    /// - heartbeat interval of 5 seconds
    /// - `BackoffStrategy::default()` as backoff strategy (linear with a 5 second retry interval)
    /// - poll sleep of 1 second when the table is empty
    /// - max reprocess count (poison/loop detection) of 100
    /// - janitor interval of 60 seconds
    /// - queue semaphore hold duration of 500 milliseconds
    pub fn new(pool: PgPool) -> Self {
        Self {
            pool,
            job_registry: DashMap::new(),
            default_backoff_strategy: BackoffStrategy::default(),
            default_queue_strategy: Arc::new(sync::InstantStrategy {}),
            queue_backoff_strategies: DashMap::new(),
            global_semaphore: Arc::new(Semaphore::new(500)),
            queue_strategies: DashMap::new(),
            queue_semaphores: DashMap::new(),
            queue_sem_count: 100,
            heartbeat_interval: tokio::time::Duration::from_secs(5),
            empty_poll_sleep: tokio::time::Duration::from_secs(1),
            max_reprocess_count: 100,
            janitor_interval: tokio::time::Duration::from_secs(60),
            hold_queue_semaphore: tokio::time::Duration::from_millis(500),
        }
    }

    /// Set the number of global semaphore permits.
    ///
    /// This replaces the global semaphore with a fresh one holding `permits` permits.
    ///
    /// Default value: `500`
    pub fn with_global_semaphore(self, permits: usize) -> Self {
        Self {
            global_semaphore: Arc::new(Semaphore::new(permits)),
            ..self
        }
    }

    /// Set the queue waiting strategy for a specific queue.
    ///
    /// A strategy previously registered for the same queue name is replaced.
    ///
    /// Default value if not set is the `default_queue_strategy` parameter.
    pub fn with_queue_strategy(
        self,
        queue: String,
        strategy: impl sync::JobStrategy + 'static,
    ) -> Self {
        // `queue` is already owned and not used afterwards, so it is moved into
        // the map directly instead of being cloned.
        self.queue_strategies.insert(queue, Arc::new(strategy));
        self
    }

    /// Set the number of semaphore permits for a specific queue.
    ///
    /// A semaphore previously registered for the same queue name is replaced.
    ///
    /// Default value if not set is the `queue_sem_count` parameter, configured by
    /// [`Self::with_queue_default_semaphore_size`].
    pub fn with_queue_semaphore(self, queue: String, permits: usize) -> Self {
        self.queue_semaphores
            .insert(queue, Arc::new(Semaphore::new(permits)));
        self
    }

    /// Set the default number of permits for queues.
    ///
    /// Default value: `100`
    pub fn with_queue_default_semaphore_size(self, permits: usize) -> Self {
        Self {
            queue_sem_count: permits,
            ..self
        }
    }

    /// Set the heartbeat interval.
    ///
    /// Default value: `5 seconds`
    pub fn with_heartbeat_interval(self, interval: tokio::time::Duration) -> Self {
        Self {
            heartbeat_interval: interval,
            ..self
        }
    }

    /// Set the default backoff strategy.
    ///
    /// See also [`BackoffStrategy`].
    ///
    /// Default strategy: [`BackoffStrategy::Linear`] with delay of `5 seconds`.
    pub fn with_default_backoff_strategy(self, strategy: BackoffStrategy) -> Self {
        Self {
            default_backoff_strategy: strategy,
            ..self
        }
    }

    /// Set the default queue strategy.
    ///
    /// Default: [`sync::InstantStrategy`]
    pub fn with_default_queue_strategy(self, strategy: Arc<dyn sync::JobStrategy>) -> Self {
        Self {
            default_queue_strategy: strategy,
            ..self
        }
    }

    /// Set the backoff strategy for a specific queue.
    ///
    /// A strategy previously registered for the same queue name is replaced.
    ///
    /// See also [`BackoffStrategy`].
    ///
    /// Default is taken from `default_backoff_strategy`.
    pub fn with_queue_backoff_strategy(self, queue: String, strategy: BackoffStrategy) -> Self {
        self.queue_backoff_strategies.insert(queue, strategy);
        self
    }

    /// Set the duration to sleep between polls when no jobs are found.
    ///
    /// This is used only when no jobs are found in the queue. It should be set to a low
    /// value for critical and high-throughput queues.
    ///
    /// Default: `1 second`.
    pub fn with_empty_poll_sleep(self, duration: tokio::time::Duration) -> Self {
        Self {
            empty_poll_sleep: duration,
            ..self
        }
    }

    /// Set the maximum number of times a job can be reprocessed,
    /// e.g. rescheduled without consuming an attempt or recovered from a stalled state.
    ///
    /// The reprocess count guards against infinite reschedules (which do not consume an
    /// attempt) and endless stalled-job recovery.
    ///
    /// Default: `100`.
    pub fn with_max_reprocess_count(self, count: usize) -> Self {
        Self {
            max_reprocess_count: count,
            ..self
        }
    }

    /// Set the interval between janitor runs.
    ///
    /// The janitor moves completed and failed jobs to the archive and DLQ tables.
    ///
    /// Default: `1 minute`.
    pub fn with_janitor_interval(self, duration: tokio::time::Duration) -> Self {
        Self {
            janitor_interval: duration,
            ..self
        }
    }

    /// Set the duration of how long to hold the queue semaphore acquisition.
    ///
    /// When the queue semaphore is held, the spawned tokio job will wait that much time
    /// before giving up and releasing the semaphore. This is a hold-up mechanism so that
    /// the poller doesn't re-pick too quickly, which would prevent other queues from
    /// being processed.
    ///
    /// Default: `500 milliseconds`.
    pub fn with_hold_queue_semaphore(self, duration: tokio::time::Duration) -> Self {
        Self {
            hold_queue_semaphore: duration,
            ..self
        }
    }
}