acton_core/message/outbound_envelope.rs
1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 * * Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 * * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::cmp::PartialEq;
18use std::fmt::Debug; // Import Debug
19use std::hash::{Hash, Hasher}; // Import Hash and Hasher
20use std::sync::Arc;
21
22use tokio::runtime::Runtime; // Used in reply
23use tracing::{error, instrument, trace};
24
25use crate::common::{Envelope, MessageError};
26use crate::message::message_address::MessageAddress;
27use crate::traits::ActonMessage;
28
29/// Represents a message prepared for sending, including sender and optional recipient addresses.
30///
31/// An `OutboundEnvelope` is typically created by an agent (using methods like
32/// [`AgentHandle::create_envelope`](crate::common::AgentHandle::create_envelope))
33/// before sending a message. It holds the [`MessageAddress`] of the sender (`return_address`)
34/// and optionally the [`MessageAddress`] of the recipient (`recipient_address`).
35///
36/// The primary methods for dispatching the message are [`OutboundEnvelope::send`] (asynchronous)
37/// and [`OutboundEnvelope::reply`] (synchronous wrapper).
38///
39/// Equality and hashing are based solely on the `return_address`.
40#[derive(Clone, Debug, Default)]
41pub struct OutboundEnvelope {
42 /// The address of the agent sending the message.
43 pub(crate) return_address: MessageAddress,
44 /// The address of the intended recipient agent, if specified directly.
45 /// If `None`, the recipient might be implied (e.g., sending back to `return_address`).
46 pub(crate) recipient_address: Option<MessageAddress>,
47}
48
49// Note: The PartialEq impl for MessageAddress is defined here, but ideally should be
50// in message_address.rs if it's generally applicable. Assuming it's needed here for now.
51/// Implements equality comparison for `MessageAddress` based on the sender's `Ern`.
52impl PartialEq for MessageAddress {
53 fn eq(&self, other: &Self) -> bool {
54 self.sender == other.sender // Compare based on Ern
55 }
56}
57
58/// Implements equality comparison for `OutboundEnvelope` based on the `return_address`.
59impl PartialEq for OutboundEnvelope {
60 fn eq(&self, other: &Self) -> bool {
61 self.return_address == other.return_address
62 }
63}
64
65/// Derives `Eq` based on the `PartialEq` implementation.
66impl Eq for OutboundEnvelope {}
67
68/// Implements hashing for `OutboundEnvelope` based on the `return_address`.
69impl Hash for OutboundEnvelope {
70 fn hash<H: Hasher>(&self, state: &mut H) {
71 // Hash only based on the return address's sender Ern, consistent with PartialEq.
72 self.return_address.sender.hash(state);
73 }
74}
75
76
77impl OutboundEnvelope {
78 /// Creates a new `OutboundEnvelope` with only a return address specified.
79 ///
80 /// The recipient address is initially set to `None`. Use [`OutboundEnvelope::send`]
81 /// or [`OutboundEnvelope::reply`] to send the message, typically back to the
82 /// `return_address` if no recipient is set later (though `send_message_inner` logic defaults to `return_address` if `recipient_address` is `None`).
83 ///
84 /// # Arguments
85 ///
86 /// * `return_address`: The [`MessageAddress`] of the agent creating this envelope (the sender).
87 ///
88 /// # Returns
89 ///
90 /// A new `OutboundEnvelope` instance.
91 #[instrument(skip(return_address))]
92 pub fn new(return_address: MessageAddress) -> Self {
93 trace!(sender = %return_address.sender, "Creating new OutboundEnvelope");
94 OutboundEnvelope { return_address, recipient_address: None }
95 }
96
97 /// Returns a clone of the sender's [`MessageAddress`].
98 #[inline]
99 pub fn reply_to(&self) -> MessageAddress {
100 self.return_address.clone()
101 }
102
103 /// Returns a reference to the optional recipient's [`MessageAddress`].
104 #[inline]
105 pub fn recipient(&self) -> &Option<MessageAddress> {
106 &self.recipient_address
107 }
108
109 /// Crate-internal constructor: Creates a new `OutboundEnvelope` with specified sender and recipient.
110 #[instrument(skip(return_address, recipient_address))]
111 pub(crate) fn new_with_recipient(return_address: MessageAddress, recipient_address: MessageAddress) -> Self {
112 trace!(sender = %return_address.sender, recipient = %recipient_address.sender, "Creating new OutboundEnvelope with recipient");
113 OutboundEnvelope { return_address, recipient_address: Some(recipient_address) }
114 }
115
116
117 /// Sends a message using this envelope, blocking the current thread until sent.
118 ///
119 /// **Warning:** This method spawns a blocking Tokio task and creates a new Tokio runtime
120 /// internally to execute the asynchronous `send` operation. This is generally **discouraged**
121 /// within an existing asynchronous context as it can lead to performance issues or deadlocks.
122 /// Prefer using the asynchronous [`OutboundEnvelope::send`] method whenever possible.
123 ///
124 /// This method is primarily intended for scenarios where an asynchronous context is not readily
125 /// available, but its use should be carefully considered.
126 ///
127 /// # Arguments
128 ///
129 /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
130 ///
131 /// # Returns
132 ///
133 /// * `Ok(())`: If the message was successfully scheduled to be sent (actual delivery depends on the recipient).
134 /// * `Err(MessageError)`: Currently, this implementation always returns `Ok(())`, but the signature
135 /// allows for future error handling. Potential errors (like closed channels) are logged internally.
136 #[instrument(skip(self, message), fields(message_type = std::any::type_name_of_val(&message)))]
137 pub fn reply(
138 &self,
139 message: impl ActonMessage + 'static,
140 ) -> Result<(), MessageError> { // Consider changing return type if errors aren't propagated.
141 let envelope = self.clone();
142 let message_arc = Arc::new(message); // Arc the message once
143
144 // Spawn a blocking task to handle the async send without blocking the caller's async runtime (if any).
145 // Note: Creating a new Runtime per call is inefficient.
146 tokio::task::spawn_blocking(move || {
147 trace!(sender = %envelope.return_address.sender, recipient = ?envelope.recipient_address.as_ref().map(|r| r.sender.to_string()), "Replying synchronously (blocking task)");
148 // Consider using Handle::current().block_on if already in a runtime context,
149 // but creating a new one avoids potential deadlocks if called from non-Tokio thread.
150 let rt = Runtime::new().unwrap();
151 rt.block_on(async move {
152 // Use the internal async send logic.
153 envelope.send_message_inner(message_arc).await;
154 });
155 });
156 Ok(()) // Currently doesn't propagate errors from send_message_inner
157 }
158
159 /// Crate-internal: Asynchronously sends the message payload to the recipient.
160 /// Handles channel reservation and error logging.
161 #[instrument(skip(self, message), level = "debug", fields(message_type = ?message.type_id()))]
162 async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
163 // Determine the target address: recipient if Some, otherwise return_address.
164 let target_address = self.recipient_address.as_ref().unwrap_or(&self.return_address);
165 let target_id = &target_address.sender;
166 let channel_sender = target_address.address.clone(); // Keep the owned clone
167
168 trace!(sender = %self.return_address.sender, recipient = %target_id, "Attempting to send message");
169
170 if !channel_sender.is_closed() { // Check the cloned sender
171 let permit_result = channel_sender.reserve().await; // Await and bind the result first
172 match permit_result { // Match on the bound result
173 Ok(permit) => {
174 // Create the internal Envelope to put on the channel.
175 let internal_envelope = Envelope::new(
176 message, // Pass the Arc'd message
177 self.return_address.clone(),
178 target_address.clone(),
179 );
180 trace!(sender = %self.return_address.sender, recipient = %target_id, "Sending message via permit");
181 permit.send(internal_envelope); // Send using the permit
182 }
183 Err(e) => {
184 // Error reserving capacity (likely channel closed).
185 error!(sender = %self.return_address.sender, recipient = %target_id, error = %e, "Failed to reserve channel capacity");
186 }
187 }
188 } else {
189 // Channel was already closed.
190 error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
191 }
192 }
193
194 /// Sends a message asynchronously using this envelope.
195 ///
196 /// This method takes the message payload, wraps it in an `Arc`, and calls the
197 /// internal `send_message_inner` to dispatch it to the recipient's channel.
198 /// The recipient is determined by `recipient_address` if `Some`, otherwise it
199 /// defaults to `return_address`.
200 ///
201 /// This is the preferred method for sending messages from within an asynchronous context.
202 ///
203 /// # Arguments
204 ///
205 /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
206 #[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
207 pub async fn send(&self, message: impl ActonMessage + 'static) {
208 // Arc the message and call the internal async sender.
209 self.send_message_inner(Arc::new(message)).await;
210 }
211}