mod test;
use crate::K_VALUE;
use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{
    DialPeerCondition,
    NetworkBehaviour,
    NetworkBehaviourAction,
    NotifyHandler,
    PollParameters,
    ProtocolsHandler
};
use log::{info, debug, warn};
use smallvec::SmallVec;
use std::{borrow::{Borrow, Cow}, error, iter, time::Duration};
use std::collections::{HashSet, VecDeque};
use std::fmt;
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
use std::vec;
use wasm_timer::Instant;
pub use crate::query::QueryStats;
pub struct Kademlia<TStore> {
    
    kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
    
    protocol_config: KademliaProtocolConfig,
    
    queries: QueryPool<QueryInner>,
    
    
    
    connected_peers: FnvHashSet<PeerId>,
    
    
    add_provider_job: Option<AddProviderJob>,
    
    
    put_record_job: Option<PutRecordJob>,
    
    record_ttl: Option<Duration>,
    
    provider_record_ttl: Option<Duration>,
    
    connection_idle_timeout: Duration,
    
    queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,
    
    store: TStore,
}
#[derive(Debug, Clone)]
pub struct KademliaConfig {
    kbucket_pending_timeout: Duration,
    query_config: QueryConfig,
    protocol_config: KademliaProtocolConfig,
    record_ttl: Option<Duration>,
    record_replication_interval: Option<Duration>,
    record_publication_interval: Option<Duration>,
    provider_record_ttl: Option<Duration>,
    provider_publication_interval: Option<Duration>,
    connection_idle_timeout: Duration,
}
impl Default for KademliaConfig {
    fn default() -> Self {
        KademliaConfig {
            kbucket_pending_timeout: Duration::from_secs(60),
            query_config: QueryConfig::default(),
            protocol_config: Default::default(),
            record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
            record_replication_interval: Some(Duration::from_secs(60 * 60)),
            record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
            provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
            provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
            connection_idle_timeout: Duration::from_secs(10),
        }
    }
}
impl KademliaConfig {
    
    
    
    
    
    pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
        self.protocol_config.set_protocol_name(name);
        self
    }
    
    
    
    
    
    
    pub fn set_query_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.query_config.timeout = timeout;
        self
    }
    
    
    
    
    pub fn set_replication_factor(&mut self, replication_factor: NonZeroUsize) -> &mut Self {
        self.query_config.replication_factor = replication_factor;
        self
    }
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
        self.query_config.parallelism = parallelism;
        self
    }
    
    
    
    
    
    
    
    
    pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
        self.query_config.disjoint_query_paths = enabled;
        self
    }
    
    
    
    
    
    
    
    
    
    pub fn set_record_ttl(&mut self, record_ttl: Option<Duration>) -> &mut Self {
        self.record_ttl = record_ttl;
        self
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn set_replication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_replication_interval = interval;
        self
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn set_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.record_publication_interval = interval;
        self
    }
    
    
    
    
    
    pub fn set_provider_record_ttl(&mut self, ttl: Option<Duration>) -> &mut Self {
        self.provider_record_ttl = ttl;
        self
    }
    
    
    
    
    
    
    
    pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
        self.provider_publication_interval = interval;
        self
    }
    
    pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
        self.connection_idle_timeout = duration;
        self
    }
    
    
    
    
    pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
        self.protocol_config.set_max_packet_size(size);
        self
    }
}
impl<TStore> Kademlia<TStore>
where
    for<'a> TStore: RecordStore<'a>
{
    
    pub fn new(id: PeerId, store: TStore) -> Self {
        Self::with_config(id, store, Default::default())
    }
    
    pub fn protocol_name(&self) -> &[u8] {
        self.protocol_config.protocol_name()
    }
    
    pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self {
        let local_key = kbucket::Key::new(id.clone());
        let put_record_job = config
            .record_replication_interval
            .or(config.record_publication_interval)
            .map(|interval| PutRecordJob::new(
                id.clone(),
                interval,
                config.record_publication_interval,
                config.record_ttl,
            ));
        let add_provider_job = config
            .provider_publication_interval
            .map(AddProviderJob::new);
        Kademlia {
            store,
            kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
            protocol_config: config.protocol_config,
            queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
            queries: QueryPool::new(config.query_config),
            connected_peers: Default::default(),
            add_provider_job,
            put_record_job,
            record_ttl: config.record_ttl,
            provider_record_ttl: config.provider_record_ttl,
            connection_idle_timeout: config.connection_idle_timeout,
        }
    }
    
    pub fn iter_queries<'a>(&'a self) -> impl Iterator<Item = QueryRef<'a>> {
        self.queries.iter().filter_map(|query|
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            })
    }
    
    pub fn iter_queries_mut<'a>(&'a mut self) -> impl Iterator<Item = QueryMut<'a>> {
        self.queries.iter_mut().filter_map(|query|
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            })
    }
    
    pub fn query<'a>(&'a self, id: &QueryId) -> Option<QueryRef<'a>> {
        self.queries.get(id).and_then(|query|
            if !query.is_finished() {
                Some(QueryRef { query })
            } else {
                None
            })
    }
    
    pub fn query_mut<'a>(&'a mut self, id: &QueryId) -> Option<QueryMut<'a>> {
        self.queries.get_mut(id).and_then(|query|
            if !query.is_finished() {
                Some(QueryMut { query })
            } else {
                None
            })
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
        let key = kbucket::Key::new(peer.clone());
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().insert(address) {
                    self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                        KademliaEvent::RoutingUpdated {
                            peer: peer.clone(),
                            addresses: entry.value().clone(),
                            old_peer: None,
                        }
                    ))
                }
            }
            kbucket::Entry::Pending(mut entry, _) => {
                entry.value().insert(address);
            }
            kbucket::Entry::Absent(entry) => {
                let addresses = Addresses::new(address);
                let status =
                    if self.connected_peers.contains(peer) {
                        NodeStatus::Connected
                    } else {
                        NodeStatus::Disconnected
                    };
                match entry.insert(addresses.clone(), status) {
                    kbucket::InsertResult::Inserted => {
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::RoutingUpdated {
                                peer: peer.clone(),
                                addresses,
                                old_peer: None,
                            }
                        ));
                    },
                    kbucket::InsertResult::Full => {
                        debug!("Bucket full. Peer not added to routing table: {}", peer)
                    },
                    kbucket::InsertResult::Pending { disconnected } => {
                        self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
                            peer_id: disconnected.into_preimage(),
                            condition: DialPeerCondition::Disconnected
                        })
                    },
                }
            },
            kbucket::Entry::SelfEntry => {},
        }
    }
    
    
    pub fn kbuckets_entries(&mut self) -> impl Iterator<Item = &PeerId> {
        self.kbuckets.iter().map(|entry| entry.node.key.preimage())
    }
    
    
    
    
    pub fn get_closest_peers<K>(&mut self, key: K) -> QueryId
    where
        K: Borrow<[u8]> + Clone
    {
        let info = QueryInfo::GetClosestPeers { key: key.borrow().to_vec() };
        let target = kbucket::Key::new(key);
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner)
    }
    
    
    
    
    pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) -> QueryId {
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let mut records = Vec::with_capacity(quorum.get());
        if let Some(record) = self.store.get(key) {
            if record.is_expired(Instant::now()) {
                self.store.remove(key)
            } else {
                records.push(PeerRecord{ peer: None, record: record.into_owned()});
            }
        }
        let done = records.len() >= quorum.get();
        let target = kbucket::Key::new(key.clone());
        let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        let id = self.queries.add_iter_closest(target.clone(), peers, inner); 
        
        if done {
            self.queries.get_mut(&id).expect("by (*)").finish();
        }
        id
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
        record.publisher = Some(self.kbuckets.local_key().preimage().clone());
        self.store.put(record.clone())?;
        record.expires = record.expires.or_else(||
            self.record_ttl.map(|ttl| Instant::now() + ttl));
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = PutRecordContext::Publish;
        let info = QueryInfo::PutRecord {
            context,
            record,
            quorum,
            phase: PutRecordPhase::GetClosestPeers
        };
        let inner = QueryInner::new(info);
        Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
    }
    
    
    
    
    
    
    
    
    
    pub fn remove_record(&mut self, key: &record::Key) {
        if let Some(r) = self.store.get(key) {
            if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                self.store.remove(key)
            }
        }
    }
    
    pub fn store_mut(&mut self) -> &mut TStore {
        &mut self.store
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
        let local_key = self.kbuckets.local_key().clone();
        let info = QueryInfo::Bootstrap {
            peer: local_key.preimage().clone(),
            remaining: None
        };
        let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
        if peers.is_empty() {
            Err(NoKnownPeers())
        } else {
            let inner = QueryInner::new(info);
            Ok(self.queries.add_iter_closest(local_key, peers, inner))
        }
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
        let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone());
        self.store.add_provider(record)?;
        let target = kbucket::Key::new(key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let context = AddProviderContext::Publish;
        let info = QueryInfo::AddProvider {
            context,
            key,
            phase: AddProviderPhase::GetClosestPeers
        };
        let inner = QueryInner::new(info);
        let id = self.queries.add_iter_closest(target.clone(), peers, inner);
        Ok(id)
    }
    
    
    
    
    pub fn stop_providing(&mut self, key: &record::Key) {
        self.store.remove_provider(key, self.kbuckets.local_key().preimage());
    }
    
    
    
    
    pub fn get_providers(&mut self, key: record::Key) -> QueryId {
        let info = QueryInfo::GetProviders {
            key: key.clone(),
            providers: HashSet::new(),
        };
        let target = kbucket::Key::new(key);
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner)
    }
    
    fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
    where
        I: Iterator<Item = &'a KadPeer> + Clone
    {
        let local_id = self.kbuckets.local_key().preimage().clone();
        let others_iter = peers.filter(|p| p.node_id != local_id);
        for peer in others_iter.clone() {
            self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                KademliaEvent::Discovered {
                    peer_id: peer.node_id.clone(),
                    addresses: peer.multiaddrs.clone(),
                    ty: peer.connection_ty,
                }
            ));
        }
        if let Some(query) = self.queries.get_mut(query_id) {
            log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
            for peer in others_iter.clone() {
                log::trace!("Peer {:?} reported by {:?} in query {:?}.",
                            peer, source, query_id);
                let addrs = peer.multiaddrs.iter().cloned().collect();
                query.inner.addresses.insert(peer.node_id.clone(), addrs);
            }
            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
        }
    }
    
    
    
    fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
        if target == self.kbuckets.local_key() {
            Vec::new()
        } else {
            self.kbuckets
                .closest(target)
                .filter(|e| e.node.key.preimage() != source)
                .take(self.queries.config().replication_factor.get())
                .map(KadPeer::from)
                .collect()
        }
    }
    
    fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
        let kbuckets = &mut self.kbuckets;
        self.store.providers(key)
            .into_iter()
            .filter_map(move |p|
                if &p.provider != source {
                    let key = kbucket::Key::new(p.provider.clone());
                    kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned()))
                } else {
                    None
                })
            .take(self.queries.config().replication_factor.get())
            .collect()
    }
    
    fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
        let info = QueryInfo::AddProvider {
            context,
            key: key.clone(),
            phase: AddProviderPhase::GetClosestPeers
        };
        let target = kbucket::Key::new(key);
        let peers = self.kbuckets.closest_keys(&target);
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner);
    }
    
    fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
        let quorum = quorum.eval(self.queries.config().replication_factor);
        let target = kbucket::Key::new(record.key.clone());
        let peers = self.kbuckets.closest_keys(&target);
        let info = QueryInfo::PutRecord {
            record, quorum, context, phase: PutRecordPhase::GetClosestPeers
        };
        let inner = QueryInner::new(info);
        self.queries.add_iter_closest(target.clone(), peers, inner);
    }
    
    fn connection_updated(&mut self, peer: PeerId, address: Option<Multiaddr>, new_status: NodeStatus) {
        let key = kbucket::Key::new(peer.clone());
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, old_status) => {
                if let Some(address) = address {
                    if entry.value().insert(address) {
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::RoutingUpdated {
                                peer,
                                addresses: entry.value().clone(),
                                old_peer: None,
                            }
                        ))
                    }
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            },
            kbucket::Entry::Pending(mut entry, old_status) => {
                if let Some(address) = address {
                    entry.value().insert(address);
                }
                if old_status != new_status {
                    entry.update(new_status);
                }
            },
            kbucket::Entry::Absent(entry) => {
                
                if new_status == NodeStatus::Connected {
                    if let Some(address) = address {
                        let addresses = Addresses::new(address);
                        match entry.insert(addresses.clone(), new_status) {
                            kbucket::InsertResult::Inserted => {
                                let event = KademliaEvent::RoutingUpdated {
                                    peer: peer.clone(),
                                    addresses,
                                    old_peer: None,
                                };
                                self.queued_events.push_back(
                                    NetworkBehaviourAction::GenerateEvent(event));
                            },
                            kbucket::InsertResult::Full => {
                                debug!("Bucket full. Peer not added to routing table: {}", peer)
                            },
                            kbucket::InsertResult::Pending { disconnected } => {
                                debug_assert!(!self.connected_peers.contains(disconnected.preimage()));
                                self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
                                    peer_id: disconnected.into_preimage(),
                                    condition: DialPeerCondition::Disconnected
                                })
                            },
                        }
                    } else {
                        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            KademliaEvent::UnroutablePeer { peer }
                        ));
                    }
                }
            },
            _ => {}
        }
    }
    
    fn query_finished(&mut self, q: Query<QueryInner>, params: &mut impl PollParameters)
        -> Option<KademliaEvent>
    {
        let query_id = q.id();
        log::trace!("Query {:?} finished.", query_id);
        let result = q.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap { peer, remaining } => {
                let local_key = self.kbuckets.local_key().clone();
                let mut remaining = remaining.unwrap_or_else(|| {
                    debug_assert_eq!(&peer, local_key.preimage());
                    
                    
                    
                    
                    self.kbuckets.buckets()
                        .skip_while(|b| b.num_entries() == 0)
                        .skip(1) 
                        .map(|b| {
                            
                            
                            
                            
                            
                            
                            
                            
                            
                            
                            
                            
                            
                            let mut target = kbucket::Key::new(PeerId::random());
                            for _ in 0 .. 16 {
                                let d = local_key.distance(&target);
                                if b.contains(&d) {
                                    break;
                                }
                                target = kbucket::Key::new(PeerId::random());
                            }
                            target
                        }).collect::<Vec<_>>().into_iter()
                });
                let num_remaining = remaining.len().saturating_sub(1) as u32;
                if let Some(target) = remaining.next() {
                    let info = QueryInfo::Bootstrap {
                        peer: target.clone().into_preimage(),
                        remaining: Some(remaining)
                    };
                    let peers = self.kbuckets.closest_keys(&target);
                    let inner = QueryInner::new(info);
                    self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
                }
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Ok(BootstrapOk { peer, num_remaining }))
                })
            }
            QueryInfo::GetClosestPeers { key, .. } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Ok(
                        GetClosestPeersOk { key, peers: result.peers.collect() }
                    ))
                })
            }
            QueryInfo::GetProviders { key, providers } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Ok(
                        GetProvidersOk {
                            key,
                            providers,
                            closest_peers: result.peers.collect()
                        }
                    ))
                })
            }
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::GetClosestPeers
            } => {
                let provider_id = params.local_peer_id().clone();
                let external_addresses = params.external_addresses().collect();
                let inner = QueryInner::new(QueryInfo::AddProvider {
                    context,
                    key,
                    phase: AddProviderPhase::AddProvider {
                        provider_id,
                        external_addresses,
                        get_closest_peers_stats: result.stats
                    }
                });
                self.queries.continue_fixed(query_id, result.peers, inner);
                None
            }
            QueryInfo::AddProvider {
                context,
                key,
                phase: AddProviderPhase::AddProvider { get_closest_peers_stats, .. }
            } => {
                match context {
                    AddProviderContext::Publish => {
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::StartProviding(Ok(AddProviderOk { key }))
                        })
                    }
                    AddProviderContext::Republish => {
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::RepublishProvider(Ok(AddProviderOk { key }))
                        })
                    }
                }
            }
            QueryInfo::GetRecord { key, records, quorum, cache_at } => {
                let results = if records.len() >= quorum.get() { 
                    if let Some(cache_key) = cache_at {
                        
                        
                        let record = records.first().expect("[not empty]").record.clone();
                        let quorum = NonZeroUsize::new(1).expect("1 > 0");
                        let context = PutRecordContext::Cache;
                        let info = QueryInfo::PutRecord {
                            context,
                            record,
                            quorum,
                            phase: PutRecordPhase::PutRecord {
                                success: vec![],
                                get_closest_peers_stats: QueryStats::empty()
                            }
                        };
                        let inner = QueryInner::new(info);
                        self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner);
                    }
                    Ok(GetRecordOk { records })
                } else if records.is_empty() {
                    Err(GetRecordError::NotFound {
                        key,
                        closest_peers: result.peers.collect()
                    })
                } else {
                    Err(GetRecordError::QuorumFailed { key, records, quorum })
                };
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(results)
                })
            }
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::GetClosestPeers
            } => {
                let info = QueryInfo::PutRecord {
                    context,
                    record,
                    quorum,
                    phase: PutRecordPhase::PutRecord {
                        success: vec![],
                        get_closest_peers_stats: result.stats
                    }
                };
                let inner = QueryInner::new(info);
                self.queries.continue_fixed(query_id, result.peers, inner);
                None
            }
            QueryInfo::PutRecord {
                context,
                record,
                quorum,
                phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
            } => {
                let mk_result = |key: record::Key| {
                    if success.len() >= quorum.get() {
                        Ok(PutRecordOk { key })
                    } else {
                        Err(PutRecordError::QuorumFailed { key, quorum, success })
                    }
                };
                match context {
                    PutRecordContext::Publish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::PutRecord(mk_result(record.key))
                        }),
                    PutRecordContext::Republish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: get_closest_peers_stats.merge(result.stats),
                            result: QueryResult::RepublishRecord(mk_result(record.key))
                        }),
                    PutRecordContext::Replicate => {
                        debug!("Record replicated: {:?}", record.key);
                        None
                    }
                    PutRecordContext::Cache => {
                        debug!("Record cached: {:?}", record.key);
                        None
                    }
                }
            }
        }
    }
    
    fn query_timeout(&mut self, query: Query<QueryInner>) -> Option<KademliaEvent> {
        let query_id = query.id();
        log::trace!("Query {:?} timed out.", query_id);
        let result = query.into_result();
        match result.inner.info {
            QueryInfo::Bootstrap { peer, mut remaining } => {
                let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
                if let Some(mut remaining) = remaining.take() {
                    
                    if let Some(target) = remaining.next() {
                        let info = QueryInfo::Bootstrap {
                            peer: target.clone().into_preimage(),
                            remaining: Some(remaining)
                        };
                        let peers = self.kbuckets.closest_keys(&target);
                        let inner = QueryInner::new(info);
                        self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
                    }
                }
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::Bootstrap(Err(
                        BootstrapError::Timeout { peer, num_remaining }
                    ))
                })
            }
            QueryInfo::AddProvider { context, key, .. } =>
                Some(match context {
                    AddProviderContext::Publish =>
                        KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::StartProviding(Err(
                                AddProviderError::Timeout { key }
                            ))
                        },
                    AddProviderContext::Republish =>
                        KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::RepublishProvider(Err(
                                AddProviderError::Timeout { key }
                            ))
                        }
                }),
            QueryInfo::GetClosestPeers { key } => {
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetClosestPeers(Err(
                        GetClosestPeersError::Timeout {
                            key,
                            peers: result.peers.collect()
                        }
                    ))
                })
            },
            QueryInfo::PutRecord { record, quorum, context, phase } => {
                let err = Err(PutRecordError::Timeout {
                    key: record.key,
                    quorum,
                    success: match phase {
                        PutRecordPhase::GetClosestPeers => vec![],
                        PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
                    }
                });
                match context {
                    PutRecordContext::Publish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::PutRecord(err)
                        }),
                    PutRecordContext::Republish =>
                        Some(KademliaEvent::QueryResult {
                            id: query_id,
                            stats: result.stats,
                            result: QueryResult::RepublishRecord(err)
                        }),
                    PutRecordContext::Replicate => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            warn!("Locating closest peers for replication failed: {:?}", err);
                            None
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            debug!("Replicating record failed: {:?}", err);
                            None
                        }
                    }
                    PutRecordContext::Cache => match phase {
                        PutRecordPhase::GetClosestPeers => {
                            
                            
                            
                            unreachable!()
                        }
                        PutRecordPhase::PutRecord { .. } => {
                            debug!("Caching record failed: {:?}", err);
                            None
                        }
                    }
                }
            }
            QueryInfo::GetRecord { key, records, quorum, .. } =>
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetRecord(Err(
                        GetRecordError::Timeout { key, records, quorum },
                    ))
                }),
            QueryInfo::GetProviders { key, providers } =>
                Some(KademliaEvent::QueryResult {
                    id: query_id,
                    stats: result.stats,
                    result: QueryResult::GetProviders(Err(
                        GetProvidersError::Timeout {
                            key,
                            providers,
                            closest_peers: result.peers.collect()
                        }
                    ))
                })
            }
    }
    
    fn record_received(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        request_id: KademliaRequestId,
        mut record: Record
    ) {
        if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
            
            
            
            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                peer_id: source,
                handler: NotifyHandler::One(connection),
                event: KademliaHandlerIn::PutRecordRes {
                    key: record.key,
                    value: record.value,
                    request_id,
                },
            });
            return
        }
        let now = Instant::now();
        
        
        
        
        let target = kbucket::Key::new(record.key.clone());
        let num_between = self.kbuckets.count_nodes_between(&target);
        let k = self.queries.config().replication_factor.get();
        let num_beyond_k = (usize::max(k, num_between) - k) as u32;
        let expiration = self.record_ttl.map(|ttl| now + exp_decrease(ttl, num_beyond_k));
        
        
        record.expires = record.expires.or(expiration).min(expiration);
        if let Some(job) = self.put_record_job.as_mut() {
            
            
            
            
            
            job.skip(record.key.clone())
        }
        
        
        
        
        
        
        
        
        if !record.is_expired(now) {
            
            
            
            match self.store.put(record.clone()) {
                Ok(()) => debug!("Record stored: {:?}; {} bytes", record.key, record.value.len()),
                Err(e) => {
                    info!("Record not stored: {:?}", e);
                    self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                        peer_id: source,
                        handler: NotifyHandler::One(connection),
                        event: KademliaHandlerIn::Reset(request_id)
                    });
                    return
                }
            }
        }
        
        
        
        
        
        
        
        self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
            peer_id: source,
            handler: NotifyHandler::One(connection),
            event: KademliaHandlerIn::PutRecordRes {
                key: record.key,
                value: record.value,
                request_id,
            },
        })
    }
    
    fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
        self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
            KademliaEvent::Discovered {
                peer_id: provider.node_id.clone(),
                addresses: provider.multiaddrs.clone(),
                ty: provider.connection_ty,
            }));
        if &provider.node_id != self.kbuckets.local_key().preimage() {
            let record = ProviderRecord {
                key,
                provider: provider.node_id,
                expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl)
            };
            if let Err(e) = self.store.add_provider(record) {
                info!("Provider record not stored: {:?}", e);
            }
        }
    }
}
fn exp_decrease(ttl: Duration, exp: u32) -> Duration {
    Duration::from_secs(ttl.as_secs().checked_shr(exp).unwrap_or(0))
}
impl<TStore> NetworkBehaviour for Kademlia<TStore>
where
    for<'a> TStore: RecordStore<'a>,
    TStore: Send + 'static,
{
    type ProtocolsHandler = KademliaHandler<QueryId>;
    type OutEvent = KademliaEvent;
    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        KademliaHandler::new(KademliaHandlerConfig {
            protocol_config: self.protocol_config.clone(),
            allow_listening: true,
            idle_timeout: self.connection_idle_timeout,
        })
    }
    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
        
        
        let key = kbucket::Key::new(peer_id.clone());
        let mut peer_addrs =
            if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
                let addrs = entry.value().iter().cloned().collect::<Vec<_>>();
                debug_assert!(!addrs.is_empty(), "Empty peer addresses in routing table.");
                addrs
            } else {
                Vec::new()
            };
        
        for query in self.queries.iter() {
            if let Some(addrs) = query.inner.addresses.get(peer_id) {
                peer_addrs.extend(addrs.iter().cloned())
            }
        }
        peer_addrs
    }
    fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, endpoint: &ConnectedPoint) {
        
        
        
        
        let address = match endpoint {
            ConnectedPoint::Dialer { address } => Some(address.clone()),
            ConnectedPoint::Listener { .. } => None,
        };
        self.connection_updated(peer.clone(), address, NodeStatus::Connected);
    }
    fn inject_connected(&mut self, peer: &PeerId) {
        
        
        for (peer_id, event) in self.queries.iter_mut().filter_map(|q|
            q.inner.pending_rpcs.iter()
                .position(|(p, _)| p == peer)
                .map(|p| q.inner.pending_rpcs.remove(p)))
        {
            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                peer_id, event, handler: NotifyHandler::Any
            });
        }
        self.connected_peers.insert(peer.clone());
    }
    fn inject_addr_reach_failure(
        &mut self,
        peer_id: Option<&PeerId>,
        addr: &Multiaddr,
        err: &dyn error::Error
    ) {
        if let Some(peer_id) = peer_id {
            let key = kbucket::Key::new(peer_id.clone());
            if let Some(addrs) = self.kbuckets.entry(&key).value() {
                
                
                
                
                
                if addrs.remove(addr).is_ok() {
                    debug!("Address '{}' removed from peer '{}' due to error: {}.",
                        addr, peer_id, err);
                } else {
                    
                    
                    
                    
                    
                    
                    
                    
                    
                    debug!("Last remaining address '{}' of peer '{}' is unreachable: {}.",
                        addr, peer_id, err)
                }
            }
            for query in self.queries.iter_mut() {
                if let Some(addrs) = query.inner.addresses.get_mut(peer_id) {
                    addrs.retain(|a| a != addr);
                }
            }
        }
    }
    fn inject_dial_failure(&mut self, peer_id: &PeerId) {
        for query in self.queries.iter_mut() {
            query.on_failure(peer_id);
        }
    }
    fn inject_disconnected(&mut self, id: &PeerId) {
        for query in self.queries.iter_mut() {
            query.on_failure(id);
        }
        self.connection_updated(id.clone(), None, NodeStatus::Disconnected);
        self.connected_peers.remove(id);
    }
    fn inject_event(
        &mut self,
        source: PeerId,
        connection: ConnectionId,
        event: KademliaHandlerEvent<QueryId>
    ) {
        match event {
            KademliaHandlerEvent::FindNodeReq { key, request_id } => {
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: KademliaHandlerIn::FindNodeRes {
                        closer_peers,
                        request_id,
                    },
                });
            }
            KademliaHandlerEvent::FindNodeRes {
                closer_peers,
                user_data,
            } => {
                self.discovered(&user_data, &source, closer_peers.iter());
            }
            KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
                let provider_peers = self.provider_peers(&key, &source);
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: KademliaHandlerIn::GetProvidersRes {
                        closer_peers,
                        provider_peers,
                        request_id,
                    },
                });
            }
            KademliaHandlerEvent::GetProvidersRes {
                closer_peers,
                provider_peers,
                user_data,
            } => {
                let peers = closer_peers.iter().chain(provider_peers.iter());
                self.discovered(&user_data, &source, peers);
                if let Some(query) = self.queries.get_mut(&user_data) {
                    if let QueryInfo::GetProviders {
                        providers, ..
                    } = &mut query.inner.info {
                        for peer in provider_peers {
                            providers.insert(peer.node_id);
                        }
                    }
                }
            }
            KademliaHandlerEvent::QueryError { user_data, error } => {
                log::debug!("Request to {:?} in query {:?} failed with {:?}",
                            source, user_data, error);
                
                
                if let Some(query) = self.queries.get_mut(&user_data) {
                    query.on_failure(&source)
                }
            }
            KademliaHandlerEvent::AddProvider { key, provider } => {
                
                if provider.node_id != source {
                    return
                }
                self.provider_received(key, provider)
            }
            KademliaHandlerEvent::GetRecord { key, request_id } => {
                
                let record = match self.store.get(&key) {
                    Some(record) => {
                        if record.is_expired(Instant::now()) {
                            self.store.remove(&key);
                            None
                        } else {
                            Some(record.into_owned())
                        }
                    },
                    None => None
                };
                let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
                self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id: source,
                    handler: NotifyHandler::One(connection),
                    event: KademliaHandlerIn::GetRecordRes {
                        record,
                        closer_peers,
                        request_id,
                    },
                });
            }
            KademliaHandlerEvent::GetRecordRes {
                record,
                closer_peers,
                user_data,
            } => {
                if let Some(query) = self.queries.get_mut(&user_data) {
                    if let QueryInfo::GetRecord {
                        key, records, quorum, cache_at
                    } = &mut query.inner.info {
                        if let Some(record) = record {
                            records.push(PeerRecord{ peer: Some(source.clone()), record });
                            let quorum = quorum.get();
                            if records.len() >= quorum {
                                
                                
                                let peers = records.iter()
                                    .filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
                                    .cloned()
                                    .collect::<Vec<_>>();
                                let finished = query.try_finish(peers.iter());
                                if !finished {
                                    debug!(
                                        "GetRecord query ({:?}) reached quorum ({}/{}) with \
                                         response from peer {} but could not yet finish.",
                                        user_data, peers.len(), quorum, source,
                                    );
                                }
                            }
                        } else if quorum.get() == 1 {
                            
                            
                            
                            
                            let source_key = kbucket::Key::from(source.clone());
                            if let Some(cache_key) = cache_at {
                                let key = kbucket::Key::new(key.clone());
                                if source_key.distance(&key) < cache_key.distance(&key) {
                                    *cache_at = Some(source_key)
                                }
                            } else {
                                *cache_at = Some(source_key)
                            }
                        }
                    }
                }
                self.discovered(&user_data, &source, closer_peers.iter());
            }
            KademliaHandlerEvent::PutRecord {
                record,
                request_id
            } => {
                self.record_received(source, connection, request_id, record);
            }
            KademliaHandlerEvent::PutRecordRes {
                user_data, ..
            } => {
                if let Some(query) = self.queries.get_mut(&user_data) {
                    query.on_success(&source, vec![]);
                    if let QueryInfo::PutRecord {
                        phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
                    } = &mut query.inner.info {
                        success.push(source.clone());
                        let quorum = quorum.get();
                        if success.len() >= quorum {
                            let peers = success.clone();
                            let finished = query.try_finish(peers.iter());
                            if !finished {
                                debug!(
                                    "PutRecord query ({:?}) reached quorum ({}/{}) with response \
                                     from peer {} but could not yet finish.",
                                    user_data, peers.len(), quorum, source,
                                );
                            }
                        }
                    }
                }
            }
        };
    }
    fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll<
        NetworkBehaviourAction<
            <KademliaHandler<QueryId> as ProtocolsHandler>::InEvent,
            Self::OutEvent,
        >,
    > {
        let now = Instant::now();
        
        let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
        
        if let Some(mut job) = self.add_provider_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0 .. num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    self.start_add_provider(r.key, AddProviderContext::Republish)
                } else {
                    break
                }
            }
            jobs_query_capacity -= num;
            self.add_provider_job = Some(job);
        }
        
        if let Some(mut job) = self.put_record_job.take() {
            let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
            for _ in 0 .. num {
                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                    let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                        PutRecordContext::Republish
                    } else {
                        PutRecordContext::Replicate
                    };
                    self.start_put_record(r, Quorum::All, context)
                } else {
                    break
                }
            }
            self.put_record_job = Some(job);
        }
        loop {
            
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event);
            }
            
            if let Some(entry) = self.kbuckets.take_applied_pending() {
                let kbucket::Node { key, value } = entry.inserted;
                let event = KademliaEvent::RoutingUpdated {
                    peer: key.into_preimage(),
                    addresses: value,
                    old_peer: entry.evicted.map(|n| n.key.into_preimage())
                };
                return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
            }
            
            loop {
                match self.queries.poll(now) {
                    QueryPoolState::Finished(q) => {
                        if let Some(event) = self.query_finished(q, parameters) {
                            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
                        }
                    }
                    QueryPoolState::Timeout(q) => {
                        if let Some(event) = self.query_timeout(q) {
                            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
                        }
                    }
                    QueryPoolState::Waiting(Some((query, peer_id))) => {
                        let event = query.inner.info.to_request(query.id());
                        
                        
                        
                        
                        
                        if let QueryInfo::AddProvider {
                            phase: AddProviderPhase::AddProvider { .. },
                            ..
                        } = &query.inner.info {
                            query.on_success(&peer_id, vec![])
                        }
                        if self.connected_peers.contains(&peer_id) {
                            self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
                                peer_id, event, handler: NotifyHandler::Any
                            });
                        } else if &peer_id != self.kbuckets.local_key().preimage() {
                            query.inner.pending_rpcs.push((peer_id.clone(), event));
                            self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
                                peer_id, condition: DialPeerCondition::Disconnected
                            });
                        }
                    }
                    QueryPoolState::Waiting(None) | QueryPoolState::Idle => break,
                }
            }
            
            
            
            if self.queued_events.is_empty() {
                return Poll::Pending
            }
        }
    }
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Quorum {
    One,
    Majority,
    All,
    N(NonZeroUsize)
}
impl Quorum {
    
