acton_core/message/
outbound_envelope.rs1use std::cmp::PartialEq;
18use std::sync::Arc;
19
20use tokio::runtime::Runtime;
21use tracing::{error, instrument, trace};
22
23use crate::common::{Envelope, MessageError};
24use crate::message::message_address::MessageAddress;
25use crate::traits::ActonMessage;
26
27#[derive(Clone, Debug, Default)]
29pub struct OutboundEnvelope {
30 pub(crate) return_address: MessageAddress,
31 pub(crate) recipient_address: Option<MessageAddress>,
32}
33
34impl PartialEq for MessageAddress {
35 fn eq(&self, other: &Self) -> bool {
36 self.sender == other.sender
37 }
38}
39
40impl PartialEq for OutboundEnvelope {
42 fn eq(&self, other: &Self) -> bool {
43 self.return_address == other.return_address
44 }
45}
46
/// Marker impl declaring `OutboundEnvelope` equality to be a full equivalence
/// relation, so envelopes can be used wherever `Eq` is required.
impl Eq for OutboundEnvelope {}
49
50impl std::hash::Hash for OutboundEnvelope {
52 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
53 self.return_address.sender.hash(state);
54 }
55}
56
57impl OutboundEnvelope {
58 #[instrument(skip(return_address))]
67 pub fn new(return_address: MessageAddress) -> Self {
68 OutboundEnvelope { return_address, recipient_address: None }
69 }
70
71 pub fn reply_to(&self) -> MessageAddress {
73 self.return_address.clone()
74 }
75
76 pub fn recipient(&self) -> &Option<MessageAddress> {
78 &self.recipient_address
79 }
80
81 #[instrument(skip(return_address))]
82 pub(crate) fn new_with_recipient(return_address: MessageAddress, recipient_address: MessageAddress) -> Self {
83 OutboundEnvelope { return_address, recipient_address: Some(recipient_address) }
84 }
85
86
87 #[instrument(skip(self))]
96 pub fn reply(
97 &self,
98 message: impl ActonMessage + 'static,
99 ) -> Result<(), MessageError> {
100 let envelope = self.clone();
101 trace!("*");
102 tokio::task::spawn_blocking(move || {
106 tracing::trace!(msg = ?message, "Replying to message.");
107 let rt = Runtime::new().unwrap();
108 rt.block_on(async move {
109 envelope.send(message).await;
110 });
111 });
112 Ok(())
113 }
114
115 #[instrument(skip(self), level = "debug")]
124 async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
125 let recipient_channel = {
126 if let Some(recipient_address) = &self.recipient_address {
127 recipient_address.clone()
128 } else {
129 self.return_address.clone()
130 }
131 };
132 let recipient_id = &recipient_channel.sender.root.to_string();
133 let address = &recipient_channel.address;
134
135 if !&address.is_closed() {
136 match recipient_channel.clone().address.reserve().await {
138 Ok(permit) => {
139 trace!(
140 "...to {} with message: ",
141 recipient_id
142 );
143 let envelope = Envelope::new(message, self.return_address.clone(), recipient_channel);
144 permit.send(envelope);
145 }
146 Err(e) => {
147 error!(
148 "{}::{}",
149 &self.return_address.name(), e.to_string()
150 )
151 }
152 }
153 } else {
154 error!(
155 "recipient channel closed: {}",
156 self.return_address.name()
157);
158 }
159 }
160
161 #[instrument(skip(self), level = "trace")]
170 pub async fn send(&self, message: impl ActonMessage + 'static) {
171 self.send_message_inner(Arc::new(message)).await;
172 }
173}