// openigtlink_rust/io/message_queue.rs

//! Message queue implementation for buffering and backpressure management
//!
//! Provides bounded and unbounded message queues for async message processing.

5use std::sync::Arc;
6use tokio::sync::{mpsc, Mutex};
7use tracing::{debug, info, trace, warn};
8use crate::error::{IgtlError, Result};
9
/// Configuration for message queue behavior
///
/// Use the [`QueueConfig::unbounded`], [`QueueConfig::bounded`], or
/// [`QueueConfig::bounded_drop_old`] presets, or construct the fields
/// directly.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Maximum number of messages in queue (None = unbounded)
    pub capacity: Option<usize>,
    /// When the queue is full: `true` drops the oldest message to make room;
    /// `false` rejects the enqueue with an error.
    pub drop_on_full: bool,
}
18
19impl Default for QueueConfig {
20    fn default() -> Self {
21        Self {
22            capacity: Some(1000), // Default: bounded queue with 1000 messages
23            drop_on_full: false,  // Default: block when full
24        }
25    }
26}
27
28impl QueueConfig {
29    /// Create unbounded queue configuration
30    pub fn unbounded() -> Self {
31        Self {
32            capacity: None,
33            drop_on_full: false,
34        }
35    }
36
37    /// Create bounded queue with specified capacity
38    pub fn bounded(capacity: usize) -> Self {
39        Self {
40            capacity: Some(capacity),
41            drop_on_full: false,
42        }
43    }
44
45    /// Create bounded queue that drops oldest messages when full
46    pub fn bounded_drop_old(capacity: usize) -> Self {
47        Self {
48            capacity: Some(capacity),
49            drop_on_full: true,
50        }
51    }
52}
53
/// Message queue for buffering raw message data
///
/// Supports both bounded and unbounded queues with optional message dropping.
pub struct MessageQueue {
    // Sender side; the channel itself is always unbounded — any capacity
    // limit from `config` is enforced manually in `enqueue`.
    tx: mpsc::UnboundedSender<Vec<u8>>,
    // Receiver side, behind a mutex so multiple tasks can dequeue.
    rx: Arc<Mutex<mpsc::UnboundedReceiver<Vec<u8>>>>,
    // Capacity / drop policy supplied at construction.
    config: QueueConfig,
    // Counters updated on every enqueue/dequeue/drop.
    stats: Arc<Mutex<QueueStats>>,
}
63
/// Statistics for message queue
///
/// A point-in-time snapshot is obtained via `MessageQueue::stats`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages enqueued
    pub enqueued: u64,
    /// Total messages dequeued
    pub dequeued: u64,
    /// Total messages dropped (when queue full)
    pub dropped: u64,
    /// Current queue size (messages buffered right now)
    pub current_size: usize,
    /// Peak queue size observed since creation
    pub peak_size: usize,
}
78
79impl MessageQueue {
80    /// Create a new message queue with default configuration
81    pub fn new() -> Self {
82        Self::with_config(QueueConfig::default())
83    }
84
85    /// Create a new message queue with custom configuration
86    pub fn with_config(config: QueueConfig) -> Self {
87        info!(
88            capacity = ?config.capacity,
89            drop_on_full = config.drop_on_full,
90            "Creating message queue"
91        );
92        let (tx, rx) = mpsc::unbounded_channel();
93
94        Self {
95            tx,
96            rx: Arc::new(Mutex::new(rx)),
97            config,
98            stats: Arc::new(Mutex::new(QueueStats::default())),
99        }
100    }
101
102    /// Enqueue a message (non-blocking)
103    ///
104    /// # Arguments
105    /// * `data` - Raw message bytes
106    ///
107    /// # Returns
108    /// Ok(()) if enqueued, Err if queue is full and not configured to drop
109    pub async fn enqueue(&self, data: Vec<u8>) -> Result<()> {
110        let mut stats = self.stats.lock().await;
111
112        // Check capacity if bounded
113        if let Some(capacity) = self.config.capacity {
114            if stats.current_size >= capacity {
115                if self.config.drop_on_full {
116                    warn!(
117                        capacity = capacity,
118                        current_size = stats.current_size,
119                        "Queue full, dropping oldest message"
120                    );
121                    // Drop the oldest message by dequeuing it
122                    drop(stats); // Release lock before dequeue
123                    if let Ok(_) = self.try_dequeue().await {
124                        stats = self.stats.lock().await;
125                        stats.dropped += 1;
126                    } else {
127                        return Err(IgtlError::Io(std::io::Error::new(
128                            std::io::ErrorKind::WouldBlock,
129                            "Queue full and cannot drop oldest",
130                        )));
131                    }
132                } else {
133                    debug!(
134                        capacity = capacity,
135                        current_size = stats.current_size,
136                        "Queue full, rejecting enqueue"
137                    );
138                    return Err(IgtlError::Io(std::io::Error::new(
139                        std::io::ErrorKind::WouldBlock,
140                        "Queue full",
141                    )));
142                }
143            }
144        }
145
146        let size = data.len();
147
148        // Send message
149        self.tx.send(data).map_err(|_| {
150            warn!("Failed to enqueue: queue closed");
151            IgtlError::Io(std::io::Error::new(
152                std::io::ErrorKind::BrokenPipe,
153                "Queue closed",
154            ))
155        })?;
156
157        stats.enqueued += 1;
158        stats.current_size += 1;
159        if stats.current_size > stats.peak_size {
160            stats.peak_size = stats.current_size;
161        }
162
163        trace!(
164            size = size,
165            queue_size = stats.current_size,
166            "Message enqueued"
167        );
168
169        Ok(())
170    }
171
172    /// Dequeue a message (blocking until message available)
173    ///
174    /// # Returns
175    /// Message bytes or error if queue is closed
176    pub async fn dequeue(&self) -> Result<Vec<u8>> {
177        let mut rx = self.rx.lock().await;
178
179        match rx.recv().await {
180            Some(data) => {
181                let size = data.len();
182                drop(rx); // Release lock before updating stats
183                let mut stats = self.stats.lock().await;
184                stats.dequeued += 1;
185                stats.current_size = stats.current_size.saturating_sub(1);
186                trace!(
187                    size = size,
188                    queue_size = stats.current_size,
189                    "Message dequeued"
190                );
191                Ok(data)
192            }
193            None => {
194                warn!("Dequeue failed: queue closed");
195                Err(IgtlError::Io(std::io::Error::new(
196                    std::io::ErrorKind::BrokenPipe,
197                    "Queue closed",
198                )))
199            }
200        }
201    }
202
203    /// Try to dequeue a message (non-blocking)
204    ///
205    /// # Returns
206    /// Some(data) if message available, None if queue is empty
207    pub async fn try_dequeue(&self) -> Result<Vec<u8>> {
208        let mut rx = self.rx.lock().await;
209
210        match rx.try_recv() {
211            Ok(data) => {
212                drop(rx);
213                let mut stats = self.stats.lock().await;
214                stats.dequeued += 1;
215                stats.current_size = stats.current_size.saturating_sub(1);
216                Ok(data)
217            }
218            Err(mpsc::error::TryRecvError::Empty) => {
219                Err(IgtlError::Io(std::io::Error::new(
220                    std::io::ErrorKind::WouldBlock,
221                    "Queue empty",
222                )))
223            }
224            Err(mpsc::error::TryRecvError::Disconnected) => {
225                Err(IgtlError::Io(std::io::Error::new(
226                    std::io::ErrorKind::BrokenPipe,
227                    "Queue closed",
228                )))
229            }
230        }
231    }
232
233    /// Get current queue size
234    pub async fn size(&self) -> usize {
235        self.stats.lock().await.current_size
236    }
237
238    /// Get queue statistics
239    pub async fn stats(&self) -> QueueStats {
240        self.stats.lock().await.clone()
241    }
242
243    /// Check if queue is empty
244    pub async fn is_empty(&self) -> bool {
245        self.stats.lock().await.current_size == 0
246    }
247
248    /// Get queue configuration
249    pub fn config(&self) -> &QueueConfig {
250        &self.config
251    }
252}
253
254impl Default for MessageQueue {
255    fn default() -> Self {
256        Self::new()
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_unbounded_queue() {
        let q = MessageQueue::with_config(QueueConfig::unbounded());

        // Push 100 one-byte payloads.
        for byte in 0u8..100 {
            q.enqueue(vec![byte]).await.unwrap();
        }
        assert_eq!(q.size().await, 100);

        // Pop them back; FIFO order must hold.
        for byte in 0u8..100 {
            assert_eq!(q.dequeue().await.unwrap(), vec![byte]);
        }
        assert!(q.is_empty().await);
    }

    #[tokio::test]
    async fn test_bounded_queue() {
        let q = MessageQueue::with_config(QueueConfig::bounded(10));

        // Fill the queue to capacity.
        for byte in 0u8..10 {
            q.enqueue(vec![byte]).await.unwrap();
        }

        // At capacity: a further enqueue must be rejected.
        assert!(q.enqueue(vec![100]).await.is_err());

        // Freeing one slot makes enqueue succeed again.
        q.dequeue().await.unwrap();
        assert!(q.enqueue(vec![100]).await.is_ok());
    }

    #[tokio::test]
    async fn test_bounded_drop_old() {
        let q = MessageQueue::with_config(QueueConfig::bounded_drop_old(5));

        // Ten enqueues into a five-slot queue: the first five get evicted.
        for byte in 0u8..10 {
            q.enqueue(vec![byte]).await.unwrap();
        }

        // Still at capacity, not beyond.
        assert_eq!(q.size().await, 5);

        // Head of the queue is now message 5 (0-4 were dropped).
        assert_eq!(q.dequeue().await.unwrap(), vec![5]);

        let stats = q.stats().await;
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dropped, 5);
    }

    #[tokio::test]
    async fn test_try_dequeue_empty() {
        // try_dequeue on a fresh (empty) queue must report an error.
        let q = MessageQueue::new();
        assert!(q.try_dequeue().await.is_err());
    }

    #[tokio::test]
    async fn test_queue_stats() {
        let q = MessageQueue::new();

        // Ten in, five out.
        for byte in 0u8..10 {
            q.enqueue(vec![byte]).await.unwrap();
        }
        for _ in 0..5 {
            q.dequeue().await.unwrap();
        }

        let stats = q.stats().await;
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.current_size, 5);
        assert_eq!(stats.peak_size, 10);
    }

    #[tokio::test]
    async fn test_concurrent_access() {
        let queue = Arc::new(MessageQueue::with_config(QueueConfig::bounded(100)));

        // One producer and one consumer task, 50 messages each way.
        let producer = {
            let q = queue.clone();
            tokio::spawn(async move {
                for byte in 0u8..50 {
                    q.enqueue(vec![byte]).await.unwrap();
                    tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
                }
            })
        };

        let consumer = {
            let q = queue.clone();
            tokio::spawn(async move {
                for _ in 0..50 {
                    q.dequeue().await.unwrap();
                    tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
                }
            })
        };

        producer.await.unwrap();
        consumer.await.unwrap();

        assert!(queue.is_empty().await);
    }
}