1#[cfg(feature = "compat")]
9use crate::compat::{CompatMessage, CompatProtocol, InboundMessage};
10use crate::protocol::{
11 BitswapCodec, BitswapProtocol, BitswapRequest, BitswapResponse, RequestType,
12};
13use crate::query::{QueryEvent, QueryId, QueryManager, Request, Response};
14use crate::stats::*;
15use fnv::FnvHashMap;
16#[cfg(feature = "compat")]
17use fnv::FnvHashSet;
18use futures::{
19 channel::mpsc,
20 stream::{Stream, StreamExt},
21 task::{Context, Poll},
22};
23use libipld::{error::BlockNotFound, store::StoreParams, Block, Cid, Result};
24#[cfg(feature = "compat")]
25use libp2p::core::either::EitherOutput;
26use libp2p::core::{connection::ConnectionId, Multiaddr, PeerId};
27use libp2p::swarm::derive_prelude::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
28#[cfg(feature = "compat")]
29use libp2p::swarm::{ConnectionHandlerSelect, NotifyHandler, OneShotHandler};
30use libp2p::{
31 request_response::{
32 InboundFailure, OutboundFailure, ProtocolSupport, RequestId, RequestResponse,
33 RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
34 },
35 swarm::{ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters},
36};
37use prometheus::Registry;
38use std::{pin::Pin, time::Duration};
39
40pub type Channel = ResponseChannel<BitswapResponse>;
42
43#[derive(Debug)]
45pub enum BitswapEvent {
46 Progress(QueryId, usize),
50 Complete(QueryId, Result<()>),
52}
53
54pub trait BitswapStore: Send + Sync + 'static {
56 type Params: StoreParams;
58 fn contains(&mut self, cid: &Cid) -> Result<bool>;
60 fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>>;
62 fn insert(&mut self, block: &Block<Self::Params>) -> Result<()>;
64 fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>>;
66}
67
68#[derive(Clone, Copy, Debug, Eq, PartialEq)]
70pub struct BitswapConfig {
71 pub request_timeout: Duration,
73 pub connection_keep_alive: Duration,
75}
76
77impl BitswapConfig {
78 pub fn new() -> Self {
80 Self {
81 request_timeout: Duration::from_secs(10),
82 connection_keep_alive: Duration::from_secs(10),
83 }
84 }
85}
86
87impl Default for BitswapConfig {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
94enum BitswapId {
95 Bitswap(RequestId),
96 #[cfg(feature = "compat")]
97 Compat(Cid),
98}
99
100enum BitswapChannel {
101 Bitswap(Channel),
102 #[cfg(feature = "compat")]
103 Compat(PeerId, Cid),
104}
105
106pub struct Bitswap<P: StoreParams> {
108 inner: RequestResponse<BitswapCodec<P>>,
110 query_manager: QueryManager,
112 requests: FnvHashMap<BitswapId, QueryId>,
114 db_tx: mpsc::UnboundedSender<DbRequest<P>>,
116 db_rx: mpsc::UnboundedReceiver<DbResponse>,
118 #[cfg(feature = "compat")]
120 compat: FnvHashSet<PeerId>,
121}
122
123impl<P: StoreParams> Bitswap<P> {
124 pub fn new<S: BitswapStore<Params = P>>(config: BitswapConfig, store: S) -> Self {
126 let mut rr_config = RequestResponseConfig::default();
127 rr_config.set_connection_keep_alive(config.connection_keep_alive);
128 rr_config.set_request_timeout(config.request_timeout);
129 let protocols = std::iter::once((BitswapProtocol, ProtocolSupport::Full));
130 let inner = RequestResponse::new(BitswapCodec::<P>::default(), protocols, rr_config);
131 let (db_tx, db_rx) = start_db_thread(store);
132 Self {
133 inner,
134 query_manager: Default::default(),
135 requests: Default::default(),
136 db_tx,
137 db_rx,
138 #[cfg(feature = "compat")]
139 compat: Default::default(),
140 }
141 }
142
143 pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
145 self.inner.add_address(peer_id, addr);
146 }
147
148 pub fn remove_address(&mut self, peer_id: &PeerId, addr: &Multiaddr) {
150 self.inner.remove_address(peer_id, addr);
151 }
152
153 pub fn get(&mut self, cid: Cid, peers: impl Iterator<Item = PeerId>) -> QueryId {
155 self.query_manager.get(None, cid, peers)
156 }
157
158 pub fn sync(
160 &mut self,
161 cid: Cid,
162 peers: Vec<PeerId>,
163 missing: impl Iterator<Item = Cid>,
164 ) -> QueryId {
165 self.query_manager.sync(cid, peers, missing)
166 }
167
168 pub fn cancel(&mut self, id: QueryId) -> bool {
170 let res = self.query_manager.cancel(id);
171 if res {
172 REQUESTS_CANCELED.inc();
173 }
174 res
175 }
176
177 pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
179 registry.register(Box::new(REQUESTS_TOTAL.clone()))?;
180 registry.register(Box::new(REQUEST_DURATION_SECONDS.clone()))?;
181 registry.register(Box::new(REQUESTS_CANCELED.clone()))?;
182 registry.register(Box::new(BLOCK_NOT_FOUND.clone()))?;
183 registry.register(Box::new(PROVIDERS_TOTAL.clone()))?;
184 registry.register(Box::new(MISSING_BLOCKS_TOTAL.clone()))?;
185 registry.register(Box::new(RECEIVED_BLOCK_BYTES.clone()))?;
186 registry.register(Box::new(RECEIVED_INVALID_BLOCK_BYTES.clone()))?;
187 registry.register(Box::new(SENT_BLOCK_BYTES.clone()))?;
188 registry.register(Box::new(RESPONSES_TOTAL.clone()))?;
189 registry.register(Box::new(THROTTLED_INBOUND.clone()))?;
190 registry.register(Box::new(THROTTLED_OUTBOUND.clone()))?;
191 registry.register(Box::new(OUTBOUND_FAILURE.clone()))?;
192 registry.register(Box::new(INBOUND_FAILURE.clone()))?;
193 Ok(())
194 }
195}
196
197enum DbRequest<P: StoreParams> {
198 Bitswap(BitswapChannel, BitswapRequest),
199 Insert(Block<P>),
200 MissingBlocks(QueryId, Cid),
201}
202
203enum DbResponse {
204 Bitswap(BitswapChannel, BitswapResponse),
205 MissingBlocks(QueryId, Result<Vec<Cid>>),
206}
207
208fn start_db_thread<S: BitswapStore>(
209 mut store: S,
210) -> (
211 mpsc::UnboundedSender<DbRequest<S::Params>>,
212 mpsc::UnboundedReceiver<DbResponse>,
213) {
214 let (tx, requests) = mpsc::unbounded();
215 let (responses, rx) = mpsc::unbounded();
216 std::thread::spawn(move || {
217 let mut requests: mpsc::UnboundedReceiver<DbRequest<S::Params>> = requests;
218 while let Some(request) = futures::executor::block_on(requests.next()) {
219 match request {
220 DbRequest::Bitswap(channel, request) => {
221 let response = match request.ty {
222 RequestType::Have => {
223 let have = store.contains(&request.cid).ok().unwrap_or_default();
224 if have {
225 RESPONSES_TOTAL.with_label_values(&["have"]).inc();
226 } else {
227 RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
228 }
229 tracing::trace!("have {}", have);
230 BitswapResponse::Have(have)
231 }
232 RequestType::Block => {
233 let block = store.get(&request.cid).ok().unwrap_or_default();
234 if let Some(data) = block {
235 RESPONSES_TOTAL.with_label_values(&["block"]).inc();
236 SENT_BLOCK_BYTES.inc_by(data.len() as u64);
237 tracing::trace!("block {}", data.len());
238 BitswapResponse::Block(data)
239 } else {
240 RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
241 tracing::trace!("have false");
242 BitswapResponse::Have(false)
243 }
244 }
245 };
246 responses
247 .unbounded_send(DbResponse::Bitswap(channel, response))
248 .ok();
249 }
250 DbRequest::Insert(block) => {
251 if let Err(err) = store.insert(&block) {
252 tracing::error!("error inserting blocks {}", err);
253 }
254 }
255 DbRequest::MissingBlocks(id, cid) => {
256 let res = store.missing_blocks(&cid);
257 responses
258 .unbounded_send(DbResponse::MissingBlocks(id, res))
259 .ok();
260 }
261 }
262 }
263 });
264 (tx, rx)
265}
266
267impl<P: StoreParams> Bitswap<P> {
268 fn inject_request(&mut self, channel: BitswapChannel, request: BitswapRequest) {
270 self.db_tx
271 .unbounded_send(DbRequest::Bitswap(channel, request))
272 .ok();
273 }
274
275 fn inject_response(&mut self, id: BitswapId, peer: PeerId, response: BitswapResponse) {
277 if let Some(id) = self.requests.remove(&id) {
278 match response {
279 BitswapResponse::Have(have) => {
280 self.query_manager
281 .inject_response(id, Response::Have(peer, have));
282 }
283 BitswapResponse::Block(data) => {
284 if let Some(info) = self.query_manager.query_info(id) {
285 let len = data.len();
286 if let Ok(block) = Block::new(info.cid, data) {
287 RECEIVED_BLOCK_BYTES.inc_by(len as u64);
288 self.db_tx.unbounded_send(DbRequest::Insert(block)).ok();
289 self.query_manager
290 .inject_response(id, Response::Block(peer, true));
291 } else {
292 tracing::error!("received invalid block");
293 RECEIVED_INVALID_BLOCK_BYTES.inc_by(len as u64);
294 self.query_manager
295 .inject_response(id, Response::Block(peer, false));
296 }
297 }
298 }
299 }
300 }
301 }
302
303 fn inject_outbound_failure(
304 &mut self,
305 peer: &PeerId,
306 request_id: RequestId,
307 error: &OutboundFailure,
308 ) {
309 tracing::debug!(
310 "bitswap outbound failure {} {} {:?}",
311 peer,
312 request_id,
313 error
314 );
315 match error {
316 OutboundFailure::DialFailure => {
317 OUTBOUND_FAILURE.with_label_values(&["dial_failure"]).inc();
318 }
319 OutboundFailure::Timeout => {
320 OUTBOUND_FAILURE.with_label_values(&["timeout"]).inc();
321 }
322 OutboundFailure::ConnectionClosed => {
323 OUTBOUND_FAILURE
324 .with_label_values(&["connection_closed"])
325 .inc();
326 }
327 OutboundFailure::UnsupportedProtocols => {
328 OUTBOUND_FAILURE
329 .with_label_values(&["unsupported_protocols"])
330 .inc();
331 }
332 }
333 }
334
335 fn inject_inbound_failure(
336 &mut self,
337 peer: &PeerId,
338 request_id: RequestId,
339 error: &InboundFailure,
340 ) {
341 tracing::error!(
342 "bitswap inbound failure {} {} {:?}",
343 peer,
344 request_id,
345 error
346 );
347 match error {
348 InboundFailure::Timeout => {
349 INBOUND_FAILURE.with_label_values(&["timeout"]).inc();
350 }
351 InboundFailure::ConnectionClosed => {
352 INBOUND_FAILURE
353 .with_label_values(&["connection_closed"])
354 .inc();
355 }
356 InboundFailure::UnsupportedProtocols => {
357 INBOUND_FAILURE
358 .with_label_values(&["unsupported_protocols"])
359 .inc();
360 }
361 InboundFailure::ResponseOmission => {
362 INBOUND_FAILURE
363 .with_label_values(&["response_omission"])
364 .inc();
365 }
366 }
367 }
368}
369
370impl<P: StoreParams> NetworkBehaviour for Bitswap<P> {
371 #[cfg(not(feature = "compat"))]
372 type ConnectionHandler =
373 <RequestResponse<BitswapCodec<P>> as NetworkBehaviour>::ConnectionHandler;
374
375 #[cfg(feature = "compat")]
376 #[allow(clippy::type_complexity)]
377 type ConnectionHandler = ConnectionHandlerSelect<
378 <RequestResponse<BitswapCodec<P>> as NetworkBehaviour>::ConnectionHandler,
379 OneShotHandler<CompatProtocol, CompatMessage, InboundMessage>,
380 >;
381 type OutEvent = BitswapEvent;
382
383 fn new_handler(&mut self) -> Self::ConnectionHandler {
384 #[cfg(not(feature = "compat"))]
385 return self.inner.new_handler();
386 #[cfg(feature = "compat")]
387 ConnectionHandler::select(self.inner.new_handler(), OneShotHandler::default())
388 }
389
390 fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
391 self.inner.addresses_of_peer(peer_id)
392 }
393
394 fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
395 match event {
396 FromSwarm::ConnectionEstablished(ev) => self
397 .inner
398 .on_swarm_event(FromSwarm::ConnectionEstablished(ev)),
399 FromSwarm::ConnectionClosed(ConnectionClosed {
400 peer_id,
401 connection_id,
402 endpoint,
403 handler,
404 remaining_established,
405 }) => {
406 #[cfg(feature = "compat")]
407 if remaining_established == 0 {
408 self.compat.remove(&peer_id);
409 }
410 #[cfg(feature = "compat")]
411 let (handler, _oneshot) = handler.into_inner();
412 self.inner
413 .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
414 peer_id,
415 connection_id,
416 endpoint,
417 handler,
418 remaining_established,
419 }));
420 }
421 FromSwarm::DialFailure(DialFailure {
422 peer_id,
423 handler,
424 error,
425 }) => {
426 #[cfg(feature = "compat")]
427 let (handler, _oneshot) = handler.into_inner();
428 self.inner
429 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
430 peer_id,
431 handler,
432 error,
433 }));
434 }
435 FromSwarm::AddressChange(ev) => self.inner.on_swarm_event(FromSwarm::AddressChange(ev)),
436 FromSwarm::ListenFailure(ListenFailure {
437 local_addr,
438 send_back_addr,
439 handler,
440 }) => {
441 #[cfg(feature = "compat")]
442 let (handler, _oneshot) = handler.into_inner();
443 self.inner
444 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
445 local_addr,
446 send_back_addr,
447 handler,
448 }));
449 }
450 FromSwarm::NewListener(ev) => self.inner.on_swarm_event(FromSwarm::NewListener(ev)),
451 FromSwarm::NewListenAddr(ev) => self.inner.on_swarm_event(FromSwarm::NewListenAddr(ev)),
452 FromSwarm::ExpiredListenAddr(ev) => {
453 self.inner.on_swarm_event(FromSwarm::ExpiredListenAddr(ev))
454 }
455 FromSwarm::ListenerError(ev) => self.inner.on_swarm_event(FromSwarm::ListenerError(ev)),
456 FromSwarm::ListenerClosed(ev) => {
457 self.inner.on_swarm_event(FromSwarm::ListenerClosed(ev))
458 }
459 FromSwarm::NewExternalAddr(ev) => {
460 self.inner.on_swarm_event(FromSwarm::NewExternalAddr(ev))
461 }
462 FromSwarm::ExpiredExternalAddr(ev) => self
463 .inner
464 .on_swarm_event(FromSwarm::ExpiredExternalAddr(ev)),
465 }
466 }
467
468 fn on_connection_handler_event(
469 &mut self,
470 peer_id: PeerId,
471 conn: ConnectionId,
472 event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
473 ) {
474 #[cfg(not(feature = "compat"))]
475 return self.inner.on_connection_handler_event(peer_id, conn, event);
476 #[cfg(feature = "compat")]
477 match event {
478 EitherOutput::First(event) => {
479 self.inner.on_connection_handler_event(peer_id, conn, event)
480 }
481 EitherOutput::Second(msg) => {
482 for msg in msg.0 {
483 match msg {
484 CompatMessage::Request(req) => {
485 tracing::trace!("received compat request");
486 self.inject_request(BitswapChannel::Compat(peer_id, req.cid), req);
487 }
488 CompatMessage::Response(cid, res) => {
489 tracing::trace!("received compat response");
490 self.inject_response(BitswapId::Compat(cid), peer_id, res);
491 }
492 }
493 }
494 }
495 }
496 }
497
498 fn poll(
499 &mut self,
500 cx: &mut Context,
501 pp: &mut impl PollParameters,
502 ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
503 let mut exit = false;
504 while !exit {
505 exit = true;
506 while let Poll::Ready(Some(response)) = Pin::new(&mut self.db_rx).poll_next(cx) {
507 exit = false;
508 match response {
509 DbResponse::Bitswap(channel, response) => match channel {
510 BitswapChannel::Bitswap(channel) => {
511 self.inner.send_response(channel, response).ok();
512 }
513 #[cfg(feature = "compat")]
514 BitswapChannel::Compat(peer_id, cid) => {
515 let compat = CompatMessage::Response(cid, response);
516 return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
517 peer_id,
518 handler: NotifyHandler::Any,
519 event: EitherOutput::Second(compat),
520 });
521 }
522 },
523 DbResponse::MissingBlocks(id, res) => match res {
524 Ok(missing) => {
525 MISSING_BLOCKS_TOTAL.inc_by(missing.len() as u64);
526 self.query_manager
527 .inject_response(id, Response::MissingBlocks(missing));
528 }
529 Err(err) => {
530 self.query_manager.cancel(id);
531 let event = BitswapEvent::Complete(id, Err(err));
532 return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
533 }
534 },
535 }
536 }
537 while let Some(query) = self.query_manager.next() {
538 exit = false;
539 match query {
540 QueryEvent::Request(id, req) => match req {
541 Request::Have(peer_id, cid) => {
542 let req = BitswapRequest {
543 ty: RequestType::Have,
544 cid,
545 };
546 let rid = self.inner.send_request(&peer_id, req);
547 self.requests.insert(BitswapId::Bitswap(rid), id);
548 }
549 Request::Block(peer_id, cid) => {
550 let req = BitswapRequest {
551 ty: RequestType::Block,
552 cid,
553 };
554 let rid = self.inner.send_request(&peer_id, req);
555 self.requests.insert(BitswapId::Bitswap(rid), id);
556 }
557 Request::MissingBlocks(cid) => {
558 self.db_tx
559 .unbounded_send(DbRequest::MissingBlocks(id, cid))
560 .ok();
561 }
562 },
563 QueryEvent::Progress(id, missing) => {
564 let event = BitswapEvent::Progress(id, missing);
565 return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
566 }
567 QueryEvent::Complete(id, res) => {
568 if res.is_err() {
569 BLOCK_NOT_FOUND.inc();
570 }
571 let event = BitswapEvent::Complete(
572 id,
573 res.map_err(|cid| BlockNotFound(cid).into()),
574 );
575 return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
576 }
577 }
578 }
579 while let Poll::Ready(event) = self.inner.poll(cx, pp) {
580 exit = false;
581 let event = match event {
582 NetworkBehaviourAction::GenerateEvent(event) => event,
583 NetworkBehaviourAction::Dial { opts, handler } => {
584 #[cfg(feature = "compat")]
585 let handler = ConnectionHandler::select(handler, Default::default());
586 return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler });
587 }
588 NetworkBehaviourAction::NotifyHandler {
589 peer_id,
590 handler,
591 event,
592 } => {
593 return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
594 peer_id,
595 handler,
596 #[cfg(not(feature = "compat"))]
597 event,
598 #[cfg(feature = "compat")]
599 event: EitherOutput::First(event),
600 });
601 }
602 NetworkBehaviourAction::ReportObservedAddr { address, score } => {
603 return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
604 address,
605 score,
606 });
607 }
608 NetworkBehaviourAction::CloseConnection {
609 peer_id,
610 connection,
611 } => {
612 return Poll::Ready(NetworkBehaviourAction::CloseConnection {
613 peer_id,
614 connection,
615 });
616 }
617 };
618 match event {
619 RequestResponseEvent::Message { peer, message } => match message {
620 RequestResponseMessage::Request {
621 request_id: _,
622 request,
623 channel,
624 } => self.inject_request(BitswapChannel::Bitswap(channel), request),
625 RequestResponseMessage::Response {
626 request_id,
627 response,
628 } => self.inject_response(BitswapId::Bitswap(request_id), peer, response),
629 },
630 RequestResponseEvent::ResponseSent { .. } => {}
631 RequestResponseEvent::OutboundFailure {
632 peer,
633 request_id,
634 error,
635 } => {
636 self.inject_outbound_failure(&peer, request_id, &error);
637 #[cfg(feature = "compat")]
638 if let OutboundFailure::UnsupportedProtocols = error {
639 if let Some(id) = self.requests.remove(&BitswapId::Bitswap(request_id))
640 {
641 if let Some(info) = self.query_manager.query_info(id) {
642 let ty = match info.label {
643 "have" => RequestType::Have,
644 "block" => RequestType::Block,
645 _ => unreachable!(),
646 };
647 let request = BitswapRequest { ty, cid: info.cid };
648 self.requests.insert(BitswapId::Compat(info.cid), id);
649 tracing::trace!("adding compat peer {}", peer);
650 self.compat.insert(peer);
651 return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
652 peer_id: peer,
653 handler: NotifyHandler::Any,
654 event: EitherOutput::Second(CompatMessage::Request(
655 request,
656 )),
657 });
658 }
659 }
660 }
661 if let Some(id) = self.requests.remove(&BitswapId::Bitswap(request_id)) {
662 self.query_manager
663 .inject_response(id, Response::Have(peer, false));
664 }
665 }
666 RequestResponseEvent::InboundFailure {
667 peer,
668 request_id,
669 error,
670 } => {
671 self.inject_inbound_failure(&peer, request_id, &error);
672 }
673 }
674 }
675 }
676 Poll::Pending
677 }
678}
679
680#[cfg(test)]
681mod tests {
682 use super::*;
683 use async_std::task;
684 use futures::prelude::*;
685 use libipld::block::Block;
686 use libipld::cbor::DagCborCodec;
687 use libipld::ipld;
688 use libipld::ipld::Ipld;
689 use libipld::multihash::Code;
690 use libipld::store::DefaultParams;
691 use libp2p::core::muxing::StreamMuxerBox;
692 use libp2p::core::transport::Boxed;
693 use libp2p::identity;
694 use libp2p::noise::{Keypair, NoiseConfig, X25519Spec};
695 use libp2p::swarm::SwarmEvent;
696 use libp2p::tcp::{self, async_io};
697 use libp2p::yamux::YamuxConfig;
698 use libp2p::{PeerId, Swarm, Transport};
699 use std::sync::{Arc, Mutex};
700 use std::time::Duration;
701
702 fn tracing_try_init() {
703 tracing_subscriber::fmt()
704 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
705 .try_init()
706 .ok();
707 }
708
709 fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) {
710 let id_key = identity::Keypair::generate_ed25519();
711 let peer_id = id_key.public().to_peer_id();
712 let dh_key = Keypair::<X25519Spec>::new()
713 .into_authentic(&id_key)
714 .unwrap();
715 let noise = NoiseConfig::xx(dh_key).into_authenticated();
716
717 let transport = async_io::Transport::new(tcp::Config::new().nodelay(true))
718 .upgrade(libp2p::core::upgrade::Version::V1)
719 .authenticate(noise)
720 .multiplex(YamuxConfig::default())
721 .timeout(Duration::from_secs(20))
722 .boxed();
723 (peer_id, transport)
724 }
725
726 fn create_block(ipld: Ipld) -> Block<DefaultParams> {
727 Block::encode(DagCborCodec, Code::Blake3_256, &ipld).unwrap()
728 }
729
730 #[derive(Clone, Default)]
731 struct Store(Arc<Mutex<FnvHashMap<Cid, Vec<u8>>>>);
732
733 impl BitswapStore for Store {
734 type Params = DefaultParams;
735 fn contains(&mut self, cid: &Cid) -> Result<bool> {
736 Ok(self.0.lock().unwrap().contains_key(cid))
737 }
738 fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
739 Ok(self.0.lock().unwrap().get(cid).cloned())
740 }
741 fn insert(&mut self, block: &Block<Self::Params>) -> Result<()> {
742 self.0
743 .lock()
744 .unwrap()
745 .insert(*block.cid(), block.data().to_vec());
746 Ok(())
747 }
748 fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
749 let mut stack = vec![*cid];
750 let mut missing = vec![];
751 while let Some(cid) = stack.pop() {
752 if let Some(data) = self.get(&cid)? {
753 let block = Block::<Self::Params>::new_unchecked(cid, data);
754 block.references(&mut stack)?;
755 } else {
756 missing.push(cid);
757 }
758 }
759 Ok(missing)
760 }
761 }
762
763 struct Peer {
764 peer_id: PeerId,
765 addr: Multiaddr,
766 store: Store,
767 swarm: Swarm<Bitswap<DefaultParams>>,
768 }
769
770 impl Peer {
771 fn new() -> Self {
772 let (peer_id, trans) = mk_transport();
773 let store = Store::default();
774 let mut swarm = Swarm::with_async_std_executor(
775 trans,
776 Bitswap::new(BitswapConfig::new(), store.clone()),
777 peer_id,
778 );
779 Swarm::listen_on(&mut swarm, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
780 while swarm.next().now_or_never().is_some() {}
781 let addr = Swarm::listeners(&swarm).next().unwrap().clone();
782 Self {
783 peer_id,
784 addr,
785 store,
786 swarm,
787 }
788 }
789
790 fn add_address(&mut self, peer: &Peer) {
791 self.swarm
792 .behaviour_mut()
793 .add_address(&peer.peer_id, peer.addr.clone());
794 }
795
796 fn store(&mut self) -> impl std::ops::DerefMut<Target = FnvHashMap<Cid, Vec<u8>>> + '_ {
797 self.store.0.lock().unwrap()
798 }
799
800 fn swarm(&mut self) -> &mut Swarm<Bitswap<DefaultParams>> {
801 &mut self.swarm
802 }
803
804 fn spawn(mut self, name: &'static str) -> PeerId {
805 let peer_id = self.peer_id;
806 task::spawn(async move {
807 loop {
808 let event = self.swarm.next().await;
809 tracing::debug!("{}: {:?}", name, event);
810 }
811 });
812 peer_id
813 }
814
815 async fn next(&mut self) -> Option<BitswapEvent> {
816 loop {
817 let ev = self.swarm.next().await?;
818 if let SwarmEvent::Behaviour(event) = ev {
819 return Some(event);
820 }
821 }
822 }
823 }
824
825 fn assert_progress(event: Option<BitswapEvent>, id: QueryId, missing: usize) {
826 if let Some(BitswapEvent::Progress(id2, missing2)) = event {
827 assert_eq!(id2, id);
828 assert_eq!(missing2, missing);
829 } else {
830 panic!("{:?} is not a progress event", event);
831 }
832 }
833
834 fn assert_complete_ok(event: Option<BitswapEvent>, id: QueryId) {
835 if let Some(BitswapEvent::Complete(id2, Ok(()))) = event {
836 assert_eq!(id2, id);
837 } else {
838 panic!("{:?} is not a complete event", event);
839 }
840 }
841
842 #[async_std::test]
843 async fn test_bitswap_get() {
844 tracing_try_init();
845 let mut peer1 = Peer::new();
846 let mut peer2 = Peer::new();
847 peer2.add_address(&peer1);
848
849 let block = create_block(ipld!(&b"hello world"[..]));
850 peer1.store().insert(*block.cid(), block.data().to_vec());
851 let peer1 = peer1.spawn("peer1");
852
853 let id = peer2
854 .swarm()
855 .behaviour_mut()
856 .get(*block.cid(), std::iter::once(peer1));
857
858 assert_complete_ok(peer2.next().await, id);
859 }
860
861 #[async_std::test]
862 async fn test_bitswap_cancel_get() {
863 tracing_try_init();
864 let mut peer1 = Peer::new();
865 let mut peer2 = Peer::new();
866 peer2.add_address(&peer1);
867
868 let block = create_block(ipld!(&b"hello world"[..]));
869 peer1.store().insert(*block.cid(), block.data().to_vec());
870 let peer1 = peer1.spawn("peer1");
871
872 let id = peer2
873 .swarm()
874 .behaviour_mut()
875 .get(*block.cid(), std::iter::once(peer1));
876 peer2.swarm().behaviour_mut().cancel(id);
877 let res = peer2.next().now_or_never();
878 println!("{:?}", res);
879 assert!(res.is_none());
880 }
881
882 #[async_std::test]
883 async fn test_bitswap_sync() {
884 tracing_try_init();
885 let mut peer1 = Peer::new();
886 let mut peer2 = Peer::new();
887 peer2.add_address(&peer1);
888
889 let b0 = create_block(ipld!({
890 "n": 0,
891 }));
892 let b1 = create_block(ipld!({
893 "prev": b0.cid(),
894 "n": 1,
895 }));
896 let b2 = create_block(ipld!({
897 "prev": b1.cid(),
898 "n": 2,
899 }));
900 peer1.store().insert(*b0.cid(), b0.data().to_vec());
901 peer1.store().insert(*b1.cid(), b1.data().to_vec());
902 peer1.store().insert(*b2.cid(), b2.data().to_vec());
903 let peer1 = peer1.spawn("peer1");
904
905 let id =
906 peer2
907 .swarm()
908 .behaviour_mut()
909 .sync(*b2.cid(), vec![peer1], std::iter::once(*b2.cid()));
910
911 assert_progress(peer2.next().await, id, 1);
912 assert_progress(peer2.next().await, id, 1);
913
914 assert_complete_ok(peer2.next().await, id);
915 }
916
917 #[async_std::test]
918 async fn test_bitswap_cancel_sync() {
919 tracing_try_init();
920 let mut peer1 = Peer::new();
921 let mut peer2 = Peer::new();
922 peer2.add_address(&peer1);
923
924 let block = create_block(ipld!(&b"hello world"[..]));
925 peer1.store().insert(*block.cid(), block.data().to_vec());
926 let peer1 = peer1.spawn("peer1");
927
928 let id = peer2.swarm().behaviour_mut().sync(
929 *block.cid(),
930 vec![peer1],
931 std::iter::once(*block.cid()),
932 );
933 peer2.swarm().behaviour_mut().cancel(id);
934 let res = peer2.next().now_or_never();
935 println!("{:?}", res);
936 assert!(res.is_none());
937 }
938
939 #[cfg(feature = "compat")]
940 #[async_std::test]
941 async fn compat_test() {
942 tracing_try_init();
943 let cid: Cid = "QmP8njGuyiw9cjkhwHD9nZhyBTHufXFanAvZgcy9xYoWiB"
944 .parse()
945 .unwrap();
946 let peer_id: PeerId = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"
947 .parse()
948 .unwrap();
949 let multiaddr: Multiaddr = "/ip4/104.131.131.82/tcp/4001".parse().unwrap();
950
951 let mut peer = Peer::new();
952 peer.swarm()
953 .behaviour_mut()
954 .add_address(&peer_id, multiaddr);
955 let id = peer
956 .swarm()
957 .behaviour_mut()
958 .get(cid, std::iter::once(peer_id));
959 assert_complete_ok(peer.next().await, id);
960 }
961}