pub mod either;
mod map_in;
mod map_out;
pub mod multi;
mod one_shot;
mod pending;
mod select;
pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend};
use instant::Instant;
use libp2p_core::{upgrade::UpgradeError, ConnectedPoint, Multiaddr, PeerId};
use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration};
pub use map_in::MapInEvent;
pub use map_out::MapOutEvent;
pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
pub use pending::PendingConnectionHandler;
pub use select::{ConnectionHandlerSelect, IntoConnectionHandlerSelect};
/// Trait implemented by connection handlers.
///
/// An implementor declares the upgrade it applies to inbound substreams
/// (`listen_protocol`), receives events through `on_behaviour_event` and
/// `on_connection_event`, and reports its own events, outbound substream
/// requests and fatal errors through `poll`.
///
/// The `inject_*` methods are deprecated. Their default implementations only
/// wrap their arguments and forward to `on_connection_event` or
/// `on_behaviour_event`, whose own defaults do nothing.
pub trait ConnectionHandler: Send + 'static {
    /// Event type accepted by `on_behaviour_event` (and the deprecated `inject_event`).
    type InEvent: fmt::Debug + Send + 'static;
    /// Event type emitted from `poll` via `ConnectionHandlerEvent::Custom`.
    type OutEvent: fmt::Debug + Send + 'static;
    /// Error type emitted from `poll` via `ConnectionHandlerEvent::Close`.
    type Error: error::Error + fmt::Debug + Send + 'static;
    /// Upgrade type returned by `listen_protocol`.
    type InboundProtocol: InboundUpgradeSend;
    /// Upgrade type carried by `ConnectionHandlerEvent::OutboundSubstreamRequest`.
    type OutboundProtocol: OutboundUpgradeSend;
    /// Info value paired with the inbound upgrade in `listen_protocol`; it is
    /// handed back in the inbound variants of `ConnectionEvent`.
    type InboundOpenInfo: Send + 'static;
    /// Info value paired with an outbound substream request; it is handed back
    /// in the outbound variants of `ConnectionEvent`.
    type OutboundOpenInfo: Send + 'static;

    /// Returns the `SubstreamProtocol` (upgrade, open info and timeout)
    /// describing the inbound upgrade of this handler.
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>;

    /// Deprecated: wraps `protocol` and `info` in
    /// `ConnectionEvent::FullyNegotiatedInbound` and forwards to
    /// `on_connection_event`.
    #[deprecated(
        since = "0.41.0",
        note = "Handle `ConnectionEvent::FullyNegotiatedInbound` on `ConnectionHandler::on_connection_event` instead.
The default implementation of this `inject_*` method delegates to it."
    )]
    fn inject_fully_negotiated_inbound(
        &mut self,
        protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
        info: Self::InboundOpenInfo,
    ) {
        self.on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
            FullyNegotiatedInbound { protocol, info },
        ))
    }

    /// Deprecated: wraps `protocol` and `info` in
    /// `ConnectionEvent::FullyNegotiatedOutbound` and forwards to
    /// `on_connection_event`.
    #[deprecated(
        since = "0.41.0",
        note = "Handle `ConnectionEvent::FullyNegotiatedOutbound` on `ConnectionHandler::on_connection_event` instead.
The default implementation of this `inject_*` method delegates to it."
    )]
    fn inject_fully_negotiated_outbound(
        &mut self,
        protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
        info: Self::OutboundOpenInfo,
    ) {
        self.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
            FullyNegotiatedOutbound { protocol, info },
        ))
    }

    /// Deprecated: forwards `event` unchanged to `on_behaviour_event`.
    #[deprecated(
        since = "0.41.0",
        note = "Implement `ConnectionHandler::on_behaviour_event` instead. The default implementation of `inject_event` delegates to it."
    )]
    fn inject_event(&mut self, event: Self::InEvent) {
        self.on_behaviour_event(event);
    }

    /// Deprecated: wraps `new_address` in `ConnectionEvent::AddressChange` and
    /// forwards to `on_connection_event`.
    #[deprecated(
        since = "0.41.0",
        note = "Handle `ConnectionEvent::AddressChange` on `ConnectionHandler::on_connection_event` instead.
