actors_rs/actor/
channel.rs

1use std::{collections::HashMap, hash::Hash};
2
3use crate::actor_ref::{
4    ActorRef, ActorRefFactory, ActorReference, BasicActorRef, BoxedTell, Sender,
5};
6use crate::{
7    actor::{Actor, Context, CreateError, Receive},
8    system::{SystemEvent, SystemMsg},
9    Message,
10};
11
/// Subscription table of a channel: each [`Topic`] maps to the list of
/// subscriber handles that were registered for it, in subscription order.
type Subs<Msg> = HashMap<Topic, Vec<BoxedTell<Msg>>>;
13
// Generic Channel
//
// A channel is a specialized actor providing Publish/Subscribe capabilities to users.

/// Actor context type used by the handlers of a channel carrying `Msg`.
pub type ChannelCtx<Msg> = Context<ChannelMsg<Msg>>;
/// Actor reference type of a channel carrying `Msg`; it accepts [`ChannelMsg<Msg>`].
pub type ChannelRef<Msg> = ActorRef<ChannelMsg<Msg>>;
20
/// A specialized actor for providing Publish/Subscribe capabilities for user level messages.
///
/// Subscribers register for a [`Topic`] with [`Subscribe`]; a [`Publish`] is
/// forwarded to the subscribers of its topic and to the subscribers of the
/// wildcard topic ([`All`]).
pub struct Channel<Msg: Message> {
    /// Registered subscribers, grouped by topic.
    subs: Subs<Msg>,
}
25
26impl<Msg: Message> Default for Channel<Msg> {
27    fn default() -> Self {
28        Self {
29            subs: HashMap::new(),
30        }
31    }
32}
33
34impl<Msg> Actor for Channel<Msg>
35where
36    Msg: Message,
37{
38    type Msg = ChannelMsg<Msg>;
39
40    // todo subscribe to events to unsub subscribers when they die
41    fn pre_start(&mut self, _ctx: &ChannelCtx<Msg>) {
42        // let sub = Subscribe {
43        //     topic: SysTopic::ActorTerminated.into(),
44        //     actor: Box::new(ctx.myself.clone())//.into()
45        // };
46
47        // let msg = ChannelMsg::Subscribe(sub);
48        // ctx.myself.tell(msg, None);
49    }
50
51    fn recv(&mut self, ctx: &ChannelCtx<Msg>, msg: ChannelMsg<Msg>, sender: Sender) {
52        self.receive(ctx, msg, sender);
53    }
54
55    // We expect to receive ActorTerminated messages because we subscribed
56    // to this system event. This allows us to remove actors that have been
57    // terminated but did not explicity unsubscribe before terminating.
58    fn sys_recv(&mut self, _: &ChannelCtx<Msg>, msg: SystemMsg, _sender: Sender) {
59        if let SystemMsg::Event(evt) = msg {
60            if let SystemEvent::ActorTerminated(terminated) = evt {
61                let subs = self.subs.clone();
62
63                for topic in subs.keys() {
64                    unsubscribe(&mut self.subs, topic, &terminated.actor);
65                }
66            }
67        }
68    }
69}
70
71impl<Msg> Receive<ChannelMsg<Msg>> for Channel<Msg>
72where
73    Msg: Message,
74{
75    type Msg = ChannelMsg<Msg>;
76
77    fn receive(&mut self, ctx: &ChannelCtx<Msg>, msg: Self::Msg, sender: Sender) {
78        match msg {
79            ChannelMsg::Publish(p) => self.receive(ctx, p, sender),
80            ChannelMsg::Subscribe(sub) => self.receive(ctx, sub, sender),
81            ChannelMsg::Unsubscribe(unsub) => self.receive(ctx, unsub, sender),
82            ChannelMsg::UnsubscribeAll(unsub) => self.receive(ctx, unsub, sender),
83        }
84    }
85}
86
87impl<Msg> Receive<Subscribe<Msg>> for Channel<Msg>
88where
89    Msg: Message,
90{
91    type Msg = ChannelMsg<Msg>;
92
93    fn receive(&mut self, _ctx: &ChannelCtx<Msg>, msg: Subscribe<Msg>, _sender: Sender) {
94        let subs = self.subs.entry(msg.topic).or_default();
95        subs.push(msg.actor);
96    }
97}
98
99impl<Msg> Receive<Unsubscribe<Msg>> for Channel<Msg>
100where
101    Msg: Message,
102{
103    type Msg = ChannelMsg<Msg>;
104
105    fn receive(&mut self, _ctx: &ChannelCtx<Msg>, msg: Unsubscribe<Msg>, _sender: Sender) {
106        unsubscribe(&mut self.subs, &msg.topic, &msg.actor);
107    }
108}
109
110impl<Msg> Receive<UnsubscribeAll<Msg>> for Channel<Msg>
111where
112    Msg: Message,
113{
114    type Msg = ChannelMsg<Msg>;
115
116    fn receive(&mut self, _ctx: &ChannelCtx<Msg>, msg: UnsubscribeAll<Msg>, _sender: Sender) {
117        let subs = self.subs.clone();
118
119        for topic in subs.keys() {
120            unsubscribe(&mut self.subs, topic, &msg.actor);
121        }
122    }
123}
124
125impl<Msg> Receive<Publish<Msg>> for Channel<Msg>
126where
127    Msg: Message,
128{
129    type Msg = ChannelMsg<Msg>;
130
131    fn receive(&mut self, _ctx: &ChannelCtx<Msg>, msg: Publish<Msg>, sender: Sender) {
132        // send system event to actors subscribed to all topics
133        if let Some(subs) = self.subs.get(&All.into()) {
134            for sub in subs.iter() {
135                sub.tell(msg.msg.clone(), sender.clone());
136            }
137        }
138
139        // send system event to actors subscribed to the topic
140        if let Some(subs) = self.subs.get(&msg.topic) {
141            for sub in subs.iter() {
142                sub.tell(msg.msg.clone(), sender.clone());
143            }
144        }
145    }
146}
147
148fn unsubscribe<Msg>(subs: &mut Subs<Msg>, topic: &Topic, actor: &dyn ActorReference) {
149    // Nightly only: self.subs.get(msg_type).unwrap().remove_item(actor);
150    if subs.contains_key(topic) {
151        if let Some(pos) = subs
152            .get(topic)
153            .unwrap()
154            .iter()
155            .position(|x| x.path() == actor.path())
156        {
157            subs.get_mut(topic).unwrap().remove(pos);
158        }
159    }
160}
161
/// A specialized channel that publishes messages as system messages
///
/// It wraps a `Channel<SystemEvent>` and reuses its subscription handling;
/// only publishing differs: events are wrapped in `SystemMsg::Event` and
/// delivered with `sys_tell` instead of `tell`.
#[derive(Default)]
pub struct EventsChannel(Channel<SystemEvent>);
165
166impl Actor for EventsChannel {
167    type Msg = ChannelMsg<SystemEvent>;
168
169    fn pre_start(&mut self, ctx: &ChannelCtx<SystemEvent>) {
170        self.0.pre_start(ctx);
171    }
172
173    fn recv(
174        &mut self,
175        ctx: &ChannelCtx<SystemEvent>,
176        msg: ChannelMsg<SystemEvent>,
177        sender: Sender,
178    ) {
179        self.receive(ctx, msg, sender);
180    }
181
182    fn sys_recv(&mut self, ctx: &ChannelCtx<SystemEvent>, msg: SystemMsg, sender: Sender) {
183        self.0.sys_recv(ctx, msg, sender);
184    }
185}
186
187impl Receive<ChannelMsg<SystemEvent>> for EventsChannel {
188    type Msg = ChannelMsg<SystemEvent>;
189
190    fn receive(&mut self, ctx: &ChannelCtx<SystemEvent>, msg: Self::Msg, sender: Sender) {
191        // Publish variant uses specialized EventsChannel Receive
192        // All other variants use the wrapped Channel (self.0) Receive(s)
193        match msg {
194            ChannelMsg::Publish(p) => self.receive(ctx, p, sender),
195            ChannelMsg::Subscribe(sub) => self.0.receive(ctx, sub, sender),
196            ChannelMsg::Unsubscribe(unsub) => self.0.receive(ctx, unsub, sender),
197            ChannelMsg::UnsubscribeAll(unsub) => self.0.receive(ctx, unsub, sender),
198        }
199    }
200}
201
202impl Receive<Publish<SystemEvent>> for EventsChannel {
203    type Msg = ChannelMsg<SystemEvent>;
204
205    fn receive(
206        &mut self,
207        _ctx: &ChannelCtx<SystemEvent>,
208        msg: Publish<SystemEvent>,
209        _sender: Sender,
210    ) {
211        // send system event to actors subscribed to all topics
212        if let Some(subs) = self.0.subs.get(&All.into()) {
213            for sub in subs.iter() {
214                let evt = SystemMsg::Event(msg.msg.clone());
215                sub.sys_tell(evt);
216            }
217        }
218
219        // send system event to actors subscribed to the topic
220        if let Some(subs) = self.0.subs.get(&msg.topic) {
221            for sub in subs.iter() {
222                let evt = SystemMsg::Event(msg.msg.clone());
223                sub.sys_tell(evt);
224            }
225        }
226    }
227}
228
// Dead letter channel implementations

