acton_core/message/
outbound_envelope.rs

/*
 * Copyright (c) 2024. Govcraft
 *
 * Licensed under either of
 *   * Apache License, Version 2.0 (the "License");
 *     you may not use this file except in compliance with the License.
 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *   * MIT license: http://opensource.org/licenses/MIT
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the applicable License for the specific language governing permissions and
 * limitations under that License.
 */
16
use std::cmp::PartialEq;
use std::fmt::Debug; // Import Debug
use std::hash::{Hash, Hasher}; // Import Hash and Hasher
use std::sync::Arc;

use tokio::runtime::{Handle, Runtime}; // Used in reply
use tracing::{error, instrument, trace};

use crate::common::{Envelope, MessageError};
use crate::message::message_address::MessageAddress;
use crate::traits::ActonMessage;
28
29/// Represents a message prepared for sending, including sender and optional recipient addresses.
30///
31/// An `OutboundEnvelope` is typically created by an agent (using methods like
32/// [`AgentHandle::create_envelope`](crate::common::AgentHandle::create_envelope))
33/// before sending a message. It holds the [`MessageAddress`] of the sender (`return_address`)
34/// and optionally the [`MessageAddress`] of the recipient (`recipient_address`).
35///
36/// The primary methods for dispatching the message are [`OutboundEnvelope::send`] (asynchronous)
37/// and [`OutboundEnvelope::reply`] (synchronous wrapper).
38///
39/// Equality and hashing are based solely on the `return_address`.
40#[derive(Clone, Debug)]
41pub struct OutboundEnvelope {
42    /// The address of the agent sending the message.
43    pub(crate) return_address: MessageAddress,
44    /// The address of the intended recipient agent, if specified directly.
45    /// If `None`, the recipient might be implied (e.g., sending back to `return_address`).
46    pub(crate) recipient_address: Option<MessageAddress>,
47    /// The cancellation token for the sending agent.
48    pub(crate) cancellation_token: tokio_util::sync::CancellationToken,
49}
50
51// Note: The PartialEq impl for MessageAddress is defined here, but ideally should be
52// in message_address.rs if it's generally applicable. Assuming it's needed here for now.
53/// Implements equality comparison for `MessageAddress` based on the sender's `Ern`.
54impl PartialEq for MessageAddress {
55    fn eq(&self, other: &Self) -> bool {
56        self.sender == other.sender // Compare based on Ern
57    }
58}
59
60/// Implements equality comparison for `OutboundEnvelope` based on the `return_address`.
61impl PartialEq for OutboundEnvelope {
62    fn eq(&self, other: &Self) -> bool {
63        self.return_address == other.return_address
64    }
65}
66
67/// Derives `Eq` based on the `PartialEq` implementation.
68impl Eq for OutboundEnvelope {}
69
70/// Implements hashing for `OutboundEnvelope` based on the `return_address`.
71impl Hash for OutboundEnvelope {
72    fn hash<H: Hasher>(&self, state: &mut H) {
73        // Hash only based on the return address's sender Ern, consistent with PartialEq.
74        self.return_address.sender.hash(state);
75    }
76}
77
78impl OutboundEnvelope {
79    /// Creates a new `OutboundEnvelope` with only a return address specified.
80    ///
81    /// The recipient address is initially set to `None`. Use [`OutboundEnvelope::send`]
82    /// or [`OutboundEnvelope::reply`] to send the message, typically back to the
83    /// `return_address` if no recipient is set later (though `send_message_inner` logic defaults to `return_address` if `recipient_address` is `None`).
84    ///
85    /// # Arguments
86    ///
87    /// * `return_address`: The [`MessageAddress`] of the agent creating this envelope (the sender).
88    ///
89    /// # Returns
90    ///
91    /// A new `OutboundEnvelope` instance.
92    #[instrument(skip(return_address))]
93    pub fn new(
94        return_address: MessageAddress,
95        cancellation_token: tokio_util::sync::CancellationToken,
96    ) -> Self {
97        trace!(sender = %return_address.sender, "Creating new OutboundEnvelope");
98        OutboundEnvelope {
99            return_address,
100            recipient_address: None,
101            cancellation_token,
102        }
103    }
104
105    /// Returns a clone of the sender's [`MessageAddress`].
106    #[inline]
107    pub fn reply_to(&self) -> MessageAddress {
108        self.return_address.clone()
109    }
110
111    /// Returns a reference to the optional recipient's [`MessageAddress`].
112    #[inline]
113    pub fn recipient(&self) -> &Option<MessageAddress> {
114        &self.recipient_address
115    }
116
117    /// Crate-internal constructor: Creates a new `OutboundEnvelope` with specified sender and recipient.
118    #[instrument(skip(return_address, recipient_address))]
119    pub(crate) fn new_with_recipient(
120        return_address: MessageAddress,
121        recipient_address: MessageAddress,
122        cancellation_token: tokio_util::sync::CancellationToken,
123    ) -> Self {
124        trace!(sender = %return_address.sender, recipient = %recipient_address.sender, "Creating new OutboundEnvelope with recipient");
125        OutboundEnvelope {
126            return_address,
127            recipient_address: Some(recipient_address),
128            cancellation_token,
129        }
130    }
131
132    /// Sends a message using this envelope, blocking the current thread until sent.
133    ///
134    /// **Warning:** This method spawns a blocking Tokio task and creates a new Tokio runtime
135    /// internally to execute the asynchronous `send` operation. This is generally **discouraged**
136    /// within an existing asynchronous context as it can lead to performance issues or deadlocks.
137    /// Prefer using the asynchronous [`OutboundEnvelope::send`] method whenever possible.
138    ///
139    /// This method is primarily intended for scenarios where an asynchronous context is not readily
140    /// available, but its use should be carefully considered.
141    ///
142    /// # Arguments
143    ///
144    /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
145    ///
146    /// # Returns
147    ///
148    /// * `Ok(())`: If the message was successfully scheduled to be sent (actual delivery depends on the recipient).
149    /// * `Err(MessageError)`: Currently, this implementation always returns `Ok(())`, but the signature
150    ///   allows for future error handling. Potential errors (like closed channels) are logged internally.
151    #[instrument(skip(self, message), fields(message_type = std::any::type_name_of_val(&message)))]
152    pub fn reply(&self, message: impl ActonMessage + 'static) -> Result<(), MessageError> {
153        // Consider changing return type if errors aren't propagated.
154        let envelope = self.clone();
155        let message_arc = Arc::new(message); // Arc the message once
156
157        // Spawn a blocking task to handle the async send without blocking the caller's async runtime (if any).
158        // Note: Creating a new Runtime per call is inefficient.
159        tokio::task::spawn_blocking(move || {
160            trace!(sender = %envelope.return_address.sender, recipient = ?envelope.recipient_address.as_ref().map(|r| r.sender.to_string()), "Replying synchronously (blocking task)");
161            // Consider using Handle::current().block_on if already in a runtime context,
162            // but creating a new one avoids potential deadlocks if called from non-Tokio thread.
163            let rt = Runtime::new().unwrap();
164            rt.block_on(async move {
165                // Use the internal async send logic.
166                envelope.send_message_inner(message_arc).await;
167            });
168        });
169        Ok(()) // Currently doesn't propagate errors from send_message_inner
170    }
171
172    /// Crate-internal: Asynchronously sends the message payload to the recipient.
173    /// Handles channel reservation and error logging.
174    #[instrument(skip(self, message), level = "debug", fields(message_type = ?message.type_id()))]
175    async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
176        // Determine the target address: recipient if Some, otherwise return_address.
177        let target_address = self
178            .recipient_address
179            .as_ref()
180            .unwrap_or(&self.return_address);
181        let target_id = &target_address.sender;
182        let channel_sender = target_address.address.clone(); // Keep the owned clone
183
184        trace!(sender = %self.return_address.sender, recipient = %target_id, "Attempting to send message");
185
186        if !channel_sender.is_closed() {
187            // Cancellation-aware reservation of a send permit
188            let cancellation = self.cancellation_token.clone();
189            tokio::select! {
190                _ = cancellation.cancelled() => {
191                    error!(sender = %self.return_address.sender, recipient = %target_id, "Send aborted: cancellation_token triggered");
192                    return;
193                }
194                permit_result = channel_sender.reserve() => {
195                    match permit_result {
196                        Ok(permit) => {
197                            // Create the internal Envelope to put on the channel.
198                            let internal_envelope = Envelope::new(
199                                message, // Pass the Arc'd message
200                                self.return_address.clone(),
201                                target_address.clone(),
202                            );
203                            trace!(sender = %self.return_address.sender, recipient = %target_id, "Sending message via permit");
204                            permit.send(internal_envelope); // Send using the permit
205                        }
206                        Err(e) => {
207                            // Error reserving capacity (likely channel closed).
208                            error!(sender = %self.return_address.sender, recipient = %target_id, error = %e, "Failed to reserve channel capacity");
209                        }
210                    }
211                }
212            }
213        } else {
214            // Channel was already closed.
215            error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
216        }
217    }
218
219    /// Sends a message asynchronously using this envelope.
220    ///
221    /// This method takes the message payload, wraps it in an `Arc`, and calls the
222    /// internal `send_message_inner` to dispatch it to the recipient's channel.
223    /// The recipient is determined by `recipient_address` if `Some`, otherwise it
224    /// defaults to `return_address`.
225    ///
226    /// This is the preferred method for sending messages from within an asynchronous context.
227    ///
228    /// # Arguments
229    ///
230    /// * `message`: The message payload to send. Must implement [`ActonMessage`] and be `'static`.
231    #[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
232    pub async fn send(&self, message: impl ActonMessage + 'static) {
233        // Arc the message and call the internal async sender.
234        self.send_message_inner(Arc::new(message)).await;
235    }
236}