ant_bootstrap/
contacts.rs1use crate::{cache_store::CacheData, craft_valid_multiaddr_from_str, BootstrapAddr, Error, Result};
10use futures::stream::{self, StreamExt};
11use libp2p::Multiaddr;
12use reqwest::Client;
13use std::time::Duration;
14use url::Url;
15
16pub const MAINNET_CONTACTS: &[&str] = &[
17 "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts",
18 "http://159.89.251.80/bootstrap_cache.json",
19 "http://159.65.210.89/bootstrap_cache.json",
20 "http://159.223.246.45/bootstrap_cache.json",
21 "http://139.59.201.153/bootstrap_cache.json",
22 "http://139.59.200.27/bootstrap_cache.json",
23];
24pub const ALPHANET_CONTACTS: &[&str] = &[
25 "http://188.166.133.208/bootstrap_cache.json",
26 "http://188.166.133.125/bootstrap_cache.json",
27 "http://178.128.137.64/bootstrap_cache.json",
28 "http://159.223.242.7/bootstrap_cache.json",
29 "http://143.244.197.147/bootstrap_cache.json",
30];
31
32const FETCH_TIMEOUT_SECS: u64 = 30;
34const MAX_CONCURRENT_FETCHES: usize = 3;
36const MAX_RETRIES_ON_FETCH_FAILURE: usize = 3;
38
39pub struct ContactsFetcher {
41 max_addrs: usize,
43 endpoints: Vec<Url>,
45 request_client: Client,
47 ignore_peer_id: bool,
49}
50
51impl ContactsFetcher {
52 pub fn new() -> Result<Self> {
54 Self::with_endpoints(vec![])
55 }
56
57 pub fn with_endpoints(endpoints: Vec<Url>) -> Result<Self> {
59 let request_client = Client::builder()
60 .timeout(Duration::from_secs(FETCH_TIMEOUT_SECS))
61 .build()?;
62
63 Ok(Self {
64 max_addrs: usize::MAX,
65 endpoints,
66 request_client,
67 ignore_peer_id: false,
68 })
69 }
70
71 pub fn set_max_addrs(&mut self, max_addrs: usize) {
73 self.max_addrs = max_addrs;
74 }
75
76 pub fn with_mainnet_endpoints() -> Result<Self> {
78 let mut fetcher = Self::new()?;
79 let mainnet_contact = MAINNET_CONTACTS
80 .iter()
81 .map(|url| url.parse().expect("Failed to parse static URL"))
82 .collect();
83 fetcher.endpoints = mainnet_contact;
84 Ok(fetcher)
85 }
86
87 pub fn with_alphanet_endpoints() -> Result<Self> {
89 let mut fetcher = Self::new()?;
90 let alphanet_contact = ALPHANET_CONTACTS
91 .iter()
92 .map(|url| url.parse().expect("Failed to parse static URL"))
93 .collect();
94 fetcher.endpoints = alphanet_contact;
95 Ok(fetcher)
96 }
97
98 pub fn insert_endpoint(&mut self, endpoint: Url) {
99 self.endpoints.push(endpoint);
100 }
101
102 pub fn ignore_peer_id(&mut self, ignore_peer_id: bool) {
103 self.ignore_peer_id = ignore_peer_id;
104 }
105
106 pub async fn fetch_bootstrap_addresses(&self) -> Result<Vec<BootstrapAddr>> {
108 Ok(self
109 .fetch_addrs()
110 .await?
111 .into_iter()
112 .map(BootstrapAddr::new)
113 .collect())
114 }
115
116 pub async fn fetch_addrs(&self) -> Result<Vec<Multiaddr>> {
118 info!(
119 "Starting peer fetcher from {} endpoints: {:?}",
120 self.endpoints.len(),
121 self.endpoints
122 );
123 let mut bootstrap_addresses = Vec::new();
124
125 let mut fetches = stream::iter(self.endpoints.clone())
126 .map(|endpoint| async move {
127 info!(
128 "Attempting to fetch bootstrap addresses from endpoint: {}",
129 endpoint
130 );
131 (
132 Self::fetch_from_endpoint(
133 self.request_client.clone(),
134 &endpoint,
135 self.ignore_peer_id,
136 )
137 .await,
138 endpoint,
139 )
140 })
141 .buffer_unordered(MAX_CONCURRENT_FETCHES);
142
143 while let Some((result, endpoint)) = fetches.next().await {
144 match result {
145 Ok(mut endpoing_bootstrap_addresses) => {
146 info!(
147 "Successfully fetched {} bootstrap addrs from {}. First few addrs: {:?}",
148 endpoing_bootstrap_addresses.len(),
149 endpoint,
150 endpoing_bootstrap_addresses
151 .iter()
152 .take(3)
153 .collect::<Vec<_>>()
154 );
155 bootstrap_addresses.append(&mut endpoing_bootstrap_addresses);
156 if bootstrap_addresses.len() >= self.max_addrs {
157 info!(
158 "Fetched enough bootstrap addresses. Stopping. needed: {} Total fetched: {}",
159 self.max_addrs,
160 bootstrap_addresses.len()
161 );
162 break;
163 }
164 }
165 Err(e) => {
166 warn!("Failed to fetch bootstrap addrs from {}: {}", endpoint, e);
167 }
168 }
169 }
170
171 info!(
172 "Successfully discovered {} total addresses. First few: {:?}",
173 bootstrap_addresses.len(),
174 bootstrap_addresses.iter().take(3).collect::<Vec<_>>()
175 );
176 Ok(bootstrap_addresses)
177 }
178
179 async fn fetch_from_endpoint(
181 request_client: Client,
182 endpoint: &Url,
183 ignore_peer_id: bool,
184 ) -> Result<Vec<Multiaddr>> {
185 info!("Fetching peers from endpoint: {endpoint}");
186 let mut retries = 0;
187
188 let bootstrap_addresses = loop {
189 let response = request_client.get(endpoint.clone()).send().await;
190
191 match response {
192 Ok(response) => {
193 if response.status().is_success() {
194 let text = response.text().await?;
195
196 match Self::try_parse_response(&text, ignore_peer_id) {
197 Ok(addrs) => break addrs,
198 Err(err) => {
199 warn!("Failed to parse response with err: {err:?}");
200 retries += 1;
201 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
202 return Err(Error::FailedToObtainAddrsFromUrl(
203 endpoint.to_string(),
204 MAX_RETRIES_ON_FETCH_FAILURE,
205 ));
206 }
207 }
208 }
209 } else {
210 retries += 1;
211 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
212 return Err(Error::FailedToObtainAddrsFromUrl(
213 endpoint.to_string(),
214 MAX_RETRIES_ON_FETCH_FAILURE,
215 ));
216 }
217 }
218 }
219 Err(err) => {
220 error!("Failed to get bootstrap addrs from URL {endpoint}: {err:?}");
221 retries += 1;
222 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
223 return Err(Error::FailedToObtainAddrsFromUrl(
224 endpoint.to_string(),
225 MAX_RETRIES_ON_FETCH_FAILURE,
226 ));
227 }
228 }
229 }
230 trace!(
231 "Failed to get bootstrap addrs from URL, retrying {retries}/{MAX_RETRIES_ON_FETCH_FAILURE}"
232 );
233
234 tokio::time::sleep(Duration::from_secs(1)).await;
235 };
236
237 Ok(bootstrap_addresses)
238 }
239
240 fn try_parse_response(response: &str, ignore_peer_id: bool) -> Result<Vec<Multiaddr>> {
242 match serde_json::from_str::<CacheData>(response) {
243 Ok(json_endpoints) => {
244 info!(
245 "Successfully parsed JSON response with {} peers",
246 json_endpoints.peers.len()
247 );
248 let our_network_version = crate::get_network_version();
249
250 if json_endpoints.network_version != our_network_version {
251 warn!(
252 "Network version mismatch. Expected: {our_network_version}, got: {}. Skipping.", json_endpoints.network_version
253 );
254 return Ok(vec![]);
255 }
256 let bootstrap_addresses = json_endpoints
257 .peers
258 .into_iter()
259 .filter_map(|(_, addresses)| {
260 addresses.get_least_faulty().map(|addr| addr.addr.clone())
261 })
262 .collect::<Vec<_>>();
263
264 info!(
265 "Successfully parsed {} valid peers from JSON",
266 bootstrap_addresses.len()
267 );
268 Ok(bootstrap_addresses)
269 }
270 Err(_err) => {
271 info!("Attempting to parse response as plain text");
272 let bootstrap_addresses = response
275 .split('\n')
276 .filter_map(|str| craft_valid_multiaddr_from_str(str, ignore_peer_id))
277 .collect::<Vec<_>>();
278
279 info!(
280 "Successfully parsed {} valid bootstrap addrs from plain text",
281 bootstrap_addresses.len()
282 );
283 Ok(bootstrap_addresses)
284 }
285 }
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292 use libp2p::Multiaddr;
293 use wiremock::{
294 matchers::{method, path},
295 Mock, MockServer, ResponseTemplate,
296 };
297
298 #[tokio::test]
299 async fn test_fetch_addrs() {
300 let mock_server = MockServer::start().await;
301
302 Mock::given(method("GET"))
303 .and(path("/"))
304 .respond_with(
305 ResponseTemplate::new(200)
306 .set_body_string("/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"),
307 )
308 .mount(&mock_server)
309 .await;
310
311 let mut fetcher = ContactsFetcher::new().unwrap();
312 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
313
314 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
315 assert_eq!(addrs.len(), 2);
316
317 let addr1: Multiaddr =
318 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
319 .parse()
320 .unwrap();
321 let addr2: Multiaddr =
322 "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
323 .parse()
324 .unwrap();
325 assert!(addrs.iter().any(|p| p.addr == addr1));
326 assert!(addrs.iter().any(|p| p.addr == addr2));
327 }
328
329 #[tokio::test]
330 async fn test_endpoint_failover() {
331 let mock_server1 = MockServer::start().await;
332 let mock_server2 = MockServer::start().await;
333
334 Mock::given(method("GET"))
336 .and(path("/"))
337 .respond_with(ResponseTemplate::new(500))
338 .mount(&mock_server1)
339 .await;
340
341 Mock::given(method("GET"))
343 .and(path("/"))
344 .respond_with(ResponseTemplate::new(200).set_body_string(
345 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
346 ))
347 .mount(&mock_server2)
348 .await;
349
350 let mut fetcher = ContactsFetcher::new().unwrap();
351 fetcher.endpoints = vec![
352 mock_server1.uri().parse().unwrap(),
353 mock_server2.uri().parse().unwrap(),
354 ];
355
356 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
357 assert_eq!(addrs.len(), 1);
358
359 let addr: Multiaddr =
360 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
361 .parse()
362 .unwrap();
363 assert_eq!(addrs[0].addr, addr);
364 }
365
366 #[tokio::test]
367 async fn test_invalid_multiaddr() {
368 let mock_server = MockServer::start().await;
369
370 Mock::given(method("GET"))
371 .and(path("/"))
372 .respond_with(
373 ResponseTemplate::new(200).set_body_string(
374 "/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
375 ),
376 )
377 .mount(&mock_server)
378 .await;
379
380 let mut fetcher = ContactsFetcher::new().unwrap();
381 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
382
383 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
384 let valid_addr: Multiaddr =
385 "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
386 .parse()
387 .unwrap();
388 assert_eq!(addrs[0].addr, valid_addr);
389 }
390
391 #[tokio::test]
392 async fn test_whitespace_and_empty_lines() {
393 let mock_server = MockServer::start().await;
394
395 Mock::given(method("GET"))
396 .and(path("/"))
397 .respond_with(
398 ResponseTemplate::new(200).set_body_string("\n \n/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n \n"),
399 )
400 .mount(&mock_server)
401 .await;
402
403 let mut fetcher = ContactsFetcher::new().unwrap();
404 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
405
406 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
407 assert_eq!(addrs.len(), 1);
408
409 let addr: Multiaddr =
410 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
411 .parse()
412 .unwrap();
413 assert_eq!(addrs[0].addr, addr);
414 }
415
416 #[tokio::test]
417 async fn test_custom_endpoints() {
418 let endpoints = vec!["http://example.com".parse().unwrap()];
419 let fetcher = ContactsFetcher::with_endpoints(endpoints.clone()).unwrap();
420 assert_eq!(fetcher.endpoints, endpoints);
421 }
422}