1use std::collections::hash_map::Entry;
6use std::collections::HashSet;
7use std::fmt::Debug;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use ahash::{AHashMap, AHashSet};
14use anyhow::Result;
15use async_trait::async_trait;
16use cid::Cid;
17use futures_util::StreamExt;
18use handler::{BitswapHandler, HandlerEvent};
19
20use futures::channel::{mpsc, oneshot};
21use libp2p::swarm::derive_prelude::ConnectionEstablished;
22use libp2p::swarm::dial_opts::DialOpts;
23use libp2p::swarm::{ConnectionClosed, ConnectionId, DialFailure, FromSwarm};
24use libp2p::swarm::{
25 ConnectionDenied, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, ToSwarm,
26};
27use libp2p::{Multiaddr, PeerId};
28use tokio::task::JoinHandle;
29use tracing::{debug, trace, warn};
30
31pub use self::client::session;
32use self::client::{Client, Config as ClientConfig};
33use self::message::BitswapMessage;
34use self::network::Network;
35use self::network::OutEvent;
36pub use self::protocol::ProtocolConfig;
37pub use self::server::{Config as ServerConfig, Server};
38
39mod block;
40mod client;
41mod error;
42mod handler;
43mod network;
44mod pb;
45mod prefix;
46mod protocol;
47mod server;
48
49pub mod message;
50pub mod peer_task_queue;
51
52pub use self::block::{tests::*, Block};
53pub use self::protocol::ProtocolId;
54
/// Outstanding dial requests, keyed by the peer being dialed.
///
/// Each entry pairs the request id with the oneshot sender used to report the
/// outcome: on success the connection id plus the negotiated protocol (if
/// known), on failure an error message.
type DialMap = AHashMap<
    PeerId,
    Vec<(
        usize,
        oneshot::Sender<std::result::Result<(ConnectionId, Option<ProtocolId>), String>>,
    )>,
