Skip to main content

elfo_core/
topology.rs

1use std::{cell::RefCell, sync::Arc};
2
3use parking_lot::RwLock;
4use sealed::sealed;
5use tokio::runtime::Handle;
6
7#[cfg(feature = "unstable-stuck-detection")]
8use crate::stuck_detection::StuckDetector;
9use crate::{
10    addr::{Addr, GroupNo, NodeLaunchId, NodeNo},
11    address_book::{AddressBook, VacantEntry},
12    context::Context,
13    demux::Demux,
14    envelope::Envelope,
15    group::Blueprint,
16    runtime::RuntimeManager,
17};
18
/// The starting value of the local group counter.
///
/// `Topology::local()` increments the counter before using it, so declared
/// local groups receive numbers greater than this one. With the `network`
/// feature, `Topology::register_remote()` also uses this number for the
/// address book entries of remote handles.
pub(crate) const SYSTEM_INIT_GROUP_NO: u8 = 1;
20
/// The topology defines local and remote groups, and routes between them.
///
/// Clones share the declared groups, connections and runtime settings, because
/// that state is kept behind an `Arc`. The node number is stored per value, so
/// `set_node_no()` affects only the instance it is called on.
#[derive(Clone)]
pub struct Topology {
    /// Number of the current node; generated in `empty()` unless overridden.
    node_no: NodeNo,
    /// Launch ID generated once in `empty()`.
    launch_id: NodeLaunchId,
    /// Address book in which group entries and remote handles are registered.
    pub(crate) book: AddressBook,
    /// Shared mutable state: declared groups, connections and runtimes.
    inner: Arc<RwLock<Inner>>,
}
29
/// Mutable state of a `Topology`, shared between all of its clones.
struct Inner {
    /// The most recently allocated local group number.
    last_group_no: u8,
    /// All declared local groups, mounted or not.
    locals: Vec<LocalActorGroup>,
    /// All declared remote groups.
    #[cfg(feature = "network")]
    remotes: Vec<RemoteActorGroup>,
    /// Routes recorded by `Local::route_to()`.
    connections: Vec<Connection>,
    /// Holds dedicated runtimes added via `Topology::add_dedicated_rt()`.
    rt_manager: RuntimeManager,
}
38
39impl Default for Inner {
40    fn default() -> Self {
41        Self {
42            last_group_no: SYSTEM_INIT_GROUP_NO,
43            locals: Vec::new(),
44            #[cfg(feature = "network")]
45            remotes: Vec::new(),
46            connections: Vec::new(),
47            rt_manager: RuntimeManager::default(),
48        }
49    }
50}
51
/// Represents a local group.
///
/// Values of this type are snapshots: they are cloned out of the topology
/// and do not change when the topology is modified later.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct LocalActorGroup {
    /// The address of the group's entry in the address book.
    pub addr: Addr,
    /// The unique (among local groups) name of the group.
    pub name: String,
    /// Whether `Local::entrypoint()` has been called for this group.
    pub is_entrypoint: bool,
    /// Whether a blueprint has been mounted via `Local::mount()`.
    pub is_mounted: bool,
    /// Copied from the mounted blueprint; `0` until the group is mounted.
    pub(crate) stop_order: i8,
}
62
/// Represents a connection between two groups.
///
/// A connection is recorded every time `Local::route_to()` is called.
#[instability::unstable]
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct Connection {
    /// The address of the local group messages are routed from.
    pub from: Addr,
    /// The destination messages are routed to.
    pub to: ConnectionTo,
}
71
// TODO: #[instability::unstable]
/// The destination side of a connection between groups.
#[derive(Debug, Clone)]
pub enum ConnectionTo {
    /// A local group, identified by its address.
    Local(Addr),
    /// A remote group, identified by its name.
    #[cfg(feature = "network")]
    Remote(String),
}
79
80impl ConnectionTo {
81    #[instability::unstable]
82    pub fn into_remote(self) -> Option<String> {
83        match self {
84            Self::Local(_) => None,
85            #[cfg(feature = "network")]
86            Self::Remote(name) => Some(name),
87        }
88    }
89}
90
91impl Default for Topology {
92    fn default() -> Self {
93        Self::empty()
94    }
95}
96
97impl Topology {
98    /// Creates a new empty topology.
99    pub fn empty() -> Self {
100        let launch_id = NodeLaunchId::generate();
101        Self {
102            node_no: NodeNo::generate(),
103            launch_id,
104            book: AddressBook::new(launch_id),
105            inner: Arc::new(RwLock::new(Inner::default())),
106        }
107    }
108
109    /// Returns the current node number.
110    #[instability::unstable]
111    pub fn node_no(&self) -> NodeNo {
112        self.node_no
113    }
114
115    /// Sets the current node number. Otherwise, it's randomly generated.
116    ///
117    /// See [`NodeNo`] for details.
118    #[instability::unstable]
119    pub fn set_node_no(&mut self, node_no: NodeNo) {
120        self.node_no = node_no;
121    }
122
123    /// Returns the current randomly generated launch ID.
124    pub fn launch_id(&self) -> NodeLaunchId {
125        self.launch_id
126    }
127
128    /// Adds a dedicated runtime for actors matching the given filter.
129    ///
130    /// Check [The Actoromicon] for details.
131    ///
132    /// [The Actoromicon]: https://actoromicon.rs/ch07-02-multiple-runtimes.html
133    #[instability::unstable]
134    pub fn add_dedicated_rt<F: Fn(&crate::ActorMeta) -> bool + Send + Sync + 'static>(
135        &self,
136        filter: F,
137        handle: Handle,
138    ) {
139        self.inner.write().rt_manager.add(filter, handle);
140    }
141
142    #[cfg(feature = "unstable-stuck-detection")]
143    pub fn stuck_detector(&self) -> StuckDetector {
144        self.inner.read().rt_manager.stuck_detector()
145    }
146
147    /// Declares a new local group.
148    ///
149    /// # Panics
150    /// * If the name is already taken for another local group.
151    /// * If there are too many local groups.
152    #[track_caller]
153    pub fn local(&self, name: impl Into<String>) -> Local<'_> {
154        let name = name.into();
155        let mut inner = self.inner.write();
156
157        for local in &inner.locals {
158            if local.name == name {
159                panic!("local group name `{name}` is already taken");
160            }
161        }
162
163        inner.last_group_no = inner.last_group_no.checked_add(1).expect("too many groups");
164        let group_no = GroupNo::new(inner.last_group_no, self.launch_id).expect("invalid group no");
165
166        let entry = self.book.vacant_entry(group_no);
167        inner.locals.push(LocalActorGroup {
168            addr: entry.addr(),
169            name: name.clone(),
170            is_mounted: false,
171            is_entrypoint: false,
172            stop_order: 0,
173        });
174
175        Local {
176            name,
177            topology: self,
178            entry,
179            demux: RefCell::new(Demux::default()),
180        }
181    }
182
183    /// Returns an iterator over all mounted local groups.
184    pub fn locals(&self) -> impl Iterator<Item = LocalActorGroup> + '_ {
185        let inner = self.inner.read();
186        inner.locals.clone().into_iter().filter(|g| g.is_mounted)
187    }
188
189    #[instability::unstable]
190    pub fn connections(&self) -> impl Iterator<Item = Connection> + '_ {
191        let inner = self.inner.read();
192        inner.connections.clone().into_iter()
193    }
194}
195
/// Represents a local group's settings.
///
/// Created by `Topology::local()`. Routes are accumulated in this value and
/// take effect when a blueprint is mounted via `mount()`.
#[must_use]
pub struct Local<'t> {
    /// The topology this group was declared in.
    topology: &'t Topology,
    /// The name of the group.
    name: String,
    /// The address book entry reserved for this group; filled by `mount()`.
    entry: VacantEntry<'t>,
    /// Routing rules added by `route_to()` and `route_all_to()`.
    demux: RefCell<Demux>,
}
204
205impl Local<'_> {
206    #[doc(hidden)]
207    pub fn addr(&self) -> Addr {
208        self.entry.addr()
209    }
210
211    /// Mark this group as an entrypoint.
212    ///
213    /// It means, that this group will be started automatically when the system
214    /// starts, with empty configuration is provided.
215    ///
216    /// Usually, only `system.configurers` group is marked as an entrypoint.
217    pub fn entrypoint(self) -> Self {
218        self.with_group_mut(|group| group.is_entrypoint = true);
219        self
220    }
221
222    /// Defines a route to the given destination (local or remote group).
223    ///
224    /// # Examples
225    /// Local to local:
226    /// ```
227    /// # use elfo_core as elfo;
228    /// # #[elfo::message] struct SomeEvent;
229    /// use elfo::{Topology, msg};
230    ///
231    /// let topology = Topology::empty();
232    /// let foo = topology.local("foo");
233    /// let bar = topology.local("bar");
234    ///
235    /// foo.route_to(&bar, |envelope| {
236    ///     msg!(match envelope {
237    ///         SomeEvent => true,
238    ///         _ => false,
239    ///     })
240    /// });
241    /// ```
242    ///
243    /// Local to remote (requires the `network` feature): TODO
244    pub fn route_to<F>(&self, dest: &impl Destination<F>, filter: F) {
245        dest.extend_demux(
246            self.entry.addr().group_no().expect("invalid addr"),
247            &mut self.demux.borrow_mut(),
248            filter,
249        );
250
251        let mut inner = self.topology.inner.write();
252        inner.connections.push(Connection {
253            from: self.entry.addr(),
254            to: dest.connection_endpoint(),
255        });
256    }
257
258    // TODO: deprecate?
259    pub fn route_all_to(&self, dest: &Local<'_>) {
260        let addr = dest.entry.addr();
261        self.demux
262            .borrow_mut()
263            .append(move |_, addrs| addrs.push(addr));
264    }
265
266    /// Mounts a blueprint to this group.
267    pub fn mount(self, blueprint: Blueprint) {
268        self.with_group_mut(|group| {
269            group.stop_order = blueprint.stop_order;
270            group.is_mounted = true;
271        });
272
273        let addr = self.entry.addr();
274        let book = self.topology.book.clone();
275        let ctx = Context::new(book, self.demux.into_inner()).with_group(addr);
276        let rt_manager = self.topology.inner.read().rt_manager.clone();
277        let object = (blueprint.mount)(
278            ctx,
279            self.topology.node_no,
280            self.topology.launch_id,
281            self.name,
282            rt_manager,
283        );
284        self.entry.insert(object);
285    }
286
287    fn with_group_mut(&self, f: impl FnOnce(&mut LocalActorGroup)) {
288        let mut inner = self.topology.inner.write();
289        let group = inner
290            .locals
291            .iter_mut()
292            .find(|group| group.addr == self.entry.addr())
293            .expect("no corresponding group for Local<_>");
294        f(group);
295    }
296}
297
/// A routing target accepted by `Local::route_to()`.
///
/// `F` is the type of the filter passed to `route_to()`; every implementation
/// chooses its own filter signature. The trait is sealed.
#[sealed]
pub trait Destination<F> {
    /// Appends a routing rule built from `filter` to `demux`. The number of
    /// the group that owns `demux` is passed as `source_group_no`.
    #[doc(hidden)]
    fn extend_demux(&self, source_group_no: GroupNo, demux: &mut Demux, filter: F);

