use actix_rt;
use std::time::Duration;
use tokio::task::{self, LocalSet}; use vuo::{Stream, Topic};
#[derive(Debug, Clone)]
struct Message {
id: u32,
content: String,
}
fn main() {
let system = actix_rt::System::new();
system.block_on(async {
let local_set = LocalSet::new();
local_set.run_until(async {
println!("[Main] Topic Example: Starting (Actix System with inner LocalSet)");
let topic: Topic<Message> = Topic::new();
println!("[Main] Topic created.");
let topic_for_subscriber1 = topic.clone();
let subscriber1_handle = task::spawn_local(async move {
println!("[Subscriber 1] Subscribing to topic.");
let stream1: Stream<Message> = topic_for_subscriber1.subscribe();
println!("[Subscriber 1] Subscribed. Waiting for messages...");
match stream1.compile_to_list().await {
Ok(msgs) => {
println!("[Subscriber 1] Stream ended. Received {} messages:", msgs.len());
for msg in msgs {
println!("[Subscriber 1] > ID: {}, Content: '{}'", msg.id, msg.content);
}
}
Err(_) => eprintln!("[Subscriber 1] Stream processing failed or was closed prematurely."),
}
println!("[Subscriber 1] Finished.");
});
tokio::time::sleep(Duration::from_millis(100)).await;
let topic_for_publisher = topic.clone();
let publisher_handle = task::spawn_local(async move {
println!("[Publisher] Starting to publish first batch of messages...");
for i in 0..3 { let msg = Message { id: i, content: format!("Message Batch 1 - Item {}", i) };
println!("[Publisher] Publishing Msg ID: {}", msg.id);
topic_for_publisher.publish(msg);
tokio::time::sleep(Duration::from_millis(50)).await;
}
println!("[Publisher] First batch published. Spawning Subscriber 2...");
let topic_for_subscriber2 = topic_for_publisher.clone();
let subscriber2_handle = task::spawn_local(async move {
println!("[Subscriber 2] Subscribing to topic.");
let stream2: Stream<Message> = topic_for_subscriber2.subscribe();
println!("[Subscriber 2] Subscribed. Waiting for messages...");
match stream2.compile_to_list().await {
Ok(msgs) => {
println!("[Subscriber 2] Stream ended. Received {} messages:", msgs.len());
for msg in msgs {
println!("[Subscriber 2] > ID: {}, Content: '{}'", msg.id, msg.content);
}
}
Err(_) => eprintln!("[Subscriber 2] Stream processing failed or was closed prematurely."),
}
println!("[Subscriber 2] Finished.");
});
tokio::time::sleep(Duration::from_millis(100)).await;
println!("[Publisher] Publishing second batch of messages (Subscriber 2 should see these)...");
for i in 3..6 { let msg = Message { id: i, content: format!("Message Batch 2 - Item {}", i) };
println!("[Publisher] Publishing Msg ID: {}", msg.id);
topic_for_publisher.publish(msg);
tokio::time::sleep(Duration::from_millis(50)).await;
}
println!("[Publisher] All messages published. Closing topic.");
topic_for_publisher.close();
println!("[Publisher] Topic close signal sent.");
if let Err(e) = subscriber2_handle.await {
eprintln!("[Publisher] Subscriber 2 task panicked or failed: {:?}", e);
} else {
println!("[Publisher] Subscriber 2 task completed.");
}
println!("[Publisher] Publisher task finished.");
});
if let Err(e) = publisher_handle.await {
eprintln!("[Main] Publisher task panicked or failed: {:?}", e);
} else {
println!("[Main] Publisher task completed.");
}
if let Err(e) = subscriber1_handle.await {
eprintln!("[Main] Subscriber 1 task panicked or failed: {:?}", e);
} else {
println!("[Main] Subscriber 1 task completed.");
}
println!("\n[Main] Topic Example: Complete.");
println!("[Main] Expected behavior check:");
println!(" - Subscriber 1 should have received messages with IDs 0 through 5.");
println!(" - Subscriber 2 should have received messages with IDs 3 through 5.");
}).await; }); }