beetle_bitswap_next/
lib.rs

//! Implements handling of the [bitswap protocol](https://github.com/ipfs/specs/blob/master/BITSWAP.md). Based on go-ipfs.
2//!
3//! Supports the versions `1.0.0`, `1.1.0` and `1.2.0`.
4
5use std::collections::hash_map::Entry;
6use std::collections::HashSet;
7use std::fmt::Debug;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use ahash::{AHashMap, AHashSet};
14use anyhow::Result;
15use async_trait::async_trait;
16use cid::Cid;
17use futures_util::StreamExt;
18use handler::{BitswapHandler, HandlerEvent};
19
20use futures::channel::{mpsc, oneshot};
21use libp2p::swarm::derive_prelude::ConnectionEstablished;
22use libp2p::swarm::dial_opts::DialOpts;
23use libp2p::swarm::{ConnectionClosed, ConnectionId, DialFailure, FromSwarm};
24use libp2p::swarm::{
25    ConnectionDenied, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, ToSwarm,
26};
27use libp2p::{Multiaddr, PeerId};
28use tokio::task::JoinHandle;
29use tracing::{debug, trace, warn};
30
31pub use self::client::session;
32use self::client::{Client, Config as ClientConfig};
33use self::message::BitswapMessage;
34use self::network::Network;
35use self::network::OutEvent;
36pub use self::protocol::ProtocolConfig;
37pub use self::server::{Config as ServerConfig, Server};
38
39mod block;
40mod client;
41mod error;
42mod handler;
43mod network;
44mod pb;
45mod prefix;
46mod protocol;
47mod server;
48
49pub mod message;
50pub mod peer_task_queue;
51
52pub use self::block::{tests::*, Block};
53pub use self::protocol::ProtocolId;
54
55// const DIAL_BACK_OFF: Duration = Duration::from_secs(10 * 60);
56
/// Dial requests that are still waiting for an outcome, keyed by the peer being dialed.
///
/// Each entry pairs the dial id (only used in log messages in this file) with the `oneshot`
/// sender used to report the result: on success the connection id plus the negotiated protocol
/// (if any), on failure an error message.
type DialMap = AHashMap<
    PeerId,
    Vec<(
        usize,
        oneshot::Sender<std::result::Result<(ConnectionId, Option<ProtocolId>), String>>,
    )>,