/// Message type of a channel whose payload is [`DeadLetter`].
pub type DLChannelMsg = ChannelMsg<DeadLetter>;
231
/// Payload published on the dead letter channel.
// NOTE(review): field meanings below are inferred from the names only —
// confirm against the code that creates `DeadLetter` values.
#[derive(Clone, Debug)]
pub struct DeadLetter {
    /// Presumably a textual rendering of the message that could not be delivered.
    pub msg: String,
    /// Presumably the sender that accompanied the undelivered message.
    pub sender: Sender,
    /// Presumably the actor the message was addressed to.
    pub recipient: BasicActorRef,
}
238
/// Request to add `actor` to the subscribers of `topic` on a channel.
#[derive(Debug, Clone)]
pub struct Subscribe<Msg: Message> {
    /// Topic the actor wants to receive messages for.
    pub topic: Topic,
    /// Handle through which published messages are delivered to the subscriber.
    pub actor: BoxedTell<Msg>,
}
244
/// Request to remove `actor` from the subscribers of `topic` on a channel.
#[derive(Debug, Clone)]
pub struct Unsubscribe<Msg: Message> {
    /// Topic to unsubscribe from.
    pub topic: Topic,
    /// Subscriber to remove; the channel identifies it by its actor path.
    pub actor: BoxedTell<Msg>,
}
250
/// Request to remove `actor` from the subscribers of every topic on a channel.
#[derive(Debug, Clone)]
pub struct UnsubscribeAll<Msg: Message> {
    /// Subscriber to remove; the channel identifies it by its actor path.
    pub actor: BoxedTell<Msg>,
}
255
/// Request to publish `msg` on a channel under `topic`.
///
/// The channel forwards a clone of the message to the subscribers of the
/// topic and to the subscribers of the wildcard topic ([`All`]).
#[derive(Debug, Clone)]
pub struct Publish<Msg: Message> {
    /// Topic the message is published under.
    pub topic: Topic,
    /// The message to deliver.
    pub msg: Msg,
}
261
/// The messages understood by a channel actor carrying payloads of type `Msg`.
///
/// Each variant wraps the request struct of the same name.
#[derive(Debug, Clone)]
pub enum ChannelMsg<Msg: Message> {
    /// Publish message
    Publish(Publish<Msg>),

