use fnv::{FnvHashMap, FnvHashSet};
use std::borrow::Borrow;
use std::fmt;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use futures::{
channel::{mpsc, oneshot},
prelude::*,
select,
};
use libp2prs_core::{Multiaddr, PeerId, ProtocolId};
use libp2prs_runtime::task;
use libp2prs_swarm::Control as SwarmControl;
use crate::control::{Control, ControlCommand, DumpCommand};
use crate::protocol::{
KadConnectionType, KadMessenger, KadMessengerView, KadPeer, KadProtocolHandler, KadRequestMsg, KadResponseMsg,
KademliaProtocolConfig, ProtocolEvent, RefreshStage,
};
use crate::addresses::PeerInfo;
use crate::kbucket::KBucketsTable;
use crate::query::{FixedQuery, IterativeQuery, PeerRecord, QueryConfig, QueryStats, QueryStatsAtomic, QueryType};
use crate::store::RecordStore;
use crate::{kbucket, record, KadError, ProviderRecord, Record};
use libp2prs_core::peerstore::{ADDRESS_TTL, PROVIDER_ADDR_TTL};
use libp2prs_swarm::protocol_handler::{IProtocolHandler, ProtocolImpl};
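
/// The `Result` type used throughout this module, carrying `KadError` on failure.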
type Result<T> = std::result::Result<T, KadError>;
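
/// `Kademlia` drives the Kademlia DHT protocol: it owns the routing table
/// (k-buckets), the record/provider store and the statistics, serves inbound
/// Kad requests, and runs outbound queries on behalf of `Control` commands.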
pub struct Kademlia<TStore> {
kbuckets: KBucketsTable<kbucket::Key<PeerId>, PeerInfo>,
protocol_config: KademliaProtocolConfig,
refreshing: bool,
query_config: QueryConfig,
query_stats: Arc<QueryStatsAtomic>,
stats: KademliaStats,
messengers: Option<MessengerManager>,
connected_peers: FnvHashSet<PeerId>,
provider_timer_handle: Option<task::TaskHandle<()>>,
refresh_timer_handle: Option<task::TaskHandle<()>>,
gc_interval: Duration,
refresh_interval: Option<Duration>,
record_ttl: Duration,
provider_ttl: Duration,
check_kad_peer_interval: Duration,
local_addrs: Vec<Multiaddr>,
store: TStore,
swarm: Option<SwarmControl>,
event_tx: mpsc::UnboundedSender<ProtocolEvent>,
event_rx: mpsc::UnboundedReceiver<ProtocolEvent>,
control_tx: mpsc::UnboundedSender<ControlCommand>,
control_rx: mpsc::UnboundedReceiver<ControlCommand>,
}
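
/// Runtime statistics of a `Kademlia` instance.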
#[derive(Debug, Clone, Default)]
pub struct KademliaStats {
pub total_refreshes: usize,
pub query: QueryStats,
pub message_rx: MessageStats,
}
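
/// Counters of inbound Kad request messages, by message type.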
#[derive(Debug, Clone, Default)]
pub struct MessageStats {
pub(crate) ping: usize,
pub(crate) find_node: usize,
pub(crate) get_provider: usize,
pub(crate) add_provider: usize,
pub(crate) get_value: usize,
pub(crate) put_value: usize,
}
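
/// A snapshot of the records and provider records held in the store.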
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
pub(crate) provider: Vec<ProviderRecord>,
pub(crate) record: Vec<Record>,
}
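
/// The configurable parameters of a `Kademlia` instance.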
#[derive(Debug, Clone)]
pub struct KademliaConfig {
query_config: QueryConfig,
protocol_config: KademliaProtocolConfig,
refresh_interval: Option<Duration>,
gc_interval: Duration,
record_ttl: Duration,
provider_ttl: Duration,
check_kad_peer_interval: Duration,
}
impl Default for KademliaConfig {
fn default() -> Self {
let check_kad_peer_interval = Duration::from_secs(10 * 60);
KademliaConfig {
query_config: QueryConfig::default(),
protocol_config: Default::default(),
refresh_interval: Some(Duration::from_secs(10 * 60)),
gc_interval: Duration::from_secs(12 * 60 * 60),
record_ttl: Duration::from_secs(36 * 60 * 60),
provider_ttl: Duration::from_secs(24 * 60 * 60),
check_kad_peer_interval,
}
}
}
impl KademliaConfig {
pub fn with_protocol_name(mut self, name: ProtocolId) -> Self {
self.protocol_config.set_protocol_name(name);
self
}
pub fn with_query_timeout(mut self, timeout: Duration) -> Self {
self.query_config.timeout = timeout;
self
}
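    /// Sets `k`, the Kademlia replication parameter: the number of closest
    /// peers used to seed queries and to answer FIND_NODE requests.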
pub fn with_k(mut self, k: NonZeroUsize) -> Self {
self.query_config.k_value = k;
self
}
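    /// Sets `α`, the degree of parallelism of iterative queries.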
pub fn with_alpha(mut self, alpha: NonZeroUsize) -> Self {
self.query_config.alpha_value = alpha;
self
}
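    /// Sets `β`, the number of closest peers that must have responded for an
    /// iterative query to terminate.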
pub fn with_beta(mut self, beta: NonZeroUsize) -> Self {
self.query_config.beta_value = beta;
self
}
pub fn with_refresh_interval(mut self, interval: Option<Duration>) -> Self {
self.refresh_interval = interval;
self
}
pub fn with_cleanup_interval(mut self, interval: Duration) -> Self {
self.gc_interval = interval;
self
}
pub fn with_record_ttl(mut self, record_ttl: Duration) -> Self {
self.record_ttl = record_ttl;
self
}
pub fn with_provider_record_ttl(mut self, ttl: Duration) -> Self {
self.provider_ttl = ttl;
self
}
pub fn with_max_packet_size(mut self, size: usize) -> Self {
self.protocol_config.set_max_packet_size(size);
self
}
}
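
/// `KadPoster` is a cloneable handle used to post `ProtocolEvent`s back to
/// the Kademlia main loop.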
#[derive(Clone, Debug)]
pub(crate) struct KadPoster(mpsc::UnboundedSender<ProtocolEvent>);
impl KadPoster {
pub(crate) fn new(tx: mpsc::UnboundedSender<ProtocolEvent>) -> Self {
Self(tx)
}
pub(crate) async fn post(&mut self, event: ProtocolEvent) -> Result<()> {
self.0.send(event).await?;
Ok(())
}
pub(crate) fn unbounded_post(&mut self, event: ProtocolEvent) -> Result<()> {
self.0.unbounded_send(event).map_err(|e| e.into_send_error())?;
Ok(())
}
}
impl<TStore> Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a> + Send + 'static,
{
pub fn new(id: PeerId, store: TStore) -> Self {
Self::with_config(id, store, Default::default())
}
pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self {
let local_key = kbucket::Key::from(id);
let (event_tx, event_rx) = mpsc::unbounded();
let (control_tx, control_rx) = mpsc::unbounded();
Kademlia {
store,
swarm: None,
event_rx,
event_tx,
control_tx,
control_rx,
kbuckets: KBucketsTable::new(local_key),
protocol_config: config.protocol_config,
refreshing: false,
query_config: config.query_config,
query_stats: Arc::new(Default::default()),
stats: Default::default(),
messengers: None,
connected_peers: Default::default(),
provider_timer_handle: None,
refresh_timer_handle: None,
gc_interval: config.gc_interval,
refresh_interval: config.refresh_interval,
record_ttl: config.record_ttl,
provider_ttl: config.provider_ttl,
check_kad_peer_interval: config.check_kad_peer_interval,
local_addrs: vec![],
}
}
fn poster(&self) -> KadPoster {
KadPoster::new(self.event_tx.clone())
}
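    /// Tries to add a peer to the routing table.
    ///
    /// An already present peer gets its aliveness refreshed when `queried` is
    /// set. When the target bucket is full, the least-recently-alive node
    /// whose aliveness is stale (or unknown) is evicted to make room;
    /// otherwise the new peer is dropped.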
fn try_add_peer(&mut self, peer: PeerId, queried: bool, permanent: bool) {
let timeout = self.check_kad_peer_interval;
let now = Instant::now();
let key = kbucket::Key::from(peer);
log::debug!(
"trying to add a peer: {:?} bucket-index={:?}, query={}, permanent={}",
peer,
self.kbuckets.bucket_index(&key),
queried,
permanent
);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry) => {
if queried {
entry.value().set_aliveness(Some(Instant::now()));
log::debug!("{:?} updated: {:?}", peer, entry.value());
}
}
kbucket::Entry::Absent(mut entry) => {
let info = PeerInfo::new(queried, permanent);
if entry.insert(info.clone()) {
log::debug!("Peer added to routing table: {} {:?}", peer, info);
if let Some(s) = self.swarm.as_ref() {
s.pin(&peer)
}
} else {
log::debug!("Bucket full, trying to replace an old node for {}", peer);
let bucket = entry.bucket();
let candidate = bucket
.iter()
.filter(|n| n.value.get_aliveness().map_or(true, |a| now.duration_since(a) > timeout))
.min_by(|x, y| x.value.get_aliveness().cmp(&y.value.get_aliveness()));
if let Some(candidate) = candidate {
let key = candidate.key.clone();
let evicted = bucket.remove(&key);
log::debug!("Bucket full. Peer node added, {} replacing {:?}", peer, evicted);
if let Some(s) = self.swarm.as_ref() {
s.unpin(key.preimage())
}
let _ = entry.insert(info);
if let Some(s) = self.swarm.as_ref() {
s.pin(&peer)
}
} else {
log::debug!("Bucket full, but can't find an replaced node, give up {}", peer);
}
}
}
_ => {}
}
}
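    /// Removes a peer from the routing table. A permanent peer is only marked
    /// as inactive unless `forced` is set.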
fn try_remove_peer(&mut self, peer: PeerId, forced: bool) -> Option<kbucket::EntryView<kbucket::Key<PeerId>, PeerInfo>> {
let key = kbucket::Key::from(peer);
log::debug!(
"trying to remove a peer: {:?} bucket-index={:?}, forced={}",
peer,
self.kbuckets.bucket_index(&key),
forced
);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry) => {
if forced || !entry.value().is_permanent() {
if let Some(s) = self.swarm.as_ref() {
s.unpin(&peer)
}
Some(entry.remove())
} else {
entry.value().set_aliveness(None);
None
}
}
kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => None,
}
}
fn try_deactivate_peer(&mut self, peer: PeerId) {
let key = kbucket::Key::from(peer);
log::debug!(
"trying to deactivate a peer: {:?} bucket-index={:?}",
peer,
self.kbuckets.bucket_index(&key)
);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry) => entry.value().set_aliveness(None),
kbucket::Entry::Absent(..) | kbucket::Entry::SelfEntry => {}
}
}
fn kbuckets(&mut self) -> impl Iterator<Item = kbucket::KBucketRef<'_, kbucket::Key<PeerId>, PeerInfo>> {
self.kbuckets.iter().filter(|b| !b.is_empty())
}
pub fn kbucket<K>(&mut self, key: K) -> Option<kbucket::KBucketRef<'_, kbucket::Key<PeerId>, PeerInfo>>
where
K: Borrow<[u8]> + Clone,
{
self.kbuckets.bucket(&kbucket::Key::new(key))
}
pub fn store_mut(&mut self) -> &mut TStore {
&mut self.store
}
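    /// Builds an `IterativeQuery` towards `key`, seeded with the `k` closest
    /// keys from the routing table.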
fn prepare_iterative_query(&mut self, qt: QueryType, key: record::Key) -> IterativeQuery {
let local_id = *self.kbuckets.self_key().preimage();
let target = kbucket::Key::new(key.clone());
let seeds = self
.kbuckets
.closest_keys(&target)
.into_iter()
.take(self.query_config.k_value.get())
.collect();
IterativeQuery::new(
qt,
key,
self.swarm.clone().expect("must be Some"),
self.messengers.clone().expect("must be Some"),
local_id,
self.query_config.clone(),
seeds,
self.poster(),
self.query_stats.clone(),
)
}
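    /// Runs an iterative query for the closest peers to `key` and hands the
    /// result to the callback `f`.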
fn get_closest_peers<F>(&mut self, key: record::Key, f: F)
where
F: FnOnce(Result<Vec<KadPeer>>) + Send + 'static,
{
log::debug!("finding closest peers {:?}", key);
let q = self.prepare_iterative_query(QueryType::GetClosestPeers, key);
q.run(|r| {
f(r.and_then(|r| r.closest_peers.ok_or(KadError::NotFound)));
});
}
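    /// Looks up a peer: the local peer store is tried first, falling back to
    /// an iterative `FindPeer` query.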
fn find_peer<F>(&mut self, peer_id: PeerId, f: F)
where
F: FnOnce(Result<KadPeer>) + Send + 'static,
{
log::debug!("finding peer {:?}", peer_id);
if let Some(s) = self.swarm.as_ref() {
if let Some(addrs) = s.get_addrs(&peer_id) {
f(Ok(KadPeer {
node_id: peer_id,
multiaddrs: addrs,
connection_ty: KadConnectionType::NotConnected,
}));
return;
}
}
let q = self.prepare_iterative_query(QueryType::FindPeer, peer_id.into());
q.run(|r| {
f(r.and_then(|r| r.found_peer.ok_or(KadError::NotFound)));
});
}
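    /// Looks up providers of `key`, returning early when the local store
    /// already satisfies `count`.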
fn find_providers<F>(&mut self, key: record::Key, count: usize, f: F)
where
F: FnOnce(Result<Vec<KadPeer>>) + Send + 'static,
{
log::debug!("finding providers {:?}", key);
let provider_peers = self.provider_peers(&key, None);
if count != 0 && provider_peers.len() >= count {
f(Ok(provider_peers));
} else {
let local = if provider_peers.is_empty() { None } else { Some(provider_peers) };
let q = self.prepare_iterative_query(QueryType::GetProviders { count, local }, key);
q.run(|r| {
f(r.and_then(|r| r.providers.ok_or(KadError::NotFound)));
});
}
}
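    /// Fetches a record for `key`: the local store is consulted first, and an
    /// iterative `GetRecord` query is started when the quorum is not met.
    /// A record found remotely is additionally cached at the intermediate
    /// peers that did not return it.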
fn get_record<F>(&mut self, key: record::Key, f: F)
where
F: FnOnce(Result<PeerRecord>) + Send + 'static,
{
log::debug!("getting record {:?}", key);
let quorum = self.query_config.k_value.get();
let mut records = Vec::with_capacity(quorum);
if let Some(record) = self.store.get(&key) {
records.push(PeerRecord {
peer: None,
record: record.into_owned(),
});
}
if records.len() >= quorum {
            let record = records.first().cloned().ok_or(KadError::NotFound);
f(record);
} else {
let config = self.query_config.clone();
let messengers = self.messengers.clone().expect("must be Some");
let local = if records.is_empty() { None } else { Some(records) };
let q = self.prepare_iterative_query(QueryType::GetRecord { quorum, local }, key);
let stats = self.query_stats.clone();
q.run(|r| {
f(r.and_then(|r| {
let record = r.records.as_ref().map(|r| r.first().cloned());
if let Some(Some(record)) = record.clone() {
if let Some(cache_peers) = r.cache_peers {
let record = record.record;
let fixed_query = FixedQuery::new(QueryType::PutRecord { record }, messengers, config, cache_peers, stats);
fixed_query.run(|_| {});
}
}
                record.flatten().ok_or(KadError::NotFound)
}));
});
}
}
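    /// Stores a record locally, then replicates it to the `k` closest peers
    /// with a fixed `PutRecord` query.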
fn put_record<F>(&mut self, key: record::Key, value: Vec<u8>, f: F)
where
F: FnOnce(Result<()>) + Send + 'static,
{
log::debug!("putting record {:?}", key);
let publisher = Some(*self.kbuckets.self_key().preimage());
let record = Record::new(key, value, true, publisher);
if let Err(e) = self.store.put(record.clone()) {
f(Err(e));
return;
}
let config = self.query_config.clone();
let messengers = self.messengers.clone().expect("must be Some");
let stats = self.query_stats.clone();
self.get_closest_peers(record.key.clone(), move |peers| {
if let Err(e) = peers {
f(Err(e));
} else {
let peers = peers.unwrap().into_iter().map(KadPeer::into).collect::<Vec<_>>();
let fixed_query = FixedQuery::new(QueryType::PutRecord { record }, messengers, config, peers, stats);
fixed_query.run(f);
}
});
}
fn remove_record(&mut self, key: &record::Key) {
if let Some(r) = self.store.get(key) {
if r.publisher.as_ref() == Some(self.kbuckets.self_key().preimage()) {
self.store.remove(key)
}
}
}
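    /// Seeds the routing table with the boot nodes and starts a refresh.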
fn bootstrap(&mut self, boot: Vec<(PeerId, Multiaddr)>, reply: Option<oneshot::Sender<Result<()>>>) {
for (node, addr) in boot {
self.add_node(node, vec![addr]);
}
log::debug!("bootstrapping...");
let mut poster = self.poster();
let _ = poster.unbounded_post(ProtocolEvent::Refresh(RefreshStage::Start(reply)));
}
fn add_node(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
if let Some(s) = self.swarm.as_ref() {
s.add_addrs(&peer, addresses, ADDRESS_TTL)
}
self.try_add_peer(peer, false, true);
}
fn remove_node(&mut self, peer: PeerId) {
if let Some(s) = self.swarm.as_ref() {
s.clear_addrs(&peer)
}
self.try_remove_peer(peer, true);
}
fn dump_messengers(&mut self) -> Vec<KadMessengerView> {
self.messengers.as_ref().expect("must be Some").messengers()
}
fn dump_statistics(&mut self) -> KademliaStats {
self.stats.query = self.query_stats.to_view();
self.stats.clone()
}
fn dump_storage(&mut self) -> StorageStats {
let provider = self.store.all_providers().map(|item| item.into_owned()).collect();
let record = self.store.all_records().map(|item| item.into_owned()).collect();
StorageStats { provider, record }
}
fn dump_kbuckets(&mut self) -> Vec<KBucketView> {
let swarm = self.swarm.as_ref().expect("must be Some");
let connected = &self.connected_peers;
let entries = self
.kbuckets
.iter()
.filter(|k| !k.is_empty())
.map(|k| {
let index = k.index();
let bucket = k
.iter()
.map(|n| {
let id = *n.node.key.preimage();
let aliveness = n.node.value.get_aliveness();
let connected = connected.contains(&id);
                        let addresses = swarm.get_addrs(&id).unwrap_or_default();
KNodeView {
id,
aliveness,
addresses,
connected,
}
})
.collect::<Vec<_>>();
KBucketView { index, bucket }
})
.collect::<Vec<_>>();
entries
}
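    /// Publishes this node as a provider of `key`: the provider record is
    /// stored locally and announced to the `k` closest peers with a fixed
    /// `AddProvider` query.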
fn start_providing<F>(&mut self, key: record::Key, f: F)
where
F: FnOnce(Result<()>) + Send + 'static,
{
log::debug!("start providing {:?}", key);
let provider = ProviderRecord::new(key.clone(), *self.kbuckets.self_key().preimage(), true);
if let Err(e) = self.store.add_provider(provider.clone()) {
f(Err(e));
return;
}
let config = self.query_config.clone();
let messengers = self.messengers.clone().expect("must be Some");
let addresses = self.local_addrs.clone();
let stats = self.query_stats.clone();
self.get_closest_peers(key, move |peers| {
if let Err(e) = peers {
f(Err(e));
} else {
let peers = peers.unwrap().into_iter().map(KadPeer::into).collect();
let fixed_query = FixedQuery::new(QueryType::AddProvider { provider, addresses }, messengers, config, peers, stats);
fixed_query.run(f);
}
});
}
fn stop_providing(&mut self, key: &record::Key) {
log::debug!("stop providing {:?}", key);
self.store.remove_provider(key, self.kbuckets.self_key().preimage());
}
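    /// Returns up to `k` known peers closest to `target`, excluding `source`.
    /// When asked for our own key, only the local node is returned.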
fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
if target == self.kbuckets.self_key() {
vec![KadPeer {
node_id: *self.kbuckets.self_key().preimage(),
multiaddrs: vec![],
connection_ty: KadConnectionType::Connected,
}]
} else {
let connected = &self.connected_peers;
let swarm = self.swarm.as_ref().expect("must be Some");
self.kbuckets
.closest(target)
.filter(|e| e.node.key.preimage() != source)
.take(self.query_config.k_value.get())
.map(|n| {
let node_id = n.node.key.into_preimage();
let connection_ty = if connected.contains(&node_id) {
KadConnectionType::Connected
} else {
KadConnectionType::NotConnected
};
let multiaddrs = swarm.get_addrs(&node_id).unwrap_or_default();
KadPeer {
node_id,
multiaddrs,
connection_ty,
}
})
.collect()
}
}
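    /// Collects up to `k` known providers of `key`, excluding `source`.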
fn provider_peers(&mut self, key: &record::Key, source: Option<&PeerId>) -> Vec<KadPeer> {
let kbuckets = &mut self.kbuckets;
let connected = &self.connected_peers;
let local_addrs = &self.local_addrs;
let swarm = self.swarm.as_ref().expect("must be Some");
self.store
.providers(key)
.into_iter()
.filter_map(move |p| {
if source.map_or(true, |id| id != &p.provider) {
let node_id = p.provider;
let connection_ty = if connected.contains(&node_id) {
KadConnectionType::Connected
} else {
KadConnectionType::NotConnected
};
let multiaddrs = if &node_id == kbuckets.self_key().preimage() {
Some(local_addrs.clone())
} else {
swarm.get_addrs(&node_id)
}
.unwrap_or_default();
Some(KadPeer {
node_id,
multiaddrs,
connection_ty,
})
} else {
None
}
})
.take(self.query_config.k_value.get())
.collect()
}
fn handle_put_record(&mut self, _source: PeerId, record: Record) -> Result<KadResponseMsg> {
if record.publisher.as_ref() == Some(self.kbuckets.self_key().preimage()) {
return Ok(KadResponseMsg::PutValue {
key: record.key,
value: record.value,
});
}
log::debug!("handle adding record to store: {:?}", record);
match self.store.put(record.clone()) {
Ok(()) => log::debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()),
Err(e) => {
log::debug!("Record not stored: {:?}", e);
return Err(e);
}
}
Ok(KadResponseMsg::PutValue {
key: record.key,
value: record.value,
})
}
fn handle_add_provider(&mut self, key: record::Key, provider: KadPeer) {
if &provider.node_id != self.kbuckets.self_key().preimage() {
log::debug!("handle adding provider to store: {:?}", provider);
self.swarm
.as_ref()
.expect("must be Some")
.add_addrs(&provider.node_id, provider.multiaddrs, PROVIDER_ADDR_TTL);
let record = ProviderRecord::new(key, provider.node_id, false);
if let Err(e) = self.store.add_provider(record) {
log::debug!("Provider record not stored: {:?}", e);
}
}
}
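    /// Returns a new `Control` handle bound to this Kademlia instance.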
pub fn control(&self) -> Control {
Control::new(self.control_tx.clone())
}
fn start_provider_gc_timer(&mut self) {
log::info!("starting provider timer runtime...");
let interval = self.gc_interval;
let mut poster = self.poster();
let h = task::spawn(async move {
loop {
task::sleep(interval).await;
let _ = poster.post(ProtocolEvent::GCTimer).await;
}
});
self.provider_timer_handle = Some(h);
}
fn start_refresh_timer(&mut self) {
if let Some(interval) = self.refresh_interval {
log::info!("starting refresh timer runtime...");
let mut poster = self.poster();
let h = task::spawn(async move {
loop {
task::sleep(interval).await;
let _ = poster.post(ProtocolEvent::RefreshTimer).await;
}
});
self.refresh_timer_handle = Some(h);
}
}
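    /// The main loop: multiplexes protocol events and control commands until
    /// either channel closes.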
async fn process_loop(&mut self) -> Result<()> {
loop {
select! {
evt = self.event_rx.next() => {
self.on_events(evt)?;
}
cmd = self.control_rx.next() => {
self.on_control_command(cmd)?;
}
}
}
}
fn handle_peer_connected(&mut self, peer_id: PeerId) {
self.connected_peers.insert(peer_id);
}
fn handle_peer_disconnected(&mut self, peer_id: PeerId) {
self.connected_peers.remove(&peer_id);
if let Some(cache) = &mut self.messengers {
cache.clear_messengers(&peer_id);
}
}
fn handle_peer_identified(&mut self, peer_id: PeerId) {
if let Some(swarm) = &mut self.swarm {
if swarm
.first_supported_protocol(&peer_id, vec![self.protocol_config.protocol_name().to_string()])
.is_some()
{
log::debug!("A peer identified as a qualified Kad peer: {:}", peer_id);
self.try_add_peer(peer_id, false, false);
}
}
}
fn handle_address_changed(&mut self, addrs: Vec<Multiaddr>) {
log::debug!("address changed: {:?}, starting refresh...", addrs);
self.local_addrs = addrs;
self.handle_refresh_stage(RefreshStage::Start(None));
}
fn handle_peer_found(&mut self, peer_id: PeerId, queried: bool) {
self.try_add_peer(peer_id, queried, false);
}
fn handle_peer_stopped(&mut self, peer_id: PeerId) {
self.try_remove_peer(peer_id, false);
}
fn on_events(&mut self, msg: Option<ProtocolEvent>) -> Result<()> {
log::debug!("handle kad event: {:?}", msg);
match msg {
Some(ProtocolEvent::PeerConnected(peer_id)) => {
self.handle_peer_connected(peer_id);
}
Some(ProtocolEvent::PeerDisconnected(peer_id)) => {
self.handle_peer_disconnected(peer_id);
}
Some(ProtocolEvent::PeerIdentified(peer_id)) => {
self.handle_peer_identified(peer_id);
}
Some(ProtocolEvent::AddressChanged(addrs)) => {
self.handle_address_changed(addrs);
}
Some(ProtocolEvent::KadPeerFound(peer_id, queried)) => {
self.handle_peer_found(peer_id, queried);
}
Some(ProtocolEvent::KadPeerStopped(peer_id)) => {
self.handle_peer_stopped(peer_id);
}
Some(ProtocolEvent::KadRequest { request, source, reply }) => {
self.handle_kad_request(request, source, reply);
}
Some(ProtocolEvent::GCTimer) => {
self.handle_gc_timer();
}
Some(ProtocolEvent::RefreshTimer) => {
self.handle_refresh_timer();
}
Some(ProtocolEvent::Refresh(stage)) => {
self.handle_refresh_stage(stage);
}
None => {
return Err(KadError::Closing(2));
}
}
Ok(())
}
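    /// Answers an inbound Kad request and sends the (optional) response back
    /// through the oneshot channel.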
fn handle_kad_request(&mut self, request: KadRequestMsg, source: PeerId, reply: oneshot::Sender<Result<Option<KadResponseMsg>>>) {
log::debug!("handle Kad request message from {:?}, {:?} ", source, request);
let response = match request {
KadRequestMsg::Ping => {
self.stats.message_rx.ping += 1;
Ok(Some(KadResponseMsg::Pong))
}
KadRequestMsg::FindNode { key } => {
self.stats.message_rx.find_node += 1;
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
Ok(Some(KadResponseMsg::FindNode { closer_peers }))
}
KadRequestMsg::AddProvider { key, provider } => {
self.stats.message_rx.add_provider += 1;
if provider.node_id != source {
log::info!("received provider from wrong peer {:?}", source);
Err(KadError::InvalidSource(source))
} else {
self.handle_add_provider(key, provider);
Ok(None)
}
}
KadRequestMsg::GetProviders { key } => {
self.stats.message_rx.get_provider += 1;
let provider_peers = self.provider_peers(&key, Some(&source));
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
Ok(Some(KadResponseMsg::GetProviders {
closer_peers,
provider_peers,
}))
}
KadRequestMsg::GetValue { key } => {
self.stats.message_rx.get_value += 1;
let record = self.store.get(&key).map(|r| r.into_owned());
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
Ok(Some(KadResponseMsg::GetValue { record, closer_peers }))
}
KadRequestMsg::PutValue { record } => {
self.stats.message_rx.put_value += 1;
self.handle_put_record(source, record).map(Some)
}
};
let _ = reply.send(response);
}
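    /// Garbage-collects expired records and provider records.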
fn handle_gc_timer(&mut self) {
log::info!("handle_gc_timer, GC invoked");
self.store.gc_records(self.record_ttl);
self.store.gc_providers(self.provider_ttl);
}
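    /// Periodic health check: every non-permanent peer whose aliveness is
    /// stale is probed with a fresh outbound connection; peers failing the
    /// probe are evicted, and a routing table refresh is started afterwards.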
fn handle_refresh_timer(&mut self) {
let interval = self.check_kad_peer_interval;
let now = Instant::now();
let self_key = self.kbuckets.self_key().clone();
let mut swarm = self.swarm.clone().expect("must be Some");
let mut poster = self.poster();
let peers_to_check = self
.kbuckets
.closest(&self_key)
.filter(|n| {
!n.node.value.is_permanent() && n.node.value.get_aliveness().map_or(true, |a| now.duration_since(a) > interval)
})
.map(|n| n.node.key.into_preimage())
.collect::<Vec<_>>();
task::spawn(async move {
log::debug!("about to health check {} nodes", peers_to_check.len());
let mut count: u32 = 0;
for peer in peers_to_check {
log::debug!("health checking {}", peer);
let r = swarm.new_connection(peer).await;
if r.is_err() {
log::debug!("health checking failed at {}, removing from Kbuckets", peer);
count += 1;
let _ = poster.post(ProtocolEvent::KadPeerStopped(peer)).await;
}
}
log::info!("Kad refresh restarted, total {} nodes removed from Kbuckets", count);
let _ = poster.post(ProtocolEvent::Refresh(RefreshStage::Start(None))).await;
});
}
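    /// Drives the refresh state machine: `Start` launches a self-lookup,
    /// `SelfQueryDone` walks random IDs spread over the further buckets,
    /// and `Completed` clears the `refreshing` flag.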
fn handle_refresh_stage(&mut self, stage: RefreshStage) {
match stage {
RefreshStage::Start(reply) => {
if self.refreshing {
log::debug!("Don't refresh when RT is being refreshed");
if let Some(tx) = reply {
let _ = tx.send(Err(KadError::Bootstrap));
}
return;
}
if self.kbuckets.num_entries() == 0 {
log::debug!("Don't refresh when RT has nothing yet");
if let Some(tx) = reply {
let _ = tx.send(Err(KadError::Bootstrap));
}
return;
}
log::debug!("start refreshing kbuckets...");
self.refreshing = true;
self.stats.total_refreshes += 1;
let local_id = *self.kbuckets.self_key().preimage();
let mut poster = self.poster();
self.get_closest_peers(local_id.into(), |r| {
if r.is_err() {
log::info!("refresh get_closest_peers failed: {:?}", r);
}
task::spawn(async move {
let _ = poster.post(ProtocolEvent::Refresh(RefreshStage::SelfQueryDone(reply))).await;
});
});
}
RefreshStage::SelfQueryDone(reply) => {
log::debug!("bootstrap: self-query done, proceeding with random walk...");
log::debug!("kbuckets entries={}", self.kbuckets.num_entries());
let self_key = self.kbuckets.self_key().clone();
                let peers = self
                    .kbuckets
                    .iter()
                    .skip_while(|b| b.is_empty())
                    .skip(1)
                    .take(16)
                    .map(|b| {
                        let mut target = kbucket::Key::from(PeerId::random());
                        for _ in 0..16 {
                            let d = self_key.distance(&target);
                            if b.contains(&d) {
                                log::trace!("random Id generated for bucket-index={:?}", d.ilog2());
                                break;
                            }
                            target = kbucket::Key::from(PeerId::random());
                        }
                        target.into_preimage()
                    })
                    .collect::<Vec<_>>();
log::debug!("random nodes generated: {:?}", peers);
let mut control = self.control();
let mut poster = self.poster();
task::spawn(async move {
for peer in peers {
log::debug!("bootstrap: walk random node {:?}", peer);
let _ = control.lookup(peer.into()).await;
}
let _ = poster.post(ProtocolEvent::Refresh(RefreshStage::Completed)).await;
if let Some(tx) = reply {
let _ = tx.send(Ok(()));
}
});
}
RefreshStage::Completed => {
log::debug!("kbuckets entries={}", self.kbuckets.num_entries());
log::info!("bootstrap: finished");
self.refreshing = false;
}
}
}
fn on_control_command(&mut self, cmd: Option<ControlCommand>) -> Result<()> {
match cmd {
Some(ControlCommand::Bootstrap(boot, reply)) => {
self.bootstrap(boot, reply);
}
Some(ControlCommand::AddNode(peer, addresses)) => {
self.add_node(peer, addresses);
}
Some(ControlCommand::RemoveNode(peer)) => {
self.remove_node(peer);
}
Some(ControlCommand::Lookup(key, reply)) => {
self.get_closest_peers(key, |r| {
let _ = reply.send(r);
});
}
Some(ControlCommand::FindPeer(peer_id, reply)) => {
self.find_peer(peer_id, |r| {
let _ = reply.send(r);
});
}
Some(ControlCommand::FindProviders(key, count, reply)) => {
self.find_providers(key, count, |r| {
let _ = reply.send(r);
});
}
Some(ControlCommand::Providing(key, reply)) => {
self.start_providing(key, |r| {
let _ = reply.send(r);
});
}
Some(ControlCommand::Unprovide(key)) => {
self.stop_providing(&key);
}
Some(ControlCommand::PutValue(key, value, reply)) => {
self.put_record(key, value, |r| {
let _ = reply.send(r);
});
}
Some(ControlCommand::GetValue(key, reply)) => {
self.get_record(key, |r| {
let _ = reply.send(r);
});
}
Some(ControlCommand::Dump(cmd)) => match cmd {
DumpCommand::Storage(reply) => {
let _ = reply.send(self.dump_storage());
}
DumpCommand::Entries(reply) => {
let _ = reply.send(self.dump_kbuckets());
}
DumpCommand::Statistics(reply) => {
let _ = reply.send(self.dump_statistics());
}
DumpCommand::Messengers(reply) => {
let _ = reply.send(self.dump_messengers());
}
},
None => {
return Err(KadError::Closing(1));
}
}
Ok(())
}
}
impl<TStore> ProtocolImpl for Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a> + Send + 'static,
{
fn handler(&self) -> IProtocolHandler {
Box::new(KadProtocolHandler::new(self.protocol_config.clone(), self.poster()))
}
fn start(mut self, swarm: SwarmControl) -> Option<task::TaskHandle<()>>
where
Self: Sized,
{
self.messengers = Some(MessengerManager::new(swarm.clone(), self.protocol_config.clone()));
self.swarm = Some(swarm);
self.start_provider_gc_timer();
self.start_refresh_timer();
let mut kad = self;
Some(task::spawn(async move {
let r = kad.process_loop().await;
assert!(r.is_err());
log::info!("Kad main loop closed, quitting due to {:?}", r);
if let Some(h) = kad.refresh_timer_handle.take() {
h.cancel().await;
}
if let Some(h) = kad.provider_timer_handle.take() {
h.cancel().await;
}
log::info!("Kad main loop exited");
}))
}
}
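
/// `MessengerManager` hands out `KadMessenger`s for outbound Kad requests,
/// caching at most one reusable messenger per peer.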
#[derive(Clone)]
pub(crate) struct MessengerManager {
swarm: SwarmControl,
config: KademliaProtocolConfig,
cache: Arc<Mutex<FnvHashMap<PeerId, KadMessenger>>>,
}
impl MessengerManager {
fn new(swarm: SwarmControl, config: KademliaProtocolConfig) -> Self {
Self {
swarm,
config,
cache: Arc::new(Default::default()),
}
}
pub(crate) fn swarm(&mut self) -> &mut SwarmControl {
&mut self.swarm
}
pub(crate) async fn get_messenger(&mut self, peer: &PeerId) -> Result<KadMessenger> {
let r = {
let mut cache = self.cache.lock().unwrap();
cache.remove(peer)
};
        match r {
            Some(messenger) => Ok(messenger),
            None => KadMessenger::build(self.swarm.clone(), *peer, self.config.clone()).await,
        }
}
pub(crate) fn put_messenger(&mut self, mut messenger: KadMessenger) {
if messenger.reuse() {
let mut cache = self.cache.lock().unwrap();
let peer = messenger.get_peer_id();
if !cache.contains_key(peer) {
cache.insert(*peer, messenger);
}
}
}
pub(crate) fn clear_messengers(&mut self, peer_id: &PeerId) {
let mut cache = self.cache.lock().unwrap();
cache.remove(peer_id);
}
pub(crate) fn messengers(&self) -> Vec<KadMessengerView> {
let cache = self.cache.lock().unwrap();
cache.values().map(|m| m.to_view()).collect()
}
}
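
/// A diagnostic view of one k-bucket, as returned by `DumpCommand::Entries`.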
#[derive(Debug)]
pub struct KBucketView {
pub index: usize,
pub bucket: Vec<KNodeView>,
}
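
/// A diagnostic view of a single node in the routing table.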
#[derive(Debug)]
pub struct KNodeView {
pub id: PeerId,
pub aliveness: Option<Instant>,
pub addresses: Vec<Multiaddr>,
pub connected: bool,
}
impl fmt::Display for KNodeView {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let now = Instant::now();
write!(
f,
"{:52} Conn({}) {:?} Addrs({:?})",
self.id,
self.connected,
self.aliveness.map(|a| now - a),
self.addresses
)
}
}