use crate::access::{cached_merkle_payments, cached_payments};
use ant_protocol::NetworkAddress;
use autonomi::PublicKey;
use autonomi::client::data_types::chunk::ChunkAddress;
use autonomi::client::data_types::graph::GraphEntryAddress;
use autonomi::client::payment::Receipt;
use autonomi::client::{Amount, ClientEvent, UploadSummary};
use autonomi::networking::PeerId;
use color_eyre::Result;
/// Parses a user-supplied string into a [`NetworkAddress`].
///
/// The following interpretations are attempted in order; the first one that succeeds wins:
///
/// 1. A hex string accepted by [`ChunkAddress::from_hex`].
/// 2. A hex string accepted by [`PublicKey::from_hex`], wrapped in a [`GraphEntryAddress`].
/// 3. A hex chunk address enclosed in the first pair of double quotes found in `addr`
///    (the form the error message below calls "NetworkAddress debug format").
/// 4. A hex string that decodes to exactly 32 bytes, used as a raw `XorName`.
/// 5. A string that parses as a [`PeerId`].
///
/// An optional leading `0x` is ignored by the hex interpretations 1, 2 and 4, so a prefixed
/// and an unprefixed spelling of the same address take the same path.
///
/// # Errors
///
/// Returns an error that echoes the original input when none of the interpretations apply.
pub fn parse_network_address(addr: &str) -> Result<NetworkAddress> {
    // All plain-hex attempts operate on the input with an optional "0x" prefix removed.
    let hex_str = addr.strip_prefix("0x").unwrap_or(addr);

    // Chunk addresses are tried first, on the prefix-stripped text like every other hex attempt.
    if let Ok(chunk_addr) = ChunkAddress::from_hex(hex_str) {
        return Ok(NetworkAddress::from(chunk_addr));
    }

    if let Ok(public_key) = PublicKey::from_hex(hex_str) {
        return Ok(NetworkAddress::from(GraphEntryAddress::new(public_key)));
    }

    // Look for a hex chunk address between the first two double quotes of the raw input.
    if let Some((_, after_quote)) = addr.split_once('"')
        && let Some((quoted_hex, _)) = after_quote.split_once('"')
        && let Ok(chunk_addr) = ChunkAddress::from_hex(quoted_hex)
    {
        return Ok(NetworkAddress::from(chunk_addr));
    }

    // Any other 32-byte hex value is treated as a raw XorName; the conversion to a fixed-size
    // array fails (and this branch is skipped) unless exactly 32 bytes were decoded.
    if let Ok(bytes) = hex::decode(hex_str)
        && let Ok(arr) = <[u8; 32]>::try_from(bytes)
    {
        return Ok(NetworkAddress::from(xor_name::XorName(arr)));
    }

    // PeerIds are not hex, so they are parsed from the untouched input.
    if let Ok(peer_id) = addr.parse::<PeerId>() {
        return Ok(NetworkAddress::from(peer_id));
    }

    Err(color_eyre::eyre::eyre!(
        "Could not parse address. Expected ChunkAddress, PublicKey, XorName, PeerId, or NetworkAddress debug format. Got: {addr}"
    ))
}
/// Spawns a background task that folds [`ClientEvent`]s into a single [`UploadSummary`].
///
/// The task reads from `event_receiver` until the channel is closed or the returned oneshot
/// sender is used (or dropped). It then drains any events that are still queued, so events
/// sent before the stop signal are not lost.
///
/// When `file_name` is `Some`, payment receipts are also persisted as they arrive:
///
/// - each Merkle batch receipt is handed to `cached_merkle_payments::save_merkle_payment`;
/// - each regular batch receipt is merged into one accumulated [`Receipt`], and the accumulated
///   receipt is handed to `cached_payments::save_regular_payment` after every batch.
///
/// A failed save is reported as a warning on stderr and does not stop the task. When
/// `file_name` is `None`, receipts are neither accumulated nor saved.
///
/// # Returns
///
/// A tuple of:
/// - the join handle of the spawned task, resolving to the summed `tokens_spent`,
///   `records_paid` and `records_already_paid` of all `UploadComplete` events seen;
/// - the oneshot sender that tells the task to stop waiting for further events.
pub fn collect_upload_summary(
    mut event_receiver: tokio::sync::mpsc::Receiver<ClientEvent>,
    file_name: Option<String>,
) -> (
    tokio::task::JoinHandle<UploadSummary>,
    tokio::sync::oneshot::Sender<()>,
) {
    let (upload_completed_tx, mut upload_completed_rx) = tokio::sync::oneshot::channel::<()>();

    let stats_thread = tokio::spawn(async move {
        let mut tokens_spent: Amount = Amount::from(0);
        let mut record_count = 0;
        let mut records_already_paid = 0;
        let mut accumulated_regular_receipt = Receipt::new();

        // Single place that applies one event to the running totals and the receipt cache.
        // Shared by the live loop and the final drain so the two cannot drift apart.
        let mut record_event = |event: ClientEvent| match event {
            ClientEvent::UploadComplete(upload_summary) => {
                tokens_spent += upload_summary.tokens_spent;
                record_count += upload_summary.records_paid;
                records_already_paid += upload_summary.records_already_paid;
            }
            ClientEvent::MerkleBatchPaymentComplete(receipt) => {
                if let Some(ref file) = file_name
                    && let Err(e) = cached_merkle_payments::save_merkle_payment(file, &receipt)
                {
                    eprintln!("Warning: Failed to save Merkle payment receipt: {e}");
                }
            }
            ClientEvent::RegularBatchPaymentComplete(batch_receipt) => {
                if let Some(ref file) = file_name {
                    // Persist the union of all batches so far, not just the latest one.
                    accumulated_regular_receipt.extend(batch_receipt);
                    if let Err(e) =
                        cached_payments::save_regular_payment(file, &accumulated_regular_receipt)
                    {
                        eprintln!("Warning: Failed to save regular payment receipt: {e}");
                    }
                }
            }
        };

        // Live phase: process events until the channel closes or the stop signal resolves.
        loop {
            tokio::select! {
                event = event_receiver.recv() => match event {
                    Some(event) => record_event(event),
                    None => break,
                },
                // Resolves on an explicit send and also when the sender is dropped.
                _ = &mut upload_completed_rx => break,
            }
        }

        // Drain phase: pick up events that were already queued when the loop stopped.
        while let Ok(event) = event_receiver.try_recv() {
            record_event(event);
        }

        UploadSummary {
            tokens_spent,
            records_paid: record_count,
            records_already_paid,
        }
    });

    (stats_thread, upload_completed_tx)
}