acton_core/common/agent_handle.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16use std::fmt::Debug;
17use std::future::Future;
18use std::hash::{Hash, Hasher};
19
20use acton_ern::Ern;
21use async_trait::async_trait;
22use dashmap::DashMap;
23use std::env;
24use std::time::Duration;
25use tokio::sync::mpsc;
26use tokio::time::timeout as tokio_timeout;
27use tokio_util::task::TaskTracker;
28use tracing::{error, instrument, trace, warn}; // warn seems unused
29
30use crate::actor::{Idle, ManagedAgent};
31use crate::common::{AgentSender, BrokerRef, OutboundEnvelope, ParentRef};
32use crate::message::{BrokerRequest, MessageAddress, SystemSignal};
33use crate::prelude::ActonMessage;
34use crate::traits::{AgentHandleInterface, Broker, Subscriber};
35
36/// A clonable handle for interacting with an agent.
37///
38/// `AgentHandle` provides the primary mechanism for communicating with and managing
39/// an agent from outside its own execution context. It encapsulates the necessary
40/// information to send messages to the agent's inbox (`outbox`), identify the agent (`id`),
41/// manage its lifecycle (`stop`), track its tasks (`tracker`), and navigate the
42/// supervision hierarchy (`parent`, `children`, `supervise`).
43///
44/// Handles can be cloned freely, allowing multiple parts of the system to hold references
45/// to the same agent. Sending messages through the handle is asynchronous.
46///
47/// Key functionalities are exposed through implemented traits:
48/// * [`AgentHandleInterface`]: Core methods for interaction (sending messages, stopping, etc.).
49/// * [`Broker`]: Methods for broadcasting messages via the system broker.
50/// * [`Subscriber`]: Method for accessing the system broker handle.
51///
52/// Equality and hashing are based solely on the agent's unique identifier (`id`).
53#[derive(Debug, Clone)]
54pub struct AgentHandle {
55 /// The unique identifier (`Ern`) for the agent this handle refers to.
56 pub(crate) id: Ern,
57 /// The sender part of the MPSC channel connected to the agent's inbox.
58 pub(crate) outbox: AgentSender,
59 /// Tracks the agent's main task and potentially other associated tasks.
60 tracker: TaskTracker,
61 /// Optional reference to the parent (supervisor) agent's handle.
62 /// `None` if this is a top-level agent. Boxed to manage `AgentHandle` size.
63 pub parent: Option<Box<ParentRef>>,
64 /// Optional reference to the system message broker's handle.
65 /// Boxed to manage `AgentHandle` size.
66 pub broker: Box<Option<BrokerRef>>,
67 /// A map holding handles to the direct children supervised by this agent.
68 /// Keys are the string representation of the child agent's `Ern`.
69 children: DashMap<String, AgentHandle>,
70 /// The agent's cancellation token (clone).
71 pub(crate) cancellation_token: tokio_util::sync::CancellationToken,
72}
73
74impl Default for AgentHandle {
75 /// Creates a default, placeholder `AgentHandle`.
76 ///
77 /// This handle is typically initialized with a default `Ern`, a closed channel,
78 /// and no parent, broker, or children. It's primarily used as a starting point
79 /// before being properly configured when a `ManagedAgent` is created.
80 fn default() -> Self {
81 let (outbox, _) = mpsc::channel(1); // Create a dummy channel
82 AgentHandle {
83 id: Ern::default(),
84 outbox,
85 tracker: TaskTracker::new(),
86 parent: None,
87 broker: Box::new(None),
88 children: DashMap::new(),
89 cancellation_token: tokio_util::sync::CancellationToken::new(),
90 }
91 }
92}
93
94/// Implements the `Subscriber` trait, allowing access to the broker.
95impl Subscriber for AgentHandle {
96 /// Returns a clone of the optional broker handle associated with this agent.
97 ///
98 /// Returns `None` if the agent was not configured with a broker reference.
99 fn get_broker(&self) -> Option<BrokerRef> {
100 *self.broker.clone() // Clone the Option<BrokerRef> inside the Box
101 }
102}
103
104/// Implements equality comparison based on the agent's unique ID (`Ern`).
105impl PartialEq for AgentHandle {
106 fn eq(&self, other: &Self) -> bool {
107 self.id == other.id
108 }
109}
110
111/// Derives `Eq` based on the `PartialEq` implementation.
112impl Eq for AgentHandle {}
113
114/// Implements hashing based on the agent's unique ID (`Ern`).
115impl Hash for AgentHandle {
116 fn hash<H: Hasher>(&self, state: &mut H) {
117 self.id.hash(state);
118 }
119}
120
121impl AgentHandle {
122 /// Starts a child agent and registers it under this agent's supervision.
123 ///
124 /// This method takes a `ManagedAgent` configured in the [`Idle`] state,
125 /// starts its execution by calling its `start` method, and then stores
126 /// the resulting child `AgentHandle` in this parent handle's `children` map.
127 ///
128 /// # Type Parameters
129 ///
130 /// * `State`: The user-defined state type of the child agent. Must implement
131 /// `Default`, `Send`, `Debug`, and be `'static`.
132 ///
133 /// # Arguments
134 ///
135 /// * `child`: The [`ManagedAgent<Idle, State>`] instance representing the child agent
136 /// to be started and supervised.
137 ///
138 /// # Returns
139 ///
140 /// A `Result` containing:
141 /// * `Ok(AgentHandle)`: The handle of the successfully started and registered child agent.
142 /// * `Err(anyhow::Error)`: If starting the child agent fails.
143 #[instrument(skip(self, child))] // Skip child in instrument
144 pub async fn supervise<State: Default + Send + Debug + 'static>(
145 // Add 'static bound
146 &self,
147 child: ManagedAgent<Idle, State>,
148 ) -> anyhow::Result<AgentHandle> {
149 let child_id = child.id().clone(); // Get ID before consuming child
150 trace!("Supervising child agent with id: {}", child_id);
151 let handle = child.start().await; // Start the child agent
152 trace!(
153 "Child agent {} started, adding to parent {} children map",
154 child_id,
155 self.id
156 );
157 self.children.insert(handle.id.to_string(), handle.clone()); // Store child handle
158 Ok(handle)
159 }
160}
161
162/// Implements the `Broker` trait, allowing broadcasting via the associated broker.
163impl Broker for AgentHandle {
164 /// Sends a message to the associated system broker for broadcasting.
165 ///
166 /// This method wraps the provided `message` in a [`BrokerRequest`] and sends it
167 /// to the broker handle stored within this `AgentHandle`. If no broker handle
168 /// is configured, an error is logged.
169 ///
170 /// # Arguments
171 ///
172 /// * `message`: The message payload (must implement `ActonMessage`) to be broadcast.
173 fn broadcast(&self, message: impl ActonMessage) -> impl Future<Output = ()> + Send + Sync + '_ {
174 trace!("Attempting broadcast via handle: {}", self.id);
175 async move {
176 if let Some(broker_handle) = self.broker.as_ref() {
177 trace!("Broker found for handle {}, sending BrokerRequest", self.id);
178 // Send the BrokerRequest to the actual broker agent.
179 broker_handle.send(BrokerRequest::new(message)).await;
180 } else {
181 // Log an error if no broker is configured for this agent handle.
182 error!(
183 "No broker configured for agent handle {}, cannot broadcast.",
184 self.id
185 );
186 }
187 }
188 }
189}
190
191/// Implements the core interface for interacting with an agent.
192#[async_trait]
193impl AgentHandleInterface for AgentHandle {
194 /// Returns the [`MessageAddress`] for this agent, used for sending replies.
195 #[inline]
196 fn reply_address(&self) -> MessageAddress {
197 MessageAddress::new(
198 self.outbox.clone(),
199 self.id.clone(),
200 self.cancellation_token.clone(),
201 )
202 }
203
204 /// Creates an [`OutboundEnvelope`] for sending a message from this agent.
205 ///
206 /// # Arguments
207 ///
208 /// * `recipient_address`: An optional [`MessageAddress`] specifying the recipient.
209 /// If `None`, the envelope is created without a specific recipient (e.g., for broadcasting
210 /// or when the recipient is set later).
211 ///
212 /// # Returns
213 ///
214 /// An [`OutboundEnvelope`] with the `return_address` set to this agent's address.
215 #[instrument(skip(self))]
216 fn create_envelope(&self, recipient_address: Option<MessageAddress>) -> OutboundEnvelope {
217 let return_address = self.reply_address();
218 trace!(sender = %return_address.sender.root, recipient = ?recipient_address.as_ref().map(|r| r.sender.root.as_str()), "Creating envelope");
219 if let Some(recipient) = recipient_address {
220 OutboundEnvelope::new_with_recipient(
221 return_address,
222 recipient,
223 self.cancellation_token.clone(),
224 )
225 } else {
226 OutboundEnvelope::new(return_address, self.cancellation_token.clone())
227 }
228 }
229
230 /// Returns a clone of the internal map containing handles to the agent's direct children.
231 ///
232 /// Provides a snapshot of the currently supervised children. Modifications to the
233 /// returned map will not affect the agent's actual children list.
234 #[inline]
235 fn children(&self) -> DashMap<String, AgentHandle> {
236 self.children.clone()
237 }
238
239 /// Searches for a direct child agent by its unique identifier (`Ern`).
240 ///
241 /// # Arguments
242 ///
243 /// * `ern`: The [`Ern`] of the child agent to find.
244 ///
245 /// # Returns
246 ///
247 /// * `Some(AgentHandle)`: If a direct child with the matching `Ern` is found.
248 /// * `None`: If no direct child with the specified `Ern` exists.
249 #[instrument(skip(self))]
250 fn find_child(&self, ern: &Ern) -> Option<AgentHandle> {
251 trace!("Searching for child with ERN: {}", ern);
252 // Access the DashMap using the ERN's string representation as the key.
253 self.children.get(&ern.to_string()).map(
254 |entry| entry.value().clone(), // Clone the handle if found
255 )
256 }
257
258 /// Returns a clone of the agent's task tracker.
259 ///
260 /// The tracker can be used to monitor the agent's main task.
261 #[inline]
262 fn tracker(&self) -> TaskTracker {
263 self.tracker.clone()
264 }
265
266 /// Returns a clone of the agent's unique Entity Resource Name (`Ern`).
267 #[inline]
268 fn id(&self) -> Ern {
269 self.id.clone()
270 }
271
272 /// Returns the agent's root name (the first part of its `Ern`) as a String.
273 #[inline]
274 fn name(&self) -> String {
275 self.id.root.to_string()
276 }
277
278 /// Returns a clone of this `AgentHandle`.
279 #[inline]
280 fn clone_ref(&self) -> AgentHandle {
281 self.clone()
282 }
283
284 /// Sends a [`SystemSignal::Terminate`] message to the agent and waits for its task to complete.
285 ///
286 /// This initiates a graceful shutdown of the agent. It sends the `Terminate` signal
287 /// to the agent's inbox and then waits on the agent's `TaskTracker` until the main
288 /// task (and potentially associated tasks) have finished execution.
289 ///
290 /// The agent's `wake` loop is responsible for handling the `Terminate` signal,
291 /// potentially running `before_stop` and `after_stop` hooks, and stopping child agents.
292 ///
293 /// # Returns
294 ///
295 /// An `anyhow::Result<()>` indicating success or failure. Failure typically occurs
296 /// if sending the `Terminate` signal to the agent's inbox fails (e.g., if the channel
297 /// is already closed).
298 #[allow(clippy::manual_async_fn)] // Keep async_trait style
299 #[instrument(skip(self))]
300 fn stop(&self) -> impl Future<Output = anyhow::Result<()>> + Send + Sync + '_ {
301 async move {
302 let tracker = self.tracker();
303
304 // Cancel this agent's token before sending the Terminate signal.
305 self.cancellation_token.cancel();
306
307 // Create an envelope to send the signal from self to self.
308 let self_envelope = self.create_envelope(Some(self.reply_address()));
309
310 trace!(actor = %self.id, "Sending Terminate signal");
311 // Send the Terminate signal. Use `?` to propagate potential send errors.
312 //self_envelope.reply(SystemSignal::Terminate)?;
313 self_envelope.send(SystemSignal::Terminate).await;
314 // Determine agent shutdown timeout from env or use default (10s)
315 let timeout_ms: u64 = env::var("ACTON_AGENT_SHUTDOWN_TIMEOUT_MS")
316 .ok()
317 .and_then(|val| val.parse().ok())
318 .unwrap_or(10_000);
319
320 trace!(actor = %self.id, timeout_ms, "Waiting for agent tasks to complete...");
321
322 // Wait for the agent's main task and any tracked tasks to finish, within timeout.
323 let wait_fut = tracker.wait();
324 match tokio_timeout(Duration::from_millis(timeout_ms), wait_fut).await {
325 Ok(()) => {
326 trace!(actor = %self.id, "Agent terminated successfully.");
327 Ok(())
328 }
329 Err(_) => {
330 error!(
331 "Shutdown timeout for agent {} after {} ms",
332 self.id, timeout_ms
333 );
334 Err(anyhow::anyhow!(
335 "Timeout while waiting for agent {} to shut down",
336 self.id
337 ))
338 }
339 }
340 }
341 }
342}