// rust_ipfs/p2p/bitswap.rs

1mod message;
2mod pb;
3mod prefix;
4mod protocol;
5mod sessions;
6
7use std::{
8    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
9    fmt::Debug,
10    task::{Context, Poll, Waker},
11    time::Duration,
12};
13
14use futures::StreamExt;
15use ipld_core::cid::Cid;
16use libp2p::core::transport::PortUse;
17use libp2p::{
18    core::Endpoint,
19    swarm::{
20        behaviour::ConnectionEstablished, dial_opts::DialOpts, ConnectionClosed, ConnectionDenied,
21        ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, NotifyHandler, OneShotHandler,
22        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
23    },
24    Multiaddr, PeerId,
25};
26use pollable_map::stream::StreamMap;
27
28mod bitswap_pb {
29    pub use super::pb::bitswap_pb::Message;
30    pub mod message {
31        use super::super::pb::bitswap_pb::mod_Message as message;
32        pub use message::mod_Wantlist as wantlist;
33        pub use message::Wantlist;
34        pub use message::{Block, BlockPresence, BlockPresenceType};
35    }
36}
37
38use crate::{repo::Repo, Block};
39
40use self::{
41    message::{BitswapMessage, BitswapRequest, BitswapResponse, RequestType},
42    protocol::{BitswapProtocol, Message},
43    sessions::{HaveSession, HaveSessionEvent, WantSession, WantSessionEvent},
44};
45
/// Capacity above which `poll` shrinks the (already drained) `events` queue.
const CAP_THRESHOLD: usize = 100;
47
/// Optional tunables for the bitswap behaviour.
///
/// NOTE(review): nothing in the visible part of this file reads this type
/// (`Behaviour::new` only takes a `Repo`) — confirm where it is consumed.
#[derive(Debug, Default, Clone, Copy)]
pub struct Config {
    /// Presumably an upper bound on the number of wanted blocks — confirm against the consumer.
    pub max_wanted_blocks: Option<u8>,
    /// Presumably a default request timeout — confirm against the consumer.
    pub timeout: Option<Duration>,
}
53
54#[derive(Debug)]
55pub enum Event {
56    NeedBlock { cid: Cid },
57    BlockRetrieved { cid: Cid },
58    CancelBlock { cid: Cid },
59}
60
61pub struct Behaviour {
62    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
63    connections: HashMap<PeerId, HashSet<ConnectionId>>,
64    blacklist_connections: HashMap<PeerId, BTreeSet<ConnectionId>>,
65    store: Repo,
66    want_session: StreamMap<Cid, WantSession>,
67    have_session: StreamMap<Cid, HaveSession>,
68    waker: Option<Waker>,
69}
70
71impl Behaviour {
72    pub fn new(store: &Repo) -> Self {
73        Self {
74            events: Default::default(),
75            connections: Default::default(),
76            blacklist_connections: Default::default(),
77            store: store.clone(),
78            want_session: StreamMap::new(),
79            have_session: StreamMap::new(),
80            waker: None,
81        }
82    }
83
84    pub fn get(&mut self, cid: &Cid, providers: &[PeerId], timeout: Option<Duration>) {
85        self.gets(vec![*cid], providers, timeout)
86    }
87
88    pub fn gets(&mut self, cids: Vec<Cid>, providers: &[PeerId], timeout: Option<Duration>) {
89        let peers = match providers.is_empty() {
90            true => {
91                //If no providers are provided, we can send requests connected peers
92                self.connections
93                    .keys()
94                    .filter(|peer_id| !self.blacklist_connections.contains_key(peer_id))
95                    .copied()
96                    .collect::<Vec<_>>()
97            }
98            false => {
99                let mut connected = VecDeque::new();
100                for peer_id in providers
101                    .iter()
102                    .filter(|peer_id| !self.blacklist_connections.contains_key(peer_id))
103                {
104                    if self.connections.contains_key(peer_id) {
105                        connected.push_back(*peer_id);
106                        continue;
107                    }
108                    let opts = DialOpts::peer_id(*peer_id).build();
109
110                    self.events.push_back(ToSwarm::Dial { opts });
111                }
112                Vec::from_iter(connected)
113            }
114        };
115
116        for cid in &cids {
117            if self.want_session.contains_key(cid) {
118                continue;
119            }
120            let session = WantSession::new(&self.store, *cid, timeout);
121            self.want_session.insert(*cid, session);
122        }
123
124        if peers.is_empty() {
125            // Since no connections, peers or providers are provided, we need to notify swarm to attempt a form of content discovery
126            for cid in cids {
127                self.events
128                    .push_back(ToSwarm::GenerateEvent(Event::NeedBlock { cid }));
129            }
130            return;
131        }
132
133        self.send_wants(peers, cids)
134    }
135
136    pub fn local_wantlist(&self) -> Vec<Cid> {
137        self.want_session.keys().copied().collect()
138    }
139
140    pub fn peer_wantlist(&self, peer_id: PeerId) -> Vec<Cid> {
141        let mut blocks = HashSet::new();
142
143        for (cid, session) in self.have_session.iter() {
144            if session.has_peer(peer_id) {
145                blocks.insert(*cid);
146            }
147        }
148
149        Vec::from_iter(blocks)
150    }
151
152    // Note: This is called specifically to cancel the request and not just emitting a request
153    //       after receiving a request.
154    pub fn cancel(&mut self, cid: Cid) {
155        if self.want_session.remove(&cid).is_none() {
156            return;
157        }
158
159        self.events
160            .push_back(ToSwarm::GenerateEvent(Event::CancelBlock { cid }));
161
162        if let Some(waker) = self.waker.take() {
163            waker.wake();
164        }
165    }
166
167    // This will notify connected peers who have the bitswap protocol that we have this block
168    // if they wanted it
169    // TODO: Maybe have a general `Session` where we could collectively notify a peer of new blocks
170    //       in a single message
171    pub fn notify_new_blocks(&mut self, cid: impl IntoIterator<Item = Cid>) {
172        let blocks = cid.into_iter().collect::<Vec<_>>();
173
174        for (cid, session) in self.have_session.iter_mut() {
175            if !blocks.contains(cid) {
176                continue;
177            }
178
179            session.reset();
180        }
181
182        if !self.have_session.is_empty() {
183            if let Some(waker) = self.waker.take() {
184                waker.wake();
185            }
186        }
187    }
188
189    fn on_connection_established(
190        &mut self,
191        ConnectionEstablished {
192            connection_id,
193            peer_id,
194            other_established,
195            ..
196        }: ConnectionEstablished,
197    ) {
198        tracing::info!(%peer_id, %connection_id, "connection established");
199        self.connections
200            .entry(peer_id)
201            .or_default()
202            .insert(connection_id);
203
204        if other_established > 0 {
205            return;
206        }
207
208        self.send_wants(vec![peer_id], vec![]);
209    }
210
211    fn on_connection_close(
212        &mut self,
213        ConnectionClosed {
214            connection_id,
215            peer_id,
216            remaining_established,
217            ..
218        }: ConnectionClosed,
219    ) {
220        tracing::debug!(%connection_id, %peer_id, "connection closed");
221        if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
222            let list = entry.get_mut();
223            list.remove(&connection_id);
224            if list.is_empty() {
225                entry.remove();
226            }
227        }
228
229        if let Entry::Occupied(mut entry) = self.blacklist_connections.entry(peer_id) {
230            let list = entry.get_mut();
231            list.remove(&connection_id);
232            if list.is_empty() {
233                entry.remove();
234            }
235        }
236
237        if remaining_established == 0 {
238            tracing::debug!(%connection_id, %peer_id, "peer disconnected");
239            for (cid, session) in self.want_session.iter_mut() {
240                tracing::debug!(session=%*cid, %peer_id, "marking peer as disconnected");
241                session.peer_disconnected(peer_id);
242            }
243        }
244    }
245
246    fn on_dial_failure(
247        &mut self,
248        DialFailure {
249            connection_id,
250            peer_id,
251            error,
252        }: DialFailure,
253    ) {
254        let Some(peer_id) = peer_id else {
255            return;
256        };
257
258        tracing::warn!(%peer_id, %connection_id, error = %error, "unable to dial peer");
259
260        if self.connections.contains_key(&peer_id) {
261            // Since there is still an existing connection for the peer
262            // we can ignore the dial failure
263            return;
264        }
265
266        for session in self.want_session.values_mut() {
267            session.remove_peer(peer_id);
268        }
269
270        for session in self.have_session.values_mut() {
271            session.remove_peer(peer_id);
272        }
273    }
274
275    fn send_wants(&mut self, peers: Vec<PeerId>, cids: Vec<Cid>) {
276        if let Some(waker) = self.waker.take() {
277            waker.wake();
278        }
279
280        match cids.is_empty() {
281            false => {
282                for cid in cids {
283                    let Some(session) = self.want_session.get_mut(&cid) else {
284                        continue;
285                    };
286                    for peer_id in &peers {
287                        session.send_have_block(*peer_id)
288                    }
289                }
290            }
291            true => {
292                for session in self.want_session.values_mut() {
293                    for peer_id in &peers {
294                        session.send_have_block(*peer_id)
295                    }
296                }
297            }
298        }
299    }
300}
301
302impl NetworkBehaviour for Behaviour {
303    type ConnectionHandler = OneShotHandler<BitswapProtocol, BitswapMessage, Message>;
304    type ToSwarm = Event;
305
306    fn handle_pending_inbound_connection(
307        &mut self,
308        _: ConnectionId,
309        _: &Multiaddr,
310        _: &Multiaddr,
311    ) -> Result<(), ConnectionDenied> {
312        Ok(())
313    }
314
315    fn handle_pending_outbound_connection(
316        &mut self,
317        _: ConnectionId,
318        _: Option<PeerId>,
319        _: &[Multiaddr],
320        _: Endpoint,
321    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
322        Ok(vec![])
323    }
324
325    fn handle_established_inbound_connection(
326        &mut self,
327        _: ConnectionId,
328        _: PeerId,
329        _: &Multiaddr,
330        _: &Multiaddr,
331    ) -> Result<THandler<Self>, ConnectionDenied> {
332        Ok(OneShotHandler::default())
333    }
334
335    fn handle_established_outbound_connection(
336        &mut self,
337        _: ConnectionId,
338        _: PeerId,
339        _: &Multiaddr,
340        _: Endpoint,
341        _: PortUse,
342    ) -> Result<THandler<Self>, ConnectionDenied> {
343        Ok(OneShotHandler::default())
344    }
345
346    fn on_connection_handler_event(
347        &mut self,
348        peer_id: PeerId,
349        connection_id: ConnectionId,
350        event: THandlerOutEvent<Self>,
351    ) {
352        let message = match event {
353            Ok(Message::Receive { message }) => {
354                tracing::trace!(%peer_id, %connection_id, "message received");
355                if let Entry::Occupied(mut e) = self.blacklist_connections.entry(peer_id) {
356                    let list = e.get_mut();
357                    list.remove(&connection_id);
358                    if list.is_empty() {
359                        e.remove();
360                    }
361                }
362
363                message
364            }
365            Ok(Message::Sent) => {
366                tracing::trace!(%peer_id, %connection_id, "message sent");
367                return;
368            }
369            Err(e) => {
370                tracing::error!(%peer_id, %connection_id, error = %e, "error sending or receiving message");
371                //TODO: Depending on the underlining error, maybe blacklist the peer from further sending/receiving
372                //      until a valid response or request is produced?
373                self.blacklist_connections
374                    .entry(peer_id)
375                    .or_default()
376                    .insert(connection_id);
377                return;
378            }
379        };
380
381        if message.is_empty() {
382            tracing::warn!(%peer_id, %connection_id, "received an empty message");
383            return;
384        }
385
386        let BitswapMessage {
387            requests,
388            responses,
389            ..
390        } = message;
391
392        for request in requests {
393            let BitswapRequest {
394                ty,
395                cid,
396                send_dont_have,
397                cancel,
398                priority: _,
399            } = &request;
400
401            if !self.have_session.contains_key(cid) && !cancel {
402                // Lets build out have new sessions
403                let have_session = HaveSession::new(&self.store, *cid);
404                self.have_session.insert(*cid, have_session);
405            }
406
407            let Some(session) = self.have_session.get_mut(cid) else {
408                if !*cancel {
409                    tracing::warn!(block = %cid, %peer_id, %connection_id, "have session does not exist. Skipping request");
410                }
411                continue;
412            };
413
414            if *cancel {
415                session.cancel(peer_id);
416                continue;
417            }
418
419            match ty {
420                RequestType::Have => {
421                    session.want_block(peer_id, *send_dont_have);
422                }
423                RequestType::Block => {
424                    session.need_block(peer_id);
425                }
426            }
427        }
428
429        for (cid, response) in responses {
430            let Some(session) = self.want_session.get_mut(&cid) else {
431                tracing::warn!(block = %cid, %peer_id, %connection_id, "want session does not exist. Skipping response");
432                continue;
433            };
434            match response {
435                BitswapResponse::Have(have) => match have {
436                    true => {
437                        session.has_block(peer_id);
438                    }
439                    false => {
440                        session.dont_have_block(peer_id);
441                    }
442                },
443                BitswapResponse::Block(bytes) => {
444                    let Ok(block) = Block::new(cid, bytes) else {
445                        // The block is invalid so we will notify the session that we still dont have the block
446                        // from said peer
447                        // TODO: In the future, mark the peer as a bad sender
448                        tracing::error!(block = %cid, %peer_id, %connection_id, "block is invalid or corrupted");
449                        session.dont_have_block(peer_id);
450                        continue;
451                    };
452                    session.put_block(peer_id, block);
453                }
454            }
455        }
456    }
457
458    fn on_swarm_event(&mut self, event: FromSwarm) {
459        match event {
460            FromSwarm::ConnectionEstablished(event) => self.on_connection_established(event),
461            FromSwarm::ConnectionClosed(event) => self.on_connection_close(event),
462            FromSwarm::DialFailure(event) => self.on_dial_failure(event),
463            _ => {}
464        }
465    }
466
467    fn poll(&mut self, ctx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
468        if let Some(event) = self.events.pop_front() {
469            return Poll::Ready(event);
470        } else if self.events.capacity() > CAP_THRESHOLD {
471            self.events.shrink_to_fit();
472        }
473
474        while let Poll::Ready(Some((cid, event))) = self.have_session.poll_next_unpin(ctx) {
475            match event {
476                HaveSessionEvent::Have { peer_id } => {
477                    return Poll::Ready(ToSwarm::NotifyHandler {
478                        peer_id,
479                        handler: NotifyHandler::Any,
480                        event: BitswapMessage::default()
481                            .add_response(cid, BitswapResponse::Have(true)),
482                    });
483                }
484                HaveSessionEvent::DontHave { peer_id } => {
485                    return Poll::Ready(ToSwarm::NotifyHandler {
486                        peer_id,
487                        handler: NotifyHandler::Any,
488                        event: BitswapMessage::default()
489                            .add_response(cid, BitswapResponse::Have(false)),
490                    })
491                }
492                HaveSessionEvent::Block { peer_id, bytes } => {
493                    return Poll::Ready(ToSwarm::NotifyHandler {
494                        peer_id,
495                        handler: NotifyHandler::Any,
496                        event: BitswapMessage::default()
497                            .add_response(cid, BitswapResponse::Block(bytes)),
498                    })
499                }
500                HaveSessionEvent::Cancelled => {
501                    //TODO: Maybe notify peers from this session about any cancelled request?
502                    self.have_session.remove(&cid);
503                }
504            };
505        }
506
507        match self.want_session.poll_next_unpin(ctx) {
508            Poll::Ready(Some((cid, event))) => match event {
509                WantSessionEvent::SendWant { peer_id } => {
510                    return Poll::Ready(ToSwarm::NotifyHandler {
511                        peer_id,
512                        handler: NotifyHandler::Any,
513                        event: BitswapMessage::default()
514                            .add_request(BitswapRequest::have(cid).send_dont_have(true)),
515                    });
516                }
517                WantSessionEvent::SendCancel { peer_id } => {
518                    return Poll::Ready(ToSwarm::NotifyHandler {
519                        peer_id,
520                        handler: NotifyHandler::Any,
521                        event: BitswapMessage::default().add_request(BitswapRequest::cancel(cid)),
522                    });
523                }
524                WantSessionEvent::SendBlock { peer_id } => {
525                    ctx.waker().wake_by_ref();
526
527                    return Poll::Ready(ToSwarm::NotifyHandler {
528                        peer_id,
529                        handler: NotifyHandler::Any,
530                        event: BitswapMessage::default()
531                            .add_request(BitswapRequest::block(cid).send_dont_have(true)),
532                    });
533                }
534                WantSessionEvent::NeedBlock => {
535                    return Poll::Ready(ToSwarm::GenerateEvent(Event::NeedBlock { cid }));
536                }
537                WantSessionEvent::BlockStored => {
538                    return Poll::Ready(ToSwarm::GenerateEvent(Event::BlockRetrieved { cid }))
539                }
540                WantSessionEvent::Dial { peer_id } => {
541                    let opts = DialOpts::peer_id(peer_id).build();
542                    return Poll::Ready(ToSwarm::Dial { opts });
543                }
544                WantSessionEvent::Cancelled => {
545                    self.want_session.remove(&cid);
546                    return Poll::Ready(ToSwarm::GenerateEvent(Event::CancelBlock { cid }));
547                }
548            },
549            Poll::Pending | Poll::Ready(None) => {}
550        }
551
552        self.waker = Some(ctx.waker().clone());
553
554        Poll::Pending
555    }
556}
557
558#[cfg(test)]
559mod test {
560    use std::time::Duration;
561
562    use crate::block::BlockCodec;
563    use futures::StreamExt;
564    use ipld_core::cid::Cid;
565    use libp2p::{
566        core::{transport::MemoryTransport, upgrade::Version},
567        swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent},
568        Multiaddr, PeerId, Swarm, SwarmBuilder, Transport,
569    };
570    use multihash_codetable::{Code, MultihashDigest};
571
572    use crate::{repo::Repo, Block};
573
574    fn create_block() -> Block {
575        let data = b"hello block\n".to_vec();
576        let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
577
578        Block::new_unchecked(cid, data)
579    }
580
581    #[tokio::test]
582    async fn exchange_blocks() -> anyhow::Result<()> {
583        let (_, _, mut swarm1, repo) = build_swarm().await;
584        let (peer2, addr2, mut swarm2, repo2) = build_swarm().await;
585
586        let block = create_block();
587
588        let cid = *block.cid();
589
590        repo.put_block(&block).await?;
591
592        let opt = DialOpts::peer_id(peer2)
593            .addresses(vec![addr2.clone()])
594            .build();
595
596        swarm1.dial(opt)?;
597
598        loop {
599            futures::select! {
600                event = swarm1.select_next_some() => {
601                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
602                        assert_eq!(peer_id, peer2);
603                        break;
604                    }
605                }
606                _ = swarm2.next() => {}
607            }
608        }
609
610        swarm2.behaviour_mut().bitswap.get(&cid, &[], None);
611
612        loop {
613            tokio::select! {
614                _ = swarm1.next() => {}
615                e = swarm2.select_next_some() => {
616                    match e {
617                        SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) => {
618                            assert_eq!(inner_cid, cid);
619                        }
620                        SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) => {
621                            assert_eq!(inner_cid, cid);
622                            unreachable!("exchange should not timeout");
623                        }
624                        _ => {}
625                    }
626                },
627                Ok(true) = repo2.contains(&cid) => {
628                    break;
629                }
630            }
631        }
632
633        let b = repo2
634            .get_block_now(&cid)
635            .await
636            .unwrap()
637            .expect("block exist");
638
639        assert_eq!(b, block);
640
641        Ok(())
642    }
643
644    #[tokio::test]
645    async fn notify_swarm() -> anyhow::Result<()> {
646        let (_, _, mut swarm1, _) = build_swarm().await;
647
648        let block = create_block();
649
650        let cid = *block.cid();
651
652        swarm1
653            .behaviour_mut()
654            .bitswap
655            .get(&cid, &[], Some(Duration::from_millis(500)));
656
657        let mut notified_counter = 0;
658
659        loop {
660            tokio::select! {
661                e = swarm1.select_next_some() => {
662                    match e {
663                        SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::NeedBlock { cid: inner_cid })) => {
664                            assert_eq!(inner_cid, cid);
665                            notified_counter += 1;
666                        }
667                        SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) => {
668                            assert_eq!(inner_cid, cid);
669                            unreachable!()
670                        }
671                        _ => {}
672                    }
673                },
674            }
675
676            if notified_counter == 2 {
677                break;
678            }
679        }
680
681        Ok(())
682    }
683
684    #[tokio::test]
685    async fn bitswap_timeout() -> anyhow::Result<()> {
686        let (_, _, mut swarm1, _) = build_swarm().await;
687        let (peer2, addr2, mut swarm2, _) = build_swarm().await;
688
689        let block = create_block();
690
691        let cid = *block.cid();
692
693        let opt = DialOpts::peer_id(peer2)
694            .addresses(vec![addr2.clone()])
695            .build();
696
697        swarm1.dial(opt)?;
698
699        loop {
700            futures::select! {
701                event = swarm1.select_next_some() => {
702                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
703                        assert_eq!(peer_id, peer2);
704                        break;
705                    }
706                }
707                _ = swarm2.next() => {}
708            }
709        }
710
711        swarm2
712            .behaviour_mut()
713            .bitswap
714            .get(&cid, &[], Some(Duration::from_millis(150)));
715
716        loop {
717            tokio::select! {
718                _ = swarm1.next() => {}
719                e = swarm2.select_next_some() => {
720                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) = e {
721                        assert_eq!(inner_cid, cid);
722                        break;
723                    }
724                },
725            }
726        }
727
728        Ok(())
729    }
730
731    #[tokio::test]
732    async fn exchange_blocks_with_explicit_peer() -> anyhow::Result<()> {
733        let (peer1, _, mut swarm1, repo) = build_swarm().await;
734        let (peer2, addr2, mut swarm2, repo2) = build_swarm().await;
735
736        let block = create_block();
737
738        let cid = *block.cid();
739
740        repo.put_block(&block).await?;
741
742        let opt = DialOpts::peer_id(peer2)
743            .addresses(vec![addr2.clone()])
744            .build();
745
746        swarm1.dial(opt)?;
747
748        loop {
749            futures::select! {
750                event = swarm1.select_next_some() => {
751                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
752                        assert_eq!(peer_id, peer2);
753                        break;
754                    }
755                }
756                _ = swarm2.next() => {}
757            }
758        }
759
760        swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
761
762        loop {
763            tokio::select! {
764                _ = swarm1.next() => {}
765                e = swarm2.select_next_some() => {
766                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
767                        assert_eq!(inner_cid, cid);
768                    }
769                },
770                Ok(true) = repo2.contains(&cid) => {
771                    break;
772                }
773            }
774        }
775
776        let b = repo2
777            .get_block_now(&cid)
778            .await
779            .unwrap()
780            .expect("block exist");
781
782        assert_eq!(b, block);
783
784        Ok(())
785    }
786
787    #[tokio::test]
788    async fn notify_after_block_exchange() -> anyhow::Result<()> {
789        let (peer1, _, mut swarm1, repo) = build_swarm().await;
790        let (peer2, addr2, mut swarm2, _) = build_swarm().await;
791        let (peer3, addr3, mut swarm3, repo3) = build_swarm().await;
792
793        let block = create_block();
794
795        let cid = *block.cid();
796
797        repo.put_block(&block).await?;
798
799        let opt = DialOpts::peer_id(peer2)
800            .addresses(vec![addr2.clone()])
801            .build();
802        swarm1.dial(opt)?;
803
804        let opt = DialOpts::peer_id(peer3)
805            .addresses(vec![addr3.clone()])
806            .build();
807
808        swarm2.dial(opt)?;
809        let mut peer_1_connected = false;
810        let mut peer_2_connected = false;
811        let mut peer_3_connected = false;
812
813        loop {
814            futures::select! {
815                event = swarm1.select_next_some() => {
816                    if let SwarmEvent::ConnectionEstablished { .. } = event {
817                        peer_1_connected = true;
818                    }
819                }
820                event = swarm2.select_next_some() => {
821                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
822                        if peer_id == peer1 {
823                            peer_2_connected = true;
824                        }
825                    }
826                }
827
828                event = swarm3.select_next_some() => {
829                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
830                        assert_eq!(peer_id, peer2);
831                        peer_3_connected = true;
832                    }
833                }
834            }
835            if peer_1_connected && peer_2_connected && peer_3_connected {
836                break;
837            }
838        }
839        swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
840        swarm3.behaviour_mut().bitswap.get(&cid, &[], None);
841
842        loop {
843            tokio::select! {
844                _ = swarm1.next() => {}
845                e = swarm2.select_next_some() => {
846                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
847                        assert_eq!(inner_cid, cid);
848                        swarm2.behaviour_mut().bitswap.notify_new_blocks(std::iter::once(cid));
849                    }
850                },
851                e = swarm3.select_next_some() => {
852                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
853                        assert_eq!(inner_cid, cid);
854                        break;
855                    }
856                },
857            }
858        }
859
860        let b = repo3
861            .get_block_now(&cid)
862            .await
863            .unwrap()
864            .expect("block exist");
865
866        assert_eq!(b, block);
867
868        Ok(())
869    }
870
871    #[tokio::test]
872    async fn cancel_block_exchange() -> anyhow::Result<()> {
873        let (_, _, mut swarm1, _) = build_swarm().await;
874
875        let block = create_block();
876
877        let cid = *block.cid();
878
879        swarm1.behaviour_mut().bitswap.get(&cid, &[], None);
880        swarm1.behaviour_mut().bitswap.cancel(cid);
881
882        loop {
883            tokio::select! {
884                e = swarm1.select_next_some() => {
885                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) = e {
886                        assert_eq!(inner_cid, cid);
887                        break;
888                    }
889                },
890            }
891        }
892
893        Ok(())
894    }
895
896    #[tokio::test]
897    async fn local_wantlist() -> anyhow::Result<()> {
898        let (_, _, mut swarm1, _) = build_swarm().await;
899
900        let block = create_block();
901
902        let cid = *block.cid();
903
904        swarm1.behaviour_mut().bitswap.get(&cid, &[], None);
905
906        let list = swarm1.behaviour().bitswap.local_wantlist();
907
908        assert_eq!(list[0], cid);
909
910        Ok(())
911    }
912
913    #[tokio::test]
914    async fn peer_wantlist() -> anyhow::Result<()> {
915        let (peer1, _, mut swarm1, _) = build_swarm().await;
916        let (peer2, addr2, mut swarm2, _) = build_swarm().await;
917
918        let block = create_block();
919
920        let cid = *block.cid();
921
922        let opt = DialOpts::peer_id(peer2)
923            .addresses(vec![addr2.clone()])
924            .build();
925        swarm1.dial(opt)?;
926
927        let mut peer_1_connected = false;
928        let mut peer_2_connected = false;
929
930        loop {
931            futures::select! {
932                event = swarm1.select_next_some() => {
933                    if let SwarmEvent::ConnectionEstablished { .. } = event {
934                        peer_1_connected = true;
935                    }
936                }
937                event = swarm2.select_next_some() => {
938                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
939                        if peer_id == peer1 {
940                            peer_2_connected = true;
941                        }
942                    }
943                }
944            }
945            if peer_1_connected && peer_2_connected {
946                break;
947            }
948        }
949        swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
950
951        loop {
952            tokio::select! {
953                _ = swarm1.next() => {}
954                e = swarm2.select_next_some() => {
955                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::NeedBlock { cid: inner_cid })) = e {
956                        assert_eq!(inner_cid, cid);
957                        break;
958                    }
959                },
960            }
961        }
962
963        let list = swarm1.behaviour().bitswap.peer_wantlist(peer2);
964        assert_eq!(list[0], cid);
965
966        Ok(())
967    }
968
969    async fn build_swarm() -> (PeerId, Multiaddr, Swarm<Behaviour>, Repo) {
970        let repo = Repo::new_memory();
971
972        let mut swarm = SwarmBuilder::with_new_identity()
973            .with_tokio()
974            .with_other_transport(|kp| {
975                MemoryTransport::default()
976                    .upgrade(Version::V1)
977                    .authenticate(libp2p::noise::Config::new(kp).expect("valid config"))
978                    .multiplex(libp2p::yamux::Config::default())
979                    .timeout(Duration::from_secs(20))
980                    .boxed()
981            })
982            .expect("")
983            .with_behaviour(|_| Behaviour {
984                bitswap: super::Behaviour::new(&repo),
985                address_book: crate::p2p::addressbook::Behaviour::with_config(
986                    crate::p2p::addressbook::Config {
987                        store_on_connection: true,
988                        ..Default::default()
989                    },
990                ),
991            })
992            .expect("")
993            .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30)))
994            .build();
995
996        Swarm::listen_on(&mut swarm, "/memory/0".parse().unwrap()).unwrap();
997
998        if let Some(SwarmEvent::NewListenAddr { address, .. }) = swarm.next().await {
999            let peer_id = swarm.local_peer_id();
1000            return (*peer_id, address, swarm, repo);
1001        }
1002
1003        unreachable!()
1004    }
1005
1006    #[derive(NetworkBehaviour)]
1007    struct Behaviour {
1008        bitswap: super::Behaviour,
1009        address_book: crate::p2p::addressbook::Behaviour,
1010    }
1011}