acton_core/actor/managed_agent/
started.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::type_name_of_val;
18use std::fmt::Debug;
19
20use futures::future::join_all;
21use tracing::{instrument, trace}; // Removed unused error import
22
23use crate::actor::ManagedAgent;
24use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
25use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
26use crate::traits::AgentHandleInterface;
27
/// Zero-sized type-state marker for a [`ManagedAgent`] whose message loop is running.
///
/// `ManagedAgent<Started, Agent>` is the form of the agent whose asynchronous
/// `wake` task pulls envelopes from the inbox and dispatches each one to the
/// matching handler; those handlers are registered earlier, while the agent is
/// in the [`Idle`](super::Idle) state.
///
/// While in this state the agent can build outgoing envelopes with
/// [`ManagedAgent::new_envelope`] and [`ManagedAgent::new_parent_envelope`].
/// External code typically interacts with a started agent through its
/// `AgentHandle`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Started;
39
40/// Implements methods specific to a `ManagedAgent` in the `Started` state.
41impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
42    /// Creates a new [`OutboundEnvelope`] originating from this agent.
43    ///
44    /// This helper function constructs an envelope suitable for sending a message
45    /// from this agent to another recipient. The envelope's `return_address`
46    /// will be set to this agent's [`MessageAddress`]. The `recipient_address`
47    /// field will be `None` initially and should typically be set using the
48    /// envelope's methods before sending.
49    ///
50    /// # Returns
51    ///
52    /// An [`OutboundEnvelope`] configured with this agent as the sender.
53    /// Returns `None` only if the agent's handle somehow lacks an outbox, which
54    /// should not occur under normal circumstances.
55    pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
56        self.cancellation_token.clone().map(|cancellation_token| {
57            OutboundEnvelope::new(
58                MessageAddress::new(self.handle.outbox.clone(), self.id.clone()),
59                cancellation_token,
60            )
61        })
62    }
63
64    /// Creates a new [`OutboundEnvelope`] addressed to this agent's parent.
65    ///
66    /// This is a convenience method for creating an envelope specifically for
67    /// replying or sending a message to the agent that supervises this one.
68    /// It clones the parent's return address information.
69    ///
70    /// # Returns
71    ///
72    /// *   `Some(OutboundEnvelope)`: An envelope configured to be sent to the parent,
73    ///     if this agent has a parent. The `return_address` will be the parent's address,
74    ///     and the `recipient_address` will be this agent's address.
75    /// *   `None`: If this agent does not have a parent (i.e., it's a top-level agent).
76    pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
77        // Only construct if both parent and cancellation_token exist
78        let cancellation_token = self.cancellation_token.clone()?;
79        self.parent.as_ref().map(|parent_handle| {
80            OutboundEnvelope::new_with_recipient(
81                MessageAddress::new(self.handle.outbox.clone(), self.id.clone()), // Self is sender
82                parent_handle.reply_address(), // Parent is recipient
83                cancellation_token,
84            )
85        })
86    }
87
88    // wake() and terminate() are internal implementation details (`pub(crate)` or private)
89    // and do not require public documentation.
90    #[instrument(skip(reactors, self))]
91    pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
92        (self.after_start)(self).await;
93        // Assert that cancellation_token always exists; it must never be missing.
94        assert!(
95            self.cancellation_token.is_some(),
96            "ManagedAgent in Started state must always have a cancellation_token"
97        );
98        let cancel_token = self.cancellation_token.as_ref().cloned().unwrap();
99        let mut cancel = Box::pin(cancel_token.cancelled());
100
101        let mut _terminate_signal_received = false;
102
103        loop {
104            tokio::select! {
105                // React immediately to cancellation
106                _ = &mut cancel => {
107                    trace!("Forceful cancellation triggered for agent: {}", self.id());
108                    break; // Immediate break on forceful cancellation.
109                }
110                incoming_opt = self.inbox.recv() => {
111                    let Some(incoming_envelope) = incoming_opt else { break; };
112                    let type_id;
113                    let mut envelope;
114                    trace!(
115                        "Received envelope from: {}",
116                        incoming_envelope.reply_to.sender.root
117                    );
118                    trace!(
119                        "Message type: {}",
120                        type_name_of_val(&incoming_envelope.message)
121                    );
122
123                    // Handle potential BrokerRequestEnvelope indirection
124                    if let Some(broker_request_envelope) = incoming_envelope
125                        .message
126                        .as_any()
127                        .downcast_ref::<BrokerRequestEnvelope>()
128                    {
129                        trace!("Processing message via BrokerRequestEnvelope");
130                        envelope = Envelope::new(
131                            broker_request_envelope.message.clone(), // Extract inner message
132                            incoming_envelope.reply_to.clone(),
133                            incoming_envelope.recipient.clone(),
134                        );
135                        type_id = broker_request_envelope.message.as_any().type_id(); // Use inner message TypeId
136                    } else {
137                        envelope = incoming_envelope;
138                        type_id = envelope.message.as_any().type_id();
139                    }
140
141                    // Dispatch to registered handler or handle system signals
142                    if let Some(reactor) = reactors.get(&type_id) {
143                        match reactor.value() {
144                            ReactorItem::FutureReactor(fut) => {
145                                // Legacy handler: await, always Ok
146                                fut(self, &mut envelope).await;
147                            }
148                            ReactorItem::FutureReactorResult(fut) => {
149                                // New Result-based handler: await and trigger error handler on Err
150                                let result = fut(self, &mut envelope).await;
151                                if let Err((err, error_type_id)) = result {
152                                    let message_type_id = envelope.message.as_any().type_id();
153                                    if let Some(handler) =
154                                        self.error_handler_map.remove(&(message_type_id, error_type_id))
155                                    {
156                                        let fut = handler(self, &mut envelope, err.as_ref());
157                                        fut.await;
158                                        self.error_handler_map.insert((message_type_id, error_type_id), handler);
159                                    } else {
160                                        tracing::error!(
161                                            "Unhandled error from message handler in agent {}: {:?}",
162                                            self.id(),
163                                            err
164                                        );
165                                    }
166                                }
167                            }
168                        }
169                    } else if let Some(SystemSignal::Terminate) =
170                        envelope.message.as_any().downcast_ref::<SystemSignal>()
171                    {
172                        trace!("Terminate signal received for agent: {}. Closing inbox.", self.id());
173                        _terminate_signal_received = true; // Set flag
174                        (self.before_stop)(self).await; // Execute before_stop hook
175                        self.inbox.close(); // Close inbox to stop receiving new messages.
176                        // Do NOT break here. Allow loop to drain existing messages.
177                    } else {
178                        trace!(
179                            "No handler found for message type {:?} for agent {}",
180                            type_id,
181                            self.id()
182                        );
183                        // Optionally log or handle unknown message types
184                    }
185
186
187                }
188            }
189        }
190        // After loop breaks (either gracefully or forcefully), perform final termination steps.
191        trace!(
192            "Message loop finished for agent: {}. Initiating final termination.",
193            self.id()
194        );
195        self.terminate().await; // Stop children and other cleanup.
196        (self.after_stop)(self).await;
197        trace!("Agent {} stopped.", self.id());
198    }
199
200    #[instrument(skip(self))]
201    async fn terminate(&mut self) {
202        trace!("Terminating children for agent: {}", self.id());
203        // Stop all child agents concurrently.
204        use std::env;
205        use std::time::Duration;
206        use tokio::time::timeout as tokio_timeout;
207
208        let timeout_ms: u64 = env::var("ACTON_AGENT_SHUTDOWN_TIMEOUT_MS")
209            .ok()
210            .and_then(|val| val.parse().ok())
211            .unwrap_or(10_000);
212
213        let stop_futures: Vec<_> = self
214            .handle
215            .children()
216            .iter()
217            .map(|item| {
218                let child_handle = item.value().clone();
219                async move {
220                    trace!("Sending stop signal to child: {}", child_handle.id());
221                    let stop_res =
222                        tokio_timeout(Duration::from_millis(timeout_ms), child_handle.stop()).await;
223                    match stop_res {
224                        Ok(Ok(())) => {
225                            trace!(
226                                "Stop signal sent to and child {} shut down successfully.",
227                                child_handle.id()
228                            );
229                        }
230                        Ok(Err(e)) => {
231                            tracing::error!(
232                                "Stop signal to child {} returned error: {:?}",
233                                child_handle.id(),
234                                e
235                            );
236                        }
237                        Err(_) => {
238                            tracing::error!(
239                                "Shutdown timeout for child {} after {} ms",
240                                child_handle.id(),
241                                timeout_ms
242                            );
243                        }
244                    }
245                }
246            })
247            .collect();
248
249        join_all(stop_futures).await; // Wait for all stop signals to be sent/processed.
250
251        trace!("All children stopped for agent: {}.", self.id());
252    }
253}