libp2p_bitswap_next/
behaviour.rs

1//! Handles the `/ipfs/bitswap/1.0.0` and `/ipfs/bitswap/1.1.0` protocols. This
2//! allows exchanging IPFS blocks.
3//!
4//! # Usage
5//!
//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
//! will allow providing and receiving IPFS blocks.
8#[cfg(feature = "compat")]
9use crate::compat::{CompatMessage, CompatProtocol, InboundMessage};
10use crate::protocol::{
11    BitswapCodec, BitswapRequest, BitswapResponse, RequestType, LIBP2P_BITSWAP_PROTOCOL,
12};
13use crate::query::{QueryEvent, QueryId, QueryManager, Request, Response};
14use crate::stats::*;
15use fnv::FnvHashMap;
16#[cfg(feature = "compat")]
17use fnv::FnvHashSet;
18use futures::future::BoxFuture;
19use futures::{
20    channel::mpsc,
21    stream::{Stream, StreamExt},
22    task::{Context, Poll},
23};
24use libipld::{error::BlockNotFound, store::StoreParams, Block, Cid, Result};
25use libp2p::core::{Endpoint, Multiaddr};
26use libp2p::identity::PeerId;
27
28use libp2p::swarm::behaviour::ConnectionEstablished;
29#[cfg(feature = "compat")]
30use libp2p::swarm::derive_prelude::Either;
31use libp2p::swarm::{
32    derive_prelude::{ConnectionClosed, FromSwarm},
33    ConnectionId, THandlerInEvent,
34};
35use libp2p::swarm::{ConnectionDenied, THandler};
36#[cfg(feature = "compat")]
37use libp2p::swarm::{ConnectionHandlerSelect, NotifyHandler, OneShotHandler};
38use libp2p::{
39    request_response::{
40        Behaviour as RequestResponse, Config as RequestResponseConfig,
41        Event as RequestResponseEvent, InboundFailure, InboundRequestId,
42        Message as RequestResponseMessage, OutboundFailure, OutboundRequestId, ProtocolSupport,
43        ResponseChannel,
44    },
45    swarm::{ConnectionHandler, NetworkBehaviour, ToSwarm},
46};
47use prometheus::Registry;
48use std::{pin::Pin, time::Duration};
49
/// Bitswap response channel.
///
/// Alias for the request-response [`ResponseChannel`] that carries a
/// [`BitswapResponse`]. A value of this type is what the behaviour passes to the
/// inner `send_response` call when answering an inbound request.
pub type Channel = ResponseChannel<BitswapResponse>;
52
/// Event emitted by the bitswap behaviour.
///
/// These values are handed to the swarm wrapped in `ToSwarm::GenerateEvent`.
#[derive(Debug)]
pub enum BitswapEvent {
    /// Received a block from a peer. Includes the number of known missing blocks for a
    /// sync query. When a block is received and missing blocks is not empty the counter
    /// is increased. If missing blocks is empty the counter is decremented.
    Progress(QueryId, usize),
    /// A get or sync query completed.
    ///
    /// The result is `Err` when the query manager reports a block that could not be
    /// found (`BlockNotFound`) or when the store failed to compute the missing blocks
    /// of a sync query.
    Complete(QueryId, Result<()>),
}
63
/// Trait implemented by a block store.
///
/// The store is moved into the background task started by `start_db_thread`, which
/// awaits each call before handling the next request.
#[async_trait::async_trait]
pub trait BitswapStore: Send + Sync + 'static {
    /// The store params.
    type Params: StoreParams;
    /// A have query needs to know if the block store contains the block.
    ///
    /// An `Err` is answered to the requesting peer as "don't have".
    async fn contains(&mut self, cid: &Cid) -> Result<bool>;
    /// A block query needs to retrieve the block from the store.
    ///
    /// `Ok(None)` and `Err` are both answered to the requesting peer as "don't have".
    async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    /// A block response needs to insert the block into the store.
    ///
    /// An `Err` is only logged by the background task; it is not reported to the query.
    async fn insert(&mut self, block: &Block<Self::Params>) -> Result<()>;
    /// A sync query needs a list of missing blocks to make progress.
    ///
    /// An `Err` cancels the query and is reported through `BitswapEvent::Complete`.
    async fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>>;
}
78
/// Bitswap configuration.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BitswapConfig {
    /// Timeout of a request.
    pub request_timeout: Duration,
}

impl BitswapConfig {
    /// Creates a new `BitswapConfig` with a request timeout of ten seconds.
    pub fn new() -> Self {
        let request_timeout = Duration::from_secs(10);
        Self { request_timeout }
    }
}

