use std::{
collections::{
hash_map::{Entry, HashMap},
HashSet,
},
time::Duration,
};
use libp2p_identity::PeerId;
use web_time::Instant;
use crate::topic::TopicHash;
/// Index of a slot in the cyclic `backoffs_by_heartbeat` buffer of [`BackoffStorage`].
#[derive(Copy, Clone)]
struct HeartbeatIndex(usize);
/// Stores per-topic, per-peer backoffs together with a cyclic, heartbeat-indexed view of them
/// that is used to expire entries.
pub(crate) struct BackoffStorage {
    /// For every topic and peer: the instant at which the backoff ends and the slot of
    /// `backoffs_by_heartbeat` in which the `(topic, peer)` pair is currently stored.
    backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
    /// Cyclic buffer of `(topic, peer)` pairs; slots are addressed modulo the buffer length and
    /// the slot at `heartbeat_index` is the one inspected by the next `heartbeat()` call.
    backoffs_by_heartbeat: Vec<HashSet<(TopicHash, PeerId)>>,
    /// Slot of `backoffs_by_heartbeat` that the next `heartbeat()` call inspects.
    heartbeat_index: HeartbeatIndex,
    /// Length of one heartbeat; used to convert backoff durations into a number of heartbeats.
    heartbeat_interval: Duration,
    /// Number of extra heartbeats an entry is kept after its backoff instant.
    backoff_slack: u32,
}
impl BackoffStorage {
    /// Returns how many heartbeats of length `heartbeat_interval` are needed to cover `d`,
    /// rounding up.
    ///
    /// # Panics
    ///
    /// Panics if `heartbeat_interval` is zero (division by zero).
    fn heartbeats(d: &Duration, heartbeat_interval: &Duration) -> usize {
        d.as_nanos().div_ceil(heartbeat_interval.as_nanos()) as usize
    }

    /// Returns the slot of a cyclic buffer with `slots` entries in which a backoff of length
    /// `time`, registered while the cursor is at `current`, has to be stored.
    ///
    /// The result is `(current + ceil(time / heartbeat_interval) + backoff_slack) mod slots`.
    /// The sum is formed in `u128` and only the reduced value is narrowed, so an arbitrarily
    /// large `time` can neither truncate nor overflow `usize`.
    ///
    /// # Panics
    ///
    /// Panics if `slots` is zero or `heartbeat_interval` is zero.
    fn target_slot(
        current: HeartbeatIndex,
        slots: usize,
        time: &Duration,
        heartbeat_interval: &Duration,
        backoff_slack: u32,
    ) -> usize {
        // `as_nanos()` is below 2^95 and the other two summands are below 2^64 and 2^32, so
        // this addition cannot overflow `u128`.
        let total = current.0 as u128
            + time.as_nanos().div_ceil(heartbeat_interval.as_nanos())
            + u128::from(backoff_slack);
        // The remainder is strictly smaller than `slots`, hence the cast is lossless.
        (total % slots as u128) as usize
    }

    /// Creates an empty storage.
    ///
    /// The cyclic buffer gets one slot per heartbeat needed to cover `prune_backoff`, plus
    /// `backoff_slack` slots, plus one additional slot.
    ///
    /// # Panics
    ///
    /// Panics if `heartbeat_interval` is zero.
    pub(crate) fn new(
        prune_backoff: &Duration,
        heartbeat_interval: Duration,
        backoff_slack: u32,
    ) -> BackoffStorage {
        let max_heartbeats =
            Self::heartbeats(prune_backoff, &heartbeat_interval) + backoff_slack as usize + 1;
        BackoffStorage {
            backoffs: HashMap::new(),
            backoffs_by_heartbeat: vec![HashSet::new(); max_heartbeats],
            heartbeat_index: HeartbeatIndex(0),
            heartbeat_interval,
            backoff_slack,
        }
    }

