use crate::protocol::{
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
KademliaProtocolConfig,
};
use crate::record::{self, Record};
use futures::prelude::*;
use instant::Instant;
use libp2p_core::{
either::EitherOutput,
upgrade::{self, InboundUpgrade, OutboundUpgrade},
ConnectedPoint, PeerId,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler,
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use log::trace;
use std::{
error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration,
};
const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32;
pub struct KademliaHandlerProto<T> {
config: KademliaHandlerConfig,
_type: PhantomData<T>,
}
impl<T> KademliaHandlerProto<T> {
pub fn new(config: KademliaHandlerConfig) -> Self {
KademliaHandlerProto {
config,
_type: PhantomData,
}
}
}
impl<T: Clone + fmt::Debug + Send + 'static> IntoConnectionHandler for KademliaHandlerProto<T> {
type Handler = KademliaHandler<T>;
fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
KademliaHandler::new(self.config, endpoint.clone(), *remote_peer_id)
}
fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
if self.config.allow_listening {
upgrade::EitherUpgrade::A(self.config.protocol_config.clone())
} else {
upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)
}
}
}
pub struct KademliaHandler<TUserData> {
config: KademliaHandlerConfig,
next_connec_unique_id: UniqueConnecId,
outbound_substreams: Vec<OutboundSubstreamState<TUserData>>,
inbound_substreams: Vec<InboundSubstreamState>,
keep_alive: KeepAlive,
endpoint: ConnectedPoint,
remote_peer_id: PeerId,
protocol_status: ProtocolStatus,
}
enum ProtocolStatus {
Unconfirmed,
Confirmed,
Reported,
}
#[derive(Debug, Clone)]
pub struct KademliaHandlerConfig {
pub protocol_config: KademliaProtocolConfig,
pub allow_listening: bool,
pub idle_timeout: Duration,
}
enum OutboundSubstreamState<TUserData> {
PendingOpen(KadRequestMsg, Option<TUserData>),
PendingSend(
KadOutStreamSink<NegotiatedSubstream>,
KadRequestMsg,
Option<TUserData>,
),
PendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
ReportError(KademliaHandlerQueryErr, TUserData),
Closing(KadOutStreamSink<NegotiatedSubstream>),
}
enum InboundSubstreamState {
WaitingMessage {
first: bool,
connection_id: UniqueConnecId,
substream: KadInStreamSink<NegotiatedSubstream>,
},
WaitingUser(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
PendingSend(
UniqueConnecId,
KadInStreamSink<NegotiatedSubstream>,
KadResponseMsg,
),
PendingFlush(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
Closing(KadInStreamSink<NegotiatedSubstream>),
}
impl<TUserData> OutboundSubstreamState<TUserData> {
fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match self {
OutboundSubstreamState::PendingOpen(_, _)
| OutboundSubstreamState::ReportError(_, _) => Poll::Ready(()),
OutboundSubstreamState::PendingSend(ref mut stream, _, _)
| OutboundSubstreamState::PendingFlush(ref mut stream, _)
| OutboundSubstreamState::WaitingAnswer(ref mut stream, _)
| OutboundSubstreamState::Closing(ref mut stream) => {
match Sink::poll_close(Pin::new(stream), cx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
}
}
}
impl InboundSubstreamState {
fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match self {
InboundSubstreamState::WaitingMessage {
substream: ref mut stream,
..
}
| InboundSubstreamState::WaitingUser(_, ref mut stream)
| InboundSubstreamState::PendingSend(_, ref mut stream, _)
| InboundSubstreamState::PendingFlush(_, ref mut stream)
| InboundSubstreamState::Closing(ref mut stream) => {
match Sink::poll_close(Pin::new(stream), cx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
}
}
}
#[derive(Debug)]
pub enum KademliaHandlerEvent<TUserData> {
ProtocolConfirmed { endpoint: ConnectedPoint },
FindNodeReq {
key: Vec<u8>,
request_id: KademliaRequestId,
},
FindNodeRes {
closer_peers: Vec<KadPeer>,
user_data: TUserData,
},
GetProvidersReq {
key: record::Key,
request_id: KademliaRequestId,
},
GetProvidersRes {
closer_peers: Vec<KadPeer>,
provider_peers: Vec<KadPeer>,
user_data: TUserData,
},
QueryError {
error: KademliaHandlerQueryErr,
user_data: TUserData,
},
AddProvider {
key: record::Key,
provider: KadPeer,
},
GetRecord {
key: record::Key,
request_id: KademliaRequestId,
},
GetRecordRes {
record: Option<Record>,
closer_peers: Vec<KadPeer>,
user_data: TUserData,
},
PutRecord {
record: Record,
request_id: KademliaRequestId,
},
PutRecordRes {
key: record::Key,
value: Vec<u8>,
user_data: TUserData,
},
}
#[derive(Debug)]
pub enum KademliaHandlerQueryErr {
Upgrade(ConnectionHandlerUpgrErr<io::Error>),
UnexpectedMessage,
Io(io::Error),
}
impl fmt::Display for KademliaHandlerQueryErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
KademliaHandlerQueryErr::Upgrade(err) => {
write!(f, "Error while performing Kademlia query: {}", err)
}
KademliaHandlerQueryErr::UnexpectedMessage => {
write!(
f,
"Remote answered our Kademlia RPC query with the wrong message type"
)
}
KademliaHandlerQueryErr::Io(err) => {
write!(f, "I/O error during a Kademlia RPC query: {}", err)
}
}
}
}
impl error::Error for KademliaHandlerQueryErr {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
KademliaHandlerQueryErr::Upgrade(err) => Some(err),
KademliaHandlerQueryErr::UnexpectedMessage => None,
KademliaHandlerQueryErr::Io(err) => Some(err),
}
}
}
impl From<ConnectionHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
fn from(err: ConnectionHandlerUpgrErr<io::Error>) -> Self {
KademliaHandlerQueryErr::Upgrade(err)
}
}
#[derive(Debug)]
pub enum KademliaHandlerIn<TUserData> {
Reset(KademliaRequestId),
FindNodeReq {
key: Vec<u8>,
user_data: TUserData,
},
FindNodeRes {
closer_peers: Vec<KadPeer>,
request_id: KademliaRequestId,
},
GetProvidersReq {
key: record::Key,
user_data: TUserData,
},
GetProvidersRes {
closer_peers: Vec<KadPeer>,
provider_peers: Vec<KadPeer>,
request_id: KademliaRequestId,
},
AddProvider {
key: record::Key,
provider: KadPeer,
},
GetRecord {
key: record::Key,
user_data: TUserData,
},
GetRecordRes {
record: Option<Record>,
closer_peers: Vec<KadPeer>,
request_id: KademliaRequestId,
},
PutRecord {
record: Record,
user_data: TUserData,
},
PutRecordRes {
key: record::Key,
value: Vec<u8>,
request_id: KademliaRequestId,
},
}
#[derive(Debug, PartialEq, Eq)]
pub struct KademliaRequestId {
connec_unique_id: UniqueConnecId,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct UniqueConnecId(u64);
impl<TUserData> KademliaHandler<TUserData> {
pub fn new(
config: KademliaHandlerConfig,
endpoint: ConnectedPoint,
remote_peer_id: PeerId,
) -> Self {
let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout);
KademliaHandler {
config,
endpoint,
remote_peer_id,
next_connec_unique_id: UniqueConnecId(0),
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
keep_alive,
protocol_status: ProtocolStatus::Unconfirmed,
}
}
}
impl<TUserData> ConnectionHandler for KademliaHandler<TUserData>
where
TUserData: Clone + fmt::Debug + Send + 'static,
{
type InEvent = KademliaHandlerIn<TUserData>;
type OutEvent = KademliaHandlerEvent<TUserData>;
type Error = io::Error; type InboundProtocol = upgrade::EitherUpgrade<KademliaProtocolConfig, upgrade::DeniedUpgrade>;
type OutboundProtocol = KademliaProtocolConfig;
type OutboundOpenInfo = (KadRequestMsg, Option<TUserData>);
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
if self.config.allow_listening {
SubstreamProtocol::new(self.config.protocol_config.clone(), ())
.map_upgrade(upgrade::EitherUpgrade::A)
} else {
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ())
}
}
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
(msg, user_data): Self::OutboundOpenInfo,
) {
self.outbound_substreams
.push(OutboundSubstreamState::PendingSend(
protocol, msg, user_data,
));
if let ProtocolStatus::Unconfirmed = self.protocol_status {
self.protocol_status = ProtocolStatus::Confirmed;
}
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): Self::InboundOpenInfo,
) {
let protocol = match protocol {
EitherOutput::First(p) => p,
EitherOutput::Second(p) => void::unreachable(p),
};
if let ProtocolStatus::Unconfirmed = self.protocol_status {
self.protocol_status = ProtocolStatus::Confirmed;
}
if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS {
if let Some(position) = self.inbound_substreams.iter().position(|s| {
matches!(
s,
InboundSubstreamState::WaitingMessage { first: false, .. }
)
}) {
self.inbound_substreams.remove(position);
log::warn!(
"New inbound substream to {:?} exceeds inbound substream limit. \
Removed older substream waiting to be reused.",
self.remote_peer_id,
)
} else {
log::warn!(
"New inbound substream to {:?} exceeds inbound substream limit. \
No older substream waiting to be reused. Dropping new substream.",
self.remote_peer_id,
);
return;
}
}
debug_assert!(self.config.allow_listening);
let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
self.inbound_substreams
.push(InboundSubstreamState::WaitingMessage {
first: true,
connection_id: connec_unique_id,
substream: protocol,
});
}
fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
match message {
KademliaHandlerIn::Reset(request_id) => {
let pos = self
.inbound_substreams
.iter()
.position(|state| match state {
InboundSubstreamState::WaitingUser(conn_id, _) => {
conn_id == &request_id.connec_unique_id
}
_ => false,
});
if let Some(pos) = pos {
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
let _ = self.inbound_substreams.remove(pos).try_close(&mut cx);
}
}
KademliaHandlerIn::FindNodeReq { key, user_data } => {
let msg = KadRequestMsg::FindNode { key };
self.outbound_substreams
.push(OutboundSubstreamState::PendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
} => {
let pos = self
.inbound_substreams
.iter()
.position(|state| match state {
InboundSubstreamState::WaitingUser(ref conn_id, _) => {
conn_id == &request_id.connec_unique_id
}
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.inbound_substreams.remove(pos) {
InboundSubstreamState::WaitingUser(conn_id, substream) => {
(conn_id, substream)
}
_ => unreachable!(),
};
let msg = KadResponseMsg::FindNode { closer_peers };
self.inbound_substreams
.push(InboundSubstreamState::PendingSend(conn_id, substream, msg));
}
}
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
let msg = KadRequestMsg::GetProviders { key };
self.outbound_substreams
.push(OutboundSubstreamState::PendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::GetProvidersRes {
closer_peers,
provider_peers,
request_id,
} => {
let pos = self
.inbound_substreams
.iter()
.position(|state| match state {
InboundSubstreamState::WaitingUser(ref conn_id, _)
if conn_id == &request_id.connec_unique_id =>
{
true
}
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.inbound_substreams.remove(pos) {
InboundSubstreamState::WaitingUser(conn_id, substream) => {
(conn_id, substream)
}
_ => unreachable!(),
};
let msg = KadResponseMsg::GetProviders {
closer_peers,
provider_peers,
};
self.inbound_substreams
.push(InboundSubstreamState::PendingSend(conn_id, substream, msg));
}
}
KademliaHandlerIn::AddProvider { key, provider } => {
let msg = KadRequestMsg::AddProvider { key, provider };
self.outbound_substreams
.push(OutboundSubstreamState::PendingOpen(msg, None));
}
KademliaHandlerIn::GetRecord { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
self.outbound_substreams
.push(OutboundSubstreamState::PendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::PutRecord { record, user_data } => {
let msg = KadRequestMsg::PutValue { record };
self.outbound_substreams
.push(OutboundSubstreamState::PendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::GetRecordRes {
record,
closer_peers,
request_id,
} => {
let pos = self
.inbound_substreams
.iter()
.position(|state| match state {
InboundSubstreamState::WaitingUser(ref conn_id, _) => {
conn_id == &request_id.connec_unique_id
}
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.inbound_substreams.remove(pos) {
InboundSubstreamState::WaitingUser(conn_id, substream) => {
(conn_id, substream)
}
_ => unreachable!(),
};
let msg = KadResponseMsg::GetValue {
record,
closer_peers,
};
self.inbound_substreams
.push(InboundSubstreamState::PendingSend(conn_id, substream, msg));
}
}
KademliaHandlerIn::PutRecordRes {
key,
request_id,
value,
} => {
let pos = self
.inbound_substreams
.iter()
.position(|state| match state {
InboundSubstreamState::WaitingUser(ref conn_id, _)
if conn_id == &request_id.connec_unique_id =>
{
true
}
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.inbound_substreams.remove(pos) {
InboundSubstreamState::WaitingUser(conn_id, substream) => {
(conn_id, substream)
}
_ => unreachable!(),
};
let msg = KadResponseMsg::PutValue { key, value };
self.inbound_substreams
.push(InboundSubstreamState::PendingSend(conn_id, substream, msg));
}
}
}
}
fn inject_dial_upgrade_error(
&mut self,
(_, user_data): Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<io::Error>,
) {
if let Some(user_data) = user_data {
self.outbound_substreams
.push(OutboundSubstreamState::ReportError(error.into(), user_data));
}
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
return Poll::Pending;
}
if let ProtocolStatus::Confirmed = self.protocol_status {
self.protocol_status = ProtocolStatus::Reported;
return Poll::Ready(ConnectionHandlerEvent::Custom(
KademliaHandlerEvent::ProtocolConfirmed {
endpoint: self.endpoint.clone(),
},
));
}
for n in (0..self.outbound_substreams.len()).rev() {
let mut substream = self.outbound_substreams.swap_remove(n);
loop {
match advance_outbound_substream(substream, self.config.protocol_config.clone(), cx)
{
(Some(new_state), Some(event), _) => {
self.outbound_substreams.push(new_state);
return Poll::Ready(event);
}
(None, Some(event), _) => {
if self.outbound_substreams.is_empty() {
self.keep_alive =
KeepAlive::Until(Instant::now() + self.config.idle_timeout);
}
return Poll::Ready(event);
}
(Some(new_state), None, false) => {
self.outbound_substreams.push(new_state);
break;
}
(Some(new_state), None, true) => {
substream = new_state;
continue;
}
(None, None, _) => {
break;
}
}
}
}
for n in (0..self.inbound_substreams.len()).rev() {
let mut substream = self.inbound_substreams.swap_remove(n);
loop {
match advance_inbound_substream(substream, cx) {
(Some(new_state), Some(event), _) => {
self.inbound_substreams.push(new_state);
return Poll::Ready(event);
}
(None, Some(event), _) => {
if self.inbound_substreams.is_empty() {
self.keep_alive =
KeepAlive::Until(Instant::now() + self.config.idle_timeout);
}
return Poll::Ready(event);
}
(Some(new_state), None, false) => {
self.inbound_substreams.push(new_state);
break;
}
(Some(new_state), None, true) => {
substream = new_state;
continue;
}
(None, None, _) => {
break;
}
}
}
}
if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
} else {
self.keep_alive = KeepAlive::Yes;
}
Poll::Pending
}
}
impl Default for KademliaHandlerConfig {
fn default() -> Self {
KademliaHandlerConfig {
protocol_config: Default::default(),
allow_listening: true,
idle_timeout: Duration::from_secs(10),
}
}
}
fn advance_outbound_substream<TUserData>(
state: OutboundSubstreamState<TUserData>,
upgrade: KademliaProtocolConfig,
cx: &mut Context<'_>,
) -> (
Option<OutboundSubstreamState<TUserData>>,
Option<
ConnectionHandlerEvent<
KademliaProtocolConfig,
(KadRequestMsg, Option<TUserData>),
KademliaHandlerEvent<TUserData>,
io::Error,
>,
>,
bool,
) {
match state {
OutboundSubstreamState::PendingOpen(msg, user_data) => {
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(upgrade, (msg, user_data)),
};
(None, Some(ev), false)
}
OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => {
match Sink::poll_ready(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) {
Ok(()) => (
Some(OutboundSubstreamState::PendingFlush(substream, user_data)),
None,
true,
),
Err(error) => {
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
})
});
(None, event, false)
}
},
Poll::Pending => (
Some(OutboundSubstreamState::PendingSend(
substream, msg, user_data,
)),
None,
false,
),
Poll::Ready(Err(error)) => {
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
})
});
(None, event, false)
}
}
}
OutboundSubstreamState::PendingFlush(mut substream, user_data) => {
match Sink::poll_flush(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
if let Some(user_data) = user_data {
(
Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)),
None,
true,
)
} else {
(Some(OutboundSubstreamState::Closing(substream)), None, true)
}
}
Poll::Pending => (
Some(OutboundSubstreamState::PendingFlush(substream, user_data)),
None,
false,
),
Poll::Ready(Err(error)) => {
let event = user_data.map(|user_data| {
ConnectionHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
})
});
(None, event, false)
}
}
}
OutboundSubstreamState::WaitingAnswer(mut substream, user_data) => {
match Stream::poll_next(Pin::new(&mut substream), cx) {
Poll::Ready(Some(Ok(msg))) => {
let new_state = OutboundSubstreamState::Closing(substream);
let event = process_kad_response(msg, user_data);
(
Some(new_state),
Some(ConnectionHandlerEvent::Custom(event)),
true,
)
}
Poll::Pending => (
Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)),
None,
false,
),
Poll::Ready(Some(Err(error))) => {
let event = KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(error),
user_data,
};
(None, Some(ConnectionHandlerEvent::Custom(event)), false)
}
Poll::Ready(None) => {
let event = KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()),
user_data,
};
(None, Some(ConnectionHandlerEvent::Custom(event)), false)
}
}
}
OutboundSubstreamState::ReportError(error, user_data) => {
let event = KademliaHandlerEvent::QueryError { error, user_data };
(None, Some(ConnectionHandlerEvent::Custom(event)), false)
}
OutboundSubstreamState::Closing(mut stream) => {
match Sink::poll_close(Pin::new(&mut stream), cx) {
Poll::Ready(Ok(())) => (None, None, false),
Poll::Pending => (Some(OutboundSubstreamState::Closing(stream)), None, false),
Poll::Ready(Err(_)) => (None, None, false),
}
}
}
}
fn advance_inbound_substream<TUserData>(
state: InboundSubstreamState,
cx: &mut Context<'_>,
) -> (
Option<InboundSubstreamState>,
Option<
ConnectionHandlerEvent<
KademliaProtocolConfig,
(KadRequestMsg, Option<TUserData>),
KademliaHandlerEvent<TUserData>,
io::Error,
>,
>,
bool,
) {
match state {
InboundSubstreamState::WaitingMessage {
first,
connection_id,
mut substream,
} => match Stream::poll_next(Pin::new(&mut substream), cx) {
Poll::Ready(Some(Ok(msg))) => {
if let Ok(ev) = process_kad_request(msg, connection_id) {
(
Some(InboundSubstreamState::WaitingUser(connection_id, substream)),
Some(ConnectionHandlerEvent::Custom(ev)),
false,
)
} else {
(Some(InboundSubstreamState::Closing(substream)), None, true)
}
}
Poll::Pending => (
Some(InboundSubstreamState::WaitingMessage {
first,
connection_id,
substream,
}),
None,
false,
),
Poll::Ready(None) => {
trace!("Inbound substream: EOF");
(None, None, false)
}
Poll::Ready(Some(Err(e))) => {
trace!("Inbound substream error: {:?}", e);
(None, None, false)
}
},
InboundSubstreamState::WaitingUser(id, substream) => (
Some(InboundSubstreamState::WaitingUser(id, substream)),
None,
false,
),
InboundSubstreamState::PendingSend(id, mut substream, msg) => {
match Sink::poll_ready(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) {
Ok(()) => (
Some(InboundSubstreamState::PendingFlush(id, substream)),
None,
true,
),
Err(_) => (None, None, false),
},
Poll::Pending => (
Some(InboundSubstreamState::PendingSend(id, substream, msg)),
None,
false,
),
Poll::Ready(Err(_)) => (None, None, false),
}
}
InboundSubstreamState::PendingFlush(id, mut substream) => {
match Sink::poll_flush(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => (
Some(InboundSubstreamState::WaitingMessage {
first: false,
connection_id: id,
substream,
}),
None,
true,
),
Poll::Pending => (
Some(InboundSubstreamState::PendingFlush(id, substream)),
None,
false,
),
Poll::Ready(Err(_)) => (None, None, false),
}
}
InboundSubstreamState::Closing(mut stream) => {
match Sink::poll_close(Pin::new(&mut stream), cx) {
Poll::Ready(Ok(())) => (None, None, false),
Poll::Pending => (Some(InboundSubstreamState::Closing(stream)), None, false),
Poll::Ready(Err(_)) => (None, None, false),
}
}
}
}
fn process_kad_request<TUserData>(
event: KadRequestMsg,
connec_unique_id: UniqueConnecId,
) -> Result<KademliaHandlerEvent<TUserData>, io::Error> {
match event {
KadRequestMsg::Ping => {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"the PING Kademlia message is not implemented",
))
}
KadRequestMsg::FindNode { key } => Ok(KademliaHandlerEvent::FindNodeReq {
key,
request_id: KademliaRequestId { connec_unique_id },
}),
KadRequestMsg::GetProviders { key } => Ok(KademliaHandlerEvent::GetProvidersReq {
key,
request_id: KademliaRequestId { connec_unique_id },
}),
KadRequestMsg::AddProvider { key, provider } => {
Ok(KademliaHandlerEvent::AddProvider { key, provider })
}
KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetRecord {
key,
request_id: KademliaRequestId { connec_unique_id },
}),
KadRequestMsg::PutValue { record } => Ok(KademliaHandlerEvent::PutRecord {
record,
request_id: KademliaRequestId { connec_unique_id },
}),
}
}
fn process_kad_response<TUserData>(
event: KadResponseMsg,
user_data: TUserData,
) -> KademliaHandlerEvent<TUserData> {
match event {
KadResponseMsg::Pong => {
KademliaHandlerEvent::QueryError {
error: KademliaHandlerQueryErr::UnexpectedMessage,
user_data,
}
}
KadResponseMsg::FindNode { closer_peers } => KademliaHandlerEvent::FindNodeRes {
closer_peers,
user_data,
},
KadResponseMsg::GetProviders {
closer_peers,
provider_peers,
} => KademliaHandlerEvent::GetProvidersRes {
closer_peers,
provider_peers,
user_data,
},
KadResponseMsg::GetValue {
record,
closer_peers,
} => KademliaHandlerEvent::GetRecordRes {
record,
closer_peers,
user_data,
},
KadResponseMsg::PutValue { key, value, .. } => KademliaHandlerEvent::PutRecordRes {
key,
value,
user_data,
},
}
}