Skip to main content

oximedia_cloud/
task_queue.rs

1#![allow(dead_code)]
2//! Priority-based cloud task queue.
3//!
4//! Provides a [`CloudTaskQueue`] that accepts [`CloudTask`] items tagged with a
5//! [`QueuePriority`] and serves them in priority-then-FIFO order.  A
6//! [`dequeue_batch`](CloudTaskQueue::dequeue_batch) helper returns up to *N*
7//! items in a single call, which suits bulk worker dispatch.
8
9use std::collections::VecDeque;
10use std::time::{Duration, Instant};
11
12// ─────────────────────────────────────────────────────────────────────────────
13// Priority
14// ─────────────────────────────────────────────────────────────────────────────
15
16/// Scheduling priority for a cloud task.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
18pub enum QueuePriority {
19    /// Lowest priority — run after all others.
20    Low = 0,
21    /// Normal background workload.
22    Normal = 1,
23    /// Elevated priority — ahead of Normal tasks.
24    High = 2,
25    /// Immediately jump to the front of the queue.
26    Critical = 3,
27}
28
29impl QueuePriority {
30    /// Human-readable label for the priority level.
31    pub fn label(self) -> &'static str {
32        match self {
33            Self::Low => "low",
34            Self::Normal => "normal",
35            Self::High => "high",
36            Self::Critical => "critical",
37        }
38    }
39
40    /// Returns `true` if this priority is at least as high as `other`.
41    pub fn at_least(self, other: Self) -> bool {
42        self >= other
43    }
44}
45
46impl std::fmt::Display for QueuePriority {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.write_str(self.label())
49    }
50}
51
52// ─────────────────────────────────────────────────────────────────────────────
53// CloudTask
54// ─────────────────────────────────────────────────────────────────────────────
55
56/// A unit of work submitted to a [`CloudTaskQueue`].
57#[derive(Debug, Clone, PartialEq)]
58pub struct CloudTask {
59    /// Unique task identifier.
60    pub id: u64,
61    /// Scheduling priority.
62    pub priority: QueuePriority,
63    /// Arbitrary task payload (serialised command, job spec, …).
64    pub payload: Vec<u8>,
65    /// Wall-clock time at which the task was submitted.
66    pub submitted_at: Instant,
67    /// Optional time-to-live; the task is discarded after this duration.
68    pub ttl: Option<Duration>,
69}
70
71impl CloudTask {
72    /// Create a new task with the given id, priority and payload.
73    pub fn new(id: u64, priority: QueuePriority, payload: Vec<u8>) -> Self {
74        Self {
75            id,
76            priority,
77            payload,
78            submitted_at: Instant::now(),
79            ttl: None,
80        }
81    }
82
83    /// Attach a time-to-live to this task.
84    pub fn with_ttl(mut self, ttl: Duration) -> Self {
85        self.ttl = Some(ttl);
86        self
87    }
88
89    /// Returns `true` when the task's TTL has been exceeded.
90    pub fn is_expired(&self) -> bool {
91        match self.ttl {
92            Some(ttl) => self.submitted_at.elapsed() >= ttl,
93            None => false,
94        }
95    }
96
97    /// Age of the task since submission.
98    pub fn age(&self) -> Duration {
99        self.submitted_at.elapsed()
100    }
101}
102
103// ─────────────────────────────────────────────────────────────────────────────
104// CloudTaskQueue
105// ─────────────────────────────────────────────────────────────────────────────
106
107/// Error variants for [`CloudTaskQueue`] operations.
108#[derive(Debug, PartialEq, Eq)]
109pub enum TaskQueueError {
110    /// The queue has reached its configured capacity.
111    QueueFull,
112    /// The queue contains no ready tasks.
113    QueueEmpty,
114    /// The requested batch size is zero.
115    InvalidBatchSize,
116}
117
118impl std::fmt::Display for TaskQueueError {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        match self {
121            Self::QueueFull => write!(f, "task queue is full"),
122            Self::QueueEmpty => write!(f, "task queue is empty"),
123            Self::InvalidBatchSize => write!(f, "batch size must be greater than zero"),
124        }
125    }
126}
127
128/// A priority-ordered queue of [`CloudTask`] items.
129///
130/// Tasks are stored in four internal lanes — one per [`QueuePriority`] — so
131/// that higher-priority tasks are always served before lower-priority ones,
132/// regardless of submission order.
133#[derive(Debug)]
134pub struct CloudTaskQueue {
135    /// Per-priority lanes: index 0 = Low … index 3 = Critical.
136    lanes: [VecDeque<CloudTask>; 4],
137    /// Maximum total tasks across all lanes.
138    capacity: usize,
139    /// Monotonically increasing id counter.
140    next_id: u64,
141}
142
143impl CloudTaskQueue {
144    /// Create a new queue with the given maximum capacity.
145    pub fn new(capacity: usize) -> Self {
146        Self {
147            lanes: [
148                VecDeque::new(),
149                VecDeque::new(),
150                VecDeque::new(),
151                VecDeque::new(),
152            ],
153            capacity,
154            next_id: 1,
155        }
156    }
157
158    // Lane index for a priority (mirrors the discriminant).
159    fn lane(priority: QueuePriority) -> usize {
160        priority as usize
161    }
162
163    /// Total number of tasks currently held (excluding expired tasks).
164    pub fn len(&self) -> usize {
165        self.lanes.iter().map(|l| l.len()).sum()
166    }
167
168    /// Returns `true` when the queue holds no tasks.
169    pub fn is_empty(&self) -> bool {
170        self.len() == 0
171    }
172
173    /// Submit a task at the specified priority.
174    ///
175    /// Expired tasks are purged from the queue before the capacity check so
176    /// that the caller can always make room by waiting for TTLs to elapse.
177    pub fn enqueue(
178        &mut self,
179        priority: QueuePriority,
180        payload: Vec<u8>,
181    ) -> Result<u64, TaskQueueError> {
182        self.purge_expired();
183        if self.len() >= self.capacity {
184            return Err(TaskQueueError::QueueFull);
185        }
186        let id = self.next_id;
187        self.next_id += 1;
188        let task = CloudTask::new(id, priority, payload);
189        self.lanes[Self::lane(priority)].push_back(task);
190        Ok(id)
191    }
192
193    /// Submit a pre-built [`CloudTask`] directly.
194    pub fn enqueue_task(&mut self, task: CloudTask) -> Result<(), TaskQueueError> {
195        self.purge_expired();
196        if self.len() >= self.capacity {
197            return Err(TaskQueueError::QueueFull);
198        }
199        let lane = Self::lane(task.priority);
200        self.lanes[lane].push_back(task);
201        Ok(())
202    }
203
204    /// Dequeue the next highest-priority, non-expired task (Critical → Low).
205    pub fn dequeue(&mut self) -> Result<CloudTask, TaskQueueError> {
206        self.purge_expired();
207        // Iterate lanes from highest to lowest priority.
208        for lane in (0..4).rev() {
209            if let Some(task) = self.lanes[lane].pop_front() {
210                return Ok(task);
211            }
212        }
213        Err(TaskQueueError::QueueEmpty)
214    }
215
216    /// Dequeue up to `max_count` tasks in priority order.
217    ///
218    /// Returns an error if `max_count` is zero; returns a (possibly shorter)
219    /// `Vec` if fewer than `max_count` tasks are available.
220    pub fn dequeue_batch(&mut self, max_count: usize) -> Result<Vec<CloudTask>, TaskQueueError> {
221        if max_count == 0 {
222            return Err(TaskQueueError::InvalidBatchSize);
223        }
224        self.purge_expired();
225        let mut batch = Vec::with_capacity(max_count);
226        'outer: for lane in (0..4).rev() {
227            while let Some(task) = self.lanes[lane].pop_front() {
228                batch.push(task);
229                if batch.len() >= max_count {
230                    break 'outer;
231                }
232            }
233        }
234        Ok(batch)
235    }
236
237    /// Number of tasks waiting in the given priority lane.
238    pub fn lane_depth(&self, priority: QueuePriority) -> usize {
239        self.lanes[Self::lane(priority)].len()
240    }
241
242    /// Remove all expired tasks from every lane.
243    fn purge_expired(&mut self) {
244        for lane in &mut self.lanes {
245            lane.retain(|t| !t.is_expired());
246        }
247    }
248
249    /// Discard all tasks in every lane.
250    pub fn clear(&mut self) {
251        for lane in &mut self.lanes {
252            lane.clear();
253        }
254    }
255}
256
257// ─────────────────────────────────────────────────────────────────────────────
258// Tests
259// ─────────────────────────────────────────────────────────────────────────────
260
261#[cfg(test)]
262mod tests {
263    use super::*;
264
265    fn make_queue(cap: usize) -> CloudTaskQueue {
266        CloudTaskQueue::new(cap)
267    }
268
269    #[test]
270    fn test_priority_ordering() {
271        assert!(QueuePriority::Critical > QueuePriority::High);
272        assert!(QueuePriority::High > QueuePriority::Normal);
273        assert!(QueuePriority::Normal > QueuePriority::Low);
274    }
275
276    #[test]
277    fn test_priority_at_least() {
278        assert!(QueuePriority::Critical.at_least(QueuePriority::High));
279        assert!(QueuePriority::Normal.at_least(QueuePriority::Normal));
280        assert!(!QueuePriority::Low.at_least(QueuePriority::Normal));
281    }
282
283    #[test]
284    fn test_priority_label() {
285        assert_eq!(QueuePriority::Low.label(), "low");
286        assert_eq!(QueuePriority::Critical.label(), "critical");
287    }
288
289    #[test]
290    fn test_priority_display() {
291        assert_eq!(QueuePriority::Normal.to_string(), "normal");
292    }
293
294    #[test]
295    fn test_enqueue_returns_incrementing_ids() {
296        let mut q = make_queue(10);
297        let id1 = q
298            .enqueue(QueuePriority::Normal, b"a".to_vec())
299            .expect("id1 should be valid");
300        let id2 = q
301            .enqueue(QueuePriority::Normal, b"b".to_vec())
302            .expect("id2 should be valid");
303        assert_eq!(id1, 1);
304        assert_eq!(id2, 2);
305    }
306
307    #[test]
308    fn test_queue_empty_initially() {
309        let q = make_queue(10);
310        assert!(q.is_empty());
311        assert_eq!(q.len(), 0);
312    }
313
314    #[test]
315    fn test_len_increases_on_enqueue() {
316        let mut q = make_queue(10);
317        q.enqueue(QueuePriority::Low, b"x".to_vec())
318            .expect("test expectation failed");
319        q.enqueue(QueuePriority::High, b"y".to_vec())
320            .expect("test expectation failed");
321        assert_eq!(q.len(), 2);
322    }
323
324    #[test]
325    fn test_dequeue_empty_returns_error() {
326        let mut q = make_queue(10);
327        assert_eq!(q.dequeue(), Err(TaskQueueError::QueueEmpty));
328    }
329
330    #[test]
331    fn test_dequeue_respects_priority() {
332        let mut q = make_queue(10);
333        q.enqueue(QueuePriority::Low, b"low".to_vec())
334            .expect("test expectation failed");
335        q.enqueue(QueuePriority::Critical, b"crit".to_vec())
336            .expect("test expectation failed");
337        q.enqueue(QueuePriority::Normal, b"norm".to_vec())
338            .expect("test expectation failed");
339        let first = q.dequeue().expect("first should be valid");
340        assert_eq!(first.payload, b"crit");
341        let second = q.dequeue().expect("second should be valid");
342        assert_eq!(second.payload, b"norm");
343    }
344
345    #[test]
346    fn test_dequeue_batch_returns_correct_count() {
347        let mut q = make_queue(20);
348        for _ in 0..5 {
349            q.enqueue(QueuePriority::Normal, b"item".to_vec())
350                .expect("test expectation failed");
351        }
352        let batch = q.dequeue_batch(3).expect("batch should be valid");
353        assert_eq!(batch.len(), 3);
354        assert_eq!(q.len(), 2);
355    }
356
357    #[test]
358    fn test_dequeue_batch_zero_size_error() {
359        let mut q = make_queue(10);
360        assert_eq!(q.dequeue_batch(0), Err(TaskQueueError::InvalidBatchSize));
361    }
362
363    #[test]
364    fn test_dequeue_batch_partial_fill() {
365        let mut q = make_queue(10);
366        q.enqueue(QueuePriority::Normal, b"only".to_vec())
367            .expect("test expectation failed");
368        let batch = q.dequeue_batch(5).expect("batch should be valid");
369        assert_eq!(batch.len(), 1);
370    }
371
372    #[test]
373    fn test_queue_full_error() {
374        let mut q = make_queue(2);
375        q.enqueue(QueuePriority::Normal, b"a".to_vec())
376            .expect("test expectation failed");
377        q.enqueue(QueuePriority::Normal, b"b".to_vec())
378            .expect("test expectation failed");
379        assert_eq!(
380            q.enqueue(QueuePriority::Normal, b"c".to_vec()),
381            Err(TaskQueueError::QueueFull)
382        );
383    }
384
385    #[test]
386    fn test_lane_depth() {
387        let mut q = make_queue(20);
388        q.enqueue(QueuePriority::High, b"h1".to_vec())
389            .expect("test expectation failed");
390        q.enqueue(QueuePriority::High, b"h2".to_vec())
391            .expect("test expectation failed");
392        q.enqueue(QueuePriority::Low, b"l1".to_vec())
393            .expect("test expectation failed");
394        assert_eq!(q.lane_depth(QueuePriority::High), 2);
395        assert_eq!(q.lane_depth(QueuePriority::Low), 1);
396        assert_eq!(q.lane_depth(QueuePriority::Normal), 0);
397    }
398
399    #[test]
400    fn test_clear_empties_all_lanes() {
401        let mut q = make_queue(20);
402        q.enqueue(QueuePriority::Low, b"a".to_vec())
403            .expect("test expectation failed");
404        q.enqueue(QueuePriority::Critical, b"b".to_vec())
405            .expect("test expectation failed");
406        q.clear();
407        assert!(q.is_empty());
408    }
409
410    #[test]
411    fn test_task_not_expired_by_default() {
412        let task = CloudTask::new(1, QueuePriority::Normal, b"data".to_vec());
413        assert!(!task.is_expired());
414    }
415
416    #[test]
417    fn test_task_with_long_ttl_not_expired() {
418        let task = CloudTask::new(1, QueuePriority::Normal, b"data".to_vec())
419            .with_ttl(Duration::from_secs(3600));
420        assert!(!task.is_expired());
421    }
422
423    #[test]
424    fn test_enqueue_task_direct() {
425        let mut q = make_queue(10);
426        let task = CloudTask::new(99, QueuePriority::High, b"direct".to_vec());
427        q.enqueue_task(task).expect("enqueue_task should succeed");
428        assert_eq!(q.len(), 1);
429        assert_eq!(q.lane_depth(QueuePriority::High), 1);
430    }
431
432    #[test]
433    fn test_error_display() {
434        assert_eq!(TaskQueueError::QueueFull.to_string(), "task queue is full");
435        assert_eq!(
436            TaskQueueError::QueueEmpty.to_string(),
437            "task queue is empty"
438        );
439        assert_eq!(
440            TaskQueueError::InvalidBatchSize.to_string(),
441            "batch size must be greater than zero"
442        );
443    }
444}