oximedia_cloud/
cloud_queue.rs1#![allow(dead_code)]
2use std::collections::VecDeque;
5use std::time::{Duration, Instant};
6
7#[derive(Debug, Clone)]
9pub struct CloudQueueConfig {
10 pub capacity: usize,
12 pub max_retention_secs: u64,
14 pub max_message_bytes: usize,
16}
17
18impl CloudQueueConfig {
19 pub fn new(capacity: usize, max_retention_secs: u64) -> Self {
21 Self {
22 capacity,
23 max_retention_secs,
24 max_message_bytes: 256 * 1024, }
26 }
27
28 pub fn max_retention_secs(&self) -> u64 {
30 self.max_retention_secs
31 }
32}
33
34impl Default for CloudQueueConfig {
35 fn default() -> Self {
36 Self::new(10_000, 3600)
37 }
38}
39
40#[derive(Debug, Clone, PartialEq)]
42pub struct QueueMessage {
43 pub id: u64,
45 pub payload: Vec<u8>,
47 pub enqueued_at: Instant,
49 pub retention: Duration,
51}
52
53impl QueueMessage {
54 pub fn new(id: u64, payload: Vec<u8>, retention_secs: u64) -> Self {
56 Self {
57 id,
58 payload,
59 enqueued_at: Instant::now(),
60 retention: Duration::from_secs(retention_secs),
61 }
62 }
63
64 pub fn is_expired(&self) -> bool {
66 self.enqueued_at.elapsed() >= self.retention
67 }
68
69 pub fn payload_len(&self) -> usize {
71 self.payload.len()
72 }
73}
74
75#[derive(Debug, PartialEq, Eq)]
77pub enum QueueError {
78 QueueFull,
80 PayloadTooLarge,
82 QueueEmpty,
84}
85
86impl std::fmt::Display for QueueError {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 match self {
89 Self::QueueFull => write!(f, "queue is full"),
90 Self::PayloadTooLarge => write!(f, "message payload exceeds limit"),
91 Self::QueueEmpty => write!(f, "queue is empty"),
92 }
93 }
94}
95
96#[derive(Debug)]
98pub struct CloudQueue {
99 config: CloudQueueConfig,
100 messages: VecDeque<QueueMessage>,
101 next_id: u64,
102}
103
104impl CloudQueue {
105 pub fn new(config: CloudQueueConfig) -> Self {
107 Self {
108 messages: VecDeque::with_capacity(config.capacity),
109 config,
110 next_id: 1,
111 }
112 }
113
114 pub fn enqueue(&mut self, payload: Vec<u8>) -> Result<u64, QueueError> {
118 if payload.len() > self.config.max_message_bytes {
119 return Err(QueueError::PayloadTooLarge);
120 }
121 self.purge_expired();
122 if self.messages.len() >= self.config.capacity {
123 return Err(QueueError::QueueFull);
124 }
125 let id = self.next_id;
126 self.next_id += 1;
127 self.messages.push_back(QueueMessage::new(
128 id,
129 payload,
130 self.config.max_retention_secs,
131 ));
132 Ok(id)
133 }
134
135 pub fn dequeue(&mut self) -> Result<QueueMessage, QueueError> {
137 self.purge_expired();
138 self.messages.pop_front().ok_or(QueueError::QueueEmpty)
139 }
140
141 pub fn depth(&self) -> usize {
143 self.messages.iter().filter(|m| !m.is_expired()).count()
144 }
145
146 fn purge_expired(&mut self) {
148 while let Some(front) = self.messages.front() {
149 if front.is_expired() {
150 self.messages.pop_front();
151 } else {
152 break;
153 }
154 }
155 }
156
157 pub fn is_empty(&self) -> bool {
159 self.depth() == 0
160 }
161}
162
163#[cfg(test)]
164mod tests {
165 use super::*;
166
167 fn make_queue(cap: usize) -> CloudQueue {
168 CloudQueue::new(CloudQueueConfig::new(cap, 3600))
169 }
170
171 #[test]
172 fn test_config_max_retention() {
173 let cfg = CloudQueueConfig::new(100, 7200);
174 assert_eq!(cfg.max_retention_secs(), 7200);
175 }
176
177 #[test]
178 fn test_config_default() {
179 let cfg = CloudQueueConfig::default();
180 assert_eq!(cfg.capacity, 10_000);
181 assert_eq!(cfg.max_retention_secs, 3600);
182 }
183
184 #[test]
185 fn test_enqueue_returns_id() {
186 let mut q = make_queue(10);
187 let id = q.enqueue(b"hello".to_vec()).expect("id should be valid");
188 assert_eq!(id, 1);
189 }
190
191 #[test]
192 fn test_enqueue_increments_id() {
193 let mut q = make_queue(10);
194 let id1 = q.enqueue(b"a".to_vec()).expect("id1 should be valid");
195 let id2 = q.enqueue(b"b".to_vec()).expect("id2 should be valid");
196 assert_eq!(id2, id1 + 1);
197 }
198
199 #[test]
200 fn test_depth_after_enqueue() {
201 let mut q = make_queue(10);
202 q.enqueue(b"x".to_vec()).expect("test expectation failed");
203 q.enqueue(b"y".to_vec()).expect("test expectation failed");
204 assert_eq!(q.depth(), 2);
205 }
206
207 #[test]
208 fn test_dequeue_fifo_order() {
209 let mut q = make_queue(10);
210 q.enqueue(b"first".to_vec())
211 .expect("test expectation failed");
212 q.enqueue(b"second".to_vec())
213 .expect("test expectation failed");
214 let msg = q.dequeue().expect("msg should be valid");
215 assert_eq!(msg.payload, b"first");
216 }
217
218 #[test]
219 fn test_dequeue_empty_error() {
220 let mut q = make_queue(10);
221 assert_eq!(q.dequeue(), Err(QueueError::QueueEmpty));
222 }
223
224 #[test]
225 fn test_queue_full_error() {
226 let mut q = make_queue(2);
227 q.enqueue(b"a".to_vec()).expect("test expectation failed");
228 q.enqueue(b"b".to_vec()).expect("test expectation failed");
229 assert_eq!(q.enqueue(b"c".to_vec()), Err(QueueError::QueueFull));
230 }
231
232 #[test]
233 fn test_payload_too_large_error() {
234 let cfg = CloudQueueConfig {
235 capacity: 10,
236 max_retention_secs: 3600,
237 max_message_bytes: 5,
238 };
239 let mut q = CloudQueue::new(cfg);
240 assert_eq!(
241 q.enqueue(b"toolongpayload".to_vec()),
242 Err(QueueError::PayloadTooLarge)
243 );
244 }
245
246 #[test]
247 fn test_is_empty_initially() {
248 let q = make_queue(10);
249 assert!(q.is_empty());
250 }
251
252 #[test]
253 fn test_is_empty_after_enqueue() {
254 let mut q = make_queue(10);
255 q.enqueue(b"data".to_vec())
256 .expect("test expectation failed");
257 assert!(!q.is_empty());
258 }
259
260 #[test]
261 fn test_message_not_expired_immediately() {
262 let msg = QueueMessage::new(1, b"data".to_vec(), 3600);
263 assert!(!msg.is_expired());
264 }
265
266 #[test]
267 fn test_message_payload_len() {
268 let msg = QueueMessage::new(1, b"hello".to_vec(), 60);
269 assert_eq!(msg.payload_len(), 5);
270 }
271
272 #[test]
273 fn test_depth_decreases_after_dequeue() {
274 let mut q = make_queue(10);
275 q.enqueue(b"a".to_vec()).expect("test expectation failed");
276 q.enqueue(b"b".to_vec()).expect("test expectation failed");
277 q.dequeue().expect("dequeue should succeed");
278 assert_eq!(q.depth(), 1);
279 }
280
281 #[test]
282 fn test_queue_error_display() {
283 assert_eq!(QueueError::QueueFull.to_string(), "queue is full");
284 assert_eq!(QueueError::QueueEmpty.to_string(), "queue is empty");
285 assert_eq!(
286 QueueError::PayloadTooLarge.to_string(),
287 "message payload exceeds limit"
288 );
289 }
290}