use std::{
fmt,
fmt::{Error, Formatter},
};
use bytes::Bytes;
use tokio::sync::oneshot;
use crate::{message::MessageTag, peer_manager::NodeId, protocol::messaging::SendFailReason};
/// Outcome reported for an outbound message: `Ok(())` on success, or the [`SendFailReason`] on failure.
pub type MessagingReplyResult = Result<(), SendFailReason>;
/// Receiving half of the oneshot channel on which a [`MessagingReplyResult`] is delivered.
pub type MessagingReplyRx = oneshot::Receiver<MessagingReplyResult>;
/// A message queued for sending to a peer, together with an optional reply handle
/// used to report the outcome of the send.
#[derive(Debug)]
pub struct OutboundMessage {
    /// Tag attached to this message; the constructors generate a fresh one via `MessageTag::new()`.
    pub tag: MessageTag,
    /// Node ID of the peer this message is for.
    pub peer_node_id: NodeId,
    /// The message payload.
    pub body: Bytes,
    /// Reply handle through which success or failure of the send is reported.
    /// May be empty (see `MessagingReplyTx::none`), in which case replies are no-ops.
    pub reply: MessagingReplyTx,
}
impl OutboundMessage {
    /// Creates a message for `peer_node_id` carrying `body`, with a freshly generated tag
    /// and no reply channel attached.
    pub fn new(peer_node_id: NodeId, body: Bytes) -> Self {
        // Same construction as `with_reply`, just with an empty reply handle.
        Self::with_reply(peer_node_id, body, MessagingReplyTx::none())
    }

    /// Creates a message for `peer_node_id` carrying `body`, with a freshly generated tag.
    /// The outcome of the send is reported through `reply`.
    pub fn with_reply(peer_node_id: NodeId, body: Bytes, reply: MessagingReplyTx) -> Self {
        let tag = MessageTag::new();
        Self {
            tag,
            peer_node_id,
            body,
            reply,
        }
    }

    /// Reports a successful send on the reply handle.
    ///
    /// Does nothing if no reply sender is held (never attached, already used, or taken).
    #[inline]
    pub fn reply_success(&mut self) {
        self.reply.reply_success();
    }

    /// Reports a failed send, with the given `reason`, on the reply handle.
    ///
    /// Does nothing if no reply sender is held (never attached, already used, or taken).
    pub fn reply_fail(&mut self, reason: SendFailReason) {
        self.reply.reply_fail(reason);
    }

    /// Detaches the reply sender from this message.
    ///
    /// Returns `Some` handle wrapping the sender if one was held, otherwise `None`.
    /// After this call the message's own reply handle is empty.
    pub fn take_reply(&mut self) -> Option<MessagingReplyTx> {
        self.reply.take()
    }
}
impl fmt::Display for OutboundMessage {
    /// Writes a one-line summary: the tag, the body length in bytes and the
    /// short form of the peer's node ID.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
        let body_len = self.body.len();
        let peer = self.peer_node_id.short_str();
        write!(
            f,
            "OutboundMessage (tag: {}, {} bytes) for peer '{}'",
            self.tag, body_len, peer
        )
    }
}
/// Wrapper around an optional oneshot sender for a [`MessagingReplyResult`].
///
/// The sender is consumed the first time a reply is sent. If the wrapper is dropped
/// while still holding a sender, `Err(SendFailReason::Dropped)` is sent on it.
#[derive(Debug)]
pub struct MessagingReplyTx(Option<oneshot::Sender<MessagingReplyResult>>);
impl MessagingReplyTx {
    /// Consumes the wrapper and returns the underlying sender, if any.
    ///
    /// The sender is removed before the wrapper is dropped, so no `Dropped`
    /// reply is sent as a side effect of this call.
    pub fn into_inner(mut self) -> Option<oneshot::Sender<MessagingReplyResult>> {
        self.0.take()
    }

    /// Creates an empty handle on which every reply is a no-op.
    pub fn none() -> Self {
        Self(None)
    }

    /// Sends `Ok(())` on the held sender, consuming it. No-op if no sender is held.
    pub fn reply_success(&mut self) {
        self.send_once(Ok(()));
    }

    /// Sends `Err(reason)` on the held sender, consuming it. No-op if no sender is held.
    pub fn reply_fail(&mut self, reason: SendFailReason) {
        self.send_once(Err(reason));
    }

    /// Moves the held sender out into a new handle, leaving this one empty.
    ///
    /// Returns `None` if no sender is held.
    pub fn take(&mut self) -> Option<Self> {
        self.0.take().map(|reply_tx| Self(Some(reply_tx)))
    }

    /// Sends `result` on the held sender, if any, leaving this handle empty.
    /// The outcome of the send itself is deliberately discarded.
    fn send_once(&mut self, result: MessagingReplyResult) {
        if let Some(reply_tx) = self.0.take() {
            let _ = reply_tx.send(result);
        }
    }
}
impl From<oneshot::Sender<MessagingReplyResult>> for MessagingReplyTx {
    /// Wraps a sender in a reply handle that holds it.
    fn from(reply_tx: oneshot::Sender<MessagingReplyResult>) -> Self {
        // Go through the `Option` conversion so both impls share one construction path.
        Self::from(Some(reply_tx))
    }
}
impl From<Option<oneshot::Sender<MessagingReplyResult>>> for MessagingReplyTx {
    /// Wraps an optional sender; `None` yields an empty handle.
    fn from(maybe_reply_tx: Option<oneshot::Sender<MessagingReplyResult>>) -> Self {
        Self(maybe_reply_tx)
    }
}
impl Drop for MessagingReplyTx {
    /// If a sender is still held when the handle is dropped, sends
    /// `Err(SendFailReason::Dropped)` on it; otherwise does nothing.
    fn drop(&mut self) {
        // `reply_fail` takes the sender (if any) and ignores the send result.
        self.reply_fail(SendFailReason::Dropped);
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// An `OutboundMessage` built from explicit fields keeps the tag, body and peer it was given.
    #[test]
    fn with_tag() {
        static TEST_MSG: Bytes = Bytes::from_static(b"The ghost brigades");

        let expected_peer = NodeId::new();
        let expected_tag = MessageTag::new();

        let message = OutboundMessage {
            tag: expected_tag,
            peer_node_id: expected_peer.clone(),
            body: TEST_MSG.clone(),
            reply: MessagingReplyTx::none(),
        };

        assert_eq!(message.tag, expected_tag);
        assert_eq!(message.body, TEST_MSG);
        assert_eq!(message.peer_node_id, expected_peer);
    }
}