use ant_evm::PaymentQuote;
use ant_protocol::{NetworkAddress, PrettyPrintRecordKey};
use futures::stream::{self, StreamExt};
use super::{Network, RetryStrategy};
use super::{NetworkError, PeerInfo, Record, Strategy};
use tokio::time::sleep;
/// Number of closest peers requested by the direct-query fallback in this file;
/// the same value is used as the concurrency limit for the per-peer queries.
pub const FALLBACK_PEERS_COUNT: usize = 20;
impl Network {
/// Stores `record` on the peers in `to`, retrying according to `strategy.put_retry`.
///
/// One `put_record` attempt (using `strategy.put_quorum`) is made per step of
/// the backoff schedule, and the first successful attempt ends the call.
///
/// # Errors
///
/// - Returns the error immediately when `cannot_retry()` reports it as
///   non-retryable.
/// - Returns the most recent error once the schedule yields a step that
///   carries no delay (i.e. no further retry is planned).
/// - Returns [`NetworkError::InvalidRetryStrategy`] if the schedule runs out
///   without ever producing such a final step (for example, when it is empty).
pub async fn put_record_with_retries(
    &self,
    record: Record,
    to: Vec<PeerInfo>,
    strategy: &Strategy,
) -> Result<(), NetworkError> {
    let addr = PrettyPrintRecordKey::from(&record.key).into_owned();
    for duration in strategy.put_retry.backoff() {
        // Only a retryable failure falls through the match; success and
        // non-retryable errors leave the function right away.
        let err = match self
            .put_record(record.clone(), to.clone(), strategy.put_quorum)
            .await
        {
            Ok(()) => return Ok(()),
            Err(err) if err.cannot_retry() => return Err(err),
            Err(err) => err,
        };
        warn!("Put record failed at {addr}: {err:?}, retrying in {duration:?}");
        match duration {
            Some(retry_delay) => sleep(retry_delay).await,
            // No delay means this was the last attempt: surface its error.
            None => return Err(err),
        }
    }
    Err(NetworkError::InvalidRetryStrategy)
}
/// Fetches the record at `addr`, retrying according to `strategy.get_retry`
/// and finally falling back to querying the closest peers directly.
///
/// Each attempt calls `get_record` with `strategy.get_quorum`. A record that
/// is found is returned as a single-element vector. When the record is
/// missing or a retryable error occurs, the attempt is logged and the next
/// step of the backoff schedule is taken. Once the schedule is used up, the
/// result of `fetch_records_from_closest_peers_fallback` is returned instead.
///
/// # Errors
///
/// Returns immediately, without further retries and without the fallback, on
/// `NetworkError::SplitRecord` and on any error for which `cannot_retry()`
/// is true.
pub async fn get_record_with_retries(
    &self,
    addr: NetworkAddress,
    strategy: &Strategy,
) -> Result<Option<Vec<Record>>, NetworkError> {
    let quorum = strategy.get_quorum;
    for duration in strategy.get_retry.backoff() {
        match self.get_record(addr.clone(), quorum).await {
            Ok(Some(record)) => return Ok(Some(vec![record])),
            // A split record is reported to the caller as-is, never retried.
            Err(err @ NetworkError::SplitRecord(_)) => return Err(err),
            Err(err) if err.cannot_retry() => return Err(err),
            Ok(None) => {
                warn!("Record not found at {addr}, retrying in {duration:?}");
            }
            Err(err) => {
                warn!("Get record failed at {addr}: {err:?}, retrying in {duration:?}");
            }
        }
        // Shared tail for both retryable outcomes: wait for the next attempt,
        // or stop looping when the schedule carries no further delay.
        match duration {
            Some(retry_delay) => sleep(retry_delay).await,
            None => break,
        }
    }
    debug!("All retries exhausted for {addr} after error, trying fallback to closest peers");
    self.fetch_records_from_closest_peers_fallback(addr).await
}
/// Last-resort lookup: finds the peers closest to `addr` (requesting
/// `FALLBACK_PEERS_COUNT` of them) and asks each one for the record directly,
/// running the per-peer queries concurrently.
///
/// Returns `Ok(Some(records))` holding every record a peer handed back, or
/// `Ok(None)` when the closest peers could not be determined or no peer
/// returned a record. Failures of individual peers are discarded.
async fn fetch_records_from_closest_peers_fallback(
    &self,
    addr: NetworkAddress,
) -> Result<Option<Vec<Record>>, NetworkError> {
    debug!("Querying closest {FALLBACK_PEERS_COUNT} nodes directly for {addr:?}");

    let lookup = self
        .get_closest_peers(addr.clone(), Some(FALLBACK_PEERS_COUNT))
        .await;
    let peers = match lookup {
        Err(e) => {
            // A failed peer lookup is logged and reported as "nothing found".
            error!("Failed to get closest peers for {addr:?}: {e}");
            return Ok(None);
        }
        Ok(found) => found,
    };

    debug!(
        "Querying {} closest peers in parallel for {addr:?}",
        peers.len()
    );

    // Build one owned query future per peer; nothing runs until the stream
    // below starts polling them.
    let handle = self.clone();
    let mut pending = Vec::with_capacity(peers.len());
    for peer in peers {
        let handle = handle.clone();
        let target = addr.clone();
        pending.push(async move { handle.get_record_from_peer(target, peer).await });
    }

    let outcomes: Vec<_> = stream::iter(pending)
        .buffer_unordered(FALLBACK_PEERS_COUNT)
        .collect()
        .await;

    // Keep only the queries that succeeded *and* produced a record.
    let found: Vec<Record> = outcomes
        .into_iter()
        .filter_map(|outcome| outcome.ok().flatten())
        .collect();

    if found.is_empty() {
        error!("❌ All closest peers failed to return records for {addr:?}");
        return Ok(None);
    }

    debug!(
        "✅ Retrieved {} records from closest peers for {addr:?}",
        found.len()
    );
    Ok(Some(found))
}
/// Requests payment quotes for `addr` via `get_quotes`, following the
/// `RetryStrategy::Once` backoff schedule.
///
/// The value produced by the first successful `get_quotes` call is returned
/// unchanged (including `None`).
///
/// # Errors
///
/// - Returns the error immediately when `cannot_retry()` reports it as
///   non-retryable.
/// - Returns the most recent error once the schedule yields a step that
///   carries no delay.
/// - Returns [`NetworkError::InvalidRetryStrategy`] if the schedule runs out
///   without ever producing such a final step.
pub async fn get_quotes_with_retries(
    &self,
    addr: NetworkAddress,
    data_type: u32,
    data_size: usize,
) -> Result<Option<Vec<(PeerInfo, PaymentQuote)>>, NetworkError> {
    for duration in RetryStrategy::Once.backoff() {
        // Only a retryable failure falls through the match.
        let err = match self.get_quotes(addr.clone(), data_type, data_size).await {
            Ok(quotes) => return Ok(quotes),
            Err(err) if err.cannot_retry() => return Err(err),
            Err(err) => err,
        };
        warn!("Get quotes failed at {addr}: {err:?}, retrying in {duration:?}");
        match duration {
            Some(retry_delay) => sleep(retry_delay).await,
            // No delay means this was the last attempt: surface its error.
            None => return Err(err),
        }
    }
    Err(NetworkError::InvalidRetryStrategy)
}
/// Looks up the peers closest to `addr` via `get_closest_peers`, following
/// the `RetryStrategy::Once` backoff schedule.
///
/// `count` is forwarded to `get_closest_peers` unchanged on every attempt,
/// and the peer list from the first successful attempt is returned.
///
/// # Errors
///
/// - Returns the error immediately when `cannot_retry()` reports it as
///   non-retryable.
/// - Returns the most recent error once the schedule yields a step that
///   carries no delay.
/// - Returns [`NetworkError::InvalidRetryStrategy`] if the schedule runs out
///   without ever producing such a final step.
pub async fn get_closest_peers_with_retries(
    &self,
    addr: NetworkAddress,
    count: Option<usize>,
) -> Result<Vec<PeerInfo>, NetworkError> {
    for duration in RetryStrategy::Once.backoff() {
        // Only a retryable failure falls through the match.
        let err = match self.get_closest_peers(addr.clone(), count).await {
            Ok(peers) => return Ok(peers),
            Err(err) if err.cannot_retry() => return Err(err),
            Err(err) => err,
        };
        warn!("Get closest peers failed at {addr}: {err:?}, retrying in {duration:?}");
        match duration {
            Some(retry_delay) => sleep(retry_delay).await,
            // No delay means this was the last attempt: surface its error.
            None => return Err(err),
        }
    }
    Err(NetworkError::InvalidRetryStrategy)
}
}