//! Message queue for buffering encoded OpenIGTLink messages
//! (openigtlink_rust/io/message_queue.rs).

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, info, trace, warn};

use crate::error::{IgtlError, Result};

/// Behavior configuration for a [`MessageQueue`].
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Maximum number of buffered messages; `None` means unbounded.
    pub capacity: Option<usize>,
    /// Policy when a bounded queue is full: `true` evicts the oldest
    /// message to make room, `false` rejects the new enqueue with an error.
    pub drop_on_full: bool,
}
19impl Default for QueueConfig {
20 fn default() -> Self {
21 Self {
22 capacity: Some(1000), drop_on_full: false, }
25 }
26}
28impl QueueConfig {
29 pub fn unbounded() -> Self {
31 Self {
32 capacity: None,
33 drop_on_full: false,
34 }
35 }
36
37 pub fn bounded(capacity: usize) -> Self {
39 Self {
40 capacity: Some(capacity),
41 drop_on_full: false,
42 }
43 }
44
45 pub fn bounded_drop_old(capacity: usize) -> Self {
47 Self {
48 capacity: Some(capacity),
49 drop_on_full: true,
50 }
51 }
52}
/// Async FIFO queue of encoded messages (`Vec<u8>`), built on an
/// unbounded tokio mpsc channel; capacity limits are enforced in
/// `enqueue` rather than by the channel itself.
pub struct MessageQueue {
    // Sender half of the channel; used by `enqueue`.
    tx: mpsc::UnboundedSender<Vec<u8>>,
    // Receiver half behind an async Mutex so the queue can be shared
    // (via Arc) between multiple consumer tasks.
    rx: Arc<Mutex<mpsc::UnboundedReceiver<Vec<u8>>>>,
    // Capacity / overflow policy.
    config: QueueConfig,
    // Activity counters; `current_size` is also what `enqueue` checks
    // against `config.capacity`.
    stats: Arc<Mutex<QueueStats>>,
}
/// Running counters describing queue activity.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages successfully enqueued.
    pub enqueued: u64,
    /// Total messages removed via `dequeue` / `try_dequeue`.
    pub dequeued: u64,
    /// Messages evicted because the queue was full (drop-on-full policy).
    pub dropped: u64,
    /// Number of messages currently buffered.
    pub current_size: usize,
    /// Largest value `current_size` has reached.
    pub peak_size: usize,
}
79impl MessageQueue {
80 pub fn new() -> Self {
82 Self::with_config(QueueConfig::default())
83 }
84
85 pub fn with_config(config: QueueConfig) -> Self {
87 info!(
88 capacity = ?config.capacity,
89 drop_on_full = config.drop_on_full,
90 "Creating message queue"
91 );
92 let (tx, rx) = mpsc::unbounded_channel();
93
94 Self {
95 tx,
96 rx: Arc::new(Mutex::new(rx)),
97 config,
98 stats: Arc::new(Mutex::new(QueueStats::default())),
99 }
100 }
101
102 pub async fn enqueue(&self, data: Vec<u8>) -> Result<()> {
110 let mut stats = self.stats.lock().await;
111
112 if let Some(capacity) = self.config.capacity {
114 if stats.current_size >= capacity {
115 if self.config.drop_on_full {
116 warn!(
117 capacity = capacity,
118 current_size = stats.current_size,
119 "Queue full, dropping oldest message"
120 );
121 drop(stats); if let Ok(_) = self.try_dequeue().await {
124 stats = self.stats.lock().await;
125 stats.dropped += 1;
126 } else {
127 return Err(IgtlError::Io(std::io::Error::new(
128 std::io::ErrorKind::WouldBlock,
129 "Queue full and cannot drop oldest",
130 )));
131 }
132 } else {
133 debug!(
134 capacity = capacity,
135 current_size = stats.current_size,
136 "Queue full, rejecting enqueue"
137 );
138 return Err(IgtlError::Io(std::io::Error::new(
139 std::io::ErrorKind::WouldBlock,
140 "Queue full",
141 )));
142 }
143 }
144 }
145
146 let size = data.len();
147
148 self.tx.send(data).map_err(|_| {
150 warn!("Failed to enqueue: queue closed");
151 IgtlError::Io(std::io::Error::new(
152 std::io::ErrorKind::BrokenPipe,
153 "Queue closed",
154 ))
155 })?;
156
157 stats.enqueued += 1;
158 stats.current_size += 1;
159 if stats.current_size > stats.peak_size {
160 stats.peak_size = stats.current_size;
161 }
162
163 trace!(
164 size = size,
165 queue_size = stats.current_size,
166 "Message enqueued"
167 );
168
169 Ok(())
170 }
171
172 pub async fn dequeue(&self) -> Result<Vec<u8>> {
177 let mut rx = self.rx.lock().await;
178
179 match rx.recv().await {
180 Some(data) => {
181 let size = data.len();
182 drop(rx); let mut stats = self.stats.lock().await;
184 stats.dequeued += 1;
185 stats.current_size = stats.current_size.saturating_sub(1);
186 trace!(
187 size = size,
188 queue_size = stats.current_size,
189 "Message dequeued"
190 );
191 Ok(data)
192 }
193 None => {
194 warn!("Dequeue failed: queue closed");
195 Err(IgtlError::Io(std::io::Error::new(
196 std::io::ErrorKind::BrokenPipe,
197 "Queue closed",
198 )))
199 }
200 }
201 }
202
203 pub async fn try_dequeue(&self) -> Result<Vec<u8>> {
208 let mut rx = self.rx.lock().await;
209
210 match rx.try_recv() {
211 Ok(data) => {
212 drop(rx);
213 let mut stats = self.stats.lock().await;
214 stats.dequeued += 1;
215 stats.current_size = stats.current_size.saturating_sub(1);
216 Ok(data)
217 }
218 Err(mpsc::error::TryRecvError::Empty) => {
219 Err(IgtlError::Io(std::io::Error::new(
220 std::io::ErrorKind::WouldBlock,
221 "Queue empty",
222 )))
223 }
224 Err(mpsc::error::TryRecvError::Disconnected) => {
225 Err(IgtlError::Io(std::io::Error::new(
226 std::io::ErrorKind::BrokenPipe,
227 "Queue closed",
228 )))
229 }
230 }
231 }
232
233 pub async fn size(&self) -> usize {
235 self.stats.lock().await.current_size
236 }
237
238 pub async fn stats(&self) -> QueueStats {
240 self.stats.lock().await.clone()
241 }
242
243 pub async fn is_empty(&self) -> bool {
245 self.stats.lock().await.current_size == 0
246 }
247
248 pub fn config(&self) -> &QueueConfig {
250 &self.config
251 }
252}
254impl Default for MessageQueue {
255 fn default() -> Self {
256 Self::new()
257 }
258}
#[cfg(test)]
mod tests {
    use super::*;

