use std::{
fmt::Display,
time::{Duration, Instant},
};
use ahash::{AHashMap, AHashSet};
use cid::Cid;
use rand::{thread_rng, Rng};
use super::cid_queue::CidQueue;
const LIVE_WANTS_ORDER_GC_LIMIT: usize = 32;
#[derive(Debug)]
pub struct SessionWants {
to_fetch: CidQueue,
live_wants: AHashMap<Cid, Instant>,
live_wants_order: Vec<Cid>,
broadcast_limit: usize,
}
impl Display for SessionWants {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} pending / {} live",
self.to_fetch.len(),
self.live_wants.len()
)
}
}
impl SessionWants {
pub fn new(broadcast_limit: usize) -> Self {
SessionWants {
to_fetch: Default::default(),
live_wants: Default::default(),
live_wants_order: Default::default(),
broadcast_limit,
}
}
pub fn blocks_requested(&mut self, new_wants: &[Cid]) {
for cid in new_wants {
self.to_fetch.push(*cid);
}
}
pub fn get_next_wants(&mut self) -> AHashSet<Cid> {
let now = Instant::now();
let current_live_count = self.live_wants.len();
let to_add = self.broadcast_limit - current_live_count;
let mut live = AHashSet::new();
for _ in 0..to_add {
if let Some(cid) = self.to_fetch.pop() {
live.insert(cid);
self.live_wants_order.push(cid);
self.live_wants.insert(cid, now);
} else {
break;
}
}
live
}
pub fn wants_sent(&mut self, keys: &[Cid]) {
let now = Instant::now();
for key in keys {
if !self.live_wants.contains_key(key) && self.to_fetch.has(key) {
self.to_fetch.remove(key);
self.live_wants_order.push(*key);
self.live_wants.insert(*key, now);
}
}
}
pub fn blocks_received(&mut self, keys: &[Cid]) -> (Vec<Cid>, Duration) {
let mut wanted = Vec::with_capacity(keys.len());
let mut total_latency = Duration::default();
let now = Instant::now();
for key in keys {
if self.is_wanted(key) {
wanted.push(*key);
if let Some(sent_at) = self.live_wants.get(key) {
total_latency += now - *sent_at;
}
self.live_wants.remove(key);
self.to_fetch.remove(key);
}
}
if self.live_wants_order.len() - self.live_wants.len() > LIVE_WANTS_ORDER_GC_LIMIT {
self.live_wants_order
.retain(|key| self.live_wants.contains_key(key));
}
(wanted, total_latency)
}
pub fn prepare_broadcast(&mut self) -> AHashSet<Cid> {
let now = Instant::now();
let mut live = AHashSet::with_capacity(self.live_wants.len());
for key in &self.live_wants_order {
if let Some(want) = self.live_wants.get_mut(key) {
*want = now;
live.insert(*key);
if live.len() == self.broadcast_limit {
break;
}
}
}
live
}
pub fn cancel_pending(&mut self, keys: &[Cid]) {
for key in keys {
self.to_fetch.remove(key);
}
}
pub fn random_live_want(&self) -> Option<Cid> {
if self.live_wants.is_empty() {
return None;
}
let mut rng = thread_rng();
let i = rng.gen_range(0..self.live_wants.len());
self.live_wants.keys().nth(i).copied()
}
pub fn has_live_wants(&self) -> bool {
!self.live_wants.is_empty()
}
fn is_wanted(&self, key: &Cid) -> bool {
if !self.live_wants.contains_key(key) {
self.to_fetch.has(key)
} else {
true
}
}
}