    /// Returns how this destination is represented in a recorded connection.
    #[doc(hidden)]
    fn connection_endpoint(&self) -> ConnectionTo;
}
306
307#[sealed]
308impl<F> Destination<F> for Local<'_>
309where
310    F: Fn(&Envelope) -> bool + Send + Sync + 'static,
311{
312    fn extend_demux(&self, _: GroupNo, demux: &mut Demux, filter: F) {
313        let addr = self.entry.addr();
314        demux.append(move |envelope, addrs| {
315            if filter(envelope) {
316                addrs.push(addr);
317            }
318        });
319    }
320
321    fn connection_endpoint(&self) -> ConnectionTo {
322        ConnectionTo::Local(self.entry.addr())
323    }
324}
325
326cfg_network!({
327    use arc_swap::ArcSwap;
328    use fxhash::FxHashMap;
329
330    use crate::{object::Object, remote::RemoteHandle};
331
    /// Contains nodes available for routing between one specific local group
    /// and set of remote ones with the same group name.
    ///
    /// Maps a node number to the address of the handle registered for that
    /// node. The map is replaced as a whole on every update, and all clones
    /// of the `Arc` observe the replacement.
    type Nodes = Arc<ArcSwap<FxHashMap<NodeNo, Addr>>>;
335
    // TODO: remove `Clone` here, possible footgun in the future.
    /// Represents remote group(s).
    ///
    /// Note that clones share the same node tables, because every table is
    /// kept behind an `Arc`.
    #[instability::unstable]
    #[derive(Debug, Clone)]
    #[non_exhaustive]
    pub struct RemoteActorGroup {
        /// The unique (among remote groups) name of the group.
        pub name: String,
        /// Local group => nodes for this remote group.
        ///
        /// An entry appears once a route from the local group to this remote
        /// group is declared.
        nodes: FxHashMap<GroupNo, Nodes>,
    }
346
347    impl Topology {
348        /// # Panics
349        /// If the name isn't used in the topology.
350        #[instability::unstable]
351        pub fn register_remote(
352            &self,
353            network_actor_addr: Addr,
354            local_group: GroupNo,
355            remote_group: (NodeNo, GroupNo),
356            remote_group_name: &str,
357            handle: impl RemoteHandle,
358        ) -> RegisterRemoteGroupGuard<'_> {
359            // Register the handle to make `send_to(addr)` work.
360            // XXX: use system.network's group_no instead.
361            let group_no =
362                GroupNo::new(SYSTEM_INIT_GROUP_NO, self.launch_id).expect("invalid group no");
363            let entry = self.book.vacant_entry(group_no);
364            let handle_addr = entry.addr();
365            let object = Object::new(handle_addr, Box::new(handle) as Box<dyn RemoteHandle>);
366            entry.insert(object);
367
368            self.book
369                .register_remote(network_actor_addr, local_group, remote_group, handle_addr);
370
371            // Update the demux to make `send()` work,
372            // but only if there is a route between these groups.
373            let nodes = {
374                let inner = self.inner.write();
375                inner
376                    .remotes
377                    .iter()
378                    .find(|group| group.name == remote_group_name)
379                    .and_then(|group| group.nodes.get(&local_group).cloned())
380            };
381
382            if let Some(nodes) = &nodes {
383                nodes.rcu(|nodes| {
384                    let mut nodes = (**nodes).clone();
385                    nodes.insert(remote_group.0, handle_addr);
386                    nodes
387                });
388            }
389
390            RegisterRemoteGroupGuard {
391                book: &self.book,
392                handle_addr,
393                network_actor_addr,
394                local_group,
395                remote_group,
396                nodes,
397            }
398        }
399
400        /// Declares a new remote group.
401        ///
402        /// # Panics
403        /// * If the name is already taken for another remote group.
404        pub fn remote(&self, name: impl Into<String>) -> Remote<'_> {
405            let name = name.into();
406            let mut inner = self.inner.write();
407
408            for remote in &inner.remotes {
409                if remote.name == name {
410                    panic!("remote group name `{name}` is already taken");
411                }
412            }
413
414            inner.remotes.push(RemoteActorGroup {
415                name: name.clone(),
416                nodes: Default::default(),
417            });
418
419            Remote {
420                topology: self,
421                name,
422            }
423        }
424
425        /// Returns an iterator over all remote groups.
426        #[instability::unstable]
427        pub fn remotes(&self) -> impl Iterator<Item = RemoteActorGroup> + '_ {
428            let inner = self.inner.read();
429            inner.remotes.clone().into_iter()
430        }
431    }
432
    /// Represents a remote group's settings.
    ///
    /// Created by `Topology::remote()` and used as a destination in
    /// `Local::route_to()`.
    pub struct Remote<'t> {
        /// The topology this group was declared in.
        topology: &'t Topology,
        /// The name of the remote group.
        name: String,
    }
