use std::{
pin::Pin,
task::{Context, Poll},
};
use asynchronous_codec::Framed;
use futures::{future::Either, prelude::*, StreamExt};
use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_swarm::{
handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
},
Stream,
};
use web_time::Instant;
use crate::{
protocol::{GossipsubCodec, ProtocolConfig},
rpc::Receiver,
rpc_proto::proto,
types::{PeerKind, RawMessage, Rpc, RpcOut},
ValidationError,
};
/// Events the connection handler reports to the behaviour.
///
/// This is the handler's `ConnectionHandler::ToBehaviour` type.
#[derive(Debug)]
pub enum HandlerEvent {
/// An RPC received from the remote.
///
/// This variant is never constructed in this file; items yielded by the inbound
/// substream are forwarded to the behaviour unchanged, so it is presumably produced
/// by the codec when decoding inbound data (verify against `GossipsubCodec`).
Message {
/// The decoded RPC.
rpc: Rpc,
/// Raw messages paired with a validation error.
/// NOTE(review): presumably the messages that failed validation while decoding.
invalid_messages: Vec<(RawMessage, ValidationError)>,
},
/// The kind of the remote peer. Emitted at most once per handler state: either the
/// kind negotiated on the first substream, or `PeerKind::NotSupported` after the
/// handler was disabled because protocol negotiation failed.
PeerKind(PeerKind),
/// An outgoing RPC that was discarded instead of being sent, either because its
/// timeout future had already completed when it was dequeued or because the send
/// queue reported it as stale.
MessageDropped(RpcOut),
}
/// Commands the behaviour sends to the connection handler.
///
/// Both variants are plain unit variants, so no `large_enum_variant` lint
/// suppression is needed on this type.
#[derive(Debug)]
pub enum HandlerIn {
    /// The remote peer joined a mesh; the handler records this and keeps the
    /// connection alive while the flag is set.
    JoinedMesh,
    /// The remote peer left the mesh; the handler clears the flag and no longer
    /// asks for the connection to be kept alive.
    LeftMesh,
}
/// Number of inbound (or, counted separately, outbound) substream events after which
/// the handler switches to `DisabledHandler::MaxSubstreamAttempts`.
///
/// The counters are bumped for every connection event classified as inbound/outbound
/// by `ConnectionEvent::is_inbound` / `is_outbound`.
const MAX_SUBSTREAM_ATTEMPTS: usize = 5;
/// Connection handler for gossipsub, either actively driving substreams or disabled.
// The two variants differ greatly in size (`EnabledHandler` carries the substream
// state and send queue), which is presumably why the lint is silenced here.
#[allow(clippy::large_enum_variant)]
pub enum Handler {
/// The handler is operating normally.
Enabled(EnabledHandler),
/// The handler gave up on this connection; see `DisabledHandler` for the reason.
Disabled(DisabledHandler),
}
/// State of a handler that is actively servicing a connection.
pub struct EnabledHandler {
/// Protocol configuration, used both as the listen protocol for inbound substreams
/// and as the upgrade when requesting an outbound substream.
listen_protocol: ProtocolConfig,
/// Current outbound substream state; `None` when no outbound substream exists.
outbound_substream: Option<OutboundSubstreamState>,
/// Current inbound substream state; `None` when no inbound substream exists.
/// Overwritten whenever a new inbound substream is negotiated.
inbound_substream: Option<InboundSubstreamState>,
/// Queue of outgoing RPCs that `poll` drains onto the outbound substream.
send_queue: Receiver,
/// `true` from the moment an outbound substream is requested until the next
/// outbound connection event (success or failure) arrives.
outbound_substream_establishing: bool,
/// Number of outbound connection events seen so far.
outbound_substream_attempts: usize,
/// Number of inbound connection events seen so far.
inbound_substream_attempts: usize,
/// Peer kind negotiated on the first fully negotiated substream, if any yet.
peer_kind: Option<PeerKind>,
/// Whether `peer_kind` has already been reported to the behaviour.
peer_kind_sent: bool,
/// Time of the last successful outbound flush or received inbound message.
/// NOTE(review): only written, never read, in the code visible here.
last_io_activity: Instant,
/// Whether the behaviour told us the peer is in a mesh; drives keep-alive.
in_mesh: bool,
}
/// Reason why a handler has been disabled.
pub enum DisabledHandler {
/// Outbound protocol negotiation failed, i.e. the remote does not support gossipsub
/// on this connection.
ProtocolUnsupported {
/// Whether `PeerKind::NotSupported` has already been reported to the behaviour.
peer_kind_sent: bool,
},
/// The inbound or outbound substream attempt counter reached
/// `MAX_SUBSTREAM_ATTEMPTS`.
MaxSubstreamAttempts,
}
/// State of the inbound substream.
enum InboundSubstreamState {
/// Waiting for the next message from the remote.
WaitingInput(Framed<Stream, GossipsubCodec>),
/// The substream is being closed, after a read error or after the remote closed it.
Closing(Framed<Stream, GossipsubCodec>),
/// Temporary placeholder stored while `poll` has taken the real state out;
/// observing it at the start of an iteration is treated as unreachable.
Poisoned,
}
/// State of the outbound substream.
enum OutboundSubstreamState {
/// Idle: waiting for a message from the send queue.
WaitingOutput(Framed<Stream, GossipsubCodec>),
/// A protobuf RPC is waiting for the sink to become ready so it can be sent.
PendingSend(Framed<Stream, GossipsubCodec>, proto::RPC),
/// A message has been handed to the sink and is waiting to be flushed.
PendingFlush(Framed<Stream, GossipsubCodec>),
/// Temporary placeholder stored while `poll` has taken the real state out;
/// observing it at the start of an iteration is treated as unreachable.
Poisoned,
}
impl Handler {
    /// Creates a handler in the enabled state.
    ///
    /// `protocol_config` is used for both inbound and outbound substream negotiation;
    /// `message_queue` is the receiving end of the queue of RPCs to send to the peer.
    /// The handler starts without any substreams, with zeroed attempt counters, an
    /// unknown peer kind, and outside of any mesh.
    pub fn new(protocol_config: ProtocolConfig, message_queue: Receiver) -> Self {
        let enabled = EnabledHandler {
            listen_protocol: protocol_config,
            send_queue: message_queue,
            // No substreams exist until they are negotiated.
            inbound_substream: None,
            outbound_substream: None,
            outbound_substream_establishing: false,
            // Attempt bookkeeping starts from scratch.
            inbound_substream_attempts: 0,
            outbound_substream_attempts: 0,
            // The peer kind is learned from the first negotiated substream.
            peer_kind: None,
            peer_kind_sent: false,
            in_mesh: false,
            last_io_activity: Instant::now(),
        };
        Self::Enabled(enabled)
    }
}
impl EnabledHandler {
/// Installs a freshly negotiated inbound substream.
///
/// Records the peer kind if none has been recorded yet, then replaces whatever
/// inbound substream state existed with the new substream in the waiting state.
fn on_fully_negotiated_inbound(
    &mut self,
    negotiated: (Framed<Stream, GossipsubCodec>, PeerKind),
) {
    let (stream, kind) = negotiated;

    // Only the first negotiated substream decides the recorded peer kind.
    self.peer_kind.get_or_insert(kind);

    tracing::trace!("New inbound substream request");
    let fresh_state = InboundSubstreamState::WaitingInput(stream);
    self.inbound_substream = Some(fresh_state);
}
/// Installs a freshly negotiated outbound substream.
///
/// Records the peer kind if none has been recorded yet and stores the substream in
/// the idle (`WaitingOutput`) state.
///
/// # Panics
///
/// Panics if an outbound substream is already present.
fn on_fully_negotiated_outbound(
    &mut self,
    negotiated: FullyNegotiatedOutbound<<Handler as ConnectionHandler>::OutboundProtocol>,
) {
    let (stream, kind) = negotiated.protocol;

    // Only the first negotiated substream decides the recorded peer kind.
    self.peer_kind.get_or_insert(kind);

    assert!(
        self.outbound_substream.is_none(),
        "Established an outbound substream with one already available"
    );
    let idle_state = OutboundSubstreamState::WaitingOutput(stream);
    self.outbound_substream = Some(idle_state);
}
/// Drives the enabled handler and returns the next event for the swarm, if any.
///
/// Steps, in order (each may return early with an event):
/// 1. report the peer kind to the behaviour once it is known;
/// 2. request an outbound substream if messages are queued and none exists or is
///    being established;
/// 3. drive the outbound state machine (dequeue, send, flush);
/// 4. drive the inbound state machine (read, close);
/// 5. report a stale queued message as dropped.
///
/// Returns `Poll::Pending` when none of the steps produced an event.
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
<Handler as ConnectionHandler>::OutboundProtocol,
(),
<Handler as ConnectionHandler>::ToBehaviour,
>,
> {
// Report the peer kind to the behaviour exactly once, as soon as it is known.
if !self.peer_kind_sent {
if let Some(peer_kind) = self.peer_kind.as_ref() {
self.peer_kind_sent = true;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::PeerKind(*peer_kind),
));
}
}
// Request an outbound substream only when there is something to send and no
// substream exists or is already being established.
if !self.send_queue.poll_is_empty(cx)
&& self.outbound_substream.is_none()
&& !self.outbound_substream_establishing
{
self.outbound_substream_establishing = true;
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()),
});
}
// Outbound state machine. The state is taken out and replaced by `Poisoned`;
// every arm must store a real state (or `None`) before breaking, continuing or
// returning.
loop {
match self
.outbound_substream
.replace(OutboundSubstreamState::Poisoned)
{
// Idle: try to pull the next message from the send queue.
Some(OutboundSubstreamState::WaitingOutput(substream)) => {
if let Poll::Ready(Some(mut message)) = self.send_queue.poll_next_unpin(cx) {
match message {
RpcOut::Publish {
message: _,
ref mut timeout,
}
| RpcOut::Forward {
message: _,
ref mut timeout,
} => {
// The timeout future already completed: do not send the message,
// restore the idle state and tell the behaviour it was dropped.
if Pin::new(timeout).poll(cx).is_ready() {
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::MessageDropped(message),
));
}
}
// Other message kinds carry no timeout that is checked here.
_ => {} }
// Convert to protobuf and move on to sending in the next iteration.
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(
substream,
message.into_protobuf(),
));
continue;
}
// Nothing to send right now: stay idle.
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
break;
}
// A message is waiting for the sink to accept it.
Some(OutboundSubstreamState::PendingSend(mut substream, message)) => {
match Sink::poll_ready(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
match Sink::start_send(Pin::new(&mut substream), message) {
Ok(()) => {
// No break: the next iteration flushes immediately.
self.outbound_substream =
Some(OutboundSubstreamState::PendingFlush(substream))
}
Err(e) => {
// The substream is dropped; no `MessageDropped` event is emitted
// for the message that failed to send.
tracing::debug!(
"Failed to send message on outbound stream: {e}"
);
self.outbound_substream = None;
break;
}
}
}
Poll::Ready(Err(e)) => {
// The substream and the pending message are dropped; no
// `MessageDropped` event is emitted here.
tracing::debug!("Failed to send message on outbound stream: {e}");
self.outbound_substream = None;
break;
}
Poll::Pending => {
// Sink not ready yet: keep the message for a later poll.
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
break;
}
}
}
// A message was handed to the sink and must be flushed.
Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
match Sink::poll_flush(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
// Flushed: record the activity and loop again to pick up the next
// queued message.
self.last_io_activity = Instant::now();
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream))
}
Poll::Ready(Err(e)) => {
tracing::debug!("Failed to flush outbound stream: {e}");
self.outbound_substream = None;
break;
}
Poll::Pending => {
self.outbound_substream =
Some(OutboundSubstreamState::PendingFlush(substream));
break;
}
}
}
// No outbound substream: undo the `Poisoned` placeholder.
None => {
self.outbound_substream = None;
break;
}
Some(OutboundSubstreamState::Poisoned) => {
unreachable!("Error occurred during outbound stream processing")
}
}
}
// Inbound state machine, using the same take-and-restore pattern as above.
loop {
match self
.inbound_substream
.replace(InboundSubstreamState::Poisoned)
{
// Idle: try to read the next message from the remote.
Some(InboundSubstreamState::WaitingInput(mut substream)) => {
match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(message))) => {
// Forward the decoded item to the behaviour as-is; further
// messages are read on subsequent polls.
self.last_io_activity = Instant::now();
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
}
Poll::Ready(Some(Err(error))) => {
// Read error: close our side of the substream (next iteration).
tracing::debug!("Failed to read from inbound stream: {error}");
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
}
Poll::Ready(None) => {
// The stream ended: close our side as well (next iteration).
tracing::debug!("Inbound stream closed by remote");
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
}
Poll::Pending => {
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
break;
}
}
}
// The inbound substream is being closed.
Some(InboundSubstreamState::Closing(mut substream)) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(res) => {
// A close error is only logged; either way the substream is dropped.
if let Err(e) = res {
tracing::debug!("Inbound substream error while closing: {e}");
}
self.inbound_substream = None;
break;
}
Poll::Pending => {
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
break;
}
}
}
// No inbound substream: undo the `Poisoned` placeholder.
None => {
self.inbound_substream = None;
break;
}
Some(InboundSubstreamState::Poisoned) => {
unreachable!("Error occurred during inbound stream processing")
}
}
}
// If the send queue yields a stale message, report it to the behaviour as dropped.
if let Poll::Ready(Some(rpc)) = self.send_queue.poll_stale(cx) {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::MessageDropped(rpc),
));
}
Poll::Pending
}
}
impl ConnectionHandler for Handler {
// Commands received from the behaviour.
type FromBehaviour = HandlerIn;
// Events reported to the behaviour.
type ToBehaviour = HandlerEvent;
// No per-substream context is attached to inbound substreams.
type InboundOpenInfo = ();
// Inbound upgrade: the gossipsub protocol while enabled, `DeniedUpgrade` once disabled
// (see `listen_protocol`).
type InboundProtocol = either::Either<ProtocolConfig, DeniedUpgrade>;
// No per-substream context is attached to outbound substreams.
type OutboundOpenInfo = ();
// Outbound upgrade: always the gossipsub protocol.
type OutboundProtocol = ProtocolConfig;
/// Returns the upgrade applied to inbound substreams: the configured gossipsub
/// protocol while the handler is enabled, and `DeniedUpgrade` once it is disabled.
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
    let upgrade = match self {
        Handler::Enabled(enabled) => either::Either::Left(enabled.listen_protocol.clone()),
        Handler::Disabled(_) => either::Either::Right(DeniedUpgrade),
    };
    SubstreamProtocol::new(upgrade, ())
}
/// Applies a command from the behaviour.
///
/// While enabled, the mesh-membership flag is updated; a disabled handler only logs
/// the command and discards it.
fn on_behaviour_event(&mut self, message: HandlerIn) {
    let enabled = match self {
        Handler::Enabled(enabled) => enabled,
        Handler::Disabled(_) => {
            tracing::debug!(?message, "Handler is disabled. Dropping message");
            return;
        }
    };

    enabled.in_mesh = match message {
        HandlerIn::JoinedMesh => true,
        HandlerIn::LeftMesh => false,
    };
}
/// Asks to keep the connection alive only while the handler is enabled and the peer
/// is in a mesh.
fn connection_keep_alive(&self) -> bool {
    match self {
        Handler::Enabled(enabled) => enabled.in_mesh,
        Handler::Disabled(_) => false,
    }
}
/// Polls the handler.
///
/// An enabled handler delegates to `EnabledHandler::poll`. A handler disabled because
/// the protocol is unsupported reports `PeerKind::NotSupported` once and is pending
/// afterwards; a handler disabled after too many substream attempts is always pending.
#[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
fn poll(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
    let already_reported = match self {
        Handler::Enabled(enabled) => return enabled.poll(cx),
        Handler::Disabled(DisabledHandler::MaxSubstreamAttempts) => return Poll::Pending,
        Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => {
            peer_kind_sent
        }
    };

    if *already_reported {
        return Poll::Pending;
    }

    // First poll after being disabled: tell the behaviour the peer is unsupported.
    *already_reported = true;
    Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
        HandlerEvent::PeerKind(PeerKind::NotSupported),
    ))
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
) {
match self {
Handler::Enabled(handler) => {
if event.is_inbound() {
handler.inbound_substream_attempts += 1;
if handler.inbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
tracing::warn!(
"The maximum number of inbound substreams attempts has been exceeded"
);
*self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
return;
}
}
if event.is_outbound() {
handler.outbound_substream_establishing = false;
handler.outbound_substream_attempts += 1;
if handler.outbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
tracing::warn!(
"The maximum number of outbound substream attempts has been exceeded"
);
*self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
return;
}
}
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol,
..
}) => match protocol {
Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol),
Either::Right(v) => libp2p_core::util::unreachable(v),
},
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
handler.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error: StreamUpgradeError::Timeout,
..
}) => {
tracing::debug!("Dial upgrade error: Protocol negotiation timeout");
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error: StreamUpgradeError::Apply(e),
..
}) => libp2p_core::util::unreachable(e),
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error: StreamUpgradeError::NegotiationFailed,
..
}) => {
tracing::debug!(
"The remote peer does not support gossipsub on this connection"
);
*self = Handler::Disabled(DisabledHandler::ProtocolUnsupported {
peer_kind_sent: false,
});
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
error: StreamUpgradeError::Io(e),
..
}) => {
tracing::debug!("Protocol negotiation failed: {e}")
}
_ => {}
}
}
Handler::Disabled(_) => {}
}
}
}