Skip to main content

rust_ipfs/p2p/
bitswap.rs

1mod message;
2mod pb;
3mod prefix;
4mod protocol;
5mod sessions;
6
7use std::{
8    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
9    fmt::Debug,
10    task::{Context, Poll, Waker},
11    time::Duration,
12};
13
14use connexa::prelude::{
15    swarm::{
16        behaviour::ConnectionEstablished, dial_opts::DialOpts, ConnectionClosed, ConnectionDenied,
17        ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, NotifyHandler, OneShotHandler,
18        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
19    },
20    transport::transport::PortUse,
21    transport::Endpoint,
22    Multiaddr, PeerId,
23};
24use futures::StreamExt;
25use ipld_core::cid::Cid;
26
27use pollable_map::stream::StreamMap;
28
29mod bitswap_pb {
30    pub use super::pb::bitswap_pb::Message;
31    pub mod message {
32        use super::super::pb::bitswap_pb::mod_Message as message;
33        pub use message::mod_Wantlist as wantlist;
34        pub use message::Wantlist;
35        pub use message::{Block, BlockPresence, BlockPresenceType};
36    }
37}
38
39use self::{
40    message::{BitswapMessage, BitswapRequest, BitswapResponse, RequestType},
41    protocol::{BitswapProtocol, Message},
42    sessions::{HaveSession, HaveSessionEvent, WantSession, WantSessionEvent},
43};
44use crate::repo::DefaultStorage;
45use crate::{repo::Repo, Block};
46
/// Capacity above which the (empty) pending-event queue is shrunk back down
/// inside `poll`.
const CAP_THRESHOLD: usize = 100;
48
/// Optional tuning knobs for the bitswap behaviour.
///
/// Every field is optional; [`Config::default`] leaves all of them unset.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct Config {
    /// Upper bound on the number of wanted blocks.
    // NOTE(review): not read anywhere in this file — confirm where it is enforced.
    pub max_wanted_blocks: Option<u8>,
    /// Timeout for a request.
    // NOTE(review): not read anywhere in this file — presumably forwarded as the
    // `timeout` argument of `get`/`gets`; confirm against the caller.
    pub timeout: Option<Duration>,
}
54
/// Events this behaviour reports to the swarm owner.
#[derive(Debug)]
pub enum Event {
    /// A block is wanted but must be located by other means: emitted by `gets`
    /// when there is no peer to ask, and when a want session reports `NeedBlock`.
    NeedBlock { cid: Cid },
    /// Emitted when a want session reports that the block was stored.
    BlockRetrieved { cid: Cid },
    /// The request for `cid` ended without a block: emitted by `cancel` and when
    /// a want session reports `Cancelled`.
    CancelBlock { cid: Cid },
}
61
/// Bitswap `NetworkBehaviour`: tracks connections and drives one session per
/// wanted CID (`want_session`) and one per CID requested by remote peers
/// (`have_session`).
pub struct Behaviour {
    /// Actions queued outside of `poll`; `poll` drains these before anything else.
    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
    /// Established connection ids, grouped by peer.
    connections: HashMap<PeerId, HashSet<ConnectionId>>,
    /// Connections on which the handler reported an error. An entry is cleared
    /// once a message is received on that connection or the connection closes;
    /// peers listed here are skipped by `gets`.
    blacklist_connections: HashMap<PeerId, BTreeSet<ConnectionId>>,
    /// Block store handle handed to every session.
    store: Repo<DefaultStorage>,
    /// Sessions for blocks we are trying to obtain, keyed by CID.
    want_session: StreamMap<Cid, WantSession>,
    /// Sessions for blocks remote peers asked us about, keyed by CID.
    have_session: StreamMap<Cid, HaveSession>,
    /// Waker saved by the last `poll` that returned `Pending`.
    waker: Option<Waker>,
}
71
72impl Behaviour {
73    pub fn new(store: &Repo<DefaultStorage>) -> Self {
74        Self {
75            events: Default::default(),
76            connections: Default::default(),
77            blacklist_connections: Default::default(),
78            store: store.clone(),
79            want_session: StreamMap::new(),
80            have_session: StreamMap::new(),
81            waker: None,
82        }
83    }
84
85    pub fn get(&mut self, cid: &Cid, providers: &[PeerId], timeout: Option<Duration>) {
86        self.gets(vec![*cid], providers, timeout)
87    }
88
89    pub fn gets(&mut self, cids: Vec<Cid>, providers: &[PeerId], timeout: Option<Duration>) {
90        let peers = match providers.is_empty() {
91            true => {
92                //If no providers are provided, we can send requests connected peers
93                self.connections
94                    .keys()
95                    .filter(|peer_id| !self.blacklist_connections.contains_key(peer_id))
96                    .copied()
97                    .collect::<Vec<_>>()
98            }
99            false => {
100                let mut connected = VecDeque::new();
101                for peer_id in providers
102                    .iter()
103                    .filter(|peer_id| !self.blacklist_connections.contains_key(peer_id))
104                {
105                    if self.connections.contains_key(peer_id) {
106                        connected.push_back(*peer_id);
107                        continue;
108                    }
109                    let opts = DialOpts::peer_id(*peer_id).build();
110
111                    self.events.push_back(ToSwarm::Dial { opts });
112                }
113                Vec::from_iter(connected)
114            }
115        };
116
117        for cid in &cids {
118            if self.want_session.contains_key(cid) {
119                continue;
120            }
121            let session = WantSession::new(&self.store, *cid, timeout);
122            self.want_session.insert(*cid, session);
123        }
124
125        if peers.is_empty() {
126            // Since no connections, peers or providers are provided, we need to notify swarm to attempt a form of content discovery
127            for cid in cids {
128                self.events
129                    .push_back(ToSwarm::GenerateEvent(Event::NeedBlock { cid }));
130            }
131            return;
132        }
133
134        self.send_wants(peers, cids)
135    }
136
137    pub fn local_wantlist(&self) -> Vec<Cid> {
138        self.want_session.keys().copied().collect()
139    }
140
141    pub fn peer_wantlist(&self, peer_id: PeerId) -> Vec<Cid> {
142        let mut blocks = HashSet::new();
143
144        for (cid, session) in self.have_session.iter() {
145            if session.has_peer(peer_id) {
146                blocks.insert(*cid);
147            }
148        }
149
150        Vec::from_iter(blocks)
151    }
152
153    // Note: This is called specifically to cancel the request and not just emitting a request
154    //       after receiving a request.
155    pub fn cancel(&mut self, cid: Cid) {
156        if self.want_session.remove(&cid).is_none() {
157            return;
158        }
159
160        self.events
161            .push_back(ToSwarm::GenerateEvent(Event::CancelBlock { cid }));
162
163        if let Some(waker) = self.waker.take() {
164            waker.wake();
165        }
166    }
167
168    // This will notify connected peers who have the bitswap protocol that we have this block
169    // if they wanted it
170    // TODO: Maybe have a general `Session` where we could collectively notify a peer of new blocks
171    //       in a single message
172    pub fn notify_new_blocks(&mut self, cid: impl IntoIterator<Item = Cid>) {
173        let blocks = cid.into_iter().collect::<Vec<_>>();
174
175        for (cid, session) in self.have_session.iter_mut() {
176            if !blocks.contains(cid) {
177                continue;
178            }
179
180            session.reset();
181        }
182
183        if !self.have_session.is_empty() {
184            if let Some(waker) = self.waker.take() {
185                waker.wake();
186            }
187        }
188    }
189
190    fn on_connection_established(
191        &mut self,
192        ConnectionEstablished {
193            connection_id,
194            peer_id,
195            other_established,
196            ..
197        }: ConnectionEstablished,
198    ) {
199        tracing::info!(%peer_id, %connection_id, "connection established");
200        self.connections
201            .entry(peer_id)
202            .or_default()
203            .insert(connection_id);
204
205        if other_established > 0 {
206            return;
207        }
208
209        self.send_wants(vec![peer_id], vec![]);
210    }
211
212    fn on_connection_close(
213        &mut self,
214        ConnectionClosed {
215            connection_id,
216            peer_id,
217            remaining_established,
218            ..
219        }: ConnectionClosed,
220    ) {
221        tracing::debug!(%connection_id, %peer_id, "connection closed");
222        if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
223            let list = entry.get_mut();
224            list.remove(&connection_id);
225            if list.is_empty() {
226                entry.remove();
227            }
228        }
229
230        if let Entry::Occupied(mut entry) = self.blacklist_connections.entry(peer_id) {
231            let list = entry.get_mut();
232            list.remove(&connection_id);
233            if list.is_empty() {
234                entry.remove();
235            }
236        }
237
238        if remaining_established == 0 {
239            tracing::debug!(%connection_id, %peer_id, "peer disconnected");
240            for (cid, session) in self.want_session.iter_mut() {
241                tracing::debug!(session=%*cid, %peer_id, "marking peer as disconnected");
242                session.peer_disconnected(peer_id);
243            }
244        }
245    }
246
247    fn on_dial_failure(
248        &mut self,
249        DialFailure {
250            connection_id,
251            peer_id,
252            error,
253        }: DialFailure,
254    ) {
255        let Some(peer_id) = peer_id else {
256            return;
257        };
258
259        tracing::warn!(%peer_id, %connection_id, error = %error, "unable to dial peer");
260
261        if self.connections.contains_key(&peer_id) {
262            // Since there is still an existing connection for the peer
263            // we can ignore the dial failure
264            return;
265        }
266
267        for session in self.want_session.values_mut() {
268            session.remove_peer(peer_id);
269        }
270
271        for session in self.have_session.values_mut() {
272            session.remove_peer(peer_id);
273        }
274    }
275
276    fn send_wants(&mut self, peers: Vec<PeerId>, cids: Vec<Cid>) {
277        if let Some(waker) = self.waker.take() {
278            waker.wake();
279        }
280
281        match cids.is_empty() {
282            false => {
283                for cid in cids {
284                    let Some(session) = self.want_session.get_mut(&cid) else {
285                        continue;
286                    };
287                    for peer_id in &peers {
288                        session.send_have_block(*peer_id)
289                    }
290                }
291            }
292            true => {
293                for session in self.want_session.values_mut() {
294                    for peer_id in &peers {
295                        session.send_have_block(*peer_id)
296                    }
297                }
298            }
299        }
300    }
301}
302
303impl NetworkBehaviour for Behaviour {
304    type ConnectionHandler = OneShotHandler<BitswapProtocol, BitswapMessage, Message>;
305    type ToSwarm = Event;
306
307    fn handle_pending_inbound_connection(
308        &mut self,
309        _: ConnectionId,
310        _: &Multiaddr,
311        _: &Multiaddr,
312    ) -> Result<(), ConnectionDenied> {
313        Ok(())
314    }
315
316    fn handle_pending_outbound_connection(
317        &mut self,
318        _: ConnectionId,
319        _: Option<PeerId>,
320        _: &[Multiaddr],
321        _: Endpoint,
322    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
323        Ok(vec![])
324    }
325
326    fn handle_established_inbound_connection(
327        &mut self,
328        _: ConnectionId,
329        _: PeerId,
330        _: &Multiaddr,
331        _: &Multiaddr,
332    ) -> Result<THandler<Self>, ConnectionDenied> {
333        Ok(OneShotHandler::default())
334    }
335
336    fn handle_established_outbound_connection(
337        &mut self,
338        _: ConnectionId,
339        _: PeerId,
340        _: &Multiaddr,
341        _: Endpoint,
342        _: PortUse,
343    ) -> Result<THandler<Self>, ConnectionDenied> {
344        Ok(OneShotHandler::default())
345    }
346
347    fn on_connection_handler_event(
348        &mut self,
349        peer_id: PeerId,
350        connection_id: ConnectionId,
351        event: THandlerOutEvent<Self>,
352    ) {
353        let message = match event {
354            Ok(Message::Receive { message }) => {
355                tracing::trace!(%peer_id, %connection_id, "message received");
356                if let Entry::Occupied(mut e) = self.blacklist_connections.entry(peer_id) {
357                    let list = e.get_mut();
358                    list.remove(&connection_id);
359                    if list.is_empty() {
360                        e.remove();
361                    }
362                }
363
364                message
365            }
366            Ok(Message::Sent) => {
367                tracing::trace!(%peer_id, %connection_id, "message sent");
368                return;
369            }
370            Err(e) => {
371                tracing::error!(%peer_id, %connection_id, error = %e, "error sending or receiving message");
372                //TODO: Depending on the underlining error, maybe blacklist the peer from further sending/receiving
373                //      until a valid response or request is produced?
374                self.blacklist_connections
375                    .entry(peer_id)
376                    .or_default()
377                    .insert(connection_id);
378                return;
379            }
380        };
381
382        if message.is_empty() {
383            tracing::warn!(%peer_id, %connection_id, "received an empty message");
384            return;
385        }
386
387        let BitswapMessage {
388            requests,
389            responses,
390            ..
391        } = message;
392
393        for request in requests {
394            let BitswapRequest {
395                ty,
396                cid,
397                send_dont_have,
398                cancel,
399                priority: _,
400            } = &request;
401
402            if !self.have_session.contains_key(cid) && !cancel {
403                // Lets build out have new sessions
404                let have_session = HaveSession::new(&self.store, *cid);
405                self.have_session.insert(*cid, have_session);
406            }
407
408            let Some(session) = self.have_session.get_mut(cid) else {
409                if !*cancel {
410                    tracing::warn!(block = %cid, %peer_id, %connection_id, "have session does not exist. Skipping request");
411                }
412                continue;
413            };
414
415            if *cancel {
416                session.cancel(peer_id);
417                continue;
418            }
419
420            match ty {
421                RequestType::Have => {
422                    session.want_block(peer_id, *send_dont_have);
423                }
424                RequestType::Block => {
425                    session.need_block(peer_id);
426                }
427            }
428        }
429
430        for (cid, response) in responses {
431            let Some(session) = self.want_session.get_mut(&cid) else {
432                tracing::warn!(block = %cid, %peer_id, %connection_id, "want session does not exist. Skipping response");
433                continue;
434            };
435            match response {
436                BitswapResponse::Have(have) => match have {
437                    true => {
438                        session.has_block(peer_id);
439                    }
440                    false => {
441                        session.dont_have_block(peer_id);
442                    }
443                },
444                BitswapResponse::Block(bytes) => {
445                    let Ok(block) = Block::new(cid, bytes) else {
446                        // The block is invalid so we will notify the session that we still dont have the block
447                        // from said peer
448                        // TODO: In the future, mark the peer as a bad sender
449                        tracing::error!(block = %cid, %peer_id, %connection_id, "block is invalid or corrupted");
450                        session.dont_have_block(peer_id);
451                        continue;
452                    };
453                    session.put_block(peer_id, block);
454                }
455            }
456        }
457    }
458
459    fn on_swarm_event(&mut self, event: FromSwarm) {
460        match event {
461            FromSwarm::ConnectionEstablished(event) => self.on_connection_established(event),
462            FromSwarm::ConnectionClosed(event) => self.on_connection_close(event),
463            FromSwarm::DialFailure(event) => self.on_dial_failure(event),
464            _ => {}
465        }
466    }
467
468    fn poll(&mut self, ctx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
469        if let Some(event) = self.events.pop_front() {
470            return Poll::Ready(event);
471        } else if self.events.capacity() > CAP_THRESHOLD {
472            self.events.shrink_to_fit();
473        }
474
475        while let Poll::Ready(Some((cid, event))) = self.have_session.poll_next_unpin(ctx) {
476            match event {
477                HaveSessionEvent::Have { peer_id } => {
478                    return Poll::Ready(ToSwarm::NotifyHandler {
479                        peer_id,
480                        handler: NotifyHandler::Any,
481                        event: BitswapMessage::default()
482                            .add_response(cid, BitswapResponse::Have(true)),
483                    });
484                }
485                HaveSessionEvent::DontHave { peer_id } => {
486                    return Poll::Ready(ToSwarm::NotifyHandler {
487                        peer_id,
488                        handler: NotifyHandler::Any,
489                        event: BitswapMessage::default()
490                            .add_response(cid, BitswapResponse::Have(false)),
491                    });
492                }
493                HaveSessionEvent::Block { peer_id, bytes } => {
494                    return Poll::Ready(ToSwarm::NotifyHandler {
495                        peer_id,
496                        handler: NotifyHandler::Any,
497                        event: BitswapMessage::default()
498                            .add_response(cid, BitswapResponse::Block(bytes)),
499                    });
500                }
501                HaveSessionEvent::Cancelled => {
502                    //TODO: Maybe notify peers from this session about any cancelled request?
503                    self.have_session.remove(&cid);
504                }
505            };
506        }
507
508        match self.want_session.poll_next_unpin(ctx) {
509            Poll::Ready(Some((cid, event))) => match event {
510                WantSessionEvent::SendWant { peer_id } => {
511                    return Poll::Ready(ToSwarm::NotifyHandler {
512                        peer_id,
513                        handler: NotifyHandler::Any,
514                        event: BitswapMessage::default()
515                            .add_request(BitswapRequest::have(cid).send_dont_have(true)),
516                    });
517                }
518                WantSessionEvent::SendCancel { peer_id } => {
519                    return Poll::Ready(ToSwarm::NotifyHandler {
520                        peer_id,
521                        handler: NotifyHandler::Any,
522                        event: BitswapMessage::default().add_request(BitswapRequest::cancel(cid)),
523                    });
524                }
525                WantSessionEvent::SendBlock { peer_id } => {
526                    ctx.waker().wake_by_ref();
527
528                    return Poll::Ready(ToSwarm::NotifyHandler {
529                        peer_id,
530                        handler: NotifyHandler::Any,
531                        event: BitswapMessage::default()
532                            .add_request(BitswapRequest::block(cid).send_dont_have(true)),
533                    });
534                }
535                WantSessionEvent::NeedBlock => {
536                    return Poll::Ready(ToSwarm::GenerateEvent(Event::NeedBlock { cid }));
537                }
538                WantSessionEvent::BlockStored => {
539                    return Poll::Ready(ToSwarm::GenerateEvent(Event::BlockRetrieved { cid }));
540                }
541                WantSessionEvent::Dial { peer_id } => {
542                    let opts = DialOpts::peer_id(peer_id).build();
543                    return Poll::Ready(ToSwarm::Dial { opts });
544                }
545                WantSessionEvent::Cancelled => {
546                    self.want_session.remove(&cid);
547                    return Poll::Ready(ToSwarm::GenerateEvent(Event::CancelBlock { cid }));
548                }
549            },
550            Poll::Pending | Poll::Ready(None) => {}
551        }
552
553        self.waker = Some(ctx.waker().clone());
554
555        Poll::Pending
556    }
557}
558
559#[cfg(test)]
560mod test {
561    use std::time::Duration;
562
563    use crate::{block::BlockCodec, repo::DefaultStorage};
564    use connexa::prelude::{
565        swarm::{dial_opts::DialOpts, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent},
566        transport::{
567            noise,
568            transport::{MemoryTransport, Transport},
569            upgrade::Version,
570            yamux,
571        },
572        Multiaddr, PeerId,
573    };
574    use futures::StreamExt;
575    use ipld_core::cid::Cid;
576    use multihash_codetable::{Code, MultihashDigest};
577
578    use crate::{repo::Repo, Block};
579
580    fn create_block() -> Block {
581        let data = b"hello block\n".to_vec();
582        let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
583
584        Block::new_unchecked(cid, data)
585    }
586
587    async fn wait_on_connection(
588        swarm1: &mut Swarm<Behaviour>,
589        swarm2: &mut Swarm<Behaviour>,
590        peer_id: PeerId,
591    ) {
592        loop {
593            futures::select! {
594                event = swarm1.select_next_some() => {
595                    if let SwarmEvent::ConnectionEstablished { peer_id: peer, .. } = event {
596                        assert_eq!(peer, peer_id);
597                        break;
598                    }
599                }
600                _ = swarm2.next() => {}
601            }
602        }
603    }
604
605    #[tokio::test]
606    async fn exchange_blocks() -> anyhow::Result<()> {
607        let (_, _, mut swarm1, repo) = build_swarm().await;
608        let (peer2, addr2, mut swarm2, repo2) = build_swarm().await;
609
610        let block = create_block();
611
612        let cid = *block.cid();
613
614        repo.put_block(&block).await?;
615
616        let opt = DialOpts::peer_id(peer2)
617            .addresses(vec![addr2.clone()])
618            .build();
619
620        swarm1.dial(opt)?;
621
622        wait_on_connection(&mut swarm1, &mut swarm2, peer2).await;
623
624        swarm2.behaviour_mut().bitswap.get(&cid, &[], None);
625
626        loop {
627            tokio::select! {
628                _ = swarm1.next() => {}
629                e = swarm2.select_next_some() => {
630                    match e {
631                        SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) => {
632                            assert_eq!(inner_cid, cid);
633                        }
634                        SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) => {
635                            assert_eq!(inner_cid, cid);
636                            unreachable!("exchange should not timeout");
637                        }
638                        _ => {}
639                    }
640                },
641                Ok(true) = repo2.contains(&cid) => {
642                    break;
643                }
644            }
645        }
646
647        let b = repo2
648            .get_block_now(&cid)
649            .await
650            .unwrap()
651            .expect("block exist");
652
653        assert_eq!(b, block);
654
655        Ok(())
656    }
657
658    #[tokio::test]
659    async fn notify_swarm() -> anyhow::Result<()> {
660        let (_, _, mut swarm1, _) = build_swarm().await;
661
662        let block = create_block();
663
664        let cid = *block.cid();
665
666        swarm1
667            .behaviour_mut()
668            .bitswap
669            .get(&cid, &[], Some(Duration::from_millis(500)));
670
671        let mut notified_counter = 0;
672
673        loop {
674            tokio::select! {
675                e = swarm1.select_next_some() => {
676                    match e {
677                        SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::NeedBlock { cid: inner_cid })) => {
678                            assert_eq!(inner_cid, cid);
679                            notified_counter += 1;
680                        }
681                        SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) => {
682                            assert_eq!(inner_cid, cid);
683                            unreachable!()
684                        }
685                        _ => {}
686                    }
687                },
688            }
689
690            if notified_counter == 2 {
691                break;
692            }
693        }
694
695        Ok(())
696    }
697
698    #[tokio::test]
699    async fn bitswap_timeout() -> anyhow::Result<()> {
700        let (_, _, mut swarm1, _) = build_swarm().await;
701        let (peer2, addr2, mut swarm2, _) = build_swarm().await;
702
703        let block = create_block();
704
705        let cid = *block.cid();
706
707        let opt = DialOpts::peer_id(peer2)
708            .addresses(vec![addr2.clone()])
709            .build();
710
711        swarm1.dial(opt)?;
712
713        wait_on_connection(&mut swarm1, &mut swarm2, peer2).await;
714
715        swarm2
716            .behaviour_mut()
717            .bitswap
718            .get(&cid, &[], Some(Duration::from_millis(150)));
719
720        loop {
721            tokio::select! {
722                _ = swarm1.next() => {}
723                e = swarm2.select_next_some() => {
724                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) = e {
725                        assert_eq!(inner_cid, cid);
726                        break;
727                    }
728                },
729            }
730        }
731
732        Ok(())
733    }
734
735    #[tokio::test]
736    async fn exchange_blocks_with_explicit_peer() -> anyhow::Result<()> {
737        let (peer1, _, mut swarm1, repo) = build_swarm().await;
738        let (peer2, addr2, mut swarm2, repo2) = build_swarm().await;
739
740        let block = create_block();
741
742        let cid = *block.cid();
743
744        repo.put_block(&block).await?;
745
746        let opt = DialOpts::peer_id(peer2)
747            .addresses(vec![addr2.clone()])
748            .build();
749
750        swarm1.dial(opt)?;
751
752        wait_on_connection(&mut swarm1, &mut swarm2, peer2).await;
753
754        swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
755
756        loop {
757            tokio::select! {
758                _ = swarm1.next() => {}
759                e = swarm2.select_next_some() => {
760                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
761                        assert_eq!(inner_cid, cid);
762                    }
763                },
764                Ok(true) = repo2.contains(&cid) => {
765                    break;
766                }
767            }
768        }
769
770        let b = repo2
771            .get_block_now(&cid)
772            .await
773            .unwrap()
774            .expect("block exist");
775
776        assert_eq!(b, block);
777
778        Ok(())
779    }
780
781    #[tokio::test]
782    async fn notify_after_block_exchange() -> anyhow::Result<()> {
783        let (peer1, _, mut swarm1, repo) = build_swarm().await;
784        let (peer2, addr2, mut swarm2, _) = build_swarm().await;
785        let (peer3, addr3, mut swarm3, repo3) = build_swarm().await;
786
787        let block = create_block();
788
789        let cid = *block.cid();
790
791        repo.put_block(&block).await?;
792
793        let opt = DialOpts::peer_id(peer2)
794            .addresses(vec![addr2.clone()])
795            .build();
796        swarm1.dial(opt)?;
797
798        let opt = DialOpts::peer_id(peer3)
799            .addresses(vec![addr3.clone()])
800            .build();
801
802        swarm2.dial(opt)?;
803        let mut peer_1_connected = false;
804        let mut peer_2_connected = false;
805        let mut peer_3_connected = false;
806
807        loop {
808            futures::select! {
809                event = swarm1.select_next_some() => {
810                    if let SwarmEvent::ConnectionEstablished { .. } = event {
811                        peer_1_connected = true;
812                    }
813                }
814                event = swarm2.select_next_some() => {
815                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
816                        if peer_id == peer1 {
817                            peer_2_connected = true;
818                        }
819                    }
820                }
821
822                event = swarm3.select_next_some() => {
823                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
824                        assert_eq!(peer_id, peer2);
825                        peer_3_connected = true;
826                    }
827                }
828            }
829            if peer_1_connected && peer_2_connected && peer_3_connected {
830                break;
831            }
832        }
833        swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
834        swarm3.behaviour_mut().bitswap.get(&cid, &[], None);
835
836        loop {
837            tokio::select! {
838                _ = swarm1.next() => {}
839                e = swarm2.select_next_some() => {
840                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
841                        assert_eq!(inner_cid, cid);
842                        swarm2.behaviour_mut().bitswap.notify_new_blocks(std::iter::once(cid));
843                    }
844                },
845                e = swarm3.select_next_some() => {
846                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
847                        assert_eq!(inner_cid, cid);
848                        break;
849                    }
850                },
851            }
852        }
853
854        let b = repo3
855            .get_block_now(&cid)
856            .await
857            .unwrap()
858            .expect("block exist");
859
860        assert_eq!(b, block);
861
862        Ok(())
863    }
864
865    #[tokio::test]
866    async fn cancel_block_exchange() -> anyhow::Result<()> {
867        let (_, _, mut swarm1, _) = build_swarm().await;
868
869        let block = create_block();
870
871        let cid = *block.cid();
872
873        swarm1.behaviour_mut().bitswap.get(&cid, &[], None);
874        swarm1.behaviour_mut().bitswap.cancel(cid);
875
876        loop {
877            tokio::select! {
878                e = swarm1.select_next_some() => {
879                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) = e {
880                        assert_eq!(inner_cid, cid);
881                        break;
882                    }
883                },
884            }
885        }
886
887        Ok(())
888    }
889
890    #[tokio::test]
891    async fn local_wantlist() -> anyhow::Result<()> {
892        let (_, _, mut swarm1, _) = build_swarm().await;
893
894        let block = create_block();
895
896        let cid = *block.cid();
897
898        swarm1.behaviour_mut().bitswap.get(&cid, &[], None);
899
900        let list = swarm1.behaviour().bitswap.local_wantlist();
901
902        assert_eq!(list[0], cid);
903
904        Ok(())
905    }
906
907    #[tokio::test]
908    async fn peer_wantlist() -> anyhow::Result<()> {
909        let (peer1, _, mut swarm1, _) = build_swarm().await;
910        let (peer2, addr2, mut swarm2, _) = build_swarm().await;
911
912        let block = create_block();
913
914        let cid = *block.cid();
915
916        let opt = DialOpts::peer_id(peer2)
917            .addresses(vec![addr2.clone()])
918            .build();
919        swarm1.dial(opt)?;
920
921        let mut peer_1_connected = false;
922        let mut peer_2_connected = false;
923
924        loop {
925            futures::select! {
926                event = swarm1.select_next_some() => {
927                    if let SwarmEvent::ConnectionEstablished { .. } = event {
928                        peer_1_connected = true;
929                    }
930                }
931                event = swarm2.select_next_some() => {
932                    if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
933                        if peer_id == peer1 {
934                            peer_2_connected = true;
935                        }
936                    }
937                }
938            }
939            if peer_1_connected && peer_2_connected {
940                break;
941            }
942        }
943        swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
944
945        loop {
946            tokio::select! {
947                _ = swarm1.next() => {}
948                e = swarm2.select_next_some() => {
949                    if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::NeedBlock { cid: inner_cid })) = e {
950                        assert_eq!(inner_cid, cid);
951                        break;
952                    }
953                },
954            }
955        }
956
957        let list = swarm1.behaviour().bitswap.peer_wantlist(peer2);
958        assert_eq!(list[0], cid);
959
960        Ok(())
961    }
962
963    async fn build_swarm() -> (PeerId, Multiaddr, Swarm<Behaviour>, Repo<DefaultStorage>) {
964        let repo = Repo::new_memory();
965
966        let mut swarm = SwarmBuilder::with_new_identity()
967            .with_tokio()
968            .with_other_transport(|kp| {
969                MemoryTransport::default()
970                    .upgrade(Version::V1)
971                    .authenticate(noise::Config::new(kp).expect("valid config"))
972                    .multiplex(yamux::Config::default())
973                    .timeout(Duration::from_secs(20))
974                    .boxed()
975            })
976            .expect("")
977            .with_behaviour(|_| Behaviour {
978                bitswap: super::Behaviour::new(&repo),
979                address_book: crate::p2p::addressbook::Behaviour::with_config(
980                    crate::p2p::addressbook::Config {
981                        store_on_connection: true,
982                        ..Default::default()
983                    },
984                ),
985            })
986            .expect("")
987            .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30)))
988            .build();
989
990        Swarm::listen_on(&mut swarm, "/memory/0".parse().unwrap()).unwrap();
991
992        if let Some(SwarmEvent::NewListenAddr { address, .. }) = swarm.next().await {
993            let peer_id = swarm.local_peer_id();
994            return (*peer_id, address, swarm, repo);
995        }
996
997        unreachable!()
998    }
999
    /// Composite behaviour used by the tests: the bitswap behaviour under test
    /// plus the address book behaviour.
    #[derive(NetworkBehaviour)]
    #[behaviour(prelude = "connexa::prelude::swarm::derive_prelude")]
    struct Behaviour {
        /// The bitswap behaviour being exercised.
        bitswap: super::Behaviour,
        /// Address book; `build_swarm` configures it with `store_on_connection: true`.
        address_book: crate::p2p::addressbook::Behaviour,
    }
1006}