use crate::{
config::MultiaddrWithPeerId,
ipfs_block_provider::{BlockProvider, Change},
};
use futures::StreamExt;
use litep2p::{
protocol::libp2p::kademlia::{
Config, ConfigBuilder, KademliaEvent, KademliaHandle, Quorum, RecordKey,
},
types::multiaddr::Multiaddr,
PeerId,
};
use log::{debug, trace};
use sp_core::hexdisplay::HexDisplay;
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
time::Duration,
};
use tokio::time::MissedTickBehavior;
/// Log target used by every `debug!`/`trace!` call in this module.
const LOG_TARGET: &str = "sub-libp2p::ipfs::dht";
/// Protocol name handed to the Kademlia `ConfigBuilder`.
const KAD_PROTOCOL: &str = "/ipfs/kad/1.0.0";
/// Quorum passed to `start_providing` when a key is announced: `Quorum::N(10)`.
const QUORUM: Quorum = Quorum::N(NonZeroUsize::new(10).expect("10 > 0; qed"));
/// Value passed to `ConfigBuilder::with_max_provider_keys`.
const MAX_PROVIDER_KEYS: usize = 2_000_000;
/// Codec value (`0x55`, the multicodec code for raw binary) passed to `Cid::new_v1`
/// when a multihash is rendered as a CID in log messages.
const RAW_CODEC: u64 = 0x55;
/// Period of the timer that triggers a `find_node` query for a random peer ID (10 minutes).
const RANDOM_WALK_INTERVAL: Duration = Duration::from_secs(10 * 60);
/// Once this many start-providing queries are tracked as in flight, `IpfsDht::run` stops
/// polling the block provider for further changes until some of them complete.
const MAX_INFLIGHT_QUERIES: usize = 100;
/// CID type used for log output in the code shown here.
// NOTE(review): the `32` is presumably the maximum multihash digest size — confirm against `cid`.
type Cid = cid::CidGeneric<32>;
/// Worker that announces blocks reported by a [`BlockProvider`] on the IPFS Kademlia DHT.
///
/// Built with [`IpfsDht::new`] and driven to completion by [`IpfsDht::run`].
pub(crate) struct IpfsDht {
/// Handle used to issue Kademlia commands (`start_providing`, `stop_providing`,
/// `find_node`) and to receive Kademlia events.
kademlia_handle: KademliaHandle,
/// Source of the `Change::Added` / `Change::Removed` stream consumed by `run`.
block_provider: Box<dyn BlockProvider>,
}
impl IpfsDht {
    /// Creates the DHT worker together with the Kademlia protocol [`Config`].
    ///
    /// Bootnode addresses are grouped per peer (duplicate addresses of one peer collapse)
    /// and handed to the Kademlia config builder as known peers. The handle produced by
    /// the builder is kept inside the returned `Self`; the [`Config`] is returned to the
    /// caller.
    pub fn new(
        bootnodes: Vec<MultiaddrWithPeerId>,
        block_provider: Box<dyn BlockProvider>,
    ) -> (Self, Config) {
        // Group addresses by peer; the set drops repeated addresses.
        let mut addresses_by_peer: HashMap<PeerId, HashSet<Multiaddr>> = HashMap::new();
        for bootnode in bootnodes {
            let peer = bootnode.peer_id.into();
            let address = bootnode.concat().into();
            addresses_by_peer.entry(peer).or_default().insert(address);
        }

        // Convert each per-peer set into the collection type the builder expects.
        let known_peers = addresses_by_peer
            .into_iter()
            .map(|(peer, addresses)| (peer, addresses.into_iter().collect()))
            .collect();

        let builder = ConfigBuilder::new()
            .with_protocol_names(vec![KAD_PROTOCOL.into()])
            .with_known_peers(known_peers)
            .with_max_provider_keys(MAX_PROVIDER_KEYS);
        let (config, kademlia_handle) = builder.build();

        (Self { kademlia_handle, block_provider }, config)
    }