    fn eval(&self, total: NonZeroUsize) -> NonZeroUsize {
        match self {
            Quorum::One => NonZeroUsize::new(1).expect("1 != 0"),
            Quorum::Majority => NonZeroUsize::new(total.get() / 2 + 1).expect("n + 1 != 0"),
            Quorum::All => total,
            Quorum::N(n) => NonZeroUsize::min(total, *n)
        }
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
    
    
    pub peer: Option<PeerId>,
    pub record: Record,
}
#[derive(Debug)]
pub enum KademliaEvent {
    
    QueryResult {
        
        id: QueryId,
        
        result: QueryResult,
        
        stats: QueryStats
    },
    
    Discovered {
        
        peer_id: PeerId,
        
        addresses: Vec<Multiaddr>,
        
        
        ty: KadConnectionType,
    },
    
    RoutingUpdated {
        
        peer: PeerId,
        
        addresses: Addresses,
        
        
        old_peer: Option<PeerId>,
    },
    
    
    
    
    UnroutablePeer {
        peer: PeerId
    }
}
#[derive(Debug)]
pub enum QueryResult {
    
    Bootstrap(BootstrapResult),
    
    GetClosestPeers(GetClosestPeersResult),
    
    GetProviders(GetProvidersResult),
    
    StartProviding(AddProviderResult),
    