>;
64
/// libp2p `NetworkBehaviour` implementing bitswap on top of a [`Store`].
///
/// It tracks established connections and their protocol state, answers the dial requests
/// issued by the internal `Network`, and forwards peer and message notifications to
/// background worker tasks through channels.
#[derive(Debug)]
pub struct Bitswap<S: Store> {
    /// Source of the outgoing events that `poll` turns into swarm actions.
    network: Network,
    /// Protocol configuration cloned into every new connection handler.
    protocol_config: ProtocolConfig,
    // peers: AHashMap<PeerId, Vec<(ConnectionId, PeerState)>>,
    /// Established connections per peer; a peer's entry is removed once its set is empty.
    connected_peers: AHashMap<PeerId, AHashSet<ConnectionId>>,
    /// Protocol state of every established connection.
    connection_state: AHashMap<ConnectionId, ConnectionState>,
    /// Dial requests waiting for a connection result.
    dials: DialMap,
    /// Set to true when dialing should be disabled because we have reached the conn limit.
    _pause_dialing: bool,
    /// Bitswap client; always present.
    client: Client<S>,
    /// Bitswap server; `None` when no server config was supplied.
    server: Option<Server<S>>,
    /// Feeds received messages to the message worker task.
    incoming_messages: mpsc::Sender<(PeerId, BitswapMessage)>,
    /// Feeds "peer connected" notifications to their worker task.
    peers_connected: mpsc::Sender<PeerId>,
    /// Feeds "peer disconnected" notifications to their worker task.
    peers_disconnected: mpsc::Sender<PeerId>,
    /// Join handles of the worker tasks spawned in `new`; only stored, never awaited here.
    _workers: Arc<Vec<JoinHandle<()>>>,
}
82
/// Bitswap-level state of a single established connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// The connection is established but its handler has not reported anything yet.
    Pending,
    /// The handler reported a negotiated protocol, or a message arrived on the connection.
    Responsive(ProtocolId),
    /// The handler reported that the remote does not support the protocol.
    Unresponsive,
}
89
/// Configuration for [`Bitswap`].
#[derive(Debug)]
pub struct Config {
    /// Configuration handed to the bitswap client.
    pub client: ClientConfig,
    /// If no server config is set, the server is disabled.
    pub server: Option<ServerConfig>,
    /// Protocol configuration passed to every connection handler.
    pub protocol: ProtocolConfig,
    /// Idle timeout; defaults to 30 seconds.
    // NOTE(review): not read anywhere in the visible part of this file — confirm where it is used.
    pub idle_timeout: Duration,
}
98
99impl Config {
100    pub fn default_client_mode() -> Self {
101        Config {
102            server: None,
103            ..Default::default()
104        }
105    }
106}
107
108impl Default for Config {
109    fn default() -> Self {
110        Config {
111            client: ClientConfig::default(),
112            server: Some(ServerConfig::default()),
113            protocol: ProtocolConfig::default(),
114            idle_timeout: Duration::from_secs(30),
115        }
116    }
117}
118
/// Block storage backing the bitswap client and server.
///
/// Implementations must be cheap to clone: the store is cloned when the server is created.
#[async_trait]
pub trait Store: Debug + Clone + Send + Sync + 'static {
    /// Returns the size of the block identified by `cid`
    /// (presumably the length of its data in bytes — confirm against implementations).
    async fn get_size(&self, cid: &Cid) -> Result<usize>;
    /// Returns the block identified by `cid`.
    async fn get(&self, cid: &Cid) -> Result<Block>;
    /// Returns whether a block for `cid` is stored.
    async fn has(&self, cid: &Cid) -> Result<bool>;
}
125
126impl<S: Store> Bitswap<S> {
127    pub async fn new(self_id: PeerId, store: S, config: Config) -> Self {
128        let network = Network::new(self_id);
129        let (server, cb) = if let Some(config) = config.server {
130            let server = Server::new(network.clone(), store.clone(), config).await;
131            let cb = server.received_blocks_cb();
132            (Some(server), Some(cb))
133        } else {
134            (None, None)
135        };
136        let client = Client::new(network.clone(), store, cb, config.client).await;
137
138        let (sender_msg, mut receiver_msg) = mpsc::channel::<(PeerId, BitswapMessage)>(2048);
139        let (sender_con, mut receiver_con) = mpsc::channel(2048);
140        let (sender_dis, mut receiver_dis) = mpsc::channel(2048);
141
142        let mut workers = Vec::new();
143        workers.push(tokio::task::spawn({
144            let server = server.clone();
145            let client = client.clone();
146
147            async move {
148                // process messages serially but without blocking the p2p loop
149                while let Some((peer, mut message)) = receiver_msg.next().await {
150                    let message = tokio::task::spawn_blocking(move || {
151                        message.verify_blocks();
152                        message
153                    })
154                    .await
155                    .expect("cannot spawn blocking thread");
156                    if let Some(ref server) = server {
157                        futures::future::join(
158                            client.receive_message(&peer, &message),
159                            server.receive_message(&peer, &message),
160                        )
161                        .await;
162                    } else {
163                        client.receive_message(&peer, &message).await;
164                    }
165                }
166            }
167        }));
168
169        workers.push(tokio::task::spawn({
170            let server = server.clone();
171            let client = client.clone();
172
173            async move {
174                // process messages serially but without blocking the p2p loop
175                while let Some(peer) = receiver_con.next().await {
176                    if let Some(ref server) = server {
177                        futures::future::join(
178                            client.peer_connected(&peer),
179                            server.peer_connected(&peer),
180                        )
181                        .await;
182                    } else {
183                        client.peer_connected(&peer).await;
184                    }
185                }
186            }
187        }));
188
189        workers.push(tokio::task::spawn({
190            let server = server.clone();
191            let client = client.clone();
192
193            async move {
194                // process messages serially but without blocking the p2p loop
195                while let Some(peer) = receiver_dis.next().await {
196                    if let Some(ref server) = server {
197                        futures::future::join(
198                            client.peer_disconnected(&peer),
199                            server.peer_disconnected(&peer),
200                        )
201                        .await;
202                    } else {
203                        client.peer_disconnected(&peer).await;
204                    }
205                }
206            }
207        }));
208
209        Bitswap {
210            network,
211            protocol_config: config.protocol,
212            connected_peers: Default::default(),
213            connection_state: Default::default(),
214            dials: Default::default(),
215            _pause_dialing: false,
216            server,
217            client,
218            incoming_messages: sender_msg,
219            peers_connected: sender_con,
220            peers_disconnected: sender_dis,
221            _workers: Arc::new(workers),
222        }
223    }
224
    /// Returns the bitswap server, or `None` when the server is disabled.
    pub fn server(&self) -> Option<&Server<S>> {
        self.server.as_ref()
    }
228
    /// Returns the bitswap client.
    pub fn client(&self) -> &Client<S> {
        &self.client
    }
232
233    pub async fn stop(self) -> Result<()> {
234        self.network.stop();
235        if let Some(server) = self.server {
236            futures::future::try_join(self.client.stop(), server.stop()).await?;
237        } else {
238            self.client.stop().await?;
239        }
240
241        Ok(())
242    }
243
244    pub async fn notify_new_blocks(&self, blocks: &[Block]) -> Result<()> {
245        self.client.notify_new_blocks(blocks).await?;
246        if let Some(ref server) = self.server {
247            server.notify_new_blocks(blocks).await?;
248        }
249
250        Ok(())
251    }
252
253    pub async fn wantlist_for_peer(&self, peer: &PeerId) -> Vec<Cid> {
254        if peer == self.network.self_id() {
255            return self.client.get_wantlist().await.into_iter().collect();
256        }
257
258        if let Some(ref server) = self.server {
259            server.wantlist_for_peer(peer).await
260        } else {
261            Vec::new()
262        }
263    }
264
265    fn peer_connected(&self, peer: PeerId) {
266        if let Err(err) = self.peers_connected.clone().try_send(peer) {
267            warn!(
268                "failed to process peer connection from {}: {:?}, dropping",
269                peer, err
270            );
271        }
272    }
273
274    fn peer_disconnected(&self, peer: PeerId) {
275        if let Err(err) = self.peers_disconnected.clone().try_send(peer) {
276            warn!(
277                "failed to process peer disconnection from {}: {:?}, dropping",
278                peer, err
279            );
280        }
281    }
282
283    fn receive_message(&self, peer: PeerId, message: BitswapMessage) {
284        // TODO: Handle backpressure properly
285        if let Err(err) = self.incoming_messages.clone().try_send((peer, message)) {
286            warn!(
287                "failed to receive message from {}: {:?}, dropping",
288                peer, err
289            );
290        }
291    }
292}
293
/// Events that the bitswap behaviour emits to the swarm (see `ToSwarm::GenerateEvent` in `poll`).
#[derive(Debug)]
pub enum BitswapEvent {
    /// We have this content, and want it to be provided.
    Provide { key: Cid },
    /// Request to find providers for `key`.
    ///
    /// Presumably the receiver streams batches of providers (or an error message) back through
    /// `response`, up to `limit` providers — confirm against the event consumer.
    FindProviders {
        key: Cid,
        response: mpsc::Sender<std::result::Result<HashSet<PeerId>, String>>,
        limit: usize,
    },
    /// Request to ping `peer`.
    ///
    /// Presumably `response` receives the measured latency, or `None` if it could not be
    /// determined — confirm against the event consumer.
    Ping {
        peer: PeerId,
        response: oneshot::Sender<Option<Duration>>,
    },
}
308
309impl<S: Store> NetworkBehaviour for Bitswap<S> {
310    type ConnectionHandler = BitswapHandler;
311    type ToSwarm = BitswapEvent;
312
313    fn handle_established_inbound_connection(
314        &mut self,
315        _connection_id: ConnectionId,
316        _: PeerId,
317        _: &Multiaddr,
318        _: &Multiaddr,
319    ) -> std::result::Result<THandler<Self>, ConnectionDenied> {
320        let protocol_config = self.protocol_config.clone();
321        Ok(BitswapHandler::new(protocol_config))
322    }
323
324    fn handle_established_outbound_connection(
325        &mut self,
326        _connection_id: ConnectionId,
327        _: PeerId,
328        _: &Multiaddr,
329        _: libp2p::core::Endpoint,
330    ) -> std::result::Result<THandler<Self>, ConnectionDenied> {
331        let protocol_config = self.protocol_config.clone();
332        Ok(BitswapHandler::new(protocol_config))
333    }
334
335    #[allow(clippy::collapsible_match)]
336    fn on_swarm_event(&mut self, event: FromSwarm) {
337        match event {
338            FromSwarm::ConnectionEstablished(ConnectionEstablished {
339                peer_id,
340                connection_id,
341                other_established,
342                ..
343            }) => {
344                trace!("connection established {} ({})", peer_id, other_established);
345
346                self.connected_peers
347                    .entry(peer_id)
348                    .or_default()
349                    .insert(connection_id);
350
351                self.connection_state
352                    .insert(connection_id, ConnectionState::Pending);
353                // self.set_peer_state(&peer_id, connection_id, ConnectionState::Connected);
354            }
355            FromSwarm::ConnectionClosed(ConnectionClosed {
356                peer_id,
357                remaining_established,
358                connection_id,
359                ..
360            }) => {
361                if let Entry::Occupied(mut entry) = self.connected_peers.entry(peer_id) {
362                    let list = entry.get_mut();
363                    list.remove(&connection_id);
364                    if list.is_empty() {
365                        entry.remove();
366                    }
367                }
368
369                self.connection_state.remove(&connection_id);
370
371                if remaining_established == 0 && !self.connected_peers.contains_key(&peer_id) {
372                    // Last connection, close it
373                    self.peer_disconnected(peer_id);
374                }
375            }
376            FromSwarm::DialFailure(DialFailure {
377                peer_id,
378                error,
379                connection_id: _,
380                ..
381            }) => {
382                let Some(peer_id) = peer_id else {
383                    return;
384                };
385
386                trace!("inject_dial_failure {}, {:?}", peer_id, error);
387                let dials = &mut self.dials;
388                if let Some(mut dials) = dials.remove(&peer_id) {
389                    while let Some((_id, sender)) = dials.pop() {
390                        let _ = sender.send(Err(error.to_string()));
391                    }
392                }
393            }
394            _ => {}
395        }
396    }
397
398    fn on_connection_handler_event(
399        &mut self,
400        peer_id: PeerId,
401        connection: ConnectionId,
402        event: HandlerEvent,
403    ) {
404        trace!(
405            "on_connection_handler_event from {}, event: {:?}",
406            peer_id,
407            event
408        );
409        match event {
410            HandlerEvent::Connected { protocol } => {
411                if let Entry::Occupied(mut entry) = self.connection_state.entry(connection) {
412                    let state = entry.get_mut();
413                    let _old_state = *state;
414                    *state = ConnectionState::Responsive(protocol);
415
416                    self.peer_connected(peer_id);
417
418                    let dials = &mut self.dials;
419                    if let Some(mut dials) = dials.remove(&peer_id) {
420                        while let Some((id, sender)) = dials.pop() {
421                            if let Err(err) = sender.send(Ok((connection, Some(protocol)))) {
422                                warn!("dial:{}: failed to send dial response {:?}", id, err)
423                            }
424                        }
425                    }
426                }
427            }
428            HandlerEvent::ProtocolNotSuppported => {
429                if let Entry::Occupied(mut entry) = self.connection_state.entry(connection) {
430                    *entry.get_mut() = ConnectionState::Unresponsive;
431
432                    let dials = &mut self.dials;
433                    if let Some(mut dials) = dials.remove(&peer_id) {
434                        while let Some((id, sender)) = dials.pop() {
435                            if let Err(err) = sender.send(Err("protocol not supported".into())) {
436                                warn!("dial:{} failed to send dial response {:?}", id, err)
437                            }
438                        }
439                    }
440                }
441            }
442            HandlerEvent::Message { message, protocol } => {
443                // mark peer as responsive
444                if let Entry::Occupied(mut entry) = self.connection_state.entry(connection) {
445                    let state = entry.get_mut();
446                    let old_state = *state;
447                    if !matches!(old_state, ConnectionState::Responsive(_)) {
448                        *state = ConnectionState::Responsive(protocol);
449                        self.peer_connected(peer_id);
450                    }
451                }
452                self.receive_message(peer_id, message);
453            }
454            HandlerEvent::FailedToSendMessage { .. } => {
455                // Handle
456            }
457        }
458    }
459
460    #[allow(clippy::type_complexity)]
461    fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
462        // limit work
463        for _ in 0..50 {
464            match futures::ready!(Pin::new(&mut self.network).poll(cx)) {
465                OutEvent::Dial { peer, response, id } => {
466                    let connections = match self.connected_peers.get(&peer) {
467                        Some(connections) => connections,
468                        None => {
469                            self.dials.entry(peer).or_default().push((id, response));
470
471                            return Poll::Ready(ToSwarm::Dial {
472                                opts: DialOpts::peer_id(peer).build(),
473                            });
474                        }
475                    };
476
477                    let first_responseive = self
478                        .connection_state
479                        .iter()
480                        .filter(|(k, _)| connections.contains(k))
481                        .collect::<Vec<_>>();
482
483                    if let Some((conn, state)) = first_responseive
484                        .iter()
485                        .find(|(_, state)| matches!(state, ConnectionState::Responsive(_)))
486                    {
487                        if let ConnectionState::Responsive(protocol_id) = state {
488                            if let Err(err) = response.send(Ok((**conn, Some(*protocol_id)))) {
489                                debug!("dial:{}: failed to send dial response {:?}", id, err)
490                            }
491                        }
492                        continue;
493                    }
494
495                    if let Some((conn, _)) = first_responseive.iter().find(|(_, state)| {
496                        matches!(
497                            state,
498                            ConnectionState::Pending | ConnectionState::Unresponsive
499                        )
500                    }) {
501                        if let Err(err) = response.send(Ok((**conn, None))) {
502                            debug!("dial:{}: failed to send dial response {:?}", id, err)
503                        }
504                        continue;
505                    }
506                }
507                OutEvent::GenerateEvent(ev) => return Poll::Ready(ToSwarm::GenerateEvent(ev)),
508                OutEvent::SendMessage {
509                    peer,
510                    message,
511                    response,
512                    connection_id,
513                } => {
514                    tracing::debug!("send message to {}", peer);
515                    return Poll::Ready(ToSwarm::NotifyHandler {
516                        peer_id: peer,
517                        handler: NotifyHandler::One(connection_id),
518                        event: handler::BitswapHandlerIn::Message(message, response),
519                    });
520                }
521                OutEvent::ProtectPeer { peer } => {
522                    if self.connected_peers.contains_key(&peer) {
523                        return Poll::Ready(ToSwarm::NotifyHandler {
524                            peer_id: peer,
525                            handler: NotifyHandler::Any,
526                            event: handler::BitswapHandlerIn::Protect,
527                        });
528                    }
529                }
530                OutEvent::UnprotectPeer { peer, response } => {
531                    if self.connected_peers.contains_key(&peer) {
532                        let _ = response.send(true);
533                        return Poll::Ready(ToSwarm::NotifyHandler {
534                            peer_id: peer,
535                            handler: NotifyHandler::Any,
536                            event: handler::BitswapHandlerIn::Unprotect,
537                        });
538                    }
539                    let _ = response.send(false);
540                }
541            }
542        }
543
544        Poll::Pending
545    }
546}
547
548pub fn verify_hash(cid: &Cid, bytes: &[u8]) -> Option<bool> {
549    use cid::multihash::{Code, MultihashDigest};
550    Code::try_from(cid.hash().code()).ok().map(|code| {
551        let calculated_hash = code.digest(bytes);
552        &calculated_hash == cid.hash()
553    })
554}
555
556#[cfg(test)]
557mod tests {
558    use std::sync::Arc;
559
560    use anyhow::anyhow;
561    use futures::prelude::*;
562    use libp2p::identity::Keypair;
563    use libp2p::swarm::SwarmEvent;
564    use libp2p::Swarm;
565    use libp2p::SwarmBuilder;
566    use tokio::sync::{mpsc, RwLock};
567    use tracing::{info, trace};
568    use tracing_subscriber::{fmt, prelude::*, EnvFilter};
569
570    use super::*;
571    use crate::Block;
572
    /// Compile-time check that `T` is both `Send` and `Sync`; does nothing at runtime.
    fn assert_send<T: Send + Sync>() {}
574
    /// Placeholder store used only for type-level checks; its methods are never meant to run.
    #[derive(Debug, Clone)]
    struct DummyStore;
577
578    #[async_trait]
579    impl Store for DummyStore {
580        async fn get_size(&self, _: &Cid) -> Result<usize> {
581            todo!()
582        }
583        async fn get(&self, _: &Cid) -> Result<Block> {
584            todo!()
585        }
586        async fn has(&self, _: &Cid) -> Result<bool> {
587            todo!()
588        }
589    }
590
    /// Ensures `Bitswap` and shared references to it are `Send + Sync`.
    #[test]
    fn test_traits() {
        assert_send::<Bitswap<DummyStore>>();
        assert_send::<&Bitswap<DummyStore>>();
    }
596
    /// In-memory block store for tests; clones share the same underlying map.
    #[derive(Debug, Clone, Default)]
    struct TestStore {
        /// Blocks keyed by their CID, behind an async read/write lock.
        store: Arc<RwLock<AHashMap<Cid, Block>>>,
    }
601
602    #[async_trait]
603    impl Store for TestStore {
604        async fn get_size(&self, cid: &Cid) -> Result<usize> {
605            self.store
606                .read()
607                .await
608                .get(cid)
609                .map(|block| block.data().len())
610                .ok_or_else(|| anyhow!("missing"))
611        }
612
613        async fn get(&self, cid: &Cid) -> Result<Block> {
614            self.store
615                .read()
616                .await
617                .get(cid)
618                .cloned()
619                .ok_or_else(|| anyhow!("missing"))
620        }
621
622        async fn has(&self, cid: &Cid) -> Result<bool> {
623            Ok(self.store.read().await.contains_key(cid))
624        }
625    }
626
627    #[tokio::test]
628    async fn test_get_1_block() {
629        tracing_subscriber::registry()
630            .with(fmt::layer().pretty())
631            .with(EnvFilter::from_default_env())
632            .init();
633
634        get_block::<1>().await;
635    }
636
637    #[tokio::test]
638    async fn test_get_2_block() {
639        get_block::<2>().await;
640    }
641
642    #[tokio::test]
643    async fn test_get_4_block() {
644        get_block::<4>().await;
645    }
646
647    #[tokio::test]
648    async fn test_get_64_block() {
649        get_block::<64>().await;
650    }
651
652    #[tokio::test]
653    async fn test_get_65_block() {
654        get_block::<65>().await;
655    }
656
657    #[tokio::test]
658    async fn test_get_66_block() {
659        get_block::<66>().await;
660    }
661
662    #[tokio::test]
663    async fn test_get_128_block() {
664        tracing_subscriber::registry()
665            .with(fmt::layer().pretty())
666            .with(EnvFilter::from_default_env())
667            .init();
668
669        get_block::<128>().await;
670    }
671
672    #[tokio::test]
673    async fn test_get_1024_block() {
674        get_block::<1024>().await;
675    }
676
    /// End-to-end check: peer 2 fetches `N` random blocks that only peer 1 stores.
    ///
    /// Two swarms are connected over local TCP. Peer 2 then fetches all blocks three times:
    /// with individual requests awaited in order, with individual requests collected as they
    /// complete, and through a session. Each time the results must equal the original blocks.
    async fn get_block<const N: usize>() {
        // Peer 1: owns the blocks and only listens.
        let kp = Keypair::generate_ed25519();
        let store1 = TestStore::default();
        let bs1 = Bitswap::new(kp.public().to_peer_id(), store1.clone(), Config::default()).await;

        trace!("peer1: {}", kp.public().to_peer_id());

        let mut swarm1 = SwarmBuilder::with_existing_identity(kp)
            .with_tokio()
            .with_tcp(
                libp2p::tcp::Config::default(),
                libp2p::noise::Config::new,
                libp2p::yamux::Config::default,
            )
            .unwrap()
            .with_behaviour(|_| bs1)
            .unwrap()
            .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30)))
            .build();

        let blocks = (0..N).map(|_| create_random_block_v1()).collect::<Vec<_>>();

        // `store1` shares its map with the clone handed to `bs1`, so peer 1 can serve these.
        for block in &blocks {
            store1
                .store
                .write()
                .await
                .insert(*block.cid(), block.clone());
        }

        // Carries peer 1's listen address over to peer 2's task.
        let (tx, mut rx) = mpsc::channel::<Multiaddr>(1);

        Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

        let peer1 = tokio::task::spawn(async move {
            // Drain events that are immediately ready (presumably so that the listener
            // address is known before it is read below).
            while swarm1.next().now_or_never().is_some() {}
            let listeners: Vec<_> = Swarm::listeners(&swarm1).collect();
            for l in listeners {
                tx.send(l.clone()).await.unwrap();
            }

            // Keep driving the swarm until the task is aborted.
            loop {
                let ev = swarm1.next().await;
                trace!("peer1: {:?}", ev);
            }
        });

        // Peer 2: starts with an empty store and dials peer 1.
        info!("peer2: startup");
        let kp = Keypair::generate_ed25519();
        let store2 = TestStore::default();
        let bs2 = Bitswap::new(kp.public().to_peer_id(), store2.clone(), Config::default()).await;
        trace!("peer2: {}", kp.public().to_peer_id());
        let mut swarm2 = SwarmBuilder::with_existing_identity(kp)
            .with_tokio()
            .with_tcp(
                libp2p::tcp::Config::default(),
                libp2p::noise::Config::new,
                libp2p::yamux::Config::default,
            )
            .unwrap()
            .with_behaviour(|_| bs2)
            .unwrap()
            .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30)))
            .build();

        // Grab a client handle before the swarm is moved into its task.
        let swarm2_bs_client = swarm2.behaviour().client().clone();
        let peer2 = tokio::task::spawn(async move {
            let addr = rx.recv().await.unwrap();
            info!("peer2: dialing peer1 at {}", addr);
            Swarm::dial(&mut swarm2, addr).unwrap();

            // Keep driving the swarm until the task is aborted.
            loop {
                match swarm2.next().await {
                    Some(SwarmEvent::ConnectionEstablished {
                        connection_id,
                        peer_id,
                        ..
                    }) => {
                        trace!("peer2: connected to {} to {connection_id}", peer_id);
                    }
                    ev => trace!("peer2: {:?}", ev),
                }
            }
        });

        // Round 1: one request per block; `join_all` returns results in request order.
        {
            info!("peer2: fetching block - ordered");
            let blocks = blocks.clone();
            let mut futs = Vec::new();
            for block in &blocks {
                let client = swarm2_bs_client.clone();
                futs.push(async move {
                    // Should work, because retrieved
                    let received_block = client.get_block(block.cid()).await?;

                    info!("peer2: received block");
                    Ok::<Block, anyhow::Error>(received_block)
                });
            }

            let results = futures::future::join_all(futs).await;
            for (block, result) in blocks.into_iter().zip(results.into_iter()) {
                let received_block = result.unwrap();
                assert_eq!(block, received_block);
            }
        }

        // Round 2: results arrive in completion order, so both sides are sorted before
        // they are compared.
        {
            info!("peer2: fetching block - unordered");
            let mut blocks = blocks.clone();
            let futs = futures::stream::FuturesUnordered::new();
            for block in &blocks {
                let client = swarm2_bs_client.clone();
                futs.push(async move {
                    // Should work, because retrieved
                    let received_block = client.get_block(block.cid()).await?;

                    info!("peer2: received block");
                    Ok::<Block, anyhow::Error>(received_block)
                });
            }

            let mut results = futs.try_collect::<Vec<_>>().await.unwrap();
            results.sort();
            blocks.sort();
            for (block, received_block) in blocks.into_iter().zip(results.into_iter()) {
                assert_eq!(block, received_block);
            }
        }

        // Round 3: fetch all blocks through a single session.
        {
            info!("peer2: fetching block - session");
            let mut blocks = blocks.clone();
            let ids: Vec<_> = blocks.iter().map(|b| *b.cid()).collect();
            let session = swarm2_bs_client.new_session().await;
            // `_guard` is kept alive while the receiver is drained.
            let (blocks_receiver, _guard) = session.get_blocks(&ids).await.unwrap().into_parts();
            let mut results: Vec<_> = blocks_receiver.collect().await;

            results.sort();
            blocks.sort();
            for (block, received_block) in blocks.into_iter().zip(results.into_iter()) {
                assert_eq!(block, received_block);
            }
        }

        // Both swarm tasks loop forever, so they are aborted; the join error is ignored.
        info!("--shutting down peer1");
        peer1.abort();
        peer1.await.ok();

        info!("--shutting down peer2");
        peer2.abort();
        peer2.await.ok();
    }
830}