ant_bootstrap/
bootstrap.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::ANT_PEERS_ENV;
10use crate::BootstrapCacheStore;
11use crate::BootstrapConfig;
12use crate::ContactsFetcher;
13use crate::Result;
14use crate::contacts_fetcher::ALPHANET_CONTACTS;
15use crate::contacts_fetcher::MAINNET_CONTACTS;
16use crate::craft_valid_multiaddr;
17use crate::craft_valid_multiaddr_from_str;
18use crate::error::Error;
19use crate::multiaddr_get_peer_id;
20use ant_protocol::version::ALPHANET_ID;
21use ant_protocol::version::MAINNET_ID;
22use ant_protocol::version::get_network_id;
23use libp2p::{
24    Multiaddr, PeerId, Swarm,
25    core::connection::ConnectedPoint,
26    multiaddr::Protocol,
27    swarm::{
28        DialError, NetworkBehaviour,
29        dial_opts::{DialOpts, PeerCondition},
30    },
31};
32use std::collections::{HashSet, VecDeque};
33use std::time::Duration;
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::sync::mpsc::UnboundedSender;
36use url::Url;
37
38/// Timeout for individual fetch operations
39const FETCH_TIMEOUT: Duration = Duration::from_secs(10);
40
41/// Minimum number of initial addresses to fetch before returning from `new()`
42const MIN_INITIAL_ADDRS: usize = 5;
43
44/// Timeout in seconds to wait for initial addresses during bootstrap initialization
45const INITIAL_ADDR_FETCH_TIMEOUT_SECS: u64 = 30;
46
47/// Manages the flow of obtaining bootstrap peer addresses from various sources and also writes to the bootstrap cache.
48///
49/// The sources are tried in the following order while reading:
50/// 1. Environment variable `ANT_PEERS`
51/// 2. Command-line provided addresses
52/// 3. Bootstrap cache file on disk
53/// 4. Network contacts endpoints
54///
55/// Addresses are returned one at a time via the `next_addr` method.
56/// It handles asynchronous fetching from the cache and contacts endpoints,
57/// ensuring only one fetch is in progress at a time.
58///
59/// If no more addresses are available from any source, `next_addr` returns an error.
60/// It is expected that the caller will retry `next_addr` later to allow
61/// for asynchronous fetches to complete.
62#[derive(custom_debug::Debug)]
63pub struct Bootstrap {
64    cache_store: BootstrapCacheStore,
65    addrs: VecDeque<Multiaddr>,
66    // The task responsible for syncing the cache, this is aborted on drop.
67    #[debug(skip)]
68    cache_task: Option<tokio::task::JoinHandle<()>>,
69    // fetcher
70    cache_pending: bool,
71    contacts_progress: Option<ContactsProgress>,
72    event_tx: UnboundedSender<FetchEvent>,
73    event_rx: UnboundedReceiver<FetchEvent>,
74    fetch_in_progress: Option<FetchKind>,
75    // dialer
76    ongoing_dials: HashSet<Multiaddr>,
77    bootstrap_peer_ids: HashSet<PeerId>,
78    bootstrap_completed: bool,
79}
80
81impl Bootstrap {
82    pub async fn new(mut config: BootstrapConfig) -> Result<Self> {
83        let contacts_progress = Self::build_contacts_progress(&config)?;
84
85        let mut addrs_queue = VecDeque::new();
86        let mut bootstrap_peer_ids = HashSet::new();
87        if !config.first {
88            if !config.disable_env_peers {
89                for addr in Self::fetch_from_env() {
90                    Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
91                }
92            } else {
93                info!("Skipping ANT_PEERS environment variable as per configuration");
94            }
95
96            for addr in config.initial_peers.drain(..) {
97                if let Some(addr) = craft_valid_multiaddr(&addr, false) {
98                    info!("Adding addr from arguments: {addr}");
99                    Self::push_addr(&mut addrs_queue, &mut bootstrap_peer_ids, addr);
100                } else {
101                    warn!("Invalid multiaddress format from arguments: {addr}");
102                }
103            }
104        }
105
106        let cache_pending = !config.first && !config.disable_cache_reading;
107        if !cache_pending {
108            info!(
109                "Not loading from cache as per configuration (first={}, disable_cache_reading={})",
110                config.first, config.disable_cache_reading
111            );
112        } else {
113            info!("Cache loading is enabled - cache will be fetched if needed");
114        }
115        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
116
117        let cache_store = BootstrapCacheStore::new(config.clone())?;
118
119        let mut bootstrap = Self {
120            cache_store,
121            addrs: addrs_queue,
122            cache_pending,
123            contacts_progress,
124            event_tx,
125            event_rx,
126            fetch_in_progress: None,
127            ongoing_dials: HashSet::new(),
128            bootstrap_peer_ids,
129            bootstrap_completed: config.first,
130            cache_task: None,
131        };
132
133        info!("Cache store is initialized and will sync and flush periodically");
134        let cache_task = bootstrap.cache_store.sync_and_flush_periodically();
135        bootstrap.cache_task = Some(cache_task);
136
137        if config.first {
138            info!("First node in network; clearing any existing cache");
139            bootstrap.cache_store.write().await?;
140            return Ok(bootstrap);
141        }
142
143        // ensure the initial queue is not empty by fetching from cache/contacts if needed
144        //
145        // not required for 'first' node
146        let mut collected_addrs = Vec::new();
147        if bootstrap.addrs.len() < MIN_INITIAL_ADDRS {
148            info!("Initial address queue < {MIN_INITIAL_ADDRS}; fetching from cache/contacts");
149            let now = std::time::Instant::now();
150            loop {
151                match bootstrap.next_addr() {
152                    Ok(Some(addr)) => {
153                        collected_addrs.push(addr);
154                        if Self::try_finalize_initial_addrs(
155                            &mut bootstrap,
156                            &mut collected_addrs,
157                            MIN_INITIAL_ADDRS,
158                        ) {
159                            break;
160                        }
161                        continue;
162                    }
163                    Ok(None) => {
164                        debug!(
165                            "No immediate address available; waiting for async fetch to complete"
166                        );
167                    }
168                    Err(err) => {
169                        if Self::try_finalize_initial_addrs(&mut bootstrap, &mut collected_addrs, 1)
170                        {
171                            break;
172                        }
173                        warn!("Failed to fetch initial address: {err}");
174                        return Err(err);
175                    }
176                }
177
178                if now.elapsed() > std::time::Duration::from_secs(INITIAL_ADDR_FETCH_TIMEOUT_SECS) {
179                    if Self::try_finalize_initial_addrs(&mut bootstrap, &mut collected_addrs, 1) {
180                        break;
181                    }
182                    error!("Timed out waiting for initial addresses. ");
183                    return Err(Error::NoBootstrapPeersFound);
184                }
185                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
186            }
187        }
188
189        Ok(bootstrap)
190    }
191
192    /// Attempts to finalize the initial address collection by extending the bootstrap with collected addresses.
193    /// Returns `true` if addresses were successfully added and initialization should complete.
194    /// Returns `false` if no addresses are available yet.
195    fn try_finalize_initial_addrs(
196        bootstrap: &mut Bootstrap,
197        collected_addrs: &mut Vec<Multiaddr>,
198        min_address: usize,
199    ) -> bool {
200        if collected_addrs.len() < min_address {
201            return false;
202        }
203        info!(
204            "Collected minimum required initial addresses ({}), proceeding with bootstrap.",
205            collected_addrs.len()
206        );
207        bootstrap.extend_with_addrs(std::mem::take(collected_addrs));
208        true
209    }
210
211    /// Returns the next address from the sources. Returns `Ok(None)` if we are waiting for a source to return more
212    /// addresses.
213    /// Error if we have exhausted all sources and have no more addresses to return.
214    ///
215    /// This does not start any dial attempts, it returns the next address to dial.
216    /// Use `trigger_bootstrapping_process` to start auto manage the whole bootstrapping process.
217    pub fn next_addr(&mut self) -> Result<Option<Multiaddr>> {
218        loop {
219            self.process_events();
220
221            if let Some(addr) = self.addrs.pop_front() {
222                info!(?addr, "next_addr returning queued address");
223                return Ok(Some(addr));
224            }
225
226            if self.fetch_in_progress.is_some() {
227                debug!("next_addr waiting for in-flight fetch result");
228                return Ok(None);
229            }
230
231            if self.cache_pending && !matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
232                info!("next_addr triggering cache fetch");
233                self.start_cache_fetch()?;
234                continue;
235            }
236
237            if self.contacts_progress.is_some()
238                && !matches!(self.fetch_in_progress, Some(FetchKind::Contacts))
239            {
240                info!("next_addr triggering contacts fetch");
241                self.start_contacts_fetch()?;
242                if self.fetch_in_progress.is_some() {
243                    return Ok(None);
244                }
245                continue;
246            }
247
248            info!("next_addr exhausted all address sources");
249            return Err(Error::NoBootstrapPeersFound);
250        }
251    }
252
253    fn process_events(&mut self) {
254        while let Ok(event) = self.event_rx.try_recv() {
255            match event {
256                FetchEvent::Cache(addrs) => {
257                    info!(count = addrs.len(), "process_events received cache batch");
258                    if addrs.is_empty() {
259                        info!("No addresses retrieved from cache");
260                    } else {
261                        self.extend_with_addrs(addrs);
262                    }
263                }
264                FetchEvent::Contacts(addrs) => {
265                    info!(
266                        count = addrs.len(),
267                        "process_events received contacts batch"
268                    );
269                    self.extend_with_addrs(addrs);
270                    if self
271                        .contacts_progress
272                        .as_ref()
273                        .is_none_or(ContactsProgress::is_empty)
274                    {
275                        self.contacts_progress = None;
276                    }
277                }
278            }
279
280            self.fetch_in_progress = None;
281        }
282    }
283
284    fn extend_with_addrs(&mut self, addrs: Vec<Multiaddr>) {
285        if addrs.is_empty() {
286            return;
287        }
288        for addr in addrs {
289            Self::push_addr(&mut self.addrs, &mut self.bootstrap_peer_ids, addr);
290        }
291    }
292
293    fn push_addr(queue: &mut VecDeque<Multiaddr>, peer_ids: &mut HashSet<PeerId>, addr: Multiaddr) {
294        if let Some(peer_id) = multiaddr_get_peer_id(&addr) {
295            peer_ids.insert(peer_id);
296        }
297        queue.push_back(addr);
298    }
299
300    fn pop_p2p(addr: &mut Multiaddr) -> Option<PeerId> {
301        if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
302            let _ = addr.pop();
303            Some(peer_id)
304        } else {
305            None
306        }
307    }
308
309    fn try_next_dial_addr(&mut self) -> Result<Option<Multiaddr>> {
310        match self.next_addr() {
311            Ok(Some(addr)) => Ok(Some(addr)),
312            Ok(None) => Ok(None),
313            Err(Error::NoBootstrapPeersFound) => {
314                self.bootstrap_completed = true;
315                Err(Error::NoBootstrapPeersFound)
316            }
317            Err(err) => Err(err),
318        }
319    }
320
321    fn should_continue_bootstrapping(&mut self, contacted_peers: usize) -> bool {
322        if self.bootstrap_completed {
323            info!("Initial bootstrap process has already completed successfully.");
324            return false;
325        }
326
327        if contacted_peers
328            >= self
329                .cache_store
330                .config()
331                .max_contacted_peers_before_termination
332        {
333            self.bootstrap_completed = true;
334            self.addrs.clear();
335            self.ongoing_dials.clear();
336
337            info!(
338                "Initial bootstrap process completed successfully. We have {contacted_peers} peers in the routing table."
339            );
340
341            return false;
342        }
343
344        if self.ongoing_dials.len() >= self.cache_store.config().max_concurrent_dials {
345            info!(
346                "Initial bootstrap has {} ongoing dials. Not dialing anymore.",
347                self.ongoing_dials.len()
348            );
349
350            return false;
351        }
352
353        if self.addrs.is_empty() {
354            if self.cache_pending
355                || self.contacts_progress.is_some()
356                || self.fetch_in_progress.is_some()
357            {
358                info!("Initial bootstrap is awaiting additional addresses to arrive.");
359                return false;
360            }
361            info!(
362                "We have {contacted_peers} peers in RT, but no more addresses to dial. Stopping initial bootstrap."
363            );
364
365            self.bootstrap_completed = true;
366            return false;
367        }
368
369        true
370    }
371
372    pub fn trigger_bootstrapping_process<B: NetworkBehaviour>(
373        &mut self,
374        swarm: &mut Swarm<B>,
375        contacted_peers: usize,
376    ) {
377        if !self.should_continue_bootstrapping(contacted_peers) {
378            return;
379        }
380
381        while self.ongoing_dials.len() < self.cache_store.config().max_concurrent_dials {
382            match self.try_next_dial_addr() {
383                Ok(Some(mut addr)) => {
384                    let addr_clone = addr.clone();
385                    let peer_id = Self::pop_p2p(&mut addr);
386
387                    let opts = match peer_id {
388                        Some(peer_id) => DialOpts::peer_id(peer_id)
389                            .condition(PeerCondition::NotDialing)
390                            .addresses(vec![addr])
391                            .build(),
392                        None => DialOpts::unknown_peer_id().address(addr).build(),
393                    };
394
395                    info!("Trying to dial peer with address: {addr_clone}");
396
397                    match swarm.dial(opts) {
398                        Ok(()) => {
399                            info!(
400                                "Dial attempt initiated for peer with address: {addr_clone}. Ongoing dial attempts: {}",
401                                self.ongoing_dials.len() + 1
402                            );
403                            let _ = self.ongoing_dials.insert(addr_clone);
404                        }
405                        Err(err) => match err {
406                            DialError::LocalPeerId { .. } => {
407                                warn!(
408                                    "Failed to dial peer with address: {addr_clone}. This is our own peer ID. Dialing the next peer"
409                                );
410                            }
411                            DialError::NoAddresses => {
412                                error!(
413                                    "Failed to dial peer with address: {addr_clone}. No addresses found. Dialing the next peer"
414                                );
415                            }
416                            DialError::DialPeerConditionFalse(_) => {
417                                warn!(
418                                    "We are already dialing the peer with address: {addr_clone}. Dialing the next peer. This error is harmless."
419                                );
420                            }
421                            DialError::Aborted => {
422                                error!(
423                                    "Pending connection attempt has been aborted for {addr_clone}. Dialing the next peer."
424                                );
425                            }
426                            DialError::WrongPeerId { obtained, .. } => {
427                                error!(
428                                    "The peer identity obtained on the connection did not match the one that was expected. Obtained: {obtained}. Dialing the next peer."
429                                );
430                            }
431                            DialError::Denied { cause } => {
432                                error!(
433                                    "The dialing attempt was denied by the remote peer. Cause: {cause}. Dialing the next peer."
434                                );
435                            }
436                            DialError::Transport(items) => {
437                                error!(
438                                    "Failed to dial peer with address: {addr_clone}. Transport error: {items:?}. Dialing the next peer."
439                                );
440                            }
441                        },
442                    }
443                }
444                Ok(None) => {
445                    debug!("Waiting for additional bootstrap addresses before continuing to dial");
446                    break;
447                }
448                Err(Error::NoBootstrapPeersFound) => {
449                    info!("No more bootstrap peers available to dial.");
450                    break;
451                }
452                Err(err) => {
453                    warn!("Failed to obtain next bootstrap address: {err}");
454                    break;
455                }
456            }
457        }
458    }
459
460    pub fn on_connection_established<B: NetworkBehaviour>(
461        &mut self,
462        peer_id: &PeerId,
463        endpoint: &ConnectedPoint,
464        swarm: &mut Swarm<B>,
465        contacted_peers: usize,
466    ) {
467        if self.bootstrap_completed {
468            return;
469        }
470
471        if let ConnectedPoint::Dialer { address, .. } = endpoint
472            && !self.ongoing_dials.remove(address)
473        {
474            self.ongoing_dials
475                .retain(|addr| match multiaddr_get_peer_id(addr) {
476                    Some(id) => id != *peer_id,
477                    None => true,
478                });
479        }
480
481        self.trigger_bootstrapping_process(swarm, contacted_peers);
482    }
483
484    pub fn on_outgoing_connection_error<B: NetworkBehaviour>(
485        &mut self,
486        peer_id: Option<PeerId>,
487        swarm: &mut Swarm<B>,
488        contacted_peers: usize,
489    ) {
490        if self.bootstrap_completed {
491            return;
492        }
493
494        match peer_id {
495            Some(peer_id) => {
496                self.ongoing_dials.retain(|addr| {
497                    if let Some(id) = multiaddr_get_peer_id(addr) {
498                        id != peer_id
499                    } else {
500                        true
501                    }
502                });
503            }
504            None => {
505                // we are left with no option but to remove all the addresses from the ongoing dials that
506                // do not have a peer ID.
507                self.ongoing_dials
508                    .retain(|addr| multiaddr_get_peer_id(addr).is_some());
509            }
510        }
511
512        self.trigger_bootstrapping_process(swarm, contacted_peers);
513    }
514
515    pub fn is_bootstrap_peer(&self, peer_id: &PeerId) -> bool {
516        self.bootstrap_peer_ids.contains(peer_id)
517    }
518
519    pub fn has_terminated(&self) -> bool {
520        self.bootstrap_completed
521    }
522
523    fn start_cache_fetch(&mut self) -> Result<()> {
524        if matches!(self.fetch_in_progress, Some(FetchKind::Cache)) {
525            error!("Cache fetch already in progress, not starting another");
526            return Ok(());
527        }
528
529        self.cache_pending = false;
530        let config = self.cache_store.config().clone();
531        let event_tx = self.event_tx.clone();
532
533        tokio::spawn(async move {
534            let fetch_result = tokio::time::timeout(FETCH_TIMEOUT, async move {
535                tokio::task::spawn_blocking(move || BootstrapCacheStore::load_cache_data(&config))
536                    .await
537            })
538            .await;
539
540            let addrs = match fetch_result {
541                Ok(spawn_result) => match spawn_result {
542                    Ok(Ok(cache_data)) => cache_data.get_all_addrs().cloned().collect(),
543                    Ok(Err(err)) => {
544                        warn!("Failed to load cache data: {err}");
545                        Vec::new()
546                    }
547                    Err(err) => {
548                        warn!("Cache fetch task failed to join: {err}");
549                        Vec::new()
550                    }
551                },
552                Err(_) => {
553                    warn!(
554                        "Cache fetch timed out after {} seconds",
555                        FETCH_TIMEOUT.as_secs()
556                    );
557                    Vec::new()
558                }
559            };
560
561            info!(
562                "Bootstrap cache loaded from disk with {} addresses",
563                addrs.len()
564            );
565            if let Err(err) = event_tx.send(FetchEvent::Cache(addrs)) {
566                error!("Failed to send cache fetch event: {err:?}");
567            }
568        });
569
570        self.fetch_in_progress = Some(FetchKind::Cache);
571
572        Ok(())
573    }
574
575    fn start_contacts_fetch(&mut self) -> Result<()> {
576        if matches!(self.fetch_in_progress, Some(FetchKind::Contacts)) {
577            error!("Contacts fetch already in progress, not starting another");
578            return Ok(());
579        }
580
581        let Some(progress) = self.contacts_progress.as_mut() else {
582            info!("No contacts progress available");
583            return Ok(());
584        };
585
586        let Some(endpoint) = progress.next_endpoint() else {
587            info!("No more contacts endpoints to try");
588            self.contacts_progress = None;
589            return Ok(());
590        };
591
592        let event_tx = self.event_tx.clone();
593
594        tokio::spawn(async move {
595            let fetch_result = tokio::time::timeout(FETCH_TIMEOUT, async {
596                let fetcher = ContactsFetcher::with_endpoints(vec![endpoint.clone()])?;
597                fetcher.fetch_bootstrap_addresses().await
598            })
599            .await;
600
601            let addrs = match fetch_result {
602                Ok(Ok(addrs)) => addrs,
603                Ok(Err(err)) => {
604                    warn!("Failed to fetch contacts from {endpoint}: {err}");
605                    Vec::new()
606                }
607                Err(_) => {
608                    warn!(
609                        "Contacts fetch from {endpoint} timed out after {} seconds",
610                        FETCH_TIMEOUT.as_secs()
611                    );
612                    Vec::new()
613                }
614            };
615
616            info!(
617                "Contacts fetch completed from endpoint {endpoint:?} with {} addresses",
618                addrs.len()
619            );
620            if let Err(err) = event_tx.send(FetchEvent::Contacts(addrs)) {
621                error!("Failed to send contacts fetch event: {err:?}");
622            }
623        });
624
625        self.fetch_in_progress = Some(FetchKind::Contacts);
626
627        Ok(())
628    }
629
630    fn build_contacts_progress(config: &BootstrapConfig) -> Result<Option<ContactsProgress>> {
631        if config.first {
632            info!("First node in network; not fetching contacts");
633            return Ok(None);
634        }
635
636        if config.local {
637            info!("Local network configuration; skipping contacts endpoints");
638            return Ok(None);
639        }
640
641        if !config.network_contacts_url.is_empty() {
642            let endpoints = config
643                .network_contacts_url
644                .iter()
645                .map(|endpoint| endpoint.parse::<Url>().map_err(|_| Error::FailedToParseUrl))
646                .collect::<Result<Vec<_>>>()?;
647            info!("Using provided contacts endpoints: {endpoints:?}");
648            return Ok(ContactsProgress::new(endpoints));
649        }
650
651        match get_network_id() {
652            id if id == MAINNET_ID => {
653                info!("Using built-in mainnet contacts endpoints");
654                Ok(ContactsProgress::from_static(MAINNET_CONTACTS))
655            }
656
657            id if id == ALPHANET_ID => {
658                info!("Using built-in alphanet contacts endpoints");
659                Ok(ContactsProgress::from_static(ALPHANET_CONTACTS))
660            }
661            _ => Ok(None),
662        }
663    }
664
665    pub fn fetch_from_env() -> Vec<Multiaddr> {
666        let mut bootstrap_addresses = Vec::new();
667        // Read from ANT_PEERS environment variable if present
668        if let Ok(addrs) = std::env::var(ANT_PEERS_ENV) {
669            for addr_str in addrs.split(',') {
670                if let Some(addr) = craft_valid_multiaddr_from_str(addr_str, false) {
671                    info!("Adding addr from environment variable: {addr}");
672                    bootstrap_addresses.push(addr);
673                } else {
674                    warn!("Invalid multiaddress format from environment variable: {addr_str}");
675                }
676            }
677        }
678        bootstrap_addresses
679    }
680
681    pub fn cache_store_mut(&mut self) -> &mut BootstrapCacheStore {
682        &mut self.cache_store
683    }
684
685    pub fn cache_store(&self) -> &BootstrapCacheStore {
686        &self.cache_store
687    }
688}
689
690impl Drop for Bootstrap {
691    fn drop(&mut self) {
692        if let Some(cache_task) = self.cache_task.take() {
693            cache_task.abort();
694        }
695    }
696}
697
698#[derive(Debug)]
699struct ContactsProgress {
700    remaining: VecDeque<Url>,
701}
702
703enum FetchEvent {
704    Cache(Vec<Multiaddr>),
705    Contacts(Vec<Multiaddr>),
706}
707
708#[derive(Clone, Copy, Debug, PartialEq, Eq)]
709enum FetchKind {
710    Cache,
711    Contacts,
712}
713
714impl ContactsProgress {
715    fn new(urls: Vec<Url>) -> Option<Self> {
716        if urls.is_empty() {
717            None
718        } else {
719            Some(Self {
720                remaining: VecDeque::from(urls),
721            })
722        }
723    }
724
725    fn from_static(urls: &[&str]) -> Option<Self> {
726        let mut parsed = Vec::new();
727        for url in urls {
728            match url.parse::<Url>() {
729                Ok(parsed_url) => parsed.push(parsed_url),
730                Err(err) => {
731                    warn!("Failed to parse static contacts URL {url}: {err}");
732                }
733            }
734        }
735        Self::new(parsed)
736    }
737
738    fn next_endpoint(&mut self) -> Option<Url> {
739        self.remaining.pop_front()
740    }
741
742    fn is_empty(&self) -> bool {
743        self.remaining.is_empty()
744    }
745}
746
747#[cfg(test)]
748mod tests {
749    use super::*;
750    use crate::{
751        InitialPeersConfig,
752        cache_store::{BootstrapCacheStore, cache_data_v1::CacheData},
753        multiaddr_get_peer_id,
754    };
755    use libp2p::Multiaddr;
756    use std::collections::HashSet;
757    use std::sync::{Arc, OnceLock};
758    use std::time::{Duration, Instant};
759    use tempfile::TempDir;
760    use tokio::sync::{Mutex, OwnedMutexGuard};
761    use tokio::time::sleep;
762    use wiremock::{
763        Mock, MockServer, ResponseTemplate,
764        matchers::{method, path},
765    };
766
767    async fn env_lock() -> OwnedMutexGuard<()> {
768        static ENV_MUTEX: OnceLock<Arc<Mutex<()>>> = OnceLock::new();
769        Arc::clone(ENV_MUTEX.get_or_init(|| Arc::new(Mutex::new(()))))
770            .lock_owned()
771            .await
772    }
773
774    #[allow(unsafe_code)]
775    fn set_env_var(key: &str, value: &str) {
776        unsafe {
777            std::env::set_var(key, value);
778        }
779    }
780
781    #[allow(unsafe_code)]
782    fn remove_env_var(key: &str) {
783        unsafe {
784            std::env::remove_var(key);
785        }
786    }
787
788    async fn expect_next_addr(flow: &mut Bootstrap) -> Result<Multiaddr> {
789        let deadline = Instant::now() + Duration::from_secs(2);
790        loop {
791            match flow.next_addr() {
792                Ok(Some(addr)) => return Ok(addr),
793                Ok(None) => {
794                    if Instant::now() >= deadline {
795                        panic!("Timed out waiting for next address");
796                    }
797                    sleep(Duration::from_millis(5)).await;
798                }
799                Err(err) => return Err(err),
800            }
801        }
802    }
803
804    async fn expect_err(flow: &mut Bootstrap) -> Error {
805        let deadline = Instant::now() + Duration::from_secs(2);
806        loop {
807            match flow.next_addr() {
808                Ok(Some(addr)) => panic!("unexpected address returned: {addr}"),
809                Ok(None) => {
810                    if Instant::now() >= deadline {
811                        panic!("Timed out waiting for error from flow");
812                    }
813                    sleep(Duration::from_millis(5)).await;
814                }
815                Err(err) => return err,
816            }
817        }
818    }
819
820    fn generate_valid_test_multiaddr(ip_third: u8, ip_fourth: u8, port: u16) -> Multiaddr {
821        let peer_id = libp2p::PeerId::random();
822        format!("/ip4/10.{ip_third}.{ip_fourth}.1/tcp/{port}/p2p/{peer_id}")
823            .parse()
824            .unwrap()
825    }
826
827    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
828    async fn test_cli_arguments_precedence() {
829        let env_addr: Multiaddr =
830            "/ip4/10.0.0.1/tcp/1200/p2p/12D3KooWQnE7zXkVUEGBnJtNfR88Ujz4ezgm6bVnkvxHCzhF7S5S"
831                .parse()
832                .unwrap();
833        let cli_addr: Multiaddr =
834            "/ip4/10.0.0.2/tcp/1201/p2p/12D3KooWQx2TSK7g1C8x3QK7gBqdqbQEkd6vDT7Pxu5gb1xmgjvp"
835                .parse()
836                .unwrap();
837
838        let _env_guard = env_lock().await;
839        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
840
841        let temp_dir = TempDir::new().unwrap();
842
843        let config = InitialPeersConfig {
844            ignore_cache: true,
845            local: true,
846            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
847            addrs: vec![cli_addr.clone()],
848            ..Default::default()
849        };
850        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
851        let mut flow = Bootstrap::new(config).await.unwrap();
852
853        let first_two = vec![
854            expect_next_addr(&mut flow).await.unwrap(),
855            expect_next_addr(&mut flow).await.unwrap(),
856        ];
857        let first_set: HashSet<_> = first_two.into_iter().collect();
858        let expected: HashSet<_> = [env_addr.clone(), cli_addr.clone()].into_iter().collect();
859        assert_eq!(first_set, expected);
860
861        let err = expect_err(&mut flow).await;
862        assert!(matches!(err, Error::NoBootstrapPeersFound));
863
864        remove_env_var(ANT_PEERS_ENV);
865    }
866
867    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
868    async fn test_env_variable_parsing() {
869        let _env_guard = env_lock().await;
870        set_env_var(
871            ANT_PEERS_ENV,
872            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE,\
873/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
874        );
875
876        let parsed = Bootstrap::fetch_from_env();
877        remove_env_var(ANT_PEERS_ENV);
878
879        assert_eq!(parsed.len(), 2);
880        let parsed_set: std::collections::HashSet<_> =
881            parsed.into_iter().map(|addr| addr.to_string()).collect();
882        let expected = std::collections::HashSet::from([
883            "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
884                .to_string(),
885            "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
886                .to_string(),
887        ]);
888        assert_eq!(parsed_set, expected);
889    }
890
891    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
892    async fn loads_addresses_from_cache_when_initial_queue_is_empty() {
893        let _env_guard = env_lock().await;
894        let cache_addr: Multiaddr =
895            "/ip4/127.0.0.1/tcp/1202/p2p/12D3KooWKGt8umjJQ4sDzFXo2UcHBaF33rqmFcWtXM6nbryL5G4J"
896                .parse()
897                .unwrap();
898        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
899
900        let temp_dir = TempDir::new().unwrap();
901        let file_name = BootstrapCacheStore::cache_file_name(true);
902
903        let mut cache_data = CacheData::default();
904        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
905        cache_data
906            .write_to_file(temp_dir.path(), &file_name)
907            .unwrap();
908
909        let config = InitialPeersConfig {
910            local: true,
911            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
912            ..Default::default()
913        };
914        let mut config =
915            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
916        config.disable_env_peers = true;
917        let mut flow = Bootstrap::new(config).await.unwrap();
918
919        let got = expect_next_addr(&mut flow).await.unwrap();
920        assert_eq!(got, cache_addr);
921
922        let err = expect_err(&mut flow).await;
923        assert!(matches!(err, Error::NoBootstrapPeersFound));
924    }
925
926    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
927    async fn test_first_flag_behavior() {
928        let _env_guard = env_lock().await;
929
930        let mock_server = MockServer::start().await;
931        Mock::given(method("GET"))
932            .and(path("/peers"))
933            .respond_with(ResponseTemplate::new(200).set_body_string(
934                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
935            ))
936            .expect(0)
937            .mount(&mock_server)
938            .await;
939
940        let temp_dir = TempDir::new().unwrap();
941        let config = InitialPeersConfig {
942                first: true,
943                addrs: vec![
944                "/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
945                    .parse()
946                    .unwrap(),
947                ],
948                network_contacts_url: vec![format!("{}/peers", mock_server.uri())],
949                bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
950                ..Default::default()
951            };
952        let mut config =
953            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
954        config.disable_env_peers = true;
955        let mut flow = Bootstrap::new(config).await.unwrap();
956
957        let err = expect_err(&mut flow).await;
958        assert!(matches!(err, Error::NoBootstrapPeersFound));
959        assert!(
960            mock_server.received_requests().await.unwrap().is_empty(),
961            "first flag should prevent contact fetches"
962        );
963    }
964
965    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
966    async fn test_multiple_network_contacts() {
967        let _env_guard = env_lock().await;
968
969        let mock_server = MockServer::start().await;
970
971        let contact_one: Multiaddr =
972            "/ip4/192.168.0.1/tcp/1203/p2p/12D3KooWPULWT1qXJ1jzYVtQocvKXgcv6U7Pp3ui3EB7mN8hXAsP"
973                .parse()
974                .unwrap();
975        let contact_two: Multiaddr =
976            "/ip4/192.168.0.2/tcp/1204/p2p/12D3KooWPsMPaEjaWjW6GWpAne6LYcwBQEJfnDbhQFNs6ytzmBn5"
977                .parse()
978                .unwrap();
979
980        Mock::given(method("GET"))
981            .and(path("/first"))
982            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
983            .expect(1)
984            .mount(&mock_server)
985            .await;
986
987        Mock::given(method("GET"))
988            .and(path("/second"))
989            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
990            .expect(1)
991            .mount(&mock_server)
992            .await;
993
994        let config = InitialPeersConfig {
995            ignore_cache: true,
996            network_contacts_url: vec![
997                format!("{}/first", mock_server.uri()),
998                format!("{}/second", mock_server.uri()),
999            ],
1000            ..Default::default()
1001        };
1002        let mut config =
1003            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1004        config.disable_env_peers = true;
1005        let mut flow = Bootstrap::new(config).await.unwrap();
1006
1007        let first = expect_next_addr(&mut flow).await.unwrap();
1008        assert_eq!(first, contact_one);
1009
1010        let second = expect_next_addr(&mut flow).await.unwrap();
1011        assert_eq!(second, contact_two);
1012
1013        let err = expect_err(&mut flow).await;
1014        assert!(matches!(err, Error::NoBootstrapPeersFound));
1015
1016        let requests = mock_server.received_requests().await.unwrap();
1017        assert_eq!(requests.len(), 2);
1018        assert_eq!(requests[0].url.path(), "/first");
1019        assert_eq!(requests[1].url.path(), "/second");
1020    }
1021
1022    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1023    async fn test_full_bootstrap_flow() {
1024        let _env_guard = env_lock().await;
1025        remove_env_var(ANT_PEERS_ENV);
1026
1027        let env_addr: Multiaddr =
1028            "/ip4/10.1.0.1/tcp/1300/p2p/12D3KooWBbtXX6gY5xPD7NzNGGbj2428NJQ4HNvvBnSE5g4R7Pkf"
1029                .parse()
1030                .unwrap();
1031        let cli_addr: Multiaddr =
1032            "/ip4/10.1.0.2/tcp/1301/p2p/12D3KooWCRfYwq9c3PAXo5cTp3snq72Knqukcec4c9qT1AMyvMPd"
1033                .parse()
1034                .unwrap();
1035        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1036
1037        let cache_addr_one: Multiaddr =
1038            "/ip4/10.1.0.3/tcp/1302/p2p/12D3KooWMmKJcWUP9UqP4g1n3LH1htkvSUStn1aQGQxGc1dQcYxA"
1039                .parse()
1040                .unwrap();
1041        let cache_addr_two: Multiaddr =
1042            "/ip4/10.1.0.4/tcp/1303/p2p/12D3KooWA4b4T6Dz4RUtqnYDEBt3eGkqRykGGBqBP3ZiZsaAJ2jp"
1043                .parse()
1044                .unwrap();
1045
1046        let temp_dir = TempDir::new().unwrap();
1047        let file_name = BootstrapCacheStore::cache_file_name(false);
1048        let mut cache_data = CacheData::default();
1049        cache_data.add_peer(
1050            multiaddr_get_peer_id(&cache_addr_one).unwrap(),
1051            std::iter::once(&cache_addr_one),
1052            3,
1053            10,
1054        );
1055        cache_data.add_peer(
1056            multiaddr_get_peer_id(&cache_addr_two).unwrap(),
1057            std::iter::once(&cache_addr_two),
1058            3,
1059            10,
1060        );
1061        cache_data
1062            .write_to_file(temp_dir.path(), &file_name)
1063            .unwrap();
1064
1065        let mock_server = MockServer::start().await;
1066        let contact_one: Multiaddr =
1067            "/ip4/10.1.0.5/tcp/1304/p2p/12D3KooWQGyiCWkmKvgFVF1PsvBLnBxG29BAsoAhH4m6qjUpBAk1"
1068                .parse()
1069                .unwrap();
1070        let contact_two: Multiaddr =
1071            "/ip4/10.1.0.6/tcp/1305/p2p/12D3KooWGpMibW82dManEXZDV4SSQSSHqzTeWY5Avzkdx6yrosNG"
1072                .parse()
1073                .unwrap();
1074
1075        Mock::given(method("GET"))
1076            .and(path("/contacts_one"))
1077            .respond_with(ResponseTemplate::new(200).set_body_string(contact_one.to_string()))
1078            .expect(1)
1079            .mount(&mock_server)
1080            .await;
1081
1082        Mock::given(method("GET"))
1083            .and(path("/contacts_two"))
1084            .respond_with(ResponseTemplate::new(200).set_body_string(contact_two.to_string()))
1085            .expect(1)
1086            .mount(&mock_server)
1087            .await;
1088
1089        let file_path = temp_dir.path().join(format!(
1090            "version_{}/{}",
1091            CacheData::CACHE_DATA_VERSION,
1092            file_name
1093        ));
1094        let contents = std::fs::read_to_string(&file_path).unwrap();
1095        assert!(contents.contains(&cache_addr_one.to_string()));
1096        assert!(contents.contains(&cache_addr_two.to_string()));
1097
1098        assert_eq!(
1099            Bootstrap::fetch_from_env(),
1100            vec![env_addr.clone()],
1101            "environment variable should yield the configured address"
1102        );
1103
1104        let config = InitialPeersConfig {
1105            addrs: vec![cli_addr.clone()],
1106            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1107            network_contacts_url: vec![
1108                format!("{}/contacts_one", mock_server.uri()),
1109                format!("{}/contacts_two", mock_server.uri()),
1110            ],
1111            ..Default::default()
1112        };
1113        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1114        let mut flow = Bootstrap::new(config).await.unwrap();
1115
1116        let initial_results = vec![
1117            expect_next_addr(&mut flow).await.unwrap(),
1118            expect_next_addr(&mut flow).await.unwrap(),
1119        ];
1120        let initial_set: HashSet<_> = initial_results.into_iter().collect();
1121        let expected_initial: HashSet<_> =
1122            [env_addr.clone(), cli_addr.clone()].into_iter().collect();
1123        assert_eq!(initial_set, expected_initial);
1124
1125        let cache_results = vec![
1126            expect_next_addr(&mut flow).await.unwrap(),
1127            expect_next_addr(&mut flow).await.unwrap(),
1128        ];
1129        let cache_set: HashSet<_> = cache_results.into_iter().collect();
1130        let expected_cache: HashSet<_> = [cache_addr_one.clone(), cache_addr_two.clone()]
1131            .into_iter()
1132            .collect();
1133        assert_eq!(cache_set, expected_cache);
1134
1135        let contact_first = expect_next_addr(&mut flow).await.unwrap();
1136        assert_eq!(contact_first, contact_one);
1137
1138        let contact_second = expect_next_addr(&mut flow).await.unwrap();
1139        assert_eq!(contact_second, contact_two);
1140
1141        let err = expect_err(&mut flow).await;
1142        assert!(matches!(err, Error::NoBootstrapPeersFound));
1143
1144        let requests = mock_server.received_requests().await.unwrap();
1145        assert_eq!(requests.len(), 2);
1146        assert_eq!(requests[0].url.path(), "/contacts_one");
1147        assert_eq!(requests[1].url.path(), "/contacts_two");
1148
1149        remove_env_var(ANT_PEERS_ENV);
1150    }
1151
1152    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1153    async fn test_disable_env_peers_flag() {
1154        let env_addr = generate_valid_test_multiaddr(2, 0, 2000);
1155
1156        let _env_guard = env_lock().await;
1157        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1158
1159        let temp_dir = TempDir::new().unwrap();
1160
1161        let config = InitialPeersConfig {
1162            local: true,
1163            ignore_cache: true,
1164            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1165            ..Default::default()
1166        };
1167        let mut config =
1168            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1169        config.disable_env_peers = true;
1170
1171        let result = Bootstrap::new(config).await;
1172        assert!(
1173            result.is_err(),
1174            "Should error when env peers are disabled and no other sources available"
1175        );
1176        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1177
1178        remove_env_var(ANT_PEERS_ENV);
1179    }
1180
1181    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1182    async fn test_disable_cache_reading_flag() {
1183        let _env_guard = env_lock().await;
1184
1185        let cache_addr = generate_valid_test_multiaddr(2, 0, 2001);
1186        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1187
1188        let temp_dir = TempDir::new().unwrap();
1189        let file_name = BootstrapCacheStore::cache_file_name(true);
1190
1191        let mut cache_data = CacheData::default();
1192        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1193        cache_data
1194            .write_to_file(temp_dir.path(), &file_name)
1195            .unwrap();
1196
1197        let config = InitialPeersConfig {
1198            local: true,
1199            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1200            ..Default::default()
1201        };
1202        let mut config =
1203            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1204        config.disable_env_peers = true;
1205        config.disable_cache_reading = true;
1206
1207        let result = Bootstrap::new(config).await;
1208        assert!(
1209            result.is_err(),
1210            "Should error when cache reading is disabled and no other sources available"
1211        );
1212        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1213    }
1214
1215    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1216    async fn test_bootstrap_completed_initialization() {
1217        let temp_dir = TempDir::new().unwrap();
1218
1219        let config = InitialPeersConfig {
1220            first: true,
1221            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1222            ..Default::default()
1223        };
1224        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1225        let flow = Bootstrap::new(config).await.unwrap();
1226
1227        assert!(
1228            flow.has_terminated(),
1229            "bootstrap_completed should be true for first node"
1230        );
1231
1232        let config = InitialPeersConfig {
1233            first: false,
1234            local: true,
1235            ignore_cache: true,
1236            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1237            addrs: vec![generate_valid_test_multiaddr(2, 0, 2002)],
1238            ..Default::default()
1239        };
1240        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1241        let flow = Bootstrap::new(config).await.unwrap();
1242
1243        assert!(
1244            !flow.has_terminated(),
1245            "bootstrap_completed should be false for non-first node"
1246        );
1247    }
1248
1249    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1250    async fn test_bootstrap_peer_ids_population() {
1251        let env_addr = generate_valid_test_multiaddr(2, 0, 2003);
1252        let cli_addr = generate_valid_test_multiaddr(2, 0, 2004);
1253
1254        let env_peer_id = multiaddr_get_peer_id(&env_addr).unwrap();
1255        let cli_peer_id = multiaddr_get_peer_id(&cli_addr).unwrap();
1256
1257        let _env_guard = env_lock().await;
1258        set_env_var(ANT_PEERS_ENV, &env_addr.to_string());
1259
1260        let temp_dir = TempDir::new().unwrap();
1261
1262        let config = InitialPeersConfig {
1263            local: true,
1264            ignore_cache: true,
1265            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1266            addrs: vec![cli_addr.clone()],
1267            ..Default::default()
1268        };
1269        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1270        let flow = Bootstrap::new(config).await.unwrap();
1271
1272        assert!(
1273            flow.is_bootstrap_peer(&env_peer_id),
1274            "Peer ID from env should be tracked"
1275        );
1276        assert!(
1277            flow.is_bootstrap_peer(&cli_peer_id),
1278            "Peer ID from CLI should be tracked"
1279        );
1280
1281        remove_env_var(ANT_PEERS_ENV);
1282    }
1283
1284    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1285    async fn test_invalid_multiaddr_in_initial_peers() {
1286        let _env_guard = env_lock().await;
1287
1288        let valid_addr = generate_valid_test_multiaddr(2, 0, 2005);
1289        let invalid_addr: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse().unwrap();
1290
1291        let temp_dir = TempDir::new().unwrap();
1292
1293        let config = InitialPeersConfig {
1294            local: true,
1295            ignore_cache: true,
1296            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1297            addrs: vec![valid_addr.clone()],
1298            ..Default::default()
1299        };
1300        let mut config =
1301            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1302        config.disable_env_peers = true;
1303
1304        config.initial_peers.push(invalid_addr);
1305
1306        let mut flow = Bootstrap::new(config).await.unwrap();
1307
1308        let first = expect_next_addr(&mut flow).await.unwrap();
1309        assert_eq!(first, valid_addr, "Should get the valid address");
1310
1311        let err = expect_err(&mut flow).await;
1312        assert!(
1313            matches!(err, Error::NoBootstrapPeersFound),
1314            "Should not find any more peers after valid one (invalid addr was filtered)"
1315        );
1316    }
1317
1318    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1319    async fn test_local_network_skips_contacts() {
1320        let _env_guard = env_lock().await;
1321
1322        let mock_server = MockServer::start().await;
1323        Mock::given(method("GET"))
1324            .and(path("/should-not-be-called"))
1325            .respond_with(ResponseTemplate::new(200).set_body_string(
1326                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
1327            ))
1328            .expect(0)
1329            .mount(&mock_server)
1330            .await;
1331
1332        let temp_dir = TempDir::new().unwrap();
1333
1334        let config = InitialPeersConfig {
1335            local: true,
1336            ignore_cache: true,
1337            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1338            network_contacts_url: vec![format!("{}/should-not-be-called", mock_server.uri())],
1339            addrs: vec![generate_valid_test_multiaddr(2, 0, 2006)],
1340            ..Default::default()
1341        };
1342        let mut config =
1343            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1344        config.disable_env_peers = true;
1345
1346        let addr_from_config = config.initial_peers[0].clone();
1347        let mut flow = Bootstrap::new(config).await.unwrap();
1348
1349        let first = expect_next_addr(&mut flow).await.unwrap();
1350        assert_eq!(
1351            first, addr_from_config,
1352            "Should get the address from config"
1353        );
1354
1355        let err = expect_err(&mut flow).await;
1356        assert!(matches!(err, Error::NoBootstrapPeersFound));
1357
1358        assert!(
1359            mock_server.received_requests().await.unwrap().is_empty(),
1360            "local flag should prevent contact fetches"
1361        );
1362    }
1363
1364    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1365    async fn test_timeout_with_no_addresses() {
1366        let _env_guard = env_lock().await;
1367
1368        let temp_dir = TempDir::new().unwrap();
1369        let file_name = BootstrapCacheStore::cache_file_name(true);
1370        let cache_data = CacheData::default();
1371        cache_data
1372            .write_to_file(temp_dir.path(), &file_name)
1373            .unwrap();
1374
1375        let config = InitialPeersConfig {
1376            local: true,
1377            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1378            ..Default::default()
1379        };
1380        let mut config =
1381            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1382        config.disable_env_peers = true;
1383
1384        let result = Bootstrap::new(config).await;
1385
1386        assert!(
1387            result.is_err(),
1388            "Should error when no addresses are available from any source"
1389        );
1390        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1391    }
1392
1393    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1394    async fn test_first_node_clears_cache() {
1395        let _env_guard = env_lock().await;
1396
1397        let cache_addr = generate_valid_test_multiaddr(2, 0, 2007);
1398        let peer_id = multiaddr_get_peer_id(&cache_addr).unwrap();
1399
1400        let temp_dir = TempDir::new().unwrap();
1401        let file_name = BootstrapCacheStore::cache_file_name(false);
1402
1403        let mut cache_data = CacheData::default();
1404        cache_data.add_peer(peer_id, std::iter::once(&cache_addr), 3, 10);
1405        cache_data
1406            .write_to_file(temp_dir.path(), &file_name)
1407            .unwrap();
1408
1409        let file_path = temp_dir.path().join(format!(
1410            "version_{}/{}",
1411            CacheData::CACHE_DATA_VERSION,
1412            file_name
1413        ));
1414
1415        let contents_before = std::fs::read_to_string(&file_path).unwrap();
1416        assert!(
1417            contents_before.contains(&cache_addr.to_string()),
1418            "Cache should contain the address before initialization"
1419        );
1420
1421        let config = InitialPeersConfig {
1422            first: true,
1423            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1424            ..Default::default()
1425        };
1426        let config = BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1427        let _flow = Bootstrap::new(config).await.unwrap();
1428
1429        tokio::time::sleep(Duration::from_millis(100)).await;
1430
1431        let contents_after = std::fs::read_to_string(&file_path).unwrap();
1432        assert!(
1433            !contents_after.contains(&cache_addr.to_string()),
1434            "Cache should be cleared for first node"
1435        );
1436    }
1437
1438    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1439    async fn test_new_loads_at_least_50_contacts() {
1440        let _env_guard = env_lock().await;
1441
1442        let temp_dir = TempDir::new().unwrap();
1443        let file_name = BootstrapCacheStore::cache_file_name(true);
1444
1445        let mut cache_data = CacheData::default();
1446        for i in 0..60 {
1447            let addr = generate_valid_test_multiaddr(3, i as u8, 3000 + i);
1448            let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1449            cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1450        }
1451        cache_data
1452            .write_to_file(temp_dir.path(), &file_name)
1453            .unwrap();
1454
1455        let config = InitialPeersConfig {
1456            local: true,
1457            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1458            ..Default::default()
1459        };
1460        let mut config =
1461            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1462        config.disable_env_peers = true;
1463
1464        let result = Bootstrap::new(config).await;
1465
1466        assert!(
1467            result.is_ok(),
1468            "Should successfully initialize with 60 contacts in cache"
1469        );
1470
1471        let mut flow = result.unwrap();
1472        let mut count = 0;
1473        while let Ok(Some(_addr)) = flow.next_addr() {
1474            count += 1;
1475            if count >= 60 {
1476                break;
1477            }
1478        }
1479
1480        assert!(
1481            count > 0,
1482            "Should have loaded contacts from cache, got {count}"
1483        );
1484    }
1485
1486    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1487    async fn test_new_succeeds_with_few_contacts() {
1488        let _env_guard = env_lock().await;
1489
1490        let temp_dir = TempDir::new().unwrap();
1491        let file_name = BootstrapCacheStore::cache_file_name(true);
1492
1493        let mut cache_data = CacheData::default();
1494        for i in 0..5 {
1495            let addr = generate_valid_test_multiaddr(4, i as u8, 4000 + i);
1496            let peer_id = multiaddr_get_peer_id(&addr).unwrap();
1497            cache_data.add_peer(peer_id, std::iter::once(&addr), 3, 10);
1498        }
1499        cache_data
1500            .write_to_file(temp_dir.path(), &file_name)
1501            .unwrap();
1502
1503        let config = InitialPeersConfig {
1504            local: true,
1505            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1506            ..Default::default()
1507        };
1508        let mut config =
1509            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1510        config.disable_env_peers = true;
1511
1512        let result = Bootstrap::new(config).await;
1513        assert!(
1514            result.is_ok(),
1515            "Should succeed with few contacts (< 50 but > 0)"
1516        );
1517
1518        let mut flow = result.unwrap();
1519        let mut count = 0;
1520        while let Ok(Some(_addr)) = flow.next_addr() {
1521            count += 1;
1522            if count >= 10 {
1523                break;
1524            }
1525        }
1526
1527        assert_eq!(count, 5, "Should have exactly 5 contacts");
1528    }
1529
1530    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1531    async fn test_new_errors_with_zero_contacts() {
1532        let _env_guard = env_lock().await;
1533
1534        let temp_dir = TempDir::new().unwrap();
1535        let file_name = BootstrapCacheStore::cache_file_name(false);
1536        let cache_data = CacheData::default();
1537        cache_data
1538            .write_to_file(temp_dir.path(), &file_name)
1539            .unwrap();
1540
1541        let mock_server = MockServer::start().await;
1542        Mock::given(method("GET"))
1543            .and(path("/failing-endpoint"))
1544            .respond_with(ResponseTemplate::new(500))
1545            .expect(1..)
1546            .mount(&mock_server)
1547            .await;
1548
1549        let config = InitialPeersConfig {
1550            bootstrap_cache_dir: Some(temp_dir.path().to_path_buf()),
1551            network_contacts_url: vec![format!("{}/failing-endpoint", mock_server.uri())],
1552            ..Default::default()
1553        };
1554        let mut config =
1555            BootstrapConfig::try_from(&config).expect("Failed to create BootstrapConfig");
1556        config.disable_env_peers = true;
1557
1558        let result = Bootstrap::new(config).await;
1559
1560        assert!(
1561            result.is_err(),
1562            "Should error when all sources fail and no contacts are available"
1563        );
1564        assert!(matches!(result.unwrap_err(), Error::NoBootstrapPeersFound));
1565    }
1566}