>;
64
/// libp2p network behaviour combining a bitswap [`Client`] with an optional
/// [`Server`] on top of a shared [`Network`] handle.
#[derive(Debug)]
pub struct Bitswap<S: Store> {
    /// Network handle; polled in `poll` for outgoing events.
    network: Network,
    /// Protocol configuration cloned into every new connection handler.
    protocol_config: ProtocolConfig,
    /// Established connection ids per peer.
    connected_peers: AHashMap<PeerId, AHashSet<ConnectionId>>,
    /// Bitswap-level state of every established connection.
    connection_state: AHashMap<ConnectionId, ConnectionState>,
    /// Dial requests waiting for a connection to the peer to come up.
    dials: DialMap,
    /// Initialised to `false` and not read anywhere in this file.
    _pause_dialing: bool,
    /// The bitswap client; always present.
    client: Client<S>,
    /// The bitswap server; `None` when no server configuration was given.
    server: Option<Server<S>>,
    /// Feeds received messages to the background message worker.
    incoming_messages: mpsc::Sender<(PeerId, BitswapMessage)>,
    /// Feeds "peer connected" notifications to a background worker.
    peers_connected: mpsc::Sender<PeerId>,
    /// Feeds "peer disconnected" notifications to a background worker.
    peers_disconnected: mpsc::Sender<PeerId>,
    /// Join handles of the background workers spawned in `new`.
    _workers: Arc<Vec<JoinHandle<()>>>,
}
82
/// Bitswap-level state tracked for each established connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// Connection is established but the handler has not reported anything yet.
    Pending,
    /// The handler reported a negotiated protocol or delivered a message.
    Responsive(ProtocolId),
    /// The handler reported that the protocol is not supported.
    Unresponsive,
}
89
/// Configuration for a [`Bitswap`] behaviour.
#[derive(Debug)]
pub struct Config {
    /// Settings handed to the bitswap client.
    pub client: ClientConfig,
    /// Settings for the bitswap server; with `None` no server is created.
    pub server: Option<ServerConfig>,
    /// Protocol settings cloned into every connection handler.
    pub protocol: ProtocolConfig,
    /// Idle timeout, 30 seconds by default.
    /// NOTE(review): not read anywhere in this file; confirm where it is consumed.
    pub idle_timeout: Duration,
}
98
99impl Config {
100 pub fn default_client_mode() -> Self {
101 Config {
102 server: None,
103 ..Default::default()
104 }
105 }
106}
107
108impl Default for Config {
109 fn default() -> Self {
110 Config {
111 client: ClientConfig::default(),
112 server: Some(ServerConfig::default()),
113 protocol: ProtocolConfig::default(),
114 idle_timeout: Duration::from_secs(30),
115 }
116 }
117}
118
/// Block storage backing a [`Bitswap`] instance.
///
/// Implementations must be cheap to clone and shareable across tasks: the
/// store is cloned into the server and moved into the client in
/// `Bitswap::new`.
#[async_trait]
pub trait Store: Debug + Clone + Send + Sync + 'static {
    /// Size of the block stored under `cid`. The in-file test store returns
    /// the length of the block's data and errors when the block is missing.
    async fn get_size(&self, cid: &Cid) -> Result<usize>;
    /// Fetches the block stored under `cid`.
    async fn get(&self, cid: &Cid) -> Result<Block>;
    /// Reports whether a block for `cid` is present.
    async fn has(&self, cid: &Cid) -> Result<bool>;
}
125
126impl<S: Store> Bitswap<S> {
127 pub async fn new(self_id: PeerId, store: S, config: Config) -> Self {
128 let network = Network::new(self_id);
129 let (server, cb) = if let Some(config) = config.server {
130 let server = Server::new(network.clone(), store.clone(), config).await;
131 let cb = server.received_blocks_cb();
132 (Some(server), Some(cb))
133 } else {
134 (None, None)
135 };
136 let client = Client::new(network.clone(), store, cb, config.client).await;
137
138 let (sender_msg, mut receiver_msg) = mpsc::channel::<(PeerId, BitswapMessage)>(2048);
139 let (sender_con, mut receiver_con) = mpsc::channel(2048);
140 let (sender_dis, mut receiver_dis) = mpsc::channel(2048);
141
142 let mut workers = Vec::new();
143 workers.push(tokio::task::spawn({
144 let server = server.clone();
145 let client = client.clone();
146
147 async move {
148 while let Some((peer, mut message)) = receiver_msg.next().await {
150 let message = tokio::task::spawn_blocking(move || {
151 message.verify_blocks();
152 message
153 })
154 .await
155 .expect("cannot spawn blocking thread");
156 if let Some(ref server) = server {
157 futures::future::join(
158 client.receive_message(&peer, &message),
159 server.receive_message(&peer, &message),
160 )
161 .await;
162 } else {
163 client.receive_message(&peer, &message).await;
164 }
165 }
166 }
167 }));
168
169 workers.push(tokio::task::spawn({
170 let server = server.clone();
171 let client = client.clone();
172
173 async move {
174 while let Some(peer) = receiver_con.next().await {
176 if let Some(ref server) = server {
177 futures::future::join(
178 client.peer_connected(&peer),
179 server.peer_connected(&peer),
180 )
181 .await;
182 } else {
183 client.peer_connected(&peer).await;
184 }
185 }
186 }
187 }));
188
189 workers.push(tokio::task::spawn({
190 let server = server.clone();
191 let client = client.clone();
192
193 async move {
194 while let Some(peer) = receiver_dis.next().await {
196 if let Some(ref server) = server {
197 futures::future::join(
198 client.peer_disconnected(&peer),
199 server.peer_disconnected(&peer),
200 )
201 .await;
202 } else {
203 client.peer_disconnected(&peer).await;
204 }
205 }
206 }
207 }));
208
209 Bitswap {
210 network,
211 protocol_config: config.protocol,
212 connected_peers: Default::default(),
213 connection_state: Default::default(),
214 dials: Default::default(),
215 _pause_dialing: false,
216 server,
217 client,
218 incoming_messages: sender_msg,
219 peers_connected: sender_con,
220 peers_disconnected: sender_dis,
221 _workers: Arc::new(workers),
222 }
223 }
224
225 pub fn server(&self) -> Option<&Server<S>> {
226 self.server.as_ref()
227 }
228
229 pub fn client(&self) -> &Client<S> {
230 &self.client
231 }
232
233 pub async fn stop(self) -> Result<()> {
234 self.network.stop();
235 if let Some(server) = self.server {
236 futures::future::try_join(self.client.stop(), server.stop()).await?;
237 } else {
238 self.client.stop().await?;
239 }
240
241 Ok(())
242 }
243
244 pub async fn notify_new_blocks(&self, blocks: &[Block]) -> Result<()> {
245 self.client.notify_new_blocks(blocks).await?;
246 if let Some(ref server) = self.server {
247 server.notify_new_blocks(blocks).await?;
248 }
249
250 Ok(())
251 }
252
253 pub async fn wantlist_for_peer(&self, peer: &PeerId) -> Vec<Cid> {
254 if peer == self.network.self_id() {
255 return self.client.get_wantlist().await.into_iter().collect();
256 }
257
258 if let Some(ref server) = self.server {
259 server.wantlist_for_peer(peer).await
260 } else {
261 Vec::new()
262 }
263 }
264
265 fn peer_connected(&self, peer: PeerId) {
266 if let Err(err) = self.peers_connected.clone().try_send(peer) {
267 warn!(
268 "failed to process peer connection from {}: {:?}, dropping",
269 peer, err
270 );
271 }
272 }
273
274 fn peer_disconnected(&self, peer: PeerId) {
275 if let Err(err) = self.peers_disconnected.clone().try_send(peer) {
276 warn!(
277 "failed to process peer disconnection from {}: {:?}, dropping",
278 peer, err
279 );
280 }
281 }
282
283 fn receive_message(&self, peer: PeerId, message: BitswapMessage) {
284 if let Err(err) = self.incoming_messages.clone().try_send((peer, message)) {
286 warn!(
287 "failed to receive message from {}: {:?}, dropping",
288 peer, err
289 );
290 }
291 }
292}
293
/// Events this behaviour emits to the swarm (`NetworkBehaviour::ToSwarm`).
///
/// They are produced by the network layer and forwarded unchanged from `poll`.
/// NOTE(review): the per-variant descriptions below are inferred from the
/// field names; confirm against the code that emits and consumes them.
#[derive(Debug)]
pub enum BitswapEvent {
    /// Presumably a request to announce the local node as a provider of `key`.
    Provide { key: Cid },
    /// Presumably a request to look up providers of `key`; results or an error
    /// message go back through `response`, with `limit` bounding the lookup.
    FindProviders {
        key: Cid,
        response: mpsc::Sender<std::result::Result<HashSet<PeerId>, String>>,
        limit: usize,
    },
    /// Presumably a request to measure the latency to `peer`; the answer goes
    /// back through `response`.
    Ping {
        peer: PeerId,
        response: oneshot::Sender<Option<Duration>>,
    },
}
308
309impl<S: Store> NetworkBehaviour for Bitswap<S> {
310 type ConnectionHandler = BitswapHandler;
311 type ToSwarm = BitswapEvent;
312
313 fn handle_established_inbound_connection(
314 &mut self,
315 _connection_id: ConnectionId,
316 _: PeerId,
317 _: &Multiaddr,
318 _: &Multiaddr,
319 ) -> std::result::Result<THandler<Self>, ConnectionDenied> {
320 let protocol_config = self.protocol_config.clone();
321 Ok(BitswapHandler::new(protocol_config))
322 }
323
324 fn handle_established_outbound_connection(
325 &mut self,
326 _connection_id: ConnectionId,
327 _: PeerId,
328 _: &Multiaddr,
329 _: libp2p::core::Endpoint,
330 ) -> std::result::Result<THandler<Self>, ConnectionDenied> {
331 let protocol_config = self.protocol_config.clone();
332 Ok(BitswapHandler::new(protocol_config))
333 }
334
335 #[allow(clippy::collapsible_match)]
336 fn on_swarm_event(&mut self, event: FromSwarm) {
337 match event {
338 FromSwarm::ConnectionEstablished(ConnectionEstablished {
339 peer_id,
340 connection_id,
341 other_established,
342 ..
343 }) => {
344 trace!("connection established {} ({})", peer_id, other_established);
345
346 self.connected_peers
347 .entry(peer_id)
348 .or_default()
349 .insert(connection_id);
350
351 self.connection_state
352 .insert(connection_id, ConnectionState::Pending);
353 }
355 FromSwarm::ConnectionClosed(ConnectionClosed {
356 peer_id,
357 remaining_established,
358 connection_id,
359 ..
360 }) => {
361 if let Entry::Occupied(mut entry) = self.connected_peers.entry(peer_id) {
362 let list = entry.get_mut();
363 list.remove(&connection_id);
364 if list.is_empty() {
365 entry.remove();
366 }
367 }
368
369 self.connection_state.remove(&connection_id);
370
371 if remaining_established == 0 && !self.connected_peers.contains_key(&peer_id) {
372 self.peer_disconnected(peer_id);
374 }
375 }
376 FromSwarm::DialFailure(DialFailure {
377 peer_id,
378 error,
379 connection_id: _,
380 ..
381 }) => {
382 let Some(peer_id) = peer_id else {
383 return;
384 };
385
386 trace!("inject_dial_failure {}, {:?}", peer_id, error);
387 let dials = &mut self.dials;
388 if let Some(mut dials) = dials.remove(&peer_id) {
389 while let Some((_id, sender)) = dials.pop() {
390 let _ = sender.send(Err(error.to_string()));
391 }
392 }
393 }
394 _ => {}
395 }
396 }
397
398 fn on_connection_handler_event(
399 &mut self,
400 peer_id: PeerId,
401 connection: ConnectionId,
402 event: HandlerEvent,
403 ) {
404 trace!(
405 "on_connection_handler_event from {}, event: {:?}",
406 peer_id,
407 event
408 );
409 match event {
410 HandlerEvent::Connected { protocol } => {
411 if let Entry::Occupied(mut entry) = self.connection_state.entry(connection) {
412 let state = entry.get_mut();
413 let _old_state = *state;
414 *state = ConnectionState::Responsive(protocol);
415
416 self.peer_connected(peer_id);
417
418 let dials = &mut self.dials;
419 if let Some(mut dials) = dials.remove(&peer_id) {
420 while let Some((id, sender)) = dials.pop() {
421 if let Err(err) = sender.send(Ok((connection, Some(protocol)))) {
422 warn!("dial:{}: failed to send dial response {:?}", id, err)
423 }
424 }
425 }
426 }
427 }
428 HandlerEvent::ProtocolNotSuppported => {
429 if let Entry::Occupied(mut entry) = self.connection_state.entry(connection) {
430 *entry.get_mut() = ConnectionState::Unresponsive;
431
432 let dials = &mut self.dials;
433 if let Some(mut dials) = dials.remove(&peer_id) {
434 while let Some((id, sender)) = dials.pop() {
435 if let Err(err) = sender.send(Err("protocol not supported".into())) {
436 warn!("dial:{} failed to send dial response {:?}", id, err)
437 }
438 }
439 }
440 }
441 }
442 HandlerEvent::Message { message, protocol } => {
443 if let Entry::Occupied(mut entry) = self.connection_state.entry(connection) {
445 let state = entry.get_mut();
446 let old_state = *state;
447 if !matches!(old_state, ConnectionState::Responsive(_)) {
448 *state = ConnectionState::Responsive(protocol);
449 self.peer_connected(peer_id);
450 }
451 }
452 self.receive_message(peer_id, message);
453 }
454 HandlerEvent::FailedToSendMessage { .. } => {
455 }
457 }
458 }
459
460 #[allow(clippy::type_complexity)]
461 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
462 for _ in 0..50 {
464 match futures::ready!(Pin::new(&mut self.network).poll(cx)) {
465 OutEvent::Dial { peer, response, id } => {
466 let connections = match self.connected_peers.get(&peer) {
467 Some(connections) => connections,
468 None => {
469 self.dials.entry(peer).or_default().push((id, response));
470
471 return Poll::Ready(ToSwarm::Dial {
472 opts: DialOpts::peer_id(peer).build(),
473 });
474 }
475 };
476
477 let first_responseive = self
478 .connection_state
479 .iter()
480 .filter(|(k, _)| connections.contains(k))
481 .collect::<Vec<_>>();
482
483 if let Some((conn, state)) = first_responseive
484 .iter()
485 .find(|(_, state)| matches!(state, ConnectionState::Responsive(_)))
486 {
487 if let ConnectionState::Responsive(protocol_id) = state {
488 if let Err(err) = response.send(Ok((**conn, Some(*protocol_id)))) {
489 debug!("dial:{}: failed to send dial response {:?}", id, err)
490 }
491 }
492 continue;
493 }
494
495 if let Some((conn, _)) = first_responseive.iter().find(|(_, state)| {
496 matches!(
497 state,
498 ConnectionState::Pending | ConnectionState::Unresponsive
499 )
500 }) {
501 if let Err(err) = response.send(Ok((**conn, None))) {
502 debug!("dial:{}: failed to send dial response {:?}", id, err)
503 }
504 continue;
505 }
506 }
507 OutEvent::GenerateEvent(ev) => return Poll::Ready(ToSwarm::GenerateEvent(ev)),
508 OutEvent::SendMessage {
509 peer,
510 message,
511 response,
512 connection_id,
513 } => {
514 tracing::debug!("send message to {}", peer);
515 return Poll::Ready(ToSwarm::NotifyHandler {
516 peer_id: peer,
517 handler: NotifyHandler::One(connection_id),
518 event: handler::BitswapHandlerIn::Message(message, response),
519 });
520 }
521 OutEvent::ProtectPeer { peer } => {
522 if self.connected_peers.contains_key(&peer) {
523 return Poll::Ready(ToSwarm::NotifyHandler {
524 peer_id: peer,
525 handler: NotifyHandler::Any,
526 event: handler::BitswapHandlerIn::Protect,
527 });
528 }
529 }
530 OutEvent::UnprotectPeer { peer, response } => {
531 if self.connected_peers.contains_key(&peer) {
532 let _ = response.send(true);
533 return Poll::Ready(ToSwarm::NotifyHandler {
534 peer_id: peer,
535 handler: NotifyHandler::Any,
536 event: handler::BitswapHandlerIn::Unprotect,
537 });
538 }
539 let _ = response.send(false);
540 }
541 }
542 }
543
544 Poll::Pending
545 }
546}
547
548pub fn verify_hash(cid: &Cid, bytes: &[u8]) -> Option<bool> {
549 use cid::multihash::{Code, MultihashDigest};
550 Code::try_from(cid.hash().code()).ok().map(|code| {
551 let calculated_hash = code.digest(bytes);
552 &calculated_hash == cid.hash()
553 })
554}
555
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use anyhow::anyhow;
    use futures::prelude::*;
    use libp2p::identity::Keypair;
    use libp2p::swarm::SwarmEvent;
    use libp2p::Swarm;
    use libp2p::SwarmBuilder;
    use tokio::sync::{mpsc, RwLock};
    use tracing::{info, trace};
    use tracing_subscriber::{fmt, prelude::*, EnvFilter};

    use super::*;
    use crate::Block;

    /// Compile-time check that `T` is `Send + Sync`.
    fn assert_send<T: Send + Sync>() {}

    /// Installs the pretty-printing tracing subscriber for a test.
    ///
    /// The global subscriber can be set only once per process, and several
    /// tests ask for it. `try_init` turns every call after the first into a
    /// no-op, where `init` would panic.
    fn init_tracing() {
        tracing_subscriber::registry()
            .with(fmt::layer().pretty())
            .with(EnvFilter::from_default_env())
            .try_init()
            .ok();
    }

    /// Store used only for trait-bound checks; its methods are never called.
    #[derive(Debug, Clone)]
    struct DummyStore;

    #[async_trait]
    impl Store for DummyStore {
        async fn get_size(&self, _: &Cid) -> Result<usize> {
            todo!()
        }
        async fn get(&self, _: &Cid) -> Result<Block> {
            todo!()
        }
        async fn has(&self, _: &Cid) -> Result<bool> {
            todo!()
        }
    }

    #[test]
    fn test_traits() {
        assert_send::<Bitswap<DummyStore>>();
        assert_send::<&Bitswap<DummyStore>>();
    }

    /// In-memory store shared between the test and the behaviour under test.
    #[derive(Debug, Clone, Default)]
    struct TestStore {
        store: Arc<RwLock<AHashMap<Cid, Block>>>,
    }

    #[async_trait]
    impl Store for TestStore {
        async fn get_size(&self, cid: &Cid) -> Result<usize> {
            self.store
                .read()
                .await
                .get(cid)
                .map(|block| block.data().len())
                .ok_or_else(|| anyhow!("missing"))
        }

        async fn get(&self, cid: &Cid) -> Result<Block> {
            self.store
                .read()
                .await
                .get(cid)
                .cloned()
                .ok_or_else(|| anyhow!("missing"))
        }

        async fn has(&self, cid: &Cid) -> Result<bool> {
            Ok(self.store.read().await.contains_key(cid))
        }
    }

    #[tokio::test]
    async fn test_get_1_block() {
        init_tracing();

        get_block::<1>().await;
    }

    #[tokio::test]
    async fn test_get_2_block() {
        get_block::<2>().await;
    }

    #[tokio::test]
    async fn test_get_4_block() {
        get_block::<4>().await;
    }

    #[tokio::test]
    async fn test_get_64_block() {
        get_block::<64>().await;
    }

    #[tokio::test]
    async fn test_get_65_block() {
        get_block::<65>().await;
    }

    #[tokio::test]
    async fn test_get_66_block() {
        get_block::<66>().await;
    }

    #[tokio::test]
    async fn test_get_128_block() {
        init_tracing();

        get_block::<128>().await;
    }

    #[tokio::test]
    async fn test_get_1024_block() {
        get_block::<1024>().await;
    }

    /// Runs two swarms over local TCP: peer1 holds `N` random blocks, peer2
    /// fetches all of them three times (joined futures, unordered futures and
    /// a session) and checks that every received block matches.
    async fn get_block<const N: usize>() {
        let kp = Keypair::generate_ed25519();
        let store1 = TestStore::default();
        let bs1 = Bitswap::new(kp.public().to_peer_id(), store1.clone(), Config::default()).await;

        trace!("peer1: {}", kp.public().to_peer_id());

        let mut swarm1 = SwarmBuilder::with_existing_identity(kp)
            .with_tokio()
            .with_tcp(
                libp2p::tcp::Config::default(),
                libp2p::noise::Config::new,
                libp2p::yamux::Config::default,
            )
            .unwrap()
            .with_behaviour(|_| bs1)
            .unwrap()
            .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30)))
            .build();

        let blocks = (0..N).map(|_| create_random_block_v1()).collect::<Vec<_>>();

        for block in &blocks {
            store1
                .store
                .write()
                .await
                .insert(*block.cid(), block.clone());
        }

        let (tx, mut rx) = mpsc::channel::<Multiaddr>(1);

        Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

        let peer1 = tokio::task::spawn(async move {
            // Drain the events that are immediately ready so the listen
            // address is known before it is handed to peer2.
            while swarm1.next().now_or_never().is_some() {}
            let listeners: Vec<_> = Swarm::listeners(&swarm1).collect();
            for l in listeners {
                tx.send(l.clone()).await.unwrap();
            }

            loop {
                let ev = swarm1.next().await;
                trace!("peer1: {:?}", ev);
            }
        });

        info!("peer2: startup");
        let kp = Keypair::generate_ed25519();
        let store2 = TestStore::default();
        let bs2 = Bitswap::new(kp.public().to_peer_id(), store2.clone(), Config::default()).await;
        trace!("peer2: {}", kp.public().to_peer_id());
        let mut swarm2 = SwarmBuilder::with_existing_identity(kp)
            .with_tokio()
            .with_tcp(
                libp2p::tcp::Config::default(),
                libp2p::noise::Config::new,
                libp2p::yamux::Config::default,
            )
            .unwrap()
            .with_behaviour(|_| bs2)
            .unwrap()
            .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30)))
            .build();

        let swarm2_bs_client = swarm2.behaviour().client().clone();
        let peer2 = tokio::task::spawn(async move {
            let addr = rx.recv().await.unwrap();
            info!("peer2: dialing peer1 at {}", addr);
            Swarm::dial(&mut swarm2, addr).unwrap();

            loop {
                match swarm2.next().await {
                    Some(SwarmEvent::ConnectionEstablished {
                        connection_id,
                        peer_id,
                        ..
                    }) => {
                        trace!("peer2: connected to {} to {connection_id}", peer_id);
                    }
                    ev => trace!("peer2: {:?}", ev),
                }
            }
        });

        {
            info!("peer2: fetching block - ordered");
            let blocks = blocks.clone();
            let mut futs = Vec::new();
            for block in &blocks {
                let client = swarm2_bs_client.clone();
                futs.push(async move {
                    let received_block = client.get_block(block.cid()).await?;

                    info!("peer2: received block");
                    Ok::<Block, anyhow::Error>(received_block)
                });
            }

            let results = futures::future::join_all(futs).await;
            for (block, result) in blocks.into_iter().zip(results) {
                let received_block = result.unwrap();
                assert_eq!(block, received_block);
            }
        }

        {
            info!("peer2: fetching block - unordered");
            let mut blocks = blocks.clone();
            let futs = futures::stream::FuturesUnordered::new();
            for block in &blocks {
                let client = swarm2_bs_client.clone();
                futs.push(async move {
                    let received_block = client.get_block(block.cid()).await?;

                    info!("peer2: received block");
                    Ok::<Block, anyhow::Error>(received_block)
                });
            }

            // Completion order is arbitrary, so compare the sorted lists.
            let mut results = futs.try_collect::<Vec<_>>().await.unwrap();
            results.sort();
            blocks.sort();
            for (block, received_block) in blocks.into_iter().zip(results) {
                assert_eq!(block, received_block);
            }
        }

        {
            info!("peer2: fetching block - session");
            let mut blocks = blocks.clone();
            let ids: Vec<_> = blocks.iter().map(|b| *b.cid()).collect();
            let session = swarm2_bs_client.new_session().await;
            let (blocks_receiver, _guard) = session.get_blocks(&ids).await.unwrap().into_parts();
            let mut results: Vec<_> = blocks_receiver.collect().await;

            results.sort();
            blocks.sort();
            for (block, received_block) in blocks.into_iter().zip(results) {
                assert_eq!(block, received_block);
            }
        }

        info!("--shutting down peer1");
        peer1.abort();
        peer1.await.ok();

        info!("--shutting down peer2");
        peer2.abort();
        peer2.await.ok();
    }
}