1use crate::ANT_PEERS_ENV;
10use crate::BootstrapCacheStore;
11use crate::BootstrapConfig;
12use crate::ContactsFetcher;
13use crate::Result;
14use crate::contacts_fetcher::ALPHANET_CONTACTS;
15use crate::contacts_fetcher::MAINNET_CONTACTS;
16use crate::craft_valid_multiaddr;
17use crate::craft_valid_multiaddr_from_str;
18use crate::error::Error;
19use crate::multiaddr_get_peer_id;
20use ant_protocol::version::ALPHANET_ID;
21use ant_protocol::version::MAINNET_ID;
22use ant_protocol::version::get_network_id;
23use libp2p::{
24 Multiaddr, PeerId, Swarm,
25 core::connection::ConnectedPoint,
26 multiaddr::Protocol,
27 swarm::{
28 DialError, NetworkBehaviour,
29 dial_opts::{DialOpts, PeerCondition},
30 },
31};
32use std::collections::{HashSet, VecDeque};
33use std::time::Duration;
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::sync::mpsc::UnboundedSender;
36use url::Url;
37
/// Upper bound applied to each background fetch started by `Bootstrap`
/// (loading the on-disk cache, or querying one contacts endpoint).
const FETCH_TIMEOUT: Duration = Duration::from_secs(10);

/// Number of addresses `Bootstrap::new` tries to collect before returning when the
/// environment/argument supplied queue is smaller than this.
const MIN_INITIAL_ADDRS: usize = 5;

