ant_bootstrap/
contacts_fetcher.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::{
10    Error, Result, cache_store::CACHE_DATA_VERSION_LATEST, craft_valid_multiaddr_from_str,
11};
12use futures::stream::{self, StreamExt};
13use libp2p::Multiaddr;
14use reqwest::Client;
15use std::time::Duration;
16use url::Url;
17
/// Name of the HTTP request header that carries `CACHE_DATA_VERSION_LATEST` on every
/// contacts request.
const CONTACTS_CACHE_VERSION_HEADER: &str = "Cache-Version";

/// Contact endpoints loaded by [`ContactsFetcher::with_mainnet_endpoints`].
pub const MAINNET_CONTACTS: &[&str] = &[
    "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts",
    "http://159.89.251.80/bootstrap_cache.json",
    "http://159.65.210.89/bootstrap_cache.json",
    "http://159.223.246.45/bootstrap_cache.json",
    "http://139.59.201.153/bootstrap_cache.json",
    "http://139.59.200.27/bootstrap_cache.json",
];
/// Contact endpoints loaded by [`ContactsFetcher::with_alphanet_endpoints`].
pub const ALPHANET_CONTACTS: &[&str] = &[
    "http://188.166.133.208/bootstrap_cache.json",
    "http://188.166.133.125/bootstrap_cache.json",
    "http://178.128.137.64/bootstrap_cache.json",
    "http://159.223.242.7/bootstrap_cache.json",
    "http://143.244.197.147/bootstrap_cache.json",
];

