acton_core/message/
outbound_envelope.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::cmp::PartialEq;
18use std::fmt::Debug; // Import Debug
19use std::hash::{Hash, Hasher}; // Import Hash and Hasher
20use std::sync::Arc;
21
22use tokio::runtime::Runtime; // Used in reply
23use tracing::{error, instrument, trace};
24
25use crate::common::{Envelope, MessageError};
26use crate::message::message_address::MessageAddress;
27use crate::traits::ActonMessage;
28
29/// Represents a message prepared for sending, including sender and optional recipient addresses.
30///
31/// An `OutboundEnvelope` is typically created by an agent (using methods like
32/// [`AgentHandle::create_envelope`](crate::common::AgentHandle::create_envelope))
33/// before sending a message. It holds the [`MessageAddress`] of the sender (`return_address`)
34/// and optionally the [`MessageAddress`] of the recipient (`recipient_address`).
35///
36/// The primary methods for dispatching the message are [`OutboundEnvelope::send`] (asynchronous)
37/// and [`OutboundEnvelope::reply`] (synchronous wrapper).
38///
39/// Equality and hashing are based solely on the `return_address`.
40#[derive(Clone, Debug, Default)]
41pub struct OutboundEnvelope {
42    /// The address of the agent sending the message.
43    pub(crate) return_address: MessageAddress,
44    /// The address of the intended recipient agent, if specified directly.
45    /// If `None`, the recipient might be implied (e.g., sending back to `return_address`).
46    pub(crate) recipient_address: Option<MessageAddress>,
47}
48
49// Note: The PartialEq impl for MessageAddress is defined here, but ideally should be
50// in message_address.rs if it's generally applicable. Assuming it's needed here for now.
51/// Implements equality comparison for `MessageAddress` based on the sender's `Ern`.
52impl PartialEq for MessageAddress {
53    fn eq(&self, other: &Self) -> bool {
54        self.sender == other.sender // Compare based on Ern
55    }
56}
57
58/// Implements equality comparison for `OutboundEnvelope` based on the `return_address`.
59impl PartialEq for OutboundEnvelope {
60    fn eq(&self, other: &Self) -> bool {
61        self.return_address == other.return_address
62    }
63}
64
65/// Derives `Eq` based on the `PartialEq` implementation.
66impl Eq for OutboundEnvelope {}
67
68/// Implements hashing for `OutboundEnvelope` based on the `return_address`.
69impl Hash for OutboundEnvelope {
70    fn hash<H: Hasher>(&self, state: &mut H) {
71        // Hash only based on the return address's sender Ern, consistent with PartialEq.
72        self.return_address.sender.hash(state);
73    }
74}
75
76
77impl OutboundEnvelope {
78    /// Creates a new `OutboundEnvelope` with only a return address specified.
79    ///
80    /// The recipient address is initially set to `None`. Use [`OutboundEnvelope::send`]
81    /// or [`OutboundEnvelope::reply`] to send the message, typically back to the
82    /// `return_address` if no recipient is set later (though `send_message_inner` logic defaults to `return_address` if `recipient_address` is `None`).
83    ///
84    /// # Arguments
85    ///
86    /// * `return_address`: The [`MessageAddress`] of the agent creating this envelope (the sender).
87    ///
88    /// # Returns
89    ///
90    /// A new `OutboundEnvelope` instance.
91    #[instrument(skip(return_address))]
92    pub fn new(return_address: MessageAddress) -> Self {
93        trace!(sender = %return_address.sender, "Creating new OutboundEnvelope");
94        OutboundEnvelope { return_address, recipient_address: None }
95    }
96
97    /// Returns a clone of the sender's [`MessageAddress`].
98    #[inline]
99    pub fn reply_to(&self) -> MessageAddress {
100        self.return_address.clone()
101    }
102
103    /// Returns a reference to the optional recipient's [`MessageAddress`].
104    #[inline]
105    pub fn recipient(&self) -> &Option<MessageAddress> {
106        &self.recipient_address
107    }
108
109    /// Crate-internal constructor: Creates a new `OutboundEnvelope` with specified sender and recipient.
110    #[instrument(skip(return_address, recipient_address))]
111    pub(crate) fn new_with_recipient(return_address: MessageAddress, recipient_address: MessageAddress) -> Self {
112        trace!(sender = %return_address.sender, recipient = %recipient_address.sender, "Creating new OutboundEnvelope with recipient");
113        OutboundEnvelope { return_address, recipient_address: Some(recipient_address) }
114    }
115
116
117    /// Sends a message using this envelope, blocking the current thread until sent.
118    ///
119    /// **Warning:** This method spawns a blocking Tokio task and creates a new Tokio runtime
120    /// internally to execute the asynchronous `send` operation. This is generally **discouraged**
121    /// within an existing asynchronous context as it can lead to performance issues or deadlocks.
122    /// Prefer using the asynchronous [`OutboundEnvelope::send`] method whenever possible.
123    ///
124    /// This method is primarily intended for scenarios where an asynchronous context is not readily
125    /// available, but its use should be carefully considered.
126    ///
127    /// # Arguments
128    ///
129    /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
130    ///
131    /// # Returns
132    ///
133    /// * `Ok(())`: If the message was successfully scheduled to be sent (actual delivery depends on the recipient).
134    /// * `Err(MessageError)`: Currently, this implementation always returns `Ok(())`, but the signature
135    ///   allows for future error handling. Potential errors (like closed channels) are logged internally.
136    #[instrument(skip(self, message), fields(message_type = std::any::type_name_of_val(&message)))]
137    pub fn reply(
138        &self,
139        message: impl ActonMessage + 'static,
140    ) -> Result<(), MessageError> { // Consider changing return type if errors aren't propagated.
141        let envelope = self.clone();
142        let message_arc = Arc::new(message); // Arc the message once
143
144        // Spawn a blocking task to handle the async send without blocking the caller's async runtime (if any).
145        // Note: Creating a new Runtime per call is inefficient.
146        tokio::task::spawn_blocking(move || {
147            trace!(sender = %envelope.return_address.sender, recipient = ?envelope.recipient_address.as_ref().map(|r| r.sender.to_string()), "Replying synchronously (blocking task)");
148            // Consider using Handle::current().block_on if already in a runtime context,
149            // but creating a new one avoids potential deadlocks if called from non-Tokio thread.
150            let rt = Runtime::new().unwrap();
151            rt.block_on(async move {
152                // Use the internal async send logic.
153                envelope.send_message_inner(message_arc).await;
154            });
155        });
156        Ok(()) // Currently doesn't propagate errors from send_message_inner
157    }
158
159    /// Crate-internal: Asynchronously sends the message payload to the recipient.
160    /// Handles channel reservation and error logging.
161    #[instrument(skip(self, message), level = "debug", fields(message_type = ?message.type_id()))]
162    async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
163        // Determine the target address: recipient if Some, otherwise return_address.
164        let target_address = self.recipient_address.as_ref().unwrap_or(&self.return_address);
165        let target_id = &target_address.sender;
166        let channel_sender = target_address.address.clone(); // Keep the owned clone
167        
168        trace!(sender = %self.return_address.sender, recipient = %target_id, "Attempting to send message");
169
170        if !channel_sender.is_closed() { // Check the cloned sender
171            let permit_result = channel_sender.reserve().await; // Await and bind the result first
172            match permit_result { // Match on the bound result
173                Ok(permit) => {
174                    // Create the internal Envelope to put on the channel.
175                    let internal_envelope = Envelope::new(
176                        message, // Pass the Arc'd message
177                        self.return_address.clone(),
178                        target_address.clone(),
179                    );
180                    trace!(sender = %self.return_address.sender, recipient = %target_id, "Sending message via permit");
181                    permit.send(internal_envelope); // Send using the permit
182                }
183                Err(e) => {
184                    // Error reserving capacity (likely channel closed).
185                    error!(sender = %self.return_address.sender, recipient = %target_id, error = %e, "Failed to reserve channel capacity");
186                }
187            }
188        } else {
189            // Channel was already closed.
190            error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
191        }
192    }
193
194    /// Sends a message asynchronously using this envelope.
195    ///
196    /// This method takes the message payload, wraps it in an `Arc`, and calls the
197    /// internal `send_message_inner` to dispatch it to the recipient's channel.
198    /// The recipient is determined by `recipient_address` if `Some`, otherwise it
199    /// defaults to `return_address`.
200    ///
201    /// This is the preferred method for sending messages from within an asynchronous context.
202    ///
203    /// # Arguments
204    ///
205    /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
206    #[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
207    pub async fn send(&self, message: impl ActonMessage + 'static) {
208        // Arc the message and call the internal async sender.
209        self.send_message_inner(Arc::new(message)).await;
210    }
211}