// rust_task_queue/queue.rs

1use dashmap::DashMap;
2
/// Well-known queue names shared across the task queue system.
///
/// Refer to these constants rather than repeating the string literals so
/// that producers and consumers always agree on the spelling.
pub mod queue_names {
    /// Name carried by `QueueConfig::default()`.
    pub const DEFAULT: &str = "default";

    /// Name of the built-in queue registered with the highest priority.
    pub const HIGH_PRIORITY: &str = "high_priority";

    /// Name of the built-in queue registered with the lowest priority.
    pub const LOW_PRIORITY: &str = "low_priority";
}
9
/// Settings that describe a single named queue.
///
/// Two configurations compare equal when every field is equal, so whole
/// values can be checked with `==` / `assert_eq!` instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueConfig {
    /// Queue name; `QueueManager` uses it as the lookup key.
    pub name: String,
    /// Relative priority; larger values are listed first by
    /// `QueueManager::get_queues_by_priority`.
    pub priority: i32,
    /// Retry limit for tasks on this queue (enforcement happens outside this
    /// file).
    pub max_retries: u32,
    /// Timeout, expressed in seconds.
    pub timeout_seconds: u64,
    /// Optional rate limit in tasks per second; `None` means no limit is
    /// configured.
    pub rate_limit: Option<u32>,
}
18
19impl Default for QueueConfig {
20    fn default() -> Self {
21        Self {
22            name: queue_names::DEFAULT.to_string(),
23            priority: 0,
24            max_retries: 3,
25            timeout_seconds: 300,
26            rate_limit: None,
27        }
28    }
29}
30
31pub struct QueueManager {
32    queues: DashMap<String, QueueConfig>,
33}
34
35impl Default for QueueManager {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl QueueManager {
42    pub fn new() -> Self {
43        let manager = Self {
44            queues: DashMap::new(),
45        };
46
47        // Add default queues using constants
48        manager.add_queue(QueueConfig::default());
49        manager.add_queue(QueueConfig {
50            name: queue_names::HIGH_PRIORITY.to_string(),
51            priority: 10,
52            max_retries: 5,
53            timeout_seconds: 600,
54            rate_limit: None,
55        });
56        manager.add_queue(QueueConfig {
57            name: queue_names::LOW_PRIORITY.to_string(),
58            priority: -10,
59            max_retries: 2,
60            timeout_seconds: 120,
61            rate_limit: Some(10), // 10 tasks per second
62        });
63
64        manager
65    }
66
67    pub fn add_queue(&self, config: QueueConfig) {
68        self.queues.insert(config.name.clone(), config);
69    }
70
71    pub fn get_queue_config(&self, name: &str) -> Option<QueueConfig> {
72        self.queues.get(name).map(|entry| entry.clone())
73    }
74
75    pub fn get_queue_names(&self) -> Vec<String> {
76        self.queues
77            .iter()
78            .map(|entry| entry.key().clone())
79            .collect()
80    }
81
82    pub fn get_queues_by_priority(&self) -> Vec<QueueConfig> {
83        let mut queues: Vec<QueueConfig> = self
84            .queues
85            .iter()
86            .map(|entry| entry.value().clone())
87            .collect();
88
89        queues.sort_by(|a, b| b.priority.cmp(&a.priority));
90        queues
91    }
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor so a test can state a full configuration on one
    /// line.
    fn make_config(
        name: &str,
        priority: i32,
        max_retries: u32,
        timeout_seconds: u64,
        rate_limit: Option<u32>,
    ) -> QueueConfig {
        QueueConfig {
            name: name.to_string(),
            priority,
            max_retries,
            timeout_seconds,
            rate_limit,
        }
    }

    /// Asserts that every field of `actual` matches the expected values.
    fn assert_fields(
        actual: &QueueConfig,
        name: &str,
        priority: i32,
        max_retries: u32,
        timeout_seconds: u64,
        rate_limit: Option<u32>,
    ) {
        assert_eq!(actual.name, name);
        assert_eq!(actual.priority, priority);
        assert_eq!(actual.max_retries, max_retries);
        assert_eq!(actual.timeout_seconds, timeout_seconds);
        assert_eq!(actual.rate_limit, rate_limit);
    }

    /// Collects the priorities of `configs` in order, for whole-list checks.
    fn priorities_of(configs: &[QueueConfig]) -> Vec<i32> {
        configs.iter().map(|cfg| cfg.priority).collect()
    }

    #[test]
    fn test_queue_config_default() {
        assert_fields(
            &QueueConfig::default(),
            queue_names::DEFAULT,
            0,
            3,
            300,
            None,
        );
    }

    #[test]
    fn test_queue_config_creation() {
        let built = make_config("test_queue", 5, 2, 600, Some(100));
        assert_fields(&built, "test_queue", 5, 2, 600, Some(100));
    }

    #[test]
    fn test_queue_manager_creation() {
        let names = QueueManager::new().get_queue_names();
        assert_eq!(names.len(), 3);

        let built_in = [
            queue_names::DEFAULT,
            queue_names::HIGH_PRIORITY,
            queue_names::LOW_PRIORITY,
        ];
        for expected in built_in {
            assert!(names.contains(&expected.to_string()));
        }
    }

    #[test]
    fn test_queue_manager_default() {
        assert_eq!(QueueManager::default().get_queue_names().len(), 3);
    }

    #[test]
    fn test_add_custom_queue() {
        let manager = QueueManager::new();
        manager.add_queue(make_config("custom", 100, 1, 30, Some(50)));

        let stored = manager.get_queue_config("custom").unwrap();
        assert_fields(&stored, "custom", 100, 1, 30, Some(50));
    }

    #[test]
    fn test_get_queue_config_existing() {
        let manager = QueueManager::new();

        // (queue name, expected priority) for each built-in queue.
        let expectations = [
            (queue_names::DEFAULT, 0),
            (queue_names::HIGH_PRIORITY, 10),
            (queue_names::LOW_PRIORITY, -10),
        ];
        for (name, priority) in expectations {
            let found = manager.get_queue_config(name).unwrap();
            assert_eq!(found.name, name);
            assert_eq!(found.priority, priority);
        }
    }

    #[test]
    fn test_get_queue_config_nonexistent() {
        let lookup = QueueManager::new().get_queue_config("nonexistent");
        assert!(lookup.is_none());
    }

    #[test]
    fn test_get_queues_by_priority() {
        let ordered = QueueManager::new().get_queues_by_priority();
        assert_eq!(ordered.len(), 3);

        // Highest priority comes first: high_priority, default, low_priority.
        assert_eq!(priorities_of(&ordered), [10, 0, -10]);

        let names: Vec<&str> = ordered.iter().map(|cfg| cfg.name.as_str()).collect();
        assert_eq!(
            names,
            [
                queue_names::HIGH_PRIORITY,
                queue_names::DEFAULT,
                queue_names::LOW_PRIORITY,
            ]
        );
    }

    #[test]
    fn test_queue_priority_sorting() {
        let manager = QueueManager::new();

        // Extend the built-in set with queues at both extremes.
        manager.add_queue(make_config("highest", 100, 3, 300, None));
        manager.add_queue(make_config("lowest", -100, 3, 300, None));

        let ordered = manager.get_queues_by_priority();
        assert_eq!(ordered.len(), 5);
        assert_eq!(priorities_of(&ordered), [100, 10, 0, -10, -100]);
    }

    #[test]
    fn test_queue_config_clone() {
        let source = make_config("test", 5, 2, 600, Some(100));
        let copy = source.clone();

        assert_eq!(source.name, copy.name);
        assert_eq!(source.priority, copy.priority);
        assert_eq!(source.max_retries, copy.max_retries);
        assert_eq!(source.timeout_seconds, copy.timeout_seconds);
        assert_eq!(source.rate_limit, copy.rate_limit);
    }

    #[test]
    fn test_queue_config_debug() {
        let config = QueueConfig::default();
        let rendered = format!("{:?}", config);

        assert!(rendered.contains("QueueConfig"));
        assert!(rendered.contains(&config.name));
    }

    #[test]
    fn test_queue_manager_concurrent_access() {
        use std::sync::Arc;
        use std::thread;

        let manager = Arc::new(QueueManager::new());

        // Start ten writers, each registering its own uniquely named queue.
        let workers: Vec<_> = (0..10)
            .map(|i| {
                let shared = Arc::clone(&manager);
                thread::spawn(move || {
                    shared.add_queue(make_config(
                        &format!("thread_queue_{}", i),
                        i,
                        3,
                        300,
                        None,
                    ));
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // 3 built-in queues + 10 added by the worker threads.
        assert_eq!(manager.get_queue_names().len(), 13);
    }

    #[test]
    fn test_queue_names_constants() {
        assert_eq!(queue_names::DEFAULT, "default");
        assert_eq!(queue_names::HIGH_PRIORITY, "high_priority");
        assert_eq!(queue_names::LOW_PRIORITY, "low_priority");
    }

    #[test]
    fn test_queue_manager_overwrite_existing() {
        let manager = QueueManager::new();

        // Re-registering under an existing name replaces the stored config.
        manager.add_queue(make_config(queue_names::DEFAULT, 999, 10, 1000, Some(1)));

        let replaced = manager.get_queue_config(queue_names::DEFAULT).unwrap();
        assert_eq!(
            (
                replaced.priority,
                replaced.max_retries,
                replaced.timeout_seconds,
                replaced.rate_limit,
            ),
            (999, 10, 1000, Some(1))
        );
    }
}