use std::cmp::Ordering;
use ahash::{AHashMap, AHashSet};
use anyhow::Result;
use cid::Cid;
use iroh_metrics::core::MRecorder;
use iroh_metrics::{bitswap::BitswapMetrics, inc};
use libp2p::PeerId;
use tokio::{sync::oneshot, task::JoinHandle};
use tracing::{debug, error, info, warn};
use crate::client::{
block_presence_manager::BlockPresenceManager, peer_manager::PeerManager,
session_manager::SessionManager,
};
use super::{
peer_response_tracker::PeerResponseTracker, sent_want_blocks_tracker::SentWantBlocksTracker,
};
const CHANGES_BUFFER_SIZE: usize = 128;
const PEER_DONT_HAVE_LIMIT: usize = 16;
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum BlockPresence {
DontHave = 0,
Unknown = 1,
Have = 2,
}
#[derive(Debug)]
struct Update {
from: PeerId,
keys: Vec<Cid>,
haves: Vec<Cid>,
dont_haves: Vec<Cid>,
}
#[derive(Debug)]
struct PeerAvailability {
target: PeerId,
is_available: bool,
}
#[derive(Debug)]
enum Change {
Add(Vec<Cid>),
Cancel(Vec<Cid>),
Update(Update),
Availability(PeerAvailability),
}
#[derive(Default, Debug, PartialEq, Eq)]
struct WantSets {
want_blocks: AHashSet<Cid>,
want_haves: AHashSet<Cid>,
}
#[derive(Default, Debug, PartialEq, Eq)]
struct AllWants(AHashMap<PeerId, WantSets>);
impl AllWants {
fn for_peer(&mut self, peer: &PeerId) -> &mut WantSets {
&mut *self.0.entry(*peer).or_default()
}
}
#[derive(Debug)]
pub struct SessionWantSender {
session_id: u64,
changes: async_channel::Sender<Change>,
closer: oneshot::Sender<()>,
worker: JoinHandle<()>,
}
#[derive(Debug, Clone)]
pub struct Signaler {
id: u64,
changes: async_channel::Sender<Change>,
}
impl Signaler {
pub fn id(&self) -> u64 {
self.id
}
pub fn signal_availability(&self, peer: PeerId, is_available: bool) {
signal_availability(self.changes.clone(), peer, is_available);
}
}
fn signal_availability(changes: async_channel::Sender<Change>, peer: PeerId, is_available: bool) {
let availability = PeerAvailability {
target: peer,
is_available,
};
if let Err(err) = changes.try_send(Change::Availability(availability)) {
warn!("unable to deliver changes: {:?}", err);
}
}
impl SessionWantSender {
pub(super) fn new(
session_id: u64,
peer_manager: PeerManager,
session_manager: SessionManager,
block_presence_manager: BlockPresenceManager,
session_ops: async_channel::Sender<super::Op>,
) -> Self {
debug!("session:{}: session_want_sender create", session_id);
let (changes_s, changes_r) = async_channel::bounded(64);
let (closer_s, mut closer_r) = oneshot::channel();
let signaler = Signaler {
id: session_id,
changes: changes_s.clone(),
};
let mut loop_state = LoopState::new(
changes_r.clone(),
signaler,
peer_manager,
session_manager,
block_presence_manager,
session_ops,
);
let rt = tokio::runtime::Handle::current();
let worker = rt.spawn(async move {
loop {
inc!(BitswapMetrics::SessionWantSenderLoopTick);
tokio::select! {
biased;
_ = &mut closer_r => {
break;
}
change = changes_r.recv() => {
match change {
Ok(change) => { loop_state.on_change(change).await },
Err(err) => {
warn!("changes sender error: {:?}", err);
break;
}
}
}
}
}
if let Err(err) = loop_state.stop().await {
error!(
"session:{}: failed to stop LoopState: {:?}",
session_id, err
);
}
});
SessionWantSender {
session_id,
changes: changes_s,
worker,
closer: closer_s,
}
}
pub async fn stop(self) -> Result<()> {
debug!("stopping session_want_sender");
if self.closer.send(()).is_ok() {
self.worker.await?;
}
Ok(())
}
pub async fn add(&self, keys: Vec<Cid>) {
if keys.is_empty() {
return;
}
self.add_change(Change::Add(keys)).await;
}
pub async fn cancel(&self, keys: Vec<Cid>) {
if keys.is_empty() {
return;
}
self.add_change(Change::Cancel(keys)).await;
}
pub async fn update(
&self,
from: PeerId,
keys: Vec<Cid>,
haves: Vec<Cid>,
dont_haves: Vec<Cid>,
) {
if keys.is_empty() && haves.is_empty() && dont_haves.is_empty() {
return;
}
self.add_change(Change::Update(Update {
from,
keys,
haves,
dont_haves,
}))
.await;
}
async fn add_change(&self, change: Change) {
if let Err(err) = self.changes.send(change).await {
warn!(
"session {}: unable to send changes: {:?}",
self.session_id, err
);
}
}
}
#[derive(Debug)]
struct WantInfo {
block_presence: AHashMap<PeerId, BlockPresence>,
sent_to: Option<PeerId>,
best_peer: Option<PeerId>,
peer_response_tracker: PeerResponseTracker,
exhausted: bool,
}
impl WantInfo {
fn new(peer_response_tracker: PeerResponseTracker) -> Self {
WantInfo {
block_presence: Default::default(),
sent_to: None,
best_peer: None,
peer_response_tracker,
exhausted: false,
}
}
async fn update_want_block_presence(
&mut self,
block_presence_manager: &BlockPresenceManager,
cid: &Cid,
peer: PeerId,
) {
let info = if block_presence_manager.peer_has_block(&peer, cid).await {
BlockPresence::Have
} else if block_presence_manager
.peer_does_not_have_block(&peer, cid)
.await
{
BlockPresence::DontHave
} else {
BlockPresence::Unknown
};
self.set_peer_block_presence(peer, info).await;
}
async fn set_peer_block_presence(&mut self, peer: PeerId, bp: BlockPresence) {
self.block_presence.insert(peer, bp);
self.calculate_best_peer().await;
if bp == BlockPresence::Have {
self.exhausted = false;
}
}
async fn remove_peer(&mut self, peer: &PeerId) {
if self.sent_to.is_some() && self.sent_to.as_ref().unwrap() == peer {
self.sent_to = None;
}
self.block_presence.remove(peer);
self.calculate_best_peer().await;
}
async fn calculate_best_peer(&mut self) {
debug!("calculate best peer");
let mut best_bp = BlockPresence::DontHave;
let mut best_peer = None;
let mut count_with_best = 0;
for (peer, bp) in &self.block_presence {
match bp.cmp(&best_bp) {
Ordering::Greater => {
best_bp = *bp;
best_peer = Some(*peer);
count_with_best = 1;
}
Ordering::Equal => {
count_with_best += 1;
}
_ => {}
}
}
self.best_peer = best_peer;
if best_peer.is_none() {
return;
}
if count_with_best <= 1 {
return;
}
let mut peers_with_best = Vec::new();
for (peer, bp) in &self.block_presence {
if bp == &best_bp {
peers_with_best.push(*peer);
}
}
self.best_peer = self.peer_response_tracker.choose(&peers_with_best).await;
}
}
#[derive(Debug)]
struct LoopState {
changes: async_channel::Receiver<Change>,
signaler: Signaler,
wants: AHashMap<Cid, WantInfo>,
peer_consecutive_dont_haves: AHashMap<PeerId, usize>,
sent_want_blocks_tracker: SentWantBlocksTracker,
peer_response_tracker: PeerResponseTracker,
peer_manager: PeerManager,
session_manager: SessionManager,
block_presence_manager: BlockPresenceManager,
session_ops: async_channel::Sender<super::Op>,
}
impl LoopState {
fn new(
changes: async_channel::Receiver<Change>,
signaler: Signaler,
peer_manager: PeerManager,
session_manager: SessionManager,
block_presence_manager: BlockPresenceManager,
session_ops: async_channel::Sender<super::Op>,
) -> Self {
LoopState {
changes,
signaler,
peer_manager,
wants: Default::default(),
peer_consecutive_dont_haves: Default::default(),
sent_want_blocks_tracker: SentWantBlocksTracker::default(),
peer_response_tracker: PeerResponseTracker::default(),
session_manager,
block_presence_manager,
session_ops,
}
}
async fn stop(self) -> Result<()> {
self.peer_manager.unregister_session(self.signaler.id).await;
Ok(())
}
fn id(&self) -> u64 {
self.signaler.id()
}
fn collect_changes(&self, changes: &mut Vec<Change>) {
while changes.len() < CHANGES_BUFFER_SIZE {
if let Ok(change) = self.changes.try_recv() {
changes.push(change);
} else {
break;
}
}
}
async fn on_change(&mut self, change: Change) {
let mut changes = vec![change];
self.collect_changes(&mut changes);
debug!("handling changes: {:?}", changes);
let mut availability = AHashMap::with_capacity(changes.len());
let mut cancels = Vec::new();
let mut updates = Vec::new();
for change in changes {
match change {
Change::Add(cids) => {
self.track_wants(cids).await;
}
Change::Cancel(cids) => {
for cid in cids {
self.untrack_want(&cid);
cancels.push(cid);
}
}
Change::Update(update) => {
if !update.keys.is_empty() || !update.haves.is_empty() {
let is_new = self
.peer_manager
.register_session(&update.from, self.signaler.clone())
.await;
availability.insert(update.from, (true, Some(is_new)));
}
updates.push(update);
}
Change::Availability(PeerAvailability {
target,
is_available,
}) => {
availability.insert(target, (is_available, None));
}
}
}
let (newly_available, newly_unavailable) = self.process_availability(&availability).await;
let dont_haves = self.process_updates(updates).await;
self.check_for_exhausted_wants(dont_haves, newly_unavailable)
.await;
if !cancels.is_empty() {
self.session_manager
.cancel_session_wants(self.id(), &cancels)
.await;
}
if self.peer_manager.session_has_peers(self.id()).await {
self.send_next_wants(newly_available).await;
}
}
async fn process_availability(
&mut self,
availability: &AHashMap<PeerId, (bool, Option<bool>)>,
) -> (Vec<PeerId>, Vec<PeerId>) {
let mut newly_available = Vec::new();
let mut newly_unavailable = Vec::new();
for (peer, (is_now_available, is_new)) in availability {
debug!(
"session_want_sender:{}: process availability: {}:{}",
self.signaler.id, peer, is_now_available
);
let mut state_change = false;
if *is_now_available {
let is_new_peer = self
.peer_manager
.add_peer_to_session(self.id(), *peer)
.await;
if is_new_peer || is_new.unwrap_or_default() {
state_change = true;
newly_available.push(*peer);
}
} else {
let was_available = self
.peer_manager
.remove_peer_from_session(self.id(), *peer)
.await;
if was_available {
state_change = true;
newly_unavailable.push(*peer);
}
}
if state_change {
self.update_wants_peer_availability(peer, *is_now_available)
.await;
self.peer_consecutive_dont_haves.remove(peer);
}
}
(newly_available, newly_unavailable)
}
async fn track_wants(&mut self, cids: Vec<Cid>) {
debug!("tracking: {} wants", cids.len());
let peers = self.peer_manager.peers_for_session(self.id()).await;
for cid in cids {
if self.wants.contains_key(&cid) {
continue;
}
let mut want_info = WantInfo::new(self.peer_response_tracker.clone());
for peer in &peers {
want_info
.update_want_block_presence(&self.block_presence_manager, &cid, *peer)
.await;
}
self.wants.insert(cid, want_info);
}
}
fn untrack_want(&mut self, cid: &Cid) {
self.wants.remove(cid);
}
async fn process_updates(&mut self, updates: Vec<Update>) -> AHashSet<Cid> {
let mut block_cids = AHashSet::new();
for update in &updates {
for cid in &update.keys {
block_cids.insert(*cid);
if self.remove_want(cid).is_some() {
self.peer_response_tracker
.received_block_from(&update.from)
.await;
self.peer_manager
.protect_connection(self.id(), update.from)
.await;
self.peer_consecutive_dont_haves.remove(&update.from);
}
}
}
let mut dont_haves = AHashSet::new();
let mut prune_peers = AHashSet::new();
for update in &updates {
for cid in &update.dont_haves {
let entry = self
.peer_consecutive_dont_haves
.entry(update.from)
.or_default();
if *entry == PEER_DONT_HAVE_LIMIT {
prune_peers.insert(update.from);
} else {
*entry += 1;
}
if block_cids.contains(cid) {
continue;
}
dont_haves.insert(*cid);
if let Some(wi) = self.wants.get_mut(cid) {
wi.update_want_block_presence(&self.block_presence_manager, cid, update.from)
.await;
}
if self
.sent_want_blocks_tracker
.have_sent_want_block_to(&update.from, cid)
{
if let Some(sent_to) = self.get_want_sent_to(cid) {
if sent_to == update.from {
self.set_want_sent_to(cid, None);
}
}
}
}
}
for update in &updates {
for cid in &update.haves {
if !block_cids.contains(cid) {
if let Some(wi) = self.wants.get_mut(cid) {
wi.update_want_block_presence(
&self.block_presence_manager,
cid,
update.from,
)
.await;
}
}
self.peer_consecutive_dont_haves.remove(&update.from);
prune_peers.remove(&update.from);
}
}
{
let mut to_remove = Vec::new();
for peer in &prune_peers {
for cid in self.wants.keys() {
if self.block_presence_manager.peer_has_block(peer, cid).await {
to_remove.push(*peer);
}
}
}
for peer in to_remove {
prune_peers.remove(&peer);
}
}
if !prune_peers.is_empty() {
for peer in prune_peers {
info!(
"peer {} sent too many dont haves, removing from session {}",
peer,
self.id()
);
self.signaler.signal_availability(peer, false);
}
}
dont_haves
}
async fn check_for_exhausted_wants(
&mut self,
dont_haves: AHashSet<Cid>,
newly_unavailable: Vec<PeerId>,
) {
if dont_haves.is_empty() && newly_unavailable.is_empty() {
return;
}
let mut wants = dont_haves;
if !newly_unavailable.is_empty() {
for cid in self.wants.keys() {
wants.insert(*cid);
}
if !self.peer_manager.session_has_peers(self.id()).await {
self.process_exhausted_wants(wants).await;
return;
}
}
if !wants.is_empty() {
let exhausted = self
.block_presence_manager
.all_peers_do_not_have_block(
&self.peer_manager.peers_for_session(self.id()).await,
wants,
)
.await;
self.process_exhausted_wants(exhausted).await;
}
}
async fn process_exhausted_wants(&mut self, exhausted: impl IntoIterator<Item = Cid>) {
let newly_exhausted = self.newly_exhausted(exhausted.into_iter());
if !newly_exhausted.is_empty() {
if let Err(err) = self
.session_ops
.send(super::Op::Broadcast(newly_exhausted.into_iter().collect()))
.await
{
warn!("unabel to send broadcast op: {:?}", err);
}
}
}
async fn send_next_wants(&mut self, newly_available: Vec<PeerId>) {
debug!(
"send_next_wants: newly_available ({}) current_wants: {}",
newly_available.len(),
self.wants.len(),
);
let mut to_send = AllWants::default();
let id = self.id();
for (cid, wi) in &mut self.wants {
for peer in &newly_available {
to_send.for_peer(peer).want_haves.insert(*cid);
}
if wi.sent_to.is_some() {
continue;
}
if let Some(ref best_peer) = wi.best_peer {
wi.sent_to = Some(*best_peer);
to_send.for_peer(best_peer).want_blocks.insert(*cid);
for op in self.peer_manager.peers_for_session(id).await {
if &op != best_peer {
to_send.for_peer(&op).want_haves.insert(*cid);
}
}
} else {
}
}
self.send_wants(to_send).await;
}
async fn send_wants(&mut self, sends: AllWants) {
for (peer, mut snd) in sends.0 {
debug!(
"send_wants to {}: {} {}",
peer,
snd.want_blocks.len(),
snd.want_haves.len()
);
for cid in self.get_piggyback_want_haves(&peer, &snd.want_blocks) {
snd.want_haves.insert(cid);
}
let mut want_blocks: Vec<_> = snd.want_blocks.into_iter().collect();
let want_haves: Vec<_> = snd.want_haves.into_iter().collect();
self.peer_manager
.send_wants(&peer, &want_blocks, &want_haves)
.await;
self.sent_want_blocks_tracker
.add_sent_want_blocks_to(&peer, &want_blocks);
want_blocks.extend(want_haves);
if let Err(err) = self
.session_ops
.send(super::Op::WantsSent(want_blocks))
.await
{
warn!("unabel to send broadcast op: {:?}", err);
}
}
}
fn get_piggyback_want_haves(&self, peer: &PeerId, want_blocks: &AHashSet<Cid>) -> Vec<Cid> {
let mut res = Vec::new();
for cid in self.wants.keys() {
if !want_blocks.contains(cid)
&& !self
.sent_want_blocks_tracker
.have_sent_want_block_to(peer, cid)
{
res.push(*cid);
}
}
res
}
fn newly_exhausted(&mut self, keys: impl Iterator<Item = Cid>) -> Vec<Cid> {
keys.filter(|cid| {
if let Some(wi) = self.wants.get_mut(cid) {
if !wi.exhausted {
wi.exhausted = true;
return true;
}
}
false
})
.collect()
}
fn remove_want(&mut self, cid: &Cid) -> Option<WantInfo> {
self.wants.remove(cid)
}
async fn update_wants_peer_availability(&mut self, peer: &PeerId, is_now_available: bool) {
for (cid, wi) in &mut self.wants {
if is_now_available {
wi.update_want_block_presence(&self.block_presence_manager, cid, *peer)
.await;
} else {
wi.remove_peer(peer).await;
}
}
}
fn get_want_sent_to(&self, cid: &Cid) -> Option<PeerId> {
self.wants.get(cid).and_then(|wi| wi.sent_to)
}
fn set_want_sent_to(&mut self, cid: &Cid, peer: Option<PeerId>) {
if let Some(wi) = self.wants.get_mut(cid) {
wi.sent_to = peer;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_block_presence_order() {
assert!(BlockPresence::Have > BlockPresence::DontHave);
assert!(BlockPresence::Unknown > BlockPresence::DontHave);
assert!(BlockPresence::Have > BlockPresence::Unknown);
}
}