438
439    #[sealed]
440    impl<F> Destination<F> for Remote<'_>
441    where
442        F: Fn(&Envelope, &NodeDiscovery) -> Outcome + Send + Sync + 'static,
443    {
444        fn extend_demux(&self, local_group_no: GroupNo, demux: &mut Demux, filter: F) {
445            let nodes = self
446                .topology
447                .inner
448                .write()
449                .remotes
450                .iter_mut()
451                .find(|group| group.name == self.name)
452                .expect("remote group not found")
453                .nodes
454                .entry(local_group_no)
455                .or_default()
456                .clone();
457
458            demux.append(move |envelope, addrs| {
459                let discovery = NodeDiscovery(());
460
461                match filter(envelope, &discovery) {
462                    Outcome::Unicast(node_no) => {
463                        if let Some(addr) = nodes.load().get(&node_no) {
464                            addrs.push(*addr);
465                        }
466                    }
467                    Outcome::Multicast(node_nos) => {
468                        let nodes = nodes.load();
469                        for node_no in node_nos {
470                            if let Some(addr) = nodes.get(&node_no) {
471                                addrs.push(*addr);
472                            }
473                        }
474                    }
475                    Outcome::Broadcast => {
476                        let nodes = nodes.load();
477                        for addr in nodes.values() {
478                            addrs.push(*addr);
479                        }
480                    }
481                    Outcome::Discard => {}
482                }
483            });
484        }
485
486        fn connection_endpoint(&self) -> ConnectionTo {
487            ConnectionTo::Remote(self.name.clone())
488        }
489    }
490
    /// A routing decision returned by a filter of a route to a remote group.
    ///
    /// Nodes that are not known at the time of routing are skipped.
    #[derive(Debug)]
    #[non_exhaustive]
    pub enum Outcome {
        /// Routes a message to the specified node.
        Unicast(NodeNo),
        /// Routes a message to all specified nodes.
        Multicast(Vec<NodeNo>),
        /// Routes a message to all active nodes.
        Broadcast,
        /// Discards a message.
        Discard,
    }
