1use std::{
8 collections::{BTreeMap, HashMap, HashSet},
9 convert::Infallible,
10 net::{IpAddr, SocketAddr},
11 pin::Pin,
12 sync::Arc,
13 time::Duration,
14};
15
16use futures::{
17 future::{self, FutureExt},
18 sink::SinkExt,
19 stream::{FuturesUnordered, StreamExt},
20 Future, TryFutureExt,
21};
22use indexmap::IndexMap;
23use rand::seq::SliceRandom;
24use tokio::{
25 net::{TcpListener, TcpStream},
26 sync::{broadcast, mpsc, watch},
27 time::{sleep, Instant},
28};
29use tokio_stream::wrappers::IntervalStream;
30use tower::{
31 buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt,
32};
33use tracing_futures::Instrument;
34
35use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics};
36
37use crate::{
38 address_book_updater::{AddressBookUpdater, MIN_CHANNEL_SIZE},
39 constants,
40 meta_addr::{MetaAddr, MetaAddrChange},
41 peer::{
42 self, address_is_valid_for_inbound_listeners, HandshakeRequest, MinimumPeerVersion,
43 OutboundConnectorRequest, PeerPreference,
44 },
45 peer_cache_updater::peer_cache_updater,
46 peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
47 AddressBook, BoxError, Config, PeerSocketAddr, Request, Response,
48};
49
50#[cfg(test)]
51mod tests;
52
53mod recent_by_ip;
54
55type DiscoveredPeer = (PeerSocketAddr, peer::Client);
64
65pub async fn init<S, C>(
98 config: Config,
99 inbound_service: S,
100 latest_chain_tip: C,
101 user_agent: String,
102) -> (
103 Buffer<BoxService<Request, Response, BoxError>, Request>,
104 Arc<std::sync::Mutex<AddressBook>>,
105 mpsc::Sender<(PeerSocketAddr, u32)>,
106)
107where
108 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + Sync + 'static,
109 S::Future: Send + 'static,
110 C: ChainTip + Clone + Send + Sync + 'static,
111{
112 let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
113
114 let (
115 address_book,
116 bans_receiver,
117 address_book_updater,
118 address_metrics,
119 address_book_updater_guard,
120 ) = AddressBookUpdater::spawn(&config, listen_addr);
121
122 let (misbehavior_tx, mut misbehavior_rx) = mpsc::channel(
123 config
126 .peerset_total_connection_limit()
127 .max(MIN_CHANNEL_SIZE),
128 );
129
130 let misbehaviour_updater = address_book_updater.clone();
131 tokio::spawn(
132 async move {
133 let mut misbehaviors: HashMap<PeerSocketAddr, u32> = HashMap::new();
134 let mut flush_timer =
137 IntervalStream::new(tokio::time::interval(Duration::from_secs(30)));
138
139 loop {
140 tokio::select! {
141 msg = misbehavior_rx.recv() => match msg {
142 Some((peer_addr, score_increment)) => *misbehaviors
143 .entry(peer_addr)
144 .or_default()
145 += score_increment,
146 None => break,
147 },
148
149 _ = flush_timer.next() => {
150 for (addr, score_increment) in misbehaviors.drain() {
151 let _ = misbehaviour_updater
152 .send(MetaAddr::new_misbehavior(addr, score_increment))
153 .await;
154 }
155 },
156 };
157 }
158
159 tracing::warn!("exiting misbehavior update batch task");
160 }
161 .in_current_span(),
162 );
163
164 let (inv_sender, inv_receiver) = broadcast::channel(config.peerset_total_connection_limit());
172
173 let (listen_handshaker, outbound_connector) = {
178 use tower::timeout::TimeoutLayer;
179 let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
180 use crate::protocol::external::types::PeerServices;
181 let hs = peer::Handshake::builder()
182 .with_config(config.clone())
183 .with_inbound_service(inbound_service)
184 .with_inventory_collector(inv_sender)
185 .with_address_book_updater(address_book_updater.clone())
186 .with_advertised_services(PeerServices::NODE_NETWORK)
187 .with_user_agent(user_agent)
188 .with_latest_chain_tip(latest_chain_tip.clone())
189 .want_transactions(true)
190 .finish()
191 .expect("configured all required parameters");
192 (
193 hs_timeout.layer(hs.clone()),
194 hs_timeout.layer(peer::Connector::new(hs)),
195 )
196 };
197
198 let (peerset_tx, peerset_rx) =
204 futures::channel::mpsc::channel::<DiscoveredPeer>(config.peerset_total_connection_limit());
205
206 let discovered_peers = peerset_rx.map(|(address, client)| {
207 Result::<_, Infallible>::Ok(Change::Insert(address, client.into()))
208 });
209
210 let (mut demand_tx, demand_rx) =
213 futures::channel::mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());
214
215 let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
217
218 let peer_set = PeerSet::new(
220 &config,
221 discovered_peers,
222 demand_tx.clone(),
223 handle_rx,
224 inv_receiver,
225 bans_receiver.clone(),
226 address_metrics,
227 MinimumPeerVersion::new(latest_chain_tip, &config.network),
228 None,
229 );
230 let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
231
232 let listen_fut = accept_inbound_connections(
236 config.clone(),
237 tcp_listener,
238 constants::MIN_INBOUND_PEER_CONNECTION_INTERVAL,
239 listen_handshaker,
240 peerset_tx.clone(),
241 bans_receiver,
242 );
243 let listen_guard = tokio::spawn(listen_fut.in_current_span());
244
245 let initial_peers_fut = add_initial_peers(
247 config.clone(),
248 outbound_connector.clone(),
249 peerset_tx.clone(),
250 address_book_updater.clone(),
251 );
252 let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span());
253
254 let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
256
257 let mut active_outbound_connections = initial_peers_join
259 .wait_for_panics()
260 .await
261 .expect("unexpected error connecting to initial peers");
262 let active_initial_peer_count = active_outbound_connections.update_count();
263
264 info!(
272 ?active_initial_peer_count,
273 "sending initial request for peers"
274 );
275 let _ = candidates.update_initial(active_initial_peer_count).await;
276
277 let demand_count = config
279 .peerset_initial_target_size
280 .saturating_sub(active_outbound_connections.update_count());
281
282 for _ in 0..demand_count {
283 let _ = demand_tx.try_send(MorePeers);
284 }
285
286 let crawl_fut = crawl_and_dial(
288 config.clone(),
289 demand_tx,
290 demand_rx,
291 candidates,
292 outbound_connector,
293 peerset_tx,
294 active_outbound_connections,
295 address_book_updater,
296 );
297 let crawl_guard = tokio::spawn(crawl_fut.in_current_span());
298
299 let peer_cache_updater_fut = peer_cache_updater(config, address_book.clone());
301 let peer_cache_updater_guard = tokio::spawn(peer_cache_updater_fut.in_current_span());
302
303 handle_tx
304 .send(vec![
305 listen_guard,
306 crawl_guard,
307 address_book_updater_guard,
308 peer_cache_updater_guard,
309 ])
310 .unwrap();
311
312 (peer_set, address_book, misbehavior_tx)
313}
314
315#[instrument(skip(config, outbound_connector, peerset_tx, address_book_updater))]
320async fn add_initial_peers<S>(
321 config: Config,
322 outbound_connector: S,
323 mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
324 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
325) -> Result<ActiveConnectionCounter, BoxError>
326where
327 S: Service<
328 OutboundConnectorRequest,
329 Response = (PeerSocketAddr, peer::Client),
330 Error = BoxError,
331 > + Clone
332 + Send
333 + 'static,
334 S::Future: Send + 'static,
335{
336 let initial_peers = limit_initial_peers(&config, address_book_updater).await;
337
338 let mut handshake_success_total: usize = 0;
339 let mut handshake_error_total: usize = 0;
340
341 let mut active_outbound_connections = ActiveConnectionCounter::new_counter_with(
342 config.peerset_outbound_connection_limit(),
343 "Outbound Connections",
344 );
345
346 let ipv4_peer_count = initial_peers.iter().filter(|ip| ip.is_ipv4()).count();
348 let ipv6_peer_count = initial_peers.iter().filter(|ip| ip.is_ipv6()).count();
349 info!(
350 ?ipv4_peer_count,
351 ?ipv6_peer_count,
352 "connecting to initial peer set"
353 );
354
355 let mut handshakes: FuturesUnordered<_> = initial_peers
367 .into_iter()
368 .enumerate()
369 .map(|(i, addr)| {
370 let connection_tracker = active_outbound_connections.track_connection();
371 let req = OutboundConnectorRequest {
372 addr,
373 connection_tracker,
374 };
375 let outbound_connector = outbound_connector.clone();
376
377 tokio::spawn(
379 async move {
380 sleep(
384 constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32),
385 )
386 .await;
387
388 outbound_connector
391 .oneshot(req)
392 .map_err(move |e| (addr, e))
393 .await
394 }
395 .in_current_span(),
396 )
397 .wait_for_panics()
398 })
399 .collect();
400
401 while let Some(handshake_result) = handshakes.next().await {
402 match handshake_result {
403 Ok(change) => {
404 handshake_success_total += 1;
405 debug!(
406 ?handshake_success_total,
407 ?handshake_error_total,
408 ?change,
409 "an initial peer handshake succeeded"
410 );
411
412 peerset_tx.send(change).await?;
414 }
415 Err((addr, ref e)) => {
416 handshake_error_total += 1;
417
418 let mut expected_error = false;
420 if let Some(io_error) = e.downcast_ref::<tokio::io::Error>() {
421 if io_error.kind() == tokio::io::ErrorKind::AddrNotAvailable {
424 expected_error = true;
425 }
426 }
427
428 if expected_error {
429 debug!(
430 successes = ?handshake_success_total,
431 errors = ?handshake_error_total,
432 ?addr,
433 ?e,
434 "an initial peer connection failed"
435 );
436 } else {
437 info!(
438 successes = ?handshake_success_total,
439 errors = ?handshake_error_total,
440 ?addr,
441 %e,
442 "an initial peer connection failed"
443 );
444 }
445 }
446 }
447
448 tokio::task::yield_now().await;
452 }
453
454 let outbound_connections = active_outbound_connections.update_count();
455 info!(
456 ?handshake_success_total,
457 ?handshake_error_total,
458 ?outbound_connections,
459 "finished connecting to initial seed and disk cache peers"
460 );
461
462 Ok(active_outbound_connections)
463}
464
465async fn limit_initial_peers(
473 config: &Config,
474 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
475) -> HashSet<PeerSocketAddr> {
476 let all_peers: HashSet<PeerSocketAddr> = config.initial_peers().await;
477 let mut preferred_peers: BTreeMap<PeerPreference, Vec<PeerSocketAddr>> = BTreeMap::new();
478
479 let all_peers_count = all_peers.len();
480 if all_peers_count > config.peerset_initial_target_size {
481 info!(
482 "limiting the initial peers list from {} to {}",
483 all_peers_count, config.peerset_initial_target_size,
484 );
485 }
486
487 for peer_addr in all_peers {
490 let preference = PeerPreference::new(peer_addr, config.network.clone());
491
492 match preference {
493 Ok(preference) => preferred_peers
494 .entry(preference)
495 .or_default()
496 .push(peer_addr),
497 Err(error) => info!(
498 ?peer_addr,
499 ?error,
500 "invalid initial peer from DNS seeder, configured IP address, or disk cache",
501 ),
502 }
503 }
504
505 for peer in preferred_peers.values().flatten() {
514 let peer_addr = MetaAddr::new_initial_peer(*peer);
515 let _ = address_book_updater.send(peer_addr).await;
518 }
519
520 let mut initial_peers: HashSet<PeerSocketAddr> = HashSet::new();
523 for better_peers in preferred_peers.values() {
524 let mut better_peers = better_peers.clone();
525 let (chosen_peers, _unused_peers) = better_peers.partial_shuffle(
526 &mut rand::thread_rng(),
527 config.peerset_initial_target_size - initial_peers.len(),
528 );
529
530 initial_peers.extend(chosen_peers.iter());
531
532 if initial_peers.len() >= config.peerset_initial_target_size {
533 break;
534 }
535 }
536
537 initial_peers
538}
539
540#[instrument(skip(config), fields(addr = ?config.listen_addr))]
550pub(crate) async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
551 if let Err(wrong_addr) =
553 address_is_valid_for_inbound_listeners(config.listen_addr, config.network.clone())
554 {
555 warn!(
556 "We are configured with address {} on {:?}, but it could cause network issues. \
557 The default port for {:?} is {}. Error: {wrong_addr:?}",
558 config.listen_addr,
559 config.network,
560 config.network,
561 config.network.default_port(),
562 );
563 }
564
565 info!(
566 "Trying to open Zcash protocol endpoint at {}...",
567 config.listen_addr
568 );
569 let listener_result = TcpListener::bind(config.listen_addr).await;
570
571 let listener = match listener_result {
572 Ok(l) => l,
573 Err(e) => panic!(
574 "Opening Zcash network protocol listener {:?} failed: {e:?}. \
575 Hint: Check if another zebrad or zcashd process is running. \
576 Try changing the network listen_addr in the Zebra config.",
577 config.listen_addr,
578 ),
579 };
580
581 let local_addr = listener
582 .local_addr()
583 .expect("unexpected missing local addr for open listener");
584 info!("Opened Zcash protocol endpoint at {}", local_addr);
585
586 (listener, local_addr)
587}
588
589#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
598async fn accept_inbound_connections<S>(
599 config: Config,
600 listener: TcpListener,
601 min_inbound_peer_connection_interval: Duration,
602 handshaker: S,
603 peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
604 bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
605) -> Result<(), BoxError>
606where
607 S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
608 + Clone,
609 S::Future: Send + 'static,
610{
611 let mut recent_inbound_connections =
612 recent_by_ip::RecentByIp::new(None, Some(config.max_connections_per_ip));
613
614 let mut active_inbound_connections = ActiveConnectionCounter::new_counter_with(
615 config.peerset_inbound_connection_limit(),
616 "Inbound Connections",
617 );
618
619 let mut handshakes: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> =
620 FuturesUnordered::new();
621 handshakes.push(future::pending().boxed());
623
624 loop {
625 let inbound_result = tokio::select! {
627 biased;
628 next_handshake_res = handshakes.next() => match next_handshake_res {
629 Some(()) => continue,
631 None => unreachable!("handshakes never terminates, because it contains a future that never resolves"),
632 },
633
634 inbound_result = listener.accept() => inbound_result,
636 };
637
638 if let Ok((tcp_stream, addr)) = inbound_result {
639 let addr: PeerSocketAddr = addr.into();
640
641 if bans_receiver.borrow().clone().contains_key(&addr.ip()) {
642 debug!(?addr, "banned inbound connection attempt");
643 std::mem::drop(tcp_stream);
644 continue;
645 }
646
647 if active_inbound_connections.update_count()
648 >= config.peerset_inbound_connection_limit()
649 || recent_inbound_connections.is_past_limit_or_add(addr.ip())
650 {
651 std::mem::drop(tcp_stream);
654 tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
657 continue;
658 }
659
660 let connection_tracker = active_inbound_connections.track_connection();
663 debug!(
664 inbound_connections = ?active_inbound_connections.update_count(),
665 "handshaking on an open inbound peer connection"
666 );
667
668 let handshake_task = accept_inbound_handshake(
669 addr,
670 handshaker.clone(),
671 tcp_stream,
672 connection_tracker,
673 peerset_tx.clone(),
674 )
675 .await?
676 .wait_for_panics();
677
678 handshakes.push(handshake_task);
679
680 tokio::time::sleep(min_inbound_peer_connection_interval).await;
690 } else {
691 debug!(?inbound_result, "error accepting inbound connection");
694 tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
695 }
696
697 tokio::task::yield_now().await;
705 }
706}
707
708#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))]
716async fn accept_inbound_handshake<S>(
717 addr: PeerSocketAddr,
718 mut handshaker: S,
719 tcp_stream: TcpStream,
720 connection_tracker: ConnectionTracker,
721 peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
722) -> Result<tokio::task::JoinHandle<()>, BoxError>
723where
724 S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
725 + Clone,
726 S::Future: Send + 'static,
727{
728 let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
729
730 debug!("got incoming connection");
731
732 handshaker.ready().await?;
739
740 let handshake = handshaker.call(HandshakeRequest {
742 data_stream: tcp_stream,
743 connected_addr,
744 connection_tracker,
745 });
746 let mut peerset_tx = peerset_tx.clone();
748
749 let handshake_task = tokio::spawn(
750 async move {
751 let handshake_result = handshake.await;
752
753 if let Ok(client) = handshake_result {
754 let _ = peerset_tx.send((addr, client)).await;
756 } else {
757 debug!(?handshake_result, "error handshaking with inbound peer");
758 }
759 }
760 .in_current_span(),
761 );
762
763 Ok(handshake_task)
764}
765
766enum CrawlerAction {
768 DemandDrop,
770 DemandHandshakeOrCrawl,
774 TimerCrawl { tick: Instant },
776 HandshakeFinished,
778 DemandCrawlFinished,
780 TimerCrawlFinished,
782}
783
784#[allow(clippy::too_many_arguments)]
802#[instrument(
803 skip(
804 config,
805 demand_tx,
806 demand_rx,
807 candidates,
808 outbound_connector,
809 peerset_tx,
810 active_outbound_connections,
811 address_book_updater,
812 ),
813 fields(
814 new_peer_interval = ?config.crawl_new_peer_interval,
815 )
816)]
817async fn crawl_and_dial<C, S>(
818 config: Config,
819 demand_tx: futures::channel::mpsc::Sender<MorePeers>,
820 mut demand_rx: futures::channel::mpsc::Receiver<MorePeers>,
821 candidates: CandidateSet<S>,
822 outbound_connector: C,
823 peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
824 mut active_outbound_connections: ActiveConnectionCounter,
825 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
826) -> Result<(), BoxError>
827where
828 C: Service<
829 OutboundConnectorRequest,
830 Response = (PeerSocketAddr, peer::Client),
831 Error = BoxError,
832 > + Clone
833 + Send
834 + 'static,
835 C::Future: Send + 'static,
836 S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
837 S::Future: Send + 'static,
838{
839 use CrawlerAction::*;
840
841 info!(
842 crawl_new_peer_interval = ?config.crawl_new_peer_interval,
843 outbound_connections = ?active_outbound_connections.update_count(),
844 "starting the peer address crawler",
845 );
846
847 let candidates = Arc::new(futures::lock::Mutex::new(candidates));
853
854 let mut handshakes: FuturesUnordered<
856 Pin<Box<dyn Future<Output = Result<CrawlerAction, BoxError>> + Send>>,
857 > = FuturesUnordered::new();
858 handshakes.push(future::pending().boxed());
861
862 let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval);
863 crawl_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
866
867 let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick });
868
869 loop {
874 metrics::gauge!("crawler.in_flight_handshakes").set(
875 handshakes
876 .len()
877 .checked_sub(1)
878 .expect("the pool always contains an unresolved future") as f64,
879 );
880
881 let crawler_action = tokio::select! {
882 biased;
883 next_handshake_res = handshakes.next() => next_handshake_res.expect(
886 "handshakes never terminates, because it contains a future that never resolves"
887 ),
888 next_timer = crawl_timer.next() => Ok(next_timer.expect("timers never terminate")),
890 next_demand = demand_rx.next() => next_demand.ok_or("demand stream closed, is Zebra shutting down?".into()).map(|MorePeers|{
896 if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
897 DemandDrop
899 } else {
900 DemandHandshakeOrCrawl
901 }
902 })
903 };
904
905 match crawler_action {
906 Ok(DemandDrop) => {
908 trace!("too many open connections or in-flight handshakes, dropping demand signal");
911 }
912
913 Ok(DemandHandshakeOrCrawl) => {
915 let candidates = candidates.clone();
916 let outbound_connector = outbound_connector.clone();
917 let peerset_tx = peerset_tx.clone();
918 let address_book_updater = address_book_updater.clone();
919 let demand_tx = demand_tx.clone();
920
921 let outbound_connection_tracker = active_outbound_connections.track_connection();
923 let outbound_connections = active_outbound_connections.update_count();
924 debug!(?outbound_connections, "opening an outbound peer connection");
925
926 let handshake_or_crawl_handle = tokio::spawn(
935 async move {
936 let candidate = { candidates.lock().await.next().await };
943
944 if let Some(candidate) = candidate {
945 dial(
947 candidate,
948 outbound_connector,
949 outbound_connection_tracker,
950 outbound_connections,
951 peerset_tx,
952 address_book_updater,
953 demand_tx,
954 )
955 .await?;
956
957 Ok(HandshakeFinished)
958 } else {
959 debug!("demand for peers but no available candidates");
961
962 crawl(candidates, demand_tx, false).await?;
963
964 Ok(DemandCrawlFinished)
965 }
966 }
967 .in_current_span(),
968 )
969 .wait_for_panics();
970
971 handshakes.push(handshake_or_crawl_handle);
972 }
973 Ok(TimerCrawl { tick }) => {
974 let candidates = candidates.clone();
975 let demand_tx = demand_tx.clone();
976 let should_always_dial = active_outbound_connections.update_count() == 0;
977
978 let crawl_handle = tokio::spawn(
979 async move {
980 debug!(
981 ?tick,
982 "crawling for more peers in response to the crawl timer"
983 );
984
985 crawl(candidates, demand_tx, should_always_dial).await?;
986
987 Ok(TimerCrawlFinished)
988 }
989 .in_current_span(),
990 )
991 .wait_for_panics();
992
993 handshakes.push(crawl_handle);
994 }
995
996 Ok(HandshakeFinished) => {
998 }
1000 Ok(DemandCrawlFinished) => {
1001 trace!("demand-based crawl finished");
1004 }
1005 Ok(TimerCrawlFinished) => {
1006 debug!("timer-based crawl finished");
1007 }
1008
1009 Err(error) => {
1011 info!(?error, "crawler task exiting due to an error");
1012 return Err(error);
1013 }
1014 }
1015
1016 tokio::task::yield_now().await;
1020 }
1021}
1022
1023#[instrument(skip(candidates, demand_tx))]
1026async fn crawl<S>(
1027 candidates: Arc<futures::lock::Mutex<CandidateSet<S>>>,
1028 mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
1029 should_always_dial: bool,
1030) -> Result<(), BoxError>
1031where
1032 S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
1033 S::Future: Send + 'static,
1034{
1035 let result = {
1039 let result = candidates.lock().await.update().await;
1040 std::mem::drop(candidates);
1041 result
1042 };
1043 let more_peers = match result {
1044 Ok(more_peers) => more_peers.or_else(|| should_always_dial.then_some(MorePeers)),
1045 Err(e) => {
1046 info!(
1047 ?e,
1048 "candidate set returned an error, is Zebra shutting down?"
1049 );
1050 return Err(e);
1051 }
1052 };
1053
1054 if let Some(more_peers) = more_peers {
1065 if let Err(send_error) = demand_tx.try_send(more_peers) {
1066 if send_error.is_disconnected() {
1067 return Err(send_error.into());
1069 }
1070 }
1071 }
1072
1073 Ok(())
1074}
1075
1076#[instrument(skip(
1083 outbound_connector,
1084 outbound_connection_tracker,
1085 outbound_connections,
1086 peerset_tx,
1087 address_book_updater,
1088 demand_tx
1089))]
1090async fn dial<C>(
1091 candidate: MetaAddr,
1092 mut outbound_connector: C,
1093 outbound_connection_tracker: ConnectionTracker,
1094 outbound_connections: usize,
1095 mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
1096 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
1097 mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
1098) -> Result<(), BoxError>
1099where
1100 C: Service<
1101 OutboundConnectorRequest,
1102 Response = (PeerSocketAddr, peer::Client),
1103 Error = BoxError,
1104 > + Clone
1105 + Send
1106 + 'static,
1107 C::Future: Send + 'static,
1108{
1109 const MAX_CONNECTIONS_FOR_INFO_LOG: usize = 5;
1112
1113 debug!(?candidate.addr, "attempting outbound connection in response to demand");
1120
1121 let outbound_connector = outbound_connector.ready().await?;
1123
1124 let req = OutboundConnectorRequest {
1125 addr: candidate.addr,
1126 connection_tracker: outbound_connection_tracker,
1127 };
1128
1129 let handshake_result = outbound_connector.call(req).map(Into::into).await;
1131
1132 match handshake_result {
1133 Ok((address, client)) => {
1134 debug!(?candidate.addr, "successfully dialed new peer");
1135
1136 peerset_tx.send((address, client)).await?;
1138 }
1139 Err(error) => {
1141 if outbound_connections <= MAX_CONNECTIONS_FOR_INFO_LOG && !cfg!(test) {
1144 info!(?error, ?candidate.addr, "failed to make outbound connection to peer");
1145 } else {
1146 debug!(?error, ?candidate.addr, "failed to make outbound connection to peer");
1147 }
1148 report_failed(address_book_updater.clone(), candidate).await;
1149
1150 if let Err(send_error) = demand_tx.try_send(MorePeers) {
1157 if send_error.is_disconnected() {
1158 return Err(send_error.into());
1160 }
1161 }
1162 }
1163 }
1164
1165 Ok(())
1166}
1167
1168#[instrument(skip(address_book_updater))]
1170async fn report_failed(
1171 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
1172 addr: MetaAddr,
1173) {
1174 let addr = MetaAddr::new_errored(addr.addr, None);
1176
1177 let _ = address_book_updater.send(addr).await;
1179}