    RepublishProvider(AddProviderResult),
    
    GetRecord(GetRecordResult),
    
    PutRecord(PutRecordResult),
    
    RepublishRecord(PutRecordResult),
}
pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
#[derive(Debug, Clone)]
pub struct GetRecordOk {
    pub records: Vec<PeerRecord>
}
#[derive(Debug, Clone)]
pub enum GetRecordError {
    NotFound {
        key: record::Key,
        closest_peers: Vec<PeerId>
    },
    QuorumFailed {
        key: record::Key,
        records: Vec<PeerRecord>,
        quorum: NonZeroUsize
    },
    Timeout {
        key: record::Key,
        records: Vec<PeerRecord>,
        quorum: NonZeroUsize
    }
}
impl GetRecordError {
    
    pub fn key(&self) -> &record::Key {
        match self {
            GetRecordError::QuorumFailed { key, .. } => key,
            GetRecordError::Timeout { key, .. } => key,
            GetRecordError::NotFound { key, .. } => key,
        }
    }
    
    
    pub fn into_key(self) -> record::Key {
        match self {
            GetRecordError::QuorumFailed { key, .. } => key,
            GetRecordError::Timeout { key, .. } => key,
            GetRecordError::NotFound { key, .. } => key,
        }
    }
}
pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
#[derive(Debug, Clone)]
pub struct PutRecordOk {
    pub key: record::Key
}
#[derive(Debug)]
pub enum PutRecordError {
    QuorumFailed {
        key: record::Key,
        
        success: Vec<PeerId>,
        quorum: NonZeroUsize
    },
    Timeout {
        key: record::Key,
        
        success: Vec<PeerId>,
        quorum: NonZeroUsize
    },
}
impl PutRecordError {
    
