ant_bootstrap/
contacts.rs1use crate::{
10 cache_store::CACHE_DATA_VERSION_LATEST, craft_valid_multiaddr_from_str, Error, Result,
11};
12use futures::stream::{self, StreamExt};
13use libp2p::Multiaddr;
14use reqwest::Client;
15use std::time::Duration;
16use url::Url;
17
18const CONTACTS_CACHE_VERSION_HEADER: &str = "Cache-Version";
19
20pub const MAINNET_CONTACTS: &[&str] = &[
21 "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts",
22 "http://159.89.251.80/bootstrap_cache.json",
23 "http://159.65.210.89/bootstrap_cache.json",
24 "http://159.223.246.45/bootstrap_cache.json",
25 "http://139.59.201.153/bootstrap_cache.json",
26 "http://139.59.200.27/bootstrap_cache.json",
27];
28pub const ALPHANET_CONTACTS: &[&str] = &[
29 "http://188.166.133.208/bootstrap_cache.json",
30 "http://188.166.133.125/bootstrap_cache.json",
31 "http://178.128.137.64/bootstrap_cache.json",
32 "http://159.223.242.7/bootstrap_cache.json",
33 "http://143.244.197.147/bootstrap_cache.json",
34];
35
36const FETCH_TIMEOUT_SECS: u64 = 30;
38const MAX_CONCURRENT_FETCHES: usize = 3;
40const MAX_RETRIES_ON_FETCH_FAILURE: usize = 3;
42
43pub struct ContactsFetcher {
45 max_addrs: usize,
47 endpoints: Vec<Url>,
49 request_client: Client,
51 ignore_peer_id: bool,
53}
54
55impl ContactsFetcher {
56 pub fn new() -> Result<Self> {
58 Self::with_endpoints(vec![])
59 }
60
61 pub fn with_endpoints(endpoints: Vec<Url>) -> Result<Self> {
63 let request_client = Client::builder()
64 .timeout(Duration::from_secs(FETCH_TIMEOUT_SECS))
65 .build()?;
66
67 Ok(Self {
68 max_addrs: usize::MAX,
69 endpoints,
70 request_client,
71 ignore_peer_id: false,
72 })
73 }
74
75 pub fn set_max_addrs(&mut self, max_addrs: usize) {
77 self.max_addrs = max_addrs;
78 }
79
80 pub fn with_mainnet_endpoints() -> Result<Self> {
82 let mut fetcher = Self::new()?;
83 let mainnet_contact = MAINNET_CONTACTS
84 .iter()
85 .map(|url| url.parse().expect("Failed to parse static URL"))
86 .collect();
87 fetcher.endpoints = mainnet_contact;
88 Ok(fetcher)
89 }
90
91 pub fn with_alphanet_endpoints() -> Result<Self> {
93 let mut fetcher = Self::new()?;
94 let alphanet_contact = ALPHANET_CONTACTS
95 .iter()
96 .map(|url| url.parse().expect("Failed to parse static URL"))
97 .collect();
98 fetcher.endpoints = alphanet_contact;
99 Ok(fetcher)
100 }
101
102 pub fn insert_endpoint(&mut self, endpoint: Url) {
103 self.endpoints.push(endpoint);
104 }
105
106 pub fn ignore_peer_id(&mut self, ignore_peer_id: bool) {
107 self.ignore_peer_id = ignore_peer_id;
108 }
109
110 pub async fn fetch_bootstrap_addresses(&self) -> Result<Vec<Multiaddr>> {
112 Ok(self.fetch_addrs().await?.into_iter().collect())
113 }
114
115 pub async fn fetch_addrs(&self) -> Result<Vec<Multiaddr>> {
117 info!(
118 "Starting peer fetcher from {} endpoints: {:?}",
119 self.endpoints.len(),
120 self.endpoints
121 );
122 let mut bootstrap_addresses = Vec::new();
123
124 let mut fetches = stream::iter(self.endpoints.clone())
125 .map(|endpoint| async move {
126 info!(
127 "Attempting to fetch bootstrap addresses from endpoint: {}",
128 endpoint
129 );
130 (
131 Self::fetch_from_endpoint(
132 self.request_client.clone(),
133 &endpoint,
134 self.ignore_peer_id,
135 )
136 .await,
137 endpoint,
138 )
139 })
140 .buffer_unordered(MAX_CONCURRENT_FETCHES);
141
142 while let Some((result, endpoint)) = fetches.next().await {
143 match result {
144 Ok(mut endpoing_bootstrap_addresses) => {
145 info!(
146 "Successfully fetched {} bootstrap addrs from {}. First few addrs: {:?}",
147 endpoing_bootstrap_addresses.len(),
148 endpoint,
149 endpoing_bootstrap_addresses
150 .iter()
151 .take(3)
152 .collect::<Vec<_>>()
153 );
154 bootstrap_addresses.append(&mut endpoing_bootstrap_addresses);
155 if bootstrap_addresses.len() >= self.max_addrs {
156 info!(
157 "Fetched enough bootstrap addresses. Stopping. needed: {} Total fetched: {}",
158 self.max_addrs,
159 bootstrap_addresses.len()
160 );
161 break;
162 }
163 }
164 Err(e) => {
165 warn!("Failed to fetch bootstrap addrs from {}: {}", endpoint, e);
166 }
167 }
168 }
169
170 bootstrap_addresses.truncate(self.max_addrs);
171
172 info!(
173 "Successfully discovered {} total addresses. First few: {:?}",
174 bootstrap_addresses.len(),
175 bootstrap_addresses.iter().take(3).collect::<Vec<_>>()
176 );
177 Ok(bootstrap_addresses)
178 }
179
180 async fn fetch_from_endpoint(
182 request_client: Client,
183 endpoint: &Url,
184 ignore_peer_id: bool,
185 ) -> Result<Vec<Multiaddr>> {
186 let mut retries = 0;
187
188 let bootstrap_addresses = loop {
189 let response = request_client
190 .get(endpoint.clone())
191 .header(CONTACTS_CACHE_VERSION_HEADER, CACHE_DATA_VERSION_LATEST)
192 .send()
193 .await;
194
195 match response {
196 Ok(response) => {
197 if response.status().is_success() {
198 let text = response.text().await?;
199
200 match Self::try_parse_response(&text, ignore_peer_id) {
201 Ok(addrs) => break addrs,
202 Err(err) => {
203 warn!("Failed to parse response with err: {err:?}");
204 retries += 1;
205 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
206 return Err(Error::FailedToObtainAddrsFromUrl(
207 endpoint.to_string(),
208 MAX_RETRIES_ON_FETCH_FAILURE,
209 ));
210 }
211 }
212 }
213 } else {
214 retries += 1;
215 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
216 return Err(Error::FailedToObtainAddrsFromUrl(
217 endpoint.to_string(),
218 MAX_RETRIES_ON_FETCH_FAILURE,
219 ));
220 }
221 }
222 }
223 Err(err) => {
224 error!("Failed to get bootstrap addrs from URL {endpoint}: {err:?}");
225 retries += 1;
226 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
227 return Err(Error::FailedToObtainAddrsFromUrl(
228 endpoint.to_string(),
229 MAX_RETRIES_ON_FETCH_FAILURE,
230 ));
231 }
232 }
233 }
234 debug!(
235 "Failed to get bootstrap addrs from URL, retrying {retries}/{MAX_RETRIES_ON_FETCH_FAILURE}"
236 );
237
238 tokio::time::sleep(Duration::from_secs(1)).await;
239 };
240
241 Ok(bootstrap_addresses)
242 }
243
244 fn try_parse_response(response: &str, ignore_peer_id: bool) -> Result<Vec<Multiaddr>> {
246 let cache_data = if let Ok(data) =
247 serde_json::from_str::<super::cache_store::cache_data_v1::CacheData>(response)
248 {
249 Some(data)
250 } else if let Ok(data) =
251 serde_json::from_str::<super::cache_store::cache_data_v0::CacheData>(response)
252 {
253 Some(data.into())
254 } else {
255 None
256 };
257
258 match cache_data {
259 Some(cache_data) => {
260 info!(
261 "Successfully parsed JSON response with {} peers",
262 cache_data.peers.len()
263 );
264 let our_network_version = crate::get_network_version();
265
266 if cache_data.network_version != our_network_version {
267 warn!(
268 "Network version mismatch. Expected: {our_network_version}, got: {}. Skipping.", cache_data.network_version
269 );
270 return Ok(vec![]);
271 }
272 let bootstrap_addresses = cache_data.get_all_addrs().cloned().collect::<Vec<_>>();
273
274 info!(
275 "Successfully parsed {} valid peers from JSON",
276 bootstrap_addresses.len()
277 );
278 Ok(bootstrap_addresses)
279 }
280 None => {
281 info!("Attempting to parse response as plain text");
282 let bootstrap_addresses = response
286 .split('\n')
287 .filter_map(|str| craft_valid_multiaddr_from_str(str, ignore_peer_id))
288 .collect::<Vec<_>>();
289
290 if bootstrap_addresses.is_empty() {
291 warn!("Failed to parse response as plain text");
292 return Err(Error::FailedToParseCacheData);
293 }
294
295 info!(
296 "Successfully parsed {} valid bootstrap addrs from plain text",
297 bootstrap_addresses.len()
298 );
299 Ok(bootstrap_addresses)
300 }
301 }
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::*;
308 use libp2p::Multiaddr;
309 use wiremock::{
310 matchers::{method, path},
311 Mock, MockServer, ResponseTemplate,
312 };
313
314 #[tokio::test]
315 async fn test_fetch_addrs() {
316 let mock_server = MockServer::start().await;
317
318 Mock::given(method("GET"))
319 .and(path("/"))
320 .respond_with(
321 ResponseTemplate::new(200)
322 .set_body_string("/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"),
323 )
324 .mount(&mock_server)
325 .await;
326
327 let mut fetcher = ContactsFetcher::new().unwrap();
328 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
329
330 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
331 assert_eq!(addrs.len(), 2);
332
333 let addr1: Multiaddr =
334 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
335 .parse()
336 .unwrap();
337 let addr2: Multiaddr =
338 "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
339 .parse()
340 .unwrap();
341 assert!(addrs.iter().any(|p| p == &addr1));
342 assert!(addrs.iter().any(|p| p == &addr2));
343 }
344
345 #[tokio::test]
346 async fn test_endpoint_failover() {
347 let mock_server1 = MockServer::start().await;
348 let mock_server2 = MockServer::start().await;
349
350 Mock::given(method("GET"))
352 .and(path("/"))
353 .respond_with(ResponseTemplate::new(500))
354 .mount(&mock_server1)
355 .await;
356
357 Mock::given(method("GET"))
359 .and(path("/"))
360 .respond_with(ResponseTemplate::new(200).set_body_string(
361 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
362 ))
363 .mount(&mock_server2)
364 .await;
365
366 let mut fetcher = ContactsFetcher::new().unwrap();
367 fetcher.endpoints = vec![
368 mock_server1.uri().parse().unwrap(),
369 mock_server2.uri().parse().unwrap(),
370 ];
371
372 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
373 assert_eq!(addrs.len(), 1);
374
375 let addr: Multiaddr =
376 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
377 .parse()
378 .unwrap();
379 assert_eq!(addrs[0], addr);
380 }
381
382 #[tokio::test]
383 async fn test_invalid_multiaddr() {
384 let mock_server = MockServer::start().await;
385
386 Mock::given(method("GET"))
387 .and(path("/"))
388 .respond_with(
389 ResponseTemplate::new(200).set_body_string(
390 "/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
391 ),
392 )
393 .mount(&mock_server)
394 .await;
395
396 let mut fetcher = ContactsFetcher::new().unwrap();
397 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
398
399 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
400 let valid_addr: Multiaddr =
401 "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
402 .parse()
403 .unwrap();
404 assert_eq!(addrs[0], valid_addr);
405 }
406
407 #[tokio::test]
408 async fn test_whitespace_and_empty_lines() {
409 let mock_server = MockServer::start().await;
410
411 Mock::given(method("GET"))
412 .and(path("/"))
413 .respond_with(
414 ResponseTemplate::new(200).set_body_string("\n \n/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n \n"),
415 )
416 .mount(&mock_server)
417 .await;
418
419 let mut fetcher = ContactsFetcher::new().unwrap();
420 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
421
422 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
423 assert_eq!(addrs.len(), 1);
424
425 let addr: Multiaddr =
426 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
427 .parse()
428 .unwrap();
429 assert_eq!(addrs[0], addr);
430 }
431
432 #[tokio::test]
433 async fn test_custom_endpoints() {
434 let endpoints = vec!["http://example.com".parse().unwrap()];
435 let fetcher = ContactsFetcher::with_endpoints(endpoints.clone()).unwrap();
436 assert_eq!(fetcher.endpoints, endpoints);
437 }
438}