/// Seconds `Bootstrap::new` keeps polling for initial addresses before it either settles
/// for what it has collected (at least one) or fails.
const INITIAL_ADDR_FETCH_TIMEOUT_SECS: u64 = 30;
46
/// State machine that supplies bootstrap addresses and drives the initial dialing.
///
/// Addresses are queued from the environment variable and the configured initial peers in
/// `new`, and topped up lazily from the on-disk cache and the contacts endpoints through
/// background fetches whose results arrive on an internal channel.
#[derive(custom_debug::Debug)]
pub struct Bootstrap {
    /// Cache store; also the source of the configuration read by the dialing logic.
    cache_store: BootstrapCacheStore,
    /// Addresses waiting to be handed out by `next_addr`.
    addrs: VecDeque<Multiaddr>,
    #[debug(skip)]
    /// Handle of the periodic sync/flush task started in `new`; aborted on drop.
    cache_task: Option<tokio::task::JoinHandle<()>>,
    /// `true` while the cache may still be loaded; cleared when the cache fetch is started.
    cache_pending: bool,
    /// Contacts endpoints that have not been queried yet; `None` when there are none.
    contacts_progress: Option<ContactsProgress>,
    /// Sender cloned into background fetch tasks to report their results.
    event_tx: UnboundedSender<FetchEvent>,
    /// Receiver drained by `process_events`.
    event_rx: UnboundedReceiver<FetchEvent>,
    /// Kind of the background fetch currently in flight, if any.
    fetch_in_progress: Option<FetchKind>,
    /// Addresses for which a dial was started and no outcome has been recorded yet.
    ongoing_dials: HashSet<Multiaddr>,
    /// Peer ids of every address that was pushed onto the queue.
    bootstrap_peer_ids: HashSet<PeerId>,
    /// Set once the initial bootstrap is finished (or immediately for the first node).
    bootstrap_completed: bool,
}
80
81impl Bootstrap {
82 pub async fn new(mut config: BootstrapConfig) -> Result<Self> {
83 let contacts_progress = Self::build_contacts_progress(&config)?;
84
85 let mut addrs_queue = VecDeque::new();
86 let mut bootstrap_peer_ids = HashSet::new();
87 if !config.first {
88 if !config.disable_env_peers {
89 for addr in Self::fetch_from_env() {
90 Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
91 }
92 } else {
93 info!("Skipping ANT_PEERS environment variable as per configuration");
94 }
95
96 for addr in config.initial_peers.drain(..) {
97 if let Some(addr) = craft_valid_multiaddr(&addr, false) {
98 info!("Adding addr from arguments: {addr}");
99 Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
100 } else {
101 warn!("Invalid multiaddress format from arguments: {addr}");
102 }
103 }
104 }
105
106 let cache_pending = !config.first && !config.disable_cache_reading;
107 if !cache_pending {
108 info!(
109 "Not loading from cache as per configuration (first={}, disable_cache_reading={})",
110 config.first, config.disable_cache_reading
111 );
112 } else {
113 info!("Cache loading is enabled - cache will be fetched if needed");
114 }
115 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
116
117 let cache_store = BootstrapCacheStore::new(config.clone())?;
118
119 let mut bootstrap = Self {
120 cache_store,
121 addrs: addrs_queue,
122 cache_pending,
123 contacts_progress,
124 event_tx,
125 event_rx,
126 fetch_in_progress: None,
127 ongoing_dials: HashSet::new(),
128 bootstrap_peer_ids,
129 bootstrap_completed: config.first,
130 cache_task: None,
131 };
132
133 info!("Cache store is initialized and will sync and flush periodically");
134 let cache_task = bootstrap.cache_store.sync_and_flush_periodically();
135 bootstrap.cache_task = Some(cache_task);
136
137 if config.first {
138 info!("First node in network; clearing any existing cache");
139 bootstrap.cache_store.write().await?;
140 return Ok(bootstrap);
141 }
142
143 let mut collected_addrs = Vec::new();
147 if bootstrap.addrs.len() < MIN_INITIAL_ADDRS {
148 info!("Initial address queue < {MIN_INITIAL_ADDRS}; fetching from cache/contacts");
149 let now = std::time::Instant::now();
150 loop {
151 match bootstrap.next_addr() {
152 Ok(Some(addr)) => {
153 collected_addrs.push(addr);
154 if Self::try_finalize_initial_addrs(
155 &mut bootstrap,
156 &mut collected_addrs,
157 MIN_INITIAL_ADDRS,
158 ) {
159 break;
160 }
161 continue;
162 }
163 Ok(None) => {
164 debug!(
165 "No immediate address available; waiting for async fetch to complete"
166 );
167 }
168 Err(err) => {
169 if Self::try_finalize_initial_addrs(&mut bootstrap, &mut collected_addrs, 1)
170 {
171 break;
172 }
173 warn!("Failed to fetch initial address: {err}");
174 return Err(err);
175 }
176 }
177
178 if now.elapsed() > std::time::Duration::from_secs(INITIAL_ADDR_FETCH_TIMEOUT_SECS) {
179 if Self::try_finalize_initial_addrs(&mut bootstrap, &mut collected_addrs, 1) {
180 break;
181 }
182 error!("Timed out waiting for initial addresses. ");
183 return Err(Error::NoBootstrapPeersFound);
184 }
185 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
186 }
187 }
188
189 Ok(bootstrap)
190 }
191
192 fn try_finalize_initial_addrs(
196 bootstrap: &mut Bootstrap,
197 collected_addrs: &mut Vec<Multiaddr>,
198 min_address: usize,
199 ) -> bool {
200 if collected_addrs.len() < min_address {
201 return false;
202 }
203 info!(
204 "Collected minimum required initial addresses ({}), proceeding with bootstrap.",
205 collected_addrs.len()
206 );
207 bootstrap.extend_with_addrs(std::mem::take(collected_addrs));
208 true
209 }
210
211 pub fn next_addr(&mut self) -> Result<Option<Multiaddr>> {
218 loop {
219 self.process_events();
220
221 if let Some(addr) = self.addrs.pop_front() {
222 info!(?addr, "next_addr returning queued address");
223 return Ok(Some(addr));
224 }
225
226 if self.fetch_in_progress.is_some() {
227 debug!("next_addr waiting for in-flight fetch result");
228 return Ok(None);
229 }
230
231 if self.cache_pending && !matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
232 info!("next_addr triggering cache fetch");
233 self.start_cache_fetch()?;
234 continue;
235 }
236
237 if self.contacts_progress.is_some()
238 && !matches!(self.fetch_in_progress, Some(FetchKind::Contacts))
239 {
240 info!("next_addr triggering contacts fetch");
241 self.start_contacts_fetch()?;
242 if self.fetch_in_progress.is_some() {
243 return Ok(None);
244 }
245 continue;
246 }
247
248 info!("next_addr exhausted all address sources");
249 return Err(Error::NoBootstrapPeersFound);
250 }
251 }
252
253 fn process_events(&mut self) {
254 while let Ok(event) = self.event_rx.try_recv() {
255 match event {
256 FetchEvent::Cache(addrs) => {
257 info!(count = addrs.len(), "process_events received cache batch");
258 if addrs.is_empty() {
259 info!("No addresses retrieved from cache");
260 } else {
261 self.extend_with_addrs(addrs);
262 }
263 }
264 FetchEvent::Contacts(addrs) => {
265 info!(
266 count = addrs.len(),
267 "process_events received contacts batch"
268 );
269 self.extend_with_addrs(addrs);
270 if self
271 .contacts_progress
272 .as_ref()
273 .is_none_or(ContactsProgress::is_empty)
274 {
275 self.contacts_progress = None;
276 }
277 }
278 }
279
280 self.fetch_in_progress = None;
281 }
282 }
283
284 fn extend_with_addrs(&mut self, addrs: Vec<Multiaddr>) {
285 if addrs.is_empty() {
286 return;
287 }
288 for addr in addrs {
289 Self::push_addr(&mut self.addrs, &mut self.bootstrap_peer_ids, addr);
290 }
291 }
292
293 fn push_addr(queue: &mut VecDeque<Multiaddr>, peer_ids: &mut HashSet<PeerId>, addr: Multiaddr) {
294 if let Some(peer_id) = multiaddr_get_peer_id(&addr) {
295 peer_ids.insert(peer_id);
296 }
297 queue.push_back(addr);
298 }
299
300 fn pop_p2p(addr: &mut Multiaddr) -> Option<PeerId> {
301 if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
302 let _ = addr.pop();
303 Some(peer_id)
304 } else {
305 None
306 }
307 }
308
309 fn try_next_dial_addr(&mut self) -> Result<Option<Multiaddr>> {
310 match self.next_addr() {
311 Ok(Some(addr)) => Ok(Some(addr)),
312 Ok(None) => Ok(None),
313 Err(Error::NoBootstrapPeersFound) => {
314 self.bootstrap_completed = true;
315 Err(Error::NoBootstrapPeersFound)
316 }
317 Err(err) => Err(err),
318 }
319 }
320
321 fn should_continue_bootstrapping(&mut self, contacted_peers: usize) -> bool {
322 if self.bootstrap_completed {
323 info!("Initial bootstrap process has already completed successfully.");
324 return false;
325 }
326
327 if contacted_peers
328 >= self
329 .cache_store
330 .config()
331 .max_contacted_peers_before_termination
332 {
333 self.bootstrap_completed = true;
334 self.addrs.clear();
335 self.ongoing_dials.clear();
336
337 info!(
338 "Initial bootstrap process completed successfully. We have {contacted_peers} peers in the routing table."
339 );
340
341 return false;
342 }
343
344 if self.ongoing_dials.len() >= self.cache_store.config().max_concurrent_dials {
345 info!(
346 "Initial bootstrap has {} ongoing dials. Not dialing anymore.",
347 self.ongoing_dials.len()
348 );
349
350 return false;
351 }
352
353 if self.addrs.is_empty() {
354 if self.cache_pending
355 || self.contacts_progress.is_some()
356 || self.fetch_in_progress.is_some()
357 {
358 info!("Initial bootstrap is awaiting additional addresses to arrive.");
359 return false;
360 }
361 info!(
362 "We have {contacted_peers} peers in RT, but no more addresses to dial. Stopping initial bootstrap."
363 );
364
365 self.bootstrap_completed = true;
366 return false;
367 }
368
369 true
370 }
371
372 pub fn trigger_bootstrapping_process<B: NetworkBehaviour>(
373 &mut self,
374 swarm: &mut Swarm<B>,
375 contacted_peers: usize,
376 ) {
377 if !self.should_continue_bootstrapping(contacted_peers) {
378 return;
379 }
380
381 while self.ongoing_dials.len() < self.cache_store.config().max_concurrent_dials {
382 match self.try_next_dial_addr() {
383 Ok(Some(mut addr)) => {
384 let addr_clone = addr.clone();
385 let peer_id = Self::pop_p2p(&mut addr);
386
387 let opts = match peer_id {
388 Some(peer_id) => DialOpts::peer_id(peer_id)
389 .condition(PeerCondition::NotDialing)
390 .addresses(vec![addr])
391 .build(),
392 None => DialOpts::unknown_peer_id().address(addr).build(),
393 };
394
395 info!("Trying to dial peer with address: {addr_clone}");
396
397 match swarm.dial(opts) {
398 Ok(()) => {
399 info!(
400 "Dial attempt initiated for peer with address: {addr_clone}. Ongoing dial attempts: {}",
401 self.ongoing_dials.len() + 1
402 );
403 let _ = self.ongoing_dials.insert(addr_clone);
404 }
405 Err(err) => match err {
406 DialError::LocalPeerId { .. } => {
407 warn!(
408 "Failed to dial peer with address: {addr_clone}. This is our own peer ID. Dialing the next peer"
409 );
410 }
411 DialError::NoAddresses => {
412 error!(
413 "Failed to dial peer with address: {addr_clone}. No addresses found. Dialing the next peer"
414 );
415 }
416 DialError::DialPeerConditionFalse(_) => {
417 warn!(
418 "We are already dialing the peer with address: {addr_clone}. Dialing the next peer. This error is harmless."
419 );
420 }
421 DialError::Aborted => {
422 error!(
423 "Pending connection attempt has been aborted for {addr_clone}. Dialing the next peer."
424 );
425 }
426 DialError::WrongPeerId { obtained, .. } => {
427 error!(
428 "The peer identity obtained on the connection did not match the one that was expected. Obtained: {obtained}. Dialing the next peer."
429 );
430 }
431 DialError::Denied { cause } => {
432 error!(
433 "The dialing attempt was denied by the remote peer. Cause: {cause}. Dialing the next peer."
434 );
435 }
436 DialError::Transport(items) => {
437 error!(
438 "Failed to dial peer with address: {addr_clone}. Transport error: {items:?}. Dialing the next peer."
439 );
440 }
441 },
442 }
443 }
444 Ok(None) => {
445 debug!("Waiting for additional bootstrap addresses before continuing to dial");
446 break;
447 }
448 Err(Error::NoBootstrapPeersFound) => {
449 info!("No more bootstrap peers available to dial.");
450 break;
451 }
452 Err(err) => {
453 warn!("Failed to obtain next bootstrap address: {err}");
454 break;
455 }
456 }
457 }
458 }
459
460 pub fn on_connection_established<B: NetworkBehaviour>(
461 &mut self,
462 peer_id: &PeerId,
463 endpoint: &ConnectedPoint,
464 swarm: &mut Swarm<B>,
465 contacted_peers: usize,
466 ) {
467 if self.bootstrap_completed {
468 return;
469 }
470
471 if let ConnectedPoint::Dialer { address, .. } = endpoint
472 && !self.ongoing_dials.remove(address)
473 {
474 self.ongoing_dials
475 .retain(|addr| match multiaddr_get_peer_id(addr) {
476 Some(id) => id != *peer_id,
477 None => true,
478 });
479 }
480
481 self.trigger_bootstrapping_process(swarm, contacted_peers);
482 }
483
484 pub fn on_outgoing_connection_error<B: NetworkBehaviour>(
485 &mut self,
486 peer_id: Option<PeerId>,
487 swarm: &mut Swarm<B>,
488 contacted_peers: usize,
489 ) {
490 if self.bootstrap_completed {
491 return;
492 }
493
494 match peer_id {
495 Some(peer_id) => {
496 self.ongoing_dials.retain(|addr| {
497 if let Some(id) = multiaddr_get_peer_id(addr) {
498 id != peer_id
499 } else {
500 true
501 }
502 });
503 }
504 None => {
505 self.ongoing_dials
508 .retain(|addr| multiaddr_get_peer_id(addr).is_some());
509 }
510 }
511
512 self.trigger_bootstrapping_process(swarm, contacted_peers);
513 }
514
    /// Returns `true` if `peer_id` belongs to an address that was pushed onto the bootstrap
    /// queue at any point.
    pub fn is_bootstrap_peer(&self, peer_id: &PeerId) -> bool {
        self.bootstrap_peer_ids.contains(peer_id)
    }
