use crate::protocol::{RemoteInfo, IdentifyProtocolConfig, ReplySubstream};
use futures::prelude::*;
use libp2p_core::upgrade::{
InboundUpgrade,
OutboundUpgrade,
ReadOneError
};
use libp2p_swarm::{
NegotiatedSubstream,
KeepAlive,
SubstreamProtocol,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use smallvec::SmallVec;
use std::{pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay;
/// Initial duration of the identification timer created in `IdentifyHandler::new`
/// (500 milliseconds).
const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500);
/// Duration the identification timer is reset to each time it fires and an
/// outbound identification request is issued from `poll` (5 minutes).
const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60);
/// Duration the identification timer is reset to after an outbound substream
/// upgrade error is reported to the handler (1 hour).
const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60);
/// Protocol handler that periodically requests an outbound identify substream
/// and queues the resulting events for delivery from `poll`.
pub struct IdentifyHandler {
/// Upgrade configuration; cloned for the listen protocol and for every
/// outbound substream request.
config: IdentifyProtocolConfig,
/// Events waiting to be returned from `poll`. New events are pushed at the
/// back and `poll` removes them from the front, so they are delivered in
/// insertion order.
events: SmallVec<[IdentifyHandlerEvent; 4]>,
/// Timer polled in `poll`; when it completes, a new outbound substream is
/// requested and the timer is reset.
next_id: Delay,
/// Value reported by `connection_keep_alive`. Starts as `KeepAlive::Yes` and
/// is set to `KeepAlive::No` once an outbound substream has been negotiated
/// or an outbound upgrade error has been reported.
keep_alive: KeepAlive,
}
/// Events produced by `IdentifyHandler` and returned from its `poll` method
/// wrapped in `ProtocolsHandlerEvent::Custom`.
#[derive(Debug)]
pub enum IdentifyHandlerEvent {
/// An outbound substream was fully negotiated; carries the output of the
/// outbound upgrade.
Identified(RemoteInfo),
/// An inbound substream was fully negotiated; carries the output of the
/// inbound upgrade.
// NOTE(review): presumably the receiver uses this substream to send the local
// node's information back to the remote -- confirm against `ReplySubstream`.
Identify(ReplySubstream<NegotiatedSubstream>),
/// The outbound substream upgrade failed; carries the reported error.
IdentificationError(ProtocolsHandlerUpgrErr<ReadOneError>),
}
impl IdentifyHandler {
    /// Creates a new `IdentifyHandler`.
    ///
    /// The handler starts with an empty event queue, a keep-alive value of
    /// `KeepAlive::Yes`, and an identification timer set to
    /// `DELAY_TO_FIRST_ID`.
    pub fn new() -> Self {
        IdentifyHandler {
            config: IdentifyProtocolConfig,
            events: SmallVec::new(),
            next_id: Delay::new(DELAY_TO_FIRST_ID),
            keep_alive: KeepAlive::Yes,
        }
    }
}

// A public, argument-less `new()` should be mirrored by `Default` so the type
// works with `..Default::default()`, `#[derive(Default)]` on containing types
// and generic code bounded on `Default` (Clippy: `new_without_default`).
impl Default for IdentifyHandler {
    /// Equivalent to calling `IdentifyHandler::new()`.
    fn default() -> Self {
        Self::new()
    }
}
impl ProtocolsHandler for IdentifyHandler {
type InEvent = ();
type OutEvent = IdentifyHandlerEvent;
type Error = ReadOneError;
type InboundProtocol = IdentifyProtocolConfig;
type OutboundProtocol = IdentifyProtocolConfig;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(self.config.clone(), ())
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::InboundOpenInfo
) {
self.events.push(IdentifyHandlerEvent::Identify(protocol))
}
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::OutboundOpenInfo,
) {
self.events.push(IdentifyHandlerEvent::Identified(protocol));
self.keep_alive = KeepAlive::No;
}
fn inject_event(&mut self, _: Self::InEvent) {}
fn inject_dial_upgrade_error(
&mut self,
_info: Self::OutboundOpenInfo,
err: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error
>
) {
self.events.push(IdentifyHandlerEvent::IdentificationError(err));
self.keep_alive = KeepAlive::No;
self.next_id.reset(TRY_AGAIN_ON_ERR);
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
IdentifyHandlerEvent,
Self::Error,
>,
> {
if !self.events.is_empty() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
self.events.remove(0),
));
}
match Future::poll(Pin::new(&mut self.next_id), cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
self.next_id.reset(DELAY_TO_NEXT_ID);
let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.config.clone(), ())
};
Poll::Ready(ev)
}
Poll::Ready(Err(err)) => Poll::Ready(ProtocolsHandlerEvent::Close(err.into()))
}
}
}