acton_core/message/
outbound_envelope.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::cmp::PartialEq;
18use std::sync::Arc;
19
20use tokio::runtime::Runtime;
21use tracing::{error, instrument, trace};
22
23use crate::common::{Envelope, MessageError};
24use crate::message::message_address::MessageAddress;
25use crate::traits::ActonMessage;
26
/// Represents an outbound envelope for sending messages in the actor system.
///
/// An envelope pairs the address that outgoing messages are stamped with
/// (`return_address`) with an optional explicit delivery target
/// (`recipient_address`).
#[derive(Clone, Debug, Default)]
pub struct OutboundEnvelope {
    /// Address attached to every message sent through this envelope; it is
    /// also the delivery target when no explicit recipient is set.
    pub(crate) return_address: MessageAddress,
    /// Explicit delivery target. `None` means messages are delivered to
    /// `return_address`.
    pub(crate) recipient_address: Option<MessageAddress>,
}
33
34impl PartialEq for MessageAddress {
35    fn eq(&self, other: &Self) -> bool {
36        self.sender == other.sender
37    }
38}
39
40// Manually implement PartialEq for OutboundEnvelope
41impl PartialEq for OutboundEnvelope {
42    fn eq(&self, other: &Self) -> bool {
43        self.return_address == other.return_address
44    }
45}
46
// Marker impl: `Eq` is not required by `PartialEq` itself, but it is needed
// (together with `Hash`) for `OutboundEnvelope` to be stored in a `HashSet`.
impl Eq for OutboundEnvelope {}
49
50// Implement Hash for OutboundEnvelope as it is required for HashSet
51impl std::hash::Hash for OutboundEnvelope {
52    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
53        self.return_address.sender.hash(state);
54    }
55}
56
57impl OutboundEnvelope {
58    /// Creates a new outbound envelope.
59    ///
60    /// # Parameters
61    /// - `reply_to`: The optional channel for sending replies.
62    /// - `sender`: The sender's ARN.
63    ///
64    /// # Returns
65    /// A new `OutboundEnvelope` instance.
66    #[instrument(skip(return_address))]
67    pub fn new(return_address: MessageAddress) -> Self {
68        OutboundEnvelope { return_address, recipient_address: None }
69    }
70
71    /// Gets the return address for the outbound envelope.
72    pub fn reply_to(&self) -> MessageAddress {
73        self.return_address.clone()
74    }
75
76    /// Gets the recipient address for the outbound envelope.
77    pub fn recipient(&self) -> &Option<MessageAddress> {
78        &self.recipient_address
79    }
80
81    #[instrument(skip(return_address))]
82    pub(crate) fn new_with_recipient(return_address: MessageAddress, recipient_address: MessageAddress) -> Self {
83        OutboundEnvelope { return_address, recipient_address: Some(recipient_address) }
84    }
85
86
87    /// Sends a reply message synchronously.
88    ///
89    /// # Parameters
90    /// - `message`: The message to be sent.
91    /// - `pool_id`: An optional pool ID.
92    ///
93    /// # Returns
94    /// A result indicating success or failure.
95    #[instrument(skip(self))]
96    pub fn reply(
97        &self,
98        message: impl ActonMessage + 'static,
99    ) -> Result<(), MessageError> {
100        let envelope = self.clone();
101        trace!("*");
102        // Event: Replying to Message
103        // Description: Replying to a message with an optional pool ID.
104        // Context: Message details and pool ID.
105        tokio::task::spawn_blocking(move || {
106            tracing::trace!(msg = ?message, "Replying to message.");
107            let rt = Runtime::new().unwrap();
108            rt.block_on(async move {
109                envelope.send(message).await;
110            });
111        });
112        Ok(())
113    }
114
115    /// Sends a reply message asynchronously.
116    ///
117    /// # Parameters
118    /// - `message`: The message to be sent.
119    /// - `pool_id`: An optional pool ID.
120    ///
121    /// # Returns
122    /// A result indicating success or failure.
123    #[instrument(skip(self), level = "debug")]
124    async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
125        let recipient_channel = {
126            if let Some(recipient_address) = &self.recipient_address {
127                recipient_address.clone()
128            } else {
129                self.return_address.clone()
130            }
131        };
132        let recipient_id = &recipient_channel.sender.root.to_string();
133        let address = &recipient_channel.address;
134
135        if !&address.is_closed() {
136            // Reserve capacity
137            match recipient_channel.clone().address.reserve().await {
138                Ok(permit) => {
139                    trace!(
140                        "...to {} with message: ",
141                        recipient_id
142                    );
143                    let envelope = Envelope::new(message, self.return_address.clone(), recipient_channel);
144                    permit.send(envelope);
145                }
146                Err(e) => {
147                    error!(
148                        "{}::{}",
149                        &self.return_address.name(), e.to_string()
150                    )
151                }
152            }
153        } else {
154            error!(
155    "recipient channel closed: {}",
156    self.return_address.name()
157);
158        }
159    }
160
161    /// Sends a reply message asynchronously.
162    ///
163    /// # Parameters
164    /// - `message`: The message to be sent.
165    /// - `pool_id`: An optional pool ID.
166    ///
167    /// # Returns
168    /// A result indicating success or failure.
169    #[instrument(skip(self), level = "trace")]
170    pub async fn send(&self, message: impl ActonMessage + 'static) {
171        self.send_message_inner(Arc::new(message)).await;
172    }
173}