actix_broker/
broker.rs

1use actix::prelude::*;
2use ahash::AHasher;
3use log::trace;
4
5use std::any::{Any, TypeId};
6use std::collections::HashMap;
7use std::hash::BuildHasherDefault;
8use std::marker::PhantomData;
9
10use crate::msgs::*;
11
12type TypeMap<A> = HashMap<TypeId, A, BuildHasherDefault<AHasher>>;
13
14#[derive(Default)]
15pub struct Broker<T> {
16    sub_map: TypeMap<Vec<(TypeId, Box<dyn Any>)>>,
17    msg_map: TypeMap<Box<dyn Any>>,
18    _t: PhantomData<T>,
19}
20
/// Marker type selecting the broker flavour that is registered as a `SystemService`
/// (see `impl SystemService for Broker<SystemBroker>`).
///
/// The type carries no data; it only exists to parameterise `Broker<T>`.
#[derive(Debug, Default, Clone, Copy)]
pub struct SystemBroker;
23
/// Marker type selecting the broker flavour that is registered as an `ArbiterService`
/// (see `impl ArbiterService for Broker<ArbiterBroker>`).
///
/// The type carries no data; it only exists to parameterise `Broker<T>`.
#[derive(Debug, Default, Clone, Copy)]
pub struct ArbiterBroker;
26
27/// The system service actor that keeps track of subscriptions and routes messages to them.
28impl Broker<SystemBroker> {
29    /// Send messages asynchronously via the broker. It can be called from with
30    /// actors with a `SyncContext`, or where you don't have access to `self`. e.g. From within
31    /// a `HttpHandler` from `actix-web`.
32    pub fn issue_async<M: BrokerMsg>(msg: M) {
33        let broker = Self::from_registry();
34        broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
35    }
36}
37
38/// The system service actor that keeps track of subscriptions and routes messages to them.
39impl Broker<ArbiterBroker> {
40    /// Send messages asynchronously via the broker. It can be called from with
41    /// actors with a `SyncContext`, or where you don't have access to `self`. e.g. From within
42    /// a `HttpHandler` from `actix-web`.
43    pub fn issue_async<M: BrokerMsg>(msg: M) {
44        let broker = Self::from_registry();
45        broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
46    }
47}
48
49/// The system service actor that keeps track of subscriptions and routes messages to them.
50impl<T> Broker<T> {
51    fn take_subs<M: BrokerMsg>(&mut self) -> Option<Vec<(TypeId, Recipient<M>)>> {
52        let id = TypeId::of::<M>();
53        let subs = self.sub_map.get_mut(&id)?;
54        trace!("Broker: Found subscription list for {:?}.", id);
55        let subs = subs
56            .drain(..)
57            .filter_map(|(id, s)| {
58                if let Ok(rec) = s.downcast::<Recipient<M>>() {
59                    Some((id, rec))
60                } else {
61                    None
62                }
63            })
64            .map(|(id, s)| (id, *s))
65            .collect();
66        Some(subs)
67    }
68
69    fn add_sub<M: BrokerMsg>(&mut self, sub: Recipient<M>, id: TypeId) {
70        let msg_id = TypeId::of::<M>();
71        let boxed = Box::new(sub);
72        if let Some(subs) = self.sub_map.get_mut(&msg_id) {
73            trace!("Broker: Adding to {:?} subscription list.", msg_id);
74            subs.push((id, boxed));
75            return;
76        }
77
78        trace!("Broker: Creating {:?} subscription list.", msg_id);
79        self.sub_map.insert(msg_id, vec![(id, boxed)]);
80    }
81
82    fn get_previous_msg<M: BrokerMsg>(&self) -> Option<M> {
83        let id = TypeId::of::<M>();
84        let msg = self.msg_map.get(&id)?;
85        trace!("Broker: Previous message found for {:?}", id);
86        let msg = msg.downcast_ref::<M>()?;
87        Some(msg.clone())
88    }
89
90    fn set_msg<M: BrokerMsg>(&mut self, msg: M) {
91        let id = TypeId::of::<M>();
92        let boxed = Box::new(msg);
93        if let Some(pm) = self.msg_map.get_mut(&id) {
94            trace!("Broker: Setting new message value for {:?}", id);
95            *pm = boxed;
96            return;
97        }
98
99        trace!("Broker: Adding first message value for {:?}", id);
100        self.msg_map.insert(id, boxed);
101    }
102}
103
104impl<T: 'static + Unpin, M: BrokerMsg> Handler<SubscribeAsync<M>> for Broker<T> {
105    type Result = ();
106
107    fn handle(&mut self, msg: SubscribeAsync<M>, _ctx: &mut Context<Self>) {
108        trace!("Broker: Received SubscribeAsync");
109        self.add_sub::<M>(msg.0, msg.1);
110    }
111}
112
113impl<T: 'static + Unpin, M: BrokerMsg> Handler<SubscribeSync<M>> for Broker<T> {
114    type Result = Option<M>;
115
116    fn handle(&mut self, msg: SubscribeSync<M>, _ctx: &mut Context<Self>) -> Self::Result {
117        trace!("Broker: Received SubscribeSync");
118        self.add_sub::<M>(msg.0, msg.1);
119        self.get_previous_msg::<M>()
120    }
121}
122
123impl<T: 'static + Unpin, M: BrokerMsg> Handler<IssueAsync<M>> for Broker<T> {
124    type Result = ();
125
126    fn handle(&mut self, msg: IssueAsync<M>, _ctx: &mut Context<Self>) {
127        trace!("Broker: Received IssueAsync");
128        if let Some(mut subs) = self.take_subs::<M>() {
129            subs.drain(..).for_each(|(id, s)| {
130                if id == msg.1 {
131                    self.add_sub::<M>(s, id);
132                } else {
133                    match s.try_send(msg.0.clone()) {
134                        Ok(_) => self.add_sub::<M>(s, id),
135                        Err(SendError::Full(_)) => {
136                            // Ensure that that the message is delivered even if the mailbox is full.
137                            // We do a try first to remove receiver that have closed their mailbox.
138                            s.do_send(msg.0.clone());
139                            self.add_sub::<M>(s, id);
140                        }
141                        Err(_) => (),
142                    }
143                }
144            });
145        }
146        self.set_msg::<M>(msg.0);
147    }
148}
149
150impl<T: 'static + Unpin, M: BrokerMsg> Handler<IssueSync<M>> for Broker<T> {
151    type Result = ();
152
153    fn handle(&mut self, msg: IssueSync<M>, ctx: &mut Context<Self>) {
154        trace!("Broker: Received IssueSync");
155        if let Some(mut subs) = self.take_subs::<M>() {
156            subs.drain(..).for_each(|(id, s)| {
157                if id == msg.1 {
158                    self.add_sub::<M>(s, id);
159                } else {
160                    s.send(msg.0.clone())
161                        .into_actor(self)
162                        .map(move |_, act, _| act.add_sub::<M>(s, id))
163                        .wait(ctx);
164                }
165            });
166        }
167        self.set_msg::<M>(msg.0);
168    }
169}
170
171impl<T: 'static + Unpin> Actor for Broker<T> {
172    type Context = Context<Self>;
173}
174
175impl SystemService for Broker<SystemBroker> {}
176impl Supervised for Broker<SystemBroker> {}
177
178impl ArbiterService for Broker<ArbiterBroker> {}
179impl Supervised for Broker<ArbiterBroker> {}
180
181pub trait RegisteredBroker: 'static + Unpin
182where
183    Self: std::marker::Sized,
184{
185    fn get_broker() -> Addr<Broker<Self>>;
186}
187
188impl RegisteredBroker for SystemBroker {
189    fn get_broker() -> Addr<Broker<Self>> {
190        Broker::<SystemBroker>::from_registry()
191    }
192}
193
194impl RegisteredBroker for ArbiterBroker {
195    fn get_broker() -> Addr<Broker<Self>> {
196        Broker::<ArbiterBroker>::from_registry()
197    }
198}