    pub fn key(&self) -> &record::Key {
        match self {
            PutRecordError::QuorumFailed { key, .. } => key,
            PutRecordError::Timeout { key, .. } => key,
        }
    }
    
    
    pub fn into_key(self) -> record::Key {
        match self {
            PutRecordError::QuorumFailed { key, .. } => key,
            PutRecordError::Timeout { key, .. } => key,
        }
    }
}
pub type BootstrapResult = Result<BootstrapOk, BootstrapError>;
#[derive(Debug, Clone)]
pub struct BootstrapOk {
    pub peer: PeerId,
    pub num_remaining: u32,
}
#[derive(Debug, Clone)]
pub enum BootstrapError {
    Timeout {
        peer: PeerId,
        num_remaining: Option<u32>,
    }
}
pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>;
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
    pub key: Vec<u8>,
    pub peers: Vec<PeerId>
}
#[derive(Debug, Clone)]
pub enum GetClosestPeersError {
    Timeout {
        key: Vec<u8>,
        peers: Vec<PeerId>
    }
}
impl GetClosestPeersError {
    
    pub fn key(&self) -> &Vec<u8> {
        match self {
            GetClosestPeersError::Timeout { key, .. } => key,
        }
    }
    
    
    pub fn into_key(self) -> Vec<u8> {
        match self {
            GetClosestPeersError::Timeout { key, .. } => key,
        }
    }
}
pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
#[derive(Debug, Clone)]
pub struct GetProvidersOk {
    pub key: record::Key,
    pub providers: HashSet<PeerId>,
    pub closest_peers: Vec<PeerId>
}
#[derive(Debug, Clone)]
pub enum GetProvidersError {
    Timeout {
        key: record::Key,
        providers: HashSet<PeerId>,
        closest_peers: Vec<PeerId>
    }
}
impl GetProvidersError {
    
