acton_core/common/agent_runtime.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::channel::oneshot::Canceled;
23use futures::future::join_all;
24use tokio::sync::oneshot;
25use tokio_util::sync::CancellationToken;
26use tracing::{error, trace}; // Added error import
27
28use crate::actor::{AgentConfig, Idle, ManagedAgent};
29use crate::common::acton_inner::ActonInner;
30use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
31use crate::traits::AgentHandleInterface;
32
33/// Represents the initialized and active Acton agent system runtime.
34///
35/// This struct is obtained after successfully launching the system via [`ActonApp::launch()`].
36/// It holds the internal state of the running system, including a reference to the
37/// central message broker and a registry of top-level agents.
38///
39/// `AgentRuntime` provides the primary methods for interacting with the system as a whole,
40/// such as creating new top-level agents (`new_agent`, `spawn_agent`, etc.) and initiating
41/// a graceful shutdown of all agents (`shutdown_all`).
42///
43/// It is cloneable, allowing different parts of an application to hold references
44/// to the runtime environment.
45#[derive(Debug, Clone, Default)]
46pub struct AgentRuntime(pub(crate) ActonInner); // Keep inner field crate-public
47
48impl AgentRuntime {
49 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a specified root name.
50 ///
51 /// This method initializes a [`ManagedAgent`] in the [`Idle`] state, configured with a
52 /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
53 /// The agent is registered as a top-level agent within the runtime.
54 ///
55 /// The returned agent is ready for further configuration (e.g., adding message handlers
56 /// via `act_on`) before being started by calling `.start()` on it.
57 ///
58 /// # Type Parameters
59 ///
60 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
61 ///
62 /// # Arguments
63 ///
64 /// * `name`: A string that will form the root name of the agent's [`Ern`].
65 ///
66 /// # Returns
67 ///
68 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
69 ///
70 /// # Panics
71 ///
72 /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `AgentConfig` fails.
73 pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
74 where
75 State: Default + Send + Debug + 'static,
76 {
77 let actor_config = AgentConfig::new(
78 Ern::with_root(name).expect("Failed to create root Ern for new agent"), // Use expect for clarity
79 None, // No parent for top-level agent
80 Some(self.0.broker.clone()), // Use system broker
81 )
82 .expect("Failed to create actor config");
83
84 let runtime = self.clone();
85 let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
86 trace!("Registering new top-level agent: {}", new_actor.id());
87 self.0
88 .roots
89 .insert(new_actor.id.clone(), new_actor.handle.clone());
90 new_actor
91 }
92
93 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a default name ("agent").
94 ///
95 /// Similar to [`AgentRuntime::new_agent_with_name`], but uses a default root name "agent"
96 /// for the agent's [`Ern`]. The agent is registered as a top-level agent within the runtime.
97 ///
98 /// The returned agent is ready for further configuration before being started via `.start()`.
99 ///
100 /// # Type Parameters
101 ///
102 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
103 ///
104 /// # Returns
105 ///
106 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
107 ///
108 /// # Panics
109 ///
110 /// Panics if creating the internal `AgentConfig` fails.
111 pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
112 where
113 State: Default + Send + Debug + 'static,
114 {
115 // Use a default name if none is provided.
116 self.new_agent_with_name("agent".to_string()).await // Reuse the named version
117 }
118
119 /// Returns the number of top-level agents currently registered in the runtime.
120 ///
121 /// This count only includes agents directly created via the `AgentRuntime` and
122 /// does not include child agents supervised by other agents.
123 #[inline]
124 pub fn agent_count(&self) -> usize {
125 self.0.roots.len()
126 }
127
128 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) using a provided configuration.
129 ///
130 /// This method initializes a [`ManagedAgent`] in the [`Idle`] state using the specified
131 /// [`AgentConfig`]. It ensures the agent is configured with the system's broker if not
132 /// already set in the config. The agent is registered as a top-level agent within the runtime.
133 ///
134 /// The returned agent is ready for further configuration before being started via `.start()`.
135 ///
136 /// # Type Parameters
137 ///
138 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
139 ///
140 /// # Arguments
141 ///
142 /// * `config`: The [`AgentConfig`] to use for the new agent. The broker field will be
143 /// overridden with the system broker if it's `None`.
144 ///
145 /// # Returns
146 ///
147 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
148 pub async fn new_agent_with_config<State>(
149 &mut self,
150 mut config: AgentConfig,
151 ) -> ManagedAgent<Idle, State>
152 where
153 State: Default + Send + Debug + 'static,
154 {
155 let acton_ready = self.clone();
156 // Ensure the agent uses the system broker if none is specified.
157 if config.broker.is_none() {
158 config.broker = Some(self.0.broker.clone());
159 }
160 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
161 trace!(
162 "Created new agent builder with config, id: {}",
163 new_agent.id()
164 );
165 self.0
166 .roots
167 .insert(new_agent.id.clone(), new_agent.handle.clone());
168 new_agent
169 }
170
171 /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
172 #[inline]
173 pub fn broker(&self) -> BrokerRef {
174 self.0.broker.clone()
175 }
176
177 /// Creates, configures, and starts a top-level agent using a provided configuration and setup function.
178 ///
179 /// This method combines agent creation (using `config`), custom asynchronous setup (`setup_fn`),
180 /// and starting the agent. The `setup_fn` receives the agent in the `Idle` state, performs
181 /// necessary configurations (like adding message handlers), and must call `.start()` to
182 /// transition the agent to the `Started` state, returning its `AgentHandle`.
183 ///
184 /// The agent is registered as a top-level agent within the runtime.
185 ///
186 /// # Type Parameters
187 ///
188 /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
189 ///
190 /// # Arguments
191 ///
192 /// * `config`: The [`AgentConfig`] to use for creating the agent. The broker field will be
193 /// overridden with the system broker if it's `None`.
194 /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
195 /// calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
196 ///
197 /// # Returns
198 ///
199 /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
200 /// agent creation or the `setup_fn` fails.
201 pub async fn spawn_agent_with_setup_fn<State>(
202 &mut self,
203 mut config: AgentConfig,
204 setup_fn: impl FnOnce(
205 ManagedAgent<Idle, State>,
206 ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
207 ) -> anyhow::Result<AgentHandle>
208 where
209 State: Default + Send + Debug + 'static,
210 {
211 let acton_ready = self.clone();
212 if config.broker.is_none() {
213 config.broker = Some(self.0.broker.clone());
214 }
215
216 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
217 let agent_id = new_agent.id().clone(); // Get ID before moving
218 trace!("Running setup function for agent: {}", agent_id);
219 let handle = setup_fn(new_agent).await; // Setup function consumes the agent and returns handle
220 trace!("Agent {} setup complete, registering handle.", agent_id);
221 self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
222 Ok(handle)
223 }
224
225 /// Initiates a graceful shutdown of the entire Acton system.
226 ///
227 /// This method attempts to stop all registered top-level agents (and consequently their
228 /// descendant children through the `stop` propagation mechanism) by sending them a
229 /// [`SystemSignal::Terminate`]. It waits for all top-level agent tasks to complete.
230 /// Finally, it stops the central message broker agent.
231 ///
232 /// # Returns
233 ///
234 /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
235 /// Errors during the stopping of individual agents or the broker will be propagated.
236 pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
237 use std::env;
238 use std::time::Duration;
239 use tokio::time::timeout as tokio_timeout;
240
241 trace!("Initiating shutdown of all top-level agents...");
242 // Trigger the system-wide cancellation_token for all agents.
243 self.0.cancellation_token.cancel();
244
245 // Wait for all agent tracker tasks to complete, draining all roots' trackers.
246 // This ensures that all root agents (and their children) have finished execution.
247 // The timeouts control how long we wait for all trackers to resolve.
248 let timeout_ms: u64 = env::var("ACTON_SYSTEM_SHUTDOWN_TIMEOUT_MS")
249 .ok()
250 .and_then(|val| val.parse().ok())
251 .unwrap_or(30_000);
252
253 let tracker_futures: Vec<_> = self
254 .0
255 .roots
256 .iter()
257 .map(|item| {
258 let root_handle = item.value().clone();
259 async move {
260 trace!("Waiting for tracker for root agent: {}", root_handle.id());
261 root_handle.tracker().wait().await;
262 }
263 })
264 .collect();
265
266 match tokio_timeout(Duration::from_millis(timeout_ms), join_all(tracker_futures)).await {
267 Ok(_) => trace!("All agent trackers completed."),
268 Err(_) => {
269 error!("System-wide tracker shutdown timeout expired after {} ms. Some agent tasks may remain running.", timeout_ms);
270 return Err(anyhow::anyhow!(
271 "Timeout: not all agent tracker tasks completed within {} ms",
272 timeout_ms
273 ));
274 }
275 };
276
277 trace!("Stopping the system broker...");
278 // Stop the broker agent, using same system shutdown timeout.
279 match tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await {
280 Ok(res) => res?,
281 Err(_) => {
282 error!(
283 "Timeout waiting for broker to shut down after {} ms",
284 timeout_ms
285 );
286 return Err(anyhow::anyhow!(
287 "Timeout while waiting for system broker to shut down after {} ms",
288 timeout_ms
289 ));
290 }
291 }
292 trace!("System shutdown complete.");
293 Ok(())
294 }
295
296 /// Creates, configures, and starts a top-level agent using a default configuration and a setup function.
297 ///
298 /// This is a convenience method similar to [`AgentRuntime::spawn_agent_with_setup_fn`], but it
299 /// automatically creates a default `AgentConfig` (with a default name and the system broker).
300 /// The provided `setup_fn` configures and starts the agent.
301 ///
302 /// The agent is registered as a top-level agent within the runtime.
303 ///
304 /// # Type Parameters
305 ///
306 /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
307 ///
308 /// # Arguments
309 ///
310 /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
311 /// calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
312 ///
313 /// # Returns
314 ///
315 /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
316 /// agent creation or the `setup_fn` fails.
317 ///
318 /// # Errors
319 ///
320 /// Returns an error if the default `AgentConfig` cannot be created.
321 pub async fn spawn_agent<State>(
322 &mut self,
323 setup_fn: impl FnOnce(
324 ManagedAgent<Idle, State>,
325 ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
326 ) -> anyhow::Result<AgentHandle>
327 where
328 State: Default + Send + Debug + 'static,
329 {
330 // Create a default config, ensuring the system broker is included.
331 let config = AgentConfig::new(Ern::default(), None, Some(self.broker()))?;
332 // Reuse the more general spawn function.
333 self.spawn_agent_with_setup_fn(config, setup_fn).await
334 }
335}
336
337/// Converts an [`ActonApp`] marker into an initialized `AgentRuntime`.
338///
339/// This implementation defines the system bootstrap process triggered by [`ActonApp::launch()`].
340/// It performs the following steps:
341/// 1. Spawns a background Tokio task dedicated to initializing the [`AgentBroker`].
342/// 2. Uses a `oneshot` channel to receive the `AgentHandle` of the initialized broker
343/// back from the background task.
344/// 3. **Blocks the current thread** using `tokio::task::block_in_place` while waiting
345/// for the broker initialization to complete. This ensures that `ActonApp::launch()`
346/// does not return until the core system components (like the broker) are ready.
347/// 4. Constructs the `AgentRuntime` using the received broker handle.
348///
349/// **Warning**: The use of `block_in_place` means this conversion should typically
350/// only happen once at the very start of the application within the main thread
351/// or a dedicated initialization thread, before the main asynchronous workload begins.
352/// Calling this from within an existing Tokio runtime task could lead to deadlocks
353/// or performance issues.
354impl From<ActonApp> for AgentRuntime {
355 fn from(_acton: ActonApp) -> Self {
356 trace!("Starting Acton system initialization (From<ActonApp>)");
357 let (sender, receiver) = oneshot::channel();
358 // We do this so the broker gets access to the cancellation token.
359 let mut runtime = AgentRuntime(ActonInner::default());
360 // Spawn broker initialization in a separate task.
361 let runtime_clone = runtime.clone();
362 // Assert that the cancellation_token is present in the clone before broker initialization
363 assert!(
364 !runtime_clone.0.cancellation_token.is_cancelled(),
365 "ActonInner cancellation_token must be present and active before Broker initialization"
366 );
367 tokio::spawn(async move {
368 trace!("Broker initialization task started.");
369 let broker = AgentBroker::initialize(runtime_clone).await;
370 trace!("Broker initialization task finished, sending handle.");
371 let _ = sender.send(broker); // Send broker handle back
372 });
373
374 trace!("Blocking current thread to wait for broker initialization...");
375 // Block until the broker handle is received.
376 let broker = tokio::task::block_in_place(|| {
377 tokio::runtime::Handle::current()
378 .block_on(async { receiver.await.expect("Broker initialization failed") })
379 });
380 trace!("Broker handle received, constructing AgentRuntime.");
381 runtime.0.broker = broker;
382
383 // Create the runtime with the initialized broker.
384 runtime
385 }
386}