acton_core/actor/managed_agent/started.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::type_name_of_val;
18use std::fmt::Debug;
19
20use futures::future::join_all;
21use tracing::{instrument, trace}; // Removed unused error import
22
23use crate::actor::ManagedAgent;
24use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
25use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
26use crate::traits::AgentHandleInterface;
27
/// Zero-sized type-state marker: the [`ManagedAgent`] is running.
///
/// A `ManagedAgent<Started, _>` is the form of the agent whose asynchronous
/// `wake` loop pulls envelopes from the inbox and routes each one to the
/// handler registered for its message type (handlers are set up earlier, in the
/// [`Idle`](super::Idle) state).
///
/// While in this state the agent can build outgoing envelopes through
/// [`ManagedAgent::new_envelope`] and [`ManagedAgent::new_parent_envelope`].
/// External code normally talks to a started agent through its `AgentHandle`
/// rather than through the `ManagedAgent` itself.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Started;
39
40/// Implements methods specific to a `ManagedAgent` in the `Started` state.
41impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
42 /// Creates a new [`OutboundEnvelope`] originating from this agent.
43 ///
44 /// This helper function constructs an envelope suitable for sending a message
45 /// from this agent to another recipient. The envelope's `return_address`
46 /// will be set to this agent's [`MessageAddress`]. The `recipient_address`
47 /// field will be `None` initially and should typically be set using the
48 /// envelope's methods before sending.
49 ///
50 /// # Returns
51 ///
52 /// An [`OutboundEnvelope`] configured with this agent as the sender.
53 /// Returns `None` only if the agent's handle somehow lacks an outbox, which
54 /// should not occur under normal circumstances.
55 pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
56 self.cancellation_token.clone().map(|cancellation_token| {
57 OutboundEnvelope::new(
58 MessageAddress::new(self.handle.outbox.clone(), self.id.clone()),
59 cancellation_token,
60 )
61 })
62 }
63
64 /// Creates a new [`OutboundEnvelope`] addressed to this agent's parent.
65 ///
66 /// This is a convenience method for creating an envelope specifically for
67 /// replying or sending a message to the agent that supervises this one.
68 /// It clones the parent's return address information.
69 ///
70 /// # Returns
71 ///
72 /// * `Some(OutboundEnvelope)`: An envelope configured to be sent to the parent,
73 /// if this agent has a parent. The `return_address` will be the parent's address,
74 /// and the `recipient_address` will be this agent's address.
75 /// * `None`: If this agent does not have a parent (i.e., it's a top-level agent).
76 pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
77 // Only construct if both parent and cancellation_token exist
78 let cancellation_token = self.cancellation_token.clone()?;
79 self.parent.as_ref().map(|parent_handle| {
80 OutboundEnvelope::new_with_recipient(
81 MessageAddress::new(self.handle.outbox.clone(), self.id.clone()), // Self is sender
82 parent_handle.reply_address(), // Parent is recipient
83 cancellation_token,
84 )
85 })
86 }
87
88 // wake() and terminate() are internal implementation details (`pub(crate)` or private)
89 // and do not require public documentation.
90 #[instrument(skip(reactors, self))]
91 pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
92 (self.after_start)(self).await;
93 // Assert that cancellation_token always exists; it must never be missing.
94 assert!(
95 self.cancellation_token.is_some(),
96 "ManagedAgent in Started state must always have a cancellation_token"
97 );
98 let cancel_token = self.cancellation_token.as_ref().cloned().unwrap();
99 let mut cancel = Box::pin(cancel_token.cancelled());
100
101 let mut _terminate_signal_received = false;
102
103 loop {
104 tokio::select! {
105 // React immediately to cancellation
106 _ = &mut cancel => {
107 trace!("Forceful cancellation triggered for agent: {}", self.id());
108 break; // Immediate break on forceful cancellation.
109 }
110 incoming_opt = self.inbox.recv() => {
111 let Some(incoming_envelope) = incoming_opt else { break; };
112 let type_id;
113 let mut envelope;
114 trace!(
115 "Received envelope from: {}",
116 incoming_envelope.reply_to.sender.root
117 );
118 trace!(
119 "Message type: {}",
120 type_name_of_val(&incoming_envelope.message)
121 );
122
123 // Handle potential BrokerRequestEnvelope indirection
124 if let Some(broker_request_envelope) = incoming_envelope
125 .message
126 .as_any()
127 .downcast_ref::<BrokerRequestEnvelope>()
128 {
129 trace!("Processing message via BrokerRequestEnvelope");
130 envelope = Envelope::new(
131 broker_request_envelope.message.clone(), // Extract inner message
132 incoming_envelope.reply_to.clone(),
133 incoming_envelope.recipient.clone(),
134 );
135 type_id = broker_request_envelope.message.as_any().type_id(); // Use inner message TypeId
136 } else {
137 envelope = incoming_envelope;
138 type_id = envelope.message.as_any().type_id();
139 }
140
141 // Dispatch to registered handler or handle system signals
142 if let Some(reactor) = reactors.get(&type_id) {
143 match reactor.value() {
144 ReactorItem::FutureReactor(fut) => {
145 // Legacy handler: await, always Ok
146 fut(self, &mut envelope).await;
147 }
148 ReactorItem::FutureReactorResult(fut) => {
149 // New Result-based handler: await and trigger error handler on Err
150 let result = fut(self, &mut envelope).await;
151 if let Err((err, error_type_id)) = result {
152 let message_type_id = envelope.message.as_any().type_id();
153 if let Some(handler) =
154 self.error_handler_map.remove(&(message_type_id, error_type_id))
155 {
156 let fut = handler(self, &mut envelope, err.as_ref());
157 fut.await;
158 self.error_handler_map.insert((message_type_id, error_type_id), handler);
159 } else {
160 tracing::error!(
161 "Unhandled error from message handler in agent {}: {:?}",
162 self.id(),
163 err
164 );
165 }
166 }
167 }
168 }
169 } else if let Some(SystemSignal::Terminate) =
170 envelope.message.as_any().downcast_ref::<SystemSignal>()
171 {
172 trace!("Terminate signal received for agent: {}. Closing inbox.", self.id());
173 _terminate_signal_received = true; // Set flag
174 (self.before_stop)(self).await; // Execute before_stop hook
175 self.inbox.close(); // Close inbox to stop receiving new messages.
176 // Do NOT break here. Allow loop to drain existing messages.
177 } else {
178 trace!(
179 "No handler found for message type {:?} for agent {}",
180 type_id,
181 self.id()
182 );
183 // Optionally log or handle unknown message types
184 }
185
186
187 }
188 }
189 }
190 // After loop breaks (either gracefully or forcefully), perform final termination steps.
191 trace!(
192 "Message loop finished for agent: {}. Initiating final termination.",
193 self.id()
194 );
195 self.terminate().await; // Stop children and other cleanup.
196 (self.after_stop)(self).await;
197 trace!("Agent {} stopped.", self.id());
198 }
199
200 #[instrument(skip(self))]
201 async fn terminate(&mut self) {
202 trace!("Terminating children for agent: {}", self.id());
203 // Stop all child agents concurrently.
204 use std::env;
205 use std::time::Duration;
206 use tokio::time::timeout as tokio_timeout;
207
208 let timeout_ms: u64 = env::var("ACTON_AGENT_SHUTDOWN_TIMEOUT_MS")
209 .ok()
210 .and_then(|val| val.parse().ok())
211 .unwrap_or(10_000);
212
213 let stop_futures: Vec<_> = self
214 .handle
215 .children()
216 .iter()
217 .map(|item| {
218 let child_handle = item.value().clone();
219 async move {
220 trace!("Sending stop signal to child: {}", child_handle.id());
221 let stop_res =
222 tokio_timeout(Duration::from_millis(timeout_ms), child_handle.stop()).await;
223 match stop_res {
224 Ok(Ok(())) => {
225 trace!(
226 "Stop signal sent to and child {} shut down successfully.",
227 child_handle.id()
228 );
229 }
230 Ok(Err(e)) => {
231 tracing::error!(
232 "Stop signal to child {} returned error: {:?}",
233 child_handle.id(),
234 e
235 );
236 }
237 Err(_) => {
238 tracing::error!(
239 "Shutdown timeout for child {} after {} ms",
240 child_handle.id(),
241 timeout_ms
242 );
243 }
244 }
245 }
246 })
247 .collect();
248
249 join_all(stop_futures).await; // Wait for all stop signals to be sent/processed.
250
251 trace!("All children stopped for agent: {}.", self.id());
252 }
253}