    pub fn key(&self) -> &record::Key {
        match self {
            GetProvidersError::Timeout { key, .. } => key,
        }
    }
    
    
    pub fn into_key(self) -> record::Key {
        match self {
            GetProvidersError::Timeout { key, .. } => key,
        }
    }
}
pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
#[derive(Debug, Clone)]
pub struct AddProviderOk {
    pub key: record::Key,
}
#[derive(Debug)]
pub enum AddProviderError {
    
    Timeout {
        key: record::Key,
    },
}
impl AddProviderError {
    
    pub fn key(&self) -> &record::Key {
        match self {
            AddProviderError::Timeout { key, .. } => key,
        }
    }
    
    pub fn into_key(self) -> record::Key {
        match self {
            AddProviderError::Timeout { key, .. } => key,
        }
    }
}
impl From<kbucket::EntryView<kbucket::Key<PeerId>, Addresses>> for KadPeer {
    fn from(e: kbucket::EntryView<kbucket::Key<PeerId>, Addresses>) -> KadPeer {
        KadPeer {
            node_id: e.node.key.into_preimage(),
            multiaddrs: e.node.value.into_vec(),
            connection_ty: match e.status {
                NodeStatus::Connected => KadConnectionType::Connected,
                NodeStatus::Disconnected => KadConnectionType::NotConnected
            }
        }
    }
}
struct QueryInner {
    
