hannibal/
broker.rs

1use std::collections::HashMap;
2
3use crate::{Actor, Addr, Context, Handler, Message, Service, WeakSender, context::ContextID};
4
5/// Enables global subscriptions and message distribution.
6///
7/// The `Broker` is a service actor that allows actors to publish and subscribe to messages by type.
8///
9/// # Example
10/// ```
11/// # use futures::future::join;
12/// # use hannibal::{Broker, prelude::*};
13/// #[derive(Clone, Message)]
14/// struct Topic1(u32);
15///
16/// #[derive(Debug, Default, PartialEq)]
17/// struct Subscribing(Vec<u32>);
18///
19/// impl Actor for Subscribing {
20///     async fn started(&mut self, ctx: &mut Context<Self>) -> DynResult<()> {
21///         // subscribe to `Topic1`
22///         ctx.subscribe::<Topic1>().await?;
23///         Ok(())
24///     }
25/// }
26///
27/// impl Handler<Topic1> for Subscribing {
28///     async fn handle(&mut self, ctx: &mut Context<Self>, msg: Topic1) {
29///         self.0.push(msg.0);
30///         # if self.0.len() == 2 {
31///         #     ctx.stop().unwrap()
32///         # }
33///     }
34/// }
35///
36/// # #[hannibal::main]
37/// # async fn main() {
38/// let mut subscriber1 = Subscribing::default().spawn_owning();
39/// let mut subscriber2 = Subscribing::default().spawn_owning();
40/// # subscriber1.ping().await.unwrap();
41/// # subscriber2.ping().await.unwrap();
42///
43/// let broker = Broker::from_registry().await;
44/// broker.publish(Topic1(42)).await.unwrap();
45/// broker.publish(Topic1(23)).await.unwrap();
46///
47/// assert_eq!(subscriber1.join().await, Some(Subscribing(vec![42, 23])));
48/// assert_eq!(subscriber2.join().await, Some(Subscribing(vec![42, 23])));
49/// # }
50/// ```
51pub struct Broker<T: Message<Response = ()>> {
52    subscribers: HashMap<ContextID, WeakSender<T>>,
53}
54
55impl<T: Message<Response = ()> + Clone> Broker<T> {
56    /// Publishes a message to all subscribers.
57    pub async fn publish(topic: T) -> crate::error::Result<()> {
58        Self::from_registry().await.publish(topic).await
59    }
60
61    /// Tries to publish a message to the broker.
62    pub async fn try_publish(topic: T) -> Option<crate::error::Result<()>> {
63        if let Some(broker) = Self::try_from_registry() {
64            Some(broker.publish(topic).await)
65        } else {
66            None
67        }
68    }
69
70    /// Subscribes to messages of the given type.
71    pub async fn subscribe(sender: WeakSender<T>) -> crate::error::Result<()> {
72        Self::from_registry().await.subscribe(sender).await
73    }
74}
75
76impl<T: Message<Response = ()>> Default for Broker<T> {
77    fn default() -> Self {
78        Broker {
79            subscribers: Default::default(),
80        }
81    }
82}
83
84impl<T: Message<Response = ()>> Actor for Broker<T> {}
85impl<T: Message<Response = ()>> Service for Broker<T> {}
86
87struct Publish<T: Message>(T);
88
89impl<T: Message> Message for Publish<T> {
90    type Response = ();
91}
92
93impl<T: Message<Response = ()> + Clone> Handler<Publish<T>> for Broker<T> {
94    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Publish<T>) {
95        let live_subscribers = self
96            .subscribers
97            .values()
98            .filter_map(WeakSender::upgrade)
99            .collect::<Vec<_>>();
100        for subscriber in &live_subscribers {
101            if let Err(_error) = subscriber.send(msg.0.clone()).await {
102                // log::warn!("Failed to send message to subscriber: {:?}", error)
103            }
104        }
105        log::trace!(
106            "published to {} subscribers, topic {:?}",
107            live_subscribers.len(),
108            std::any::type_name::<T>()
109        );
110
111        self.subscribers
112            .retain(|_, sender| sender.upgrade().is_some());
113    }
114}
115
116struct Subscribe<T: Message<Response = ()>>(WeakSender<T>);
117
118impl<T: Message<Response = ()>> Message for Subscribe<T> {
119    type Response = ();
120}
121
122struct Unsubscribe<T: Message<Response = ()>>(WeakSender<T>);
123
124impl<T: Message<Response = ()>> Message for Unsubscribe<T> {
125    type Response = ();
126}
127
128impl<T: Message<Response = ()> + Clone> Handler<Subscribe<T>> for Broker<T> {
129    async fn handle(&mut self, _ctx: &mut Context<Self>, Subscribe(sender): Subscribe<T>) {
130        self.subscribers.insert(sender.id, sender);
131        log::trace!("subscribed to topic {:?}", std::any::type_name::<T>());
132    }
133}
134
135impl<T: Message<Response = ()> + Clone> Handler<Unsubscribe<T>> for Broker<T> {
136    async fn handle(&mut self, _ctx: &mut Context<Self>, Unsubscribe(sender): Unsubscribe<T>) {
137        self.subscribers.remove(&sender.id);
138        log::trace!("unsubscribed to topic {:?}", std::any::type_name::<T>());
139    }
140}
141
142impl<T: Message<Response = ()> + Clone> Addr<Broker<T>> {
143    /// Publishes a message to all subscribers.
144    pub async fn publish(&self, msg: T) -> crate::error::Result<()> {
145        log::trace!("publishing to topic {:?}", std::any::type_name::<T>());
146        self.send(Publish(msg)).await
147    }
148
149    /// Subscribes to messages of the given type.
150    pub async fn subscribe(&self, sender: WeakSender<T>) -> crate::error::Result<()> {
151        log::debug!("subscribing to topic {:?}", std::any::type_name::<T>());
152        self.send(Subscribe(sender)).await
153    }
154
155    /// Unsubscribes from messages of the given type.
156    pub async fn unsubscribe(&self, sender: WeakSender<T>) -> crate::error::Result<()> {
157        self.send(Unsubscribe(sender)).await
158    }
159}
160
#[cfg(test)]
mod subscribe_publish_unsubscribe {
    #![allow(clippy::unwrap_used)]

