use acty::{Actor, ActorExt, AsyncClose, UnboundedOutbox};
use futures::{Stream, StreamExt};
use std::collections::HashMap;
use std::pin::pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
/// A subscriber actor that logs every message it receives and tallies them.
struct Subscriber {
    /// Identifier shown in this subscriber's log lines.
    id: usize,
    /// Shared counter, incremented once for each message received.
    msg_count: Arc<AtomicUsize>,
}
impl Actor for Subscriber {
type Message = String;
async fn run(self, inbox: impl Stream<Item = Self::Message> + Send) {
let mut inbox = pin!(inbox);
println!("[Subscriber {}]: Online and listening.", self.id);
while let Some(msg) = inbox.next().await {
println!("[Subscriber {}]: Received message: '{}'", self.id, msg);
self.msg_count.fetch_add(1, Ordering::SeqCst);
}
println!("[Subscriber {}]: Shutting down.", self.id);
}
}
/// Stateless marker type for the topic actor; the subscriber registry is
/// created inside its `run` method rather than stored here.
struct TopicActor;
/// Commands understood by the topic actor.
enum TopicMessage {
    /// Deliver the given text to every currently registered subscriber.
    Publish(String),
    /// Register `subscriber` under `id`.
    Subscribe {
        /// Key under which the subscriber is stored.
        id: usize,
        /// Outbox used to forward published text to the subscriber.
        subscriber: UnboundedOutbox<String>,
    },
    /// Remove the subscriber registered under the given id, if any.
    Unsubscribe(usize),
}
/// Actor behaviour for [`TopicActor`]: keep a registry of subscriber outboxes
/// and fan published text out to all of them.
impl Actor for TopicActor {
    type Message = TopicMessage;

    /// Processes topic commands until `inbox` ends.
    ///
    /// The registry maps subscriber ids to their outboxes and lives only for
    /// the duration of this call.
    async fn run(self, inbox: impl Stream<Item = Self::Message> + Send) {
        let mut registry: HashMap<usize, UnboundedOutbox<String>> = HashMap::new();
        // `StreamExt::next` needs an `Unpin` stream, so pin it on the stack.
        let mut inbox = pin!(inbox);
        println!("[Topic]: Online.");
        while let Some(command) = inbox.next().await {
            match command {
                TopicMessage::Subscribe { id, subscriber } => {
                    println!("[Topic]: New subscription from ID {}.", id);
                    // An existing entry under the same id is replaced.
                    registry.insert(id, subscriber);
                }
                TopicMessage::Unsubscribe(id) => {
                    println!("[Topic]: Unsubscribe request for ID {}.", id);
                    // Bind the outcome first so the removed outbox is dropped
                    // before the confirmation line is printed.
                    let was_registered = registry.remove(&id).is_some();
                    if was_registered {
                        println!("[Topic]: Subscriber {} removed.", id);
                    }
                }
                TopicMessage::Publish(text) => {
                    println!("[Topic]: Publishing message: '{}'", text);
                    // Best-effort delivery: a failed send is ignored and the
                    // subscriber stays registered.
                    registry.values().for_each(|outbox| {
                        let _ = outbox.send(text.clone());
                    });
                }
            }
        }
        println!("[Topic]: Shutting down.");
    }
}
/// Demo scenario for the topic/subscriber actors.
///
/// Starts one topic and three subscribers, publishes a message to all of them,
/// unsubscribes and closes subscriber 2, publishes a second message, shuts
/// everything down, and finally asserts the per-subscriber message counts
/// (2, 1 and 2).
#[tokio::main]
async fn main() {
    // Pause used between steps to give the actors time to process messages.
    let settle = Duration::from_millis(20);

    let topic = TopicActor.start();

    // Start subscribers 1..=3, register each with the topic, and keep both its
    // outbox and its counter. Both vectors are therefore ordered by id.
    let (mut outboxes, counters): (Vec<_>, Vec<_>) = (1..=3)
        .map(|id| {
            let counter = Arc::new(AtomicUsize::new(0));
            let outbox = Subscriber {
                id,
                msg_count: Arc::clone(&counter),
            }
            .start();
            topic
                .send(TopicMessage::Subscribe {
                    id,
                    subscriber: outbox.clone(),
                })
                .unwrap();
            (outbox, counter)
        })
        .unzip();

    tokio::time::sleep(settle).await;
    topic
        .send(TopicMessage::Publish("Hello everyone!".to_string()))
        .unwrap();
    tokio::time::sleep(settle).await;

    println!("\nClosing subscriber 2...\n");
    let departing_id = 2;
    topic
        .send(TopicMessage::Unsubscribe(departing_id))
        .unwrap();
    // Index 1 holds subscriber 2's outbox because outboxes were collected in
    // id order starting from 1.
    let sub2_outbox = outboxes.remove(1);
    // NOTE(review): `close().await` presumably closes this handle and waits for
    // the actor to stop; confirm against the `acty` documentation.
    sub2_outbox.close().await;

    topic
        .send(TopicMessage::Publish("Is anyone still here?".to_string()))
        .unwrap();
    tokio::time::sleep(settle).await;

    // Shut the topic down first, then the remaining subscribers.
    topic.close().await;
    for outbox in outboxes {
        outbox.close().await;
    }

    println!("\n--- Verification ---");

    let received = counters[0].load(Ordering::SeqCst);
    assert_eq!(received, 2);
    println!("Subscriber 1 received {} messages. (Correct)", received);

    // Subscriber 2 was unsubscribed before the second publish.
    let received = counters[1].load(Ordering::SeqCst);
    assert_eq!(received, 1);
    println!("Subscriber 2 received {} message. (Correct)", received);

    let received = counters[2].load(Ordering::SeqCst);
    assert_eq!(received, 2);
    println!("Subscriber 3 received {} messages. (Correct)", received);
}