1use crate::ANT_PEERS_ENV;
10use crate::BootstrapCacheStore;
11use crate::BootstrapConfig;
12use crate::ContactsFetcher;
13use crate::Result;
14use crate::contacts_fetcher::ALPHANET_CONTACTS;
15use crate::contacts_fetcher::MAINNET_CONTACTS;
16use crate::craft_valid_multiaddr;
17use crate::craft_valid_multiaddr_from_str;
18use crate::error::Error;
19use crate::multiaddr_get_peer_id;
20use ant_protocol::version::ALPHANET_ID;
21use ant_protocol::version::MAINNET_ID;
22use ant_protocol::version::get_network_id;
23use libp2p::{
24 Multiaddr, PeerId, Swarm,
25 core::connection::ConnectedPoint,
26 multiaddr::Protocol,
27 swarm::{
28 DialError, NetworkBehaviour,
29 dial_opts::{DialOpts, PeerCondition},
30 },
31};
32use std::collections::{HashSet, VecDeque};
33use std::time::Duration;
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::sync::mpsc::UnboundedSender;
36use url::Url;
37
38const FETCH_TIMEOUT: Duration = Duration::from_secs(10);
40
41const MIN_INITIAL_ADDRS: usize = 5;
43
44const INITIAL_ADDR_FETCH_TIMEOUT_SECS: u64 = 30;
46
47#[derive(custom_debug::Debug)]
63pub struct Bootstrap {
64 cache_store: BootstrapCacheStore,
65 addrs: VecDeque<Multiaddr>,
66 #[debug(skip)]
68 cache_task: Option<tokio::task::JoinHandle<()>>,
69 cache_pending: bool,
71 contacts_progress: Option<ContactsProgress>,
72 event_tx: UnboundedSender<FetchEvent>,
73 event_rx: UnboundedReceiver<FetchEvent>,
74 fetch_in_progress: Option<FetchKind>,
75 ongoing_dials: HashSet<Multiaddr>,
77 bootstrap_peer_ids: HashSet<PeerId>,
78 bootstrap_completed: bool,
79}
80
81impl Bootstrap {
82 pub async fn new(mut config: BootstrapConfig) -> Result<Self> {
83 let contacts_progress = Self::build_contacts_progress(&config)?;
84
85 let mut addrs_queue = VecDeque::new();
86 let mut bootstrap_peer_ids = HashSet::new();
87 if !config.first {
88 if !config.disable_env_peers {
89 for addr in Self::fetch_from_env() {
90 Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
91 }
92 } else {
93 info!("Skipping ANT_PEERS environment variable as per configuration");
94 }
95
96 for addr in config.initial_peers.drain(..) {
97 if let Some(addr) = craft_valid_multiaddr(&addr, false) {
98 info!("Adding addr from arguments: {addr}");
99 Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
100 } else {
101 warn!("Invalid multiaddress format from arguments: {addr}");
102 }
103 }
104 }
105
106 let cache_pending = !config.first && !config.disable_cache_reading;
107 if !cache_pending {
108 info!(
109 "Not loading from cache as per configuration (first={}, disable_cache_reading={})",
110 config.first, config.disable_cache_reading
111 );
112 } else {
113 info!("Cache loading is enabled - cache will be fetched if needed");
114 }
115 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
116
117 let cache_store = BootstrapCacheStore::new(config.clone())?;
118
119 let mut bootstrap = Self {
120 cache_store,
121 addrs: addrs_queue,
122 cache_pending,
123 contacts_progress,
124 event_tx,
125 event_rx,
126 fetch_in_progress: None,
127 ongoing_dials: HashSet::new(),
128 bootstrap_peer_ids,
129 bootstrap_completed: config.first,
130 cache_task: None,
131 };
132
133 info!("Cache store is initialized and will sync and flush periodically");
134 let cache_task = bootstrap.cache_store.sync_and_flush_periodically();
135 bootstrap.cache_task = Some(cache_task);
136
137 if config.first {
138 info!("First node in network; clearing any existing cache");
139 bootstrap.cache_store.write().await?;
140 return Ok(bootstrap);
141 }
142
143 let mut collected_addrs = Vec::new();
147 if bootstrap.addrs.len() < MIN_INITIAL_ADDRS {
148 info!("Initial address queue < {MIN_INITIAL_ADDRS}; fetching from cache/contacts");
149 let now = std::time::Instant::now();
150 loop {
151 match bootstrap.next_addr() {
152 Ok(Some(addr)) => {
153 collected_addrs.push(addr);
154 if Self::try_finalize_initial_addrs(
155 &mut bootstrap,
156 &mut collected_addrs,
157 MIN_INITIAL_ADDRS,
158 ) {
159 break;
160 }
161 continue;
162 }
163 Ok(None) => {
164 debug!(
165 "No immediate address available; waiting for async fetch to complete"
166 );
167 }
168 Err(err) => {
169 if Self::try_finalize_initial_addrs(&mut bootstrap, &mut collected_addrs, 1)
170 {
171 break;
172 }
173 warn!("Failed to fetch initial address: {err}");
174 return Err(err);
175 }
176 }
177
178 if now.elapsed() > std::time::Duration::from_secs(INITIAL_ADDR_FETCH_TIMEOUT_SECS) {
179 if Self::try_finalize_initial_addrs(&mut bootstrap, &mut collected_addrs, 1) {
180 break;
181 }
182 error!("Timed out waiting for initial addresses. ");
183 return Err(Error::NoBootstrapPeersFound);
184 }
185 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
186 }
187 }
188
189 Ok(bootstrap)
190 }
191
192 fn try_finalize_initial_addrs(
196 bootstrap: &mut Bootstrap,
197 collected_addrs: &mut Vec<Multiaddr>,
198 min_address: usize,
199 ) -> bool {
200 if collected_addrs.len() < min_address {
201 return false;
202 }
203 info!(
204 "Collected minimum required initial addresses ({}), proceeding with bootstrap.",
205 collected_addrs.len()
206 );
207 bootstrap.extend_with_addrs(std::mem::take(collected_addrs));
208 true
209 }
210
211 pub fn next_addr(&mut self) -> Result<Option<Multiaddr>> {
218 loop {
219 self.process_events();
220
221 if let Some(addr) = self.addrs.pop_front() {
222 info!("Returning next bootstrap address: {addr}");
223 return Ok(Some(addr));
224 }
225
226 if let Some(fetch_kind) = self.fetch_in_progress {
227 debug!("Fetch in progress: {fetch_kind:?}; waiting for addresses");
228 return Ok(None);
229 }
230
231 if self.cache_pending && !matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
232 info!("Triggering cache fetch");
233 self.start_cache_fetch()?;
234 continue;
235 }
236
237 if self.contacts_progress.is_some()
238 && !matches!(self.fetch_in_progress, Some(FetchKind::Contacts))
239 {
240 info!("Triggering contacts fetch");
241 self.start_contacts_fetch()?;
242 if self.fetch_in_progress.is_some() {
243 return Ok(None);
244 }
245 continue;
246 }
247
248 warn!("No more sources to fetch bootstrap addresses from, and address queue is empty.");
249 return Err(Error::NoBootstrapPeersFound);
250 }
251 }
252
253 fn process_events(&mut self) {
254 while let Ok(event) = self.event_rx.try_recv() {
255 match event {
256 FetchEvent::Cache(addrs) => {
257 if addrs.is_empty() {
258 info!("Cache fetch has completed, but read 0 addresses");
259 } else {
260 info!("Cache fetch has completed. Got {} addresses", addrs.len());
261 self.extend_with_addrs(addrs);
262 }
263 }
264 FetchEvent::Contacts(addrs) => {
265 info!(
266 "Contacts fetch has completed. Got {} addresses",
267 addrs.len()
268 );
269 self.extend_with_addrs(addrs);
270 if self
271 .contacts_progress
272 .as_ref()
273 .is_none_or(ContactsProgress::is_empty)
274 {
275 self.contacts_progress = None;
276 }
277 }
278 }
279
280 self.fetch_in_progress = None;
281 }
282 }
283
284 fn extend_with_addrs(&mut self, addrs: Vec<Multiaddr>) {
285 if addrs.is_empty() {
286 return;
287 }
288 for addr in addrs {
289 Self::push_addr(&mut self.addrs, &mut self.bootstrap_peer_ids, addr);
290 }
291 }
292
293 fn push_addr(queue: &mut VecDeque<Multiaddr>, peer_ids: &mut HashSet<PeerId>, addr: Multiaddr) {
294 if let Some(peer_id) = multiaddr_get_peer_id(&addr) {
295 peer_ids.insert(peer_id);
296 }
297 queue.push_back(addr);
298 }
299
300 fn pop_p2p(addr: &mut Multiaddr) -> Option<PeerId> {
301 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
302 let _ = addr.pop();
303 Some(peer_id)
304 } else {
305 None
306 }
307 }
308
309 fn try_next_dial_addr(&mut self) -> Result<Option<Multiaddr>> {
310 match self.next_addr() {
311 Ok(Some(addr)) => Ok(Some(addr)),
312 Ok(None) => Ok(None),
313 Err(Error::NoBootstrapPeersFound) => {
314 self.bootstrap_completed = true;
315 Err(Error::NoBootstrapPeersFound)
316 }
317 Err(err) => Err(err),
318 }
319 }
320
321 fn has_bootstrap_completed(&self, contacted_peers: usize) -> bool {
323 if self.bootstrap_completed {
324 debug!("Initial bootstrap process has already completed successfully.");
325 return true;
326 }
327
328 if contacted_peers
329 >= self
330 .cache_store
331 .config()
332 .max_contacted_peers_before_termination
333 {
334 info!(
335 "Initial bootstrap process completed successfully. We have {contacted_peers} peers in the routing table."
336 );
337 return true;
338 }
339
340 if self.addrs.is_empty()
343 && !self.cache_pending
344 && self.contacts_progress.is_none()
345 && self.fetch_in_progress.is_none()
346 {
347 info!(
348 "We have {contacted_peers} peers in RT, but no more addresses to dial. Stopping initial bootstrap."
349 );
350 return true;
351 }
352
353 false
354 }
355
356 pub fn trigger_bootstrapping_process<B: NetworkBehaviour>(
361 &mut self,
362 swarm: &mut Swarm<B>,
363 contacted_peers: usize,
364 ) -> bool {
365 if self.has_bootstrap_completed(contacted_peers) {
366 self.bootstrap_completed = true;
367 self.addrs.clear();
368 self.ongoing_dials.clear();
369 return true;
370 }
371
372 while self.ongoing_dials.len() < self.cache_store.config().max_concurrent_dials {
373 match self.try_next_dial_addr() {
374 Ok(Some(mut addr)) => {
375 let addr_clone = addr.clone();
376 let peer_id = Self::pop_p2p(&mut addr);
377
378 let opts = match peer_id {
379 Some(peer_id) => DialOpts::peer_id(peer_id)
380 .condition(PeerCondition::NotDialing)
381 .addresses(vec![addr])
382 .build(),
383 None => DialOpts::unknown_peer_id().address(addr).build(),
384 };
385
386 info!("Trying to dial peer with address: {addr_clone}");
387
388 match swarm.dial(opts) {
389 Ok(()) => {
390 info!(
391 "Dial attempt initiated for peer with address: {addr_clone}. Ongoing dial attempts: {}",
392 self.ongoing_dials.len() + 1
393 );
394 let _ = self.ongoing_dials.insert(addr_clone);
395 }
396 Err(err) => match err {
397 DialError::LocalPeerId { .. } => {
398 warn!(
399 "Failed to dial peer with address: {addr_clone}. This is our own peer ID. Dialing the next peer"
400 );
401 }
402 DialError::NoAddresses => {
403 error!(
404 "Failed to dial peer with address: {addr_clone}. No addresses found. Dialing the next peer"
405 );
406 }
407 DialError::DialPeerConditionFalse(_) => {
408 warn!(
409 "We are already dialing the peer with address: {addr_clone}. Dialing the next peer. This error is harmless."
410 );
411 }
412 DialError::Aborted => {
413 error!(
414 "Pending connection attempt has been aborted for {addr_clone}. Dialing the next peer."
415 );
416 }
417 DialError::WrongPeerId { obtained, .. } => {
418 error!(
419 "The peer identity obtained on the connection did not match the one that was expected. Obtained: {obtained}. Dialing the next peer."
420 );
421 }
422 DialError::Denied { cause } => {
423 error!(
424 "The dialing attempt was denied by the remote peer. Cause: {cause}. Dialing the next peer."
425 );
426 }
427 DialError::Transport(items) => {
428 error!(
429 "Failed to dial peer with address: {addr_clone}. Transport error: {items:?}. Dialing the next peer."
430 );
431 }
432 },
433 }
434 }
435 Ok(None) => {
436 debug!("Waiting for additional bootstrap addresses before continuing to dial");
437 break;
438 }
439 Err(Error::NoBootstrapPeersFound) => {
440 info!("No more bootstrap peers available to dial.");
441 break;
442 }
443 Err(err) => {
444 warn!("Failed to obtain next bootstrap address: {err}");
445 break;
446 }
447 }
448 }
449 self.bootstrap_completed
450 }
451
452 pub fn on_connection_established(&mut self, peer_id: &PeerId, endpoint: &ConnectedPoint) {
453 if self.bootstrap_completed {
454 return;
455 }
456
457 if let ConnectedPoint::Dialer { address, .. } = endpoint
458 && !self.ongoing_dials.remove(address)
459 {
460 self.ongoing_dials
461 .retain(|addr| match multiaddr_get_peer_id(addr) {
462 Some(id) => id != *peer_id,
463 None => true,
464 });
465 }
466 }
467
468 pub fn on_outgoing_connection_error(&mut self, peer_id: Option<PeerId>) {
469 if self.bootstrap_completed {
470 return;
471 }
472
473 match peer_id {
474 Some(peer_id) => {
475 self.ongoing_dials.retain(|addr| {
476 if let Some(id) = multiaddr_get_peer_id(addr) {
477 id != peer_id
478 } else {
479 true
480 }
481 });
482 }
483 None => {
484 self.ongoing_dials
487 .retain(|addr| multiaddr_get_peer_id(addr).is_some());
488 }
489 }
490 }
491
492 pub fn is_bootstrap_peer(&self, peer_id: &PeerId) -> bool {
493 self.bootstrap_peer_ids.contains(peer_id)
494 }
495
496 pub fn has_terminated(&self) -> bool {
497 self.bootstrap_completed
498 }
499
500 fn start_cache_fetch(&mut self) -> Result<()> {
501 if matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
502 error!("Cache fetch already in progress, not starting another");
503 return Ok(());
504 }
505
506 self.cache_pending = false;
507 let config = self.cache_store.config().clone();
508 let event_tx = self.event_tx.clone();
509
510 tokio::spawn(async move {
511 let fetch_result = tokio::time::timeout(FETCH_TIMEOUT, async move {
512 tokio::task::spawn_blocking(move || BootstrapCacheStore::load_cache_data(&config))
513 .await
514 })
515 .await;
516
517 let addrs = match fetch_result {
518 Ok(spawn_result) => match spawn_result {
519 Ok(Ok(cache_data)) => cache_data.get_all_addrs().cloned().collect(),
520 Ok(Err(err)) => {
521 warn!("Failed to load cache data: {err}");
522 Vec::new()
523 }
524 Err(err) => {
525 warn!("Cache fetch task failed to join: {err}");
526 Vec::new()
527 }
528 },
529 Err(_) => {
530 warn!(
531 "Cache fetch timed out after {} seconds",
532 FETCH_TIMEOUT.as_secs()
533 );
534 Vec::new()
535 }
536 };
537
538 info!(
539 "Bootstrap cache loaded from disk with {} addresses",
540 addrs.len()
541 );
542 if let Err(err) = event_tx.send(FetchEvent::Cache(addrs)) {
543 error!("Failed to send cache fetch event: {err:?}");
544 }
545 });
546
547 self.fetch_in_progress = Some(FetchKind::Cache);
548
549 Ok(())
550 }
551
552 fn start_contacts_fetch(&mut self) -> Result<()> {
553 if matches!(self.fetch_in_progress, Some(FetchKind::Contacts)) {
554 error!("Contacts fetch already in progress, not starting another");
555 return Ok(());
556 }
557
558 let Some(progress) = self.contacts_progress.as_mut() else {
559 info!("No contacts progress available");
560 return Ok(());
561 };
562
563 let Some(endpoint) = progress.next_endpoint() else {
564 info!("No more contacts endpoints to try");
565 self.contacts_progress = None;
566 return Ok(());
567 };
568
569 let event_tx = self.event_tx.clone();
570
571 tokio::spawn(async move {
572 let fetch_result = tokio::time::timeout(FETCH_TIMEOUT, async {
573 let fetcher = ContactsFetcher::with_endpoints(vec![endpoint.clone()])?;
574 fetcher.fetch_bootstrap_addresses().await
575 })
576 .await;
577
578 let addrs = match fetch_result {
579 Ok(Ok(addrs)) => addrs,
580 Ok(Err(err)) => {
581 warn!("Failed to fetch contacts from {endpoint}: {err}");
582 Vec::new()
583 }
584 Err(_) => {
585 warn!(
586 "Contacts fetch from {endpoint} timed out after {} seconds",
587 FETCH_TIMEOUT.as_secs()
588 );
589 Vec::new()
590 }
591 };
592
593 info!(
594 "Contacts fetch completed from endpoint {endpoint:?} with {} addresses",
595 addrs.len()
596 );
597 if let Err(err) = event_tx.send(FetchEvent::Contacts(addrs)) {
598 error!("Failed to send contacts fetch event: {err:?}");
599 }
600 });
601
602 self.fetch_in_progress = Some(FetchKind::Contacts);
603
604 Ok(())
605 }
606
607 fn build_contacts_progress(config: &BootstrapConfig) -> Result<Option<ContactsProgress>> {
608 if config.first {
609 info!("First node in network; not fetching contacts");
610 return Ok(None);
611 }
612
613 if config.local {
614 info!("Local network configuration; skipping contacts endpoints");
615 return Ok(None);
616 }
617
618 if !config.network_contacts_url.is_empty() {
619 let endpoints = config
620 .network_contacts_url
621 .iter()
622 .map(|endpoint| endpoint.parse::<Url>().map_err(|_| Error::FailedToParseUrl))
623 .collect::<Result<Vec<_>>>()?;
624 info!("Using provided contacts endpoints: {endpoints:?}");
625 return Ok(ContactsProgress::new(endpoints));
626 }
627
628 match get_network_id() {
629 id if id == MAINNET_ID => {
630 info!("Using built-in mainnet contacts endpoints");
631 Ok(ContactsProgress::from_static(MAINNET_CONTACTS))
632 }
633
634 id if id == ALPHANET_ID => {
635 info!("Using built-in alphanet contacts endpoints");
636 Ok(ContactsProgress::from_static(ALPHANET_CONTACTS))
637 }
638 _ => Ok(None),
639 }
640 }
641
642 pub fn fetch_from_env() -> Vec<Multiaddr> {
643 let mut bootstrap_addresses = Vec::new();
644 if let Ok(addrs) = std::env::var(ANT_PEERS_ENV) {
646 for addr_str in addrs.split(',') {
647 if let Some(addr) = craft_valid_multiaddr_from_str(addr_str, false) {
648 info!("Adding addr from environment variable: {addr}");
649 bootstrap_addresses.push(addr);
650 } else {
651 warn!("Invalid multiaddress format from environment variable: {addr_str}");
652 }
653 }
654 }
655 bootstrap_addresses
656 }
657
658 pub fn cache_store_mut(&mut self) -> &mut BootstrapCacheStore {
659 &mut self.cache_store
660 }
661
662 pub fn cache_store(&self) -> &BootstrapCacheStore {
663 &self.cache_store
664 }
665}
666
667impl Drop for Bootstrap {
668 fn drop(&mut self) {
669 if let Some(cache_sync_task) = self.cache_task.take() {
670 cache_sync_task.abort();
671 }
672 }
673}
674
675#[derive(Debug)]
676struct ContactsProgress {
677 remaining: VecDeque<Url>,
678}
679
680enum FetchEvent {
681 Cache(Vec<Multiaddr>),
682 Contacts(Vec<Multiaddr>),
683}
684
685#[derive(Clone, Copy, Debug, PartialEq, Eq)]
686enum FetchKind {
687 Cache,
688 Contacts,
689}
690
691impl ContactsProgress {
692 fn new(urls: Vec<Url>) -> Option<Self> {
693 if urls.is_empty() {
694 None
695 } else {
696 Some(Self {
697 remaining: VecDeque::from(urls),
698 })
699 }
700 }
701
702 fn from_static(urls: &[&str]) -> Option<Self> {
703 let mut parsed = Vec::new();
704 for url in urls {
705 match url.parse::<Url>() {
706 Ok(parsed_url) => parsed.push(parsed_url),
707 Err(err) => {
708 warn!("Failed to parse static contacts URL {url}: {err}");
709 }
710 }
711 }
712 Self::new(parsed)
713 }
714
715 fn next_endpoint(&mut self) -> Option<Url> {
716 self.remaining.pop_front()
717 }
718
719 fn is_empty(&self) -> bool {
720 self.remaining.is_empty()
721 }
722}
723
724#[cfg(test)]
725mod tests {
726 use super::*;
727 use crate::{
728 InitialPeersConfig,
729 cache_store::{BootstrapCacheStore, cache_data_v1::CacheData},
730 multiaddr_get_peer_id,
731 };
732 use libp2p::Multiaddr;
733 use std::collections::HashSet;
734 use std::sync::{Arc, OnceLock};
735 use std::time::{Duration, Instant};
736 use tempfile::TempDir;
737 use tokio::sync::{Mutex, OwnedMutexGuard};
738 use tokio::time::sleep;
739 use wiremock::{
740 Mock, MockServer, ResponseTemplate,
741 matchers::{method, path},
742 };
743
744 async fn env_lock() -> OwnedMutexGuard<()> {
745 static ENV_MUTEX: OnceLock<Arc<Mutex<()>>> = OnceLock::new();
746 Arc::clone(ENV_MUTEX.get_or_init(|| Arc::new(Mutex::new(()))))
747 .lock_owned()
748 .await
749 }
750
751 #[allow(unsafe_code)]
752 fn set_env_var(key: &str, value: &str) {
753 unsafe {
754 std::env::set_var(key, value);
755 }
756 }
757
758 #[allow(unsafe_code)]
759 fn remove_env_var(key: &str) {
760 unsafe {
761 std::env::remove_var(key);
762 }
763 }
764
765 async fn expect_next_addr(flow: &mut Bootstrap) -> Result<Multiaddr> {
766 let deadline = Instant::now() + Duration::from_secs(2);
767 loop {
768 match flow.next_addr() {
769 Ok(Some(addr)) => return Ok(addr),
770 Ok(None) => {
771 if Instant::now() >= deadline {
772 panic!("Timed out waiting for next address");
773 }
774 sleep(Duration::from_millis(5)).await;
775 }
776 Err(err) => return Err(err),
777 }
778 }
779 }
780
781 async fn expect_err(flow: &mut Bootstrap) -> Error {
782 let deadline = Instant::now() + Duration::from_secs(2);
783 loop {
784 match flow.next_addr() {
785 Ok(Some(addr)) => panic!("unexpected address returned: {addr}"),
786 Ok(None) => {
787 if Instant::now() >= deadline {
788 panic!("Timed out waiting for error from flow");
789 }
790 sleep(Duration::from_millis(5)).await;
791 }
792 Err(err) => return err,
793 }
794 }
795 }
796
797 fn generate_valid_test_multiaddr(ip_third: u8, ip_fourth: u8, port: u16) -> Multiaddr {
798 let peer_id = libp2p::PeerId::random();
799 format!("/ip4/10.{ip_third}.{ip_fourth}.1/tcp/{port}/p2p/{peer_id}")
800 .parse()
801 .unwrap()
802 }
803
804 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
805 async fn test_cli_arguments_precedence() {
806 let env_addr: Multiaddr =
807 "/ip4/10.0.0.1/tcp/1200/p2p/12D3KooWQnE7zXkVUEGBnJtNfR88Ujz4ezgm6bVnkvxHCzhF7S5S"
808 .parse()
809 .unwrap();
810 let cli_addr: Multiaddr =
811 "/ip4/10.0.0.2/tcp/1201/p2p/12D3KooWQx2TSK7g1C8x3QK7gBqdqbQEkd6vDT7Pxu5gb1xmgjvp"
812 .parse()
813 .unwrap();
814
815 let _env_guard = env_lock().await;
816 set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
817
818 let temp_dir = TempDir::new().unwrap();
819
820 let config = InitialPeersConfig {
821 ignore_cache: true,
822 local: true,
823 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
824 addrs: vec![cli_addr.clone()],
825 ..Default::default()
826 };
827 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
828 let mut flow = Bootstrap::new(config).await.unwrap();
829
830 let first_two = vec![
831 expect_next_addr(&mut flow).await.unwrap(),
832 expect_next_addr(&mut flow).await.unwrap(),
833 ];
834 let first_set: HashSet<_> = first_two.into_iter().collect();
835 let expected: HashSet<_> = [env_addr.clone(), cli_addr.clone()].into_iter().collect();
836 assert_eq!(first_set, expected);
837
838 let err = expect_err(&mut flow).await;
839 assert!(matches!(err, Error::NoBootstrapPeersFound));
840
841 remove_env_var(ANT_PEERS_ENV);
842 }
843
844 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
845 async fn test_env_variable_parsing() {
846 let _env_guard = env_lock().await;
847 set_env_var(
848 ANT_PEERS_ENV,
849 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE,\
850/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
851 );
852
853 let parsed = Bootstrap::fetch_from_env();
854 remove_env_var(ANT_PEERS_ENV);
855
856 assert_eq!(parsed.len(), 2);
857 let parsed_set: std::collections::HashSet<_> =
858 parsed.into_iter().map(|addr| addr.to_string()).collect();
859 let expected = std::collections::HashSet::from([
860 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
861 .to_string(),
862 "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
863 .to_string(),
864 ]);
865 assert_eq!(parsed_set, expected);
866 }
867
868 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
869 async fn loads_addresses_from_cache_when_initial_queue_is_empty() {
870 let _env_guard = env_lock().await;
871 let cache_addr: Multiaddr =
872 "/ip4/127.0.0.1/tcp/1202/p2p/12D3KooWKGt8umjJQ4sDzFXo2UcHBaF33rqmFcWtXM6nbryL5G4J"
873 .parse()
874 .unwrap();
875 let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
876
877 let temp_dir = TempDir::new().unwrap();
878 let file_name = BootstrapCacheStore::cache_file_name(true);
879
880 let mut cache_data = CacheData::default();
881 cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
882 cache_data
883 .write_to_file(temp_dir.path(), &file_name)
884 .unwrap();
885
886 let config = InitialPeersConfig {
887 local: true,
888 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
889 ..Default::default()
890 };
891 let mut config =
892 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
893 config.disable_env_peers = true;
894 let mut flow = Bootstrap::new(config).await.unwrap();
895
896 let got = expect_next_addr(&mut flow).await.unwrap();
897 assert_eq!(got, cache_addr);
898
899 let err = expect_err(&mut flow).await;
900 assert!(matches!(err, Error::NoBootstrapPeersFound));
901 }
902
903 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
904 async fn test_first_flag_behavior() {
905 let _env_guard = env_lock().await;
906
907 let mock_server = MockServer::start().await;
908 Mock::given(method("GET"))
909 .and(path("/peers"))
910 .respond_with(ResponseTemplate::new(200).set_body_string(
911 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
912 ))
913 .expect(0)
914 .mount(&mock_server)
915 .await;
916
917 let temp_dir = TempDir::new().unwrap();
918 let config = InitialPeersConfig {
919 first: true,
920 addrs: vec![
921 "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
922 .parse()
923 .unwrap(),
924 ],
925 network_contacts_url: vec![format!("{}/peers", mock_server.uri())],
926 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
927 ..Default::default()
928 };
929 let mut config =
930 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
931 config.disable_env_peers = true;
932 let mut flow = Bootstrap::new(config).await.unwrap();
933
934 let err = expect_err(&mut flow).await;
935 assert!(matches!(err, Error::NoBootstrapPeersFound));
936 assert!(
937 mock_server.received_requests().await.unwrap().is_empty(),
938 "first flag should prevent contact fetches"
939 );
940 }
941
942 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
943 async fn test_multiple_network_contacts() {
944 let _env_guard = env_lock().await;
945
946 let mock_server = MockServer::start().await;
947
948 let contact_one: Multiaddr =
949 "/ip4/192.168.0.1/tcp/1203/p2p/12D3KooWPULWT1qXJ1jzYVtQocvKXgcv6U7Pp3ui3EB7mN8hXAsP"
950 .parse()
951 .unwrap();
952 let contact_two: Multiaddr =
953 "/ip4/192.168.0.2/tcp/1204/p2p/12D3KooWPsMPaEjaWjW6GWpAne6LYcwBQEJfnDbhQFNs6ytzmBn5"
954 .parse()
955 .unwrap();
956
957 Mock::given(method("GET"))
958 .and(path("/first"))
959 .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
960 .expect(1)
961 .mount(&mock_server)
962 .await;
963
964 Mock::given(method("GET"))
965 .and(path("/second"))
966 .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
967 .expect(1)
968 .mount(&mock_server)
969 .await;
970
971 let config = InitialPeersConfig {
972 ignore_cache: true,
973 network_contacts_url: vec![
974 format!("{}/first", mock_server.uri()),
975 format!("{}/second", mock_server.uri()),
976 ],
977 ..Default::default()
978 };
979 let mut config =
980 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
981 config.disable_env_peers = true;
982 let mut flow = Bootstrap::new(config).await.unwrap();
983
984 let first = expect_next_addr(&mut flow).await.unwrap();
985 assert_eq!(first, contact_one);
986
987 let second = expect_next_addr(&mut flow).await.unwrap();
988 assert_eq!(second, contact_two);
989
990 let err = expect_err(&mut flow).await;
991 assert!(matches!(err, Error::NoBootstrapPeersFound));
992
993 let requests = mock_server.received_requests().await.unwrap();
994 assert_eq!(requests.len(), 2);
995 assert_eq!(requests[0].url.path(), "/first");
996 assert_eq!(requests[1].url.path(), "/second");
997 }
998
999 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1000 async fn test_full_bootstrap_flow() {
1001 let _env_guard = env_lock().await;
1002 remove_env_var(ANT_PEERS_ENV);
1003
1004 let env_addr: Multiaddr =
1005 "/ip4/10.1.0.1/tcp/1300/p2p/12D3KooWBbtXX6gY5xPD7NzNGGbj2428NJQ4HNvvBnSE5g4R7Pkf"
1006 .parse()
1007 .unwrap();
1008 let cli_addr: Multiaddr =
1009 "/ip4/10.1.0.2/tcp/1301/p2p/12D3KooWCRfYwq9c3PAXo5cTp3snq72Knqukcec4c9qT1AMyvMPd"
1010 .parse()
1011 .unwrap();
1012 set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1013
1014 let cache_addr_one: Multiaddr =
1015 "/ip4/10.1.0.3/tcp/1302/p2p/12D3KooWMmKJcWUP9UqP4g1n3LH1htkvSUStn1aQGQxGc1dQcYxA"
1016 .parse()
1017 .unwrap();
1018 let cache_addr_two: Multiaddr =
1019 "/ip4/10.1.0.4/tcp/1303/p2p/12D3KooWA4b4T6Dz4RUtqnYDEBt3eGkqRykGGBqBP3ZiZsaAJ2jp"
1020 .parse()
1021 .unwrap();
1022
1023 let temp_dir = TempDir::new().unwrap();
1024 let file_name = BootstrapCacheStore::cache_file_name(false);
1025 let mut cache_data = CacheData::default();
1026 cache_data.add_peer(
1027 multiaddr_get_peer_id(&cache_addr_one).unwrap(),
1028 std::iter::once(&cache_addr_one),
1029 3,
1030 10,
1031 );
1032 cache_data.add_peer(
1033 multiaddr_get_peer_id(&cache_addr_two).unwrap(),
1034 std::iter::once(&cache_addr_two),
1035 3,
1036 10,
1037 );
1038 cache_data
1039 .write_to_file(temp_dir.path(), &file_name)
1040 .unwrap();
1041
1042 let mock_server = MockServer::start().await;
1043 let contact_one: Multiaddr =
1044 "/ip4/10.1.0.5/tcp/1304/p2p/12D3KooWQGyiCWkmKvgFVF1PsvBLnBxG29BAsoAhH4m6qjUpBAk1"
1045 .parse()
1046 .unwrap();
1047 let contact_two: Multiaddr =
1048 "/ip4/10.1.0.6/tcp/1305/p2p/12D3KooWGpMibW82dManEXZDV4SSQSSHqzTeWY5Avzkdx6yrosNG"
1049 .parse()
1050 .unwrap();
1051
1052 Mock::given(method("GET"))
1053 .and(path("/contacts_one"))
1054 .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
1055 .expect(1)
1056 .mount(&mock_server)
1057 .await;
1058
1059 Mock::given(method("GET"))
1060 .and(path("/contacts_two"))
1061 .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
1062 .expect(1)
1063 .mount(&mock_server)
1064 .await;
1065
1066 let file_path = temp_dir.path().join(format!(
1067 "version_{}/{}",
1068 CacheData::CACHE_DATA_VERSION,
1069 file_name
1070 ));
1071 let contents = std::fs::read_to_string(&file_path).unwrap();
1072 assert!(contents.contains(&cache_addr_one.to_string()));
1073 assert!(contents.contains(&cache_addr_two.to_string()));
1074
1075 assert_eq!(
1076 Bootstrap::fetch_from_env(),
1077 vec![env_addr.clone()],
1078 "environment variable should yield the configured address"
1079 );
1080
1081 let config = InitialPeersConfig {
1082 addrs: vec![cli_addr.clone()],
1083 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1084 network_contacts_url: vec![
1085 format!("{}/contacts_one", mock_server.uri()),
1086 format!("{}/contacts_two", mock_server.uri()),
1087 ],
1088 ..Default::default()
1089 };
1090 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1091 let mut flow = Bootstrap::new(config).await.unwrap();
1092
1093 let initial_results = vec![
1094 expect_next_addr(&mut flow).await.unwrap(),
1095 expect_next_addr(&mut flow).await.unwrap(),
1096 ];
1097 let initial_set: HashSet<_> = initial_results.into_iter().collect();
1098 let expected_initial: HashSet<_> =
1099 [env_addr.clone(), cli_addr.clone()].into_iter().collect();
1100 assert_eq!(initial_set, expected_initial);
1101
1102 let cache_results = vec![
1103 expect_next_addr(&mut flow).await.unwrap(),
1104 expect_next_addr(&mut flow).await.unwrap(),
1105 ];
1106 let cache_set: HashSet<_> = cache_results.into_iter().collect();
1107 let expected_cache: HashSet<_> = [cache_addr_one.clone(), cache_addr_two.clone()]
1108 .into_iter()
1109 .collect();
1110 assert_eq!(cache_set, expected_cache);
1111
1112 let contact_first = expect_next_addr(&mut flow).await.unwrap();
1113 assert_eq!(contact_first, contact_one);
1114
1115 let contact_second = expect_next_addr(&mut flow).await.unwrap();
1116 assert_eq!(contact_second, contact_two);
1117
1118 let err = expect_err(&mut flow).await;
1119 assert!(matches!(err, Error::NoBootstrapPeersFound));
1120
1121 let requests = mock_server.received_requests().await.unwrap();
1122 assert_eq!(requests.len(), 2);
1123 assert_eq!(requests[0].url.path(), "/contacts_one");
1124 assert_eq!(requests[1].url.path(), "/contacts_two");
1125
1126 remove_env_var(ANT_PEERS_ENV);
1127 }
1128
1129 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1130 async fn test_disable_env_peers_flag() {
1131 let env_addr = generate_valid_test_multiaddr(2, 0, 2000);
1132
1133 let _env_guard = env_lock().await;
1134 set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1135
1136 let temp_dir = TempDir::new().unwrap();
1137
1138 let config = InitialPeersConfig {
1139 local: true,
1140 ignore_cache: true,
1141 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1142 ..Default::default()
1143 };
1144 let mut config =
1145 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1146 config.disable_env_peers = true;
1147
1148 let result = Bootstrap::new(config).await;
1149 assert!(
1150 result.is_err(),
1151 "Should error when env peers are disabled and no other sources available"
1152 );
1153 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1154
1155 remove_env_var(ANT_PEERS_ENV);
1156 }
1157
1158 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1159 async fn test_disable_cache_reading_flag() {
1160 let _env_guard = env_lock().await;
1161
1162 let cache_addr = generate_valid_test_multiaddr(2, 0, 2001);
1163 let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1164
1165 let temp_dir = TempDir::new().unwrap();
1166 let file_name = BootstrapCacheStore::cache_file_name(true);
1167
1168 let mut cache_data = CacheData::default();
1169 cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1170 cache_data
1171 .write_to_file(temp_dir.path(), &file_name)
1172 .unwrap();
1173
1174 let config = InitialPeersConfig {
1175 local: true,
1176 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1177 ..Default::default()
1178 };
1179 let mut config =
1180 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1181 config.disable_env_peers = true;
1182 config.disable_cache_reading = true;
1183
1184 let result = Bootstrap::new(config).await;
1185 assert!(
1186 result.is_err(),
1187 "Should error when cache reading is disabled and no other sources available"
1188 );
1189 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1190 }
1191
1192 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1193 async fn test_bootstrap_completed_initialization() {
1194 let temp_dir = TempDir::new().unwrap();
1195
1196 let config = InitialPeersConfig {
1197 first: true,
1198 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1199 ..Default::default()
1200 };
1201 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1202 let flow = Bootstrap::new(config).await.unwrap();
1203
1204 assert!(
1205 flow.has_terminated(),
1206 "bootstrap_completed should be true for first node"
1207 );
1208
1209 let config = InitialPeersConfig {
1210 first: false,
1211 local: true,
1212 ignore_cache: true,
1213 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1214 addrs: vec![generate_valid_test_multiaddr(2, 0, 2002)],
1215 ..Default::default()
1216 };
1217 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1218 let flow = Bootstrap::new(config).await.unwrap();
1219
1220 assert!(
1221 !flow.has_terminated(),
1222 "bootstrap_completed should be false for non-first node"
1223 );
1224 }
1225
1226 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1227 async fn test_bootstrap_peer_ids_population() {
1228 let env_addr = generate_valid_test_multiaddr(2, 0, 2003);
1229 let cli_addr = generate_valid_test_multiaddr(2, 0, 2004);
1230
1231 let env_peer_id = multiaddr_get_peer_id(&env_addr).unwrap();
1232 let cli_peer_id = multiaddr_get_peer_id(&cli_addr).unwrap();
1233
1234 let _env_guard = env_lock().await;
1235 set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1236
1237 let temp_dir = TempDir::new().unwrap();
1238
1239 let config = InitialPeersConfig {
1240 local: true,
1241 ignore_cache: true,
1242 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1243 addrs: vec![cli_addr.clone()],
1244 ..Default::default()
1245 };
1246 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1247 let flow = Bootstrap::new(config).await.unwrap();
1248
1249 assert!(
1250 flow.is_bootstrap_peer(&env_peer_id),
1251 "Peer ID from env should be tracked"
1252 );
1253 assert!(
1254 flow.is_bootstrap_peer(&cli_peer_id),
1255 "Peer ID from CLI should be tracked"
1256 );
1257
1258 remove_env_var(ANT_PEERS_ENV);
1259 }
1260
1261 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1262 async fn test_invalid_multiaddr_in_initial_peers() {
1263 let _env_guard = env_lock().await;
1264
1265 let valid_addr = generate_valid_test_multiaddr(2, 0, 2005);
1266 let invalid_addr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse().unwrap();
1267
1268 let temp_dir = TempDir::new().unwrap();
1269
1270 let config = InitialPeersConfig {
1271 local: true,
1272 ignore_cache: true,
1273 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1274 addrs: vec![valid_addr.clone()],
1275 ..Default::default()
1276 };
1277 let mut config =
1278 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1279 config.disable_env_peers = true;
1280
1281 config.initial_peers.push(invalid_addr);
1282
1283 let mut flow = Bootstrap::new(config).await.unwrap();
1284
1285 let first = expect_next_addr(&mut flow).await.unwrap();
1286 assert_eq!(first, valid_addr, "Should get the valid address");
1287
1288 let err = expect_err(&mut flow).await;
1289 assert!(
1290 matches!(err, Error::NoBootstrapPeersFound),
1291 "Should not find any more peers after valid one (invalid addr was filtered)"
1292 );
1293 }
1294
1295 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1296 async fn test_local_network_skips_contacts() {
1297 let _env_guard = env_lock().await;
1298
1299 let mock_server = MockServer::start().await;
1300 Mock::given(method("GET"))
1301 .and(path("/should-not-be-called"))
1302 .respond_with(ResponseTemplate::new(200).set_body_string(
1303 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
1304 ))
1305 .expect(0)
1306 .mount(&mock_server)
1307 .await;
1308
1309 let temp_dir = TempDir::new().unwrap();
1310
1311 let config = InitialPeersConfig {
1312 local: true,
1313 ignore_cache: true,
1314 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1315 network_contacts_url: vec![format!("{}/should-not-be-called", mock_server.uri())],
1316 addrs: vec![generate_valid_test_multiaddr(2, 0, 2006)],
1317 ..Default::default()
1318 };
1319 let mut config =
1320 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1321 config.disable_env_peers = true;
1322
1323 let addr_from_config = config.initial_peers[0].clone();
1324 let mut flow = Bootstrap::new(config).await.unwrap();
1325
1326 let first = expect_next_addr(&mut flow).await.unwrap();
1327 assert_eq!(
1328 first, addr_from_config,
1329 "Should get the address from config"
1330 );
1331
1332 let err = expect_err(&mut flow).await;
1333 assert!(matches!(err, Error::NoBootstrapPeersFound));
1334
1335 assert!(
1336 mock_server.received_requests().await.unwrap().is_empty(),
1337 "local flag should prevent contact fetches"
1338 );
1339 }
1340
1341 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1342 async fn test_timeout_with_no_addresses() {
1343 let _env_guard = env_lock().await;
1344
1345 let temp_dir = TempDir::new().unwrap();
1346 let file_name = BootstrapCacheStore::cache_file_name(true);
1347 let cache_data = CacheData::default();
1348 cache_data
1349 .write_to_file(temp_dir.path(), &file_name)
1350 .unwrap();
1351
1352 let config = InitialPeersConfig {
1353 local: true,
1354 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1355 ..Default::default()
1356 };
1357 let mut config =
1358 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1359 config.disable_env_peers = true;
1360
1361 let result = Bootstrap::new(config).await;
1362
1363 assert!(
1364 result.is_err(),
1365 "Should error when no addresses are available from any source"
1366 );
1367 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1368 }
1369
1370 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1371 async fn test_first_node_clears_cache() {
1372 let _env_guard = env_lock().await;
1373
1374 let cache_addr = generate_valid_test_multiaddr(2, 0, 2007);
1375 let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1376
1377 let temp_dir = TempDir::new().unwrap();
1378 let file_name = BootstrapCacheStore::cache_file_name(false);
1379
1380 let mut cache_data = CacheData::default();
1381 cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1382 cache_data
1383 .write_to_file(temp_dir.path(), &file_name)
1384 .unwrap();
1385
1386 let file_path = temp_dir.path().join(format!(
1387 "version_{}/{}",
1388 CacheData::CACHE_DATA_VERSION,
1389 file_name
1390 ));
1391
1392 let contents_before = std::fs::read_to_string(&file_path).unwrap();
1393 assert!(
1394 contents_before.contains(&cache_addr.to_string()),
1395 "Cache should contain the address before initialization"
1396 );
1397
1398 let config = InitialPeersConfig {
1399 first: true,
1400 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1401 ..Default::default()
1402 };
1403 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1404 let _flow = Bootstrap::new(config).await.unwrap();
1405
1406 tokio::time::sleep(Duration::from_millis(100)).await;
1407
1408 let contents_after = std::fs::read_to_string(&file_path).unwrap();
1409 assert!(
1410 !contents_after.contains(&cache_addr.to_string()),
1411 "Cache should be cleared for first node"
1412 );
1413 }
1414
1415 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1416 async fn test_new_loads_at_least_50_contacts() {
1417 let _env_guard = env_lock().await;
1418
1419 let temp_dir = TempDir::new().unwrap();
1420 let file_name = BootstrapCacheStore::cache_file_name(true);
1421
1422 let mut cache_data = CacheData::default();
1423 for i in 0..60 {
1424 let addr = generate_valid_test_multiaddr(3, i as u8, 3000 + i);
1425 let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1426 cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1427 }
1428 cache_data
1429 .write_to_file(temp_dir.path(), &file_name)
1430 .unwrap();
1431
1432 let config = InitialPeersConfig {
1433 local: true,
1434 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1435 ..Default::default()
1436 };
1437 let mut config =
1438 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1439 config.disable_env_peers = true;
1440
1441 let result = Bootstrap::new(config).await;
1442
1443 assert!(
1444 result.is_ok(),
1445 "Should successfully initialize with 60 contacts in cache"
1446 );
1447
1448 let mut flow = result.unwrap();
1449 let mut count = 0;
1450 while let Ok(Some(_addr)) = flow.next_addr() {
1451 count += 1;
1452 if count >= 60 {
1453 break;
1454 }
1455 }
1456
1457 assert!(
1458 count > 0,
1459 "Should have loaded contacts from cache, got {count}"
1460 );
1461 }
1462
1463 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1464 async fn test_new_succeeds_with_few_contacts() {
1465 let _env_guard = env_lock().await;
1466
1467 let temp_dir = TempDir::new().unwrap();
1468 let file_name = BootstrapCacheStore::cache_file_name(true);
1469
1470 let mut cache_data = CacheData::default();
1471 for i in 0..5 {
1472 let addr = generate_valid_test_multiaddr(4, i as u8, 4000 + i);
1473 let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1474 cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1475 }
1476 cache_data
1477 .write_to_file(temp_dir.path(), &file_name)
1478 .unwrap();
1479
1480 let config = InitialPeersConfig {
1481 local: true,
1482 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1483 ..Default::default()
1484 };
1485 let mut config =
1486 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1487 config.disable_env_peers = true;
1488
1489 let result = Bootstrap::new(config).await;
1490 assert!(
1491 result.is_ok(),
1492 "Should succeed with few contacts (< 50 but > 0)"
1493 );
1494
1495 let mut flow = result.unwrap();
1496 let mut count = 0;
1497 while let Ok(Some(_addr)) = flow.next_addr() {
1498 count += 1;
1499 if count >= 10 {
1500 break;
1501 }
1502 }
1503
1504 assert_eq!(count, 5, "Should have exactly 5 contacts");
1505 }
1506
1507 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1508 async fn test_new_errors_with_zero_contacts() {
1509 let _env_guard = env_lock().await;
1510
1511 let temp_dir = TempDir::new().unwrap();
1512 let file_name = BootstrapCacheStore::cache_file_name(false);
1513 let cache_data = CacheData::default();
1514 cache_data
1515 .write_to_file(temp_dir.path(), &file_name)
1516 .unwrap();
1517
1518 let mock_server = MockServer::start().await;
1519 Mock::given(method("GET"))
1520 .and(path("/failing-endpoint"))
1521 .respond_with(ResponseTemplate::new(500))
1522 .expect(1..)
1523 .mount(&mock_server)
1524 .await;
1525
1526 let config = InitialPeersConfig {
1527 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1528 network_contacts_url: vec![format!("{}/failing-endpoint", mock_server.uri())],
1529 ..Default::default()
1530 };
1531 let mut config =
1532 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1533 config.disable_env_peers = true;
1534
1535 let result = Bootstrap::new(config).await;
1536
1537 assert!(
1538 result.is_err(),
1539 "Should error when all sources fail and no contacts are available"
1540 );
1541 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1542 }
1543}