    /// Records a backoff of length `time` for `peer` on `topic`.
    ///
    /// If a backoff that ends at the same instant or later is already stored, nothing changes.
    /// If `now + time` is not representable as an `Instant`, the request is logged and ignored.
    pub(crate) fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
        let Some(instant) = Instant::now().checked_add(time) else {
            tracing::warn!("ignoring oversized prune backoff");
            return;
        };
        // Computed up front: it only depends on `time` and the current cursor, and doing it here
        // keeps the entry borrow below free of further reads of `self`.
        let slot = Self::target_slot(
            self.heartbeat_index,
            self.backoffs_by_heartbeat.len(),
            &time,
            &self.heartbeat_interval,
            self.backoff_slack,
        );
        match self.backoffs.entry(topic.clone()).or_default().entry(*peer) {
            Entry::Occupied(mut o) => {
                let (backoff, index) = o.get();
                // Only ever extend a backoff, never shorten it.
                if backoff < &instant {
                    let pair = (topic.clone(), *peer);
                    // Move the pair from its previous slot to the new one.
                    if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
                        s.remove(&pair);
                    }
                    self.backoffs_by_heartbeat[slot].insert(pair);
                    o.insert((instant, HeartbeatIndex(slot)));
                }
            }
            Entry::Vacant(v) => {
                self.backoffs_by_heartbeat[slot].insert((topic.clone(), *peer));
                v.insert((instant, HeartbeatIndex(slot)));
            }
        }
    }

    /// Returns `true` while a backoff entry for `peer` on `topic` is stored.
    ///
    /// Entries are only removed by [`Self::heartbeat`], and only once the backoff instant plus
    /// the configured slack has passed, so this can still return `true` after the instant
    /// reported by [`Self::get_backoff_time`].
    pub(crate) fn is_backoff_with_slack(&self, topic: &TopicHash, peer: &PeerId) -> bool {
        self.backoffs
            .get(topic)
            .is_some_and(|m| m.contains_key(peer))
    }

    /// Returns the instant at which the stored backoff for `peer` on `topic` ends (slack not
    /// included), or `None` if no entry is stored.
    pub(crate) fn get_backoff_time(&self, topic: &TopicHash, peer: &PeerId) -> Option<Instant> {
        Self::get_backoff_time_from_backoffs(&self.backoffs, topic, peer)
    }

    /// Same lookup as [`Self::get_backoff_time`], but on an explicitly passed map so that it can
    /// be used while other fields of `self` are mutably borrowed.
    fn get_backoff_time_from_backoffs(
        backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
        topic: &TopicHash,
        peer: &PeerId,
    ) -> Option<Instant> {
        backoffs
            .get(topic)
            .and_then(|m| m.get(peer).map(|(i, _)| *i))
    }

    /// Processes one heartbeat: expires entries in the current slot and advances the cursor.
    ///
    /// A pair stays in the slot only if it still has an entry in `backoffs` whose instant plus
    /// `heartbeat_interval * backoff_slack` lies in the future. Every other pair is dropped from
    /// the slot and from `backoffs`; a per-topic map that becomes empty is removed as well.
    ///
    /// # Panics
    ///
    /// Panics if `heartbeat_interval * backoff_slack` overflows `Duration`.
    pub(crate) fn heartbeat(&mut self) {
        if let Some(slot) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index.0) {
            let backoffs = &mut self.backoffs;
            let slack = self.heartbeat_interval * self.backoff_slack;
            let now = Instant::now();
            slot.retain(|(topic, peer)| {
                // A deadline that is not representable as an `Instant` is treated as expired, so
                // such an entry is released instead of being retained indefinitely.
                let keep = Self::get_backoff_time_from_backoffs(backoffs, topic, peer)
                    .and_then(|expiry| expiry.checked_add(slack))
                    .is_some_and(|deadline| deadline > now);
                if !keep {
                    if let Entry::Occupied(mut inner) = backoffs.entry(topic.clone()) {
                        if inner.get_mut().remove(peer).is_some() && inner.get().is_empty() {
                            inner.remove();
                        }
                    }
                }
                keep
            });
        }
        // Advance the cursor cyclically.
        self.heartbeat_index =
            HeartbeatIndex((self.heartbeat_index.0 + 1) % self.backoffs_by_heartbeat.len());
    }
}