apalis_redis/
config.rs

1use std::time::Duration;
2
3use apalis_core::backend::{BackendExt, ConfigExt, queue::Queue};
4use redis::RedisError;
5use ulid::Ulid;
6
7use crate::{RedisContext, RedisStorage};
8
// Redis key templates. The `{queue}` placeholder in each template is replaced
// with the queue namespace by the key accessors on `RedisConfig`.

// Keys named as lists.
/// Template for the `<queue>:active` key.
const ACTIVE_TASKS_LIST: &str = "{queue}:active";
/// Template for the `<queue>:signal` key.
const SIGNAL_LIST: &str = "{queue}:signal";

// Keys named as sets.
/// Template for the `<queue>:workers` key.
const WORKERS_SET: &str = "{queue}:workers";
/// Template for the `<queue>:dead` key.
const DEAD_TASKS_SET: &str = "{queue}:dead";
/// Template for the `<queue>:done` key.
const DONE_TASKS_SET: &str = "{queue}:done";
/// Template for the `<queue>:failed` key.
const FAILED_TASKS_SET: &str = "{queue}:failed";
/// Template for the `<queue>:inflight` key.
const INFLIGHT_TASKS_SET: &str = "{queue}:inflight";
/// Template for the `<queue>:scheduled` key.
const SCHEDULED_TASKS_SET: &str = "{queue}:scheduled";

