use std::{
task::{Context, Poll},
time::{Duration, Instant},
};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
},
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, SubstreamProtocol,
};
use log::error;
use void::Void;
use super::RunStats;
/// Event emitted by [`Handler`] to the behaviour when an inbound run
/// finishes successfully.
#[derive(Debug)]
pub struct Event {
/// Statistics returned by the completed `crate::protocol::receive_send` future.
pub stats: RunStats,
}
/// Connection handler that accepts inbound streams negotiated for
/// `crate::PROTOCOL_NAME`, runs `crate::protocol::receive_send` on each of
/// them and reports the resulting [`RunStats`] as an [`Event`].
pub struct Handler {
/// In-flight inbound runs, one boxed future per negotiated inbound stream.
inbound: FuturesUnordered<BoxFuture<'static, Result<RunStats, std::io::Error>>>,
/// Current keep-alive state; updated at the end of `poll` depending on
/// whether `inbound` is empty.
keep_alive: KeepAlive,
}
impl Handler {
pub fn new() -> Self {
Self {
inbound: Default::default(),
keep_alive: KeepAlive::Yes,
}
}
}
impl Default for Handler {
fn default() -> Self {
Self::new()
}
}
impl ConnectionHandler for Handler {
type FromBehaviour = Void;
type ToBehaviour = Event;
type Error = Void;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ())
}
fn on_behaviour_event(&mut self, v: Self::FromBehaviour) {
void::unreachable(v)
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol,
info: _,
}) => {
self.inbound
.push(crate::protocol::receive_send(protocol).boxed());
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { info, .. }) => {
void::unreachable(info)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, .. }) => {
void::unreachable(info)
}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => {
void::unreachable(error)
}
}
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> {
while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
match result {
Ok(stats) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event { stats }))
}
Err(e) => {
error!("{e:?}")
}
}
}
if self.inbound.is_empty() {
match self.keep_alive {
KeepAlive::Yes => {
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10));
}
KeepAlive::Until(_) => {}
KeepAlive::No => panic!("Handler never sets KeepAlive::No."),
}
} else {
self.keep_alive = KeepAlive::Yes
}
Poll::Pending
}
}