use std::time::Instant;
use futures::{SinkExt, StreamExt, future};
use tokio::{pin, sync::mpsc};
use tracing::{Instrument, Level, debug, error, span, trace};
#[cfg(feature = "metrics")]
use super::metrics;
use super::{MessagingEvent, MessagingProtocol, SendFailReason, error::MessagingProtocolError};
use crate::{
connection_manager::{NegotiatedSubstream, PeerConnection},
connectivity::{ConnectivityError, ConnectivityRequester},
message::OutboundMessage,
multiplexing::Substream,
peer_manager::NodeId,
protocol::ProtocolId,
stream_id::StreamId,
};
/// Log target passed to the `tracing` macros in this file.
const LOG_TARGET: &str = "comms::protocol::messaging::outbound";
/// Number of times establishing the messaging protocol is retried after a failed attempt.
/// `run_inner` gives up (and fails all pending messages) once this many retries have been used,
/// so with a value of 1 at most two establish attempts are made.
const MAX_SEND_RETRIES: usize = 1;
/// State for one outbound messaging session to a single peer.
///
/// The session dials the peer, opens a substream for `protocol_id` and forwards every message
/// received on `messages_rx` to that substream (see `run`).
pub struct OutboundMessaging {
/// Used to dial the peer identified by `peer_node_id`.
connectivity: ConnectivityRequester,
/// Queue of outbound messages to forward to the peer.
messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
/// Receives `MessagingEvent::OutboundProtocolExited` when `run` finishes.
messaging_events_tx: mpsc::Sender<MessagingEvent>,
/// Messages that are still queued when forwarding ends are pushed here.
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
/// Node ID of the peer this session sends to.
peer_node_id: NodeId,
/// Protocol identifier used when opening the substream on the peer connection.
protocol_id: ProtocolId,
}
impl OutboundMessaging {
pub fn new(
connectivity: ConnectivityRequester,
messaging_events_tx: mpsc::Sender<MessagingEvent>,
messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
peer_node_id: NodeId,
protocol_id: ProtocolId,
) -> Self {
Self {
connectivity,
messages_rx,
messaging_events_tx,
retry_queue_tx,
peer_node_id,
protocol_id,
}
}
/// Runs the outbound session until it ends, logging how it ended.
///
/// The whole session executes inside a DEBUG span carrying the peer's node id. Whatever the
/// outcome of `run_inner`, a `MessagingEvent::OutboundProtocolExited` event is sent afterwards;
/// a failure to deliver that event is deliberately ignored.
pub async fn run(self) {
    let session_span = span!(
        Level::DEBUG,
        "comms::messaging::outbound",
        node_id = self.peer_node_id.to_string().as_str()
    );
    #[cfg(feature = "metrics")]
    metrics::num_sessions().inc();

    let session = async move {
        debug!(
            target: LOG_TARGET,
            "Attempting to dial peer '{}' if required", self.peer_node_id
        );
        // `run_inner` consumes `self`, so keep what is needed afterwards.
        let node_id = self.peer_node_id.clone();
        let events_tx = self.messaging_events_tx.clone();

        let outcome = self.run_inner().await;
        match outcome {
            // Dial failures are expected from time to time and are not counted as errors.
            Err(MessagingProtocolError::PeerDialFailed(err)) => {
                debug!(
                    target: LOG_TARGET,
                    "Outbound messaging protocol was unable to dial peer {}: {}", node_id, err
                );
            },
            Err(MessagingProtocolError::ConnectionClosed(err)) => {
                #[cfg(feature = "metrics")]
                metrics::error_count().inc();
                debug!(
                    target: LOG_TARGET,
                    "Connection closed {}: {} {}",
                    node_id,
                    err.kind(),
                    err
                );
            },
            Err(err) => {
                #[cfg(feature = "metrics")]
                metrics::error_count().inc();
                error!(
                    target: LOG_TARGET,
                    "Outbound messaging protocol failed for peer {}: {}", node_id, err
                );
            },
            Ok(_) => {
                debug!(
                    target: LOG_TARGET,
                    "Outbound messaging for peer '{}' has stopped because the stream was closed", node_id
                );
            },
        }

        #[cfg(feature = "metrics")]
        metrics::num_sessions().dec();
        // Best effort: the receiver of the event may already be gone.
        let _ignore = events_tx
            .send(MessagingEvent::OutboundProtocolExited(node_id))
            .await;
    };

    session.instrument(session_span).await
}
/// Establishes the messaging substream (retrying up to `MAX_SEND_RETRIES` times) and then
/// forwards messages until the session ends.
///
/// # Errors
/// Returns the last establish error once the retries are exhausted; before returning, every
/// pending message is failed with `SendFailReason::PeerDialFailed`. Errors from message
/// forwarding are returned unchanged.
async fn run_inner(mut self) -> Result<(), MessagingProtocolError> {
    let mut retries_used = 0;
    let (conn, substream) = loop {
        let err = match self.try_establish().await {
            Ok(established) => break established,
            Err(err) => err,
        };

        if retries_used >= MAX_SEND_RETRIES {
            debug!(
                target: LOG_TARGET,
                "Error establishing messaging protocol: {}. Aborting because maximum retries reached.", err
            );
            self.fail_all_pending_messages(SendFailReason::PeerDialFailed).await;
            return Err(err);
        }

        debug!(
            target: LOG_TARGET,
            "Error establishing messaging protocol: {}. Retrying...", err
        );
        retries_used += 1;
    };

    self.start_forwarding_messages(conn, substream).await
}
/// Dials the peer, repeating the dial for as long as it is reported as cancelled.
///
/// # Errors
/// Any connectivity error other than `ConnectivityError::DialCancelled` is wrapped in
/// `MessagingProtocolError::PeerDialFailed` and returned.
async fn try_dial_peer(&mut self) -> Result<PeerConnection, MessagingProtocolError> {
    loop {
        let dial_result = self.connectivity.dial_peer(self.peer_node_id.clone()).await;
        match dial_result {
            Ok(connection) => return Ok(connection),
            // A cancelled dial is not treated as a failure: go around the loop and dial again.
            Err(ConnectivityError::DialCancelled) => {
                debug!(
                    target: LOG_TARGET,
                    "Dial was cancelled for peer '{}'. This is probably because of connection tie-breaking. \
                     Retrying...",
                    self.peer_node_id,
                );
            },
            Err(dial_err) => {
                debug!(
                    target: LOG_TARGET,
                    "MessagingProtocol failed to dial peer '{}' because '{:?}'", self.peer_node_id, dial_err
                );
                return Err(MessagingProtocolError::PeerDialFailed(dial_err));
            },
        }
    }
}
/// Makes one attempt to connect to the peer and open the messaging substream on it.
///
/// The attempt runs inside an `establish_connection` DEBUG span tagged with the peer's node id,
/// and the time taken by the dial is logged.
///
/// # Errors
/// Propagates the error from `try_dial_peer` or `try_open_substream`.
async fn try_establish(
    &mut self,
) -> Result<(PeerConnection, NegotiatedSubstream<Substream>), MessagingProtocolError> {
    let establish_span = span!(
        Level::DEBUG,
        "establish_connection",
        node_id = self.peer_node_id.to_string().as_str()
    );

    let establish = async move {
        debug!(
            target: LOG_TARGET,
            "Attempting to establish messaging protocol connection to peer `{}`", self.peer_node_id
        );

        let dial_started = Instant::now();
        let mut connection = self.try_dial_peer().await?;
        debug!(
            target: LOG_TARGET,
            "Connection succeeded for peer `{}` in {:.0?}",
            self.peer_node_id,
            dial_started.elapsed()
        );

        let negotiated = self.try_open_substream(&mut connection).await?;
        debug!(
            target: LOG_TARGET,
            "Substream established for peer `{}`", self.peer_node_id,
        );

        Ok((connection, negotiated))
    };

    establish.instrument(establish_span).await
}
/// Opens a substream for this session's protocol on an existing peer connection.
///
/// # Errors
/// A failure to open the substream is logged and converted into `MessagingProtocolError`.
async fn try_open_substream(
    &mut self,
    conn: &mut PeerConnection,
) -> Result<NegotiatedSubstream<Substream>, MessagingProtocolError> {
    conn.open_substream(&self.protocol_id).await.map_err(|err| {
        debug!(
            target: LOG_TARGET,
            "MessagingProtocol failed to open a substream to peer '{}' because '{}'", self.peer_node_id, err
        );
        err.into()
    })
}
async fn start_forwarding_messages(
self,
conn: PeerConnection,
substream: NegotiatedSubstream<Substream>,
) -> Result<(), MessagingProtocolError> {
let Self {
mut messages_rx,
peer_node_id,
..
} = self;
let span = span!(
Level::DEBUG,
"start_forwarding_messages",
node_id = peer_node_id.to_string().as_str()
);
let _enter = span.enter();
let stream_id = substream.stream.stream_id();
debug!(
target: LOG_TARGET,
"Starting direct message forwarding for peer `{}` (stream: {})", peer_node_id, stream_id
);
let (sink, mut remote_stream) = MessagingProtocol::framed(substream.stream).split();
let outbound_stream = futures::stream::unfold(&mut messages_rx, |rx| async move {
let v = rx.recv().await;
v.map(|v| (v, rx))
});
#[cfg(feature = "metrics")]
let outbound_count = metrics::outbound_message_count();
let stream = outbound_stream.map(|mut out_msg| {
#[cfg(feature = "metrics")]
outbound_count.inc();
trace!(
target: LOG_TARGET,
"Message for peer '{}' sending {} on stream {}", peer_node_id, out_msg, stream_id
);
out_msg.reply_success();
Result::<_, MessagingProtocolError>::Ok(out_msg.body)
});
let stream = stream.take_until(async move {
let on_disconnect = conn.on_disconnect();
let peer_node_id = conn.peer_node_id().clone();
drop(conn);
let close_detect = remote_stream.next();
pin!(on_disconnect);
pin!(close_detect);
future::select(on_disconnect, close_detect).await;
debug!(
target: LOG_TARGET,
"Outbound messaging stream {} ended for peer {}.", stream_id, peer_node_id
)
});
super::forward::Forward::new(stream, sink.sink_map_err(Into::into)).await?;
messages_rx.close();
let mut retried_messages_count = 0;
while let Some(msg) = messages_rx.recv().await {
if self.retry_queue_tx.send(msg).is_err() {
break;
}
retried_messages_count += 1;
}
if retried_messages_count > 0 {
debug!(
target: LOG_TARGET,
"{} pending message(s) were still queued after disconnect. Retrying them.", retried_messages_count
);
}
debug!(
target: LOG_TARGET,
"Direct message forwarding successfully completed for peer `{}` (stream: {}).", peer_node_id, stream_id
);
Ok(())
}
/// Closes the outbound queue and replies to every message still in it with a failure.
///
/// * `reason` - the failure reason reported to each pending message
async fn fail_all_pending_messages(&mut self, reason: SendFailReason) {
    let queue = &mut self.messages_rx;
    // Close first so that the drain below is guaranteed to terminate.
    queue.close();
    while let Some(mut pending) = queue.recv().await {
        pending.reply_fail(reason);
    }
}
}