use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
use crate::kad_hash::KadHash;
use crate::kbucket::{self, KBucketsTable, KBucketsPeerId};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
use fnv::{FnvHashMap, FnvHashSet};
use futures::{prelude::*, stream};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use multihash::Multihash;
use smallvec::SmallVec;
use std::{error, marker::PhantomData, num::NonZeroUsize, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Interval;
mod test;
pub struct Kademlia<TSubstream> {
kbuckets: KBucketsTable<KadHash, Addresses>,
active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
connected_peers: FnvHashSet<PeerId>,
pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); 8]>,
next_query_id: QueryId,
values_providers: FnvHashMap<Multihash, SmallVec<[PeerId; 20]>>,
providing_keys: FnvHashSet<Multihash>,
refresh_add_providers: stream::Fuse<Interval>,
parallelism: usize,
num_results: usize,
rpc_timeout: Duration,
queued_events: SmallVec<[NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaOut>; 32]>,
add_provider: SmallVec<[(Multihash, PeerId); 32]>,
marker: PhantomData<TSubstream>,
}
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct QueryId(usize);
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueryInfo {
inner: QueryInfoInner,
untrusted_addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum QueryInfoInner {
Initialization {
target: PeerId,
},
FindPeer(PeerId),
GetProviders {
target: Multihash,
pending_results: Vec<PeerId>,
},
AddProvider {
target: Multihash,
},
}
impl KBucketsPeerId<PeerId> for QueryInfo {
fn distance_with(&self, other: &PeerId) -> u32 {
let other: &Multihash = other.as_ref();
self.as_ref().distance_with(other)
}
fn max_distance() -> NonZeroUsize {
<PeerId as KBucketsPeerId>::max_distance()
}
}
impl AsRef<Multihash> for QueryInfo {
fn as_ref(&self) -> &Multihash {
match &self.inner {
QueryInfoInner::Initialization { target } => target.as_ref(),
QueryInfoInner::FindPeer(peer) => peer.as_ref(),
QueryInfoInner::GetProviders { target, .. } => target,
QueryInfoInner::AddProvider { target } => target,
}
}
}
impl PartialEq<PeerId> for QueryInfo {
fn eq(&self, other: &PeerId) -> bool {
self.as_ref().eq(other)
}
}
impl QueryInfo {
fn to_rpc_request<TUserData>(&self, user_data: TUserData) -> KademliaHandlerIn<TUserData> {
match &self.inner {
QueryInfoInner::Initialization { target } => KademliaHandlerIn::FindNodeReq {
key: target.clone(),
user_data,
},
QueryInfoInner::FindPeer(key) => KademliaHandlerIn::FindNodeReq {
key: key.clone(),
user_data,
},
QueryInfoInner::GetProviders { target, .. } => KademliaHandlerIn::GetProvidersReq {
key: target.clone().into(),
user_data,
},
QueryInfoInner::AddProvider { .. } => KademliaHandlerIn::FindNodeReq {
key: unimplemented!(),
user_data,
},
}
}
}
impl<TSubstream> Kademlia<TSubstream> {
#[inline]
pub fn new(local_peer_id: PeerId) -> Self {
Self::new_inner(local_peer_id)
}
#[inline]
#[deprecated(note="this function is now equivalent to new() and will be removed in the future")]
pub fn without_init(local_peer_id: PeerId) -> Self {
Self::new_inner(local_peer_id)
}
pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
self.add_address(peer_id, address, true)
}
pub fn add_not_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
self.add_address(peer_id, address, false)
}
fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) {
let kad_hash = KadHash::from(peer_id.clone());
match self.kbuckets.entry(&kad_hash) {
kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert(address),
kbucket::Entry::NotInKbucket(entry) => {
let mut addresses = Addresses::new();
addresses.insert(address);
match entry.insert_disconnected(addresses) {
kbucket::InsertOutcome::Inserted => {
let event = KademliaOut::KBucketAdded {
peer_id: peer_id.clone(),
replaced: None,
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertOutcome::Full => (),
kbucket::InsertOutcome::Pending { to_ping } => {
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.peer_id().clone(),
})
},
}
return;
},
kbucket::Entry::SelfEntry => return,
};
}
fn new_inner(local_peer_id: PeerId) -> Self {
let parallelism = 3;
Kademlia {
kbuckets: KBucketsTable::new(KadHash::from(local_peer_id), Duration::from_secs(60)),
queued_events: SmallVec::new(),
active_queries: Default::default(),
connected_peers: Default::default(),
pending_rpcs: SmallVec::with_capacity(parallelism),
next_query_id: QueryId(0),
values_providers: FnvHashMap::default(),
providing_keys: FnvHashSet::default(),
refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(),
parallelism,
num_results: 20,
rpc_timeout: Duration::from_secs(8),
add_provider: SmallVec::new(),
marker: PhantomData,
}
}
pub fn kbuckets_entries(&self) -> impl Iterator<Item = &PeerId> {
self.kbuckets.entries_not_pending().map(|(kad_hash, _)| kad_hash.peer_id())
}
pub fn find_node(&mut self, peer_id: PeerId) {
self.start_query(QueryInfoInner::FindPeer(peer_id));
}
pub fn get_providers(&mut self, target: Multihash) {
self.start_query(QueryInfoInner::GetProviders { target, pending_results: Vec::new() });
}
pub fn add_providing(&mut self, key: PeerId) {
self.providing_keys.insert(key.clone().into());
let providers = self.values_providers.entry(key.into()).or_insert_with(Default::default);
let my_id = self.kbuckets.my_id();
if !providers.iter().any(|peer_id| peer_id == my_id.peer_id()) {
providers.push(my_id.peer_id().clone());
}
self.refresh_add_providers = Interval::new(Instant::now(), Duration::from_secs(60)).fuse();
}
pub fn remove_providing(&mut self, key: &Multihash) {
self.providing_keys.remove(key);
let providers = match self.values_providers.get_mut(key) {
Some(p) => p,
None => return,
};
if let Some(position) = providers.iter().position(|k| k == key) {
providers.remove(position);
providers.shrink_to_fit();
}
}
fn start_query(&mut self, target: QueryInfoInner) {
let query_id = self.next_query_id;
self.next_query_id.0 += 1;
let target = QueryInfo {
inner: target,
untrusted_addresses: Default::default(),
};
let known_closest_peers = self.kbuckets
.find_closest(target.as_ref())
.take(self.num_results)
.map(|h| h.peer_id().clone());
self.active_queries.insert(
query_id,
QueryState::new(QueryConfig {
target,
parallelism: self.parallelism,
num_results: self.num_results,
rpc_timeout: self.rpc_timeout,
known_closest_peers,
})
);
}
}
impl<TSubstream> NetworkBehaviour for Kademlia<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = KademliaHandler<TSubstream, QueryId>;
type OutEvent = KademliaOut;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
KademliaHandler::dial_and_listen()
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
let mut out_list = self.kbuckets
.entry(&KadHash::from(peer_id.clone()))
.value_not_pending()
.map(|l| l.iter().cloned().collect::<Vec<_>>())
.unwrap_or_else(Vec::new);
for query in self.active_queries.values() {
if let Some(addrs) = query.target().untrusted_addresses.get(peer_id) {
for addr in addrs {
out_list.push(addr.clone());
}
}
}
out_list
}
fn inject_connected(&mut self, id: PeerId, endpoint: ConnectedPoint) {
if let Some(pos) = self.pending_rpcs.iter().position(|(p, _)| p == &id) {
let (_, rpc) = self.pending_rpcs.remove(pos);
self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: id.clone(),
event: rpc,
});
}
let address = match endpoint {
ConnectedPoint::Dialer { address } => Some(address),
ConnectedPoint::Listener { .. } => None,
};
let id_kad_hash = KadHash::from(id.clone());
match self.kbuckets.entry(&id_kad_hash) {
kbucket::Entry::InKbucketConnected(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::InKbucketConnectedPending(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::InKbucketDisconnected(mut entry) => {
if let Some(address) = address {
entry.value().insert(address);
}
entry.set_connected();
},
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => {
if let Some(address) = address {
entry.value().insert(address);
}
entry.set_connected();
},
kbucket::Entry::NotInKbucket(entry) => {
let mut addresses = Addresses::new();
if let Some(address) = address {
addresses.insert(address);
}
match entry.insert_connected(addresses) {
kbucket::InsertOutcome::Inserted => {
let event = KademliaOut::KBucketAdded {
peer_id: id.clone(),
replaced: None,
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertOutcome::Full => (),
kbucket::InsertOutcome::Pending { to_ping } => {
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.peer_id().clone(),
})
},
}
},
kbucket::Entry::SelfEntry => {
unreachable!("Guaranteed to never receive disconnected even for self; QED")
},
}
self.connected_peers.insert(id);
}
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) {
if let Some(peer_id) = peer_id {
let id_kad_hash = KadHash::from(peer_id.clone());
if let Some(list) = self.kbuckets.entry(&id_kad_hash).value() {
list.remove(addr);
}
for query in self.active_queries.values_mut() {
if let Some(addrs) = query.target_mut().untrusted_addresses.get_mut(id_kad_hash.peer_id()) {
addrs.retain(|a| a != addr);
}
}
}
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
for query in self.active_queries.values_mut() {
query.inject_rpc_error(peer_id);
}
}
fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
let was_in = self.connected_peers.remove(id);
debug_assert!(was_in);
for query in self.active_queries.values_mut() {
query.inject_rpc_error(id);
}
match self.kbuckets.entry(&KadHash::from(id.clone())) {
kbucket::Entry::InKbucketConnected(entry) => {
match entry.set_disconnected() {
kbucket::SetDisconnectedOutcome::Kept(_) => {},
kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
let event = KademliaOut::KBucketAdded {
peer_id: replacement.peer_id().clone(),
replaced: Some(id.clone()),
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
}
},
kbucket::Entry::InKbucketConnectedPending(entry) => {
entry.set_disconnected();
},
kbucket::Entry::InKbucketDisconnected(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::InKbucketDisconnectedPending(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::NotInKbucket(_) => {},
kbucket::Entry::SelfEntry => {
unreachable!("Guaranteed to never receive disconnected even for self; QED")
},
}
}
fn inject_replaced(&mut self, peer_id: PeerId, _old: ConnectedPoint, new_endpoint: ConnectedPoint) {
for (query_id, query) in self.active_queries.iter() {
if query.is_waiting(&peer_id) {
self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: query.target().to_rpc_request(*query_id),
});
}
}
if let Some(list) = self.kbuckets.entry(&KadHash::from(peer_id)).value() {
if let ConnectedPoint::Dialer { address } = new_endpoint {
list.insert(address);
}
}
}
fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent<QueryId>) {
match event {
KademliaHandlerEvent::FindNodeReq { key, request_id } => {
let closer_peers = self.kbuckets
.find_closest(&KadHash::from(key.clone()))
.take(self.num_results)
.map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
.collect();
self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: source,
event: KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
},
});
}
KademliaHandlerEvent::FindNodeRes {
closer_peers,
user_data,
} => {
for peer in closer_peers.iter() {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
peer_id: peer.node_id.clone(),
addresses: peer.multiaddrs.clone(),
ty: peer.connection_ty,
}));
}
if let Some(query) = self.active_queries.get_mut(&user_data) {
for peer in closer_peers.iter() {
query.target_mut().untrusted_addresses
.insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
}
query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id))
}
}
KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
let closer_peers = self.kbuckets
.find_closest(&key)
.take(self.num_results)
.map(|kad_hash| build_kad_peer(&kad_hash, &mut self.kbuckets))
.collect();
let provider_peers = {
let kbuckets = &mut self.kbuckets;
self.values_providers
.get(&key)
.into_iter()
.flat_map(|peers| peers)
.map(move |peer_id| build_kad_peer(&KadHash::from(peer_id.clone()), kbuckets))
.collect()
};
self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: source,
event: KademliaHandlerIn::GetProvidersRes {
closer_peers,
provider_peers,
request_id,
},
});
}
KademliaHandlerEvent::GetProvidersRes {
closer_peers,
provider_peers,
user_data,
} => {
for peer in closer_peers.iter().chain(provider_peers.iter()) {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
peer_id: peer.node_id.clone(),
addresses: peer.multiaddrs.clone(),
ty: peer.connection_ty,
}));
}
if let Some(query) = self.active_queries.get_mut(&user_data) {
if let QueryInfoInner::GetProviders { pending_results, .. } = &mut query.target_mut().inner {
for peer in provider_peers {
pending_results.push(peer.node_id);
}
}
for peer in closer_peers.iter() {
query.target_mut().untrusted_addresses
.insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
}
query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id))
}
}
KademliaHandlerEvent::QueryError { user_data, .. } => {
if let Some(query) = self.active_queries.get_mut(&user_data) {
query.inject_rpc_error(&source)
}
}
KademliaHandlerEvent::AddProvider { key, provider_peer } => {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
peer_id: provider_peer.node_id.clone(),
addresses: provider_peer.multiaddrs.clone(),
ty: provider_peer.connection_ty,
}));
self.add_provider.push((key, provider_peer.node_id));
return;
}
};
}
fn poll(
&mut self,
parameters: &mut PollParameters<'_>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
for (key, provider) in self.add_provider.drain() {
if provider == *self.kbuckets.my_id().peer_id() {
continue;
}
let providers = self.values_providers.entry(key).or_insert_with(Default::default);
if !providers.iter().any(|peer_id| peer_id == &provider) {
providers.push(provider);
}
}
self.add_provider.shrink_to_fit();
match self.refresh_add_providers.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(Some(_))) => {
for target in self.providing_keys.clone().into_iter() {
self.start_query(QueryInfoInner::AddProvider { target });
}
},
Ok(Async::Ready(None)) | Err(_) => {},
}
loop {
if !self.queued_events.is_empty() {
return Async::Ready(self.queued_events.remove(0));
}
self.queued_events.shrink_to_fit();
let mut finished_query = None;
'queries_iter: for (&query_id, query) in self.active_queries.iter_mut() {
loop {
match query.poll() {
Async::Ready(QueryStatePollOut::Finished) => {
finished_query = Some(query_id);
break 'queries_iter;
}
Async::Ready(QueryStatePollOut::SendRpc {
peer_id,
query_target,
}) => {
let rpc = query_target.to_rpc_request(query_id);
if self.connected_peers.contains(&peer_id) {
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: rpc,
});
} else {
self.pending_rpcs.push((peer_id.clone(), rpc));
return Async::Ready(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),
});
}
}
Async::Ready(QueryStatePollOut::CancelRpc { peer_id }) => {
self.pending_rpcs.retain(|(id, _)| id != peer_id);
}
Async::NotReady => break,
}
}
}
if let Some(finished_query) = finished_query {
let (query_info, closer_peers) = self
.active_queries
.remove(&finished_query)
.expect("finished_query was gathered when iterating active_queries; QED.")
.into_target_and_closest_peers();
match query_info.inner {
QueryInfoInner::Initialization { .. } => {},
QueryInfoInner::FindPeer(target) => {
let event = KademliaOut::FindNodeResult {
key: target,
closer_peers: closer_peers.collect(),
};
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::GetProviders { target, pending_results } => {
let event = KademliaOut::GetProvidersResult {
key: target,
closer_peers: closer_peers.collect(),
provider_peers: pending_results,
};
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::AddProvider { target } => {
for closest in closer_peers {
let event = NetworkBehaviourAction::SendEvent {
peer_id: closest,
event: KademliaHandlerIn::AddProvider {
key: target.clone(),
provider_peer: build_kad_peer(&KadHash::from(parameters.local_peer_id().clone()), &mut self.kbuckets),
},
};
self.queued_events.push(event);
}
},
}
} else {
break Async::NotReady;
}
}
}
}
#[derive(Debug, Clone)]
pub enum KademliaOut {
Discovered {
peer_id: PeerId,
addresses: Vec<Multiaddr>,
ty: KadConnectionType,
},
KBucketAdded {
peer_id: PeerId,
replaced: Option<PeerId>,
},
FindNodeResult {
key: PeerId,
closer_peers: Vec<PeerId>,
},
GetProvidersResult {
key: Multihash,
provider_peers: Vec<PeerId>,
closer_peers: Vec<PeerId>,
},
}
fn build_kad_peer(
kad_hash: &KadHash,
kbuckets: &mut KBucketsTable<KadHash, Addresses>
) -> KadPeer {
let (multiaddrs, connection_ty) = match kbuckets.entry(kad_hash) {
kbucket::Entry::NotInKbucket(_) => (Vec::new(), KadConnectionType::NotConnected),
kbucket::Entry::InKbucketConnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
kbucket::Entry::InKbucketDisconnected(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
kbucket::Entry::InKbucketConnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::Connected),
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => (entry.value().iter().cloned().collect(), KadConnectionType::NotConnected),
kbucket::Entry::SelfEntry => panic!("build_kad_peer expects not to be called with the KadHash of the local ID"),
};
KadPeer {
node_id: kad_hash.peer_id().clone(),
multiaddrs,
connection_ty,
}
}