rust_ipfs_bitswap/
behaviour.rs

//! Handles the `/ipfs/bitswap/1.0.0` and `/ipfs/bitswap/1.1.0` protocols. This
//! allows exchanging IPFS blocks.
//!
//! # Usage
//!
//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
//! will allow providing and receiving IPFS blocks.
8use crate::block::Block;
9use crate::ledger::{Ledger, Message, Priority};
10use crate::protocol::{BitswapConfig, MessageWrapper};
11use fnv::FnvHashSet;
12use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
13use hash_hasher::HashedMap;
14use libipld::Cid;
15use libp2p::core::Multiaddr;
16use libp2p::identity::PeerId;
17use libp2p::swarm::derive_prelude::ConnectionEstablished;
18use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
19use libp2p::swarm::handler::OneShotHandler;
20use libp2p::swarm::{
21    ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler,
22    PollParameters, THandler, ToSwarm as NetworkBehaviourAction,
23};
24use std::task::{Context, Poll};
25use std::{
26    collections::{HashMap, VecDeque},
27    mem,
28    sync::{
29        atomic::{AtomicU64, Ordering},
30        Arc,
31    },
32};
33
34/// Event used to communicate with the swarm or the higher level behaviour.
35#[derive(Clone, Debug, Eq, PartialEq)]
36pub enum BitswapEvent {
37    ReceivedBlock(PeerId, Block),
38    ReceivedWant(PeerId, Cid, Priority),
39    ReceivedCancel(PeerId, Cid),
40}
41
/// Bitswap statistics.
///
/// Every counter is an `AtomicU64` that is read and written with
/// `Ordering::Relaxed`, so all methods only need a shared reference.
#[derive(Default, Debug)]
pub struct Stats {
    /// Number of blocks sent.
    pub sent_blocks: AtomicU64,
    /// Amount of data sent. NOTE(review): none of the `update_*` methods
    /// below write this counter; it only changes through `add_assign` or
    /// direct field access.
    pub sent_data: AtomicU64,
    /// Number of blocks recorded through `update_incoming_unique`.
    pub received_blocks: AtomicU64,
    /// Bytes recorded through `update_incoming_unique`.
    pub received_data: AtomicU64,
    /// Number of blocks recorded through `update_incoming_duplicate`.
    pub duplicate_blocks: AtomicU64,
    /// Bytes recorded through `update_incoming_duplicate`.
    pub duplicate_data: AtomicU64,
}

impl Stats {
    /// Records that `num_blocks` blocks were sent.
    pub fn update_outgoing(&self, num_blocks: u64) {
        let sent = &self.sent_blocks;
        sent.fetch_add(num_blocks, Ordering::Relaxed);
    }

