1use crate::ANT_PEERS_ENV;
10use crate::BootstrapCacheStore;
11use crate::BootstrapConfig;
12use crate::ContactsFetcher;
13use crate::Result;
14use crate::contacts_fetcher::ALPHANET_CONTACTS;
15use crate::contacts_fetcher::MAINNET_CONTACTS;
16use crate::craft_valid_multiaddr;
17use crate::craft_valid_multiaddr_from_str;
18use crate::error::Error;
19use crate::multiaddr_get_peer_id;
20use ant_protocol::version::ALPHANET_ID;
21use ant_protocol::version::MAINNET_ID;
22use ant_protocol::version::get_network_id;
23use libp2p::{
24 Multiaddr, PeerId, Swarm,
25 core::connection::ConnectedPoint,
26 multiaddr::Protocol,
27 swarm::{
28 DialError, NetworkBehaviour,
29 dial_opts::{DialOpts, PeerCondition},
30 },
31};
32use std::collections::{HashSet, VecDeque};
33use tokio::sync::mpsc::UnboundedReceiver;
34use tokio::sync::mpsc::UnboundedSender;
35use url::Url;
36
/// State machine that feeds bootstrap addresses to the swarm and tracks the
/// progress of the initial bootstrap.
///
/// Addresses are queued from the environment, the supplied configuration, the
/// on-disk cache and the contacts endpoints; fetches run in spawned tasks that
/// report back over an internal channel.
#[derive(custom_debug::Debug)]
pub struct Bootstrap {
    /// Bootstrap cache store; also the source of the active configuration
    /// (read through `cache_store.config()`).
    cache_store: BootstrapCacheStore,
    /// Queue of addresses still to be handed out; consumed from the front.
    addrs: VecDeque<Multiaddr>,
    /// Handle of the periodic cache sync/flush task; aborted when `Bootstrap`
    /// is dropped. Skipped in `Debug` output.
    #[debug(skip)]
    cache_task: Option<tokio::task::JoinHandle<()>>,
    /// `true` while the cache has not been fetched yet; cleared when a cache
    /// fetch is started.
    cache_pending: bool,
    /// Contacts endpoints that have not been queried yet; `None` once there
    /// are none left (or contacts are not used at all).
    contacts_progress: Option<ContactsProgress>,
    /// Sender cloned into the spawned fetch tasks to report their results.
    event_tx: UnboundedSender<FetchEvent>,
    /// Receiver drained by `process_events`.
    event_rx: UnboundedReceiver<FetchEvent>,
    /// Kind of fetch currently in flight, if any.
    fetch_in_progress: Option<FetchKind>,
    /// Addresses for which a dial was started and no outcome was reported yet.
    ongoing_dials: HashSet<Multiaddr>,
    /// Peer ids extracted from every address that was ever queued.
    bootstrap_peer_ids: HashSet<PeerId>,
    /// Set once the initial bootstrap has terminated.
    bootstrap_completed: bool,
}
70
71impl Bootstrap {
72 pub async fn new(mut config: BootstrapConfig) -> Result<Self> {
73 let contacts_progress = Self::build_contacts_progress(&config)?;
74
75 let mut addrs_queue = VecDeque::new();
76 let mut bootstrap_peer_ids = HashSet::new();
77 if !config.first {
78 if !config.disable_env_peers {
79 for addr in Self::fetch_from_env() {
80 Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
81 }
82 } else {
83 info!("Skipping ANT_PEERS environment variable as per configuration");
84 }
85
86 for addr in config.initial_peers.drain(..) {
87 if let Some(addr) = craft_valid_multiaddr(&addr, false) {
88 info!("Adding addr from arguments: {addr}");
89 Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
90 } else {
91 warn!("Invalid multiaddress format from arguments: {addr}");
92 }
93 }
94 }
95
96 let cache_pending = !config.first && !config.disable_cache_reading;
97 if !cache_pending {
98 info!(
99 "Not loading from cache as per configuration (first={}, disable_cache_reading={})",
100 config.first, config.disable_cache_reading
101 );
102 } else {
103 info!("Cache loading is enabled - cache will be fetched if needed");
104 }
105 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
106
107 let cache_store = BootstrapCacheStore::new(config.clone())?;
108
109 let mut bootstrap = Self {
110 cache_store,
111 addrs: addrs_queue,
112 cache_pending,
113 contacts_progress,
114 event_tx,
115 event_rx,
116 fetch_in_progress: None,
117 ongoing_dials: HashSet::new(),
118 bootstrap_peer_ids,
119 bootstrap_completed: config.first,
120 cache_task: None,
121 };
122
123 info!("Cache store is initialized and will sync and flush periodically");
124 let cache_task = bootstrap.cache_store.sync_and_flush_periodically();
125 bootstrap.cache_task = Some(cache_task);
126
127 if config.first {
128 info!("First node in network; clearing any existing cache");
129 bootstrap.cache_store.write().await?;
130 return Ok(bootstrap);
131 }
132
133 let mut collected_addrs = Vec::new();
137 if bootstrap.addrs.len() < 50 {
138 info!("Initial address queue < 50; fetching from cache/contacts");
139 let now = std::time::Instant::now();
140 loop {
141 match bootstrap.next_addr() {
142 Ok(Some(addr)) => {
143 collected_addrs.push(addr);
144 info!(
145 "We now have {} initial address(es) to dial",
146 collected_addrs.len()
147 );
148 continue;
149 }
150 Ok(None) => {
151 debug!(
152 "No immediate address available; waiting for async fetch to complete"
153 );
154 }
155 Err(err) => {
156 if !collected_addrs.is_empty() {
157 info!(
158 "No more addresses available, but we have {} to dial",
159 collected_addrs.len()
160 );
161 bootstrap.extend_with_addrs(collected_addrs);
162 break;
163 }
164 warn!("Failed to fetch initial address: {err}");
165 return Err(err);
166 }
167 }
168
169 if now.elapsed() > std::time::Duration::from_secs(30) {
170 if !collected_addrs.is_empty() {
171 info!(
172 "Timed out waiting for more addresses, but we have {} to dial, returning",
173 collected_addrs.len()
174 );
175 bootstrap.extend_with_addrs(collected_addrs);
176 break;
177 }
178 error!("Timed out waiting for initial addresses. ");
179 return Err(Error::NoBootstrapPeersFound);
180 }
181 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
182 }
183 }
184
185 Ok(bootstrap)
186 }
187
188 pub fn next_addr(&mut self) -> Result<Option<Multiaddr>> {
195 loop {
196 self.process_events();
197
198 if let Some(addr) = self.addrs.pop_front() {
199 info!(?addr, "next_addr returning queued address");
200 return Ok(Some(addr));
201 }
202
203 if self.fetch_in_progress.is_some() {
204 info!("next_addr waiting for in-flight fetch result");
205 return Ok(None);
206 }
207
208 if self.cache_pending && !matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
209 info!("next_addr triggering cache fetch");
210 self.start_cache_fetch()?;
211 continue;
212 }
213
214 if self.contacts_progress.is_some()
215 && !matches!(self.fetch_in_progress, Some(FetchKind::Contacts))
216 {
217 info!("next_addr triggering contacts fetch");
218 self.start_contacts_fetch()?;
219 if self.fetch_in_progress.is_some() {
220 return Ok(None);
221 }
222 continue;
223 }
224
225 info!("next_addr exhausted all address sources");
226 return Err(Error::NoBootstrapPeersFound);
227 }
228 }
229
230 fn process_events(&mut self) {
231 while let Ok(event) = self.event_rx.try_recv() {
232 match event {
233 FetchEvent::Cache(addrs) => {
234 info!(count = addrs.len(), "process_events received cache batch");
235 if addrs.is_empty() {
236 info!("No addresses retrieved from cache");
237 } else {
238 self.extend_with_addrs(addrs);
239 }
240 }
241 FetchEvent::Contacts(addrs) => {
242 info!(
243 count = addrs.len(),
244 "process_events received contacts batch"
245 );
246 self.extend_with_addrs(addrs);
247 if self
248 .contacts_progress
249 .as_ref()
250 .is_none_or(ContactsProgress::is_empty)
251 {
252 self.contacts_progress = None;
253 }
254 }
255 }
256
257 self.fetch_in_progress = None;
258 }
259 }
260
261 fn extend_with_addrs(&mut self, addrs: Vec<Multiaddr>) {
262 if addrs.is_empty() {
263 return;
264 }
265 for addr in addrs {
266 Self::push_addr(&mut self.addrs, &mut self.bootstrap_peer_ids, addr);
267 }
268 }
269
270 fn push_addr(queue: &mut VecDeque<Multiaddr>, peer_ids: &mut HashSet<PeerId>, addr: Multiaddr) {
271 if let Some(peer_id) = multiaddr_get_peer_id(&addr) {
272 peer_ids.insert(peer_id);
273 }
274 queue.push_back(addr);
275 }
276
277 fn pop_p2p(addr: &mut Multiaddr) -> Option<PeerId> {
278 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
279 let _ = addr.pop();
280 Some(peer_id)
281 } else {
282 None
283 }
284 }
285
286 fn try_next_dial_addr(&mut self) -> Result<Option<Multiaddr>> {
287 match self.next_addr() {
288 Ok(Some(addr)) => Ok(Some(addr)),
289 Ok(None) => Ok(None),
290 Err(Error::NoBootstrapPeersFound) => {
291 self.bootstrap_completed = true;
292 Err(Error::NoBootstrapPeersFound)
293 }
294 Err(err) => Err(err),
295 }
296 }
297
298 fn should_continue_bootstrapping(&mut self, contacted_peers: usize) -> bool {
299 if self.bootstrap_completed {
300 info!("Initial bootstrap process has already completed successfully.");
301 return false;
302 }
303
304 if contacted_peers
305 >= self
306 .cache_store
307 .config()
308 .max_contacted_peers_before_termination
309 {
310 self.bootstrap_completed = true;
311 self.addrs.clear();
312 self.ongoing_dials.clear();
313
314 info!(
315 "Initial bootstrap process completed successfully. We have {contacted_peers} peers in the routing table."
316 );
317
318 return false;
319 }
320
321 if self.ongoing_dials.len() >= self.cache_store.config().max_concurrent_dials {
322 info!(
323 "Initial bootstrap has {} ongoing dials. Not dialing anymore.",
324 self.ongoing_dials.len()
325 );
326
327 return false;
328 }
329
330 if self.addrs.is_empty() {
331 if self.cache_pending
332 || self.contacts_progress.is_some()
333 || self.fetch_in_progress.is_some()
334 {
335 info!("Initial bootstrap is awaiting additional addresses to arrive.");
336 return false;
337 }
338 info!(
339 "We have {contacted_peers} peers in RT, but no more addresses to dial. Stopping initial bootstrap."
340 );
341
342 self.bootstrap_completed = true;
343 return false;
344 }
345
346 true
347 }
348
349 pub fn trigger_bootstrapping_process<B: NetworkBehaviour>(
350 &mut self,
351 swarm: &mut Swarm<B>,
352 contacted_peers: usize,
353 ) {
354 if !self.should_continue_bootstrapping(contacted_peers) {
355 return;
356 }
357
358 while self.ongoing_dials.len() < self.cache_store.config().max_concurrent_dials {
359 match self.try_next_dial_addr() {
360 Ok(Some(mut addr)) => {
361 let addr_clone = addr.clone();
362 let peer_id = Self::pop_p2p(&mut addr);
363
364 let opts = match peer_id {
365 Some(peer_id) => DialOpts::peer_id(peer_id)
366 .condition(PeerCondition::NotDialing)
367 .addresses(vec![addr])
368 .build(),
369 None => DialOpts::unknown_peer_id().address(addr).build(),
370 };
371
372 info!("Trying to dial peer with address: {addr_clone}");
373
374 match swarm.dial(opts) {
375 Ok(()) => {
376 info!(
377 "Dial attempt initiated for peer with address: {addr_clone}. Ongoing dial attempts: {}",
378 self.ongoing_dials.len() + 1
379 );
380 let _ = self.ongoing_dials.insert(addr_clone);
381 }
382 Err(err) => match err {
383 DialError::LocalPeerId { .. } => {
384 warn!(
385 "Failed to dial peer with address: {addr_clone}. This is our own peer ID. Dialing the next peer"
386 );
387 }
388 DialError::NoAddresses => {
389 error!(
390 "Failed to dial peer with address: {addr_clone}. No addresses found. Dialing the next peer"
391 );
392 }
393 DialError::DialPeerConditionFalse(_) => {
394 warn!(
395 "We are already dialing the peer with address: {addr_clone}. Dialing the next peer. This error is harmless."
396 );
397 }
398 DialError::Aborted => {
399 error!(
400 "Pending connection attempt has been aborted for {addr_clone}. Dialing the next peer."
401 );
402 }
403 DialError::WrongPeerId { obtained, .. } => {
404 error!(
405 "The peer identity obtained on the connection did not match the one that was expected. Obtained: {obtained}. Dialing the next peer."
406 );
407 }
408 DialError::Denied { cause } => {
409 error!(
410 "The dialing attempt was denied by the remote peer. Cause: {cause}. Dialing the next peer."
411 );
412 }
413 DialError::Transport(items) => {
414 error!(
415 "Failed to dial peer with address: {addr_clone}. Transport error: {items:?}. Dialing the next peer."
416 );
417 }
418 },
419 }
420 }
421 Ok(None) => {
422 debug!("Waiting for additional bootstrap addresses before continuing to dial");
423 break;
424 }
425 Err(Error::NoBootstrapPeersFound) => {
426 info!("No more bootstrap peers available to dial.");
427 break;
428 }
429 Err(err) => {
430 warn!("Failed to obtain next bootstrap address: {err}");
431 break;
432 }
433 }
434 }
435 }
436
437 pub fn on_connection_established<B: NetworkBehaviour>(
438 &mut self,
439 peer_id: &PeerId,
440 endpoint: &ConnectedPoint,
441 swarm: &mut Swarm<B>,
442 contacted_peers: usize,
443 ) {
444 if self.bootstrap_completed {
445 return;
446 }
447
448 if let ConnectedPoint::Dialer { address, .. } = endpoint
449 && !self.ongoing_dials.remove(address)
450 {
451 self.ongoing_dials
452 .retain(|addr| match multiaddr_get_peer_id(addr) {
453 Some(id) => id != *peer_id,
454 None => true,
455 });
456 }
457
458 self.trigger_bootstrapping_process(swarm, contacted_peers);
459 }
460
461 pub fn on_outgoing_connection_error<B: NetworkBehaviour>(
462 &mut self,
463 peer_id: Option<PeerId>,
464 swarm: &mut Swarm<B>,
465 contacted_peers: usize,
466 ) {
467 if self.bootstrap_completed {
468 return;
469 }
470
471 match peer_id {
472 Some(peer_id) => {
473 self.ongoing_dials.retain(|addr| {
474 if let Some(id) = multiaddr_get_peer_id(addr) {
475 id != peer_id
476 } else {
477 true
478 }
479 });
480 }
481 None => {
482 self.ongoing_dials
485 .retain(|addr| multiaddr_get_peer_id(addr).is_some());
486 }
487 }
488
489 self.trigger_bootstrapping_process(swarm, contacted_peers);
490 }
491
    /// Returns `true` if `peer_id` was part of any address queued for
    /// bootstrapping.
    pub fn is_bootstrap_peer(&self, peer_id: &PeerId) -> bool {
        self.bootstrap_peer_ids.contains(peer_id)
    }