    /// Subscribe given `ActorRef` to a topic on a channel
    Subscribe(Subscribe<Msg>),

    /// Unsubscribe the given `ActorRef` from a topic on a channel
    Unsubscribe(Unsubscribe<Msg>),

    /// Unsubscribe the given `ActorRef` from all topics on a channel
    UnsubscribeAll(UnsubscribeAll<Msg>),
}
276
277// publish
278impl<Msg: Message> Into<ChannelMsg<Msg>> for Publish<Msg> {
279    fn into(self) -> ChannelMsg<Msg> {
280        ChannelMsg::Publish(self)
281    }
282}
283
284// subscribe
285impl<Msg: Message> Into<ChannelMsg<Msg>> for Subscribe<Msg> {
286    fn into(self) -> ChannelMsg<Msg> {
287        ChannelMsg::Subscribe(self)
288    }
289}
290
291// unsubscribe
292impl<Msg: Message> Into<ChannelMsg<Msg>> for Unsubscribe<Msg> {
293    fn into(self) -> ChannelMsg<Msg> {
294        ChannelMsg::Unsubscribe(self)
295    }
296}
297
298// unsubscribe
299impl<Msg: Message> Into<ChannelMsg<Msg>> for UnsubscribeAll<Msg> {
300    fn into(self) -> ChannelMsg<Msg> {
301        ChannelMsg::UnsubscribeAll(self)
302    }
303}
304
/// Topics allow channel subscribers to filter messages by interest.
///
/// When publishing a message to a channel a Topic is provided.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Topic(String);