    // An unbounded queue accepts everything and preserves FIFO order.
    #[tokio::test]
    async fn test_unbounded_queue() {
        let queue = MessageQueue::with_config(QueueConfig::unbounded());

        // Enqueue 100 one-byte messages; none should fail.
        for i in 0..100 {
            let data = vec![i as u8];
            queue.enqueue(data).await.unwrap();
        }

        assert_eq!(queue.size().await, 100);

        // Messages come back in the order they were enqueued.
        for i in 0..100 {
            let data = queue.dequeue().await.unwrap();
            assert_eq!(data, vec![i as u8]);
        }

        assert!(queue.is_empty().await);
    }

    // Reject-on-full policy: a full bounded queue errors on enqueue
    // until a slot is freed.
    #[tokio::test]
    async fn test_bounded_queue() {
        let queue = MessageQueue::with_config(QueueConfig::bounded(10));

        // Fill to capacity.
        for i in 0..10 {
            let data = vec![i as u8];
            queue.enqueue(data).await.unwrap();
        }

        // The 11th enqueue is rejected.
        let result = queue.enqueue(vec![100]).await;
        assert!(result.is_err());

        // Freeing one slot allows enqueueing again.
        let _ = queue.dequeue().await.unwrap();

        let result = queue.enqueue(vec![100]).await;
        assert!(result.is_ok());
    }

    // Drop-oldest policy: overflowing a full queue evicts from the front.
    #[tokio::test]
    async fn test_bounded_drop_old() {
        let queue = MessageQueue::with_config(QueueConfig::bounded_drop_old(5));

        // Fill to capacity with messages 0..5.
        for i in 0..5 {
            let data = vec![i as u8];
            queue.enqueue(data).await.unwrap();
        }

        // Each of these evicts the oldest message, dropping 0..5.
        for i in 5..10 {
            let data = vec![i as u8];
            queue.enqueue(data).await.unwrap();
        }

        // Still exactly at capacity after the evictions.
        assert_eq!(queue.size().await, 5);

        // The oldest surviving message is 5.
        let data = queue.dequeue().await.unwrap();
        assert_eq!(data, vec![5]);

        let stats = queue.stats().await;
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dropped, 5);
    }

    // try_dequeue on an empty queue returns an error instead of waiting.
    #[tokio::test]
    async fn test_try_dequeue_empty() {
        let queue = MessageQueue::new();

        let result = queue.try_dequeue().await;
        assert!(result.is_err());
    }

    // Counters track enqueues, dequeues, current size, and the high-water mark.
    #[tokio::test]
    async fn test_queue_stats() {
        let queue = MessageQueue::new();

        for i in 0..10 {
            queue.enqueue(vec![i]).await.unwrap();
        }

        for _ in 0..5 {
            let _ = queue.dequeue().await.unwrap();
        }

        let stats = queue.stats().await;
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.current_size, 5);
        assert_eq!(stats.peak_size, 10);
    }

    // Producer and consumer tasks share the queue via Arc; the short
    // sleeps interleave the tasks without slowing the test noticeably.
    #[tokio::test]
    async fn test_concurrent_access() {
        let queue = Arc::new(MessageQueue::with_config(QueueConfig::bounded(100)));

        let queue_clone = queue.clone();
        let producer = tokio::spawn(async move {
            for i in 0..50 {
                queue_clone.enqueue(vec![i as u8]).await.unwrap();
                tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
            }
        });

        let queue_clone = queue.clone();
        let consumer = tokio::spawn(async move {
            for _ in 0..50 {
                // `dequeue` waits for data, so the consumer may run
                // ahead of the producer without failing.
                let _ = queue_clone.dequeue().await.unwrap();
                tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
            }
        });

        producer.await.unwrap();
        consumer.await.unwrap();

        assert!(queue.is_empty().await);
    }
}