495
    /// Returns `true` once the initial bootstrap has been marked as completed.
    pub fn has_terminated(&self) -> bool {
        self.bootstrap_completed
    }
499
500 fn start_cache_fetch(&mut self) -> Result<()> {
501 if matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
502 error!("Cache fetch already in progress, not starting another");
503 return Ok(());
504 }
505
506 self.cache_pending = false;
507 let config = self.cache_store.config().clone();
508 let event_tx = self.event_tx.clone();
509
510 tokio::spawn(async move {
511 let addrs = match tokio::task::spawn_blocking(move || {
512 BootstrapCacheStore::load_cache_data(&config)
513 })
514 .await
515 {
516 Ok(Ok(cache_data)) => cache_data.get_all_addrs().cloned().collect(),
517 Ok(Err(err)) => {
518 warn!("Failed to load cache data: {err}");
519 Vec::new()
520 }
521 Err(err) => {
522 warn!("Cache fetch task failed to join: {err}");
523 Vec::new()
524 }
525 };
526
527 info!(
528 "Bootstrap cache loaded from disk with {} addresses",
529 addrs.len()
530 );
531 if let Err(err) = event_tx.send(FetchEvent::Cache(addrs)) {
532 error!("Failed to send cache fetch event: {err:?}");
533 }
534 });
535
536 self.fetch_in_progress = Some(FetchKind::Cache);
537
538 Ok(())
539 }
540
541 fn start_contacts_fetch(&mut self) -> Result<()> {
542 if matches!(self.fetch_in_progress, Some(FetchKind::Contacts)) {
543 error!("Contacts fetch already in progress, not starting another");
544 return Ok(());
545 }
546
547 let Some(progress) = self.contacts_progress.as_mut() else {
548 info!("No contacts progress available");
549 return Ok(());
550 };
551
552 let Some(endpoint) = progress.next_endpoint() else {
553 info!("No more contacts endpoints to try");
554 self.contacts_progress = None;
555 return Ok(());
556 };
557
558 let event_tx = self.event_tx.clone();
559
560 tokio::spawn(async move {
561 let Ok(fetcher) = ContactsFetcher::with_endpoints(vec![endpoint.clone()]) else {
562 warn!("Failed to create contacts fetcher for {endpoint}");
563 if let Err(err) = event_tx.send(FetchEvent::Contacts(Vec::new())) {
564 error!("Failed to send contacts fetch error event: {err:?}");
565 }
566 return;
567 };
568
569 let addrs = match fetcher.fetch_bootstrap_addresses().await {
570 Ok(addrs) => addrs,
571 Err(err) => {
572 warn!("Failed to fetch contacts from {endpoint}: {err}");
573 Vec::new()
574 }
575 };
576
577 info!(
578 "Contacts fetch completed from endpoint {endpoint:?} with {} addresses",
579 addrs.len()
580 );
581 if let Err(err) = event_tx.send(FetchEvent::Contacts(addrs)) {
582 error!("Failed to send contacts fetch event: {err:?}");
583 }
584 });
585
586 self.fetch_in_progress = Some(FetchKind::Contacts);
587
588 Ok(())
589 }
590
591 fn build_contacts_progress(config: &BootstrapConfig) -> Result<Option<ContactsProgress>> {
592 if config.first {
593 info!("First node in network; not fetching contacts");
594 return Ok(None);
595 }
596
597 if config.local {
598 info!("Local network configuration; skipping contacts endpoints");
599 return Ok(None);
600 }
601
602 if !config.network_contacts_url.is_empty() {
603 let endpoints = config
604 .network_contacts_url
605 .iter()
606 .map(|endpoint| endpoint.parse::<Url>().map_err(|_| Error::FailedToParseUrl))
607 .collect::<Result<Vec<_>>>()?;
608 info!("Using provided contacts endpoints: {endpoints:?}");
609 return Ok(ContactsProgress::new(endpoints));
610 }
611
612 match get_network_id() {
613 id if id == MAINNET_ID => {
614 info!("Using built-in mainnet contacts endpoints");
615 Ok(ContactsProgress::from_static(MAINNET_CONTACTS))
616 }
617
618 id if id == ALPHANET_ID => {
619 info!("Using built-in alphanet contacts endpoints");
620 Ok(ContactsProgress::from_static(ALPHANET_CONTACTS))
621 }
622 _ => Ok(None),
623 }
624 }
625
626 pub fn fetch_from_env() -> Vec<Multiaddr> {
627 let mut bootstrap_addresses = Vec::new();
628 if let Ok(addrs) = std::env::var(ANT_PEERS_ENV) {
630 for addr_str in addrs.split(',') {
631 if let Some(addr) = craft_valid_multiaddr_from_str(addr_str, false) {
632 info!("Adding addr from environment variable: {addr}");
633 bootstrap_addresses.push(addr);
634 } else {
635 warn!("Invalid multiaddress format from environment variable: {addr_str}");
636 }
637 }
638 }
639 bootstrap_addresses
640 }
641
    /// Returns a mutable reference to the underlying bootstrap cache store.
    pub fn cache_store_mut(&mut self) -> &mut BootstrapCacheStore {
        &mut self.cache_store
    }
