coerce/remote/stream/mediator/
mod.rs

1use crate::actor::context::ActorContext;
2use crate::actor::message::{Handler, Message};
3use crate::actor::{Actor, LocalActorRef};
4use crate::remote::actor::message::SetRemote;
5use crate::remote::heartbeat::Heartbeat;
6use crate::remote::net::message::SessionEvent;
7use crate::remote::net::proto::network::StreamPublishEvent;
8use crate::remote::net::StreamData;
9use crate::remote::stream::pubsub::{
10    Receive, Subscription, Topic, TopicEmitter, TopicSubscriberStore,
11};
12use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic};
13use crate::remote::system::{NodeId, RemoteActorSystem};
14use std::collections::hash_map::Entry;
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
/// Type-erased holder for a single topic's emitter.
///
/// The mediator keeps one of these per topic name; the concrete
/// `TopicSubscriberStore<T>` behind the box is recovered by downcasting.
pub struct MediatorTopic(Box<dyn TopicEmitter>);
19
/// Actor that delivers published topic messages to local subscribers and,
/// when requested, forwards them to the other nodes of the cluster.
#[derive(Default)]
pub struct StreamMediator {
    /// Remote actor system; `None` until a `SetRemote` message is handled.
    remote: Option<RemoteActorSystem>,
    /// IDs of the other cluster nodes, kept up to date from cluster events
    /// (the local node's own ID is never inserted).
    nodes: HashSet<NodeId>,
    /// Registered topics, keyed by topic name.
    topics: HashMap<String, MediatorTopic>,
    /// Subscription to `SystemTopic`, created when `SetRemote` is handled.
    system_subscription: Option<Subscription>,
}
27
28impl StreamMediator {
29    pub fn new() -> StreamMediator {
30        Self::default()
31    }
32
33    fn remote(&self) -> &RemoteActorSystem {
34        self.remote
35            .as_ref()
36            .expect("StreamMediator remote actor system not set")
37    }
38}
39
// `StreamMediator` is an actor; none of the `Actor` trait's methods are overridden here.
impl Actor for StreamMediator {}
41
/// Error returned when a subscription could not be created.
///
/// Implements [`std::error::Error`] so it can be propagated with `?` into
/// boxed/dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeErr {
    /// The subscription failed; no further detail is carried.
    Err,
}

impl std::fmt::Display for SubscribeErr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SubscribeErr::Err => f.write_str("failed to subscribe to topic"),
        }
    }
}

impl std::error::Error for SubscribeErr {}
46
/// Message asking the mediator to subscribe an actor to a topic.
pub struct Subscribe<A: Actor, T: Topic> {
    /// Actor that will receive the topic's messages.
    receiver_ref: LocalActorRef<A>,
    /// Topic (and therefore topic key) being subscribed to.
    topic: T,
}
51
52impl<A: Actor, T: Topic> Subscribe<A, T> {
53    pub fn new(topic: T, receiver_ref: LocalActorRef<A>) -> Self {
54        Subscribe {
55            receiver_ref,
56            topic,
57        }
58    }
59}
60
/// Error returned when a message could not be published.
///
/// Implements [`std::error::Error`] so it can be propagated with `?` into
/// boxed/dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishErr {
    /// The message could not be serialised to bytes for remote delivery.
    SerializationErr,
}

impl std::fmt::Display for PublishErr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PublishErr::SerializationErr => {
                f.write_str("failed to serialise message for remote publish")
            }
        }
    }
}

