1use std::env;
4use std::time::Duration;
5
6#[derive(Debug, Clone)]
8pub struct QueueConfig {
9 pub redis_url: String,
11 pub default_queue: String,
13 pub prefix: String,
15 pub block_timeout: Duration,
17 pub max_concurrent_jobs: usize,
19 pub delayed_job_poll_interval: Duration,
21}
22
23impl Default for QueueConfig {
24 fn default() -> Self {
25 Self {
26 redis_url: "redis://127.0.0.1:6379".to_string(),
27 default_queue: "default".to_string(),
28 prefix: "cancer_queue".to_string(),
29 block_timeout: Duration::from_secs(5),
30 max_concurrent_jobs: 10,
31 delayed_job_poll_interval: Duration::from_secs(1),
32 }
33 }
34}
35
36impl QueueConfig {
37 pub fn new(redis_url: impl Into<String>) -> Self {
39 Self {
40 redis_url: redis_url.into(),
41 ..Default::default()
42 }
43 }
44
45 pub fn from_env() -> Self {
69 let redis_url = Self::build_redis_url();
70
71 Self {
72 redis_url,
73 default_queue: env::var("QUEUE_DEFAULT").unwrap_or_else(|_| "default".to_string()),
74 prefix: env::var("QUEUE_PREFIX").unwrap_or_else(|_| "cancer_queue".to_string()),
75 block_timeout: Duration::from_secs(
76 env::var("QUEUE_BLOCK_TIMEOUT")
77 .ok()
78 .and_then(|v| v.parse().ok())
79 .unwrap_or(5),
80 ),
81 max_concurrent_jobs: env::var("QUEUE_MAX_CONCURRENT")
82 .ok()
83 .and_then(|v| v.parse().ok())
84 .unwrap_or(10),
85 delayed_job_poll_interval: Duration::from_secs(1),
86 }
87 }
88
89 fn build_redis_url() -> String {
91 if let Ok(url) = env::var("REDIS_URL") {
93 return url;
94 }
95
96 let host = env::var("REDIS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
97 let port = env::var("REDIS_PORT").unwrap_or_else(|_| "6379".to_string());
98 let password = env::var("REDIS_PASSWORD").ok().filter(|p| !p.is_empty());
99 let database = env::var("REDIS_DATABASE").unwrap_or_else(|_| "0".to_string());
100
101 match password {
102 Some(pass) => format!("redis://:{}@{}:{}/{}", pass, host, port, database),
103 None => format!("redis://{}:{}/{}", host, port, database),
104 }
105 }
106
107 pub fn is_sync_mode() -> bool {
112 env::var("QUEUE_CONNECTION")
113 .map(|v| v.to_lowercase() == "sync")
114 .unwrap_or(true) }
116
117 pub fn default_queue(mut self, queue: impl Into<String>) -> Self {
119 self.default_queue = queue.into();
120 self
121 }
122
123 pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
125 self.prefix = prefix.into();
126 self
127 }
128
129 pub fn block_timeout(mut self, timeout: Duration) -> Self {
131 self.block_timeout = timeout;
132 self
133 }
134
135 pub fn max_concurrent_jobs(mut self, count: usize) -> Self {
137 self.max_concurrent_jobs = count;
138 self
139 }
140
141 pub fn queue_key(&self, queue: &str) -> String {
143 format!("{}:{}", self.prefix, queue)
144 }
145
146 pub fn delayed_key(&self, queue: &str) -> String {
148 format!("{}:{}:delayed", self.prefix, queue)
149 }
150
151 pub fn reserved_key(&self, queue: &str) -> String {
153 format!("{}:{}:reserved", self.prefix, queue)
154 }
155
156 pub fn failed_key(&self) -> String {
158 format!("{}:failed", self.prefix)
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use serial_test::serial;
166
167 #[test]
168 fn test_default_config() {
169 let config = QueueConfig::default();
170 assert_eq!(config.default_queue, "default");
171 assert_eq!(config.prefix, "cancer_queue");
172 }
173
174 #[test]
175 fn test_queue_key() {
176 let config = QueueConfig::default();
177 assert_eq!(config.queue_key("emails"), "cancer_queue:emails");
178 assert_eq!(config.delayed_key("emails"), "cancer_queue:emails:delayed");
179 }
180
181 #[test]
182 fn test_builder_pattern() {
183 let config = QueueConfig::new("redis://localhost:6380")
184 .default_queue("high-priority")
185 .prefix("myapp")
186 .max_concurrent_jobs(5);
187
188 assert_eq!(config.redis_url, "redis://localhost:6380");
189 assert_eq!(config.default_queue, "high-priority");
190 assert_eq!(config.prefix, "myapp");
191 assert_eq!(config.max_concurrent_jobs, 5);
192 }
193
194 #[test]
195 #[serial]
196 fn test_from_env_defaults() {
197 env::remove_var("QUEUE_DEFAULT");
199 env::remove_var("QUEUE_PREFIX");
200 env::remove_var("QUEUE_BLOCK_TIMEOUT");
201 env::remove_var("QUEUE_MAX_CONCURRENT");
202 env::remove_var("REDIS_URL");
203 env::remove_var("REDIS_HOST");
204 env::remove_var("REDIS_PORT");
205 env::remove_var("REDIS_PASSWORD");
206 env::remove_var("REDIS_DATABASE");
207
208 let config = QueueConfig::from_env();
209 assert_eq!(config.default_queue, "default");
210 assert_eq!(config.prefix, "cancer_queue");
211 assert_eq!(config.redis_url, "redis://127.0.0.1:6379/0");
212 assert_eq!(config.max_concurrent_jobs, 10);
213 }
214
215 #[test]
216 #[serial]
217 fn test_from_env_with_redis_url() {
218 env::set_var("REDIS_URL", "redis://custom:6380/5");
219 let config = QueueConfig::from_env();
220 assert_eq!(config.redis_url, "redis://custom:6380/5");
221 env::remove_var("REDIS_URL");
222 }
223
224 #[test]
225 #[serial]
226 fn test_build_redis_url_with_password() {
227 env::remove_var("REDIS_URL");
228 env::set_var("REDIS_HOST", "redis.example.com");
229 env::set_var("REDIS_PORT", "6380");
230 env::set_var("REDIS_PASSWORD", "secret123");
231 env::set_var("REDIS_DATABASE", "3");
232
233 let url = QueueConfig::build_redis_url();
234 assert_eq!(url, "redis://:secret123@redis.example.com:6380/3");
235
236 env::remove_var("REDIS_HOST");
237 env::remove_var("REDIS_PORT");
238 env::remove_var("REDIS_PASSWORD");
239 env::remove_var("REDIS_DATABASE");
240 }
241
242 #[test]
243 #[serial]
244 fn test_is_sync_mode() {
245 env::remove_var("QUEUE_CONNECTION");
246 assert!(QueueConfig::is_sync_mode()); env::set_var("QUEUE_CONNECTION", "sync");
249 assert!(QueueConfig::is_sync_mode());
250
251 env::set_var("QUEUE_CONNECTION", "redis");
252 assert!(!QueueConfig::is_sync_mode());
253
254 env::set_var("QUEUE_CONNECTION", "SYNC");
255 assert!(QueueConfig::is_sync_mode()); env::remove_var("QUEUE_CONNECTION");
258 }
259}