645
    /// Returns a shared reference to the underlying bootstrap cache store.
    pub fn cache_store(&self) -> &BootstrapCacheStore {
        &self.cache_store
    }
649}
650
651impl Drop for Bootstrap {
652 fn drop(&mut self) {
653 if let Some(cache_task) = self.cache_task.take() {
654 cache_task.abort();
655 }
656 }
657}
658
/// Tracks which contacts endpoints still have to be queried.
#[derive(Debug)]
struct ContactsProgress {
    /// Endpoints not yet handed out, in the order they will be tried.
    remaining: VecDeque<Url>,
}
663
/// Result of a background fetch, sent from the spawned task back to
/// [`Bootstrap`] over its event channel.
enum FetchEvent {
    /// Addresses loaded from the bootstrap cache (empty on failure).
    Cache(Vec<Multiaddr>),
    /// Addresses fetched from one contacts endpoint (empty on failure).
    Contacts(Vec<Multiaddr>),
}
668
/// Identifies which kind of background fetch is currently in flight.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FetchKind {
    /// A bootstrap cache load.
    Cache,
    /// A contacts endpoint query.
    Contacts,
}
674
675impl ContactsProgress {
676 fn new(urls: Vec<Url>) -> Option<Self> {
677 if urls.is_empty() {
678 None
679 } else {
680 Some(Self {
681 remaining: VecDeque::from(urls),
682 })
683 }
684 }
685
686 fn from_static(urls: &[&str]) -> Option<Self> {
687 let mut parsed = Vec::new();
688 for url in urls {
689 match url.parse::<Url>() {
690 Ok(parsed_url) => parsed.push(parsed_url),
691 Err(err) => {
692 warn!("Failed to parse static contacts URL {url}: {err}");
693 }
694 }
695 }
696 Self::new(parsed)
697 }
698
699 fn next_endpoint(&mut self) -> Option<Url> {
700 self.remaining.pop_front()
701 }
702
703 fn is_empty(&self) -> bool {
704 self.remaining.is_empty()
705 }
706}
707
708#[cfg(test)]
709mod tests {
710 use super::*;
711 use crate::{
712 InitialPeersConfig,
713 cache_store::{BootstrapCacheStore, cache_data_v1::CacheData},
714 multiaddr_get_peer_id,
715 };
716 use libp2p::Multiaddr;
717 use std::collections::HashSet;
718 use std::sync::{Arc, OnceLock};
719 use std::time::{Duration, Instant};
720 use tempfile::TempDir;
721 use tokio::sync::{Mutex, OwnedMutexGuard};
722 use tokio::time::sleep;
723 use wiremock::{
724 Mock, MockServer, ResponseTemplate,
725 matchers::{method, path},
726 };
727
728 async fn env_lock() -> OwnedMutexGuard<()> {
729 static ENV_MUTEX: OnceLock<Arc<Mutex<()>>> = OnceLock::new();
730 Arc::clone(ENV_MUTEX.get_or_init(|| Arc::new(Mutex::new(()))))
731 .lock_owned()
732 .await
733 }
734
    /// Sets the environment variable `key` to `value` for the current process.
    // `unsafe` is needed only because the call below is wrapped in an `unsafe` block.
    #[allow(unsafe_code)]
    fn set_env_var(key: &str, value: &str) {
        // SAFETY: the tests in this module that call this helper hold the
        // guard returned by `env_lock()` while doing so.
        // NOTE(review): this does not exclude other threads reading the
        // environment concurrently — confirm that is acceptable for tests.
        unsafe {
            std::env::set_var(key, value);
        }
    }
741
    /// Removes the environment variable `key` from the current process.
    // `unsafe` is needed only because the call below is wrapped in an `unsafe` block.
    #[allow(unsafe_code)]
    fn remove_env_var(key: &str) {
        // SAFETY: the tests in this module that call this helper hold the
        // guard returned by `env_lock()` while doing so.
        // NOTE(review): this does not exclude other threads reading the
        // environment concurrently — confirm that is acceptable for tests.
        unsafe {
            std::env::remove_var(key);
        }
    }
748
749 async fn expect_next_addr(flow: &mut Bootstrap) -> Result<Multiaddr> {
750 let deadline = Instant::now() + Duration::from_secs(2);
751 loop {
752 match flow.next_addr() {
753 Ok(Some(addr)) => return Ok(addr),
754 Ok(None) => {
755 if Instant::now() >= deadline {
756 panic!("Timed out waiting for next address");
757 }
758 sleep(Duration::from_millis(5)).await;
759 }
760 Err(err) => return Err(err),
761 }
762 }
763 }
764
765 async fn expect_err(flow: &mut Bootstrap) -> Error {
766 let deadline = Instant::now() + Duration::from_secs(2);
767 loop {
768 match flow.next_addr() {
769 Ok(Some(addr)) => panic!("unexpected address returned: {addr}"),
770 Ok(None) => {
771 if Instant::now() >= deadline {
772 panic!("Timed out waiting for error from flow");
773 }
774 sleep(Duration::from_millis(5)).await;
775 }
776 Err(err) => return err,
777 }
778 }
779 }
780
781 fn generate_valid_test_multiaddr(ip_third: u8, ip_fourth: u8, port: u16) -> Multiaddr {
782 let peer_id = libp2p::PeerId::random();
783 format!("/ip4/10.{ip_third}.{ip_fourth}.1/tcp/{port}/p2p/{peer_id}")
784 .parse()
785 .unwrap()
786 }
787
    /// With the cache ignored and a local network, both the address from the
    /// environment and the one passed as an argument are returned (in any
    /// order), after which the flow reports that no peers are left.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_cli_arguments_precedence() {
        let env_addr: Multiaddr =
            "/ip4/10.0.0.1/tcp/1200/p2p/12D3KooWQnE7zXkVUEGBnJtNfR88Ujz4ezgm6bVnkvxHCzhF7S5S"
                .parse()
                .unwrap();
        let cli_addr: Multiaddr =
            "/ip4/10.0.0.2/tcp/1201/p2p/12D3KooWQx2TSK7g1C8x3QK7gBqdqbQEkd6vDT7Pxu5gb1xmgjvp"
                .parse()
                .unwrap();

        // Hold the env lock for the whole test; the variable is removed at the end.
        let _env_guard = env_lock().await;
        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());

        let temp_dir = TempDir::new().unwrap();

        let config = InitialPeersConfig {
            ignore_cache: true,
            local: true,
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            addrs: vec![cli_addr.clone()],
            ..Default::default()
        };
        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        let mut flow = Bootstrap::new(config).await.unwrap();

        // Compare as sets: the test does not depend on the relative order.
        let first_two = vec![
            expect_next_addr(&mut flow).await.unwrap(),
            expect_next_addr(&mut flow).await.unwrap(),
        ];
        let first_set: HashSet<_> = first_two.into_iter().collect();
        let expected: HashSet<_> = [env_addr.clone(), cli_addr.clone()].into_iter().collect();
        assert_eq!(first_set, expected);

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));

        remove_env_var(ANT_PEERS_ENV);
    }
