1use actix::prelude::*;
2use ahash::AHasher;
3use log::trace;
4
5use std::any::{Any, TypeId};
6use std::collections::HashMap;
7use std::hash::BuildHasherDefault;
8use std::marker::PhantomData;
9
10use crate::msgs::*;
11
/// A `HashMap` keyed by `TypeId` whose hasher is built from `AHasher` through
/// `BuildHasherDefault`; `A` is the value type stored per key.
type TypeMap<A> = HashMap<TypeId, A, BuildHasherDefault<AHasher>>;
13
14#[derive(Default)]
15pub struct Broker<T> {
16 sub_map: TypeMap<Vec<(TypeId, Box<dyn Any>)>>,
17 msg_map: TypeMap<Box<dyn Any>>,
18 _t: PhantomData<T>,
19}
20
/// Marker type for the broker flavour that this file registers as a
/// `SystemService` (see `impl SystemService for Broker<SystemBroker>`).
#[derive(Default)]
pub struct SystemBroker;
23
/// Marker type for the broker flavour that this file registers as an
/// `ArbiterService` (see `impl ArbiterService for Broker<ArbiterBroker>`).
#[derive(Default)]
pub struct ArbiterBroker;
26
27impl Broker<SystemBroker> {
29 pub fn issue_async<M: BrokerMsg>(msg: M) {
33 let broker = Self::from_registry();
34 broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
35 }
36}
37
38impl Broker<ArbiterBroker> {
40 pub fn issue_async<M: BrokerMsg>(msg: M) {
44 let broker = Self::from_registry();
45 broker.do_send(IssueAsync(msg, TypeId::of::<Self>()));
46 }
47}
48
49impl<T> Broker<T> {
51 fn take_subs<M: BrokerMsg>(&mut self) -> Option<Vec<(TypeId, Recipient<M>)>> {
52 let id = TypeId::of::<M>();
53 let subs = self.sub_map.get_mut(&id)?;
54 trace!("Broker: Found subscription list for {:?}.", id);
55 let subs = subs
56 .drain(..)
57 .filter_map(|(id, s)| {
58 if let Ok(rec) = s.downcast::<Recipient<M>>() {
59 Some((id, rec))
60 } else {
61 None
62 }
63 })
64 .map(|(id, s)| (id, *s))
65 .collect();
66 Some(subs)
67 }
68
69 fn add_sub<M: BrokerMsg>(&mut self, sub: Recipient<M>, id: TypeId) {
70 let msg_id = TypeId::of::<M>();
71 let boxed = Box::new(sub);
72 if let Some(subs) = self.sub_map.get_mut(&msg_id) {
73 trace!("Broker: Adding to {:?} subscription list.", msg_id);
74 subs.push((id, boxed));
75 return;
76 }
77
78 trace!("Broker: Creating {:?} subscription list.", msg_id);
79 self.sub_map.insert(msg_id, vec![(id, boxed)]);
80 }
81
82 fn get_previous_msg<M: BrokerMsg>(&self) -> Option<M> {
83 let id = TypeId::of::<M>();
84 let msg = self.msg_map.get(&id)?;
85 trace!("Broker: Previous message found for {:?}", id);
86 let msg = msg.downcast_ref::<M>()?;
87 Some(msg.clone())
88 }
89
90 fn set_msg<M: BrokerMsg>(&mut self, msg: M) {
91 let id = TypeId::of::<M>();
92 let boxed = Box::new(msg);
93 if let Some(pm) = self.msg_map.get_mut(&id) {
94 trace!("Broker: Setting new message value for {:?}", id);
95 *pm = boxed;
96 return;
97 }
98
99 trace!("Broker: Adding first message value for {:?}", id);
100 self.msg_map.insert(id, boxed);
101 }
102}
103
104impl<T: 'static + Unpin, M: BrokerMsg> Handler<SubscribeAsync<M>> for Broker<T> {
105 type Result = ();
106
107 fn handle(&mut self, msg: SubscribeAsync<M>, _ctx: &mut Context<Self>) {
108 trace!("Broker: Received SubscribeAsync");
109 self.add_sub::<M>(msg.0, msg.1);
110 }
111}
112
113impl<T: 'static + Unpin, M: BrokerMsg> Handler<SubscribeSync<M>> for Broker<T> {
114 type Result = Option<M>;
115
116 fn handle(&mut self, msg: SubscribeSync<M>, _ctx: &mut Context<Self>) -> Self::Result {
117 trace!("Broker: Received SubscribeSync");
118 self.add_sub::<M>(msg.0, msg.1);
119 self.get_previous_msg::<M>()
120 }
121}
122
123impl<T: 'static + Unpin, M: BrokerMsg> Handler<IssueAsync<M>> for Broker<T> {
124 type Result = ();
125
126 fn handle(&mut self, msg: IssueAsync<M>, _ctx: &mut Context<Self>) {
127 trace!("Broker: Received IssueAsync");
128 if let Some(mut subs) = self.take_subs::<M>() {
129 subs.drain(..).for_each(|(id, s)| {
130 if id == msg.1 {
131 self.add_sub::<M>(s, id);
132 } else {
133 match s.try_send(msg.0.clone()) {
134 Ok(_) => self.add_sub::<M>(s, id),
135 Err(SendError::Full(_)) => {
136 s.do_send(msg.0.clone());
139 self.add_sub::<M>(s, id);
140 }
141 Err(_) => (),
142 }
143 }
144 });
145 }
146 self.set_msg::<M>(msg.0);
147 }
148}
149
150impl<T: 'static + Unpin, M: BrokerMsg> Handler<IssueSync<M>> for Broker<T> {
151 type Result = ();
152
153 fn handle(&mut self, msg: IssueSync<M>, ctx: &mut Context<Self>) {
154 trace!("Broker: Received IssueSync");
155 if let Some(mut subs) = self.take_subs::<M>() {
156 subs.drain(..).for_each(|(id, s)| {
157 if id == msg.1 {
158 self.add_sub::<M>(s, id);
159 } else {
160 s.send(msg.0.clone())
161 .into_actor(self)
162 .map(move |_, act, _| act.add_sub::<M>(s, id))
163 .wait(ctx);
164 }
165 });
166 }
167 self.set_msg::<M>(msg.0);
168 }
169}
170
171impl<T: 'static + Unpin> Actor for Broker<T> {
172 type Context = Context<Self>;
173}
174
// `Broker<SystemBroker>` is registered as a `SystemService`. Both impls are empty,
// so only the traits' provided default methods are used.
impl SystemService for Broker<SystemBroker> {}
impl Supervised for Broker<SystemBroker> {}
177
// `Broker<ArbiterBroker>` is registered as an `ArbiterService`. Both impls are empty,
// so only the traits' provided default methods are used.
impl ArbiterService for Broker<ArbiterBroker> {}
impl Supervised for Broker<ArbiterBroker> {}
180
181pub trait RegisteredBroker: 'static + Unpin
182where
183 Self: std::marker::Sized,
184{
185 fn get_broker() -> Addr<Broker<Self>>;
186}
187
188impl RegisteredBroker for SystemBroker {
189 fn get_broker() -> Addr<Broker<Self>> {
190 Broker::<SystemBroker>::from_registry()
191 }
192}
193
194impl RegisteredBroker for ArbiterBroker {
195 fn get_broker() -> Addr<Broker<Self>> {
196 Broker::<ArbiterBroker>::from_registry()
197 }
198}