ant_bootstrap/
contacts.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use crate::{cache_store::CacheData, craft_valid_multiaddr_from_str, BootstrapAddr, Error, Result};
10use futures::stream::{self, StreamExt};
11use libp2p::Multiaddr;
12use reqwest::Client;
13use std::time::Duration;
14use url::Url;
15
/// Endpoints queried for bootstrap contacts by [`ContactsFetcher::with_mainnet_endpoints`].
///
/// Every entry must be a valid URL: `with_mainnet_endpoints` panics if one of them fails to
/// parse.
const MAINNET_CONTACTS: &[&str] = &[
    "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts",
    "http://159.89.251.80/bootstrap_cache.json",
    "http://159.65.210.89/bootstrap_cache.json",
    "http://159.223.246.45/bootstrap_cache.json",
    "http://139.59.201.153/bootstrap_cache.json",
    "http://139.59.200.27/bootstrap_cache.json",
];
24
/// Timeout, in seconds, configured on the reqwest client used for fetching
const FETCH_TIMEOUT_SECS: u64 = 30;
/// Maximum number of endpoints to fetch at a time
const MAX_CONCURRENT_FETCHES: usize = 3;
/// Number of failed attempts after which an endpoint is given up on.
/// The failure counter is checked after every failed attempt, so this is effectively the total
/// number of attempts made against an endpoint (not retries on top of the first attempt).
const MAX_RETRIES_ON_FETCH_FAILURE: usize = 3;
31
/// Discovers initial peers by downloading contact lists from a list of endpoints
pub struct ContactsFetcher {
    /// Threshold at which fetching stops: once at least this many addrs have been collected, no
    /// further endpoint results are awaited. The addrs of the last endpoint are kept in full, so
    /// the returned list may hold more than this.
    max_addrs: usize,
    /// The list of endpoints that are queried
    endpoints: Vec<Url>,
    /// Reqwest Client, cloned for every endpoint request
    request_client: Client,
    /// Passed to `craft_valid_multiaddr_from_str` when parsing plain-text responses.
    /// NOTE(review): presumably accepts multiaddrs that carry no PeerId — confirm against that
    /// helper. This is only useful for fetching nat detection contacts
    ignore_peer_id: bool,
}
43
44impl ContactsFetcher {
    /// Create a new fetcher with no endpoints configured.
    ///
    /// Endpoints can be added afterwards with [`Self::insert_endpoint`].
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying reqwest client cannot be built.
    pub fn new() -> Result<Self> {
        Self::with_endpoints(vec![])
    }
49
50    /// Create a new struct with the provided endpoints
51    pub fn with_endpoints(endpoints: Vec<Url>) -> Result<Self> {
52        let request_client = Client::builder()
53            .timeout(Duration::from_secs(FETCH_TIMEOUT_SECS))
54            .build()?;
55
56        Ok(Self {
57            max_addrs: usize::MAX,
58            endpoints,
59            request_client,
60            ignore_peer_id: false,
61        })
62    }
63
    /// Set the number of addrs after which fetching stops.
    ///
    /// A freshly constructed fetcher uses `usize::MAX`, i.e. the results of all endpoints are
    /// collected.
    pub fn set_max_addrs(&mut self, max_addrs: usize) {
        self.max_addrs = max_addrs;
    }
68
69    /// Create a new struct with the mainnet endpoints
70    pub fn with_mainnet_endpoints() -> Result<Self> {
71        let mut fetcher = Self::new()?;
72        let mainnet_contact = MAINNET_CONTACTS
73            .iter()
74            .map(|url| url.parse().expect("Failed to parse static URL"))
75            .collect();
76        fetcher.endpoints = mainnet_contact;
77        Ok(fetcher)
78    }
79
    /// Append `endpoint` to the list of endpoints that will be queried.
    ///
    /// No de-duplication is performed; the endpoint is pushed as-is.
    pub fn insert_endpoint(&mut self, endpoint: Url) {
        self.endpoints.push(endpoint);
    }
83
    /// Set the `ignore_peer_id` flag that is passed on when parsing fetched contacts.
    ///
    /// NOTE(review): presumably this lets multiaddrs without a PeerId through, which is only
    /// useful for fetching nat detection contacts — confirm against
    /// `craft_valid_multiaddr_from_str`.
    pub fn ignore_peer_id(&mut self, ignore_peer_id: bool) {
        self.ignore_peer_id = ignore_peer_id;
    }
87
88    /// Fetch the list of bootstrap addresses from all configured endpoints
89    pub async fn fetch_bootstrap_addresses(&self) -> Result<Vec<BootstrapAddr>> {
90        Ok(self
91            .fetch_addrs()
92            .await?
93            .into_iter()
94            .map(BootstrapAddr::new)
95            .collect())
96    }
97
98    /// Fetch the list of multiaddrs from all configured endpoints
99    pub async fn fetch_addrs(&self) -> Result<Vec<Multiaddr>> {
100        info!(
101            "Starting peer fetcher from {} endpoints: {:?}",
102            self.endpoints.len(),
103            self.endpoints
104        );
105        let mut bootstrap_addresses = Vec::new();
106
107        let mut fetches = stream::iter(self.endpoints.clone())
108            .map(|endpoint| async move {
109                info!(
110                    "Attempting to fetch bootstrap addresses from endpoint: {}",
111                    endpoint
112                );
113                (
114                    Self::fetch_from_endpoint(
115                        self.request_client.clone(),
116                        &endpoint,
117                        self.ignore_peer_id,
118                    )
119                    .await,
120                    endpoint,
121                )
122            })
123            .buffer_unordered(MAX_CONCURRENT_FETCHES);
124
125        while let Some((result, endpoint)) = fetches.next().await {
126            match result {
127                Ok(mut endpoing_bootstrap_addresses) => {
128                    info!(
129                        "Successfully fetched {} bootstrap addrs from {}. First few addrs: {:?}",
130                        endpoing_bootstrap_addresses.len(),
131                        endpoint,
132                        endpoing_bootstrap_addresses
133                            .iter()
134                            .take(3)
135                            .collect::<Vec<_>>()
136                    );
137                    bootstrap_addresses.append(&mut endpoing_bootstrap_addresses);
138                    if bootstrap_addresses.len() >= self.max_addrs {
139                        info!(
140                            "Fetched enough bootstrap addresses. Stopping. needed: {} Total fetched: {}",
141                            self.max_addrs,
142                            bootstrap_addresses.len()
143                        );
144                        break;
145                    }
146                }
147                Err(e) => {
148                    warn!("Failed to fetch bootstrap addrs from {}: {}", endpoint, e);
149                }
150            }
151        }
152
153        info!(
154            "Successfully discovered {} total addresses. First few: {:?}",
155            bootstrap_addresses.len(),
156            bootstrap_addresses.iter().take(3).collect::<Vec<_>>()
157        );
158        Ok(bootstrap_addresses)
159    }
160
161    /// Fetch the list of multiaddrs from a single endpoint
162    async fn fetch_from_endpoint(
163        request_client: Client,
164        endpoint: &Url,
165        ignore_peer_id: bool,
166    ) -> Result<Vec<Multiaddr>> {
167        info!("Fetching peers from endpoint: {endpoint}");
168        let mut retries = 0;
169
170        let bootstrap_addresses = loop {
171            let response = request_client.get(endpoint.clone()).send().await;
172
173            match response {
174                Ok(response) => {
175                    if response.status().is_success() {
176                        let text = response.text().await?;
177
178                        match Self::try_parse_response(&text, ignore_peer_id) {
179                            Ok(addrs) => break addrs,
180                            Err(err) => {
181                                warn!("Failed to parse response with err: {err:?}");
182                                retries += 1;
183                                if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
184                                    return Err(Error::FailedToObtainAddrsFromUrl(
185                                        endpoint.to_string(),
186                                        MAX_RETRIES_ON_FETCH_FAILURE,
187                                    ));
188                                }
189                            }
190                        }
191                    } else {
192                        retries += 1;
193                        if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
194                            return Err(Error::FailedToObtainAddrsFromUrl(
195                                endpoint.to_string(),
196                                MAX_RETRIES_ON_FETCH_FAILURE,
197                            ));
198                        }
199                    }
200                }
201                Err(err) => {
202                    error!("Failed to get bootstrap addrs from URL {endpoint}: {err:?}");
203                    retries += 1;
204                    if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
205                        return Err(Error::FailedToObtainAddrsFromUrl(
206                            endpoint.to_string(),
207                            MAX_RETRIES_ON_FETCH_FAILURE,
208                        ));
209                    }
210                }
211            }
212            trace!(
213                "Failed to get bootstrap addrs from URL, retrying {retries}/{MAX_RETRIES_ON_FETCH_FAILURE}"
214            );
215
216            tokio::time::sleep(Duration::from_secs(1)).await;
217        };
218
219        Ok(bootstrap_addresses)
220    }
221
222    /// Try to parse a response from an endpoint
223    fn try_parse_response(response: &str, ignore_peer_id: bool) -> Result<Vec<Multiaddr>> {
224        match serde_json::from_str::<CacheData>(response) {
225            Ok(json_endpoints) => {
226                info!(
227                    "Successfully parsed JSON response with {} peers",
228                    json_endpoints.peers.len()
229                );
230                let our_network_version = crate::get_network_version();
231
232                if json_endpoints.network_version != our_network_version {
233                    warn!(
234                        "Network version mismatch. Expected: {our_network_version}, got: {}. Skipping.", json_endpoints.network_version
235                    );
236                    return Ok(vec![]);
237                }
238                let bootstrap_addresses = json_endpoints
239                    .peers
240                    .into_iter()
241                    .filter_map(|(_, addresses)| {
242                        addresses.get_least_faulty().map(|addr| addr.addr.clone())
243                    })
244                    .collect::<Vec<_>>();
245
246                info!(
247                    "Successfully parsed {} valid peers from JSON",
248                    bootstrap_addresses.len()
249                );
250                Ok(bootstrap_addresses)
251            }
252            Err(_err) => {
253                info!("Attempting to parse response as plain text");
254                // Try parsing as plain text with one multiaddr per line
255                // example of contacts file exists in resources/network-contacts-examples
256                let bootstrap_addresses = response
257                    .split('\n')
258                    .filter_map(|str| craft_valid_multiaddr_from_str(str, ignore_peer_id))
259                    .collect::<Vec<_>>();
260
261                info!(
262                    "Successfully parsed {} valid bootstrap addrs from plain text",
263                    bootstrap_addresses.len()
264                );
265                Ok(bootstrap_addresses)
266            }
267        }
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p::Multiaddr;
    use wiremock::{
        matchers::{method, path},
        Mock, MockServer, ResponseTemplate,
    };

    /// Starts a mock server that answers `GET /` with `response`.
    async fn serve(response: ResponseTemplate) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/"))
            .respond_with(response)
            .mount(&server)
            .await;
        server
    }

    /// Builds a fetcher whose endpoints are the given mock servers, in order.
    fn fetcher_for(servers: &[&MockServer]) -> ContactsFetcher {
        let mut fetcher = ContactsFetcher::new().unwrap();
        fetcher.endpoints = servers
            .iter()
            .map(|server| server.uri().parse().unwrap())
            .collect();
        fetcher
    }

    /// Parses a multiaddr literal used as an expected value.
    fn multiaddr(raw: &str) -> Multiaddr {
        raw.parse().unwrap()
    }

    /// Two valid addrs separated by a newline are both returned.
    #[tokio::test]
    async fn test_fetch_addrs() {
        let server = serve(
            ResponseTemplate::new(200)
                .set_body_string("/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"),
        )
        .await;

        let addrs = fetcher_for(&[&server])
            .fetch_bootstrap_addresses()
            .await
            .unwrap();
        assert_eq!(addrs.len(), 2);

        let addr1 = multiaddr(
            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
        );
        let addr2 = multiaddr(
            "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
        );
        assert!(addrs.iter().any(|p| p.addr == addr1));
        assert!(addrs.iter().any(|p| p.addr == addr2));
    }

    /// A failing endpoint does not prevent addrs from a healthy one being returned.
    #[tokio::test]
    async fn test_endpoint_failover() {
        // First endpoint fails
        let failing = serve(ResponseTemplate::new(500)).await;

        // Second endpoint succeeds
        let healthy = serve(ResponseTemplate::new(200).set_body_string(
            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
        ))
        .await;

        let addrs = fetcher_for(&[&failing, &healthy])
            .fetch_bootstrap_addresses()
            .await
            .unwrap();
        assert_eq!(addrs.len(), 1);

        let expected = multiaddr(
            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
        );
        assert_eq!(addrs[0].addr, expected);
    }

    /// An addr without a peer id is dropped while the valid one is kept.
    #[tokio::test]
    async fn test_invalid_multiaddr() {
        let server = serve(
            ResponseTemplate::new(200).set_body_string(
                "/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
            ),
        )
        .await;

        let addrs = fetcher_for(&[&server])
            .fetch_bootstrap_addresses()
            .await
            .unwrap();
        let valid_addr = multiaddr(
            "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
        );
        assert_eq!(addrs[0].addr, valid_addr);
    }

    /// Blank and whitespace-only lines around an addr are ignored.
    #[tokio::test]
    async fn test_whitespace_and_empty_lines() {
        let server = serve(
            ResponseTemplate::new(200).set_body_string("\n  \n/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n  \n"),
        )
        .await;

        let addrs = fetcher_for(&[&server])
            .fetch_bootstrap_addresses()
            .await
            .unwrap();
        assert_eq!(addrs.len(), 1);

        let expected = multiaddr(
            "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
        );
        assert_eq!(addrs[0].addr, expected);
    }

    /// Endpoints handed to `with_endpoints` are stored unchanged.
    #[tokio::test]
    async fn test_custom_endpoints() {
        let endpoints = vec!["http://example.com".parse().unwrap()];
        let fetcher = ContactsFetcher::with_endpoints(endpoints.clone()).unwrap();
        assert_eq!(fetcher.endpoints, endpoints);
    }
}