518
    /// Returns `true` once the initial bootstrap has been marked as completed.
    pub fn has_terminated(&self) -> bool {
        self.bootstrap_completed
    }
522
523 fn start_cache_fetch(&mut self) -> Result<()> {
524 if matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
525 error!("Cache fetch already in progress, not starting another");
526 return Ok(());
527 }
528
529 self.cache_pending = false;
530 let config = self.cache_store.config().clone();
531 let event_tx = self.event_tx.clone();
532
533 tokio::spawn(async move {
534 let fetch_result = tokio::time::timeout(FETCH_TIMEOUT, async move {
535 tokio::task::spawn_blocking(move || BootstrapCacheStore::load_cache_data(&config))
536 .await
537 })
538 .await;
539
540 let addrs = match fetch_result {
541 Ok(spawn_result) => match spawn_result {
542 Ok(Ok(cache_data)) => cache_data.get_all_addrs().cloned().collect(),
543 Ok(Err(err)) => {
544 warn!("Failed to load cache data: {err}");
545 Vec::new()
546 }
547 Err(err) => {
548 warn!("Cache fetch task failed to join: {err}");
549 Vec::new()
550 }
551 },
552 Err(_) => {
553 warn!(
554 "Cache fetch timed out after {} seconds",
555 FETCH_TIMEOUT.as_secs()
556 );
557 Vec::new()
558 }
559 };
560
561 info!(
562 "Bootstrap cache loaded from disk with {} addresses",
563 addrs.len()
564 );
565 if let Err(err) = event_tx.send(FetchEvent::Cache(addrs)) {
566 error!("Failed to send cache fetch event: {err:?}");
567 }
568 });
569
570 self.fetch_in_progress = Some(FetchKind::Cache);
571
572 Ok(())
573 }
574
575 fn start_contacts_fetch(&mut self) -> Result<()> {
576 if matches!(self.fetch_in_progress, Some(FetchKind::Contacts)) {
577 error!("Contacts fetch already in progress, not starting another");
578 return Ok(());
579 }
580
581 let Some(progress) = self.contacts_progress.as_mut() else {
582 info!("No contacts progress available");
583 return Ok(());
584 };
585
586 let Some(endpoint) = progress.next_endpoint() else {
587 info!("No more contacts endpoints to try");
588 self.contacts_progress = None;
589 return Ok(());
590 };
591
592 let event_tx = self.event_tx.clone();
593
594 tokio::spawn(async move {
595 let fetch_result = tokio::time::timeout(FETCH_TIMEOUT, async {
596 let fetcher = ContactsFetcher::with_endpoints(vec![endpoint.clone()])?;
597 fetcher.fetch_bootstrap_addresses().await
598 })
599 .await;
600
601 let addrs = match fetch_result {
602 Ok(Ok(addrs)) => addrs,
603 Ok(Err(err)) => {
604 warn!("Failed to fetch contacts from {endpoint}: {err}");
605 Vec::new()
606 }
607 Err(_) => {
608 warn!(
609 "Contacts fetch from {endpoint} timed out after {} seconds",
610 FETCH_TIMEOUT.as_secs()
611 );
612 Vec::new()
613 }
614 };
615
616 info!(
617 "Contacts fetch completed from endpoint {endpoint:?} with {} addresses",
618 addrs.len()
619 );
620 if let Err(err) = event_tx.send(FetchEvent::Contacts(addrs)) {
621 error!("Failed to send contacts fetch event: {err:?}");
622 }
623 });
624
625 self.fetch_in_progress = Some(FetchKind::Contacts);
626
627 Ok(())
628 }
629
630 fn build_contacts_progress(config: &BootstrapConfig) -> Result<Option<ContactsProgress>> {
631 if config.first {
632 info!("First node in network; not fetching contacts");
633 return Ok(None);
634 }
635
636 if config.local {
637 info!("Local network configuration; skipping contacts endpoints");
638 return Ok(None);
639 }
640
641 if !config.network_contacts_url.is_empty() {
642 let endpoints = config
643 .network_contacts_url
644 .iter()
645 .map(|endpoint| endpoint.parse::<Url>().map_err(|_| Error::FailedToParseUrl))
646 .collect::<Result<Vec<_>>>()?;
647 info!("Using provided contacts endpoints: {endpoints:?}");
648 return Ok(ContactsProgress::new(endpoints));
649 }
650
651 match get_network_id() {
652 id if id == MAINNET_ID => {
653 info!("Using built-in mainnet contacts endpoints");
654 Ok(ContactsProgress::from_static(MAINNET_CONTACTS))
655 }
656
657 id if id == ALPHANET_ID => {
658 info!("Using built-in alphanet contacts endpoints");
659 Ok(ContactsProgress::from_static(ALPHANET_CONTACTS))
660 }
661 _ => Ok(None),
662 }
663 }
664
665 pub fn fetch_from_env() -> Vec<Multiaddr> {
666 let mut bootstrap_addresses = Vec::new();
667 if let Ok(addrs) = std::env::var(ANT_PEERS_ENV) {
669 for addr_str in addrs.split(',') {
670 if let Some(addr) = craft_valid_multiaddr_from_str(addr_str, false) {
671 info!("Adding addr from environment variable: {addr}");
672 bootstrap_addresses.push(addr);
673 } else {
674 warn!("Invalid multiaddress format from environment variable: {addr_str}");
675 }
676 }
677 }
678 bootstrap_addresses
679 }
680
    /// Returns a mutable reference to the underlying cache store.
    pub fn cache_store_mut(&mut self) -> &mut BootstrapCacheStore {
        &mut self.cache_store
    }
