acton_reactive/common/actor_runtime.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tracing::{error, trace};
24
25use crate::actor::{ActorConfig, Idle, ManagedActor};
26use crate::common::acton_inner::ActonInner;
27use crate::common::{ActorHandle, BrokerRef};
28use crate::traits::ActorHandleInterface;
29
30/// Represents the initialized and active Acton actor system runtime.
31///
32/// This struct is obtained after successfully launching the system via [`ActonApp::launch_async().await`].
33/// It holds the internal state of the running system, including a reference to the
34/// central message broker and a registry of top-level actors.
35///
36/// `ActorRuntime` provides the primary methods for interacting with the system as a whole,
37/// such as creating new top-level actors (`new_actor`, `spawn_actor`, etc.) and initiating
38/// a graceful shutdown of all actors (`shutdown_all`).
39///
40/// It is cloneable, allowing different parts of an application to hold references
41/// to the runtime environment.
42#[derive(Debug, Clone, Default)]
43pub struct ActorRuntime(pub(crate) ActonInner); // Keep inner field crate-public
44
45impl ActorRuntime {
46 /// Creates a new top-level actor builder (`ManagedActor<Idle, State>`) with a specified root name.
47 ///
48 /// This method initializes a [`ManagedActor`] in the [`Idle`] state, configured with a
49 /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
50 /// The actor is registered as a top-level actor within the runtime.
51 ///
52 /// The returned actor is ready for further configuration (e.g., adding message handlers
53 /// via `act_on`) before being started by calling `.start()` on it.
54 ///
55 /// # Type Parameters
56 ///
57 /// * `State`: The user-defined state type for the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
58 ///
59 /// # Arguments
60 ///
61 /// * `name`: A string that will form the root name of the actor's [`Ern`].
62 ///
63 /// # Returns
64 ///
65 /// A [`ManagedActor<Idle, State>`] instance, ready for configuration and starting.
66 ///
67 /// # Panics
68 ///
69 /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `ActorConfig` fails.
70 pub fn new_actor_with_name<State>(&mut self, name: String) -> ManagedActor<Idle, State>
71 where
72 State: Default + Send + Debug + 'static,
73 {
74 let actor_config = ActorConfig::new(
75 Ern::with_root(name).expect("Failed to create root Ern for new actor"), // Use expect for clarity
76 None, // No parent for top-level actor
77 Some(self.0.broker.clone()), // Use system broker
78 )
79 .expect("Failed to create actor config");
80
81 let runtime = self.clone();
82 let new_actor = ManagedActor::new(Some(&runtime), Some(&actor_config));
83 trace!("Registering new top-level actor: {}", new_actor.id());
84 self.0
85 .roots
86 .insert(new_actor.id.clone(), new_actor.handle.clone());
87 new_actor
88 }
89
90 /// Creates a new top-level actor builder (`ManagedActor<Idle, State>`) with a default name ("actor").
91 ///
92 /// Similar to [`ActorRuntime::new_actor_with_name`], but uses a default root name "actor"
93 /// for the actor's [`Ern`]. The actor is registered as a top-level actor within the runtime.
94 ///
95 /// The returned actor is ready for further configuration before being started via `.start()`.
96 ///
97 /// # Type Parameters
98 ///
99 /// * `State`: The user-defined state type for the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
100 ///
101 /// # Returns
102 ///
103 /// A [`ManagedActor<Idle, State>`] instance, ready for configuration and starting.
104 ///
105 /// # Panics
106 ///
107 /// Panics if creating the internal `ActorConfig` fails.
108 pub fn new_actor<State>(&mut self) -> ManagedActor<Idle, State>
109 where
110 State: Default + Send + Debug + 'static,
111 {
112 // Use a default name if none is provided.
113 self.new_actor_with_name("actor".to_string()) // Reuse the named version
114 }
115
116 /// Returns the number of top-level actors currently registered in the runtime.
117 ///
118 /// This count only includes actors directly created via the `ActorRuntime` and
119 /// does not include child actors supervised by other actors.
120 #[inline]
121 #[must_use]
122 pub fn actor_count(&self) -> usize {
123 self.0.roots.len()
124 }
125
126 /// Creates a new top-level actor builder (`ManagedActor<Idle, State>`) using a provided configuration.
127 ///
128 /// This method initializes a [`ManagedActor`] in the [`Idle`] state using the specified
129 /// [`ActorConfig`]. It ensures the actor is configured with the system's broker if not
130 /// already set in the config. The actor is registered as a top-level actor within the runtime.
131 ///
132 /// The returned actor is ready for further configuration before being started via `.start()`.
133 ///
134 /// # Type Parameters
135 ///
136 /// * `State`: The user-defined state type for the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
137 ///
138 /// # Arguments
139 ///
140 /// * `config`: The [`ActorConfig`] to use for the new actor. The broker field will be
141 /// overridden with the system broker if it's `None`.
142 ///
143 /// # Returns
144 ///
145 /// A [`ManagedActor<Idle, State>`] instance, ready for configuration and starting.
146 pub fn new_actor_with_config<State>(
147 &mut self,
148 mut config: ActorConfig,
149 ) -> ManagedActor<Idle, State>
150 where
151 State: Default + Send + Debug + 'static,
152 {
153 let acton_ready = self.clone();
154 // Ensure the actor uses the system broker if none is specified.
155 if config.broker.is_none() {
156 config.broker = Some(self.0.broker.clone());
157 }
158 let new_actor = ManagedActor::new(Some(&acton_ready), Some(&config));
159 trace!(
160 "Created new actor builder with config, id: {}",
161 new_actor.id()
162 );
163 self.0
164 .roots
165 .insert(new_actor.id.clone(), new_actor.handle.clone());
166 new_actor
167 }
168
169 /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
170 #[inline]
171 #[must_use]
172 pub fn broker(&self) -> BrokerRef {
173 self.0.broker.clone()
174 }
175
176 /// Returns a clone of the Arc-wrapped IPC type registry.
177 ///
178 /// The registry is used to register message types for cross-process
179 /// serialization and deserialization. Message types must be registered
180 /// before they can be received via IPC.
181 ///
182 /// Only available when the `ipc` feature is enabled.
183 ///
184 /// # Example
185 ///
186 /// ```rust,ignore
187 /// use acton_reactive::prelude::*;
188 /// use serde::{Serialize, Deserialize};
189 ///
190 /// #[derive(Clone, Debug, Serialize, Deserialize)]
191 /// struct PriceUpdate {
192 /// symbol: String,
193 /// price: f64,
194 /// }
195 ///
196 /// let mut runtime = ActonApp::launch_async().await;
197 ///
198 /// // Register the message type with a stable name
199 /// runtime.ipc_registry().register::<PriceUpdate>("PriceUpdate");
200 /// ```
201 #[cfg(feature = "ipc")]
202 #[inline]
203 #[must_use]
204 pub fn ipc_registry(&self) -> std::sync::Arc<crate::common::ipc::IpcTypeRegistry> {
205 self.0.ipc_type_registry.clone()
206 }
207
208 /// Exposes an actor for IPC access with a logical name.
209 ///
210 /// External processes reference actors by logical names (e.g., `price_service`)
211 /// rather than full ERNs. This method registers the mapping between a
212 /// human-readable name and the actor's handle.
213 ///
214 /// Only available when the `ipc` feature is enabled.
215 ///
216 /// # Arguments
217 ///
218 /// * `name`: The logical name to expose the actor as. External IPC clients
219 /// will use this name to target the actor.
220 /// * `handle`: The [`ActorHandle`] of the actor to expose.
221 ///
222 /// # Example
223 ///
224 /// ```rust,ignore
225 /// let mut runtime = ActonApp::launch_async().await;
226 /// let actor = runtime.new_actor_with_name::<PriceServiceState>("price_service".to_string());
227 /// let handle = actor.start().await;
228 ///
229 /// // Expose the actor for IPC access
230 /// runtime.ipc_expose("price_service", handle.clone());
231 /// ```
232 #[cfg(feature = "ipc")]
233 pub fn ipc_expose(&self, name: &str, handle: ActorHandle) {
234 trace!("Exposing actor {} for IPC as '{}'", handle.id(), name);
235 self.0.ipc_actor_registry.insert(name.to_string(), handle);
236 }
237
238 /// Removes an actor from IPC exposure.
239 ///
240 /// After calling this method, external processes will no longer be able
241 /// to send messages to the actor using the specified name.
242 ///
243 /// Only available when the `ipc` feature is enabled.
244 ///
245 /// # Arguments
246 ///
247 /// * `name`: The logical name to remove from IPC exposure.
248 ///
249 /// # Returns
250 ///
251 /// The removed [`ActorHandle`] if the name was registered, or `None` if
252 /// no actor was registered with that name.
253 #[cfg(feature = "ipc")]
254 pub fn ipc_hide(&self, name: &str) -> Option<ActorHandle> {
255 trace!("Hiding actor '{}' from IPC", name);
256 self.0.ipc_actor_registry.remove(name).map(|(_, h)| h)
257 }
258
259 /// Looks up an actor handle by its IPC logical name.
260 ///
261 /// This is used internally by the IPC listener to route messages to
262 /// the correct actor.
263 ///
264 /// Only available when the `ipc` feature is enabled.
265 ///
266 /// # Arguments
267 ///
268 /// * `name`: The logical name to look up.
269 ///
270 /// # Returns
271 ///
272 /// A clone of the [`ActorHandle`] if found, or `None` if no actor
273 /// is registered with that name.
274 #[cfg(feature = "ipc")]
275 #[must_use]
276 pub fn ipc_lookup(&self, name: &str) -> Option<ActorHandle> {
277 self.0.ipc_actor_registry.get(name).map(|r| r.clone())
278 }
279
280 /// Returns the number of actors currently exposed for IPC.
281 ///
282 /// Only available when the `ipc` feature is enabled.
283 #[cfg(feature = "ipc")]
284 #[inline]
285 #[must_use]
286 pub fn ipc_actor_count(&self) -> usize {
287 self.0.ipc_actor_registry.len()
288 }
289
290 /// Starts the IPC listener with the default configuration.
291 ///
292 /// This method loads IPC configuration from XDG-compliant locations and
293 /// starts a Unix Domain Socket listener that accepts connections from
294 /// external processes and routes messages to registered actors.
295 ///
296 /// The listener runs in a background task and will be automatically stopped
297 /// when the runtime's cancellation token is triggered (e.g., during shutdown).
298 ///
299 /// Only available when the `ipc` feature is enabled.
300 ///
301 /// # Returns
302 ///
303 /// An [`IpcListenerHandle`](crate::common::ipc::IpcListenerHandle) for
304 /// managing the listener lifecycle and accessing statistics.
305 ///
306 /// # Errors
307 ///
308 /// Returns an error if:
309 /// - The socket directory cannot be created
310 /// - Another listener is already running at the socket path
311 /// - The socket cannot be bound
312 ///
313 /// # Example
314 ///
315 /// ```rust,ignore
316 /// let mut runtime = ActonApp::launch_async().await;
317 ///
318 /// // Register message types and expose actors first
319 /// runtime.ipc_registry().register::<MyMessage>("MyMessage");
320 /// runtime.ipc_expose("my_actor", actor_handle);
321 ///
322 /// // Start the IPC listener
323 /// let listener = runtime.start_ipc_listener().await?;
324 ///
325 /// // Check listener statistics
326 /// println!("Active connections: {}", listener.stats.connections_active());
327 /// ```
328 #[cfg(feature = "ipc")]
329 pub async fn start_ipc_listener(
330 &self,
331 ) -> Result<crate::common::ipc::IpcListenerHandle, crate::common::ipc::IpcError> {
332 let config = crate::common::ipc::IpcConfig::load();
333 self.start_ipc_listener_with_config(config).await
334 }
335
336 /// Starts the IPC listener with a custom configuration.
337 ///
338 /// This method allows you to provide a custom IPC configuration instead
339 /// of loading from the default XDG locations.
340 ///
341 /// Only available when the `ipc` feature is enabled.
342 ///
343 /// # Arguments
344 ///
345 /// * `config` - Custom IPC configuration.
346 ///
347 /// # Returns
348 ///
349 /// An [`IpcListenerHandle`](crate::common::ipc::IpcListenerHandle) for
350 /// managing the listener lifecycle.
351 ///
352 /// # Errors
353 ///
354 /// Same as [`start_ipc_listener`](Self::start_ipc_listener).
355 #[cfg(feature = "ipc")]
356 pub async fn start_ipc_listener_with_config(
357 &self,
358 config: crate::common::ipc::IpcConfig,
359 ) -> Result<crate::common::ipc::IpcListenerHandle, crate::common::ipc::IpcError> {
360 trace!("Starting IPC listener with config: {:?}", config);
361 let handle = crate::common::ipc::start_listener(
362 config,
363 self.0.ipc_type_registry.clone(),
364 self.0.ipc_actor_registry.clone(),
365 self.0.cancellation_token.clone(),
366 )
367 .await?;
368
369 // Store the subscription manager reference so the broker can forward broadcasts to IPC clients
370 {
371 let mut guard = self.0.ipc_subscription_manager.write();
372 *guard = Some(handle.subscription_manager().clone());
373 }
374
375 Ok(handle)
376 }
377
378 /// Creates, configures, and starts a top-level actor using a provided configuration and setup function.
379 ///
380 /// This method combines actor creation (using `config`), custom asynchronous setup (`setup_fn`),
381 /// and starting the actor. The `setup_fn` receives the actor in the `Idle` state, performs
382 /// necessary configurations (like adding message handlers), and must call `.start()` to
383 /// transition the actor to the `Started` state, returning its `ActorHandle`.
384 ///
385 /// The actor is registered as a top-level actor within the runtime.
386 ///
387 /// # Type Parameters
388 ///
389 /// * `State`: The state type of the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
390 ///
391 /// # Arguments
392 ///
393 /// * `config`: The [`ActorConfig`] to use for creating the actor. The broker field will be
394 /// overridden with the system broker if it's `None`.
395 /// * `setup_fn`: An asynchronous closure that takes the `ManagedActor<Idle, State>`, configures it,
396 /// calls `.start()`, and returns the resulting `ActorHandle`. The closure must be `Send + 'static`.
397 ///
398 /// # Returns
399 ///
400 /// A `Result` containing the `ActorHandle` of the successfully spawned actor, or an error if
401 /// actor creation or the `setup_fn` fails.
402 pub async fn spawn_actor_with_setup_fn<State>(
403 &mut self,
404 mut config: ActorConfig,
405 setup_fn: impl FnOnce(
406 ManagedActor<Idle, State>,
407 ) -> Pin<Box<dyn Future<Output = ActorHandle> + Send + 'static>>,
408 ) -> anyhow::Result<ActorHandle>
409 where
410 State: Default + Send + Debug + 'static,
411 {
412 let acton_ready = self.clone();
413 if config.broker.is_none() {
414 config.broker = Some(self.0.broker.clone());
415 }
416
417 let new_actor = ManagedActor::new(Some(&acton_ready), Some(&config));
418 let actor_id = new_actor.id().clone(); // Get ID before moving
419 trace!("Running setup function for actor: {}", actor_id);
420 let handle = setup_fn(new_actor).await; // Setup function consumes the actor and returns handle
421 trace!("Actor {} setup complete, registering handle.", actor_id);
422 self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
423 Ok(handle)
424 }
425
426 /// Initiates a graceful shutdown of the entire Acton system.
427 ///
428 /// This method attempts to stop all registered top-level actors (and consequently their
429 /// descendant children through the `stop` propagation mechanism) by sending them a
430 /// [`SystemSignal::Terminate`]. It waits for all top-level actor tasks to complete.
431 /// Finally, it stops the central message broker actor.
432 ///
433 /// # Returns
434 ///
435 /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
436 /// Errors during the stopping of individual actors or the broker will be propagated.
437 pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
438 use std::time::Duration;
439 use tokio::time::timeout as tokio_timeout;
440
441 // Phase 1: Concurrently signal all root actors to terminate gracefully.
442 trace!("Sending Terminate signal to all root actors.");
443 let stop_futures: Vec<_> = self
444 .0
445 .roots
446 .iter()
447 .map(|item| {
448 let handle = item.value().clone();
449 async move {
450 if let Err(e) = handle.stop().await {
451 error!("Error stopping actor {}: {:?}", handle.id(), e);
452 }
453 }
454 })
455 .collect();
456
457 let timeout_ms: u64 = self
458 .0
459 .config
460 .system_shutdown_timeout()
461 .as_millis()
462 .try_into()
463 .unwrap_or(u64::MAX);
464
465 trace!("Waiting for all actors to finish gracefully...");
466 if tokio_timeout(Duration::from_millis(timeout_ms), join_all(stop_futures))
467 .await
468 .is_err()
469 {
470 error!("System-wide shutdown timeout expired after {} ms. Forcefully cancelling remaining tasks.", timeout_ms);
471 self.0.cancellation_token.cancel(); // Forceful cancellation
472 } else {
473 trace!("All actors completed gracefully.");
474 }
475
476 trace!("Stopping the system broker...");
477 // Stop the broker actor, using same system shutdown timeout.
478 if let Ok(res) =
479 tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await
480 {
481 res?;
482 } else {
483 error!(
484 "Timeout waiting for broker to shut down after {} ms",
485 timeout_ms
486 );
487 return Err(anyhow::anyhow!(
488 "Timeout while waiting for system broker to shut down after {timeout_ms} ms"
489 ));
490 }
491 trace!("System shutdown complete.");
492 Ok(())
493 }
494
495 /// Creates, configures, and starts a top-level actor using a default configuration and a setup function.
496 ///
497 /// This is a convenience method similar to [`ActorRuntime::spawn_actor_with_setup_fn`], but it
498 /// automatically creates a default `ActorConfig` (with a default name and the system broker).
499 /// The provided `setup_fn` configures and starts the actor.
500 ///
501 /// The actor is registered as a top-level actor within the runtime.
502 ///
503 /// # Type Parameters
504 ///
505 /// * `State`: The state type of the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
506 ///
507 /// # Arguments
508 ///
509 /// * `setup_fn`: An asynchronous closure that takes the `ManagedActor<Idle, State>`, configures it,
510 /// calls `.start()`, and returns the resulting `ActorHandle`. The closure must be `Send + 'static`.
511 ///
512 /// # Returns
513 ///
514 /// A `Result` containing the `ActorHandle` of the successfully spawned actor, or an error if
515 /// actor creation or the `setup_fn` fails.
516 ///
517 /// # Errors
518 ///
519 /// Returns an error if the default `ActorConfig` cannot be created.
520 pub async fn spawn_actor<State>(
521 &mut self,
522 setup_fn: impl FnOnce(
523 ManagedActor<Idle, State>,
524 ) -> Pin<Box<dyn Future<Output = ActorHandle> + Send + 'static>>,
525 ) -> anyhow::Result<ActorHandle>
526 where
527 State: Default + Send + Debug + 'static,
528 {
529 // Create a default config, ensuring the system broker is included.
530 let config = ActorConfig::new(Ern::default(), None, Some(self.broker()))?;
531 // Reuse the more general spawn function.
532 self.spawn_actor_with_setup_fn(config, setup_fn).await
533 }
534}
535