1use std::time::Duration;
2use xactor::*;
3
4#[xactor::main]
10async fn main() -> std::io::Result<()> {
11 let parent_addr = SubscriberParent::new().await.start().await.unwrap();
12 parent_addr.wait_for_stop().await;
13 Ok(())
14}
15
16struct SubscriberParent {
19 children_subscribers: Vec<Addr<Subscriber>>,
20 message_producer: Addr<MessageProducer>,
21}
22
23impl SubscriberParent {
24 async fn new() -> SubscriberParent {
25 SubscriberParent {
26 children_subscribers: Vec::new(),
27 message_producer: MessageProducer::new().start().await.unwrap(),
28 }
29 }
30}
31
32#[async_trait::async_trait]
33impl Actor for SubscriberParent {
34 async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
35 println!("Subscriber Parent Started");
36 let _ = ctx.address().send(InitializeChildSubscribers);
37 Ok(())
38 }
39}
40
41#[message]
42struct InitializeChildSubscribers;
43
44#[async_trait::async_trait]
45impl Handler<InitializeChildSubscribers> for SubscriberParent {
46 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: InitializeChildSubscribers) {
47 let message_producer_addr = self.message_producer.clone();
48 let dummy_ids: Vec<i32> = vec![1, 2, 3, 4, 5];
49 let children_unstarted_actors_vec = dummy_ids.into_iter().map(move |id| {
50 let id = id.clone();
51 let addr = message_producer_addr.clone();
52
53 Subscriber::new(id, addr)
54 });
55
56 let children_addr_vec = children_unstarted_actors_vec
57 .into_iter()
58 .map(|actor| async { actor.start().await.unwrap() });
59
60 let children_addr_vec = futures::future::join_all(children_addr_vec).await;
61
62 self.children_subscribers = children_addr_vec;
63 }
64}
65
66#[message]
67struct ClearChildSubscribers;
68
69#[async_trait::async_trait]
70impl Handler<ClearChildSubscribers> for SubscriberParent {
71 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ClearChildSubscribers) {
72 self.children_subscribers = Vec::new();
73 }
74}
75
76struct Subscriber {
79 id: i32,
80 message_producer_addr: Addr<MessageProducer>,
81}
82
83impl Subscriber {
84 fn new(id: i32, message_producer_addr: Addr<MessageProducer>) -> Subscriber {
85 Subscriber {
86 id,
87 message_producer_addr,
88 }
89 }
90}
91
92#[async_trait::async_trait]
93impl Actor for Subscriber {
94 async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
95 println!("Child Subscriber Started - id {:?}", self.id);
97 let self_sender = ctx.address().sender();
98 let _ = self.message_producer_addr.send(SubscribeToProducer {
99 sender: self_sender,
100 });
101 Ok(())
102 }
103}
104
105#[async_trait::async_trait]
106impl Handler<RandomMessage> for Subscriber {
107 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: RandomMessage) {
108 println!(
110 "Child Subscriber (id: {:?}) Handling RandomMessage body: {:?}",
111 self.id, msg.0
112 );
113 }
114}
115
116struct MessageProducer {
119 subscribers: Vec<Sender<RandomMessage>>,
120}
121
122impl MessageProducer {
123 fn new() -> MessageProducer {
124 MessageProducer {
125 subscribers: Vec::new(),
126 }
127 }
128}
129
130#[async_trait::async_trait]
131impl Actor for MessageProducer {
132 async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
133 println!("Message Producer Started");
135 ctx.send_interval(Broadcast, Duration::from_secs(2));
136 Ok(())
137 }
138}
139
140#[message]
141struct SubscribeToProducer {
142 sender: Sender<RandomMessage>,
143}
144
145#[message]
146#[derive(Clone)]
147struct Broadcast;
148
149#[message]
150#[derive(Clone)]
151struct RandomMessage(i32);
152
153#[async_trait::async_trait]
154impl Handler<Broadcast> for MessageProducer {
155 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: Broadcast) {
156 println!("Broadcasting");
158 let random_int: i32 = 20;
160 let broadcast_message = RandomMessage(random_int);
161 let _: Vec<_> = self
162 .subscribers
163 .iter()
164 .map(|subscriber| subscriber.send(broadcast_message.clone()))
165 .collect();
166 }
167}
168
169#[async_trait::async_trait]
170impl Handler<SubscribeToProducer> for MessageProducer {
171 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: SubscribeToProducer) {
172 println!("Recieved Subscription Request");
173 self.subscribers.push(msg.sender);
174 }
175}