    info: QueryInfo,
    
    addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
    
    
    
    
    pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); K_VALUE.get()]>
}
impl QueryInner {
    fn new(info: QueryInfo) -> Self {
        QueryInner {
            info,
            addresses: Default::default(),
            pending_rpcs: SmallVec::default()
        }
    }
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
    Publish,
    Republish,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
    Publish,
    Republish,
    Replicate,
    Cache,
}
#[derive(Debug, Clone)]
pub enum QueryInfo {
    
    Bootstrap {
        
        peer: PeerId,
        
        
        
        
        
        
        remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>
    },
    
    GetClosestPeers { key: Vec<u8> },
    
    GetProviders {
        
        key: record::Key,
        
        providers: HashSet<PeerId>,
    },
    
    AddProvider {
        
        key: record::Key,
        
        phase: AddProviderPhase,
        
        context: AddProviderContext,
    },
    
    PutRecord {
        record: Record,
        
        quorum: NonZeroUsize,
        
        phase: PutRecordPhase,
        
        context: PutRecordContext,
    },
    
    GetRecord {
        
        key: record::Key,
        
        
        records: Vec<PeerRecord>,
        
        quorum: NonZeroUsize,
        
        
        
        
        cache_at: Option<kbucket::Key<PeerId>>,
    },
}
impl QueryInfo {
    
    
    fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn<QueryId> {
        match &self {
            QueryInfo::Bootstrap { peer, .. } => KademliaHandlerIn::FindNodeReq {
                key: peer.clone().into_bytes(),
                user_data: query_id,
            },
            QueryInfo::GetClosestPeers { key, .. } => KademliaHandlerIn::FindNodeReq {
                key: key.clone(),
                user_data: query_id,
            },
            QueryInfo::GetProviders { key, .. } => KademliaHandlerIn::GetProvidersReq {
                key: key.clone(),
                user_data: query_id,
            },
            QueryInfo::AddProvider { key, phase, .. } => match phase {
                AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
                    key: key.to_vec(),
                    user_data: query_id,
                },
                AddProviderPhase::AddProvider { provider_id, external_addresses, .. } => {
                    KademliaHandlerIn::AddProvider {
                        key: key.clone(),
                        provider: crate::protocol::KadPeer {
                            node_id: provider_id.clone(),
                            multiaddrs: external_addresses.clone(),
                            connection_ty: crate::protocol::KadConnectionType::Connected,
                        }
                    }
                }
            },
            QueryInfo::GetRecord { key, .. } => KademliaHandlerIn::GetRecord {
                key: key.clone(),
                user_data: query_id,
            },
            QueryInfo::PutRecord { record, phase, .. } => match phase {
                PutRecordPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
                    key: record.key.to_vec(),
                    user_data: query_id,
                },
                PutRecordPhase::PutRecord { .. } => KademliaHandlerIn::PutRecord {
                    record: record.clone(),
                    user_data: query_id
                }
            }
        }
    }
}
#[derive(Debug, Clone)]
pub enum AddProviderPhase {
    
