use std::cmp::PartialEq;
use std::fmt::Debug; use std::hash::{Hash, Hasher}; use std::sync::Arc;
use tokio::runtime::Runtime; use tracing::{error, instrument, trace};
use crate::common::{Envelope, MessageError};
use crate::message::message_address::MessageAddress;
use crate::traits::ActonMessage;
/// A sender-side handle pairing a return address with an optional explicit
/// recipient address.
///
/// When a message is sent through the envelope it is delivered to
/// `recipient_address` if one is set, otherwise to `return_address`.
///
/// Equality and hashing of an `OutboundEnvelope` consider only the return
/// address; the recipient does not participate.
#[derive(Clone, Debug, Default)]
pub struct OutboundEnvelope {
/// Address of the originator. Attached to every outgoing message and used as
/// the delivery target when `recipient_address` is `None`.
pub(crate) return_address: MessageAddress,
/// Explicit delivery target, if any. `None` means "deliver to the return address".
pub(crate) recipient_address: Option<MessageAddress>,
}
impl PartialEq for MessageAddress {
    /// Two addresses are equal when their `sender` identifiers are equal.
    ///
    /// No other field of the address takes part in the comparison.
    fn eq(&self, rhs: &Self) -> bool {
        let (lhs_id, rhs_id) = (&self.sender, &rhs.sender);
        lhs_id == rhs_id
    }
}
impl PartialEq for OutboundEnvelope {
    /// Envelopes compare equal when their return addresses compare equal.
    ///
    /// The optional recipient address is deliberately left out of the
    /// comparison.
    fn eq(&self, rhs: &Self) -> bool {
        let Self { return_address: mine, .. } = self;
        let Self { return_address: theirs, .. } = rhs;
        mine == theirs
    }
}
// Marker impl: promotes the `PartialEq` implementation for `OutboundEnvelope`
// to a full equivalence relation so the type can be used where `Eq` is
// required (for example as a hash-map key together with its `Hash` impl).
// NOTE(review): soundness relies on the sender identifier's own equality being
// reflexive — confirm against the identifier type.
impl Eq for OutboundEnvelope {}
impl Hash for OutboundEnvelope {
    /// Feeds only the return address's `sender` identifier into the hasher.
    ///
    /// This mirrors the fields used for equality, so envelopes that compare
    /// equal also hash identically.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let sender_id = &self.return_address.sender;
        sender_id.hash(state);
    }
}
impl OutboundEnvelope {
#[instrument(skip(return_address))]
pub fn new(return_address: MessageAddress) -> Self {
trace!(sender = %return_address.sender, "Creating new OutboundEnvelope");
OutboundEnvelope { return_address, recipient_address: None }
}
#[inline]
pub fn reply_to(&self) -> MessageAddress {
self.return_address.clone()
}
#[inline]
pub fn recipient(&self) -> &Option<MessageAddress> {
&self.recipient_address
}
#[instrument(skip(return_address, recipient_address))]
pub(crate) fn new_with_recipient(return_address: MessageAddress, recipient_address: MessageAddress) -> Self {
trace!(sender = %return_address.sender, recipient = %recipient_address.sender, "Creating new OutboundEnvelope with recipient");
OutboundEnvelope { return_address, recipient_address: Some(recipient_address) }
}
#[instrument(skip(self, message), fields(message_type = std::any::type_name_of_val(&message)))]
pub fn reply(
&self,
message: impl ActonMessage + 'static,
) -> Result<(), MessageError> { let envelope = self.clone();
let message_arc = Arc::new(message);
tokio::task::spawn_blocking(move || {
trace!(sender = %envelope.return_address.sender, recipient = ?envelope.recipient_address.as_ref().map(|r| r.sender.to_string()), "Replying synchronously (blocking task)");
let rt = Runtime::new().unwrap();
rt.block_on(async move {
envelope.send_message_inner(message_arc).await;
});
});
Ok(()) }
#[instrument(skip(self, message), level = "debug", fields(message_type = ?message.type_id()))]
async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
let target_address = self.recipient_address.as_ref().unwrap_or(&self.return_address);
let target_id = &target_address.sender;
let channel_sender = target_address.address.clone();
trace!(sender = %self.return_address.sender, recipient = %target_id, "Attempting to send message");
if !channel_sender.is_closed() { let permit_result = channel_sender.reserve().await; match permit_result { Ok(permit) => {
let internal_envelope = Envelope::new(
message, self.return_address.clone(),
target_address.clone(),
);
trace!(sender = %self.return_address.sender, recipient = %target_id, "Sending message via permit");
permit.send(internal_envelope); }
Err(e) => {
error!(sender = %self.return_address.sender, recipient = %target_id, error = %e, "Failed to reserve channel capacity");
}
}
} else {
error!(sender = %self.return_address.sender, recipient = %target_id, "Recipient channel is closed");
}
}
#[instrument(skip(self, message), level = "trace", fields(message_type = std::any::type_name_of_val(&message)))]
pub async fn send(&self, message: impl ActonMessage + 'static) {
self.send_message_inner(Arc::new(message)).await;
}
}