use openigtlink_rust::error::Result;
use openigtlink_rust::io::message_queue::{MessageQueue, QueueConfig};
use openigtlink_rust::protocol::message::IgtlMessage;
use openigtlink_rust::protocol::types::{TransformMessage, StatusMessage};
use std::sync::Arc;
use std::time::Duration;
/// Runs six self-contained `MessageQueue` demonstrations and prints their results.
///
/// Each example lives in its own block scope so its queue is dropped before the
/// next example starts:
///
/// 1. Unbounded queue: enqueue 100 messages, dequeue 10, print statistics.
/// 2. Bounded queue of 10: fill it, attempt one extra enqueue, free a slot, enqueue again.
/// 3. Bounded drop-oldest queue of 5: enqueue 10 messages and inspect what is left.
/// 4. One producer task and one consumer task sharing a bounded queue of 50.
/// 5. Three producer tasks and one consumer task sharing a bounded queue of 100.
/// 6. A producer that retries on enqueue failure while a slower consumer drains the queue.
///
/// # Errors
///
/// Propagates any error returned by message construction, encoding, decoding,
/// or by a queue operation that is called with `?` on the main task.
///
/// # Panics
///
/// Panics if a spawned task panics (the join handles are unwrapped), which in
/// turn happens if message construction or encoding fails inside a task.
#[tokio::main]
async fn main() -> Result<()> {
    println!("=== Message Queue Demo ===\n");

    println!("📦 Example 1: Unbounded Queue");
    {
        let queue = MessageQueue::with_config(QueueConfig::unbounded());
        for i in 0..100 {
            let transform = TransformMessage::identity();
            let msg = IgtlMessage::new(transform, &format!("Device{}", i))?;
            let data = msg.encode()?;
            queue.enqueue(data).await?;
        }
        println!(" Enqueued: 100 messages");
        println!(" Queue size: {}", queue.size().await);

        for _ in 0..10 {
            let _ = queue.dequeue().await?;
        }
        println!(" After dequeuing 10: {}", queue.size().await);

        let stats = queue.stats().await;
        println!(" Stats:");
        println!(" Enqueued: {}", stats.enqueued);
        println!(" Dequeued: {}", stats.dequeued);
        println!(" Peak size: {}", stats.peak_size);
    }
    println!();

    println!("📦 Example 2: Bounded Queue (Blocking)");
    {
        let queue = MessageQueue::with_config(QueueConfig::bounded(10));
        for i in 0..10 {
            let status = StatusMessage::ok(&format!("Message {}", i));
            let msg = IgtlMessage::new(status, "Device")?;
            let data = msg.encode()?;
            queue.enqueue(data).await?;
        }
        println!(" Queue full: {}/10", queue.size().await);

        // The eleventh enqueue is expected to be rejected.
        // NOTE(review): this relies on `enqueue` returning an error (rather than
        // waiting) when a `bounded` queue is full — confirm against MessageQueue.
        let status = StatusMessage::ok("Extra message");
        let msg = IgtlMessage::new(status, "Device")?;
        let data = msg.encode()?;
        match queue.enqueue(data).await {
            Ok(_) => println!(" ✅ Enqueued (unexpected)"),
            Err(_) => println!(" ⚠️ Queue full - enqueue blocked (as expected)"),
        }

        // Remove one message, then enqueue another.
        let _ = queue.dequeue().await?;
        println!(" Dequeued 1 message: {}/10", queue.size().await);

        let status = StatusMessage::ok("New message");
        let msg = IgtlMessage::new(status, "Device")?;
        let data = msg.encode()?;
        queue.enqueue(data).await?;
        println!(" ✅ Enqueued successfully: {}/10", queue.size().await);
    }
    println!();

    println!("📦 Example 3: Bounded Queue (Drop Oldest)");
    {
        let queue = MessageQueue::with_config(QueueConfig::bounded_drop_old(5));
        for i in 0..5 {
            let status = StatusMessage::ok(&format!("Message {}", i));
            let msg = IgtlMessage::new(status, "Device")?;
            let data = msg.encode()?;
            queue.enqueue(data).await?;
        }
        println!(" Initial queue: 5/5");

        // Five more enqueues on the already-populated queue.
        for i in 5..10 {
            let status = StatusMessage::ok(&format!("Message {}", i));
            let msg = IgtlMessage::new(status, "Device")?;
            let data = msg.encode()?;
            queue.enqueue(data).await?;
        }
        println!(" After enqueueing 5 more: {}/5", queue.size().await);

        let stats = queue.stats().await;
        println!(" Stats:");
        println!(" Enqueued: {}", stats.enqueued);
        println!(" Dropped: {} (oldest messages)", stats.dropped);
        println!(" Current: {}", stats.current_size);

        // Decode the message at the head of the queue to show what remains.
        let data = queue.dequeue().await?;
        let msg = IgtlMessage::<StatusMessage>::decode(&data)?;
        println!(" First message in queue: {:?}", msg.content);
    }
    println!();

    println!("📦 Example 4: Producer-Consumer Pattern");
    {
        let queue = Arc::new(MessageQueue::with_config(QueueConfig::bounded(50)));

        let producer_queue = queue.clone();
        let producer = tokio::spawn(async move {
            for i in 0..100 {
                let transform = TransformMessage::identity();
                let msg = IgtlMessage::new(transform, &format!("Tracker{}", i))
                    .expect("Failed to create message");
                let data = msg.encode().expect("Failed to encode");
                tokio::time::sleep(Duration::from_millis(10)).await;

                // Retry until the message is accepted. The consumer below only
                // stops after counting 100 messages, so a message discarded on
                // a failed enqueue would leave it waiting forever.
                while producer_queue.enqueue(data.clone()).await.is_err() {
                    println!(" Producer: Queue full, waiting...");
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
            println!(" Producer: Finished sending 100 messages");
        });

        let consumer_queue = queue.clone();
        let consumer = tokio::spawn(async move {
            let mut count = 0;
            while count < 100 {
                tokio::time::sleep(Duration::from_millis(15)).await;
                match consumer_queue.try_dequeue().await {
                    Ok(_) => {
                        count += 1;
                        if count % 20 == 0 {
                            println!(" Consumer: Processed {} messages", count);
                        }
                    }
                    Err(_) => {
                        // Nothing was dequeued; pause briefly before polling again.
                        tokio::time::sleep(Duration::from_millis(5)).await;
                    }
                }
            }
            println!(" Consumer: Finished processing 100 messages");
        });

        producer.await.unwrap();
        consumer.await.unwrap();

        let stats = queue.stats().await;
        println!(" Final stats:");
        println!(" Enqueued: {}", stats.enqueued);
        println!(" Dequeued: {}", stats.dequeued);
        println!(" Peak size: {}", stats.peak_size);
    }
    println!();

    println!("📦 Example 5: Multiple Producers, Single Consumer");
    {
        let queue = Arc::new(MessageQueue::with_config(QueueConfig::bounded(100)));

        let mut producers = vec![];
        for producer_id in 0..3 {
            let queue_clone = queue.clone();
            let producer = tokio::spawn(async move {
                for i in 0..30 {
                    let status = StatusMessage::ok(&format!("P{}-M{}", producer_id, i));
                    let msg = IgtlMessage::new(status, &format!("Producer{}", producer_id))
                        .expect("Failed to create message");
                    let data = msg.encode().expect("Failed to encode");

                    // The consumer counts to exactly 90, so every message must
                    // land: retry instead of silently discarding a failure.
                    while queue_clone.enqueue(data.clone()).await.is_err() {
                        tokio::time::sleep(Duration::from_millis(5)).await;
                    }
                    tokio::time::sleep(Duration::from_millis(5)).await;
                }
            });
            producers.push(producer);
        }

        let consumer_queue = queue.clone();
        let consumer = tokio::spawn(async move {
            let mut count = 0;
            while count < 90 {
                if consumer_queue.dequeue().await.is_ok() {
                    count += 1;
                }
            }
            count
        });

        for producer in producers {
            producer.await.unwrap();
        }
        let processed = consumer.await.unwrap();

        println!(" Producers: 3 x 30 messages = 90 total");
        println!(" Consumer processed: {} messages", processed);

        let stats = queue.stats().await;
        println!(" Queue stats:");
        println!(" Peak size: {}", stats.peak_size);
        println!(" Final size: {}", stats.current_size);
    }
    println!();

    println!("📦 Example 6: Backpressure Management");
    {
        let queue = Arc::new(MessageQueue::with_config(QueueConfig::bounded(20)));

        let producer_queue = queue.clone();
        let producer = tokio::spawn(async move {
            let mut blocked_count = 0;
            for _ in 0..50 {
                let transform = TransformMessage::identity();
                let msg = IgtlMessage::new(transform, "FastProducer")
                    .expect("Failed to create message");
                let data = msg.encode().expect("Failed to encode");

                // Produce with no pacing; on each failed enqueue, count it and
                // back off for 50 ms before trying the same message again.
                loop {
                    match producer_queue.enqueue(data.clone()).await {
                        Ok(_) => break,
                        Err(_) => {
                            blocked_count += 1;
                            tokio::time::sleep(Duration::from_millis(50)).await;
                        }
                    }
                }
            }
            blocked_count
        });

        let consumer_queue = queue.clone();
        let consumer = tokio::spawn(async move {
            for _ in 0..50 {
                // The dequeue result is deliberately ignored; this task only
                // makes 50 paced dequeue attempts, 20 ms apart.
                let _ = consumer_queue.dequeue().await;
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        });

        let blocked_count = producer.await.unwrap();
        consumer.await.unwrap();

        println!(" Fast producer blocked {} times", blocked_count);
        println!(" Backpressure successfully applied!");

        let stats = queue.stats().await;
        println!(" Peak queue size: {}/20", stats.peak_size);
    }

    println!("\n✅ All examples completed successfully!");
    println!("\n💡 Key Takeaways:");
    println!(" - Unbounded queues: No size limit, risk of memory exhaustion");
    println!(" - Bounded queues: Fixed capacity, provides backpressure");
    println!(" - Drop-old mode: Maintains latest data, useful for real-time systems");
    println!(" - Producer-consumer: Decouples data production from processing");
    Ok(())
}