acton_core/actor/managed_agent/started.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::type_name_of_val;
18use std::fmt::Debug;
19use std::time::Duration;
20
21use futures::future::join_all;
22use tokio::time::sleep;
23use tracing::{instrument, trace}; // Removed unused error import
24
25use crate::actor::ManagedAgent;
26use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
27use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
28use crate::traits::AgentHandleInterface;
29
/// Zero-sized type-state marker: the [`ManagedAgent`] is running.
///
/// A `ManagedAgent<Started, _>` is one whose asynchronous `wake` task is active:
/// it pulls envelopes from the agent's inbox and routes each one to the handler
/// that was registered for its message type while the agent was still in the
/// [`Idle`](super::Idle) state.
///
/// While in this state the agent can build outgoing envelopes with
/// [`ManagedAgent::new_envelope`] and [`ManagedAgent::new_parent_envelope`].
/// External code normally talks to the running agent through its `AgentHandle`
/// rather than through the `ManagedAgent` value itself.
///
/// The marker carries no data, so it is trivially copyable, comparable and hashable.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Started;
41
42/// Implements methods specific to a `ManagedAgent` in the `Started` state.
43impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
44 /// Creates a new [`OutboundEnvelope`] originating from this agent.
45 ///
46 /// This helper function constructs an envelope suitable for sending a message
47 /// from this agent to another recipient. The envelope's `return_address`
48 /// will be set to this agent's [`MessageAddress`]. The `recipient_address`
49 /// field will be `None` initially and should typically be set using the
50 /// envelope's methods before sending.
51 ///
52 /// # Returns
53 ///
54 /// An [`OutboundEnvelope`] configured with this agent as the sender.
55 /// Returns `None` only if the agent's handle somehow lacks an outbox, which
56 /// should not occur under normal circumstances.
57 pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
58 // The Option wrapper seems unnecessary given the implementation always returns Some.
59 // Consider changing return type to just OutboundEnvelope if None is impossible.
60 Some(OutboundEnvelope::new(MessageAddress::new(
61 self.handle.outbox.clone(),
62 self.id.clone(),
63 )))
64 }
65
66 /// Creates a new [`OutboundEnvelope`] addressed to this agent's parent.
67 ///
68 /// This is a convenience method for creating an envelope specifically for
69 /// replying or sending a message to the agent that supervises this one.
70 /// It clones the parent's return address information.
71 ///
72 /// # Returns
73 ///
74 /// * `Some(OutboundEnvelope)`: An envelope configured to be sent to the parent,
75 /// if this agent has a parent. The `return_address` will be the parent's address,
76 /// and the `recipient_address` will be this agent's address.
77 /// * `None`: If this agent does not have a parent (i.e., it's a top-level agent).
78 pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
79 // Clones the parent's handle and creates an envelope targeting the parent.
80 self.parent.as_ref().map(|parent_handle| {
81 // Create an envelope where the recipient is the parent and the sender is self.
82 OutboundEnvelope::new_with_recipient(
83 MessageAddress::new(self.handle.outbox.clone(), self.id.clone()), // Self is sender
84 parent_handle.reply_address() // Parent is recipient
85 )
86 })
87 // The original implementation `parent.create_envelope(None).clone()` might have different semantics.
88 // The above implementation assumes sending *to* the parent *from* self.
89 // If the intent was to create an envelope *as if* it came from the parent,
90 // the original logic might be needed, but seems less common. Let's stick to the clearer intent.
91 // Original logic: self.parent.as_ref().map(|parent| parent.create_envelope(None).clone())
92 }
93
94 // wake() and terminate() are internal implementation details (`pub(crate)` or private)
95 // and do not require public documentation.
96 #[instrument(skip(reactors, self))]
97 pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
98 (self.after_start)(self).await;
99 let mut terminate_requested = false;
100 while let Some(incoming_envelope) = self.inbox.recv().await {
101 let type_id;
102 let mut envelope;
103 trace!("Received envelope from: {}", incoming_envelope.reply_to.sender.root);
104 trace!("Message type: {}", type_name_of_val(&incoming_envelope.message));
105
106 // Handle potential BrokerRequestEnvelope indirection
107 if let Some(broker_request_envelope) = incoming_envelope
108 .message
109 .as_any()
110 .downcast_ref::<BrokerRequestEnvelope>()
111 {
112 trace!("Processing message via BrokerRequestEnvelope");
113 envelope = Envelope::new(
114 broker_request_envelope.message.clone(), // Extract inner message
115 incoming_envelope.reply_to.clone(),
116 incoming_envelope.recipient.clone(),
117 );
118 type_id = broker_request_envelope.message.as_any().type_id(); // Use inner message TypeId
119 } else {
120 envelope = incoming_envelope;
121 type_id = envelope.message.as_any().type_id();
122 }
123
124 // Dispatch to registered handler or handle system signals
125 if let Some(reactor) = reactors.get(&type_id) {
126 match reactor.value() {
127 ReactorItem::FutureReactor(fut) => fut(self, &mut envelope).await,
128 }
129 } else if let Some(SystemSignal::Terminate) =
130 envelope.message.as_any().downcast_ref::<SystemSignal>()
131 {
132 trace!("Terminate signal received for agent: {}", self.id());
133 terminate_requested = true;
134 (self.before_stop)(self).await; // Execute before_stop hook
135 // Short delay to allow before_stop processing, if needed.
136 sleep(Duration::from_millis(10)).await;
137 self.inbox.close(); // Close inbox to stop receiving new messages
138 trace!("Inbox closed for agent: {}", self.id());
139 } else {
140 trace!("No handler found for message type {:?} for agent {}", type_id, self.id());
141 // Optionally log or handle unknown message types
142 }
143
144 // Check if termination requested and inbox is now empty and closed
145 if terminate_requested && self.inbox.is_empty() && self.inbox.is_closed() {
146 trace!("Inbox empty and closed after terminate request, initiating termination for agent: {}", self.id());
147 self.terminate().await; // Initiate graceful shutdown of children etc.
148 break; // Exit the loop
149 }
150 }
151 trace!("Message loop finished for agent: {}", self.id());
152 (self.after_stop)(self).await; // Execute after_stop hook
153 trace!("Agent {} stopped.", self.id());
154 }
155
156 #[instrument(skip(self))]
157 async fn terminate(&mut self) {
158 trace!("Terminating children for agent: {}", self.id());
159 // Stop all child agents concurrently.
160 let stop_futures: Vec<_> = self.handle.children().iter().map(|item| {
161 let child_handle = item.value().clone();
162 async move {
163 trace!("Sending stop signal to child: {}", child_handle.id());
164 let _ = child_handle.stop().await; // Ignore result, best effort stop
165 trace!("Stop signal sent to child: {}", child_handle.id());
166 }
167 }).collect();
168
169 join_all(stop_futures).await; // Wait for all stop signals to be sent/processed.
170
171 trace!("All children stopped for agent: {}. Closing own inbox.", self.id());
172 // Ensure inbox is closed (might be redundant if closed in wake loop, but safe).
173 self.inbox.close();
174 }
175}