827
    /// `fetch_from_env` splits the environment variable on commas and returns
    /// every valid multiaddress.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_env_variable_parsing() {
        let _env_guard = env_lock().await;
        // The trailing backslash joins both lines into one comma-separated value.
        set_env_var(
            ANT_PEERS_ENV,
            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE,\
/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
        );

        let parsed = Bootstrap::fetch_from_env();
        // Clean up before asserting so a failure does not leave the variable set.
        remove_env_var(ANT_PEERS_ENV);

        assert_eq!(parsed.len(), 2);
        let parsed_set: std::collections::HashSet<_> =
            parsed.into_iter().map(|addr| addr.to_string()).collect();
        let expected = std::collections::HashSet::from([
            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
                .to_string(),
            "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
                .to_string(),
        ]);
        assert_eq!(parsed_set, expected);
    }
851
    /// With no env or argument peers, the single address stored in the cache
    /// file is returned, followed by `NoBootstrapPeersFound`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn loads_addresses_from_cache_when_initial_queue_is_empty() {
        let _env_guard = env_lock().await;
        let cache_addr: Multiaddr =
            "/ip4/127.0.0.1/tcp/1202/p2p/12D3KooWKGt8umjJQ4sDzFXo2UcHBaF33rqmFcWtXM6nbryL5G4J"
                .parse()
                .unwrap();
        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();

        let temp_dir = TempDir::new().unwrap();
        let file_name = BootstrapCacheStore::cache_file_name(true);

        // Pre-populate the cache file the flow will read.
        let mut cache_data = CacheData::default();
        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
        cache_data
            .write_to_file(temp_dir.path(), &file_name)
            .unwrap();

        let config = InitialPeersConfig {
            local: true,
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            ..Default::default()
        };
        let mut config =
            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        config.disable_env_peers = true;
        let mut flow = Bootstrap::new(config).await.unwrap();

        let got = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(got, cache_addr);

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));
    }
