1mod message;
2mod pb;
3mod prefix;
4mod protocol;
5mod sessions;
6
7use std::{
8 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
9 fmt::Debug,
10 task::{Context, Poll, Waker},
11 time::Duration,
12};
13
14use connexa::prelude::{
15 swarm::{
16 behaviour::ConnectionEstablished, dial_opts::DialOpts, ConnectionClosed, ConnectionDenied,
17 ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, NotifyHandler, OneShotHandler,
18 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
19 },
20 transport::transport::PortUse,
21 transport::Endpoint,
22 Multiaddr, PeerId,
23};
24use futures::StreamExt;
25use ipld_core::cid::Cid;
26
27use pollable_map::stream::StreamMap;
28
/// Shorter local names for the items defined in `pb::bitswap_pb`.
mod bitswap_pb {
    pub use super::pb::bitswap_pb::Message;
    /// Re-exports the items nested under `mod_Message`, so they can be reached
    /// as `bitswap_pb::message::…`.
    pub mod message {
        // Private alias that keeps the re-export paths below short.
        use super::super::pb::bitswap_pb::mod_Message as message;
        pub use message::mod_Wantlist as wantlist;
        pub use message::Wantlist;
        pub use message::{Block, BlockPresence, BlockPresenceType};
    }
}
38
39use self::{
40 message::{BitswapMessage, BitswapRequest, BitswapResponse, RequestType},
41 protocol::{BitswapProtocol, Message},
42 sessions::{HaveSession, HaveSessionEvent, WantSession, WantSessionEvent},
43};
44use crate::repo::DefaultStorage;
45use crate::{repo::Repo, Block};
46
/// Capacity of the pending-event queue above which `poll` shrinks the queue
/// once it has been fully drained.
const CAP_THRESHOLD: usize = 100;
48
/// Settings for the bitswap behaviour.
///
/// NOTE(review): nothing in the visible part of this file reads these fields;
/// confirm where (or whether) they are consumed.
#[derive(Default, Debug, Clone, Copy)]
pub struct Config {
    /// Presumably an upper bound on the number of wanted blocks — TODO confirm.
    pub max_wanted_blocks: Option<u8>,
    /// Presumably a default timeout for block requests — TODO confirm.
    pub timeout: Option<Duration>,
}
54
/// Events this behaviour reports to the owner of the swarm.
#[derive(Debug)]
pub enum Event {
    /// Queued by `gets` when no peer can be asked right away, and reported
    /// whenever a want session yields `WantSessionEvent::NeedBlock`.
    NeedBlock { cid: Cid },
    /// Reported when a want session yields `WantSessionEvent::BlockStored`.
    BlockRetrieved { cid: Cid },
    /// Reported when a want is removed through `cancel`, or when a want
    /// session yields `WantSessionEvent::Cancelled`.
    CancelBlock { cid: Cid },
}
61
/// Bitswap network behaviour: tracks connections and runs one session per
/// CID, both for blocks wanted locally and for blocks remote peers ask about.
pub struct Behaviour {
    /// Swarm actions queued by other methods; `poll` hands these out first.
    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
    /// Connection ids recorded per peer: added on `ConnectionEstablished`,
    /// pruned on `ConnectionClosed`.
    connections: HashMap<PeerId, HashSet<ConnectionId>>,
    /// Connections whose handler reported an error. An entry is cleared when a
    /// message is received on that connection or when the connection closes.
    /// `gets` skips every peer that still has an entry here.
    blacklist_connections: HashMap<PeerId, BTreeSet<ConnectionId>>,
    /// Block repository handed to every newly created session.
    store: Repo<DefaultStorage>,
    /// One session per CID requested locally through `get`/`gets`.
    want_session: StreamMap<Cid, WantSession>,
    /// One session per CID that a remote peer has sent a request for.
    have_session: StreamMap<Cid, HaveSession>,
    /// Waker stored by `poll` right before it returns `Poll::Pending`; taken
    /// and woken by the methods that queue new work.
    waker: Option<Waker>,
}
71
72impl Behaviour {
73 pub fn new(store: &Repo<DefaultStorage>) -> Self {
74 Self {
75 events: Default::default(),
76 connections: Default::default(),
77 blacklist_connections: Default::default(),
78 store: store.clone(),
79 want_session: StreamMap::new(),
80 have_session: StreamMap::new(),
81 waker: None,
82 }
83 }
84
85 pub fn get(&mut self, cid: &Cid, providers: &[PeerId], timeout: Option<Duration>) {
86 self.gets(vec![*cid], providers, timeout)
87 }
88
89 pub fn gets(&mut self, cids: Vec<Cid>, providers: &[PeerId], timeout: Option<Duration>) {
90 let peers = match providers.is_empty() {
91 true => {
92 self.connections
94 .keys()
95 .filter(|peer_id| !self.blacklist_connections.contains_key(peer_id))
96 .copied()
97 .collect::<Vec<_>>()
98 }
99 false => {
100 let mut connected = VecDeque::new();
101 for peer_id in providers
102 .iter()
103 .filter(|peer_id| !self.blacklist_connections.contains_key(peer_id))
104 {
105 if self.connections.contains_key(peer_id) {
106 connected.push_back(*peer_id);
107 continue;
108 }
109 let opts = DialOpts::peer_id(*peer_id).build();
110
111 self.events.push_back(ToSwarm::Dial { opts });
112 }
113 Vec::from_iter(connected)
114 }
115 };
116
117 for cid in &cids {
118 if self.want_session.contains_key(cid) {
119 continue;
120 }
121 let session = WantSession::new(&self.store, *cid, timeout);
122 self.want_session.insert(*cid, session);
123 }
124
125 if peers.is_empty() {
126 for cid in cids {
128 self.events
129 .push_back(ToSwarm::GenerateEvent(Event::NeedBlock { cid }));
130 }
131 return;
132 }
133
134 self.send_wants(peers, cids)
135 }
136
137 pub fn local_wantlist(&self) -> Vec<Cid> {
138 self.want_session.keys().copied().collect()
139 }
140
141 pub fn peer_wantlist(&self, peer_id: PeerId) -> Vec<Cid> {
142 let mut blocks = HashSet::new();
143
144 for (cid, session) in self.have_session.iter() {
145 if session.has_peer(peer_id) {
146 blocks.insert(*cid);
147 }
148 }
149
150 Vec::from_iter(blocks)
151 }
152
153 pub fn cancel(&mut self, cid: Cid) {
156 if self.want_session.remove(&cid).is_none() {
157 return;
158 }
159
160 self.events
161 .push_back(ToSwarm::GenerateEvent(Event::CancelBlock { cid }));
162
163 if let Some(waker) = self.waker.take() {
164 waker.wake();
165 }
166 }
167
168 pub fn notify_new_blocks(&mut self, cid: impl IntoIterator<Item = Cid>) {
173 let blocks = cid.into_iter().collect::<Vec<_>>();
174
175 for (cid, session) in self.have_session.iter_mut() {
176 if !blocks.contains(cid) {
177 continue;
178 }
179
180 session.reset();
181 }
182
183 if !self.have_session.is_empty() {
184 if let Some(waker) = self.waker.take() {
185 waker.wake();
186 }
187 }
188 }
189
190 fn on_connection_established(
191 &mut self,
192 ConnectionEstablished {
193 connection_id,
194 peer_id,
195 other_established,
196 ..
197 }: ConnectionEstablished,
198 ) {
199 tracing::info!(%peer_id, %connection_id, "connection established");
200 self.connections
201 .entry(peer_id)
202 .or_default()
203 .insert(connection_id);
204
205 if other_established > 0 {
206 return;
207 }
208
209 self.send_wants(vec![peer_id], vec![]);
210 }
211
212 fn on_connection_close(
213 &mut self,
214 ConnectionClosed {
215 connection_id,
216 peer_id,
217 remaining_established,
218 ..
219 }: ConnectionClosed,
220 ) {
221 tracing::debug!(%connection_id, %peer_id, "connection closed");
222 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
223 let list = entry.get_mut();
224 list.remove(&connection_id);
225 if list.is_empty() {
226 entry.remove();
227 }
228 }
229
230 if let Entry::Occupied(mut entry) = self.blacklist_connections.entry(peer_id) {
231 let list = entry.get_mut();
232 list.remove(&connection_id);
233 if list.is_empty() {
234 entry.remove();
235 }
236 }
237
238 if remaining_established == 0 {
239 tracing::debug!(%connection_id, %peer_id, "peer disconnected");
240 for (cid, session) in self.want_session.iter_mut() {
241 tracing::debug!(session=%*cid, %peer_id, "marking peer as disconnected");
242 session.peer_disconnected(peer_id);
243 }
244 }
245 }
246
247 fn on_dial_failure(
248 &mut self,
249 DialFailure {
250 connection_id,
251 peer_id,
252 error,
253 }: DialFailure,
254 ) {
255 let Some(peer_id) = peer_id else {
256 return;
257 };
258
259 tracing::warn!(%peer_id, %connection_id, error = %error, "unable to dial peer");
260
261 if self.connections.contains_key(&peer_id) {
262 return;
265 }
266
267 for session in self.want_session.values_mut() {
268 session.remove_peer(peer_id);
269 }
270
271 for session in self.have_session.values_mut() {
272 session.remove_peer(peer_id);
273 }
274 }
275
276 fn send_wants(&mut self, peers: Vec<PeerId>, cids: Vec<Cid>) {
277 if let Some(waker) = self.waker.take() {
278 waker.wake();
279 }
280
281 match cids.is_empty() {
282 false => {
283 for cid in cids {
284 let Some(session) = self.want_session.get_mut(&cid) else {
285 continue;
286 };
287 for peer_id in &peers {
288 session.send_have_block(*peer_id)
289 }
290 }
291 }
292 true => {
293 for session in self.want_session.values_mut() {
294 for peer_id in &peers {
295 session.send_have_block(*peer_id)
296 }
297 }
298 }
299 }
300 }
301}
302
303impl NetworkBehaviour for Behaviour {
304 type ConnectionHandler = OneShotHandler<BitswapProtocol, BitswapMessage, Message>;
305 type ToSwarm = Event;
306
    /// Accepts every pending inbound connection; all arguments are ignored.
    fn handle_pending_inbound_connection(
        &mut self,
        _: ConnectionId,
        _: &Multiaddr,
        _: &Multiaddr,
    ) -> Result<(), ConnectionDenied> {
        Ok(())
    }
315
    /// Accepts every pending outbound connection and contributes no
    /// additional addresses (always returns an empty list).
    fn handle_pending_outbound_connection(
        &mut self,
        _: ConnectionId,
        _: Option<PeerId>,
        _: &[Multiaddr],
        _: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        Ok(vec![])
    }
325
    /// Returns a default-configured [`OneShotHandler`] for every established
    /// inbound connection.
    fn handle_established_inbound_connection(
        &mut self,
        _: ConnectionId,
        _: PeerId,
        _: &Multiaddr,
        _: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        Ok(OneShotHandler::default())
    }
335
    /// Returns a default-configured [`OneShotHandler`] for every established
    /// outbound connection.
    fn handle_established_outbound_connection(
        &mut self,
        _: ConnectionId,
        _: PeerId,
        _: &Multiaddr,
        _: Endpoint,
        _: PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        Ok(OneShotHandler::default())
    }
346
347 fn on_connection_handler_event(
348 &mut self,
349 peer_id: PeerId,
350 connection_id: ConnectionId,
351 event: THandlerOutEvent<Self>,
352 ) {
353 let message = match event {
354 Ok(Message::Receive { message }) => {
355 tracing::trace!(%peer_id, %connection_id, "message received");
356 if let Entry::Occupied(mut e) = self.blacklist_connections.entry(peer_id) {
357 let list = e.get_mut();
358 list.remove(&connection_id);
359 if list.is_empty() {
360 e.remove();
361 }
362 }
363
364 message
365 }
366 Ok(Message::Sent) => {
367 tracing::trace!(%peer_id, %connection_id, "message sent");
368 return;
369 }
370 Err(e) => {
371 tracing::error!(%peer_id, %connection_id, error = %e, "error sending or receiving message");
372 self.blacklist_connections
375 .entry(peer_id)
376 .or_default()
377 .insert(connection_id);
378 return;
379 }
380 };
381
382 if message.is_empty() {
383 tracing::warn!(%peer_id, %connection_id, "received an empty message");
384 return;
385 }
386
387 let BitswapMessage {
388 requests,
389 responses,
390 ..
391 } = message;
392
393 for request in requests {
394 let BitswapRequest {
395 ty,
396 cid,
397 send_dont_have,
398 cancel,
399 priority: _,
400 } = &request;
401
402 if !self.have_session.contains_key(cid) && !cancel {
403 let have_session = HaveSession::new(&self.store, *cid);
405 self.have_session.insert(*cid, have_session);
406 }
407
408 let Some(session) = self.have_session.get_mut(cid) else {
409 if !*cancel {
410 tracing::warn!(block = %cid, %peer_id, %connection_id, "have session does not exist. Skipping request");
411 }
412 continue;
413 };
414
415 if *cancel {
416 session.cancel(peer_id);
417 continue;
418 }
419
420 match ty {
421 RequestType::Have => {
422 session.want_block(peer_id, *send_dont_have);
423 }
424 RequestType::Block => {
425 session.need_block(peer_id);
426 }
427 }
428 }
429
430 for (cid, response) in responses {
431 let Some(session) = self.want_session.get_mut(&cid) else {
432 tracing::warn!(block = %cid, %peer_id, %connection_id, "want session does not exist. Skipping response");
433 continue;
434 };
435 match response {
436 BitswapResponse::Have(have) => match have {
437 true => {
438 session.has_block(peer_id);
439 }
440 false => {
441 session.dont_have_block(peer_id);
442 }
443 },
444 BitswapResponse::Block(bytes) => {
445 let Ok(block) = Block::new(cid, bytes) else {
446 tracing::error!(block = %cid, %peer_id, %connection_id, "block is invalid or corrupted");
450 session.dont_have_block(peer_id);
451 continue;
452 };
453 session.put_block(peer_id, block);
454 }
455 }
456 }
457 }
458
459 fn on_swarm_event(&mut self, event: FromSwarm) {
460 match event {
461 FromSwarm::ConnectionEstablished(event) => self.on_connection_established(event),
462 FromSwarm::ConnectionClosed(event) => self.on_connection_close(event),
463 FromSwarm::DialFailure(event) => self.on_dial_failure(event),
464 _ => {}
465 }
466 }
467
468 fn poll(&mut self, ctx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
469 if let Some(event) = self.events.pop_front() {
470 return Poll::Ready(event);
471 } else if self.events.capacity() > CAP_THRESHOLD {
472 self.events.shrink_to_fit();
473 }
474
475 while let Poll::Ready(Some((cid, event))) = self.have_session.poll_next_unpin(ctx) {
476 match event {
477 HaveSessionEvent::Have { peer_id } => {
478 return Poll::Ready(ToSwarm::NotifyHandler {
479 peer_id,
480 handler: NotifyHandler::Any,
481 event: BitswapMessage::default()
482 .add_response(cid, BitswapResponse::Have(true)),
483 });
484 }
485 HaveSessionEvent::DontHave { peer_id } => {
486 return Poll::Ready(ToSwarm::NotifyHandler {
487 peer_id,
488 handler: NotifyHandler::Any,
489 event: BitswapMessage::default()
490 .add_response(cid, BitswapResponse::Have(false)),
491 });
492 }
493 HaveSessionEvent::Block { peer_id, bytes } => {
494 return Poll::Ready(ToSwarm::NotifyHandler {
495 peer_id,
496 handler: NotifyHandler::Any,
497 event: BitswapMessage::default()
498 .add_response(cid, BitswapResponse::Block(bytes)),
499 });
500 }
501 HaveSessionEvent::Cancelled => {
502 self.have_session.remove(&cid);
504 }
505 };
506 }
507
508 match self.want_session.poll_next_unpin(ctx) {
509 Poll::Ready(Some((cid, event))) => match event {
510 WantSessionEvent::SendWant { peer_id } => {
511 return Poll::Ready(ToSwarm::NotifyHandler {
512 peer_id,
513 handler: NotifyHandler::Any,
514 event: BitswapMessage::default()
515 .add_request(BitswapRequest::have(cid).send_dont_have(true)),
516 });
517 }
518 WantSessionEvent::SendCancel { peer_id } => {
519 return Poll::Ready(ToSwarm::NotifyHandler {
520 peer_id,
521 handler: NotifyHandler::Any,
522 event: BitswapMessage::default().add_request(BitswapRequest::cancel(cid)),
523 });
524 }
525 WantSessionEvent::SendBlock { peer_id } => {
526 ctx.waker().wake_by_ref();
527
528 return Poll::Ready(ToSwarm::NotifyHandler {
529 peer_id,
530 handler: NotifyHandler::Any,
531 event: BitswapMessage::default()
532 .add_request(BitswapRequest::block(cid).send_dont_have(true)),
533 });
534 }
535 WantSessionEvent::NeedBlock => {
536 return Poll::Ready(ToSwarm::GenerateEvent(Event::NeedBlock { cid }));
537 }
538 WantSessionEvent::BlockStored => {
539 return Poll::Ready(ToSwarm::GenerateEvent(Event::BlockRetrieved { cid }));
540 }
541 WantSessionEvent::Dial { peer_id } => {
542 let opts = DialOpts::peer_id(peer_id).build();
543 return Poll::Ready(ToSwarm::Dial { opts });
544 }
545 WantSessionEvent::Cancelled => {
546 self.want_session.remove(&cid);
547 return Poll::Ready(ToSwarm::GenerateEvent(Event::CancelBlock { cid }));
548 }
549 },
550 Poll::Pending | Poll::Ready(None) => {}
551 }
552
553 self.waker = Some(ctx.waker().clone());
554
555 Poll::Pending
556 }
557}
558
559#[cfg(test)]
560mod test {
561 use std::time::Duration;
562
563 use crate::{block::BlockCodec, repo::DefaultStorage};
564 use connexa::prelude::{
565 swarm::{dial_opts::DialOpts, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent},
566 transport::{
567 noise,
568 transport::{MemoryTransport, Transport},
569 upgrade::Version,
570 yamux,
571 },
572 Multiaddr, PeerId,
573 };
574 use futures::StreamExt;
575 use ipld_core::cid::Cid;
576 use multihash_codetable::{Code, MultihashDigest};
577
578 use crate::{repo::Repo, Block};
579
580 fn create_block() -> Block {
581 let data = b"hello block\n".to_vec();
582 let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
583
584 Block::new_unchecked(cid, data)
585 }
586
587 async fn wait_on_connection(
588 swarm1: &mut Swarm<Behaviour>,
589 swarm2: &mut Swarm<Behaviour>,
590 peer_id: PeerId,
591 ) {
592 loop {
593 futures::select! {
594 event = swarm1.select_next_some() => {
595 if let SwarmEvent::ConnectionEstablished { peer_id: peer, .. } = event {
596 assert_eq!(peer, peer_id);
597 break;
598 }
599 }
600 _ = swarm2.next() => {}
601 }
602 }
603 }
604
605 #[tokio::test]
606 async fn exchange_blocks() -> anyhow::Result<()> {
607 let (_, _, mut swarm1, repo) = build_swarm().await;
608 let (peer2, addr2, mut swarm2, repo2) = build_swarm().await;
609
610 let block = create_block();
611
612 let cid = *block.cid();
613
614 repo.put_block(&block).await?;
615
616 let opt = DialOpts::peer_id(peer2)
617 .addresses(vec![addr2.clone()])
618 .build();
619
620 swarm1.dial(opt)?;
621
622 wait_on_connection(&mut swarm1, &mut swarm2, peer2).await;
623
624 swarm2.behaviour_mut().bitswap.get(&cid, &[], None);
625
626 loop {
627 tokio::select! {
628 _ = swarm1.next() => {}
629 e = swarm2.select_next_some() => {
630 match e {
631 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) => {
632 assert_eq!(inner_cid, cid);
633 }
634 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) => {
635 assert_eq!(inner_cid, cid);
636 unreachable!("exchange should not timeout");
637 }
638 _ => {}
639 }
640 },
641 Ok(true) = repo2.contains(&cid) => {
642 break;
643 }
644 }
645 }
646
647 let b = repo2
648 .get_block_now(&cid)
649 .await
650 .unwrap()
651 .expect("block exist");
652
653 assert_eq!(b, block);
654
655 Ok(())
656 }
657
658 #[tokio::test]
659 async fn notify_swarm() -> anyhow::Result<()> {
660 let (_, _, mut swarm1, _) = build_swarm().await;
661
662 let block = create_block();
663
664 let cid = *block.cid();
665
666 swarm1
667 .behaviour_mut()
668 .bitswap
669 .get(&cid, &[], Some(Duration::from_millis(500)));
670
671 let mut notified_counter = 0;
672
673 loop {
674 tokio::select! {
675 e = swarm1.select_next_some() => {
676 match e {
677 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::NeedBlock { cid: inner_cid })) => {
678 assert_eq!(inner_cid, cid);
679 notified_counter += 1;
680 }
681 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) => {
682 assert_eq!(inner_cid, cid);
683 unreachable!()
684 }
685 _ => {}
686 }
687 },
688 }
689
690 if notified_counter == 2 {
691 break;
692 }
693 }
694
695 Ok(())
696 }
697
698 #[tokio::test]
699 async fn bitswap_timeout() -> anyhow::Result<()> {
700 let (_, _, mut swarm1, _) = build_swarm().await;
701 let (peer2, addr2, mut swarm2, _) = build_swarm().await;
702
703 let block = create_block();
704
705 let cid = *block.cid();
706
707 let opt = DialOpts::peer_id(peer2)
708 .addresses(vec![addr2.clone()])
709 .build();
710
711 swarm1.dial(opt)?;
712
713 wait_on_connection(&mut swarm1, &mut swarm2, peer2).await;
714
715 swarm2
716 .behaviour_mut()
717 .bitswap
718 .get(&cid, &[], Some(Duration::from_millis(150)));
719
720 loop {
721 tokio::select! {
722 _ = swarm1.next() => {}
723 e = swarm2.select_next_some() => {
724 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) = e {
725 assert_eq!(inner_cid, cid);
726 break;
727 }
728 },
729 }
730 }
731
732 Ok(())
733 }
734
735 #[tokio::test]
736 async fn exchange_blocks_with_explicit_peer() -> anyhow::Result<()> {
737 let (peer1, _, mut swarm1, repo) = build_swarm().await;
738 let (peer2, addr2, mut swarm2, repo2) = build_swarm().await;
739
740 let block = create_block();
741
742 let cid = *block.cid();
743
744 repo.put_block(&block).await?;
745
746 let opt = DialOpts::peer_id(peer2)
747 .addresses(vec![addr2.clone()])
748 .build();
749
750 swarm1.dial(opt)?;
751
752 wait_on_connection(&mut swarm1, &mut swarm2, peer2).await;
753
754 swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
755
756 loop {
757 tokio::select! {
758 _ = swarm1.next() => {}
759 e = swarm2.select_next_some() => {
760 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
761 assert_eq!(inner_cid, cid);
762 }
763 },
764 Ok(true) = repo2.contains(&cid) => {
765 break;
766 }
767 }
768 }
769
770 let b = repo2
771 .get_block_now(&cid)
772 .await
773 .unwrap()
774 .expect("block exist");
775
776 assert_eq!(b, block);
777
778 Ok(())
779 }
780
781 #[tokio::test]
782 async fn notify_after_block_exchange() -> anyhow::Result<()> {
783 let (peer1, _, mut swarm1, repo) = build_swarm().await;
784 let (peer2, addr2, mut swarm2, _) = build_swarm().await;
785 let (peer3, addr3, mut swarm3, repo3) = build_swarm().await;
786
787 let block = create_block();
788
789 let cid = *block.cid();
790
791 repo.put_block(&block).await?;
792
793 let opt = DialOpts::peer_id(peer2)
794 .addresses(vec![addr2.clone()])
795 .build();
796 swarm1.dial(opt)?;
797
798 let opt = DialOpts::peer_id(peer3)
799 .addresses(vec![addr3.clone()])
800 .build();
801
802 swarm2.dial(opt)?;
803 let mut peer_1_connected = false;
804 let mut peer_2_connected = false;
805 let mut peer_3_connected = false;
806
807 loop {
808 futures::select! {
809 event = swarm1.select_next_some() => {
810 if let SwarmEvent::ConnectionEstablished { .. } = event {
811 peer_1_connected = true;
812 }
813 }
814 event = swarm2.select_next_some() => {
815 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
816 if peer_id == peer1 {
817 peer_2_connected = true;
818 }
819 }
820 }
821
822 event = swarm3.select_next_some() => {
823 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
824 assert_eq!(peer_id, peer2);
825 peer_3_connected = true;
826 }
827 }
828 }
829 if peer_1_connected && peer_2_connected && peer_3_connected {
830 break;
831 }
832 }
833 swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
834 swarm3.behaviour_mut().bitswap.get(&cid, &[], None);
835
836 loop {
837 tokio::select! {
838 _ = swarm1.next() => {}
839 e = swarm2.select_next_some() => {
840 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
841 assert_eq!(inner_cid, cid);
842 swarm2.behaviour_mut().bitswap.notify_new_blocks(std::iter::once(cid));
843 }
844 },
845 e = swarm3.select_next_some() => {
846 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
847 assert_eq!(inner_cid, cid);
848 break;
849 }
850 },
851 }
852 }
853
854 let b = repo3
855 .get_block_now(&cid)
856 .await
857 .unwrap()
858 .expect("block exist");
859
860 assert_eq!(b, block);
861
862 Ok(())
863 }
864
865 #[tokio::test]
866 async fn cancel_block_exchange() -> anyhow::Result<()> {
867 let (_, _, mut swarm1, _) = build_swarm().await;
868
869 let block = create_block();
870
871 let cid = *block.cid();
872
873 swarm1.behaviour_mut().bitswap.get(&cid, &[], None);
874 swarm1.behaviour_mut().bitswap.cancel(cid);
875
876 loop {
877 tokio::select! {
878 e = swarm1.select_next_some() => {
879 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) = e {
880 assert_eq!(inner_cid, cid);
881 break;
882 }
883 },
884 }
885 }
886
887 Ok(())
888 }
889
890 #[tokio::test]
891 async fn local_wantlist() -> anyhow::Result<()> {
892 let (_, _, mut swarm1, _) = build_swarm().await;
893
894 let block = create_block();
895
896 let cid = *block.cid();
897
898 swarm1.behaviour_mut().bitswap.get(&cid, &[], None);
899
900 let list = swarm1.behaviour().bitswap.local_wantlist();
901
902 assert_eq!(list[0], cid);
903
904 Ok(())
905 }
906
907 #[tokio::test]
908 async fn peer_wantlist() -> anyhow::Result<()> {
909 let (peer1, _, mut swarm1, _) = build_swarm().await;
910 let (peer2, addr2, mut swarm2, _) = build_swarm().await;
911
912 let block = create_block();
913
914 let cid = *block.cid();
915
916 let opt = DialOpts::peer_id(peer2)
917 .addresses(vec![addr2.clone()])
918 .build();
919 swarm1.dial(opt)?;
920
921 let mut peer_1_connected = false;
922 let mut peer_2_connected = false;
923
924 loop {
925 futures::select! {
926 event = swarm1.select_next_some() => {
927 if let SwarmEvent::ConnectionEstablished { .. } = event {
928 peer_1_connected = true;
929 }
930 }
931 event = swarm2.select_next_some() => {
932 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
933 if peer_id == peer1 {
934 peer_2_connected = true;
935 }
936 }
937 }
938 }
939 if peer_1_connected && peer_2_connected {
940 break;
941 }
942 }
943 swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
944
945 loop {
946 tokio::select! {
947 _ = swarm1.next() => {}
948 e = swarm2.select_next_some() => {
949 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::NeedBlock { cid: inner_cid })) = e {
950 assert_eq!(inner_cid, cid);
951 break;
952 }
953 },
954 }
955 }
956
957 let list = swarm1.behaviour().bitswap.peer_wantlist(peer2);
958 assert_eq!(list[0], cid);
959
960 Ok(())
961 }
962
963 async fn build_swarm() -> (PeerId, Multiaddr, Swarm<Behaviour>, Repo<DefaultStorage>) {
964 let repo = Repo::new_memory();
965
966 let mut swarm = SwarmBuilder::with_new_identity()
967 .with_tokio()
968 .with_other_transport(|kp| {
969 MemoryTransport::default()
970 .upgrade(Version::V1)
971 .authenticate(noise::Config::new(kp).expect("valid config"))
972 .multiplex(yamux::Config::default())
973 .timeout(Duration::from_secs(20))
974 .boxed()
975 })
976 .expect("")
977 .with_behaviour(|_| Behaviour {
978 bitswap: super::Behaviour::new(&repo),
979 address_book: crate::p2p::addressbook::Behaviour::with_config(
980 crate::p2p::addressbook::Config {
981 store_on_connection: true,
982 ..Default::default()
983 },
984 ),
985 })
986 .expect("")
987 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30)))
988 .build();
989
990 Swarm::listen_on(&mut swarm, "/memory/0".parse().unwrap()).unwrap();
991
992 if let Some(SwarmEvent::NewListenAddr { address, .. }) = swarm.next().await {
993 let peer_id = swarm.local_peer_id();
994 return (*peer_id, address, swarm, repo);
995 }
996
997 unreachable!()
998 }
999
    /// Composite behaviour used by the tests: the bitswap behaviour under test
    /// combined with the crate's address-book behaviour.
    #[derive(NetworkBehaviour)]
    #[behaviour(prelude = "connexa::prelude::swarm::derive_prelude")]
    struct Behaviour {
        bitswap: super::Behaviour,
        address_book: crate::p2p::addressbook::Behaviour,
    }
1006}