    /// Records one received block of `bytes` bytes that was not a duplicate.
    pub fn update_incoming_unique(&self, bytes: u64) {
        let (blocks, data) = (&self.received_blocks, &self.received_data);
        blocks.fetch_add(1, Ordering::Relaxed);
        data.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Records one received duplicate block of `bytes` bytes.
    pub fn update_incoming_duplicate(&self, bytes: u64) {
        let (blocks, data) = (&self.duplicate_blocks, &self.duplicate_data);
        blocks.fetch_add(1, Ordering::Relaxed);
        data.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Adds every counter of `other` onto the matching counter of `self`.
    ///
    /// Each counter is loaded and added independently; the operation as a
    /// whole is not atomic.
    pub fn add_assign(&self, other: &Stats) {
        // (destination, source) pairs, in field declaration order.
        let counters = [
            (&self.sent_blocks, &other.sent_blocks),
            (&self.sent_data, &other.sent_data),
            (&self.received_blocks, &other.received_blocks),
            (&self.received_data, &other.received_data),
            (&self.duplicate_blocks, &other.duplicate_blocks),
            (&self.duplicate_data, &other.duplicate_data),
        ];

        for &(target, source) in counters.iter() {
            target.fetch_add(source.load(Ordering::Relaxed), Ordering::Relaxed);
        }
    }
}
91
92/// Network behaviour that handles sending and receiving IPFS blocks.
93pub struct Bitswap {
94    /// Queue of events to report to the user.
95    events: VecDeque<NetworkBehaviourAction<BitswapEvent, Message>>,
96    /// List of prospect peers to connect to.
97    target_peers: FnvHashSet<PeerId>,
98    /// Ledger
99    pub connected_peers: HashMap<PeerId, Ledger>,
100    /// Wanted blocks
101    wanted_blocks: HashedMap<Cid, Priority>,
102    /// Blocks queued to be sent
103    pub queued_blocks: UnboundedSender<(PeerId, Block)>,
104    ready_blocks: UnboundedReceiver<(PeerId, Block)>,
105
106    pub dont_have_tx: UnboundedSender<(PeerId, Cid)>,
107    dont_have_rx: UnboundedReceiver<(PeerId, Cid)>,
108    /// Statistics related to peers.
109    pub stats: HashMap<PeerId, Arc<Stats>>,
110}
111
112impl Default for Bitswap {
113    fn default() -> Self {
114        let (tx, rx) = unbounded();
115        let (dtx, drx) = unbounded();
116
117        Bitswap {
118            events: Default::default(),
119            target_peers: Default::default(),
120            connected_peers: Default::default(),
121            wanted_blocks: Default::default(),
122            queued_blocks: tx,
123            dont_have_rx: drx,
124            dont_have_tx: dtx,
125            ready_blocks: rx,
126            stats: Default::default(),
127        }
128    }
129}
130
131impl Bitswap {
132    /// Return the wantlist of the local node
133    pub fn local_wantlist(&self) -> Vec<(Cid, Priority)> {
134        self.wanted_blocks
135            .iter()
136            .map(|(cid, prio)| (*cid, *prio))
137            .collect()
138    }
139
140    /// Return the wantlist of a peer, if known
141    pub fn peer_wantlist(&self, peer: &PeerId) -> Option<Vec<(Cid, Priority)>> {
142        self.connected_peers.get(peer).map(Ledger::wantlist)
143    }
144
145    pub fn stats(&self) -> Stats {
146        self.stats
147            .values()
148            .fold(Stats::default(), |acc, peer_stats| {
149                acc.add_assign(peer_stats);
150                acc
151            })
152    }
153
154    pub fn peers(&self) -> Vec<PeerId> {
155        self.connected_peers.keys().cloned().collect()
156    }
157
158    /// Connect to peer.
159    ///
160    /// Called from Kademlia behaviour.
161    pub fn connect(&mut self, peer_id: PeerId) {
162        if self.target_peers.insert(peer_id) {
163            self.events.push_back(NetworkBehaviourAction::Dial {
164                opts: DialOpts::peer_id(peer_id)
165                    .condition(PeerCondition::Disconnected)
166                    .build(),
167            });
168        }
169    }
170
171    /// Sends a block to the peer.
172    ///
173    /// Called from a Strategy.
174    pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
175        trace!("queueing block to be sent to {}: {}", peer_id, block.cid());
176        if let Some(ledger) = self.connected_peers.get_mut(&peer_id) {
177            ledger.add_block(block);
178        }
179    }
180
181    /// Sends the wantlist to the peer.
182    fn send_want_list(&mut self, peer_id: PeerId) {
183        if !self.wanted_blocks.is_empty() {
184            // FIXME: this can produce too long a message
185            // FIXME: we should shard these across all of our peers by some logic; also, peers may
186            // have been discovered to provide some specific wantlist item
187            let mut message = Message::default();
188            for (cid, priority) in &self.wanted_blocks {
189                message.want_block(cid, *priority);
190            }
191            self.events
192                .push_back(NetworkBehaviourAction::NotifyHandler {
193                    peer_id,
194                    event: message,
195                    handler: NotifyHandler::Any,
196                });
197        }
198    }
199
200    /// Queues the wanted block for all peers.
201    ///
202    /// A user request
203    pub fn want_block(&mut self, cid: Cid, priority: Priority) {
204        for (_peer_id, ledger) in self.connected_peers.iter_mut() {
205            ledger.want_block(&cid, priority);
206        }
207        self.wanted_blocks.insert(cid, priority);
208    }
209
210    /// Queues the wanted block for specific peers.
211    ///
212    /// A user request
213    pub fn want_block_from_peers(&mut self, cid: Cid, priority: Priority, peers: &[PeerId]) {
214        for peer in peers {
215            if let Some(ledger) = self.connected_peers.get_mut(peer) {
216                ledger.want_block(&cid, priority);
217            }
218        }
219        self.wanted_blocks.insert(cid, priority);
220    }
221
222    /// Remove wanted blocks from ledger that we dont have
223    pub fn dont_have(&mut self, cid: Cid) {
224        for (_peer_id, ledger) in self.connected_peers.iter_mut() {
225            ledger.received_want_list.remove(&cid);
226            //TODO: Implement dont have in ledger (for 1.2.0 spec), if we are to continue using this
227        }
228    }
229
230    pub fn dont_have_for_peer(&mut self, peer_id: PeerId, cid: Cid) {
231        if let Some(ledger) = self.connected_peers.get_mut(&peer_id) {
232            ledger.received_want_list.remove(&cid);
233            ledger.received_want_list.shrink_to_fit();
234            //TODO: Implement dont have in ledger (for 1.2.0 spec), if we are to continue using this
235        }
236    }
237
238    /// Removes the block from our want list and updates all peers.
239    ///
240    /// Can be either a user request or be called when the block
241    /// was received.
242    pub fn cancel_block(&mut self, cid: &Cid) {
243        for (_peer_id, ledger) in self.connected_peers.iter_mut() {
244            ledger.cancel_block(cid);
245        }
246        self.wanted_blocks.remove(cid);
247    }
248}
249
250impl NetworkBehaviour for Bitswap {
251    type ConnectionHandler = OneShotHandler<BitswapConfig, Message, MessageWrapper>;
252    type OutEvent = BitswapEvent;
253
254    fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
255        debug!("bitswap: addresses_of_peer");
256        Vec::new()
257    }
258
259    fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
260        match event {
261            FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => {
262                debug!("bitswap: inject_connected {}", peer_id);
263                self.target_peers.remove(&peer_id);
264                let ledger = Ledger::new();
265                self.stats.entry(peer_id).or_default();
266                self.connected_peers.insert(peer_id, ledger);
267                self.send_want_list(peer_id);
268            }
269            FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => {
270                debug!("bitswap: inject_disconnected {:?}", peer_id);
271                self.connected_peers.remove(&peer_id);
272            }
273            _ => {}
274        }
275    }
276
277    fn handle_established_inbound_connection(
278        &mut self,
279        _connection_id: ConnectionId,
280        _peer: PeerId,
281        _local_addr: &Multiaddr,
282        _remote_addr: &Multiaddr,
283    ) -> Result<THandler<Self>, ConnectionDenied> {
284        Ok(Self::ConnectionHandler::default())
285    }
286
287    fn handle_established_outbound_connection(
288        &mut self,
289        _connection_id: ConnectionId,
290        _peer: PeerId,
291        _addr: &Multiaddr,
292        _role_override: libp2p::core::Endpoint,
293    ) -> Result<THandler<Self>, ConnectionDenied> {
294        Ok(Self::ConnectionHandler::default())
295    }
296
297    fn on_connection_handler_event(
298        &mut self,
299        source: PeerId,
300        _connection: ConnectionId,
301        message: MessageWrapper,
302    ) {
303        let mut message = match message {
304            // we just sent an outgoing bitswap message, nothing to do here
305            // FIXME: we could commit any pending stats accounting for this peer now
306            // that the message may have sent, if we'd do such accounting
307            MessageWrapper::Tx => return,
308            // we've received a bitswap message, process it
309            MessageWrapper::Rx(msg) => msg,
310        };
311
312        debug!("bitswap: inject_event from {}: {:?}", source, message);
313
314        let current_wantlist = self.local_wantlist();
315
316        // we shouldnt be panicing here unless this is actually a bug
317        // let ledger = self
318        //     .connected_peers
319        //     .get_mut(&source)
320        //     .expect("Peer not in ledger?!");
321
322        let ledger = match self.connected_peers.get_mut(&source) {
323            Some(ledger) => ledger,
324            None => {
325                debug!("bitswap: Peer {} is not in ledger", source);
326                return;
327            }
328        };
329
330        // Process the incoming cancel list.
331        for cid in message.cancel() {
332            ledger.received_want_list.remove(cid);
333
334            let event = BitswapEvent::ReceivedCancel(source, *cid);
335            self.events
336                .push_back(NetworkBehaviourAction::GenerateEvent(event));
337        }
338
339        // Process the incoming wantlist.
340        for (cid, priority) in message
341            .want()
342            .iter()
343            .filter(|&(cid, _)| !current_wantlist.iter().map(|(c, _)| c).any(|c| c == cid))
344        {
345            ledger.received_want_list.insert(cid.to_owned(), *priority);
346
347            let event = BitswapEvent::ReceivedWant(source, *cid, *priority);
348            self.events
349                .push_back(NetworkBehaviourAction::GenerateEvent(event));
350        }
351
352        // Process the incoming blocks.
353        for block in mem::take(&mut message.blocks) {
354            self.cancel_block(block.cid());
355
356            let event = BitswapEvent::ReceivedBlock(source, block);
357            self.events
358                .push_back(NetworkBehaviourAction::GenerateEvent(event));
359        }
360    }
361
362    #[allow(clippy::type_complexity)]
363    fn poll(
364        &mut self,
365        ctx: &mut Context,
366        _: &mut impl PollParameters,
367    ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Message>> {
368        use futures::stream::StreamExt;
369
370        if let Some(event) = self.events.pop_front() {
371            return Poll::Ready(event);
372        }
373
374        while let Poll::Ready(Some((peer_id, block))) = self.dont_have_rx.poll_next_unpin(ctx) {
375            self.dont_have_for_peer(peer_id, block);
376        }
377
378        while let Poll::Ready(Some((peer_id, block))) = self.ready_blocks.poll_next_unpin(ctx) {
379            self.send_block(peer_id, block);
380        }
381
382        for (peer_id, ledger) in &mut self.connected_peers {
383            if let Some(message) = ledger.send() {
384                if let Some(peer_stats) = self.stats.get_mut(peer_id) {
385                    peer_stats.update_outgoing(message.blocks.len() as u64);
386                }
387
388                return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
389                    peer_id: *peer_id,
390                    handler: NotifyHandler::Any,
391                    event: message,
392                });
393            }
394        }
395        Poll::Pending
396    }
397}