acton_core/common/
agent_runtime.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tokio::sync::oneshot;
24use tokio_util::sync::CancellationToken;
25use tracing::{error, trace}; // Added error import
26
27use crate::actor::{AgentConfig, Idle, ManagedAgent};
28use crate::common::acton_inner::ActonInner;
29use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef, ActonConfig};
30use crate::traits::AgentHandleInterface;
31
32/// Represents the initialized and active Acton agent system runtime.
33///
34/// This struct is obtained after successfully launching the system via [`ActonApp::launch()`].
35/// It holds the internal state of the running system, including a reference to the
36/// central message broker and a registry of top-level agents.
37///
38/// `AgentRuntime` provides the primary methods for interacting with the system as a whole,
39/// such as creating new top-level agents (`new_agent`, `spawn_agent`, etc.) and initiating
40/// a graceful shutdown of all agents (`shutdown_all`).
41///
42/// It is cloneable, allowing different parts of an application to hold references
43/// to the runtime environment.
44#[derive(Debug, Clone, Default)]
45pub struct AgentRuntime(pub(crate) ActonInner); // Keep inner field crate-public
46
47impl AgentRuntime {
48    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a specified root name.
49    ///
50    /// This method initializes a [`ManagedAgent`] in the [`Idle`] state, configured with a
51    /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
52    /// The agent is registered as a top-level agent within the runtime.
53    ///
54    /// The returned agent is ready for further configuration (e.g., adding message handlers
55    /// via `act_on`) before being started by calling `.start()` on it.
56    ///
57    /// # Type Parameters
58    ///
59    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
60    ///
61    /// # Arguments
62    ///
63    /// * `name`: A string that will form the root name of the agent's [`Ern`].
64    ///
65    /// # Returns
66    ///
67    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
68    ///
69    /// # Panics
70    ///
71    /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `AgentConfig` fails.
72    pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
73    where
74        State: Default + Send + Debug + 'static,
75    {
76        let actor_config = AgentConfig::new(
77            Ern::with_root(name).expect("Failed to create root Ern for new agent"), // Use expect for clarity
78            None,                        // No parent for top-level agent
79            Some(self.0.broker.clone()), // Use system broker
80        )
81        .expect("Failed to create actor config");
82
83        let runtime = self.clone();
84        let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
85        trace!("Registering new top-level agent: {}", new_actor.id());
86        self.0
87            .roots
88            .insert(new_actor.id.clone(), new_actor.handle.clone());
89        new_actor
90    }
91
92    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a default name ("agent").
93    ///
94    /// Similar to [`AgentRuntime::new_agent_with_name`], but uses a default root name "agent"
95    /// for the agent's [`Ern`]. The agent is registered as a top-level agent within the runtime.
96    ///
97    /// The returned agent is ready for further configuration before being started via `.start()`.
98    ///
99    /// # Type Parameters
100    ///
101    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
102    ///
103    /// # Returns
104    ///
105    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
106    ///
107    /// # Panics
108    ///
109    /// Panics if creating the internal `AgentConfig` fails.
110    pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
111    where
112        State: Default + Send + Debug + 'static,
113    {
114        // Use a default name if none is provided.
115        self.new_agent_with_name("agent".to_string()).await // Reuse the named version
116    }
117
118    /// Returns the number of top-level agents currently registered in the runtime.
119    ///
120    /// This count only includes agents directly created via the `AgentRuntime` and
121    /// does not include child agents supervised by other agents.
122    #[inline]
123    pub fn agent_count(&self) -> usize {
124        self.0.roots.len()
125    }
126
127    /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) using a provided configuration.
128    ///
129    /// This method initializes a [`ManagedAgent`] in the [`Idle`] state using the specified
130    /// [`AgentConfig`]. It ensures the agent is configured with the system's broker if not
131    /// already set in the config. The agent is registered as a top-level agent within the runtime.
132    ///
133    /// The returned agent is ready for further configuration before being started via `.start()`.
134    ///
135    /// # Type Parameters
136    ///
137    /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
138    ///
139    /// # Arguments
140    ///
141    /// * `config`: The [`AgentConfig`] to use for the new agent. The broker field will be
142    ///   overridden with the system broker if it's `None`.
143    ///
144    /// # Returns
145    ///
146    /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
147    pub async fn new_agent_with_config<State>(
148        &mut self,
149        mut config: AgentConfig,
150    ) -> ManagedAgent<Idle, State>
151    where
152        State: Default + Send + Debug + 'static,
153    {
154        let acton_ready = self.clone();
155        // Ensure the agent uses the system broker if none is specified.
156        if config.broker.is_none() {
157            config.broker = Some(self.0.broker.clone());
158        }
159        let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
160        trace!(
161            "Created new agent builder with config, id: {}",
162            new_agent.id()
163        );
164        self.0
165            .roots
166            .insert(new_agent.id.clone(), new_agent.handle.clone());
167        new_agent
168    }
169
170    /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
171    #[inline]
172    pub fn broker(&self) -> BrokerRef {
173        self.0.broker.clone()
174    }
175
176    /// Creates, configures, and starts a top-level agent using a provided configuration and setup function.
177    ///
178    /// This method combines agent creation (using `config`), custom asynchronous setup (`setup_fn`),
179    /// and starting the agent. The `setup_fn` receives the agent in the `Idle` state, performs
180    /// necessary configurations (like adding message handlers), and must call `.start()` to
181    /// transition the agent to the `Started` state, returning its `AgentHandle`.
182    ///
183    /// The agent is registered as a top-level agent within the runtime.
184    ///
185    /// # Type Parameters
186    ///
187    /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
188    ///
189    /// # Arguments
190    ///
191    /// * `config`: The [`AgentConfig`] to use for creating the agent. The broker field will be
192    ///   overridden with the system broker if it's `None`.
193    /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
194    ///   calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
195    ///
196    /// # Returns
197    ///
198    /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
199    /// agent creation or the `setup_fn` fails.
200    pub async fn spawn_agent_with_setup_fn<State>(
201        &mut self,
202        mut config: AgentConfig,
203        setup_fn: impl FnOnce(
204            ManagedAgent<Idle, State>,
205        ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
206    ) -> anyhow::Result<AgentHandle>
207    where
208        State: Default + Send + Debug + 'static,
209    {
210        let acton_ready = self.clone();
211        if config.broker.is_none() {
212            config.broker = Some(self.0.broker.clone());
213        }
214
215        let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
216        let agent_id = new_agent.id().clone(); // Get ID before moving
217        trace!("Running setup function for agent: {}", agent_id);
218        let handle = setup_fn(new_agent).await; // Setup function consumes the agent and returns handle
219        trace!("Agent {} setup complete, registering handle.", agent_id);
220        self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
221        Ok(handle)
222    }
223
224    /// Initiates a graceful shutdown of the entire Acton system.
225    ///
226    /// This method attempts to stop all registered top-level agents (and consequently their
227    /// descendant children through the `stop` propagation mechanism) by sending them a
228    /// [`SystemSignal::Terminate`]. It waits for all top-level agent tasks to complete.
229    /// Finally, it stops the central message broker agent.
230    ///
231    /// # Returns
232    ///
233    /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
234    /// Errors during the stopping of individual agents or the broker will be propagated.
235    pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
236        use std::time::Duration;
237        use tokio::time::timeout as tokio_timeout;
238
239        // Phase 1: Concurrently signal all root agents to terminate gracefully.
240        trace!("Sending Terminate signal to all root agents.");
241        let stop_futures: Vec<_> = self
242            .0
243            .roots
244            .iter()
245            .map(|item| {
246                let handle = item.value().clone();
247                async move {
248                    if let Err(e) = handle.stop().await {
249                        error!("Error stopping agent {}: {:?}", handle.id(), e);
250                    }
251                }
252            })
253            .collect();
254
255        let timeout_ms = self.0.config.system_shutdown_timeout().as_millis() as u64;
256
257        trace!("Waiting for all agents to finish gracefully...");
258        if tokio_timeout(Duration::from_millis(timeout_ms), join_all(stop_futures))
259            .await
260            .is_err()
261        {
262            error!("System-wide shutdown timeout expired after {} ms. Forcefully cancelling remaining tasks.", timeout_ms);
263            self.0.cancellation_token.cancel(); // Forceful cancellation
264        } else {
265            trace!("All agents completed gracefully.");
266        }
267
268        trace!("Stopping the system broker...");
269        // Stop the broker agent, using same system shutdown timeout.
270        match tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await {
271            Ok(res) => res?,
272            Err(_) => {
273                error!(
274                    "Timeout waiting for broker to shut down after {} ms",
275                    timeout_ms
276                );
277                return Err(anyhow::anyhow!(
278                    "Timeout while waiting for system broker to shut down after {} ms",
279                    timeout_ms
280                ));
281            }
282        }
283        trace!("System shutdown complete.");
284        Ok(())
285    }
286
287    /// Creates, configures, and starts a top-level agent using a default configuration and a setup function.
288    ///
289    /// This is a convenience method similar to [`AgentRuntime::spawn_agent_with_setup_fn`], but it
290    /// automatically creates a default `AgentConfig` (with a default name and the system broker).
291    /// The provided `setup_fn` configures and starts the agent.
292    ///
293    /// The agent is registered as a top-level agent within the runtime.
294    ///
295    /// # Type Parameters
296    ///
297    /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
298    ///
299    /// # Arguments
300    ///
301    /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
302    ///   calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
303    ///
304    /// # Returns
305    ///
306    /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
307    /// agent creation or the `setup_fn` fails.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the default `AgentConfig` cannot be created.
312    pub async fn spawn_agent<State>(
313        &mut self,
314        setup_fn: impl FnOnce(
315            ManagedAgent<Idle, State>,
316        ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
317    ) -> anyhow::Result<AgentHandle>
318    where
319        State: Default + Send + Debug + 'static,
320    {
321        // Create a default config, ensuring the system broker is included.
322        let config = AgentConfig::new(Ern::default(), None, Some(self.broker()))?;
323        // Reuse the more general spawn function.
324        self.spawn_agent_with_setup_fn(config, setup_fn).await
325    }
326}
327
328/// Converts an [`ActonApp`] marker into an initialized `AgentRuntime`.
329///
330/// This implementation defines the system bootstrap process triggered by [`ActonApp::launch()`].
331/// It performs the following steps:
332/// 1. Loads configuration from XDG-compliant locations using [`ActonConfig::load()`].
333/// 2. Spawns a background Tokio task dedicated to initializing the [`AgentBroker`].
334/// 3. Uses a `oneshot` channel to receive the `AgentHandle` of the initialized broker
335///    back from the background task.
336/// 4. **Blocks the current thread** using `tokio::task::block_in_place` while waiting
337///    for the broker initialization to complete. This ensures that `ActonApp::launch()`
338///    does not return until the core system components (like the broker) are ready.
339/// 5. Constructs the `AgentRuntime` using the received broker handle and loaded configuration.
340///
341/// **Warning**: The use of `block_in_place` means this conversion should typically
342/// only happen once at the very start of the application within the main thread
343/// or a dedicated initialization thread, before the main asynchronous workload begins.
344/// Calling this from within an existing Tokio runtime task could lead to deadlocks
345/// or performance issues.
346impl From<ActonApp> for AgentRuntime {
347    fn from(_acton: ActonApp) -> Self {
348        trace!("Starting Acton system initialization (From<ActonApp>)");
349        
350        // Load configuration from XDG-compliant locations
351        let config = ActonConfig::load();
352        trace!("Configuration loaded: {:?}", config);
353        
354        let (sender, receiver) = oneshot::channel();
355        
356        // Create runtime with loaded configuration
357        let mut runtime = AgentRuntime(ActonInner {
358            broker: Default::default(),
359            roots: Default::default(),
360            cancellation_token: CancellationToken::new(),
361            config,
362        });
363        
364        // Spawn broker initialization in a separate task
365        let runtime_clone = runtime.clone();
366        
367        // Assert that the cancellation_token is present in the clone before broker initialization
368        assert!(
369            !runtime_clone.0.cancellation_token.is_cancelled(),
370            "ActonInner cancellation_token must be present and active before Broker initialization"
371        );
372        
373        tokio::spawn(async move {
374            trace!("Broker initialization task started.");
375            let broker = AgentBroker::initialize(runtime_clone).await;
376            trace!("Broker initialization task finished, sending handle.");
377            let _ = sender.send(broker); // Send broker handle back
378        });
379
380        trace!("Blocking current thread to wait for broker initialization...");
381        // Block until the broker handle is received
382        let broker = tokio::task::block_in_place(|| {
383            tokio::runtime::Handle::current()
384                .block_on(async { receiver.await.expect("Broker initialization failed") })
385        });
386        trace!("Broker handle received, constructing AgentRuntime.");
387        runtime.0.broker = broker;
388
389        // Create the runtime with the initialized broker and configuration
390        runtime
391    }
392}