    GetClosestPeers,
    
    
    AddProvider {
        
        provider_id: PeerId,
        
        external_addresses: Vec<Multiaddr>,
        
        get_closest_peers_stats: QueryStats,
    },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PutRecordPhase {
    
    GetClosestPeers,
    
    PutRecord {
        
        success: Vec<PeerId>,
        
        get_closest_peers_stats: QueryStats,
    },
}
pub struct QueryMut<'a> {
    query: &'a mut Query<QueryInner>,
}
impl<'a> QueryMut<'a> {
    pub fn id(&self) -> QueryId {
        self.query.id()
    }
    
    pub fn info(&self) -> &QueryInfo {
        &self.query.inner.info
    }
    
    
    
    
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
    
    
    pub fn finish(&mut self) {
        self.query.finish()
    }
}
pub struct QueryRef<'a> {
    query: &'a Query<QueryInner>,
}
impl<'a> QueryRef<'a> {
    pub fn id(&self) -> QueryId {
        self.query.id()
    }
    
    pub fn info(&self) -> &QueryInfo {
        &self.query.inner.info
    }
    
    
    
    
    pub fn stats(&self) -> &QueryStats {
        self.query.stats()
    }
}
#[derive(Debug, Clone)]
pub struct NoKnownPeers();
impl fmt::Display for NoKnownPeers {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "No known peers.")
    }
}
impl std::error::Error for NoKnownPeers {}