use crate::Client;
use crate::networking::version::PackageVersion;
use crate::networking::{NetworkError, PeerQuoteWithStorageProof};
use crate::utils::process_tasks_with_max_concurrency;
use ant_protocol::NetworkAddress;
use ant_protocol::storage::DataTypes;
use libp2p::PeerId;
use libp2p::kad::{PeerInfo, Quorum, Record};
use std::collections::HashSet;
impl Client {
    /// Looks up the peers closest to `network_address`.
    ///
    /// The address is converted via `Into<NetworkAddress>` and the query is
    /// delegated to the network layer's `get_closest_peers_with_retries`.
    /// `count` is forwarded unchanged; presumably it bounds how many peers are
    /// returned, with `None` selecting the network layer's default (confirm
    /// against the network layer).
    ///
    /// # Errors
    ///
    /// Returns whatever [`NetworkError`] the network layer reports.
    pub async fn get_closest_to_address(
        &self,
        network_address: impl Into<NetworkAddress>,
        count: Option<usize>,
    ) -> Result<Vec<PeerInfo>, NetworkError> {
        self.network
            .get_closest_peers_with_retries(network_address.into(), count)
            .await
    }

    /// Requests the record at `network_address` from one specific `peer`.
    ///
    /// Returns `Ok(None)` when the network layer reports no record for that
    /// address.
    ///
    /// # Errors
    ///
    /// Returns whatever [`NetworkError`] the network layer reports.
    pub async fn get_record_from_peer(
        &self,
        network_address: impl Into<NetworkAddress>,
        peer: PeerInfo,
    ) -> Result<Option<Record>, NetworkError> {
        self.network
            .get_record_from_peer(network_address.into(), peer)
            .await
    }

    /// Fetches the record at `network_address` together with a list of peer ids.
    ///
    /// The `quorum` is forwarded unchanged to the network layer. The returned
    /// `Vec<PeerId>` is presumably the set of peers found holding the record
    /// (confirm against the network layer).
    ///
    /// # Errors
    ///
    /// Returns whatever [`NetworkError`] the network layer reports.
    pub async fn get_record_and_holders(
        &self,
        network_address: impl Into<NetworkAddress>,
        quorum: Quorum,
    ) -> Result<(Option<Record>, Vec<PeerId>), NetworkError> {
        self.network
            .get_record_and_holders(network_address.into(), quorum)
            .await
    }

    /// Requests a quote with storage proof from `peer` for `network_address`.
    ///
    /// `nonce`, `difficulty`, `data_type` and `data_size` are passed through to
    /// the network layer unchanged; their exact semantics are defined there.
    ///
    /// # Errors
    ///
    /// Returns whatever [`NetworkError`] the network layer reports.
    pub async fn get_storage_proofs_from_peer(
        &self,
        network_address: impl Into<NetworkAddress>,
        peer: PeerInfo,
        nonce: u64,
        difficulty: usize,
        data_type: DataTypes,
        data_size: usize,
    ) -> Result<PeerQuoteWithStorageProof, NetworkError> {
        self.network
            .get_storage_proofs_from_peer(
                network_address.into(),
                peer,
                nonce,
                difficulty,
                data_type,
                data_size,
            )
            .await
    }

    /// Asks `peer` for the package version it is running.
    ///
    /// # Errors
    ///
    /// Returns the error `String` produced by the network layer.
    pub async fn get_node_version(&self, peer: PeerInfo) -> Result<PackageVersion, String> {
        self.network.get_node_version(peer).await
    }

    /// Checks which of `addresses` currently resolve to a record on the network.
    ///
    /// Addresses are processed in consecutive batches of `batch_size`; each
    /// address is queried with `Quorum::One`, and `batch_size` is also handed
    /// to `process_tasks_with_max_concurrency` as the concurrency limit. A
    /// progress line is logged after every batch.
    ///
    /// An address is reported as existing when the lookup returns a record or
    /// fails with [`NetworkError::SplitRecord`]. A missing record or any other
    /// error counts as "not existing", so this is a best-effort check that
    /// never fails.
    ///
    /// A `batch_size` of `0` is treated as `1`.
    ///
    /// Returns the subset of `addresses` found to exist; empty input yields an
    /// empty set.
    pub async fn check_records_exist_batch(
        &self,
        addresses: &[NetworkAddress],
        batch_size: usize,
    ) -> HashSet<NetworkAddress> {
        if addresses.is_empty() {
            return HashSet::new();
        }

        // `slice::chunks` panics on a chunk size of zero, so clamp to at least
        // one address per batch instead of crashing the caller.
        let batch_size = batch_size.max(1);

        let total = addresses.len();
        let mut existing: HashSet<NetworkAddress> = HashSet::new();
        let mut checked = 0;

        for batch in addresses.chunks(batch_size) {
            let tasks: Vec<_> = batch
                .iter()
                .map(|addr| {
                    let network = self.network.clone();
                    async move {
                        match network.get_record(addr.clone(), Quorum::One).await {
                            Ok(Some(_)) => Some(addr),
                            Ok(None) => None,
                            // Counted as existing: presumably a split record
                            // still means data is stored at this address
                            // (confirm against `NetworkError::SplitRecord`).
                            Err(NetworkError::SplitRecord { .. }) => Some(addr),
                            // Any other failure is deliberately treated as "not found".
                            Err(_) => None,
                        }
                    }
                })
                .collect();

            let results = process_tasks_with_max_concurrency(tasks, batch_size).await;
            // Only the addresses that were found get cloned into the result set.
            existing.extend(results.into_iter().flatten().cloned());

            checked += batch.len();
            crate::loud_info!("Checked {checked}/{total} chunks for existence...");
        }

        existing
    }
}