acton_core/common/agent_runtime.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20
21use acton_ern::Ern;
22use futures::future::join_all;
23use tokio::sync::oneshot;
24use tokio_util::sync::CancellationToken;
25use tracing::{error, trace}; // Added error import
26
27use crate::actor::{AgentConfig, Idle, ManagedAgent};
28use crate::common::acton_inner::ActonInner;
29use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef, ActonConfig};
30use crate::traits::AgentHandleInterface;
31
32/// Represents the initialized and active Acton agent system runtime.
33///
34/// This struct is obtained after successfully launching the system via [`ActonApp::launch()`].
35/// It holds the internal state of the running system, including a reference to the
36/// central message broker and a registry of top-level agents.
37///
38/// `AgentRuntime` provides the primary methods for interacting with the system as a whole,
39/// such as creating new top-level agents (`new_agent`, `spawn_agent`, etc.) and initiating
40/// a graceful shutdown of all agents (`shutdown_all`).
41///
42/// It is cloneable, allowing different parts of an application to hold references
43/// to the runtime environment.
44#[derive(Debug, Clone, Default)]
45pub struct AgentRuntime(pub(crate) ActonInner); // Keep inner field crate-public
46
47impl AgentRuntime {
48 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a specified root name.
49 ///
50 /// This method initializes a [`ManagedAgent`] in the [`Idle`] state, configured with a
51 /// root [`Ern`] derived from the provided `name` and linked to the system's broker.
52 /// The agent is registered as a top-level agent within the runtime.
53 ///
54 /// The returned agent is ready for further configuration (e.g., adding message handlers
55 /// via `act_on`) before being started by calling `.start()` on it.
56 ///
57 /// # Type Parameters
58 ///
59 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
60 ///
61 /// # Arguments
62 ///
63 /// * `name`: A string that will form the root name of the agent's [`Ern`].
64 ///
65 /// # Returns
66 ///
67 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
68 ///
69 /// # Panics
70 ///
71 /// Panics if creating the root `Ern` from the provided `name` fails or if creating the internal `AgentConfig` fails.
72 pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
73 where
74 State: Default + Send + Debug + 'static,
75 {
76 let actor_config = AgentConfig::new(
77 Ern::with_root(name).expect("Failed to create root Ern for new agent"), // Use expect for clarity
78 None, // No parent for top-level agent
79 Some(self.0.broker.clone()), // Use system broker
80 )
81 .expect("Failed to create actor config");
82
83 let runtime = self.clone();
84 let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
85 trace!("Registering new top-level agent: {}", new_actor.id());
86 self.0
87 .roots
88 .insert(new_actor.id.clone(), new_actor.handle.clone());
89 new_actor
90 }
91
92 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) with a default name ("agent").
93 ///
94 /// Similar to [`AgentRuntime::new_agent_with_name`], but uses a default root name "agent"
95 /// for the agent's [`Ern`]. The agent is registered as a top-level agent within the runtime.
96 ///
97 /// The returned agent is ready for further configuration before being started via `.start()`.
98 ///
99 /// # Type Parameters
100 ///
101 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
102 ///
103 /// # Returns
104 ///
105 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
106 ///
107 /// # Panics
108 ///
109 /// Panics if creating the internal `AgentConfig` fails.
110 pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
111 where
112 State: Default + Send + Debug + 'static,
113 {
114 // Use a default name if none is provided.
115 self.new_agent_with_name("agent".to_string()).await // Reuse the named version
116 }
117
118 /// Returns the number of top-level agents currently registered in the runtime.
119 ///
120 /// This count only includes agents directly created via the `AgentRuntime` and
121 /// does not include child agents supervised by other agents.
122 #[inline]
123 pub fn agent_count(&self) -> usize {
124 self.0.roots.len()
125 }
126
127 /// Creates a new top-level agent builder (`ManagedAgent<Idle, State>`) using a provided configuration.
128 ///
129 /// This method initializes a [`ManagedAgent`] in the [`Idle`] state using the specified
130 /// [`AgentConfig`]. It ensures the agent is configured with the system's broker if not
131 /// already set in the config. The agent is registered as a top-level agent within the runtime.
132 ///
133 /// The returned agent is ready for further configuration before being started via `.start()`.
134 ///
135 /// # Type Parameters
136 ///
137 /// * `State`: The user-defined state type for the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
138 ///
139 /// # Arguments
140 ///
141 /// * `config`: The [`AgentConfig`] to use for the new agent. The broker field will be
142 /// overridden with the system broker if it's `None`.
143 ///
144 /// # Returns
145 ///
146 /// A [`ManagedAgent<Idle, State>`] instance, ready for configuration and starting.
147 pub async fn new_agent_with_config<State>(
148 &mut self,
149 mut config: AgentConfig,
150 ) -> ManagedAgent<Idle, State>
151 where
152 State: Default + Send + Debug + 'static,
153 {
154 let acton_ready = self.clone();
155 // Ensure the agent uses the system broker if none is specified.
156 if config.broker.is_none() {
157 config.broker = Some(self.0.broker.clone());
158 }
159 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
160 trace!(
161 "Created new agent builder with config, id: {}",
162 new_agent.id()
163 );
164 self.0
165 .roots
166 .insert(new_agent.id.clone(), new_agent.handle.clone());
167 new_agent
168 }
169
170 /// Returns a clone of the handle ([`BrokerRef`]) to the system's central message broker.
171 #[inline]
172 pub fn broker(&self) -> BrokerRef {
173 self.0.broker.clone()
174 }
175
176 /// Creates, configures, and starts a top-level agent using a provided configuration and setup function.
177 ///
178 /// This method combines agent creation (using `config`), custom asynchronous setup (`setup_fn`),
179 /// and starting the agent. The `setup_fn` receives the agent in the `Idle` state, performs
180 /// necessary configurations (like adding message handlers), and must call `.start()` to
181 /// transition the agent to the `Started` state, returning its `AgentHandle`.
182 ///
183 /// The agent is registered as a top-level agent within the runtime.
184 ///
185 /// # Type Parameters
186 ///
187 /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
188 ///
189 /// # Arguments
190 ///
191 /// * `config`: The [`AgentConfig`] to use for creating the agent. The broker field will be
192 /// overridden with the system broker if it's `None`.
193 /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
194 /// calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
195 ///
196 /// # Returns
197 ///
198 /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
199 /// agent creation or the `setup_fn` fails.
200 pub async fn spawn_agent_with_setup_fn<State>(
201 &mut self,
202 mut config: AgentConfig,
203 setup_fn: impl FnOnce(
204 ManagedAgent<Idle, State>,
205 ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
206 ) -> anyhow::Result<AgentHandle>
207 where
208 State: Default + Send + Debug + 'static,
209 {
210 let acton_ready = self.clone();
211 if config.broker.is_none() {
212 config.broker = Some(self.0.broker.clone());
213 }
214
215 let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
216 let agent_id = new_agent.id().clone(); // Get ID before moving
217 trace!("Running setup function for agent: {}", agent_id);
218 let handle = setup_fn(new_agent).await; // Setup function consumes the agent and returns handle
219 trace!("Agent {} setup complete, registering handle.", agent_id);
220 self.0.roots.insert(handle.id.clone(), handle.clone()); // Register the returned handle
221 Ok(handle)
222 }
223
224 /// Initiates a graceful shutdown of the entire Acton system.
225 ///
226 /// This method attempts to stop all registered top-level agents (and consequently their
227 /// descendant children through the `stop` propagation mechanism) by sending them a
228 /// [`SystemSignal::Terminate`]. It waits for all top-level agent tasks to complete.
229 /// Finally, it stops the central message broker agent.
230 ///
231 /// # Returns
232 ///
233 /// An `anyhow::Result<()>` indicating whether the shutdown process completed successfully.
234 /// Errors during the stopping of individual agents or the broker will be propagated.
235 pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
236 use std::time::Duration;
237 use tokio::time::timeout as tokio_timeout;
238
239 // Phase 1: Concurrently signal all root agents to terminate gracefully.
240 trace!("Sending Terminate signal to all root agents.");
241 let stop_futures: Vec<_> = self
242 .0
243 .roots
244 .iter()
245 .map(|item| {
246 let handle = item.value().clone();
247 async move {
248 if let Err(e) = handle.stop().await {
249 error!("Error stopping agent {}: {:?}", handle.id(), e);
250 }
251 }
252 })
253 .collect();
254
255 let timeout_ms = self.0.config.system_shutdown_timeout().as_millis() as u64;
256
257 trace!("Waiting for all agents to finish gracefully...");
258 if tokio_timeout(Duration::from_millis(timeout_ms), join_all(stop_futures))
259 .await
260 .is_err()
261 {
262 error!("System-wide shutdown timeout expired after {} ms. Forcefully cancelling remaining tasks.", timeout_ms);
263 self.0.cancellation_token.cancel(); // Forceful cancellation
264 } else {
265 trace!("All agents completed gracefully.");
266 }
267
268 trace!("Stopping the system broker...");
269 // Stop the broker agent, using same system shutdown timeout.
270 match tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await {
271 Ok(res) => res?,
272 Err(_) => {
273 error!(
274 "Timeout waiting for broker to shut down after {} ms",
275 timeout_ms
276 );
277 return Err(anyhow::anyhow!(
278 "Timeout while waiting for system broker to shut down after {} ms",
279 timeout_ms
280 ));
281 }
282 }
283 trace!("System shutdown complete.");
284 Ok(())
285 }
286
287 /// Creates, configures, and starts a top-level agent using a default configuration and a setup function.
288 ///
289 /// This is a convenience method similar to [`AgentRuntime::spawn_agent_with_setup_fn`], but it
290 /// automatically creates a default `AgentConfig` (with a default name and the system broker).
291 /// The provided `setup_fn` configures and starts the agent.
292 ///
293 /// The agent is registered as a top-level agent within the runtime.
294 ///
295 /// # Type Parameters
296 ///
297 /// * `State`: The state type of the agent. Must implement `Default`, `Send`, `Debug`, and be `'static`.
298 ///
299 /// # Arguments
300 ///
301 /// * `setup_fn`: An asynchronous closure that takes the `ManagedAgent<Idle, State>`, configures it,
302 /// calls `.start()`, and returns the resulting `AgentHandle`. The closure must be `Send + 'static`.
303 ///
304 /// # Returns
305 ///
306 /// A `Result` containing the `AgentHandle` of the successfully spawned agent, or an error if
307 /// agent creation or the `setup_fn` fails.
308 ///
309 /// # Errors
310 ///
311 /// Returns an error if the default `AgentConfig` cannot be created.
312 pub async fn spawn_agent<State>(
313 &mut self,
314 setup_fn: impl FnOnce(
315 ManagedAgent<Idle, State>,
316 ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
317 ) -> anyhow::Result<AgentHandle>
318 where
319 State: Default + Send + Debug + 'static,
320 {
321 // Create a default config, ensuring the system broker is included.
322 let config = AgentConfig::new(Ern::default(), None, Some(self.broker()))?;
323 // Reuse the more general spawn function.
324 self.spawn_agent_with_setup_fn(config, setup_fn).await
325 }
326}
327
328/// Converts an [`ActonApp`] marker into an initialized `AgentRuntime`.
329///
330/// This implementation defines the system bootstrap process triggered by [`ActonApp::launch()`].
331/// It performs the following steps:
332/// 1. Loads configuration from XDG-compliant locations using [`ActonConfig::load()`].
333/// 2. Spawns a background Tokio task dedicated to initializing the [`AgentBroker`].
334/// 3. Uses a `oneshot` channel to receive the `AgentHandle` of the initialized broker
335/// back from the background task.
336/// 4. **Blocks the current thread** using `tokio::task::block_in_place` while waiting
337/// for the broker initialization to complete. This ensures that `ActonApp::launch()`
338/// does not return until the core system components (like the broker) are ready.
339/// 5. Constructs the `AgentRuntime` using the received broker handle and loaded configuration.
340///
341/// **Warning**: The use of `block_in_place` means this conversion should typically
342/// only happen once at the very start of the application within the main thread
343/// or a dedicated initialization thread, before the main asynchronous workload begins.
344/// Calling this from within an existing Tokio runtime task could lead to deadlocks
345/// or performance issues.
346impl From<ActonApp> for AgentRuntime {
347 fn from(_acton: ActonApp) -> Self {
348 trace!("Starting Acton system initialization (From<ActonApp>)");
349
350 // Load configuration from XDG-compliant locations
351 let config = ActonConfig::load();
352 trace!("Configuration loaded: {:?}", config);
353
354 let (sender, receiver) = oneshot::channel();
355
356 // Create runtime with loaded configuration
357 let mut runtime = AgentRuntime(ActonInner {
358 broker: Default::default(),
359 roots: Default::default(),
360 cancellation_token: CancellationToken::new(),
361 config,
362 });
363
364 // Spawn broker initialization in a separate task
365 let runtime_clone = runtime.clone();
366
367 // Assert that the cancellation_token is present in the clone before broker initialization
368 assert!(
369 !runtime_clone.0.cancellation_token.is_cancelled(),
370 "ActonInner cancellation_token must be present and active before Broker initialization"
371 );
372
373 tokio::spawn(async move {
374 trace!("Broker initialization task started.");
375 let broker = AgentBroker::initialize(runtime_clone).await;
376 trace!("Broker initialization task finished, sending handle.");
377 let _ = sender.send(broker); // Send broker handle back
378 });
379
380 trace!("Blocking current thread to wait for broker initialization...");
381 // Block until the broker handle is received
382 let broker = tokio::task::block_in_place(|| {
383 tokio::runtime::Handle::current()
384 .block_on(async { receiver.await.expect("Broker initialization failed") })
385 });
386 trace!("Broker handle received, constructing AgentRuntime.");
387 runtime.0.broker = broker;
388
389 // Create the runtime with the initialized broker and configuration
390 runtime
391 }
392}