elfo_core/
topology.rs

1use std::{cell::RefCell, sync::Arc};
2
3use parking_lot::RwLock;
4use tokio::runtime::Handle;
5
6#[cfg(feature = "unstable-stuck-detection")]
7use crate::stuck_detection::StuckDetector;
8use crate::{
9    addr::Addr,
10    address_book::{AddressBook, VacantEntry},
11    context::Context,
12    demux::{Demux, Filter as DemuxFilter},
13    envelope::Envelope,
14    group::Schema,
15    runtime::RuntimeManager,
16};
17
18#[derive(Clone)]
19pub struct Topology {
20    pub(crate) book: AddressBook,
21    inner: Arc<RwLock<Inner>>,
22}
23
24#[derive(Default)]
25struct Inner {
26    groups: Vec<ActorGroup>,
27    connections: Vec<Connection>,
28    rt_manager: RuntimeManager,
29}
30
31#[derive(Debug, Clone)]
32#[non_exhaustive]
33pub struct ActorGroup {
34    pub addr: Addr,
35    pub name: String,
36    pub is_entrypoint: bool,
37}
38
39#[derive(Debug, Clone)]
40#[non_exhaustive]
41pub struct Connection {
42    pub from: Addr,
43    pub to: Addr,
44}
45
46impl Topology {
47    pub fn empty() -> Self {
48        Self {
49            book: AddressBook::new(),
50            inner: Arc::new(RwLock::new(Inner::default())),
51        }
52    }
53
54    #[stability::unstable]
55    pub fn add_dedicated_rt<F: Fn(&crate::ActorMeta) -> bool + Send + Sync + 'static>(
56        &self,
57        filter: F,
58        handle: Handle,
59    ) {
60        self.inner.write().rt_manager.add(filter, handle);
61    }
62
63    #[cfg(feature = "unstable-stuck-detection")]
64    pub fn stuck_detector(&self) -> StuckDetector {
65        self.inner.read().rt_manager.stuck_detector()
66    }
67
68    pub fn local(&self, name: impl Into<String>) -> Local<'_> {
69        let name = name.into();
70        let entry = self.book.vacant_entry();
71
72        let mut inner = self.inner.write();
73        inner.groups.push(ActorGroup {
74            addr: entry.addr(),
75            name: name.clone(),
76            is_entrypoint: false,
77        });
78
79        Local {
80            name,
81            topology: self,
82            entry,
83            demux: RefCell::new(Demux::default()),
84        }
85    }
86
87    pub fn remote(&self, _name: impl Into<String>) -> Remote<'_> {
88        todo!()
89    }
90
91    pub fn actor_groups(&self) -> impl Iterator<Item = ActorGroup> + '_ {
92        let inner = self.inner.read();
93        inner.groups.clone().into_iter()
94    }
95
96    pub fn connections(&self) -> impl Iterator<Item = Connection> + '_ {
97        let inner = self.inner.read();
98        inner.connections.clone().into_iter()
99    }
100}
101
102impl Default for Topology {
103    fn default() -> Self {
104        Self::empty()
105    }
106}
107
108#[must_use]
109pub struct Local<'t> {
110    name: String,
111    topology: &'t Topology,
112    entry: VacantEntry<'t>,
113    demux: RefCell<Demux>,
114}
115
116impl<'t> Local<'t> {
117    pub fn entrypoint(self) -> Self {
118        let mut inner = self.topology.inner.write();
119        let group = inner
120            .groups
121            .iter_mut()
122            .find(|group| group.addr == self.entry.addr())
123            .expect("just created");
124        group.is_entrypoint = true;
125        self
126    }
127
128    pub fn route_to(
129        &self,
130        dest: &impl GetAddrs,
131        filter: impl Fn(&Envelope) -> bool + Send + Sync + 'static,
132    ) {
133        let filter = Arc::new(filter);
134        for addr in dest.addrs() {
135            self.demux
136                .borrow_mut()
137                .append(addr, DemuxFilter::Dynamic(filter.clone()));
138        }
139    }
140
141    pub fn route_all_to(&self, dest: &impl GetAddrs) {
142        // TODO: more efficient impls.
143        self.route_to(dest, |_| true)
144    }
145
146    pub fn mount(self, schema: Schema) {
147        let addr = self.entry.addr();
148        let book = self.topology.book.clone();
149        let ctx = Context::new(book, self.demux.into_inner()).with_group(addr);
150        let rt_manager = self.topology.inner.read().rt_manager.clone();
151        let object = (schema.run)(ctx, self.name, rt_manager);
152        self.entry.insert(object);
153    }
154}
155
156pub struct Remote<'t> {
157    addr: Addr,
158    _topology: &'t Topology,
159}
160
161#[doc(hidden)]
162pub trait GetAddrs {
163    fn addrs(&self) -> Vec<Addr>;
164}
165
166impl<'t> GetAddrs for Local<'t> {
167    fn addrs(&self) -> Vec<Addr> {
168        vec![self.entry.addr()]
169    }
170}
171
172impl<'t> GetAddrs for Remote<'t> {
173    fn addrs(&self) -> Vec<Addr> {
174        vec![self.addr]
175    }
176}