ant_bootstrap/
bootstrap.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::ANT_PEERS_ENV;
10use crate::BootstrapCacheStore;
11use crate::BootstrapConfig;
12use crate::ContactsFetcher;
13use crate::Result;
14use crate::contacts_fetcher::ALPHANET_CONTACTS;
15use crate::contacts_fetcher::MAINNET_CONTACTS;
16use crate::craft_valid_multiaddr;
17use crate::craft_valid_multiaddr_from_str;
18use crate::error::Error;
19use crate::multiaddr_get_peer_id;
20use ant_protocol::version::ALPHANET_ID;
21use ant_protocol::version::MAINNET_ID;
22use ant_protocol::version::get_network_id;
23use libp2p::{
24    Multiaddr, PeerId, Swarm,
25    core::connection::ConnectedPoint,
26    multiaddr::Protocol,
27    swarm::{
28        DialError, NetworkBehaviour,
29        dial_opts::{DialOpts, PeerCondition},
30    },
31};
32use std::collections::{HashSet, VecDeque};
33use std::time::Duration;
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::sync::mpsc::UnboundedSender;
36use url::Url;
37
38/// Timeout for individual fetch operations
39const FETCH_TIMEOUT: Duration = Duration::from_secs(10);
40
41/// Minimum number of initial addresses to fetch before returning from `new()`
42const MIN_INITIAL_ADDRS: usize = 5;
43
44/// Timeout in seconds to wait for initial addresses during bootstrap initialization
45const INITIAL_ADDR_FETCH_TIMEOUT_SECS: u64 = 30;
46
47/// Manages the flow of obtaining bootstrap peer addresses from various sources and also writes to the bootstrap cache.
48///
49/// The sources are tried in the following order while reading:
50/// 1. Environment variable `ANT_PEERS`
51/// 2. Command-line provided addresses
52/// 3. Bootstrap cache file on disk
53/// 4. Network contacts endpoints
54///
55/// Addresses are returned one at a time via the `next_addr` method.
56/// It handles asynchronous fetching from the cache and contacts endpoints,
57/// ensuring only one fetch is in progress at a time.
58///
59/// If no more addresses are available from any source, `next_addr` returns an error.
60/// It is expected that the caller will retry `next_addr` later to allow
61/// for asynchronous fetches to complete.
62#[derive(custom_debug::Debug)]
63pub struct Bootstrap {
64    cache_store: BootstrapCacheStore,
65    addrs: VecDeque<Multiaddr>,
66    // The task responsible for syncing the cache, this is aborted on drop.
67    #[debug(skip)]
68    cache_task: Option<tokio::task::JoinHandle<()>>,
69    // fetcher
70    cache_pending: bool,
71    contacts_progress: Option<ContactsProgress>,
72    event_tx: UnboundedSender<FetchEvent>,
73    event_rx: UnboundedReceiver<FetchEvent>,
74    fetch_in_progress: Option<FetchKind>,
75    // dialer
76    ongoing_dials: HashSet<Multiaddr>,
77    bootstrap_peer_ids: HashSet<PeerId>,
78    bootstrap_completed: bool,
79}
80
81impl Bootstrap {
82    pub async fn new(mut config: BootstrapConfig) -> Result<Self> {
83        let contacts_progress = Self::build_contacts_progress(&config)?;
84
85        let mut addrs_queue = VecDeque::new();
86        let mut bootstrap_peer_ids = HashSet::new();
87        if !config.first {
88            if !config.disable_env_peers {
89                for addr in Self::fetch_from_env() {
90                    Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
91                }
92            } else {
93                info!("Skipping ANT_PEERS environment variable as per configuration");
94            }
95
96            for addr in config.initial_peers.drain(..) {
97                if let Some(addr) = craft_valid_multiaddr(&addr, false) {
98                    info!("Adding addr from arguments: {addr}");
99                    Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
100                } else {
101                    warn!("Invalid multiaddress format from arguments: {addr}");
102                }
103            }
104        }
105
106        let cache_pending = !config.first && !config.disable_cache_reading;
107        if !cache_pending {
108            info!(
109                "Not loading from cache as per configuration (first={}, disable_cache_reading={})",
110                config.first, config.disable_cache_reading
111            );
112        } else {
113            info!("Cache loading is enabled - cache will be fetched if needed");
114        }
115        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
116
117        let cache_store = BootstrapCacheStore::new(config.clone())?;
118
119        let mut bootstrap = Self {
120            cache_store,
121            addrs: addrs_queue,
122            cache_pending,
123            contacts_progress,
124            event_tx,
125            event_rx,
126            fetch_in_progress: None,
127            ongoing_dials: HashSet::new(),
128            bootstrap_peer_ids,
129            bootstrap_completed: config.first,
130            cache_task: None,
131        };
132
133        info!("Cache store is initialized and will sync and flush periodically");
134        let cache_task = bootstrap.cache_store.sync_and_flush_periodically();
135        bootstrap.cache_task = Some(cache_task);
136
137        if config.first {
138            info!("First node in network; clearing any existing cache");
139            bootstrap.cache_store.write().await?;
140            return Ok(bootstrap);
141        }
142
143        // ensure the initial queue is not empty by fetching from cache/contacts if needed
144        //
145        // not required for 'first' node
146        let mut collected_addrs = Vec::new();
147        if bootstrap.addrs.len() < MIN_INITIAL_ADDRS {
148            info!("Initial address queue < {MIN_INITIAL_ADDRS}; fetching from cache/contacts");
149            let now = std::time::Instant::now();
150            loop {
151                match bootstrap.next_addr() {
152                    Ok(Some(addr)) => {
153                        collected_addrs.push(addr);
154                        if Self::try_finalize_initial_addrs(
155                            &mut bootstrap,
156                            &mut collected_addrs,
157                            MIN_INITIAL_ADDRS,
158                        ) {
159                            break;
160                        }
161                        continue;
162                    }
163                    Ok(None) => {
164                        debug!(
165                            "No immediate address available; waiting for async fetch to complete"
166                        );
167                    }
168                    Err(err) => {
169                        if Self::try_finalize_initial_addrs(&mut bootstrap, &mut collected_addrs, 1)
170                        {
171                            break;
172                        }
173                        warn!("Failed to fetch initial address: {err}");
174                        return Err(err);
175                    }
176                }
177
178                if now.elapsed() > std::time::Duration::from_secs(INITIAL_ADDR_FETCH_TIMEOUT_SECS) {
179                    if Self::try_finalize_initial_addrs(&mut bootstrap, &mut collected_addrs, 1) {
180                        break;
181                    }
182                    error!("Timed out waiting for initial addresses. ");
183                    return Err(Error::NoBootstrapPeersFound);
184                }
185                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
186            }
187        }
188
189        Ok(bootstrap)
190    }
191
192    /// Attempts to finalize the initial address collection by extending the bootstrap with collected addresses.
193    /// Returns `true` if addresses were successfully added and initialization should complete.
194    /// Returns `false` if no addresses are available yet.
195    fn try_finalize_initial_addrs(
196        bootstrap: &mut Bootstrap,
197        collected_addrs: &mut Vec<Multiaddr>,
198        min_address: usize,
199    ) -> bool {
200        if collected_addrs.len() < min_address {
201            return false;
202        }
203        info!(
204            "Collected minimum required initial addresses ({}), proceeding with bootstrap.",
205            collected_addrs.len()
206        );
207        bootstrap.extend_with_addrs(std::mem::take(collected_addrs));
208        true
209    }
210
211    /// Returns the next address from the sources. Returns `Ok(None)` if we are waiting for a source to return more
212    /// addresses.
213    /// Error if we have exhausted all sources and have no more addresses to return.
214    ///
215    /// This does not start any dial attempts, it returns the next address to dial.
216    /// Use `trigger_bootstrapping_process` to poll the dialing process.
217    pub fn next_addr(&mut self) -> Result<Option<Multiaddr>> {
218        loop {
219            self.process_events();
220
221            if let Some(addr) = self.addrs.pop_front() {
222                info!("Returning next bootstrap address: {addr}");
223                return Ok(Some(addr));
224            }
225
226            if let Some(fetch_kind) = self.fetch_in_progress {
227                debug!("Fetch in progress: {fetch_kind:?}; waiting for addresses");
228                return Ok(None);
229            }
230
231            if self.cache_pending && !matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
232                info!("Triggering cache fetch");
233                self.start_cache_fetch()?;
234                continue;
235            }
236
237            if self.contacts_progress.is_some()
238                && !matches!(self.fetch_in_progress, Some(FetchKind::Contacts))
239            {
240                info!("Triggering contacts fetch");
241                self.start_contacts_fetch()?;
242                if self.fetch_in_progress.is_some() {
243                    return Ok(None);
244                }
245                continue;
246            }
247
248            warn!("No more sources to fetch bootstrap addresses from, and address queue is empty.");
249            return Err(Error::NoBootstrapPeersFound);
250        }
251    }
252
253    fn process_events(&mut self) {
254        while let Ok(event) = self.event_rx.try_recv() {
255            match event {
256                FetchEvent::Cache(addrs) => {
257                    if addrs.is_empty() {
258                        info!("Cache fetch has completed, but read 0 addresses");
259                    } else {
260                        info!("Cache fetch has completed. Got {} addresses", addrs.len());
261                        self.extend_with_addrs(addrs);
262                    }
263                }
264                FetchEvent::Contacts(addrs) => {
265                    info!(
266                        "Contacts fetch has completed. Got {} addresses",
267                        addrs.len()
268                    );
269                    self.extend_with_addrs(addrs);
270                    if self
271                        .contacts_progress
272                        .as_ref()
273                        .is_none_or(ContactsProgress::is_empty)
274                    {
275                        self.contacts_progress = None;
276                    }
277                }
278            }
279
280            self.fetch_in_progress = None;
281        }
282    }
283
284    fn extend_with_addrs(&mut self, addrs: Vec<Multiaddr>) {
285        if addrs.is_empty() {
286            return;
287        }
288        for addr in addrs {
289            Self::push_addr(&mut self.addrs, &mut self.bootstrap_peer_ids, addr);
290        }
291    }
292
293    fn push_addr(queue: &mut VecDeque<Multiaddr>, peer_ids: &mut HashSet<PeerId>, addr: Multiaddr) {
294        if let Some(peer_id) = multiaddr_get_peer_id(&addr) {
295            peer_ids.insert(peer_id);
296        }
297        queue.push_back(addr);
298    }
299
300    fn pop_p2p(addr: &mut Multiaddr) -> Option<PeerId> {
301        if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
302            let _ = addr.pop();
303            Some(peer_id)
304        } else {
305            None
306        }
307    }
308
309    fn try_next_dial_addr(&mut self) -> Result<Option<Multiaddr>> {
310        match self.next_addr() {
311            Ok(Some(addr)) => Ok(Some(addr)),
312            Ok(None) => Ok(None),
313            Err(Error::NoBootstrapPeersFound) => {
314                self.bootstrap_completed = true;
315                Err(Error::NoBootstrapPeersFound)
316            }
317            Err(err) => Err(err),
318        }
319    }
320
321    /// Return true if the bootstrapping process has completed or if we have run out of addresses, otherwise false.
322    fn has_bootstrap_completed(&self, contacted_peers: usize) -> bool {
323        if self.bootstrap_completed {
324            debug!("Initial bootstrap process has already completed successfully.");
325            return true;
326        }
327
328        if contacted_peers
329            >= self
330                .cache_store
331                .config()
332                .max_contacted_peers_before_termination
333        {
334            info!(
335                "Initial bootstrap process completed successfully. We have {contacted_peers} peers in the routing table."
336            );
337            return true;
338        }
339
340        // If addresses are empty AND no fetch is in progress AND no contacts endpoints are left to try, then
341        // we have exhausted all sources.
342        if self.addrs.is_empty()
343            && !self.cache_pending
344            && self.contacts_progress.is_none()
345            && self.fetch_in_progress.is_none()
346        {
347            info!(
348                "We have {contacted_peers} peers in RT, but no more addresses to dial. Stopping initial bootstrap."
349            );
350            return true;
351        }
352
353        false
354    }
355
356    /// Manages the bootstrapping process by attempting to dial peers from the available addresses.
357    ///
358    /// Returns `true` if the bootstrapping process has ended (either due to successful connection or due to exhaustion
359    /// of addresses), otherwise `false`.
360    pub fn trigger_bootstrapping_process<B: NetworkBehaviour>(
361        &mut self,
362        swarm: &mut Swarm<B>,
363        contacted_peers: usize,
364    ) -> bool {
365        if self.has_bootstrap_completed(contacted_peers) {
366            self.bootstrap_completed = true;
367            self.addrs.clear();
368            self.ongoing_dials.clear();
369            return true;
370        }
371
372        while self.ongoing_dials.len() < self.cache_store.config().max_concurrent_dials {
373            match self.try_next_dial_addr() {
374                Ok(Some(mut addr)) => {
375                    let addr_clone = addr.clone();
376                    let peer_id = Self::pop_p2p(&mut addr);
377
378                    let opts = match peer_id {
379                        Some(peer_id) => DialOpts::peer_id(peer_id)
380                            .condition(PeerCondition::NotDialing)
381                            .addresses(vec![addr])
382                            .build(),
383                        None => DialOpts::unknown_peer_id().address(addr).build(),
384                    };
385
386                    info!("Trying to dial peer with address: {addr_clone}");
387
388                    match swarm.dial(opts) {
389                        Ok(()) => {
390                            info!(
391                                "Dial attempt initiated for peer with address: {addr_clone}. Ongoing dial attempts: {}",
392                                self.ongoing_dials.len() + 1
393                            );
394                            let _ = self.ongoing_dials.insert(addr_clone);
395                        }
396                        Err(err) => match err {
397                            DialError::LocalPeerId { .. } => {
398                                warn!(
399                                    "Failed to dial peer with address: {addr_clone}. This is our own peer ID. Dialing the next peer"
400                                );
401                            }
402                            DialError::NoAddresses => {
403                                error!(
404                                    "Failed to dial peer with address: {addr_clone}. No addresses found. Dialing the next peer"
405                                );
406                            }
407                            DialError::DialPeerConditionFalse(_) => {
408                                warn!(
409                                    "We are already dialing the peer with address: {addr_clone}. Dialing the next peer. This error is harmless."
410                                );
411                            }
412                            DialError::Aborted => {
413                                error!(
414                                    "Pending connection attempt has been aborted for {addr_clone}. Dialing the next peer."
415                                );
416                            }
417                            DialError::WrongPeerId { obtained, .. } => {
418                                error!(
419                                    "The peer identity obtained on the connection did not match the one that was expected. Obtained: {obtained}. Dialing the next peer."
420                                );
421                            }
422                            DialError::Denied { cause } => {
423                                error!(
424                                    "The dialing attempt was denied by the remote peer. Cause: {cause}. Dialing the next peer."
425                                );
426                            }
427                            DialError::Transport(items) => {
428                                error!(
429                                    "Failed to dial peer with address: {addr_clone}. Transport error: {items:?}. Dialing the next peer."
430                                );
431                            }
432                        },
433                    }
434                }
435                Ok(None) => {
436                    debug!("Waiting for additional bootstrap addresses before continuing to dial");
437                    break;
438                }
439                Err(Error::NoBootstrapPeersFound) => {
440                    info!("No more bootstrap peers available to dial.");
441                    break;
442                }
443                Err(err) => {
444                    warn!("Failed to obtain next bootstrap address: {err}");
445                    break;
446                }
447            }
448        }
449        self.bootstrap_completed
450    }
451
452    pub fn on_connection_established(&mut self, peer_id: &PeerId, endpoint: &ConnectedPoint) {
453        if self.bootstrap_completed {
454            return;
455        }
456
457        if let ConnectedPoint::Dialer { address, .. } = endpoint
458            && !self.ongoing_dials.remove(address)
459        {
460            self.ongoing_dials
461                .retain(|addr| match multiaddr_get_peer_id(addr) {
462                    Some(id) => id != *peer_id,
463                    None => true,
464                });
465        }
466    }
467
468    pub fn on_outgoing_connection_error(&mut self, peer_id: Option<PeerId>) {
469        if self.bootstrap_completed {
470            return;
471        }
472
473        match peer_id {
474            Some(peer_id) => {
475                self.ongoing_dials.retain(|addr| {
476                    if let Some(id) = multiaddr_get_peer_id(addr) {
477                        id != peer_id
478                    } else {
479                        true
480                    }
481                });
482            }
483            None => {
484                // we are left with no option but to remove all the addresses from the ongoing dials that
485                // do not have a peer ID.
486                self.ongoing_dials
487                    .retain(|addr| multiaddr_get_peer_id(addr).is_some());
488            }
489        }
490    }
491
492    pub fn is_bootstrap_peer(&self, peer_id: &PeerId) -> bool {
493        self.bootstrap_peer_ids.contains(peer_id)
494    }
495
496    pub fn has_terminated(&self) -> bool {
497        self.bootstrap_completed
498    }
499
500    fn start_cache_fetch(&mut self) -> Result<()> {
501        if matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
502            error!("Cache fetch already in progress, not starting another");
503            return Ok(());
504        }
505
506        self.cache_pending = false;
507        let config = self.cache_store.config().clone();
508        let event_tx = self.event_tx.clone();
509
510        tokio::spawn(async move {
511            let fetch_result = tokio::time::timeout(FETCH_TIMEOUT, async move {
512                tokio::task::spawn_blocking(move || BootstrapCacheStore::load_cache_data(&config))
513                    .await
514            })
515            .await;
516
517            let addrs = match fetch_result {
518                Ok(spawn_result) => match spawn_result {
519                    Ok(Ok(cache_data)) => cache_data.get_all_addrs().cloned().collect(),
520                    Ok(Err(err)) => {
521                        warn!("Failed to load cache data: {err}");
522                        Vec::new()
523                    }
524                    Err(err) => {
525                        warn!("Cache fetch task failed to join: {err}");
526                        Vec::new()
527                    }
528                },
529                Err(_) => {
530                    warn!(
531                        "Cache fetch timed out after {} seconds",
532                        FETCH_TIMEOUT.as_secs()
533                    );
534                    Vec::new()
535                }
536            };
537
538            info!(
539                "Bootstrap cache loaded from disk with {} addresses",
540                addrs.len()
541            );
542            if let Err(err) = event_tx.send(FetchEvent::Cache(addrs)) {
543                error!("Failed to send cache fetch event: {err:?}");
544            }
545        });
546
547        self.fetch_in_progress = Some(FetchKind::Cache);
548
549        Ok(())
550    }
551
552    fn start_contacts_fetch(&mut self) -> Result<()> {
553        if matches!(self.fetch_in_progress, Some(FetchKind::Contacts)) {
554            error!("Contacts fetch already in progress, not starting another");
555            return Ok(());
556        }
557
558        let Some(progress) = self.contacts_progress.as_mut() else {
559            info!("No contacts progress available");
560            return Ok(());
561        };
562
563        let Some(endpoint) = progress.next_endpoint() else {
564            info!("No more contacts endpoints to try");
565            self.contacts_progress = None;
566            return Ok(());
567        };
568
569        let event_tx = self.event_tx.clone();
570
571        tokio::spawn(async move {
572            let fetch_result = tokio::time::timeout(FETCH_TIMEOUT, async {
573                let fetcher = ContactsFetcher::with_endpoints(vec![endpoint.clone()])?;
574                fetcher.fetch_bootstrap_addresses().await
575            })
576            .await;
577
578            let addrs = match fetch_result {
579                Ok(Ok(addrs)) => addrs,
580                Ok(Err(err)) => {
581                    warn!("Failed to fetch contacts from {endpoint}: {err}");
582                    Vec::new()
583                }
584                Err(_) => {
585                    warn!(
586                        "Contacts fetch from {endpoint} timed out after {} seconds",
587                        FETCH_TIMEOUT.as_secs()
588                    );
589                    Vec::new()
590                }
591            };
592
593            info!(
594                "Contacts fetch completed from endpoint {endpoint:?} with {} addresses",
595                addrs.len()
596            );
597            if let Err(err) = event_tx.send(FetchEvent::Contacts(addrs)) {
598                error!("Failed to send contacts fetch event: {err:?}");
599            }
600        });
601
602        self.fetch_in_progress = Some(FetchKind::Contacts);
603
604        Ok(())
605    }
606
607    fn build_contacts_progress(config: &BootstrapConfig) -> Result<Option<ContactsProgress>> {
608        if config.first {
609            info!("First node in network; not fetching contacts");
610            return Ok(None);
611        }
612
613        if config.local {
614            info!("Local network configuration; skipping contacts endpoints");
615            return Ok(None);
616        }
617
618        if !config.network_contacts_url.is_empty() {
619            let endpoints = config
620                .network_contacts_url
621                .iter()
622                .map(|endpoint| endpoint.parse::<Url>().map_err(|_| Error::FailedToParseUrl))
623                .collect::<Result<Vec<_>>>()?;
624            info!("Using provided contacts endpoints: {endpoints:?}");
625            return Ok(ContactsProgress::new(endpoints));
626        }
627
628        match get_network_id() {
629            id if id == MAINNET_ID => {
630                info!("Using built-in mainnet contacts endpoints");
631                Ok(ContactsProgress::from_static(MAINNET_CONTACTS))
632            }
633
634            id if id == ALPHANET_ID => {
635                info!("Using built-in alphanet contacts endpoints");
636                Ok(ContactsProgress::from_static(ALPHANET_CONTACTS))
637            }
638            _ => Ok(None),
639        }
640    }
641
642    pub fn fetch_from_env() -> Vec<Multiaddr> {
643        let mut bootstrap_addresses = Vec::new();
644        // Read from ANT_PEERS environment variable if present
645        if let Ok(addrs) = std::env::var(ANT_PEERS_ENV) {
646            for addr_str in addrs.split(',') {
647                if let Some(addr) = craft_valid_multiaddr_from_str(addr_str, false) {
648                    info!("Adding addr from environment variable: {addr}");
649                    bootstrap_addresses.push(addr);
650                } else {
651                    warn!("Invalid multiaddress format from environment variable: {addr_str}");
652                }
653            }
654        }
655        bootstrap_addresses
656    }
657
658    pub fn cache_store_mut(&mut self) -> &mut BootstrapCacheStore {
659        &mut self.cache_store
660    }
661
662    pub fn cache_store(&self) -> &BootstrapCacheStore {
663        &self.cache_store
664    }
665}
666
667impl Drop for Bootstrap {
668    fn drop(&mut self) {
669        if let Some(cache_sync_task) = self.cache_task.take() {
670            cache_sync_task.abort();
671        }
672    }
673}
674
675#[derive(Debug)]
676struct ContactsProgress {
677    remaining: VecDeque<Url>,
678}
679
680enum FetchEvent {
681    Cache(Vec<Multiaddr>),
682    Contacts(Vec<Multiaddr>),
683}
684
685#[derive(Clone, Copy, Debug, PartialEq, Eq)]
686enum FetchKind {
687    Cache,
688    Contacts,
689}
690
691impl ContactsProgress {
692    fn new(urls: Vec<Url>) -> Option<Self> {
693        if urls.is_empty() {
694            None
695        } else {
696            Some(Self {
697                remaining: VecDeque::from(urls),
698            })
699        }
700    }
701
702    fn from_static(urls: &[&str]) -> Option<Self> {
703        let mut parsed = Vec::new();
704        for url in urls {
705            match url.parse::<Url>() {
706                Ok(parsed_url) => parsed.push(parsed_url),
707                Err(err) => {
708                    warn!("Failed to parse static contacts URL {url}: {err}");
709                }
710            }
711        }
712        Self::new(parsed)
713    }
714
715    fn next_endpoint(&mut self) -> Option<Url> {
716        self.remaining.pop_front()
717    }
718
719    fn is_empty(&self) -> bool {
720        self.remaining.is_empty()
721    }
722}
723
724#[cfg(test)]
725mod tests {
726    use super::*;
727    use crate::{
728        InitialPeersConfig,
729        cache_store::{BootstrapCacheStore, cache_data_v1::CacheData},
730        multiaddr_get_peer_id,
731    };
732    use libp2p::Multiaddr;
733    use std::collections::HashSet;
734    use std::sync::{Arc, OnceLock};
735    use std::time::{Duration, Instant};
736    use tempfile::TempDir;
737    use tokio::sync::{Mutex, OwnedMutexGuard};
738    use tokio::time::sleep;
739    use wiremock::{
740        Mock, MockServer, ResponseTemplate,
741        matchers::{method, path},
742    };
743
744    async fn env_lock() -> OwnedMutexGuard<()> {
745        static ENV_MUTEX: OnceLock<Arc<Mutex<()>>> = OnceLock::new();
746        Arc::clone(ENV_MUTEX.get_or_init(|| Arc::new(Mutex::new(()))))
747            .lock_owned()
748            .await
749    }
750
751    #[allow(unsafe_code)]
752    fn set_env_var(key: &str, value: &str) {
753        unsafe {
754            std::env::set_var(key, value);
755        }
756    }
757
758    #[allow(unsafe_code)]
759    fn remove_env_var(key: &str) {
760        unsafe {
761            std::env::remove_var(key);
762        }
763    }
764
765    async fn expect_next_addr(flow: &mut Bootstrap) -> Result<Multiaddr> {
766        let deadline = Instant::now() + Duration::from_secs(2);
767        loop {
768            match flow.next_addr() {
769                Ok(Some(addr)) => return Ok(addr),
770                Ok(None) => {
771                    if Instant::now() >= deadline {
772                        panic!("Timed out waiting for next address");
773                    }
774                    sleep(Duration::from_millis(5)).await;
775                }
776                Err(err) => return Err(err),
777            }
778        }
779    }
780
781    async fn expect_err(flow: &mut Bootstrap) -> Error {
782        let deadline = Instant::now() + Duration::from_secs(2);
783        loop {
784            match flow.next_addr() {
785                Ok(Some(addr)) => panic!("unexpected address returned: {addr}"),
786                Ok(None) => {
787                    if Instant::now() >= deadline {
788                        panic!("Timed out waiting for error from flow");
789                    }
790                    sleep(Duration::from_millis(5)).await;
791                }
792                Err(err) => return err,
793            }
794        }
795    }
796
797    fn generate_valid_test_multiaddr(ip_third: u8, ip_fourth: u8, port: u16) -> Multiaddr {
798        let peer_id = libp2p::PeerId::random();
799        format!("/ip4/10.{ip_third}.{ip_fourth}.1/tcp/{port}/p2p/{peer_id}")
800            .parse()
801            .unwrap()
802    }
803
804    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
805    async fn test_cli_arguments_precedence() {
806        let env_addr: Multiaddr =
807            "/ip4/10.0.0.1/tcp/1200/p2p/12D3KooWQnE7zXkVUEGBnJtNfR88Ujz4ezgm6bVnkvxHCzhF7S5S"
808                .parse()
809                .unwrap();
810        let cli_addr: Multiaddr =
811            "/ip4/10.0.0.2/tcp/1201/p2p/12D3KooWQx2TSK7g1C8x3QK7gBqdqbQEkd6vDT7Pxu5gb1xmgjvp"
812                .parse()
813                .unwrap();
814
815        let _env_guard = env_lock().await;
816        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
817
818        let temp_dir = TempDir::new().unwrap();
819
820        let config = InitialPeersConfig {
821            ignore_cache: true,
822            local: true,
823            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
824            addrs: vec![cli_addr.clone()],
825            ..Default::default()
826        };
827        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
828        let mut flow = Bootstrap::new(config).await.unwrap();
829
830        let first_two = vec![
831            expect_next_addr(&mut flow).await.unwrap(),
832            expect_next_addr(&mut flow).await.unwrap(),
833        ];
834        let first_set: HashSet<_> = first_two.into_iter().collect();
835        let expected: HashSet<_> = [env_addr.clone(), cli_addr.clone()].into_iter().collect();
836        assert_eq!(first_set, expected);
837
838        let err = expect_err(&mut flow).await;
839        assert!(matches!(err, Error::NoBootstrapPeersFound));
840
841        remove_env_var(ANT_PEERS_ENV);
842    }
843
844    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
845    async fn test_env_variable_parsing() {
846        let _env_guard = env_lock().await;
847        set_env_var(
848            ANT_PEERS_ENV,
849            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE,\
850/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
851        );
852
853        let parsed = Bootstrap::fetch_from_env();
854        remove_env_var(ANT_PEERS_ENV);
855
856        assert_eq!(parsed.len(), 2);
857        let parsed_set: std::collections::HashSet<_> =
858            parsed.into_iter().map(|addr| addr.to_string()).collect();
859        let expected = std::collections::HashSet::from([
860            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
861                .to_string(),
862            "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
863                .to_string(),
864        ]);
865        assert_eq!(parsed_set, expected);
866    }
867
868    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
869    async fn loads_addresses_from_cache_when_initial_queue_is_empty() {
870        let _env_guard = env_lock().await;
871        let cache_addr: Multiaddr =
872            "/ip4/127.0.0.1/tcp/1202/p2p/12D3KooWKGt8umjJQ4sDzFXo2UcHBaF33rqmFcWtXM6nbryL5G4J"
873                .parse()
874                .unwrap();
875        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
876
877        let temp_dir = TempDir::new().unwrap();
878        let file_name = BootstrapCacheStore::cache_file_name(true);
879
880        let mut cache_data = CacheData::default();
881        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
882        cache_data
883            .write_to_file(temp_dir.path(), &file_name)
884            .unwrap();
885
886        let config = InitialPeersConfig {
887            local: true,
888            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
889            ..Default::default()
890        };
891        let mut config =
892            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
893        config.disable_env_peers = true;
894        let mut flow = Bootstrap::new(config).await.unwrap();
895
896        let got = expect_next_addr(&mut flow).await.unwrap();
897        assert_eq!(got, cache_addr);
898
899        let err = expect_err(&mut flow).await;
900        assert!(matches!(err, Error::NoBootstrapPeersFound));
901    }
902
903    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
904    async fn test_first_flag_behavior() {
905        let _env_guard = env_lock().await;
906
907        let mock_server = MockServer::start().await;
908        Mock::given(method("GET"))
909            .and(path("/peers"))
910            .respond_with(ResponseTemplate::new(200).set_body_string(
911                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
912            ))
913            .expect(0)
914            .mount(&mock_server)
915            .await;
916
917        let temp_dir = TempDir::new().unwrap();
918        let config = InitialPeersConfig {
919                first: true,
920                addrs: vec![
921                "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
922                    .parse()
923                    .unwrap(),
924                ],
925                network_contacts_url: vec![format!("{}/peers", mock_server.uri())],
926                bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
927                ..Default::default()
928            };
929        let mut config =
930            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
931        config.disable_env_peers = true;
932        let mut flow = Bootstrap::new(config).await.unwrap();
933
934        let err = expect_err(&mut flow).await;
935        assert!(matches!(err, Error::NoBootstrapPeersFound));
936        assert!(
937            mock_server.received_requests().await.unwrap().is_empty(),
938            "first flag should prevent contact fetches"
939        );
940    }
941
942    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
943    async fn test_multiple_network_contacts() {
944        let _env_guard = env_lock().await;
945
946        let mock_server = MockServer::start().await;
947
948        let contact_one: Multiaddr =
949            "/ip4/192.168.0.1/tcp/1203/p2p/12D3KooWPULWT1qXJ1jzYVtQocvKXgcv6U7Pp3ui3EB7mN8hXAsP"
950                .parse()
951                .unwrap();
952        let contact_two: Multiaddr =
953            "/ip4/192.168.0.2/tcp/1204/p2p/12D3KooWPsMPaEjaWjW6GWpAne6LYcwBQEJfnDbhQFNs6ytzmBn5"
954                .parse()
955                .unwrap();
956
957        Mock::given(method("GET"))
958            .and(path("/first"))
959            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
960            .expect(1)
961            .mount(&mock_server)
962            .await;
963
964        Mock::given(method("GET"))
965            .and(path("/second"))
966            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
967            .expect(1)
968            .mount(&mock_server)
969            .await;
970
971        let config = InitialPeersConfig {
972            ignore_cache: true,
973            network_contacts_url: vec![
974                format!("{}/first", mock_server.uri()),
975                format!("{}/second", mock_server.uri()),
976            ],
977            ..Default::default()
978        };
979        let mut config =
980            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
981        config.disable_env_peers = true;
982        let mut flow = Bootstrap::new(config).await.unwrap();
983
984        let first = expect_next_addr(&mut flow).await.unwrap();
985        assert_eq!(first, contact_one);
986
987        let second = expect_next_addr(&mut flow).await.unwrap();
988        assert_eq!(second, contact_two);
989
990        let err = expect_err(&mut flow).await;
991        assert!(matches!(err, Error::NoBootstrapPeersFound));
992
993        let requests = mock_server.received_requests().await.unwrap();
994        assert_eq!(requests.len(), 2);
995        assert_eq!(requests[0].url.path(), "/first");
996        assert_eq!(requests[1].url.path(), "/second");
997    }
998
999    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1000    async fn test_full_bootstrap_flow() {
1001        let _env_guard = env_lock().await;
1002        remove_env_var(ANT_PEERS_ENV);
1003
1004        let env_addr: Multiaddr =
1005            "/ip4/10.1.0.1/tcp/1300/p2p/12D3KooWBbtXX6gY5xPD7NzNGGbj2428NJQ4HNvvBnSE5g4R7Pkf"
1006                .parse()
1007                .unwrap();
1008        let cli_addr: Multiaddr =
1009            "/ip4/10.1.0.2/tcp/1301/p2p/12D3KooWCRfYwq9c3PAXo5cTp3snq72Knqukcec4c9qT1AMyvMPd"
1010                .parse()
1011                .unwrap();
1012        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1013
1014        let cache_addr_one: Multiaddr =
1015            "/ip4/10.1.0.3/tcp/1302/p2p/12D3KooWMmKJcWUP9UqP4g1n3LH1htkvSUStn1aQGQxGc1dQcYxA"
1016                .parse()
1017                .unwrap();
1018        let cache_addr_two: Multiaddr =
1019            "/ip4/10.1.0.4/tcp/1303/p2p/12D3KooWA4b4T6Dz4RUtqnYDEBt3eGkqRykGGBqBP3ZiZsaAJ2jp"
1020                .parse()
1021                .unwrap();
1022
1023        let temp_dir = TempDir::new().unwrap();
1024        let file_name = BootstrapCacheStore::cache_file_name(false);
1025        let mut cache_data = CacheData::default();
1026        cache_data.add_peer(
1027            multiaddr_get_peer_id(&cache_addr_one).unwrap(),
1028            std::iter::once(&cache_addr_one),
1029            3,
1030            10,
1031        );
1032        cache_data.add_peer(
1033            multiaddr_get_peer_id(&cache_addr_two).unwrap(),
1034            std::iter::once(&cache_addr_two),
1035            3,
1036            10,
1037        );
1038        cache_data
1039            .write_to_file(temp_dir.path(), &file_name)
1040            .unwrap();
1041
1042        let mock_server = MockServer::start().await;
1043        let contact_one: Multiaddr =
1044            "/ip4/10.1.0.5/tcp/1304/p2p/12D3KooWQGyiCWkmKvgFVF1PsvBLnBxG29BAsoAhH4m6qjUpBAk1"
1045                .parse()
1046                .unwrap();
1047        let contact_two: Multiaddr =
1048            "/ip4/10.1.0.6/tcp/1305/p2p/12D3KooWGpMibW82dManEXZDV4SSQSSHqzTeWY5Avzkdx6yrosNG"
1049                .parse()
1050                .unwrap();
1051
1052        Mock::given(method("GET"))
1053            .and(path("/contacts_one"))
1054            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
1055            .expect(1)
1056            .mount(&mock_server)
1057            .await;
1058
1059        Mock::given(method("GET"))
1060            .and(path("/contacts_two"))
1061            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
1062            .expect(1)
1063            .mount(&mock_server)
1064            .await;
1065
1066        let file_path = temp_dir.path().join(format!(
1067            "version_{}/{}",
1068            CacheData::CACHE_DATA_VERSION,
1069            file_name
1070        ));
1071        let contents = std::fs::read_to_string(&file_path).unwrap();
1072        assert!(contents.contains(&cache_addr_one.to_string()));
1073        assert!(contents.contains(&cache_addr_two.to_string()));
1074
1075        assert_eq!(
1076            Bootstrap::fetch_from_env(),
1077            vec![env_addr.clone()],
1078            "environment variable should yield the configured address"
1079        );
1080
1081        let config = InitialPeersConfig {
1082            addrs: vec![cli_addr.clone()],
1083            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1084            network_contacts_url: vec![
1085                format!("{}/contacts_one", mock_server.uri()),
1086                format!("{}/contacts_two", mock_server.uri()),
1087            ],
1088            ..Default::default()
1089        };
1090        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1091        let mut flow = Bootstrap::new(config).await.unwrap();
1092
1093        let initial_results = vec![
1094            expect_next_addr(&mut flow).await.unwrap(),
1095            expect_next_addr(&mut flow).await.unwrap(),
1096        ];
1097        let initial_set: HashSet<_> = initial_results.into_iter().collect();
1098        let expected_initial: HashSet<_> =
1099            [env_addr.clone(), cli_addr.clone()].into_iter().collect();
1100        assert_eq!(initial_set, expected_initial);
1101
1102        let cache_results = vec![
1103            expect_next_addr(&mut flow).await.unwrap(),
1104            expect_next_addr(&mut flow).await.unwrap(),
1105        ];
1106        let cache_set: HashSet<_> = cache_results.into_iter().collect();
1107        let expected_cache: HashSet<_> = [cache_addr_one.clone(), cache_addr_two.clone()]
1108            .into_iter()
1109            .collect();
1110        assert_eq!(cache_set, expected_cache);
1111
1112        let contact_first = expect_next_addr(&mut flow).await.unwrap();
1113        assert_eq!(contact_first, contact_one);
1114
1115        let contact_second = expect_next_addr(&mut flow).await.unwrap();
1116        assert_eq!(contact_second, contact_two);
1117
1118        let err = expect_err(&mut flow).await;
1119        assert!(matches!(err, Error::NoBootstrapPeersFound));
1120
1121        let requests = mock_server.received_requests().await.unwrap();
1122        assert_eq!(requests.len(), 2);
1123        assert_eq!(requests[0].url.path(), "/contacts_one");
1124        assert_eq!(requests[1].url.path(), "/contacts_two");
1125
1126        remove_env_var(ANT_PEERS_ENV);
1127    }
1128
1129    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1130    async fn test_disable_env_peers_flag() {
1131        let env_addr = generate_valid_test_multiaddr(2, 0, 2000);
1132
1133        let _env_guard = env_lock().await;
1134        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1135
1136        let temp_dir = TempDir::new().unwrap();
1137
1138        let config = InitialPeersConfig {
1139            local: true,
1140            ignore_cache: true,
1141            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1142            ..Default::default()
1143        };
1144        let mut config =
1145            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1146        config.disable_env_peers = true;
1147
1148        let result = Bootstrap::new(config).await;
1149        assert!(
1150            result.is_err(),
1151            "Should error when env peers are disabled and no other sources available"
1152        );
1153        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1154
1155        remove_env_var(ANT_PEERS_ENV);
1156    }
1157
1158    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1159    async fn test_disable_cache_reading_flag() {
1160        let _env_guard = env_lock().await;
1161
1162        let cache_addr = generate_valid_test_multiaddr(2, 0, 2001);
1163        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1164
1165        let temp_dir = TempDir::new().unwrap();
1166        let file_name = BootstrapCacheStore::cache_file_name(true);
1167
1168        let mut cache_data = CacheData::default();
1169        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1170        cache_data
1171            .write_to_file(temp_dir.path(), &file_name)
1172            .unwrap();
1173
1174        let config = InitialPeersConfig {
1175            local: true,
1176            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1177            ..Default::default()
1178        };
1179        let mut config =
1180            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1181        config.disable_env_peers = true;
1182        config.disable_cache_reading = true;
1183
1184        let result = Bootstrap::new(config).await;
1185        assert!(
1186            result.is_err(),
1187            "Should error when cache reading is disabled and no other sources available"
1188        );
1189        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1190    }
1191
1192    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1193    async fn test_bootstrap_completed_initialization() {
1194        let temp_dir = TempDir::new().unwrap();
1195
1196        let config = InitialPeersConfig {
1197            first: true,
1198            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1199            ..Default::default()
1200        };
1201        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1202        let flow = Bootstrap::new(config).await.unwrap();
1203
1204        assert!(
1205            flow.has_terminated(),
1206            "bootstrap_completed should be true for first node"
1207        );
1208
1209        let config = InitialPeersConfig {
1210            first: false,
1211            local: true,
1212            ignore_cache: true,
1213            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1214            addrs: vec![generate_valid_test_multiaddr(2, 0, 2002)],
1215            ..Default::default()
1216        };
1217        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1218        let flow = Bootstrap::new(config).await.unwrap();
1219
1220        assert!(
1221            !flow.has_terminated(),
1222            "bootstrap_completed should be false for non-first node"
1223        );
1224    }
1225
1226    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1227    async fn test_bootstrap_peer_ids_population() {
1228        let env_addr = generate_valid_test_multiaddr(2, 0, 2003);
1229        let cli_addr = generate_valid_test_multiaddr(2, 0, 2004);
1230
1231        let env_peer_id = multiaddr_get_peer_id(&env_addr).unwrap();
1232        let cli_peer_id = multiaddr_get_peer_id(&cli_addr).unwrap();
1233
1234        let _env_guard = env_lock().await;
1235        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1236
1237        let temp_dir = TempDir::new().unwrap();
1238
1239        let config = InitialPeersConfig {
1240            local: true,
1241            ignore_cache: true,
1242            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1243            addrs: vec![cli_addr.clone()],
1244            ..Default::default()
1245        };
1246        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1247        let flow = Bootstrap::new(config).await.unwrap();
1248
1249        assert!(
1250            flow.is_bootstrap_peer(&env_peer_id),
1251            "Peer ID from env should be tracked"
1252        );
1253        assert!(
1254            flow.is_bootstrap_peer(&cli_peer_id),
1255            "Peer ID from CLI should be tracked"
1256        );
1257
1258        remove_env_var(ANT_PEERS_ENV);
1259    }
1260
1261    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1262    async fn test_invalid_multiaddr_in_initial_peers() {
1263        let _env_guard = env_lock().await;
1264
1265        let valid_addr = generate_valid_test_multiaddr(2, 0, 2005);
1266        let invalid_addr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse().unwrap();
1267
1268        let temp_dir = TempDir::new().unwrap();
1269
1270        let config = InitialPeersConfig {
1271            local: true,
1272            ignore_cache: true,
1273            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1274            addrs: vec![valid_addr.clone()],
1275            ..Default::default()
1276        };
1277        let mut config =
1278            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1279        config.disable_env_peers = true;
1280
1281        config.initial_peers.push(invalid_addr);
1282
1283        let mut flow = Bootstrap::new(config).await.unwrap();
1284
1285        let first = expect_next_addr(&mut flow).await.unwrap();
1286        assert_eq!(first, valid_addr, "Should get the valid address");
1287
1288        let err = expect_err(&mut flow).await;
1289        assert!(
1290            matches!(err, Error::NoBootstrapPeersFound),
1291            "Should not find any more peers after valid one (invalid addr was filtered)"
1292        );
1293    }
1294
1295    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1296    async fn test_local_network_skips_contacts() {
1297        let _env_guard = env_lock().await;
1298
1299        let mock_server = MockServer::start().await;
1300        Mock::given(method("GET"))
1301            .and(path("/should-not-be-called"))
1302            .respond_with(ResponseTemplate::new(200).set_body_string(
1303                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
1304            ))
1305            .expect(0)
1306            .mount(&mock_server)
1307            .await;
1308
1309        let temp_dir = TempDir::new().unwrap();
1310
1311        let config = InitialPeersConfig {
1312            local: true,
1313            ignore_cache: true,
1314            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1315            network_contacts_url: vec![format!("{}/should-not-be-called", mock_server.uri())],
1316            addrs: vec![generate_valid_test_multiaddr(2, 0, 2006)],
1317            ..Default::default()
1318        };
1319        let mut config =
1320            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1321        config.disable_env_peers = true;
1322
1323        let addr_from_config = config.initial_peers[0].clone();
1324        let mut flow = Bootstrap::new(config).await.unwrap();
1325
1326        let first = expect_next_addr(&mut flow).await.unwrap();
1327        assert_eq!(
1328            first, addr_from_config,
1329            "Should get the address from config"
1330        );
1331
1332        let err = expect_err(&mut flow).await;
1333        assert!(matches!(err, Error::NoBootstrapPeersFound));
1334
1335        assert!(
1336            mock_server.received_requests().await.unwrap().is_empty(),
1337            "local flag should prevent contact fetches"
1338        );
1339    }
1340
1341    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1342    async fn test_timeout_with_no_addresses() {
1343        let _env_guard = env_lock().await;
1344
1345        let temp_dir = TempDir::new().unwrap();
1346        let file_name = BootstrapCacheStore::cache_file_name(true);
1347        let cache_data = CacheData::default();
1348        cache_data
1349            .write_to_file(temp_dir.path(), &file_name)
1350            .unwrap();
1351
1352        let config = InitialPeersConfig {
1353            local: true,
1354            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1355            ..Default::default()
1356        };
1357        let mut config =
1358            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1359        config.disable_env_peers = true;
1360
1361        let result = Bootstrap::new(config).await;
1362
1363        assert!(
1364            result.is_err(),
1365            "Should error when no addresses are available from any source"
1366        );
1367        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1368    }
1369
1370    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1371    async fn test_first_node_clears_cache() {
1372        let _env_guard = env_lock().await;
1373
1374        let cache_addr = generate_valid_test_multiaddr(2, 0, 2007);
1375        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1376
1377        let temp_dir = TempDir::new().unwrap();
1378        let file_name = BootstrapCacheStore::cache_file_name(false);
1379
1380        let mut cache_data = CacheData::default();
1381        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1382        cache_data
1383            .write_to_file(temp_dir.path(), &file_name)
1384            .unwrap();
1385
1386        let file_path = temp_dir.path().join(format!(
1387            "version_{}/{}",
1388            CacheData::CACHE_DATA_VERSION,
1389            file_name
1390        ));
1391
1392        let contents_before = std::fs::read_to_string(&file_path).unwrap();
1393        assert!(
1394            contents_before.contains(&cache_addr.to_string()),
1395            "Cache should contain the address before initialization"
1396        );
1397
1398        let config = InitialPeersConfig {
1399            first: true,
1400            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1401            ..Default::default()
1402        };
1403        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1404        let _flow = Bootstrap::new(config).await.unwrap();
1405
1406        tokio::time::sleep(Duration::from_millis(100)).await;
1407
1408        let contents_after = std::fs::read_to_string(&file_path).unwrap();
1409        assert!(
1410            !contents_after.contains(&cache_addr.to_string()),
1411            "Cache should be cleared for first node"
1412        );
1413    }
1414
1415    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1416    async fn test_new_loads_at_least_50_contacts() {
1417        let _env_guard = env_lock().await;
1418
1419        let temp_dir = TempDir::new().unwrap();
1420        let file_name = BootstrapCacheStore::cache_file_name(true);
1421
1422        let mut cache_data = CacheData::default();
1423        for i in 0..60 {
1424            let addr = generate_valid_test_multiaddr(3, i as u8, 3000 + i);
1425            let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1426            cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1427        }
1428        cache_data
1429            .write_to_file(temp_dir.path(), &file_name)
1430            .unwrap();
1431
1432        let config = InitialPeersConfig {
1433            local: true,
1434            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1435            ..Default::default()
1436        };
1437        let mut config =
1438            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1439        config.disable_env_peers = true;
1440
1441        let result = Bootstrap::new(config).await;
1442
1443        assert!(
1444            result.is_ok(),
1445            "Should successfully initialize with 60 contacts in cache"
1446        );
1447
1448        let mut flow = result.unwrap();
1449        let mut count = 0;
1450        while let Ok(Some(_addr)) = flow.next_addr() {
1451            count += 1;
1452            if count >= 60 {
1453                break;
1454            }
1455        }
1456
1457        assert!(
1458            count > 0,
1459            "Should have loaded contacts from cache, got {count}"
1460        );
1461    }
1462
1463    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1464    async fn test_new_succeeds_with_few_contacts() {
1465        let _env_guard = env_lock().await;
1466
1467        let temp_dir = TempDir::new().unwrap();
1468        let file_name = BootstrapCacheStore::cache_file_name(true);
1469
1470        let mut cache_data = CacheData::default();
1471        for i in 0..5 {
1472            let addr = generate_valid_test_multiaddr(4, i as u8, 4000 + i);
1473            let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1474            cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1475        }
1476        cache_data
1477            .write_to_file(temp_dir.path(), &file_name)
1478            .unwrap();
1479
1480        let config = InitialPeersConfig {
1481            local: true,
1482            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1483            ..Default::default()
1484        };
1485        let mut config =
1486            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1487        config.disable_env_peers = true;
1488
1489        let result = Bootstrap::new(config).await;
1490        assert!(
1491            result.is_ok(),
1492            "Should succeed with few contacts (< 50 but > 0)"
1493        );
1494
1495        let mut flow = result.unwrap();
1496        let mut count = 0;
1497        while let Ok(Some(_addr)) = flow.next_addr() {
1498            count += 1;
1499            if count >= 10 {
1500                break;
1501            }
1502        }
1503
1504        assert_eq!(count, 5, "Should have exactly 5 contacts");
1505    }
1506
1507    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1508    async fn test_new_errors_with_zero_contacts() {
1509        let _env_guard = env_lock().await;
1510
1511        let temp_dir = TempDir::new().unwrap();
1512        let file_name = BootstrapCacheStore::cache_file_name(false);
1513        let cache_data = CacheData::default();
1514        cache_data
1515            .write_to_file(temp_dir.path(), &file_name)
1516            .unwrap();
1517
1518        let mock_server = MockServer::start().await;
1519        Mock::given(method("GET"))
1520            .and(path("/failing-endpoint"))
1521            .respond_with(ResponseTemplate::new(500))
1522            .expect(1..)
1523            .mount(&mock_server)
1524            .await;
1525
1526        let config = InitialPeersConfig {
1527            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1528            network_contacts_url: vec![format!("{}/failing-endpoint", mock_server.uri())],
1529            ..Default::default()
1530        };
1531        let mut config =
1532            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1533        config.disable_env_peers = true;
1534
1535        let result = Bootstrap::new(config).await;
1536
1537        assert!(
1538            result.is_err(),
1539            "Should error when all sources fail and no contacts are available"
1540        );
1541        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1542    }
1543}