coerce/remote/stream/mediator/
mod.rs1use crate::actor::context::ActorContext;
2use crate::actor::message::{Handler, Message};
3use crate::actor::{Actor, LocalActorRef};
4use crate::remote::actor::message::SetRemote;
5use crate::remote::heartbeat::Heartbeat;
6use crate::remote::net::message::SessionEvent;
7use crate::remote::net::proto::network::StreamPublishEvent;
8use crate::remote::net::StreamData;
9use crate::remote::stream::pubsub::{
10 Receive, Subscription, Topic, TopicEmitter, TopicSubscriberStore,
11};
12use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic};
13use crate::remote::system::{NodeId, RemoteActorSystem};
14use std::collections::hash_map::Entry;
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18pub struct MediatorTopic(Box<dyn TopicEmitter>);
19
20#[derive(Default)]
21pub struct StreamMediator {
22 remote: Option<RemoteActorSystem>,
23 nodes: HashSet<NodeId>,
24 topics: HashMap<String, MediatorTopic>,
25 system_subscription: Option<Subscription>,
26}
27
28impl StreamMediator {
29 pub fn new() -> StreamMediator {
30 Self::default()
31 }
32
33 fn remote(&self) -> &RemoteActorSystem {
34 self.remote
35 .as_ref()
36 .expect("StreamMediator remote actor system not set")
37 }
38}
39
40impl Actor for StreamMediator {}
41
42#[derive(Debug)]
43pub enum SubscribeErr {
44 Err,
45}
46
47pub struct Subscribe<A: Actor, T: Topic> {
48 receiver_ref: LocalActorRef<A>,
49 topic: T,
50}
51
52impl<A: Actor, T: Topic> Subscribe<A, T> {
53 pub fn new(topic: T, receiver_ref: LocalActorRef<A>) -> Self {
54 Subscribe {
55 receiver_ref,
56 topic,
57 }
58 }
59}
60
61#[derive(Debug)]
62pub enum PublishErr {
63 SerializationErr,
64}
65
66pub enum Reach {
67 Local,
68 Cluster,
69}
70
71pub struct Publish<T: Topic> {
72 pub topic: T,
73 pub message: T::Message,
74 pub reach: Reach,
75}
76
77pub struct PublishRaw {
78 pub topic: String,
79 pub key: String,
80 pub message: Vec<u8>,
81}
82
83impl<A: Actor, T: Topic> Message for Subscribe<A, T> {
84 type Result = Result<Subscription, SubscribeErr>;
85}
86
87impl<T: Topic> Message for Publish<T> {
88 type Result = Result<(), PublishErr>;
89}
90
91impl Message for PublishRaw {
92 type Result = ();
93}
94
95impl StreamMediator {
96 pub fn subscribe<A: Actor, T: Topic>(
97 &mut self,
98 topic: T,
99 receiver_ref: LocalActorRef<A>,
100 ) -> Result<Subscription, SubscribeErr>
101 where
102 A: Handler<Receive<T>>,
103 {
104 let topic_key = &topic.key();
105 let receiver = match self.topics.entry(T::topic_name().to_string()) {
106 Entry::Occupied(occupied) => occupied.into_mut(),
107 Entry::Vacant(vacant_entry) => {
108 let topic = MediatorTopic::new::<T>();
109 vacant_entry.insert(topic)
110 }
111 };
112
113 if let Some(topic) = receiver.subscriber_store_mut() {
114 debug!(
115 "actor_id={} subscribing to topic {} (key=\"{}\")",
116 receiver_ref.actor_id(),
117 T::topic_name().to_string(),
118 &topic_key
119 );
120
121 let receiver = topic.receiver(topic_key);
122 let subscription = Subscription::new(receiver, receiver_ref);
123
124 Ok(subscription)
125 } else {
126 error!(
127 "actor_id={} failed to subscribe to topic {} (key=\"{}\")",
128 &receiver_ref.actor_id(),
129 T::topic_name().to_string(),
130 &topic_key
131 );
132
133 Err(SubscribeErr::Err)
134 }
135 }
136}
137
138#[async_trait]
139impl Handler<SetRemote> for StreamMediator {
140 async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) {
141 Heartbeat::register(ctx.boxed_actor_ref(), &message.0);
142
143 self.remote = Some(message.0);
144 self.system_subscription = Some(self.subscribe(SystemTopic, self.actor_ref(ctx)).unwrap())
145 }
146}
147
148#[async_trait]
149impl Handler<Receive<SystemTopic>> for StreamMediator {
150 async fn handle(&mut self, message: Receive<SystemTopic>, _ctx: &mut ActorContext) {
151 match message.0.as_ref() {
152 SystemEvent::Cluster(cluster_event) => match cluster_event {
153 ClusterEvent::NodeAdded(new_node) => {
154 if new_node.id != self.remote().node_id() {
155 self.nodes.insert(new_node.id);
156 }
157
158 info!("node added (node_id={})", new_node.id);
159 }
160
161 ClusterEvent::NodeRemoved(removed_node) => {
162 let _ = self.nodes.remove(&removed_node.id);
169
170 info!("node removed (node_id={})", removed_node.id);
171 }
172 _ => {}
173 },
174 }
175 }
176}
177
178#[async_trait]
179impl<T: Topic> Handler<Publish<T>> for StreamMediator {
180 async fn handle(
181 &mut self,
182 message: Publish<T>,
183 _ctx: &mut ActorContext,
184 ) -> Result<(), PublishErr> {
185 let msg = Arc::new(message.message);
186 if let Some(topic) = self.topics.get(T::topic_name()) {
187 topic.0.emit(&message.topic.key(), msg.clone()).await;
188 }
189
190 if message.reach.remote_publish() && !self.nodes.is_empty() {
191 match msg.write_to_bytes() {
192 Some(bytes) => {
193 let remote = self.remote().clone();
194 let topic = T::topic_name().to_string();
195 let key = message.topic.key();
196 let nodes: Vec<NodeId> = self.nodes.iter().copied().collect();
197
198 tokio::spawn(async move {
199 let message = bytes;
200 let publish = Arc::new(StreamPublishEvent {
201 topic,
202 message,
203 key,
204 ..Default::default()
205 });
206
207 let node_count = nodes.len();
208 for node in nodes {
209 let _ = remote
210 .notify_node(node, SessionEvent::StreamPublish(publish.clone()))
211 .await;
212 }
213
214 debug!("notified {} nodes", node_count);
215 });
216 Ok(())
217 }
218 None => Err(PublishErr::SerializationErr),
219 }
220 } else {
221 Ok(())
222 }
223 }
224}
225
226impl From<Arc<StreamPublishEvent>> for PublishRaw {
227 fn from(s: Arc<StreamPublishEvent>) -> Self {
228 match Arc::<StreamPublishEvent>::try_unwrap(s) {
229 Ok(p) => {
230 let topic = p.topic;
231 let key = p.key;
232 let message = p.message;
233
234 Self {
235 topic,
236 key,
237 message,
238 }
239 }
240 Err(s) => {
241 let topic = s.topic.clone();
242 let key = s.key.clone();
243 let message = s.message.clone();
244
245 Self {
246 topic,
247 key,
248 message,
249 }
250 }
251 }
252 }
253}
254
255#[async_trait]
256impl Handler<PublishRaw> for StreamMediator {
257 async fn handle(&mut self, message: PublishRaw, _ctx: &mut ActorContext) {
258 if let Some(topic) = self.topics.get(&message.topic) {
259 topic.0.emit_serialised(&message.key, message.message).await;
260 } else {
261 trace!("no topic: {}", &message.topic)
262 }
263 }
264}
265
266#[async_trait]
267impl<A: Actor, T: Topic> Handler<Subscribe<A, T>> for StreamMediator
268where
269 A: Handler<Receive<T>>,
270{
271 async fn handle(
272 &mut self,
273 message: Subscribe<A, T>,
274 _ctx: &mut ActorContext,
275 ) -> Result<Subscription, SubscribeErr> {
276 self.subscribe(message.topic, message.receiver_ref)
277 }
278}
279
280impl Reach {
281 pub fn remote_publish(&self) -> bool {
282 match &self {
283 Self::Local => false,
284 Self::Cluster => true,
285 }
286 }
287}
288
289impl MediatorTopic {
290 pub fn new<T: Topic>() -> Self {
291 let subscriber_store = TopicSubscriberStore::<T>::new();
292 MediatorTopic(Box::new(subscriber_store))
293 }
294
295 pub fn subscriber_store_mut<T: Topic>(&mut self) -> Option<&mut TopicSubscriberStore<T>> {
296 self.0.as_any_mut().downcast_mut()
297 }
298
299 pub fn subscriber_store<T: Topic>(&mut self) -> Option<&TopicSubscriberStore<T>> {
300 self.0.as_any().downcast_ref()
301 }
302}