The default implementation of this `inject_*` method delegates to it."
    )]
    fn inject_address_change(&mut self, new_address: &Multiaddr) {
        self.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
            new_address,
        }))
    }

    /// Deprecated: wraps `info` and `error` in
    /// `ConnectionEvent::DialUpgradeError` and forwards to
    /// `on_connection_event`.
    #[deprecated(
        since = "0.41.0",
        note = "Handle `ConnectionEvent::DialUpgradeError` on `ConnectionHandler::on_connection_event` instead.
The default implementation of this `inject_*` method delegates to it."
    )]
    fn inject_dial_upgrade_error(
        &mut self,
        info: Self::OutboundOpenInfo,
        error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
    ) {
        self.on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
            info,
            error,
        }))
    }

    /// Deprecated: wraps `info` and `error` in
    /// `ConnectionEvent::ListenUpgradeError` and forwards to
    /// `on_connection_event`.
    #[deprecated(
        since = "0.41.0",
        note = "Handle `ConnectionEvent::ListenUpgradeError` on `ConnectionHandler::on_connection_event` instead.
The default implementation of this `inject_*` method delegates to it."
    )]
    fn inject_listen_upgrade_error(
        &mut self,
        info: Self::InboundOpenInfo,
        error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
    ) {
        self.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
            info,
            error,
        }))
    }

    /// Returns the handler's current `KeepAlive` value.
    // NOTE(review): how the caller acts on this value is not visible in this file.
    fn connection_keep_alive(&self) -> KeepAlive;

    /// Polls the handler for its next `ConnectionHandlerEvent`.
    ///
    /// A ready value is either an outbound substream request, a `Custom`
    /// event of type `Self::OutEvent`, or a `Close` carrying `Self::Error`.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<
        ConnectionHandlerEvent<
            Self::OutboundProtocol,
            Self::OutboundOpenInfo,
            Self::OutEvent,
            Self::Error,
        >,
    >;

    /// Wraps this handler in a `MapInEvent` built from `map`, a function from
    /// `&TNewIn` to an optional `&Self::InEvent`.
    fn map_in_event<TNewIn, TMap>(self, map: TMap) -> MapInEvent<Self, TNewIn, TMap>
    where
        Self: Sized,
        TMap: Fn(&TNewIn) -> Option<&Self::InEvent>,
    {
        MapInEvent::new(self, map)
    }

    /// Wraps this handler in a `MapOutEvent` built from `map`, a function from
    /// `Self::OutEvent` to `TNewOut`.
    fn map_out_event<TMap, TNewOut>(self, map: TMap) -> MapOutEvent<Self, TMap>
    where
        Self: Sized,
        TMap: FnMut(Self::OutEvent) -> TNewOut,
    {
        MapOutEvent::new(self, map)
    }

    /// Combines this handler with `other` into a `ConnectionHandlerSelect`.
    fn select<TProto2>(self, other: TProto2) -> ConnectionHandlerSelect<Self, TProto2>
    where
        Self: Sized,
    {
        ConnectionHandlerSelect::new(self, other)
    }

    /// Receives an event of type `Self::InEvent`. The default implementation
    /// ignores it.
    fn on_behaviour_event(&mut self, _event: Self::InEvent) {}

    /// Receives a `ConnectionEvent`. The default implementation ignores it.
    ///
    /// The default implementations of the deprecated `inject_*` methods all
    /// end up here, except `inject_event`, which goes to `on_behaviour_event`.
    fn on_connection_event(
        &mut self,
        _event: ConnectionEvent<
            Self::InboundProtocol,
            Self::OutboundProtocol,
            Self::InboundOpenInfo,
            Self::OutboundOpenInfo,
        >,
    ) {
    }
}
/// Event passed to `ConnectionHandler::on_connection_event`.
///
/// Type parameters: `IP` / `OP` are the inbound / outbound upgrades and
/// `IOI` / `OOI` the inbound / outbound open-info types. The lifetime `'a`
/// is that of the address borrowed by the `AddressChange` variant.
pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> {
/// Carries the output of the inbound upgrade together with its open info.
FullyNegotiatedInbound(FullyNegotiatedInbound<IP, IOI>),
/// Carries the output of the outbound upgrade together with its open info.
FullyNegotiatedOutbound(FullyNegotiatedOutbound<OP, OOI>),
/// Carries a borrowed new address.
AddressChange(AddressChange<'a>),
/// Carries the outbound open info and the error of the outbound upgrade.
DialUpgradeError(DialUpgradeError<OOI, OP>),
/// Carries the inbound open info and the error of the inbound upgrade.
ListenUpgradeError(ListenUpgradeError<IOI, IP>),
}
/// Payload of `ConnectionEvent::FullyNegotiatedInbound`.
pub struct FullyNegotiatedInbound<IP: InboundUpgradeSend, IOI> {
/// Output of the inbound upgrade `IP`.
pub protocol: IP::Output,
/// Inbound open-info value of type `IOI`.
pub info: IOI,
}
/// Payload of `ConnectionEvent::FullyNegotiatedOutbound`.
pub struct FullyNegotiatedOutbound<OP: OutboundUpgradeSend, OOI> {
/// Output of the outbound upgrade `OP`.
pub protocol: OP::Output,
/// Outbound open-info value of type `OOI`.
pub info: OOI,
}
/// Payload of `ConnectionEvent::AddressChange`.
pub struct AddressChange<'a> {
/// The new address, borrowed for `'a`.
// NOTE(review): presumably the remote's new address — confirm against the caller.
pub new_address: &'a Multiaddr,
}
/// Payload of `ConnectionEvent::DialUpgradeError`.
pub struct DialUpgradeError<OOI, OP: OutboundUpgradeSend> {
/// Outbound open-info value of type `OOI`.
pub info: OOI,
/// The error, parameterised over the outbound upgrade's error type.
pub error: ConnectionHandlerUpgrErr<OP::Error>,
}
/// Payload of `ConnectionEvent::ListenUpgradeError`.
pub struct ListenUpgradeError<IOI, IP: InboundUpgradeSend> {
/// Inbound open-info value of type `IOI`.
pub info: IOI,
/// The error, parameterised over the inbound upgrade's error type.
pub error: ConnectionHandlerUpgrErr<IP::Error>,
}
/// An upgrade bundled with an arbitrary info value and a timeout.
///
/// The fields are private; use the accessors and the consuming `map_*` /
/// `with_timeout` builders to read or transform them.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct SubstreamProtocol<TUpgrade, TInfo> {
    upgrade: TUpgrade,
    info: TInfo,
    timeout: Duration,
}

impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
    /// Creates a `SubstreamProtocol` from `upgrade` and `info` with a timeout
    /// of 10 seconds.
    pub fn new(upgrade: TUpgrade, info: TInfo) -> Self {
        let timeout = Duration::from_secs(10);
        Self {
            upgrade,
            info,
            timeout,
        }
    }

    /// Replaces the upgrade with `f(upgrade)`; info and timeout are kept.
    pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TInfo>
    where
        F: FnOnce(TUpgrade) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;
        SubstreamProtocol {
            upgrade: f(upgrade),
            info,
            timeout,
        }
    }

    /// Replaces the info with `f(info)`; upgrade and timeout are kept.
    pub fn map_info<U, F>(self, f: F) -> SubstreamProtocol<TUpgrade, U>
    where
        F: FnOnce(TInfo) -> U,
    {
        let Self {
            upgrade,
            info,
            timeout,
        } = self;
        SubstreamProtocol {
            upgrade,
            info: f(info),
            timeout,
        }
    }

    /// Returns the same protocol with its timeout set to `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Borrows the upgrade.
    pub fn upgrade(&self) -> &TUpgrade {
        let Self { upgrade, .. } = self;
        upgrade
    }

    /// Borrows the info value.
    pub fn info(&self) -> &TInfo {
        let Self { info, .. } = self;
        info
    }

    /// Borrows the timeout.
    pub fn timeout(&self) -> &Duration {
        let Self { timeout, .. } = self;
        timeout
    }

    /// Consumes the protocol and returns `(upgrade, info)`; the timeout is
    /// discarded.
    pub fn into_upgrade(self) -> (TUpgrade, TInfo) {
        let Self { upgrade, info, .. } = self;
        (upgrade, info)
    }
}
/// Event returned by `ConnectionHandler::poll`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr> {
/// Requests an outbound substream described by `protocol`.
OutboundSubstreamRequest {
/// The upgrade, open info and timeout of the requested substream.
protocol: SubstreamProtocol<TConnectionUpgrade, TOutboundOpenInfo>,
},
/// Carries an error of type `TErr`.
// NOTE(review): by its name this closes the connection — the handling lives outside this file.
Close(TErr),
/// Carries a handler-defined event of type `TCustom`.
Custom(TCustom),
}
/// Combinators that transform one type parameter of the event and pass every
/// other variant through unchanged.
impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
    ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, TErr>
{
    /// Applies `map` to the open info inside an `OutboundSubstreamRequest`.
    pub fn map_outbound_open_info<F, I>(
        self,
        map: F,
    ) -> ConnectionHandlerEvent<TConnectionUpgrade, I, TCustom, TErr>
    where
        F: FnOnce(TOutboundOpenInfo) -> I,
    {
        match self {
            Self::Close(err) => ConnectionHandlerEvent::Close(err),
            Self::Custom(event) => ConnectionHandlerEvent::Custom(event),
            Self::OutboundSubstreamRequest { protocol } => {
                let protocol = protocol.map_info(map);
                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
            }
        }
    }

    /// Applies `map` to the upgrade inside an `OutboundSubstreamRequest`.
    pub fn map_protocol<F, I>(
        self,
        map: F,
    ) -> ConnectionHandlerEvent<I, TOutboundOpenInfo, TCustom, TErr>
    where
        F: FnOnce(TConnectionUpgrade) -> I,
    {
        match self {
            Self::Close(err) => ConnectionHandlerEvent::Close(err),
            Self::Custom(event) => ConnectionHandlerEvent::Custom(event),
            Self::OutboundSubstreamRequest { protocol } => {
                let protocol = protocol.map_upgrade(map);
                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
            }
        }
    }

    /// Applies `map` to the payload of a `Custom` event.
    pub fn map_custom<F, I>(
        self,
        map: F,
    ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, I, TErr>
    where
        F: FnOnce(TCustom) -> I,
    {
        match self {
            Self::Custom(event) => ConnectionHandlerEvent::Custom(map(event)),
            Self::Close(err) => ConnectionHandlerEvent::Close(err),
            Self::OutboundSubstreamRequest { protocol } => {
                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
            }
        }
    }

    /// Applies `map` to the error of a `Close` event.
    pub fn map_close<F, I>(
        self,
        map: F,
    ) -> ConnectionHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom, I>
    where
        F: FnOnce(TErr) -> I,
    {
        match self {
            Self::Close(err) => ConnectionHandlerEvent::Close(map(err)),
            Self::Custom(event) => ConnectionHandlerEvent::Custom(event),
            Self::OutboundSubstreamRequest { protocol } => {
                ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }
            }
        }
    }
}
/// Error carried by `DialUpgradeError` and `ListenUpgradeError`.
#[derive(Debug)]
pub enum ConnectionHandlerUpgrErr<TUpgrErr> {
/// Displayed as "Timeout error while opening a substream".
Timeout,
/// Displayed as "Timer error while opening a substream".
Timer,
/// An error from the upgrade itself; it is also exposed as the `source()` of this error.
Upgrade(UpgradeError<TUpgrErr>),
}
impl<TUpgrErr> ConnectionHandlerUpgrErr<TUpgrErr> {
    /// Applies `f` to the inner error of the `Upgrade` variant.
    ///
    /// `Timeout` and `Timer` carry no payload and are returned as the same
    /// variant of the new error type; `f` is not called for them.
    pub fn map_upgrade_err<F, E>(self, f: F) -> ConnectionHandlerUpgrErr<E>
    where
        F: FnOnce(UpgradeError<TUpgrErr>) -> UpgradeError<E>,
    {
        match self {
            Self::Upgrade(inner) => ConnectionHandlerUpgrErr::Upgrade(f(inner)),
            Self::Timer => ConnectionHandlerUpgrErr::Timer,
            Self::Timeout => ConnectionHandlerUpgrErr::Timeout,
        }
    }
}
impl<TUpgrErr> fmt::Display for ConnectionHandlerUpgrErr<TUpgrErr>
where
TUpgrErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnectionHandlerUpgrErr::Timeout => {
write!(f, "Timeout error while opening a substream")
}
ConnectionHandlerUpgrErr::Timer => {
write!(f, "Timer error while opening a substream")
}
ConnectionHandlerUpgrErr::Upgrade(err) => write!(f, "{}", err),
}
}
}
impl<TUpgrErr> error::Error for ConnectionHandlerUpgrErr<TUpgrErr>
where
TUpgrErr: error::Error + 'static,
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
ConnectionHandlerUpgrErr::Timeout => None,
ConnectionHandlerUpgrErr::Timer => None,
ConnectionHandlerUpgrErr::Upgrade(err) => Some(err),
}
}
}
/// A value that can be turned into a `ConnectionHandler`.
///
/// Every `ConnectionHandler` implements this trait through the blanket
/// implementation in this file, which returns the handler itself.
pub trait IntoConnectionHandler: Send + 'static {
/// The handler type produced by `into_handler`.
type Handler: ConnectionHandler;
/// Consumes `self` and builds the handler, given the remote peer's id and
/// the connected point.
fn into_handler(
self,
remote_peer_id: &PeerId,
connected_point: &ConnectedPoint,
) -> Self::Handler;
/// Returns the inbound upgrade of the handler this value produces.
fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol;
/// Combines `self` with `other` into an `IntoConnectionHandlerSelect`.
fn select<TProto2>(self, other: TProto2) -> IntoConnectionHandlerSelect<Self, TProto2>
where
Self: Sized,
{
IntoConnectionHandlerSelect::new(self, other)
}
}
/// Blanket implementation: every `ConnectionHandler` converts into itself.
impl<T: ConnectionHandler> IntoConnectionHandler for T {
    type Handler = Self;

