1use std::any::{Any, TypeId};
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, Instant};
10
11use dashmap::DashMap;
12
13use crate::actor::handle::ActorHandle;
14use crate::actor::Actor;
15use crate::error::{SendError, SpawnError};
16use crate::types::{ActorId, StopReason};
17
18type StopFn =
19 dyn Fn(StopReason) -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send>> + Send + Sync;
20
21static SYSTEMS: OnceLock<DashMap<String, Arc<ActorSystem>>> = OnceLock::new();
26
27fn systems() -> &'static DashMap<String, Arc<ActorSystem>> {
28 SYSTEMS.get_or_init(DashMap::new)
29}
30
31#[derive(Debug, Clone)]
37pub struct ShutdownPolicy {
38 pub timeout: Duration,
43 pub per_actor_timeout: Duration,
48}
49
50impl Default for ShutdownPolicy {
51 fn default() -> Self {
52 Self {
53 timeout: Duration::from_secs(30),
54 per_actor_timeout: Duration::from_secs(5),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Default)]
61pub struct SystemConfig {
62 pub shutdown_policy: ShutdownPolicy,
64}
65
66struct AnyActorHandle {
71 type_id: TypeId,
72 handle: Box<dyn Any + Send + Sync>,
73 stopper: Box<StopFn>,
74 #[allow(dead_code)]
75 created_at: Instant,
76}
77
78impl AnyActorHandle {
79 fn new<A: Actor>(handle: &ActorHandle<A>) -> Self {
80 let cloned = handle.clone();
81 let stopper_handle = handle.clone();
82 Self {
83 type_id: TypeId::of::<ActorHandle<A>>(),
84 handle: Box::new(cloned),
85 stopper: Box::new(move |reason| {
86 let h = stopper_handle.clone();
87 Box::pin(async move { h.stop(reason).await })
88 }),
89 created_at: Instant::now(),
90 }
91 }
92
93 fn downcast<A: Actor>(&self) -> Option<ActorHandle<A>> {
94 if self.type_id == TypeId::of::<ActorHandle<A>>() {
95 self.handle.downcast_ref::<ActorHandle<A>>().cloned()
96 } else {
97 None
98 }
99 }
100
101 async fn stop(&self, reason: StopReason) -> Result<(), SendError> {
102 (self.stopper)(reason).await
103 }
104}
105
106pub(crate) struct RegistryGuard {
111 system: Arc<ActorSystem>,
112 id: ActorId,
113 name: Option<String>,
114}
115
116impl RegistryGuard {
117 pub(crate) fn new(system: Arc<ActorSystem>, id: ActorId, name: Option<String>) -> Self {
118 Self { system, id, name }
119 }
120}
121
122impl Drop for RegistryGuard {
123 fn drop(&mut self) {
124 self.system.unregister_by_id(&self.id);
125 if let Some(name) = &self.name {
126 self.system.by_name.remove(name);
127 }
128 }
129}
130
131pub struct ActorSystem {
141 name: String,
142 by_name: DashMap<String, AnyActorHandle>,
143 by_id: DashMap<ActorId, AnyActorHandle>,
144 shutdown_policy: ShutdownPolicy,
145}
146
147impl std::fmt::Debug for ActorSystem {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct("ActorSystem")
150 .field("name", &self.name)
151 .finish_non_exhaustive()
152 }
153}
154
155impl ActorSystem {
156 #[allow(clippy::should_implement_trait)]
163 pub fn default() -> Arc<ActorSystem> {
164 let reg = systems();
165 reg.entry("default".to_string())
166 .or_insert_with(|| {
167 Arc::new(ActorSystem {
168 name: "default".to_string(),
169 by_name: DashMap::new(),
170 by_id: DashMap::new(),
171 shutdown_policy: ShutdownPolicy::default(),
172 })
173 })
174 .value()
175 .clone()
176 }
177
178 pub fn create(name: impl Into<String>) -> Result<Arc<ActorSystem>, SpawnError> {
183 let name = name.into();
184 let reg = systems();
185 match reg.entry(name.clone()) {
186 dashmap::mapref::entry::Entry::Occupied(_) => Err(SpawnError::SystemNameTaken(name)),
187 dashmap::mapref::entry::Entry::Vacant(v) => {
188 let system = Arc::new(ActorSystem {
189 name,
190 by_name: DashMap::new(),
191 by_id: DashMap::new(),
192 shutdown_policy: ShutdownPolicy::default(),
193 });
194 v.insert(system.clone());
195 Ok(system)
196 }
197 }
198 }
199
200 pub fn create_with(
205 name: impl Into<String>,
206 config: SystemConfig,
207 ) -> Result<Arc<ActorSystem>, SpawnError> {
208 let name = name.into();
209 let reg = systems();
210 match reg.entry(name.clone()) {
211 dashmap::mapref::entry::Entry::Occupied(_) => Err(SpawnError::SystemNameTaken(name)),
212 dashmap::mapref::entry::Entry::Vacant(v) => {
213 let system = Arc::new(ActorSystem {
214 name,
215 by_name: DashMap::new(),
216 by_id: DashMap::new(),
217 shutdown_policy: config.shutdown_policy,
218 });
219 v.insert(system.clone());
220 Ok(system)
221 }
222 }
223 }
224
225 pub fn get_named(name: &str) -> Option<Arc<ActorSystem>> {
227 systems().get(name).map(|entry| entry.value().clone())
228 }
229
230 pub fn all() -> Vec<String> {
232 systems().iter().map(|e| e.key().clone()).collect()
233 }
234
235 pub fn name(&self) -> &str {
237 &self.name
238 }
239
240 pub fn get<A: Actor>(&self, name: &str) -> Option<ActorHandle<A>> {
248 self.by_name.get(name).and_then(|e| e.downcast::<A>())
249 }
250
251 #[allow(dead_code)]
253 pub(crate) fn get_by_id<A: Actor>(&self, id: &ActorId) -> Option<ActorHandle<A>> {
254 self.by_id.get(id).and_then(|e| e.downcast::<A>())
255 }
256
257 pub async fn stop(&self, name: &str) -> Result<(), crate::error::SendError> {
261 let entry = self
262 .by_name
263 .get(name)
264 .ok_or(crate::error::SendError::Closed)?;
265 entry.stop(StopReason::Graceful).await
266 }
267
268 pub async fn kill(&self, name: &str) -> Result<(), crate::error::SendError> {
272 let entry = self
273 .by_name
274 .get(name)
275 .ok_or(crate::error::SendError::Closed)?;
276 entry.stop(StopReason::Kill).await
277 }
278
279 pub fn registered(&self) -> Vec<String> {
281 self.by_name.iter().map(|e| e.key().clone()).collect()
282 }
283
284 pub(crate) fn register_actor<A: Actor>(
287 &self,
288 id: &ActorId,
289 name: Option<&str>,
290 handle: &ActorHandle<A>,
291 ) -> Result<(), SpawnError> {
292 if let Some(n) = name {
293 let entry = self.by_name.entry(n.to_string());
294 match entry {
295 dashmap::mapref::entry::Entry::Occupied(_) => {
296 return Err(SpawnError::NameTaken {
297 name: n.to_string(),
298 system: self.name.clone(),
299 });
300 }
301 dashmap::mapref::entry::Entry::Vacant(v) => {
302 v.insert(AnyActorHandle::new(handle));
303 }
304 }
305 }
306 self.by_id.insert(id.clone(), AnyActorHandle::new(handle));
307 Ok(())
308 }
309
310 pub(crate) fn unregister_by_id(&self, id: &ActorId) {
311 self.by_id.remove(id);
312 }
313
314 pub async fn shutdown(&self) {
318 self.shutdown_with(self.shutdown_policy.clone()).await;
319 }
320
321 pub async fn shutdown_with(&self, policy: ShutdownPolicy) {
332 let deadline = Instant::now() + policy.timeout;
333
334 self.by_name.clear();
338 let entries: Vec<(ActorId, AnyActorHandle)> = self
339 .by_id
340 .iter()
341 .map(|e| e.key().clone())
342 .collect::<Vec<_>>()
343 .into_iter()
344 .filter_map(|id| self.by_id.remove(&id))
345 .collect();
346
347 for (_id, entry) in &entries {
348 if Instant::now() >= deadline {
349 let _ = entry.stop(StopReason::Kill).await;
350 continue;
351 }
352
353 let result =
354 tokio::time::timeout(policy.per_actor_timeout, entry.stop(StopReason::Graceful))
355 .await;
356
357 if result.is_err() {
358 let _ = entry.stop(StopReason::Kill).await;
359 }
360 }
361 }
362}