Skip to main content

ferro_queue/
config.rs

1//! Queue configuration.
2
3use std::env;
4use std::time::Duration;
5
6/// Queue system configuration.
7#[derive(Debug, Clone)]
8pub struct QueueConfig {
9    /// Default queue name.
10    pub default_queue: String,
11    /// Maximum number of concurrent jobs per worker.
12    pub max_concurrent_jobs: usize,
13    /// How long to sleep between poll cycles when no jobs are available.
14    pub sleep_duration: Duration,
15    /// How long a claimed job stays invisible before being reclaimed by the reaper.
16    pub visibility_timeout: Duration,
17}
18
19impl Default for QueueConfig {
20    fn default() -> Self {
21        Self {
22            default_queue: "default".to_string(),
23            max_concurrent_jobs: 10,
24            sleep_duration: Duration::from_millis(500),
25            visibility_timeout: Duration::from_secs(300),
26        }
27    }
28}
29
30impl QueueConfig {
31    /// Create configuration from environment variables.
32    ///
33    /// Reads the following environment variables:
34    /// - `QUEUE_CONNECTION`: "sync" disables the DB worker loop (defaults to "sync")
35    /// - `QUEUE_DEFAULT`: Default queue name (defaults to "default")
36    /// - `QUEUE_MAX_CONCURRENT`: Max concurrent jobs per worker (defaults to 10)
37    /// - `QUEUE_VISIBILITY_TIMEOUT_SECS`: Seconds before claimed jobs are reclaimed (defaults to 300)
38    ///
39    /// # Example
40    ///
41    /// ```rust,ignore
42    /// use ferro_queue::QueueConfig;
43    ///
44    /// // In bootstrap.rs
45    /// let config = QueueConfig::from_env();
46    /// ```
47    pub fn from_env() -> Self {
48        Self {
49            default_queue: env::var("QUEUE_DEFAULT").unwrap_or_else(|_| "default".to_string()),
50            max_concurrent_jobs: env::var("QUEUE_MAX_CONCURRENT")
51                .ok()
52                .and_then(|v| v.parse().ok())
53                .unwrap_or(10),
54            sleep_duration: Duration::from_millis(500),
55            visibility_timeout: Duration::from_secs(
56                env::var("QUEUE_VISIBILITY_TIMEOUT_SECS")
57                    .ok()
58                    .and_then(|v| v.parse().ok())
59                    .unwrap_or(300),
60            ),
61        }
62    }
63
64    /// Check if sync queue mode is configured.
65    ///
66    /// When `QUEUE_CONNECTION=sync`, jobs are processed immediately in the
67    /// calling process instead of being written to the database.
68    pub fn is_sync_mode() -> bool {
69        env::var("QUEUE_CONNECTION")
70            .map(|v| v.to_lowercase() == "sync")
71            .unwrap_or(true) // Default to sync for development
72    }
73
74    /// Set the default queue name.
75    pub fn default_queue(mut self, queue: impl Into<String>) -> Self {
76        self.default_queue = queue.into();
77        self
78    }
79
80    /// Set max concurrent jobs.
81    pub fn max_concurrent_jobs(mut self, count: usize) -> Self {
82        self.max_concurrent_jobs = count;
83        self
84    }
85
86    /// Set the sleep duration between poll cycles.
87    pub fn with_sleep_duration(mut self, d: Duration) -> Self {
88        self.sleep_duration = d;
89        self
90    }
91
92    /// Set the visibility timeout for claimed jobs.
93    pub fn with_visibility_timeout(mut self, d: Duration) -> Self {
94        self.visibility_timeout = d;
95        self
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102    use serial_test::serial;
103
104    /// Guard that removes environment variables on drop, ensuring cleanup even on panic.
105    struct EnvGuard {
106        vars: Vec<String>,
107    }
108
109    impl EnvGuard {
110        fn new() -> Self {
111            Self { vars: Vec::new() }
112        }
113
114        fn also_set(&mut self, key: &str, value: &str) {
115            env::set_var(key, value);
116            self.vars.push(key.to_string());
117        }
118
119        fn also_remove(&mut self, key: &str) {
120            env::remove_var(key);
121            self.vars.push(key.to_string());
122        }
123    }
124
125    impl Drop for EnvGuard {
126        fn drop(&mut self) {
127            for var in &self.vars {
128                env::remove_var(var);
129            }
130        }
131    }
132
133    #[test]
134    fn test_default_config() {
135        let config = QueueConfig::default();
136        assert_eq!(config.default_queue, "default");
137        assert_eq!(config.max_concurrent_jobs, 10);
138        assert_eq!(config.sleep_duration, Duration::from_millis(500));
139        assert_eq!(config.visibility_timeout, Duration::from_secs(300));
140    }
141
142    #[test]
143    fn test_builder_pattern() {
144        let config = QueueConfig::default()
145            .default_queue("high-priority")
146            .max_concurrent_jobs(5)
147            .with_visibility_timeout(Duration::from_secs(60));
148
149        assert_eq!(config.default_queue, "high-priority");
150        assert_eq!(config.max_concurrent_jobs, 5);
151        assert_eq!(config.visibility_timeout, Duration::from_secs(60));
152    }
153
154    #[test]
155    fn visibility_timeout_default_is_five_minutes() {
156        let config = QueueConfig::default();
157        assert_eq!(config.visibility_timeout, Duration::from_secs(300));
158    }
159
160    #[test]
161    #[serial]
162    fn test_from_env_defaults() {
163        let mut guard = EnvGuard::new();
164        guard.also_remove("QUEUE_DEFAULT");
165        guard.also_remove("QUEUE_MAX_CONCURRENT");
166        guard.also_remove("QUEUE_VISIBILITY_TIMEOUT_SECS");
167
168        let config = QueueConfig::from_env();
169        assert_eq!(config.default_queue, "default");
170        assert_eq!(config.max_concurrent_jobs, 10);
171        assert_eq!(config.visibility_timeout, Duration::from_secs(300));
172    }
173
174    #[test]
175    #[serial]
176    fn test_from_env_visibility_timeout() {
177        let mut guard = EnvGuard::new();
178        guard.also_set("QUEUE_VISIBILITY_TIMEOUT_SECS", "120");
179
180        let config = QueueConfig::from_env();
181        assert_eq!(config.visibility_timeout, Duration::from_secs(120));
182    }
183
184    #[test]
185    #[serial]
186    fn test_is_sync_mode() {
187        let mut guard = EnvGuard::new();
188        guard.also_remove("QUEUE_CONNECTION");
189        assert!(QueueConfig::is_sync_mode()); // default is sync
190
191        env::set_var("QUEUE_CONNECTION", "sync");
192        assert!(QueueConfig::is_sync_mode());
193
194        env::set_var("QUEUE_CONNECTION", "db");
195        assert!(!QueueConfig::is_sync_mode());
196
197        env::set_var("QUEUE_CONNECTION", "SYNC");
198        assert!(QueueConfig::is_sync_mode()); // case insensitive
199    }
200}