use crate::block::Block;
use crate::ledger::{Ledger, Message, Priority};
use crate::protocol::{BitswapConfig, MessageWrapper};
use fnv::{FnvHashMap, FnvHashSet};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use hash_hasher::HashedMap;
use libipld::Cid;
use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p_swarm::handler::OneShotHandler;
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters};
use std::task::{Context, Poll};
use std::{
collections::{HashMap, VecDeque},
mem,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
/// Events emitted by the [`Bitswap`] behaviour (its `NetworkBehaviour::OutEvent`).
///
/// All variants are produced while handling a message received from a remote peer;
/// the `PeerId` is always the peer the message came from.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BitswapEvent {
/// A received message contained this block.
ReceivedBlock(PeerId, Block),
/// A received message asked for this CID with the given priority. Wants for CIDs
/// that are on the local wantlist are filtered out and never reported.
ReceivedWant(PeerId, Cid, Priority),
/// A received message listed this CID as cancelled.
ReceivedCancel(PeerId, Cid),
}
/// Transfer counters for bitswap traffic.
///
/// Every field is an atomic, so all methods take `&self`; updates use
/// `Ordering::Relaxed`, i.e. each counter is individually consistent but no
/// ordering between different counters is implied.
#[derive(Debug, Default)]
pub struct Stats {
    /// Number of blocks sent.
    pub sent_blocks: AtomicU64,
    /// Amount of data sent. NOTE(review): no method of this type increments it
    /// directly; it only changes through `add_assign` or direct field access.
    pub sent_data: AtomicU64,
    /// Number of blocks counted via `update_incoming_unique`.
    pub received_blocks: AtomicU64,
    /// Bytes counted via `update_incoming_unique`.
    pub received_data: AtomicU64,
    /// Number of blocks counted via `update_incoming_duplicate`.
    pub duplicate_blocks: AtomicU64,
    /// Bytes counted via `update_incoming_duplicate`.
    pub duplicate_data: AtomicU64,
}

impl Stats {
    /// Adds `amount` to a single counter.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Records that `num_blocks` blocks were sent. Only `sent_blocks` is touched.
    pub fn update_outgoing(&self, num_blocks: u64) {
        Self::bump(&self.sent_blocks, num_blocks);
    }

    /// Records one received block of `bytes` bytes in the `received_*` counters.
    pub fn update_incoming_unique(&self, bytes: u64) {
        Self::bump(&self.received_blocks, 1);
        Self::bump(&self.received_data, bytes);
    }

    /// Records one received block of `bytes` bytes in the `duplicate_*` counters.
    pub fn update_incoming_duplicate(&self, bytes: u64) {
        Self::bump(&self.duplicate_blocks, 1);
        Self::bump(&self.duplicate_data, bytes);
    }

