libp2p_bitswap/
behaviour.rs

1//! Handles the `/ipfs/bitswap/1.0.0` and `/ipfs/bitswap/1.1.0` protocols. This
2//! allows exchanging IPFS blocks.
3//!
4//! # Usage
5//!
6//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
7//! will allow providing and reciving IPFS blocks.
8#[cfg(feature = "compat")]
9use crate::compat::{CompatMessage, CompatProtocol, InboundMessage};
10use crate::protocol::{
11    BitswapCodec, BitswapProtocol, BitswapRequest, BitswapResponse, RequestType,
12};
13use crate::query::{QueryEvent, QueryId, QueryManager, Request, Response};
14use crate::stats::*;
15use fnv::FnvHashMap;
16#[cfg(feature = "compat")]
17use fnv::FnvHashSet;
18use futures::{
19    channel::mpsc,
20    stream::{Stream, StreamExt},
21    task::{Context, Poll},
22};
23use libipld::{error::BlockNotFound, store::StoreParams, Block, Cid, Result};
24#[cfg(feature = "compat")]
25use libp2p::core::either::EitherOutput;
26use libp2p::core::{connection::ConnectionId, Multiaddr, PeerId};
27use libp2p::swarm::derive_prelude::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
28#[cfg(feature = "compat")]
29use libp2p::swarm::{ConnectionHandlerSelect, NotifyHandler, OneShotHandler};
30use libp2p::{
31    request_response::{
32        InboundFailure, OutboundFailure, ProtocolSupport, RequestId, RequestResponse,
33        RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
34    },
35    swarm::{ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters},
36};
37use prometheus::Registry;
38use std::{pin::Pin, time::Duration};
39
40/// Bitswap response channel.
41pub type Channel = ResponseChannel<BitswapResponse>;
42
43/// Event emitted by the bitswap behaviour.
44#[derive(Debug)]
45pub enum BitswapEvent {
46    /// Received a block from a peer. Includes the number of known missing blocks for a
47    /// sync query. When a block is received and missing blocks is not empty the counter
48    /// is increased. If missing blocks is empty the counter is decremented.
49    Progress(QueryId, usize),
50    /// A get or sync query completed.
51    Complete(QueryId, Result<()>),
52}
53
54/// Trait implemented by a block store.
55pub trait BitswapStore: Send + Sync + 'static {
56    /// The store params.
57    type Params: StoreParams;
58    /// A have query needs to know if the block store contains the block.
59    fn contains(&mut self, cid: &Cid) -> Result<bool>;
60    /// A block query needs to retrieve the block from the store.
61    fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>>;
62    /// A block response needs to insert the block into the store.
63    fn insert(&mut self, block: &Block<Self::Params>) -> Result<()>;
64    /// A sync query needs a list of missing blocks to make progress.
65    fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>>;
66}
67
68/// Bitswap configuration.
69#[derive(Clone, Copy, Debug, Eq, PartialEq)]
70pub struct BitswapConfig {
71    /// Timeout of a request.
72    pub request_timeout: Duration,
73    /// Time a connection is kept alive.
74    pub connection_keep_alive: Duration,
75}
76
77impl BitswapConfig {
78    /// Creates a new `BitswapConfig`.
79    pub fn new() -> Self {
80        Self {
81            request_timeout: Duration::from_secs(10),
82            connection_keep_alive: Duration::from_secs(10),
83        }
84    }
85}
86
87impl Default for BitswapConfig {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
94enum BitswapId {
95    Bitswap(RequestId),
96    #[cfg(feature = "compat")]
97    Compat(Cid),
98}
99
100enum BitswapChannel {
101    Bitswap(Channel),
102    #[cfg(feature = "compat")]
103    Compat(PeerId, Cid),
104}
105
106/// Network behaviour that handles sending and receiving blocks.
107pub struct Bitswap<P: StoreParams> {
108    /// Inner behaviour.
109    inner: RequestResponse<BitswapCodec<P>>,
110    /// Query manager.
111    query_manager: QueryManager,
112    /// Requests.
113    requests: FnvHashMap<BitswapId, QueryId>,
114    /// Db request channel.
115    db_tx: mpsc::UnboundedSender<DbRequest<P>>,
116    /// Db response channel.
117    db_rx: mpsc::UnboundedReceiver<DbResponse>,
118    /// Compat peers.
119    #[cfg(feature = "compat")]
120    compat: FnvHashSet<PeerId>,
121}
122
123impl<P: StoreParams> Bitswap<P> {
124    /// Creates a new `Bitswap` behaviour.
125    pub fn new<S: BitswapStore<Params = P>>(config: BitswapConfig, store: S) -> Self {
126        let mut rr_config = RequestResponseConfig::default();
127        rr_config.set_connection_keep_alive(config.connection_keep_alive);
128        rr_config.set_request_timeout(config.request_timeout);
129        let protocols = std::iter::once((BitswapProtocol, ProtocolSupport::Full));
130        let inner = RequestResponse::new(BitswapCodec::<P>::default(), protocols, rr_config);
131        let (db_tx, db_rx) = start_db_thread(store);
132        Self {
133            inner,
134            query_manager: Default::default(),
135            requests: Default::default(),
136            db_tx,
137            db_rx,
138            #[cfg(feature = "compat")]
139            compat: Default::default(),
140        }
141    }
142
143    /// Adds an address for a peer.
144    pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
145        self.inner.add_address(peer_id, addr);
146    }
147
148    /// Removes an address for a peer.
149    pub fn remove_address(&mut self, peer_id: &PeerId, addr: &Multiaddr) {
150        self.inner.remove_address(peer_id, addr);
151    }
152
153    /// Starts a get query with an initial guess of providers.
154    pub fn get(&mut self, cid: Cid, peers: impl Iterator<Item = PeerId>) -> QueryId {
155        self.query_manager.get(None, cid, peers)
156    }
157
158    /// Starts a sync query with an the initial set of missing blocks.
159    pub fn sync(
160        &mut self,
161        cid: Cid,
162        peers: Vec<PeerId>,
163        missing: impl Iterator<Item = Cid>,
164    ) -> QueryId {
165        self.query_manager.sync(cid, peers, missing)
166    }
167
168    /// Cancels an in progress query. Returns true if a query was cancelled.
169    pub fn cancel(&mut self, id: QueryId) -> bool {
170        let res = self.query_manager.cancel(id);
171        if res {
172            REQUESTS_CANCELED.inc();
173        }
174        res
175    }
176
177    /// Registers prometheus metrics.
178    pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
179        registry.register(Box::new(REQUESTS_TOTAL.clone()))?;
180        registry.register(Box::new(REQUEST_DURATION_SECONDS.clone()))?;
181        registry.register(Box::new(REQUESTS_CANCELED.clone()))?;
182        registry.register(Box::new(BLOCK_NOT_FOUND.clone()))?;
183        registry.register(Box::new(PROVIDERS_TOTAL.clone()))?;
184        registry.register(Box::new(MISSING_BLOCKS_TOTAL.clone()))?;
185        registry.register(Box::new(RECEIVED_BLOCK_BYTES.clone()))?;
186        registry.register(Box::new(RECEIVED_INVALID_BLOCK_BYTES.clone()))?;
187        registry.register(Box::new(SENT_BLOCK_BYTES.clone()))?;
188        registry.register(Box::new(RESPONSES_TOTAL.clone()))?;
189        registry.register(Box::new(THROTTLED_INBOUND.clone()))?;
190        registry.register(Box::new(THROTTLED_OUTBOUND.clone()))?;
191        registry.register(Box::new(OUTBOUND_FAILURE.clone()))?;
192        registry.register(Box::new(INBOUND_FAILURE.clone()))?;
193        Ok(())
194    }
195}
196
197enum DbRequest<P: StoreParams> {
198    Bitswap(BitswapChannel, BitswapRequest),
199    Insert(Block<P>),
200    MissingBlocks(QueryId, Cid),
201}
202
203enum DbResponse {
204    Bitswap(BitswapChannel, BitswapResponse),
205    MissingBlocks(QueryId, Result<Vec<Cid>>),
206}
207
208fn start_db_thread<S: BitswapStore>(
209    mut store: S,
210) -> (
211    mpsc::UnboundedSender<DbRequest<S::Params>>,
212    mpsc::UnboundedReceiver<DbResponse>,
213) {
214    let (tx, requests) = mpsc::unbounded();
215    let (responses, rx) = mpsc::unbounded();
216    std::thread::spawn(move || {
217        let mut requests: mpsc::UnboundedReceiver<DbRequest<S::Params>> = requests;
218        while let Some(request) = futures::executor::block_on(requests.next()) {
219            match request {
220                DbRequest::Bitswap(channel, request) => {
221                    let response = match request.ty {
222                        RequestType::Have => {
223                            let have = store.contains(&request.cid).ok().unwrap_or_default();
224                            if have {
225                                RESPONSES_TOTAL.with_label_values(&["have"]).inc();
226                            } else {
227                                RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
228                            }
229                            tracing::trace!("have {}", have);
230                            BitswapResponse::Have(have)
231                        }
232                        RequestType::Block => {
233                            let block = store.get(&request.cid).ok().unwrap_or_default();
234                            if let Some(data) = block {
235                                RESPONSES_TOTAL.with_label_values(&["block"]).inc();
236                                SENT_BLOCK_BYTES.inc_by(data.len() as u64);
237                                tracing::trace!("block {}", data.len());
238                                BitswapResponse::Block(data)
239                            } else {
240                                RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
241                                tracing::trace!("have false");
242                                BitswapResponse::Have(false)
243                            }
244                        }
245                    };
246                    responses
247                        .unbounded_send(DbResponse::Bitswap(channel, response))
248                        .ok();
249                }
250                DbRequest::Insert(block) => {
251                    if let Err(err) = store.insert(&block) {
252                        tracing::error!("error inserting blocks {}", err);
253                    }
254                }
255                DbRequest::MissingBlocks(id, cid) => {
256                    let res = store.missing_blocks(&cid);
257                    responses
258                        .unbounded_send(DbResponse::MissingBlocks(id, res))
259                        .ok();
260                }
261            }
262        }
263    });
264    (tx, rx)
265}
266
267impl<P: StoreParams> Bitswap<P> {
268    /// Processes an incoming bitswap request.
269    fn inject_request(&mut self, channel: BitswapChannel, request: BitswapRequest) {
270        self.db_tx
271            .unbounded_send(DbRequest::Bitswap(channel, request))
272            .ok();
273    }
274
275    /// Processes an incoming bitswap response.
276    fn inject_response(&mut self, id: BitswapId, peer: PeerId, response: BitswapResponse) {
277        if let Some(id) = self.requests.remove(&id) {
278            match response {
279                BitswapResponse::Have(have) => {
280                    self.query_manager
281                        .inject_response(id, Response::Have(peer, have));
282                }
283                BitswapResponse::Block(data) => {
284                    if let Some(info) = self.query_manager.query_info(id) {
285                        let len = data.len();
286                        if let Ok(block) = Block::new(info.cid, data) {
287                            RECEIVED_BLOCK_BYTES.inc_by(len as u64);
288                            self.db_tx.unbounded_send(DbRequest::Insert(block)).ok();
289                            self.query_manager
290                                .inject_response(id, Response::Block(peer, true));
291                        } else {
292                            tracing::error!("received invalid block");
293                            RECEIVED_INVALID_BLOCK_BYTES.inc_by(len as u64);
294                            self.query_manager
295                                .inject_response(id, Response::Block(peer, false));
296                        }
297                    }
298                }
299            }
300        }
301    }
302
303    fn inject_outbound_failure(
304        &mut self,
305        peer: &PeerId,
306        request_id: RequestId,
307        error: &OutboundFailure,
308    ) {
309        tracing::debug!(
310            "bitswap outbound failure {} {} {:?}",
311            peer,
312            request_id,
313            error
314        );
315        match error {
316            OutboundFailure::DialFailure => {
317                OUTBOUND_FAILURE.with_label_values(&["dial_failure"]).inc();
318            }
319            OutboundFailure::Timeout => {
320                OUTBOUND_FAILURE.with_label_values(&["timeout"]).inc();
321            }
322            OutboundFailure::ConnectionClosed => {
323                OUTBOUND_FAILURE
324                    .with_label_values(&["connection_closed"])
325                    .inc();
326            }
327            OutboundFailure::UnsupportedProtocols => {
328                OUTBOUND_FAILURE
329                    .with_label_values(&["unsupported_protocols"])
330                    .inc();
331            }
332        }
333    }
334
335    fn inject_inbound_failure(
336        &mut self,
337        peer: &PeerId,
338        request_id: RequestId,
339        error: &InboundFailure,
340    ) {
341        tracing::error!(
342            "bitswap inbound failure {} {} {:?}",
343            peer,
344            request_id,
345            error
346        );
347        match error {
348            InboundFailure::Timeout => {
349                INBOUND_FAILURE.with_label_values(&["timeout"]).inc();
350            }
351            InboundFailure::ConnectionClosed => {
352                INBOUND_FAILURE
353                    .with_label_values(&["connection_closed"])
354                    .inc();
355            }
356            InboundFailure::UnsupportedProtocols => {
357                INBOUND_FAILURE
358                    .with_label_values(&["unsupported_protocols"])
359                    .inc();
360            }
361            InboundFailure::ResponseOmission => {
362                INBOUND_FAILURE
363                    .with_label_values(&["response_omission"])
364                    .inc();
365            }
366        }
367    }
368}
369
370impl<P: StoreParams> NetworkBehaviour for Bitswap<P> {
371    #[cfg(not(feature = "compat"))]
372    type ConnectionHandler =
373        <RequestResponse<BitswapCodec<P>> as NetworkBehaviour>::ConnectionHandler;
374
375    #[cfg(feature = "compat")]
376    #[allow(clippy::type_complexity)]
377    type ConnectionHandler = ConnectionHandlerSelect<
378        <RequestResponse<BitswapCodec<P>> as NetworkBehaviour>::ConnectionHandler,
379        OneShotHandler<CompatProtocol, CompatMessage, InboundMessage>,
380    >;
381    type OutEvent = BitswapEvent;
382
383    fn new_handler(&mut self) -> Self::ConnectionHandler {
384        #[cfg(not(feature = "compat"))]
385        return self.inner.new_handler();
386        #[cfg(feature = "compat")]
387        ConnectionHandler::select(self.inner.new_handler(), OneShotHandler::default())
388    }
389
390    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
391        self.inner.addresses_of_peer(peer_id)
392    }
393
394    fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
395        match event {
396            FromSwarm::ConnectionEstablished(ev) => self
397                .inner
398                .on_swarm_event(FromSwarm::ConnectionEstablished(ev)),
399            FromSwarm::ConnectionClosed(ConnectionClosed {
400                peer_id,
401                connection_id,
402                endpoint,
403                handler,
404                remaining_established,
405            }) => {
406                #[cfg(feature = "compat")]
407                if remaining_established == 0 {
408                    self.compat.remove(&peer_id);
409                }
410                #[cfg(feature = "compat")]
411                let (handler, _oneshot) = handler.into_inner();
412                self.inner
413                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
414                        peer_id,
415                        connection_id,
416                        endpoint,
417                        handler,
418                        remaining_established,
419                    }));
420            }
421            FromSwarm::DialFailure(DialFailure {
422                peer_id,
423                handler,
424                error,
425            }) => {
426                #[cfg(feature = "compat")]
427                let (handler, _oneshot) = handler.into_inner();
428                self.inner
429                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
430                        peer_id,
431                        handler,
432                        error,
433                    }));
434            }
435            FromSwarm::AddressChange(ev) => self.inner.on_swarm_event(FromSwarm::AddressChange(ev)),
436            FromSwarm::ListenFailure(ListenFailure {
437                local_addr,
438                send_back_addr,
439                handler,
440            }) => {
441                #[cfg(feature = "compat")]
442                let (handler, _oneshot) = handler.into_inner();
443                self.inner
444                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
445                        local_addr,
446                        send_back_addr,
447                        handler,
448                    }));
449            }
450            FromSwarm::NewListener(ev) => self.inner.on_swarm_event(FromSwarm::NewListener(ev)),
451            FromSwarm::NewListenAddr(ev) => self.inner.on_swarm_event(FromSwarm::NewListenAddr(ev)),
452            FromSwarm::ExpiredListenAddr(ev) => {
453                self.inner.on_swarm_event(FromSwarm::ExpiredListenAddr(ev))
454            }
455            FromSwarm::ListenerError(ev) => self.inner.on_swarm_event(FromSwarm::ListenerError(ev)),
456            FromSwarm::ListenerClosed(ev) => {
457                self.inner.on_swarm_event(FromSwarm::ListenerClosed(ev))
458            }
459            FromSwarm::NewExternalAddr(ev) => {
460                self.inner.on_swarm_event(FromSwarm::NewExternalAddr(ev))
461            }
462            FromSwarm::ExpiredExternalAddr(ev) => self
463                .inner
464                .on_swarm_event(FromSwarm::ExpiredExternalAddr(ev)),
465        }
466    }
467
468    fn on_connection_handler_event(
469        &mut self,
470        peer_id: PeerId,
471        conn: ConnectionId,
472        event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
473    ) {
474        #[cfg(not(feature = "compat"))]
475        return self.inner.on_connection_handler_event(peer_id, conn, event);
476        #[cfg(feature = "compat")]
477        match event {
478            EitherOutput::First(event) => {
479                self.inner.on_connection_handler_event(peer_id, conn, event)
480            }
481            EitherOutput::Second(msg) => {
482                for msg in msg.0 {
483                    match msg {
484                        CompatMessage::Request(req) => {
485                            tracing::trace!("received compat request");
486                            self.inject_request(BitswapChannel::Compat(peer_id, req.cid), req);
487                        }
488                        CompatMessage::Response(cid, res) => {
489                            tracing::trace!("received compat response");
490                            self.inject_response(BitswapId::Compat(cid), peer_id, res);
491                        }
492                    }
493                }
494            }
495        }
496    }
497
498    fn poll(
499        &mut self,
500        cx: &mut Context,
501        pp: &mut impl PollParameters,
502    ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
503        let mut exit = false;
504        while !exit {
505            exit = true;
506            while let Poll::Ready(Some(response)) = Pin::new(&mut self.db_rx).poll_next(cx) {
507                exit = false;
508                match response {
509                    DbResponse::Bitswap(channel, response) => match channel {
510                        BitswapChannel::Bitswap(channel) => {
511                            self.inner.send_response(channel, response).ok();
512                        }
513                        #[cfg(feature = "compat")]
514                        BitswapChannel::Compat(peer_id, cid) => {
515                            let compat = CompatMessage::Response(cid, response);
516                            return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
517                                peer_id,
518                                handler: NotifyHandler::Any,
519                                event: EitherOutput::Second(compat),
520                            });
521                        }
522                    },
523                    DbResponse::MissingBlocks(id, res) => match res {
524                        Ok(missing) => {
525                            MISSING_BLOCKS_TOTAL.inc_by(missing.len() as u64);
526                            self.query_manager
527                                .inject_response(id, Response::MissingBlocks(missing));
528                        }
529                        Err(err) => {
530                            self.query_manager.cancel(id);
531                            let event = BitswapEvent::Complete(id, Err(err));
532                            return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
533                        }
534                    },
535                }
536            }
537            while let Some(query) = self.query_manager.next() {
538                exit = false;
539                match query {
540                    QueryEvent::Request(id, req) => match req {
541                        Request::Have(peer_id, cid) => {
542                            let req = BitswapRequest {
543                                ty: RequestType::Have,
544                                cid,
545                            };
546                            let rid = self.inner.send_request(&peer_id, req);
547                            self.requests.insert(BitswapId::Bitswap(rid), id);
548                        }
549                        Request::Block(peer_id, cid) => {
550                            let req = BitswapRequest {
551                                ty: RequestType::Block,
552                                cid,
553                            };
554                            let rid = self.inner.send_request(&peer_id, req);
555                            self.requests.insert(BitswapId::Bitswap(rid), id);
556                        }
557                        Request::MissingBlocks(cid) => {
558                            self.db_tx
559                                .unbounded_send(DbRequest::MissingBlocks(id, cid))
560                                .ok();
561                        }
562                    },
563                    QueryEvent::Progress(id, missing) => {
564                        let event = BitswapEvent::Progress(id, missing);
565                        return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
566                    }
567                    QueryEvent::Complete(id, res) => {
568                        if res.is_err() {
569                            BLOCK_NOT_FOUND.inc();
570                        }
571                        let event = BitswapEvent::Complete(
572                            id,
573                            res.map_err(|cid| BlockNotFound(cid).into()),
574                        );
575                        return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
576                    }
577                }
578            }
579            while let Poll::Ready(event) = self.inner.poll(cx, pp) {
580                exit = false;
581                let event = match event {
582                    NetworkBehaviourAction::GenerateEvent(event) => event,
583                    NetworkBehaviourAction::Dial { opts, handler } => {
584                        #[cfg(feature = "compat")]
585                        let handler = ConnectionHandler::select(handler, Default::default());
586                        return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler });
587                    }
588                    NetworkBehaviourAction::NotifyHandler {
589                        peer_id,
590                        handler,
591                        event,
592                    } => {
593                        return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
594                            peer_id,
595                            handler,
596                            #[cfg(not(feature = "compat"))]
597                            event,
598                            #[cfg(feature = "compat")]
599                            event: EitherOutput::First(event),
600                        });
601                    }
602                    NetworkBehaviourAction::ReportObservedAddr { address, score } => {
603                        return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
604                            address,
605                            score,
606                        });
607                    }
608                    NetworkBehaviourAction::CloseConnection {
609                        peer_id,
610                        connection,
611                    } => {
612                        return Poll::Ready(NetworkBehaviourAction::CloseConnection {
613                            peer_id,
614                            connection,
615                        });
616                    }
617                };
618                match event {
619                    RequestResponseEvent::Message { peer, message } => match message {
620                        RequestResponseMessage::Request {
621                            request_id: _,
622                            request,
623                            channel,
624                        } => self.inject_request(BitswapChannel::Bitswap(channel), request),
625                        RequestResponseMessage::Response {
626                            request_id,
627                            response,
628                        } => self.inject_response(BitswapId::Bitswap(request_id), peer, response),
629                    },
630                    RequestResponseEvent::ResponseSent { .. } => {}
631                    RequestResponseEvent::OutboundFailure {
632                        peer,
633                        request_id,
634                        error,
635                    } => {
636                        self.inject_outbound_failure(&peer, request_id, &error);
637                        #[cfg(feature = "compat")]
638                        if let OutboundFailure::UnsupportedProtocols = error {
639                            if let Some(id) = self.requests.remove(&BitswapId::Bitswap(request_id))
640                            {
641                                if let Some(info) = self.query_manager.query_info(id) {
642                                    let ty = match info.label {
643                                        "have" => RequestType::Have,
644                                        "block" => RequestType::Block,
645                                        _ => unreachable!(),
646                                    };
647                                    let request = BitswapRequest { ty, cid: info.cid };
648                                    self.requests.insert(BitswapId::Compat(info.cid), id);
649                                    tracing::trace!("adding compat peer {}", peer);
650                                    self.compat.insert(peer);
651                                    return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
652                                        peer_id: peer,
653                                        handler: NotifyHandler::Any,
654                                        event: EitherOutput::Second(CompatMessage::Request(
655                                            request,
656                                        )),
657                                    });
658                                }
659                            }
660                        }
661                        if let Some(id) = self.requests.remove(&BitswapId::Bitswap(request_id)) {
662                            self.query_manager
663                                .inject_response(id, Response::Have(peer, false));
664                        }
665                    }
666                    RequestResponseEvent::InboundFailure {
667                        peer,
668                        request_id,
669                        error,
670                    } => {
671                        self.inject_inbound_failure(&peer, request_id, &error);
672                    }
673                }
674            }
675        }
676        Poll::Pending
677    }
678}
679
680#[cfg(test)]
681mod tests {
682    use super::*;
683    use async_std::task;
684    use futures::prelude::*;
685    use libipld::block::Block;
686    use libipld::cbor::DagCborCodec;
687    use libipld::ipld;
688    use libipld::ipld::Ipld;
689    use libipld::multihash::Code;
690    use libipld::store::DefaultParams;
691    use libp2p::core::muxing::StreamMuxerBox;
692    use libp2p::core::transport::Boxed;
693    use libp2p::identity;
694    use libp2p::noise::{Keypair, NoiseConfig, X25519Spec};
695    use libp2p::swarm::SwarmEvent;
696    use libp2p::tcp::{self, async_io};
697    use libp2p::yamux::YamuxConfig;
698    use libp2p::{PeerId, Swarm, Transport};
699    use std::sync::{Arc, Mutex};
700    use std::time::Duration;
701
702    fn tracing_try_init() {
703        tracing_subscriber::fmt()
704            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
705            .try_init()
706            .ok();
707    }
708
709    fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) {
710        let id_key = identity::Keypair::generate_ed25519();
711        let peer_id = id_key.public().to_peer_id();
712        let dh_key = Keypair::<X25519Spec>::new()
713            .into_authentic(&id_key)
714            .unwrap();
715        let noise = NoiseConfig::xx(dh_key).into_authenticated();
716
717        let transport = async_io::Transport::new(tcp::Config::new().nodelay(true))
718            .upgrade(libp2p::core::upgrade::Version::V1)
719            .authenticate(noise)
720            .multiplex(YamuxConfig::default())
721            .timeout(Duration::from_secs(20))
722            .boxed();
723        (peer_id, transport)
724    }
725
726    fn create_block(ipld: Ipld) -> Block<DefaultParams> {
727        Block::encode(DagCborCodec, Code::Blake3_256, &ipld).unwrap()
728    }
729
730    #[derive(Clone, Default)]
731    struct Store(Arc<Mutex<FnvHashMap<Cid, Vec<u8>>>>);
732
733    impl BitswapStore for Store {
734        type Params = DefaultParams;
735        fn contains(&mut self, cid: &Cid) -> Result<bool> {
736            Ok(self.0.lock().unwrap().contains_key(cid))
737        }
738        fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
739            Ok(self.0.lock().unwrap().get(cid).cloned())
740        }
741        fn insert(&mut self, block: &Block<Self::Params>) -> Result<()> {
742            self.0
743                .lock()
744                .unwrap()
745                .insert(*block.cid(), block.data().to_vec());
746            Ok(())
747        }
748        fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
749            let mut stack = vec![*cid];
750            let mut missing = vec![];
751            while let Some(cid) = stack.pop() {
752                if let Some(data) = self.get(&cid)? {
753                    let block = Block::<Self::Params>::new_unchecked(cid, data);
754                    block.references(&mut stack)?;
755                } else {
756                    missing.push(cid);
757                }
758            }
759            Ok(missing)
760        }
761    }
762
763    struct Peer {
764        peer_id: PeerId,
765        addr: Multiaddr,
766        store: Store,
767        swarm: Swarm<Bitswap<DefaultParams>>,
768    }
769
770    impl Peer {
771        fn new() -> Self {
772            let (peer_id, trans) = mk_transport();
773            let store = Store::default();
774            let mut swarm = Swarm::with_async_std_executor(
775                trans,
776                Bitswap::new(BitswapConfig::new(), store.clone()),
777                peer_id,
778            );
779            Swarm::listen_on(&mut swarm, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
780            while swarm.next().now_or_never().is_some() {}
781            let addr = Swarm::listeners(&swarm).next().unwrap().clone();
782            Self {
783                peer_id,
784                addr,
785                store,
786                swarm,
787            }
788        }
789
790        fn add_address(&mut self, peer: &Peer) {
791            self.swarm
792                .behaviour_mut()
793                .add_address(&peer.peer_id, peer.addr.clone());
794        }
795
796        fn store(&mut self) -> impl std::ops::DerefMut<Target = FnvHashMap<Cid, Vec<u8>>> + '_ {
797            self.store.0.lock().unwrap()
798        }
799
800        fn swarm(&mut self) -> &mut Swarm<Bitswap<DefaultParams>> {
801            &mut self.swarm
802        }
803
804        fn spawn(mut self, name: &'static str) -> PeerId {
805            let peer_id = self.peer_id;
806            task::spawn(async move {
807                loop {
808                    let event = self.swarm.next().await;
809                    tracing::debug!("{}: {:?}", name, event);
810                }
811            });
812            peer_id
813        }
814
815        async fn next(&mut self) -> Option<BitswapEvent> {
816            loop {
817                let ev = self.swarm.next().await?;
818                if let SwarmEvent::Behaviour(event) = ev {
819                    return Some(event);
820                }
821            }
822        }
823    }
824
825    fn assert_progress(event: Option<BitswapEvent>, id: QueryId, missing: usize) {
826        if let Some(BitswapEvent::Progress(id2, missing2)) = event {
827            assert_eq!(id2, id);
828            assert_eq!(missing2, missing);
829        } else {
830            panic!("{:?} is not a progress event", event);
831        }
832    }
833
834    fn assert_complete_ok(event: Option<BitswapEvent>, id: QueryId) {
835        if let Some(BitswapEvent::Complete(id2, Ok(()))) = event {
836            assert_eq!(id2, id);
837        } else {
838            panic!("{:?} is not a complete event", event);
839        }
840    }
841
842    #[async_std::test]
843    async fn test_bitswap_get() {
844        tracing_try_init();
845        let mut peer1 = Peer::new();
846        let mut peer2 = Peer::new();
847        peer2.add_address(&peer1);
848
849        let block = create_block(ipld!(&b"hello world"[..]));
850        peer1.store().insert(*block.cid(), block.data().to_vec());
851        let peer1 = peer1.spawn("peer1");
852
853        let id = peer2
854            .swarm()
855            .behaviour_mut()
856            .get(*block.cid(), std::iter::once(peer1));
857
858        assert_complete_ok(peer2.next().await, id);
859    }
860
861    #[async_std::test]
862    async fn test_bitswap_cancel_get() {
863        tracing_try_init();
864        let mut peer1 = Peer::new();
865        let mut peer2 = Peer::new();
866        peer2.add_address(&peer1);
867
868        let block = create_block(ipld!(&b"hello world"[..]));
869        peer1.store().insert(*block.cid(), block.data().to_vec());
870        let peer1 = peer1.spawn("peer1");
871
872        let id = peer2
873            .swarm()
874            .behaviour_mut()
875            .get(*block.cid(), std::iter::once(peer1));
876        peer2.swarm().behaviour_mut().cancel(id);
877        let res = peer2.next().now_or_never();
878        println!("{:?}", res);
879        assert!(res.is_none());
880    }
881
882    #[async_std::test]
883    async fn test_bitswap_sync() {
884        tracing_try_init();
885        let mut peer1 = Peer::new();
886        let mut peer2 = Peer::new();
887        peer2.add_address(&peer1);
888
889        let b0 = create_block(ipld!({
890            "n": 0,
891        }));
892        let b1 = create_block(ipld!({
893            "prev": b0.cid(),
894            "n": 1,
895        }));
896        let b2 = create_block(ipld!({
897            "prev": b1.cid(),
898            "n": 2,
899        }));
900        peer1.store().insert(*b0.cid(), b0.data().to_vec());
901        peer1.store().insert(*b1.cid(), b1.data().to_vec());
902        peer1.store().insert(*b2.cid(), b2.data().to_vec());
903        let peer1 = peer1.spawn("peer1");
904
905        let id =
906            peer2
907                .swarm()
908                .behaviour_mut()
909                .sync(*b2.cid(), vec![peer1], std::iter::once(*b2.cid()));
910
911        assert_progress(peer2.next().await, id, 1);
912        assert_progress(peer2.next().await, id, 1);
913
914        assert_complete_ok(peer2.next().await, id);
915    }
916
917    #[async_std::test]
918    async fn test_bitswap_cancel_sync() {
919        tracing_try_init();
920        let mut peer1 = Peer::new();
921        let mut peer2 = Peer::new();
922        peer2.add_address(&peer1);
923
924        let block = create_block(ipld!(&b"hello world"[..]));
925        peer1.store().insert(*block.cid(), block.data().to_vec());
926        let peer1 = peer1.spawn("peer1");
927
928        let id = peer2.swarm().behaviour_mut().sync(
929            *block.cid(),
930            vec![peer1],
931            std::iter::once(*block.cid()),
932        );
933        peer2.swarm().behaviour_mut().cancel(id);
934        let res = peer2.next().now_or_never();
935        println!("{:?}", res);
936        assert!(res.is_none());
937    }
938
939    #[cfg(feature = "compat")]
940    #[async_std::test]
941    async fn compat_test() {
942        tracing_try_init();
943        let cid: Cid = "QmP8njGuyiw9cjkhwHD9nZhyBTHufXFanAvZgcy9xYoWiB"
944            .parse()
945            .unwrap();
946        let peer_id: PeerId = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"
947            .parse()
948            .unwrap();
949        let multiaddr: Multiaddr = "/ip4/104.131.131.82/tcp/4001".parse().unwrap();
950
951        let mut peer = Peer::new();
952        peer.swarm()
953            .behaviour_mut()
954            .add_address(&peer_id, multiaddr);
955        let id = peer
956            .swarm()
957            .behaviour_mut()
958            .get(cid, std::iter::once(peer_id));
959        assert_complete_ok(peer.next().await, id);
960    }
961}