// Keys named as hashes.
/// Template for the `<queue>:data` key.
const TASK_DATA_HASH: &str = "{queue}:data";
/// Template for the `<queue>:meta` key.
const JOB_META_HASH: &str = "{queue}:meta";
19
20/// Config for a [`RedisStorage`]
21///
22/// RedisConfig allows you to customize various settings for the Redis storage backend,
23/// including polling intervals, buffer sizes, namespaces, and job re-enqueueing behavior.
24///
25/// [`RedisStorage`]: crate::RedisStorage
26#[derive(Clone, Debug)]
27pub struct RedisConfig {
28    poll_interval: Duration,
29    buffer_size: usize,
30    keep_alive: Duration,
31    enqueue_scheduled: Duration,
32    reenqueue_orphaned_after: Duration,
33    queue: Queue,
34}
35
36impl Default for RedisConfig {
37    fn default() -> Self {
38        Self {
39            poll_interval: Duration::from_millis(100),
40            buffer_size: 10,
41            keep_alive: Duration::from_secs(30),
42            enqueue_scheduled: Duration::from_secs(1),
43            reenqueue_orphaned_after: Duration::from_secs(300),
44            queue: Queue::from("default"),
45        }
46    }
47}
48
49impl RedisConfig {
50    /// Creates a new RedisConfig with the specified queue namespace.
51    pub fn new(queue: &str) -> Self {
52        Self {
53            queue: Queue::from(queue),
54            ..Default::default()
55        }
56    }
57    /// Get the interval of polling
58    pub fn get_poll_interval(&self) -> &Duration {
59        &self.poll_interval
60    }
61
62    /// Get the number of jobs to fetch
63    pub fn get_buffer_size(&self) -> usize {
64        self.buffer_size
65    }
66
67    /// get the keep live rate
68    pub fn get_keep_alive(&self) -> &Duration {
69        &self.keep_alive
70    }
71
72    /// get the enqueued setting
73    pub fn get_enqueue_scheduled(&self) -> &Duration {
74        &self.enqueue_scheduled
75    }
76
77    /// get the namespace
78    pub fn get_namespace(&self) -> &Queue {
79        &self.queue
80    }
81
82    /// get the poll interval
83    pub fn set_poll_interval(mut self, poll_interval: Duration) -> Self {
84        self.poll_interval = poll_interval;
85        self
86    }
87
88    /// set the buffer setting
89    pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
90        self.buffer_size = buffer_size;
91        self
92    }
93
94    /// set the keep-alive setting
95    pub fn set_keep_alive(mut self, keep_alive: Duration) -> Self {
96        self.keep_alive = keep_alive;
97        self
98    }
99
100    /// get the enqueued setting
101    pub fn set_enqueue_scheduled(mut self, enqueue_scheduled: Duration) -> Self {
102        self.enqueue_scheduled = enqueue_scheduled;
103        self
104    }
105
106    /// set the namespace for the Storage
107    pub fn set_namespace(mut self, namespace: &str) -> Self {
108        self.queue = Queue::from(namespace);
109        self
110    }
111
112    /// Returns the Redis key for the list of pending jobs associated with the queue.
113    /// The key is dynamically generated using the namespace of the queue.
114    ///
115    /// # Returns
116    /// A `String` representing the Redis key for the pending jobs list.
117    pub fn active_jobs_list(&self) -> String {
118        ACTIVE_TASKS_LIST.replace("{queue}", self.queue.as_ref())
119    }
120
121    /// Returns the Redis key for the set of workers associated with the queue.
122    /// The key is dynamically generated using the namespace of the queue.
123    ///
124    /// # Returns
125    /// A `String` representing the Redis key for the workers set.
126    pub fn workers_set(&self) -> String {
127        WORKERS_SET.replace("{queue}", self.queue.as_ref())
128    }
129
130    /// Returns the Redis key for the set of dead jobs associated with the queue.
131    /// The key is dynamically generated using the namespace of the queue.
132    ///
133    /// # Returns
134    /// A `String` representing the Redis key for the dead jobs set.
135    pub fn dead_jobs_set(&self) -> String {
136        DEAD_TASKS_SET.replace("{queue}", self.queue.as_ref())
137    }
138
139    /// Returns the Redis key for the set of done jobs associated with the queue.
140    /// The key is dynamically generated using the namespace of the queue.
141    ///
142    /// # Returns
143    /// A `String` representing the Redis key for the done jobs set.
144    pub fn done_jobs_set(&self) -> String {
145        DONE_TASKS_SET.replace("{queue}", self.queue.as_ref())
146    }
147
148    /// Returns the Redis key for the set of failed jobs associated with the queue.
149    /// The key is dynamically generated using the namespace of the queue.
150    ///
151    /// # Returns
152    /// A `String` representing the Redis key for the failed jobs set.
153    pub fn failed_jobs_set(&self) -> String {
154        FAILED_TASKS_SET.replace("{queue}", self.queue.as_ref())
155    }
156
157    /// Returns the Redis key for the set of inflight jobs associated with the queue.
158    /// The key is dynamically generated using the namespace of the queue.
159    ///
160    /// # Returns
161    /// A `String` representing the Redis key for the inflight jobs set.
162    pub fn inflight_jobs_set(&self) -> String {
163        INFLIGHT_TASKS_SET.replace("{queue}", self.queue.as_ref())
164    }
165
166    /// Returns the Redis key for the hash storing job data associated with the queue.
167    /// The key is dynamically generated using the namespace of the queue.
168    ///
169    /// # Returns
170    /// A `String` representing the Redis key for the job data hash.
171    pub fn job_data_hash(&self) -> String {
172        TASK_DATA_HASH.replace("{queue}", self.queue.as_ref())
173    }
174
175    /// Returns the Redis key for the hash storing job metadata associated with the queue.
176    /// The key is dynamically generated using the namespace of the queue.
177    ///
178    /// # Returns
179    /// A `String` representing the Redis key for the job meta hash.
180    pub fn job_meta_hash(&self) -> String {
181        JOB_META_HASH.replace("{queue}", self.queue.as_ref())
182    }
183
184    /// Returns the Redis key for the set of scheduled jobs associated with the queue.
185    /// The key is dynamically generated using the namespace of the queue.
186    ///
187    /// # Returns
188    /// A `String` representing the Redis key for the scheduled jobs set.
189    pub fn scheduled_jobs_set(&self) -> String {
190        SCHEDULED_TASKS_SET.replace("{queue}", self.queue.as_ref())
191    }
192
193    /// Returns the Redis key for the list of signals associated with the queue.
194    /// The key is dynamically generated using the namespace of the queue.
195    ///
196    /// # Returns
197    /// A `String` representing the Redis key for the signal list.
198    pub fn signal_list(&self) -> String {
199        SIGNAL_LIST.replace("{queue}", self.queue.as_ref())
200    }
201
202    /// Gets the reenqueue_orphaned_after duration.
203    pub fn reenqueue_orphaned_after(&self) -> Duration {
204        self.reenqueue_orphaned_after
205    }
206
207    /// Gets a mutable reference to the reenqueue_orphaned_after.
208    pub fn reenqueue_orphaned_after_mut(&mut self) -> &mut Duration {
209        &mut self.reenqueue_orphaned_after
210    }
211
212    /// Occasionally some workers die, or abandon jobs because of panics.
213    /// This is the time a task takes before its back to the queue
214    ///
215    /// Defaults to 5 minutes
216    pub fn set_reenqueue_orphaned_after(mut self, after: Duration) -> Self {
217        self.reenqueue_orphaned_after = after;
218        self
219    }
220}
221
222impl<Args: Sync, Conn, C> ConfigExt for RedisStorage<Args, Conn, C>
223where
224    RedisStorage<Args, Conn, C>:
225        BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
226{
227    fn get_queue(&self) -> Queue {
228        self.config.queue.clone()
229    }
230}