#![allow(clippy::mutable_key_type)]
use libp2p::{kad::RecordKey, PeerId};
use rand::{seq::SliceRandom, thread_rng};
use sn_protocol::NetworkAddress;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    time::{Duration, Instant},
};

// Maximum number of fetches performed concurrently.
const MAX_PARALLEL_FETCH: usize = 8;
// How long an ongoing fetch from a holder is allowed before it is retried.
const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
// Maximum number of failed attempts tolerated per holder.
const MAX_RETRIES_PER_PEER: u8 = 1;
// Number of holders tried before falling back to fetching from the network.
const PEERS_TRIED_BEFORE_NETWORK_FETCH: u8 = 3;

type FailedAttempts = u8;
type ReplicationRequestSentTime = Instant;

/// Status of a fetch request against a specific holder.
#[derive(PartialEq, Debug)]
pub(crate) enum HolderStatus {
    /// Not yet requested from this holder.
    Pending,
    /// A fetch request has been sent to this holder and is awaiting a result.
    OnGoing,
}

#[derive(Default)]
pub(crate) struct ReplicationFetcher {
    /// Keys still to be fetched, with the holders known to have them and the
    /// per-holder request state.
    to_be_fetched: HashMap<
        RecordKey,
        BTreeMap<PeerId, (ReplicationRequestSentTime, HolderStatus, FailedAttempts)>,
    >,
    /// Number of fetches currently in flight, capped at `MAX_PARALLEL_FETCH`.
    on_going_fetches: usize,
}

impl ReplicationFetcher {
    /// Adds the keys that `peer_id` claims to hold, skipping any we already
    /// store locally, and returns the next batch of keys to fetch.
    pub(crate) fn add_keys(
        &mut self,
        peer_id: PeerId,
        incoming_keys: Vec<NetworkAddress>,
        locally_stored_keys: &HashSet<RecordKey>,
    ) -> Vec<(RecordKey, Option<PeerId>)> {
        self.retain_keys(locally_stored_keys);

        incoming_keys
            .into_iter()
            .filter_map(|incoming| incoming.as_record_key())
            .filter(|incoming| !locally_stored_keys.contains(incoming))
            .for_each(|incoming| self.add_holder_per_key(incoming, peer_id));

        self.next_keys_to_fetch()
    }

    /// Notifies the fetcher that `new_put` is now stored locally, so any
    /// pending or ongoing fetch for it can be dropped, and returns the next
    /// batch of keys to fetch.
    pub(crate) fn notify_about_new_put(
        &mut self,
        new_put: RecordKey,
    ) -> Vec<(RecordKey, Option<PeerId>)> {
        // If a fetch for this key was in flight, free up its slot.
        if let Some(holders) = self.to_be_fetched.get(&new_put) {
            if holders
                .values()
                .any(|(_, status, _)| *status == HolderStatus::OnGoing)
            {
                self.on_going_fetches = self.on_going_fetches.saturating_sub(1);
            }
        }
        self.to_be_fetched.remove(&new_put);

        self.next_keys_to_fetch()
    }

    /// Selects up to `MAX_PARALLEL_FETCH` keys to fetch next, pairing each key
    /// with one of its known holders (or `None` once a key is given up on).
    fn next_keys_to_fetch(&mut self) -> Vec<(RecordKey, Option<PeerId>)> {
        if self.on_going_fetches >= MAX_PARALLEL_FETCH {
            return vec![];
        }
        // Number of fetch slots still available this round.
        let len = MAX_PARALLEL_FETCH - self.on_going_fetches;

        debug!(
            "Number of records awaiting fetch: {:?}",
            self.to_be_fetched.len()
        );

        // Shuffle the candidates so no single key is persistently favoured.
        let mut rng = thread_rng();
        let mut data_to_fetch = self.to_be_fetched.iter_mut().collect::<Vec<_>>();
        data_to_fetch.shuffle(&mut rng);

        let mut keys_to_fetch = HashMap::new();
        let mut to_be_removed = vec![];
        for (key, holders) in data_to_fetch {
            let mut key_added_to_list = false;

            for (peer_id, (replication_req_time, holder_status, failed_attempts)) in
                holders.iter_mut()
            {
                match holder_status {
                    HolderStatus::Pending => {
                        // Skip this holder if the key has already been queued
                        // this round, the batch is full, or the holder has
                        // failed too many times.
                        if key_added_to_list
                            || keys_to_fetch.len() >= len
                            || *failed_attempts >= MAX_RETRIES_PER_PEER
                        {
                            continue;
                        }
                        // Issue the fetch against this holder and track it as
                        // ongoing.
                        *replication_req_time = Instant::now();
                        *holder_status = HolderStatus::OnGoing;
                        keys_to_fetch.insert(key.clone(), Some(*peer_id));
                        self.on_going_fetches += 1;
                        key_added_to_list = true;
                    }
                    HolderStatus::OnGoing => {
                        // If the ongoing fetch has timed out, count it as a
                        // failed attempt and return the holder to `Pending`
                        // so it can be retried.
                        if *replication_req_time + FETCH_TIMEOUT < Instant::now() {
                            *failed_attempts += 1;
                            *holder_status = HolderStatus::Pending;
                            self.on_going_fetches = self.on_going_fetches.saturating_sub(1);
                        }
                    }
                }
            }
            // Once every known holder has failed, or enough holders have been
            // tried, give up on targeted fetches: drop the key from the queue
            // and hand it back with no holder, so the caller can fetch it from
            // the network instead.
            let failed_holders_count = holders
                .values()
                .filter(|(_, _, failed_attempts)| *failed_attempts >= MAX_RETRIES_PER_PEER)
                .count();
            if failed_holders_count == holders.len()
                || failed_holders_count >= PEERS_TRIED_BEFORE_NETWORK_FETCH as usize
            {
                to_be_removed.push(key.clone());
                keys_to_fetch.insert(key.clone(), None);
                self.on_going_fetches = self.on_going_fetches.saturating_sub(1);
            }
        }

        // Remove the keys that were given up on.
        for failed_key in to_be_removed {
            let _ = self.to_be_fetched.remove(&failed_key);
        }

        trace!("Sending out keys to fetch {keys_to_fetch:?}");
        keys_to_fetch.into_iter().collect::<Vec<_>>()
    }

    /// Drops any queued keys that are already stored locally.
    fn retain_keys(&mut self, existing_keys: &HashSet<RecordKey>) {
        self.to_be_fetched
            .retain(|key, _| !existing_keys.contains(key));
    }

    /// Records `peer_id` as a holder of `key`, starting it off as `Pending`
    /// with zero failed attempts if it is not already tracked.
    fn add_holder_per_key(&mut self, key: RecordKey, peer_id: PeerId) {
        let holders = self.to_be_fetched.entry(key).or_default();
        let _ = holders
            .entry(peer_id)
            .or_insert((Instant::now(), HolderStatus::Pending, 0));
    }
}
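
// A minimal usage sketch, kept as an in-file test in the crate's usual style.
// It exercises the fetcher through `add_holder_per_key`, `next_keys_to_fetch`
// and `notify_about_new_put` only, to avoid assuming any particular
// `NetworkAddress` constructor from sn_protocol; `PeerId::random()` and
// `RecordKey::new` come from libp2p. Treat it as an illustrative check of the
// `MAX_PARALLEL_FETCH` cap, not an exhaustive test.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fetches_are_capped_at_max_parallel() {
        let mut fetcher = ReplicationFetcher::default();
        let holder = PeerId::random();

        // Queue more keys than can be fetched at once, all from one holder.
        for i in 0..(MAX_PARALLEL_FETCH + 2) {
            let key = RecordKey::new(&vec![i as u8]);
            fetcher.add_holder_per_key(key, holder);
        }

        // Only MAX_PARALLEL_FETCH keys may be handed out in one batch.
        let batch = fetcher.next_keys_to_fetch();
        assert_eq!(batch.len(), MAX_PARALLEL_FETCH);

        // Completing one fetch frees a slot, so exactly one more key is issued.
        let (done_key, _holder) = batch[0].clone();
        let next = fetcher.notify_about_new_put(done_key);
        assert_eq!(next.len(), 1);
    }
}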