503
    /// The second argument of filters used in routes to remote groups.
    ///
    /// It cannot be constructed outside of this module, because its only
    /// field is private.
    // Nothing for now, reserved for future use.
    pub struct NodeDiscovery(());
506
    /// A guard returned by `Topology::register_remote()`.
    ///
    /// Dropping the guard undoes the registration.
    #[instability::unstable]
    pub struct RegisterRemoteGroupGuard<'a> {
        /// The address book the handle is registered in.
        book: &'a AddressBook,
        /// The address assigned to the registered handle.
        handle_addr: Addr,
        /// Passed through to `deregister_remote()` on drop.
        network_actor_addr: Addr,
        /// The local side of the registered pair of groups.
        local_group: GroupNo,
        /// The remote side: the node number and the group number.
        remote_group: (NodeNo, GroupNo),
        /// The node table of the matching route, if such a route existed at
        /// the time of registration.
        nodes: Option<Nodes>,
    }
516
517    #[instability::unstable]
518    impl RegisterRemoteGroupGuard<'_> {
519        pub fn handle_addr(&self) -> Addr {
520            self.handle_addr
521        }
522    }
523
524    impl Drop for RegisterRemoteGroupGuard<'_> {
525        fn drop(&mut self) {
526            // Undo the registration.
527            self.book.deregister_remote(
528                self.network_actor_addr,
529                self.local_group,
530                self.remote_group,
531                self.handle_addr,
532            );
533
534            // Disable direct messaging.
535            self.book.remove(self.handle_addr);
536
537            // Disable routing to this node if it was possible.
538            if let Some(nodes) = &self.nodes {
539                nodes.rcu(|nodes| {
540                    let mut nodes = (**nodes).clone();
541
542                    // We don't want to remove the node if it was re-registered by another handle.
543                    if nodes.get(&self.remote_group.0) == Some(&self.handle_addr) {
544                        nodes.remove(&self.remote_group.0);
545                    }
546
547                    nodes
548                });
549            }
550        }
551    }
552});