acton_core/common/agent_handle.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16use std::fmt::Debug;
17use std::future::Future;
18use std::hash::{Hash, Hasher};
19
20use acton_ern::{Ern};
21use async_trait::async_trait;
22use dashmap::DashMap;
23use tokio::sync::mpsc;
24use tokio_util::task::TaskTracker;
25use tracing::{error, instrument, trace, warn}; // warn seems unused
26
27use crate::actor::{Idle, ManagedAgent};
28use crate::common::{BrokerRef, OutboundEnvelope, AgentSender, ParentRef};
29use crate::message::{BrokerRequest, MessageAddress, SystemSignal};
30use crate::prelude::ActonMessage;
31use crate::traits::{AgentHandleInterface, Broker, Subscriber};
32
33/// A clonable handle for interacting with an agent.
34///
35/// `AgentHandle` provides the primary mechanism for communicating with and managing
36/// an agent from outside its own execution context. It encapsulates the necessary
37/// information to send messages to the agent's inbox (`outbox`), identify the agent (`id`),
38/// manage its lifecycle (`stop`), track its tasks (`tracker`), and navigate the
39/// supervision hierarchy (`parent`, `children`, `supervise`).
40///
41/// Handles can be cloned freely, allowing multiple parts of the system to hold references
42/// to the same agent. Sending messages through the handle is asynchronous.
43///
44/// Key functionalities are exposed through implemented traits:
45/// * [`AgentHandleInterface`]: Core methods for interaction (sending messages, stopping, etc.).
46/// * [`Broker`]: Methods for broadcasting messages via the system broker.
47/// * [`Subscriber`]: Method for accessing the system broker handle.
48///
49/// Equality and hashing are based solely on the agent's unique identifier (`id`).
50#[derive(Debug, Clone)]
51pub struct AgentHandle {
52 /// The unique identifier (`Ern`) for the agent this handle refers to.
53 pub(crate) id: Ern,
54 /// The sender part of the MPSC channel connected to the agent's inbox.
55 pub(crate) outbox: AgentSender,
56 /// Tracks the agent's main task and potentially other associated tasks.
57 tracker: TaskTracker,
58 /// Optional reference to the parent (supervisor) agent's handle.
59 /// `None` if this is a top-level agent. Boxed to manage `AgentHandle` size.
60 pub parent: Option<Box<ParentRef>>,
61 /// Optional reference to the system message broker's handle.
62 /// Boxed to manage `AgentHandle` size.
63 pub broker: Box<Option<BrokerRef>>,
64 /// A map holding handles to the direct children supervised by this agent.
65 /// Keys are the string representation of the child agent's `Ern`.
66 children: DashMap<String, AgentHandle>,
67}
68
69impl Default for AgentHandle {
70 /// Creates a default, placeholder `AgentHandle`.
71 ///
72 /// This handle is typically initialized with a default `Ern`, a closed channel,
73 /// and no parent, broker, or children. It's primarily used as a starting point
74 /// before being properly configured when a `ManagedAgent` is created.
75 fn default() -> Self {
76 let (outbox, _) = mpsc::channel(1); // Create a dummy channel
77 AgentHandle {
78 id: Ern::default(),
79 outbox,
80 tracker: TaskTracker::new(),
81 parent: None,
82 broker: Box::new(None),
83 children: DashMap::new(),
84 }
85 }
86}
87
88/// Implements the `Subscriber` trait, allowing access to the broker.
89impl Subscriber for AgentHandle {
90 /// Returns a clone of the optional broker handle associated with this agent.
91 ///
92 /// Returns `None` if the agent was not configured with a broker reference.
93 fn get_broker(&self) -> Option<BrokerRef> {
94 *self.broker.clone() // Clone the Option<BrokerRef> inside the Box
95 }
96}
97
98/// Implements equality comparison based on the agent's unique ID (`Ern`).
99impl PartialEq for AgentHandle {
100 fn eq(&self, other: &Self) -> bool {
101 self.id == other.id
102 }
103}
104
105/// Derives `Eq` based on the `PartialEq` implementation.
106impl Eq for AgentHandle {}
107
108/// Implements hashing based on the agent's unique ID (`Ern`).
109impl Hash for AgentHandle {
110 fn hash<H: Hasher>(&self, state: &mut H) {
111 self.id.hash(state);
112 }
113}
114
115impl AgentHandle {
116 /// Starts a child agent and registers it under this agent's supervision.
117 ///
118 /// This method takes a `ManagedAgent` configured in the [`Idle`] state,
119 /// starts its execution by calling its `start` method, and then stores
120 /// the resulting child `AgentHandle` in this parent handle's `children` map.
121 ///
122 /// # Type Parameters
123 ///
124 /// * `State`: The user-defined state type of the child agent. Must implement
125 /// `Default`, `Send`, `Debug`, and be `'static`.
126 ///
127 /// # Arguments
128 ///
129 /// * `child`: The [`ManagedAgent<Idle, State>`] instance representing the child agent
130 /// to be started and supervised.
131 ///
132 /// # Returns
133 ///
134 /// A `Result` containing:
135 /// * `Ok(AgentHandle)`: The handle of the successfully started and registered child agent.
136 /// * `Err(anyhow::Error)`: If starting the child agent fails.
137 #[instrument(skip(self, child))] // Skip child in instrument
138 pub async fn supervise<State: Default + Send + Debug + 'static>( // Add 'static bound
139 &self,
140 child: ManagedAgent<Idle, State>,
141 ) -> anyhow::Result<AgentHandle> {
142 let child_id = child.id().clone(); // Get ID before consuming child
143 trace!("Supervising child agent with id: {}", child_id);
144 let handle = child.start().await; // Start the child agent
145 trace!("Child agent {} started, adding to parent {} children map", child_id, self.id);
146 self.children.insert(handle.id.to_string(), handle.clone()); // Store child handle
147 Ok(handle)
148 }
149}
150
151/// Implements the `Broker` trait, allowing broadcasting via the associated broker.
152impl Broker for AgentHandle {
153 /// Sends a message to the associated system broker for broadcasting.
154 ///
155 /// This method wraps the provided `message` in a [`BrokerRequest`] and sends it
156 /// to the broker handle stored within this `AgentHandle`. If no broker handle
157 /// is configured, an error is logged.
158 ///
159 /// # Arguments
160 ///
161 /// * `message`: The message payload (must implement `ActonMessage`) to be broadcast.
162 fn broadcast(&self, message: impl ActonMessage) -> impl Future<Output=()> + Send + Sync + '_ {
163 trace!("Attempting broadcast via handle: {}", self.id);
164 async move {
165 if let Some(broker_handle) = self.broker.as_ref() {
166 trace!("Broker found for handle {}, sending BrokerRequest", self.id);
167 // Send the BrokerRequest to the actual broker agent.
168 broker_handle.send(BrokerRequest::new(message)).await;
169 } else {
170 // Log an error if no broker is configured for this agent handle.
171 error!("No broker configured for agent handle {}, cannot broadcast.", self.id);
172 }
173 }
174 }
175}
176
177/// Implements the core interface for interacting with an agent.
178#[async_trait]
179impl AgentHandleInterface for AgentHandle {
180 /// Returns the [`MessageAddress`] for this agent, used for sending replies.
181 #[inline]
182 fn reply_address(&self) -> MessageAddress {
183 MessageAddress::new(self.outbox.clone(), self.id.clone())
184 }
185
186 /// Creates an [`OutboundEnvelope`] for sending a message from this agent.
187 ///
188 /// # Arguments
189 ///
190 /// * `recipient_address`: An optional [`MessageAddress`] specifying the recipient.
191 /// If `None`, the envelope is created without a specific recipient (e.g., for broadcasting
192 /// or when the recipient is set later).
193 ///
194 /// # Returns
195 ///
196 /// An [`OutboundEnvelope`] with the `return_address` set to this agent's address.
197 #[instrument(skip(self))]
198 fn create_envelope(&self, recipient_address: Option<MessageAddress>) -> OutboundEnvelope {
199 let return_address = self.reply_address();
200 trace!(sender = %return_address.sender.root, recipient = ?recipient_address.as_ref().map(|r| r.sender.root.as_str()), "Creating envelope");
201 if let Some(recipient) = recipient_address {
202 OutboundEnvelope::new_with_recipient(return_address, recipient)
203 } else {
204 OutboundEnvelope::new(return_address)
205 }
206 }
207
208 /// Returns a clone of the internal map containing handles to the agent's direct children.
209 ///
210 /// Provides a snapshot of the currently supervised children. Modifications to the
211 /// returned map will not affect the agent's actual children list.
212 #[inline]
213 fn children(&self) -> DashMap<String, AgentHandle> {
214 self.children.clone()
215 }
216
217 /// Searches for a direct child agent by its unique identifier (`Ern`).
218 ///
219 /// # Arguments
220 ///
221 /// * `ern`: The [`Ern`] of the child agent to find.
222 ///
223 /// # Returns
224 ///
225 /// * `Some(AgentHandle)`: If a direct child with the matching `Ern` is found.
226 /// * `None`: If no direct child with the specified `Ern` exists.
227 #[instrument(skip(self))]
228 fn find_child(&self, ern: &Ern) -> Option<AgentHandle> {
229 trace!("Searching for child with ERN: {}", ern);
230 // Access the DashMap using the ERN's string representation as the key.
231 self.children.get(&ern.to_string()).map(|entry|
232 entry.value().clone() // Clone the handle if found
233 )
234 }
235
236 /// Returns a clone of the agent's task tracker.
237 ///
238 /// The tracker can be used to monitor the agent's main task.
239 #[inline]
240 fn tracker(&self) -> TaskTracker {
241 self.tracker.clone()
242 }
243
244 /// Returns a clone of the agent's unique Entity Resource Name (`Ern`).
245 #[inline]
246 fn id(&self) -> Ern {
247 self.id.clone()
248 }
249
250 /// Returns the agent's root name (the first part of its `Ern`) as a String.
251 #[inline]
252 fn name(&self) -> String {
253 self.id.root.to_string()
254 }
255
256 /// Returns a clone of this `AgentHandle`.
257 #[inline]
258 fn clone_ref(&self) -> AgentHandle {
259 self.clone()
260 }
261
262 /// Sends a [`SystemSignal::Terminate`] message to the agent and waits for its task to complete.
263 ///
264 /// This initiates a graceful shutdown of the agent. It sends the `Terminate` signal
265 /// to the agent's inbox and then waits on the agent's `TaskTracker` until the main
266 /// task (and potentially associated tasks) have finished execution.
267 ///
268 /// The agent's `wake` loop is responsible for handling the `Terminate` signal,
269 /// potentially running `before_stop` and `after_stop` hooks, and stopping child agents.
270 ///
271 /// # Returns
272 ///
273 /// An `anyhow::Result<()>` indicating success or failure. Failure typically occurs
274 /// if sending the `Terminate` signal to the agent's inbox fails (e.g., if the channel
275 /// is already closed).
276 #[allow(clippy::manual_async_fn)] // Keep async_trait style
277 #[instrument(skip(self))]
278 fn stop(&self) -> impl Future<Output=anyhow::Result<()>> + Send + Sync + '_ {
279 async move {
280 let tracker = self.tracker();
281 // Create an envelope to send the signal from self to self.
282 let self_envelope = self.create_envelope(Some(self.reply_address()));
283
284 trace!(actor = %self.id, "Sending Terminate signal");
285 // Send the Terminate signal. Use `?` to propagate potential send errors.
286 self_envelope.reply(SystemSignal::Terminate)?;
287
288 trace!(actor = %self.id, "Waiting for agent tasks to complete...");
289 // Wait for the agent's main task and any tracked tasks to finish.
290 tracker.wait().await;
291
292 trace!(actor = %self.id, "Agent terminated successfully.");
293 Ok(())
294 }
295 }
296}