Skip to main content

acton_reactive/common/
actor_runtime.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tracing::{error, trace};
24
25use crate::actor::{ActorConfig, Idle, ManagedActor};
26use crate::common::acton_inner::ActonInner;
27use crate::common::{ActorHandle, BrokerRef};
28use crate::traits::ActorHandleInterface;
29
30/// Represents the initialized and active Acton actor system runtime.
31///
32/// This struct is obtained after successfully launching the system via [`ActonApp::launch_async().await`].
33/// It holds the internal state of the running system, including a reference to the
34/// central message broker and a registry of top-level actors.
35///
36/// `ActorRuntime` provides the primary methods for interacting with the system as a whole,
37/// such as creating new top-level actors (`new_actor`, `spawn_actor`, etc.) and initiating
38/// a graceful shutdown of all actors (`shutdown_all`).
39///
40/// It is cloneable, allowing different parts of an application to hold references
41/// to the runtime environment.
42#[derive(Debug, Clone, Default)]
43pub struct ActorRuntime(pub(crate) ActonInner); // Keep inner field crate-public
44
45impl ActorRuntime {
46    /// Creates a new top-level actor builder (`ManagedActor<Idle, State>`) with a specified root name.
47    ///
48    /// This method initializes a [`ManagedActor`] in the [`Idle`] state, configured with a
49    /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
50    /// The actor is registered as a top-level actor within the runtime.
51    ///
52    /// The returned actor is ready for further configuration (e.g., adding message handlers
53    /// via `act_on`) before being started by calling `.start()` on it.
54    ///
55    /// # Type Parameters
56    ///
57    /// * `State`: The user-defined state type for the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
58    ///
59    /// # Arguments
60    ///
61    /// * `name`: A string that will form the root name of the actor's [`Ern`].
62    ///
63    /// # Returns
64    ///
65    /// A [`ManagedActor<Idle, State>`] instance, ready for configuration and starting.
66    ///
67    /// # Panics
68    ///
69    /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `ActorConfig` fails.
70    pub fn new_actor_with_name<State>(&mut self, name: String) -> ManagedActor<Idle, State>
71    where
72        State: Default + Send + Debug + 'static,
73    {
74        let actor_config = ActorConfig::new(
75            Ern::with_root(name).expect("Failed to create root Ern for new actor"), // Use expect for clarity
76            None,                        // No parent for top-level actor
77            Some(self.0.broker.clone()), // Use system broker
78        )
79        .expect("Failed to create actor config");
80
81        let runtime = self.clone();
82        let new_actor = ManagedActor::new(Some(&runtime), Some(&actor_config));
83        trace!("Registering new top-level actor: {}", new_actor.id());
84        self.0
85            .roots
86            .insert(new_actor.id.clone(), new_actor.handle.clone());
87        new_actor
88    }
89
90    /// Creates a new top-level actor builder (`ManagedActor<Idle, State>`) with a default name ("actor").
91    ///
92    /// Similar to [`ActorRuntime::new_actor_with_name`], but uses a default root name "actor"
93    /// for the actor's [`Ern`]. The actor is registered as a top-level actor within the runtime.
94    ///
95    /// The returned actor is ready for further configuration before being started via `.start()`.
96    ///
97    /// # Type Parameters
98    ///
99    /// * `State`: The user-defined state type for the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
100    ///
101    /// # Returns
102    ///
103    /// A [`ManagedActor<Idle, State>`] instance, ready for configuration and starting.
104    ///
105    /// # Panics
106    ///
107    /// Panics if creating the internal `ActorConfig` fails.
108    pub fn new_actor<State>(&mut self) -> ManagedActor<Idle, State>
109    where
110        State: Default + Send + Debug + 'static,
111    {
112        // Use a default name if none is provided.
113        self.new_actor_with_name("actor".to_string()) // Reuse the named version
114    }
115
116    /// Returns the number of top-level actors currently registered in the runtime.
117    ///
118    /// This count only includes actors directly created via the `ActorRuntime` and
119    /// does not include child actors supervised by other actors.
120    #[inline]
121    #[must_use]
122    pub fn actor_count(&self) -> usize {
123        self.0.roots.len()
124    }
125
126    /// Creates a new top-level actor builder (`ManagedActor<Idle, State>`) using a provided configuration.
127    ///
128    /// This method initializes a [`ManagedActor`] in the [`Idle`] state using the specified
129    /// [`ActorConfig`]. It ensures the actor is configured with the system's broker if not
130    /// already set in the config. The actor is registered as a top-level actor within the runtime.
131    ///
132    /// The returned actor is ready for further configuration before being started via `.start()`.
133    ///
134    /// # Type Parameters
135    ///
136    /// * `State`: The user-defined state type for the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
137    ///
138    /// # Arguments
139    ///
140    /// * `config`: The [`ActorConfig`] to use for the new actor. The broker field will be
141    ///   overridden with the system broker if it's `None`.
142    ///
143    /// # Returns
144    ///
145    /// A [`ManagedActor<Idle, State>`] instance, ready for configuration and starting.
146    pub fn new_actor_with_config<State>(
147        &mut self,
148        mut config: ActorConfig,
149    ) -> ManagedActor<Idle, State>
150    where
151        State: Default + Send + Debug + 'static,
152    {
153        let acton_ready = self.clone();
154        // Ensure the actor uses the system broker if none is specified.
155        if config.broker.is_none() {
156            config.broker = Some(self.0.broker.clone());
157        }
158        let new_actor = ManagedActor::new(Some(&acton_ready), Some(&config));
159        trace!(
160            "Created new actor builder with config, id: {}",
161            new_actor.id()
162        );
163        self.0
164            .roots
165            .insert(new_actor.id.clone(), new_actor.handle.clone());
166        new_actor
167    }
168
169    /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
170    #[inline]
171    #[must_use]
172    pub fn broker(&self) -> BrokerRef {
173        self.0.broker.clone()
174    }
175
176    /// Returns a clone of the Arc-wrapped IPC type registry.
177    ///
178    /// The registry is used to register message types for cross-process
179    /// serialization and deserialization. Message types must be registered
180    /// before they can be received via IPC.
181    ///
182    /// Only available when the `ipc` feature is enabled.
183    ///
184    /// # Example
185    ///
186    /// ```rust,ignore
187    /// use acton_reactive::prelude::*;
188    /// use serde::{Serialize, Deserialize};
189    ///
190    /// #[derive(Clone, Debug, Serialize, Deserialize)]
191    /// struct PriceUpdate {
192    ///     symbol: String,
193    ///     price: f64,
194    /// }
195    ///
196    /// let mut runtime = ActonApp::launch_async().await;
197    ///
198    /// // Register the message type with a stable name
199    /// runtime.ipc_registry().register::<PriceUpdate>("PriceUpdate");
200    /// ```
201    #[cfg(feature = "ipc")]
202    #[inline]
203    #[must_use]
204    pub fn ipc_registry(&self) -> std::sync::Arc<crate::common::ipc::IpcTypeRegistry> {
205        self.0.ipc_type_registry.clone()
206    }
207
208    /// Exposes an actor for IPC access with a logical name.
209    ///
210    /// External processes reference actors by logical names (e.g., `price_service`)
211    /// rather than full ERNs. This method registers the mapping between a
212    /// human-readable name and the actor's handle.
213    ///
214    /// Only available when the `ipc` feature is enabled.
215    ///
216    /// # Arguments
217    ///
218    /// * `name`: The logical name to expose the actor as. External IPC clients
219    ///   will use this name to target the actor.
220    /// * `handle`: The [`ActorHandle`] of the actor to expose.
221    ///
222    /// # Example
223    ///
224    /// ```rust,ignore
225    /// let mut runtime = ActonApp::launch_async().await;
226    /// let actor = runtime.new_actor_with_name::<PriceServiceState>("price_service".to_string());
227    /// let handle = actor.start().await;
228    ///
229    /// // Expose the actor for IPC access
230    /// runtime.ipc_expose("price_service", handle.clone());
231    /// ```
232    #[cfg(feature = "ipc")]
233    pub fn ipc_expose(&self, name: &str, handle: ActorHandle) {
234        trace!("Exposing actor {} for IPC as '{}'", handle.id(), name);
235        self.0.ipc_actor_registry.insert(name.to_string(), handle);
236    }
237
238    /// Removes an actor from IPC exposure.
239    ///
240    /// After calling this method, external processes will no longer be able
241    /// to send messages to the actor using the specified name.
242    ///
243    /// Only available when the `ipc` feature is enabled.
244    ///
245    /// # Arguments
246    ///
247    /// * `name`: The logical name to remove from IPC exposure.
248    ///
249    /// # Returns
250    ///
251    /// The removed [`ActorHandle`] if the name was registered, or `None` if
252    /// no actor was registered with that name.
253    #[cfg(feature = "ipc")]
254    pub fn ipc_hide(&self, name: &str) -> Option<ActorHandle> {
255        trace!("Hiding actor '{}' from IPC", name);
256        self.0.ipc_actor_registry.remove(name).map(|(_, h)| h)
257    }
258
259    /// Looks up an actor handle by its IPC logical name.
260    ///
261    /// This is used internally by the IPC listener to route messages to
262    /// the correct actor.
263    ///
264    /// Only available when the `ipc` feature is enabled.
265    ///
266    /// # Arguments
267    ///
268    /// * `name`: The logical name to look up.
269    ///
270    /// # Returns
271    ///
272    /// A clone of the [`ActorHandle`] if found, or `None` if no actor
273    /// is registered with that name.
274    #[cfg(feature = "ipc")]
275    #[must_use]
276    pub fn ipc_lookup(&self, name: &str) -> Option<ActorHandle> {
277        self.0.ipc_actor_registry.get(name).map(|r| r.clone())
278    }
279
280    /// Returns the number of actors currently exposed for IPC.
281    ///
282    /// Only available when the `ipc` feature is enabled.
283    #[cfg(feature = "ipc")]
284    #[inline]
285    #[must_use]
286    pub fn ipc_actor_count(&self) -> usize {
287        self.0.ipc_actor_registry.len()
288    }
289
290    /// Starts the IPC listener with the default configuration.
291    ///
292    /// This method loads IPC configuration from XDG-compliant locations and
293    /// starts a Unix Domain Socket listener that accepts connections from
294    /// external processes and routes messages to registered actors.
295    ///
296    /// The listener runs in a background task and will be automatically stopped
297    /// when the runtime's cancellation token is triggered (e.g., during shutdown).
298    ///
299    /// Only available when the `ipc` feature is enabled.
300    ///
301    /// # Returns
302    ///
303    /// An [`IpcListenerHandle`](crate::common::ipc::IpcListenerHandle) for
304    /// managing the listener lifecycle and accessing statistics.
305    ///
306    /// # Errors
307    ///
308    /// Returns an error if:
309    /// - The socket directory cannot be created
310    /// - Another listener is already running at the socket path
311    /// - The socket cannot be bound
312    ///
313    /// # Example
314    ///
315    /// ```rust,ignore
316    /// let mut runtime = ActonApp::launch_async().await;
317    ///
318    /// // Register message types and expose actors first
319    /// runtime.ipc_registry().register::<MyMessage>("MyMessage");
320    /// runtime.ipc_expose("my_actor", actor_handle);
321    ///
322    /// // Start the IPC listener
323    /// let listener = runtime.start_ipc_listener().await?;
324    ///
325    /// // Check listener statistics
326    /// println!("Active connections: {}", listener.stats.connections_active());
327    /// ```
328    #[cfg(feature = "ipc")]
329    pub async fn start_ipc_listener(
330        &self,
331    ) -> Result<crate::common::ipc::IpcListenerHandle, crate::common::ipc::IpcError> {
332        let config = crate::common::ipc::IpcConfig::load();
333        self.start_ipc_listener_with_config(config).await
334    }
335
336    /// Starts the IPC listener with a custom configuration.
337    ///
338    /// This method allows you to provide a custom IPC configuration instead
339    /// of loading from the default XDG locations.
340    ///
341    /// Only available when the `ipc` feature is enabled.
342    ///
343    /// # Arguments
344    ///
345    /// * `config` - Custom IPC configuration.
346    ///
347    /// # Returns
348    ///
349    /// An [`IpcListenerHandle`](crate::common::ipc::IpcListenerHandle) for
350    /// managing the listener lifecycle.
351    ///
352    /// # Errors
353    ///
354    /// Same as [`start_ipc_listener`](Self::start_ipc_listener).
355    #[cfg(feature = "ipc")]
356    pub async fn start_ipc_listener_with_config(
357        &self,
358        config: crate::common::ipc::IpcConfig,
359    ) -> Result<crate::common::ipc::IpcListenerHandle, crate::common::ipc::IpcError> {
360        trace!("Starting IPC listener with config: {:?}", config);
361        let handle = crate::common::ipc::start_listener(
362            config,
363            self.0.ipc_type_registry.clone(),
364            self.0.ipc_actor_registry.clone(),
365            self.0.cancellation_token.clone(),
366        )
367        .await?;
368
369        // Store the subscription manager reference so the broker can forward broadcasts to IPC clients
370        {
371            let mut guard = self.0.ipc_subscription_manager.write();
372            *guard = Some(handle.subscription_manager().clone());
373        }
374
375        Ok(handle)
376    }
377
378    /// Creates, configures, and starts a top-level actor using a provided configuration and setup function.
379    ///
380    /// This method combines actor creation (using `config`), custom asynchronous setup (`setup_fn`),
381    /// and starting the actor. The `setup_fn` receives the actor in the `Idle` state, performs
382    /// necessary configurations (like adding message handlers), and must call `.start()` to
383    /// transition the actor to the `Started` state, returning its `ActorHandle`.
384    ///
385    /// The actor is registered as a top-level actor within the runtime.
386    ///
387    /// # Type Parameters
388    ///
389    /// * `State`: The state type of the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
390    ///
391    /// # Arguments
392    ///
393    /// * `config`: The [`ActorConfig`] to use for creating the actor. The broker field will be
394    ///   overridden with the system broker if it's `None`.
395    /// * `setup_fn`: An asynchronous closure that takes the `ManagedActor<Idle, State>`, configures it,
396    ///   calls `.start()`, and returns the resulting `ActorHandle`. The closure must be `Send + 'static`.
397    ///
398    /// # Returns
399    ///
400    /// A `Result` containing the `ActorHandle` of the successfully spawned actor, or an error if
401    /// actor creation or the `setup_fn` fails.
402    pub async fn spawn_actor_with_setup_fn<State>(
403        &mut self,
404        mut config: ActorConfig,
405        setup_fn: impl FnOnce(
406            ManagedActor<Idle, State>,
407        ) -> Pin<Box<dyn Future<Output = ActorHandle> + Send + 'static>>,
408    ) -> anyhow::Result<ActorHandle>
409    where
410        State: Default + Send + Debug + 'static,
411    {
412        let acton_ready = self.clone();
413        if config.broker.is_none() {
414            config.broker = Some(self.0.broker.clone());
415        }
416
417        let new_actor = ManagedActor::new(Some(&acton_ready), Some(&config));
418        let actor_id = new_actor.id().clone(); // Get ID before moving
419        trace!("Running setup function for actor: {}", actor_id);
420        let handle = setup_fn(new_actor).await; // Setup function consumes the actor and returns handle
421        trace!("Actor {} setup complete, registering handle.", actor_id);
422        self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
423        Ok(handle)
424    }
425
426    /// Initiates a graceful shutdown of the entire Acton system.
427    ///
428    /// This method attempts to stop all registered top-level actors (and consequently their
429    /// descendant children through the `stop` propagation mechanism) by sending them a
430    /// [`SystemSignal::Terminate`]. It waits for all top-level actor tasks to complete.
431    /// Finally, it stops the central message broker actor.
432    ///
433    /// # Returns
434    ///
435    /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
436    /// Errors during the stopping of individual actors or the broker will be propagated.
437    pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
438        use std::time::Duration;
439        use tokio::time::timeout as tokio_timeout;
440
441        // Phase 1: Concurrently signal all root actors to terminate gracefully.
442        trace!("Sending Terminate signal to all root actors.");
443        let stop_futures: Vec<_> = self
444            .0
445            .roots
446            .iter()
447            .map(|item| {
448                let handle = item.value().clone();
449                async move {
450                    if let Err(e) = handle.stop().await {
451                        error!("Error stopping actor {}: {:?}", handle.id(), e);
452                    }
453                }
454            })
455            .collect();
456
457        let timeout_ms: u64 = self
458            .0
459            .config
460            .system_shutdown_timeout()
461            .as_millis()
462            .try_into()
463            .unwrap_or(u64::MAX);
464
465        trace!("Waiting for all actors to finish gracefully...");
466        if tokio_timeout(Duration::from_millis(timeout_ms), join_all(stop_futures))
467            .await
468            .is_err()
469        {
470            error!("System-wide shutdown timeout expired after {} ms. Forcefully cancelling remaining tasks.", timeout_ms);
471            self.0.cancellation_token.cancel(); // Forceful cancellation
472        } else {
473            trace!("All actors completed gracefully.");
474        }
475
476        trace!("Stopping the system broker...");
477        // Stop the broker actor, using same system shutdown timeout.
478        if let Ok(res) =
479            tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await
480        {
481            res?;
482        } else {
483            error!(
484                "Timeout waiting for broker to shut down after {} ms",
485                timeout_ms
486            );
487            return Err(anyhow::anyhow!(
488                "Timeout while waiting for system broker to shut down after {timeout_ms} ms"
489            ));
490        }
491        trace!("System shutdown complete.");
492        Ok(())
493    }
494
495    /// Creates, configures, and starts a top-level actor using a default configuration and a setup function.
496    ///
497    /// This is a convenience method similar to [`ActorRuntime::spawn_actor_with_setup_fn`], but it
498    /// automatically creates a default `ActorConfig` (with a default name and the system broker).
499    /// The provided `setup_fn` configures and starts the actor.
500    ///
501    /// The actor is registered as a top-level actor within the runtime.
502    ///
503    /// # Type Parameters
504    ///
505    /// * `State`: The state type of the actor. Must implement `Default`, `Send`, `Debug`, and be `'static`.
506    ///
507    /// # Arguments
508    ///
509    /// * `setup_fn`: An asynchronous closure that takes the `ManagedActor<Idle, State>`, configures it,
510    ///   calls `.start()`, and returns the resulting `ActorHandle`. The closure must be `Send + 'static`.
511    ///
512    /// # Returns
513    ///
514    /// A `Result` containing the `ActorHandle` of the successfully spawned actor, or an error if
515    /// actor creation or the `setup_fn` fails.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error if the default `ActorConfig` cannot be created.
520    pub async fn spawn_actor<State>(
521        &mut self,
522        setup_fn: impl FnOnce(
523            ManagedActor<Idle, State>,
524        ) -> Pin<Box<dyn Future<Output = ActorHandle> + Send + 'static>>,
525    ) -> anyhow::Result<ActorHandle>
526    where
527        State: Default + Send + Debug + 'static,
528    {
529        // Create a default config, ensuring the system broker is included.
530        let config = ActorConfig::new(Ern::default(), None, Some(self.broker()))?;
531        // Reuse the more general spawn function.
532        self.spawn_actor_with_setup_fn(config, setup_fn).await
533    }
534}
535