1use std::collections::HashMap;
2
3use crate::{Actor, Addr, Context, Handler, Message, Service, WeakSender, context::ContextID};
4
5pub struct Broker<T: Message<Response = ()>> {
52 subscribers: HashMap<ContextID, WeakSender<T>>,
53}
54
55impl<T: Message<Response = ()> + Clone> Broker<T> {
56 pub async fn publish(topic: T) -> crate::error::Result<()> {
58 Self::from_registry().await.publish(topic).await
59 }
60
61 pub async fn try_publish(topic: T) -> Option<crate::error::Result<()>> {
63 if let Some(broker) = Self::try_from_registry() {
64 Some(broker.publish(topic).await)
65 } else {
66 None
67 }
68 }
69
70 pub async fn subscribe(sender: WeakSender<T>) -> crate::error::Result<()> {
72 Self::from_registry().await.subscribe(sender).await
73 }
74}
75
76impl<T: Message<Response = ()>> Default for Broker<T> {
77 fn default() -> Self {
78 Broker {
79 subscribers: Default::default(),
80 }
81 }
82}
83
84impl<T: Message<Response = ()>> Actor for Broker<T> {}
85impl<T: Message<Response = ()>> Service for Broker<T> {}
86
87struct Publish<T: Message>(T);
88
89impl<T: Message> Message for Publish<T> {
90 type Response = ();
91}
92
93impl<T: Message<Response = ()> + Clone> Handler<Publish<T>> for Broker<T> {
94 async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Publish<T>) {
95 let live_subscribers = self
96 .subscribers
97 .values()
98 .filter_map(WeakSender::upgrade)
99 .collect::<Vec<_>>();
100 for subscriber in &live_subscribers {
101 if let Err(_error) = subscriber.send(msg.0.clone()).await {
102 }
104 }
105 log::trace!(
106 "published to {} subscribers, topic {:?}",
107 live_subscribers.len(),
108 std::any::type_name::<T>()
109 );
110
111 self.subscribers
112 .retain(|_, sender| sender.upgrade().is_some());
113 }
114}
115
116struct Subscribe<T: Message<Response = ()>>(WeakSender<T>);
117
118impl<T: Message<Response = ()>> Message for Subscribe<T> {
119 type Response = ();
120}
121
122struct Unsubscribe<T: Message<Response = ()>>(WeakSender<T>);
123
124impl<T: Message<Response = ()>> Message for Unsubscribe<T> {
125 type Response = ();
126}
127
128impl<T: Message<Response = ()> + Clone> Handler<Subscribe<T>> for Broker<T> {
129 async fn handle(&mut self, _ctx: &mut Context<Self>, Subscribe(sender): Subscribe<T>) {
130 self.subscribers.insert(sender.id, sender);
131 log::trace!("subscribed to topic {:?}", std::any::type_name::<T>());
132 }
133}
134
135impl<T: Message<Response = ()> + Clone> Handler<Unsubscribe<T>> for Broker<T> {
136 async fn handle(&mut self, _ctx: &mut Context<Self>, Unsubscribe(sender): Unsubscribe<T>) {
137 self.subscribers.remove(&sender.id);
138 log::trace!("unsubscribed to topic {:?}", std::any::type_name::<T>());
139 }
140}
141
142impl<T: Message<Response = ()> + Clone> Addr<Broker<T>> {
143 pub async fn publish(&self, msg: T) -> crate::error::Result<()> {
145 log::trace!("publishing to topic {:?}", std::any::type_name::<T>());
146 self.send(Publish(msg)).await
147 }
148
149 pub async fn subscribe(&self, sender: WeakSender<T>) -> crate::error::Result<()> {
151 log::debug!("subscribing to topic {:?}", std::any::type_name::<T>());
152 self.send(Subscribe(sender)).await
153 }
154
155 pub async fn unsubscribe(&self, sender: WeakSender<T>) -> crate::error::Result<()> {
157 self.send(Unsubscribe(sender)).await
158 }
159}
160
#[cfg(test)]
mod subscribe_publish_unsubscribe {
    #![allow(clippy::unwrap_used)]

    use futures::future::join;

    use crate::{
        Actor, Broker, Context, DynResult, Handler, Message, Service, prelude::Spawnable as _,
    };

    /// Topic message carrying a single number.
    #[derive(Clone, Debug)]
    struct Topic1(u32);

    impl Message for Topic1 {
        type Response = ();
    }

    /// Test actor that records the payload of every `Topic1` it handles.
    #[derive(Default, Debug, PartialEq)]
    struct Subscribing(Vec<u32>);

    impl Actor for Subscribing {
        /// Subscribes to `Topic1` when the actor starts.
        async fn started(&mut self, ctx: &mut Context<Self>) -> DynResult<()> {
            ctx.subscribe::<Topic1>().await?;
            Ok(())
        }
    }

    impl Handler<Topic1> for Subscribing {
        /// Records the payload and stops the actor once two payloads are stored.
        async fn handle(&mut self, ctx: &mut Context<Self>, Topic1(value): Topic1) {
            let received = &mut self.0;
            received.push(value);
            if received.len() == 2 {
                ctx.stop().unwrap();
            }
        }
    }

    /// Two subscribers must both receive every published value, in order.
    #[test_log::test(tokio::test)]
    async fn publish_different_ways() -> DynResult<()> {
        let mut first = Subscribing::default().spawn_owning();
        first.ping().await.unwrap();

        let mut second = Subscribing::default().spawn_owning();
        second.ping().await.unwrap();

        // The outcome of these pings is deliberately ignored.
        let _ = join(first.ping(), second.ping()).await;

        let broker = Broker::from_registry().await;
        for value in [42, 23] {
            broker.publish(Topic1(value)).await.unwrap();
        }

        // The outcome of these pings is deliberately ignored as well.
        let _ = join(first.ping(), second.ping()).await;

        let expected = || Some(Subscribing(vec![42, 23]));
        assert_eq!(first.join().await, expected());
        assert_eq!(second.join().await, expected());

        Ok(())
    }
}