1#![allow(dead_code)]
2use std::collections::VecDeque;
10use std::time::{Duration, Instant};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
18pub enum QueuePriority {
19 Low = 0,
21 Normal = 1,
23 High = 2,
25 Critical = 3,
27}
28
29impl QueuePriority {
30 pub fn label(self) -> &'static str {
32 match self {
33 Self::Low => "low",
34 Self::Normal => "normal",
35 Self::High => "high",
36 Self::Critical => "critical",
37 }
38 }
39
40 pub fn at_least(self, other: Self) -> bool {
42 self >= other
43 }
44}
45
46impl std::fmt::Display for QueuePriority {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.write_str(self.label())
49 }
50}
51
52#[derive(Debug, Clone, PartialEq)]
58pub struct CloudTask {
59 pub id: u64,
61 pub priority: QueuePriority,
63 pub payload: Vec<u8>,
65 pub submitted_at: Instant,
67 pub ttl: Option<Duration>,
69}
70
71impl CloudTask {
72 pub fn new(id: u64, priority: QueuePriority, payload: Vec<u8>) -> Self {
74 Self {
75 id,
76 priority,
77 payload,
78 submitted_at: Instant::now(),
79 ttl: None,
80 }
81 }
82
83 pub fn with_ttl(mut self, ttl: Duration) -> Self {
85 self.ttl = Some(ttl);
86 self
87 }
88
89 pub fn is_expired(&self) -> bool {
91 match self.ttl {
92 Some(ttl) => self.submitted_at.elapsed() >= ttl,
93 None => false,
94 }
95 }
96
97 pub fn age(&self) -> Duration {
99 self.submitted_at.elapsed()
100 }
101}
102
103#[derive(Debug, PartialEq, Eq)]
109pub enum TaskQueueError {
110 QueueFull,
112 QueueEmpty,
114 InvalidBatchSize,
116}
117
118impl std::fmt::Display for TaskQueueError {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 match self {
121 Self::QueueFull => write!(f, "task queue is full"),
122 Self::QueueEmpty => write!(f, "task queue is empty"),
123 Self::InvalidBatchSize => write!(f, "batch size must be greater than zero"),
124 }
125 }
126}
127
128#[derive(Debug)]
134pub struct CloudTaskQueue {
135 lanes: [VecDeque<CloudTask>; 4],
137 capacity: usize,
139 next_id: u64,
141}
142
143impl CloudTaskQueue {
144 pub fn new(capacity: usize) -> Self {
146 Self {
147 lanes: [
148 VecDeque::new(),
149 VecDeque::new(),
150 VecDeque::new(),
151 VecDeque::new(),
152 ],
153 capacity,
154 next_id: 1,
155 }
156 }
157
158 fn lane(priority: QueuePriority) -> usize {
160 priority as usize
161 }
162
163 pub fn len(&self) -> usize {
165 self.lanes.iter().map(|l| l.len()).sum()
166 }
167
168 pub fn is_empty(&self) -> bool {
170 self.len() == 0
171 }
172
173 pub fn enqueue(
178 &mut self,
179 priority: QueuePriority,
180 payload: Vec<u8>,
181 ) -> Result<u64, TaskQueueError> {
182 self.purge_expired();
183 if self.len() >= self.capacity {
184 return Err(TaskQueueError::QueueFull);
185 }
186 let id = self.next_id;
187 self.next_id += 1;
188 let task = CloudTask::new(id, priority, payload);
189 self.lanes[Self::lane(priority)].push_back(task);
190 Ok(id)
191 }
192
193 pub fn enqueue_task(&mut self, task: CloudTask) -> Result<(), TaskQueueError> {
195 self.purge_expired();
196 if self.len() >= self.capacity {
197 return Err(TaskQueueError::QueueFull);
198 }
199 let lane = Self::lane(task.priority);
200 self.lanes[lane].push_back(task);
201 Ok(())
202 }
203
204 pub fn dequeue(&mut self) -> Result<CloudTask, TaskQueueError> {
206 self.purge_expired();
207 for lane in (0..4).rev() {
209 if let Some(task) = self.lanes[lane].pop_front() {
210 return Ok(task);
211 }
212 }
213 Err(TaskQueueError::QueueEmpty)
214 }
215
216 pub fn dequeue_batch(&mut self, max_count: usize) -> Result<Vec<CloudTask>, TaskQueueError> {
221 if max_count == 0 {
222 return Err(TaskQueueError::InvalidBatchSize);
223 }
224 self.purge_expired();
225 let mut batch = Vec::with_capacity(max_count);
226 'outer: for lane in (0..4).rev() {
227 while let Some(task) = self.lanes[lane].pop_front() {
228 batch.push(task);
229 if batch.len() >= max_count {
230 break 'outer;
231 }
232 }
233 }
234 Ok(batch)
235 }
236
237 pub fn lane_depth(&self, priority: QueuePriority) -> usize {
239 self.lanes[Self::lane(priority)].len()
240 }
241
242 fn purge_expired(&mut self) {
244 for lane in &mut self.lanes {
245 lane.retain(|t| !t.is_expired());
246 }
247 }
248
249 pub fn clear(&mut self) {
251 for lane in &mut self.lanes {
252 lane.clear();
253 }
254 }
255}
256
257#[cfg(test)]
262mod tests {
263 use super::*;
264
265 fn make_queue(cap: usize) -> CloudTaskQueue {
266 CloudTaskQueue::new(cap)
267 }
268
269 #[test]
270 fn test_priority_ordering() {
271 assert!(QueuePriority::Critical > QueuePriority::High);
272 assert!(QueuePriority::High > QueuePriority::Normal);
273 assert!(QueuePriority::Normal > QueuePriority::Low);
274 }
275
276 #[test]
277 fn test_priority_at_least() {
278 assert!(QueuePriority::Critical.at_least(QueuePriority::High));
279 assert!(QueuePriority::Normal.at_least(QueuePriority::Normal));
280 assert!(!QueuePriority::Low.at_least(QueuePriority::Normal));
281 }
282
283 #[test]
284 fn test_priority_label() {
285 assert_eq!(QueuePriority::Low.label(), "low");
286 assert_eq!(QueuePriority::Critical.label(), "critical");
287 }
288
289 #[test]
290 fn test_priority_display() {
291 assert_eq!(QueuePriority::Normal.to_string(), "normal");
292 }
293
294 #[test]
295 fn test_enqueue_returns_incrementing_ids() {
296 let mut q = make_queue(10);
297 let id1 = q
298 .enqueue(QueuePriority::Normal, b"a".to_vec())
299 .expect("id1 should be valid");
300 let id2 = q
301 .enqueue(QueuePriority::Normal, b"b".to_vec())
302 .expect("id2 should be valid");
303 assert_eq!(id1, 1);
304 assert_eq!(id2, 2);
305 }
306
307 #[test]
308 fn test_queue_empty_initially() {
309 let q = make_queue(10);
310 assert!(q.is_empty());
311 assert_eq!(q.len(), 0);
312 }
313
314 #[test]
315 fn test_len_increases_on_enqueue() {
316 let mut q = make_queue(10);
317 q.enqueue(QueuePriority::Low, b"x".to_vec())
318 .expect("test expectation failed");
319 q.enqueue(QueuePriority::High, b"y".to_vec())
320 .expect("test expectation failed");
321 assert_eq!(q.len(), 2);
322 }
323
324 #[test]
325 fn test_dequeue_empty_returns_error() {
326 let mut q = make_queue(10);
327 assert_eq!(q.dequeue(), Err(TaskQueueError::QueueEmpty));
328 }
329
330 #[test]
331 fn test_dequeue_respects_priority() {
332 let mut q = make_queue(10);
333 q.enqueue(QueuePriority::Low, b"low".to_vec())
334 .expect("test expectation failed");
335 q.enqueue(QueuePriority::Critical, b"crit".to_vec())
336 .expect("test expectation failed");
337 q.enqueue(QueuePriority::Normal, b"norm".to_vec())
338 .expect("test expectation failed");
339 let first = q.dequeue().expect("first should be valid");
340 assert_eq!(first.payload, b"crit");
341 let second = q.dequeue().expect("second should be valid");
342 assert_eq!(second.payload, b"norm");
343 }
344
345 #[test]
346 fn test_dequeue_batch_returns_correct_count() {
347 let mut q = make_queue(20);
348 for _ in 0..5 {
349 q.enqueue(QueuePriority::Normal, b"item".to_vec())
350 .expect("test expectation failed");
351 }
352 let batch = q.dequeue_batch(3).expect("batch should be valid");
353 assert_eq!(batch.len(), 3);
354 assert_eq!(q.len(), 2);
355 }
356
357 #[test]
358 fn test_dequeue_batch_zero_size_error() {
359 let mut q = make_queue(10);
360 assert_eq!(q.dequeue_batch(0), Err(TaskQueueError::InvalidBatchSize));
361 }
362
363 #[test]
364 fn test_dequeue_batch_partial_fill() {
365 let mut q = make_queue(10);
366 q.enqueue(QueuePriority::Normal, b"only".to_vec())
367 .expect("test expectation failed");
368 let batch = q.dequeue_batch(5).expect("batch should be valid");
369 assert_eq!(batch.len(), 1);
370 }
371
372 #[test]
373 fn test_queue_full_error() {
374 let mut q = make_queue(2);
375 q.enqueue(QueuePriority::Normal, b"a".to_vec())
376 .expect("test expectation failed");
377 q.enqueue(QueuePriority::Normal, b"b".to_vec())
378 .expect("test expectation failed");
379 assert_eq!(
380 q.enqueue(QueuePriority::Normal, b"c".to_vec()),
381 Err(TaskQueueError::QueueFull)
382 );
383 }
384
385 #[test]
386 fn test_lane_depth() {
387 let mut q = make_queue(20);
388 q.enqueue(QueuePriority::High, b"h1".to_vec())
389 .expect("test expectation failed");
390 q.enqueue(QueuePriority::High, b"h2".to_vec())
391 .expect("test expectation failed");
392 q.enqueue(QueuePriority::Low, b"l1".to_vec())
393 .expect("test expectation failed");
394 assert_eq!(q.lane_depth(QueuePriority::High), 2);
395 assert_eq!(q.lane_depth(QueuePriority::Low), 1);
396 assert_eq!(q.lane_depth(QueuePriority::Normal), 0);
397 }
398
399 #[test]
400 fn test_clear_empties_all_lanes() {
401 let mut q = make_queue(20);
402 q.enqueue(QueuePriority::Low, b"a".to_vec())
403 .expect("test expectation failed");
404 q.enqueue(QueuePriority::Critical, b"b".to_vec())
405 .expect("test expectation failed");
406 q.clear();
407 assert!(q.is_empty());
408 }
409
410 #[test]
411 fn test_task_not_expired_by_default() {
412 let task = CloudTask::new(1, QueuePriority::Normal, b"data".to_vec());
413 assert!(!task.is_expired());
414 }
415
416 #[test]
417 fn test_task_with_long_ttl_not_expired() {
418 let task = CloudTask::new(1, QueuePriority::Normal, b"data".to_vec())
419 .with_ttl(Duration::from_secs(3600));
420 assert!(!task.is_expired());
421 }
422
423 #[test]
424 fn test_enqueue_task_direct() {
425 let mut q = make_queue(10);
426 let task = CloudTask::new(99, QueuePriority::High, b"direct".to_vec());
427 q.enqueue_task(task).expect("enqueue_task should succeed");
428 assert_eq!(q.len(), 1);
429 assert_eq!(q.lane_depth(QueuePriority::High), 1);
430 }
431
432 #[test]
433 fn test_error_display() {
434 assert_eq!(TaskQueueError::QueueFull.to_string(), "task queue is full");
435 assert_eq!(
436 TaskQueueError::QueueEmpty.to_string(),
437 "task queue is empty"
438 );
439 assert_eq!(
440 TaskQueueError::InvalidBatchSize.to_string(),
441 "batch size must be greater than zero"
442 );
443 }
444}