1use std::{
2 any::{Any, TypeId},
3 collections::HashMap,
4 hash::BuildHasherDefault,
5 marker::PhantomData,
6};
7
8use actix::prelude::*;
9use ahash::AHasher;
10use log::trace;
11
12use crate::msgs::*;
13
14type TypeMap<A> = HashMap<TypeId, A, BuildHasherDefault<AHasher>>;
15
16#[derive(Default)]
17pub struct Broker<T> {
18 sub_map: TypeMap<Vec<(TypeId, Box<dyn Any>)>>,
19 msg_map: TypeMap<Box<dyn Any>>,
20 _t: PhantomData<T>,
21}
22
23#[derive(Default)]
24pub struct SystemBroker;
25
26#[derive(Default)]
27pub struct ArbiterBroker;
28
29impl Broker<SystemBroker> {
31 pub fn issue_async<M: BrokerMsg>(msg: M) {
35 let broker = Self::from_registry();
36 broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
37 }
38}
39
40impl Broker<ArbiterBroker> {
42 pub fn issue_async<M: BrokerMsg>(msg: M) {
46 let broker = Self::from_registry();
47 broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
48 }
49}
50
51impl<T> Broker<T> {
53 fn get_subs<M: BrokerMsg>(
54 &self,
55 ) -> Option<impl Iterator<Item = (usize, (&TypeId, &Recipient<M>))>> {
56 let id = TypeId::of::<M>();
57 let msg_subs = self.sub_map.get(&id)?;
58
59 trace!("Broker: Found subscription list for {:?}.", id);
60
61 let iter = msg_subs
62 .iter()
63 .enumerate()
64 .map(|(idx, (actor_id, recipient))| {
65 let typed_recipient = recipient
66 .downcast_ref::<Recipient<M>>()
67 .expect("Message type should always be M");
68 (idx, (actor_id, typed_recipient))
69 });
70 Some(iter)
71 }
72
73 fn add_sub<M: BrokerMsg>(&mut self, sub: Recipient<M>, id: TypeId) {
74 let msg_id = TypeId::of::<M>();
75 let boxed = Box::new(sub);
76 if let Some(subs) = self.sub_map.get_mut(&msg_id) {
77 trace!("Broker: Adding to {:?} subscription list.", msg_id);
78 subs.push((id, boxed));
79 return;
80 }
81
82 trace!("Broker: Creating {:?} subscription list.", msg_id);
83 self.sub_map.insert(msg_id, vec![(id, boxed)]);
84 }
85
86 fn remove_subs<M: BrokerMsg>(&mut self, indexes: &[usize]) {
87 if indexes.is_empty() {
88 return;
89 }
90
91 let msg_id = TypeId::of::<M>();
92 if let Some(subscribers) = self.sub_map.get_mut(&msg_id) {
93 trace!("Broker: Removing {:?} subscribers", indexes);
94 indexes.iter().rev().for_each(|&idx| {
95 subscribers.swap_remove(idx);
96 });
97 }
98 }
99
100 fn get_previous_msg<M: BrokerMsg>(&self) -> Option<M> {
101 let id = TypeId::of::<M>();
102 let msg = self.msg_map.get(&id)?;
103 trace!("Broker: Previous message found for {:?}", id);
104 let msg = msg.downcast_ref::<M>()?;
105 Some(msg.clone())
106 }
107
108 fn set_msg<M: BrokerMsg>(&mut self, msg: M) {
109 let id = TypeId::of::<M>();
110 let boxed = Box::new(msg);
111 if let Some(pm) = self.msg_map.get_mut(&id) {
112 trace!("Broker: Setting new message value for {:?}", id);
113 *pm = boxed;
114 return;
115 }
116
117 trace!("Broker: Adding first message value for {:?}", id);
118 self.msg_map.insert(id, boxed);
119 }
120}
121
122impl<T: 'static + Unpin, M: BrokerMsg> Handler<SubscribeAsync<M>> for Broker<T> {
123 type Result = ();
124
125 fn handle(&mut self, msg: SubscribeAsync<M>, _ctx: &mut Context<Self>) {
126 trace!("Broker: Received SubscribeAsync");
127 self.add_sub::<M>(msg.0, msg.1);
128 }
129}
130
131impl<T: 'static + Unpin, M: BrokerMsg> Handler<SubscribeSync<M>> for Broker<T> {
132 type Result = Option<M>;
133
134 fn handle(&mut self, msg: SubscribeSync<M>, _ctx: &mut Context<Self>) -> Self::Result {
135 trace!("Broker: Received SubscribeSync");
136 self.add_sub::<M>(msg.0, msg.1);
137 self.get_previous_msg::<M>()
138 }
139}
140
141impl<T: 'static + Unpin, M: BrokerMsg> Handler<IssueAsync<M>> for Broker<T> {
142 type Result = ();
143
144 fn handle(&mut self, msg: IssueAsync<M>, _ctx: &mut Context<Self>) {
145 trace!("Broker: Received IssueAsync");
146 let subscriber_indexes = if let Some(subscribers) = self.get_subs::<M>() {
147 subscribers
148 .filter(|&(_, (actor_id, _))| !msg.1.eq(actor_id))
149 .filter_map(
150 |(idx, (_, recipient))| match recipient.try_send(msg.0.clone()) {
151 Err(SendError::Full(msg)) => {
152 recipient.do_send(msg);
153 None
154 }
155 Err(_) => Some(idx),
156 _ => None,
157 },
158 )
159 .collect()
160 } else {
161 vec![]
162 };
163
164 self.remove_subs::<M>(subscriber_indexes.as_slice());
165 self.set_msg::<M>(msg.0);
166 }
167}
168
169impl<T: 'static + Unpin, M: BrokerMsg> Handler<IssueSync<M>> for Broker<T> {
170 type Result = ();
171
172 fn handle(&mut self, msg: IssueSync<M>, ctx: &mut Context<Self>) {
173 trace!("Broker: Received IssueSync");
174
175 if let Some(subscribers) = self.get_subs::<M>() {
176 subscribers
177 .filter(|&(_, (actor_id, _))| !msg.1.eq(actor_id))
178 .for_each(|(_, (_, recipient))| {
179 recipient
180 .send(msg.0.clone())
181 .into_actor(self)
182 .map(|_, _, _| ())
183 .wait(ctx);
184 });
185 }
186
187 self.set_msg::<M>(msg.0);
188 }
189}
190
191impl<T: 'static + Unpin> Actor for Broker<T> {
192 type Context = Context<Self>;
193}
194
195impl SystemService for Broker<SystemBroker> {}
196impl Supervised for Broker<SystemBroker> {}
197
198impl ArbiterService for Broker<ArbiterBroker> {}
199impl Supervised for Broker<ArbiterBroker> {}
200
201pub trait RegisteredBroker: 'static + Unpin
202where
203 Self: std::marker::Sized,
204{
205 fn get_broker() -> Addr<Broker<Self>>;
206}
207
208impl RegisteredBroker for SystemBroker {
209 fn get_broker() -> Addr<Broker<Self>> {
210 Broker::<SystemBroker>::from_registry()
211 }
212}
213
214impl RegisteredBroker for ArbiterBroker {
215 fn get_broker() -> Addr<Broker<Self>> {
216 Broker::<ArbiterBroker>::from_registry()
217 }
218}