acton_core/common/
agent_runtime.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tokio::sync::oneshot;
24use tracing::{error, trace}; // Added error import
25
26use crate::actor::{AgentConfig, Idle, ManagedAgent};
27use crate::common::acton_inner::ActonInner;
28use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
29use crate::traits::AgentHandleInterface;
30
31/// Represents the initialized and active Acton agent system runtime.
32///
33/// This struct is obtained after successfully launching the system via [`ActonApp::launch()`].
34/// It holds the internal state of the running system, including a reference to the
35/// central message broker and a registry of top-level agents.
36///
37/// `AgentRuntime` provides the primary methods for interacting with the system as a whole,
38/// such as creating new top-level agents (`new_agent`, `spawn_agent`, etc.) and initiating
39/// a graceful shutdown of all agents (`shutdown_all`).
40///
41/// It is cloneable, allowing different parts of an application to hold references
42/// to the runtime environment.
43#[derive(Debug, Clone, Default)]
44pub struct AgentRuntime(pub(crate) ActonInner); // Keep inner field crate-public
45
46impl AgentRuntime {
47    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a specified root name.
48    ///
49    /// This method initializes a [`ManagedAgent`] in the [`Idle`] state, configured with a
50    /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
51    /// The agent is registered as a top-level agent within the runtime.
52    ///
53    /// The returned agent is ready for further configuration (e.g., adding message handlers
54    /// via `act_on`) before being started by calling `.start()` on it.
55    ///
56    /// # Type Parameters
57    ///
58    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
59    ///
60    /// # Arguments
61    ///
62    /// * `name`: A string that will form the root name of the agent's [`Ern`].
63    ///
64    /// # Returns
65    ///
66    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
67    ///
68    /// # Panics
69    ///
70    /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `AgentConfig` fails.
71    pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
72    where
73        State: Default + Send + Debug + 'static,
74    {
75        let actor_config = AgentConfig::new(
76            Ern::with_root(name).expect("Failed to create root Ern for new agent"), // Use expect for clarity
77            None,                        // No parent for top-level agent
78            Some(self.0.broker.clone()), // Use system broker
79        )
80        .expect("Failed to create actor config");
81
82        let runtime = self.clone();
83        let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
84        trace!("Registering new top-level agent: {}", new_actor.id());
85        self.0
86            .roots
87            .insert(new_actor.id.clone(), new_actor.handle.clone());
88        new_actor
89    }
90
91    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a default name ("agent").
92    ///
93    /// Similar to [`AgentRuntime::new_agent_with_name`], but uses a default root name "agent"
94    /// for the agent's [`Ern`]. The agent is registered as a top-level agent within the runtime.
95    ///
96    /// The returned agent is ready for further configuration before being started via `.start()`.
97    ///
98    /// # Type Parameters
99    ///
100    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
101    ///
102    /// # Returns
103    ///
104    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
105    ///
106    /// # Panics
107    ///
108    /// Panics if creating the internal `AgentConfig` fails.
109    pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
110    where
111        State: Default + Send + Debug + 'static,
112    {
113        // Use a default name if none is provided.
114        self.new_agent_with_name("agent".to_string()).await // Reuse the named version
115    }
116
117    /// Returns the number of top-level agents currently registered in the runtime.
118    ///
119    /// This count only includes agents directly created via the `AgentRuntime` and
120    /// does not include child agents supervised by other agents.
121    #[inline]
122    pub fn agent_count(&self) -> usize {
123        self.0.roots.len()
124    }
125
126    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) using a provided configuration.
127    ///
128    /// This method initializes a [`ManagedAgent`] in the [`Idle`] state using the specified
129    /// [`AgentConfig`]. It ensures the agent is configured with the system's broker if not
130    /// already set in the config. The agent is registered as a top-level agent within the runtime.
131    ///
132    /// The returned agent is ready for further configuration before being started via `.start()`.
133    ///
134    /// # Type Parameters
135    ///
136    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
137    ///
138    /// # Arguments
139    ///
140    /// * `config`: The [`AgentConfig`] to use for the new agent. The broker field will be
141    ///   overridden with the system broker if it's `None`.
142    ///
143    /// # Returns
144    ///
145    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
146    pub async fn new_agent_with_config<State>(
147        &mut self,
148        mut config: AgentConfig,
149    ) -> ManagedAgent<Idle, State>
150    where
151        State: Default + Send + Debug + 'static,
152    {
153        let acton_ready = self.clone();
154        // Ensure the agent uses the system broker if none is specified.
155        if config.broker.is_none() {
156            config.broker = Some(self.0.broker.clone());
157        }
158        let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
159        trace!(
160            "Created new agent builder with config, id: {}",
161            new_agent.id()
162        );
163        self.0
164            .roots
165            .insert(new_agent.id.clone(), new_agent.handle.clone());
166        new_agent
167    }
168
169    /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
170    #[inline]
171    pub fn broker(&self) -> BrokerRef {
172        self.0.broker.clone()
173    }
174
175    /// Creates, configures, and starts a top-level agent using a provided configuration and setup function.
176    ///
177    /// This method combines agent creation (using `config`), custom asynchronous setup (`setup_fn`),
178    /// and starting the agent. The `setup_fn` receives the agent in the `Idle` state, performs
179    /// necessary configurations (like adding message handlers), and must call `.start()` to
180    /// transition the agent to the `Started` state, returning its `AgentHandle`.
181    ///
182    /// The agent is registered as a top-level agent within the runtime.
183    ///
184    /// # Type Parameters
185    ///
186    /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
187    ///
188    /// # Arguments
189    ///
190    /// * `config`: The [`AgentConfig`] to use for creating the agent. The broker field will be
191    ///   overridden with the system broker if it's `None`.
192    /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
193    ///   calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
194    ///
195    /// # Returns
196    ///
197    /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
198    /// agent creation or the `setup_fn` fails.
199    pub async fn spawn_agent_with_setup_fn<State>(
200        &mut self,
201        mut config: AgentConfig,
202        setup_fn: impl FnOnce(
203            ManagedAgent<Idle, State>,
204        ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
205    ) -> anyhow::Result<AgentHandle>
206    where
207        State: Default + Send + Debug + 'static,
208    {
209        let acton_ready = self.clone();
210        if config.broker.is_none() {
211            config.broker = Some(self.0.broker.clone());
212        }
213
214        let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
215        let agent_id = new_agent.id().clone(); // Get ID before moving
216        trace!("Running setup function for agent: {}", agent_id);
217        let handle = setup_fn(new_agent).await; // Setup function consumes the agent and returns handle
218        trace!("Agent {} setup complete, registering handle.", agent_id);
219        self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
220        Ok(handle)
221    }
222
223    /// Initiates a graceful shutdown of the entire Acton system.
224    ///
225    /// This method attempts to stop all registered top-level agents (and consequently their
226    /// descendant children through the `stop` propagation mechanism) by sending them a
227    /// [`SystemSignal::Terminate`]. It waits for all top-level agent tasks to complete.
228    /// Finally, it stops the central message broker agent.
229    ///
230    /// # Returns
231    ///
232    /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
233    /// Errors during the stopping of individual agents or the broker will be propagated.
234    pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
235        use std::env;
236        use std::time::Duration;
237        use tokio::time::timeout as tokio_timeout;
238
239        trace!("Initiating shutdown of all top-level agents...");
240        // Collect stop futures for all root agents.
241        let stop_futures = self.0.roots.iter().map(|item| {
242            let root_handle = item.value().clone();
243            async move {
244                trace!("Sending stop signal to root agent: {}", root_handle.id());
245                root_handle.stop().await // Call stop on the handle
246            }
247        });
248
249        // Determine system shutdown timeout from env or use default (30s)
250        let timeout_ms: u64 = env::var("ACTON_SYSTEM_SHUTDOWN_TIMEOUT_MS")
251            .ok()
252            .and_then(|val| val.parse().ok())
253            .unwrap_or(30_000);
254
255        // Wait for all root agents (and their children) to stop, with timeout.
256        let roots_fut = join_all(stop_futures);
257        let results: Vec<anyhow::Result<()>> = match tokio_timeout(
258            Duration::from_millis(timeout_ms),
259            roots_fut,
260        )
261        .await
262        {
263            Ok(r) => r,
264            Err(_) => {
265                error!("System-wide shutdown timeout expired after {} ms. Some agents may not have stopped.", timeout_ms);
266                return Err(anyhow::anyhow!(
267                    "Timeout: not all root agents stopped within {} ms",
268                    timeout_ms
269                ));
270            }
271        };
272        trace!("All root agent stop futures completed.");
273
274        // Check for errors during agent shutdown.
275        for result in results {
276            if let Err(e) = result {
277                // Log error but continue shutdown attempt
278                error!("Error stopping agent during shutdown: {:?}", e);
279            }
280        }
281
282        trace!("Stopping the system broker...");
283        // Stop the broker agent, using same system shutdown timeout.
284        match tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await {
285            Ok(res) => res?,
286            Err(_) => {
287                error!(
288                    "Timeout waiting for broker to shut down after {} ms",
289                    timeout_ms
290                );
291                return Err(anyhow::anyhow!(
292                    "Timeout while waiting for system broker to shut down after {} ms",
293                    timeout_ms
294                ));
295            }
296        }
297        trace!("System shutdown complete.");
298        Ok(())
299    }
300
301    /// Creates, configures, and starts a top-level agent using a default configuration and a setup function.
302    ///
303    /// This is a convenience method similar to [`AgentRuntime::spawn_agent_with_setup_fn`], but it
304    /// automatically creates a default `AgentConfig` (with a default name and the system broker).
305    /// The provided `setup_fn` configures and starts the agent.
306    ///
307    /// The agent is registered as a top-level agent within the runtime.
308    ///
309    /// # Type Parameters
310    ///
311    /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
312    ///
313    /// # Arguments
314    ///
315    /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
316    ///   calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
317    ///
318    /// # Returns
319    ///
320    /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
321    /// agent creation or the `setup_fn` fails.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if the default `AgentConfig` cannot be created.
326    pub async fn spawn_agent<State>(
327        &mut self,
328        setup_fn: impl FnOnce(
329            ManagedAgent<Idle, State>,
330        ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
331    ) -> anyhow::Result<AgentHandle>
332    where
333        State: Default + Send + Debug + 'static,
334    {
335        // Create a default config, ensuring the system broker is included.
336        let config = AgentConfig::new(Ern::default(), None, Some(self.broker()))?;
337        // Reuse the more general spawn function.
338        self.spawn_agent_with_setup_fn(config, setup_fn).await
339    }
340}
341
342/// Converts an [`ActonApp`] marker into an initialized `AgentRuntime`.
343///
344/// This implementation defines the system bootstrap process triggered by [`ActonApp::launch()`].
345/// It performs the following steps:
346/// 1. Spawns a background Tokio task dedicated to initializing the [`AgentBroker`].
347/// 2. Uses a `oneshot` channel to receive the `AgentHandle` of the initialized broker
348///    back from the background task.
349/// 3. **Blocks the current thread** using `tokio::task::block_in_place` while waiting
350///    for the broker initialization to complete. This ensures that `ActonApp::launch()`
351///    does not return until the core system components (like the broker) are ready.
352/// 4. Constructs the `AgentRuntime` using the received broker handle.
353///
354/// **Warning**: The use of `block_in_place` means this conversion should typically
355/// only happen once at the very start of the application within the main thread
356/// or a dedicated initialization thread, before the main asynchronous workload begins.
357/// Calling this from within an existing Tokio runtime task could lead to deadlocks
358/// or performance issues.
359impl From<ActonApp> for AgentRuntime {
360    fn from(_acton: ActonApp) -> Self {
361        trace!("Starting Acton system initialization (From<ActonApp>)");
362        let (sender, receiver) = oneshot::channel();
363
364        // Spawn broker initialization in a separate task.
365        tokio::spawn(async move {
366            trace!("Broker initialization task started.");
367            let broker = AgentBroker::initialize().await;
368            trace!("Broker initialization task finished, sending handle.");
369            let _ = sender.send(broker); // Send broker handle back
370        });
371
372        trace!("Blocking current thread to wait for broker initialization...");
373        // Block until the broker handle is received.
374        let broker = tokio::task::block_in_place(|| {
375            tokio::runtime::Handle::current()
376                .block_on(async { receiver.await.expect("Broker initialization failed") })
377        });
378        trace!("Broker handle received, constructing AgentRuntime.");
379
380        // Create the runtime with the initialized broker.
381        AgentRuntime(ActonInner {
382            broker,
383            ..Default::default()
384        })
385    }
386}