acton_core/common/agent_runtime.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tokio::sync::oneshot;
24use tracing::{trace, error}; // Added error import
25
26use crate::actor::{AgentConfig, Idle, ManagedAgent};
27use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
28use crate::common::acton_inner::ActonInner;
29use crate::traits::AgentHandleInterface;
30
/// Handle to a launched, running Acton agent system.
///
/// An `AgentRuntime` is what callers receive once the system has been launched via
/// [`ActonApp::launch()`]. It is a thin newtype around the crate-internal [`ActonInner`]
/// state, which the methods on this type use for two things:
///
/// * the handle of the central message broker (`broker`), and
/// * the registry of top-level ("root") agents (`roots`), keyed by agent id.
///
/// Through this type an application can create new top-level agents (`new_agent`,
/// `new_agent_with_name`, `new_agent_with_config`, `spawn_agent`,
/// `spawn_agent_with_setup_fn`), query how many root agents are registered
/// (`agent_count`), obtain the broker handle (`broker`), and shut everything down
/// (`shutdown_all`).
///
/// The type derives `Clone`, so several parts of an application can each hold their own
/// copy of the runtime. It also derives `Debug` and `Default`.
#[derive(Debug, Clone, Default)]
pub struct AgentRuntime(pub(crate) ActonInner); // Inner state stays crate-visible; not part of the public API
45
46impl AgentRuntime {
47 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a specified root name.
48 ///
49 /// This method initializes a [`ManagedAgent`] in the [`Idle`] state, configured with a
50 /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
51 /// The agent is registered as a top-level agent within the runtime.
52 ///
53 /// The returned agent is ready for further configuration (e.g., adding message handlers
54 /// via `act_on`) before being started by calling `.start()` on it.
55 ///
56 /// # Type Parameters
57 ///
58 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
59 ///
60 /// # Arguments
61 ///
62 /// * `name`: A string that will form the root name of the agent's [`Ern`].
63 ///
64 /// # Returns
65 ///
66 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
67 ///
68 /// # Panics
69 ///
70 /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `AgentConfig` fails.
71 pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
72 where
73 State: Default + Send + Debug + 'static,
74 {
75 let actor_config = AgentConfig::new(
76 Ern::with_root(name).expect("Failed to create root Ern for new agent"), // Use expect for clarity
77 None, // No parent for top-level agent
78 Some(self.0.broker.clone()), // Use system broker
79 ).expect("Failed to create actor config");
80
81 let runtime = self.clone();
82 let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
83 trace!("Registering new top-level agent: {}", new_actor.id());
84 self.0.roots.insert(new_actor.id.clone(), new_actor.handle.clone());
85 new_actor
86 }
87
88 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a default name ("agent").
89 ///
90 /// Similar to [`AgentRuntime::new_agent_with_name`], but uses a default root name "agent"
91 /// for the agent's [`Ern`]. The agent is registered as a top-level agent within the runtime.
92 ///
93 /// The returned agent is ready for further configuration before being started via `.start()`.
94 ///
95 /// # Type Parameters
96 ///
97 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
98 ///
99 /// # Returns
100 ///
101 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
102 ///
103 /// # Panics
104 ///
105 /// Panics if creating the internal `AgentConfig` fails.
106 pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
107 where
108 State: Default + Send + Debug + 'static,
109 {
110 // Use a default name if none is provided.
111 self.new_agent_with_name("agent".to_string()).await // Reuse the named version
112 }
113
114 /// Returns the number of top-level agents currently registered in the runtime.
115 ///
116 /// This count only includes agents directly created via the `AgentRuntime` and
117 /// does not include child agents supervised by other agents.
118 #[inline]
119 pub fn agent_count(&self) -> usize {
120 self.0.roots.len()
121 }
122
123 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) using a provided configuration.
124 ///
125 /// This method initializes a [`ManagedAgent`] in the [`Idle`] state using the specified
126 /// [`AgentConfig`]. It ensures the agent is configured with the system's broker if not
127 /// already set in the config. The agent is registered as a top-level agent within the runtime.
128 ///
129 /// The returned agent is ready for further configuration before being started via `.start()`.
130 ///
131 /// # Type Parameters
132 ///
133 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
134 ///
135 /// # Arguments
136 ///
137 /// * `config`: The [`AgentConfig`] to use for the new agent. The broker field will be
138 /// overridden with the system broker if it's `None`.
139 ///
140 /// # Returns
141 ///
142 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
143 pub async fn new_agent_with_config<State>(
144 &mut self,
145 mut config: AgentConfig,
146 ) -> ManagedAgent<Idle, State>
147 where
148 State: Default + Send + Debug + 'static,
149 {
150 let acton_ready = self.clone();
151 // Ensure the agent uses the system broker if none is specified.
152 if config.broker.is_none() {
153 config.broker = Some(self.0.broker.clone());
154 }
155 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
156 trace!("Created new agent builder with config, id: {}", new_agent.id());
157 self.0.roots.insert(new_agent.id.clone(), new_agent.handle.clone());
158 new_agent
159 }
160
161 /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
162 #[inline]
163 pub fn broker(&self) -> BrokerRef {
164 self.0.broker.clone()
165 }
166
167 /// Creates, configures, and starts a top-level agent using a provided configuration and setup function.
168 ///
169 /// This method combines agent creation (using `config`), custom asynchronous setup (`setup_fn`),
170 /// and starting the agent. The `setup_fn` receives the agent in the `Idle` state, performs
171 /// necessary configurations (like adding message handlers), and must call `.start()` to
172 /// transition the agent to the `Started` state, returning its `AgentHandle`.
173 ///
174 /// The agent is registered as a top-level agent within the runtime.
175 ///
176 /// # Type Parameters
177 ///
178 /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
179 ///
180 /// # Arguments
181 ///
182 /// * `config`: The [`AgentConfig`] to use for creating the agent. The broker field will be
183 /// overridden with the system broker if it's `None`.
184 /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
185 /// calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
186 ///
187 /// # Returns
188 ///
189 /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
190 /// agent creation or the `setup_fn` fails.
191 pub async fn spawn_agent_with_setup_fn<State>(
192 &mut self,
193 mut config: AgentConfig,
194 setup_fn: impl FnOnce(
195 ManagedAgent<Idle, State>,
196 ) -> Pin<Box<dyn Future<Output=AgentHandle> + Send + 'static>>,
197 ) -> anyhow::Result<AgentHandle>
198 where
199 State: Default + Send + Debug + 'static,
200 {
201 let acton_ready = self.clone();
202 if config.broker.is_none() {
203 config.broker = Some(self.0.broker.clone());
204 }
205
206 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
207 let agent_id = new_agent.id().clone(); // Get ID before moving
208 trace!("Running setup function for agent: {}", agent_id);
209 let handle = setup_fn(new_agent).await; // Setup function consumes the agent and returns handle
210 trace!("Agent {} setup complete, registering handle.", agent_id);
211 self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
212 Ok(handle)
213 }
214
215 /// Initiates a graceful shutdown of the entire Acton system.
216 ///
217 /// This method attempts to stop all registered top-level agents (and consequently their
218 /// descendant children through the `stop` propagation mechanism) by sending them a
219 /// [`SystemSignal::Terminate`]. It waits for all top-level agent tasks to complete.
220 /// Finally, it stops the central message broker agent.
221 ///
222 /// # Returns
223 ///
224 /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
225 /// Errors during the stopping of individual agents or the broker will be propagated.
226 pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
227 trace!("Initiating shutdown of all top-level agents...");
228 // Collect stop futures for all root agents.
229 let stop_futures = self.0.roots.iter().map(|item| {
230 let root_handle = item.value().clone();
231 async move {
232 trace!("Sending stop signal to root agent: {}", root_handle.id());
233 root_handle.stop().await // Call stop on the handle
234 }
235 });
236
237 // Wait for all root agents (and their children) to stop.
238 let results: Vec<anyhow::Result<()>> = join_all(stop_futures).await;
239 trace!("All root agent stop futures completed.");
240
241 // Check for errors during agent shutdown.
242 for result in results {
243 if let Err(e) = result {
244 // Log error but continue shutdown attempt
245 error!("Error stopping agent during shutdown: {:?}", e);
246 }
247 }
248
249 trace!("Stopping the system broker...");
250 self.0.broker.stop().await?; // Stop the broker agent
251 trace!("System shutdown complete.");
252 Ok(())
253 }
254
255 /// Creates, configures, and starts a top-level agent using a default configuration and a setup function.
256 ///
257 /// This is a convenience method similar to [`AgentRuntime::spawn_agent_with_setup_fn`], but it
258 /// automatically creates a default `AgentConfig` (with a default name and the system broker).
259 /// The provided `setup_fn` configures and starts the agent.
260 ///
261 /// The agent is registered as a top-level agent within the runtime.
262 ///
263 /// # Type Parameters
264 ///
265 /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
266 ///
267 /// # Arguments
268 ///
269 /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
270 /// calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
271 ///
272 /// # Returns
273 ///
274 /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
275 /// agent creation or the `setup_fn` fails.
276 ///
277 /// # Errors
278 ///
279 /// Returns an error if the default `AgentConfig` cannot be created.
280 pub async fn spawn_agent<State>(
281 &mut self,
282 setup_fn: impl FnOnce(
283 ManagedAgent<Idle, State>,
284 ) -> Pin<Box<dyn Future<Output=AgentHandle> + Send + 'static>>,
285 ) -> anyhow::Result<AgentHandle>
286 where
287 State: Default + Send + Debug + 'static,
288 {
289 // Create a default config, ensuring the system broker is included.
290 let config = AgentConfig::new(Ern::default(), None, Some(self.broker()))?;
291 // Reuse the more general spawn function.
292 self.spawn_agent_with_setup_fn(config, setup_fn).await
293 }
294}
295
296/// Converts an [`ActonApp`] marker into an initialized `AgentRuntime`.
297///
298/// This implementation defines the system bootstrap process triggered by [`ActonApp::launch()`].
299/// It performs the following steps:
300/// 1. Spawns a background Tokio task dedicated to initializing the [`AgentBroker`].
301/// 2. Uses a `oneshot` channel to receive the `AgentHandle` of the initialized broker
302/// back from the background task.
303/// 3. **Blocks the current thread** using `tokio::task::block_in_place` while waiting
304/// for the broker initialization to complete. This ensures that `ActonApp::launch()`
305/// does not return until the core system components (like the broker) are ready.
306/// 4. Constructs the `AgentRuntime` using the received broker handle.
307///
308/// **Warning**: The use of `block_in_place` means this conversion should typically
309/// only happen once at the very start of the application within the main thread
310/// or a dedicated initialization thread, before the main asynchronous workload begins.
311/// Calling this from within an existing Tokio runtime task could lead to deadlocks
312/// or performance issues.
313impl From<ActonApp> for AgentRuntime {
314 fn from(_acton: ActonApp) -> Self {
315 trace!("Starting Acton system initialization (From<ActonApp>)");
316 let (sender, receiver) = oneshot::channel();
317
318 // Spawn broker initialization in a separate task.
319 tokio::spawn(async move {
320 trace!("Broker initialization task started.");
321 let broker = AgentBroker::initialize().await;
322 trace!("Broker initialization task finished, sending handle.");
323 let _ = sender.send(broker); // Send broker handle back
324 });
325
326 trace!("Blocking current thread to wait for broker initialization...");
327 // Block until the broker handle is received.
328 let broker = tokio::task::block_in_place(|| {
329 tokio::runtime::Handle::current()
330 .block_on(async { receiver.await.expect("Broker initialization failed") })
331 });
332 trace!("Broker handle received, constructing AgentRuntime.");
333
334 // Create the runtime with the initialized broker.
335 AgentRuntime(ActonInner { broker, ..Default::default() })
336 }
337}