Skip to main content

tokio_actors/
system.rs

1//! Actor system providing named actor registration and coordinated shutdown.
2//!
3//! Internal module. Public API exposed since PR-04 (Spawn API Expansion).
4
5use std::any::{Any, TypeId};
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, Instant};
10
11use dashmap::DashMap;
12
13use crate::actor::handle::ActorHandle;
14use crate::actor::Actor;
15use crate::error::{SendError, SpawnError};
16use crate::types::{ActorId, StopReason};
17
18type StopFn =
19    dyn Fn(StopReason) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send>> + Send + Sync;
20
21// ---------------------------------------------------------------------------
22// Global systems registry
23// ---------------------------------------------------------------------------
24
25static SYSTEMS: OnceLock<DashMap<String, Arc<ActorSystem>>> = OnceLock::new();
26
27fn systems() -> &'static DashMap<String, Arc<ActorSystem>> {
28    SYSTEMS.get_or_init(DashMap::new)
29}
30
31// ---------------------------------------------------------------------------
32// ShutdownPolicy
33// ---------------------------------------------------------------------------
34
/// Time budgets applied by [`ActorSystem::shutdown`] and
/// [`ActorSystem::shutdown_with`].
#[derive(Debug, Clone)]
pub struct ShutdownPolicy {
    /// Wall-clock budget for the whole shutdown sequence. Once it has
    /// elapsed, every actor that has not been processed yet is sent
    /// `StopReason::Kill` instead of a graceful stop.
    ///
    /// Default: 30 seconds.
    pub timeout: Duration,
    /// How long a single actor's graceful stop may take before the actor is
    /// sent `StopReason::Kill`.
    ///
    /// Default: 5 seconds.
    pub per_actor_timeout: Duration,
}

