1use crate::error::{MqttError, Result};
2use crate::prelude::{String, Vec, VecDeque};
3use crate::session::limits::{ExpiringMessage, LimitsManager};
4use crate::time::{Duration, Instant};
5use crate::QoS;
6
/// Outcome of a single [`MessageQueue::enqueue`] call.
#[derive(Clone, Debug)]
pub struct QueueResult {
    /// `true` when the message was appended to the queue, `false` when it was
    /// not stored.
    pub was_queued: bool,
    /// Number of older entries that were evicted to make room for the message.
    pub messages_dropped: usize,
    /// Total byte size held by the queue once the call finished.
    pub current_size: usize,
    /// Number of entries held by the queue once the call finished.
    pub message_count: usize,
}
14
15#[derive(Debug, Clone)]
16pub struct QueuedMessage {
17 pub topic: String,
18 pub payload: Vec<u8>,
19 pub qos: QoS,
20 pub retain: bool,
21 pub packet_id: Option<u16>,
22}
23
24impl QueuedMessage {
25 #[must_use]
26 pub fn to_expiring(self, limits: &LimitsManager) -> ExpiringMessage {
27 ExpiringMessage::new(
28 self.topic,
29 self.payload,
30 self.qos,
31 self.retain,
32 self.packet_id,
33 None,
34 limits,
35 )
36 }
37}
38
39#[derive(Debug, Clone)]
40struct QueuedMessageInternal {
41 message: ExpiringMessage,
42 queued_at: Instant,
43 size: usize,
44}
45
46#[derive(Debug)]
47pub struct MessageQueue {
48 queue: VecDeque<QueuedMessageInternal>,
49 max_messages: usize,
50 max_size: usize,
51 current_size: usize,
52}
53
54impl MessageQueue {
55 #[must_use]
56 pub fn new(max_messages: usize, max_size: usize) -> Self {
57 Self {
58 queue: VecDeque::new(),
59 max_messages,
60 max_size,
61 current_size: 0,
62 }
63 }
64
65 pub fn enqueue(&mut self, message: ExpiringMessage) -> Result<QueueResult> {
68 let size = message.topic.len() + message.payload.len();
69
70 if size > self.max_size {
71 return Err(MqttError::MessageTooLarge);
72 }
73
74 if message.is_expired() {
75 return Ok(QueueResult {
76 was_queued: false,
77 messages_dropped: 0,
78 current_size: self.current_size,
79 message_count: self.queue.len(),
80 });
81 }
82
83 let mut messages_dropped = 0;
84 while !self.queue.is_empty()
85 && (self.queue.len() >= self.max_messages || self.current_size + size > self.max_size)
86 {
87 if let Some(removed) = self.queue.pop_front() {
88 self.current_size -= removed.size;
89 messages_dropped += 1;
90 }
91 }
92
93 let internal = QueuedMessageInternal {
94 message,
95 queued_at: Instant::now(),
96 size,
97 };
98
99 self.queue.push_back(internal);
100 self.current_size += size;
101
102 Ok(QueueResult {
103 was_queued: true,
104 messages_dropped,
105 current_size: self.current_size,
106 message_count: self.queue.len(),
107 })
108 }
109
110 #[must_use]
111 pub fn dequeue(&mut self) -> Option<ExpiringMessage> {
112 while let Some(internal) = self.queue.front() {
113 if internal.message.is_expired() {
114 if let Some(removed) = self.queue.pop_front() {
115 self.current_size -= removed.size;
116 }
117 } else {
118 break;
119 }
120 }
121
122 if let Some(internal) = self.queue.pop_front() {
123 self.current_size -= internal.size;
124 Some(internal.message)
125 } else {
126 None
127 }
128 }
129
130 #[must_use]
131 pub fn dequeue_batch(&mut self, limit: usize) -> Vec<ExpiringMessage> {
132 let mut messages = Vec::with_capacity(limit.min(self.queue.len()));
133
134 for _ in 0..limit {
135 if let Some(message) = self.dequeue() {
136 messages.push(message);
137 } else {
138 break;
139 }
140 }
141
142 messages
143 }
144
145 #[must_use]
146 pub fn len(&self) -> usize {
147 self.queue.len()
148 }
149
150 #[must_use]
151 pub fn is_empty(&self) -> bool {
152 self.queue.is_empty()
153 }
154
155 #[must_use]
156 pub fn size(&self) -> usize {
157 self.current_size
158 }
159
160 pub fn clear(&mut self) {
161 self.queue.clear();
162 self.current_size = 0;
163 }
164
165 pub fn remove_expired(&mut self, queue_timeout: Duration) {
166 let now = Instant::now();
167 let current_size = &mut self.current_size;
168
169 self.queue.retain(|internal| {
170 let should_keep = !internal.message.is_expired()
171 && now.duration_since(internal.queued_at) <= queue_timeout;
172 if !should_keep {
173 *current_size -= internal.size;
174 }
175 should_keep
176 });
177 }
178
179 #[must_use]
180 pub fn stats(&self) -> QueueStats {
181 let oldest_message_age = self.queue.front().map(|m| m.queued_at.elapsed());
182 let newest_message_age = self.queue.back().map(|m| m.queued_at.elapsed());
183
184 QueueStats {
185 message_count: self.queue.len(),
186 total_size: self.current_size,
187 max_messages: self.max_messages,
188 max_size: self.max_size,
189 oldest_message_age,
190 newest_message_age,
191 }
192 }
193}
194
/// Point-in-time snapshot produced by [`MessageQueue::stats`].
#[derive(Clone, Debug)]
pub struct QueueStats {
    /// Number of entries stored in the queue.
    pub message_count: usize,
    /// Total byte size of the stored entries.
    pub total_size: usize,
    /// Configured limit on the number of entries.
    pub max_messages: usize,
    /// Configured limit on the total byte size.
    pub max_size: usize,
    /// Time elapsed since the entry at the front of the queue was enqueued;
    /// `None` when the queue is empty.
    pub oldest_message_age: Option<Duration>,
    /// Time elapsed since the entry at the back of the queue was enqueued;
    /// `None` when the queue is empty.
    pub newest_message_age: Option<Duration>,
}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::prelude::format;
    use crate::session::limits::LimitsConfig;

    /// Builds a non-retained message against a default `LimitsManager`,
    /// passing `None` as the sixth constructor argument.
    fn default_message(
        topic: &str,
        payload: Vec<u8>,
        qos: QoS,
        packet_id: Option<u16>,
    ) -> ExpiringMessage {
        let limits = LimitsManager::with_defaults();
        ExpiringMessage::new(topic.into(), payload, qos, false, packet_id, None, &limits)
    }

    /// Builds the message `test/<idx>` with the single payload byte `idx`.
    fn test_expiring_message(idx: u8) -> ExpiringMessage {
        default_message(&format!("test/{idx}"), vec![idx], QoS::AtMostOnce, None)
    }

    /// Enqueues `test/0 .. test/<count - 1>` and checks each one was accepted.
    fn fill(queue: &mut MessageQueue, count: u8) {
        for idx in 0..count {
            let outcome = queue.enqueue(test_expiring_message(idx)).unwrap();
            assert!(outcome.was_queued);
        }
    }

    #[test]
    fn test_queue_basic_operations() {
        let mut queue = MessageQueue::new(10, 1024);

        let first = default_message("test/1", vec![1, 2, 3], QoS::AtLeastOnce, Some(1));
        let second = default_message("test/2", vec![4, 5, 6], QoS::AtMostOnce, None);

        queue.enqueue(first).unwrap();
        queue.enqueue(second).unwrap();

        // Two entries of 6 topic bytes + 3 payload bytes each.
        assert_eq!(queue.len(), 2);
        assert_eq!(queue.size(), 18);

        let head = queue.dequeue().unwrap();
        assert_eq!(head.topic, "test/1");
        assert_eq!(queue.len(), 1);

        let head = queue.dequeue().unwrap();
        assert_eq!(head.topic, "test/2");
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_queue_max_messages() {
        let mut queue = MessageQueue::new(2, 1024);

        // The third message pushes the oldest one out.
        fill(&mut queue, 3);

        assert_eq!(queue.len(), 2);

        let messages = queue.dequeue_batch(10);
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].topic, "test/1");
        assert_eq!(messages[1].topic, "test/2");
    }

    #[test]
    fn test_queue_max_size() {
        let mut queue = MessageQueue::new(10, 20);

        // 14 bytes, then 10 bytes: together they exceed the 20-byte budget,
        // so the first entry is evicted.
        let big = default_message("test", vec![0; 10], QoS::AtMostOnce, None);
        let small = default_message("test2", vec![0; 5], QoS::AtMostOnce, None);

        queue.enqueue(big).unwrap();
        queue.enqueue(small).unwrap();

        assert_eq!(queue.len(), 1);
        assert_eq!(queue.size(), 10);

        let head = queue.dequeue().unwrap();
        assert_eq!(head.topic, "test2");
    }

    #[test]
    fn test_queue_message_too_large() {
        let mut queue = MessageQueue::new(10, 20);

        let oversized = default_message("test", vec![0; 50], QoS::AtMostOnce, None);

        assert!(queue.enqueue(oversized).is_err());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_queue_batch_dequeue() {
        let mut queue = MessageQueue::new(10, 1024);
        fill(&mut queue, 5);

        let batch = queue.dequeue_batch(3);
        assert_eq!(batch.len(), 3);
        for (position, message) in batch.iter().enumerate() {
            assert_eq!(message.topic, format!("test/{position}"));
        }

        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_queue_clear() {
        let mut queue = MessageQueue::new(10, 1024);
        fill(&mut queue, 3);

        queue.clear();

        assert_eq!(queue.len(), 0);
        assert_eq!(queue.size(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_queue_stats() {
        let mut queue = MessageQueue::new(10, 1024);

        queue
            .enqueue(default_message("test", vec![1, 2, 3], QoS::AtMostOnce, None))
            .unwrap();

        let stats = queue.stats();
        assert_eq!(stats.message_count, 1);
        assert_eq!(stats.total_size, 7);
        assert_eq!(stats.max_messages, 10);
        assert_eq!(stats.max_size, 1024);
        assert!(stats.oldest_message_age.is_some());
        assert!(stats.newest_message_age.is_some());
    }

    #[test]
    fn test_queue_with_expiring_messages() {
        let mut queue = MessageQueue::new(10, 1024);
        let limits = LimitsManager::new(LimitsConfig {
            default_message_expiry: Some(Duration::from_millis(50)),
            ..Default::default()
        });

        // NOTE(review): the sixth argument is presumably the per-message
        // expiry interval; confirm against `ExpiringMessage::new`.
        let expiring = ExpiringMessage::new(
            "test/expiring".into(),
            vec![1, 2, 3],
            QoS::AtLeastOnce,
            false,
            Some(1),
            Some(0),
            &limits,
        );

        queue.enqueue(expiring).unwrap();

        #[cfg(feature = "std")]
        std::thread::sleep(Duration::from_millis(10));

        let dequeued = queue.dequeue();
        assert!(dequeued.is_none());
        assert_eq!(queue.len(), 0);
    }
}