acton_core/actor/managed_agent/
started.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::type_name_of_val;
18use std::fmt::Debug;
19use std::time::Duration;
20
21use futures::future::join_all;
22use tokio::time::sleep;
23use tracing::{instrument, trace}; // Removed unused error import
24
25use crate::actor::ManagedAgent;
26use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
27use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
28use crate::traits::AgentHandleInterface;
29
/// Type-state marker: the [`ManagedAgent`] has been started.
///
/// `Started` carries no data. It only selects, at the type level, the `impl`
/// of [`ManagedAgent`] whose methods apply while the agent's asynchronous
/// message loop (`wake`) is receiving from the inbox and dispatching each
/// message to the handlers that were registered in the [`Idle`](super::Idle)
/// state.
///
/// Envelope helpers such as [`ManagedAgent::new_envelope`] and
/// [`ManagedAgent::new_parent_envelope`] are available in this state. Outside
/// code typically talks to a started agent through its `AgentHandle`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Started;
41
42/// Implements methods specific to a `ManagedAgent` in the `Started` state.
43impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
44    /// Creates a new [`OutboundEnvelope`] originating from this agent.
45    ///
46    /// This helper function constructs an envelope suitable for sending a message
47    /// from this agent to another recipient. The envelope's `return_address`
48    /// will be set to this agent's [`MessageAddress`]. The `recipient_address`
49    /// field will be `None` initially and should typically be set using the
50    /// envelope's methods before sending.
51    ///
52    /// # Returns
53    ///
54    /// An [`OutboundEnvelope`] configured with this agent as the sender.
55    /// Returns `None` only if the agent's handle somehow lacks an outbox, which
56    /// should not occur under normal circumstances.
57    pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
58        // The Option wrapper seems unnecessary given the implementation always returns Some.
59        // Consider changing return type to just OutboundEnvelope if None is impossible.
60        Some(OutboundEnvelope::new(MessageAddress::new(
61            self.handle.outbox.clone(),
62            self.id.clone(),
63        )))
64    }
65
66    /// Creates a new [`OutboundEnvelope`] addressed to this agent's parent.
67    ///
68    /// This is a convenience method for creating an envelope specifically for
69    /// replying or sending a message to the agent that supervises this one.
70    /// It clones the parent's return address information.
71    ///
72    /// # Returns
73    ///
74    /// *   `Some(OutboundEnvelope)`: An envelope configured to be sent to the parent,
75    ///     if this agent has a parent. The `return_address` will be the parent's address,
76    ///     and the `recipient_address` will be this agent's address.
77    /// *   `None`: If this agent does not have a parent (i.e., it's a top-level agent).
78    pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
79        // Clones the parent's handle and creates an envelope targeting the parent.
80        self.parent.as_ref().map(|parent_handle| {
81            // Create an envelope where the recipient is the parent and the sender is self.
82            OutboundEnvelope::new_with_recipient(
83                MessageAddress::new(self.handle.outbox.clone(), self.id.clone()), // Self is sender
84                parent_handle.reply_address(), // Parent is recipient
85            )
86        })
87        // The original implementation `parent.create_envelope(None).clone()` might have different semantics.
88        // The above implementation assumes sending *to* the parent *from* self.
89        // If the intent was to create an envelope *as if* it came from the parent,
90        // the original logic might be needed, but seems less common. Let's stick to the clearer intent.
91        // Original logic: self.parent.as_ref().map(|parent| parent.create_envelope(None).clone())
92    }
93
94    // wake() and terminate() are internal implementation details (`pub(crate)` or private)
95    // and do not require public documentation.
96    #[instrument(skip(reactors, self))]
97    pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
98        (self.after_start)(self).await;
99        let mut terminate_requested = false;
100        while let Some(incoming_envelope) = self.inbox.recv().await {
101            let type_id;
102            let mut envelope;
103            trace!(
104                "Received envelope from: {}",
105                incoming_envelope.reply_to.sender.root
106            );
107            trace!(
108                "Message type: {}",
109                type_name_of_val(&incoming_envelope.message)
110            );
111
112            // Handle potential BrokerRequestEnvelope indirection
113            if let Some(broker_request_envelope) = incoming_envelope
114                .message
115                .as_any()
116                .downcast_ref::<BrokerRequestEnvelope>()
117            {
118                trace!("Processing message via BrokerRequestEnvelope");
119                envelope = Envelope::new(
120                    broker_request_envelope.message.clone(), // Extract inner message
121                    incoming_envelope.reply_to.clone(),
122                    incoming_envelope.recipient.clone(),
123                );
124                type_id = broker_request_envelope.message.as_any().type_id(); // Use inner message TypeId
125            } else {
126                envelope = incoming_envelope;
127                type_id = envelope.message.as_any().type_id();
128            }
129
130            // Dispatch to registered handler or handle system signals
131            if let Some(reactor) = reactors.get(&type_id) {
132                match reactor.value() {
133                    ReactorItem::FutureReactor(fut) => {
134                        // Legacy handler: await, always Ok
135                        fut(self, &mut envelope).await;
136                    }
137                    ReactorItem::FutureReactorResult(fut) => {
138                        // New Result-based handler: await and trigger error handler on Err
139                        let result = fut(self, &mut envelope).await;
140                        if let Err(err) = result {
141                            // Call every registered error handler; closure does downcast & handles only if type matches
142                            let mut handled = false;
143                            let handler_arcs: Vec<_> =
144                                self.error_handler_map.values().cloned().collect();
145                            for handler_arc in handler_arcs {
146                                // Handler returns immediately if error type doesn't match
147                                let fut = handler_arc(self, &mut envelope, err.as_ref());
148                                fut.await;
149                                handled = true; // mark as handled since at least one handler exists
150                            }
151                            if !handled {
152                                tracing::error!(
153                                    "Unhandled error from message handler in agent {}: {:?}",
154                                    self.id(),
155                                    err
156                                );
157                            }
158                        }
159                    }
160                }
161            } else if let Some(SystemSignal::Terminate) =
162                envelope.message.as_any().downcast_ref::<SystemSignal>()
163            {
164                trace!("Terminate signal received for agent: {}", self.id());
165                terminate_requested = true;
166                (self.before_stop)(self).await; // Execute before_stop hook
167                                                // Short delay to allow before_stop processing, if needed.
168                sleep(Duration::from_millis(10)).await;
169                self.inbox.close(); // Close inbox to stop receiving new messages
170                trace!("Inbox closed for agent: {}", self.id());
171            } else {
172                trace!(
173                    "No handler found for message type {:?} for agent {}",
174                    type_id,
175                    self.id()
176                );
177                // Optionally log or handle unknown message types
178            }
179
180            // Check if termination requested and inbox is now empty and closed
181            if terminate_requested && self.inbox.is_empty() && self.inbox.is_closed() {
182                trace!("Inbox empty and closed after terminate request, initiating termination for agent: {}", self.id());
183                self.terminate().await; // Initiate graceful shutdown of children etc.
184                break; // Exit the loop
185            }
186        }
187        trace!("Message loop finished for agent: {}", self.id());
188        (self.after_stop)(self).await; // Execute after_stop hook
189        trace!("Agent {} stopped.", self.id());
190    }
191
192    #[instrument(skip(self))]
193    async fn terminate(&mut self) {
194        trace!("Terminating children for agent: {}", self.id());
195        // Stop all child agents concurrently.
196        use std::env;
197        use std::time::Duration;
198        use tokio::time::timeout as tokio_timeout;
199
200        let timeout_ms: u64 = env::var("ACTON_AGENT_SHUTDOWN_TIMEOUT_MS")
201            .ok()
202            .and_then(|val| val.parse().ok())
203            .unwrap_or(10_000);
204
205        let stop_futures: Vec<_> = self
206            .handle
207            .children()
208            .iter()
209            .map(|item| {
210                let child_handle = item.value().clone();
211                async move {
212                    trace!("Sending stop signal to child: {}", child_handle.id());
213                    let stop_res =
214                        tokio_timeout(Duration::from_millis(timeout_ms), child_handle.stop()).await;
215                    match stop_res {
216                        Ok(Ok(())) => {
217                            trace!(
218                                "Stop signal sent to and child {} shut down successfully.",
219                                child_handle.id()
220                            );
221                        }
222                        Ok(Err(e)) => {
223                            tracing::error!(
224                                "Stop signal to child {} returned error: {:?}",
225                                child_handle.id(),
226                                e
227                            );
228                        }
229                        Err(_) => {
230                            tracing::error!(
231                                "Shutdown timeout for child {} after {} ms",
232                                child_handle.id(),
233                                timeout_ms
234                            );
235                        }
236                    }
237                }
238            })
239            .collect();
240
241        join_all(stop_futures).await; // Wait for all stop signals to be sent/processed.
242
243        trace!(
244            "All children stopped for agent: {}. Closing own inbox.",
245            self.id()
246        );
247        // Ensure inbox is closed (might be redundant if closed in wake loop, but safe).
248        self.inbox.close();
249    }
250}