impl Default for ShutdownPolicy {
    /// Returns the stock policy: 30 s overall, 5 s per actor.
    fn default() -> Self {
        const OVERALL: Duration = Duration::from_secs(30);
        const PER_ACTOR: Duration = Duration::from_secs(5);

        ShutdownPolicy {
            timeout: OVERALL,
            per_actor_timeout: PER_ACTOR,
        }
    }
}
58
59/// Configuration for creating a new [`ActorSystem`].
60#[derive(Debug, Clone, Default)]
61pub struct SystemConfig {
62    /// Shutdown policy for this system.
63    pub shutdown_policy: ShutdownPolicy,
64}
65
66// ---------------------------------------------------------------------------
67// Type-erased actor handle
68// ---------------------------------------------------------------------------
69
70struct AnyActorHandle {
71    type_id: TypeId,
72    handle: Box<dyn Any + Send + Sync>,
73    stopper: Box<StopFn>,
74    #[allow(dead_code)]
75    created_at: Instant,
76}
77
78impl AnyActorHandle {
79    fn new<A: Actor>(handle: &ActorHandle<A>) -> Self {
80        let cloned = handle.clone();
81        let stopper_handle = handle.clone();
82        Self {
83            type_id: TypeId::of::<ActorHandle<A>>(),
84            handle: Box::new(cloned),
85            stopper: Box::new(move |reason| {
86                let h = stopper_handle.clone();
87                Box::pin(async move { h.stop(reason).await })
88            }),
89            created_at: Instant::now(),
90        }
91    }
92
93    fn downcast<A: Actor>(&self) -> Option<ActorHandle<A>> {
94        if self.type_id == TypeId::of::<ActorHandle<A>>() {
95            self.handle.downcast_ref::<ActorHandle<A>>().cloned()
96        } else {
97            None
98        }
99    }
100
101    async fn stop(&self, reason: StopReason) -> Result<(), SendError> {
102        (self.stopper)(reason).await
103    }
104}
105
106// ---------------------------------------------------------------------------
107// RegistryGuard - drop-based auto-unregister
108// ---------------------------------------------------------------------------
109
110pub(crate) struct RegistryGuard {
111    system: Arc<ActorSystem>,
112    id: ActorId,
113    name: Option<String>,
114}
115
116impl RegistryGuard {
117    pub(crate) fn new(system: Arc<ActorSystem>, id: ActorId, name: Option<String>) -> Self {
118        Self { system, id, name }
119    }
120}
121
122impl Drop for RegistryGuard {
123    fn drop(&mut self) {
124        self.system.unregister_by_id(&self.id);
125        if let Some(name) = &self.name {
126            self.system.by_name.remove(name);
127        }
128    }
129}
130
131// ---------------------------------------------------------------------------
132// ActorSystem
133// ---------------------------------------------------------------------------
134
135/// A named actor registry with coordinated shutdown.
136///
137/// `ActorSystem` is a phone book, not a runtime. It does not create or own a
138/// Tokio runtime. Actors spawn on whatever runtime is current via
139/// `Handle::try_current()`.
140pub struct ActorSystem {
141    name: String,
142    by_name: DashMap<String, AnyActorHandle>,
143    by_id: DashMap<ActorId, AnyActorHandle>,
144    shutdown_policy: ShutdownPolicy,
145}
146
147impl std::fmt::Debug for ActorSystem {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct("ActorSystem")
150            .field("name", &self.name)
151            .finish_non_exhaustive()
152    }
153}
154
155impl ActorSystem {
156    // -- Construction -------------------------------------------------------
157
158    /// Returns the default system (named `"default"`), creating it lazily.
159    ///
160    /// This is intentionally not `std::default::Default` because it returns
161    /// `Arc<ActorSystem>` (shared ownership), not an owned value.
162    #[allow(clippy::should_implement_trait)]
163    pub fn default() -> Arc<ActorSystem> {
164        let reg = systems();
165        reg.entry("default".to_string())
166            .or_insert_with(|| {
167                Arc::new(ActorSystem {
168                    name: "default".to_string(),
169                    by_name: DashMap::new(),
170                    by_id: DashMap::new(),
171                    shutdown_policy: ShutdownPolicy::default(),
172                })
173            })
174            .value()
175            .clone()
176    }
177
178    /// Creates a new named system and registers it in the global systems map.
179    ///
180    /// Returns [`SpawnError::SystemNameTaken`] if a system with this name
181    /// already exists.
182    pub fn create(name: impl Into<String>) -> Result<Arc<ActorSystem>, SpawnError> {
183        let name = name.into();
184        let reg = systems();
185        match reg.entry(name.clone()) {
186            dashmap::mapref::entry::Entry::Occupied(_) => Err(SpawnError::SystemNameTaken(name)),
187            dashmap::mapref::entry::Entry::Vacant(v) => {
188                let system = Arc::new(ActorSystem {
189                    name,
190                    by_name: DashMap::new(),
191                    by_id: DashMap::new(),
192                    shutdown_policy: ShutdownPolicy::default(),
193                });
194                v.insert(system.clone());
195                Ok(system)
196            }
197        }
198    }
199
200    /// Creates a new named system with custom configuration.
201    ///
202    /// Returns [`SpawnError::SystemNameTaken`] if a system with this name
203    /// already exists.
204    pub fn create_with(
205        name: impl Into<String>,
206        config: SystemConfig,
207    ) -> Result<Arc<ActorSystem>, SpawnError> {
208        let name = name.into();
209        let reg = systems();
210        match reg.entry(name.clone()) {
211            dashmap::mapref::entry::Entry::Occupied(_) => Err(SpawnError::SystemNameTaken(name)),
212            dashmap::mapref::entry::Entry::Vacant(v) => {
213                let system = Arc::new(ActorSystem {
214                    name,
215                    by_name: DashMap::new(),
216                    by_id: DashMap::new(),
217                    shutdown_policy: config.shutdown_policy,
218                });
219                v.insert(system.clone());
220                Ok(system)
221            }
222        }
223    }
224
225    /// Looks up a named system. Returns `None` if no system with this name exists.
226    pub fn get_named(name: &str) -> Option<Arc<ActorSystem>> {
227        systems().get(name).map(|entry| entry.value().clone())
228    }
229
230    /// Lists the names of all registered systems.
231    pub fn all() -> Vec<String> {
232        systems().iter().map(|e| e.key().clone()).collect()
233    }
234
235    /// Returns this system's name.
236    pub fn name(&self) -> &str {
237        &self.name
238    }
239
240    // -- Actor lookup -------------------------------------------------------
241
242    /// Looks up a named actor, returning a typed handle.
243    ///
244    /// Returns `None` if the name is not registered or if the registered actor
245    /// is a different type (type mismatch is silent, matching OTP's
246    /// `whereis/1 -> undefined` semantics).
247    pub fn get<A: Actor>(&self, name: &str) -> Option<ActorHandle<A>> {
248        self.by_name.get(name).and_then(|e| e.downcast::<A>())
249    }
250
251    /// Looks up an actor by ID.
252    #[allow(dead_code)]
253    pub(crate) fn get_by_id<A: Actor>(&self, id: &ActorId) -> Option<ActorHandle<A>> {
254        self.by_id.get(id).and_then(|e| e.downcast::<A>())
255    }
256
257    /// Stops a named actor gracefully.
258    ///
259    /// Returns [`SendError::Closed`] if no actor with the given name is registered.
260    pub async fn stop(&self, name: &str) -> Result<(), crate::error::SendError> {
261        let entry = self
262            .by_name
263            .get(name)
264            .ok_or(crate::error::SendError::Closed)?;
265        entry.stop(StopReason::Graceful).await
266    }
267
268    /// Force-kills a named actor, bypassing all lifecycle callbacks.
269    ///
270    /// Returns [`SendError::Closed`] if no actor with the given name is registered.
271    pub async fn kill(&self, name: &str) -> Result<(), crate::error::SendError> {
272        let entry = self
273            .by_name
274            .get(name)
275            .ok_or(crate::error::SendError::Closed)?;
276        entry.stop(StopReason::Kill).await
277    }
278
279    /// Lists all registered actor names in this system.
280    pub fn registered(&self) -> Vec<String> {
281        self.by_name.iter().map(|e| e.key().clone()).collect()
282    }
283
284    // -- Internal registration (used by spawn path) -------------------------
285
286    pub(crate) fn register_actor<A: Actor>(
287        &self,
288        id: &ActorId,
289        name: Option<&str>,
290        handle: &ActorHandle<A>,
291    ) -> Result<(), SpawnError> {
292        if let Some(n) = name {
293            let entry = self.by_name.entry(n.to_string());
294            match entry {
295                dashmap::mapref::entry::Entry::Occupied(_) => {
296                    return Err(SpawnError::NameTaken {
297                        name: n.to_string(),
298                        system: self.name.clone(),
299                    });
300                }
301                dashmap::mapref::entry::Entry::Vacant(v) => {
302                    v.insert(AnyActorHandle::new(handle));
303                }
304            }
305        }
306        self.by_id.insert(id.clone(), AnyActorHandle::new(handle));
307        Ok(())
308    }
309
310    pub(crate) fn unregister_by_id(&self, id: &ActorId) {
311        self.by_id.remove(id);
312    }
313
314    // -- Shutdown -----------------------------------------------------------
315
316    /// Shuts down all registered actors using the system's stored policy.
317    pub async fn shutdown(&self) {
318        self.shutdown_with(self.shutdown_policy.clone()).await;
319    }
320
321    /// Shuts down all registered actors with a custom policy.
322    ///
323    /// All entries are drained from the registry first (releasing locks),
324    /// then stop signals are sent. This avoids deadlock with `RegistryGuard`
325    /// drop handlers that also mutate the registry.
326    ///
327    /// Each actor is given up to `per_actor_timeout` to stop gracefully.
328    /// If it does not stop in time, it receives `StopReason::Kill`.
329    /// If the global `timeout` deadline is exceeded, all remaining actors
330    /// are immediately killed.
331    pub async fn shutdown_with(&self, policy: ShutdownPolicy) {
332        let deadline = Instant::now() + policy.timeout;
333
334        // Drain both maps, takes ownership and releases all DashMap locks
335        // before we await the stop futures. This prevents deadlock with
336        // RegistryGuard::drop which also mutates these maps.
337        self.by_name.clear();
338        let entries: Vec<(ActorId, AnyActorHandle)> = self
339            .by_id
340            .iter()
341            .map(|e| e.key().clone())
342            .collect::<Vec<_>>()
343            .into_iter()
344            .filter_map(|id| self.by_id.remove(&id))
345            .collect();
346
347        for (_id, entry) in &entries {
348            if Instant::now() >= deadline {
349                let _ = entry.stop(StopReason::Kill).await;
350                continue;
351            }
352
353            let result =
354                tokio::time::timeout(policy.per_actor_timeout, entry.stop(StopReason::Graceful))
355                    .await;
356
357            if result.is_err() {
358                let _ = entry.stop(StopReason::Kill).await;
359            }
360        }
361    }
362}