acton_core/actor/managed_agent/
started.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::type_name_of_val;
18use std::fmt::Debug;
19use std::time::Duration;
20
21use futures::future::join_all;
22use tokio::time::sleep;
23use tracing::{instrument, trace}; // Removed unused error import
24
25use crate::actor::ManagedAgent;
26use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
27use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
28use crate::traits::AgentHandleInterface;
29
/// Zero-sized type-state marker: the [`ManagedAgent`] is running its message loop.
///
/// A `ManagedAgent<Started, _>` is one whose asynchronous `wake` task is executing:
/// it pulls envelopes from its inbox and dispatches each one to the handler that was
/// registered for that message type while the agent was still in the
/// [`Idle`](super::Idle) state.
///
/// In this state the agent can build outgoing envelopes with
/// [`ManagedAgent::new_envelope`] and [`ManagedAgent::new_parent_envelope`].
/// Outside code normally talks to the running agent through its [`AgentHandle`].
///
/// The marker carries no data; the derives only make it convenient to copy,
/// compare, hash, and print.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Started;
41
42/// Implements methods specific to a `ManagedAgent` in the `Started` state.
43impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
44    /// Creates a new [`OutboundEnvelope`] originating from this agent.
45    ///
46    /// This helper function constructs an envelope suitable for sending a message
47    /// from this agent to another recipient. The envelope's `return_address`
48    /// will be set to this agent's [`MessageAddress`]. The `recipient_address`
49    /// field will be `None` initially and should typically be set using the
50    /// envelope's methods before sending.
51    ///
52    /// # Returns
53    ///
54    /// An [`OutboundEnvelope`] configured with this agent as the sender.
55    /// Returns `None` only if the agent's handle somehow lacks an outbox, which
56    /// should not occur under normal circumstances.
57    pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
58        self.cancellation_token.clone().map(|cancellation_token| {
59            OutboundEnvelope::new(
60                MessageAddress::new(
61                    self.handle.outbox.clone(),
62                    self.id.clone(),
63                    self.handle.cancellation_token.clone(),
64                ),
65                cancellation_token,
66            )
67        })
68    }
69
70    /// Creates a new [`OutboundEnvelope`] addressed to this agent's parent.
71    ///
72    /// This is a convenience method for creating an envelope specifically for
73    /// replying or sending a message to the agent that supervises this one.
74    /// It clones the parent's return address information.
75    ///
76    /// # Returns
77    ///
78    /// *   `Some(OutboundEnvelope)`: An envelope configured to be sent to the parent,
79    ///     if this agent has a parent. The `return_address` will be the parent's address,
80    ///     and the `recipient_address` will be this agent's address.
81    /// *   `None`: If this agent does not have a parent (i.e., it's a top-level agent).
82    pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
83        // Only construct if both parent and cancellation_token exist
84        let cancellation_token = self.cancellation_token.clone()?;
85        self.parent.as_ref().map(|parent_handle| {
86            OutboundEnvelope::new_with_recipient(
87                MessageAddress::new(
88                    self.handle.outbox.clone(),
89                    self.id.clone(),
90                    self.handle.cancellation_token.clone(),
91                ), // Self is sender
92                parent_handle.reply_address(), // Parent is recipient
93                cancellation_token,
94            )
95        })
96    }
97
98    // wake() and terminate() are internal implementation details (`pub(crate)` or private)
99    // and do not require public documentation.
100    #[instrument(skip(reactors, self))]
101    pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
102        (self.after_start)(self).await;
103        let mut terminate_requested = false;
104        // Assert that cancellation_token always exists; it must never be missing.
105        assert!(
106            self.cancellation_token.is_some(),
107            "ManagedAgent in Started state must always have a cancellation_token"
108        );
109        let cancel_token = self.cancellation_token.as_ref().cloned().unwrap();
110        let mut cancel = Box::pin(cancel_token.cancelled());
111
112        loop {
113            tokio::select! {
114                // React immediately to cancellation
115                _ = &mut cancel => {
116                    trace!("Cancellation token triggered for agent: {}", self.id());
117                    break;
118                }
119                incoming_opt = self.inbox.recv() => {
120                    let Some(incoming_envelope) = incoming_opt else { break; };
121                    let type_id;
122                    let mut envelope;
123                    trace!(
124                        "Received envelope from: {}",
125                        incoming_envelope.reply_to.sender.root
126                    );
127                    trace!(
128                        "Message type: {}",
129                        type_name_of_val(&incoming_envelope.message)
130                    );
131
132                    // Handle potential BrokerRequestEnvelope indirection
133                    if let Some(broker_request_envelope) = incoming_envelope
134                        .message
135                        .as_any()
136                        .downcast_ref::<BrokerRequestEnvelope>()
137                    {
138                        trace!("Processing message via BrokerRequestEnvelope");
139                        envelope = Envelope::new(
140                            broker_request_envelope.message.clone(), // Extract inner message
141                            incoming_envelope.reply_to.clone(),
142                            incoming_envelope.recipient.clone(),
143                        );
144                        type_id = broker_request_envelope.message.as_any().type_id(); // Use inner message TypeId
145                    } else {
146                        envelope = incoming_envelope;
147                        type_id = envelope.message.as_any().type_id();
148                    }
149
150                    // Dispatch to registered handler or handle system signals
151                    if let Some(reactor) = reactors.get(&type_id) {
152                        match reactor.value() {
153                            ReactorItem::FutureReactor(fut) => {
154                                // Legacy handler: await, always Ok
155                                fut(self, &mut envelope).await;
156                            }
157                            ReactorItem::FutureReactorResult(fut) => {
158                                // New Result-based handler: await and trigger error handler on Err
159                                let result = fut(self, &mut envelope).await;
160                                if let Err(err) = result {
161                                    // Call every registered error handler; closure does downcast & handles only if type matches
162                                    let mut handled = false;
163                                    let handler_arcs: Vec<_> =
164                                        self.error_handler_map.values().cloned().collect();
165                                    for handler_arc in handler_arcs {
166                                        // Handler returns immediately if error type doesn't match
167                                        let fut = handler_arc(self, &mut envelope, err.as_ref());
168                                        fut.await;
169                                        handled = true; // mark as handled since at least one handler exists
170                                    }
171                                    if !handled {
172                                        tracing::error!(
173                                            "Unhandled error from message handler in agent {}: {:?}",
174                                            self.id(),
175                                            err
176                                        );
177                                    }
178                                }
179                            }
180                        }
181                    } else if let Some(SystemSignal::Terminate) =
182                        envelope.message.as_any().downcast_ref::<SystemSignal>()
183                    {
184                        trace!("Terminate signal received for agent: {}", self.id());
185                        terminate_requested = true;
186                        (self.before_stop)(self).await; // Execute before_stop hook
187                                                        // Short delay to allow before_stop processing, if needed.
188                        sleep(Duration::from_millis(10)).await;
189                        self.inbox.close(); // Close inbox to stop receiving new messages
190                        trace!("Inbox closed for agent: {}", self.id());
191                    } else {
192                        trace!(
193                            "No handler found for message type {:?} for agent {}",
194                            type_id,
195                            self.id()
196                        );
197                        // Optionally log or handle unknown message types
198                    }
199
200                    // Check if termination requested and inbox is now empty and closed
201                    if terminate_requested && self.inbox.is_empty() && self.inbox.is_closed() {
202                        trace!("Inbox empty and closed after terminate request, initiating termination for agent: {}", self.id());
203                        self.terminate().await; // Initiate graceful shutdown of children etc.
204                        break; // Exit the loop
205                    }
206                }
207            }
208        }
209        trace!("Message loop finished for agent: {}", self.id());
210        (self.after_stop)(self).await; // Execute after_stop hook
211        trace!("Agent {} stopped.", self.id());
212    }
213
214    #[instrument(skip(self))]
215    async fn terminate(&mut self) {
216        trace!("Terminating children for agent: {}", self.id());
217        // Stop all child agents concurrently.
218        use std::env;
219        use std::time::Duration;
220        use tokio::time::timeout as tokio_timeout;
221
222        let timeout_ms: u64 = env::var("ACTON_AGENT_SHUTDOWN_TIMEOUT_MS")
223            .ok()
224            .and_then(|val| val.parse().ok())
225            .unwrap_or(10_000);
226
227        let stop_futures: Vec<_> = self
228            .handle
229            .children()
230            .iter()
231            .map(|item| {
232                let child_handle = item.value().clone();
233                async move {
234                    trace!("Sending stop signal to child: {}", child_handle.id());
235                    let stop_res =
236                        tokio_timeout(Duration::from_millis(timeout_ms), child_handle.stop()).await;
237                    match stop_res {
238                        Ok(Ok(())) => {
239                            trace!(
240                                "Stop signal sent to and child {} shut down successfully.",
241                                child_handle.id()
242                            );
243                        }
244                        Ok(Err(e)) => {
245                            tracing::error!(
246                                "Stop signal to child {} returned error: {:?}",
247                                child_handle.id(),
248                                e
249                            );
250                        }
251                        Err(_) => {
252                            tracing::error!(
253                                "Shutdown timeout for child {} after {} ms",
254                                child_handle.id(),
255                                timeout_ms
256                            );
257                        }
258                    }
259                }
260            })
261            .collect();
262
263        join_all(stop_futures).await; // Wait for all stop signals to be sent/processed.
264
265        trace!(
266            "All children stopped for agent: {}. Closing own inbox.",
267            self.id()
268        );
269        // Ensure inbox is closed (might be redundant if closed in wake loop, but safe).
270        self.inbox.close();
271    }
272}