ferro_queue/
config.rs

1//! Queue configuration.
2
3use std::env;
4use std::time::Duration;
5
6/// Queue system configuration.
7#[derive(Debug, Clone)]
8pub struct QueueConfig {
9    /// Redis connection URL.
10    pub redis_url: String,
11    /// Default queue name.
12    pub default_queue: String,
13    /// Prefix for queue keys in Redis.
14    pub prefix: String,
15    /// How long to block waiting for jobs (in seconds).
16    pub block_timeout: Duration,
17    /// Maximum number of concurrent jobs per worker.
18    pub max_concurrent_jobs: usize,
19    /// How often to check for delayed jobs.
20    pub delayed_job_poll_interval: Duration,
21}
22
23impl Default for QueueConfig {
24    fn default() -> Self {
25        Self {
26            redis_url: "redis://127.0.0.1:6379".to_string(),
27            default_queue: "default".to_string(),
28            prefix: "ferro_queue".to_string(),
29            block_timeout: Duration::from_secs(5),
30            max_concurrent_jobs: 10,
31            delayed_job_poll_interval: Duration::from_secs(1),
32        }
33    }
34}
35
36impl QueueConfig {
37    /// Create a new configuration with a Redis URL.
38    pub fn new(redis_url: impl Into<String>) -> Self {
39        Self {
40            redis_url: redis_url.into(),
41            ..Default::default()
42        }
43    }
44
45    /// Create configuration from environment variables.
46    ///
47    /// Reads the following environment variables:
48    /// - `QUEUE_CONNECTION`: "sync" or "redis" (defaults to "sync")
49    /// - `QUEUE_DEFAULT`: Default queue name (defaults to "default")
50    /// - `QUEUE_PREFIX`: Key prefix in Redis (defaults to "ferro_queue")
51    /// - `QUEUE_BLOCK_TIMEOUT`: Seconds to block waiting for jobs (defaults to 5)
52    /// - `QUEUE_MAX_CONCURRENT`: Max concurrent jobs per worker (defaults to 10)
53    /// - `REDIS_URL`: Full Redis URL (takes precedence if set)
54    /// - `REDIS_HOST`: Redis host (defaults to "127.0.0.1")
55    /// - `REDIS_PORT`: Redis port (defaults to 6379)
56    /// - `REDIS_PASSWORD`: Redis password (optional)
57    /// - `REDIS_DATABASE`: Redis database number (defaults to 0)
58    ///
59    /// # Example
60    ///
61    /// ```rust,ignore
62    /// use ferro_queue::QueueConfig;
63    ///
64    /// // In bootstrap.rs
65    /// let config = QueueConfig::from_env();
66    /// Queue::init(config).await?;
67    /// ```
68    pub fn from_env() -> Self {
69        let redis_url = Self::build_redis_url();
70
71        Self {
72            redis_url,
73            default_queue: env::var("QUEUE_DEFAULT").unwrap_or_else(|_| "default".to_string()),
74            prefix: env::var("QUEUE_PREFIX").unwrap_or_else(|_| "ferro_queue".to_string()),
75            block_timeout: Duration::from_secs(
76                env::var("QUEUE_BLOCK_TIMEOUT")
77                    .ok()
78                    .and_then(|v| v.parse().ok())
79                    .unwrap_or(5),
80            ),
81            max_concurrent_jobs: env::var("QUEUE_MAX_CONCURRENT")
82                .ok()
83                .and_then(|v| v.parse().ok())
84                .unwrap_or(10),
85            delayed_job_poll_interval: Duration::from_secs(1),
86        }
87    }
88
89    /// Build Redis URL from environment variables.
90    fn build_redis_url() -> String {
91        // Check for explicit REDIS_URL first
92        if let Ok(url) = env::var("REDIS_URL") {
93            return url;
94        }
95
96        let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
97        let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".to_string());
98        let password = env::var("REDIS_PASSWORD").ok().filter(|p| !p.is_empty());
99        let database = env::var("REDIS_DATABASE").unwrap_or_else(|_| "0".to_string());
100
101        match password {
102            Some(pass) => format!("redis://:{}@{}:{}/{}", pass, host, port, database),
103            None => format!("redis://{}:{}/{}", host, port, database),
104        }
105    }
106
107    /// Check if sync queue mode is configured.
108    ///
109    /// When QUEUE_CONNECTION=sync, jobs are processed immediately instead
110    /// of being pushed to Redis.
111    pub fn is_sync_mode() -> bool {
112        env::var("QUEUE_CONNECTION")
113            .map(|v| v.to_lowercase() == "sync")
114            .unwrap_or(true) // Default to sync for development
115    }
116
117    /// Set the default queue name.
118    pub fn default_queue(mut self, queue: impl Into<String>) -> Self {
119        self.default_queue = queue.into();
120        self
121    }
122
123    /// Set the key prefix.
124    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
125        self.prefix = prefix.into();
126        self
127    }
128
129    /// Set the block timeout.
130    pub fn block_timeout(mut self, timeout: Duration) -> Self {
131        self.block_timeout = timeout;
132        self
133    }
134
135    /// Set max concurrent jobs.
136    pub fn max_concurrent_jobs(mut self, count: usize) -> Self {
137        self.max_concurrent_jobs = count;
138        self
139    }
140
141    /// Get the Redis key for a queue.
142    pub fn queue_key(&self, queue: &str) -> String {
143        format!("{}:{}", self.prefix, queue)
144    }
145
146    /// Get the Redis key for delayed jobs.
147    pub fn delayed_key(&self, queue: &str) -> String {
148        format!("{}:{}:delayed", self.prefix, queue)
149    }
150
151    /// Get the Redis key for reserved jobs.
152    pub fn reserved_key(&self, queue: &str) -> String {
153        format!("{}:{}:reserved", self.prefix, queue)
154    }
155
156    /// Get the Redis key for failed jobs.
157    pub fn failed_key(&self) -> String {
158        format!("{}:failed", self.prefix)
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165    use serial_test::serial;
166
167    #[test]
168    fn test_default_config() {
169        let config = QueueConfig::default();
170        assert_eq!(config.default_queue, "default");
171        assert_eq!(config.prefix, "ferro_queue");
172    }
173
174    #[test]
175    fn test_queue_key() {
176        let config = QueueConfig::default();
177        assert_eq!(config.queue_key("emails"), "ferro_queue:emails");
178        assert_eq!(config.delayed_key("emails"), "ferro_queue:emails:delayed");
179    }
180
181    #[test]
182    fn test_builder_pattern() {
183        let config = QueueConfig::new("redis://localhost:6380")
184            .default_queue("high-priority")
185            .prefix("myapp")
186            .max_concurrent_jobs(5);
187
188        assert_eq!(config.redis_url, "redis://localhost:6380");
189        assert_eq!(config.default_queue, "high-priority");
190        assert_eq!(config.prefix, "myapp");
191        assert_eq!(config.max_concurrent_jobs, 5);
192    }
193
194    #[test]
195    #[serial]
196    fn test_from_env_defaults() {
197        // Clear any existing env vars
198        env::remove_var("QUEUE_DEFAULT");
199        env::remove_var("QUEUE_PREFIX");
200        env::remove_var("QUEUE_BLOCK_TIMEOUT");
201        env::remove_var("QUEUE_MAX_CONCURRENT");
202        env::remove_var("REDIS_URL");
203        env::remove_var("REDIS_HOST");
204        env::remove_var("REDIS_PORT");
205        env::remove_var("REDIS_PASSWORD");
206        env::remove_var("REDIS_DATABASE");
207
208        let config = QueueConfig::from_env();
209        assert_eq!(config.default_queue, "default");
210        assert_eq!(config.prefix, "ferro_queue");
211        assert_eq!(config.redis_url, "redis://127.0.0.1:6379/0");
212        assert_eq!(config.max_concurrent_jobs, 10);
213    }
214
215    #[test]
216    #[serial]
217    fn test_from_env_with_redis_url() {
218        env::set_var("REDIS_URL", "redis://custom:6380/5");
219        let config = QueueConfig::from_env();
220        assert_eq!(config.redis_url, "redis://custom:6380/5");
221        env::remove_var("REDIS_URL");
222    }
223
224    #[test]
225    #[serial]
226    fn test_build_redis_url_with_password() {
227        env::remove_var("REDIS_URL");
228        env::set_var("REDIS_HOST", "redis.example.com");
229        env::set_var("REDIS_PORT", "6380");
230        env::set_var("REDIS_PASSWORD", "secret123");
231        env::set_var("REDIS_DATABASE", "3");
232
233        let url = QueueConfig::build_redis_url();
234        assert_eq!(url, "redis://:secret123@redis.example.com:6380/3");
235
236        env::remove_var("REDIS_HOST");
237        env::remove_var("REDIS_PORT");
238        env::remove_var("REDIS_PASSWORD");
239        env::remove_var("REDIS_DATABASE");
240    }
241
242    #[test]
243    #[serial]
244    fn test_is_sync_mode() {
245        env::remove_var("QUEUE_CONNECTION");
246        assert!(QueueConfig::is_sync_mode()); // default is sync
247
248        env::set_var("QUEUE_CONNECTION", "sync");
249        assert!(QueueConfig::is_sync_mode());
250
251        env::set_var("QUEUE_CONNECTION", "redis");
252        assert!(!QueueConfig::is_sync_mode());
253
254        env::set_var("QUEUE_CONNECTION", "SYNC");
255        assert!(QueueConfig::is_sync_mode()); // case insensitive
256
257        env::remove_var("QUEUE_CONNECTION");
258    }
259}