use std::sync::Arc;
use ahash::{AHashMap, AHashSet};
use cid::Cid;
use tokio::sync::RwLock;
use tracing::debug;
use crate::Block;
#[derive(Default, Debug, Clone)]
pub struct SessionInterestManager {
wants: Arc<RwLock<AHashMap<Cid, AHashMap<u64, bool>>>>,
}
impl SessionInterestManager {
pub async fn record_session_interest(&self, session: u64, keys: &[Cid]) {
debug!("session:{} record_session_interest: {:?}", session, keys);
let wants = &mut *self.wants.write().await;
for key in keys {
wants.entry(*key).or_default().insert(session, true);
}
}
pub async fn remove_session(&self, session: u64) -> Vec<Cid> {
debug!("session:{}: remove_session", session);
let wants = &mut *self.wants.write().await;
let mut deleted_keys = Vec::new();
for (key, wants) in wants.iter_mut() {
wants.remove(&session);
if wants.is_empty() {
deleted_keys.push(*key);
}
}
for key in &deleted_keys {
wants.remove(key);
}
deleted_keys
}
pub async fn remove_session_wants(&self, session: u64, keys: &[Cid]) {
debug!(
"session:{}: remove_session_wants: {:?}",
session,
keys.iter().map(|s| s.to_string()).collect::<Vec<_>>()
);
let wants = &mut *self.wants.write().await;
for key in keys {
if let Some(wants) = wants.get_mut(key) {
if let Some(wanted) = wants.get_mut(&session) {
if *wanted {
*wanted = false;
}
}
}
}
}
pub async fn remove_session_interested(&self, session: u64, keys: &[Cid]) -> Vec<Cid> {
debug!(
"session:{}: remove_session_interested: {:?}",
session,
keys.iter().map(|s| s.to_string()).collect::<Vec<_>>()
);
let wants = &mut *self.wants.write().await;
let mut deleted_keys = Vec::new();
for key in keys {
if let Some(wants) = wants.get_mut(key) {
wants.remove(&session);
if wants.is_empty() {
deleted_keys.push(*key);
}
}
}
for key in &deleted_keys {
wants.remove(key);
}
deleted_keys
}
pub async fn filter_session_interested(
&self,
session: u64,
key_sets: &[&[Cid]],
) -> Vec<Vec<Cid>> {
debug!("filter_session_interested",);
let mut results = Vec::with_capacity(key_sets.len());
let wants = &*self.wants.read().await;
for key_set in key_sets {
let mut has = Vec::new();
for key in *key_set {
if let Some(wants) = wants.get(key) {
if wants.get(&session).copied().unwrap_or_default() {
has.push(*key);
}
}
}
results.push(has);
}
results
}
pub async fn split_wanted_unwanted<'a>(
&self,
blocks: &'a [Block],
) -> (Vec<&'a Block>, Vec<&'a Block>) {
debug!("split_wanted_unwantedn",);
let wants = &*self.wants.read().await;
let mut wanted_keys = AHashSet::new();
for block in blocks {
let cid = block.cid();
if let Some(wants) = wants.get(cid) {
for wanted in wants.values() {
if *wanted {
wanted_keys.insert(*cid);
}
}
}
}
let mut wanted_blocks = Vec::new();
let mut not_wanted_blocks = Vec::new();
for block in blocks {
if wanted_keys.contains(block.cid()) {
wanted_blocks.push(block);
} else {
not_wanted_blocks.push(block);
}
}
(wanted_blocks, not_wanted_blocks)
}
pub async fn interested_sessions(
&self,
blocks: &[Cid],
haves: &[Cid],
dont_haves: &[Cid],
) -> AHashSet<u64> {
debug!("interested sessions");
let wants = &*self.wants.read().await;
let mut session_keys = AHashSet::new();
let keys = blocks.iter().chain(haves.iter()).chain(dont_haves.iter());
for key in keys {
if let Some(wants) = wants.get(key) {
for session in wants.keys() {
session_keys.insert(*session);
}
}
}
session_keys
}
}