ant_bootstrap/
bootstrap.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::ANT_PEERS_ENV;
10use crate::BootstrapCacheStore;
11use crate::BootstrapConfig;
12use crate::ContactsFetcher;
13use crate::Result;
14use crate::contacts_fetcher::ALPHANET_CONTACTS;
15use crate::contacts_fetcher::MAINNET_CONTACTS;
16use crate::craft_valid_multiaddr;
17use crate::craft_valid_multiaddr_from_str;
18use crate::error::Error;
19use crate::multiaddr_get_peer_id;
20use ant_protocol::version::ALPHANET_ID;
21use ant_protocol::version::MAINNET_ID;
22use ant_protocol::version::get_network_id;
23use libp2p::{
24    Multiaddr, PeerId, Swarm,
25    core::connection::ConnectedPoint,
26    multiaddr::Protocol,
27    swarm::{
28        DialError, NetworkBehaviour,
29        dial_opts::{DialOpts, PeerCondition},
30    },
31};
32use std::collections::{HashSet, VecDeque};
33use tokio::sync::mpsc::UnboundedReceiver;
34use tokio::sync::mpsc::UnboundedSender;
35use url::Url;
36
/// Manages the flow of obtaining bootstrap peer addresses from various sources and also writes to the bootstrap cache.
///
/// The sources are tried in the following order while reading:
/// 1. Environment variable `ANT_PEERS`
/// 2. Command-line provided addresses
/// 3. Bootstrap cache file on disk
/// 4. Network contacts endpoints
///
/// Addresses are returned one at a time via the `next_addr` method.
/// It handles asynchronous fetching from the cache and contacts endpoints,
/// ensuring only one fetch is in progress at a time.
///
/// While a fetch is still in flight, `next_addr` returns `Ok(None)`; the caller is
/// expected to retry later so that the asynchronous fetch can complete.
/// Once every source has been exhausted, `next_addr` returns an error.
#[derive(custom_debug::Debug)]
pub struct Bootstrap {
    // Store backing the bootstrap cache; the bootstrap config is also read through it.
    cache_store: BootstrapCacheStore,
    // Queue of addresses that are ready to be handed out, in arrival order.
    addrs: VecDeque<Multiaddr>,
    // The task responsible for syncing the cache, this is aborted on drop.
    #[debug(skip)]
    cache_task: Option<tokio::task::JoinHandle<()>>,
    // fetcher
    // True until a cache fetch has been started; starts out false when the config
    // marks this node as `first` or disables cache reading.
    cache_pending: bool,
    // Contacts endpoints that have not been queried yet; `None` when there is nothing left to try.
    contacts_progress: Option<ContactsProgress>,
    // Sender half handed to spawned fetch tasks so they can report their results.
    event_tx: UnboundedSender<FetchEvent>,
    // Receiver half drained by `process_events`.
    event_rx: UnboundedReceiver<FetchEvent>,
    // Kind of the fetch that is currently in flight, if any.
    fetch_in_progress: Option<FetchKind>,
    // dialer
    // Addresses for which a dial has been initiated and no outcome has been observed yet.
    ongoing_dials: HashSet<Multiaddr>,
    // Peer IDs extracted from every address that has been queued.
    bootstrap_peer_ids: HashSet<PeerId>,
    // Set once the initial bootstrap has finished; no further dials are triggered afterwards.
    bootstrap_completed: bool,
}
70
71impl Bootstrap {
72    pub async fn new(mut config: BootstrapConfig) -> Result<Self> {
73        let contacts_progress = Self::build_contacts_progress(&config)?;
74
75        let mut addrs_queue = VecDeque::new();
76        let mut bootstrap_peer_ids = HashSet::new();
77        if !config.first {
78            if !config.disable_env_peers {
79                for addr in Self::fetch_from_env() {
80                    Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
81                }
82            } else {
83                info!("Skipping ANT_PEERS environment variable as per configuration");
84            }
85
86            for addr in config.initial_peers.drain(..) {
87                if let Some(addr) = craft_valid_multiaddr(&addr, false) {
88                    info!("Adding addr from arguments: {addr}");
89                    Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
90                } else {
91                    warn!("Invalid multiaddress format from arguments: {addr}");
92                }
93            }
94        }
95
96        let cache_pending = !config.first && !config.disable_cache_reading;
97        if !cache_pending {
98            info!(
99                "Not loading from cache as per configuration (first={}, disable_cache_reading={})",
100                config.first, config.disable_cache_reading
101            );
102        } else {
103            info!("Cache loading is enabled - cache will be fetched if needed");
104        }
105        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
106
107        let cache_store = BootstrapCacheStore::new(config.clone())?;
108
109        let mut bootstrap = Self {
110            cache_store,
111            addrs: addrs_queue,
112            cache_pending,
113            contacts_progress,
114            event_tx,
115            event_rx,
116            fetch_in_progress: None,
117            ongoing_dials: HashSet::new(),
118            bootstrap_peer_ids,
119            bootstrap_completed: config.first,
120            cache_task: None,
121        };
122
123        info!("Cache store is initialized and will sync and flush periodically");
124        let cache_task = bootstrap.cache_store.sync_and_flush_periodically();
125        bootstrap.cache_task = Some(cache_task);
126
127        if config.first {
128            info!("First node in network; clearing any existing cache");
129            bootstrap.cache_store.write().await?;
130            return Ok(bootstrap);
131        }
132
133        // ensure the initial queue is not empty by fetching from cache/contacts if needed
134        //
135        // not required for 'first' node
136        let mut collected_addrs = Vec::new();
137        if bootstrap.addrs.len() < 50 {
138            info!("Initial address queue < 50; fetching from cache/contacts");
139            let now = std::time::Instant::now();
140            loop {
141                match bootstrap.next_addr() {
142                    Ok(Some(addr)) => {
143                        collected_addrs.push(addr);
144                        info!(
145                            "We now have {} initial address(es) to dial",
146                            collected_addrs.len()
147                        );
148                        continue;
149                    }
150                    Ok(None) => {
151                        debug!(
152                            "No immediate address available; waiting for async fetch to complete"
153                        );
154                    }
155                    Err(err) => {
156                        if !collected_addrs.is_empty() {
157                            info!(
158                                "No more addresses available, but we have {} to dial",
159                                collected_addrs.len()
160                            );
161                            bootstrap.extend_with_addrs(collected_addrs);
162                            break;
163                        }
164                        warn!("Failed to fetch initial address: {err}");
165                        return Err(err);
166                    }
167                }
168
169                if now.elapsed() > std::time::Duration::from_secs(30) {
170                    if !collected_addrs.is_empty() {
171                        info!(
172                            "Timed out waiting for more addresses, but we have {} to dial, returning",
173                            collected_addrs.len()
174                        );
175                        bootstrap.extend_with_addrs(collected_addrs);
176                        break;
177                    }
178                    error!("Timed out waiting for initial addresses. ");
179                    return Err(Error::NoBootstrapPeersFound);
180                }
181                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
182            }
183        }
184
185        Ok(bootstrap)
186    }
187
188    /// Returns the next address from the sources. Returns `Ok(None)` if we are waiting for a source to return more
189    /// addresses.
190    /// Error if we have exhausted all sources and have no more addresses to return.
191    ///
192    /// This does not start any dial attempts, it returns the next address to dial.
193    /// Use `trigger_bootstrapping_process` to start auto manage the whole bootstrapping process.
194    pub fn next_addr(&mut self) -> Result<Option<Multiaddr>> {
195        loop {
196            self.process_events();
197
198            if let Some(addr) = self.addrs.pop_front() {
199                info!(?addr, "next_addr returning queued address");
200                return Ok(Some(addr));
201            }
202
203            if self.fetch_in_progress.is_some() {
204                info!("next_addr waiting for in-flight fetch result");
205                return Ok(None);
206            }
207
208            if self.cache_pending && !matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
209                info!("next_addr triggering cache fetch");
210                self.start_cache_fetch()?;
211                continue;
212            }
213
214            if self.contacts_progress.is_some()
215                && !matches!(self.fetch_in_progress, Some(FetchKind::Contacts))
216            {
217                info!("next_addr triggering contacts fetch");
218                self.start_contacts_fetch()?;
219                if self.fetch_in_progress.is_some() {
220                    return Ok(None);
221                }
222                continue;
223            }
224
225            info!("next_addr exhausted all address sources");
226            return Err(Error::NoBootstrapPeersFound);
227        }
228    }
229
230    fn process_events(&mut self) {
231        while let Ok(event) = self.event_rx.try_recv() {
232            match event {
233                FetchEvent::Cache(addrs) => {
234                    info!(count = addrs.len(), "process_events received cache batch");
235                    if addrs.is_empty() {
236                        info!("No addresses retrieved from cache");
237                    } else {
238                        self.extend_with_addrs(addrs);
239                    }
240                }
241                FetchEvent::Contacts(addrs) => {
242                    info!(
243                        count = addrs.len(),
244                        "process_events received contacts batch"
245                    );
246                    self.extend_with_addrs(addrs);
247                    if self
248                        .contacts_progress
249                        .as_ref()
250                        .is_none_or(ContactsProgress::is_empty)
251                    {
252                        self.contacts_progress = None;
253                    }
254                }
255            }
256
257            self.fetch_in_progress = None;
258        }
259    }
260
261    fn extend_with_addrs(&mut self, addrs: Vec<Multiaddr>) {
262        if addrs.is_empty() {
263            return;
264        }
265        for addr in addrs {
266            Self::push_addr(&mut self.addrs, &mut self.bootstrap_peer_ids, addr);
267        }
268    }
269
270    fn push_addr(queue: &mut VecDeque<Multiaddr>, peer_ids: &mut HashSet<PeerId>, addr: Multiaddr) {
271        if let Some(peer_id) = multiaddr_get_peer_id(&addr) {
272            peer_ids.insert(peer_id);
273        }
274        queue.push_back(addr);
275    }
276
277    fn pop_p2p(addr: &mut Multiaddr) -> Option<PeerId> {
278        if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
279            let _ = addr.pop();
280            Some(peer_id)
281        } else {
282            None
283        }
284    }
285
286    fn try_next_dial_addr(&mut self) -> Result<Option<Multiaddr>> {
287        match self.next_addr() {
288            Ok(Some(addr)) => Ok(Some(addr)),
289            Ok(None) => Ok(None),
290            Err(Error::NoBootstrapPeersFound) => {
291                self.bootstrap_completed = true;
292                Err(Error::NoBootstrapPeersFound)
293            }
294            Err(err) => Err(err),
295        }
296    }
297
298    fn should_continue_bootstrapping(&mut self, contacted_peers: usize) -> bool {
299        if self.bootstrap_completed {
300            info!("Initial bootstrap process has already completed successfully.");
301            return false;
302        }
303
304        if contacted_peers
305            >= self
306                .cache_store
307                .config()
308                .max_contacted_peers_before_termination
309        {
310            self.bootstrap_completed = true;
311            self.addrs.clear();
312            self.ongoing_dials.clear();
313
314            info!(
315                "Initial bootstrap process completed successfully. We have {contacted_peers} peers in the routing table."
316            );
317
318            return false;
319        }
320
321        if self.ongoing_dials.len() >= self.cache_store.config().max_concurrent_dials {
322            info!(
323                "Initial bootstrap has {} ongoing dials. Not dialing anymore.",
324                self.ongoing_dials.len()
325            );
326
327            return false;
328        }
329
330        if self.addrs.is_empty() {
331            if self.cache_pending
332                || self.contacts_progress.is_some()
333                || self.fetch_in_progress.is_some()
334            {
335                info!("Initial bootstrap is awaiting additional addresses to arrive.");
336                return false;
337            }
338            info!(
339                "We have {contacted_peers} peers in RT, but no more addresses to dial. Stopping initial bootstrap."
340            );
341
342            self.bootstrap_completed = true;
343            return false;
344        }
345
346        true
347    }
348
349    pub fn trigger_bootstrapping_process<B: NetworkBehaviour>(
350        &mut self,
351        swarm: &mut Swarm<B>,
352        contacted_peers: usize,
353    ) {
354        if !self.should_continue_bootstrapping(contacted_peers) {
355            return;
356        }
357
358        while self.ongoing_dials.len() < self.cache_store.config().max_concurrent_dials {
359            match self.try_next_dial_addr() {
360                Ok(Some(mut addr)) => {
361                    let addr_clone = addr.clone();
362                    let peer_id = Self::pop_p2p(&mut addr);
363
364                    let opts = match peer_id {
365                        Some(peer_id) => DialOpts::peer_id(peer_id)
366                            .condition(PeerCondition::NotDialing)
367                            .addresses(vec![addr])
368                            .build(),
369                        None => DialOpts::unknown_peer_id().address(addr).build(),
370                    };
371
372                    info!("Trying to dial peer with address: {addr_clone}");
373
374                    match swarm.dial(opts) {
375                        Ok(()) => {
376                            info!(
377                                "Dial attempt initiated for peer with address: {addr_clone}. Ongoing dial attempts: {}",
378                                self.ongoing_dials.len() + 1
379                            );
380                            let _ = self.ongoing_dials.insert(addr_clone);
381                        }
382                        Err(err) => match err {
383                            DialError::LocalPeerId { .. } => {
384                                warn!(
385                                    "Failed to dial peer with address: {addr_clone}. This is our own peer ID. Dialing the next peer"
386                                );
387                            }
388                            DialError::NoAddresses => {
389                                error!(
390                                    "Failed to dial peer with address: {addr_clone}. No addresses found. Dialing the next peer"
391                                );
392                            }
393                            DialError::DialPeerConditionFalse(_) => {
394                                warn!(
395                                    "We are already dialing the peer with address: {addr_clone}. Dialing the next peer. This error is harmless."
396                                );
397                            }
398                            DialError::Aborted => {
399                                error!(
400                                    "Pending connection attempt has been aborted for {addr_clone}. Dialing the next peer."
401                                );
402                            }
403                            DialError::WrongPeerId { obtained, .. } => {
404                                error!(
405                                    "The peer identity obtained on the connection did not match the one that was expected. Obtained: {obtained}. Dialing the next peer."
406                                );
407                            }
408                            DialError::Denied { cause } => {
409                                error!(
410                                    "The dialing attempt was denied by the remote peer. Cause: {cause}. Dialing the next peer."
411                                );
412                            }
413                            DialError::Transport(items) => {
414                                error!(
415                                    "Failed to dial peer with address: {addr_clone}. Transport error: {items:?}. Dialing the next peer."
416                                );
417                            }
418                        },
419                    }
420                }
421                Ok(None) => {
422                    debug!("Waiting for additional bootstrap addresses before continuing to dial");
423                    break;
424                }
425                Err(Error::NoBootstrapPeersFound) => {
426                    info!("No more bootstrap peers available to dial.");
427                    break;
428                }
429                Err(err) => {
430                    warn!("Failed to obtain next bootstrap address: {err}");
431                    break;
432                }
433            }
434        }
435    }
436
437    pub fn on_connection_established<B: NetworkBehaviour>(
438        &mut self,
439        peer_id: &PeerId,
440        endpoint: &ConnectedPoint,
441        swarm: &mut Swarm<B>,
442        contacted_peers: usize,
443    ) {
444        if self.bootstrap_completed {
445            return;
446        }
447
448        if let ConnectedPoint::Dialer { address, .. } = endpoint
449            && !self.ongoing_dials.remove(address)
450        {
451            self.ongoing_dials
452                .retain(|addr| match multiaddr_get_peer_id(addr) {
453                    Some(id) => id != *peer_id,
454                    None => true,
455                });
456        }
457
458        self.trigger_bootstrapping_process(swarm, contacted_peers);
459    }
460
461    pub fn on_outgoing_connection_error<B: NetworkBehaviour>(
462        &mut self,
463        peer_id: Option<PeerId>,
464        swarm: &mut Swarm<B>,
465        contacted_peers: usize,
466    ) {
467        if self.bootstrap_completed {
468            return;
469        }
470
471        match peer_id {
472            Some(peer_id) => {
473                self.ongoing_dials.retain(|addr| {
474                    if let Some(id) = multiaddr_get_peer_id(addr) {
475                        id != peer_id
476                    } else {
477                        true
478                    }
479                });
480            }
481            None => {
482                // we are left with no option but to remove all the addresses from the ongoing dials that
483                // do not have a peer ID.
484                self.ongoing_dials
485                    .retain(|addr| multiaddr_get_peer_id(addr).is_some());
486            }
487        }
488
489        self.trigger_bootstrapping_process(swarm, contacted_peers);
490    }
491
    /// Returns `true` if `peer_id` was part of any address queued as a bootstrap address.
    pub fn is_bootstrap_peer(&self, peer_id: &PeerId) -> bool {
        self.bootstrap_peer_ids.contains(peer_id)
    }
