Skip to main content

atomr_core/actor/
actor_system.rs

1//! `ActorSystem` — root of the actor hierarchy.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use atomr_config::Config;
7use parking_lot::Mutex;
8use thiserror::Error;
9use tokio::sync::{mpsc, Notify};
10
11use super::actor_cell::{spawn_cell, ChildEntry, SystemMsg};
12use super::actor_ref::{ActorRef, UntypedActorRef};
13use super::address::Address;
14use super::extensions::Extensions;
15use super::observer::{DeadLetterObserver, SpawnObserver};
16use super::path::ActorPath;
17use super::props::Props;
18use super::remote::RemoteProvider;
19use super::scheduler::{Scheduler, TokioScheduler};
20use super::traits::Actor;
21
22pub(crate) struct ActorSystemInner {
23    pub name: String,
24    pub config: Config,
25    pub address: Address,
26    pub scheduler: Arc<dyn Scheduler>,
27    pub extensions: Extensions,
28    pub user_guardian: Mutex<HashMap<String, ChildEntry>>,
29    pub(crate) spawn_observer: parking_lot::RwLock<Option<Arc<dyn SpawnObserver>>>,
30    pub(crate) dead_letter_observer: parking_lot::RwLock<Option<Arc<dyn DeadLetterObserver>>>,
31    pub(crate) remote_provider: parking_lot::RwLock<Option<Arc<dyn RemoteProvider>>>,
32    terminated: Notify,
33    terminated_flag: std::sync::atomic::AtomicBool,
34}
35
36/// Public handle to the actor system.
37#[derive(Clone)]
38pub struct ActorSystem {
39    pub(crate) inner: Arc<ActorSystemInner>,
40}
41
42impl ActorSystem {
43    /// Create an actor system with the given name and configuration.
44    pub async fn create(name: impl Into<String>, config: Config) -> Result<Self, ActorSystemError> {
45        let name = name.into();
46        let address = Address::local(&name);
47        let inner = Arc::new(ActorSystemInner {
48            name,
49            config,
50            address,
51            scheduler: Arc::new(TokioScheduler::new()),
52            extensions: Extensions::default(),
53            user_guardian: Mutex::new(HashMap::new()),
54            spawn_observer: parking_lot::RwLock::new(None),
55            dead_letter_observer: parking_lot::RwLock::new(None),
56            remote_provider: parking_lot::RwLock::new(None),
57            terminated: Notify::new(),
58            terminated_flag: std::sync::atomic::AtomicBool::new(false),
59        });
60        Ok(Self { inner })
61    }
62
63    pub fn name(&self) -> &str {
64        &self.inner.name
65    }
66
67    pub fn address(&self) -> &Address {
68        &self.inner.address
69    }
70
71    pub fn config(&self) -> &Config {
72        &self.inner.config
73    }
74
75    pub fn scheduler(&self) -> Arc<dyn Scheduler> {
76        self.inner.scheduler.clone()
77    }
78
79    pub fn extensions(&self) -> &Extensions {
80        &self.inner.extensions
81    }
82
83    /// Install a [`SpawnObserver`]. Only one observer may be installed;
84    /// subsequent calls replace the previous one. This is the hook used by
85    /// `atomr-telemetry` to populate its actor registry.
86    pub fn set_spawn_observer(&self, obs: Arc<dyn SpawnObserver>) {
87        *self.inner.spawn_observer.write() = Some(obs);
88    }
89
90    /// Install a [`DeadLetterObserver`] that is notified when a `tell`
91    /// fails because the target has stopped.
92    pub fn set_dead_letter_observer(&self, obs: Arc<dyn DeadLetterObserver>) {
93        *self.inner.dead_letter_observer.write() = Some(obs);
94    }
95
96    /// Install the remote provider. Done by `atomr-remote::RemoteSystemExt::enable_remote`.
97    /// Replaces any previous provider.
98    pub fn set_remote_provider(&self, provider: Arc<dyn RemoteProvider>) {
99        *self.inner.remote_provider.write() = Some(provider);
100    }
101
102    /// True if a remote provider is installed and the address has global scope.
103    pub fn is_remote_path(&self, path: &ActorPath) -> bool {
104        path.address.has_global_scope() && self.inner.remote_provider.read().is_some()
105    }
106
107    /// Look up an actor by full path string. Local paths return the local
108    /// child if it exists; remote paths consult the installed remote provider.
109    pub fn actor_selection(&self, path_str: &str) -> Option<UntypedActorRef> {
110        let path = parse_actor_path(path_str)?;
111        if path.address.has_local_scope() || path.address == self.inner.address {
112            // Local: best-effort look-up among top-level user actors.
113            if path.elements.len() >= 2 && path.elements[0].as_str() == "user" {
114                let name = path.elements[1].as_str();
115                let g = self.inner.user_guardian.lock();
116                return g.get(name).map(|c| c.untyped.clone());
117            }
118            return None;
119        }
120        let provider = self.inner.remote_provider.read().clone()?;
121        let handle = provider.resolve(&path)?;
122        Some(UntypedActorRef::from_remote(handle))
123    }
124
125    /// Resolve a remote path and produce a *typed* `ActorRef<M>`. The caller
126    /// supplies a serializer closure for `M`; `atomr-remote::RemoteSystem`
127    /// provides a default that uses bincode + `type_name::<M>()`.
128    pub fn actor_selection_with<M>(
129        &self,
130        path_str: &str,
131        serialize: Arc<dyn Fn(M, Option<ActorPath>) -> super::remote::SerializedMessage + Send + Sync>,
132    ) -> Option<ActorRef<M>>
133    where
134        M: Send + 'static,
135    {
136        let path = parse_actor_path(path_str)?;
137        if path.address.has_local_scope() || path.address == self.inner.address {
138            return None;
139        }
140        let provider = self.inner.remote_provider.read().clone()?;
141        let handle = provider.resolve(&path)?;
142        Some(ActorRef::from_remote(handle, serialize))
143    }
144
145    /// Spawn a top-level actor under `/user`.
146    ///
147    /// **Naming guarantee.** Child names are unique among
148    /// *currently-alive* children of the user guardian — not globally
149    /// unique forever. Once a top-level actor has stopped (its
150    /// `post_stop` has run and death-watch has fired), its name slot is
151    /// freed and a subsequent `actor_of(.., name)` call with the same
152    /// `name` will succeed. To avoid races, callers that re-spawn a
153    /// just-stopped actor should await the previous instance's
154    /// termination (e.g. via a death-watch) before calling `actor_of`
155    /// again, rather than relying on a sleep.
156    pub fn actor_of<A: Actor>(
157        &self,
158        props: Props<A>,
159        name: &str,
160    ) -> Result<ActorRef<A::Msg>, ActorSystemError> {
161        let root = ActorPath::root(self.inner.address.clone());
162        let parent = root.child("user");
163        let path = parent.child(name);
164        let mut guardian = self.inner.user_guardian.lock();
165        if guardian.contains_key(name) {
166            return Err(ActorSystemError::NameTaken(name.into()));
167        }
168        let r = spawn_cell::<A>(self.inner.clone(), props, path.clone())
169            .map_err(|e| ActorSystemError::Spawn(e.to_string()))?;
170        if let Some(obs) = self.inner.spawn_observer.read().as_ref() {
171            obs.on_spawn(&path, Some(&parent), std::any::type_name::<A>());
172        }
173        guardian.insert(
174            name.to_string(),
175            ChildEntry { path, untyped: r.as_untyped(), system_tx: r.system_sender() },
176        );
177        Ok(r)
178    }
179
180    /// Stop a top-level actor by name.
181    pub fn stop(&self, name: &str) {
182        if let Some(c) = self.inner.user_guardian.lock().get(name) {
183            let _ = c.system_tx.send(SystemMsg::Stop);
184        }
185    }
186
187    /// Initiate orderly shutdown. Awaits actor termination best-effort.
188    pub async fn terminate(&self) {
189        {
190            let guardian = self.inner.user_guardian.lock();
191            for (_, c) in guardian.iter() {
192                let _ = c.system_tx.send(SystemMsg::Stop);
193            }
194        }
195        self.inner.terminated_flag.store(true, std::sync::atomic::Ordering::Release);
196        self.inner.terminated.notify_waiters();
197        // Give in-flight tasks a moment to unwind.
198        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
199    }
200
201    pub async fn when_terminated(&self) {
202        if self.inner.terminated_flag.load(std::sync::atomic::Ordering::Acquire) {
203            return;
204        }
205        self.inner.terminated.notified().await;
206    }
207}
208
209#[derive(Debug, Error)]
210pub enum ActorSystemError {
211    #[error("top-level actor name `{0}` already taken")]
212    NameTaken(String),
213    #[error("failed to spawn actor: {0}")]
214    Spawn(String),
215    #[error("system already terminated")]
216    Terminated,
217}
218
219// Keep channel import referenced to avoid unused imports in stub paths.
220#[allow(dead_code)]
221type _SysChan = mpsc::UnboundedSender<SystemMsg>;
222
223/// Parse a string like `akka.tcp://Sys@host:port/user/foo/bar` into an
224/// `ActorPath`. Returns `None` on malformed input.
225fn parse_actor_path(s: &str) -> Option<ActorPath> {
226    let (addr_part, path_part) = split_addr_path(s)?;
227    let address = Address::parse(addr_part)?;
228    let mut path = ActorPath::root(address);
229    for seg in path_part.split('/').filter(|s| !s.is_empty()) {
230        // Strip optional `#uid` suffix on the leaf segment.
231        if let Some((name, uid)) = seg.split_once('#') {
232            let uid_n: u64 = uid.parse().ok()?;
233            path = path.child(name).with_uid(uid_n);
234        } else {
235            path = path.child(seg);
236        }
237    }
238    Some(path)
239}
240
241fn split_addr_path(s: &str) -> Option<(&str, &str)> {
242    // Address always contains `://`. The path starts at the next `/` *after*
243    // the host:port section.
244    let scheme_end = s.find("://")?;
245    let after_scheme = &s[scheme_end + 3..];
246    // The address ends at the first `/` in the after-scheme section.
247    if let Some(slash) = after_scheme.find('/') {
248        let split = scheme_end + 3 + slash;
249        Some((&s[..split], &s[split..]))
250    } else {
251        Some((s, ""))
252    }
253}