/// The client fetch timeout, in seconds
const FETCH_TIMEOUT_SECS: u64 = 30;
/// Maximum number of endpoints to fetch at a time
const MAX_CONCURRENT_FETCHES: usize = 3;
/// The max number of attempts made against an endpoint before giving up on it.
///
/// The initial request counts as an attempt, so a failing endpoint is retried at most
/// `MAX_RETRIES_ON_FETCH_FAILURE - 1` times.
const MAX_RETRIES_ON_FETCH_FAILURE: usize = 2;
42
43/// Discovers initial peers from a list of endpoints
44pub struct ContactsFetcher {
45    /// The number of addrs to fetch
46    max_addrs: usize,
47    /// The list of endpoints
48    endpoints: Vec<Url>,
49    /// Reqwest Client
50    request_client: Client,
51    /// Ignore PeerId in the multiaddr if not present. This is only useful for fetching nat detection contacts
52    ignore_peer_id: bool,
53}
54
55impl ContactsFetcher {
56    /// Create a new struct with the default endpoint
57    pub fn new() -> Result<Self> {
58        Self::with_endpoints(vec![])
59    }
60
61    /// Create a new struct with the provided endpoints
62    pub fn with_endpoints(endpoints: Vec<Url>) -> Result<Self> {
63        let request_client = Client::builder()
64            .timeout(Duration::from_secs(FETCH_TIMEOUT_SECS))
65            .build()?;
66
67        Ok(Self {
68            max_addrs: usize::MAX,
69            endpoints,
70            request_client,
71            ignore_peer_id: false,
72        })
73    }
74
75    /// Set the number of addrs to fetch
76    pub fn set_max_addrs(&mut self, max_addrs: usize) {
77        self.max_addrs = max_addrs;
78    }
79
80    /// Create a new struct with the mainnet endpoints
81    pub fn with_mainnet_endpoints() -> Result<Self> {
82        let mut fetcher = Self::new()?;
83        #[allow(clippy::expect_used)]
84        let mainnet_contact = MAINNET_CONTACTS
85            .iter()
86            .map(|url| url.parse().expect("Failed to parse static URL"))
87            .collect();
88        fetcher.endpoints = mainnet_contact;
89        Ok(fetcher)
90    }
91
92    /// Create a new struct with the alphanet endpoints
93    pub fn with_alphanet_endpoints() -> Result<Self> {
94        let mut fetcher = Self::new()?;
95        #[allow(clippy::expect_used)]
96        let alphanet_contact = ALPHANET_CONTACTS
97            .iter()
98            .map(|url| url.parse().expect("Failed to parse static URL"))
99            .collect();
100        fetcher.endpoints = alphanet_contact;
101        Ok(fetcher)
102    }
103
104    pub fn insert_endpoint(&mut self, endpoint: Url) {
105        self.endpoints.push(endpoint);
106    }
107
108    pub fn ignore_peer_id(&mut self, ignore_peer_id: bool) {
109        self.ignore_peer_id = ignore_peer_id;
110    }
111
112    /// Fetch the list of bootstrap multiaddrs from all configured endpoints
113    pub async fn fetch_bootstrap_addresses(&self) -> Result<Vec<Multiaddr>> {
114        info!(
115            "Starting peer fetcher from {} endpoints: {:?}",
116            self.endpoints.len(),
117            self.endpoints
118        );
119        let mut bootstrap_addresses = Vec::new();
120
121        let mut fetches = stream::iter(self.endpoints.clone())
122            .map(|endpoint| async move {
123                info!(
124                    "Attempting to fetch bootstrap addresses from endpoint: {}",
125                    endpoint
126                );
127                (
128                    Self::fetch_from_endpoint(
129                        self.request_client.clone(),
130                        &endpoint,
131                        self.ignore_peer_id,
132                    )
133                    .await,
134                    endpoint,
135                )
136            })
137            .buffer_unordered(MAX_CONCURRENT_FETCHES);
138
139        while let Some((result, endpoint)) = fetches.next().await {
140            match result {
141                Ok(mut endpoing_bootstrap_addresses) => {
142                    info!(
143                        "Successfully fetched {} bootstrap addrs from {}. First few addrs: {:?}",
144                        endpoing_bootstrap_addresses.len(),
145                        endpoint,
146                        endpoing_bootstrap_addresses
147                            .iter()
148                            .take(3)
149                            .collect::<Vec<_>>()
150                    );
151                    bootstrap_addresses.append(&mut endpoing_bootstrap_addresses);
152                    if bootstrap_addresses.len() >= self.max_addrs {
153                        info!(
154                            "Fetched enough bootstrap addresses. Stopping. needed: {} Total fetched: {}",
155                            self.max_addrs,
156                            bootstrap_addresses.len()
157                        );
158                        break;
159                    }
160                }
161                Err(e) => {
162                    warn!("Failed to fetch bootstrap addrs from {}: {}", endpoint, e);
163                }
164            }
165        }
166
167        bootstrap_addresses.truncate(self.max_addrs);
168
169        info!(
170            "Successfully discovered {} total addresses. First few: {:?}",
171            bootstrap_addresses.len(),
172            bootstrap_addresses.iter().take(3).collect::<Vec<_>>()
173        );
174        Ok(bootstrap_addresses)
175    }
176
177    /// Fetch the list of multiaddrs from a single endpoint
178    async fn fetch_from_endpoint(
179        request_client: Client,
180        endpoint: &Url,
181        ignore_peer_id: bool,
182    ) -> Result<Vec<Multiaddr>> {
183        let mut retries = 0;
184
185        let bootstrap_addresses = loop {
186            let response = request_client
187                .get(endpoint.clone())
188                .header(CONTACTS_CACHE_VERSION_HEADER, CACHE_DATA_VERSION_LATEST)
189                .send()
190                .await;
191
192            match response {
193                Ok(response) => {
194                    if response.status().is_success() {
195                        let text = response.text().await?;
196
197                        match Self::try_parse_response(&text, ignore_peer_id) {
198                            Ok(addrs) => break addrs,
199                            Err(err) => {
200                                warn!("Failed to parse response with err: {err:?}");
201                                retries += 1;
202                                if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
203                                    return Err(Error::FailedToObtainAddrsFromUrl(
204                                        endpoint.to_string(),
205                                        MAX_RETRIES_ON_FETCH_FAILURE,
206                                    ));
207                                }
208                            }
209                        }
210                    } else {
211                        retries += 1;
212                        if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
213                            return Err(Error::FailedToObtainAddrsFromUrl(
214                                endpoint.to_string(),
215                                MAX_RETRIES_ON_FETCH_FAILURE,
216                            ));
217                        }
218                    }
219                }
220                Err(err) => {
221                    error!("Failed to get bootstrap addrs from URL {endpoint}: {err:?}");
222                    retries += 1;
223                    if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
224                        return Err(Error::FailedToObtainAddrsFromUrl(
225                            endpoint.to_string(),
226                            MAX_RETRIES_ON_FETCH_FAILURE,
227                        ));
228                    }
229                }
230            }
231            debug!(
232                "Failed to get bootstrap addrs from URL, retrying {retries}/{MAX_RETRIES_ON_FETCH_FAILURE}"
233            );
234
235            tokio::time::sleep(Duration::from_millis(300)).await;
236        };
237
238        Ok(bootstrap_addresses)
239    }
240
241    /// Try to parse a response from an endpoint
242    fn try_parse_response(response: &str, ignore_peer_id: bool) -> Result<Vec<Multiaddr>> {
243        let cache_data = if let Ok(data) =
244            serde_json::from_str::<super::cache_store::cache_data_v1::CacheData>(response)
245        {
246            Some(data)
247        } else if let Ok(data) =
248            serde_json::from_str::<super::cache_store::cache_data_v0::CacheData>(response)
249        {
250            Some(data.into())
251        } else {
252            None
253        };
254
255        match cache_data {
256            Some(cache_data) => {
257                info!(
258                    "Successfully parsed JSON response with {} peers",
259                    cache_data.peers.len()
260                );
261                let our_network_version = crate::get_network_version();
262
263                if cache_data.network_version != our_network_version {
264                    warn!(
265                        "Network version mismatch. Expected: {our_network_version}, got: {}. Skipping.",
266                        cache_data.network_version
267                    );
268                    return Ok(vec![]);
269                }
270                let bootstrap_addresses = cache_data.get_all_addrs().cloned().collect::<Vec<_>>();
271
272                info!(
273                    "Successfully parsed {} valid peers from JSON",
274                    bootstrap_addresses.len()
275                );
276                Ok(bootstrap_addresses)
277            }
278            None => {
279                info!("Attempting to parse response as plain text");
280                // Try parsing as plain text with one multiaddr per line
281                // example of contacts file exists in resources/network-contacts-examples
282
283                let bootstrap_addresses = response
284                    .split('\n')
285                    .filter_map(|str| craft_valid_multiaddr_from_str(str, ignore_peer_id))
286                    .collect::<Vec<_>>();
287
288                if bootstrap_addresses.is_empty() {
289                    warn!("Failed to parse response as plain text");
290                    return Err(Error::FailedToParseCacheData);
291                }
292
293                info!(
294                    "Successfully parsed {} valid bootstrap addrs from plain text",
295                    bootstrap_addresses.len()
296                );
297                Ok(bootstrap_addresses)
298            }
299        }
300    }
301}
302
303#[cfg(test)]
304mod tests {
305    use super::*;
306    use crate::cache_store::{cache_data_v0, cache_data_v1};
307    use libp2p::{Multiaddr, PeerId};
308    use std::{
309        sync::atomic::{AtomicUsize, Ordering},
310        time::SystemTime,
311    };
312    use wiremock::{
313        Mock, MockServer, ResponseTemplate,
314        matchers::{method, path},
315    };
316
317    #[tokio::test]
318    async fn test_network_contacts_formats() {
319        let mock_server = MockServer::start().await;
320        let peer_id = PeerId::random();
321        let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
322            .parse()
323            .unwrap();
324
325        let mut v0 = cache_data_v0::CacheData {
326            peers: Default::default(),
327            last_updated: SystemTime::now(),
328            network_version: crate::get_network_version(),
329        };
330        v0.peers.insert(
331            peer_id,
332            cache_data_v0::BootstrapAddresses(vec![cache_data_v0::BootstrapAddr {
333                addr: addr.clone(),
334                success_count: 1,
335                failure_count: 0,
336                last_seen: SystemTime::now(),
337            }]),
338        );
339        let v0_json = serde_json::to_string(&v0).unwrap();
340
341        let mut v1 = cache_data_v1::CacheData::default();
342        v1.add_peer(peer_id, [addr.clone()].iter(), 10, 10);
343        let v1_json = serde_json::to_string(&v1).unwrap();
344
345        Mock::given(method("GET"))
346            .and(path("/v0"))
347            .respond_with(ResponseTemplate::new(200).set_body_string(v0_json))
348            .mount(&mock_server)
349            .await;
350        Mock::given(method("GET"))
351            .and(path("/v1"))
352            .respond_with(ResponseTemplate::new(200).set_body_string(v1_json))
353            .mount(&mock_server)
354            .await;
355        Mock::given(method("GET"))
356            .and(path("/text"))
357            .respond_with(ResponseTemplate::new(200).set_body_string(
358                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n",
359            ))
360            .mount(&mock_server)
361            .await;
362        Mock::given(method("GET"))
363            .and(path("/malformed"))
364            .respond_with(ResponseTemplate::new(200).set_body_string("this is not valid data"))
365            .mount(&mock_server)
366            .await;
367
368        for endpoint in ["/v0", "/v1", "/text"] {
369            let url = format!("{}{}", mock_server.uri(), endpoint)
370                .parse()
371                .unwrap();
372            let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
373            let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
374            assert!(!addrs.is_empty(), "should fetch addresses from {endpoint}");
375            assert_eq!(addrs[0].to_string(), addr.to_string());
376        }
377
378        let malformed = format!("{}/malformed", mock_server.uri()).parse().unwrap();
379        let fetcher = ContactsFetcher::with_endpoints(vec![malformed]).unwrap();
380        let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
381        assert!(
382            addrs.is_empty(),
383            "malformed responses should return empty list"
384        );
385    }
386
387    #[tokio::test]
388    async fn test_network_contacts_retries() {
389        let mock_server = MockServer::start().await;
390        let counter = AtomicUsize::new(0);
391
392        Mock::given(method("GET"))
393            .and(path("/retry"))
394            .respond_with(move |_: &wiremock::Request| {
395                let count = counter.fetch_add(1, Ordering::SeqCst);
396                if count < 1 {
397                    ResponseTemplate::new(500)
398                } else {
399                    ResponseTemplate::new(200).set_body_string(
400                        "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
401                    )
402                }
403            })
404            .mount(&mock_server)
405            .await;
406
407        let url = format!("{}/retry", mock_server.uri()).parse().unwrap();
408        let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
409        let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
410        assert_eq!(addrs.len(), 1);
411    }
412
413    #[tokio::test]
414    async fn test_fetch_addrs() {
415        let mock_server = MockServer::start().await;
416
417        Mock::given(method("GET"))
418            .and(path("/"))
419            .respond_with(
420                ResponseTemplate::new(200)
421                    .set_body_string("/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"),
422            )
423            .mount(&mock_server)
424            .await;
425
426        let mut fetcher = ContactsFetcher::new().unwrap();
427        fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
428
429        let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
430        assert_eq!(addrs.len(), 2);
431
432        let addr1: Multiaddr =
433            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
434                .parse()
435                .unwrap();
436        let addr2: Multiaddr =
437            "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
438                .parse()
439                .unwrap();
440        assert!(addrs.iter().any(|p| p == &addr1));
441        assert!(addrs.iter().any(|p| p == &addr2));
442    }
443
444    #[tokio::test]
445    async fn test_endpoint_failover() {
446        let mock_server1 = MockServer::start().await;
447        let mock_server2 = MockServer::start().await;
448
449        // First endpoint fails
450        Mock::given(method("GET"))
451            .and(path("/"))
452            .respond_with(ResponseTemplate::new(500))
453            .mount(&mock_server1)
454            .await;
455
456        // Second endpoint succeeds
457        Mock::given(method("GET"))
458            .and(path("/"))
459            .respond_with(ResponseTemplate::new(200).set_body_string(
460                "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
461            ))
462            .mount(&mock_server2)
463            .await;
464
465        let mut fetcher = ContactsFetcher::new().unwrap();
466        fetcher.endpoints = vec![
467            mock_server1.uri().parse().unwrap(),
468            mock_server2.uri().parse().unwrap(),
469        ];
470
471        let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
472        assert_eq!(addrs.len(), 1);
473
474        let addr: Multiaddr =
475            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
476                .parse()
477                .unwrap();
478        assert_eq!(addrs[0], addr);
479    }
480
481    #[tokio::test]
482    async fn test_network_failure_recovery() {
483        let bad_url: Url = "http://does-not-exist.example.invalid".parse().unwrap();
484
485        let mock_server = MockServer::start().await;
486        Mock::given(method("GET"))
487            .and(path("/valid"))
488            .respond_with(ResponseTemplate::new(200).set_body_string(
489                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
490            ))
491            .mount(&mock_server)
492            .await;
493        let valid_url = format!("{}/valid", mock_server.uri()).parse().unwrap();
494
495        let failing = ContactsFetcher::with_endpoints(vec![bad_url.clone()]).unwrap();
496        let empty = failing.fetch_bootstrap_addresses().await.unwrap();
497        assert!(
498            empty.is_empty(),
499            "all failing endpoints should yield empty set"
500        );
501
502        let mixed = ContactsFetcher::with_endpoints(vec![bad_url, valid_url]).unwrap();
503        let addrs = mixed.fetch_bootstrap_addresses().await.unwrap();
504        assert!(
505            !addrs.is_empty(),
506            "mix of failing and working endpoints should return addresses"
507        );
508    }
509
510    #[tokio::test]
511    async fn test_invalid_multiaddr() {
512        let mock_server = MockServer::start().await;
513
514        Mock::given(method("GET"))
515            .and(path("/"))
516            .respond_with(
517                ResponseTemplate::new(200).set_body_string(
518                    "/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
519                ),
520            )
521            .mount(&mock_server)
522            .await;
523
524        let mut fetcher = ContactsFetcher::new().unwrap();
525        fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
526
527        let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
528        let valid_addr: Multiaddr =
529            "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
530                .parse()
531                .unwrap();
532        assert_eq!(addrs[0], valid_addr);
533    }
534
535    #[tokio::test]
536    async fn test_empty_response_handling() {
537        let mock_server = MockServer::start().await;
538        Mock::given(method("GET"))
539            .and(path("/empty"))
540            .respond_with(ResponseTemplate::new(200).set_body_string(""))
541            .mount(&mock_server)
542            .await;
543
544        let url = format!("{}/empty", mock_server.uri()).parse().unwrap();
545        let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
546        let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
547        assert!(
548            addrs.is_empty(),
549            "empty HTTP response should produce no addresses"
550        );
551    }
552
553    #[tokio::test]
554    async fn test_whitespace_and_empty_lines() {
555        let mock_server = MockServer::start().await;
556
557        Mock::given(method("GET"))
558            .and(path("/"))
559            .respond_with(
560                ResponseTemplate::new(200).set_body_string("\n  \n/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n  \n"),
561            )
562            .mount(&mock_server)
563            .await;
564
565        let mut fetcher = ContactsFetcher::new().unwrap();
566        fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
567
568        let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
569        assert_eq!(addrs.len(), 1);
570
571        let addr: Multiaddr =
572            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
573                .parse()
574                .unwrap();
575        assert_eq!(addrs[0], addr);
576    }
577
578    #[tokio::test]
579    async fn test_fetch_max_addresses() {
580        let mock_server = MockServer::start().await;
581        Mock::given(method("GET"))
582            .and(path("/multiple"))
583            .respond_with(ResponseTemplate::new(200).set_body_string(
584                "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n\
585                 /ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n\
586                 /ip4/127.0.0.3/udp/8082/quic-v1/p2p/12D3KooWCKCeqLPSgMnDjyFsJuWqREDtKNHx1JEBiwxME7Zdw68n",
587            ))
588            .mount(&mock_server)
589            .await;
590
591        let url = format!("{}/multiple", mock_server.uri()).parse().unwrap();
592        let mut fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
593        fetcher.set_max_addrs(2);
594        let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
595        assert_eq!(addrs.len(), 2, "max_addrs should limit returned addresses");
596    }
597
598    #[tokio::test]
599    async fn test_custom_endpoints() {
600        let endpoints = vec!["http://example.com".parse().unwrap()];
601        let fetcher = ContactsFetcher::with_endpoints(endpoints.clone()).unwrap();
602        assert_eq!(fetcher.endpoints, endpoints);
603    }
604}