subscriber/
subscriber.rs

1use std::time::Duration;
2use xactor::*;
3
4// This is a basic subscriber example to demonstrate usage of Sender
5// We have actor A - SubscriberParent, manages a vec of child subscribers and in this example, sets up the message producer
6// Actor B - (Child) Subscriber
7// Actor C - Message Producer (Being subscribed to by the child subscribers) - producing a RandomMessage every few seconds to be broadcast to subscribers
8
9#[xactor::main]
10async fn main() -> std::io::Result<()> {
11    let parent_addr = SubscriberParent::new().await.start().await.unwrap();
12    parent_addr.wait_for_stop().await;
13    Ok(())
14}
15
16// Subscriber Parent - A
17
18struct SubscriberParent {
19    children_subscribers: Vec<Addr<Subscriber>>,
20    message_producer: Addr<MessageProducer>,
21}
22
23impl SubscriberParent {
24    async fn new() -> SubscriberParent {
25        SubscriberParent {
26            children_subscribers: Vec::new(),
27            message_producer: MessageProducer::new().start().await.unwrap(),
28        }
29    }
30}
31
32#[async_trait::async_trait]
33impl Actor for SubscriberParent {
34    async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
35        println!("Subscriber Parent Started");
36        let _ = ctx.address().send(InitializeChildSubscribers);
37        Ok(())
38    }
39}
40
41#[message]
42struct InitializeChildSubscribers;
43
44#[async_trait::async_trait]
45impl Handler<InitializeChildSubscribers> for SubscriberParent {
46    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: InitializeChildSubscribers) {
47        let message_producer_addr = self.message_producer.clone();
48        let dummy_ids: Vec<i32> = vec![1, 2, 3, 4, 5];
49        let children_unstarted_actors_vec = dummy_ids.into_iter().map(move |id| {
50            let id = id.clone();
51            let addr = message_producer_addr.clone();
52
53            Subscriber::new(id, addr)
54        });
55
56        let children_addr_vec = children_unstarted_actors_vec
57            .into_iter()
58            .map(|actor| async { actor.start().await.unwrap() });
59
60        let children_addr_vec = futures::future::join_all(children_addr_vec).await;
61
62        self.children_subscribers = children_addr_vec;
63    }
64}
65
66#[message]
67struct ClearChildSubscribers;
68
69#[async_trait::async_trait]
70impl Handler<ClearChildSubscribers> for SubscriberParent {
71    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ClearChildSubscribers) {
72        self.children_subscribers = Vec::new();
73    }
74}
75
76// (Child) Subscriber - B
77
78struct Subscriber {
79    id: i32,
80    message_producer_addr: Addr<MessageProducer>,
81}
82
83impl Subscriber {
84    fn new(id: i32, message_producer_addr: Addr<MessageProducer>) -> Subscriber {
85        Subscriber {
86            id,
87            message_producer_addr,
88        }
89    }
90}
91
92#[async_trait::async_trait]
93impl Actor for Subscriber {
94    async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
95        // Send subscription request message to the Message Producer
96        println!("Child Subscriber Started - id {:?}", self.id);
97        let self_sender = ctx.address().sender();
98        let _ = self.message_producer_addr.send(SubscribeToProducer {
99            sender: self_sender,
100        });
101        Ok(())
102    }
103}
104
105#[async_trait::async_trait]
106impl Handler<RandomMessage> for Subscriber {
107    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: RandomMessage) {
108        // We just print out the random id, along with the actor's unique id
109        println!(
110            "Child Subscriber (id: {:?}) Handling RandomMessage body: {:?}",
111            self.id, msg.0
112        );
113    }
114}
115
116// Message Producer - C
117
118struct MessageProducer {
119    subscribers: Vec<Sender<RandomMessage>>,
120}
121
122impl MessageProducer {
123    fn new() -> MessageProducer {
124        MessageProducer {
125            subscribers: Vec::new(),
126        }
127    }
128}
129
130#[async_trait::async_trait]
131impl Actor for MessageProducer {
132    async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
133        // Send broadcast message to self every 2 seconds
134        println!("Message Producer Started");
135        ctx.send_interval(Broadcast, Duration::from_secs(2));
136        Ok(())
137    }
138}
139
140#[message]
141struct SubscribeToProducer {
142    sender: Sender<RandomMessage>,
143}
144
145#[message]
146#[derive(Clone)]
147struct Broadcast;
148
149#[message]
150#[derive(Clone)]
151struct RandomMessage(i32);
152
153#[async_trait::async_trait]
154impl Handler<Broadcast> for MessageProducer {
155    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: Broadcast) {
156        // Generate random number and broadcast that message to all subscribers
157        println!("Broadcasting");
158        // To avoid bringing in rand package for the sake of this example, we are hardcoding the "random" number
159        let random_int: i32 = 20;
160        let broadcast_message = RandomMessage(random_int);
161        let _: Vec<_> = self
162            .subscribers
163            .iter()
164            .map(|subscriber| subscriber.send(broadcast_message.clone()))
165            .collect();
166    }
167}
168
169#[async_trait::async_trait]
170impl Handler<SubscribeToProducer> for MessageProducer {
171    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: SubscribeToProducer) {
172        println!("Recieved Subscription Request");
173        self.subscribers.push(msg.sender);
174    }
175}