Skip to main content

ferro_queue/
config.rs

1//! Queue configuration.
2
3use std::env;
4use std::time::Duration;
5
6/// Queue system configuration.
7#[derive(Debug, Clone)]
8pub struct QueueConfig {
9    /// Redis connection URL.
10    pub redis_url: String,
11    /// Default queue name.
12    pub default_queue: String,
13    /// Prefix for queue keys in Redis.
14    pub prefix: String,
15    /// How long to block waiting for jobs (in seconds).
16    pub block_timeout: Duration,
17    /// Maximum number of concurrent jobs per worker.
18    pub max_concurrent_jobs: usize,
19    /// How often to check for delayed jobs.
20    pub delayed_job_poll_interval: Duration,
21}
22
23impl Default for QueueConfig {
24    fn default() -> Self {
25        Self {
26            redis_url: "redis://127.0.0.1:6379".to_string(),
27            default_queue: "default".to_string(),
28            prefix: "ferro_queue".to_string(),
29            block_timeout: Duration::from_secs(5),
30            max_concurrent_jobs: 10,
31            delayed_job_poll_interval: Duration::from_secs(1),
32        }
33    }
34}
35
36impl QueueConfig {
37    /// Create a new configuration with a Redis URL.
38    pub fn new(redis_url: impl Into<String>) -> Self {
39        Self {
40            redis_url: redis_url.into(),
41            ..Default::default()
42        }
43    }
44
45    /// Create configuration from environment variables.
46    ///
47    /// Reads the following environment variables:
48    /// - `QUEUE_CONNECTION`: "sync" or "redis" (defaults to "sync")
49    /// - `QUEUE_DEFAULT`: Default queue name (defaults to "default")
50    /// - `QUEUE_PREFIX`: Key prefix in Redis (defaults to "ferro_queue")
51    /// - `QUEUE_BLOCK_TIMEOUT`: Seconds to block waiting for jobs (defaults to 5)
52    /// - `QUEUE_MAX_CONCURRENT`: Max concurrent jobs per worker (defaults to 10)
53    /// - `REDIS_URL`: Full Redis URL (takes precedence if set)
54    /// - `REDIS_HOST`: Redis host (defaults to "127.0.0.1")
55    /// - `REDIS_PORT`: Redis port (defaults to 6379)
56    /// - `REDIS_PASSWORD`: Redis password (optional)
57    /// - `REDIS_DATABASE`: Redis database number (defaults to 0)
58    ///
59    /// # Example
60    ///
61    /// ```rust,ignore
62    /// use ferro_queue::QueueConfig;
63    ///
64    /// // In bootstrap.rs
65    /// let config = QueueConfig::from_env();
66    /// Queue::init(config).await?;
67    /// ```
68    pub fn from_env() -> Self {
69        let redis_url = Self::build_redis_url();
70
71        Self {
72            redis_url,
73            default_queue: env::var("QUEUE_DEFAULT").unwrap_or_else(|_| "default".to_string()),
74            prefix: env::var("QUEUE_PREFIX").unwrap_or_else(|_| "ferro_queue".to_string()),
75            block_timeout: Duration::from_secs(
76                env::var("QUEUE_BLOCK_TIMEOUT")
77                    .ok()
78                    .and_then(|v| v.parse().ok())
79                    .unwrap_or(5),
80            ),
81            max_concurrent_jobs: env::var("QUEUE_MAX_CONCURRENT")
82                .ok()
83                .and_then(|v| v.parse().ok())
84                .unwrap_or(10),
85            delayed_job_poll_interval: Duration::from_secs(1),
86        }
87    }
88
89    /// Build Redis URL from environment variables.
90    fn build_redis_url() -> String {
91        // Check for explicit REDIS_URL first
92        if let Ok(url) = env::var("REDIS_URL") {
93            return url;
94        }
95
96        let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
97        let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".to_string());
98        let password = env::var("REDIS_PASSWORD").ok().filter(|p| !p.is_empty());
99        let database = env::var("REDIS_DATABASE").unwrap_or_else(|_| "0".to_string());
100
101        match password {
102            Some(pass) => format!("redis://:{pass}@{host}:{port}/{database}"),
103            None => format!("redis://{host}:{port}/{database}"),
104        }
105    }
106
107    /// Check if sync queue mode is configured.
108    ///
109    /// When QUEUE_CONNECTION=sync, jobs are processed immediately instead
110    /// of being pushed to Redis.
111    pub fn is_sync_mode() -> bool {
112        env::var("QUEUE_CONNECTION")
113            .map(|v| v.to_lowercase() == "sync")
114            .unwrap_or(true) // Default to sync for development
115    }
116
117    /// Set the default queue name.
118    pub fn default_queue(mut self, queue: impl Into<String>) -> Self {
119        self.default_queue = queue.into();
120        self
121    }
122
123    /// Set the key prefix.
124    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
125        self.prefix = prefix.into();
126        self
127    }
128
129    /// Set the block timeout.
130    pub fn block_timeout(mut self, timeout: Duration) -> Self {
131        self.block_timeout = timeout;
132        self
133    }
134
135    /// Set max concurrent jobs.
136    pub fn max_concurrent_jobs(mut self, count: usize) -> Self {
137        self.max_concurrent_jobs = count;
138        self
139    }
140
141    /// Get the Redis key for a queue.
142    pub fn queue_key(&self, queue: &str) -> String {
143        format!("{}:{}", self.prefix, queue)
144    }
145
146    /// Get the Redis key for delayed jobs.
147    pub fn delayed_key(&self, queue: &str) -> String {
148        format!("{}:{}:delayed", self.prefix, queue)
149    }
150
151    /// Get the Redis key for reserved jobs.
152    pub fn reserved_key(&self, queue: &str) -> String {
153        format!("{}:{}:reserved", self.prefix, queue)
154    }
155
156    /// Get the Redis key for failed jobs.
157    pub fn failed_key(&self) -> String {
158        format!("{}:failed", self.prefix)
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165    use serial_test::serial;
166
167    /// Guard that removes environment variables on drop, ensuring cleanup even on panic.
168    struct EnvGuard {
169        vars: Vec<String>,
170    }
171
172    impl EnvGuard {
173        fn set(key: &str, value: &str) -> Self {
174            env::set_var(key, value);
175            Self {
176                vars: vec![key.to_string()],
177            }
178        }
179
180        fn also_set(&mut self, key: &str, value: &str) {
181            env::set_var(key, value);
182            self.vars.push(key.to_string());
183        }
184
185        fn also_remove(&mut self, key: &str) {
186            env::remove_var(key);
187            self.vars.push(key.to_string());
188        }
189    }
190
191    impl Drop for EnvGuard {
192        fn drop(&mut self) {
193            for var in &self.vars {
194                env::remove_var(var);
195            }
196        }
197    }
198
199    #[test]
200    fn test_default_config() {
201        let config = QueueConfig::default();
202        assert_eq!(config.default_queue, "default");
203        assert_eq!(config.prefix, "ferro_queue");
204    }
205
206    #[test]
207    fn test_queue_key() {
208        let config = QueueConfig::default();
209        assert_eq!(config.queue_key("emails"), "ferro_queue:emails");
210        assert_eq!(config.delayed_key("emails"), "ferro_queue:emails:delayed");
211    }
212
213    #[test]
214    fn test_builder_pattern() {
215        let config = QueueConfig::new("redis://localhost:6380")
216            .default_queue("high-priority")
217            .prefix("myapp")
218            .max_concurrent_jobs(5);
219
220        assert_eq!(config.redis_url, "redis://localhost:6380");
221        assert_eq!(config.default_queue, "high-priority");
222        assert_eq!(config.prefix, "myapp");
223        assert_eq!(config.max_concurrent_jobs, 5);
224    }
225
226    #[test]
227    #[serial]
228    fn test_from_env_defaults() {
229        let mut guard = EnvGuard { vars: Vec::new() };
230        // Clear any existing env vars (guard tracks them for cleanup)
231        guard.also_remove("QUEUE_DEFAULT");
232        guard.also_remove("QUEUE_PREFIX");
233        guard.also_remove("QUEUE_BLOCK_TIMEOUT");
234        guard.also_remove("QUEUE_MAX_CONCURRENT");
235        guard.also_remove("REDIS_URL");
236        guard.also_remove("REDIS_HOST");
237        guard.also_remove("REDIS_PORT");
238        guard.also_remove("REDIS_PASSWORD");
239        guard.also_remove("REDIS_DATABASE");
240
241        let config = QueueConfig::from_env();
242        assert_eq!(config.default_queue, "default");
243        assert_eq!(config.prefix, "ferro_queue");
244        assert_eq!(config.redis_url, "redis://127.0.0.1:6379/0");
245        assert_eq!(config.max_concurrent_jobs, 10);
246    }
247
248    #[test]
249    #[serial]
250    fn test_from_env_with_redis_url() {
251        let _guard = EnvGuard::set("REDIS_URL", "redis://custom:6380/5");
252        let config = QueueConfig::from_env();
253        assert_eq!(config.redis_url, "redis://custom:6380/5");
254    }
255
256    #[test]
257    #[serial]
258    fn test_build_redis_url_with_password() {
259        let mut guard = EnvGuard { vars: Vec::new() };
260        guard.also_remove("REDIS_URL");
261        guard.also_set("REDIS_HOST", "redis.example.com");
262        guard.also_set("REDIS_PORT", "6380");
263        guard.also_set("REDIS_PASSWORD", "secret123");
264        guard.also_set("REDIS_DATABASE", "3");
265
266        let url = QueueConfig::build_redis_url();
267        assert_eq!(url, "redis://:secret123@redis.example.com:6380/3");
268    }
269
270    #[test]
271    #[serial]
272    fn test_is_sync_mode() {
273        let mut guard = EnvGuard { vars: Vec::new() };
274        guard.also_remove("QUEUE_CONNECTION");
275        assert!(QueueConfig::is_sync_mode()); // default is sync
276
277        guard.also_set("QUEUE_CONNECTION", "sync");
278        assert!(QueueConfig::is_sync_mode());
279
280        env::set_var("QUEUE_CONNECTION", "redis");
281        assert!(!QueueConfig::is_sync_mode());
282
283        env::set_var("QUEUE_CONNECTION", "SYNC");
284        assert!(QueueConfig::is_sync_mode()); // case insensitive
285    }
286}