1#[cfg(feature = "compat")]
9use crate::compat::{CompatMessage, CompatProtocol, InboundMessage};
10use crate::protocol::{
11 BitswapCodec, BitswapRequest, BitswapResponse, RequestType, LIBP2P_BITSWAP_PROTOCOL,
12};
13use crate::query::{QueryEvent, QueryId, QueryManager, Request, Response};
14use crate::stats::*;
15use fnv::FnvHashMap;
16#[cfg(feature = "compat")]
17use fnv::FnvHashSet;
18use futures::future::BoxFuture;
19use futures::{
20 channel::mpsc,
21 stream::{Stream, StreamExt},
22 task::{Context, Poll},
23};
24use libipld::{error::BlockNotFound, store::StoreParams, Block, Cid, Result};
25use libp2p::core::{Endpoint, Multiaddr};
26use libp2p::identity::PeerId;
27
28use libp2p::swarm::behaviour::ConnectionEstablished;
29#[cfg(feature = "compat")]
30use libp2p::swarm::derive_prelude::Either;
31use libp2p::swarm::{
32 derive_prelude::{ConnectionClosed, FromSwarm},
33 ConnectionId, THandlerInEvent,
34};
35use libp2p::swarm::{ConnectionDenied, THandler};
36#[cfg(feature = "compat")]
37use libp2p::swarm::{ConnectionHandlerSelect, NotifyHandler, OneShotHandler};
38use libp2p::{
39 request_response::{
40 Behaviour as RequestResponse, Config as RequestResponseConfig,
41 Event as RequestResponseEvent, InboundFailure, InboundRequestId,
42 Message as RequestResponseMessage, OutboundFailure, OutboundRequestId, ProtocolSupport,
43 ResponseChannel,
44 },
45 swarm::{ConnectionHandler, NetworkBehaviour, ToSwarm},
46};
47use prometheus::Registry;
48use std::{pin::Pin, time::Duration};
49
50pub type Channel = ResponseChannel<BitswapResponse>;
52
53#[derive(Debug)]
55pub enum BitswapEvent {
56 Progress(QueryId, usize),
60 Complete(QueryId, Result<()>),
62}
63
64#[async_trait::async_trait]
66pub trait BitswapStore: Send + Sync + 'static {
67 type Params: StoreParams;
69 async fn contains(&mut self, cid: &Cid) -> Result<bool>;
71 async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>>;
73 async fn insert(&mut self, block: &Block<Self::Params>) -> Result<()>;
75 async fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>>;
77}
78
79#[derive(Clone, Copy, Debug, Eq, PartialEq)]
81pub struct BitswapConfig {
82 pub request_timeout: Duration,
84}
85
86impl BitswapConfig {
87 pub fn new() -> Self {
89 Self {
90 request_timeout: Duration::from_secs(10),
91 }
92 }
93}
94
95impl Default for BitswapConfig {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
102enum BitswapId {
103 Bitswap(OutboundRequestId),
104 #[cfg(feature = "compat")]
105 Compat(Cid),
106}
107
108enum BitswapChannel {
109 Bitswap(Channel),
110 #[cfg(feature = "compat")]
111 Compat(PeerId, Cid),
112}
113
114pub struct Bitswap<P: StoreParams> {
116 inner: RequestResponse<BitswapCodec<P>>,
118 query_manager: QueryManager,
120 requests: FnvHashMap<BitswapId, QueryId>,
122 db_tx: mpsc::UnboundedSender<DbRequest<P>>,
124 db_rx: mpsc::UnboundedReceiver<DbResponse>,
126 #[cfg(feature = "compat")]
128 compat: FnvHashSet<PeerId>,
129}
130
131impl<P: StoreParams> Bitswap<P> {
132 pub fn new<S: BitswapStore<Params = P>>(
134 config: BitswapConfig,
135 store: S,
136 executor: Box<dyn FnOnce(BoxFuture<'static, ()>)>,
137 ) -> Self {
138 let rr_config =
139 RequestResponseConfig::default().with_request_timeout(config.request_timeout);
140 let protocols = std::iter::once((LIBP2P_BITSWAP_PROTOCOL, ProtocolSupport::Full));
141 let inner = RequestResponse::new(protocols, rr_config);
142 let (db_tx, db_rx) = start_db_thread(store, executor);
143 Self {
144 inner,
145 query_manager: Default::default(),
146 requests: Default::default(),
147 db_tx,
148 db_rx,
149 #[cfg(feature = "compat")]
150 compat: Default::default(),
151 }
152 }
153
154 pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
156 self.query_manager.add_peer(peer_id);
157 #[allow(deprecated)]
158 self.inner.add_address(peer_id, addr);
159 }
160
161 pub fn remove_address(&mut self, peer_id: &PeerId, addr: &Multiaddr) {
163 self.query_manager.remove_peer(peer_id);
164 #[allow(deprecated)]
165 self.inner.remove_address(peer_id, addr);
166 }
167
168 pub fn get(&mut self, cid: Cid, peers: impl Iterator<Item = PeerId>) -> QueryId {
170 self.query_manager.get(None, cid, peers)
171 }
172
173 pub fn sync(
175 &mut self,
176 cid: Cid,
177 peers: Vec<PeerId>,
178 missing: impl Iterator<Item = Cid>,
179 ) -> QueryId {
180 self.query_manager.sync(cid, peers, missing)
181 }
182
183 pub fn cancel(&mut self, id: QueryId) -> bool {
185 let res = self.query_manager.cancel(id);
186 if res {
187 REQUESTS_CANCELED.inc();
188 }
189 res
190 }
191
192 pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
194 registry.register(Box::new(REQUESTS_TOTAL.clone()))?;
195 registry.register(Box::new(REQUEST_DURATION_SECONDS.clone()))?;
196 registry.register(Box::new(REQUESTS_CANCELED.clone()))?;
197 registry.register(Box::new(BLOCK_NOT_FOUND.clone()))?;
198 registry.register(Box::new(PROVIDERS_TOTAL.clone()))?;
199 registry.register(Box::new(MISSING_BLOCKS_TOTAL.clone()))?;
200 registry.register(Box::new(RECEIVED_BLOCK_BYTES.clone()))?;
201 registry.register(Box::new(RECEIVED_INVALID_BLOCK_BYTES.clone()))?;
202 registry.register(Box::new(SENT_BLOCK_BYTES.clone()))?;
203 registry.register(Box::new(RESPONSES_TOTAL.clone()))?;
204 registry.register(Box::new(THROTTLED_INBOUND.clone()))?;
205 registry.register(Box::new(THROTTLED_OUTBOUND.clone()))?;
206 registry.register(Box::new(OUTBOUND_FAILURE.clone()))?;
207 registry.register(Box::new(INBOUND_FAILURE.clone()))?;
208 Ok(())
209 }
210}
211
212enum DbRequest<P: StoreParams> {
213 Bitswap(BitswapChannel, BitswapRequest),
214 Insert(Block<P>),
215 MissingBlocks(QueryId, Cid),
216}
217
218enum DbResponse {
219 Bitswap(BitswapChannel, BitswapResponse),
220 MissingBlocks(QueryId, Result<Vec<Cid>>),
221}
222
223fn start_db_thread<S: BitswapStore>(
224 mut store: S,
225 executor: Box<dyn FnOnce(BoxFuture<'static, ()>)>,
226) -> (
227 mpsc::UnboundedSender<DbRequest<S::Params>>,
228 mpsc::UnboundedReceiver<DbResponse>,
229) {
230 let (tx, requests) = mpsc::unbounded();
231 let (responses, rx) = mpsc::unbounded();
232 executor(Box::pin(async move {
233 let mut requests: mpsc::UnboundedReceiver<DbRequest<S::Params>> = requests;
234 while let Some(request) = requests.next().await {
235 match request {
236 DbRequest::Bitswap(channel, request) => {
237 let response = match request.ty {
238 RequestType::Have => {
239 let have = store.contains(&request.cid).await.unwrap_or_default();
240 if have {
241 RESPONSES_TOTAL.with_label_values(&["have"]).inc();
242 } else {
243 RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
244 }
245 tracing::trace!("have {}", have);
246 BitswapResponse::Have(have)
247 }
248 RequestType::Block => {
249 let block = store.get(&request.cid).await.unwrap_or_default();
250 if let Some(data) = block {
251 RESPONSES_TOTAL.with_label_values(&["block"]).inc();
252 SENT_BLOCK_BYTES.inc_by(data.len() as u64);
253 tracing::trace!("block {}", data.len());
254 BitswapResponse::Block(data)
255 } else {
256 RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
257 tracing::trace!("have false");
258 BitswapResponse::Have(false)
259 }
260 }
261 };
262 responses
263 .unbounded_send(DbResponse::Bitswap(channel, response))
264 .ok();
265 }
266 DbRequest::Insert(block) => {
267 if let Err(err) = store.insert(&block).await {
268 tracing::error!("error inserting blocks {}", err);
269 }
270 }
271 DbRequest::MissingBlocks(id, cid) => {
272 let res = store.missing_blocks(&cid).await;
273 responses
274 .unbounded_send(DbResponse::MissingBlocks(id, res))
275 .ok();
276 }
277 }
278 }
279 }));
280 (tx, rx)
281}
282
283impl<P: StoreParams> Bitswap<P> {
284 fn inject_request(&mut self, channel: BitswapChannel, request: BitswapRequest) {
286 self.db_tx
287 .unbounded_send(DbRequest::Bitswap(channel, request))
288 .ok();
289 }
290
291 fn inject_response(&mut self, id: BitswapId, peer: PeerId, response: BitswapResponse) {
293 if let Some(id) = self.requests.remove(&id) {
294 match response {
295 BitswapResponse::Have(have) => {
296 self.query_manager
297 .inject_response(id, Response::Have(peer, have));
298 }
299 BitswapResponse::Block(data) => {
300 if let Some(info) = self.query_manager.query_info(id) {
301 let len = data.len();
302 if let Ok(block) = Block::new(info.cid, data) {
303 RECEIVED_BLOCK_BYTES.inc_by(len as u64);
304 self.db_tx.unbounded_send(DbRequest::Insert(block)).ok();
305 self.query_manager
306 .inject_response(id, Response::Block(peer, true));
307 } else {
308 tracing::error!("received invalid block");
309 RECEIVED_INVALID_BLOCK_BYTES.inc_by(len as u64);
310 self.query_manager
311 .inject_response(id, Response::Block(peer, false));
312 }
313 }
314 }
315 }
316 }
317 }
318
319 fn inject_outbound_failure(
320 &mut self,
321 peer: &PeerId,
322 request_id: OutboundRequestId,
323 error: &OutboundFailure,
324 ) {
325 tracing::debug!(
326 "bitswap outbound failure {} {} {:?}",
327 peer,
328 request_id,
329 error
330 );
331 match error {
332 OutboundFailure::DialFailure => {
333 OUTBOUND_FAILURE.with_label_values(&["dial_failure"]).inc();
334 }
335 OutboundFailure::Timeout => {
336 OUTBOUND_FAILURE.with_label_values(&["timeout"]).inc();
337 }
338 OutboundFailure::ConnectionClosed => {
339 OUTBOUND_FAILURE
340 .with_label_values(&["connection_closed"])
341 .inc();
342 }
343 OutboundFailure::UnsupportedProtocols => {
344 OUTBOUND_FAILURE
345 .with_label_values(&["unsupported_protocols"])
346 .inc();
347 }
348 OutboundFailure::Io(_) => {
349 OUTBOUND_FAILURE.with_label_values(&["io"]).inc();
350 }
351 }
352 }
353
354 fn inject_inbound_failure(
355 &mut self,
356 peer: &PeerId,
357 request_id: InboundRequestId,
358 error: &InboundFailure,
359 ) {
360 tracing::error!(
361 "bitswap inbound failure {} {} {:?}",
362 peer,
363 request_id,
364 error
365 );
366 match error {
367 InboundFailure::Timeout => {
368 INBOUND_FAILURE.with_label_values(&["timeout"]).inc();
369 }
370 InboundFailure::ConnectionClosed => {
371 INBOUND_FAILURE
372 .with_label_values(&["connection_closed"])
373 .inc();
374 }
375 InboundFailure::UnsupportedProtocols => {
376 INBOUND_FAILURE
377 .with_label_values(&["unsupported_protocols"])
378 .inc();
379 }
380 InboundFailure::ResponseOmission => {
381 INBOUND_FAILURE
382 .with_label_values(&["response_omission"])
383 .inc();
384 }
385 InboundFailure::Io(_) => {
386 INBOUND_FAILURE.with_label_values(&["io"]).inc();
387 }
388 }
389 }
390}
391
392impl<P: StoreParams> NetworkBehaviour for Bitswap<P> {
393 #[cfg(not(feature = "compat"))]
394 type ConnectionHandler =
395 <RequestResponse<BitswapCodec<P>> as NetworkBehaviour>::ConnectionHandler;
396
397 #[cfg(feature = "compat")]
398 #[allow(clippy::type_complexity)]
399 type ConnectionHandler = ConnectionHandlerSelect<
400 <RequestResponse<BitswapCodec<P>> as NetworkBehaviour>::ConnectionHandler,
401 OneShotHandler<CompatProtocol, CompatMessage, InboundMessage>,
402 >;
403 type ToSwarm = BitswapEvent;
404
405 fn handle_established_inbound_connection(
406 &mut self,
407 connection_id: ConnectionId,
408 peer: PeerId,
409 local_addr: &Multiaddr,
410 remote_addr: &Multiaddr,
411 ) -> Result<THandler<Self>, ConnectionDenied> {
412 #[cfg(not(feature = "compat"))]
413 return self.inner.handle_established_inbound_connection(
414 connection_id,
415 peer,
416 local_addr,
417 remote_addr,
418 );
419 #[cfg(feature = "compat")]
420 return self
421 .inner
422 .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr)
423 .map(|handle| ConnectionHandler::select(handle, OneShotHandler::default()));
424 }
425
426 fn handle_established_outbound_connection(
427 &mut self,
428 connection_id: ConnectionId,
429 peer: PeerId,
430 addr: &Multiaddr,
431 role_override: Endpoint,
432 ) -> Result<THandler<Self>, ConnectionDenied> {
433 #[cfg(not(feature = "compat"))]
434 return self.inner.handle_established_outbound_connection(
435 connection_id,
436 peer,
437 addr,
438 role_override,
439 );
440 #[cfg(feature = "compat")]
441 return self
442 .inner
443 .handle_established_outbound_connection(connection_id, peer, addr, role_override)
444 .map(|handle| ConnectionHandler::select(handle, OneShotHandler::default()));
445 }
446
447 fn handle_pending_outbound_connection(
448 &mut self,
449 connection_id: ConnectionId,
450 maybe_peer: Option<PeerId>,
451 addresses: &[Multiaddr],
452 effective_role: Endpoint,
453 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
454 self.inner.handle_pending_outbound_connection(
455 connection_id,
456 maybe_peer,
457 addresses,
458 effective_role,
459 )
460 }
461
462 fn on_swarm_event(&mut self, event: FromSwarm) {
463 match event {
464 FromSwarm::ConnectionEstablished(ConnectionEstablished {
465 peer_id,
466 other_established,
467 connection_id,
468 endpoint,
469 failed_addresses,
470 }) => {
471 if other_established == 0 {
472 self.query_manager.add_peer(&peer_id);
473 }
474
475 self.inner.on_swarm_event(FromSwarm::ConnectionEstablished(
476 ConnectionEstablished {
477 peer_id,
478 other_established,
479 connection_id,
480 endpoint,
481 failed_addresses,
482 },
483 ));
484 }
485 FromSwarm::ConnectionClosed(ConnectionClosed {
486 peer_id,
487 connection_id,
488 endpoint,
489 remaining_established,
490 }) => {
491 #[cfg(feature = "compat")]
492 if remaining_established == 0 {
493 self.compat.remove(&peer_id);
494 }
495 if remaining_established == 0 {
496 self.query_manager.remove_peer(&peer_id);
497 }
498 self.inner
499 .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
500 peer_id,
501 connection_id,
502 endpoint,
503 remaining_established,
504 }));
505 }
506 ev => self.inner.on_swarm_event(ev),
507 }
508 }
509
510 fn on_connection_handler_event(
511 &mut self,
512 peer_id: PeerId,
513 conn: ConnectionId,
514 event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
515 ) {
516 tracing::trace!(?event, "on_connection_handler_event");
517 #[cfg(not(feature = "compat"))]
518 return self.inner.on_connection_handler_event(peer_id, conn, event);
519 #[cfg(feature = "compat")]
520 match event {
521 Either::Left(event) => self.inner.on_connection_handler_event(peer_id, conn, event),
522 Either::Right(msg) => {
523 if let Ok(msg) = msg {
524 for msg in msg.0 {
525 match msg {
526 CompatMessage::Request(req) => {
527 tracing::trace!("received compat request");
528 self.inject_request(BitswapChannel::Compat(peer_id, req.cid), req);
529 }
530 CompatMessage::Response(cid, res) => {
531 tracing::trace!("received compat response");
532 self.inject_response(BitswapId::Compat(cid), peer_id, res);
533 }
534 }
535 }
536 }
537 }
538 }
539 }
540
541 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
542 let mut exit = false;
543 while !exit {
544 exit = true;
545 while let Poll::Ready(Some(response)) = Pin::new(&mut self.db_rx).poll_next(cx) {
546 exit = false;
547 match response {
548 DbResponse::Bitswap(channel, response) => match channel {
549 BitswapChannel::Bitswap(channel) => {
550 self.inner.send_response(channel, response).ok();
551 }
552 #[cfg(feature = "compat")]
553 BitswapChannel::Compat(peer_id, cid) => {
554 let compat = CompatMessage::Response(cid, response);
555 return Poll::Ready(ToSwarm::NotifyHandler {
556 peer_id,
557 handler: NotifyHandler::Any,
558 event: Either::Right(compat),
559 });
560 }
561 },
562 DbResponse::MissingBlocks(id, res) => match res {
563 Ok(missing) => {
564 MISSING_BLOCKS_TOTAL.inc_by(missing.len() as u64);
565 self.query_manager
566 .inject_response(id, Response::MissingBlocks(missing));
567 }
568 Err(err) => {
569 self.query_manager.cancel(id);
570 let event = BitswapEvent::Complete(id, Err(err));
571 return Poll::Ready(ToSwarm::GenerateEvent(event));
572 }
573 },
574 }
575 }
576 while let Some(query) = self.query_manager.next() {
577 exit = false;
578 match query {
579 QueryEvent::Request(id, req) => match req {
580 Request::Have(peer_id, cid) => {
581 let req = BitswapRequest {
582 ty: RequestType::Have,
583 cid,
584 };
585 let rid = self.inner.send_request(&peer_id, req);
586 self.requests.insert(BitswapId::Bitswap(rid), id);
587 }
588 Request::Block(peer_id, cid) => {
589 let req = BitswapRequest {
590 ty: RequestType::Block,
591 cid,
592 };
593 let rid = self.inner.send_request(&peer_id, req);
594 self.requests.insert(BitswapId::Bitswap(rid), id);
595 }
596 Request::MissingBlocks(cid) => {
597 self.db_tx
598 .unbounded_send(DbRequest::MissingBlocks(id, cid))
599 .ok();
600 }
601 },
602 QueryEvent::Progress(id, missing) => {
603 let event = BitswapEvent::Progress(id, missing);
604 return Poll::Ready(ToSwarm::GenerateEvent(event));
605 }
606 QueryEvent::Complete(id, res) => {
607 if res.is_err() {
608 BLOCK_NOT_FOUND.inc();
609 }
610 let event = BitswapEvent::Complete(
611 id,
612 res.map_err(|cid| BlockNotFound(cid).into()),
613 );
614 return Poll::Ready(ToSwarm::GenerateEvent(event));
615 }
616 }
617 }
618 while let Poll::Ready(event) = self.inner.poll(cx) {
619 exit = false;
620 let event = match event {
621 ToSwarm::GenerateEvent(event) => event,
622 ToSwarm::Dial { opts } => {
623 return Poll::Ready(ToSwarm::Dial { opts });
624 }
625 ToSwarm::NotifyHandler {
626 peer_id,
627 handler,
628 event,
629 } => {
630 return Poll::Ready(ToSwarm::NotifyHandler {
631 peer_id,
632 handler,
633 #[cfg(not(feature = "compat"))]
634 event,
635 #[cfg(feature = "compat")]
636 event: Either::Left(event),
637 });
638 }
639 ToSwarm::ListenOn { opts } => {
640 return Poll::Ready(ToSwarm::ListenOn { opts });
641 }
642 ToSwarm::RemoveListener { id } => {
643 return Poll::Ready(ToSwarm::RemoveListener { id });
644 }
645 ToSwarm::NewExternalAddrCandidate(address) => {
646 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(address));
647 }
648 ToSwarm::ExternalAddrConfirmed(address) => {
649 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(address));
650 }
651 ToSwarm::ExternalAddrExpired(address) => {
652 return Poll::Ready(ToSwarm::ExternalAddrExpired(address));
653 }
654 ToSwarm::CloseConnection {
655 peer_id,
656 connection,
657 } => {
658 return Poll::Ready(ToSwarm::CloseConnection {
659 peer_id,
660 connection,
661 });
662 }
663 _ => todo!(),
664 };
665 match event {
666 RequestResponseEvent::Message { peer, message } => match message {
667 RequestResponseMessage::Request {
668 request_id: _,
669 request,
670 channel,
671 } => self.inject_request(BitswapChannel::Bitswap(channel), request),
672 RequestResponseMessage::Response {
673 request_id,
674 response,
675 } => self.inject_response(BitswapId::Bitswap(request_id), peer, response),
676 },
677 RequestResponseEvent::ResponseSent { .. } => {}
678 RequestResponseEvent::OutboundFailure {
679 peer,
680 request_id,
681 error,
682 } => {
683 self.inject_outbound_failure(&peer, request_id, &error);
684 #[cfg(feature = "compat")]
685 if let OutboundFailure::UnsupportedProtocols = error {
686 if let Some(id) = self.requests.remove(&BitswapId::Bitswap(request_id))
687 {
688 if let Some(info) = self.query_manager.query_info(id) {
689 let ty = match info.label {
690 "have" => RequestType::Have,
691 "block" => RequestType::Block,
692 _ => unreachable!(),
693 };
694 let request = BitswapRequest { ty, cid: info.cid };
695 self.requests.insert(BitswapId::Compat(info.cid), id);
696 tracing::trace!("adding compat peer {}", peer);
697 self.compat.insert(peer);
698 return Poll::Ready(ToSwarm::NotifyHandler {
699 peer_id: peer,
700 handler: NotifyHandler::Any,
701 event: Either::Right(CompatMessage::Request(request)),
702 });
703 }
704 }
705 }
706 if let Some(id) = self.requests.remove(&BitswapId::Bitswap(request_id)) {
707 self.query_manager
708 .inject_response(id, Response::Have(peer, false));
709 }
710 }
711 RequestResponseEvent::InboundFailure {
712 peer,
713 request_id,
714 error,
715 } => {
716 self.inject_inbound_failure(&peer, request_id, &error);
717 }
718 }
719 }
720 }
721 Poll::Pending
722 }
723}
724
725#[cfg(test)]
726mod tests {
727 use super::*;
728 use async_std::task;
729 use futures::prelude::*;
730 use libipld::block::Block;
731 use libipld::cbor::DagCborCodec;
732 use libipld::ipld;
733 use libipld::ipld::Ipld;
734 use libipld::multihash::Code;
735 use libipld::store::DefaultParams;
736 use libp2p::swarm::SwarmEvent;
737 use libp2p::SwarmBuilder;
738 use libp2p::{PeerId, Swarm};
739 use std::sync::{Arc, Mutex};
740 use std::time::Duration;
741 use tracing_subscriber::fmt::TestWriter;
742
743 fn tracing_try_init() {
744 tracing_subscriber::fmt()
745 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
746 .with_writer(TestWriter::new())
747 .try_init()
748 .ok();
749 }
750
751 fn create_block(ipld: Ipld) -> Block<DefaultParams> {
752 Block::encode(DagCborCodec, Code::Blake3_256, &ipld).unwrap()
753 }
754
755 #[derive(Clone, Default)]
756 struct Store(Arc<Mutex<FnvHashMap<Cid, Vec<u8>>>>);
757
758 #[async_trait::async_trait]
759 impl BitswapStore for Store {
760 type Params = DefaultParams;
761 async fn contains(&mut self, cid: &Cid) -> Result<bool> {
762 Ok(self.0.lock().unwrap().contains_key(cid))
763 }
764 async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
765 Ok(self.0.lock().unwrap().get(cid).cloned())
766 }
767 async fn insert(&mut self, block: &Block<Self::Params>) -> Result<()> {
768 self.0
769 .lock()
770 .unwrap()
771 .insert(*block.cid(), block.data().to_vec());
772 Ok(())
773 }
774 async fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
775 let mut stack = vec![*cid];
776 let mut missing = vec![];
777 while let Some(cid) = stack.pop() {
778 if let Some(data) = self.get(&cid).await? {
779 let block = Block::<Self::Params>::new_unchecked(cid, data);
780 block.references(&mut stack)?;
781 } else {
782 missing.push(cid);
783 }
784 }
785 Ok(missing)
786 }
787 }
788
789 struct Peer {
790 peer_id: PeerId,
791 addr: Multiaddr,
792 store: Store,
793 swarm: Swarm<Bitswap<DefaultParams>>,
794 }
795
796 impl Peer {
797 fn new() -> Self {
798 let store = Store::default();
799 let mut swarm = SwarmBuilder::with_new_identity()
800 .with_async_std()
801 .with_tcp(
802 Default::default(),
803 libp2p::noise::Config::new,
804 libp2p::yamux::Config::default,
805 )
806 .expect("valid")
807 .with_behaviour(|_| {
808 Bitswap::new(
809 BitswapConfig::new(),
810 store.clone(),
811 Box::new(|t| {
812 async_std::task::spawn(t);
813 }),
814 )
815 })
816 .expect("Valid")
817 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
818 .build();
819 let peer_id = *swarm.local_peer_id();
820 Swarm::listen_on(&mut swarm, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
821 while swarm.next().now_or_never().is_some() {}
822 let addr = Swarm::listeners(&swarm).next().unwrap().clone();
823 Self {
824 peer_id,
825 addr,
826 store,
827 swarm,
828 }
829 }
830
831 fn add_address(&mut self, peer: &Peer) {
832 self.swarm
833 .behaviour_mut()
834 .add_address(&peer.peer_id, peer.addr.clone());
835 }
836
837 fn store(&mut self) -> impl std::ops::DerefMut<Target = FnvHashMap<Cid, Vec<u8>>> + '_ {
838 self.store.0.lock().unwrap()
839 }
840
841 fn swarm(&mut self) -> &mut Swarm<Bitswap<DefaultParams>> {
842 &mut self.swarm
843 }
844
845 fn spawn(mut self, name: &'static str) -> PeerId {
846 let peer_id = self.peer_id;
847 task::spawn(async move {
848 loop {
849 let event = self.swarm.next().await;
850 tracing::debug!("{}: {:?}", name, event);
851 }
852 });
853 peer_id
854 }
855
856 async fn next(&mut self) -> Option<BitswapEvent> {
857 loop {
858 let ev = self.swarm.next().await?;
859 if let SwarmEvent::Behaviour(event) = ev {
860 return Some(event);
861 }
862 }
863 }
864 }
865
866 fn assert_progress(event: Option<BitswapEvent>, id: QueryId, missing: usize) {
867 if let Some(BitswapEvent::Progress(id2, missing2)) = event {
868 assert_eq!(id2, id);
869 assert_eq!(missing2, missing);
870 } else {
871 panic!("{:?} is not a progress event", event);
872 }
873 }
874
875 fn assert_complete_ok(event: Option<BitswapEvent>, id: QueryId) {
876 if let Some(BitswapEvent::Complete(id2, Ok(()))) = event {
877 assert_eq!(id2, id);
878 } else {
879 panic!("{:?} is not a complete event", event);
880 }
881 }
882
883 #[async_std::test]
884 async fn test_bitswap_get() {
885 tracing_try_init();
886 let mut peer1 = Peer::new();
887 let mut peer2 = Peer::new();
888 peer2.add_address(&peer1);
889
890 let block = create_block(ipld!(&b"hello world"[..]));
891 peer1.store().insert(*block.cid(), block.data().to_vec());
892 let peer1 = peer1.spawn("peer1");
893
894 let id = peer2
895 .swarm()
896 .behaviour_mut()
897 .get(*block.cid(), std::iter::once(peer1));
898
899 assert_complete_ok(peer2.next().await, id);
900 }
901
902 #[async_std::test]
903 async fn test_bitswap_cancel_get() {
904 tracing_try_init();
905 let mut peer1 = Peer::new();
906 let mut peer2 = Peer::new();
907 peer2.add_address(&peer1);
908
909 let block = create_block(ipld!(&b"hello world"[..]));
910 peer1.store().insert(*block.cid(), block.data().to_vec());
911 let peer1 = peer1.spawn("peer1");
912
913 let id = peer2
914 .swarm()
915 .behaviour_mut()
916 .get(*block.cid(), std::iter::once(peer1));
917 peer2.swarm().behaviour_mut().cancel(id);
918 let res = peer2.next().now_or_never();
919 println!("{:?}", res);
920 assert!(res.is_none());
921 }
922
923 #[async_std::test]
924 async fn test_bitswap_sync() {
925 tracing_try_init();
926 let mut peer1 = Peer::new();
927 let mut peer2 = Peer::new();
928 peer2.add_address(&peer1);
929
930 let b0 = create_block(ipld!({
931 "n": 0,
932 }));
933 let b1 = create_block(ipld!({
934 "prev": b0.cid(),
935 "n": 1,
936 }));
937 let b2 = create_block(ipld!({
938 "prev": b1.cid(),
939 "n": 2,
940 }));
941 peer1.store().insert(*b0.cid(), b0.data().to_vec());
942 peer1.store().insert(*b1.cid(), b1.data().to_vec());
943 peer1.store().insert(*b2.cid(), b2.data().to_vec());
944 let peer1 = peer1.spawn("peer1");
945
946 let id =
947 peer2
948 .swarm()
949 .behaviour_mut()
950 .sync(*b2.cid(), vec![peer1], std::iter::once(*b2.cid()));
951
952 assert_progress(peer2.next().await, id, 1);
953 assert_progress(peer2.next().await, id, 1);
954
955 assert_complete_ok(peer2.next().await, id);
956 }
957
958 #[async_std::test]
959 async fn test_bitswap_cancel_sync() {
960 tracing_try_init();
961 let mut peer1 = Peer::new();
962 let mut peer2 = Peer::new();
963 peer2.add_address(&peer1);
964
965 let block = create_block(ipld!(&b"hello world"[..]));
966 peer1.store().insert(*block.cid(), block.data().to_vec());
967 let peer1 = peer1.spawn("peer1");
968
969 let id = peer2.swarm().behaviour_mut().sync(
970 *block.cid(),
971 vec![peer1],
972 std::iter::once(*block.cid()),
973 );
974 peer2.swarm().behaviour_mut().cancel(id);
975 let res = peer2.next().now_or_never();
976 println!("{:?}", res);
977 assert!(res.is_none());
978 }
979
980 #[cfg(feature = "compat")]
981 #[async_std::test]
982 async fn compat_test() {
983 tracing_try_init();
984 let cid: Cid = "QmP8njGuyiw9cjkhwHD9nZhyBTHufXFanAvZgcy9xYoWiB"
985 .parse()
986 .unwrap();
987 let peer_id: PeerId = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"
988 .parse()
989 .unwrap();
990 let multiaddr: Multiaddr = "/ip4/104.131.131.82/tcp/4001".parse().unwrap();
991
992 let mut peer = Peer::new();
993 peer.swarm()
994 .behaviour_mut()
995 .add_address(&peer_id, multiaddr);
996 let id = peer
997 .swarm()
998 .behaviour_mut()
999 .get(cid, std::iter::once(peer_id));
1000 assert_complete_ok(peer.next().await, id);
1001 }
1002}