1use std::{collections::HashMap, hash::Hash};
2
3use crate::actor_ref::{
4 ActorRef, ActorRefFactory, ActorReference, BasicActorRef, BoxedTell, Sender,
5};
6use crate::{
7 actor::{Actor, Context, CreateError, Receive},
8 system::{SystemEvent, SystemMsg},
9 Message,
10};
11
12type Subs<Msg> = HashMap<Topic, Vec<BoxedTell<Msg>>>;
13
14pub type ChannelCtx<Msg> = Context<ChannelMsg<Msg>>;
19pub type ChannelRef<Msg> = ActorRef<ChannelMsg<Msg>>;
20
21pub struct Channel<Msg: Message> {
23 subs: Subs<Msg>,
24}
25
26impl<Msg: Message> Default for Channel<Msg> {
27 fn default() -> Self {
28 Self {
29 subs: HashMap::new(),
30 }
31 }
32}
33
34impl<Msg> Actor for Channel<Msg>
35where
36 Msg: Message,
37{
38 type Msg = ChannelMsg<Msg>;
39
40 fn pre_start(&mut self, _ctx: &ChannelCtx<Msg>) {
42 }
50
51 fn recv(&mut self, ctx: &ChannelCtx<Msg>, msg: ChannelMsg<Msg>, sender: Sender) {
52 self.receive(ctx, msg, sender);
53 }
54
55 fn sys_recv(&mut self, _: &ChannelCtx<Msg>, msg: SystemMsg, _sender: Sender) {
59 if let SystemMsg::Event(evt) = msg {
60 if let SystemEvent::ActorTerminated(terminated) = evt {
61 let subs = self.subs.clone();
62
63 for topic in subs.keys() {
64 unsubscribe(&mut self.subs, topic, &terminated.actor);
65 }
66 }
67 }
68 }
69}
70
71impl<Msg> Receive<ChannelMsg<Msg>> for Channel<Msg>
72where
73 Msg: Message,
74{
75 type Msg = ChannelMsg<Msg>;
76
77 fn receive(&mut self, ctx: &ChannelCtx<Msg>, msg: Self::Msg, sender: Sender) {
78 match msg {
79 ChannelMsg::Publish(p) => self.receive(ctx, p, sender),
80 ChannelMsg::Subscribe(sub) => self.receive(ctx, sub, sender),
81 ChannelMsg::Unsubscribe(unsub) => self.receive(ctx, unsub, sender),
82 ChannelMsg::UnsubscribeAll(unsub) => self.receive(ctx, unsub, sender),
83 }
84 }
85}
86
87impl<Msg> Receive<Subscribe<Msg>> for Channel<Msg>
88where
89 Msg: Message,
90{
91 type Msg = ChannelMsg<Msg>;
92
93 fn receive(&mut self, _ctx: &ChannelCtx<Msg>, msg: Subscribe<Msg>, _sender: Sender) {
94 let subs = self.subs.entry(msg.topic).or_default();
95 subs.push(msg.actor);
96 }
97}
98
99impl<Msg> Receive<Unsubscribe<Msg>> for Channel<Msg>
100where
101 Msg: Message,
102{
103 type Msg = ChannelMsg<Msg>;
104
105 fn receive(&mut self, _ctx: &ChannelCtx<Msg>, msg: Unsubscribe<Msg>, _sender: Sender) {
106 unsubscribe(&mut self.subs, &msg.topic, &msg.actor);
107 }
108}
109
110impl<Msg> Receive<UnsubscribeAll<Msg>> for Channel<Msg>
111where
112 Msg: Message,
113{
114 type Msg = ChannelMsg<Msg>;
115
116 fn receive(&mut self, _ctx: &ChannelCtx<Msg>, msg: UnsubscribeAll<Msg>, _sender: Sender) {
117 let subs = self.subs.clone();
118
119 for topic in subs.keys() {
120 unsubscribe(&mut self.subs, topic, &msg.actor);
121 }
122 }
123}
124
125impl<Msg> Receive<Publish<Msg>> for Channel<Msg>
126where
127 Msg: Message,
128{
129 type Msg = ChannelMsg<Msg>;
130
131 fn receive(&mut self, _ctx: &ChannelCtx<Msg>, msg: Publish<Msg>, sender: Sender) {
132 if let Some(subs) = self.subs.get(&All.into()) {
134 for sub in subs.iter() {
135 sub.tell(msg.msg.clone(), sender.clone());
136 }
137 }
138
139 if let Some(subs) = self.subs.get(&msg.topic) {
141 for sub in subs.iter() {
142 sub.tell(msg.msg.clone(), sender.clone());
143 }
144 }
145 }
146}
147
148fn unsubscribe<Msg>(subs: &mut Subs<Msg>, topic: &Topic, actor: &dyn ActorReference) {
149 if subs.contains_key(topic) {
151 if let Some(pos) = subs
152 .get(topic)
153 .unwrap()
154 .iter()
155 .position(|x| x.path() == actor.path())
156 {
157 subs.get_mut(topic).unwrap().remove(pos);
158 }
159 }
160}
161
162#[derive(Default)]
164pub struct EventsChannel(Channel<SystemEvent>);
165
166impl Actor for EventsChannel {
167 type Msg = ChannelMsg<SystemEvent>;
168
169 fn pre_start(&mut self, ctx: &ChannelCtx<SystemEvent>) {
170 self.0.pre_start(ctx);
171 }
172
173 fn recv(
174 &mut self,
175 ctx: &ChannelCtx<SystemEvent>,
176 msg: ChannelMsg<SystemEvent>,
177 sender: Sender,
178 ) {
179 self.receive(ctx, msg, sender);
180 }
181
182 fn sys_recv(&mut self, ctx: &ChannelCtx<SystemEvent>, msg: SystemMsg, sender: Sender) {
183 self.0.sys_recv(ctx, msg, sender);
184 }
185}
186
187impl Receive<ChannelMsg<SystemEvent>> for EventsChannel {
188 type Msg = ChannelMsg<SystemEvent>;
189
190 fn receive(&mut self, ctx: &ChannelCtx<SystemEvent>, msg: Self::Msg, sender: Sender) {
191 match msg {
194 ChannelMsg::Publish(p) => self.receive(ctx, p, sender),
195 ChannelMsg::Subscribe(sub) => self.0.receive(ctx, sub, sender),
196 ChannelMsg::Unsubscribe(unsub) => self.0.receive(ctx, unsub, sender),
197 ChannelMsg::UnsubscribeAll(unsub) => self.0.receive(ctx, unsub, sender),
198 }
199 }
200}
201
202impl Receive<Publish<SystemEvent>> for EventsChannel {
203 type Msg = ChannelMsg<SystemEvent>;
204
205 fn receive(
206 &mut self,
207 _ctx: &ChannelCtx<SystemEvent>,
208 msg: Publish<SystemEvent>,
209 _sender: Sender,
210 ) {
211 if let Some(subs) = self.0.subs.get(&All.into()) {
213 for sub in subs.iter() {
214 let evt = SystemMsg::Event(msg.msg.clone());
215 sub.sys_tell(evt);
216 }
217 }
218
219 if let Some(subs) = self.0.subs.get(&msg.topic) {
221 for sub in subs.iter() {
222 let evt = SystemMsg::Event(msg.msg.clone());
223 sub.sys_tell(evt);
224 }
225 }
226 }
227}
228
229pub type DLChannelMsg = ChannelMsg<DeadLetter>;
231
232#[derive(Clone, Debug)]
233pub struct DeadLetter {
234 pub msg: String,
235 pub sender: Sender,
236 pub recipient: BasicActorRef,
237}
238
239#[derive(Debug, Clone)]
240pub struct Subscribe<Msg: Message> {
241 pub topic: Topic,
242 pub actor: BoxedTell<Msg>,
243}
244
245#[derive(Debug, Clone)]
246pub struct Unsubscribe<Msg: Message> {
247 pub topic: Topic,
248 pub actor: BoxedTell<Msg>,
249}
250
251#[derive(Debug, Clone)]
252pub struct UnsubscribeAll<Msg: Message> {
253 pub actor: BoxedTell<Msg>,
254}
255
256#[derive(Debug, Clone)]
257pub struct Publish<Msg: Message> {
258 pub topic: Topic,
259 pub msg: Msg,
260}
261
262#[derive(Debug, Clone)]
263pub enum ChannelMsg<Msg: Message> {
264 Publish(Publish<Msg>),
266
267 Subscribe(Subscribe<Msg>),
269
270 Unsubscribe(Unsubscribe<Msg>),
272
273 UnsubscribeAll(UnsubscribeAll<Msg>),
275}
276
277impl<Msg: Message> Into<ChannelMsg<Msg>> for Publish<Msg> {
279 fn into(self) -> ChannelMsg<Msg> {
280 ChannelMsg::Publish(self)
281 }
282}
283
284impl<Msg: Message> Into<ChannelMsg<Msg>> for Subscribe<Msg> {
286 fn into(self) -> ChannelMsg<Msg> {
287 ChannelMsg::Subscribe(self)
288 }
289}
290
291impl<Msg: Message> Into<ChannelMsg<Msg>> for Unsubscribe<Msg> {
293 fn into(self) -> ChannelMsg<Msg> {
294 ChannelMsg::Unsubscribe(self)
295 }
296}
297
298impl<Msg: Message> Into<ChannelMsg<Msg>> for UnsubscribeAll<Msg> {
300 fn into(self) -> ChannelMsg<Msg> {
301 ChannelMsg::UnsubscribeAll(self)
302 }
303}
304
/// Name of a channel topic.
///
/// A newtype over `String`; two topics are equal exactly when their names
/// are equal, and the type is hashable so it can key the subscription table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Topic(String);
310
311impl<'a> From<&'a str> for Topic {
312 fn from(topic: &str) -> Self {
313 Self(topic.to_string())
314 }
315}
316
317impl From<String> for Topic {
318 fn from(topic: String) -> Self {
319 Self(topic)
320 }
321}
322
323impl<'a> From<&'a SystemEvent> for Topic {
324 fn from(evt: &SystemEvent) -> Self {
325 match *evt {
326 SystemEvent::ActorCreated(_) => Self::from("actor.created"),
327 SystemEvent::ActorTerminated(_) => Self::from("actor.terminated"),
328 SystemEvent::ActorRestarted(_) => Self::from("actor.restarted"),
329 }
330 }
331}
332
/// Marker for the wildcard topic; it converts into the [`Topic`] named `"*"`.
///
/// Subscribers registered under this topic are included in every publication.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct All;
335
336impl From<All> for Topic {
337 fn from(_all: All) -> Self {
338 Self::from("*")
339 }
340}
341
/// Typed names for the system-event topics.
///
/// Each variant converts into a [`Topic`] with a fixed name, so callers do
/// not have to spell the topic strings themselves.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SysTopic {
    /// Topic `"actor.created"`.
    ActorCreated,
    /// Topic `"actor.terminated"`.
    ActorTerminated,
    /// Topic `"actor.restarted"`.
    ActorRestarted,
}
348
349impl From<SysTopic> for Topic {
350 fn from(evt: SysTopic) -> Self {
351 match evt {
352 SysTopic::ActorCreated => Self::from("actor.created"),
353 SysTopic::ActorTerminated => Self::from("actor.terminated"),
354 SysTopic::ActorRestarted => Self::from("actor.restarted"),
355 }
356 }
357}
358
359pub fn channel<Msg>(name: &str, fact: &impl ActorRefFactory) -> Result<ChannelRef<Msg>, CreateError>
360where
361 Msg: Message,
362{
363 fact.actor_of::<Channel<Msg>>(name)
364}