Skip to main content

oximedia_cloud/
cloud_queue.rs

1#![allow(dead_code)]
2//! Simple cloud message queue abstraction.
3
4use std::collections::VecDeque;
5use std::time::{Duration, Instant};
6
7/// Configuration for a `CloudQueue`.
8#[derive(Debug, Clone)]
9pub struct CloudQueueConfig {
10    /// Maximum number of messages the queue may hold.
11    pub capacity: usize,
12    /// Maximum time in seconds a message is retained before being discarded.
13    pub max_retention_secs: u64,
14    /// Maximum number of bytes per message payload.
15    pub max_message_bytes: usize,
16}
17
18impl CloudQueueConfig {
19    /// Create a config with the given capacity and retention.
20    pub fn new(capacity: usize, max_retention_secs: u64) -> Self {
21        Self {
22            capacity,
23            max_retention_secs,
24            max_message_bytes: 256 * 1024, // 256 KiB default
25        }
26    }
27
28    /// Returns the maximum retention duration in seconds.
29    pub fn max_retention_secs(&self) -> u64 {
30        self.max_retention_secs
31    }
32}
33
34impl Default for CloudQueueConfig {
35    fn default() -> Self {
36        Self::new(10_000, 3600)
37    }
38}
39
40/// A message held in the queue.
41#[derive(Debug, Clone, PartialEq)]
42pub struct QueueMessage {
43    /// Unique identifier.
44    pub id: u64,
45    /// Message payload bytes.
46    pub payload: Vec<u8>,
47    /// Wall-clock instant at which the message was enqueued.
48    pub enqueued_at: Instant,
49    /// Configured retention for this message.
50    pub retention: Duration,
51}
52
53impl QueueMessage {
54    /// Create a new message.
55    pub fn new(id: u64, payload: Vec<u8>, retention_secs: u64) -> Self {
56        Self {
57            id,
58            payload,
59            enqueued_at: Instant::now(),
60            retention: Duration::from_secs(retention_secs),
61        }
62    }
63
64    /// Returns `true` if this message has exceeded its retention window.
65    pub fn is_expired(&self) -> bool {
66        self.enqueued_at.elapsed() >= self.retention
67    }
68
69    /// Returns the payload length in bytes.
70    pub fn payload_len(&self) -> usize {
71        self.payload.len()
72    }
73}
74
75/// Errors that can occur when interacting with a `CloudQueue`.
76#[derive(Debug, PartialEq, Eq)]
77pub enum QueueError {
78    /// Queue has reached maximum capacity.
79    QueueFull,
80    /// Message payload exceeds the configured limit.
81    PayloadTooLarge,
82    /// Queue is empty; no message to dequeue.
83    QueueEmpty,
84}
85
86impl std::fmt::Display for QueueError {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self {
89            Self::QueueFull => write!(f, "queue is full"),
90            Self::PayloadTooLarge => write!(f, "message payload exceeds limit"),
91            Self::QueueEmpty => write!(f, "queue is empty"),
92        }
93    }
94}
95
96/// A first-in, first-out cloud message queue.
97#[derive(Debug)]
98pub struct CloudQueue {
99    config: CloudQueueConfig,
100    messages: VecDeque<QueueMessage>,
101    next_id: u64,
102}
103
104impl CloudQueue {
105    /// Create a new queue with the given configuration.
106    pub fn new(config: CloudQueueConfig) -> Self {
107        Self {
108            messages: VecDeque::with_capacity(config.capacity),
109            config,
110            next_id: 1,
111        }
112    }
113
114    /// Enqueue a message payload.
115    ///
116    /// Returns the assigned message ID on success, or a `QueueError` on failure.
117    pub fn enqueue(&mut self, payload: Vec<u8>) -> Result<u64, QueueError> {
118        if payload.len() > self.config.max_message_bytes {
119            return Err(QueueError::PayloadTooLarge);
120        }
121        self.purge_expired();
122        if self.messages.len() >= self.config.capacity {
123            return Err(QueueError::QueueFull);
124        }
125        let id = self.next_id;
126        self.next_id += 1;
127        self.messages.push_back(QueueMessage::new(
128            id,
129            payload,
130            self.config.max_retention_secs,
131        ));
132        Ok(id)
133    }
134
135    /// Dequeue and return the oldest non-expired message.
136    pub fn dequeue(&mut self) -> Result<QueueMessage, QueueError> {
137        self.purge_expired();
138        self.messages.pop_front().ok_or(QueueError::QueueEmpty)
139    }
140
141    /// Returns the current number of messages in the queue (excluding expired).
142    pub fn depth(&self) -> usize {
143        self.messages.iter().filter(|m| !m.is_expired()).count()
144    }
145
146    /// Purge all expired messages from the front of the queue.
147    fn purge_expired(&mut self) {
148        while let Some(front) = self.messages.front() {
149            if front.is_expired() {
150                self.messages.pop_front();
151            } else {
152                break;
153            }
154        }
155    }
156
157    /// Returns `true` if the queue has no messages.
158    pub fn is_empty(&self) -> bool {
159        self.depth() == 0
160    }
161}
162
163#[cfg(test)]
164mod tests {
165    use super::*;
166
167    fn make_queue(cap: usize) -> CloudQueue {
168        CloudQueue::new(CloudQueueConfig::new(cap, 3600))
169    }
170
171    #[test]
172    fn test_config_max_retention() {
173        let cfg = CloudQueueConfig::new(100, 7200);
174        assert_eq!(cfg.max_retention_secs(), 7200);
175    }
176
177    #[test]
178    fn test_config_default() {
179        let cfg = CloudQueueConfig::default();
180        assert_eq!(cfg.capacity, 10_000);
181        assert_eq!(cfg.max_retention_secs, 3600);
182    }
183
184    #[test]
185    fn test_enqueue_returns_id() {
186        let mut q = make_queue(10);
187        let id = q.enqueue(b"hello".to_vec()).expect("id should be valid");
188        assert_eq!(id, 1);
189    }
190
191    #[test]
192    fn test_enqueue_increments_id() {
193        let mut q = make_queue(10);
194        let id1 = q.enqueue(b"a".to_vec()).expect("id1 should be valid");
195        let id2 = q.enqueue(b"b".to_vec()).expect("id2 should be valid");
196        assert_eq!(id2, id1 + 1);
197    }
198
199    #[test]
200    fn test_depth_after_enqueue() {
201        let mut q = make_queue(10);
202        q.enqueue(b"x".to_vec()).expect("test expectation failed");
203        q.enqueue(b"y".to_vec()).expect("test expectation failed");
204        assert_eq!(q.depth(), 2);
205    }
206
207    #[test]
208    fn test_dequeue_fifo_order() {
209        let mut q = make_queue(10);
210        q.enqueue(b"first".to_vec())
211            .expect("test expectation failed");
212        q.enqueue(b"second".to_vec())
213            .expect("test expectation failed");
214        let msg = q.dequeue().expect("msg should be valid");
215        assert_eq!(msg.payload, b"first");
216    }
217
218    #[test]
219    fn test_dequeue_empty_error() {
220        let mut q = make_queue(10);
221        assert_eq!(q.dequeue(), Err(QueueError::QueueEmpty));
222    }
223
224    #[test]
225    fn test_queue_full_error() {
226        let mut q = make_queue(2);
227        q.enqueue(b"a".to_vec()).expect("test expectation failed");
228        q.enqueue(b"b".to_vec()).expect("test expectation failed");
229        assert_eq!(q.enqueue(b"c".to_vec()), Err(QueueError::QueueFull));
230    }
231
232    #[test]
233    fn test_payload_too_large_error() {
234        let cfg = CloudQueueConfig {
235            capacity: 10,
236            max_retention_secs: 3600,
237            max_message_bytes: 5,
238        };
239        let mut q = CloudQueue::new(cfg);
240        assert_eq!(
241            q.enqueue(b"toolongpayload".to_vec()),
242            Err(QueueError::PayloadTooLarge)
243        );
244    }
245
246    #[test]
247    fn test_is_empty_initially() {
248        let q = make_queue(10);
249        assert!(q.is_empty());
250    }
251
252    #[test]
253    fn test_is_empty_after_enqueue() {
254        let mut q = make_queue(10);
255        q.enqueue(b"data".to_vec())
256            .expect("test expectation failed");
257        assert!(!q.is_empty());
258    }
259
260    #[test]
261    fn test_message_not_expired_immediately() {
262        let msg = QueueMessage::new(1, b"data".to_vec(), 3600);
263        assert!(!msg.is_expired());
264    }
265
266    #[test]
267    fn test_message_payload_len() {
268        let msg = QueueMessage::new(1, b"hello".to_vec(), 60);
269        assert_eq!(msg.payload_len(), 5);
270    }
271
272    #[test]
273    fn test_depth_decreases_after_dequeue() {
274        let mut q = make_queue(10);
275        q.enqueue(b"a".to_vec()).expect("test expectation failed");
276        q.enqueue(b"b".to_vec()).expect("test expectation failed");
277        q.dequeue().expect("dequeue should succeed");
278        assert_eq!(q.depth(), 1);
279    }
280
281    #[test]
282    fn test_queue_error_display() {
283        assert_eq!(QueueError::QueueFull.to_string(), "queue is full");
284        assert_eq!(QueueError::QueueEmpty.to_string(), "queue is empty");
285        assert_eq!(
286            QueueError::PayloadTooLarge.to_string(),
287            "message payload exceeds limit"
288        );
289    }
290}