mqtt5_protocol/session/
queue.rs

1use crate::error::{MqttError, Result};
2use crate::prelude::{String, Vec, VecDeque};
3use crate::session::limits::{ExpiringMessage, LimitsManager};
4use crate::time::{Duration, Instant};
5use crate::QoS;
6
7#[derive(Debug, Clone)]
8pub struct QueueResult {
9    pub was_queued: bool,
10    pub messages_dropped: usize,
11    pub current_size: usize,
12    pub message_count: usize,
13}
14
15#[derive(Debug, Clone)]
16pub struct QueuedMessage {
17    pub topic: String,
18    pub payload: Vec<u8>,
19    pub qos: QoS,
20    pub retain: bool,
21    pub packet_id: Option<u16>,
22}
23
24impl QueuedMessage {
25    #[must_use]
26    pub fn to_expiring(self, limits: &LimitsManager) -> ExpiringMessage {
27        ExpiringMessage::new(
28            self.topic,
29            self.payload,
30            self.qos,
31            self.retain,
32            self.packet_id,
33            None,
34            limits,
35        )
36    }
37}
38
39#[derive(Debug, Clone)]
40struct QueuedMessageInternal {
41    message: ExpiringMessage,
42    queued_at: Instant,
43    size: usize,
44}
45
46#[derive(Debug)]
47pub struct MessageQueue {
48    queue: VecDeque<QueuedMessageInternal>,
49    max_messages: usize,
50    max_size: usize,
51    current_size: usize,
52}
53
54impl MessageQueue {
55    #[must_use]
56    pub fn new(max_messages: usize, max_size: usize) -> Self {
57        Self {
58            queue: VecDeque::new(),
59            max_messages,
60            max_size,
61            current_size: 0,
62        }
63    }
64
65    /// # Errors
66    /// Returns `MessageTooLarge` if the message exceeds the maximum queue size.
67    pub fn enqueue(&mut self, message: ExpiringMessage) -> Result<QueueResult> {
68        let size = message.topic.len() + message.payload.len();
69
70        if size > self.max_size {
71            return Err(MqttError::MessageTooLarge);
72        }
73
74        if message.is_expired() {
75            return Ok(QueueResult {
76                was_queued: false,
77                messages_dropped: 0,
78                current_size: self.current_size,
79                message_count: self.queue.len(),
80            });
81        }
82
83        let mut messages_dropped = 0;
84        while !self.queue.is_empty()
85            && (self.queue.len() >= self.max_messages || self.current_size + size > self.max_size)
86        {
87            if let Some(removed) = self.queue.pop_front() {
88                self.current_size -= removed.size;
89                messages_dropped += 1;
90            }
91        }
92
93        let internal = QueuedMessageInternal {
94            message,
95            queued_at: Instant::now(),
96            size,
97        };
98
99        self.queue.push_back(internal);
100        self.current_size += size;
101
102        Ok(QueueResult {
103            was_queued: true,
104            messages_dropped,
105            current_size: self.current_size,
106            message_count: self.queue.len(),
107        })
108    }
109
110    #[must_use]
111    pub fn dequeue(&mut self) -> Option<ExpiringMessage> {
112        while let Some(internal) = self.queue.front() {
113            if internal.message.is_expired() {
114                if let Some(removed) = self.queue.pop_front() {
115                    self.current_size -= removed.size;
116                }
117            } else {
118                break;
119            }
120        }
121
122        if let Some(internal) = self.queue.pop_front() {
123            self.current_size -= internal.size;
124            Some(internal.message)
125        } else {
126            None
127        }
128    }
129
130    #[must_use]
131    pub fn dequeue_batch(&mut self, limit: usize) -> Vec<ExpiringMessage> {
132        let mut messages = Vec::with_capacity(limit.min(self.queue.len()));
133
134        for _ in 0..limit {
135            if let Some(message) = self.dequeue() {
136                messages.push(message);
137            } else {
138                break;
139            }
140        }
141
142        messages
143    }
144
145    #[must_use]
146    pub fn len(&self) -> usize {
147        self.queue.len()
148    }
149
150    #[must_use]
151    pub fn is_empty(&self) -> bool {
152        self.queue.is_empty()
153    }
154
155    #[must_use]
156    pub fn size(&self) -> usize {
157        self.current_size
158    }
159
160    pub fn clear(&mut self) {
161        self.queue.clear();
162        self.current_size = 0;
163    }
164
165    pub fn remove_expired(&mut self, queue_timeout: Duration) {
166        let now = Instant::now();
167        let current_size = &mut self.current_size;
168
169        self.queue.retain(|internal| {
170            let should_keep = !internal.message.is_expired()
171                && now.duration_since(internal.queued_at) <= queue_timeout;
172            if !should_keep {
173                *current_size -= internal.size;
174            }
175            should_keep
176        });
177    }
178
179    #[must_use]
180    pub fn stats(&self) -> QueueStats {
181        let oldest_message_age = self.queue.front().map(|m| m.queued_at.elapsed());
182        let newest_message_age = self.queue.back().map(|m| m.queued_at.elapsed());
183
184        QueueStats {
185            message_count: self.queue.len(),
186            total_size: self.current_size,
187            max_messages: self.max_messages,
188            max_size: self.max_size,
189            oldest_message_age,
190            newest_message_age,
191        }
192    }
193}
194
195#[derive(Debug, Clone)]
196pub struct QueueStats {
197    pub message_count: usize,
198    pub total_size: usize,
199    pub max_messages: usize,
200    pub max_size: usize,
201    pub oldest_message_age: Option<Duration>,
202    pub newest_message_age: Option<Duration>,
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208    use crate::prelude::format;
209    use crate::session::limits::LimitsConfig;
210
211    fn test_expiring_message(idx: u8) -> ExpiringMessage {
212        let limits = LimitsManager::with_defaults();
213        ExpiringMessage::new(
214            format!("test/{idx}"),
215            vec![idx],
216            QoS::AtMostOnce,
217            false,
218            None,
219            None,
220            &limits,
221        )
222    }
223
224    #[test]
225    fn test_queue_basic_operations() {
226        let mut queue = MessageQueue::new(10, 1024);
227        let limits = LimitsManager::with_defaults();
228
229        let msg1 = ExpiringMessage::new(
230            "test/1".into(),
231            vec![1, 2, 3],
232            QoS::AtLeastOnce,
233            false,
234            Some(1),
235            None,
236            &limits,
237        );
238
239        let msg2 = ExpiringMessage::new(
240            "test/2".into(),
241            vec![4, 5, 6],
242            QoS::AtMostOnce,
243            false,
244            None,
245            None,
246            &limits,
247        );
248
249        queue.enqueue(msg1.clone()).unwrap();
250        queue.enqueue(msg2.clone()).unwrap();
251
252        assert_eq!(queue.len(), 2);
253        assert_eq!(queue.size(), 18);
254
255        let dequeued = queue.dequeue().unwrap();
256        assert_eq!(dequeued.topic, "test/1");
257        assert_eq!(queue.len(), 1);
258
259        let dequeued = queue.dequeue().unwrap();
260        assert_eq!(dequeued.topic, "test/2");
261        assert_eq!(queue.len(), 0);
262        assert!(queue.is_empty());
263    }
264
265    #[test]
266    fn test_queue_max_messages() {
267        let mut queue = MessageQueue::new(2, 1024);
268
269        for i in 0u8..3 {
270            let msg = test_expiring_message(i);
271            let result = queue.enqueue(msg).unwrap();
272            assert!(result.was_queued);
273        }
274
275        assert_eq!(queue.len(), 2);
276
277        let messages = queue.dequeue_batch(10);
278        assert_eq!(messages.len(), 2);
279        assert_eq!(messages[0].topic, "test/1");
280        assert_eq!(messages[1].topic, "test/2");
281    }
282
283    #[test]
284    fn test_queue_max_size() {
285        let mut queue = MessageQueue::new(10, 20);
286        let limits = LimitsManager::with_defaults();
287
288        let msg1 = ExpiringMessage::new(
289            "test".into(),
290            vec![0; 10],
291            QoS::AtMostOnce,
292            false,
293            None,
294            None,
295            &limits,
296        );
297
298        let msg2 = ExpiringMessage::new(
299            "test2".into(),
300            vec![0; 5],
301            QoS::AtMostOnce,
302            false,
303            None,
304            None,
305            &limits,
306        );
307
308        queue.enqueue(msg1).unwrap();
309        queue.enqueue(msg2).unwrap();
310
311        assert_eq!(queue.len(), 1);
312        assert_eq!(queue.size(), 10);
313
314        let dequeued = queue.dequeue().unwrap();
315        assert_eq!(dequeued.topic, "test2");
316    }
317
318    #[test]
319    fn test_queue_message_too_large() {
320        let mut queue = MessageQueue::new(10, 20);
321        let limits = LimitsManager::with_defaults();
322
323        let msg = ExpiringMessage::new(
324            "test".into(),
325            vec![0; 50],
326            QoS::AtMostOnce,
327            false,
328            None,
329            None,
330            &limits,
331        );
332
333        assert!(queue.enqueue(msg).is_err());
334        assert_eq!(queue.len(), 0);
335    }
336
337    #[test]
338    fn test_queue_batch_dequeue() {
339        let mut queue = MessageQueue::new(10, 1024);
340
341        for i in 0u8..5 {
342            let msg = test_expiring_message(i);
343            let result = queue.enqueue(msg).unwrap();
344            assert!(result.was_queued);
345        }
346
347        let batch = queue.dequeue_batch(3);
348        assert_eq!(batch.len(), 3);
349        assert_eq!(batch[0].topic, "test/0");
350        assert_eq!(batch[1].topic, "test/1");
351        assert_eq!(batch[2].topic, "test/2");
352
353        assert_eq!(queue.len(), 2);
354    }
355
356    #[test]
357    fn test_queue_clear() {
358        let mut queue = MessageQueue::new(10, 1024);
359
360        for i in 0u8..3 {
361            let msg = test_expiring_message(i);
362            let result = queue.enqueue(msg).unwrap();
363            assert!(result.was_queued);
364        }
365
366        queue.clear();
367        assert_eq!(queue.len(), 0);
368        assert_eq!(queue.size(), 0);
369        assert!(queue.is_empty());
370    }
371
372    #[test]
373    fn test_queue_stats() {
374        let mut queue = MessageQueue::new(10, 1024);
375        let limits = LimitsManager::with_defaults();
376
377        let msg = ExpiringMessage::new(
378            "test".into(),
379            vec![1, 2, 3],
380            QoS::AtMostOnce,
381            false,
382            None,
383            None,
384            &limits,
385        );
386
387        queue.enqueue(msg).unwrap();
388
389        let stats = queue.stats();
390        assert_eq!(stats.message_count, 1);
391        assert_eq!(stats.total_size, 7);
392        assert_eq!(stats.max_messages, 10);
393        assert_eq!(stats.max_size, 1024);
394        assert!(stats.oldest_message_age.is_some());
395        assert!(stats.newest_message_age.is_some());
396    }
397
398    #[test]
399    fn test_queue_with_expiring_messages() {
400        let mut queue = MessageQueue::new(10, 1024);
401        let config = LimitsConfig {
402            default_message_expiry: Some(Duration::from_millis(50)),
403            ..Default::default()
404        };
405        let limits = LimitsManager::new(config);
406
407        let msg = ExpiringMessage::new(
408            "test/expiring".into(),
409            vec![1, 2, 3],
410            QoS::AtLeastOnce,
411            false,
412            Some(1),
413            Some(0),
414            &limits,
415        );
416
417        queue.enqueue(msg).unwrap();
418
419        #[cfg(feature = "std")]
420        std::thread::sleep(Duration::from_millis(10));
421
422        let dequeued = queue.dequeue();
423        assert!(dequeued.is_none());
424        assert_eq!(queue.len(), 0);
425    }
426}