1mod message;
2mod pb;
3mod prefix;
4mod protocol;
5mod sessions;
6
7use std::{
8 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
9 fmt::Debug,
10 task::{Context, Poll, Waker},
11 time::Duration,
12};
13
14use futures::StreamExt;
15use ipld_core::cid::Cid;
16use libp2p::core::transport::PortUse;
17use libp2p::{
18 core::Endpoint,
19 swarm::{
20 behaviour::ConnectionEstablished, dial_opts::DialOpts, ConnectionClosed, ConnectionDenied,
21 ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, NotifyHandler, OneShotHandler,
22 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
23 },
24 Multiaddr, PeerId,
25};
26use pollable_map::stream::StreamMap;
27
// Shorter, conventionally named aliases for the generated protobuf items in
// `pb::bitswap_pb`, so the rest of the module does not have to spell out the
// generated `mod_*` paths.
mod bitswap_pb {
    pub use super::pb::bitswap_pb::Message;
    pub mod message {
        // The generated nested module is named `mod_Message`; alias it privately.
        use super::super::pb::bitswap_pb::mod_Message as message;
        pub use message::mod_Wantlist as wantlist;
        pub use message::Wantlist;
        pub use message::{Block, BlockPresence, BlockPresenceType};
    }
}
37
38use crate::{repo::Repo, Block};
39
40use self::{
41 message::{BitswapMessage, BitswapRequest, BitswapResponse, RequestType},
42 protocol::{BitswapProtocol, Message},
43 sessions::{HaveSession, HaveSessionEvent, WantSession, WantSessionEvent},
44};
45
/// Capacity above which the (empty) pending-event queue is shrunk back down in `poll`.
const CAP_THRESHOLD: usize = 100;
47
/// Bitswap configuration options.
///
/// NOTE(review): neither field is read anywhere in the visible part of this file;
/// confirm where (or whether) they are consumed before relying on them.
#[derive(Default, Debug, Clone, Copy)]
pub struct Config {
    /// Presumably an upper bound on concurrently wanted blocks — TODO confirm.
    pub max_wanted_blocks: Option<u8>,
    /// Presumably a default timeout for block requests — TODO confirm.
    pub timeout: Option<Duration>,
}
53
/// Events this behaviour reports to the swarm owner.
#[derive(Debug)]
pub enum Event {
    /// Emitted when `gets` finds no usable connected peer for `cid`, or when a want
    /// session reports `WantSessionEvent::NeedBlock`.
    NeedBlock { cid: Cid },
    /// Emitted when a want session reports `WantSessionEvent::BlockStored` for `cid`.
    BlockRetrieved { cid: Cid },
    /// Emitted when the want for `cid` is dropped, either through `Behaviour::cancel`
    /// or because the want session reported `WantSessionEvent::Cancelled`.
    CancelBlock { cid: Cid },
}
60
/// Bitswap network behaviour: tracks connections and drives one session per block
/// that is wanted locally or requested by a remote peer.
pub struct Behaviour {
    /// Swarm actions queued outside of `poll`; drained first on every `poll` call.
    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
    /// Established connection ids, grouped by peer.
    connections: HashMap<PeerId, HashSet<ConnectionId>>,
    /// Connections whose handler reported an error. An entry is cleared when a message
    /// is later received on that connection or when the connection closes; peers with
    /// an entry are skipped when picking peers in `gets`.
    blacklist_connections: HashMap<PeerId, BTreeSet<ConnectionId>>,
    /// Block repository handed to every session.
    store: Repo,
    /// Sessions for blocks requested locally, keyed by cid.
    want_session: StreamMap<Cid, WantSession>,
    /// Sessions for blocks requested by remote peers, keyed by cid.
    have_session: StreamMap<Cid, HaveSession>,
    /// Waker saved when `poll` returns `Pending`; woken when new work is queued.
    waker: Option<Waker>,
}
70
71impl Behaviour {
72 pub fn new(store: &Repo) -> Self {
73 Self {
74 events: Default::default(),
75 connections: Default::default(),
76 blacklist_connections: Default::default(),
77 store: store.clone(),
78 want_session: StreamMap::new(),
79 have_session: StreamMap::new(),
80 waker: None,
81 }
82 }
83
84 pub fn get(&mut self, cid: &Cid, providers: &[PeerId], timeout: Option<Duration>) {
85 self.gets(vec![*cid], providers, timeout)
86 }
87
88 pub fn gets(&mut self, cids: Vec<Cid>, providers: &[PeerId], timeout: Option<Duration>) {
89 let peers = match providers.is_empty() {
90 true => {
91 self.connections
93 .keys()
94 .filter(|peer_id| !self.blacklist_connections.contains_key(peer_id))
95 .copied()
96 .collect::<Vec<_>>()
97 }
98 false => {
99 let mut connected = VecDeque::new();
100 for peer_id in providers
101 .iter()
102 .filter(|peer_id| !self.blacklist_connections.contains_key(peer_id))
103 {
104 if self.connections.contains_key(peer_id) {
105 connected.push_back(*peer_id);
106 continue;
107 }
108 let opts = DialOpts::peer_id(*peer_id).build();
109
110 self.events.push_back(ToSwarm::Dial { opts });
111 }
112 Vec::from_iter(connected)
113 }
114 };
115
116 for cid in &cids {
117 if self.want_session.contains_key(cid) {
118 continue;
119 }
120 let session = WantSession::new(&self.store, *cid, timeout);
121 self.want_session.insert(*cid, session);
122 }
123
124 if peers.is_empty() {
125 for cid in cids {
127 self.events
128 .push_back(ToSwarm::GenerateEvent(Event::NeedBlock { cid }));
129 }
130 return;
131 }
132
133 self.send_wants(peers, cids)
134 }
135
136 pub fn local_wantlist(&self) -> Vec<Cid> {
137 self.want_session.keys().copied().collect()
138 }
139
140 pub fn peer_wantlist(&self, peer_id: PeerId) -> Vec<Cid> {
141 let mut blocks = HashSet::new();
142
143 for (cid, session) in self.have_session.iter() {
144 if session.has_peer(peer_id) {
145 blocks.insert(*cid);
146 }
147 }
148
149 Vec::from_iter(blocks)
150 }
151
152 pub fn cancel(&mut self, cid: Cid) {
155 if self.want_session.remove(&cid).is_none() {
156 return;
157 }
158
159 self.events
160 .push_back(ToSwarm::GenerateEvent(Event::CancelBlock { cid }));
161
162 if let Some(waker) = self.waker.take() {
163 waker.wake();
164 }
165 }
166
167 pub fn notify_new_blocks(&mut self, cid: impl IntoIterator<Item = Cid>) {
172 let blocks = cid.into_iter().collect::<Vec<_>>();
173
174 for (cid, session) in self.have_session.iter_mut() {
175 if !blocks.contains(cid) {
176 continue;
177 }
178
179 session.reset();
180 }
181
182 if !self.have_session.is_empty() {
183 if let Some(waker) = self.waker.take() {
184 waker.wake();
185 }
186 }
187 }
188
189 fn on_connection_established(
190 &mut self,
191 ConnectionEstablished {
192 connection_id,
193 peer_id,
194 other_established,
195 ..
196 }: ConnectionEstablished,
197 ) {
198 tracing::info!(%peer_id, %connection_id, "connection established");
199 self.connections
200 .entry(peer_id)
201 .or_default()
202 .insert(connection_id);
203
204 if other_established > 0 {
205 return;
206 }
207
208 self.send_wants(vec![peer_id], vec![]);
209 }
210
211 fn on_connection_close(
212 &mut self,
213 ConnectionClosed {
214 connection_id,
215 peer_id,
216 remaining_established,
217 ..
218 }: ConnectionClosed,
219 ) {
220 tracing::debug!(%connection_id, %peer_id, "connection closed");
221 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
222 let list = entry.get_mut();
223 list.remove(&connection_id);
224 if list.is_empty() {
225 entry.remove();
226 }
227 }
228
229 if let Entry::Occupied(mut entry) = self.blacklist_connections.entry(peer_id) {
230 let list = entry.get_mut();
231 list.remove(&connection_id);
232 if list.is_empty() {
233 entry.remove();
234 }
235 }
236
237 if remaining_established == 0 {
238 tracing::debug!(%connection_id, %peer_id, "peer disconnected");
239 for (cid, session) in self.want_session.iter_mut() {
240 tracing::debug!(session=%*cid, %peer_id, "marking peer as disconnected");
241 session.peer_disconnected(peer_id);
242 }
243 }
244 }
245
246 fn on_dial_failure(
247 &mut self,
248 DialFailure {
249 connection_id,
250 peer_id,
251 error,
252 }: DialFailure,
253 ) {
254 let Some(peer_id) = peer_id else {
255 return;
256 };
257
258 tracing::warn!(%peer_id, %connection_id, error = %error, "unable to dial peer");
259
260 if self.connections.contains_key(&peer_id) {
261 return;
264 }
265
266 for session in self.want_session.values_mut() {
267 session.remove_peer(peer_id);
268 }
269
270 for session in self.have_session.values_mut() {
271 session.remove_peer(peer_id);
272 }
273 }
274
275 fn send_wants(&mut self, peers: Vec<PeerId>, cids: Vec<Cid>) {
276 if let Some(waker) = self.waker.take() {
277 waker.wake();
278 }
279
280 match cids.is_empty() {
281 false => {
282 for cid in cids {
283 let Some(session) = self.want_session.get_mut(&cid) else {
284 continue;
285 };
286 for peer_id in &peers {
287 session.send_have_block(*peer_id)
288 }
289 }
290 }
291 true => {
292 for session in self.want_session.values_mut() {
293 for peer_id in &peers {
294 session.send_have_block(*peer_id)
295 }
296 }
297 }
298 }
299 }
300}
301
302impl NetworkBehaviour for Behaviour {
303 type ConnectionHandler = OneShotHandler<BitswapProtocol, BitswapMessage, Message>;
304 type ToSwarm = Event;
305
306 fn handle_pending_inbound_connection(
307 &mut self,
308 _: ConnectionId,
309 _: &Multiaddr,
310 _: &Multiaddr,
311 ) -> Result<(), ConnectionDenied> {
312 Ok(())
313 }
314
315 fn handle_pending_outbound_connection(
316 &mut self,
317 _: ConnectionId,
318 _: Option<PeerId>,
319 _: &[Multiaddr],
320 _: Endpoint,
321 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
322 Ok(vec![])
323 }
324
325 fn handle_established_inbound_connection(
326 &mut self,
327 _: ConnectionId,
328 _: PeerId,
329 _: &Multiaddr,
330 _: &Multiaddr,
331 ) -> Result<THandler<Self>, ConnectionDenied> {
332 Ok(OneShotHandler::default())
333 }
334
335 fn handle_established_outbound_connection(
336 &mut self,
337 _: ConnectionId,
338 _: PeerId,
339 _: &Multiaddr,
340 _: Endpoint,
341 _: PortUse,
342 ) -> Result<THandler<Self>, ConnectionDenied> {
343 Ok(OneShotHandler::default())
344 }
345
346 fn on_connection_handler_event(
347 &mut self,
348 peer_id: PeerId,
349 connection_id: ConnectionId,
350 event: THandlerOutEvent<Self>,
351 ) {
352 let message = match event {
353 Ok(Message::Receive { message }) => {
354 tracing::trace!(%peer_id, %connection_id, "message received");
355 if let Entry::Occupied(mut e) = self.blacklist_connections.entry(peer_id) {
356 let list = e.get_mut();
357 list.remove(&connection_id);
358 if list.is_empty() {
359 e.remove();
360 }
361 }
362
363 message
364 }
365 Ok(Message::Sent) => {
366 tracing::trace!(%peer_id, %connection_id, "message sent");
367 return;
368 }
369 Err(e) => {
370 tracing::error!(%peer_id, %connection_id, error = %e, "error sending or receiving message");
371 self.blacklist_connections
374 .entry(peer_id)
375 .or_default()
376 .insert(connection_id);
377 return;
378 }
379 };
380
381 if message.is_empty() {
382 tracing::warn!(%peer_id, %connection_id, "received an empty message");
383 return;
384 }
385
386 let BitswapMessage {
387 requests,
388 responses,
389 ..
390 } = message;
391
392 for request in requests {
393 let BitswapRequest {
394 ty,
395 cid,
396 send_dont_have,
397 cancel,
398 priority: _,
399 } = &request;
400
401 if !self.have_session.contains_key(cid) && !cancel {
402 let have_session = HaveSession::new(&self.store, *cid);
404 self.have_session.insert(*cid, have_session);
405 }
406
407 let Some(session) = self.have_session.get_mut(cid) else {
408 if !*cancel {
409 tracing::warn!(block = %cid, %peer_id, %connection_id, "have session does not exist. Skipping request");
410 }
411 continue;
412 };
413
414 if *cancel {
415 session.cancel(peer_id);
416 continue;
417 }
418
419 match ty {
420 RequestType::Have => {
421 session.want_block(peer_id, *send_dont_have);
422 }
423 RequestType::Block => {
424 session.need_block(peer_id);
425 }
426 }
427 }
428
429 for (cid, response) in responses {
430 let Some(session) = self.want_session.get_mut(&cid) else {
431 tracing::warn!(block = %cid, %peer_id, %connection_id, "want session does not exist. Skipping response");
432 continue;
433 };
434 match response {
435 BitswapResponse::Have(have) => match have {
436 true => {
437 session.has_block(peer_id);
438 }
439 false => {
440 session.dont_have_block(peer_id);
441 }
442 },
443 BitswapResponse::Block(bytes) => {
444 let Ok(block) = Block::new(cid, bytes) else {
445 tracing::error!(block = %cid, %peer_id, %connection_id, "block is invalid or corrupted");
449 session.dont_have_block(peer_id);
450 continue;
451 };
452 session.put_block(peer_id, block);
453 }
454 }
455 }
456 }
457
458 fn on_swarm_event(&mut self, event: FromSwarm) {
459 match event {
460 FromSwarm::ConnectionEstablished(event) => self.on_connection_established(event),
461 FromSwarm::ConnectionClosed(event) => self.on_connection_close(event),
462 FromSwarm::DialFailure(event) => self.on_dial_failure(event),
463 _ => {}
464 }
465 }
466
467 fn poll(&mut self, ctx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
468 if let Some(event) = self.events.pop_front() {
469 return Poll::Ready(event);
470 } else if self.events.capacity() > CAP_THRESHOLD {
471 self.events.shrink_to_fit();
472 }
473
474 while let Poll::Ready(Some((cid, event))) = self.have_session.poll_next_unpin(ctx) {
475 match event {
476 HaveSessionEvent::Have { peer_id } => {
477 return Poll::Ready(ToSwarm::NotifyHandler {
478 peer_id,
479 handler: NotifyHandler::Any,
480 event: BitswapMessage::default()
481 .add_response(cid, BitswapResponse::Have(true)),
482 });
483 }
484 HaveSessionEvent::DontHave { peer_id } => {
485 return Poll::Ready(ToSwarm::NotifyHandler {
486 peer_id,
487 handler: NotifyHandler::Any,
488 event: BitswapMessage::default()
489 .add_response(cid, BitswapResponse::Have(false)),
490 })
491 }
492 HaveSessionEvent::Block { peer_id, bytes } => {
493 return Poll::Ready(ToSwarm::NotifyHandler {
494 peer_id,
495 handler: NotifyHandler::Any,
496 event: BitswapMessage::default()
497 .add_response(cid, BitswapResponse::Block(bytes)),
498 })
499 }
500 HaveSessionEvent::Cancelled => {
501 self.have_session.remove(&cid);
503 }
504 };
505 }
506
507 match self.want_session.poll_next_unpin(ctx) {
508 Poll::Ready(Some((cid, event))) => match event {
509 WantSessionEvent::SendWant { peer_id } => {
510 return Poll::Ready(ToSwarm::NotifyHandler {
511 peer_id,
512 handler: NotifyHandler::Any,
513 event: BitswapMessage::default()
514 .add_request(BitswapRequest::have(cid).send_dont_have(true)),
515 });
516 }
517 WantSessionEvent::SendCancel { peer_id } => {
518 return Poll::Ready(ToSwarm::NotifyHandler {
519 peer_id,
520 handler: NotifyHandler::Any,
521 event: BitswapMessage::default().add_request(BitswapRequest::cancel(cid)),
522 });
523 }
524 WantSessionEvent::SendBlock { peer_id } => {
525 ctx.waker().wake_by_ref();
526
527 return Poll::Ready(ToSwarm::NotifyHandler {
528 peer_id,
529 handler: NotifyHandler::Any,
530 event: BitswapMessage::default()
531 .add_request(BitswapRequest::block(cid).send_dont_have(true)),
532 });
533 }
534 WantSessionEvent::NeedBlock => {
535 return Poll::Ready(ToSwarm::GenerateEvent(Event::NeedBlock { cid }));
536 }
537 WantSessionEvent::BlockStored => {
538 return Poll::Ready(ToSwarm::GenerateEvent(Event::BlockRetrieved { cid }))
539 }
540 WantSessionEvent::Dial { peer_id } => {
541 let opts = DialOpts::peer_id(peer_id).build();
542 return Poll::Ready(ToSwarm::Dial { opts });
543 }
544 WantSessionEvent::Cancelled => {
545 self.want_session.remove(&cid);
546 return Poll::Ready(ToSwarm::GenerateEvent(Event::CancelBlock { cid }));
547 }
548 },
549 Poll::Pending | Poll::Ready(None) => {}
550 }
551
552 self.waker = Some(ctx.waker().clone());
553
554 Poll::Pending
555 }
556}
557
558#[cfg(test)]
559mod test {
560 use std::time::Duration;
561
562 use crate::block::BlockCodec;
563 use futures::StreamExt;
564 use ipld_core::cid::Cid;
565 use libp2p::{
566 core::{transport::MemoryTransport, upgrade::Version},
567 swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent},
568 Multiaddr, PeerId, Swarm, SwarmBuilder, Transport,
569 };
570 use multihash_codetable::{Code, MultihashDigest};
571
572 use crate::{repo::Repo, Block};
573
574 fn create_block() -> Block {
575 let data = b"hello block\n".to_vec();
576 let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
577
578 Block::new_unchecked(cid, data)
579 }
580
581 #[tokio::test]
582 async fn exchange_blocks() -> anyhow::Result<()> {
583 let (_, _, mut swarm1, repo) = build_swarm().await;
584 let (peer2, addr2, mut swarm2, repo2) = build_swarm().await;
585
586 let block = create_block();
587
588 let cid = *block.cid();
589
590 repo.put_block(&block).await?;
591
592 let opt = DialOpts::peer_id(peer2)
593 .addresses(vec![addr2.clone()])
594 .build();
595
596 swarm1.dial(opt)?;
597
598 loop {
599 futures::select! {
600 event = swarm1.select_next_some() => {
601 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
602 assert_eq!(peer_id, peer2);
603 break;
604 }
605 }
606 _ = swarm2.next() => {}
607 }
608 }
609
610 swarm2.behaviour_mut().bitswap.get(&cid, &[], None);
611
612 loop {
613 tokio::select! {
614 _ = swarm1.next() => {}
615 e = swarm2.select_next_some() => {
616 match e {
617 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) => {
618 assert_eq!(inner_cid, cid);
619 }
620 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) => {
621 assert_eq!(inner_cid, cid);
622 unreachable!("exchange should not timeout");
623 }
624 _ => {}
625 }
626 },
627 Ok(true) = repo2.contains(&cid) => {
628 break;
629 }
630 }
631 }
632
633 let b = repo2
634 .get_block_now(&cid)
635 .await
636 .unwrap()
637 .expect("block exist");
638
639 assert_eq!(b, block);
640
641 Ok(())
642 }
643
644 #[tokio::test]
645 async fn notify_swarm() -> anyhow::Result<()> {
646 let (_, _, mut swarm1, _) = build_swarm().await;
647
648 let block = create_block();
649
650 let cid = *block.cid();
651
652 swarm1
653 .behaviour_mut()
654 .bitswap
655 .get(&cid, &[], Some(Duration::from_millis(500)));
656
657 let mut notified_counter = 0;
658
659 loop {
660 tokio::select! {
661 e = swarm1.select_next_some() => {
662 match e {
663 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::NeedBlock { cid: inner_cid })) => {
664 assert_eq!(inner_cid, cid);
665 notified_counter += 1;
666 }
667 SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) => {
668 assert_eq!(inner_cid, cid);
669 unreachable!()
670 }
671 _ => {}
672 }
673 },
674 }
675
676 if notified_counter == 2 {
677 break;
678 }
679 }
680
681 Ok(())
682 }
683
684 #[tokio::test]
685 async fn bitswap_timeout() -> anyhow::Result<()> {
686 let (_, _, mut swarm1, _) = build_swarm().await;
687 let (peer2, addr2, mut swarm2, _) = build_swarm().await;
688
689 let block = create_block();
690
691 let cid = *block.cid();
692
693 let opt = DialOpts::peer_id(peer2)
694 .addresses(vec![addr2.clone()])
695 .build();
696
697 swarm1.dial(opt)?;
698
699 loop {
700 futures::select! {
701 event = swarm1.select_next_some() => {
702 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
703 assert_eq!(peer_id, peer2);
704 break;
705 }
706 }
707 _ = swarm2.next() => {}
708 }
709 }
710
711 swarm2
712 .behaviour_mut()
713 .bitswap
714 .get(&cid, &[], Some(Duration::from_millis(150)));
715
716 loop {
717 tokio::select! {
718 _ = swarm1.next() => {}
719 e = swarm2.select_next_some() => {
720 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) = e {
721 assert_eq!(inner_cid, cid);
722 break;
723 }
724 },
725 }
726 }
727
728 Ok(())
729 }
730
731 #[tokio::test]
732 async fn exchange_blocks_with_explicit_peer() -> anyhow::Result<()> {
733 let (peer1, _, mut swarm1, repo) = build_swarm().await;
734 let (peer2, addr2, mut swarm2, repo2) = build_swarm().await;
735
736 let block = create_block();
737
738 let cid = *block.cid();
739
740 repo.put_block(&block).await?;
741
742 let opt = DialOpts::peer_id(peer2)
743 .addresses(vec![addr2.clone()])
744 .build();
745
746 swarm1.dial(opt)?;
747
748 loop {
749 futures::select! {
750 event = swarm1.select_next_some() => {
751 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
752 assert_eq!(peer_id, peer2);
753 break;
754 }
755 }
756 _ = swarm2.next() => {}
757 }
758 }
759
760 swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
761
762 loop {
763 tokio::select! {
764 _ = swarm1.next() => {}
765 e = swarm2.select_next_some() => {
766 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
767 assert_eq!(inner_cid, cid);
768 }
769 },
770 Ok(true) = repo2.contains(&cid) => {
771 break;
772 }
773 }
774 }
775
776 let b = repo2
777 .get_block_now(&cid)
778 .await
779 .unwrap()
780 .expect("block exist");
781
782 assert_eq!(b, block);
783
784 Ok(())
785 }
786
787 #[tokio::test]
788 async fn notify_after_block_exchange() -> anyhow::Result<()> {
789 let (peer1, _, mut swarm1, repo) = build_swarm().await;
790 let (peer2, addr2, mut swarm2, _) = build_swarm().await;
791 let (peer3, addr3, mut swarm3, repo3) = build_swarm().await;
792
793 let block = create_block();
794
795 let cid = *block.cid();
796
797 repo.put_block(&block).await?;
798
799 let opt = DialOpts::peer_id(peer2)
800 .addresses(vec![addr2.clone()])
801 .build();
802 swarm1.dial(opt)?;
803
804 let opt = DialOpts::peer_id(peer3)
805 .addresses(vec![addr3.clone()])
806 .build();
807
808 swarm2.dial(opt)?;
809 let mut peer_1_connected = false;
810 let mut peer_2_connected = false;
811 let mut peer_3_connected = false;
812
813 loop {
814 futures::select! {
815 event = swarm1.select_next_some() => {
816 if let SwarmEvent::ConnectionEstablished { .. } = event {
817 peer_1_connected = true;
818 }
819 }
820 event = swarm2.select_next_some() => {
821 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
822 if peer_id == peer1 {
823 peer_2_connected = true;
824 }
825 }
826 }
827
828 event = swarm3.select_next_some() => {
829 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
830 assert_eq!(peer_id, peer2);
831 peer_3_connected = true;
832 }
833 }
834 }
835 if peer_1_connected && peer_2_connected && peer_3_connected {
836 break;
837 }
838 }
839 swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
840 swarm3.behaviour_mut().bitswap.get(&cid, &[], None);
841
842 loop {
843 tokio::select! {
844 _ = swarm1.next() => {}
845 e = swarm2.select_next_some() => {
846 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
847 assert_eq!(inner_cid, cid);
848 swarm2.behaviour_mut().bitswap.notify_new_blocks(std::iter::once(cid));
849 }
850 },
851 e = swarm3.select_next_some() => {
852 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::BlockRetrieved { cid: inner_cid })) = e {
853 assert_eq!(inner_cid, cid);
854 break;
855 }
856 },
857 }
858 }
859
860 let b = repo3
861 .get_block_now(&cid)
862 .await
863 .unwrap()
864 .expect("block exist");
865
866 assert_eq!(b, block);
867
868 Ok(())
869 }
870
871 #[tokio::test]
872 async fn cancel_block_exchange() -> anyhow::Result<()> {
873 let (_, _, mut swarm1, _) = build_swarm().await;
874
875 let block = create_block();
876
877 let cid = *block.cid();
878
879 swarm1.behaviour_mut().bitswap.get(&cid, &[], None);
880 swarm1.behaviour_mut().bitswap.cancel(cid);
881
882 loop {
883 tokio::select! {
884 e = swarm1.select_next_some() => {
885 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::CancelBlock { cid: inner_cid })) = e {
886 assert_eq!(inner_cid, cid);
887 break;
888 }
889 },
890 }
891 }
892
893 Ok(())
894 }
895
896 #[tokio::test]
897 async fn local_wantlist() -> anyhow::Result<()> {
898 let (_, _, mut swarm1, _) = build_swarm().await;
899
900 let block = create_block();
901
902 let cid = *block.cid();
903
904 swarm1.behaviour_mut().bitswap.get(&cid, &[], None);
905
906 let list = swarm1.behaviour().bitswap.local_wantlist();
907
908 assert_eq!(list[0], cid);
909
910 Ok(())
911 }
912
913 #[tokio::test]
914 async fn peer_wantlist() -> anyhow::Result<()> {
915 let (peer1, _, mut swarm1, _) = build_swarm().await;
916 let (peer2, addr2, mut swarm2, _) = build_swarm().await;
917
918 let block = create_block();
919
920 let cid = *block.cid();
921
922 let opt = DialOpts::peer_id(peer2)
923 .addresses(vec![addr2.clone()])
924 .build();
925 swarm1.dial(opt)?;
926
927 let mut peer_1_connected = false;
928 let mut peer_2_connected = false;
929
930 loop {
931 futures::select! {
932 event = swarm1.select_next_some() => {
933 if let SwarmEvent::ConnectionEstablished { .. } = event {
934 peer_1_connected = true;
935 }
936 }
937 event = swarm2.select_next_some() => {
938 if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event {
939 if peer_id == peer1 {
940 peer_2_connected = true;
941 }
942 }
943 }
944 }
945 if peer_1_connected && peer_2_connected {
946 break;
947 }
948 }
949 swarm2.behaviour_mut().bitswap.get(&cid, &[peer1], None);
950
951 loop {
952 tokio::select! {
953 _ = swarm1.next() => {}
954 e = swarm2.select_next_some() => {
955 if let SwarmEvent::Behaviour(BehaviourEvent::Bitswap(super::Event::NeedBlock { cid: inner_cid })) = e {
956 assert_eq!(inner_cid, cid);
957 break;
958 }
959 },
960 }
961 }
962
963 let list = swarm1.behaviour().bitswap.peer_wantlist(peer2);
964 assert_eq!(list[0], cid);
965
966 Ok(())
967 }
968
969 async fn build_swarm() -> (PeerId, Multiaddr, Swarm<Behaviour>, Repo) {
970 let repo = Repo::new_memory();
971
972 let mut swarm = SwarmBuilder::with_new_identity()
973 .with_tokio()
974 .with_other_transport(|kp| {
975 MemoryTransport::default()
976 .upgrade(Version::V1)
977 .authenticate(libp2p::noise::Config::new(kp).expect("valid config"))
978 .multiplex(libp2p::yamux::Config::default())
979 .timeout(Duration::from_secs(20))
980 .boxed()
981 })
982 .expect("")
983 .with_behaviour(|_| Behaviour {
984 bitswap: super::Behaviour::new(&repo),
985 address_book: crate::p2p::addressbook::Behaviour::with_config(
986 crate::p2p::addressbook::Config {
987 store_on_connection: true,
988 ..Default::default()
989 },
990 ),
991 })
992 .expect("")
993 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30)))
994 .build();
995
996 Swarm::listen_on(&mut swarm, "/memory/0".parse().unwrap()).unwrap();
997
998 if let Some(SwarmEvent::NewListenAddr { address, .. }) = swarm.next().await {
999 let peer_id = swarm.local_peer_id();
1000 return (*peer_id, address, swarm, repo);
1001 }
1002
1003 unreachable!()
1004 }
1005
    /// Composite behaviour used by the tests: bitswap plus the crate's address book.
    /// The derive generates the `BehaviourEvent` enum the tests match on.
    #[derive(NetworkBehaviour)]
    struct Behaviour {
        /// Behaviour under test.
        bitswap: super::Behaviour,
        /// Address book; `build_swarm` configures it with `store_on_connection: true`.
        address_book: crate::p2p::addressbook::Behaviour,
    }
1011}