1use std::{cell::RefCell, sync::Arc};
2
3use parking_lot::RwLock;
4use sealed::sealed;
5use tokio::runtime::Handle;
6
7#[cfg(feature = "unstable-stuck-detection")]
8use crate::stuck_detection::StuckDetector;
9use crate::{
10 addr::{Addr, GroupNo, NodeLaunchId, NodeNo},
11 address_book::{AddressBook, VacantEntry},
12 context::Context,
13 demux::Demux,
14 envelope::Envelope,
15 group::Blueprint,
16 runtime::RuntimeManager,
17};
18
19pub(crate) const SYSTEM_INIT_GROUP_NO: u8 = 1;
20
21#[derive(Clone)]
23pub struct Topology {
24 node_no: NodeNo,
25 launch_id: NodeLaunchId,
26 pub(crate) book: AddressBook,
27 inner: Arc<RwLock<Inner>>,
28}
29
30struct Inner {
31 last_group_no: u8,
32 locals: Vec<LocalActorGroup>,
33 #[cfg(feature = "network")]
34 remotes: Vec<RemoteActorGroup>,
35 connections: Vec<Connection>,
36 rt_manager: RuntimeManager,
37}
38
39impl Default for Inner {
40 fn default() -> Self {
41 Self {
42 last_group_no: SYSTEM_INIT_GROUP_NO,
43 locals: Vec::new(),
44 #[cfg(feature = "network")]
45 remotes: Vec::new(),
46 connections: Vec::new(),
47 rt_manager: RuntimeManager::default(),
48 }
49 }
50}
51
52#[derive(Debug, Clone)]
54#[non_exhaustive]
55pub struct LocalActorGroup {
56 pub addr: Addr,
57 pub name: String,
58 pub is_entrypoint: bool,
59 pub is_mounted: bool,
60 pub(crate) stop_order: i8,
61}
62
63#[instability::unstable]
65#[derive(Debug, Clone)]
66#[non_exhaustive]
67pub struct Connection {
68 pub from: Addr,
69 pub to: ConnectionTo,
70}
71
72#[derive(Debug, Clone)]
74pub enum ConnectionTo {
75 Local(Addr),
76 #[cfg(feature = "network")]
77 Remote(String),
78}
79
80impl ConnectionTo {
81 #[instability::unstable]
82 pub fn into_remote(self) -> Option<String> {
83 match self {
84 Self::Local(_) => None,
85 #[cfg(feature = "network")]
86 Self::Remote(name) => Some(name),
87 }
88 }
89}
90
91impl Default for Topology {
92 fn default() -> Self {
93 Self::empty()
94 }
95}
96
97impl Topology {
98 pub fn empty() -> Self {
100 let launch_id = NodeLaunchId::generate();
101 Self {
102 node_no: NodeNo::generate(),
103 launch_id,
104 book: AddressBook::new(launch_id),
105 inner: Arc::new(RwLock::new(Inner::default())),
106 }
107 }
108
109 #[instability::unstable]
111 pub fn node_no(&self) -> NodeNo {
112 self.node_no
113 }
114
115 #[instability::unstable]
119 pub fn set_node_no(&mut self, node_no: NodeNo) {
120 self.node_no = node_no;
121 }
122
123 pub fn launch_id(&self) -> NodeLaunchId {
125 self.launch_id
126 }
127
128 #[instability::unstable]
134 pub fn add_dedicated_rt<F: Fn(&crate::ActorMeta) -> bool + Send + Sync + 'static>(
135 &self,
136 filter: F,
137 handle: Handle,
138 ) {
139 self.inner.write().rt_manager.add(filter, handle);
140 }
141
142 #[cfg(feature = "unstable-stuck-detection")]
143 pub fn stuck_detector(&self) -> StuckDetector {
144 self.inner.read().rt_manager.stuck_detector()
145 }
146
147 #[track_caller]
153 pub fn local(&self, name: impl Into<String>) -> Local<'_> {
154 let name = name.into();
155 let mut inner = self.inner.write();
156
157 for local in &inner.locals {
158 if local.name == name {
159 panic!("local group name `{name}` is already taken");
160 }
161 }
162
163 inner.last_group_no = inner.last_group_no.checked_add(1).expect("too many groups");
164 let group_no = GroupNo::new(inner.last_group_no, self.launch_id).expect("invalid group no");
165
166 let entry = self.book.vacant_entry(group_no);
167 inner.locals.push(LocalActorGroup {
168 addr: entry.addr(),
169 name: name.clone(),
170 is_mounted: false,
171 is_entrypoint: false,
172 stop_order: 0,
173 });
174
175 Local {
176 name,
177 topology: self,
178 entry,
179 demux: RefCell::new(Demux::default()),
180 }
181 }
182
183 pub fn locals(&self) -> impl Iterator<Item = LocalActorGroup> + '_ {
185 let inner = self.inner.read();
186 inner.locals.clone().into_iter().filter(|g| g.is_mounted)
187 }
188
189 #[instability::unstable]
190 pub fn connections(&self) -> impl Iterator<Item = Connection> + '_ {
191 let inner = self.inner.read();
192 inner.connections.clone().into_iter()
193 }
194}
195
196#[must_use]
198pub struct Local<'t> {
199 topology: &'t Topology,
200 name: String,
201 entry: VacantEntry<'t>,
202 demux: RefCell<Demux>,
203}
204
205impl Local<'_> {
206 #[doc(hidden)]
207 pub fn addr(&self) -> Addr {
208 self.entry.addr()
209 }
210
211 pub fn entrypoint(self) -> Self {
218 self.with_group_mut(|group| group.is_entrypoint = true);
219 self
220 }
221
222 pub fn route_to<F>(&self, dest: &impl Destination<F>, filter: F) {
245 dest.extend_demux(
246 self.entry.addr().group_no().expect("invalid addr"),
247 &mut self.demux.borrow_mut(),
248 filter,
249 );
250
251 let mut inner = self.topology.inner.write();
252 inner.connections.push(Connection {
253 from: self.entry.addr(),
254 to: dest.connection_endpoint(),
255 });
256 }
257
258 pub fn route_all_to(&self, dest: &Local<'_>) {
260 let addr = dest.entry.addr();
261 self.demux
262 .borrow_mut()
263 .append(move |_, addrs| addrs.push(addr));
264 }
265
266 pub fn mount(self, blueprint: Blueprint) {
268 self.with_group_mut(|group| {
269 group.stop_order = blueprint.stop_order;
270 group.is_mounted = true;
271 });
272
273 let addr = self.entry.addr();
274 let book = self.topology.book.clone();
275 let ctx = Context::new(book, self.demux.into_inner()).with_group(addr);
276 let rt_manager = self.topology.inner.read().rt_manager.clone();
277 let object = (blueprint.mount)(
278 ctx,
279 self.topology.node_no,
280 self.topology.launch_id,
281 self.name,
282 rt_manager,
283 );
284 self.entry.insert(object);
285 }
286
287 fn with_group_mut(&self, f: impl FnOnce(&mut LocalActorGroup)) {
288 let mut inner = self.topology.inner.write();
289 let group = inner
290 .locals
291 .iter_mut()
292 .find(|group| group.addr == self.entry.addr())
293 .expect("no corresponding group for Local<_>");
294 f(group);
295 }
296}
297
298#[sealed]
299pub trait Destination<F> {
300 #[doc(hidden)]
301 fn extend_demux(&self, source_group_no: GroupNo, demux: &mut Demux, filter: F);
302
303 #[doc(hidden)]
304 fn connection_endpoint(&self) -> ConnectionTo;
305}
306
307#[sealed]
308impl<F> Destination<F> for Local<'_>
309where
310 F: Fn(&Envelope) -> bool + Send + Sync + 'static,
311{
312 fn extend_demux(&self, _: GroupNo, demux: &mut Demux, filter: F) {
313 let addr = self.entry.addr();
314 demux.append(move |envelope, addrs| {
315 if filter(envelope) {
316 addrs.push(addr);
317 }
318 });
319 }
320
321 fn connection_endpoint(&self) -> ConnectionTo {
322 ConnectionTo::Local(self.entry.addr())
323 }
324}
325
326cfg_network!({
327 use arc_swap::ArcSwap;
328 use fxhash::FxHashMap;
329
330 use crate::{object::Object, remote::RemoteHandle};
331
332 type Nodes = Arc<ArcSwap<FxHashMap<NodeNo, Addr>>>;
335
336 #[instability::unstable]
339 #[derive(Debug, Clone)]
340 #[non_exhaustive]
341 pub struct RemoteActorGroup {
342 pub name: String,
343 nodes: FxHashMap<GroupNo, Nodes>,
345 }
346
347 impl Topology {
348 #[instability::unstable]
351 pub fn register_remote(
352 &self,
353 network_actor_addr: Addr,
354 local_group: GroupNo,
355 remote_group: (NodeNo, GroupNo),
356 remote_group_name: &str,
357 handle: impl RemoteHandle,
358 ) -> RegisterRemoteGroupGuard<'_> {
359 let group_no =
362 GroupNo::new(SYSTEM_INIT_GROUP_NO, self.launch_id).expect("invalid group no");
363 let entry = self.book.vacant_entry(group_no);
364 let handle_addr = entry.addr();
365 let object = Object::new(handle_addr, Box::new(handle) as Box<dyn RemoteHandle>);
366 entry.insert(object);
367
368 self.book
369 .register_remote(network_actor_addr, local_group, remote_group, handle_addr);
370
371 let nodes = {
374 let inner = self.inner.write();
375 inner
376 .remotes
377 .iter()
378 .find(|group| group.name == remote_group_name)
379 .and_then(|group| group.nodes.get(&local_group).cloned())
380 };
381
382 if let Some(nodes) = &nodes {
383 nodes.rcu(|nodes| {
384 let mut nodes = (**nodes).clone();
385 nodes.insert(remote_group.0, handle_addr);
386 nodes
387 });
388 }
389
390 RegisterRemoteGroupGuard {
391 book: &self.book,
392 handle_addr,
393 network_actor_addr,
394 local_group,
395 remote_group,
396 nodes,
397 }
398 }
399
400 pub fn remote(&self, name: impl Into<String>) -> Remote<'_> {
405 let name = name.into();
406 let mut inner = self.inner.write();
407
408 for remote in &inner.remotes {
409 if remote.name == name {
410 panic!("remote group name `{name}` is already taken");
411 }
412 }
413
414 inner.remotes.push(RemoteActorGroup {
415 name: name.clone(),
416 nodes: Default::default(),
417 });
418
419 Remote {
420 topology: self,
421 name,
422 }
423 }
424
425 #[instability::unstable]
427 pub fn remotes(&self) -> impl Iterator<Item = RemoteActorGroup> + '_ {
428 let inner = self.inner.read();
429 inner.remotes.clone().into_iter()
430 }
431 }
432
433 pub struct Remote<'t> {
435 topology: &'t Topology,
436 name: String,
437 }
438
439 #[sealed]
440 impl<F> Destination<F> for Remote<'_>
441 where
442 F: Fn(&Envelope, &NodeDiscovery) -> Outcome + Send + Sync + 'static,
443 {
444 fn extend_demux(&self, local_group_no: GroupNo, demux: &mut Demux, filter: F) {
445 let nodes = self
446 .topology
447 .inner
448 .write()
449 .remotes
450 .iter_mut()
451 .find(|group| group.name == self.name)
452 .expect("remote group not found")
453 .nodes
454 .entry(local_group_no)
455 .or_default()
456 .clone();
457
458 demux.append(move |envelope, addrs| {
459 let discovery = NodeDiscovery(());
460
461 match filter(envelope, &discovery) {
462 Outcome::Unicast(node_no) => {
463 if let Some(addr) = nodes.load().get(&node_no) {
464 addrs.push(*addr);
465 }
466 }
467 Outcome::Multicast(node_nos) => {
468 let nodes = nodes.load();
469 for node_no in node_nos {
470 if let Some(addr) = nodes.get(&node_no) {
471 addrs.push(*addr);
472 }
473 }
474 }
475 Outcome::Broadcast => {
476 let nodes = nodes.load();
477 for addr in nodes.values() {
478 addrs.push(*addr);
479 }
480 }
481 Outcome::Discard => {}
482 }
483 });
484 }
485
486 fn connection_endpoint(&self) -> ConnectionTo {
487 ConnectionTo::Remote(self.name.clone())
488 }
489 }
490
491 #[derive(Debug)]
492 #[non_exhaustive]
493 pub enum Outcome {
494 Unicast(NodeNo),
496 Multicast(Vec<NodeNo>),
498 Broadcast,
500 Discard,
502 }
503
504 pub struct NodeDiscovery(());
506
507 #[instability::unstable]
508 pub struct RegisterRemoteGroupGuard<'a> {
509 book: &'a AddressBook,
510 handle_addr: Addr,
511 network_actor_addr: Addr,
512 local_group: GroupNo,
513 remote_group: (NodeNo, GroupNo),
514 nodes: Option<Nodes>,
515 }
516
517 #[instability::unstable]
518 impl RegisterRemoteGroupGuard<'_> {
519 pub fn handle_addr(&self) -> Addr {
520 self.handle_addr
521 }
522 }
523
524 impl Drop for RegisterRemoteGroupGuard<'_> {
525 fn drop(&mut self) {
526 self.book.deregister_remote(
528 self.network_actor_addr,
529 self.local_group,
530 self.remote_group,
531 self.handle_addr,
532 );
533
534 self.book.remove(self.handle_addr);
536
537 if let Some(nodes) = &self.nodes {
539 nodes.rcu(|nodes| {
540 let mut nodes = (**nodes).clone();
541
542 if nodes.get(&self.remote_group.0) == Some(&self.handle_addr) {
544 nodes.remove(&self.remote_group.0);
545 }
546
547 nodes
548 });
549 }
550 }
551 }
552});