    /// Drives the worker until either the block provider's change stream or the Kademlia
    /// event stream ends.
    ///
    /// Each loop iteration waits on whichever of the following is ready:
    /// - a block provider change, polled only while fewer than `MAX_INFLIGHT_QUERIES`
    ///   start-providing queries are tracked: `Added` starts providing the key and
    ///   records the query, `Removed` stops providing the key;
    /// - the random-walk timer, which issues a `find_node` for a random peer ID unless
    ///   a previous walk is still outstanding;
    /// - a Kademlia event, whose outcome is logged and whose tracking state is cleared.
    pub async fn run(mut self) {
        let mut provider_changes = self.block_provider.changes();
        // Start-providing queries issued here that have not reported back yet.
        let mut pending_publishes = HashMap::new();
        // Query ID of the random walk currently in progress, if any.
        let mut active_walk = None;

        let mut walk_timer = tokio::time::interval(RANDOM_WALK_INTERVAL);
        walk_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                change = provider_changes.next(),
                    if pending_publishes.len() < MAX_INFLIGHT_QUERIES =>
                {
                    let Some(change) = change else {
                        debug!(target: LOG_TARGET, "BlockProvider terminated, terminating IpfsDht");
                        return
                    };

                    match change {
                        Change::Added(multihash) => {
                            let key = RecordKey::new(&multihash.to_bytes());
                            trace!(
                                target: LOG_TARGET,
                                "IPFS DHT start providing key: {}, CID: {}",
                                HexDisplay::from(&key.as_ref()),
                                Cid::new_v1(RAW_CODEC, multihash),
                            );
                            let query_id =
                                self.kademlia_handle.start_providing(key, QUORUM).await;
                            pending_publishes.insert(query_id, multihash);
                        },
                        Change::Removed(multihash) => {
                            let key = RecordKey::new(&multihash.to_bytes());
                            trace!(
                                target: LOG_TARGET,
                                "IPFS DHT stop providing key: {}, CID: {}",
                                HexDisplay::from(&key.as_ref()),
                                Cid::new_v1(RAW_CODEC, multihash),
                            );
                            self.kademlia_handle.stop_providing(key).await;
                        },
                    }
                },
                _ = walk_timer.tick() => {
                    // Only one random walk at a time: skip this tick if one is outstanding.
                    if active_walk.is_none() {
                        active_walk =
                            Some(self.kademlia_handle.find_node(PeerId::random()).await);
                    }
                },
                event = self.kademlia_handle.next() => {
                    let Some(event) = event else {
                        debug!(target: LOG_TARGET, "IPFS Kademlia terminated, terminating IpfsDht");
                        return
                    };

                    match event {
                        KademliaEvent::AddProviderSuccess { query_id, provided_key } => {
                            match pending_publishes.remove(&query_id) {
                                Some(multihash) => {
                                    trace!(
                                        target: LOG_TARGET,
                                        "IPFS DHT provider publish success, key: {}, CID: {}",
                                        HexDisplay::from(&provided_key.as_ref()),
                                        Cid::new_v1(RAW_CODEC, multihash),
                                    );
                                },
                                // Not a query tracked here; logged as a refresh.
                                None => {
                                    trace!(
                                        target: LOG_TARGET,
                                        "IPFS DHT provider refresh success, key: {}",
                                        HexDisplay::from(&provided_key.as_ref()),
                                    );
                                },
                            }
                        },
                        KademliaEvent::FindNodeSuccess { query_id, peers, .. } => {
                            debug_assert_eq!(Some(query_id), active_walk);
                            trace!(target: LOG_TARGET, "DHT random walk yielded {} peers", peers.len());
                            active_walk = None;
                        },
                        KademliaEvent::QueryFailed { query_id } => {
                            if Some(query_id) == active_walk {
                                trace!(target: LOG_TARGET, "DHT random walk failed");
                                active_walk = None;
                            } else {
                                match pending_publishes.remove(&query_id) {
                                    Some(multihash) => {
                                        trace!(
                                            target: LOG_TARGET,
                                            "IPFS DHT provider publish failed, key: {}, CID: {}",
                                            HexDisplay::from(&multihash.to_bytes()),
                                            Cid::new_v1(RAW_CODEC, multihash),
                                        );
                                    },
                                    // Neither the random walk nor a tracked publish.
                                    None => {
                                        trace!(
                                            target: LOG_TARGET,
                                            "IPFS DHT provider refresh failed",
                                        );
                                    },
                                }
                            }
                        },
                        // Every other Kademlia event is ignored.
                        _ => {},
                    }
                }
            }
        }
    }
}