use crate::protocol::{RemoteInfo, IdentifyProtocolConfig};
use futures::prelude::*;
use libp2p_core::{
protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{DeniedUpgrade, OutboundUpgrade}
};
use std::{io, marker::PhantomData, time::{Duration, Instant}};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{self, Delay};
use void::{Void, unreachable};
const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500);
const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60);
const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60);
pub struct PeriodicIdHandler<TSubstream> {
config: IdentifyProtocolConfig,
pending_result: Option<PeriodicIdHandlerEvent>,
next_id: Delay,
first_id_happened: bool,
marker: PhantomData<TSubstream>,
}
#[derive(Debug)]
pub enum PeriodicIdHandlerEvent {
Identified(RemoteInfo),
IdentificationError(ProtocolsHandlerUpgrErr<io::Error>),
}
impl<TSubstream> PeriodicIdHandler<TSubstream> {
#[inline]
pub fn new() -> Self {
PeriodicIdHandler {
config: IdentifyProtocolConfig,
pending_result: None,
next_id: Delay::new(Instant::now() + DELAY_TO_FIRST_ID),
first_id_happened: false,
marker: PhantomData,
}
}
}
impl<TSubstream> ProtocolsHandler for PeriodicIdHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type InEvent = Void;
type OutEvent = PeriodicIdHandlerEvent;
type Error = tokio_timer::Error;
type Substream = TSubstream;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = IdentifyProtocolConfig;
type OutboundOpenInfo = ();
#[inline]
fn listen_protocol(&self) -> Self::InboundProtocol {
DeniedUpgrade
}
fn inject_fully_negotiated_inbound(&mut self, protocol: Void) {
unreachable(protocol)
}
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
_info: Self::OutboundOpenInfo,
) {
self.pending_result = Some(PeriodicIdHandlerEvent::Identified(protocol));
self.first_id_happened = true;
}
#[inline]
fn inject_event(&mut self, _: Self::InEvent) {}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
self.pending_result = Some(PeriodicIdHandlerEvent::IdentificationError(err));
self.first_id_happened = true;
self.next_id.reset(Instant::now() + TRY_AGAIN_ON_ERR);
}
#[inline]
fn connection_keep_alive(&self) -> KeepAlive {
if self.first_id_happened {
KeepAlive::Now
} else {
KeepAlive::Forever
}
}
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
PeriodicIdHandlerEvent,
>,
Self::Error,
> {
if let Some(pending_result) = self.pending_result.take() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
pending_result,
)));
}
match self.next_id.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(()) => {
self.next_id.reset(Instant::now() + DELAY_TO_NEXT_ID);
let upgrade = self.config.clone();
let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () };
Ok(Async::Ready(ev))
}
}
}
}