Skip to main content

coerce/remote/actor/
registry.rs

1use crate::actor::context::ActorContext;
2use crate::actor::message::Handler;
3use crate::actor::scheduler::ActorType;
4use crate::actor::system::ActorSystem;
5use crate::actor::{Actor, ActorId, LocalActorRef};
6use crate::remote::actor::message::{
7    GetActorNode, GetNodes, NodeTerminated, RegisterActor, RegisterNode, SetRemote, UpdateNodes,
8};
9use crate::remote::actor::RemoteResponse;
10use crate::remote::cluster::node::{RemoteNode, RemoteNodeState, RemoteNodeStore};
11use crate::remote::net::message::SessionEvent;
12use crate::remote::net::proto::network::{ActorAddress, FindActorEvent};
13use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
14use crate::remote::stream::system::{SystemEvent, SystemTopic};
15use crate::remote::system::{NodeId, RemoteActorSystem};
16use protobuf::well_known_types::wrappers::UInt64Value;
17use protobuf::Message;
18use std::collections::HashMap;
19use uuid::Uuid;
20
21pub struct RemoteRegistry {
22    nodes: RemoteNodeStore,
23    actors: HashMap<ActorId, NodeId>,
24    system: Option<RemoteActorSystem>,
25    system_event_subscription: Option<Subscription>,
26}
27
28impl RemoteRegistry {
29    pub async fn new(ctx: &ActorSystem) -> LocalActorRef<RemoteRegistry> {
30        ctx.new_actor(
31            "remote-registry",
32            RemoteRegistry {
33                actors: HashMap::new(),
34                nodes: RemoteNodeStore::new(vec![]),
35                system: None,
36                system_event_subscription: None,
37            },
38            ActorType::Tracked,
39        )
40        .await
41        .expect("RemoteRegistry")
42    }
43}
44
45impl Actor for RemoteRegistry {}
46
47#[async_trait]
48impl Handler<SetRemote> for RemoteRegistry {
49    async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) {
50        let sys = message.0;
51        ctx.set_system(sys.actor_system().clone());
52        self.system = Some(sys);
53
54        let subscription = PubSub::subscribe::<Self, SystemTopic>(SystemTopic, ctx).await;
55        if let Ok(subscription) = subscription {
56            trace!("subscribed to system event");
57            self.system_event_subscription = Some(subscription);
58        }
59    }
60}
61
62#[async_trait]
63impl Handler<GetNodes> for RemoteRegistry {
64    async fn handle(
65        &mut self,
66        _message: GetNodes,
67        _ctx: &mut ActorContext,
68    ) -> Vec<RemoteNodeState> {
69        self.nodes.get_all()
70    }
71}
72
73#[async_trait]
74impl Handler<RegisterNode> for RemoteRegistry {
75    async fn handle(&mut self, message: RegisterNode, _ctx: &mut ActorContext) {
76        self.register_node(message.0);
77    }
78}
79
80impl RemoteRegistry {
81    pub fn register_node(&mut self, node: RemoteNode) {
82        self.nodes.add(node);
83    }
84}
85
86#[async_trait]
87impl Handler<UpdateNodes> for RemoteRegistry {
88    async fn handle(&mut self, message: UpdateNodes, _ctx: &mut ActorContext) {
89        self.nodes.update_nodes(message.0);
90    }
91}
92
93#[async_trait]
94impl Handler<NodeTerminated> for RemoteRegistry {
95    async fn handle(&mut self, message: NodeTerminated, _ctx: &mut ActorContext) {
96        self.nodes.node_terminated(message.0);
97        debug!("node_id={} marked as terminated", message.0);
98
99        // TODO: should this be published to clusterevent subscribers?
100    }
101}
102
103#[async_trait]
104impl Handler<GetActorNode> for RemoteRegistry {
105    async fn handle(&mut self, message: GetActorNode, _: &mut ActorContext) {
106        // let span = tracing::trace_span!(
107        //     "RemoteRegistry::GetActorNode",
108        //     actor_id = message.actor_id.as_str()
109        // );
110        //
111        // let _enter = span.enter();
112
113        let id = message.actor_id;
114        let current_system = self.system.as_ref().unwrap().node_id();
115        let assigned_registry_node = self.nodes.get_by_key(&id).map(|n| n.id);
116
117        let assigned_registry_node = assigned_registry_node.map_or_else(
118            || {
119                trace!("no nodes configured, assigning locally");
120                current_system
121            },
122            |n| n,
123        );
124
125        trace!("{:?}", &self.nodes.get_all());
126
127        let local_registry_entry = self.actors.get(&id);
128        if local_registry_entry.is_some() || &assigned_registry_node == &current_system {
129            trace!("searching locally, {}", current_system);
130            let node = local_registry_entry.map(|s| *s);
131
132            trace!("found: {:?}", &node);
133            let _ = message.sender.send(node);
134        } else {
135            let system = self.system.as_ref().unwrap().clone();
136            let sender = message.sender;
137
138            trace!(
139                "asking remotely, current_sys={}, target_sys={}",
140                current_system,
141                assigned_registry_node
142            );
143            tokio::spawn(async move {
144                // let span = tracing::trace_span!("RemoteRegistry::GetActorNode::Remote");
145                // let _enter = span.enter();
146
147                let message_id = Uuid::new_v4();
148                let system = system;
149                let (res_tx, res_rx) = tokio::sync::oneshot::channel();
150
151                trace!("remote request={}", message_id);
152                system.push_request(message_id, res_tx);
153
154                trace!("sending actor lookup request to={}", assigned_registry_node);
155                let trace_id = String::new(); //extract_trace_identifier(&span);
156                system
157                    .notify_node(
158                        assigned_registry_node,
159                        SessionEvent::FindActor(FindActorEvent {
160                            message_id: message_id.to_string(),
161                            actor_id: id.to_string(),
162                            trace_id,
163                            ..Default::default()
164                        }),
165                    )
166                    .await;
167
168                trace!("lookup sent, waiting for result");
169                match res_rx.await {
170                    Ok(RemoteResponse::Ok(res)) => {
171                        let res = ActorAddress::parse_from_bytes(&res);
172                        match res {
173                            Ok(res) => {
174                                let _ = sender.send(if res.node_id.is_none() {
175                                    None
176                                } else {
177                                    Some(res.node_id.value)
178                                });
179                            }
180                            Err(e) => {
181                                panic!("failed to decode message - {}", e.to_string());
182                            }
183                        }
184                    }
185                    _ => panic!("get actornode failed"),
186                }
187            });
188        }
189    }
190}
191
192#[async_trait]
193impl Handler<RegisterActor> for RemoteRegistry {
194    async fn handle(&mut self, message: RegisterActor, _ctx: &mut ActorContext) {
195        trace!(
196            "Registering actor: {:?}, node={}",
197            &message,
198            self.system.as_ref().unwrap().node_id()
199        );
200
201        match message.node_id {
202            Some(node_id) => {
203                trace!("registering actor locally {}", node_id);
204                self.actors.insert(message.actor_id, node_id);
205            }
206
207            None => {
208                if let Some(system) = self.system.as_mut() {
209                    let node_id = system.node_id();
210                    let id = message.actor_id;
211
212                    let assigned_registry_node =
213                        self.nodes.get_by_key(&id).map_or_else(|| node_id, |n| n.id);
214
215                    if &assigned_registry_node == &node_id {
216                        trace!("registering actor locally {}", assigned_registry_node);
217                        self.actors.insert(id, node_id);
218                    } else {
219                        let system = system.clone();
220                        tokio::spawn(async move {
221                            let event = SessionEvent::RegisterActor(ActorAddress {
222                                node_id: Some(UInt64Value::from(node_id)).into(),
223                                actor_id: id.to_string(),
224                                ..ActorAddress::default()
225                            });
226                            system.notify_node(assigned_registry_node, event).await;
227                        });
228                    }
229                }
230            }
231        }
232    }
233}
234
235#[async_trait]
236impl Handler<Receive<SystemTopic>> for RemoteRegistry {
237    async fn handle(&mut self, event: Receive<SystemTopic>, ctx: &mut ActorContext) {
238        match event.0.as_ref() {
239            SystemEvent::Cluster(e) => {
240                debug!("cluster event - {:?}", e);
241                let system = self.system.as_ref().unwrap().clone();
242                let registry_ref = self.actor_ref(ctx);
243                //
244                // // TODO: remove all of this stuff
245                tokio::spawn(async move {
246                    let sys = system;
247                    let actor_ids = sys
248                        .actor_system()
249                        .scheduler()
250                        .exec::<_, Vec<ActorId>>(|s| s.actors.keys().map(|k| k.clone()).collect())
251                        .await
252                        .expect("unable to get active actor ids from scheduler");
253
254                    for actor_id in actor_ids {
255                        let _ = registry_ref.notify(RegisterActor::new(actor_id, None));
256                    }
257                });
258            }
259        }
260    }
261}