886
    /// With `first: true`, argument peers are ignored and the contacts
    /// endpoint is never queried; the flow immediately reports no peers.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_first_flag_behavior() {
        let _env_guard = env_lock().await;

        // The mock expects zero requests.
        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/peers"))
            .respond_with(ResponseTemplate::new(200).set_body_string(
                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
            ))
            .expect(0)
            .mount(&mock_server)
            .await;

        let temp_dir = TempDir::new().unwrap();
        let config = InitialPeersConfig {
            first: true,
            addrs: vec![
                "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
                    .parse()
                    .unwrap(),
            ],
            network_contacts_url: vec![format!("{}/peers", mock_server.uri())],
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            ..Default::default()
        };
        let mut config =
            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        config.disable_env_peers = true;
        let mut flow = Bootstrap::new(config).await.unwrap();

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));
        assert!(
            mock_server.received_requests().await.unwrap().is_empty(),
            "first flag should prevent contact fetches"
        );
    }
925
    /// Two configured contacts endpoints are queried one after the other, in
    /// configuration order, each exactly once.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_multiple_network_contacts() {
        let _env_guard = env_lock().await;

        let mock_server = MockServer::start().await;

        let contact_one: Multiaddr =
            "/ip4/192.168.0.1/tcp/1203/p2p/12D3KooWPULWT1qXJ1jzYVtQocvKXgcv6U7Pp3ui3EB7mN8hXAsP"
                .parse()
                .unwrap();
        let contact_two: Multiaddr =
            "/ip4/192.168.0.2/tcp/1204/p2p/12D3KooWPsMPaEjaWjW6GWpAne6LYcwBQEJfnDbhQFNs6ytzmBn5"
                .parse()
                .unwrap();

        // Each endpoint serves a single address and must be hit exactly once.
        Mock::given(method("GET"))
            .and(path("/first"))
            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
            .expect(1)
            .mount(&mock_server)
            .await;

        Mock::given(method("GET"))
            .and(path("/second"))
            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
            .expect(1)
            .mount(&mock_server)
            .await;

        let config = InitialPeersConfig {
            ignore_cache: true,
            network_contacts_url: vec![
                format!("{}/first", mock_server.uri()),
                format!("{}/second", mock_server.uri()),
            ],
            ..Default::default()
        };
        let mut config =
            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        config.disable_env_peers = true;
        let mut flow = Bootstrap::new(config).await.unwrap();

        let first = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(first, contact_one);

        let second = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(second, contact_two);

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));

        // Requests must have been issued in configuration order.
        let requests = mock_server.received_requests().await.unwrap();
        assert_eq!(requests.len(), 2);
        assert_eq!(requests[0].url.path(), "/first");
        assert_eq!(requests[1].url.path(), "/second");
    }
