openigtlink_rust/io/
message_queue.rs1use crate::error::{IgtlError, Result};
6use std::sync::Arc;
7use tokio::sync::{mpsc, Mutex};
8use tracing::{debug, info, trace, warn};
9
/// Capacity and overflow policy for a [`MessageQueue`].
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Maximum number of queued messages; `None` means unbounded.
    pub capacity: Option<usize>,
    /// Behavior when a bounded queue is full: `true` evicts the oldest
    /// message to make room, `false` rejects the new enqueue with an error.
    pub drop_on_full: bool,
}
18
19impl Default for QueueConfig {
20 fn default() -> Self {
21 Self {
22 capacity: Some(1000), drop_on_full: false, }
25 }
26}
27
28impl QueueConfig {
29 pub fn unbounded() -> Self {
31 Self {
32 capacity: None,
33 drop_on_full: false,
34 }
35 }
36
37 pub fn bounded(capacity: usize) -> Self {
39 Self {
40 capacity: Some(capacity),
41 drop_on_full: false,
42 }
43 }
44
45 pub fn bounded_drop_old(capacity: usize) -> Self {
47 Self {
48 capacity: Some(capacity),
49 drop_on_full: true,
50 }
51 }
52}
53
/// Async FIFO queue of encoded messages (`Vec<u8>`) with size accounting
/// and an optional software-enforced capacity (see [`QueueConfig`]).
///
/// Backed by an unbounded tokio mpsc channel; the configured capacity is
/// enforced against `stats.current_size`, not by the channel itself.
pub struct MessageQueue {
    /// Sender half used by `enqueue`.
    tx: mpsc::UnboundedSender<Vec<u8>>,
    /// Receiver half, behind a mutex so multiple consumers may dequeue.
    rx: Arc<Mutex<mpsc::UnboundedReceiver<Vec<u8>>>>,
    /// Capacity / overflow policy.
    config: QueueConfig,
    /// Counters updated by enqueue/dequeue operations.
    stats: Arc<Mutex<QueueStats>>,
}
63
/// Running counters for a [`MessageQueue`].
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Total messages successfully enqueued.
    pub enqueued: u64,
    /// Total messages handed to a consumer via `dequeue`/`try_dequeue`.
    pub dequeued: u64,
    /// Total messages evicted by the drop-on-full policy.
    pub dropped: u64,
    /// Messages currently waiting in the queue.
    pub current_size: usize,
    /// Highest value `current_size` has reached.
    pub peak_size: usize,
}
78
79impl MessageQueue {
80 pub fn new() -> Self {
82 Self::with_config(QueueConfig::default())
83 }
84
85 pub fn with_config(config: QueueConfig) -> Self {
87 info!(
88 capacity = ?config.capacity,
89 drop_on_full = config.drop_on_full,
90 "Creating message queue"
91 );
92 let (tx, rx) = mpsc::unbounded_channel();
93
94 Self {
95 tx,
96 rx: Arc::new(Mutex::new(rx)),
97 config,
98 stats: Arc::new(Mutex::new(QueueStats::default())),
99 }
100 }
101
102 pub async fn enqueue(&self, data: Vec<u8>) -> Result<()> {
110 let mut stats = self.stats.lock().await;
111
112 if let Some(capacity) = self.config.capacity {
114 if stats.current_size >= capacity {
115 if self.config.drop_on_full {
116 warn!(
117 capacity = capacity,
118 current_size = stats.current_size,
119 "Queue full, dropping oldest message"
120 );
121 drop(stats); #[allow(clippy::redundant_pattern_matching)]
124 if let Ok(_) = self.try_dequeue().await {
125 stats = self.stats.lock().await;
126 stats.dropped += 1;
127 } else {
128 return Err(IgtlError::Io(std::io::Error::new(
129 std::io::ErrorKind::WouldBlock,
130 "Queue full and cannot drop oldest",
131 )));
132 }
133 } else {
134 debug!(
135 capacity = capacity,
136 current_size = stats.current_size,
137 "Queue full, rejecting enqueue"
138 );
139 return Err(IgtlError::Io(std::io::Error::new(
140 std::io::ErrorKind::WouldBlock,
141 "Queue full",
142 )));
143 }
144 }
145 }
146
147 let size = data.len();
148
149 self.tx.send(data).map_err(|_| {
151 warn!("Failed to enqueue: queue closed");
152 IgtlError::Io(std::io::Error::new(
153 std::io::ErrorKind::BrokenPipe,
154 "Queue closed",
155 ))
156 })?;
157
158 stats.enqueued += 1;
159 stats.current_size += 1;
160 if stats.current_size > stats.peak_size {
161 stats.peak_size = stats.current_size;
162 }
163
164 trace!(
165 size = size,
166 queue_size = stats.current_size,
167 "Message enqueued"
168 );
169
170 Ok(())
171 }
172
173 pub async fn dequeue(&self) -> Result<Vec<u8>> {
178 let mut rx = self.rx.lock().await;
179
180 match rx.recv().await {
181 Some(data) => {
182 let size = data.len();
183 drop(rx); let mut stats = self.stats.lock().await;
185 stats.dequeued += 1;
186 stats.current_size = stats.current_size.saturating_sub(1);
187 trace!(
188 size = size,
189 queue_size = stats.current_size,
190 "Message dequeued"
191 );
192 Ok(data)
193 }
194 None => {
195 warn!("Dequeue failed: queue closed");
196 Err(IgtlError::Io(std::io::Error::new(
197 std::io::ErrorKind::BrokenPipe,
198 "Queue closed",
199 )))
200 }
201 }
202 }
203
204 pub async fn try_dequeue(&self) -> Result<Vec<u8>> {
209 let mut rx = self.rx.lock().await;
210
211 match rx.try_recv() {
212 Ok(data) => {
213 drop(rx);
214 let mut stats = self.stats.lock().await;
215 stats.dequeued += 1;
216 stats.current_size = stats.current_size.saturating_sub(1);
217 Ok(data)
218 }
219 Err(mpsc::error::TryRecvError::Empty) => Err(IgtlError::Io(std::io::Error::new(
220 std::io::ErrorKind::WouldBlock,
221 "Queue empty",
222 ))),
223 Err(mpsc::error::TryRecvError::Disconnected) => Err(IgtlError::Io(
224 std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Queue closed"),
225 )),
226 }
227 }
228
229 pub async fn size(&self) -> usize {
231 self.stats.lock().await.current_size
232 }
233
234 pub async fn stats(&self) -> QueueStats {
236 self.stats.lock().await.clone()
237 }
238
239 pub async fn is_empty(&self) -> bool {
241 self.stats.lock().await.current_size == 0
242 }
243
244 pub fn config(&self) -> &QueueConfig {
246 &self.config
247 }
248}
249
250impl Default for MessageQueue {
251 fn default() -> Self {
252 Self::new()
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    // An unbounded queue accepts arbitrarily many messages and preserves
    // FIFO order.
    #[tokio::test]
    async fn test_unbounded_queue() {
        let queue = MessageQueue::with_config(QueueConfig::unbounded());

        for i in 0..100 {
            let data = vec![i as u8];
            queue.enqueue(data).await.unwrap();
        }

        assert_eq!(queue.size().await, 100);

        for i in 0..100 {
            let data = queue.dequeue().await.unwrap();
            assert_eq!(data, vec![i as u8]);
        }

        assert!(queue.is_empty().await);
    }

    // A bounded queue rejects enqueues once full, then accepts again
    // after a slot is freed by dequeue.
    #[tokio::test]
    async fn test_bounded_queue() {
        let queue = MessageQueue::with_config(QueueConfig::bounded(10));

        for i in 0..10 {
            let data = vec![i as u8];
            queue.enqueue(data).await.unwrap();
        }

        // At capacity: the 11th enqueue must fail.
        let result = queue.enqueue(vec![100]).await;
        assert!(result.is_err());

        let _ = queue.dequeue().await.unwrap();

        // One slot freed: enqueue succeeds again.
        let result = queue.enqueue(vec![100]).await;
        assert!(result.is_ok());
    }

    // With drop-on-full, overflow evicts the oldest entries: after
    // enqueueing 0..10 into a 5-slot queue, only 5..10 remain and the
    // 5 evictions are recorded in `stats.dropped`.
    #[tokio::test]
    async fn test_bounded_drop_old() {
        let queue = MessageQueue::with_config(QueueConfig::bounded_drop_old(5));

        for i in 0..5 {
            let data = vec![i as u8];
            queue.enqueue(data).await.unwrap();
        }

        // These 5 enqueues each evict the current oldest message.
        for i in 5..10 {
            let data = vec![i as u8];
            queue.enqueue(data).await.unwrap();
        }

        assert_eq!(queue.size().await, 5);

        // Head of the queue is now the oldest surviving message (5).
        let data = queue.dequeue().await.unwrap();
        assert_eq!(data, vec![5]);

        let stats = queue.stats().await;
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dropped, 5);
    }

    // try_dequeue on an empty queue fails immediately instead of waiting.
    #[tokio::test]
    async fn test_try_dequeue_empty() {
        let queue = MessageQueue::new();

        let result = queue.try_dequeue().await;
        assert!(result.is_err());
    }

    // Counters track enqueues, dequeues, current size, and peak size.
    #[tokio::test]
    async fn test_queue_stats() {
        let queue = MessageQueue::new();

        for i in 0..10 {
            queue.enqueue(vec![i]).await.unwrap();
        }

        for _ in 0..5 {
            let _ = queue.dequeue().await.unwrap();
        }

        let stats = queue.stats().await;
        assert_eq!(stats.enqueued, 10);
        assert_eq!(stats.dequeued, 5);
        assert_eq!(stats.current_size, 5);
        assert_eq!(stats.peak_size, 10);
    }

    // A concurrent producer and consumer (50 messages each, with small
    // sleeps to interleave) leave the queue empty and deadlock-free.
    #[tokio::test]
    async fn test_concurrent_access() {
        let queue = Arc::new(MessageQueue::with_config(QueueConfig::bounded(100)));

        let queue_clone = queue.clone();
        let producer = tokio::spawn(async move {
            for i in 0..50 {
                queue_clone.enqueue(vec![i as u8]).await.unwrap();
                tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
            }
        });

        let queue_clone = queue.clone();
        let consumer = tokio::spawn(async move {
            for _ in 0..50 {
                let _ = queue_clone.dequeue().await.unwrap();
                tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
            }
        });

        producer.await.unwrap();
        consumer.await.unwrap();

        assert!(queue.is_empty().await);
    }
}