impl Default for BitswapConfig {
    /// Same configuration as [`BitswapConfig::new`].
    fn default() -> Self {
        BitswapConfig::new()
    }
}
100
/// Key under which an in-flight outbound request is tracked in `Bitswap::requests`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum BitswapId {
    /// Request sent through the inner request-response behaviour, identified by the
    /// id returned from `send_request`.
    Bitswap(OutboundRequestId),
    /// Request sent over the compat protocol, identified by the requested CID.
    #[cfg(feature = "compat")]
    Compat(Cid),
}
107
/// Destination for the response to an inbound request.
enum BitswapChannel {
    /// Answer through the request-response channel of the inbound request.
    Bitswap(Channel),
    /// Answer the given peer with a compat response message for the given CID.
    #[cfg(feature = "compat")]
    Compat(PeerId, Cid),
}
113
/// Network behaviour that handles sending and receiving blocks.
///
/// Wraps a request-response behaviour for the bitswap protocol, a query manager
/// that decides which requests to send, and a pair of channels to the background
/// task that owns the block store.
pub struct Bitswap<P: StoreParams> {
    /// Inner behaviour.
    inner: RequestResponse<BitswapCodec<P>>,
    /// Query manager.
    query_manager: QueryManager,
    /// Requests: maps each in-flight outbound request to the query it belongs to.
    requests: FnvHashMap<BitswapId, QueryId>,
    /// Db request channel.
    db_tx: mpsc::UnboundedSender<DbRequest<P>>,
    /// Db response channel.
    db_rx: mpsc::UnboundedReceiver<DbResponse>,
    /// Compat peers: peers whose request failed with `UnsupportedProtocols` and was
    /// retried over the compat protocol. A peer is removed again once its last
    /// connection closes.
    #[cfg(feature = "compat")]
    compat: FnvHashSet<PeerId>,
}
130
131impl<P: StoreParams> Bitswap<P> {
132    /// Creates a new `Bitswap` behaviour.
133    pub fn new<S: BitswapStore<Params = P>>(
134        config: BitswapConfig,
135        store: S,
136        executor: Box<dyn FnOnce(BoxFuture<'static, ()>)>,
137    ) -> Self {
138        let rr_config =
139            RequestResponseConfig::default().with_request_timeout(config.request_timeout);
140        let protocols = std::iter::once((LIBP2P_BITSWAP_PROTOCOL, ProtocolSupport::Full));
141        let inner = RequestResponse::new(protocols, rr_config);
142        let (db_tx, db_rx) = start_db_thread(store, executor);
143        Self {
144            inner,
145            query_manager: Default::default(),
146            requests: Default::default(),
147            db_tx,
148            db_rx,
149            #[cfg(feature = "compat")]
150            compat: Default::default(),
151        }
152    }
153
154    /// Adds an address for a peer.
155    pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
156        self.query_manager.add_peer(peer_id);
157        #[allow(deprecated)]
158        self.inner.add_address(peer_id, addr);
159    }
160
161    /// Removes an address for a peer.
162    pub fn remove_address(&mut self, peer_id: &PeerId, addr: &Multiaddr) {
163        self.query_manager.remove_peer(peer_id);
164        #[allow(deprecated)]
165        self.inner.remove_address(peer_id, addr);
166    }
167
168    /// Starts a get query with an initial guess of providers.
169    pub fn get(&mut self, cid: Cid, peers: impl Iterator<Item = PeerId>) -> QueryId {
170        self.query_manager.get(None, cid, peers)
171    }
172
173    /// Starts a sync query with an the initial set of missing blocks.
174    pub fn sync(
175        &mut self,
176        cid: Cid,
177        peers: Vec<PeerId>,
178        missing: impl Iterator<Item = Cid>,
179    ) -> QueryId {
180        self.query_manager.sync(cid, peers, missing)
181    }
182
183    /// Cancels an in progress query. Returns true if a query was cancelled.
184    pub fn cancel(&mut self, id: QueryId) -> bool {
185        let res = self.query_manager.cancel(id);
186        if res {
187            REQUESTS_CANCELED.inc();
188        }
189        res
190    }
191
192    /// Registers prometheus metrics.
193    pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
194        registry.register(Box::new(REQUESTS_TOTAL.clone()))?;
195        registry.register(Box::new(REQUEST_DURATION_SECONDS.clone()))?;
196        registry.register(Box::new(REQUESTS_CANCELED.clone()))?;
197        registry.register(Box::new(BLOCK_NOT_FOUND.clone()))?;
198        registry.register(Box::new(PROVIDERS_TOTAL.clone()))?;
199        registry.register(Box::new(MISSING_BLOCKS_TOTAL.clone()))?;
200        registry.register(Box::new(RECEIVED_BLOCK_BYTES.clone()))?;
201        registry.register(Box::new(RECEIVED_INVALID_BLOCK_BYTES.clone()))?;
202        registry.register(Box::new(SENT_BLOCK_BYTES.clone()))?;
203        registry.register(Box::new(RESPONSES_TOTAL.clone()))?;
204        registry.register(Box::new(THROTTLED_INBOUND.clone()))?;
205        registry.register(Box::new(THROTTLED_OUTBOUND.clone()))?;
206        registry.register(Box::new(OUTBOUND_FAILURE.clone()))?;
207        registry.register(Box::new(INBOUND_FAILURE.clone()))?;
208        Ok(())
209    }
210}
211
/// Work item sent from the behaviour to the background store task.
enum DbRequest<P: StoreParams> {
    /// Answer an inbound have/block request from the store and send the result back
    /// tagged with the given channel.
    Bitswap(BitswapChannel, BitswapRequest),
    /// Insert a received block into the store. No response is sent back.
    Insert(Block<P>),
    /// Compute the missing blocks below the given CID for the given query.
    MissingBlocks(QueryId, Cid),
}
217
/// Result sent from the background store task back to the behaviour.
enum DbResponse {
    /// Response to an inbound request, together with the channel it must be sent on.
    Bitswap(BitswapChannel, BitswapResponse),
    /// Outcome of a `DbRequest::MissingBlocks` for the given query.
    MissingBlocks(QueryId, Result<Vec<Cid>>),
}
222
223fn start_db_thread<S: BitswapStore>(
224    mut store: S,
225    executor: Box<dyn FnOnce(BoxFuture<'static, ()>)>,
226) -> (
227    mpsc::UnboundedSender<DbRequest<S::Params>>,
228    mpsc::UnboundedReceiver<DbResponse>,
229) {
230    let (tx, requests) = mpsc::unbounded();
231    let (responses, rx) = mpsc::unbounded();
232    executor(Box::pin(async move {
233        let mut requests: mpsc::UnboundedReceiver<DbRequest<S::Params>> = requests;
234        while let Some(request) = requests.next().await {
235            match request {
236                DbRequest::Bitswap(channel, request) => {
237                    let response = match request.ty {
238                        RequestType::Have => {
239                            let have = store.contains(&request.cid).await.unwrap_or_default();
240                            if have {
241                                RESPONSES_TOTAL.with_label_values(&["have"]).inc();
242                            } else {
243                                RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
244                            }
245                            tracing::trace!("have {}", have);
246                            BitswapResponse::Have(have)
247                        }
248                        RequestType::Block => {
249                            let block = store.get(&request.cid).await.unwrap_or_default();
250                            if let Some(data) = block {
251                                RESPONSES_TOTAL.with_label_values(&["block"]).inc();
252                                SENT_BLOCK_BYTES.inc_by(data.len() as u64);
253                                tracing::trace!("block {}", data.len());
254                                BitswapResponse::Block(data)
255                            } else {
256                                RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
257                                tracing::trace!("have false");
258                                BitswapResponse::Have(false)
259                            }
260                        }
261                    };
262                    responses
263                        .unbounded_send(DbResponse::Bitswap(channel, response))
264                        .ok();
265                }
266                DbRequest::Insert(block) => {
267                    if let Err(err) = store.insert(&block).await {
268                        tracing::error!("error inserting blocks {}", err);
269                    }
270                }
271                DbRequest::MissingBlocks(id, cid) => {
272                    let res = store.missing_blocks(&cid).await;
273                    responses
274                        .unbounded_send(DbResponse::MissingBlocks(id, res))
275                        .ok();
276                }
277            }
278        }
279    }));
280    (tx, rx)
281}
282
283impl<P: StoreParams> Bitswap<P> {
284    /// Processes an incoming bitswap request.
285    fn inject_request(&mut self, channel: BitswapChannel, request: BitswapRequest) {
286        self.db_tx
287            .unbounded_send(DbRequest::Bitswap(channel, request))
288            .ok();
289    }
290
291    /// Processes an incoming bitswap response.
292    fn inject_response(&mut self, id: BitswapId, peer: PeerId, response: BitswapResponse) {
293        if let Some(id) = self.requests.remove(&id) {
294            match response {
295                BitswapResponse::Have(have) => {
296                    self.query_manager
297                        .inject_response(id, Response::Have(peer, have));
298                }
299                BitswapResponse::Block(data) => {
300                    if let Some(info) = self.query_manager.query_info(id) {
301                        let len = data.len();
302                        if let Ok(block) = Block::new(info.cid, data) {
303                            RECEIVED_BLOCK_BYTES.inc_by(len as u64);
304                            self.db_tx.unbounded_send(DbRequest::Insert(block)).ok();
305                            self.query_manager
306                                .inject_response(id, Response::Block(peer, true));
307                        } else {
308                            tracing::error!("received invalid block");
309                            RECEIVED_INVALID_BLOCK_BYTES.inc_by(len as u64);
310                            self.query_manager
311                                .inject_response(id, Response::Block(peer, false));
312                        }
313                    }
314                }
315            }
316        }
317    }
318
319    fn inject_outbound_failure(
320        &mut self,
321        peer: &PeerId,
322        request_id: OutboundRequestId,
323        error: &OutboundFailure,
324    ) {
325        tracing::debug!(
326            "bitswap outbound failure {} {} {:?}",
327            peer,
328            request_id,
329            error
330        );
331        match error {
332            OutboundFailure::DialFailure => {
333                OUTBOUND_FAILURE.with_label_values(&["dial_failure"]).inc();
334            }
335            OutboundFailure::Timeout => {
336                OUTBOUND_FAILURE.with_label_values(&["timeout"]).inc();
337            }
338            OutboundFailure::ConnectionClosed => {
339                OUTBOUND_FAILURE
340                    .with_label_values(&["connection_closed"])
341                    .inc();
342            }
343            OutboundFailure::UnsupportedProtocols => {
344                OUTBOUND_FAILURE
345                    .with_label_values(&["unsupported_protocols"])
346                    .inc();
347            }
348            OutboundFailure::Io(_) => {
349                OUTBOUND_FAILURE.with_label_values(&["io"]).inc();
350            }
351        }
352    }
353
354    fn inject_inbound_failure(
355        &mut self,
356        peer: &PeerId,
357        request_id: InboundRequestId,
358        error: &InboundFailure,
359    ) {
360        tracing::error!(
361            "bitswap inbound failure {} {} {:?}",
362            peer,
363            request_id,
364            error
365        );
366        match error {
367            InboundFailure::Timeout => {
368                INBOUND_FAILURE.with_label_values(&["timeout"]).inc();
369            }
370            InboundFailure::ConnectionClosed => {
371                INBOUND_FAILURE
372                    .with_label_values(&["connection_closed"])
373                    .inc();
374            }
375            InboundFailure::UnsupportedProtocols => {
376                INBOUND_FAILURE
377                    .with_label_values(&["unsupported_protocols"])
378                    .inc();
379            }
380            InboundFailure::ResponseOmission => {
381                INBOUND_FAILURE
382                    .with_label_values(&["response_omission"])
383                    .inc();
384            }
385            InboundFailure::Io(_) => {
386                INBOUND_FAILURE.with_label_values(&["io"]).inc();
387            }
388        }
389    }
390}
391
392impl<P: StoreParams> NetworkBehaviour for Bitswap<P> {
    /// Without the `compat` feature the handler of the inner request-response
    /// behaviour is used as is.
    #[cfg(not(feature = "compat"))]
    type ConnectionHandler =
        <RequestResponse<BitswapCodec<P>> as NetworkBehaviour>::ConnectionHandler;

    /// With the `compat` feature the inner handler is combined with a one-shot
    /// handler for the compat protocol. Handler events are then wrapped in `Either`:
    /// `Left` for the inner handler, `Right` for the compat handler.
    #[cfg(feature = "compat")]
    #[allow(clippy::type_complexity)]
    type ConnectionHandler = ConnectionHandlerSelect<
        <RequestResponse<BitswapCodec<P>> as NetworkBehaviour>::ConnectionHandler,
        OneShotHandler<CompatProtocol, CompatMessage, InboundMessage>,
    >;
    /// Events reported to the swarm by this behaviour.
    type ToSwarm = BitswapEvent;
404
405    fn handle_established_inbound_connection(
406        &mut self,
407        connection_id: ConnectionId,
408        peer: PeerId,
409        local_addr: &Multiaddr,
410        remote_addr: &Multiaddr,
411    ) -> Result<THandler<Self>, ConnectionDenied> {
412        #[cfg(not(feature = "compat"))]
413        return self.inner.handle_established_inbound_connection(
414            connection_id,
415            peer,
416            local_addr,
417            remote_addr,
418        );
419        #[cfg(feature = "compat")]
420        return self
421            .inner
422            .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr)
423            .map(|handle| ConnectionHandler::select(handle, OneShotHandler::default()));
424    }
425
426    fn handle_established_outbound_connection(
427        &mut self,
428        connection_id: ConnectionId,
429        peer: PeerId,
430        addr: &Multiaddr,
431        role_override: Endpoint,
432    ) -> Result<THandler<Self>, ConnectionDenied> {
433        #[cfg(not(feature = "compat"))]
434        return self.inner.handle_established_outbound_connection(
435            connection_id,
436            peer,
437            addr,
438            role_override,
439        );
440        #[cfg(feature = "compat")]
441        return self
442            .inner
443            .handle_established_outbound_connection(connection_id, peer, addr, role_override)
444            .map(|handle| ConnectionHandler::select(handle, OneShotHandler::default()));
445    }
446
    /// Delegates a pending outbound connection to the inner request-response
    /// behaviour and returns whatever addresses (or denial) it produces; this
    /// behaviour contributes nothing of its own here.
    fn handle_pending_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        maybe_peer: Option<PeerId>,
        addresses: &[Multiaddr],
        effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        self.inner.handle_pending_outbound_connection(
            connection_id,
            maybe_peer,
            addresses,
            effective_role,
        )
    }