    use futures::future::join;

    use crate::{
        Actor, Broker, Context, DynResult, Handler, Message, Service, prelude::Spawnable as _,
    };

    /// Topic published through the broker in this test.
    #[derive(Clone, Debug)]
    struct Topic1(u32);

    impl Message for Topic1 {
        type Response = ();
    }

    /// Actor that records the payload of every `Topic1` it receives.
    #[derive(Default, Debug, PartialEq)]
    struct Subscribing(Vec<u32>);

    impl Actor for Subscribing {
        /// Subscribes to `Topic1` when the actor starts.
        async fn started(&mut self, ctx: &mut Context<Self>) -> DynResult<()> {
            ctx.subscribe::<Topic1>().await?;
            Ok(())
        }
    }

    impl Handler<Topic1> for Subscribing {
        /// Records the payload and stops the actor once two values have been collected.
        async fn handle(&mut self, ctx: &mut Context<Self>, Topic1(value): Topic1) {
            self.0.push(value);
            if self.0.len() == 2 {
                ctx.stop().unwrap();
            }
        }
    }

    /// Two subscribers each receive both published values, in publish order.
    #[test_log::test(tokio::test)]
    async fn publish_different_ways() -> DynResult<()> {
        let mut first = Subscribing::default().spawn_owning();
        first.ping().await.unwrap();

        let mut second = Subscribing::default().spawn_owning();
        second.ping().await.unwrap();

        // Ping results are deliberately ignored here.
        let _ = join(first.ping(), second.ping()).await;

        let broker = Broker::from_registry().await;
        broker.publish(Topic1(42)).await.unwrap();
        broker.publish(Topic1(23)).await.unwrap();

        // Ping results are deliberately ignored here as well.
        let _ = join(first.ping(), second.ping()).await;

        let expected = vec![42, 23];
        assert_eq!(first.join().await, Some(Subscribing(expected.clone())));
        assert_eq!(second.join().await, Some(Subscribing(expected)));

        Ok(())
    }
}