1use std::env;
4use std::time::Duration;
5
/// Connection and tuning settings for the queue.
///
/// Values can be built explicitly (see `QueueConfig::new` and the builder
/// methods) or read from the process environment (`QueueConfig::from_env`).
/// The type compares by value, so two configurations are equal when every
/// field is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueConfig {
    /// Redis connection URL, e.g. `redis://127.0.0.1:6379`.
    pub redis_url: String,
    /// Name of the queue used by default (`"default"` unless overridden).
    pub default_queue: String,
    /// Prefix prepended to every Redis key produced by the `*_key` helpers.
    pub prefix: String,
    /// Blocking timeout.
    // NOTE(review): presumably the wait used for blocking pops on Redis — confirm against the worker.
    pub block_timeout: Duration,
    /// Maximum number of concurrent jobs.
    // NOTE(review): presumably an upper bound enforced by the worker — confirm against the worker.
    pub max_concurrent_jobs: usize,
    /// Polling interval for delayed jobs.
    // NOTE(review): presumably how often delayed jobs are checked for readiness — confirm.
    pub delayed_job_poll_interval: Duration,
}
22
23impl Default for QueueConfig {
24 fn default() -> Self {
25 Self {
26 redis_url: "redis://127.0.0.1:6379".to_string(),
27 default_queue: "default".to_string(),
28 prefix: "ferro_queue".to_string(),
29 block_timeout: Duration::from_secs(5),
30 max_concurrent_jobs: 10,
31 delayed_job_poll_interval: Duration::from_secs(1),
32 }
33 }
34}
35
36impl QueueConfig {
37 pub fn new(redis_url: impl Into<String>) -> Self {
39 Self {
40 redis_url: redis_url.into(),
41 ..Default::default()
42 }
43 }
44
45 pub fn from_env() -> Self {
69 let redis_url = Self::build_redis_url();
70
71 Self {
72 redis_url,
73 default_queue: env::var("QUEUE_DEFAULT").unwrap_or_else(|_| "default".to_string()),
74 prefix: env::var("QUEUE_PREFIX").unwrap_or_else(|_| "ferro_queue".to_string()),
75 block_timeout: Duration::from_secs(
76 env::var("QUEUE_BLOCK_TIMEOUT")
77 .ok()
78 .and_then(|v| v.parse().ok())
79 .unwrap_or(5),
80 ),
81 max_concurrent_jobs: env::var("QUEUE_MAX_CONCURRENT")
82 .ok()
83 .and_then(|v| v.parse().ok())
84 .unwrap_or(10),
85 delayed_job_poll_interval: Duration::from_secs(1),
86 }
87 }
88
89 fn build_redis_url() -> String {
91 if let Ok(url) = env::var("REDIS_URL") {
93 return url;
94 }
95
96 let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
97 let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".to_string());
98 let password = env::var("REDIS_PASSWORD").ok().filter(|p| !p.is_empty());
99 let database = env::var("REDIS_DATABASE").unwrap_or_else(|_| "0".to_string());
100
101 match password {
102 Some(pass) => format!("redis://:{pass}@{host}:{port}/{database}"),
103 None => format!("redis://{host}:{port}/{database}"),
104 }
105 }
106
107 pub fn is_sync_mode() -> bool {
112 env::var("QUEUE_CONNECTION")
113 .map(|v| v.to_lowercase() == "sync")
114 .unwrap_or(true) }
116
117 pub fn default_queue(mut self, queue: impl Into<String>) -> Self {
119 self.default_queue = queue.into();
120 self
121 }
122
123 pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
125 self.prefix = prefix.into();
126 self
127 }
128
129 pub fn block_timeout(mut self, timeout: Duration) -> Self {
131 self.block_timeout = timeout;
132 self
133 }
134
135 pub fn max_concurrent_jobs(mut self, count: usize) -> Self {
137 self.max_concurrent_jobs = count;
138 self
139 }
140
141 pub fn queue_key(&self, queue: &str) -> String {
143 format!("{}:{}", self.prefix, queue)
144 }
145
146 pub fn delayed_key(&self, queue: &str) -> String {
148 format!("{}:{}:delayed", self.prefix, queue)
149 }
150
151 pub fn reserved_key(&self, queue: &str) -> String {
153 format!("{}:{}:reserved", self.prefix, queue)
154 }
155
156 pub fn failed_key(&self) -> String {
158 format!("{}:failed", self.prefix)
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::ffi::OsString;

    /// Scoped environment override for tests.
    ///
    /// Each variable changed through the guard has its previous value
    /// captured first. Dropping the guard puts those values back, so a test
    /// never leaks its overrides nor wipes out variables that were already
    /// set in the process before it ran.
    struct EnvGuard {
        /// Variable names paired with the value they held right before the
        /// guard changed them (`None` when the variable was unset).
        vars: Vec<(String, Option<OsString>)>,
    }

    impl EnvGuard {
        /// Creates a guard that has not touched any variable yet.
        fn new() -> Self {
            Self { vars: Vec::new() }
        }

        /// Creates a guard and sets `key` to `value` through it.
        fn set(key: &str, value: &str) -> Self {
            let mut guard = Self::new();
            guard.also_set(key, value);
            guard
        }

        /// Sets `key` to `value`, remembering what it held before.
        fn also_set(&mut self, key: &str, value: &str) {
            self.remember(key);
            env::set_var(key, value);
        }

        /// Removes `key`, remembering what it held before.
        fn also_remove(&mut self, key: &str) {
            self.remember(key);
            env::remove_var(key);
        }

        /// Snapshots the current value of `key`; `var_os` keeps values that
        /// are not valid Unicode intact.
        fn remember(&mut self, key: &str) {
            self.vars.push((key.to_string(), env::var_os(key)));
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Undo newest-first: when a key was changed several times, its
            // oldest snapshot (the pre-test value) is applied last and wins.
            for (key, previous) in self.vars.drain(..).rev() {
                match previous {
                    Some(value) => env::set_var(&key, value),
                    None => env::remove_var(&key),
                }
            }
        }
    }

    #[test]
    fn test_default_config() {
        let config = QueueConfig::default();
        assert_eq!(config.default_queue, "default");
        assert_eq!(config.prefix, "ferro_queue");
    }

    #[test]
    fn test_queue_key() {
        let config = QueueConfig::default();
        assert_eq!(config.queue_key("emails"), "ferro_queue:emails");
        assert_eq!(config.delayed_key("emails"), "ferro_queue:emails:delayed");
    }

    #[test]
    fn test_builder_pattern() {
        let config = QueueConfig::new("redis://localhost:6380")
            .default_queue("high-priority")
            .prefix("myapp")
            .max_concurrent_jobs(5);

        assert_eq!(config.redis_url, "redis://localhost:6380");
        assert_eq!(config.default_queue, "high-priority");
        assert_eq!(config.prefix, "myapp");
        assert_eq!(config.max_concurrent_jobs, 5);
    }

    #[test]
    #[serial]
    fn test_from_env_defaults() {
        // Clear everything `from_env` reads so only the fallbacks are observed.
        let mut guard = EnvGuard::new();
        guard.also_remove("QUEUE_DEFAULT");
        guard.also_remove("QUEUE_PREFIX");
        guard.also_remove("QUEUE_BLOCK_TIMEOUT");
        guard.also_remove("QUEUE_MAX_CONCURRENT");
        guard.also_remove("REDIS_URL");
        guard.also_remove("REDIS_HOST");
        guard.also_remove("REDIS_PORT");
        guard.also_remove("REDIS_PASSWORD");
        guard.also_remove("REDIS_DATABASE");

        let config = QueueConfig::from_env();
        assert_eq!(config.default_queue, "default");
        assert_eq!(config.prefix, "ferro_queue");
        assert_eq!(config.redis_url, "redis://127.0.0.1:6379/0");
        assert_eq!(config.max_concurrent_jobs, 10);
        assert_eq!(config.block_timeout, Duration::from_secs(5));
        assert_eq!(config.delayed_job_poll_interval, Duration::from_secs(1));
    }

    #[test]
    #[serial]
    fn test_from_env_with_redis_url() {
        let _guard = EnvGuard::set("REDIS_URL", "redis://custom:6380/5");
        let config = QueueConfig::from_env();
        assert_eq!(config.redis_url, "redis://custom:6380/5");
    }

    #[test]
    #[serial]
    fn test_build_redis_url_with_password() {
        let mut guard = EnvGuard::new();
        // REDIS_URL would take precedence over the individual parts.
        guard.also_remove("REDIS_URL");
        guard.also_set("REDIS_HOST", "redis.example.com");
        guard.also_set("REDIS_PORT", "6380");
        guard.also_set("REDIS_PASSWORD", "secret123");
        guard.also_set("REDIS_DATABASE", "3");

        let url = QueueConfig::build_redis_url();
        assert_eq!(url, "redis://:secret123@redis.example.com:6380/3");
    }

    #[test]
    #[serial]
    fn test_is_sync_mode() {
        let mut guard = EnvGuard::new();

        // Unset: sync is the default.
        guard.also_remove("QUEUE_CONNECTION");
        assert!(QueueConfig::is_sync_mode());

        guard.also_set("QUEUE_CONNECTION", "sync");
        assert!(QueueConfig::is_sync_mode());

        guard.also_set("QUEUE_CONNECTION", "redis");
        assert!(!QueueConfig::is_sync_mode());

        // The comparison is case-insensitive.
        guard.also_set("QUEUE_CONNECTION", "SYNC");
        assert!(QueueConfig::is_sync_mode());
    }
}