use crate::Error;
use super::{error::Result, Client};
use futures::{future::join_all, TryFutureExt};
use sn_protocol::NetworkAddress;
use sn_transfers::{
CashNote, LocalWallet, MainPubkey, NanoTokens, SignedSpend, Transfer, UniquePubkey,
WalletError, WalletResult,
};
use std::{
collections::{BTreeMap, BTreeSet},
iter::Iterator,
time::Duration,
};
use tokio::{task::JoinSet, time::sleep};
use xor_name::XorName;
/// Pairs a network [`Client`] with a [`LocalWallet`] so that wallet
/// operations (sending cash notes, paying for storage) can be carried out
/// against the network.
pub struct WalletClient {
/// Network client used to query store costs and to send spend requests.
client: Client,
/// Local wallet providing the balance and the unconfirmed spend requests.
wallet: LocalWallet,
}
impl WalletClient {
pub fn new(client: Client, wallet: LocalWallet) -> Self {
Self { client, wallet }
}
pub fn store_local_wallet(&mut self) -> WalletResult<()> {
self.wallet.deposit_and_store_to_disk(&vec![])
}
pub fn balance(&self) -> NanoTokens {
self.wallet.balance()
}
pub fn unconfirmed_spend_requests_exist(&self) -> bool {
self.wallet.unconfirmed_spend_requests_exist()
}
pub fn unconfirmed_spend_requests(&self) -> &BTreeSet<SignedSpend> {
self.wallet.unconfirmed_spend_requests()
}
/// Builds the payment [`Transfer`]s for the content at `address`.
///
/// The address is resolved to its xorname, the wallet is asked for the
/// payment cash notes recorded under that xorname, and those cash notes are
/// converted into transfers.
///
/// # Errors
/// * `WalletError::InvalidAddressType` when `address` has no xorname.
/// * Any error raised while converting the cash notes into transfers.
pub fn get_payment_transfers(&self, address: &NetworkAddress) -> WalletResult<Vec<Transfer>> {
    // Guard clause: only addresses that map to a xorname can carry payments.
    let xorname = address
        .as_xorname()
        .ok_or(WalletError::InvalidAddressType)?;

    let cash_notes = self.wallet.get_payment_cash_notes(&xorname);
    info!(
        "Payment cash notes retrieved from wallet: {:?}",
        cash_notes.len()
    );

    let transfers = Transfer::transfers_from_cash_notes(cash_notes)?;
    Ok(transfers)
}
pub fn mark_note_as_spent(&mut self, cash_note_key: UniquePubkey) {
self.wallet.mark_note_as_spent(cash_note_key);
}
/// Sends `amount` to `to` and returns the single cash note created for the
/// recipient.
///
/// The transfer is first created locally in the wallet, then the wallet's
/// unconfirmed spend requests are sent to the network. When the network send
/// succeeds the unconfirmed spend requests are cleared.
///
/// # Errors
/// * Any error from the local send is propagated as-is.
/// * `WalletError::CouldNotSendMoney` when the network send fails, or when
///   the local send did not produce exactly one cash note.
pub async fn send_cash_note(
    &mut self,
    amount: NanoTokens,
    to: MainPubkey,
    verify_store: bool,
) -> WalletResult<CashNote> {
    let created_cash_notes = self.wallet.local_send(vec![(amount, to)], None)?;

    let send_outcome = self
        .client
        .send(
            self.wallet.unconfirmed_spend_requests().iter(),
            verify_store,
        )
        .await;
    if let Err(error) = send_outcome {
        return Err(WalletError::CouldNotSendMoney(format!(
            "The transfer was not successfully registered in the network: {error:?}"
        )));
    }
    self.wallet.clear_unconfirmed_spend_requests();

    // Exactly one cash note is expected for a single recipient.
    let notes = &created_cash_notes[..];
    if notes.len() > 1 {
        return Err(WalletError::CouldNotSendMoney(
            "Multiple CashNotes were returned from the transaction when only one was expected. This is a BUG."
                .into(),
        ));
    }
    notes.first().cloned().ok_or_else(|| {
        WalletError::CouldNotSendMoney("No CashNotes were returned from the wallet.".into())
    })
}
pub async fn get_store_cost_at_address(
&self,
address: &NetworkAddress,
) -> WalletResult<Vec<(MainPubkey, NanoTokens)>> {
self.client
.get_store_costs_at_address(address)
.await
.map_err(|error| WalletError::CouldNotSendMoney(error.to_string()))
}
/// Pays for storing the content at each of `content_addrs` and returns the
/// total amount paid.
///
/// One task per address is spawned to fetch its store costs. Addresses whose
/// cost lookup fails, whose task fails to join, or that have no xorname are
/// logged and skipped. If any costs were collected, the wallet adjusts the
/// payment map and the payment is made through [`Self::pay_for_records`].
///
/// Returns zero when no costs could be collected.
///
/// # Errors
/// Propagates any error from [`Self::pay_for_records`].
pub async fn pay_for_storage(
    &mut self,
    content_addrs: impl Iterator<Item = NetworkAddress>,
    verify_store: bool,
) -> WalletResult<NanoTokens> {
    let total_cost = NanoTokens::zero();
    let mut payment_map = BTreeMap::default();

    // Fetch the store costs for every address, one task per address.
    let mut tasks = JoinSet::new();
    for content_addr in content_addrs {
        let client = self.client.clone();
        tasks.spawn(async move {
            let costs = client
                .get_store_costs_at_address(&content_addr)
                .await
                .map_err(|error| WalletError::CouldNotSendMoney(error.to_string()));
            debug!("Storecosts retrieved for {content_addr:?} {costs:?}");
            (content_addr, costs)
        });
    }
    debug!("Pending store cost tasks: {:?}", tasks.len());

    // Collect the results as tasks finish; failures are logged and skipped.
    while let Some(res) = tasks.join_next().await {
        let (content_addr, costs) = match res {
            Ok(task_output) => task_output,
            Err(e) => {
                warn!("Cannot get a store cost for a content with error {e:?}");
                continue;
            }
        };
        let costs = match costs {
            Ok(costs) => costs,
            Err(err) => {
                warn!("Cannot get store cost for {content_addr:?} with error {err:?}");
                continue;
            }
        };
        match content_addr.as_xorname() {
            Some(xorname) => {
                let _ = payment_map.insert(xorname, costs);
                debug!("Storecosts inserted into payment map for {content_addr:?}");
            }
            None => {
                warn!("Cannot get store cost for a content that is not a data type: {content_addr:?}");
            }
        }
    }
    info!("Storecosts retrieved");

    if payment_map.is_empty() {
        return Ok(total_cost);
    }

    self.wallet.adjust_payment_map(&mut payment_map);
    let cost = self.pay_for_records(payment_map, verify_store).await?;
    // Keep the previous total if the addition cannot be represented.
    Ok(total_cost.checked_add(cost).unwrap_or(total_cost))
}
/// Pays the given per-record costs and returns the total amount paid.
///
/// `all_data_payments` maps each record's xorname to the
/// `(recipient, amount)` pairs to pay. The total is computed up front, the
/// storage payment is created locally in the wallet, and the wallet's
/// unconfirmed spend requests are then sent to the network. On success the
/// unconfirmed spend requests are cleared.
///
/// # Errors
/// * `WalletError::TotalPriceTooHigh` when summing the costs overflows.
/// * Any error from creating the local storage payment.
/// * `WalletError::CouldNotSendMoney` when the network send fails. If the
///   failure is `DoubleSpendAttemptedForCashNotes`, each listed cash note is
///   first marked as spent and its spend request removed from the wallet.
pub async fn pay_for_records(
    &mut self,
    all_data_payments: BTreeMap<XorName, Vec<(MainPubkey, NanoTokens)>>,
    verify_store: bool,
) -> WalletResult<NanoTokens> {
    // Sum every individual cost, bailing out on the first overflow.
    let total_cost = all_data_payments
        .values()
        .flatten()
        .try_fold(NanoTokens::zero(), |sum, (_target, cost)| {
            sum.checked_add(*cost)
        })
        .ok_or(WalletError::TotalPriceTooHigh)?;

    self.wallet
        .local_send_storage_payment(all_data_payments, None)?;

    trace!("Sending storage payment transfer to the network");
    let spend_attempt_result = self
        .client
        .send(
            self.wallet.unconfirmed_spend_requests().iter(),
            verify_store,
        )
        .await;

    if let Err(error) = spend_attempt_result {
        warn!("The storage payment transfer was not successfully registered in the network: {error:?}. It will be retried later.");

        // Cash notes reported as double spends are dropped from the wallet.
        if let WalletError::DoubleSpendAttemptedForCashNotes(spent_cash_notes) = &error {
            for cash_note_key in spent_cash_notes {
                warn!("Removing CashNote from wallet: {cash_note_key:?}");
                self.wallet.mark_note_as_spent(*cash_note_key);
                self.wallet.clear_specific_spend_request(*cash_note_key);
            }
        }

        return Err(WalletError::CouldNotSendMoney(format!(
            "The storage payment transfer was not successfully registered in the network: {error:?}"
        )));
    }

    info!("Spend has completed: {:?}", spend_attempt_result);
    self.wallet.clear_unconfirmed_spend_requests();
    Ok(total_cost)
}
/// Sends the wallet's unconfirmed spend requests to the network again.
///
/// The unconfirmed spend requests are cleared only when the send succeeds;
/// a failure is ignored and leaves them in place.
pub async fn resend_pending_txs(&mut self, verify_store: bool) {
    let resend_outcome = self
        .client
        .send(
            self.wallet.unconfirmed_spend_requests().iter(),
            verify_store,
        )
        .await;

    if resend_outcome.is_ok() {
        self.wallet.clear_unconfirmed_spend_requests();
    }
}
pub fn into_wallet(self) -> LocalWallet {
self.wallet
}
pub fn mut_wallet(&mut self) -> &mut LocalWallet {
&mut self.wallet
}
}
impl Client {
/// Sends the given spend requests to the network.
///
/// One future per spend request is created and all of them are awaited
/// together via `join_all`. Every result is then inspected:
///
/// * a `ReturnedRecordDoesNotMatch` network error is collected as a cash
///   note that is already spent;
/// * any other error aborts with `WalletError::CouldNotSendMoney`;
/// * a success moves on to the next result.
///
/// # Errors
/// * `WalletError::CouldNotSendMoney` for the first result that failed with
///   something other than a record mismatch.
/// * `WalletError::DoubleSpendAttemptedForCashNotes` with the keys of all
///   cash notes whose spend came back as a record mismatch.
pub async fn send(
    &self,
    spend_requests: impl Iterator<Item = &SignedSpend>,
    verify_store: bool,
) -> WalletResult<()> {
    let mut tasks = Vec::new();
    for spend_request in spend_requests {
        trace!(
            "sending spend request to the network: {:?}: {spend_request:#?}",
            spend_request.unique_pubkey()
        );
        let the_task = async move {
            let cash_note_key = spend_request.unique_pubkey();
            let result = self
                .network_store_spend(spend_request.clone(), verify_store)
                .await;
            (cash_note_key, result)
        };
        tasks.push(the_task);
    }

    let mut spent_cash_notes = BTreeSet::default();
    for (cash_note_key, spend_attempt_result) in join_all(tasks).await {
        if let Err(Error::Network(sn_networking::Error::ReturnedRecordDoesNotMatch(
            record_key,
        ))) = spend_attempt_result
        {
            warn!("Record mismatch on spend, removing CashNote from wallet: {record_key:?}");
            spent_cash_notes.insert(*cash_note_key);
        } else {
            // Propagate only on error. A successful spend must NOT end the
            // loop: returning here on `Ok` would skip the remaining results,
            // hiding later failures and dropping already collected
            // record mismatches.
            spend_attempt_result
                .map_err(|err| WalletError::CouldNotSendMoney(err.to_string()))?;
        }
    }

    if spent_cash_notes.is_empty() {
        Ok(())
    } else {
        Err(WalletError::DoubleSpendAttemptedForCashNotes(
            spent_cash_notes,
        ))
    }
}
/// Verifies `transfer` through the network and unpacks it into the cash
/// notes it carries, using `wallet` for the unpacking.
///
/// # Errors
/// Any failure is reported as `WalletError::CouldNotReceiveMoney` carrying
/// the debug rendering of the underlying error.
pub async fn receive(
    &self,
    transfer: &Transfer,
    wallet: &LocalWallet,
) -> WalletResult<Vec<CashNote>> {
    self.network
        .verify_and_unpack_transfer(transfer, wallet)
        .map_err(|e| WalletError::CouldNotReceiveMoney(format!("{e:?}")))
        .await
}
pub async fn verify(&self, cash_note: &CashNote) -> WalletResult<()> {
let mut tasks = Vec::new();
for spend in &cash_note.signed_spends {
tasks.push(self.get_spend_from_network(spend.unique_pubkey()));
}
let mut received_spends = std::collections::BTreeSet::new();
for result in join_all(tasks).await {
let network_valid_spend =
result.map_err(|err| WalletError::CouldNotVerifyTransfer(err.to_string()))?;
let _ = received_spends.insert(network_valid_spend);
}
if received_spends == cash_note.signed_spends {
return Ok(());
}
Err(WalletError::CouldNotVerifyTransfer(
"The spends in network were not the same as the ones in the CashNote. The parents of this CashNote are probably double spends.".into(),
))
}
}
/// Sends `amount` from the wallet `from` to `to` and returns the cash note
/// created for the recipient.
///
/// When `verify_store` is set and the wallet still reports unconfirmed spend
/// requests after the initial send, they are resent once per second for a
/// bounded number of rounds. Afterwards the new cash note is passed to the
/// wallet's `deposit_and_store_to_disk`, whether or not the spends ended up
/// confirmed.
///
/// # Errors
/// * `Error::AmountIsZero` when `amount` is zero.
/// * Any error from sending the cash note or from storing the wallet.
/// * `WalletError::UnconfirmedTxAfterRetries` (converted into the crate
///   error) when spend requests are still unconfirmed after all resends.
pub async fn send(
    from: LocalWallet,
    amount: NanoTokens,
    to: MainPubkey,
    client: &Client,
    verify_store: bool,
) -> Result<CashNote> {
    // Upper bound on resend rounds; matches the number of rounds the
    // previous counter-based loop performed.
    const MAX_RESEND_ATTEMPTS: usize = 12;

    if amount.is_zero() {
        return Err(Error::AmountIsZero);
    }

    let mut wallet_client = WalletClient::new(client.clone(), from);
    let new_cash_note = wallet_client
        .send_cash_note(amount, to, verify_store)
        .await
        .map_err(|err| {
            error!("Could not send cash note, err: {err:?}");
            err
        })?;

    if verify_store {
        for _ in 0..MAX_RESEND_ATTEMPTS {
            if !wallet_client.unconfirmed_spend_requests_exist() {
                break;
            }
            info!("Unconfirmed txs exist, sending again after 1 second...");
            sleep(Duration::from_secs(1)).await;
            wallet_client.resend_pending_txs(verify_store).await;
        }
    }

    // Decide on the wallet's actual state rather than on the attempt count,
    // so a final resend that succeeds is not reported as a failure.
    let still_unconfirmed = verify_store && wallet_client.unconfirmed_spend_requests_exist();

    wallet_client
        .into_wallet()
        .deposit_and_store_to_disk(&vec![new_cash_note.clone()])?;

    if still_unconfirmed {
        return Err(WalletError::UnconfirmedTxAfterRetries.into());
    }
    Ok(new_cash_note)
}