use std::cmp::PartialEq;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tracing::{error, instrument, trace};
use crate::common::{Envelope, MessageError};
use crate::message::message_address::MessageAddress;
use crate::traits::ActonMessage;
#[derive(Clone, Debug, Default)]
pub struct OutboundEnvelope {
pub(crate) return_address: MessageAddress,
pub(crate) recipient_address: Option<MessageAddress>,
}
impl PartialEq for MessageAddress {
fn eq(&self, other: &Self) -> bool {
self.sender == other.sender
}
}
impl PartialEq for OutboundEnvelope {
fn eq(&self, other: &Self) -> bool {
self.return_address == other.return_address
}
}
impl Eq for OutboundEnvelope {}
impl std::hash::Hash for OutboundEnvelope {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.return_address.sender.hash(state);
}
}
impl OutboundEnvelope {
#[instrument(skip(return_address))]
pub fn new(return_address: MessageAddress) -> Self {
OutboundEnvelope { return_address, recipient_address: None }
}
pub fn reply_to(&self) -> MessageAddress {
self.return_address.clone()
}
pub fn recipient(&self) -> &Option<MessageAddress> {
&self.recipient_address
}
#[instrument(skip(return_address))]
pub(crate) fn new_with_recipient(return_address: MessageAddress, recipient_address: MessageAddress) -> Self {
OutboundEnvelope { return_address, recipient_address: Some(recipient_address) }
}
#[instrument(skip(self))]
pub fn reply(
&self,
message: impl ActonMessage + 'static,
) -> Result<(), MessageError> {
let envelope = self.clone();
trace!("*");
tokio::task::spawn_blocking(move || {
tracing::trace!(msg = ?message, "Replying to message.");
let rt = Runtime::new().unwrap();
rt.block_on(async move {
envelope.send(message).await;
});
});
Ok(())
}
#[instrument(skip(self), level = "debug")]
async fn send_message_inner(&self, message: Arc<dyn ActonMessage + Send + Sync>) {
let recipient_channel = {
if let Some(recipient_address) = &self.recipient_address {
recipient_address.clone()
} else {
self.return_address.clone()
}
};
let recipient_id = &recipient_channel.sender.root.to_string();
let address = &recipient_channel.address;
if !&address.is_closed() {
match recipient_channel.clone().address.reserve().await {
Ok(permit) => {
trace!(
"...to {} with message: ",
recipient_id
);
let envelope = Envelope::new(message, self.return_address.clone(), recipient_channel);
permit.send(envelope);
}
Err(e) => {
error!(
"{}::{}",
&self.return_address.name(), e.to_string()
)
}
}
} else {
error!(
"recipient channel closed: {}",
self.return_address.name()
);
}
}
#[instrument(skip(self), level = "trace")]
pub async fn send(&self, message: impl ActonMessage + 'static) {
self.send_message_inner(Arc::new(message)).await;
}
}