    /// Adds every counter of `other` onto the matching counter of `self`.
    ///
    /// `other` is only read. The six counters are processed one after another,
    /// so the sum is not an atomic snapshot of `other` as a whole.
    pub fn add_assign(&self, other: &Stats) {
        let counter_pairs = [
            (&self.sent_blocks, &other.sent_blocks),
            (&self.sent_data, &other.sent_data),
            (&self.received_blocks, &other.received_blocks),
            (&self.received_data, &other.received_data),
            (&self.duplicate_blocks, &other.duplicate_blocks),
            (&self.duplicate_data, &other.duplicate_data),
        ];
        for &(target, source) in &counter_pairs {
            Self::bump(target, source.load(Ordering::Relaxed));
        }
    }
}
/// State of the bitswap network behaviour: the local wantlist, one `Ledger` per
/// connected peer, and the queue of actions waiting to be returned from `poll`.
pub struct Bitswap {
/// Actions queued for the swarm; `poll` hands them out front-first before doing
/// any other work.
events: VecDeque<
NetworkBehaviourAction<
BitswapEvent,
<Bitswap as NetworkBehaviour>::ConnectionHandler,
Message,
>,
>,
/// Peers for which `connect` has already queued a dial. An entry is removed once
/// a connection to that peer is established.
target_peers: FnvHashSet<PeerId>,
/// Connection id recorded for a peer in `inject_connection_established`; `poll`
/// uses it to address outgoing ledger messages to one specific connection.
peer_connection_id: FnvHashMap<PeerId, ConnectionId>,
/// Per-peer ledger, created in `inject_connection_established` and dropped in
/// `inject_connection_closed`.
pub connected_peers: HashMap<PeerId, Ledger>,
/// The local wantlist: CIDs this node wants, with their priority.
wanted_blocks: HashedMap<Cid, Priority>,
/// Sending half of the channel whose receiving half is `ready_blocks`;
/// `(peer, block)` pairs pushed here are picked up by `poll`.
pub queued_blocks: UnboundedSender<(PeerId, Block)>,
/// Receiving half of the `queued_blocks` channel; drained in `poll`, where each
/// pair is forwarded to `send_block`.
ready_blocks: UnboundedReceiver<(PeerId, Block)>,
/// Per-peer transfer counters. An entry is created when a connection to the peer
/// is established; no code visible in this file removes entries again.
pub stats: HashMap<PeerId, Arc<Stats>>,
}
impl Default for Bitswap {
    /// Builds an idle behaviour: no queued actions, no known peers, an empty
    /// wantlist, and a freshly created unbounded channel for queued blocks.
    fn default() -> Self {
        // Both halves of the channel live in the struct: the sender is the public
        // `queued_blocks` handle, the receiver is drained by `poll`.
        let (queued_blocks, ready_blocks) = unbounded();

        Self {
            events: Default::default(),
            target_peers: Default::default(),
            peer_connection_id: Default::default(),
            connected_peers: Default::default(),
            wanted_blocks: Default::default(),
            stats: Default::default(),
            queued_blocks,
            ready_blocks,
        }
    }
}
impl Bitswap {
    /// Returns a copy of the local wantlist as `(cid, priority)` pairs.
    pub fn local_wantlist(&self) -> Vec<(Cid, Priority)> {
        let mut wantlist = Vec::with_capacity(self.wanted_blocks.len());
        for (cid, priority) in &self.wanted_blocks {
            wantlist.push((*cid, *priority));
        }
        wantlist
    }

    /// Returns the wantlist held in the ledger of `peer`, or `None` when there is
    /// no ledger for that peer.
    pub fn peer_wantlist(&self, peer: &PeerId) -> Option<Vec<(Cid, Priority)>> {
        let ledger = self.connected_peers.get(peer)?;
        Some(ledger.wantlist())
    }

    /// Returns the sum of the counters of all peers in `self.stats`.
    pub fn stats(&self) -> Stats {
        let total = Stats::default();
        for peer_stats in self.stats.values() {
            total.add_assign(peer_stats);
        }
        total
    }

    /// Returns the ids of all peers that currently have a ledger.
    pub fn peers(&self) -> Vec<PeerId> {
        self.connected_peers.keys().copied().collect()
    }

    /// Queues a dial to `peer_id`.
    ///
    /// Does nothing when a dial to this peer was already requested and no
    /// connection has been established since. The dial itself carries the
    /// `PeerCondition::Disconnected` condition.
    pub fn connect(&mut self, peer_id: PeerId) {
        let newly_targeted = self.target_peers.insert(peer_id);
        if !newly_targeted {
            return;
        }

        let handler = self.new_handler();
        let opts = DialOpts::peer_id(peer_id)
            .condition(PeerCondition::Disconnected)
            .build();
        self.events
            .push_back(NetworkBehaviourAction::Dial { opts, handler });
    }