impl std::error::Error for PublishErr {}
65
/// How far a published message should travel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Reach {
    /// Deliver to subscribers on the local node only.
    Local,
    /// Deliver to local subscribers and also forward to the other known
    /// nodes of the cluster.
    Cluster,
}
70
/// Message asking the mediator to publish `message` on `topic`.
pub struct Publish<T: Topic> {
    /// Topic the message is published on; its key selects the subscribers.
    pub topic: T,
    /// Payload delivered to subscribers.
    pub message: T::Message,
    /// Whether the message stays on this node or is also sent to the cluster.
    pub reach: Reach,
}
76
/// An already-serialised publication, addressed by topic name and key
/// rather than by a typed `Topic`.
pub struct PublishRaw {
    /// Name of the topic the message belongs to.
    pub topic: String,
    /// Topic key used to select subscribers.
    pub key: String,
    /// Serialised message bytes.
    pub message: Vec<u8>,
}
82
// Handling a `Subscribe` yields the created `Subscription`, or a `SubscribeErr` on failure.
impl<A: Actor, T: Topic> Message for Subscribe<A, T> {
    type Result = Result<Subscription, SubscribeErr>;
}
86
// Handling a `Publish` yields `Ok(())`, or a `PublishErr` on failure.
impl<T: Topic> Message for Publish<T> {
    type Result = Result<(), PublishErr>;
}
90
// `PublishRaw` carries no reply: its result type is the unit type.
impl Message for PublishRaw {
    type Result = ();
}
94
95impl StreamMediator {
96    pub fn subscribe<A: Actor, T: Topic>(
97        &mut self,
98        topic: T,
99        receiver_ref: LocalActorRef<A>,
100    ) -> Result<Subscription, SubscribeErr>
101    where
102        A: Handler<Receive<T>>,
103    {
104        let topic_key = &topic.key();
105        let receiver = match self.topics.entry(T::topic_name().to_string()) {
106            Entry::Occupied(occupied) => occupied.into_mut(),
107            Entry::Vacant(vacant_entry) => {
108                let topic = MediatorTopic::new::<T>();
109                vacant_entry.insert(topic)
110            }
111        };
112
113        if let Some(topic) = receiver.subscriber_store_mut() {
114            debug!(
115                "actor_id={} subscribing to topic {} (key=\"{}\")",
116                receiver_ref.actor_id(),
117                T::topic_name().to_string(),
118                &topic_key
119            );
120
121            let receiver = topic.receiver(topic_key);
122            let subscription = Subscription::new(receiver, receiver_ref);
123
124            Ok(subscription)
125        } else {
126            error!(
127                "actor_id={} failed to subscribe to topic {} (key=\"{}\")",
128                &receiver_ref.actor_id(),
129                T::topic_name().to_string(),
130                &topic_key
131            );
132
133            Err(SubscribeErr::Err)
134        }
135    }
136}
137
138#[async_trait]
139impl Handler<SetRemote> for StreamMediator {
140    async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) {
141        Heartbeat::register(ctx.boxed_actor_ref(), &message.0);
142
143        self.remote = Some(message.0);
144        self.system_subscription = Some(self.subscribe(SystemTopic, self.actor_ref(ctx)).unwrap())
145    }
146}
147
148#[async_trait]
149impl Handler<Receive<SystemTopic>> for StreamMediator {
150    async fn handle(&mut self, message: Receive<SystemTopic>, _ctx: &mut ActorContext) {
151        match message.0.as_ref() {
152            SystemEvent::Cluster(cluster_event) => match cluster_event {
153                ClusterEvent::NodeAdded(new_node) => {
154                    if new_node.id != self.remote().node_id() {
155                        self.nodes.insert(new_node.id);
156                    }
157
158                    info!("node added (node_id={})", new_node.id);
159                }
160
161                ClusterEvent::NodeRemoved(removed_node) => {
162                    // TODO: instead of just removing a node when
163                    //       we receive notification that a node was removed from the cluster,
164                    //       we could potentially start buffering any messages that have been published
165                    //       with a configurable TTL so that if/when the node rejoins the cluster,
166                    //       it will receive any messages it may have missed.
167
168                    let _ = self.nodes.remove(&removed_node.id);
169
170                    info!("node removed (node_id={})", removed_node.id);
171                }
172                _ => {}
173            },
174        }
175    }
176}
177
178#[async_trait]
179impl<T: Topic> Handler<Publish<T>> for StreamMediator {
180    async fn handle(
181        &mut self,
182        message: Publish<T>,
183        _ctx: &mut ActorContext,
184    ) -> Result<(), PublishErr> {
185        let msg = Arc::new(message.message);
186        if let Some(topic) = self.topics.get(T::topic_name()) {
187            topic.0.emit(&message.topic.key(), msg.clone()).await;
188        }
189
190        if message.reach.remote_publish() && !self.nodes.is_empty() {
191            match msg.write_to_bytes() {
192                Some(bytes) => {
193                    let remote = self.remote().clone();
194                    let topic = T::topic_name().to_string();
195                    let key = message.topic.key();
196                    let nodes: Vec<NodeId> = self.nodes.iter().copied().collect();
197
198                    tokio::spawn(async move {
199                        let message = bytes;
200                        let publish = Arc::new(StreamPublishEvent {
201                            topic,
202                            message,
203                            key,
204                            ..Default::default()
205                        });
206
207                        let node_count = nodes.len();
208                        for node in nodes {
209                            let _ = remote
210                                .notify_node(node, SessionEvent::StreamPublish(publish.clone()))
211                                .await;
212                        }
213
214                        debug!("notified {} nodes", node_count);
215                    });
216                    Ok(())
217                }
218                None => Err(PublishErr::SerializationErr),
219            }
220        } else {
221            Ok(())
222        }
223    }
224}
225
226impl From<Arc<StreamPublishEvent>> for PublishRaw {
227    fn from(s: Arc<StreamPublishEvent>) -> Self {
228        match Arc::<StreamPublishEvent>::try_unwrap(s) {
229            Ok(p) => {
230                let topic = p.topic;
231                let key = p.key;
232                let message = p.message;
233
234                Self {
235                    topic,
236                    key,
237                    message,
238                }
239            }
240            Err(s) => {
241                let topic = s.topic.clone();
242                let key = s.key.clone();
243                let message = s.message.clone();
244
245                Self {
246                    topic,
247                    key,
248                    message,
249                }
250            }
251        }
252    }
253}
254
255#[async_trait]
256impl Handler<PublishRaw> for StreamMediator {
257    async fn handle(&mut self, message: PublishRaw, _ctx: &mut ActorContext) {
258        if let Some(topic) = self.topics.get(&message.topic) {
259            topic.0.emit_serialised(&message.key, message.message).await;
260        } else {
261            trace!("no topic: {}", &message.topic)
262        }
263    }
264}
265
266#[async_trait]
267impl<A: Actor, T: Topic> Handler<Subscribe<A, T>> for StreamMediator
268where
269    A: Handler<Receive<T>>,
270{
271    async fn handle(
272        &mut self,
273        message: Subscribe<A, T>,
274        _ctx: &mut ActorContext,
275    ) -> Result<Subscription, SubscribeErr> {
276        self.subscribe(message.topic, message.receiver_ref)
277    }
278}
279
280impl Reach {
281    pub fn remote_publish(&self) -> bool {
282        match &self {
283            Self::Local => false,
284            Self::Cluster => true,
285        }
286    }
287}
288
289impl MediatorTopic {
290    pub fn new<T: Topic>() -> Self {
291        let subscriber_store = TopicSubscriberStore::<T>::new();
292        MediatorTopic(Box::new(subscriber_store))
293    }
294
295    pub fn subscriber_store_mut<T: Topic>(&mut self) -> Option<&mut TopicSubscriberStore<T>> {
296        self.0.as_any_mut().downcast_mut()
297    }
298
299    pub fn subscriber_store<T: Topic>(&mut self) -> Option<&TopicSubscriberStore<T>> {
300        self.0.as_any().downcast_ref()
301    }
302}