use crate::{
Error, Result, cache_store::CACHE_DATA_VERSION_LATEST, craft_valid_multiaddr_from_str,
};
use futures::stream::{self, StreamExt};
use libp2p::Multiaddr;
use reqwest::Client;
use std::time::Duration;
use url::Url;
const CONTACTS_CACHE_VERSION_HEADER: &str = "Cache-Version";
pub const MAINNET_CONTACTS: &[&str] = &[
"https://sn-testnet.s3.eu-west-2.amazonaws.com/network-contacts",
"http://159.89.251.80/bootstrap_cache.json",
"http://159.65.210.89/bootstrap_cache.json",
"http://159.223.246.45/bootstrap_cache.json",
"http://139.59.201.153/bootstrap_cache.json",
"http://139.59.200.27/bootstrap_cache.json",
];
pub const ALPHANET_CONTACTS: &[&str] = &[
"http://188.166.133.208/bootstrap_cache.json",
"http://188.166.133.125/bootstrap_cache.json",
"http://178.128.137.64/bootstrap_cache.json",
"http://159.223.242.7/bootstrap_cache.json",
"http://143.244.197.147/bootstrap_cache.json",
];
const FETCH_TIMEOUT_SECS: u64 = 30;
const MAX_CONCURRENT_FETCHES: usize = 3;
const MAX_RETRIES_ON_FETCH_FAILURE: usize = 2;
pub struct ContactsFetcher {
max_addrs: usize,
endpoints: Vec<Url>,
request_client: Client,
ignore_peer_id: bool,
}
impl ContactsFetcher {
pub fn new() -> Result<Self> {
Self::with_endpoints(vec![])
}
pub fn with_endpoints(endpoints: Vec<Url>) -> Result<Self> {
let request_client = Client::builder()
.timeout(Duration::from_secs(FETCH_TIMEOUT_SECS))
.build()?;
Ok(Self {
max_addrs: usize::MAX,
endpoints,
request_client,
ignore_peer_id: false,
})
}
pub fn set_max_addrs(&mut self, max_addrs: usize) {
self.max_addrs = max_addrs;
}
pub fn with_mainnet_endpoints() -> Result<Self> {
let mut fetcher = Self::new()?;
#[allow(clippy::expect_used)]
let mainnet_contact = MAINNET_CONTACTS
.iter()
.map(|url| url.parse().expect("Failed to parse static URL"))
.collect();
fetcher.endpoints = mainnet_contact;
Ok(fetcher)
}
pub fn with_alphanet_endpoints() -> Result<Self> {
let mut fetcher = Self::new()?;
#[allow(clippy::expect_used)]
let alphanet_contact = ALPHANET_CONTACTS
.iter()
.map(|url| url.parse().expect("Failed to parse static URL"))
.collect();
fetcher.endpoints = alphanet_contact;
Ok(fetcher)
}
pub fn insert_endpoint(&mut self, endpoint: Url) {
self.endpoints.push(endpoint);
}
pub fn ignore_peer_id(&mut self, ignore_peer_id: bool) {
self.ignore_peer_id = ignore_peer_id;
}
pub async fn fetch_bootstrap_addresses(&self) -> Result<Vec<Multiaddr>> {
info!(
"Starting peer fetcher from {} endpoints: {:?}",
self.endpoints.len(),
self.endpoints
);
let mut bootstrap_addresses = Vec::new();
let mut fetches = stream::iter(self.endpoints.clone())
.map(|endpoint| async move {
info!(
"Attempting to fetch bootstrap addresses from endpoint: {}",
endpoint
);
(
Self::fetch_from_endpoint(
self.request_client.clone(),
&endpoint,
self.ignore_peer_id,
)
.await,
endpoint,
)
})
.buffer_unordered(MAX_CONCURRENT_FETCHES);
while let Some((result, endpoint)) = fetches.next().await {
match result {
Ok(mut endpoing_bootstrap_addresses) => {
info!(
"Successfully fetched {} bootstrap addrs from {}. First few addrs: {:?}",
endpoing_bootstrap_addresses.len(),
endpoint,
endpoing_bootstrap_addresses
.iter()
.take(3)
.collect::<Vec<_>>()
);
bootstrap_addresses.append(&mut endpoing_bootstrap_addresses);
if bootstrap_addresses.len() >= self.max_addrs {
info!(
"Fetched enough bootstrap addresses. Stopping. needed: {} Total fetched: {}",
self.max_addrs,
bootstrap_addresses.len()
);
break;
}
}
Err(e) => {
warn!("Failed to fetch bootstrap addrs from {}: {}", endpoint, e);
}
}
}
bootstrap_addresses.truncate(self.max_addrs);
info!(
"Successfully discovered {} total addresses. First few: {:?}",
bootstrap_addresses.len(),
bootstrap_addresses.iter().take(3).collect::<Vec<_>>()
);
Ok(bootstrap_addresses)
}
async fn fetch_from_endpoint(
request_client: Client,
endpoint: &Url,
ignore_peer_id: bool,
) -> Result<Vec<Multiaddr>> {
let mut retries = 0;
let bootstrap_addresses = loop {
let response = request_client
.get(endpoint.clone())
.header(CONTACTS_CACHE_VERSION_HEADER, CACHE_DATA_VERSION_LATEST)
.send()
.await;
match response {
Ok(response) => {
if response.status().is_success() {
let text = response.text().await?;
match Self::try_parse_response(&text, ignore_peer_id) {
Ok(addrs) => break addrs,
Err(err) => {
warn!("Failed to parse response with err: {err:?}");
retries += 1;
if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
return Err(Error::FailedToObtainAddrsFromUrl(
endpoint.to_string(),
MAX_RETRIES_ON_FETCH_FAILURE,
));
}
}
}
} else {
retries += 1;
if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
return Err(Error::FailedToObtainAddrsFromUrl(
endpoint.to_string(),
MAX_RETRIES_ON_FETCH_FAILURE,
));
}
}
}
Err(err) => {
error!("Failed to get bootstrap addrs from URL {endpoint}: {err:?}");
retries += 1;
if retries >= MAX_RETRIES_ON_FETCH_FAILURE {
return Err(Error::FailedToObtainAddrsFromUrl(
endpoint.to_string(),
MAX_RETRIES_ON_FETCH_FAILURE,
));
}
}
}
debug!(
"Failed to get bootstrap addrs from URL, retrying {retries}/{MAX_RETRIES_ON_FETCH_FAILURE}"
);
tokio::time::sleep(Duration::from_millis(300)).await;
};
Ok(bootstrap_addresses)
}
fn try_parse_response(response: &str, ignore_peer_id: bool) -> Result<Vec<Multiaddr>> {
let cache_data = if let Ok(data) =
serde_json::from_str::<super::cache_store::cache_data_v1::CacheData>(response)
{
Some(data)
} else if let Ok(data) =
serde_json::from_str::<super::cache_store::cache_data_v0::CacheData>(response)
{
Some(data.into())
} else {
None
};
match cache_data {
Some(cache_data) => {
info!(
"Successfully parsed JSON response with {} peers",
cache_data.peers.len()
);
let our_network_version = crate::get_network_version();
if cache_data.network_version != our_network_version {
warn!(
"Network version mismatch. Expected: {our_network_version}, got: {}. Skipping.",
cache_data.network_version
);
return Ok(vec![]);
}
let bootstrap_addresses = cache_data.get_all_addrs().cloned().collect::<Vec<_>>();
info!(
"Successfully parsed {} valid peers from JSON",
bootstrap_addresses.len()
);
Ok(bootstrap_addresses)
}
None => {
info!("Attempting to parse response as plain text");
let bootstrap_addresses = response
.split('\n')
.filter_map(|str| craft_valid_multiaddr_from_str(str, ignore_peer_id))
.collect::<Vec<_>>();
if bootstrap_addresses.is_empty() {
warn!("Failed to parse response as plain text");
return Err(Error::FailedToParseCacheData);
}
info!(
"Successfully parsed {} valid bootstrap addrs from plain text",
bootstrap_addresses.len()
);
Ok(bootstrap_addresses)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cache_store::{cache_data_v0, cache_data_v1};
use libp2p::{Multiaddr, PeerId};
use std::{
sync::atomic::{AtomicUsize, Ordering},
time::SystemTime,
};
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{method, path},
};
#[tokio::test]
async fn test_network_contacts_formats() {
let mock_server = MockServer::start().await;
let peer_id = PeerId::random();
let addr: Multiaddr = "/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
.parse()
.unwrap();
let mut v0 = cache_data_v0::CacheData {
peers: Default::default(),
last_updated: SystemTime::now(),
network_version: crate::get_network_version(),
};
v0.peers.insert(
peer_id,
cache_data_v0::BootstrapAddresses(vec![cache_data_v0::BootstrapAddr {
addr: addr.clone(),
success_count: 1,
failure_count: 0,
last_seen: SystemTime::now(),
}]),
);
let v0_json = serde_json::to_string(&v0).unwrap();
let mut v1 = cache_data_v1::CacheData::default();
v1.add_peer(peer_id, [addr.clone()].iter(), 10, 10);
let v1_json = serde_json::to_string(&v1).unwrap();
Mock::given(method("GET"))
.and(path("/v0"))
.respond_with(ResponseTemplate::new(200).set_body_string(v0_json))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/v1"))
.respond_with(ResponseTemplate::new(200).set_body_string(v1_json))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/text"))
.respond_with(ResponseTemplate::new(200).set_body_string(
"/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n",
))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/malformed"))
.respond_with(ResponseTemplate::new(200).set_body_string("this is not valid data"))
.mount(&mock_server)
.await;
for endpoint in ["/v0", "/v1", "/text"] {
let url = format!("{}{}", mock_server.uri(), endpoint)
.parse()
.unwrap();
let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
assert!(!addrs.is_empty(), "should fetch addresses from {endpoint}");
assert_eq!(addrs[0].to_string(), addr.to_string());
}
let malformed = format!("{}/malformed", mock_server.uri()).parse().unwrap();
let fetcher = ContactsFetcher::with_endpoints(vec![malformed]).unwrap();
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
assert!(
addrs.is_empty(),
"malformed responses should return empty list"
);
}
#[tokio::test]
async fn test_network_contacts_retries() {
let mock_server = MockServer::start().await;
let counter = AtomicUsize::new(0);
Mock::given(method("GET"))
.and(path("/retry"))
.respond_with(move |_: &wiremock::Request| {
let count = counter.fetch_add(1, Ordering::SeqCst);
if count < 1 {
ResponseTemplate::new(500)
} else {
ResponseTemplate::new(200).set_body_string(
"/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
)
}
})
.mount(&mock_server)
.await;
let url = format!("{}/retry", mock_server.uri()).parse().unwrap();
let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
assert_eq!(addrs.len(), 1);
}
#[tokio::test]
async fn test_fetch_addrs() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string("/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"),
)
.mount(&mock_server)
.await;
let mut fetcher = ContactsFetcher::new().unwrap();
fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
assert_eq!(addrs.len(), 2);
let addr1: Multiaddr =
"/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE"
.parse()
.unwrap();
let addr2: Multiaddr =
"/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
.parse()
.unwrap();
assert!(addrs.iter().any(|p| p == &addr1));
assert!(addrs.iter().any(|p| p == &addr2));
}
#[tokio::test]
async fn test_endpoint_failover() {
let mock_server1 = MockServer::start().await;
let mock_server2 = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(500))
.mount(&mock_server1)
.await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(200).set_body_string(
"/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
))
.mount(&mock_server2)
.await;
let mut fetcher = ContactsFetcher::new().unwrap();
fetcher.endpoints = vec![
mock_server1.uri().parse().unwrap(),
mock_server2.uri().parse().unwrap(),
];
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
assert_eq!(addrs.len(), 1);
let addr: Multiaddr =
"/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
.parse()
.unwrap();
assert_eq!(addrs[0], addr);
}
#[tokio::test]
async fn test_network_failure_recovery() {
let bad_url: Url = "http://does-not-exist.example.invalid".parse().unwrap();
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/valid"))
.respond_with(ResponseTemplate::new(200).set_body_string(
"/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE",
))
.mount(&mock_server)
.await;
let valid_url = format!("{}/valid", mock_server.uri()).parse().unwrap();
let failing = ContactsFetcher::with_endpoints(vec![bad_url.clone()]).unwrap();
let empty = failing.fetch_bootstrap_addresses().await.unwrap();
assert!(
empty.is_empty(),
"all failing endpoints should yield empty set"
);
let mixed = ContactsFetcher::with_endpoints(vec![bad_url, valid_url]).unwrap();
let addrs = mixed.fetch_bootstrap_addresses().await.unwrap();
assert!(
!addrs.is_empty(),
"mix of failing and working endpoints should return addresses"
);
}
#[tokio::test]
async fn test_invalid_multiaddr() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(
ResponseTemplate::new(200).set_body_string(
"/ip4/127.0.0.1/tcp/8080\n/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5",
),
)
.mount(&mock_server)
.await;
let mut fetcher = ContactsFetcher::new().unwrap();
fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
let valid_addr: Multiaddr =
"/ip4/127.0.0.2/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
.parse()
.unwrap();
assert_eq!(addrs[0], valid_addr);
}
#[tokio::test]
async fn test_empty_response_handling() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/empty"))
.respond_with(ResponseTemplate::new(200).set_body_string(""))
.mount(&mock_server)
.await;
let url = format!("{}/empty", mock_server.uri()).parse().unwrap();
let fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
assert!(
addrs.is_empty(),
"empty HTTP response should produce no addresses"
);
}
#[tokio::test]
async fn test_whitespace_and_empty_lines() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(
ResponseTemplate::new(200).set_body_string("\n \n/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n \n"),
)
.mount(&mock_server)
.await;
let mut fetcher = ContactsFetcher::new().unwrap();
fetcher.endpoints = vec![mock_server.uri().parse().unwrap()];
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
assert_eq!(addrs.len(), 1);
let addr: Multiaddr =
"/ip4/127.0.0.1/tcp/8080/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5"
.parse()
.unwrap();
assert_eq!(addrs[0], addr);
}
#[tokio::test]
async fn test_fetch_max_addresses() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/multiple"))
.respond_with(ResponseTemplate::new(200).set_body_string(
"/ip4/127.0.0.1/udp/8080/quic-v1/p2p/12D3KooWRBhwfeP2Y4TCx1SM6s9rUoHhR5STiGwxBhgFRcw3UERE\n\
/ip4/127.0.0.2/udp/8081/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5\n\
/ip4/127.0.0.3/udp/8082/quic-v1/p2p/12D3KooWCKCeqLPSgMnDjyFsJuWqREDtKNHx1JEBiwxME7Zdw68n",
))
.mount(&mock_server)
.await;
let url = format!("{}/multiple", mock_server.uri()).parse().unwrap();
let mut fetcher = ContactsFetcher::with_endpoints(vec![url]).unwrap();
fetcher.set_max_addrs(2);
let addrs = fetcher.fetch_bootstrap_addresses().await.unwrap();
assert_eq!(addrs.len(), 2, "max_addrs should limit returned addresses");
}
#[tokio::test]
async fn test_custom_endpoints() {
let endpoints = vec!["http://example.com".parse().unwrap()];
let fetcher = ContactsFetcher::with_endpoints(endpoints.clone()).unwrap();
assert_eq!(fetcher.endpoints, endpoints);
}
}