acton_core/common/agent_runtime.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tokio::sync::oneshot;
24use tracing::{error, trace}; // Added error import
25
26use crate::actor::{AgentConfig, Idle, ManagedAgent};
27use crate::common::acton_inner::ActonInner;
28use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
29use crate::traits::AgentHandleInterface;
30
31/// Represents the initialized and active Acton agent system runtime.
32///
33/// This struct is obtained after successfully launching the system via [`ActonApp::launch()`].
34/// It holds the internal state of the running system, including a reference to the
35/// central message broker and a registry of top-level agents.
36///
37/// `AgentRuntime` provides the primary methods for interacting with the system as a whole,
38/// such as creating new top-level agents (`new_agent`, `spawn_agent`, etc.) and initiating
39/// a graceful shutdown of all agents (`shutdown_all`).
40///
41/// It is cloneable, allowing different parts of an application to hold references
42/// to the runtime environment.
43#[derive(Debug, Clone, Default)]
44pub struct AgentRuntime(pub(crate) ActonInner); // Keep inner field crate-public
45
46impl AgentRuntime {
47 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a specified root name.
48 ///
49 /// This method initializes a [`ManagedAgent`] in the [`Idle`] state, configured with a
50 /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
51 /// The agent is registered as a top-level agent within the runtime.
52 ///
53 /// The returned agent is ready for further configuration (e.g., adding message handlers
54 /// via `act_on`) before being started by calling `.start()` on it.
55 ///
56 /// # Type Parameters
57 ///
58 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
59 ///
60 /// # Arguments
61 ///
62 /// * `name`: A string that will form the root name of the agent's [`Ern`].
63 ///
64 /// # Returns
65 ///
66 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
67 ///
68 /// # Panics
69 ///
70 /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `AgentConfig` fails.
71 pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
72 where
73 State: Default + Send + Debug + 'static,
74 {
75 let actor_config = AgentConfig::new(
76 Ern::with_root(name).expect("Failed to create root Ern for new agent"), // Use expect for clarity
77 None, // No parent for top-level agent
78 Some(self.0.broker.clone()), // Use system broker
79 )
80 .expect("Failed to create actor config");
81
82 let runtime = self.clone();
83 let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
84 trace!("Registering new top-level agent: {}", new_actor.id());
85 self.0
86 .roots
87 .insert(new_actor.id.clone(), new_actor.handle.clone());
88 new_actor
89 }
90
91 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a default name ("agent").
92 ///
93 /// Similar to [`AgentRuntime::new_agent_with_name`], but uses a default root name "agent"
94 /// for the agent's [`Ern`]. The agent is registered as a top-level agent within the runtime.
95 ///
96 /// The returned agent is ready for further configuration before being started via `.start()`.
97 ///
98 /// # Type Parameters
99 ///
100 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
101 ///
102 /// # Returns
103 ///
104 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
105 ///
106 /// # Panics
107 ///
108 /// Panics if creating the internal `AgentConfig` fails.
109 pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
110 where
111 State: Default + Send + Debug + 'static,
112 {
113 // Use a default name if none is provided.
114 self.new_agent_with_name("agent".to_string()).await // Reuse the named version
115 }
116
117 /// Returns the number of top-level agents currently registered in the runtime.
118 ///
119 /// This count only includes agents directly created via the `AgentRuntime` and
120 /// does not include child agents supervised by other agents.
121 #[inline]
122 pub fn agent_count(&self) -> usize {
123 self.0.roots.len()
124 }
125
126 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) using a provided configuration.
127 ///
128 /// This method initializes a [`ManagedAgent`] in the [`Idle`] state using the specified
129 /// [`AgentConfig`]. It ensures the agent is configured with the system's broker if not
130 /// already set in the config. The agent is registered as a top-level agent within the runtime.
131 ///
132 /// The returned agent is ready for further configuration before being started via `.start()`.
133 ///
134 /// # Type Parameters
135 ///
136 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
137 ///
138 /// # Arguments
139 ///
140 /// * `config`: The [`AgentConfig`] to use for the new agent. The broker field will be
141 /// overridden with the system broker if it's `None`.
142 ///
143 /// # Returns
144 ///
145 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
146 pub async fn new_agent_with_config<State>(
147 &mut self,
148 mut config: AgentConfig,
149 ) -> ManagedAgent<Idle, State>
150 where
151 State: Default + Send + Debug + 'static,
152 {
153 let acton_ready = self.clone();
154 // Ensure the agent uses the system broker if none is specified.
155 if config.broker.is_none() {
156 config.broker = Some(self.0.broker.clone());
157 }
158 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
159 trace!(
160 "Created new agent builder with config, id: {}",
161 new_agent.id()
162 );
163 self.0
164 .roots
165 .insert(new_agent.id.clone(), new_agent.handle.clone());
166 new_agent
167 }
168
169 /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
170 #[inline]
171 pub fn broker(&self) -> BrokerRef {
172 self.0.broker.clone()
173 }
174
175 /// Creates, configures, and starts a top-level agent using a provided configuration and setup function.
176 ///
177 /// This method combines agent creation (using `config`), custom asynchronous setup (`setup_fn`),
178 /// and starting the agent. The `setup_fn` receives the agent in the `Idle` state, performs
179 /// necessary configurations (like adding message handlers), and must call `.start()` to
180 /// transition the agent to the `Started` state, returning its `AgentHandle`.
181 ///
182 /// The agent is registered as a top-level agent within the runtime.
183 ///
184 /// # Type Parameters
185 ///
186 /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
187 ///
188 /// # Arguments
189 ///
190 /// * `config`: The [`AgentConfig`] to use for creating the agent. The broker field will be
191 /// overridden with the system broker if it's `None`.
192 /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
193 /// calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
194 ///
195 /// # Returns
196 ///
197 /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
198 /// agent creation or the `setup_fn` fails.
199 pub async fn spawn_agent_with_setup_fn<State>(
200 &mut self,
201 mut config: AgentConfig,
202 setup_fn: impl FnOnce(
203 ManagedAgent<Idle, State>,
204 ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
205 ) -> anyhow::Result<AgentHandle>
206 where
207 State: Default + Send + Debug + 'static,
208 {
209 let acton_ready = self.clone();
210 if config.broker.is_none() {
211 config.broker = Some(self.0.broker.clone());
212 }
213
214 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
215 let agent_id = new_agent.id().clone(); // Get ID before moving
216 trace!("Running setup function for agent: {}", agent_id);
217 let handle = setup_fn(new_agent).await; // Setup function consumes the agent and returns handle
218 trace!("Agent {} setup complete, registering handle.", agent_id);
219 self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
220 Ok(handle)
221 }
222
223 /// Initiates a graceful shutdown of the entire Acton system.
224 ///
225 /// This method attempts to stop all registered top-level agents (and consequently their
226 /// descendant children through the `stop` propagation mechanism) by sending them a
227 /// [`SystemSignal::Terminate`]. It waits for all top-level agent tasks to complete.
228 /// Finally, it stops the central message broker agent.
229 ///
230 /// # Returns
231 ///
232 /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
233 /// Errors during the stopping of individual agents or the broker will be propagated.
234 pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
235 use std::env;
236 use std::time::Duration;
237 use tokio::time::timeout as tokio_timeout;
238
239 // Phase 1: Concurrently signal all root agents to terminate gracefully.
240 trace!("Sending Terminate signal to all root agents.");
241 let stop_futures: Vec<_> = self
242 .0
243 .roots
244 .iter()
245 .map(|item| {
246 let handle = item.value().clone();
247 async move {
248 if let Err(e) = handle.stop().await {
249 error!("Error stopping agent {}: {:?}", handle.id(), e);
250 }
251 }
252 })
253 .collect();
254
255 let timeout_ms: u64 = env::var("ACTON_SYSTEM_SHUTDOWN_TIMEOUT_MS")
256 .ok()
257 .and_then(|val| val.parse().ok())
258 .unwrap_or(30_000);
259
260 trace!("Waiting for all agents to finish gracefully...");
261 if tokio_timeout(Duration::from_millis(timeout_ms), join_all(stop_futures))
262 .await
263 .is_err()
264 {
265 error!("System-wide shutdown timeout expired after {} ms. Forcefully cancelling remaining tasks.", timeout_ms);
266 self.0.cancellation_token.cancel(); // Forceful cancellation
267 } else {
268 trace!("All agents completed gracefully.");
269 }
270
271 trace!("Stopping the system broker...");
272 // Stop the broker agent, using same system shutdown timeout.
273 match tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await {
274 Ok(res) => res?,
275 Err(_) => {
276 error!(
277 "Timeout waiting for broker to shut down after {} ms",
278 timeout_ms
279 );
280 return Err(anyhow::anyhow!(
281 "Timeout while waiting for system broker to shut down after {} ms",
282 timeout_ms
283 ));
284 }
285 }
286 trace!("System shutdown complete.");
287 Ok(())
288 }
289
290 /// Creates, configures, and starts a top-level agent using a default configuration and a setup function.
291 ///
292 /// This is a convenience method similar to [`AgentRuntime::spawn_agent_with_setup_fn`], but it
293 /// automatically creates a default `AgentConfig` (with a default name and the system broker).
294 /// The provided `setup_fn` configures and starts the agent.
295 ///
296 /// The agent is registered as a top-level agent within the runtime.
297 ///
298 /// # Type Parameters
299 ///
300 /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
301 ///
302 /// # Arguments
303 ///
304 /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
305 /// calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
306 ///
307 /// # Returns
308 ///
309 /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
310 /// agent creation or the `setup_fn` fails.
311 ///
312 /// # Errors
313 ///
314 /// Returns an error if the default `AgentConfig` cannot be created.
315 pub async fn spawn_agent<State>(
316 &mut self,
317 setup_fn: impl FnOnce(
318 ManagedAgent<Idle, State>,
319 ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
320 ) -> anyhow::Result<AgentHandle>
321 where
322 State: Default + Send + Debug + 'static,
323 {
324 // Create a default config, ensuring the system broker is included.
325 let config = AgentConfig::new(Ern::default(), None, Some(self.broker()))?;
326 // Reuse the more general spawn function.
327 self.spawn_agent_with_setup_fn(config, setup_fn).await
328 }
329}
330
331/// Converts an [`ActonApp`] marker into an initialized `AgentRuntime`.
332///
333/// This implementation defines the system bootstrap process triggered by [`ActonApp::launch()`].
334/// It performs the following steps:
335/// 1. Spawns a background Tokio task dedicated to initializing the [`AgentBroker`].
336/// 2. Uses a `oneshot` channel to receive the `AgentHandle` of the initialized broker
337/// back from the background task.
338/// 3. **Blocks the current thread** using `tokio::task::block_in_place` while waiting
339/// for the broker initialization to complete. This ensures that `ActonApp::launch()`
340/// does not return until the core system components (like the broker) are ready.
341/// 4. Constructs the `AgentRuntime` using the received broker handle.
342///
343/// **Warning**: The use of `block_in_place` means this conversion should typically
344/// only happen once at the very start of the application within the main thread
345/// or a dedicated initialization thread, before the main asynchronous workload begins.
346/// Calling this from within an existing Tokio runtime task could lead to deadlocks
347/// or performance issues.
348impl From<ActonApp> for AgentRuntime {
349 fn from(_acton: ActonApp) -> Self {
350 trace!("Starting Acton system initialization (From<ActonApp>)");
351 let (sender, receiver) = oneshot::channel();
352 // We do this so the broker gets access to the cancellation token.
353 let mut runtime = AgentRuntime(ActonInner::default());
354 // Spawn broker initialization in a separate task.
355 let runtime_clone = runtime.clone();
356 // Assert that the cancellation_token is present in the clone before broker initialization
357 assert!(
358 !runtime_clone.0.cancellation_token.is_cancelled(),
359 "ActonInner cancellation_token must be present and active before Broker initialization"
360 );
361 tokio::spawn(async move {
362 trace!("Broker initialization task started.");
363 let broker = AgentBroker::initialize(runtime_clone).await;
364 trace!("Broker initialization task finished, sending handle.");
365 let _ = sender.send(broker); // Send broker handle back
366 });
367
368 trace!("Blocking current thread to wait for broker initialization...");
369 // Block until the broker handle is received.
370 let broker = tokio::task::block_in_place(|| {
371 tokio::runtime::Handle::current()
372 .block_on(async { receiver.await.expect("Broker initialization failed") })
373 });
374 trace!("Broker handle received, constructing AgentRuntime.");
375 runtime.0.broker = broker;
376
377 // Create the runtime with the initialized broker.
378 runtime
379 }
380}