    /// Returns `self` unchanged; the peer id and connected point are unused.
    fn into_handler(
        self,
        _remote_peer_id: &PeerId,
        _connected_point: &ConnectedPoint,
    ) -> Self::Handler {
        self
    }

    /// Returns the upgrade half of `listen_protocol()`, dropping the info
    /// value and the timeout.
    fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
        let (upgrade, _info) = self.listen_protocol().into_upgrade();
        upgrade
    }
}
/// Value returned by `ConnectionHandler::connection_keep_alive`.
///
/// Values are totally ordered as `No < Until(_) < Yes`; two `Until` values
/// compare by their instants.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum KeepAlive {
    /// Carries an `Instant`; sorts between `No` and `Yes`.
    // NOTE(review): presumably "keep alive until this instant" — confirm against the caller.
    Until(Instant),
    /// The greatest value in the ordering.
    Yes,
    /// The smallest value in the ordering.
    No,
}

impl KeepAlive {
    /// Returns `true` only for `KeepAlive::Yes`.
    pub fn is_yes(&self) -> bool {
        match self {
            KeepAlive::Yes => true,
            KeepAlive::No | KeepAlive::Until(_) => false,
        }
    }
}

impl PartialOrd for KeepAlive {
    /// Always `Some`: the ordering is total and defined by `Ord::cmp`.
    fn partial_cmp(&self, other: &KeepAlive) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for KeepAlive {
    fn cmp(&self, other: &KeepAlive) -> Ordering {
        // Coarse rank of each variant; only two `Until` values need their
        // payloads compared.
        let tier = |value: &KeepAlive| -> u8 {
            match value {
                KeepAlive::No => 0,
                KeepAlive::Until(_) => 1,
                KeepAlive::Yes => 2,
            }
        };
        match (self, other) {
            (KeepAlive::Until(lhs), KeepAlive::Until(rhs)) => lhs.cmp(rhs),
            _ => tier(self).cmp(&tier(other)),
        }
    }
}