684
    /// Returns a shared reference to the underlying cache store.
    pub fn cache_store(&self) -> &BootstrapCacheStore {
        &self.cache_store
    }
688}
689
690impl Drop for Bootstrap {
691 fn drop(&mut self) {
692 if let Some(cache_task) = self.cache_task.take() {
693 cache_task.abort();
694 }
695 }
696}
697
/// Tracks which contacts endpoints have not been queried yet.
#[derive(Debug)]
struct ContactsProgress {
    /// Endpoints still to try, in the order they will be queried.
    remaining: VecDeque<Url>,
}
702
/// Result of a background fetch, sent from the spawned task back to `Bootstrap`.
/// A failed or timed-out fetch is reported with an empty vector.
enum FetchEvent {
    /// Addresses loaded from the on-disk bootstrap cache.
    Cache(Vec<Multiaddr>),
    /// Addresses returned by one contacts endpoint.
    Contacts(Vec<Multiaddr>),
}
707
/// Identifies which kind of background fetch is currently in flight.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FetchKind {
    /// The on-disk bootstrap cache is being loaded.
    Cache,
    /// A contacts endpoint is being queried.
    Contacts,
}
713
714impl ContactsProgress {
715 fn new(urls: Vec<Url>) -> Option<Self> {
716 if urls.is_empty() {
717 None
718 } else {
719 Some(Self {
720 remaining: VecDeque::from(urls),
721 })
722 }
723 }
724
725 fn from_static(urls: &[&str]) -> Option<Self> {
726 let mut parsed = Vec::new();
727 for url in urls {
728 match url.parse::<Url>() {
729 Ok(parsed_url) => parsed.push(parsed_url),
730 Err(err) => {
731 warn!("Failed to parse static contacts URL {url}: {err}");
732 }
733 }
734 }
735 Self::new(parsed)
736 }
737
738 fn next_endpoint(&mut self) -> Option<Url> {
739 self.remaining.pop_front()
740 }
741
742 fn is_empty(&self) -> bool {
743 self.remaining.is_empty()
744 }
745}
746
747#[cfg(test)]
748mod tests {
749 use super::*;
750 use crate::{
751 InitialPeersConfig,
752 cache_store::{BootstrapCacheStore, cache_data_v1::CacheData},
753 multiaddr_get_peer_id,
754 };
755 use libp2p::Multiaddr;
756 use std::collections::HashSet;
757 use std::sync::{Arc, OnceLock};
758 use std::time::{Duration, Instant};
759 use tempfile::TempDir;
760 use tokio::sync::{Mutex, OwnedMutexGuard};
761 use tokio::time::sleep;
762 use wiremock::{
763 Mock, MockServer, ResponseTemplate,
764 matchers::{method, path},
765 };
766
767 async fn env_lock() -> OwnedMutexGuard<()> {
768 static ENV_MUTEX: OnceLock<Arc<Mutex<()>>> = OnceLock::new();
769 Arc::clone(ENV_MUTEX.get_or_init(|| Arc::new(Mutex::new(()))))
770 .lock_owned()
771 .await
772 }
773
    /// Sets the environment variable `key` to `value` for the current process.
    #[allow(unsafe_code)]
    fn set_env_var(key: &str, value: &str) {
        // SAFETY(review): `std::env::set_var` must not race with other threads accessing the
        // environment. Callers in this module hold the `env_lock()` guard first; presumably
        // that is the intended protection - confirm no other test touches the environment.
        unsafe {
            std::env::set_var(key, value);
        }
    }
780
    /// Removes the environment variable `key` from the current process.
    #[allow(unsafe_code)]
    fn remove_env_var(key: &str) {
        // SAFETY(review): `std::env::remove_var` must not race with other threads accessing
        // the environment. Callers in this module hold the `env_lock()` guard first;
        // presumably that is the intended protection - confirm.
        unsafe {
            std::env::remove_var(key);
        }
    }
787
788 async fn expect_next_addr(flow: &mut Bootstrap) -> Result<Multiaddr> {
789 let deadline = Instant::now() + Duration::from_secs(2);
790 loop {
791 match flow.next_addr() {
792 Ok(Some(addr)) => return Ok(addr),
793 Ok(None) => {
794 if Instant::now() >= deadline {
795 panic!("Timed out waiting for next address");
796 }
797 sleep(Duration::from_millis(5)).await;
798 }
799 Err(err) => return Err(err),
800 }
801 }
802 }
803
804 async fn expect_err(flow: &mut Bootstrap) -> Error {
805 let deadline = Instant::now() + Duration::from_secs(2);
806 loop {
807 match flow.next_addr() {
808 Ok(Some(addr)) => panic!("unexpected address returned: {addr}"),
809 Ok(None) => {
810 if Instant::now() >= deadline {
811 panic!("Timed out waiting for error from flow");
812 }
813 sleep(Duration::from_millis(5)).await;
814 }
815 Err(err) => return err,
816 }
817 }
818 }
819
820 fn generate_valid_test_multiaddr(ip_third: u8, ip_fourth: u8, port: u16) -> Multiaddr {
821 let peer_id = libp2p::PeerId::random();
822 format!("/ip4/10.{ip_third}.{ip_fourth}.1/tcp/{port}/p2p/{peer_id}")
823 .parse()
824 .unwrap()
825 }
826
    /// With the cache ignored and a local network, the flow yields exactly the environment
    /// address and the argument address (compared as a set) and then reports exhaustion.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_cli_arguments_precedence() {
        let env_addr: Multiaddr =
            "/ip4/10.0.0.1/tcp/1200/p2p/12D3KooWQnE7zXkVUEGBnJtNfR88Ujz4ezgm6bVnkvxHCzhF7S5S"
                .parse()
                .unwrap();
        let cli_addr: Multiaddr =
            "/ip4/10.0.0.2/tcp/1201/p2p/12D3KooWQx2TSK7g1C8x3QK7gBqdqbQEkd6vDT7Pxu5gb1xmgjvp"
                .parse()
                .unwrap();

        // Hold the environment lock for the whole test; the variable is read by `new`.
        let _env_guard = env_lock().await;
        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());

        let temp_dir = TempDir::new().unwrap();

        let config = InitialPeersConfig {
            ignore_cache: true,
            local: true,
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            addrs: vec![cli_addr.clone()],
            ..Default::default()
        };
        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        let mut flow = Bootstrap::new(config).await.unwrap();

        let first_two = vec![
            expect_next_addr(&mut flow).await.unwrap(),
            expect_next_addr(&mut flow).await.unwrap(),
        ];
        let first_set: HashSet<_> = first_two.into_iter().collect();
        let expected: HashSet<_> = [env_addr.clone(), cli_addr.clone()].into_iter().collect();
        assert_eq!(first_set, expected);

        // No further source is configured, so the flow must report exhaustion.
        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));

        remove_env_var(ANT_PEERS_ENV);
    }
