use super::{Client, SpendDag};
use crate::{Error, Result};
use futures::{future::join_all, StreamExt};
use sn_networking::{GetRecordError, NetworkError};
use sn_transfers::{SignedSpend, SpendAddress, WalletError, WalletResult};
use std::collections::BTreeSet;
impl Client {
/// Builds a [`SpendDag`] by walking forward from `spend_addr` towards its descendants.
///
/// The spend stored at `spend_addr` is fetched first. The outputs of the transaction(s)
/// it was spent in are then followed generation by generation: every output address is
/// queried on the network, found spends are inserted into the DAG and their `spent_tx`
/// is queued for the next generation. Addresses with no record are treated as UTXOs.
///
/// Double spends (two spends reported for the same address) are recorded in the DAG
/// and *both* branches are followed, whether they occur at the root or further down.
///
/// `max_depth` limits the number of generations that are followed. The limit is checked
/// after a generation has been processed, so at least one generation is always fetched
/// when the root has been spent. `None` means no limit.
///
/// Once collection is done, `record_faults` is run on the DAG from its source.
///
/// # Errors
///
/// * [`WalletError::FailedToGetSpend`] if the root spend cannot be fetched for a reason
///   other than "record not found" or "double spend".
/// * [`WalletError::Dag`] if `record_faults` fails on the collected DAG.
///
/// Failures to fetch descendant spends are logged and skipped; they do not abort the build.
pub async fn spend_dag_build_from(
    &self,
    spend_addr: SpendAddress,
    max_depth: Option<u32>,
) -> WalletResult<SpendDag> {
    info!("Building spend DAG from {spend_addr:?}");
    let mut dag = SpendDag::new(spend_addr);

    // Fetch the root spend(s) and seed the set of transactions to follow.
    let mut txs_to_follow = BTreeSet::new();
    match self.get_spend_from_network(spend_addr).await {
        Ok(spend) => {
            txs_to_follow.insert(spend.spend.spent_tx.clone());
            dag.insert(spend_addr, spend);
        }
        Err(Error::Network(NetworkError::GetRecordError(GetRecordError::RecordNotFound))) => {
            // Nothing stored at this address: it has not been spent yet.
            info!("UTXO at {spend_addr:?}");
            return Ok(dag);
        }
        Err(Error::Network(NetworkError::DoubleSpendAttempt(s1, s2))) => {
            // Record both spends and follow both of their transactions so that the
            // descendants of either branch end up in the DAG.
            info!("Double spend at {spend_addr:?}");
            txs_to_follow.insert(s2.spend.spent_tx.clone());
            txs_to_follow.insert(s1.spend.spent_tx.clone());
            dag.insert(spend_addr, *s2);
            dag.insert(spend_addr, *s1);
        }
        Err(e) => {
            warn!("Failed to get spend at {spend_addr:?}: {e}");
            return Err(WalletError::FailedToGetSpend(e.to_string()));
        }
    }

    // Iterate generation by generation rather than recursing.
    let depth_limit = max_depth.unwrap_or(u32::MAX);
    let mut known_tx = BTreeSet::new();
    // Not named `gen`: that identifier is a reserved keyword in the 2024 edition.
    let mut generation: u32 = 0;
    let start = std::time::Instant::now();

    while !txs_to_follow.is_empty() {
        let mut next_gen_tx = BTreeSet::new();

        // Collect the addresses of every output of every transaction in this generation.
        let mut addrs = vec![];
        for descendant_tx in &txs_to_follow {
            let descendant_tx_hash = descendant_tx.hash();
            info!("Gen {generation} - Following descendant Tx : {descendant_tx_hash:?}");
            addrs.extend(
                descendant_tx
                    .outputs
                    .iter()
                    .map(|output| SpendAddress::from_unique_pubkey(&output.unique_pubkey)),
            );
        }

        info!(
            "Gen {generation} - Getting {} spends from {} txs in batches of: {}",
            addrs.len(),
            txs_to_follow.len(),
            crate::MAX_CONCURRENT_TASKS,
        );

        // Query all addresses concurrently, at most MAX_CONCURRENT_TASKS at a time.
        let mut stream = futures::stream::iter(addrs)
            .map(|a| async move { (self.get_spend_from_network(a).await, a) })
            .buffer_unordered(crate::MAX_CONCURRENT_TASKS);

        // Insert spends into the DAG as they come in.
        while let Some((res, addr)) = stream.next().await {
            match res {
                Ok(spend) => {
                    next_gen_tx.insert(spend.spend.spent_tx.clone());
                    dag.insert(addr, spend);
                }
                Err(Error::Network(NetworkError::DoubleSpendAttempt(s1, s2))) => {
                    // Same treatment as a double spend at the root: keep both spends
                    // and keep following both branches.
                    info!("Double spend at {addr:?}");
                    next_gen_tx.insert(s2.spend.spent_tx.clone());
                    next_gen_tx.insert(s1.spend.spent_tx.clone());
                    dag.insert(addr, *s2);
                    dag.insert(addr, *s1);
                }
                Err(Error::Network(NetworkError::GetRecordError(
                    GetRecordError::RecordNotFound,
                ))) => {
                    info!("Reached UTXO at {addr:?}");
                }
                Err(err) => {
                    error!("Could not verify transfer at {addr:?}: {err:?}");
                }
            }
        }

        // Only follow transactions that have not been processed yet.
        known_tx.extend(txs_to_follow.iter().map(|tx| tx.hash()));
        txs_to_follow = next_gen_tx
            .into_iter()
            .filter(|tx| !known_tx.contains(&tx.hash()))
            .collect();

        generation += 1;
        if generation >= depth_limit {
            info!("Reached generation {generation}, stopping DAG collection from {spend_addr:?}");
            break;
        }
    }

    let elapsed = start.elapsed();
    info!("Finished building SpendDAG from {spend_addr:?} in {elapsed:?}");

    // Verify the collected DAG and record any faults in it.
    info!("Now verifying SpendDAG from {spend_addr:?} and recording errors...");
    let start = std::time::Instant::now();
    if let Err(e) = dag.record_faults(&dag.source()) {
        let s = format!(
            "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
        );
        error!("{s}");
        return Err(WalletError::Dag(s));
    }
    let elapsed = start.elapsed();
    info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}");
    Ok(dag)
}
/// Extends `dag` with `new_spend` (stored at `spend_addr`) and walks backwards through
/// its ancestors until every branch reaches a spend already in the DAG or the genesis
/// transaction.
///
/// If `new_spend` is already present in `dag` (i.e. `dag.insert` reports it is not new),
/// nothing else is done. Otherwise, for each parent transaction, the spends of all its
/// inputs are fetched from the network and inserted; parents of newly inserted spends are
/// queued for the next depth level. Finally `record_faults` is run on the DAG from its
/// source.
///
/// # Errors
///
/// * [`WalletError::FailedToGetSpend`] if any input spend of a parent transaction cannot
///   be fetched; the extension is aborted at that point.
/// * [`WalletError::Dag`] if `record_faults` fails on the extended DAG.
pub async fn spend_dag_extend_until(
    &self,
    dag: &mut SpendDag,
    spend_addr: SpendAddress,
    new_spend: SignedSpend,
) -> WalletResult<()> {
    // Keep only the parent tx around; the spend itself is moved into the DAG.
    let first_parent_tx = new_spend.spend.parent_tx.clone();
    let is_new_spend = dag.insert(spend_addr, new_spend);
    if !is_new_spend {
        return Ok(());
    }

    // Iterate level by level rather than recursing.
    let mut txs_to_verify = BTreeSet::from_iter([first_parent_tx]);
    let mut depth = 0;
    let mut known_txs = BTreeSet::new();
    let start = std::time::Instant::now();

    while !txs_to_verify.is_empty() {
        let mut next_gen_tx = BTreeSet::new();

        for parent_tx in txs_to_verify {
            let parent_tx_hash = parent_tx.hash();
            debug!("Depth {depth} - checking parent Tx : {parent_tx_hash:?}");

            // Fetch the spend behind every input of the parent tx concurrently.
            let tasks: Vec<_> = parent_tx
                .inputs
                .iter()
                .map(|input| SpendAddress::from_unique_pubkey(&input.unique_pubkey))
                .map(|a| self.get_spend_from_network(a))
                .collect();
            let spends = join_all(tasks).await
                .into_iter()
                .collect::<Result<BTreeSet<_>>>()
                .map_err(|err| WalletError::FailedToGetSpend(format!("at depth {depth} - Failed to get spends from network for parent Tx {parent_tx_hash:?}: {err}")))?;
            debug!(
                "Depth {depth} - Got {:?} spends for parent Tx: {parent_tx_hash:?}",
                spends.len()
            );
            trace!("Spends for {parent_tx_hash:?} - {spends:?}");

            // This tx has now been processed, whichever branch is taken below.
            known_txs.insert(parent_tx_hash);

            // Stop on this branch once the genesis tx is reached.
            if parent_tx == sn_transfers::GENESIS_CASHNOTE.src_tx
                && spends
                    .iter()
                    .all(|s| s.spend.unique_pubkey == sn_transfers::GENESIS_CASHNOTE.id)
                && spends.len() == 1
            {
                debug!("Depth {depth} - reached genesis Tx on one branch: {parent_tx_hash:?}");
                continue;
            }
            debug!("Depth {depth} - Verified parent Tx: {parent_tx_hash:?}");

            // Add the spends to the DAG. The address is derived from each spend's own
            // key: `spends` is a sorted, de-duplicated set, so its iteration order does
            // not match the order of the tx inputs and the two must not be zipped.
            // NOTE(review): assumes `get_spend_from_network` only returns a spend whose
            // `unique_pubkey` maps to the requested address — confirm.
            for spend in spends {
                let addr = SpendAddress::from_unique_pubkey(&spend.spend.unique_pubkey);
                let spend_parent_tx = spend.spend.parent_tx.clone();
                let is_new_spend = dag.insert(addr, spend);
                if is_new_spend {
                    next_gen_tx.insert(spend_parent_tx);
                }
            }
        }

        // Only go up through transactions that have not been processed yet.
        txs_to_verify = next_gen_tx
            .into_iter()
            .filter(|tx| !known_txs.contains(&tx.hash()))
            .collect();
        depth += 1;
        let elapsed = start.elapsed();
        let n = known_txs.len();
        info!("Now at depth {depth} - Collected spends from {n} transactions in {elapsed:?}");
    }

    let elapsed = start.elapsed();
    let n = known_txs.len();
    info!("Collected the DAG branch all the way to known spends or genesis! Through {depth} generations, collecting spends from {n} transactions in {elapsed:?}");

    // Verify the extended DAG and record any faults in it.
    info!("Now verifying SpendDAG extended at {spend_addr:?} and recording errors...");
    let start = std::time::Instant::now();
    if let Err(e) = dag.record_faults(&dag.source()) {
        let s = format!(
            "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
        );
        error!("{s}");
        return Err(WalletError::Dag(s));
    }
    let elapsed = start.elapsed();
    info!("Finished verifying SpendDAG extended from {spend_addr:?} in {elapsed:?}");
    Ok(())
}
/// Continues gathering `dag` from each of its current UTXOs.
///
/// For every address returned by `dag.get_utxos()`, a sub-DAG is built with
/// [`Self::spend_dag_build_from`] (using the same `max_depth`), with at most
/// `MAX_CONCURRENT_TASKS` builds in flight at once. Each sub-DAG that builds
/// successfully is merged into `dag`; failed builds are logged and skipped.
/// After all builds have completed, `record_faults` is run on `dag` from its source.
///
/// # Errors
///
/// Returns [`WalletError::Dag`] if the final `record_faults` call fails.
pub async fn spend_dag_continue_from_utxos(
    &self,
    dag: &mut SpendDag,
    max_depth: Option<u32>,
) -> WalletResult<()> {
    info!("Gathering spend DAG from utxos...");

    let utxos = dag.get_utxos();
    let mut pending = futures::stream::iter(utxos)
        .map(|utxo| async move {
            debug!("Queuing task to gather DAG from utxo: {:?}", utxo);
            let built = self.spend_dag_build_from(utxo, max_depth).await;
            (built, utxo)
        })
        .buffer_unordered(crate::MAX_CONCURRENT_TASKS);

    // Merge sub-DAGs in completion order; a failed build only costs its own branch.
    while let Some((outcome, addr)) = pending.next().await {
        let sub_dag = match outcome {
            Ok(sub_dag) => sub_dag,
            Err(e) => {
                warn!("Failed to gather sub dag from {addr:?}: {e}");
                continue;
            }
        };
        dag.merge(sub_dag);
    }

    // Re-check the merged DAG as a whole.
    if let Err(e) = dag.record_faults(&dag.source()) {
        return Err(WalletError::Dag(e.to_string()));
    }

    info!("Done gathering spend DAG from utxos");
    Ok(())
}
}