acton_core/common/
agent_runtime.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::channel::oneshot::Canceled;
23use futures::future::join_all;
24use tokio::sync::oneshot;
25use tokio_util::sync::CancellationToken;
26use tracing::{error, trace}; // Added error import
27
28use crate::actor::{AgentConfig, Idle, ManagedAgent};
29use crate::common::acton_inner::ActonInner;
30use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
31use crate::traits::AgentHandleInterface;
32
33/// Represents the initialized and active Acton agent system runtime.
34///
35/// This struct is obtained after successfully launching the system via [`ActonApp::launch()`].
36/// It holds the internal state of the running system, including a reference to the
37/// central message broker and a registry of top-level agents.
38///
39/// `AgentRuntime` provides the primary methods for interacting with the system as a whole,
40/// such as creating new top-level agents (`new_agent`, `spawn_agent`, etc.) and initiating
41/// a graceful shutdown of all agents (`shutdown_all`).
42///
43/// It is cloneable, allowing different parts of an application to hold references
44/// to the runtime environment.
45#[derive(Debug, Clone, Default)]
46pub struct AgentRuntime(pub(crate) ActonInner); // Keep inner field crate-public
47
48impl AgentRuntime {
49    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a specified root name.
50    ///
51    /// This method initializes a [`ManagedAgent`] in the [`Idle`] state, configured with a
52    /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
53    /// The agent is registered as a top-level agent within the runtime.
54    ///
55    /// The returned agent is ready for further configuration (e.g., adding message handlers
56    /// via `act_on`) before being started by calling `.start()` on it.
57    ///
58    /// # Type Parameters
59    ///
60    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
61    ///
62    /// # Arguments
63    ///
64    /// * `name`: A string that will form the root name of the agent's [`Ern`].
65    ///
66    /// # Returns
67    ///
68    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
69    ///
70    /// # Panics
71    ///
72    /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `AgentConfig` fails.
73    pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
74    where
75        State: Default + Send + Debug + 'static,
76    {
77        let actor_config = AgentConfig::new(
78            Ern::with_root(name).expect("Failed to create root Ern for new agent"), // Use expect for clarity
79            None,                        // No parent for top-level agent
80            Some(self.0.broker.clone()), // Use system broker
81        )
82        .expect("Failed to create actor config");
83
84        let runtime = self.clone();
85        let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
86        trace!("Registering new top-level agent: {}", new_actor.id());
87        self.0
88            .roots
89            .insert(new_actor.id.clone(), new_actor.handle.clone());
90        new_actor
91    }
92
93    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a default name ("agent").
94    ///
95    /// Similar to [`AgentRuntime::new_agent_with_name`], but uses a default root name "agent"
96    /// for the agent's [`Ern`]. The agent is registered as a top-level agent within the runtime.
97    ///
98    /// The returned agent is ready for further configuration before being started via `.start()`.
99    ///
100    /// # Type Parameters
101    ///
102    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
103    ///
104    /// # Returns
105    ///
106    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
107    ///
108    /// # Panics
109    ///
110    /// Panics if creating the internal `AgentConfig` fails.
111    pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
112    where
113        State: Default + Send + Debug + 'static,
114    {
115        // Use a default name if none is provided.
116        self.new_agent_with_name("agent".to_string()).await // Reuse the named version
117    }
118
119    /// Returns the number of top-level agents currently registered in the runtime.
120    ///
121    /// This count only includes agents directly created via the `AgentRuntime` and
122    /// does not include child agents supervised by other agents.
123    #[inline]
124    pub fn agent_count(&self) -> usize {
125        self.0.roots.len()
126    }
127
128    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) using a provided configuration.
129    ///
130    /// This method initializes a [`ManagedAgent`] in the [`Idle`] state using the specified
131    /// [`AgentConfig`]. It ensures the agent is configured with the system's broker if not
132    /// already set in the config. The agent is registered as a top-level agent within the runtime.
133    ///
134    /// The returned agent is ready for further configuration before being started via `.start()`.
135    ///
136    /// # Type Parameters
137    ///
138    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
139    ///
140    /// # Arguments
141    ///
142    /// * `config`: The [`AgentConfig`] to use for the new agent. The broker field will be
143    ///   overridden with the system broker if it's `None`.
144    ///
145    /// # Returns
146    ///
147    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
148    pub async fn new_agent_with_config<State>(
149        &mut self,
150        mut config: AgentConfig,
151    ) -> ManagedAgent<Idle, State>
152    where
153        State: Default + Send + Debug + 'static,
154    {
155        let acton_ready = self.clone();
156        // Ensure the agent uses the system broker if none is specified.
157        if config.broker.is_none() {
158            config.broker = Some(self.0.broker.clone());
159        }
160        let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
161        trace!(
162            "Created new agent builder with config, id: {}",
163            new_agent.id()
164        );
165        self.0
166            .roots
167            .insert(new_agent.id.clone(), new_agent.handle.clone());
168        new_agent
169    }
170
171    /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
172    #[inline]
173    pub fn broker(&self) -> BrokerRef {
174        self.0.broker.clone()
175    }
176
177    /// Creates, configures, and starts a top-level agent using a provided configuration and setup function.
178    ///
179    /// This method combines agent creation (using `config`), custom asynchronous setup (`setup_fn`),
180    /// and starting the agent. The `setup_fn` receives the agent in the `Idle` state, performs
181    /// necessary configurations (like adding message handlers), and must call `.start()` to
182    /// transition the agent to the `Started` state, returning its `AgentHandle`.
183    ///
184    /// The agent is registered as a top-level agent within the runtime.
185    ///
186    /// # Type Parameters
187    ///
188    /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
189    ///
190    /// # Arguments
191    ///
192    /// * `config`: The [`AgentConfig`] to use for creating the agent. The broker field will be
193    ///   overridden with the system broker if it's `None`.
194    /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
195    ///   calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
196    ///
197    /// # Returns
198    ///
199    /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
200    /// agent creation or the `setup_fn` fails.
201    pub async fn spawn_agent_with_setup_fn<State>(
202        &mut self,
203        mut config: AgentConfig,
204        setup_fn: impl FnOnce(
205            ManagedAgent<Idle, State>,
206        ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
207    ) -> anyhow::Result<AgentHandle>
208    where
209        State: Default + Send + Debug + 'static,
210    {
211        let acton_ready = self.clone();
212        if config.broker.is_none() {
213            config.broker = Some(self.0.broker.clone());
214        }
215
216        let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
217        let agent_id = new_agent.id().clone(); // Get ID before moving
218        trace!("Running setup function for agent: {}", agent_id);
219        let handle = setup_fn(new_agent).await; // Setup function consumes the agent and returns handle
220        trace!("Agent {} setup complete, registering handle.", agent_id);
221        self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
222        Ok(handle)
223    }
224
225    /// Initiates a graceful shutdown of the entire Acton system.
226    ///
227    /// This method attempts to stop all registered top-level agents (and consequently their
228    /// descendant children through the `stop` propagation mechanism) by sending them a
229    /// [`SystemSignal::Terminate`]. It waits for all top-level agent tasks to complete.
230    /// Finally, it stops the central message broker agent.
231    ///
232    /// # Returns
233    ///
234    /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
235    /// Errors during the stopping of individual agents or the broker will be propagated.
236    pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
237        use std::env;
238        use std::time::Duration;
239        use tokio::time::timeout as tokio_timeout;
240
241        trace!("Initiating shutdown of all top-level agents...");
242        // Trigger the system-wide cancellation_token for all agents.
243        self.0.cancellation_token.cancel();
244
245        // Wait for all agent tracker tasks to complete, draining all roots' trackers.
246        // This ensures that all root agents (and their children) have finished execution.
247        // The timeouts control how long we wait for all trackers to resolve.
248        let timeout_ms: u64 = env::var("ACTON_SYSTEM_SHUTDOWN_TIMEOUT_MS")
249            .ok()
250            .and_then(|val| val.parse().ok())
251            .unwrap_or(30_000);
252
253        let tracker_futures: Vec<_> = self
254            .0
255            .roots
256            .iter()
257            .map(|item| {
258                let root_handle = item.value().clone();
259                async move {
260                    trace!("Waiting for tracker for root agent: {}", root_handle.id());
261                    root_handle.tracker().wait().await;
262                }
263            })
264            .collect();
265
266        match tokio_timeout(Duration::from_millis(timeout_ms), join_all(tracker_futures)).await {
267            Ok(_) => trace!("All agent trackers completed."),
268            Err(_) => {
269                error!("System-wide tracker shutdown timeout expired after {} ms. Some agent tasks may remain running.", timeout_ms);
270                return Err(anyhow::anyhow!(
271                    "Timeout: not all agent tracker tasks completed within {} ms",
272                    timeout_ms
273                ));
274            }
275        };
276
277        trace!("Stopping the system broker...");
278        // Stop the broker agent, using same system shutdown timeout.
279        match tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await {
280            Ok(res) => res?,
281            Err(_) => {
282                error!(
283                    "Timeout waiting for broker to shut down after {} ms",
284                    timeout_ms
285                );
286                return Err(anyhow::anyhow!(
287                    "Timeout while waiting for system broker to shut down after {} ms",
288                    timeout_ms
289                ));
290            }
291        }
292        trace!("System shutdown complete.");
293        Ok(())
294    }
295
296    /// Creates, configures, and starts a top-level agent using a default configuration and a setup function.
297    ///
298    /// This is a convenience method similar to [`AgentRuntime::spawn_agent_with_setup_fn`], but it
299    /// automatically creates a default `AgentConfig` (with a default name and the system broker).
300    /// The provided `setup_fn` configures and starts the agent.
301    ///
302    /// The agent is registered as a top-level agent within the runtime.
303    ///
304    /// # Type Parameters
305    ///
306    /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
307    ///
308    /// # Arguments
309    ///
310    /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
311    ///   calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
312    ///
313    /// # Returns
314    ///
315    /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
316    /// agent creation or the `setup_fn` fails.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the default `AgentConfig` cannot be created.
321    pub async fn spawn_agent<State>(
322        &mut self,
323        setup_fn: impl FnOnce(
324            ManagedAgent<Idle, State>,
325        ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
326    ) -> anyhow::Result<AgentHandle>
327    where
328        State: Default + Send + Debug + 'static,
329    {
330        // Create a default config, ensuring the system broker is included.
331        let config = AgentConfig::new(Ern::default(), None, Some(self.broker()))?;
332        // Reuse the more general spawn function.
333        self.spawn_agent_with_setup_fn(config, setup_fn).await
334    }
335}
336
337/// Converts an [`ActonApp`] marker into an initialized `AgentRuntime`.
338///
339/// This implementation defines the system bootstrap process triggered by [`ActonApp::launch()`].
340/// It performs the following steps:
341/// 1. Spawns a background Tokio task dedicated to initializing the [`AgentBroker`].
342/// 2. Uses a `oneshot` channel to receive the `AgentHandle` of the initialized broker
343///    back from the background task.
344/// 3. **Blocks the current thread** using `tokio::task::block_in_place` while waiting
345///    for the broker initialization to complete. This ensures that `ActonApp::launch()`
346///    does not return until the core system components (like the broker) are ready.
347/// 4. Constructs the `AgentRuntime` using the received broker handle.
348///
349/// **Warning**: The use of `block_in_place` means this conversion should typically
350/// only happen once at the very start of the application within the main thread
351/// or a dedicated initialization thread, before the main asynchronous workload begins.
352/// Calling this from within an existing Tokio runtime task could lead to deadlocks
353/// or performance issues.
354impl From<ActonApp> for AgentRuntime {
355    fn from(_acton: ActonApp) -> Self {
356        trace!("Starting Acton system initialization (From<ActonApp>)");
357        let (sender, receiver) = oneshot::channel();
358        // We do this so the broker gets access to the cancellation token.
359        let mut runtime = AgentRuntime(ActonInner::default());
360        // Spawn broker initialization in a separate task.
361        let runtime_clone = runtime.clone();
362        // Assert that the cancellation_token is present in the clone before broker initialization
363        assert!(
364            !runtime_clone.0.cancellation_token.is_cancelled(),
365            "ActonInner cancellation_token must be present and active before Broker initialization"
366        );
367        tokio::spawn(async move {
368            trace!("Broker initialization task started.");
369            let broker = AgentBroker::initialize(runtime_clone).await;
370            trace!("Broker initialization task finished, sending handle.");
371            let _ = sender.send(broker); // Send broker handle back
372        });
373
374        trace!("Blocking current thread to wait for broker initialization...");
375        // Block until the broker handle is received.
376        let broker = tokio::task::block_in_place(|| {
377            tokio::runtime::Handle::current()
378                .block_on(async { receiver.await.expect("Broker initialization failed") })
379        });
380        trace!("Broker handle received, constructing AgentRuntime.");
381        runtime.0.broker = broker;
382
383        // Create the runtime with the initialized broker.
384        runtime
385    }
386}