ant_bootstrap/
contacts.rs1use crate::{cache_store::CacheData, craft_valid_multiaddr_from_str, BootstrapAddr, Error, Result};
10use futures::stream::{self, StreamExt};
11use libp2p::Multiaddr;
12use reqwest::Client;
13use std::time::Duration;
14use url::Url;
15
/// Endpoints installed by `ContactsFetcher::with_mainnet_endpoints`.
///
/// Every entry must be a valid URL: `with_mainnet_endpoints` parses them with
/// `expect`, so a malformed entry panics at runtime.
// NOTE(review): the first entry points at a bucket named `sn-testnet` and the
// remaining entries are plain-HTTP IP addresses — confirm these are the intended
// mainnet sources.
const MAINNET_CONTACTS: &[&str] = &[
    "https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts",
    "http://159.89.251.80/bootstrap_cache.json",
    "http://159.65.210.89/bootstrap_cache.json",
    "http://159.223.246.45/bootstrap_cache.json",
    "http://139.59.201.153/bootstrap_cache.json",
    "http://139.59.200.27/bootstrap_cache.json",
];

/// Timeout, in seconds, configured on the HTTP client built by
/// `ContactsFetcher::with_endpoints`.
const FETCH_TIMEOUT_SECS: u64 = 30;
/// Upper bound on the number of endpoint fetches `fetch_addrs` keeps in flight at once.
const MAX_CONCURRENT_FETCHES: usize = 3;
/// Number of failed attempts after which `fetch_from_endpoint` gives up on an endpoint.
const MAX_RETRIES_ON_FETCH_FAILURE: usize = 3;
31
/// Fetches bootstrap peer addresses from a list of HTTP(S) endpoints.
pub struct ContactsFetcher {
    /// Address count at which `fetch_addrs` stops collecting results from further
    /// endpoints. Initialised to `usize::MAX` by `with_endpoints`.
    max_addrs: usize,
    /// Endpoints queried by `fetch_addrs`.
    endpoints: Vec<Url>,
    /// HTTP client shared (via `clone`) by all endpoint requests.
    request_client: Client,
    /// Forwarded to `craft_valid_multiaddr_from_str` when parsing plain-text responses.
    // NOTE(review): presumably allows addresses that carry no peer id — confirm
    // against `craft_valid_multiaddr_from_str`.
    ignore_peer_id: bool,
}
43
44impl ContactsFetcher {
45 pub fn new() -> Result<Self> {
47 Self::with_endpoints(vec![])
48 }
49
50 pub fn with_endpoints(endpoints: Vec<Url>) -> Result<Self> {
52 let request_client = Client::builder()
53 .timeout(Duration::from_secs(FETCH_TIMEOUT_SECS))
54 .build()?;
55
56 Ok(Self {
57 max_addrs: usize::MAX,
58 endpoints,
59 request_client,
60 ignore_peer_id: false,
61 })
62 }
63
/// Sets the address count at which `fetch_addrs` stops collecting results.
///
/// This is a stop threshold, not a hard cap: `fetch_addrs` appends each endpoint's
/// full result before comparing against it, so more than `max_addrs` addresses
/// may be returned.
pub fn set_max_addrs(&mut self, max_addrs: usize) {
    self.max_addrs = max_addrs;
}
68
69 pub fn with_mainnet_endpoints() -> Result<Self> {
71 let mut fetcher = Self::new()?;
72 let mainnet_contact = MAINNET_CONTACTS
73 .iter()
74 .map(|url| url.parse().expect("Failed to parse static URL"))
75 .collect();
76 fetcher.endpoints = mainnet_contact;
77 Ok(fetcher)
78 }
79
/// Appends `endpoint` to the list of endpoints queried by `fetch_addrs`.
///
/// No de-duplication is performed.
pub fn insert_endpoint(&mut self, endpoint: Url) {
    self.endpoints.push(endpoint);
}
83
/// Sets the `ignore_peer_id` flag that is forwarded to
/// `craft_valid_multiaddr_from_str` when plain-text responses are parsed.
// NOTE(review): presumably `true` lets addresses without a peer id through —
// confirm against `craft_valid_multiaddr_from_str`.
pub fn ignore_peer_id(&mut self, ignore_peer_id: bool) {
    self.ignore_peer_id = ignore_peer_id;
}
87
88 pub async fn fetch_bootstrap_addresses(&self) -> Result<Vec<BootstrapAddr>> {
90 Ok(self
91 .fetch_addrs()
92 .await?
93 .into_iter()
94 .map(BootstrapAddr::new)
95 .collect())
96 }
97
98 pub async fn fetch_addrs(&self) -> Result<Vec<Multiaddr>> {
100 info!(
101 "Starting peer fetcher from {} endpoints: {:?}",
102 self.endpoints.len(),
103 self.endpoints
104 );
105 let mut bootstrap_addresses = Vec::new();
106
107 let mut fetches = stream::iter(self.endpoints.clone())
108 .map(|endpoint| async move {
109 info!(
110 "Attempting to fetch bootstrap addresses from endpoint: {}",
111 endpoint
112 );
113 (
114 Self::fetch_from_endpoint(
115 self.request_client.clone(),
116 &endpoint,
117 self.ignore_peer_id,
118 )
119 .await,
120 endpoint,
121 )
122 })
123 .buffer_unordered(MAX_CONCURRENT_FETCHES);
124
125 while let Some((result, endpoint)) = fetches.next().await {
126 match result {
127 Ok(mut endpoing_bootstrap_addresses) => {
128 info!(
129 "Successfully fetched {} bootstrap addrs from {}. First few addrs: {:?}",
130 endpoing_bootstrap_addresses.len(),
131 endpoint,
132 endpoing_bootstrap_addresses
133 .iter()
134 .take(3)
135 .collect::<Vec<_>>()
136 );
137 bootstrap_addresses.append(&mut endpoing_bootstrap_addresses);
138 if bootstrap_addresses.len() >= self.max_addrs {
139 info!(
140 "Fetched enough bootstrap addresses. Stopping. needed: {} Total fetched: {}",
141 self.max_addrs,
142 bootstrap_addresses.len()
143 );
144 break;
145 }
146 }
147 Err(e) => {
148 warn!("Failed to fetch bootstrap addrs from {}: {}", endpoint, e);
149 }
150 }
151 }
152
153 info!(
154 "Successfully discovered {} total addresses. First few: {:?}",
155 bootstrap_addresses.len(),
156 bootstrap_addresses.iter().take(3).collect::<Vec<_>>()
157 );
158 Ok(bootstrap_addresses)
159 }
160
161 async fn fetch_from_endpoint(
163 request_client: Client,
164 endpoint: &Url,
165 ignore_peer_id: bool,
166 ) -> Result<Vec<Multiaddr>> {
167 info!("Fetching peers from endpoint: {endpoint}");
168 let mut retries = 0;
169
170 let bootstrap_addresses = loop {
171 let response = request_client.get(endpoint.clone()).send().await;
172
173 match response {
174 Ok(response) => {
175 if response.status().is_success() {
176 let text = response.text().await?;
177
178 match Self::try_parse_response(&text, ignore_peer_id) {
179 Ok(addrs) => break addrs,
180 Err(err) => {
181 warn!("Failed to parse response with err: {err:?}");
182 retries += 1;
183 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
184 return Err(Error::FailedToObtainAddrsFromUrl(
185 endpoint.to_string(),
186 MAX_RETRIES_ON_FETCH_FAILURE,
187 ));
188 }
189 }
190 }
191 } else {
192 retries += 1;
193 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
194 return Err(Error::FailedToObtainAddrsFromUrl(
195 endpoint.to_string(),
196 MAX_RETRIES_ON_FETCH_FAILURE,
197 ));
198 }
199 }
200 }
201 Err(err) => {
202 error!("Failed to get bootstrap addrs from URL {endpoint}: {err:?}");
203 retries += 1;
204 if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
205 return Err(Error::FailedToObtainAddrsFromUrl(
206 endpoint.to_string(),
207 MAX_RETRIES_ON_FETCH_FAILURE,
208 ));
209 }
210 }
211 }
212 trace!(
213 "Failed to get bootstrap addrs from URL, retrying {retries}/{MAX_RETRIES_ON_FETCH_FAILURE}"
214 );
215
216 tokio::time::sleep(Duration::from_secs(1)).await;
217 };
218
219 Ok(bootstrap_addresses)
220 }
221
222 fn try_parse_response(response: &str, ignore_peer_id: bool) -> Result<Vec<Multiaddr>> {
224 match serde_json::from_str::<CacheData>(response) {
225 Ok(json_endpoints) => {
226 info!(
227 "Successfully parsed JSON response with {} peers",
228 json_endpoints.peers.len()
229 );
230 let our_network_version = crate::get_network_version();
231
232 if json_endpoints.network_version != our_network_version {
233 warn!(
234 "Network version mismatch. Expected: {our_network_version}, got: {}. Skipping.", json_endpoints.network_version
235 );
236 return Ok(vec![]);
237 }
238 let bootstrap_addresses = json_endpoints
239 .peers
240 .into_iter()
241 .filter_map(|(_, addresses)| {
242 addresses.get_least_faulty().map(|addr| addr.addr.clone())
243 })
244 .collect::<Vec<_>>();
245
246 info!(
247 "Successfully parsed {} valid peers from JSON",
248 bootstrap_addresses.len()
249 );
250 Ok(bootstrap_addresses)
251 }
252 Err(_err) => {
253 info!("Attempting to parse response as plain text");
254 let bootstrap_addresses = response
257 .split('\n')
258 .filter_map(|str| craft_valid_multiaddr_from_str(str, ignore_peer_id))
259 .collect::<Vec<_>>();
260
261 info!(
262 "Successfully parsed {} valid bootstrap addrs from plain text",
263 bootstrap_addresses.len()
264 );
265 Ok(bootstrap_addresses)
266 }
267 }
268 }
269}
270
271#[cfg(test)]
272mod tests {
273 use super::*;
274 use libp2p::Multiaddr;
275 use wiremock::{
276 matchers::{method, path},
277 Mock, MockServer, ResponseTemplate,
278 };
279
280 #[tokio::test]
281 async fn test_fetch_addrs() {
282 let mock_server = MockServer::start().await;
283
284 Mock::given(method("GET"))
285 .and(path("/"))
286 .respond_with(
287 ResponseTemplate::new(200)
288 .set_body_string("/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"),
289 )
290 .mount(&mock_server)
291 .await;
292
293 let mut fetcher = ContactsFetcher::new().unwrap();
294 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
295
296 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
297 assert_eq!(addrs.len(), 2);
298
299 let addr1: Multiaddr =
300 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
301 .parse()
302 .unwrap();
303 let addr2: Multiaddr =
304 "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
305 .parse()
306 .unwrap();
307 assert!(addrs.iter().any(|p| p.addr == addr1));
308 assert!(addrs.iter().any(|p| p.addr == addr2));
309 }
310
311 #[tokio::test]
312 async fn test_endpoint_failover() {
313 let mock_server1 = MockServer::start().await;
314 let mock_server2 = MockServer::start().await;
315
316 Mock::given(method("GET"))
318 .and(path("/"))
319 .respond_with(ResponseTemplate::new(500))
320 .mount(&mock_server1)
321 .await;
322
323 Mock::given(method("GET"))
325 .and(path("/"))
326 .respond_with(ResponseTemplate::new(200).set_body_string(
327 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
328 ))
329 .mount(&mock_server2)
330 .await;
331
332 let mut fetcher = ContactsFetcher::new().unwrap();
333 fetcher.endpoints = vec![
334 mock_server1.uri().parse().unwrap(),
335 mock_server2.uri().parse().unwrap(),
336 ];
337
338 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
339 assert_eq!(addrs.len(), 1);
340
341 let addr: Multiaddr =
342 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
343 .parse()
344 .unwrap();
345 assert_eq!(addrs[0].addr, addr);
346 }
347
348 #[tokio::test]
349 async fn test_invalid_multiaddr() {
350 let mock_server = MockServer::start().await;
351
352 Mock::given(method("GET"))
353 .and(path("/"))
354 .respond_with(
355 ResponseTemplate::new(200).set_body_string(
356 "/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
357 ),
358 )
359 .mount(&mock_server)
360 .await;
361
362 let mut fetcher = ContactsFetcher::new().unwrap();
363 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
364
365 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
366 let valid_addr: Multiaddr =
367 "/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
368 .parse()
369 .unwrap();
370 assert_eq!(addrs[0].addr, valid_addr);
371 }
372
373 #[tokio::test]
374 async fn test_whitespace_and_empty_lines() {
375 let mock_server = MockServer::start().await;
376
377 Mock::given(method("GET"))
378 .and(path("/"))
379 .respond_with(
380 ResponseTemplate::new(200).set_body_string("\n \n/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n \n"),
381 )
382 .mount(&mock_server)
383 .await;
384
385 let mut fetcher = ContactsFetcher::new().unwrap();
386 fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
387
388 let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
389 assert_eq!(addrs.len(), 1);
390
391 let addr: Multiaddr =
392 "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
393 .parse()
394 .unwrap();
395 assert_eq!(addrs[0].addr, addr);
396 }
397
398 #[tokio::test]
399 async fn test_custom_endpoints() {
400 let endpoints = vec!["http://example.com".parse().unwrap()];
401 let fetcher = ContactsFetcher::with_endpoints(endpoints.clone()).unwrap();
402 assert_eq!(fetcher.endpoints, endpoints);
403 }
404}