acton_core/message/outbound_envelope.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::cmp::PartialEq;
18use std::fmt::Debug; // Import Debug
19use std::hash::{Hash, Hasher}; // Import Hash and Hasher
20use std::sync::Arc;
21
22use tokio::runtime::Runtime; // Used in reply
23use tracing::{error, instrument, trace};
24
25use crate::common::{Envelope, MessageError};
26use crate::message::message_address::MessageAddress;
27use crate::traits::ActonMessage;
28
29/// Represents a message prepared for sending, including sender and optional recipient addresses.
30///
31/// An `OutboundEnvelope` is typically created by an agent (using methods like
32/// [`AgentHandle::create_envelope`](crate::common::AgentHandle::create_envelope))
33/// before sending a message. It holds the [`MessageAddress`] of the sender (`return_address`)
34/// and optionally the [`MessageAddress`] of the recipient (`recipient_address`).
35///
36/// The primary methods for dispatching the message are [`OutboundEnvelope::send`] (asynchronous)
37/// and [`OutboundEnvelope::reply`] (synchronous wrapper).
38///
39/// Equality and hashing are based solely on the `return_address`.
40#[derive(Clone, Debug)]
41pub struct OutboundEnvelope {
42 /// The address of the agent sending the message.
43 pub(crate) return_address: MessageAddress,
44 /// The address of the intended recipient agent, if specified directly.
45 /// If `None`, the recipient might be implied (e.g., sending back to `return_address`).
46 pub(crate) recipient_address: Option<MessageAddress>,
47 /// The cancellation token for the sending agent.
48 pub(crate) cancellation_token: tokio_util::sync::CancellationToken,
49}
50
51// Note: The PartialEq impl for MessageAddress is defined here, but ideally should be
52// in message_address.rs if it's generally applicable. Assuming it's needed here for now.
53/// Implements equality comparison for `MessageAddress` based on the sender's `Ern`.
54impl PartialEq for MessageAddress {
55 fn eq(&self, other: &Self) -> bool {
56 self.sender == other.sender // Compare based on Ern
57 }
58}
59
60/// Implements equality comparison for `OutboundEnvelope` based on the `return_address`.
61impl PartialEq for OutboundEnvelope {
62 fn eq(&self, other: &Self) -> bool {
63 self.return_address == other.return_address
64 }
65}
66
67/// Derives `Eq` based on the `PartialEq` implementation.
68impl Eq for OutboundEnvelope {}
69
70/// Implements hashing for `OutboundEnvelope` based on the `return_address`.
71impl Hash for OutboundEnvelope {
72 fn hash<H: Hasher>(&self, state: &mut H) {
73 // Hash only based on the return address's sender Ern, consistent with PartialEq.
74 self.return_address.sender.hash(state);
75 }
76}
77
78impl OutboundEnvelope {
79 /// Creates a new `OutboundEnvelope` with only a return address specified.
80 ///
81 /// The recipient address is initially set to `None`. Use [`OutboundEnvelope::send`]
82 /// or [`OutboundEnvelope::reply`] to send the message, typically back to the
83 /// `return_address` if no recipient is set later (though `send_message_inner` logic defaults to `return_address` if `recipient_address` is `None`).
84 ///
85 /// # Arguments
86 ///
87 /// * `return_address`: The [`MessageAddress`] of the agent creating this envelope (the sender).
88 ///
89 /// # Returns
90 ///
91 /// A new `OutboundEnvelope` instance.
92 #[instrument(skip(return_address))]
93 pub fn new(
94 return_address: MessageAddress,
95 cancellation_token: tokio_util::sync::CancellationToken,
96 ) -> Self {
97 trace!(sender = %return_address.sender, "Creating new OutboundEnvelope");
98 OutboundEnvelope {
99 return_address,
100 recipient_address: None,
101 cancellation_token,
102 }
103 }
104
105 /// Returns a clone of the sender's [`MessageAddress`].
106 #[inline]
107 pub fn reply_to(&self) -> MessageAddress {
108 self.return_address.clone()
109 }
110
111 /// Returns a reference to the optional recipient's [`MessageAddress`].
112 #[inline]
113 pub fn recipient(&self) -> &Option<MessageAddress> {
114 &self.recipient_address
115 }
116
117 /// Crate-internal constructor: Creates a new `OutboundEnvelope` with specified sender and recipient.
118 #[instrument(skip(return_address, recipient_address))]
119 pub(crate) fn new_with_recipient(
120 return_address: MessageAddress,
121 recipient_address: MessageAddress,
122 cancellation_token: tokio_util::sync::CancellationToken,
123 ) -> Self {
124 trace!(sender = %return_address.sender, recipient = %recipient_address.sender, "Creating new OutboundEnvelope with recipient");
125 OutboundEnvelope {
126 return_address,
127 recipient_address: Some(recipient_address),
128 cancellation_token,
129 }
130 }
131
132 /// Sends a message using this envelope, blocking the current thread until sent.
133 ///
134 /// **Warning:** This method spawns a blocking Tokio task and creates a new Tokio runtime
135 /// internally to execute the asynchronous `send` operation. This is generally **discouraged**
136 /// within an existing asynchronous context as it can lead to performance issues or deadlocks.
137 /// Prefer using the asynchronous [`OutboundEnvelope::send`] method whenever possible.
138 ///
139 /// This method is primarily intended for scenarios where an asynchronous context is not readily
140 /// available, but its use should be carefully considered.
141 ///
142 /// # Arguments
143 ///
144 /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
145 ///
146 /// # Returns
147 ///
148 /// * `Ok(())`: If the message was successfully scheduled to be sent (actual delivery depends on the recipient).
149 /// * `Err(MessageError)`: Currently, this implementation always returns `Ok(())`, but the signature
150 /// allows for future error handling. Potential errors (like closed channels) are logged internally.
151 #[instrument(skip(self, message), fields(message_type = std::any::type_name_of_val(&message)))]
152 pub fn reply(&self, message: impl ActonMessage + 'static) -> Result<(), MessageError> {
153 // Consider changing return type if errors aren't propagated.
154 let envelope = self.clone();
155 let message_arc = Arc::new(message); // Arc the message once
156
157 // Spawn a blocking task to handle the async send without blocking the caller's async runtime (if any).
158 // Note: Creating a new Runtime per call is inefficient.
159 tokio::task::spawn_blocking(move || {
160 trace!(sender = %envelope.return_address.sender, recipient = ?envelope.recipient_address.as_ref().map(|r| r.sender.to_string()), "Replying synchronously (blocking task)");
161 // Consider using Handle::current().block_on if already in a runtime context,
162 // but creating a new one avoids potential deadlocks if called from non-Tokio thread.
163 let rt = Runtime::new().unwrap();
164 rt.block_on(async move {
165 // Use the internal async send logic.
166 envelope.send_message_inner(message_arc).await;
167 });
168 });
169 Ok(()) // Currently doesn't propagate errors from send_message_inner
170 }
171
172 /// Crate-internal: Asynchronously sends the message payload to the recipient.
173 /// Handles channel reservation and error logging.
174 #[instrument(skip(self, message), level = "debug", fields(message_type = ?message.type_id()))]
175 async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
176 // Determine the target address: recipient if Some, otherwise return_address.
177 let target_address = self
178 .recipient_address
179 .as_ref()
180 .unwrap_or(&self.return_address);
181 let target_id = &target_address.sender;
182 let channel_sender = target_address.address.clone(); // Keep the owned clone
183
184 trace!(sender = %self.return_address.sender, recipient = %target_id, "Attempting to send message");
185
186 if !channel_sender.is_closed() {
187 // Cancellation-aware reservation of a send permit
188 let cancellation = self.cancellation_token.clone();
189 tokio::select! {
190 _ = cancellation.cancelled() => {
191 error!(sender = %self.return_address.sender, recipient = %target_id, "Send aborted: cancellation_token triggered");
192 return;
193 }
194 permit_result = channel_sender.reserve() => {
195 match permit_result {
196 Ok(permit) => {
197 // Create the internal Envelope to put on the channel.
198 let internal_envelope = Envelope::new(
199 message, // Pass the Arc'd message
200 self.return_address.clone(),
201 target_address.clone(),
202 );
203 trace!(sender = %self.return_address.sender, recipient = %target_id, "Sending message via permit");
204 permit.send(internal_envelope); // Send using the permit
205 }
206 Err(e) => {
207 // Error reserving capacity (likely channel closed).
208 error!(sender = %self.return_address.sender, recipient = %target_id, error = %e, "Failed to reserve channel capacity");
209 }
210 }
211 }
212 }
213 } else {
214 // Channel was already closed.
215 error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
216 }
217 }
218
219 /// Sends a message asynchronously using this envelope.
220 ///
221 /// This method takes the message payload, wraps it in an `Arc`, and calls the
222 /// internal `send_message_inner` to dispatch it to the recipient's channel.
223 /// The recipient is determined by `recipient_address` if `Some`, otherwise it
224 /// defaults to `return_address`.
225 ///
226 /// This is the preferred method for sending messages from within an asynchronous context.
227 ///
228 /// # Arguments
229 ///
230 /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
231 #[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
232 pub async fn send(&self, message: impl ActonMessage + 'static) {
233 // Arc the message and call the internal async sender.
234 self.send_message_inner(Arc::new(message)).await;
235 }
236}