982
    /// End-to-end ordering check: env and argument addresses come first, then
    /// the cached addresses, then the contacts endpoints in order, and finally
    /// `NoBootstrapPeersFound`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_full_bootstrap_flow() {
        let _env_guard = env_lock().await;
        // Start from a clean environment in case an earlier test left it set.
        remove_env_var(ANT_PEERS_ENV);

        let env_addr: Multiaddr =
            "/ip4/10.1.0.1/tcp/1300/p2p/12D3KooWBbtXX6gY5xPD7NzNGGbj2428NJQ4HNvvBnSE5g4R7Pkf"
                .parse()
                .unwrap();
        let cli_addr: Multiaddr =
            "/ip4/10.1.0.2/tcp/1301/p2p/12D3KooWCRfYwq9c3PAXo5cTp3snq72Knqukcec4c9qT1AMyvMPd"
                .parse()
                .unwrap();
        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());

        let cache_addr_one: Multiaddr =
            "/ip4/10.1.0.3/tcp/1302/p2p/12D3KooWMmKJcWUP9UqP4g1n3LH1htkvSUStn1aQGQxGc1dQcYxA"
                .parse()
                .unwrap();
        let cache_addr_two: Multiaddr =
            "/ip4/10.1.0.4/tcp/1303/p2p/12D3KooWA4b4T6Dz4RUtqnYDEBt3eGkqRykGGBqBP3ZiZsaAJ2jp"
                .parse()
                .unwrap();

        // Pre-populate the cache file with two peers.
        let temp_dir = TempDir::new().unwrap();
        let file_name = BootstrapCacheStore::cache_file_name(false);
        let mut cache_data = CacheData::default();
        cache_data.add_peer(
            multiaddr_get_peer_id(&cache_addr_one).unwrap(),
            std::iter::once(&cache_addr_one),
            3,
            10,
        );
        cache_data.add_peer(
            multiaddr_get_peer_id(&cache_addr_two).unwrap(),
            std::iter::once(&cache_addr_two),
            3,
            10,
        );
        cache_data
            .write_to_file(temp_dir.path(), &file_name)
            .unwrap();

        let mock_server = MockServer::start().await;
        let contact_one: Multiaddr =
            "/ip4/10.1.0.5/tcp/1304/p2p/12D3KooWQGyiCWkmKvgFVF1PsvBLnBxG29BAsoAhH4m6qjUpBAk1"
                .parse()
                .unwrap();
        let contact_two: Multiaddr =
            "/ip4/10.1.0.6/tcp/1305/p2p/12D3KooWGpMibW82dManEXZDV4SSQSSHqzTeWY5Avzkdx6yrosNG"
                .parse()
                .unwrap();

        Mock::given(method("GET"))
            .and(path("/contacts_one"))
            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
            .expect(1)
            .mount(&mock_server)
            .await;

        Mock::given(method("GET"))
            .and(path("/contacts_two"))
            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
            .expect(1)
            .mount(&mock_server)
            .await;

        // Sanity check: the cache file on disk contains both addresses.
        let file_path = temp_dir.path().join(format!(
            "version_{}/{}",
            CacheData::CACHE_DATA_VERSION,
            file_name
        ));
        let contents = std::fs::read_to_string(&file_path).unwrap();
        assert!(contents.contains(&cache_addr_one.to_string()));
        assert!(contents.contains(&cache_addr_two.to_string()));

        assert_eq!(
            Bootstrap::fetch_from_env(),
            vec![env_addr.clone()],
            "environment variable should yield the configured address"
        );

        let config = InitialPeersConfig {
            addrs: vec![cli_addr.clone()],
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            network_contacts_url: vec![
                format!("{}/contacts_one", mock_server.uri()),
                format!("{}/contacts_two", mock_server.uri()),
            ],
            ..Default::default()
        };
        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        let mut flow = Bootstrap::new(config).await.unwrap();

        // Stage 1: env + argument addresses (order within the stage is not asserted).
        let initial_results = vec![
            expect_next_addr(&mut flow).await.unwrap(),
            expect_next_addr(&mut flow).await.unwrap(),
        ];
        let initial_set: HashSet<_> = initial_results.into_iter().collect();
        let expected_initial: HashSet<_> =
            [env_addr.clone(), cli_addr.clone()].into_iter().collect();
        assert_eq!(initial_set, expected_initial);

        // Stage 2: addresses from the cache (order within the stage is not asserted).
        let cache_results = vec![
            expect_next_addr(&mut flow).await.unwrap(),
            expect_next_addr(&mut flow).await.unwrap(),
        ];
        let cache_set: HashSet<_> = cache_results.into_iter().collect();
        let expected_cache: HashSet<_> = [cache_addr_one.clone(), cache_addr_two.clone()]
            .into_iter()
            .collect();
        assert_eq!(cache_set, expected_cache);

        // Stage 3: contacts endpoints, strictly in configuration order.
        let contact_first = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(contact_first, contact_one);

        let contact_second = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(contact_second, contact_two);

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));

        let requests = mock_server.received_requests().await.unwrap();
        assert_eq!(requests.len(), 2);
        assert_eq!(requests[0].url.path(), "/contacts_one");
        assert_eq!(requests[1].url.path(), "/contacts_two");

        remove_env_var(ANT_PEERS_ENV);
    }