impl<'a> From<&'a str> for Topic {
    /// Builds a topic from a copy of the given string slice.
    fn from(topic: &'a str) -> Self {
        Topic(topic.to_owned())
    }
}

impl From<String> for Topic {
    /// Builds a topic that takes ownership of the given string.
    fn from(topic: String) -> Self {
        Topic(topic)
    }
}
322
323impl<'a> From<&'a SystemEvent> for Topic {
324    fn from(evt: &SystemEvent) -> Self {
325        match *evt {
326            SystemEvent::ActorCreated(_) => Self::from("actor.created"),
327            SystemEvent::ActorTerminated(_) => Self::from("actor.terminated"),
328            SystemEvent::ActorRestarted(_) => Self::from("actor.restarted"),
329        }
330    }
331}
332
/// A channel topic representing all topics `*`
///
/// Converts into the [`Topic`] named `*`. Actors subscribed to that topic
/// receive every message published on the channel, whatever its topic.
pub struct All;
335
336impl From<All> for Topic {
337    fn from(_all: All) -> Self {
338        Self::from("*")
339    }
340}
341
/// System topics used by the `event_stream` channel
///
/// Each variant converts into the [`Topic`] with the name given below.
pub enum SysTopic {
    /// Topic `actor.created`.
    ActorCreated,
    /// Topic `actor.terminated`.
    ActorTerminated,
    /// Topic `actor.restarted`.
    ActorRestarted,
}
348
349impl From<SysTopic> for Topic {
350    fn from(evt: SysTopic) -> Self {
351        match evt {
352            SysTopic::ActorCreated => Self::from("actor.created"),
353            SysTopic::ActorTerminated => Self::from("actor.terminated"),
354            SysTopic::ActorRestarted => Self::from("actor.restarted"),
355        }
356    }
357}
358
/// Creates a [`Channel`] actor for messages of type `Msg`.
///
/// The actor is created under `name` by calling `actor_of` on the given
/// factory `fact`.
///
/// # Errors
///
/// Returns the `CreateError` reported by `fact.actor_of` when the actor
/// cannot be created.
pub fn channel<Msg>(name: &str, fact: &impl ActorRefFactory) -> Result<ChannelRef<Msg>, CreateError>
where
    Msg: Message,
{
    fact.actor_of::<Channel<Msg>>(name)
}