    /// Adds `block` to the ledger of `peer_id` so that it goes out with that
    /// ledger's next message.
    ///
    /// When there is no ledger for the peer the block is dropped without any
    /// further notice.
    pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
        trace!("queueing block to be sent to {}: {}", peer_id, block.cid());
        let maybe_ledger = self.connected_peers.get_mut(&peer_id);
        if let Some(ledger) = maybe_ledger {
            ledger.add_block(block);
        }
    }

    /// Queues one message carrying the complete local wantlist for `peer_id`.
    ///
    /// Nothing is queued when the wantlist is empty. The message is addressed
    /// with `NotifyHandler::Any`.
    fn send_want_list(&mut self, peer_id: PeerId) {
        if self.wanted_blocks.is_empty() {
            return;
        }

        let mut message = Message::default();
        for (cid, priority) in &self.wanted_blocks {
            message.want_block(cid, *priority);
        }

        let action = NetworkBehaviourAction::NotifyHandler {
            peer_id,
            event: message,
            handler: NotifyHandler::Any,
        };
        self.events.push_back(action);
    }

    /// Registers `cid` as wanted with `priority`: first in every peer ledger,
    /// then in the local wantlist (replacing any previous priority there).
    pub fn want_block(&mut self, cid: Cid, priority: Priority) {
        for ledger in self.connected_peers.values_mut() {
            ledger.want_block(&cid, priority);
        }
        self.wanted_blocks.insert(cid, priority);
    }

    /// Cancels the want for `cid`: first in every peer ledger, then by removing
    /// it from the local wantlist.
    pub fn cancel_block(&mut self, cid: &Cid) {
        for ledger in self.connected_peers.values_mut() {
            ledger.cancel_block(cid);
        }
        self.wanted_blocks.remove(cid);
    }
}
impl NetworkBehaviour for Bitswap {
type ConnectionHandler = OneShotHandler<BitswapConfig, Message, MessageWrapper>;
type OutEvent = BitswapEvent;
fn new_handler(&mut self) -> Self::ConnectionHandler {
debug!("bitswap: new_handler");
Default::default()
}
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
debug!("bitswap: addresses_of_peer");
Vec::new()
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
_endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
_other_established: usize,
) {
debug!("bitswap: inject_connected {}", peer_id);
self.target_peers.remove(peer_id);
let ledger = Ledger::new();
self.stats.entry(*peer_id).or_default();
self.connected_peers.insert(*peer_id, ledger);
self.peer_connection_id.insert(*peer_id, *connection_id);
self.send_want_list(*peer_id);
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
_connection_id: &ConnectionId,
_endpoint: &ConnectedPoint,
_handler: Self::ConnectionHandler,
_remaining_established: usize,
) {
debug!("bitswap: inject_disconnected {:?}", peer_id);
self.connected_peers.remove(peer_id);
self.peer_connection_id.remove(peer_id);
}
fn inject_event(&mut self, source: PeerId, _connection: ConnectionId, message: MessageWrapper) {
let mut message = match message {
MessageWrapper::Tx => return,
MessageWrapper::Rx(msg) => msg,
};
debug!("bitswap: inject_event from {}: {:?}", source, message);
let current_wantlist = self.local_wantlist();
let ledger = match self.connected_peers.get_mut(&source) {
Some(ledger) => ledger,
None => {
debug!("bitswap: Peer {} is not in ledger", source);
return;
}
};
for cid in message.cancel() {
ledger.received_want_list.remove(cid);
let event = BitswapEvent::ReceivedCancel(source, *cid);
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
for (cid, priority) in message
.want()
.iter()
.filter(|&(cid, _)| !current_wantlist.iter().map(|(c, _)| c).any(|c| c == cid))
{
ledger.received_want_list.insert(cid.to_owned(), *priority);
let event = BitswapEvent::ReceivedWant(source, *cid, *priority);
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
for block in mem::take(&mut message.blocks) {
self.cancel_block(block.cid());
let event = BitswapEvent::ReceivedBlock(source, block);
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
}
#[allow(clippy::type_complexity)]
fn poll(
&mut self,
ctx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
use futures::stream::StreamExt;
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
while let Poll::Ready(Some((peer_id, block))) = self.ready_blocks.poll_next_unpin(ctx) {
self.send_block(peer_id, block);
}
for (peer_id, ledger) in &mut self.connected_peers {
if let Some(message) = ledger.send() {
if let Some(peer_stats) = self.stats.get_mut(peer_id) {
peer_stats.update_outgoing(message.blocks.len() as u64);
}
if let Some(id) = self.peer_connection_id.get(peer_id) {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*id),
event: message,
});
}
}
}
Poll::Pending
}
}