495
    /// Returns `true` once the initial bootstrap process has been marked as completed.
    pub fn has_terminated(&self) -> bool {
        self.bootstrap_completed
    }
499
500    fn start_cache_fetch(&mut self) -> Result<()> {
501        if matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
502            error!("Cache fetch already in progress, not starting another");
503            return Ok(());
504        }
505
506        self.cache_pending = false;
507        let config = self.cache_store.config().clone();
508        let event_tx = self.event_tx.clone();
509
510        tokio::spawn(async move {
511            let addrs = match tokio::task::spawn_blocking(move || {
512                BootstrapCacheStore::load_cache_data(&config)
513            })
514            .await
515            {
516                Ok(Ok(cache_data)) => cache_data.get_all_addrs().cloned().collect(),
517                Ok(Err(err)) => {
518                    warn!("Failed to load cache data: {err}");
519                    Vec::new()
520                }
521                Err(err) => {
522                    warn!("Cache fetch task failed to join: {err}");
523                    Vec::new()
524                }
525            };
526
527            info!(
528                "Bootstrap cache loaded from disk with {} addresses",
529                addrs.len()
530            );
531            if let Err(err) = event_tx.send(FetchEvent::Cache(addrs)) {
532                error!("Failed to send cache fetch event: {err:?}");
533            }
534        });
535
536        self.fetch_in_progress = Some(FetchKind::Cache);
537
538        Ok(())
539    }
540
541    fn start_contacts_fetch(&mut self) -> Result<()> {
542        if matches!(self.fetch_in_progress, Some(FetchKind::Contacts)) {
543            error!("Contacts fetch already in progress, not starting another");
544            return Ok(());
545        }
546
547        let Some(progress) = self.contacts_progress.as_mut() else {
548            info!("No contacts progress available");
549            return Ok(());
550        };
551
552        let Some(endpoint) = progress.next_endpoint() else {
553            info!("No more contacts endpoints to try");
554            self.contacts_progress = None;
555            return Ok(());
556        };
557
558        let event_tx = self.event_tx.clone();
559
560        tokio::spawn(async move {
561            let Ok(fetcher) = ContactsFetcher::with_endpoints(vec![endpoint.clone()]) else {
562                warn!("Failed to create contacts fetcher for {endpoint}");
563                if let Err(err) = event_tx.send(FetchEvent::Contacts(Vec::new())) {
564                    error!("Failed to send contacts fetch error event: {err:?}");
565                }
566                return;
567            };
568
569            let addrs = match fetcher.fetch_bootstrap_addresses().await {
570                Ok(addrs) => addrs,
571                Err(err) => {
572                    warn!("Failed to fetch contacts from {endpoint}: {err}");
573                    Vec::new()
574                }
575            };
576
577            info!(
578                "Contacts fetch completed from endpoint {endpoint:?} with {} addresses",
579                addrs.len()
580            );
581            if let Err(err) = event_tx.send(FetchEvent::Contacts(addrs)) {
582                error!("Failed to send contacts fetch event: {err:?}");
583            }
584        });
585
586        self.fetch_in_progress = Some(FetchKind::Contacts);
587
588        Ok(())
589    }
590
591    fn build_contacts_progress(config: &BootstrapConfig) -> Result<Option<ContactsProgress>> {
592        if config.first {
593            info!("First node in network; not fetching contacts");
594            return Ok(None);
595        }
596
597        if config.local {
598            info!("Local network configuration; skipping contacts endpoints");
599            return Ok(None);
600        }
601
602        if !config.network_contacts_url.is_empty() {
603            let endpoints = config
604                .network_contacts_url
605                .iter()
606                .map(|endpoint| endpoint.parse::<Url>().map_err(|_| Error::FailedToParseUrl))
607                .collect::<Result<Vec<_>>>()?;
608            info!("Using provided contacts endpoints: {endpoints:?}");
609            return Ok(ContactsProgress::new(endpoints));
610        }
611
612        match get_network_id() {
613            id if id == MAINNET_ID => {
614                info!("Using built-in mainnet contacts endpoints");
615                Ok(ContactsProgress::from_static(MAINNET_CONTACTS))
616            }
617
618            id if id == ALPHANET_ID => {
619                info!("Using built-in alphanet contacts endpoints");
620                Ok(ContactsProgress::from_static(ALPHANET_CONTACTS))
621            }
622            _ => Ok(None),
623        }
624    }
625
626    pub fn fetch_from_env() -> Vec<Multiaddr> {
627        let mut bootstrap_addresses = Vec::new();
628        // Read from ANT_PEERS environment variable if present
629        if let Ok(addrs) = std::env::var(ANT_PEERS_ENV) {
630            for addr_str in addrs.split(',') {
631                if let Some(addr) = craft_valid_multiaddr_from_str(addr_str, false) {
632                    info!("Adding addr from environment variable: {addr}");
633                    bootstrap_addresses.push(addr);
634                } else {
635                    warn!("Invalid multiaddress format from environment variable: {addr_str}");
636                }
637            }
638        }
639        bootstrap_addresses
640    }
641
    /// Returns a mutable reference to the underlying bootstrap cache store.
    pub fn cache_store_mut(&mut self) -> &mut BootstrapCacheStore {
        &mut self.cache_store
    }
