1use crate::actor::context::ActorContext;
2use crate::actor::message::Handler;
3use crate::actor::scheduler::ActorType;
4use crate::actor::system::ActorSystem;
5use crate::actor::{Actor, ActorId, LocalActorRef};
6use crate::remote::actor::message::{
7 GetActorNode, GetNodes, NodeTerminated, RegisterActor, RegisterNode, SetRemote, UpdateNodes,
8};
9use crate::remote::actor::RemoteResponse;
10use crate::remote::cluster::node::{RemoteNode, RemoteNodeState, RemoteNodeStore};
11use crate::remote::net::message::SessionEvent;
12use crate::remote::net::proto::network::{ActorAddress, FindActorEvent};
13use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
14use crate::remote::stream::system::{SystemEvent, SystemTopic};
15use crate::remote::system::{NodeId, RemoteActorSystem};
16use protobuf::well_known_types::wrappers::UInt64Value;
17use protobuf::Message;
18use std::collections::HashMap;
19use uuid::Uuid;
20
21pub struct RemoteRegistry {
22 nodes: RemoteNodeStore,
23 actors: HashMap<ActorId, NodeId>,
24 system: Option<RemoteActorSystem>,
25 system_event_subscription: Option<Subscription>,
26}
27
28impl RemoteRegistry {
29 pub async fn new(ctx: &ActorSystem) -> LocalActorRef<RemoteRegistry> {
30 ctx.new_actor(
31 "remote-registry",
32 RemoteRegistry {
33 actors: HashMap::new(),
34 nodes: RemoteNodeStore::new(vec![]),
35 system: None,
36 system_event_subscription: None,
37 },
38 ActorType::Tracked,
39 )
40 .await
41 .expect("RemoteRegistry")
42 }
43}
44
45impl Actor for RemoteRegistry {}
46
47#[async_trait]
48impl Handler<SetRemote> for RemoteRegistry {
49 async fn handle(&mut self, message: SetRemote, ctx: &mut ActorContext) {
50 let sys = message.0;
51 ctx.set_system(sys.actor_system().clone());
52 self.system = Some(sys);
53
54 let subscription = PubSub::subscribe::<Self, SystemTopic>(SystemTopic, ctx).await;
55 if let Ok(subscription) = subscription {
56 trace!("subscribed to system event");
57 self.system_event_subscription = Some(subscription);
58 }
59 }
60}
61
62#[async_trait]
63impl Handler<GetNodes> for RemoteRegistry {
64 async fn handle(
65 &mut self,
66 _message: GetNodes,
67 _ctx: &mut ActorContext,
68 ) -> Vec<RemoteNodeState> {
69 self.nodes.get_all()
70 }
71}
72
73#[async_trait]
74impl Handler<RegisterNode> for RemoteRegistry {
75 async fn handle(&mut self, message: RegisterNode, _ctx: &mut ActorContext) {
76 self.register_node(message.0);
77 }
78}
79
80impl RemoteRegistry {
81 pub fn register_node(&mut self, node: RemoteNode) {
82 self.nodes.add(node);
83 }
84}
85
86#[async_trait]
87impl Handler<UpdateNodes> for RemoteRegistry {
88 async fn handle(&mut self, message: UpdateNodes, _ctx: &mut ActorContext) {
89 self.nodes.update_nodes(message.0);
90 }
91}
92
93#[async_trait]
94impl Handler<NodeTerminated> for RemoteRegistry {
95 async fn handle(&mut self, message: NodeTerminated, _ctx: &mut ActorContext) {
96 self.nodes.node_terminated(message.0);
97 debug!("node_id={} marked as terminated", message.0);
98
99 }
101}
102
103#[async_trait]
104impl Handler<GetActorNode> for RemoteRegistry {
105 async fn handle(&mut self, message: GetActorNode, _: &mut ActorContext) {
106 let id = message.actor_id;
114 let current_system = self.system.as_ref().unwrap().node_id();
115 let assigned_registry_node = self.nodes.get_by_key(&id).map(|n| n.id);
116
117 let assigned_registry_node = assigned_registry_node.map_or_else(
118 || {
119 trace!("no nodes configured, assigning locally");
120 current_system
121 },
122 |n| n,
123 );
124
125 trace!("{:?}", &self.nodes.get_all());
126
127 let local_registry_entry = self.actors.get(&id);
128 if local_registry_entry.is_some() || &assigned_registry_node == ¤t_system {
129 trace!("searching locally, {}", current_system);
130 let node = local_registry_entry.map(|s| *s);
131
132 trace!("found: {:?}", &node);
133 let _ = message.sender.send(node);
134 } else {
135 let system = self.system.as_ref().unwrap().clone();
136 let sender = message.sender;
137
138 trace!(
139 "asking remotely, current_sys={}, target_sys={}",
140 current_system,
141 assigned_registry_node
142 );
143 tokio::spawn(async move {
144 let message_id = Uuid::new_v4();
148 let system = system;
149 let (res_tx, res_rx) = tokio::sync::oneshot::channel();
150
151 trace!("remote request={}", message_id);
152 system.push_request(message_id, res_tx);
153
154 trace!("sending actor lookup request to={}", assigned_registry_node);
155 let trace_id = String::new(); system
157 .notify_node(
158 assigned_registry_node,
159 SessionEvent::FindActor(FindActorEvent {
160 message_id: message_id.to_string(),
161 actor_id: id.to_string(),
162 trace_id,
163 ..Default::default()
164 }),
165 )
166 .await;
167
168 trace!("lookup sent, waiting for result");
169 match res_rx.await {
170 Ok(RemoteResponse::Ok(res)) => {
171 let res = ActorAddress::parse_from_bytes(&res);
172 match res {
173 Ok(res) => {
174 let _ = sender.send(if res.node_id.is_none() {
175 None
176 } else {
177 Some(res.node_id.value)
178 });
179 }
180 Err(e) => {
181 panic!("failed to decode message - {}", e.to_string());
182 }
183 }
184 }
185 _ => panic!("get actornode failed"),
186 }
187 });
188 }
189 }
190}
191
192#[async_trait]
193impl Handler<RegisterActor> for RemoteRegistry {
194 async fn handle(&mut self, message: RegisterActor, _ctx: &mut ActorContext) {
195 trace!(
196 "Registering actor: {:?}, node={}",
197 &message,
198 self.system.as_ref().unwrap().node_id()
199 );
200
201 match message.node_id {
202 Some(node_id) => {
203 trace!("registering actor locally {}", node_id);
204 self.actors.insert(message.actor_id, node_id);
205 }
206
207 None => {
208 if let Some(system) = self.system.as_mut() {
209 let node_id = system.node_id();
210 let id = message.actor_id;
211
212 let assigned_registry_node =
213 self.nodes.get_by_key(&id).map_or_else(|| node_id, |n| n.id);
214
215 if &assigned_registry_node == &node_id {
216 trace!("registering actor locally {}", assigned_registry_node);
217 self.actors.insert(id, node_id);
218 } else {
219 let system = system.clone();
220 tokio::spawn(async move {
221 let event = SessionEvent::RegisterActor(ActorAddress {
222 node_id: Some(UInt64Value::from(node_id)).into(),
223 actor_id: id.to_string(),
224 ..ActorAddress::default()
225 });
226 system.notify_node(assigned_registry_node, event).await;
227 });
228 }
229 }
230 }
231 }
232 }
233}
234
235#[async_trait]
236impl Handler<Receive<SystemTopic>> for RemoteRegistry {
237 async fn handle(&mut self, event: Receive<SystemTopic>, ctx: &mut ActorContext) {
238 match event.0.as_ref() {
239 SystemEvent::Cluster(e) => {
240 debug!("cluster event - {:?}", e);
241 let system = self.system.as_ref().unwrap().clone();
242 let registry_ref = self.actor_ref(ctx);
243 tokio::spawn(async move {
246 let sys = system;
247 let actor_ids = sys
248 .actor_system()
249 .scheduler()
250 .exec::<_, Vec<ActorId>>(|s| s.actors.keys().map(|k| k.clone()).collect())
251 .await
252 .expect("unable to get active actor ids from scheduler");
253
254 for actor_id in actor_ids {
255 let _ = registry_ref.notify(RegisterActor::new(actor_id, None));
256 }
257 });
258 }
259 }
260 }
261}