// apalis_redis/config.rs
1use std::time::Duration;
2
3use apalis_core::backend::{BackendExt, ConfigExt, queue::Queue};
4use redis::RedisError;
5use ulid::Ulid;
6
7use crate::{RedisContext, RedisStorage};
8
// Redis key templates. The `{queue}` placeholder in each template is replaced
// with the configured queue name by the key accessors on `RedisConfig`.

/// Key template for the active tasks list.
const ACTIVE_TASKS_LIST: &str = "{queue}:active";
/// Key template for the workers set.
const WORKERS_SET: &str = "{queue}:workers";
/// Key template for the dead tasks set.
const DEAD_TASKS_SET: &str = "{queue}:dead";
/// Key template for the done tasks set.
const DONE_TASKS_SET: &str = "{queue}:done";
/// Key template for the failed tasks set.
const FAILED_TASKS_SET: &str = "{queue}:failed";
/// Key template for the inflight tasks set.
const INFLIGHT_TASKS_SET: &str = "{queue}:inflight";
/// Key template for the hash holding task data.
const TASK_DATA_HASH: &str = "{queue}:data";
/// Key template for the hash holding job metadata.
const JOB_META_HASH: &str = "{queue}:meta";
/// Key template for the scheduled tasks set.
const SCHEDULED_TASKS_SET: &str = "{queue}:scheduled";
/// Key template for the signal list.
const SIGNAL_LIST: &str = "{queue}:signal";
19
20/// Config for a [`RedisStorage`]
21///
22/// RedisConfig allows you to customize various settings for the Redis storage backend,
23/// including polling intervals, buffer sizes, namespaces, and job re-enqueueing behavior.
24///
25/// [`RedisStorage`]: crate::RedisStorage
26#[derive(Clone, Debug)]
27pub struct RedisConfig {
28 poll_interval: Duration,
29 buffer_size: usize,
30 keep_alive: Duration,
31 enqueue_scheduled: Duration,
32 reenqueue_orphaned_after: Duration,
33 queue: Queue,
34}
35
36impl Default for RedisConfig {
37 fn default() -> Self {
38 Self {
39 poll_interval: Duration::from_millis(100),
40 buffer_size: 10,
41 keep_alive: Duration::from_secs(30),
42 enqueue_scheduled: Duration::from_secs(1),
43 reenqueue_orphaned_after: Duration::from_secs(300),
44 queue: Queue::from("default"),
45 }
46 }
47}
48
49impl RedisConfig {
50 /// Creates a new RedisConfig with the specified queue namespace.
51 pub fn new(queue: &str) -> Self {
52 Self {
53 queue: Queue::from(queue),
54 ..Default::default()
55 }
56 }
57 /// Get the interval of polling
58 pub fn get_poll_interval(&self) -> &Duration {
59 &self.poll_interval
60 }
61
62 /// Get the number of jobs to fetch
63 pub fn get_buffer_size(&self) -> usize {
64 self.buffer_size
65 }
66
67 /// get the keep live rate
68 pub fn get_keep_alive(&self) -> &Duration {
69 &self.keep_alive
70 }
71
72 /// get the enqueued setting
73 pub fn get_enqueue_scheduled(&self) -> &Duration {
74 &self.enqueue_scheduled
75 }
76
77 /// get the namespace
78 pub fn get_namespace(&self) -> &Queue {
79 &self.queue
80 }
81
82 /// get the poll interval
83 pub fn set_poll_interval(mut self, poll_interval: Duration) -> Self {
84 self.poll_interval = poll_interval;
85 self
86 }
87
88 /// set the buffer setting
89 pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
90 self.buffer_size = buffer_size;
91 self
92 }
93
94 /// set the keep-alive setting
95 pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
96 self.keep_alive = keep_alive;
97 self
98 }
99
100 /// get the enqueued setting
101 pub fn set_enqueue_scheduled(mut self, enqueue_scheduled: Duration) -> Self {
102 self.enqueue_scheduled = enqueue_scheduled;
103 self
104 }
105
106 /// set the namespace for the Storage
107 pub fn set_namespace(mut self, namespace: &str) -> Self {
108 self.queue = Queue::from(namespace);
109 self
110 }
111
112 /// Returns the Redis key for the list of pending jobs associated with the queue.
113 /// The key is dynamically generated using the namespace of the queue.
114 ///
115 /// # Returns
116 /// A `String` representing the Redis key for the pending jobs list.
117 pub fn active_jobs_list(&self) -> String {
118 ACTIVE_TASKS_LIST.replace("{queue}", self.queue.as_ref())
119 }
120
121 /// Returns the Redis key for the set of workers associated with the queue.
122 /// The key is dynamically generated using the namespace of the queue.
123 ///
124 /// # Returns
125 /// A `String` representing the Redis key for the workers set.
126 pub fn workers_set(&self) -> String {
127 WORKERS_SET.replace("{queue}", self.queue.as_ref())
128 }
129
130 /// Returns the Redis key for the set of dead jobs associated with the queue.
131 /// The key is dynamically generated using the namespace of the queue.
132 ///
133 /// # Returns
134 /// A `String` representing the Redis key for the dead jobs set.
135 pub fn dead_jobs_set(&self) -> String {
136 DEAD_TASKS_SET.replace("{queue}", self.queue.as_ref())
137 }
138
139 /// Returns the Redis key for the set of done jobs associated with the queue.
140 /// The key is dynamically generated using the namespace of the queue.
141 ///
142 /// # Returns
143 /// A `String` representing the Redis key for the done jobs set.
144 pub fn done_jobs_set(&self) -> String {
145 DONE_TASKS_SET.replace("{queue}", self.queue.as_ref())
146 }
147
148 /// Returns the Redis key for the set of failed jobs associated with the queue.
149 /// The key is dynamically generated using the namespace of the queue.
150 ///
151 /// # Returns
152 /// A `String` representing the Redis key for the failed jobs set.
153 pub fn failed_jobs_set(&self) -> String {
154 FAILED_TASKS_SET.replace("{queue}", self.queue.as_ref())
155 }
156
157 /// Returns the Redis key for the set of inflight jobs associated with the queue.
158 /// The key is dynamically generated using the namespace of the queue.
159 ///
160 /// # Returns
161 /// A `String` representing the Redis key for the inflight jobs set.
162 pub fn inflight_jobs_set(&self) -> String {
163 INFLIGHT_TASKS_SET.replace("{queue}", self.queue.as_ref())
164 }
165
166 /// Returns the Redis key for the hash storing job data associated with the queue.
167 /// The key is dynamically generated using the namespace of the queue.
168 ///
169 /// # Returns
170 /// A `String` representing the Redis key for the job data hash.
171 pub fn job_data_hash(&self) -> String {
172 TASK_DATA_HASH.replace("{queue}", self.queue.as_ref())
173 }
174
175 /// Returns the Redis key for the hash storing job metadata associated with the queue.
176 /// The key is dynamically generated using the namespace of the queue.
177 ///
178 /// # Returns
179 /// A `String` representing the Redis key for the job meta hash.
180 pub fn job_meta_hash(&self) -> String {
181 JOB_META_HASH.replace("{queue}", self.queue.as_ref())
182 }
183
184 /// Returns the Redis key for the set of scheduled jobs associated with the queue.
185 /// The key is dynamically generated using the namespace of the queue.
186 ///
187 /// # Returns
188 /// A `String` representing the Redis key for the scheduled jobs set.
189 pub fn scheduled_jobs_set(&self) -> String {
190 SCHEDULED_TASKS_SET.replace("{queue}", self.queue.as_ref())
191 }
192
193 /// Returns the Redis key for the list of signals associated with the queue.
194 /// The key is dynamically generated using the namespace of the queue.
195 ///
196 /// # Returns
197 /// A `String` representing the Redis key for the signal list.
198 pub fn signal_list(&self) -> String {
199 SIGNAL_LIST.replace("{queue}", self.queue.as_ref())
200 }
201
202 /// Gets the reenqueue_orphaned_after duration.
203 pub fn reenqueue_orphaned_after(&self) -> Duration {
204 self.reenqueue_orphaned_after
205 }
206
207 /// Gets a mutable reference to the reenqueue_orphaned_after.
208 pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
209 &mut self.reenqueue_orphaned_after
210 }
211
212 /// Occasionally some workers die, or abandon jobs because of panics.
213 /// This is the time a task takes before its back to the queue
214 ///
215 /// Defaults to 5 minutes
216 pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
217 self.reenqueue_orphaned_after = after;
218 self
219 }
220}
221
222impl<Args: Sync, Conn, C> ConfigExt for RedisStorage<Args, Conn, C>
223where
224 RedisStorage<Args, Conn, C>:
225 BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
226{
227 fn get_queue(&self) -> Queue {
228 self.config.queue.clone()
229 }
230}