866
    /// `fetch_from_env` splits the comma separated environment value into both addresses.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_env_variable_parsing() {
        let _env_guard = env_lock().await;
        set_env_var(
            ANT_PEERS_ENV,
            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE,\
/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
        );

        let parsed = Bootstrap::fetch_from_env();
        // Clean up before asserting so a failure does not leave the variable set.
        remove_env_var(ANT_PEERS_ENV);

        assert_eq!(parsed.len(), 2);
        let parsed_set: std::collections::HashSet<_> =
            parsed.into_iter().map(|addr| addr.to_string()).collect();
        let expected = std::collections::HashSet::from([
            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
                .to_string(),
            "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
                .to_string(),
        ]);
        assert_eq!(parsed_set, expected);
    }
890
    /// With no environment or argument addresses, the flow falls back to the address stored
    /// in the on-disk cache and then reports exhaustion.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn loads_addresses_from_cache_when_initial_queue_is_empty() {
        let _env_guard = env_lock().await;
        let cache_addr: Multiaddr =
            "/ip4/127.0.0.1/tcp/1202/p2p/12D3KooWKGt8umjJQ4sDzFXo2UcHBaF33rqmFcWtXM6nbryL5G4J"
                .parse()
                .unwrap();
        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();

        let temp_dir = TempDir::new().unwrap();
        // NOTE(review): the `true` argument presumably selects the local-network cache file,
        // matching `local: true` below - confirm against `cache_file_name`.
        let file_name = BootstrapCacheStore::cache_file_name(true);

        // Seed the cache file that `Bootstrap` is expected to load.
        let mut cache_data = CacheData::default();
        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
        cache_data
            .write_to_file(temp_dir.path(), &file_name)
            .unwrap();

        let config = InitialPeersConfig {
            local: true,
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            ..Default::default()
        };
        let mut config =
            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        config.disable_env_peers = true;
        let mut flow = Bootstrap::new(config).await.unwrap();

        let got = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(got, cache_addr);

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));
    }
925
    /// With `first: true` the flow hands out no addresses at all and never queries the
    /// configured contacts endpoint, even though an argument address and an endpoint are set.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_first_flag_behavior() {
        let _env_guard = env_lock().await;

        // The mock expects zero requests.
        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/peers"))
            .respond_with(ResponseTemplate::new(200).set_body_string(
                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
            ))
            .expect(0)
            .mount(&mock_server)
            .await;

        let temp_dir = TempDir::new().unwrap();
        let config = InitialPeersConfig {
            first: true,
            addrs: vec![
                "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
                    .parse()
                    .unwrap(),
            ],
            network_contacts_url: vec![format!("{}/peers", mock_server.uri())],
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            ..Default::default()
        };
        let mut config =
            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        config.disable_env_peers = true;
        let mut flow = Bootstrap::new(config).await.unwrap();

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));
        assert!(
            mock_server.received_requests().await.unwrap().is_empty(),
            "first flag should prevent contact fetches"
        );
    }
