use crate::{Client, networking::NetworkError};
use ant_evm::{
AttoTokens, EvmWallet,
merkle_payments::{
CANDIDATES_PER_POOL, MAX_LEAVES, MerklePaymentCandidateNode, MerklePaymentCandidatePool,
MerklePaymentProof, MerklePaymentVerificationError, MerkleTree, MidpointProof,
},
};
use ant_protocol::{
NetworkAddress,
storage::{ChunkAddress, DataTypes},
};
use evmlib::merkle_batch_payment::PoolCommitment;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, info};
use xor_name::XorName;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MerklePaymentReceipt {
pub proofs: HashMap<XorName, MerklePaymentProof>,
pub file_chunk_counts: HashMap<String, usize>,
pub amount_paid: AttoTokens,
}
impl Default for MerklePaymentReceipt {
fn default() -> Self {
Self {
proofs: HashMap::new(),
file_chunk_counts: HashMap::new(),
amount_paid: AttoTokens::zero(),
}
}
}
impl MerklePaymentReceipt {
pub fn merge(&mut self, other: Self) {
self.proofs.extend(other.proofs);
self.file_chunk_counts.extend(other.file_chunk_counts);
self.amount_paid = AttoTokens::from_atto(
self.amount_paid
.as_atto()
.saturating_add(other.amount_paid.as_atto()),
);
}
}
#[derive(Debug, thiserror::Error)]
pub enum MerklePaymentError {
#[error("Network error: {0}")]
Network(#[from] NetworkError),
#[error("Merkle tree error: {0}")]
MerkleTree(#[from] ant_evm::merkle_payments::MerkleTreeError),
#[error("Not enough valid candidate responses: got {got}, needed {needed}")]
InsufficientCandidates { got: usize, needed: usize },
#[error("Failed to serialize: {0}")]
Serialization(String),
#[error("Smart contract error: {0}")]
SmartContract(String),
#[error(
"EVM wallet and client use different EVM networks. Please use the same network for both."
)]
EvmWalletNetworkMismatch,
#[error("Wallet error: {0:?}")]
EvmWalletError(#[from] ant_evm::EvmWalletError),
#[error("Failed to get timestamp: {0}")]
TimestampError(#[from] std::time::SystemTimeError),
#[error("Candidate pool verification failed: {0}")]
PoolVerification(#[from] MerklePaymentVerificationError),
}
impl Client {
async fn get_merkle_candidate_pool(
&self,
target_address: XorName,
data_type: DataTypes,
data_size: usize,
merkle_payment_timestamp: u64,
) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL], MerklePaymentError> {
const PEERS_TO_QUERY: usize = CANDIDATES_PER_POOL + (CANDIDATES_PER_POOL / 4);
let network_addr = NetworkAddress::ChunkAddress(ChunkAddress::new(target_address));
let closest_peers = self
.network
.get_closest_peers_with_retries(network_addr.clone(), Some(PEERS_TO_QUERY))
.await?;
let unique_peers: HashMap<libp2p::PeerId, libp2p::kad::PeerInfo> = closest_peers
.into_iter()
.map(|peer_info| (peer_info.peer_id, peer_info))
.collect();
if unique_peers.len() < CANDIDATES_PER_POOL {
return Err(MerklePaymentError::InsufficientCandidates {
got: unique_peers.len(),
needed: CANDIDATES_PER_POOL,
});
}
let peer_info_with_distances: Vec<_> = unique_peers
.values()
.map(|peer_info| {
let peer_addr = NetworkAddress::from(peer_info.peer_id);
let distance = network_addr.distance(&peer_addr);
(peer_info.clone(), distance)
})
.collect();
let mut tasks = FuturesUnordered::new();
for (peer_info, _distance) in &peer_info_with_distances {
let network = self.network.clone();
let network_addr = network_addr.clone();
let data_type_index = data_type.get_index();
let peer_info = peer_info.clone();
let peer_id = peer_info.peer_id;
tasks.push(async move {
let result = network
.get_merkle_candidate_quote(
network_addr,
peer_info,
data_type_index,
data_size,
merkle_payment_timestamp,
)
.await;
(peer_id, result)
});
}
let mut successful_candidates: Vec<(libp2p::PeerId, MerklePaymentCandidateNode)> =
Vec::new();
use futures::StreamExt;
while let Some((peer_id, result)) = tasks.next().await {
match result {
Ok(candidate) => {
successful_candidates.push((peer_id, candidate));
}
Err(e) => {
warn!(
"Failed to get quote from peer {peer_id:?} for target {target_address:?}: {e}"
);
}
}
}
debug!(
"Got {} successful responses out of {} queried peers for target {target_address:?}",
successful_candidates.len(),
peer_info_with_distances.len(),
);
if successful_candidates.len() < CANDIDATES_PER_POOL {
return Err(MerklePaymentError::InsufficientCandidates {
got: successful_candidates.len(),
needed: CANDIDATES_PER_POOL,
});
}
successful_candidates.sort_by_key(|(peer_id, _candidate)| {
let peer_addr = NetworkAddress::from(*peer_id);
network_addr.distance(&peer_addr)
});
let closest_successful: Vec<MerklePaymentCandidateNode> = successful_candidates
.into_iter()
.take(CANDIDATES_PER_POOL)
.map(|(_peer_id, candidate)| candidate)
.collect();
let candidates_array: [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] =
closest_successful.try_into().map_err(|v: Vec<_>| {
MerklePaymentError::InsufficientCandidates {
got: v.len(),
needed: CANDIDATES_PER_POOL,
}
})?;
Ok(candidates_array)
}
pub(crate) async fn build_candidate_pools(
&self,
midpoint_proofs: Vec<MidpointProof>,
data_type: DataTypes,
data_size: usize,
) -> Result<Vec<MerklePaymentCandidatePool>, MerklePaymentError> {
let mut pools = Vec::new();
for midpoint_proof in midpoint_proofs {
let target = midpoint_proof.address();
let timestamp = midpoint_proof.merkle_payment_timestamp;
let candidate_nodes = self
.get_merkle_candidate_pool(target, data_type, data_size, timestamp)
.await?;
let pool = MerklePaymentCandidatePool {
midpoint_proof,
candidate_nodes,
};
pool.verify_signatures(timestamp)?;
pools.push(pool);
}
Ok(pools)
}
pub async fn pay_for_merkle_batch(
&self,
data_type: DataTypes,
content_addrs: impl Iterator<Item = XorName>,
data_size: usize,
wallet: &EvmWallet,
) -> Result<MerklePaymentReceipt, MerklePaymentError> {
if wallet.network() != self.evm_network() {
return Err(MerklePaymentError::EvmWalletNetworkMismatch);
}
let addresses: Vec<XorName> = content_addrs.collect();
let batches: Vec<Vec<XorName>> = addresses.chunks(MAX_LEAVES).map(|c| c.to_vec()).collect();
let batches_len = batches.len();
let addresses_len = addresses.len();
#[cfg(feature = "loud")]
println!("Paying for {addresses_len} addresses in {batches_len} batch(es)");
info!("Paying for {addresses_len} addresses in {batches_len} batch(es)");
let mut merged_receipt = MerklePaymentReceipt::default();
for (i, batch) in batches.into_iter().enumerate() {
#[cfg(feature = "loud")]
println!("Processing batch {}/{batches_len}", i + 1);
info!("Processing batch {}/{batches_len}", i + 1);
let receipt = self
.pay_for_single_merkle_batch(data_type, batch, data_size, wallet)
.await?;
merged_receipt.merge(receipt);
}
Ok(merged_receipt)
}
pub(crate) async fn prepare_merkle_batch(
&self,
data_type: DataTypes,
addresses: Vec<XorName>,
data_size: usize,
) -> Result<
(
MerkleTree,
Vec<MerklePaymentCandidatePool>,
Vec<PoolCommitment>,
u64,
),
MerklePaymentError,
> {
info!(
"Preparing Merkle batch for {} addresses with data_type {data_type:?}",
addresses.len()
);
let tree = MerkleTree::from_xornames(addresses)?;
let depth = tree.depth();
info!("Built Merkle tree: depth={depth}");
let merkle_payment_timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
let midpoint_proofs = tree.reward_candidates(merkle_payment_timestamp)?;
info!("Generated {} midpoint proofs", midpoint_proofs.len());
let candidate_pools = self
.build_candidate_pools(midpoint_proofs, data_type, data_size)
.await?;
info!(
"Collected and validated all {} candidate pools",
candidate_pools.len()
);
let pool_commitments: Vec<PoolCommitment> = candidate_pools
.iter()
.map(|pool| pool.to_commitment())
.collect();
Ok((
tree,
candidate_pools,
pool_commitments,
merkle_payment_timestamp,
))
}
pub(crate) async fn pay_for_single_merkle_batch(
&self,
data_type: DataTypes,
addresses: Vec<XorName>,
data_size: usize,
wallet: &EvmWallet,
) -> Result<MerklePaymentReceipt, MerklePaymentError> {
let (tree, candidate_pools, pool_commitments, merkle_payment_timestamp) = self
.prepare_merkle_batch(data_type, addresses.clone(), data_size)
.await?;
let depth = tree.depth();
debug!("Waiting for wallet lock");
let lock_guard = wallet.lock().await;
debug!("Locked wallet");
let (winner_pool_hash, amount) = wallet
.pay_for_merkle_tree(depth, pool_commitments, merkle_payment_timestamp)
.await?;
let amount = AttoTokens::from_atto(amount);
drop(lock_guard);
debug!("Unlocked wallet");
info!("Payment submitted, winner pool: {winner_pool_hash:?}, amount: {amount}");
let winner_pool = candidate_pools
.into_iter()
.find(|pool| pool.hash() == winner_pool_hash)
.ok_or_else(|| {
MerklePaymentError::SmartContract(format!(
"Smart contract returned invalid pool hash: {}",
hex::encode(winner_pool_hash)
))
})?;
let mut proofs = HashMap::new();
for (i, address) in addresses.into_iter().enumerate() {
let address_proof = tree.generate_address_proof(i, address)?;
let payment_proof = MerklePaymentProof {
address,
data_proof: address_proof,
winner_pool: winner_pool.clone(),
};
proofs.insert(address, payment_proof);
}
let receipt = MerklePaymentReceipt {
proofs,
file_chunk_counts: HashMap::new(),
amount_paid: amount,
};
info!(
"Generated {} Merkle payment proofs, total amount: {amount}",
receipt.proofs.len()
);
Ok(receipt)
}
}