actix_broker/
broker.rs

1use std::{
2    any::{Any, TypeId},
3    collections::HashMap,
4    hash::BuildHasherDefault,
5    marker::PhantomData,
6};
7
8use actix::prelude::*;
9use ahash::AHasher;
10use log::trace;
11
12use crate::msgs::*;
13
14type TypeMap<A> = HashMap<TypeId, A, BuildHasherDefault<AHasher>>;
15
16#[derive(Default)]
17pub struct Broker<T> {
18    sub_map: TypeMap<Vec<(TypeId, Box<dyn Any>)>>,
19    msg_map: TypeMap<Box<dyn Any>>,
20    _t: PhantomData<T>,
21}
22
23#[derive(Default)]
24pub struct SystemBroker;
25
26#[derive(Default)]
27pub struct ArbiterBroker;
28
29/// The system service actor that keeps track of subscriptions and routes messages to them.
30impl Broker<SystemBroker> {
31    /// Send messages asynchronously via the broker. It can be called from with
32    /// actors with a `SyncContext`, or where you don't have access to `self`. e.g. From within
33    /// a `HttpHandler` from `actix-web`.
34    pub fn issue_async<M: BrokerMsg>(msg: M) {
35        let broker = Self::from_registry();
36        broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
37    }
38}
39
40/// The system service actor that keeps track of subscriptions and routes messages to them.
41impl Broker<ArbiterBroker> {
42    /// Send messages asynchronously via the broker. It can be called from with
43    /// actors with a `SyncContext`, or where you don't have access to `self`. e.g. From within
44    /// a `HttpHandler` from `actix-web`.
45    pub fn issue_async<M: BrokerMsg>(msg: M) {
46        let broker = Self::from_registry();
47        broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
48    }
49}
50
51/// The system service actor that keeps track of subscriptions and routes messages to them.
52impl<T> Broker<T> {
53    fn get_subs<M: BrokerMsg>(
54        &self,
55    ) -> Option<impl Iterator<Item = (usize, (&TypeId, &Recipient<M>))>> {
56        let id = TypeId::of::<M>();
57        let msg_subs = self.sub_map.get(&id)?;
58
59        trace!("Broker: Found subscription list for {:?}.", id);
60
61        let iter = msg_subs
62            .iter()
63            .enumerate()
64            .map(|(idx, (actor_id, recipient))| {
65                let typed_recipient = recipient
66                    .downcast_ref::<Recipient<M>>()
67                    .expect("Message type should always be M");
68                (idx, (actor_id, typed_recipient))
69            });
70        Some(iter)
71    }
72
73    fn add_sub<M: BrokerMsg>(&mut self, sub: Recipient<M>, id: TypeId) {
74        let msg_id = TypeId::of::<M>();
75        let boxed = Box::new(sub);
76        if let Some(subs) = self.sub_map.get_mut(&msg_id) {
77            trace!("Broker: Adding to {:?} subscription list.", msg_id);
78            subs.push((id, boxed));
79            return;
80        }
81
82        trace!("Broker: Creating {:?} subscription list.", msg_id);
83        self.sub_map.insert(msg_id, vec![(id, boxed)]);
84    }
85
86    fn remove_subs<M: BrokerMsg>(&mut self, indexes: &[usize]) {
87        if indexes.is_empty() {
88            return;
89        }
90
91        let msg_id = TypeId::of::<M>();
92        if let Some(subscribers) = self.sub_map.get_mut(&msg_id) {
93            trace!("Broker: Removing {:?} subscribers", indexes);
94            indexes.iter().rev().for_each(|&idx| {
95                subscribers.swap_remove(idx);
96            });
97        }
98    }
99
100    fn get_previous_msg<M: BrokerMsg>(&self) -> Option<M> {
101        let id = TypeId::of::<M>();
102        let msg = self.msg_map.get(&id)?;
103        trace!("Broker: Previous message found for {:?}", id);
104        let msg = msg.downcast_ref::<M>()?;
105        Some(msg.clone())
106    }
107
108    fn set_msg<M: BrokerMsg>(&mut self, msg: M) {
109        let id = TypeId::of::<M>();
110        let boxed = Box::new(msg);
111        if let Some(pm) = self.msg_map.get_mut(&id) {
112            trace!("Broker: Setting new message value for {:?}", id);
113            *pm = boxed;
114            return;
115        }
116
117        trace!("Broker: Adding first message value for {:?}", id);
118        self.msg_map.insert(id, boxed);
119    }
120}
121
122impl<T: 'static + Unpin, M: BrokerMsg> Handler<SubscribeAsync<M>> for Broker<T> {
123    type Result = ();
124
125    fn handle(&mut self, msg: SubscribeAsync<M>, _ctx: &mut Context<Self>) {
126        trace!("Broker: Received SubscribeAsync");
127        self.add_sub::<M>(msg.0, msg.1);
128    }
129}
130
131impl<T: 'static + Unpin, M: BrokerMsg> Handler<SubscribeSync<M>> for Broker<T> {
132    type Result = Option<M>;
133
134    fn handle(&mut self, msg: SubscribeSync<M>, _ctx: &mut Context<Self>) -> Self::Result {
135        trace!("Broker: Received SubscribeSync");
136        self.add_sub::<M>(msg.0, msg.1);
137        self.get_previous_msg::<M>()
138    }
139}
140
141impl<T: 'static + Unpin, M: BrokerMsg> Handler<IssueAsync<M>> for Broker<T> {
142    type Result = ();
143
144    fn handle(&mut self, msg: IssueAsync<M>, _ctx: &mut Context<Self>) {
145        trace!("Broker: Received IssueAsync");
146        let subscriber_indexes = if let Some(subscribers) = self.get_subs::<M>() {
147            subscribers
148                .filter(|&(_, (actor_id, _))| !msg.1.eq(actor_id))
149                .filter_map(
150                    |(idx, (_, recipient))| match recipient.try_send(msg.0.clone()) {
151                        Err(SendError::Full(msg)) => {
152                            recipient.do_send(msg);
153                            None
154                        }
155                        Err(_) => Some(idx),
156                        _ => None,
157                    },
158                )
159                .collect()
160        } else {
161            vec![]
162        };
163
164        self.remove_subs::<M>(subscriber_indexes.as_slice());
165        self.set_msg::<M>(msg.0);
166    }
167}
168
169impl<T: 'static + Unpin, M: BrokerMsg> Handler<IssueSync<M>> for Broker<T> {
170    type Result = ();
171
172    fn handle(&mut self, msg: IssueSync<M>, ctx: &mut Context<Self>) {
173        trace!("Broker: Received IssueSync");
174
175        if let Some(subscribers) = self.get_subs::<M>() {
176            subscribers
177                .filter(|&(_, (actor_id, _))| !msg.1.eq(actor_id))
178                .for_each(|(_, (_, recipient))| {
179                    recipient
180                        .send(msg.0.clone())
181                        .into_actor(self)
182                        .map(|_, _, _| ())
183                        .wait(ctx);
184                });
185        }
186
187        self.set_msg::<M>(msg.0);
188    }
189}
190
191impl<T: 'static + Unpin> Actor for Broker<T> {
192    type Context = Context<Self>;
193}
194
195impl SystemService for Broker<SystemBroker> {}
196impl Supervised for Broker<SystemBroker> {}
197
198impl ArbiterService for Broker<ArbiterBroker> {}
199impl Supervised for Broker<ArbiterBroker> {}
200
201pub trait RegisteredBroker: 'static + Unpin
202where
203    Self: std::marker::Sized,
204{
205    fn get_broker() -> Addr<Broker<Self>>;
206}
207
208impl RegisteredBroker for SystemBroker {
209    fn get_broker() -> Addr<Broker<Self>> {
210        Broker::<SystemBroker>::from_registry()
211    }
212}
213
214impl RegisteredBroker for ArbiterBroker {
215    fn get_broker() -> Addr<Broker<Self>> {
216        Broker::<ArbiterBroker>::from_registry()
217    }
218}