ipfs_bitswap/
behaviour.rs

1//! Handles the `/ipfs/bitswap/1.0.0` and `/ipfs/bitswap/1.1.0` protocols. This
2//! allows exchanging IPFS blocks.
3//!
4//! # Usage
5//!
6//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
7//! will allow providing and reciving IPFS blocks.
8use crate::block::Block;
9use crate::ledger::{Ledger, Message, Priority};
10use crate::protocol::{BitswapConfig, MessageWrapper};
11use cid::Cid;
12use fnv::FnvHashSet;
13use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
14use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId};
15use libp2p_swarm::protocols_handler::{IntoProtocolsHandler, OneShotHandler, ProtocolsHandler};
16use libp2p_swarm::{
17    DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
18};
19use std::task::{Context, Poll};
20use std::{
21    collections::{HashMap, VecDeque},
22    mem,
23    sync::{
24        atomic::{AtomicU64, Ordering},
25        Arc,
26    },
27};
28
29/// Event used to communicate with the swarm or the higher level behaviour.
30#[derive(Clone, Debug, Eq, PartialEq)]
31pub enum BitswapEvent {
32    ReceivedBlock(PeerId, Block),
33    ReceivedWant(PeerId, Cid, Priority),
34    ReceivedCancel(PeerId, Cid),
35}
36
37/// Bitswap statistics.
38#[derive(Debug, Default)]
39pub struct Stats {
40    pub sent_blocks: AtomicU64,
41    pub sent_data: AtomicU64,
42    pub received_blocks: AtomicU64,
43    pub received_data: AtomicU64,
44    pub duplicate_blocks: AtomicU64,
45    pub duplicate_data: AtomicU64,
46}
47
48impl Stats {
49    pub fn update_outgoing(&self, num_blocks: u64) {
50        self.sent_blocks.fetch_add(num_blocks, Ordering::Relaxed);
51    }
52
53    pub fn update_incoming_unique(&self, bytes: u64) {
54        self.received_blocks.fetch_add(1, Ordering::Relaxed);
55        self.received_data.fetch_add(bytes, Ordering::Relaxed);
56    }
57
58    pub fn update_incoming_duplicate(&self, bytes: u64) {
59        self.duplicate_blocks.fetch_add(1, Ordering::Relaxed);
60        self.duplicate_data.fetch_add(bytes, Ordering::Relaxed);
61    }
62
63    pub fn add_assign(&self, other: &Stats) {
64        self.sent_blocks
65            .fetch_add(other.sent_blocks.load(Ordering::Relaxed), Ordering::Relaxed);
66        self.sent_data
67            .fetch_add(other.sent_data.load(Ordering::Relaxed), Ordering::Relaxed);
68        self.received_blocks.fetch_add(
69            other.received_blocks.load(Ordering::Relaxed),
70            Ordering::Relaxed,
71        );
72        self.received_data.fetch_add(
73            other.received_data.load(Ordering::Relaxed),
74            Ordering::Relaxed,
75        );
76        self.duplicate_blocks.fetch_add(
77            other.duplicate_blocks.load(Ordering::Relaxed),
78            Ordering::Relaxed,
79        );
80        self.duplicate_data.fetch_add(
81            other.duplicate_data.load(Ordering::Relaxed),
82            Ordering::Relaxed,
83        );
84    }
85}
86
87/// Network behaviour that handles sending and receiving IPFS blocks.
88pub struct Bitswap {
89    /// Queue of events to report to the user.
90    events: VecDeque<NetworkBehaviourAction<Message, BitswapEvent>>,
91    /// List of prospect peers to connect to.
92    target_peers: FnvHashSet<PeerId>,
93    /// Ledger
94    pub connected_peers: HashMap<PeerId, Ledger>,
95    /// Wanted blocks
96    wanted_blocks: HashMap<Cid, Priority>,
97    /// Blocks queued to be sent
98    pub queued_blocks: UnboundedSender<(PeerId, Block)>,
99    ready_blocks: UnboundedReceiver<(PeerId, Block)>,
100    /// Statistics related to peers.
101    pub stats: HashMap<PeerId, Arc<Stats>>,
102}
103
104impl Default for Bitswap {
105    fn default() -> Self {
106        let (tx, rx) = unbounded();
107
108        Bitswap {
109            events: Default::default(),
110            target_peers: Default::default(),
111            connected_peers: Default::default(),
112            wanted_blocks: Default::default(),
113            queued_blocks: tx,
114            ready_blocks: rx,
115            stats: Default::default(),
116        }
117    }
118}
119
120impl Bitswap {
121    /// Return the wantlist of the local node
122    pub fn local_wantlist(&self) -> Vec<(Cid, Priority)> {
123        self.wanted_blocks
124            .iter()
125            .map(|(cid, prio)| (cid.clone(), *prio))
126            .collect()
127    }
128
129    /// Return the wantlist of a peer, if known
130    pub fn peer_wantlist(&self, peer: &PeerId) -> Option<Vec<(Cid, Priority)>> {
131        self.connected_peers.get(peer).map(Ledger::wantlist)
132    }
133
134    pub fn stats(&self) -> Stats {
135        self.stats
136            .values()
137            .fold(Stats::default(), |acc, peer_stats| {
138                acc.add_assign(&peer_stats);
139                acc
140            })
141    }
142
143    pub fn peers(&self) -> Vec<PeerId> {
144        self.connected_peers.keys().cloned().collect()
145    }
146
147    /// Connect to peer.
148    ///
149    /// Called from Kademlia behaviour.
150    pub fn connect(&mut self, peer_id: PeerId) {
151        if self.target_peers.insert(peer_id.clone()) {
152            self.events.push_back(NetworkBehaviourAction::DialPeer {
153                peer_id,
154                condition: DialPeerCondition::Disconnected,
155            });
156        }
157    }
158
159    /// Sends a block to the peer.
160    ///
161    /// Called from a Strategy.
162    pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
163        trace!("queueing block to be sent to {}: {}", peer_id, block.cid);
164        if let Some(ledger) = self.connected_peers.get_mut(&peer_id) {
165            ledger.add_block(block);
166        }
167    }
168
169    /// Sends the wantlist to the peer.
170    fn send_want_list(&mut self, peer_id: PeerId) {
171        if !self.wanted_blocks.is_empty() {
172            // FIXME: this can produce too long a message
173            // FIXME: we should shard these across all of our peers by some logic; also, peers may
174            // have been discovered to provide some specific wantlist item
175            let mut message = Message::default();
176            for (cid, priority) in &self.wanted_blocks {
177                message.want_block(cid, *priority);
178            }
179            self.events
180                .push_back(NetworkBehaviourAction::NotifyHandler {
181                    peer_id,
182                    event: message,
183                    handler: NotifyHandler::Any,
184                });
185        }
186    }
187
188    /// Queues the wanted block for all peers.
189    ///
190    /// A user request
191    pub fn want_block(&mut self, cid: Cid, priority: Priority) {
192        for (_peer_id, ledger) in self.connected_peers.iter_mut() {
193            ledger.want_block(&cid, priority);
194        }
195        self.wanted_blocks.insert(cid, priority);
196    }
197
198    /// Removes the block from our want list and updates all peers.
199    ///
200    /// Can be either a user request or be called when the block
201    /// was received.
202    pub fn cancel_block(&mut self, cid: &Cid) {
203        for (_peer_id, ledger) in self.connected_peers.iter_mut() {
204            ledger.cancel_block(cid);
205        }
206        self.wanted_blocks.remove(cid);
207    }
208}
209
210impl NetworkBehaviour for Bitswap {
211    type ProtocolsHandler = OneShotHandler<BitswapConfig, Message, MessageWrapper>;
212    type OutEvent = BitswapEvent;
213
214    fn new_handler(&mut self) -> Self::ProtocolsHandler {
215        debug!("bitswap: new_handler");
216        Default::default()
217    }
218
219    fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
220        debug!("bitswap: addresses_of_peer");
221        Vec::new()
222    }
223
224    fn inject_connected(&mut self, peer_id: &PeerId) {
225        debug!("bitswap: inject_connected {}", peer_id);
226        let ledger = Ledger::new();
227        self.stats.entry(peer_id.clone()).or_default();
228        self.connected_peers.insert(peer_id.clone(), ledger);
229        self.send_want_list(peer_id.clone());
230    }
231
232    fn inject_disconnected(&mut self, peer_id: &PeerId) {
233        debug!("bitswap: inject_disconnected {:?}", peer_id);
234        self.connected_peers.remove(peer_id);
235        // the related stats are not dropped, so that they
236        // persist for peers regardless of disconnects
237    }
238
239    fn inject_event(&mut self, source: PeerId, _connection: ConnectionId, message: MessageWrapper) {
240        let mut message = match message {
241            // we just sent an outgoing bitswap message, nothing to do here
242            // FIXME: we could commit any pending stats accounting for this peer now
243            // that the message may have sent, if we'd do such accounting
244            MessageWrapper::Tx => return,
245            // we've received a bitswap message, process it
246            MessageWrapper::Rx(msg) => msg,
247        };
248
249        debug!("bitswap: inject_event from {}: {:?}", source, message);
250
251        let current_wantlist = self.local_wantlist();
252
253        let ledger = self
254            .connected_peers
255            .get_mut(&source)
256            .expect("Peer not in ledger?!");
257
258        // Process the incoming cancel list.
259        for cid in message.cancel() {
260            ledger.received_want_list.remove(cid);
261
262            let event = BitswapEvent::ReceivedCancel(source.clone(), cid.clone());
263            self.events
264                .push_back(NetworkBehaviourAction::GenerateEvent(event));
265        }
266
267        // Process the incoming wantlist.
268        for (cid, priority) in message
269            .want()
270            .iter()
271            .filter(|&(cid, _)| !current_wantlist.iter().map(|(c, _)| c).any(|c| c == cid))
272        {
273            ledger.received_want_list.insert(cid.to_owned(), *priority);
274
275            let event = BitswapEvent::ReceivedWant(source.clone(), cid.clone(), *priority);
276            self.events
277                .push_back(NetworkBehaviourAction::GenerateEvent(event));
278        }
279
280        // Process the incoming blocks.
281        for block in mem::take(&mut message.blocks) {
282            self.cancel_block(&block.cid());
283
284            let event = BitswapEvent::ReceivedBlock(source.clone(), block);
285            self.events
286                .push_back(NetworkBehaviourAction::GenerateEvent(event));
287        }
288    }
289
290    #[allow(clippy::type_complexity)]
291    fn poll(&mut self, ctx: &mut Context, _: &mut impl PollParameters)
292        -> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>
293    {
294        use futures::stream::StreamExt;
295
296        while let Poll::Ready(Some((peer_id, block))) = self.ready_blocks.poll_next_unpin(ctx) {
297            self.send_block(peer_id, block);
298        }
299
300        if let Some(event) = self.events.pop_front() {
301            return Poll::Ready(event);
302        }
303
304        for (peer_id, ledger) in &mut self.connected_peers {
305            if let Some(message) = ledger.send() {
306                if let Some(peer_stats) = self.stats.get_mut(peer_id) {
307                    peer_stats.update_outgoing(message.blocks.len() as u64);
308                }
309
310                return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
311                    peer_id: peer_id.clone(),
312                    handler: NotifyHandler::Any,
313                    event: message,
314                });
315            }
316        }
317        Poll::Pending
318    }
319}