1use crate::{
10 Error, Result, cache_store::CACHE_DATA_VERSION_LATEST, craft_valid_multiaddr_from_str,
11};
12use futures::stream::{self, StreamExt};
13use libp2p::Multiaddr;
14use reqwest::Client;
15use std::time::Duration;
16use url::Url;
17
/// Name of the HTTP header attached to every contacts request; its value is
/// `CACHE_DATA_VERSION_LATEST`.
const CONTACTS_CACHE_VERSION_HEADER: &str = "Cache-Version";
19
/// Endpoint URLs loaded by `ContactsFetcher::with_mainnet_endpoints`.
///
/// Every entry must be a valid URL: that constructor panics on an entry that fails to parse.
pub const MAINNET_CONTACTS: &[&str] = &[
    "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts",
    "http://159.89.251.80/bootstrap_cache.json",
    "http://159.65.210.89/bootstrap_cache.json",
    "http://159.223.246.45/bootstrap_cache.json",
    "http://139.59.201.153/bootstrap_cache.json",
    "http://139.59.200.27/bootstrap_cache.json",
];
/// Endpoint URLs loaded by `ContactsFetcher::with_alphanet_endpoints`.
///
/// Every entry must be a valid URL: that constructor panics on an entry that fails to parse.
pub const ALPHANET_CONTACTS: &[&str] = &[
    "http://188.166.133.208/bootstrap_cache.json",
    "http://188.166.133.125/bootstrap_cache.json",
    "http://178.128.137.64/bootstrap_cache.json",
    "http://159.223.242.7/bootstrap_cache.json",
    "http://143.244.197.147/bootstrap_cache.json",
];
35
/// Timeout, in seconds, configured on the HTTP client that performs the fetches.
const FETCH_TIMEOUT_SECS: u64 = 30;
/// Upper bound on the number of endpoint fetches that are in flight at the same time.
const MAX_CONCURRENT_FETCHES: usize = 3;
/// Number of failed attempts against a single endpoint after which it is abandoned.
///
/// The failure counter is compared with `>=`, so this value caps the total number of
/// attempts per endpoint rather than the number of retries after the first attempt.
const MAX_RETRIES_ON_FETCH_FAILURE: usize = 2;
42
/// Fetches bootstrap contact addresses from a list of HTTP endpoints.
pub struct ContactsFetcher {
    /// Maximum number of addresses returned by `fetch_bootstrap_addresses`.
    max_addrs: usize,
    /// Endpoints queried for bootstrap addresses.
    endpoints: Vec<Url>,
    /// HTTP client used for every request.
    request_client: Client,
    /// Forwarded to `craft_valid_multiaddr_from_str` when a response is parsed as plain text.
    /// NOTE(review): presumably relaxes the peer-id requirement on parsed addresses —
    /// confirm against that helper.
    ignore_peer_id: bool,
}
54
55impl ContactsFetcher {
56 pub fn new() -> Result<Self> {
58 Self::with_endpoints(vec![])
59 }
60
61 pub fn with_endpoints(endpoints: Vec<Url>) -> Result<Self> {
63 let request_client = Client::builder()
64 .timeout(Duration::from_secs(FETCH_TIMEOUT_SECS))
65 .build()?;
66
67 Ok(Self {
68 max_addrs: usize::MAX,
69 endpoints,
70 request_client,
71 ignore_peer_id: false,
72 })
73 }
74
    /// Sets the maximum number of addresses `fetch_bootstrap_addresses` will return.
    pub fn set_max_addrs(&mut self, max_addrs: usize) {
        self.max_addrs = max_addrs;
    }
79
80 pub fn with_mainnet_endpoints() -> Result<Self> {
82 let mut fetcher = Self::new()?;
83 #[allow(clippy::expect_used)]
84 let mainnet_contact = MAINNET_CONTACTS
85 .iter()
86 .map(|url| url.parse().expect("Failed to parse static URL"))
87 .collect();
88 fetcher.endpoints = mainnet_contact;
89 Ok(fetcher)
90 }
91
92 pub fn with_alphanet_endpoints() -> Result<Self> {
94 let mut fetcher = Self::new()?;
95 #[allow(clippy::expect_used)]
96 let alphanet_contact = ALPHANET_CONTACTS
97 .iter()
98 .map(|url| url.parse().expect("Failed to parse static URL"))
99 .collect();
100 fetcher.endpoints = alphanet_contact;
101 Ok(fetcher)
102 }
103
    /// Appends `endpoint` to the list of endpoints that will be queried.
    pub fn insert_endpoint(&mut self, endpoint: Url) {
        self.endpoints.push(endpoint);
    }
107
    /// Sets the `ignore_peer_id` flag that is passed on to the plain-text address parser.
    pub fn ignore_peer_id(&mut self, ignore_peer_id: bool) {
        self.ignore_peer_id = ignore_peer_id;
    }
111
112 pub async fn fetch_bootstrap_addresses(&self) -> Result<Vec<Multiaddr>> {
114 info!(
115 "Starting peer fetcher from {} endpoints: {:?}",
116 self.endpoints.len(),
117 self.endpoints
118 );
119 let mut bootstrap_addresses = Vec::new();
120
121 let mut fetches = stream::iter(self.endpoints.clone())
122 .map(|endpoint| async move {
123 info!(
124 "Attempting to fetch bootstrap addresses from endpoint: {}",
125 endpoint
126 );
127 (
128 Self::fetch_from_endpoint(
129 self.request_client.clone(),
130 &endpoint,
131 self.ignore_peer_id,
132 )
133 .await,
134 endpoint,
135 )
136 })
137 .buffer_unordered(MAX_CONCURRENT_FETCHES);
138
139 while let Some((result, endpoint)) = fetches.next().await {
140 match result {
141 Ok(mut endpoing_bootstrap_addresses) => {
142 info!(
143 "Successfully fetched {} bootstrap addrs from {}. First few addrs: {:?}",
144 endpoing_bootstrap_addresses.len(),
145 endpoint,
146 endpoing_bootstrap_addresses
147 .iter()
148 .take(3)
149 .collect::<Vec<_>>()
150 );
151 bootstrap_addresses.append(&mut endpoing_bootstrap_addresses);
152 if bootstrap_addresses.len() >= self.max_addrs {
153 info!(
154 "Fetched enough bootstrap addresses. Stopping. needed: {} Total fetched: {}",
155 self.max_addrs,
156 bootstrap_addresses.len()
157 );
158 break;
159 }
160 }
161 Err(e) => {
162 warn!("Failed to fetch bootstrap addrs from {}: {}", endpoint, e);
163 }
164 }
165 }
166
167 bootstrap_addresses.truncate(self.max_addrs);
168
169 info!(
170 "Successfully discovered {} total addresses. First few: {:?}",
171 bootstrap_addresses.len(),
172 bootstrap_addresses.iter().take(3).collect::<Vec<_>>()
173 );
174 Ok(bootstrap_addresses)
175 }
176
177 async fn fetch_from_endpoint(
179 request_client: Client,
180 endpoint: &Url,
181 ignore_peer_id: bool,
182 ) -> Result<Vec<Multiaddr>> {
183 let mut retries = 0;
184
185 let bootstrap_addresses = loop {
186 let response = request_client
187 .get(endpoint.clone())
188 .header(CONTACTS_CACHE_VERSION_HEADER, CACHE_DATA_VERSION_LATEST)
189 .send()
190 .await;
191
192 match response {
193 Ok(response) => {
194 if response.status().is_success() {
195 let text = response.text().await?;
196
197 match Self::try_parse_response(&text, ignore_peer_id) {
198 Ok(addrs) => break addrs,
199 Err(err) => {
200 warn!("Failed to parse response with err: {err:?}");
201 retries += 1;
202 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
203 return Err(Error::FailedToObtainAddrsFromUrl(
204 endpoint.to_string(),
205 MAX_RETRIES_ON_FETCH_FAILURE,
206 ));
207 }
208 }
209 }
210 } else {
211 retries += 1;
212 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
213 return Err(Error::FailedToObtainAddrsFromUrl(
214 endpoint.to_string(),
215 MAX_RETRIES_ON_FETCH_FAILURE,
216 ));
217 }
218 }
219 }
220 Err(err) => {
221 error!("Failed to get bootstrap addrs from URL {endpoint}: {err:?}");
222 retries += 1;
223 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
224 return Err(Error::FailedToObtainAddrsFromUrl(
225 endpoint.to_string(),
226 MAX_RETRIES_ON_FETCH_FAILURE,
227 ));
228 }
229 }
230 }
231 debug!(
232 "Failed to get bootstrap addrs from URL, retrying {retries}/{MAX_RETRIES_ON_FETCH_FAILURE}"
233 );
234
235 tokio::time::sleep(Duration::from_millis(300)).await;
236 };
237
238 Ok(bootstrap_addresses)
239 }
240
241 fn try_parse_response(response: &str, ignore_peer_id: bool) -> Result<Vec<Multiaddr>> {
243 let cache_data = if let Ok(data) =
244 serde_json::from_str::<super::cache_store::cache_data_v1::CacheData>(response)
245 {
246 Some(data)
247 } else if let Ok(data) =
248 serde_json::from_str::<super::cache_store::cache_data_v0::CacheData>(response)
249 {
250 Some(data.into())
251 } else {
252 None
253 };
254
255 match cache_data {
256 Some(cache_data) => {
257 info!(
258 "Successfully parsed JSON response with {} peers",
259 cache_data.peers.len()
260 );
261 let our_network_version = crate::get_network_version();
262
263 if cache_data.network_version != our_network_version {
264 warn!(
265 "Network version mismatch. Expected: {our_network_version}, got: {}. Skipping.",
266 cache_data.network_version
267 );
268 return Ok(vec![]);
269 }
270 let bootstrap_addresses = cache_data.get_all_addrs().cloned().collect::<Vec<_>>();
271
272 info!(
273 "Successfully parsed {} valid peers from JSON",
274 bootstrap_addresses.len()
275 );
276 Ok(bootstrap_addresses)
277 }
278 None => {
279 info!("Attempting to parse response as plain text");
280 let bootstrap_addresses = response
284 .split('\n')
285 .filter_map(|str| craft_valid_multiaddr_from_str(str, ignore_peer_id))
286 .collect::<Vec<_>>();
287
288 if bootstrap_addresses.is_empty() {
289 warn!("Failed to parse response as plain text");
290 return Err(Error::FailedToParseCacheData);
291 }
292
293 info!(
294 "Successfully parsed {} valid bootstrap addrs from plain text",
295 bootstrap_addresses.len()
296 );
297 Ok(bootstrap_addresses)
298 }
299 }
300 }
301}
302
303#[cfg(test)]
304mod tests {
305 use super::*;
306 use crate::cache_store::{cache_data_v0, cache_data_v1};
307 use libp2p::{Multiaddr, PeerId};
308 use std::{
309 sync::atomic::{AtomicUsize, Ordering},
310 time::SystemTime,
311 };
312 use wiremock::{
313 Mock, MockServer, ResponseTemplate,
314 matchers::{method, path},
315 };
316
317 #[tokio::test]
318 async fn test_network_contacts_formats() {
319 let mock_server = MockServer::start().await;
320 let peer_id = PeerId::random();
321 let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
322 .parse()
323 .unwrap();
324
325 let mut v0 = cache_data_v0::CacheData {
326 peers: Default::default(),
327 last_updated: SystemTime::now(),
328 network_version: crate::get_network_version(),
329 };
330 v0.peers.insert(
331 peer_id,
332 cache_data_v0::BootstrapAddresses(vec![cache_data_v0::BootstrapAddr {
333 addr: addr.clone(),
334 success_count: 1,
335 failure_count: 0,
336 last_seen: SystemTime::now(),
337 }]),
338 );
339 let v0_json = serde_json::to_string(&v0).unwrap();
340
341 let mut v1 = cache_data_v1::CacheData::default();
342 v1.add_peer(peer_id, [addr.clone()].iter(), 10, 10);
343 let v1_json = serde_json::to_string(&v1).unwrap();
344
345 Mock::given(method("GET"))
346 .and(path("/v0"))
347 .respond_with(ResponseTemplate::new(200).set_body_string(v0_json))
348 .mount(&mock_server)
349 .await;
350 Mock::given(method("GET"))
351 .and(path("/v1"))
352 .respond_with(ResponseTemplate::new(200).set_body_string(v1_json))
353 .mount(&mock_server)
354 .await;
355 Mock::given(method("GET"))
356 .and(path("/text"))
357 .respond_with(ResponseTemplate::new(200).set_body_string(
358 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n",
359 ))
360 .mount(&mock_server)
361 .await;
362 Mock::given(method("GET"))
363 .and(path("/malformed"))
364 .respond_with(ResponseTemplate::new(200).set_body_string("this is not valid data"))
365 .mount(&mock_server)
366 .await;
367
368 for endpoint in ["/v0", "/v1", "/text"] {
369 let url = format!("{}{}", mock_server.uri(), endpoint)
370 .parse()
371 .unwrap();
372 let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
373 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
374 assert!(!addrs.is_empty(), "should fetch addresses from {endpoint}");
375 assert_eq!(addrs[0].to_string(), addr.to_string());
376 }
377
378 let malformed = format!("{}/malformed", mock_server.uri()).parse().unwrap();
379 let fetcher = ContactsFetcher::with_endpoints(vec![malformed]).unwrap();
380 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
381 assert!(
382 addrs.is_empty(),
383 "malformed responses should return empty list"
384 );
385 }
386
387 #[tokio::test]
388 async fn test_network_contacts_retries() {
389 let mock_server = MockServer::start().await;
390 let counter = AtomicUsize::new(0);
391
392 Mock::given(method("GET"))
393 .and(path("/retry"))
394 .respond_with(move |_: &wiremock::Request| {
395 let count = counter.fetch_add(1, Ordering::SeqCst);
396 if count < 1 {
397 ResponseTemplate::new(500)
398 } else {
399 ResponseTemplate::new(200).set_body_string(
400 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
401 )
402 }
403 })
404 .mount(&mock_server)
405 .await;
406
407 let url = format!("{}/retry", mock_server.uri()).parse().unwrap();
408 let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
409 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
410 assert_eq!(addrs.len(), 1);
411 }
412
413 #[tokio::test]
414 async fn test_fetch_addrs() {
415 let mock_server = MockServer::start().await;
416
417 Mock::given(method("GET"))
418 .and(path("/"))
419 .respond_with(
420 ResponseTemplate::new(200)
421 .set_body_string("/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"),
422 )
423 .mount(&mock_server)
424 .await;
425
426 let mut fetcher = ContactsFetcher::new().unwrap();
427 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
428
429 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
430 assert_eq!(addrs.len(), 2);
431
432 let addr1: Multiaddr =
433 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
434 .parse()
435 .unwrap();
436 let addr2: Multiaddr =
437 "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
438 .parse()
439 .unwrap();
440 assert!(addrs.iter().any(|p| p == &addr1));
441 assert!(addrs.iter().any(|p| p == &addr2));
442 }
443
444 #[tokio::test]
445 async fn test_endpoint_failover() {
446 let mock_server1 = MockServer::start().await;
447 let mock_server2 = MockServer::start().await;
448
449 Mock::given(method("GET"))
451 .and(path("/"))
452 .respond_with(ResponseTemplate::new(500))
453 .mount(&mock_server1)
454 .await;
455
456 Mock::given(method("GET"))
458 .and(path("/"))
459 .respond_with(ResponseTemplate::new(200).set_body_string(
460 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
461 ))
462 .mount(&mock_server2)
463 .await;
464
465 let mut fetcher = ContactsFetcher::new().unwrap();
466 fetcher.endpoints = vec![
467 mock_server1.uri().parse().unwrap(),
468 mock_server2.uri().parse().unwrap(),
469 ];
470
471 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
472 assert_eq!(addrs.len(), 1);
473
474 let addr: Multiaddr =
475 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
476 .parse()
477 .unwrap();
478 assert_eq!(addrs[0], addr);
479 }
480
481 #[tokio::test]
482 async fn test_network_failure_recovery() {
483 let bad_url: Url = "http://does-not-exist.example.invalid".parse().unwrap();
484
485 let mock_server = MockServer::start().await;
486 Mock::given(method("GET"))
487 .and(path("/valid"))
488 .respond_with(ResponseTemplate::new(200).set_body_string(
489 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
490 ))
491 .mount(&mock_server)
492 .await;
493 let valid_url = format!("{}/valid", mock_server.uri()).parse().unwrap();
494
495 let failing = ContactsFetcher::with_endpoints(vec![bad_url.clone()]).unwrap();
496 let empty = failing.fetch_bootstrap_addresses().await.unwrap();
497 assert!(
498 empty.is_empty(),
499 "all failing endpoints should yield empty set"
500 );
501
502 let mixed = ContactsFetcher::with_endpoints(vec![bad_url, valid_url]).unwrap();
503 let addrs = mixed.fetch_bootstrap_addresses().await.unwrap();
504 assert!(
505 !addrs.is_empty(),
506 "mix of failing and working endpoints should return addresses"
507 );
508 }
509
510 #[tokio::test]
511 async fn test_invalid_multiaddr() {
512 let mock_server = MockServer::start().await;
513
514 Mock::given(method("GET"))
515 .and(path("/"))
516 .respond_with(
517 ResponseTemplate::new(200).set_body_string(
518 "/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
519 ),
520 )
521 .mount(&mock_server)
522 .await;
523
524 let mut fetcher = ContactsFetcher::new().unwrap();
525 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
526
527 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
528 let valid_addr: Multiaddr =
529 "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
530 .parse()
531 .unwrap();
532 assert_eq!(addrs[0], valid_addr);
533 }
534
535 #[tokio::test]
536 async fn test_empty_response_handling() {
537 let mock_server = MockServer::start().await;
538 Mock::given(method("GET"))
539 .and(path("/empty"))
540 .respond_with(ResponseTemplate::new(200).set_body_string(""))
541 .mount(&mock_server)
542 .await;
543
544 let url = format!("{}/empty", mock_server.uri()).parse().unwrap();
545 let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
546 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
547 assert!(
548 addrs.is_empty(),
549 "empty HTTP response should produce no addresses"
550 );
551 }
552
553 #[tokio::test]
554 async fn test_whitespace_and_empty_lines() {
555 let mock_server = MockServer::start().await;
556
557 Mock::given(method("GET"))
558 .and(path("/"))
559 .respond_with(
560 ResponseTemplate::new(200).set_body_string("\n \n/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n \n"),
561 )
562 .mount(&mock_server)
563 .await;
564
565 let mut fetcher = ContactsFetcher::new().unwrap();
566 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
567
568 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
569 assert_eq!(addrs.len(), 1);
570
571 let addr: Multiaddr =
572 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
573 .parse()
574 .unwrap();
575 assert_eq!(addrs[0], addr);
576 }
577
578 #[tokio::test]
579 async fn test_fetch_max_addresses() {
580 let mock_server = MockServer::start().await;
581 Mock::given(method("GET"))
582 .and(path("/multiple"))
583 .respond_with(ResponseTemplate::new(200).set_body_string(
584 "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n\
585 /ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n\
586 /ip4/127.0.0.3/udp/8082/quic-v1/p2p/12D3KooWCKCeqLPSgMnDjyFsJuWqREDtKNHx1JEBiwxME7Zdw68n",
587 ))
588 .mount(&mock_server)
589 .await;
590
591 let url = format!("{}/multiple", mock_server.uri()).parse().unwrap();
592 let mut fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
593 fetcher.set_max_addrs(2);
594 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
595 assert_eq!(addrs.len(), 2, "max_addrs should limit returned addresses");
596 }
597
598 #[tokio::test]
599 async fn test_custom_endpoints() {
600 let endpoints = vec!["http://example.com".parse().unwrap()];
601 let fetcher = ContactsFetcher::with_endpoints(endpoints.clone()).unwrap();
602 assert_eq!(fetcher.endpoints, endpoints);
603 }
604}