645
    /// Returns a shared reference to the underlying bootstrap cache store.
    pub fn cache_store(&self) -> &BootstrapCacheStore {
        &self.cache_store
    }
649}
650
651impl Drop for Bootstrap {
652    fn drop(&mut self) {
653        if let Some(cache_task) = self.cache_task.take() {
654            cache_task.abort();
655        }
656    }
657}
658
/// Tracks which contacts endpoints still have to be queried.
#[derive(Debug)]
struct ContactsProgress {
    // Endpoints not yet handed out, consumed from the front.
    remaining: VecDeque<Url>,
}
663
/// Result of a background fetch, sent from the spawned task back to `Bootstrap`.
/// An empty vector is sent when the fetch failed or yielded nothing.
enum FetchEvent {
    // Addresses loaded from the bootstrap cache.
    Cache(Vec<Multiaddr>),
    // Addresses obtained from a contacts endpoint.
    Contacts(Vec<Multiaddr>),
}
668
/// Identifies which kind of fetch is currently in flight.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FetchKind {
    // A bootstrap cache load is running.
    Cache,
    // A contacts endpoint request is running.
    Contacts,
}
674
675impl ContactsProgress {
676    fn new(urls: Vec<Url>) -> Option<Self> {
677        if urls.is_empty() {
678            None
679        } else {
680            Some(Self {
681                remaining: VecDeque::from(urls),
682            })
683        }
684    }
685
686    fn from_static(urls: &[&str]) -> Option<Self> {
687        let mut parsed = Vec::new();
688        for url in urls {
689            match url.parse::<Url>() {
690                Ok(parsed_url) => parsed.push(parsed_url),
691                Err(err) => {
692                    warn!("Failed to parse static contacts URL {url}: {err}");
693                }
694            }
695        }
696        Self::new(parsed)
697    }
698
699    fn next_endpoint(&mut self) -> Option<Url> {
700        self.remaining.pop_front()
701    }
702
703    fn is_empty(&self) -> bool {
704        self.remaining.is_empty()
705    }
706}
707
708#[cfg(test)]
709mod tests {
710    use super::*;
711    use crate::{
712        InitialPeersConfig,
713        cache_store::{BootstrapCacheStore, cache_data_v1::CacheData},
714        multiaddr_get_peer_id,
715    };
716    use libp2p::Multiaddr;
717    use std::collections::HashSet;
718    use std::sync::{Arc, OnceLock};
719    use std::time::{Duration, Instant};
720    use tempfile::TempDir;
721    use tokio::sync::{Mutex, OwnedMutexGuard};
722    use tokio::time::sleep;
723    use wiremock::{
724        Mock, MockServer, ResponseTemplate,
725        matchers::{method, path},
726    };
727
728    async fn env_lock() -> OwnedMutexGuard<()> {
729        static ENV_MUTEX: OnceLock<Arc<Mutex<()>>> = OnceLock::new();
730        Arc::clone(ENV_MUTEX.get_or_init(|| Arc::new(Mutex::new(()))))
731            .lock_owned()
732            .await
733    }
734
    /// Sets the environment variable `key` to `value` for the current process.
    // `unsafe_code` is allowed here because `std::env::set_var` is called in an `unsafe` block.
    #[allow(unsafe_code)]
    fn set_env_var(key: &str, value: &str) {
        // SAFETY (assumption, TODO confirm): the tests visible in this module take
        // `env_lock()` before calling this, which serialises them against each other.
        // NOTE(review): that lock does not cover code that reads the environment
        // without taking it; verify no such code runs concurrently.
        unsafe {
            std::env::set_var(key, value);
        }
    }
741
    /// Removes the environment variable `key` from the current process.
    // `unsafe_code` is allowed here because `std::env::remove_var` is called in an `unsafe` block.
    #[allow(unsafe_code)]
    fn remove_env_var(key: &str) {
        // SAFETY (assumption, TODO confirm): the tests visible in this module hold the
        // `env_lock()` guard while calling this, which serialises them against each other.
        // NOTE(review): that lock does not cover code that reads the environment
        // without taking it; verify no such code runs concurrently.
        unsafe {
            std::env::remove_var(key);
        }
    }
748
749    async fn expect_next_addr(flow: &mut Bootstrap) -> Result<Multiaddr> {
750        let deadline = Instant::now() + Duration::from_secs(2);
751        loop {
752            match flow.next_addr() {
753                Ok(Some(addr)) => return Ok(addr),
754                Ok(None) => {
755                    if Instant::now() >= deadline {
756                        panic!("Timed out waiting for next address");
757                    }
758                    sleep(Duration::from_millis(5)).await;
759                }
760                Err(err) => return Err(err),
761            }
762        }
763    }
764
765    async fn expect_err(flow: &mut Bootstrap) -> Error {
766        let deadline = Instant::now() + Duration::from_secs(2);
767        loop {
768            match flow.next_addr() {
769                Ok(Some(addr)) => panic!("unexpected address returned: {addr}"),
770                Ok(None) => {
771                    if Instant::now() >= deadline {
772                        panic!("Timed out waiting for error from flow");
773                    }
774                    sleep(Duration::from_millis(5)).await;
775                }
776                Err(err) => return err,
777            }
778        }
779    }
780
781    fn generate_valid_test_multiaddr(ip_third: u8, ip_fourth: u8, port: u16) -> Multiaddr {
782        let peer_id = libp2p::PeerId::random();
783        format!("/ip4/10.{ip_third}.{ip_fourth}.1/tcp/{port}/p2p/{peer_id}")
784            .parse()
785            .unwrap()
786    }
787
788    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
789    async fn test_cli_arguments_precedence() {
790        let env_addr: Multiaddr =
791            "/ip4/10.0.0.1/tcp/1200/p2p/12D3KooWQnE7zXkVUEGBnJtNfR88Ujz4ezgm6bVnkvxHCzhF7S5S"
792                .parse()
793                .unwrap();
794        let cli_addr: Multiaddr =
795            "/ip4/10.0.0.2/tcp/1201/p2p/12D3KooWQx2TSK7g1C8x3QK7gBqdqbQEkd6vDT7Pxu5gb1xmgjvp"
796                .parse()
797                .unwrap();
798
799        let _env_guard = env_lock().await;
800        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
801
802        let temp_dir = TempDir::new().unwrap();
803
804        let config = InitialPeersConfig {
805            ignore_cache: true,
806            local: true,
807            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
808            addrs: vec![cli_addr.clone()],
809            ..Default::default()
810        };
811        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
812        let mut flow = Bootstrap::new(config).await.unwrap();
813
814        let first_two = vec![
815            expect_next_addr(&mut flow).await.unwrap(),
816            expect_next_addr(&mut flow).await.unwrap(),
817        ];
818        let first_set: HashSet<_> = first_two.into_iter().collect();
819        let expected: HashSet<_> = [env_addr.clone(), cli_addr.clone()].into_iter().collect();
820        assert_eq!(first_set, expected);
821
822        let err = expect_err(&mut flow).await;
823        assert!(matches!(err, Error::NoBootstrapPeersFound));
824
825        remove_env_var(ANT_PEERS_ENV);
826    }
827
828    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
829    async fn test_env_variable_parsing() {
830        let _env_guard = env_lock().await;
831        set_env_var(
832            ANT_PEERS_ENV,
833            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE,\
834/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
835        );
836
837        let parsed = Bootstrap::fetch_from_env();
838        remove_env_var(ANT_PEERS_ENV);
839
840        assert_eq!(parsed.len(), 2);
841        let parsed_set: std::collections::HashSet<_> =
842            parsed.into_iter().map(|addr| addr.to_string()).collect();
843        let expected = std::collections::HashSet::from([
844            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
845                .to_string(),
846            "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
847                .to_string(),
848        ]);
849        assert_eq!(parsed_set, expected);
850    }
851
852    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
853    async fn loads_addresses_from_cache_when_initial_queue_is_empty() {
854        let _env_guard = env_lock().await;
855        let cache_addr: Multiaddr =
856            "/ip4/127.0.0.1/tcp/1202/p2p/12D3KooWKGt8umjJQ4sDzFXo2UcHBaF33rqmFcWtXM6nbryL5G4J"
857                .parse()
858                .unwrap();
859        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
860
861        let temp_dir = TempDir::new().unwrap();
862        let file_name = BootstrapCacheStore::cache_file_name(true);
863
864        let mut cache_data = CacheData::default();
865        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
866        cache_data
867            .write_to_file(temp_dir.path(), &file_name)
868            .unwrap();
869
870        let config = InitialPeersConfig {
871            local: true,
872            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
873            ..Default::default()
874        };
875        let mut config =
876            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
877        config.disable_env_peers = true;
878        let mut flow = Bootstrap::new(config).await.unwrap();
879
880        let got = expect_next_addr(&mut flow).await.unwrap();
881        assert_eq!(got, cache_addr);
882
883        let err = expect_err(&mut flow).await;
884        assert!(matches!(err, Error::NoBootstrapPeersFound));
885    }
886
887    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
888    async fn test_first_flag_behavior() {
889        let _env_guard = env_lock().await;
890
891        let mock_server = MockServer::start().await;
892        Mock::given(method("GET"))
893            .and(path("/peers"))
894            .respond_with(ResponseTemplate::new(200).set_body_string(
895                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
896            ))
897            .expect(0)
898            .mount(&mock_server)
899            .await;
900
901        let temp_dir = TempDir::new().unwrap();
902        let config = InitialPeersConfig {
903                first: true,
904                addrs: vec![
905                "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
906                    .parse()
907                    .unwrap(),
908                ],
909                network_contacts_url: vec![format!("{}/peers", mock_server.uri())],
910                bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
911                ..Default::default()
912            };
913        let mut config =
914            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
915        config.disable_env_peers = true;
916        let mut flow = Bootstrap::new(config).await.unwrap();
917
918        let err = expect_err(&mut flow).await;
919        assert!(matches!(err, Error::NoBootstrapPeersFound));
920        assert!(
921            mock_server.received_requests().await.unwrap().is_empty(),
922            "first flag should prevent contact fetches"
923        );
924    }
925
926    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
927    async fn test_multiple_network_contacts() {
928        let _env_guard = env_lock().await;
929
930        let mock_server = MockServer::start().await;
931
932        let contact_one: Multiaddr =
933            "/ip4/192.168.0.1/tcp/1203/p2p/12D3KooWPULWT1qXJ1jzYVtQocvKXgcv6U7Pp3ui3EB7mN8hXAsP"
934                .parse()
935                .unwrap();
936        let contact_two: Multiaddr =
937            "/ip4/192.168.0.2/tcp/1204/p2p/12D3KooWPsMPaEjaWjW6GWpAne6LYcwBQEJfnDbhQFNs6ytzmBn5"
938                .parse()
939                .unwrap();
940
941        Mock::given(method("GET"))
942            .and(path("/first"))
943            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
944            .expect(1)
945            .mount(&mock_server)
946            .await;
947
948        Mock::given(method("GET"))
949            .and(path("/second"))
950            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
951            .expect(1)
952            .mount(&mock_server)
953            .await;
954
955        let config = InitialPeersConfig {
956            ignore_cache: true,
957            network_contacts_url: vec![
958                format!("{}/first", mock_server.uri()),
959                format!("{}/second", mock_server.uri()),
960            ],
961            ..Default::default()
962        };
963        let mut config =
964            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
965        config.disable_env_peers = true;
966        let mut flow = Bootstrap::new(config).await.unwrap();
967
968        let first = expect_next_addr(&mut flow).await.unwrap();
969        assert_eq!(first, contact_one);
970
971        let second = expect_next_addr(&mut flow).await.unwrap();
972        assert_eq!(second, contact_two);
973
974        let err = expect_err(&mut flow).await;
975        assert!(matches!(err, Error::NoBootstrapPeersFound));
976
977        let requests = mock_server.received_requests().await.unwrap();
978        assert_eq!(requests.len(), 2);
979        assert_eq!(requests[0].url.path(), "/first");
980        assert_eq!(requests[1].url.path(), "/second");
981    }
982
983    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
984    async fn test_full_bootstrap_flow() {
985        let _env_guard = env_lock().await;
986        remove_env_var(ANT_PEERS_ENV);
987
988        let env_addr: Multiaddr =
989            "/ip4/10.1.0.1/tcp/1300/p2p/12D3KooWBbtXX6gY5xPD7NzNGGbj2428NJQ4HNvvBnSE5g4R7Pkf"
990                .parse()
991                .unwrap();
992        let cli_addr: Multiaddr =
993            "/ip4/10.1.0.2/tcp/1301/p2p/12D3KooWCRfYwq9c3PAXo5cTp3snq72Knqukcec4c9qT1AMyvMPd"
994                .parse()
995                .unwrap();
996        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
997
998        let cache_addr_one: Multiaddr =
999            "/ip4/10.1.0.3/tcp/1302/p2p/12D3KooWMmKJcWUP9UqP4g1n3LH1htkvSUStn1aQGQxGc1dQcYxA"
1000                .parse()
1001                .unwrap();
1002        let cache_addr_two: Multiaddr =
1003            "/ip4/10.1.0.4/tcp/1303/p2p/12D3KooWA4b4T6Dz4RUtqnYDEBt3eGkqRykGGBqBP3ZiZsaAJ2jp"
1004                .parse()
1005                .unwrap();
1006
1007        let temp_dir = TempDir::new().unwrap();
1008        let file_name = BootstrapCacheStore::cache_file_name(false);
1009        let mut cache_data = CacheData::default();
1010        cache_data.add_peer(
1011            multiaddr_get_peer_id(&cache_addr_one).unwrap(),
1012            std::iter::once(&cache_addr_one),
1013            3,
1014            10,
1015        );
1016        cache_data.add_peer(
1017            multiaddr_get_peer_id(&cache_addr_two).unwrap(),
1018            std::iter::once(&cache_addr_two),
1019            3,
1020            10,
1021        );
1022        cache_data
1023            .write_to_file(temp_dir.path(), &file_name)
1024            .unwrap();
1025
1026        let mock_server = MockServer::start().await;
1027        let contact_one: Multiaddr =
1028            "/ip4/10.1.0.5/tcp/1304/p2p/12D3KooWQGyiCWkmKvgFVF1PsvBLnBxG29BAsoAhH4m6qjUpBAk1"
1029                .parse()
1030                .unwrap();
1031        let contact_two: Multiaddr =
1032            "/ip4/10.1.0.6/tcp/1305/p2p/12D3KooWGpMibW82dManEXZDV4SSQSSHqzTeWY5Avzkdx6yrosNG"
1033                .parse()
1034                .unwrap();
1035
1036        Mock::given(method("GET"))
1037            .and(path("/contacts_one"))
1038            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
1039            .expect(1)
1040            .mount(&mock_server)
1041            .await;
1042
1043        Mock::given(method("GET"))
1044            .and(path("/contacts_two"))
1045            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
1046            .expect(1)
1047            .mount(&mock_server)
1048            .await;
1049
1050        let file_path = temp_dir.path().join(format!(
1051            "version_{}/{}",
1052            CacheData::CACHE_DATA_VERSION,
1053            file_name
1054        ));
1055        let contents = std::fs::read_to_string(&file_path).unwrap();
1056        assert!(contents.contains(&cache_addr_one.to_string()));
1057        assert!(contents.contains(&cache_addr_two.to_string()));
1058
1059        assert_eq!(
1060            Bootstrap::fetch_from_env(),
1061            vec![env_addr.clone()],
1062            "environment variable should yield the configured address"
1063        );
1064
1065        let config = InitialPeersConfig {
1066            addrs: vec![cli_addr.clone()],
1067            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1068            network_contacts_url: vec![
1069                format!("{}/contacts_one", mock_server.uri()),
1070                format!("{}/contacts_two", mock_server.uri()),
1071            ],
1072            ..Default::default()
1073        };
1074        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1075        let mut flow = Bootstrap::new(config).await.unwrap();
1076
1077        let initial_results = vec![
1078            expect_next_addr(&mut flow).await.unwrap(),
1079            expect_next_addr(&mut flow).await.unwrap(),
1080        ];
1081        let initial_set: HashSet<_> = initial_results.into_iter().collect();
1082        let expected_initial: HashSet<_> =
1083            [env_addr.clone(), cli_addr.clone()].into_iter().collect();
1084        assert_eq!(initial_set, expected_initial);
1085
1086        let cache_results = vec![
1087            expect_next_addr(&mut flow).await.unwrap(),
1088            expect_next_addr(&mut flow).await.unwrap(),
1089        ];
1090        let cache_set: HashSet<_> = cache_results.into_iter().collect();
1091        let expected_cache: HashSet<_> = [cache_addr_one.clone(), cache_addr_two.clone()]
1092            .into_iter()
1093            .collect();
1094        assert_eq!(cache_set, expected_cache);
1095
1096        let contact_first = expect_next_addr(&mut flow).await.unwrap();
1097        assert_eq!(contact_first, contact_one);
1098
1099        let contact_second = expect_next_addr(&mut flow).await.unwrap();
1100        assert_eq!(contact_second, contact_two);
1101
1102        let err = expect_err(&mut flow).await;
1103        assert!(matches!(err, Error::NoBootstrapPeersFound));
1104
1105        let requests = mock_server.received_requests().await.unwrap();
1106        assert_eq!(requests.len(), 2);
1107        assert_eq!(requests[0].url.path(), "/contacts_one");
1108        assert_eq!(requests[1].url.path(), "/contacts_two");
1109
1110        remove_env_var(ANT_PEERS_ENV);
1111    }
1112
1113    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1114    async fn test_disable_env_peers_flag() {
1115        let env_addr = generate_valid_test_multiaddr(2, 0, 2000);
1116
1117        let _env_guard = env_lock().await;
1118        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1119
1120        let temp_dir = TempDir::new().unwrap();
1121
1122        let config = InitialPeersConfig {
1123            local: true,
1124            ignore_cache: true,
1125            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1126            ..Default::default()
1127        };
1128        let mut config =
1129            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1130        config.disable_env_peers = true;
1131
1132        let result = Bootstrap::new(config).await;
1133        assert!(
1134            result.is_err(),
1135            "Should error when env peers are disabled and no other sources available"
1136        );
1137        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1138
1139        remove_env_var(ANT_PEERS_ENV);
1140    }
1141
1142    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1143    async fn test_disable_cache_reading_flag() {
1144        let _env_guard = env_lock().await;
1145
1146        let cache_addr = generate_valid_test_multiaddr(2, 0, 2001);
1147        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1148
1149        let temp_dir = TempDir::new().unwrap();
1150        let file_name = BootstrapCacheStore::cache_file_name(true);
1151
1152        let mut cache_data = CacheData::default();
1153        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1154        cache_data
1155            .write_to_file(temp_dir.path(), &file_name)
1156            .unwrap();
1157
1158        let config = InitialPeersConfig {
1159            local: true,
1160            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1161            ..Default::default()
1162        };
1163        let mut config =
1164            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1165        config.disable_env_peers = true;
1166        config.disable_cache_reading = true;
1167
1168        let result = Bootstrap::new(config).await;
1169        assert!(
1170            result.is_err(),
1171            "Should error when cache reading is disabled and no other sources available"
1172        );
1173        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1174    }
1175
1176    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1177    async fn test_bootstrap_completed_initialization() {
1178        let temp_dir = TempDir::new().unwrap();
1179
1180        let config = InitialPeersConfig {
1181            first: true,
1182            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1183            ..Default::default()
1184        };
1185        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1186        let flow = Bootstrap::new(config).await.unwrap();
1187
1188        assert!(
1189            flow.has_terminated(),
1190            "bootstrap_completed should be true for first node"
1191        );
1192
1193        let config = InitialPeersConfig {
1194            first: false,
1195            local: true,
1196            ignore_cache: true,
1197            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1198            addrs: vec![generate_valid_test_multiaddr(2, 0, 2002)],
1199            ..Default::default()
1200        };
1201        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1202        let flow = Bootstrap::new(config).await.unwrap();
1203
1204        assert!(
1205            !flow.has_terminated(),
1206            "bootstrap_completed should be false for non-first node"
1207        );
1208    }
1209
1210    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1211    async fn test_bootstrap_peer_ids_population() {
1212        let env_addr = generate_valid_test_multiaddr(2, 0, 2003);
1213        let cli_addr = generate_valid_test_multiaddr(2, 0, 2004);
1214
1215        let env_peer_id = multiaddr_get_peer_id(&env_addr).unwrap();
1216        let cli_peer_id = multiaddr_get_peer_id(&cli_addr).unwrap();
1217
1218        let _env_guard = env_lock().await;
1219        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1220
1221        let temp_dir = TempDir::new().unwrap();
1222
1223        let config = InitialPeersConfig {
1224            local: true,
1225            ignore_cache: true,
1226            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1227            addrs: vec![cli_addr.clone()],
1228            ..Default::default()
1229        };
1230        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1231        let flow = Bootstrap::new(config).await.unwrap();
1232
1233        assert!(
1234            flow.is_bootstrap_peer(&env_peer_id),
1235            "Peer ID from env should be tracked"
1236        );
1237        assert!(
1238            flow.is_bootstrap_peer(&cli_peer_id),
1239            "Peer ID from CLI should be tracked"
1240        );
1241
1242        remove_env_var(ANT_PEERS_ENV);
1243    }
1244
1245    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1246    async fn test_invalid_multiaddr_in_initial_peers() {
1247        let _env_guard = env_lock().await;
1248
1249        let valid_addr = generate_valid_test_multiaddr(2, 0, 2005);
1250        let invalid_addr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse().unwrap();
1251
1252        let temp_dir = TempDir::new().unwrap();
1253
1254        let config = InitialPeersConfig {
1255            local: true,
1256            ignore_cache: true,
1257            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1258            addrs: vec![valid_addr.clone()],
1259            ..Default::default()
1260        };
1261        let mut config =
1262            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1263        config.disable_env_peers = true;
1264
1265        config.initial_peers.push(invalid_addr);
1266
1267        let mut flow = Bootstrap::new(config).await.unwrap();
1268
1269        let first = expect_next_addr(&mut flow).await.unwrap();
1270        assert_eq!(first, valid_addr, "Should get the valid address");
1271
1272        let err = expect_err(&mut flow).await;
1273        assert!(
1274            matches!(err, Error::NoBootstrapPeersFound),
1275            "Should not find any more peers after valid one (invalid addr was filtered)"
1276        );
1277    }
1278
1279    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1280    async fn test_local_network_skips_contacts() {
1281        let _env_guard = env_lock().await;
1282
1283        let mock_server = MockServer::start().await;
1284        Mock::given(method("GET"))
1285            .and(path("/should-not-be-called"))
1286            .respond_with(ResponseTemplate::new(200).set_body_string(
1287                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
1288            ))
1289            .expect(0)
1290            .mount(&mock_server)
1291            .await;
1292
1293        let temp_dir = TempDir::new().unwrap();
1294
1295        let config = InitialPeersConfig {
1296            local: true,
1297            ignore_cache: true,
1298            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1299            network_contacts_url: vec![format!("{}/should-not-be-called", mock_server.uri())],
1300            addrs: vec![generate_valid_test_multiaddr(2, 0, 2006)],
1301            ..Default::default()
1302        };
1303        let mut config =
1304            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1305        config.disable_env_peers = true;
1306
1307        let addr_from_config = config.initial_peers[0].clone();
1308        let mut flow = Bootstrap::new(config).await.unwrap();
1309
1310        let first = expect_next_addr(&mut flow).await.unwrap();
1311        assert_eq!(
1312            first, addr_from_config,
1313            "Should get the address from config"
1314        );
1315
1316        let err = expect_err(&mut flow).await;
1317        assert!(matches!(err, Error::NoBootstrapPeersFound));
1318
1319        assert!(
1320            mock_server.received_requests().await.unwrap().is_empty(),
1321            "local flag should prevent contact fetches"
1322        );
1323    }
1324
1325    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1326    async fn test_timeout_with_no_addresses() {
1327        let _env_guard = env_lock().await;
1328
1329        let temp_dir = TempDir::new().unwrap();
1330        let file_name = BootstrapCacheStore::cache_file_name(true);
1331        let cache_data = CacheData::default();
1332        cache_data
1333            .write_to_file(temp_dir.path(), &file_name)
1334            .unwrap();
1335
1336        let config = InitialPeersConfig {
1337            local: true,
1338            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1339            ..Default::default()
1340        };
1341        let mut config =
1342            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1343        config.disable_env_peers = true;
1344
1345        let result = Bootstrap::new(config).await;
1346
1347        assert!(
1348            result.is_err(),
1349            "Should error when no addresses are available from any source"
1350        );
1351        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1352    }
1353
1354    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1355    async fn test_first_node_clears_cache() {
1356        let _env_guard = env_lock().await;
1357
1358        let cache_addr = generate_valid_test_multiaddr(2, 0, 2007);
1359        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1360
1361        let temp_dir = TempDir::new().unwrap();
1362        let file_name = BootstrapCacheStore::cache_file_name(false);
1363
1364        let mut cache_data = CacheData::default();
1365        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1366        cache_data
1367            .write_to_file(temp_dir.path(), &file_name)
1368            .unwrap();
1369
1370        let file_path = temp_dir.path().join(format!(
1371            "version_{}/{}",
1372            CacheData::CACHE_DATA_VERSION,
1373            file_name
1374        ));
1375
1376        let contents_before = std::fs::read_to_string(&file_path).unwrap();
1377        assert!(
1378            contents_before.contains(&cache_addr.to_string()),
1379            "Cache should contain the address before initialization"
1380        );
1381
1382        let config = InitialPeersConfig {
1383            first: true,
1384            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1385            ..Default::default()
1386        };
1387        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1388        let _flow = Bootstrap::new(config).await.unwrap();
1389
1390        tokio::time::sleep(Duration::from_millis(100)).await;
1391
1392        let contents_after = std::fs::read_to_string(&file_path).unwrap();
1393        assert!(
1394            !contents_after.contains(&cache_addr.to_string()),
1395            "Cache should be cleared for first node"
1396        );
1397    }
1398
1399    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1400    async fn test_new_loads_at_least_50_contacts() {
1401        let _env_guard = env_lock().await;
1402
1403        let temp_dir = TempDir::new().unwrap();
1404        let file_name = BootstrapCacheStore::cache_file_name(true);
1405
1406        let mut cache_data = CacheData::default();
1407        for i in 0..60 {
1408            let addr = generate_valid_test_multiaddr(3, i as u8, 3000 + i);
1409            let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1410            cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1411        }
1412        cache_data
1413            .write_to_file(temp_dir.path(), &file_name)
1414            .unwrap();
1415
1416        let config = InitialPeersConfig {
1417            local: true,
1418            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1419            ..Default::default()
1420        };
1421        let mut config =
1422            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1423        config.disable_env_peers = true;
1424
1425        let result = Bootstrap::new(config).await;
1426
1427        assert!(
1428            result.is_ok(),
1429            "Should successfully initialize with 60 contacts in cache"
1430        );
1431
1432        let mut flow = result.unwrap();
1433        let mut count = 0;
1434        while let Ok(Some(_addr)) = flow.next_addr() {
1435            count += 1;
1436            if count >= 60 {
1437                break;
1438            }
1439        }
1440
1441        assert!(
1442            count > 0,
1443            "Should have loaded contacts from cache, got {count}"
1444        );
1445    }
1446
1447    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1448    async fn test_new_succeeds_with_few_contacts() {
1449        let _env_guard = env_lock().await;
1450
1451        let temp_dir = TempDir::new().unwrap();
1452        let file_name = BootstrapCacheStore::cache_file_name(true);
1453
1454        let mut cache_data = CacheData::default();
1455        for i in 0..5 {
1456            let addr = generate_valid_test_multiaddr(4, i as u8, 4000 + i);
1457            let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1458            cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1459        }
1460        cache_data
1461            .write_to_file(temp_dir.path(), &file_name)
1462            .unwrap();
1463
1464        let config = InitialPeersConfig {
1465            local: true,
1466            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1467            ..Default::default()
1468        };
1469        let mut config =
1470            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1471        config.disable_env_peers = true;
1472
1473        let result = Bootstrap::new(config).await;
1474        assert!(
1475            result.is_ok(),
1476            "Should succeed with few contacts (< 50 but > 0)"
1477        );
1478
1479        let mut flow = result.unwrap();
1480        let mut count = 0;
1481        while let Ok(Some(_addr)) = flow.next_addr() {
1482            count += 1;
1483            if count >= 10 {
1484                break;
1485            }
1486        }
1487
1488        assert_eq!(count, 5, "Should have exactly 5 contacts");
1489    }
1490
1491    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1492    async fn test_new_errors_with_zero_contacts() {
1493        let _env_guard = env_lock().await;
1494
1495        let temp_dir = TempDir::new().unwrap();
1496        let file_name = BootstrapCacheStore::cache_file_name(false);
1497        let cache_data = CacheData::default();
1498        cache_data
1499            .write_to_file(temp_dir.path(), &file_name)
1500            .unwrap();
1501
1502        let mock_server = MockServer::start().await;
1503        Mock::given(method("GET"))
1504            .and(path("/failing-endpoint"))
1505            .respond_with(ResponseTemplate::new(500))
1506            .expect(1..)
1507            .mount(&mock_server)
1508            .await;
1509
1510        let config = InitialPeersConfig {
1511            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1512            network_contacts_url: vec![format!("{}/failing-endpoint", mock_server.uri())],
1513            ..Default::default()
1514        };
1515        let mut config =
1516            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1517        config.disable_env_peers = true;
1518
1519        let result = Bootstrap::new(config).await;
1520
1521        assert!(
1522            result.is_err(),
1523            "Should error when all sources fail and no contacts are available"
1524        );
1525        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1526    }
1527}