964
    /// Two contacts endpoints are queried one after the other, in configuration order, and
    /// each contributes its address before the flow reports exhaustion.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_multiple_network_contacts() {
        let _env_guard = env_lock().await;

        let mock_server = MockServer::start().await;

        let contact_one: Multiaddr =
            "/ip4/192.168.0.1/tcp/1203/p2p/12D3KooWPULWT1qXJ1jzYVtQocvKXgcv6U7Pp3ui3EB7mN8hXAsP"
                .parse()
                .unwrap();
        let contact_two: Multiaddr =
            "/ip4/192.168.0.2/tcp/1204/p2p/12D3KooWPsMPaEjaWjW6GWpAne6LYcwBQEJfnDbhQFNs6ytzmBn5"
                .parse()
                .unwrap();

        // Each endpoint serves one address and must be hit exactly once.
        Mock::given(method("GET"))
            .and(path("/first"))
            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
            .expect(1)
            .mount(&mock_server)
            .await;

        Mock::given(method("GET"))
            .and(path("/second"))
            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
            .expect(1)
            .mount(&mock_server)
            .await;

        let config = InitialPeersConfig {
            ignore_cache: true,
            network_contacts_url: vec![
                format!("{}/first", mock_server.uri()),
                format!("{}/second", mock_server.uri()),
            ],
            ..Default::default()
        };
        let mut config =
            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        config.disable_env_peers = true;
        let mut flow = Bootstrap::new(config).await.unwrap();

        let first = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(first, contact_one);

        let second = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(second, contact_two);

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));

        // The endpoints must have been requested in the configured order.
        let requests = mock_server.received_requests().await.unwrap();
        assert_eq!(requests.len(), 2);
        assert_eq!(requests[0].url.path(), "/first");
        assert_eq!(requests[1].url.path(), "/second");
    }
1021
    /// End-to-end ordering check: environment and argument addresses come first, then the
    /// two cached addresses, then one address from each contacts endpoint in order, and
    /// finally the flow reports exhaustion.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_full_bootstrap_flow() {
        let _env_guard = env_lock().await;
        remove_env_var(ANT_PEERS_ENV);

        let env_addr: Multiaddr =
            "/ip4/10.1.0.1/tcp/1300/p2p/12D3KooWBbtXX6gY5xPD7NzNGGbj2428NJQ4HNvvBnSE5g4R7Pkf"
                .parse()
                .unwrap();
        let cli_addr: Multiaddr =
            "/ip4/10.1.0.2/tcp/1301/p2p/12D3KooWCRfYwq9c3PAXo5cTp3snq72Knqukcec4c9qT1AMyvMPd"
                .parse()
                .unwrap();
        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());

        let cache_addr_one: Multiaddr =
            "/ip4/10.1.0.3/tcp/1302/p2p/12D3KooWMmKJcWUP9UqP4g1n3LH1htkvSUStn1aQGQxGc1dQcYxA"
                .parse()
                .unwrap();
        let cache_addr_two: Multiaddr =
            "/ip4/10.1.0.4/tcp/1303/p2p/12D3KooWA4b4T6Dz4RUtqnYDEBt3eGkqRykGGBqBP3ZiZsaAJ2jp"
                .parse()
                .unwrap();

        // Seed the on-disk cache with two peers.
        let temp_dir = TempDir::new().unwrap();
        let file_name = BootstrapCacheStore::cache_file_name(false);
        let mut cache_data = CacheData::default();
        cache_data.add_peer(
            multiaddr_get_peer_id(&cache_addr_one).unwrap(),
            std::iter::once(&cache_addr_one),
            3,
            10,
        );
        cache_data.add_peer(
            multiaddr_get_peer_id(&cache_addr_two).unwrap(),
            std::iter::once(&cache_addr_two),
            3,
            10,
        );
        cache_data
            .write_to_file(temp_dir.path(), &file_name)
            .unwrap();

        // Two contacts endpoints, each serving one address and expected to be hit once.
        let mock_server = MockServer::start().await;
        let contact_one: Multiaddr =
            "/ip4/10.1.0.5/tcp/1304/p2p/12D3KooWQGyiCWkmKvgFVF1PsvBLnBxG29BAsoAhH4m6qjUpBAk1"
                .parse()
                .unwrap();
        let contact_two: Multiaddr =
            "/ip4/10.1.0.6/tcp/1305/p2p/12D3KooWGpMibW82dManEXZDV4SSQSSHqzTeWY5Avzkdx6yrosNG"
                .parse()
                .unwrap();

        Mock::given(method("GET"))
            .and(path("/contacts_one"))
            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
            .expect(1)
            .mount(&mock_server)
            .await;

        Mock::given(method("GET"))
            .and(path("/contacts_two"))
            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
            .expect(1)
            .mount(&mock_server)
            .await;

        // Sanity-check the fixtures before exercising the flow.
        let file_path = temp_dir.path().join(format!(
            "version_{}/{}",
            CacheData::CACHE_DATA_VERSION,
            file_name
        ));
        let contents = std::fs::read_to_string(&file_path).unwrap();
        assert!(contents.contains(&cache_addr_one.to_string()));
        assert!(contents.contains(&cache_addr_two.to_string()));

        assert_eq!(
            Bootstrap::fetch_from_env(),
            vec![env_addr.clone()],
            "environment variable should yield the configured address"
        );

        let config = InitialPeersConfig {
            addrs: vec![cli_addr.clone()],
            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
            network_contacts_url: vec![
                format!("{}/contacts_one", mock_server.uri()),
                format!("{}/contacts_two", mock_server.uri()),
            ],
            ..Default::default()
        };
        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
        let mut flow = Bootstrap::new(config).await.unwrap();

        // Stage 1: environment + argument addresses (order within the stage is not checked).
        let initial_results = vec![
            expect_next_addr(&mut flow).await.unwrap(),
            expect_next_addr(&mut flow).await.unwrap(),
        ];
        let initial_set: HashSet<_> = initial_results.into_iter().collect();
        let expected_initial: HashSet<_> =
            [env_addr.clone(), cli_addr.clone()].into_iter().collect();
        assert_eq!(initial_set, expected_initial);

        // Stage 2: the cached addresses (order within the stage is not checked).
        let cache_results = vec![
            expect_next_addr(&mut flow).await.unwrap(),
            expect_next_addr(&mut flow).await.unwrap(),
        ];
        let cache_set: HashSet<_> = cache_results.into_iter().collect();
        let expected_cache: HashSet<_> = [cache_addr_one.clone(), cache_addr_two.clone()]
            .into_iter()
            .collect();
        assert_eq!(cache_set, expected_cache);

        // Stage 3: one address per contacts endpoint, in configuration order.
        let contact_first = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(contact_first, contact_one);

        let contact_second = expect_next_addr(&mut flow).await.unwrap();
        assert_eq!(contact_second, contact_two);

        let err = expect_err(&mut flow).await;
        assert!(matches!(err, Error::NoBootstrapPeersFound));

        let requests = mock_server.received_requests().await.unwrap();
        assert_eq!(requests.len(), 2);
        assert_eq!(requests[0].url.path(), "/contacts_one");
        assert_eq!(requests[1].url.path(), "/contacts_two");

        remove_env_var(ANT_PEERS_ENV);
    }
