atomr_core/actor/
actor_system.rs1use std::collections::HashMap;
4use std::sync::Arc;
5
6use atomr_config::Config;
7use parking_lot::Mutex;
8use thiserror::Error;
9use tokio::sync::{mpsc, Notify};
10
11use super::actor_cell::{spawn_cell, ChildEntry, SystemMsg};
12use super::actor_ref::{ActorRef, UntypedActorRef};
13use super::address::Address;
14use super::extensions::Extensions;
15use super::observer::{DeadLetterObserver, SpawnObserver};
16use super::path::ActorPath;
17use super::props::Props;
18use super::remote::RemoteProvider;
19use super::scheduler::{Scheduler, TokioScheduler};
20use super::traits::Actor;
21
22pub(crate) struct ActorSystemInner {
23 pub name: String,
24 pub config: Config,
25 pub address: Address,
26 pub scheduler: Arc<dyn Scheduler>,
27 pub extensions: Extensions,
28 pub user_guardian: Mutex<HashMap<String, ChildEntry>>,
29 pub(crate) spawn_observer: parking_lot::RwLock<Option<Arc<dyn SpawnObserver>>>,
30 pub(crate) dead_letter_observer: parking_lot::RwLock<Option<Arc<dyn DeadLetterObserver>>>,
31 pub(crate) remote_provider: parking_lot::RwLock<Option<Arc<dyn RemoteProvider>>>,
32 terminated: Notify,
33 terminated_flag: std::sync::atomic::AtomicBool,
34}
35
36#[derive(Clone)]
38pub struct ActorSystem {
39 pub(crate) inner: Arc<ActorSystemInner>,
40}
41
42impl ActorSystem {
43 pub async fn create(name: impl Into<String>, config: Config) -> Result<Self, ActorSystemError> {
45 let name = name.into();
46 let address = Address::local(&name);
47 let inner = Arc::new(ActorSystemInner {
48 name,
49 config,
50 address,
51 scheduler: Arc::new(TokioScheduler::new()),
52 extensions: Extensions::default(),
53 user_guardian: Mutex::new(HashMap::new()),
54 spawn_observer: parking_lot::RwLock::new(None),
55 dead_letter_observer: parking_lot::RwLock::new(None),
56 remote_provider: parking_lot::RwLock::new(None),
57 terminated: Notify::new(),
58 terminated_flag: std::sync::atomic::AtomicBool::new(false),
59 });
60 Ok(Self { inner })
61 }
62
63 pub fn name(&self) -> &str {
64 &self.inner.name
65 }
66
67 pub fn address(&self) -> &Address {
68 &self.inner.address
69 }
70
71 pub fn config(&self) -> &Config {
72 &self.inner.config
73 }
74
75 pub fn scheduler(&self) -> Arc<dyn Scheduler> {
76 self.inner.scheduler.clone()
77 }
78
79 pub fn extensions(&self) -> &Extensions {
80 &self.inner.extensions
81 }
82
83 pub fn set_spawn_observer(&self, obs: Arc<dyn SpawnObserver>) {
87 *self.inner.spawn_observer.write() = Some(obs);
88 }
89
90 pub fn set_dead_letter_observer(&self, obs: Arc<dyn DeadLetterObserver>) {
93 *self.inner.dead_letter_observer.write() = Some(obs);
94 }
95
96 pub fn set_remote_provider(&self, provider: Arc<dyn RemoteProvider>) {
99 *self.inner.remote_provider.write() = Some(provider);
100 }
101
102 pub fn is_remote_path(&self, path: &ActorPath) -> bool {
104 path.address.has_global_scope() && self.inner.remote_provider.read().is_some()
105 }
106
107 pub fn actor_selection(&self, path_str: &str) -> Option<UntypedActorRef> {
110 let path = parse_actor_path(path_str)?;
111 if path.address.has_local_scope() || path.address == self.inner.address {
112 if path.elements.len() >= 2 && path.elements[0].as_str() == "user" {
114 let name = path.elements[1].as_str();
115 let g = self.inner.user_guardian.lock();
116 return g.get(name).map(|c| c.untyped.clone());
117 }
118 return None;
119 }
120 let provider = self.inner.remote_provider.read().clone()?;
121 let handle = provider.resolve(&path)?;
122 Some(UntypedActorRef::from_remote(handle))
123 }
124
125 pub fn actor_selection_with<M>(
129 &self,
130 path_str: &str,
131 serialize: Arc<dyn Fn(M, Option<ActorPath>) -> super::remote::SerializedMessage + Send + Sync>,
132 ) -> Option<ActorRef<M>>
133 where
134 M: Send + 'static,
135 {
136 let path = parse_actor_path(path_str)?;
137 if path.address.has_local_scope() || path.address == self.inner.address {
138 return None;
139 }
140 let provider = self.inner.remote_provider.read().clone()?;
141 let handle = provider.resolve(&path)?;
142 Some(ActorRef::from_remote(handle, serialize))
143 }
144
145 pub fn actor_of<A: Actor>(
147 &self,
148 props: Props<A>,
149 name: &str,
150 ) -> Result<ActorRef<A::Msg>, ActorSystemError> {
151 let root = ActorPath::root(self.inner.address.clone());
152 let parent = root.child("user");
153 let path = parent.child(name);
154 let mut guardian = self.inner.user_guardian.lock();
155 if guardian.contains_key(name) {
156 return Err(ActorSystemError::NameTaken(name.into()));
157 }
158 let r = spawn_cell::<A>(self.inner.clone(), props, path.clone())
159 .map_err(|e| ActorSystemError::Spawn(e.to_string()))?;
160 if let Some(obs) = self.inner.spawn_observer.read().as_ref() {
161 obs.on_spawn(&path, Some(&parent), std::any::type_name::<A>());
162 }
163 guardian.insert(
164 name.to_string(),
165 ChildEntry { path, untyped: r.as_untyped(), system_tx: r.system_sender() },
166 );
167 Ok(r)
168 }
169
170 pub fn stop(&self, name: &str) {
172 if let Some(c) = self.inner.user_guardian.lock().get(name) {
173 let _ = c.system_tx.send(SystemMsg::Stop);
174 }
175 }
176
177 pub async fn terminate(&self) {
179 {
180 let guardian = self.inner.user_guardian.lock();
181 for (_, c) in guardian.iter() {
182 let _ = c.system_tx.send(SystemMsg::Stop);
183 }
184 }
185 self.inner.terminated_flag.store(true, std::sync::atomic::Ordering::Release);
186 self.inner.terminated.notify_waiters();
187 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
189 }
190
191 pub async fn when_terminated(&self) {
192 if self.inner.terminated_flag.load(std::sync::atomic::Ordering::Acquire) {
193 return;
194 }
195 self.inner.terminated.notified().await;
196 }
197}
198
199#[derive(Debug, Error)]
200pub enum ActorSystemError {
201 #[error("top-level actor name `{0}` already taken")]
202 NameTaken(String),
203 #[error("failed to spawn actor: {0}")]
204 Spawn(String),
205 #[error("system already terminated")]
206 Terminated,
207}
208
209#[allow(dead_code)]
211type _SysChan = mpsc::UnboundedSender<SystemMsg>;
212
213fn parse_actor_path(s: &str) -> Option<ActorPath> {
216 let (addr_part, path_part) = split_addr_path(s)?;
217 let address = Address::parse(addr_part)?;
218 let mut path = ActorPath::root(address);
219 for seg in path_part.split('/').filter(|s| !s.is_empty()) {
220 if let Some((name, uid)) = seg.split_once('#') {
222 let uid_n: u64 = uid.parse().ok()?;
223 path = path.child(name).with_uid(uid_n);
224 } else {
225 path = path.child(seg);
226 }
227 }
228 Some(path)
229}
230
/// Splits a textual actor path into its address part and its path part.
///
/// The address part runs from the start of `s` up to (not including) the
/// first `/` that follows the `://` separator; the path part is the rest,
/// leading `/` included. When no such `/` exists the whole string is the
/// address and the path part is empty.
///
/// Returns `None` when `s` contains no `://`.
fn split_addr_path(s: &str) -> Option<(&str, &str)> {
    const SCHEME_SEP: &str = "://";
    let authority_start = s.find(SCHEME_SEP)? + SCHEME_SEP.len();
    let boundary = s[authority_start..]
        .find('/')
        .map_or(s.len(), |offset| authority_start + offset);
    Some(s.split_at(boundary))
}