openigtlink_rust/io/
message_queue.rs

1//! Message queue implementation for buffering and backpressure management
2//!
3//! Provides bounded and unbounded message queues for async message processing.
4
5use crate::error::{IgtlError, Result};
6use std::sync::Arc;
7use tokio::sync::{mpsc, Mutex};
8use tracing::{debug, info, trace, warn};
9
/// Configuration for message queue behavior
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Maximum number of messages in queue (None = unbounded)
    pub capacity: Option<usize>,
    /// Policy when a bounded queue is full: `true` evicts the oldest queued
    /// message to make room; `false` rejects the enqueue with a `WouldBlock`
    /// error (enqueue never blocks the caller)
    pub drop_on_full: bool,
}
18
impl Default for QueueConfig {
    fn default() -> Self {
        Self {
            capacity: Some(1000), // Default: bounded queue with 1000 messages
            drop_on_full: false,  // Default: reject enqueue with an error when full
        }
    }
}
27
28impl QueueConfig {
29    /// Create unbounded queue configuration
30    pub fn unbounded() -> Self {
31        Self {
32            capacity: None,
33            drop_on_full: false,
34        }
35    }
36
37    /// Create bounded queue with specified capacity
38    pub fn bounded(capacity: usize) -> Self {
39        Self {
40            capacity: Some(capacity),
41            drop_on_full: false,
42        }
43    }
44
45    /// Create bounded queue that drops oldest messages when full
46    pub fn bounded_drop_old(capacity: usize) -> Self {
47        Self {
48            capacity: Some(capacity),
49            drop_on_full: true,
50        }
51    }
52}
53
/// Message queue for buffering raw message data
///
/// Supports both bounded and unbounded queues with optional message dropping.
pub struct MessageQueue {
    // NOTE: the underlying channel is always unbounded; any configured
    // capacity in `QueueConfig` is enforced manually by `enqueue` using
    // `stats.current_size` (an mpsc channel cannot evict from the front,
    // which the drop-oldest policy requires).
    tx: mpsc::UnboundedSender<Vec<u8>>,
    // Receiver behind Arc<Mutex<..>> so multiple tasks can share one consumer end.
    rx: Arc<Mutex<mpsc::UnboundedReceiver<Vec<u8>>>>,
    config: QueueConfig,
    // Counters guarded by their own lock, separate from `rx`; the impl is
    // careful never to hold both locks at once.
    stats: Arc<Mutex<QueueStats>>,
}
63
/// Statistics for message queue
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages enqueued
    pub enqueued: u64,
    /// Total messages dequeued by consumers
    pub dequeued: u64,
    /// Total messages dropped when the queue was full (drop_on_full policy)
    pub dropped: u64,
    /// Current queue size
    pub current_size: usize,
    /// Peak queue size observed since creation
    pub peak_size: usize,
}
78
79impl MessageQueue {
80    /// Create a new message queue with default configuration
81    pub fn new() -> Self {
82        Self::with_config(QueueConfig::default())
83    }
84
85    /// Create a new message queue with custom configuration
86    pub fn with_config(config: QueueConfig) -> Self {
87        info!(
88            capacity = ?config.capacity,
89            drop_on_full = config.drop_on_full,
90            "Creating message queue"
91        );
92        let (tx, rx) = mpsc::unbounded_channel();
93
94        Self {
95            tx,
96            rx: Arc::new(Mutex::new(rx)),
97            config,
98            stats: Arc::new(Mutex::new(QueueStats::default())),
99        }
100    }
101
102    /// Enqueue a message (non-blocking)
103    ///
104    /// # Arguments
105    /// * `data` - Raw message bytes
106    ///
107    /// # Returns
108    /// Ok(()) if enqueued, Err if queue is full and not configured to drop
109    pub async fn enqueue(&self, data: Vec<u8>) -> Result<()> {
110        let mut stats = self.stats.lock().await;
111
112        // Check capacity if bounded
113        if let Some(capacity) = self.config.capacity {
114            if stats.current_size >= capacity {
115                if self.config.drop_on_full {
116                    warn!(
117                        capacity = capacity,
118                        current_size = stats.current_size,
119                        "Queue full, dropping oldest message"
120                    );
121                    // Drop the oldest message by dequeuing it
122                    drop(stats); // Release lock before dequeue
123                    #[allow(clippy::redundant_pattern_matching)]
124                    if let Ok(_) = self.try_dequeue().await {
125                        stats = self.stats.lock().await;
126                        stats.dropped += 1;
127                    } else {
128                        return Err(IgtlError::Io(std::io::Error::new(
129                            std::io::ErrorKind::WouldBlock,
130                            "Queue full and cannot drop oldest",
131                        )));
132                    }
133                } else {
134                    debug!(
135                        capacity = capacity,
136                        current_size = stats.current_size,
137                        "Queue full, rejecting enqueue"
138                    );
139                    return Err(IgtlError::Io(std::io::Error::new(
140                        std::io::ErrorKind::WouldBlock,
141                        "Queue full",
142                    )));
143                }
144            }
145        }
146
147        let size = data.len();
148
149        // Send message
150        self.tx.send(data).map_err(|_| {
151            warn!("Failed to enqueue: queue closed");
152            IgtlError::Io(std::io::Error::new(
153                std::io::ErrorKind::BrokenPipe,
154                "Queue closed",
155            ))
156        })?;
157
158        stats.enqueued += 1;
159        stats.current_size += 1;
160        if stats.current_size > stats.peak_size {
161            stats.peak_size = stats.current_size;
162        }
163
164        trace!(
165            size = size,
166            queue_size = stats.current_size,
167            "Message enqueued"
168        );
169
170        Ok(())
171    }
172
173    /// Dequeue a message (blocking until message available)
174    ///
175    /// # Returns
176    /// Message bytes or error if queue is closed
177    pub async fn dequeue(&self) -> Result<Vec<u8>> {
178        let mut rx = self.rx.lock().await;
179
180        match rx.recv().await {
181            Some(data) => {
182                let size = data.len();
183                drop(rx); // Release lock before updating stats
184                let mut stats = self.stats.lock().await;
185                stats.dequeued += 1;
186                stats.current_size = stats.current_size.saturating_sub(1);
187                trace!(
188                    size = size,
189                    queue_size = stats.current_size,
190                    "Message dequeued"
191                );
192                Ok(data)
193            }
194            None => {
195                warn!("Dequeue failed: queue closed");
196                Err(IgtlError::Io(std::io::Error::new(
197                    std::io::ErrorKind::BrokenPipe,
198                    "Queue closed",
199                )))
200            }
201        }
202    }
203
204    /// Try to dequeue a message (non-blocking)
205    ///
206    /// # Returns
207    /// Some(data) if message available, None if queue is empty
208    pub async fn try_dequeue(&self) -> Result<Vec<u8>> {
209        let mut rx = self.rx.lock().await;
210
211        match rx.try_recv() {
212            Ok(data) => {
213                drop(rx);
214                let mut stats = self.stats.lock().await;
215                stats.dequeued += 1;
216                stats.current_size = stats.current_size.saturating_sub(1);
217                Ok(data)
218            }
219            Err(mpsc::error::TryRecvError::Empty) => Err(IgtlError::Io(std::io::Error::new(
220                std::io::ErrorKind::WouldBlock,
221                "Queue empty",
222            ))),
223            Err(mpsc::error::TryRecvError::Disconnected) => Err(IgtlError::Io(
224                std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Queue closed"),
225            )),
226        }
227    }
228
229    /// Get current queue size
230    pub async fn size(&self) -> usize {
231        self.stats.lock().await.current_size
232    }
233
234    /// Get queue statistics
235    pub async fn stats(&self) -> QueueStats {
236        self.stats.lock().await.clone()
237    }
238
239    /// Check if queue is empty
240    pub async fn is_empty(&self) -> bool {
241        self.stats.lock().await.current_size == 0
242    }
243
244    /// Get queue configuration
245    pub fn config(&self) -> &QueueConfig {
246        &self.config
247    }
248}
249
250impl Default for MessageQueue {
251    fn default() -> Self {
252        Self::new()
253    }
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages pass through an unbounded queue in FIFO order.
    #[tokio::test]
    async fn test_unbounded_queue() {
        let q = MessageQueue::with_config(QueueConfig::unbounded());

        for byte in 0u8..100 {
            q.enqueue(vec![byte]).await.unwrap();
        }
        assert_eq!(q.size().await, 100);

        for byte in 0u8..100 {
            assert_eq!(q.dequeue().await.unwrap(), vec![byte]);
        }
        assert!(q.is_empty().await);
    }

    /// A bounded queue rejects enqueues at capacity and accepts again
    /// after a slot is freed.
    #[tokio::test]
    async fn test_bounded_queue() {
        let q = MessageQueue::with_config(QueueConfig::bounded(10));

        for byte in 0u8..10 {
            q.enqueue(vec![byte]).await.unwrap();
        }

        // At capacity: enqueue must fail.
        assert!(q.enqueue(vec![100]).await.is_err());

        // Freeing one slot makes room again.
        q.dequeue().await.unwrap();
        assert!(q.enqueue(vec![100]).await.is_ok());
    }

    /// With drop_on_full, the oldest messages are evicted to make room.
    #[tokio::test]
    async fn test_bounded_drop_old() {
        let q = MessageQueue::with_config(QueueConfig::bounded_drop_old(5));

        // Enqueue twice the capacity; the first five should be evicted.
        for byte in 0u8..10 {
            q.enqueue(vec![byte]).await.unwrap();
        }

        // Size pinned at capacity; head of queue is message 5 (0-4 dropped).
        assert_eq!(q.size().await, 5);
        assert_eq!(q.dequeue().await.unwrap(), vec![5]);

        let stats = q.stats().await;
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dropped, 5);
    }

    /// Non-blocking dequeue on an empty queue returns an error.
    #[tokio::test]
    async fn test_try_dequeue_empty() {
        let q = MessageQueue::new();
        assert!(q.try_dequeue().await.is_err());
    }

    /// Counters track enqueues, dequeues, current size, and peak size.
    #[tokio::test]
    async fn test_queue_stats() {
        let q = MessageQueue::new();

        for byte in 0u8..10 {
            q.enqueue(vec![byte]).await.unwrap();
        }
        for _ in 0..5 {
            q.dequeue().await.unwrap();
        }

        let stats = q.stats().await;
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.current_size, 5);
        assert_eq!(stats.peak_size, 10);
    }

    /// A producer task and a consumer task can share the queue concurrently.
    #[tokio::test]
    async fn test_concurrent_access() {
        use tokio::time::{sleep, Duration};

        let queue = Arc::new(MessageQueue::with_config(QueueConfig::bounded(100)));

        let producer = {
            let q = Arc::clone(&queue);
            tokio::spawn(async move {
                for byte in 0u8..50 {
                    q.enqueue(vec![byte]).await.unwrap();
                    sleep(Duration::from_micros(100)).await;
                }
            })
        };

        let consumer = {
            let q = Arc::clone(&queue);
            tokio::spawn(async move {
                for _ in 0..50 {
                    q.dequeue().await.unwrap();
                    sleep(Duration::from_micros(100)).await;
                }
            })
        };

        producer.await.unwrap();
        consumer.await.unwrap();

        assert!(queue.is_empty().await);
    }
}