1151
1152 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1153 async fn test_disable_env_peers_flag() {
1154 let env_addr = generate_valid_test_multiaddr(2, 0, 2000);
1155
1156 let _env_guard = env_lock().await;
1157 set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1158
1159 let temp_dir = TempDir::new().unwrap();
1160
1161 let config = InitialPeersConfig {
1162 local: true,
1163 ignore_cache: true,
1164 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1165 ..Default::default()
1166 };
1167 let mut config =
1168 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1169 config.disable_env_peers = true;
1170
1171 let result = Bootstrap::new(config).await;
1172 assert!(
1173 result.is_err(),
1174 "Should error when env peers are disabled and no other sources available"
1175 );
1176 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1177
1178 remove_env_var(ANT_PEERS_ENV);
1179 }
1180
1181 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1182 async fn test_disable_cache_reading_flag() {
1183 let _env_guard = env_lock().await;
1184
1185 let cache_addr = generate_valid_test_multiaddr(2, 0, 2001);
1186 let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1187
1188 let temp_dir = TempDir::new().unwrap();
1189 let file_name = BootstrapCacheStore::cache_file_name(true);
1190
1191 let mut cache_data = CacheData::default();
1192 cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1193 cache_data
1194 .write_to_file(temp_dir.path(), &file_name)
1195 .unwrap();
1196
1197 let config = InitialPeersConfig {
1198 local: true,
1199 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1200 ..Default::default()
1201 };
1202 let mut config =
1203 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1204 config.disable_env_peers = true;
1205 config.disable_cache_reading = true;
1206
1207 let result = Bootstrap::new(config).await;
1208 assert!(
1209 result.is_err(),
1210 "Should error when cache reading is disabled and no other sources available"
1211 );
1212 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1213 }
1214
1215 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1216 async fn test_bootstrap_completed_initialization() {
1217 let temp_dir = TempDir::new().unwrap();
1218
1219 let config = InitialPeersConfig {
1220 first: true,
1221 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1222 ..Default::default()
1223 };
1224 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1225 let flow = Bootstrap::new(config).await.unwrap();
1226
1227 assert!(
1228 flow.has_terminated(),
1229 "bootstrap_completed should be true for first node"
1230 );
1231
1232 let config = InitialPeersConfig {
1233 first: false,
1234 local: true,
1235 ignore_cache: true,
1236 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1237 addrs: vec![generate_valid_test_multiaddr(2, 0, 2002)],
1238 ..Default::default()
1239 };
1240 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1241 let flow = Bootstrap::new(config).await.unwrap();
1242
1243 assert!(
1244 !flow.has_terminated(),
1245 "bootstrap_completed should be false for non-first node"
1246 );
1247 }
1248
1249 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1250 async fn test_bootstrap_peer_ids_population() {
1251 let env_addr = generate_valid_test_multiaddr(2, 0, 2003);
1252 let cli_addr = generate_valid_test_multiaddr(2, 0, 2004);
1253
1254 let env_peer_id = multiaddr_get_peer_id(&env_addr).unwrap();
1255 let cli_peer_id = multiaddr_get_peer_id(&cli_addr).unwrap();
1256
1257 let _env_guard = env_lock().await;
1258 set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1259
1260 let temp_dir = TempDir::new().unwrap();
1261
1262 let config = InitialPeersConfig {
1263 local: true,
1264 ignore_cache: true,
1265 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1266 addrs: vec![cli_addr.clone()],
1267 ..Default::default()
1268 };
1269 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1270 let flow = Bootstrap::new(config).await.unwrap();
1271
1272 assert!(
1273 flow.is_bootstrap_peer(&env_peer_id),
1274 "Peer ID from env should be tracked"
1275 );
1276 assert!(
1277 flow.is_bootstrap_peer(&cli_peer_id),
1278 "Peer ID from CLI should be tracked"
1279 );
1280
1281 remove_env_var(ANT_PEERS_ENV);
1282 }
1283
1284 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1285 async fn test_invalid_multiaddr_in_initial_peers() {
1286 let _env_guard = env_lock().await;
1287
1288 let valid_addr = generate_valid_test_multiaddr(2, 0, 2005);
1289 let invalid_addr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse().unwrap();
1290
1291 let temp_dir = TempDir::new().unwrap();
1292
1293 let config = InitialPeersConfig {
1294 local: true,
1295 ignore_cache: true,
1296 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1297 addrs: vec![valid_addr.clone()],
1298 ..Default::default()
1299 };
1300 let mut config =
1301 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1302 config.disable_env_peers = true;
1303
1304 config.initial_peers.push(invalid_addr);
1305
1306 let mut flow = Bootstrap::new(config).await.unwrap();
1307
1308 let first = expect_next_addr(&mut flow).await.unwrap();
1309 assert_eq!(first, valid_addr, "Should get the valid address");
1310
1311 let err = expect_err(&mut flow).await;
1312 assert!(
1313 matches!(err, Error::NoBootstrapPeersFound),
1314 "Should not find any more peers after valid one (invalid addr was filtered)"
1315 );
1316 }
1317
1318 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1319 async fn test_local_network_skips_contacts() {
1320 let _env_guard = env_lock().await;
1321
1322 let mock_server = MockServer::start().await;
1323 Mock::given(method("GET"))
1324 .and(path("/should-not-be-called"))
1325 .respond_with(ResponseTemplate::new(200).set_body_string(
1326 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
1327 ))
1328 .expect(0)
1329 .mount(&mock_server)
1330 .await;
1331
1332 let temp_dir = TempDir::new().unwrap();
1333
1334 let config = InitialPeersConfig {
1335 local: true,
1336 ignore_cache: true,
1337 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1338 network_contacts_url: vec![format!("{}/should-not-be-called", mock_server.uri())],
1339 addrs: vec![generate_valid_test_multiaddr(2, 0, 2006)],
1340 ..Default::default()
1341 };
1342 let mut config =
1343 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1344 config.disable_env_peers = true;
1345
1346 let addr_from_config = config.initial_peers[0].clone();
1347 let mut flow = Bootstrap::new(config).await.unwrap();
1348
1349 let first = expect_next_addr(&mut flow).await.unwrap();
1350 assert_eq!(
1351 first, addr_from_config,
1352 "Should get the address from config"
1353 );
1354
1355 let err = expect_err(&mut flow).await;
1356 assert!(matches!(err, Error::NoBootstrapPeersFound));
1357
1358 assert!(
1359 mock_server.received_requests().await.unwrap().is_empty(),
1360 "local flag should prevent contact fetches"
1361 );
1362 }
1363
1364 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1365 async fn test_timeout_with_no_addresses() {
1366 let _env_guard = env_lock().await;
1367
1368 let temp_dir = TempDir::new().unwrap();
1369 let file_name = BootstrapCacheStore::cache_file_name(true);
1370 let cache_data = CacheData::default();
1371 cache_data
1372 .write_to_file(temp_dir.path(), &file_name)
1373 .unwrap();
1374
1375 let config = InitialPeersConfig {
1376 local: true,
1377 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1378 ..Default::default()
1379 };
1380 let mut config =
1381 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1382 config.disable_env_peers = true;
1383
1384 let result = Bootstrap::new(config).await;
1385
1386 assert!(
1387 result.is_err(),
1388 "Should error when no addresses are available from any source"
1389 );
1390 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1391 }
1392
1393 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1394 async fn test_first_node_clears_cache() {
1395 let _env_guard = env_lock().await;
1396
1397 let cache_addr = generate_valid_test_multiaddr(2, 0, 2007);
1398 let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1399
1400 let temp_dir = TempDir::new().unwrap();
1401 let file_name = BootstrapCacheStore::cache_file_name(false);
1402
1403 let mut cache_data = CacheData::default();
1404 cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1405 cache_data
1406 .write_to_file(temp_dir.path(), &file_name)
1407 .unwrap();
1408
1409 let file_path = temp_dir.path().join(format!(
1410 "version_{}/{}",
1411 CacheData::CACHE_DATA_VERSION,
1412 file_name
1413 ));
1414
1415 let contents_before = std::fs::read_to_string(&file_path).unwrap();
1416 assert!(
1417 contents_before.contains(&cache_addr.to_string()),
1418 "Cache should contain the address before initialization"
1419 );
1420
1421 let config = InitialPeersConfig {
1422 first: true,
1423 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1424 ..Default::default()
1425 };
1426 let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1427 let _flow = Bootstrap::new(config).await.unwrap();
1428
1429 tokio::time::sleep(Duration::from_millis(100)).await;
1430
1431 let contents_after = std::fs::read_to_string(&file_path).unwrap();
1432 assert!(
1433 !contents_after.contains(&cache_addr.to_string()),
1434 "Cache should be cleared for first node"
1435 );
1436 }
1437
1438 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1439 async fn test_new_loads_at_least_50_contacts() {
1440 let _env_guard = env_lock().await;
1441
1442 let temp_dir = TempDir::new().unwrap();
1443 let file_name = BootstrapCacheStore::cache_file_name(true);
1444
1445 let mut cache_data = CacheData::default();
1446 for i in 0..60 {
1447 let addr = generate_valid_test_multiaddr(3, i as u8, 3000 + i);
1448 let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1449 cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1450 }
1451 cache_data
1452 .write_to_file(temp_dir.path(), &file_name)
1453 .unwrap();
1454
1455 let config = InitialPeersConfig {
1456 local: true,
1457 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1458 ..Default::default()
1459 };
1460 let mut config =
1461 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1462 config.disable_env_peers = true;
1463
1464 let result = Bootstrap::new(config).await;
1465
1466 assert!(
1467 result.is_ok(),
1468 "Should successfully initialize with 60 contacts in cache"
1469 );
1470
1471 let mut flow = result.unwrap();
1472 let mut count = 0;
1473 while let Ok(Some(_addr)) = flow.next_addr() {
1474 count += 1;
1475 if count >= 60 {
1476 break;
1477 }
1478 }
1479
1480 assert!(
1481 count > 0,
1482 "Should have loaded contacts from cache, got {count}"
1483 );
1484 }
1485
1486 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1487 async fn test_new_succeeds_with_few_contacts() {
1488 let _env_guard = env_lock().await;
1489
1490 let temp_dir = TempDir::new().unwrap();
1491 let file_name = BootstrapCacheStore::cache_file_name(true);
1492
1493 let mut cache_data = CacheData::default();
1494 for i in 0..5 {
1495 let addr = generate_valid_test_multiaddr(4, i as u8, 4000 + i);
1496 let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1497 cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1498 }
1499 cache_data
1500 .write_to_file(temp_dir.path(), &file_name)
1501 .unwrap();
1502
1503 let config = InitialPeersConfig {
1504 local: true,
1505 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1506 ..Default::default()
1507 };
1508 let mut config =
1509 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1510 config.disable_env_peers = true;
1511
1512 let result = Bootstrap::new(config).await;
1513 assert!(
1514 result.is_ok(),
1515 "Should succeed with few contacts (< 50 but > 0)"
1516 );
1517
1518 let mut flow = result.unwrap();
1519 let mut count = 0;
1520 while let Ok(Some(_addr)) = flow.next_addr() {
1521 count += 1;
1522 if count >= 10 {
1523 break;
1524 }
1525 }
1526
1527 assert_eq!(count, 5, "Should have exactly 5 contacts");
1528 }
1529
1530 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1531 async fn test_new_errors_with_zero_contacts() {
1532 let _env_guard = env_lock().await;
1533
1534 let temp_dir = TempDir::new().unwrap();
1535 let file_name = BootstrapCacheStore::cache_file_name(false);
1536 let cache_data = CacheData::default();
1537 cache_data
1538 .write_to_file(temp_dir.path(), &file_name)
1539 .unwrap();
1540
1541 let mock_server = MockServer::start().await;
1542 Mock::given(method("GET"))
1543 .and(path("/failing-endpoint"))
1544 .respond_with(ResponseTemplate::new(500))
1545 .expect(1..)
1546 .mount(&mock_server)
1547 .await;
1548
1549 let config = InitialPeersConfig {
1550 bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1551 network_contacts_url: vec![format!("{}/failing-endpoint", mock_server.uri())],
1552 ..Default::default()
1553 };
1554 let mut config =
1555 BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1556 config.disable_env_peers = true;
1557
1558 let result = Bootstrap::new(config).await;
1559
1560 assert!(
1561 result.is_err(),
1562 "Should error when all sources fail and no contacts are available"
1563 );
1564 assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1565 }
1566}