1112
    /// With `disable_env_peers` set, the address in the environment variable
    /// is ignored; with no other source, construction fails with
    /// `NoBootstrapPeersFound`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_disable_env_peers_flag() {
        let env_addr = generate_valid_test_multiaddr(2, 0, 2000);

        let _env_guard = env_lock().await;
        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());

        let temp_dir = TempDir::new().unwrap();

        // Local network + ignored cache: the environment would be the only source.
        let config = InitialPeersConfig {
            local: true,
            ignore_cache: true,
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            ..Default::default()
        };
        let mut config =
            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        config.disable_env_peers = true;

        let result = Bootstrap::new(config).await;
        assert!(
            result.is_err(),
            "Should error when env peers are disabled and no other sources available"
        );
        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));

        remove_env_var(ANT_PEERS_ENV);
    }
1141
1142 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1143 async fn test_disable_cache_reading_flag() {
1144 let _env_guard = env_lock().await;
1145
1146 let cache_addr = generate_valid_test_multiaddr(2, 0, 2001);
1147 let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1148
1149 let temp_dir = TempDir::new().unwrap();
1150 let file_name = BootstrapCacheStore::cache_file_name(true);
1151
1152 let mut cache_data = CacheData::default();
1153 cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1154 cache_data
1155 .write_to_file(temp_dir.path(), &file_name)
1156 .unwrap();
1157
1158 let config = InitialPeersConfig {
1159 local: true,
1160 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1161 ..Default::default()
1162 };
1163 let mut config =
1164 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1165 config.disable_env_peers = true;
1166 config.disable_cache_reading = true;
1167
1168 let result = Bootstrap::new(config).await;
1169 assert!(
1170 result.is_err(),
1171 "Should error when cache reading is disabled and no other sources available"
1172 );
1173 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1174 }
1175
1176 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1177 async fn test_bootstrap_completed_initialization() {
1178 let temp_dir = TempDir::new().unwrap();
1179
1180 let config = InitialPeersConfig {
1181 first: true,
1182 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1183 ..Default::default()
1184 };
1185 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1186 let flow = Bootstrap::new(config).await.unwrap();
1187
1188 assert!(
1189 flow.has_terminated(),
1190 "bootstrap_completed should be true for first node"
1191 );
1192
1193 let config = InitialPeersConfig {
1194 first: false,
1195 local: true,
1196 ignore_cache: true,
1197 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1198 addrs: vec![generate_valid_test_multiaddr(2, 0, 2002)],
1199 ..Default::default()
1200 };
1201 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1202 let flow = Bootstrap::new(config).await.unwrap();
1203
1204 assert!(
1205 !flow.has_terminated(),
1206 "bootstrap_completed should be false for non-first node"
1207 );
1208 }
1209
1210 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1211 async fn test_bootstrap_peer_ids_population() {
1212 let env_addr = generate_valid_test_multiaddr(2, 0, 2003);
1213 let cli_addr = generate_valid_test_multiaddr(2, 0, 2004);
1214
1215 let env_peer_id = multiaddr_get_peer_id(&env_addr).unwrap();
1216 let cli_peer_id = multiaddr_get_peer_id(&cli_addr).unwrap();
1217
1218 let _env_guard = env_lock().await;
1219 set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1220
1221 let temp_dir = TempDir::new().unwrap();
1222
1223 let config = InitialPeersConfig {
1224 local: true,
1225 ignore_cache: true,
1226 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1227 addrs: vec![cli_addr.clone()],
1228 ..Default::default()
1229 };
1230 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1231 let flow = Bootstrap::new(config).await.unwrap();
1232
1233 assert!(
1234 flow.is_bootstrap_peer(&env_peer_id),
1235 "Peer ID from env should be tracked"
1236 );
1237 assert!(
1238 flow.is_bootstrap_peer(&cli_peer_id),
1239 "Peer ID from CLI should be tracked"
1240 );
1241
1242 remove_env_var(ANT_PEERS_ENV);
1243 }
1244
1245 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1246 async fn test_invalid_multiaddr_in_initial_peers() {
1247 let _env_guard = env_lock().await;
1248
1249 let valid_addr = generate_valid_test_multiaddr(2, 0, 2005);
1250 let invalid_addr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse().unwrap();
1251
1252 let temp_dir = TempDir::new().unwrap();
1253
1254 let config = InitialPeersConfig {
1255 local: true,
1256 ignore_cache: true,
1257 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1258 addrs: vec![valid_addr.clone()],
1259 ..Default::default()
1260 };
1261 let mut config =
1262 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1263 config.disable_env_peers = true;
1264
1265 config.initial_peers.push(invalid_addr);
1266
1267 let mut flow = Bootstrap::new(config).await.unwrap();
1268
1269 let first = expect_next_addr(&mut flow).await.unwrap();
1270 assert_eq!(first, valid_addr, "Should get the valid address");
1271
1272 let err = expect_err(&mut flow).await;
1273 assert!(
1274 matches!(err, Error::NoBootstrapPeersFound),
1275 "Should not find any more peers after valid one (invalid addr was filtered)"
1276 );
1277 }
1278
1279 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1280 async fn test_local_network_skips_contacts() {
1281 let _env_guard = env_lock().await;
1282
1283 let mock_server = MockServer::start().await;
1284 Mock::given(method("GET"))
1285 .and(path("/should-not-be-called"))
1286 .respond_with(ResponseTemplate::new(200).set_body_string(
1287 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
1288 ))
1289 .expect(0)
1290 .mount(&mock_server)
1291 .await;
1292
1293 let temp_dir = TempDir::new().unwrap();
1294
1295 let config = InitialPeersConfig {
1296 local: true,
1297 ignore_cache: true,
1298 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1299 network_contacts_url: vec![format!("{}/should-not-be-called", mock_server.uri())],
1300 addrs: vec![generate_valid_test_multiaddr(2, 0, 2006)],
1301 ..Default::default()
1302 };
1303 let mut config =
1304 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1305 config.disable_env_peers = true;
1306
1307 let addr_from_config = config.initial_peers[0].clone();
1308 let mut flow = Bootstrap::new(config).await.unwrap();
1309
1310 let first = expect_next_addr(&mut flow).await.unwrap();
1311 assert_eq!(
1312 first, addr_from_config,
1313 "Should get the address from config"
1314 );
1315
1316 let err = expect_err(&mut flow).await;
1317 assert!(matches!(err, Error::NoBootstrapPeersFound));
1318
1319 assert!(
1320 mock_server.received_requests().await.unwrap().is_empty(),
1321 "local flag should prevent contact fetches"
1322 );
1323 }
1324
1325 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1326 async fn test_timeout_with_no_addresses() {
1327 let _env_guard = env_lock().await;
1328
1329 let temp_dir = TempDir::new().unwrap();
1330 let file_name = BootstrapCacheStore::cache_file_name(true);
1331 let cache_data = CacheData::default();
1332 cache_data
1333 .write_to_file(temp_dir.path(), &file_name)
1334 .unwrap();
1335
1336 let config = InitialPeersConfig {
1337 local: true,
1338 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1339 ..Default::default()
1340 };
1341 let mut config =
1342 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1343 config.disable_env_peers = true;
1344
1345 let result = Bootstrap::new(config).await;
1346
1347 assert!(
1348 result.is_err(),
1349 "Should error when no addresses are available from any source"
1350 );
1351 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1352 }
1353
1354 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1355 async fn test_first_node_clears_cache() {
1356 let _env_guard = env_lock().await;
1357
1358 let cache_addr = generate_valid_test_multiaddr(2, 0, 2007);
1359 let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1360
1361 let temp_dir = TempDir::new().unwrap();
1362 let file_name = BootstrapCacheStore::cache_file_name(false);
1363
1364 let mut cache_data = CacheData::default();
1365 cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1366 cache_data
1367 .write_to_file(temp_dir.path(), &file_name)
1368 .unwrap();
1369
1370 let file_path = temp_dir.path().join(format!(
1371 "version_{}/{}",
1372 CacheData::CACHE_DATA_VERSION,
1373 file_name
1374 ));
1375
1376 let contents_before = std::fs::read_to_string(&file_path).unwrap();
1377 assert!(
1378 contents_before.contains(&cache_addr.to_string()),
1379 "Cache should contain the address before initialization"
1380 );
1381
1382 let config = InitialPeersConfig {
1383 first: true,
1384 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1385 ..Default::default()
1386 };
1387 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1388 let _flow = Bootstrap::new(config).await.unwrap();
1389
1390 tokio::time::sleep(Duration::from_millis(100)).await;
1391
1392 let contents_after = std::fs::read_to_string(&file_path).unwrap();
1393 assert!(
1394 !contents_after.contains(&cache_addr.to_string()),
1395 "Cache should be cleared for first node"
1396 );
1397 }
1398
1399 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1400 async fn test_new_loads_at_least_50_contacts() {
1401 let _env_guard = env_lock().await;
1402
1403 let temp_dir = TempDir::new().unwrap();
1404 let file_name = BootstrapCacheStore::cache_file_name(true);
1405
1406 let mut cache_data = CacheData::default();
1407 for i in 0..60 {
1408 let addr = generate_valid_test_multiaddr(3, i as u8, 3000 + i);
1409 let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1410 cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1411 }
1412 cache_data
1413 .write_to_file(temp_dir.path(), &file_name)
1414 .unwrap();
1415
1416 let config = InitialPeersConfig {
1417 local: true,
1418 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1419 ..Default::default()
1420 };
1421 let mut config =
1422 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1423 config.disable_env_peers = true;
1424
1425 let result = Bootstrap::new(config).await;
1426
1427 assert!(
1428 result.is_ok(),
1429 "Should successfully initialize with 60 contacts in cache"
1430 );
1431
1432 let mut flow = result.unwrap();
1433 let mut count = 0;
1434 while let Ok(Some(_addr)) = flow.next_addr() {
1435 count += 1;
1436 if count >= 60 {
1437 break;
1438 }
1439 }
1440
1441 assert!(
1442 count > 0,
1443 "Should have loaded contacts from cache, got {count}"
1444 );
1445 }
1446
1447 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1448 async fn test_new_succeeds_with_few_contacts() {
1449 let _env_guard = env_lock().await;
1450
1451 let temp_dir = TempDir::new().unwrap();
1452 let file_name = BootstrapCacheStore::cache_file_name(true);
1453
1454 let mut cache_data = CacheData::default();
1455 for i in 0..5 {
1456 let addr = generate_valid_test_multiaddr(4, i as u8, 4000 + i);
1457 let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1458 cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1459 }
1460 cache_data
1461 .write_to_file(temp_dir.path(), &file_name)
1462 .unwrap();
1463
1464 let config = InitialPeersConfig {
1465 local: true,
1466 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1467 ..Default::default()
1468 };
1469 let mut config =
1470 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1471 config.disable_env_peers = true;
1472
1473 let result = Bootstrap::new(config).await;
1474 assert!(
1475 result.is_ok(),
1476 "Should succeed with few contacts (< 50 but > 0)"
1477 );
1478
1479 let mut flow = result.unwrap();
1480 let mut count = 0;
1481 while let Ok(Some(_addr)) = flow.next_addr() {
1482 count += 1;
1483 if count >= 10 {
1484 break;
1485 }
1486 }
1487
1488 assert_eq!(count, 5, "Should have exactly 5 contacts");
1489 }
1490
1491 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1492 async fn test_new_errors_with_zero_contacts() {
1493 let _env_guard = env_lock().await;
1494
1495 let temp_dir = TempDir::new().unwrap();
1496 let file_name = BootstrapCacheStore::cache_file_name(false);
1497 let cache_data = CacheData::default();
1498 cache_data
1499 .write_to_file(temp_dir.path(), &file_name)
1500 .unwrap();
1501
1502 let mock_server = MockServer::start().await;
1503 Mock::given(method("GET"))
1504 .and(path("/failing-endpoint"))
1505 .respond_with(ResponseTemplate::new(500))
1506 .expect(1..)
1507 .mount(&mock_server)
1508 .await;
1509
1510 let config = InitialPeersConfig {
1511 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1512 network_contacts_url: vec![format!("{}/failing-endpoint", mock_server.uri())],
1513 ..Default::default()
1514 };
1515 let mut config =
1516 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1517 config.disable_env_peers = true;
1518
1519 let result = Bootstrap::new(config).await;
1520
1521 assert!(
1522 result.is_err(),
1523 "Should error when all sources fail and no contacts are available"
1524 );
1525 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1526 }
1527}