461
462    fn on_swarm_event(&mut self, event: FromSwarm) {
463        match event {
464            FromSwarm::ConnectionEstablished(ConnectionEstablished {
465                peer_id,
466                other_established,
467                connection_id,
468                endpoint,
469                failed_addresses,
470            }) => {
471                if other_established == 0 {
472                    self.query_manager.add_peer(&peer_id);
473                }
474
475                self.inner.on_swarm_event(FromSwarm::ConnectionEstablished(
476                    ConnectionEstablished {
477                        peer_id,
478                        other_established,
479                        connection_id,
480                        endpoint,
481                        failed_addresses,
482                    },
483                ));
484            }
485            FromSwarm::ConnectionClosed(ConnectionClosed {
486                peer_id,
487                connection_id,
488                endpoint,
489                remaining_established,
490            }) => {
491                #[cfg(feature = "compat")]
492                if remaining_established == 0 {
493                    self.compat.remove(&peer_id);
494                }
495                if remaining_established == 0 {
496                    self.query_manager.remove_peer(&peer_id);
497                }
498                self.inner
499                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
500                        peer_id,
501                        connection_id,
502                        endpoint,
503                        remaining_established,
504                    }));
505            }
506            ev => self.inner.on_swarm_event(ev),
507        }
508    }
509
510    fn on_connection_handler_event(
511        &mut self,
512        peer_id: PeerId,
513        conn: ConnectionId,
514        event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
515    ) {
516        tracing::trace!(?event, "on_connection_handler_event");
517        #[cfg(not(feature = "compat"))]
518        return self.inner.on_connection_handler_event(peer_id, conn, event);
519        #[cfg(feature = "compat")]
520        match event {
521            Either::Left(event) => self.inner.on_connection_handler_event(peer_id, conn, event),
522            Either::Right(msg) => {
523                if let Ok(msg) = msg {
524                    for msg in msg.0 {
525                        match msg {
526                            CompatMessage::Request(req) => {
527                                tracing::trace!("received compat request");
528                                self.inject_request(BitswapChannel::Compat(peer_id, req.cid), req);
529                            }
530                            CompatMessage::Response(cid, res) => {
531                                tracing::trace!("received compat response");
532                                self.inject_response(BitswapId::Compat(cid), peer_id, res);
533                            }
534                        }
535                    }
536                }
537            }
538        }
539    }
540
541    fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
542        let mut exit = false;
543        while !exit {
544            exit = true;
545            while let Poll::Ready(Some(response)) = Pin::new(&mut self.db_rx).poll_next(cx) {
546                exit = false;
547                match response {
548                    DbResponse::Bitswap(channel, response) => match channel {
549                        BitswapChannel::Bitswap(channel) => {
550                            self.inner.send_response(channel, response).ok();
551                        }
552                        #[cfg(feature = "compat")]
553                        BitswapChannel::Compat(peer_id, cid) => {
554                            let compat = CompatMessage::Response(cid, response);
555                            return Poll::Ready(ToSwarm::NotifyHandler {
556                                peer_id,
557                                handler: NotifyHandler::Any,
558                                event: Either::Right(compat),
559                            });
560                        }
561                    },
562                    DbResponse::MissingBlocks(id, res) => match res {
563                        Ok(missing) => {
564                            MISSING_BLOCKS_TOTAL.inc_by(missing.len() as u64);
565                            self.query_manager
566                                .inject_response(id, Response::MissingBlocks(missing));
567                        }
568                        Err(err) => {
569                            self.query_manager.cancel(id);
570                            let event = BitswapEvent::Complete(id, Err(err));
571                            return Poll::Ready(ToSwarm::GenerateEvent(event));
572                        }
573                    },
574                }
575            }
576            while let Some(query) = self.query_manager.next() {
577                exit = false;
578                match query {
579                    QueryEvent::Request(id, req) => match req {
580                        Request::Have(peer_id, cid) => {
581                            let req = BitswapRequest {
582                                ty: RequestType::Have,
583                                cid,
584                            };
585                            let rid = self.inner.send_request(&peer_id, req);
586                            self.requests.insert(BitswapId::Bitswap(rid), id);
587                        }
588                        Request::Block(peer_id, cid) => {
589                            let req = BitswapRequest {
590                                ty: RequestType::Block,
591                                cid,
592                            };
593                            let rid = self.inner.send_request(&peer_id, req);
594                            self.requests.insert(BitswapId::Bitswap(rid), id);
595                        }
596                        Request::MissingBlocks(cid) => {
597                            self.db_tx
598                                .unbounded_send(DbRequest::MissingBlocks(id, cid))
599                                .ok();
600                        }
601                    },
602                    QueryEvent::Progress(id, missing) => {
603                        let event = BitswapEvent::Progress(id, missing);
604                        return Poll::Ready(ToSwarm::GenerateEvent(event));
605                    }
606                    QueryEvent::Complete(id, res) => {
607                        if res.is_err() {
608                            BLOCK_NOT_FOUND.inc();
609                        }
610                        let event = BitswapEvent::Complete(
611                            id,
612                            res.map_err(|cid| BlockNotFound(cid).into()),
613                        );
614                        return Poll::Ready(ToSwarm::GenerateEvent(event));
615                    }
616                }
617            }
618            while let Poll::Ready(event) = self.inner.poll(cx) {
619                exit = false;
620                let event = match event {
621                    ToSwarm::GenerateEvent(event) => event,
622                    ToSwarm::Dial { opts } => {
623                        return Poll::Ready(ToSwarm::Dial { opts });
624                    }
625                    ToSwarm::NotifyHandler {
626                        peer_id,
627                        handler,
628                        event,
629                    } => {
630                        return Poll::Ready(ToSwarm::NotifyHandler {
631                            peer_id,
632                            handler,
633                            #[cfg(not(feature = "compat"))]
634                            event,
635                            #[cfg(feature = "compat")]
636                            event: Either::Left(event),
637                        });
638                    }
639                    ToSwarm::ListenOn { opts } => {
640                        return Poll::Ready(ToSwarm::ListenOn { opts });
641                    }
642                    ToSwarm::RemoveListener { id } => {
643                        return Poll::Ready(ToSwarm::RemoveListener { id });
644                    }
645                    ToSwarm::NewExternalAddrCandidate(address) => {
646                        return Poll::Ready(ToSwarm::NewExternalAddrCandidate(address));
647                    }
648                    ToSwarm::ExternalAddrConfirmed(address) => {
649                        return Poll::Ready(ToSwarm::ExternalAddrConfirmed(address));
650                    }
651                    ToSwarm::ExternalAddrExpired(address) => {
652                        return Poll::Ready(ToSwarm::ExternalAddrExpired(address));
653                    }
654                    ToSwarm::CloseConnection {
655                        peer_id,
656                        connection,
657                    } => {
658                        return Poll::Ready(ToSwarm::CloseConnection {
659                            peer_id,
660                            connection,
661                        });
662                    }
663                    _ => todo!(),
664                };
665                match event {
666                    RequestResponseEvent::Message { peer, message } => match message {
667                        RequestResponseMessage::Request {
668                            request_id: _,
669                            request,
670                            channel,
671                        } => self.inject_request(BitswapChannel::Bitswap(channel), request),
672                        RequestResponseMessage::Response {
673                            request_id,
674                            response,
675                        } => self.inject_response(BitswapId::Bitswap(request_id), peer, response),
676                    },
677                    RequestResponseEvent::ResponseSent { .. } => {}
678                    RequestResponseEvent::OutboundFailure {
679                        peer,
680                        request_id,
681                        error,
682                    } => {
683                        self.inject_outbound_failure(&peer, request_id, &error);
684                        #[cfg(feature = "compat")]
685                        if let OutboundFailure::UnsupportedProtocols = error {
686                            if let Some(id) = self.requests.remove(&BitswapId::Bitswap(request_id))
687                            {
688                                if let Some(info) = self.query_manager.query_info(id) {
689                                    let ty = match info.label {
690                                        "have" => RequestType::Have,
691                                        "block" => RequestType::Block,
692                                        _ => unreachable!(),
693                                    };
694                                    let request = BitswapRequest { ty, cid: info.cid };
695                                    self.requests.insert(BitswapId::Compat(info.cid), id);
696                                    tracing::trace!("adding compat peer {}", peer);
697                                    self.compat.insert(peer);
698                                    return Poll::Ready(ToSwarm::NotifyHandler {
699                                        peer_id: peer,
700                                        handler: NotifyHandler::Any,
701                                        event: Either::Right(CompatMessage::Request(request)),
702                                    });
703                                }
704                            }
705                        }
706                        if let Some(id) = self.requests.remove(&BitswapId::Bitswap(request_id)) {
707                            self.query_manager
708                                .inject_response(id, Response::Have(peer, false));
709                        }
710                    }
711                    RequestResponseEvent::InboundFailure {
712                        peer,
713                        request_id,
714                        error,
715                    } => {
716                        self.inject_inbound_failure(&peer, request_id, &error);
717                    }
718                }
719            }
720        }
721        Poll::Pending
722    }
723}
724
725#[cfg(test)]
726mod tests {
727    use super::*;
728    use async_std::task;
729    use futures::prelude::*;
730    use libipld::block::Block;
731    use libipld::cbor::DagCborCodec;
732    use libipld::ipld;
733    use libipld::ipld::Ipld;
734    use libipld::multihash::Code;
735    use libipld::store::DefaultParams;
736    use libp2p::swarm::SwarmEvent;
737    use libp2p::SwarmBuilder;
738    use libp2p::{PeerId, Swarm};
739    use std::sync::{Arc, Mutex};
740    use std::time::Duration;
741    use tracing_subscriber::fmt::TestWriter;
742
743    fn tracing_try_init() {
744        tracing_subscriber::fmt()
745            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
746            .with_writer(TestWriter::new())
747            .try_init()
748            .ok();
749    }
750
    /// Encodes `ipld` as a DAG-CBOR block hashed with Blake3-256.
    ///
    /// Panics if encoding fails.
    fn create_block(ipld: Ipld) -> Block<DefaultParams> {
        Block::encode(DagCborCodec, Code::Blake3_256, &ipld).unwrap()
    }
754
755    #[derive(Clone, Default)]
756    struct Store(Arc<Mutex<FnvHashMap<Cid, Vec<u8>>>>);
757
758    #[async_trait::async_trait]
759    impl BitswapStore for Store {
760        type Params = DefaultParams;
761        async fn contains(&mut self, cid: &Cid) -> Result<bool> {
762            Ok(self.0.lock().unwrap().contains_key(cid))
763        }
764        async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
765            Ok(self.0.lock().unwrap().get(cid).cloned())
766        }
767        async fn insert(&mut self, block: &Block<Self::Params>) -> Result<()> {
768            self.0
769                .lock()
770                .unwrap()
771                .insert(*block.cid(), block.data().to_vec());
772            Ok(())
773        }
774        async fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
775            let mut stack = vec![*cid];
776            let mut missing = vec![];
777            while let Some(cid) = stack.pop() {
778                if let Some(data) = self.get(&cid).await? {
779                    let block = Block::<Self::Params>::new_unchecked(cid, data);
780                    block.references(&mut stack)?;
781                } else {
782                    missing.push(cid);
783                }
784            }
785            Ok(missing)
786        }
787    }
788
789    struct Peer {
790        peer_id: PeerId,
791        addr: Multiaddr,
792        store: Store,
793        swarm: Swarm<Bitswap<DefaultParams>>,
794    }
795
796    impl Peer {
797        fn new() -> Self {
798            let store = Store::default();
799            let mut swarm = SwarmBuilder::with_new_identity()
800                .with_async_std()
801                .with_tcp(
802                    Default::default(),
803                    libp2p::noise::Config::new,
804                    libp2p::yamux::Config::default,
805                )
806                .expect("valid")
807                .with_behaviour(|_| {
808                    Bitswap::new(
809                        BitswapConfig::new(),
810                        store.clone(),
811                        Box::new(|t| {
812                            async_std::task::spawn(t);
813                        }),
814                    )
815                })
816                .expect("Valid")
817                .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
818                .build();
819            let peer_id = *swarm.local_peer_id();
820            Swarm::listen_on(&mut swarm, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
821            while swarm.next().now_or_never().is_some() {}
822            let addr = Swarm::listeners(&swarm).next().unwrap().clone();
823            Self {
824                peer_id,
825                addr,
826                store,
827                swarm,
828            }
829        }
830
831        fn add_address(&mut self, peer: &Peer) {
832            self.swarm
833                .behaviour_mut()
834                .add_address(&peer.peer_id, peer.addr.clone());
835        }
836
837        fn store(&mut self) -> impl std::ops::DerefMut<Target = FnvHashMap<Cid, Vec<u8>>> + '_ {
838            self.store.0.lock().unwrap()
839        }
840
841        fn swarm(&mut self) -> &mut Swarm<Bitswap<DefaultParams>> {
842            &mut self.swarm
843        }
844
845        fn spawn(mut self, name: &'static str) -> PeerId {
846            let peer_id = self.peer_id;
847            task::spawn(async move {
848                loop {
849                    let event = self.swarm.next().await;
850                    tracing::debug!("{}: {:?}", name, event);
851                }
852            });
853            peer_id
854        }
855
856        async fn next(&mut self) -> Option<BitswapEvent> {
857            loop {
858                let ev = self.swarm.next().await?;
859                if let SwarmEvent::Behaviour(event) = ev {
860                    return Some(event);
861                }
862            }
863        }
864    }
865
866    fn assert_progress(event: Option<BitswapEvent>, id: QueryId, missing: usize) {
867        if let Some(BitswapEvent::Progress(id2, missing2)) = event {
868            assert_eq!(id2, id);
869            assert_eq!(missing2, missing);
870        } else {
871            panic!("{:?} is not a progress event", event);
872        }
873    }
874
875    fn assert_complete_ok(event: Option<BitswapEvent>, id: QueryId) {
876        if let Some(BitswapEvent::Complete(id2, Ok(()))) = event {
877            assert_eq!(id2, id);
878        } else {
879            panic!("{:?} is not a complete event", event);
880        }
881    }
882
883    #[async_std::test]
884    async fn test_bitswap_get() {
885        tracing_try_init();
886        let mut peer1 = Peer::new();
887        let mut peer2 = Peer::new();
888        peer2.add_address(&peer1);
889
890        let block = create_block(ipld!(&b"hello world"[..]));
891        peer1.store().insert(*block.cid(), block.data().to_vec());
892        let peer1 = peer1.spawn("peer1");
893
894        let id = peer2
895            .swarm()
896            .behaviour_mut()
897            .get(*block.cid(), std::iter::once(peer1));
898
899        assert_complete_ok(peer2.next().await, id);
900    }
901
902    #[async_std::test]
903    async fn test_bitswap_cancel_get() {
904        tracing_try_init();
905        let mut peer1 = Peer::new();
906        let mut peer2 = Peer::new();
907        peer2.add_address(&peer1);
908
909        let block = create_block(ipld!(&b"hello world"[..]));
910        peer1.store().insert(*block.cid(), block.data().to_vec());
911        let peer1 = peer1.spawn("peer1");
912
913        let id = peer2
914            .swarm()
915            .behaviour_mut()
916            .get(*block.cid(), std::iter::once(peer1));
917        peer2.swarm().behaviour_mut().cancel(id);
918        let res = peer2.next().now_or_never();
919        println!("{:?}", res);
920        assert!(res.is_none());
921    }
922
923    #[async_std::test]
924    async fn test_bitswap_sync() {
925        tracing_try_init();
926        let mut peer1 = Peer::new();
927        let mut peer2 = Peer::new();
928        peer2.add_address(&peer1);
929
930        let b0 = create_block(ipld!({
931            "n": 0,
932        }));
933        let b1 = create_block(ipld!({
934            "prev": b0.cid(),
935            "n": 1,
936        }));
937        let b2 = create_block(ipld!({
938            "prev": b1.cid(),
939            "n": 2,
940        }));
941        peer1.store().insert(*b0.cid(), b0.data().to_vec());
942        peer1.store().insert(*b1.cid(), b1.data().to_vec());
943        peer1.store().insert(*b2.cid(), b2.data().to_vec());
944        let peer1 = peer1.spawn("peer1");
945
946        let id =
947            peer2
948                .swarm()
949                .behaviour_mut()
950                .sync(*b2.cid(), vec![peer1], std::iter::once(*b2.cid()));
951
952        assert_progress(peer2.next().await, id, 1);
953        assert_progress(peer2.next().await, id, 1);
954
955        assert_complete_ok(peer2.next().await, id);
956    }
957
958    #[async_std::test]
959    async fn test_bitswap_cancel_sync() {
960        tracing_try_init();
961        let mut peer1 = Peer::new();
962        let mut peer2 = Peer::new();
963        peer2.add_address(&peer1);
964
965        let block = create_block(ipld!(&b"hello world"[..]));
966        peer1.store().insert(*block.cid(), block.data().to_vec());
967        let peer1 = peer1.spawn("peer1");
968
969        let id = peer2.swarm().behaviour_mut().sync(
970            *block.cid(),
971            vec![peer1],
972            std::iter::once(*block.cid()),
973        );
974        peer2.swarm().behaviour_mut().cancel(id);
975        let res = peer2.next().now_or_never();
976        println!("{:?}", res);
977        assert!(res.is_none());
978    }
979
980    #[cfg(feature = "compat")]
981    #[async_std::test]
982    async fn compat_test() {
983        tracing_try_init();
984        let cid: Cid = "QmP8njGuyiw9cjkhwHD9nZhyBTHufXFanAvZgcy9xYoWiB"
985            .parse()
986            .unwrap();
987        let peer_id: PeerId = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"
988            .parse()
989            .unwrap();
990        let multiaddr: Multiaddr = "/ip4/104.131.131.82/tcp/4001".parse().unwrap();
991
992        let mut peer = Peer::new();
993        peer.swarm()
994            .behaviour_mut()
995            .add_address(&peer_id, multiaddr);
996        let id = peer
997            .swarm()
998            .behaviour_mut()
999            .get(cid, std::iter::once(peer_id));
1000        assert_complete_ok(peer.next().await, id);
1001    }
1002}