1use std::{cell::RefCell, sync::Arc};
2
3use parking_lot::RwLock;
4use tokio::runtime::Handle;
5
6#[cfg(feature = "unstable-stuck-detection")]
7use crate::stuck_detection::StuckDetector;
8use crate::{
9 addr::Addr,
10 address_book::{AddressBook, VacantEntry},
11 context::Context,
12 demux::{Demux, Filter as DemuxFilter},
13 envelope::Envelope,
14 group::Schema,
15 runtime::RuntimeManager,
16};
17
18#[derive(Clone)]
19pub struct Topology {
20 pub(crate) book: AddressBook,
21 inner: Arc<RwLock<Inner>>,
22}
23
24#[derive(Default)]
25struct Inner {
26 groups: Vec<ActorGroup>,
27 connections: Vec<Connection>,
28 rt_manager: RuntimeManager,
29}
30
31#[derive(Debug, Clone)]
32#[non_exhaustive]
33pub struct ActorGroup {
34 pub addr: Addr,
35 pub name: String,
36 pub is_entrypoint: bool,
37}
38
39#[derive(Debug, Clone)]
40#[non_exhaustive]
41pub struct Connection {
42 pub from: Addr,
43 pub to: Addr,
44}
45
46impl Topology {
47 pub fn empty() -> Self {
48 Self {
49 book: AddressBook::new(),
50 inner: Arc::new(RwLock::new(Inner::default())),
51 }
52 }
53
54 #[stability::unstable]
55 pub fn add_dedicated_rt<F: Fn(&crate::ActorMeta) -> bool + Send + Sync + 'static>(
56 &self,
57 filter: F,
58 handle: Handle,
59 ) {
60 self.inner.write().rt_manager.add(filter, handle);
61 }
62
63 #[cfg(feature = "unstable-stuck-detection")]
64 pub fn stuck_detector(&self) -> StuckDetector {
65 self.inner.read().rt_manager.stuck_detector()
66 }
67
68 pub fn local(&self, name: impl Into<String>) -> Local<'_> {
69 let name = name.into();
70 let entry = self.book.vacant_entry();
71
72 let mut inner = self.inner.write();
73 inner.groups.push(ActorGroup {
74 addr: entry.addr(),
75 name: name.clone(),
76 is_entrypoint: false,
77 });
78
79 Local {
80 name,
81 topology: self,
82 entry,
83 demux: RefCell::new(Demux::default()),
84 }
85 }
86
87 pub fn remote(&self, _name: impl Into<String>) -> Remote<'_> {
88 todo!()
89 }
90
91 pub fn actor_groups(&self) -> impl Iterator<Item = ActorGroup> + '_ {
92 let inner = self.inner.read();
93 inner.groups.clone().into_iter()
94 }
95
96 pub fn connections(&self) -> impl Iterator<Item = Connection> + '_ {
97 let inner = self.inner.read();
98 inner.connections.clone().into_iter()
99 }
100}
101
102impl Default for Topology {
103 fn default() -> Self {
104 Self::empty()
105 }
106}
107
108#[must_use]
109pub struct Local<'t> {
110 name: String,
111 topology: &'t Topology,
112 entry: VacantEntry<'t>,
113 demux: RefCell<Demux>,
114}
115
116impl<'t> Local<'t> {
117 pub fn entrypoint(self) -> Self {
118 let mut inner = self.topology.inner.write();
119 let group = inner
120 .groups
121 .iter_mut()
122 .find(|group| group.addr == self.entry.addr())
123 .expect("just created");
124 group.is_entrypoint = true;
125 self
126 }
127
128 pub fn route_to(
129 &self,
130 dest: &impl GetAddrs,
131 filter: impl Fn(&Envelope) -> bool + Send + Sync + 'static,
132 ) {
133 let filter = Arc::new(filter);
134 for addr in dest.addrs() {
135 self.demux
136 .borrow_mut()
137 .append(addr, DemuxFilter::Dynamic(filter.clone()));
138 }
139 }
140
141 pub fn route_all_to(&self, dest: &impl GetAddrs) {
142 self.route_to(dest, |_| true)
144 }
145
146 pub fn mount(self, schema: Schema) {
147 let addr = self.entry.addr();
148 let book = self.topology.book.clone();
149 let ctx = Context::new(book, self.demux.into_inner()).with_group(addr);
150 let rt_manager = self.topology.inner.read().rt_manager.clone();
151 let object = (schema.run)(ctx, self.name, rt_manager);
152 self.entry.insert(object);
153 }
154}
155
156pub struct Remote<'t> {
157 addr: Addr,
158 _topology: &'t Topology,
159}
160
161#[doc(hidden)]
162pub trait GetAddrs {
163 fn addrs(&self) -> Vec<Addr>;
164}
165
166impl<'t> GetAddrs for Local<'t> {
167 fn addrs(&self) -> Vec<Addr> {
168 vec![self.entry.addr()]
169 }
170}
171
172impl<'t> GetAddrs for Remote<'t> {
173 fn addrs(&self) -> Vec<Addr> {
174 vec![self.addr]
175 }
176}