use crate::{Client, Error, SpendDag};
use dashmap::DashMap;
use futures::{
future::join_all,
stream::{self, StreamExt},
};
use sn_networking::{GetRecordError, NetworkError};
use sn_transfers::{
NanoTokens, SignedSpend, SpendAddress, SpendReason, UniquePubkey, WalletError, WalletResult,
DEFAULT_NETWORK_ROYALTIES_PK, GENESIS_SPEND_UNIQUE_KEY, NETWORK_ROYALTIES_PK,
};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::mpsc::Sender;
/// Capacity of the internal channel used to stream spends from the crawler
/// task to the DAG-building task in `spend_dag_build_from`.
const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096;

/// Outcome of fetching a single spend address from the network during DAG crawling.
enum InternalGetNetworkSpend {
    /// A single, valid spend was found at the queried address.
    Spend(Box<SignedSpend>),
    /// Multiple conflicting spends (a double/burnt spend) were found at the address.
    DoubleSpend(Vec<SignedSpend>),
    /// No spend exists at the address: it is an unspent output (UTXO).
    NotFound,
    /// Any other error returned while fetching the spend.
    Error(Error),
}
impl Client {
/// Creates a new `SpendDag` seeded with only the genesis spend fetched from the network.
///
/// A double spend (burnt spend) detected at the genesis address is tolerated:
/// every conflicting spend is logged in detail and inserted into the DAG.
///
/// # Errors
/// Returns `WalletError::FailedToGetSpend` for any fetch error other than a
/// double-spend attempt at the genesis address.
pub async fn new_dag_with_genesis_only(&self) -> WalletResult<SpendDag> {
    let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY);
    let mut dag = SpendDag::new(genesis_addr);
    match self.get_spend_from_network(genesis_addr).await {
        Ok(spend) => {
            dag.insert(genesis_addr, spend);
        }
        // Burnt genesis: record all conflicting spends instead of failing.
        Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => {
            println!("Burnt spend detected at Genesis: {genesis_addr:?}");
            warn!("Burnt spend detected at Genesis: {genesis_addr:?}");
            for (i, spend) in spends.into_iter().enumerate() {
                // Log each conflicting spend's details for later diagnosis.
                let reason = spend.reason();
                let amount = spend.spend.amount();
                let ancestors_len = spend.spend.ancestors.len();
                let descendants_len = spend.spend.descendants.len();
                let roy_len = spend.spend.network_royalties().len();
                warn!(
                    "burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
                    spend.spend.ancestors, spend.spend.descendants
                );
                dag.insert(genesis_addr, spend);
            }
        }
        Err(e) => return Err(WalletError::FailedToGetSpend(e.to_string())),
    };
    Ok(dag)
}
/// Builds a `SpendDag` by crawling the network outward from `spend_addr`.
///
/// Runs two concurrent tasks: a crawler (`spend_dag_crawl_from`) that streams
/// every discovered spend through an internal channel, and a builder that
/// inserts each received spend into the DAG. If `spend_processing` is provided,
/// each spend is also forwarded to it together with its descendant count and a
/// `false` flag. When `verify` is true, faults are recorded on the finished DAG.
///
/// # Errors
/// Fails if either spawned task cannot be joined, if the crawler or builder
/// returns an error, if forwarding to `spend_processing` fails, or if
/// verification finds the collected DAG invalid.
pub async fn spend_dag_build_from(
    &self,
    spend_addr: SpendAddress,
    spend_processing: Option<Sender<(SignedSpend, u64, bool)>>,
    verify: bool,
) -> WalletResult<SpendDag> {
    // Internal channel carrying spends from the crawler to the DAG builder.
    let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE);

    // Crawl the network in a separate task, streaming each found spend into `tx`.
    let self_clone = self.clone();
    let crawl_handle =
        tokio::spawn(async move { self_clone.spend_dag_crawl_from(spend_addr, tx).await });

    // Build the DAG from the streamed spends, concurrently with the crawl.
    // The loop ends when the crawler finishes and drops its sender.
    let build_handle: tokio::task::JoinHandle<Result<SpendDag, WalletError>> =
        tokio::spawn(async move {
            debug!("Starting building DAG from {spend_addr:?}...");
            let now = std::time::Instant::now();
            let mut dag = SpendDag::new(spend_addr);
            while let Some(spend) = rx.recv().await {
                let addr = spend.address();
                debug!(
                    "Inserting spend at {addr:?} size: {}",
                    dag.all_spends().len()
                );
                dag.insert(addr, spend.clone());
                // Optionally forward the spend downstream with its output count.
                if let Some(sender) = &spend_processing {
                    let outputs = spend.spend.descendants.len() as u64;
                    sender
                        .send((spend, outputs, false))
                        .await
                        .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
                }
            }
            info!(
                "Done gathering DAG of size: {} in {:?}",
                dag.all_spends().len(),
                now.elapsed()
            );
            Ok(dag)
        });

    // Await both tasks; `??` surfaces join failures and in-task errors separately.
    let (crawl_res, build_res) = tokio::join!(crawl_handle, build_handle);
    crawl_res.map_err(|e| {
        WalletError::SpendProcessing(format!("Failed to Join crawling results {e}"))
    })??;
    let mut dag = build_res.map_err(|e| {
        WalletError::SpendProcessing(format!("Failed to Join DAG building results {e}"))
    })??;

    if verify {
        info!("Now verifying SpendDAG from {spend_addr:?} and recording errors...");
        let start = std::time::Instant::now();
        if let Err(e) = dag.record_faults(&dag.source()) {
            let s = format!(
                "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
            );
            error!("{s}");
            return Err(WalletError::Dag(s));
        }
        let elapsed = start.elapsed();
        info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}");
    }
    Ok(dag)
}
/// Concurrently fetches a batch of candidate UTXO addresses and classifies each result.
///
/// For every address in `addrs_to_get` (paired with its previous failure count
/// and amount):
/// - a fetched spend is forwarded to `sender` and its trackable descendants
///   (per `beta_track_analyze_spend`) are queued for further tracking;
/// - a double/burnt spend is logged in detail and the address marked as fetched;
/// - a not-found address is re-queued with a backoff deadline (base delay for
///   large-amount UTXOs, growing with the failure count otherwise);
/// - any other error re-queues the address with the base `reattempt_seconds` delay.
///
/// Returns `(failed_utxos, fetched_addrs, addrs_for_further_track)`, where
/// `failed_utxos` maps each still-unfetched address to its updated failure
/// count, next-attempt deadline, and amount.
pub async fn crawl_to_next_utxos(
    &self,
    addrs_to_get: BTreeMap<SpendAddress, (u64, NanoTokens)>,
    sender: Sender<(SignedSpend, u64, bool)>,
    reattempt_seconds: u64,
) -> (
    BTreeMap<SpendAddress, (u64, Instant, NanoTokens)>,
    Vec<SpendAddress>,
    BTreeSet<(SpendAddress, NanoTokens)>,
) {
    // Upper bound on in-flight network fetches.
    const MAX_CONCURRENT: usize = 64;
    // Shared, thread-safe accumulators written to by the concurrent fetch futures.
    // DashMap is used as a concurrent map/set; results are copied out below.
    let failed_utxos_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());
    let addrs_for_further_track_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());
    let fetched_addrs_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());
    stream::iter(addrs_to_get.into_iter())
        .map(|(addr, (failed_times, amount))| {
            let client_clone = self.clone();
            let sender_clone = sender.clone();
            let failed_utxos = Arc::clone(&failed_utxos_arc);
            let addrs_for_further_track = Arc::clone(&addrs_for_further_track_arc);
            let fetched_addrs = Arc::clone(&fetched_addrs_arc);
            async move {
                let result = client_clone.crawl_spend(addr).await;
                match result {
                    InternalGetNetworkSpend::Spend(spend) => {
                        // Forward the spend and queue its trackable descendants.
                        let for_further_track = beta_track_analyze_spend(&spend);
                        let _ = sender_clone
                            .send((*spend, for_further_track.len() as u64, false))
                            .await;
                        for entry in for_further_track {
                            let _ = addrs_for_further_track.insert(entry, ());
                        }
                        fetched_addrs.insert(addr, ());
                    }
                    InternalGetNetworkSpend::DoubleSpend(spends) => {
                        warn!(
                            "Detected burnt spend regarding {addr:?} - {:?}",
                            spends.len()
                        );
                        // Log each conflicting spend's details for diagnosis.
                        for (i, spend) in spends.into_iter().enumerate() {
                            let reason = spend.reason();
                            let amount = spend.spend.amount();
                            let ancestors_len = spend.spend.ancestors.len();
                            let descendants_len = spend.spend.descendants.len();
                            let roy_len = spend.spend.network_royalties().len();
                            warn!("burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
                                spend.spend.ancestors, spend.spend.descendants);
                        }
                        fetched_addrs.insert(addr, ());
                    }
                    InternalGetNetworkSpend::NotFound => {
                        // Backoff: large-amount UTXOs are retried at the base
                        // interval; others back off with the failure count.
                        let reattempt_interval = if amount.as_nano() > 100000 {
                            info!("Not find spend of big-UTXO {addr:?} with {amount}");
                            reattempt_seconds
                        } else {
                            reattempt_seconds * (failed_times * 8 + 1)
                        };
                        failed_utxos.insert(
                            addr,
                            (
                                failed_times + 1,
                                Instant::now() + Duration::from_secs(reattempt_interval),
                                amount,
                            ),
                        );
                    }
                    InternalGetNetworkSpend::Error(e) => {
                        warn!("Fetching spend {addr:?} with {amount:?} result in error {e:?}");
                        // Errors are retried at the base interval.
                        failed_utxos.insert(
                            addr,
                            (
                                failed_times + 1,
                                Instant::now() + Duration::from_secs(reattempt_seconds),
                                amount,
                            ),
                        );
                    }
                }
                (addr, amount)
            }
        })
        .buffer_unordered(MAX_CONCURRENT)
        .for_each(|(address, amount)| async move {
            info!("Completed fetching attempt of {address:?} with amount {amount:?}");
        })
        .await;
    // All futures have completed; copy the shared accumulators into the
    // plain (non-concurrent) collections returned to the caller.
    let mut failed_utxos_result = BTreeMap::new();
    for entry in failed_utxos_arc.iter() {
        let key = entry.key();
        let val = entry.value();
        let _ = failed_utxos_result.insert(*key, *val);
    }
    let mut fetched_addrs = Vec::new();
    for entry in fetched_addrs_arc.iter() {
        let key = entry.key();
        fetched_addrs.push(*key);
    }
    let mut addrs_for_further_track = BTreeSet::new();
    for entry in addrs_for_further_track_arc.iter() {
        let key = entry.key();
        let _ = addrs_for_further_track.insert(*key);
    }
    (failed_utxos_result, fetched_addrs, addrs_for_further_track)
}
/// Crawls the spend DAG breadth-first starting from `spend_addr`, streaming
/// every discovered spend to `spend_processing`, and returns the set of UTXO
/// addresses (addresses where no spend was found) reached at the frontier.
///
/// # Errors
/// Fails if the root spend cannot be fetched, or if sending a spend to
/// `spend_processing` fails (receiver dropped).
pub async fn spend_dag_crawl_from(
    &self,
    spend_addr: SpendAddress,
    spend_processing: Sender<SignedSpend>,
) -> WalletResult<BTreeSet<SpendAddress>> {
    info!("Crawling spend DAG from {spend_addr:?}");
    let mut utxos = BTreeSet::new();

    // Fetch the root spend; its descendants seed the first generation.
    let mut descendants_to_follow = match self.crawl_spend(spend_addr).await {
        InternalGetNetworkSpend::Spend(spend) => {
            let spend = *spend;
            let descendants_to_follow = spend.spend.descendants.clone();
            spend_processing
                .send(spend)
                .await
                .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
            descendants_to_follow
        }
        InternalGetNetworkSpend::DoubleSpend(spends) => {
            // Follow the descendants of every conflicting spend.
            let mut descendants_to_follow = BTreeMap::new();
            for spend in spends.into_iter() {
                descendants_to_follow.extend(spend.spend.descendants.clone());
                spend_processing
                    .send(spend)
                    .await
                    .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
            }
            descendants_to_follow
        }
        InternalGetNetworkSpend::NotFound => {
            // The starting address itself is unspent.
            info!("UTXO at {spend_addr:?}");
            utxos.insert(spend_addr);
            return Ok(utxos);
        }
        InternalGetNetworkSpend::Error(e) => {
            return Err(WalletError::FailedToGetSpend(e.to_string()));
        }
    };

    // Breadth-first traversal, one generation at a time, skipping pubkeys
    // already followed in earlier generations.
    let mut known_descendants: BTreeSet<UniquePubkey> = BTreeSet::new();
    let mut gen: u32 = 0;
    let start = std::time::Instant::now();
    while !descendants_to_follow.is_empty() {
        let mut next_gen_descendants = BTreeMap::new();
        let mut addrs = vec![];
        for (descendant, _amount) in descendants_to_follow.iter() {
            let addrs_to_follow = SpendAddress::from_unique_pubkey(descendant);
            info!("Gen {gen} - Following descendant : {descendant:?}");
            addrs.push(addrs_to_follow);
        }
        // Log before moving `addrs` into the (lazy) stream: no fetch work has
        // started yet, and this avoids cloning the whole address list.
        info!(
            "Gen {gen} - Getting {} spends from {} txs in batches of: {}",
            addrs.len(),
            descendants_to_follow.len(),
            crate::MAX_CONCURRENT_TASKS,
        );
        // Fetch the whole generation concurrently, bounded by MAX_CONCURRENT_TASKS.
        let mut stream = futures::stream::iter(addrs)
            .map(|a| async move { (self.crawl_spend(a).await, a) })
            .buffer_unordered(crate::MAX_CONCURRENT_TASKS);
        while let Some((get_spend, addr)) = stream.next().await {
            match get_spend {
                InternalGetNetworkSpend::Spend(spend) => {
                    next_gen_descendants.extend(spend.spend.descendants.clone());
                    // The spend is not needed after sending: move it instead of
                    // deep-cloning a full SignedSpend per send.
                    spend_processing
                        .send(*spend)
                        .await
                        .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
                }
                InternalGetNetworkSpend::DoubleSpend(spends) => {
                    info!("Fetched double spend(s) of len {} at {addr:?} from network, following all of them.", spends.len());
                    for s in spends.into_iter() {
                        next_gen_descendants.extend(s.spend.descendants.clone());
                        // As above: `s` is dead after the send, so move it.
                        spend_processing
                            .send(s)
                            .await
                            .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
                    }
                }
                InternalGetNetworkSpend::NotFound => {
                    info!("Reached UTXO at {addr:?}");
                    utxos.insert(addr);
                }
                InternalGetNetworkSpend::Error(err) => {
                    error!("Failed to get spend at {addr:?} during DAG collection: {err:?}")
                }
            }
        }

        // Remember this generation's pubkeys and keep only unseen ones for the next.
        let followed_descendants: BTreeSet<UniquePubkey> =
            descendants_to_follow.keys().copied().collect();
        known_descendants.extend(followed_descendants);
        descendants_to_follow = next_gen_descendants
            .into_iter()
            .filter(|(key, _)| !known_descendants.contains(key))
            .collect();
        gen += 1;
    }

    let elapsed = start.elapsed();
    info!("Finished crawling SpendDAG from {spend_addr:?} in {elapsed:?}");
    Ok(utxos)
}
/// Inserts `new_spend` at `spend_addr` into the DAG, then fetches and inserts
/// all of its missing ancestors up to known spends or genesis, and finally
/// re-records faults on the DAG. Returns early (Ok) if the spend was already known.
///
/// # Errors
/// Fails if an ancestor spend is missing from the network, if fetching an
/// ancestor errors, or if the resulting DAG is invalid.
pub async fn spend_dag_extend_until(
    &self,
    dag: &mut SpendDag,
    spend_addr: SpendAddress,
    new_spend: SignedSpend,
) -> WalletResult<()> {
    // If the spend is already present, its ancestry was collected previously.
    let is_new_spend = dag.insert(spend_addr, new_spend.clone());
    if !is_new_spend {
        return Ok(());
    }

    // Walk ancestors generation by generation until only known spends remain.
    let mut ancestors_to_verify = new_spend.spend.ancestors.clone();
    let mut depth = 0;
    let mut known_ancestors = BTreeSet::from_iter([dag.source()]);
    let start = std::time::Instant::now();
    while !ancestors_to_verify.is_empty() {
        let mut next_gen_ancestors = BTreeSet::new();
        for ancestor in ancestors_to_verify {
            let addrs_to_verify = vec![SpendAddress::from_unique_pubkey(&ancestor)];
            debug!("Depth {depth} - checking parent : {ancestor:?} - {addrs_to_verify:?}");
            let tasks: Vec<_> = addrs_to_verify
                .iter()
                .map(|a| self.crawl_spend(*a))
                .collect();
            let mut spends = BTreeSet::new();
            for (spend_get, a) in join_all(tasks)
                .await
                .into_iter()
                .zip(addrs_to_verify.clone())
            {
                match spend_get {
                    InternalGetNetworkSpend::Spend(s) => {
                        spends.insert(*s);
                    }
                    InternalGetNetworkSpend::DoubleSpend(s) => {
                        // A burnt/double spend yields several conflicting spends
                        // for the same address; keep them all.
                        spends.extend(s.into_iter());
                    }
                    InternalGetNetworkSpend::NotFound => {
                        return Err(WalletError::FailedToGetSpend(format!(
                            "Missing ancestor spend at {a:?}"
                        )))
                    }
                    InternalGetNetworkSpend::Error(e) => {
                        return Err(WalletError::FailedToGetSpend(format!(
                            "Failed to get ancestor spend at {a:?}: {e}"
                        )))
                    }
                }
            }
            let spends_len = spends.len();
            debug!("Depth {depth} - Got {spends_len} spends for parent: {addrs_to_verify:?}");
            trace!("Spends for {addrs_to_verify:?} - {spends:?}");
            known_ancestors.extend(addrs_to_verify.clone());
            // BUGFIX: this previously zipped `spends` (which can hold several
            // conflicting spends of a double spend) with the single-element
            // `addrs_to_verify`, silently dropping all but one spend. Insert
            // every fetched spend at each verified address instead.
            for addr in addrs_to_verify {
                for spend in spends.iter() {
                    let is_new_spend = dag.insert(addr, spend.clone());
                    if is_new_spend {
                        next_gen_ancestors.extend(spend.spend.ancestors.clone());
                    }
                }
            }
        }

        // Keep only ancestors not yet visited for the next generation.
        ancestors_to_verify = next_gen_ancestors
            .into_iter()
            .filter(|ancestor| {
                !known_ancestors.contains(&SpendAddress::from_unique_pubkey(ancestor))
            })
            .collect();
        depth += 1;
        let elapsed = start.elapsed();
        let n = known_ancestors.len();
        info!("Now at depth {depth} - Collected spends from {n} transactions in {elapsed:?}");
    }

    let elapsed = start.elapsed();
    let n = known_ancestors.len();
    info!("Collected the DAG branch all the way to known spends or genesis! Through {depth} generations, collecting spends from {n} transactions in {elapsed:?}");

    // Re-verify the extended DAG and record any faults.
    info!("Now verifying SpendDAG extended at {spend_addr:?} and recording errors...");
    let start = std::time::Instant::now();
    if let Err(e) = dag.record_faults(&dag.source()) {
        let s = format!(
            "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
        );
        error!("{s}");
        return Err(WalletError::Dag(s));
    }
    let elapsed = start.elapsed();
    info!("Finished verifying SpendDAG extended from {spend_addr:?} in {elapsed:?}");
    Ok(())
}
/// Grows `dag` by crawling outward from each of the given `utxos`, merging
/// every gathered sub-DAG back into the main DAG. Failures to gather or merge
/// a sub-DAG are logged but not propagated.
pub async fn spend_dag_continue_from(
    &self,
    dag: &mut SpendDag,
    utxos: BTreeSet<SpendAddress>,
    spend_processing: Option<Sender<(SignedSpend, u64, bool)>>,
    verify: bool,
) {
    let main_dag_src = dag.source();
    info!(
        "Expanding spend DAG with source: {main_dag_src:?} from {} utxos",
        utxos.len()
    );

    // Crawl every UTXO concurrently; each crawl yields an independent sub-DAG.
    let crawls = utxos
        .iter()
        .map(|addr| self.spend_dag_build_from(*addr, spend_processing.clone(), false));
    let results = join_all(crawls).await;

    // Merge each sub-DAG into the main one (results are in the same sorted
    // order as `utxos`, so zipping pairs them correctly).
    for (outcome, addr) in results.into_iter().zip(utxos) {
        match outcome {
            Ok(sub_dag) => {
                debug!("Gathered sub DAG from: {addr:?}");
                if let Err(e) = dag.merge(sub_dag, verify) {
                    warn!("Failed to merge sub dag from {addr:?} into dag: {e}");
                }
            }
            Err(e) => warn!("Failed to gather sub dag from {addr:?}: {e}"),
        }
    }
    info!("Done gathering spend DAG from utxos");
}
/// Convenience wrapper: continues expanding `dag` from whatever UTXOs it
/// currently contains (see `spend_dag_continue_from`).
pub async fn spend_dag_continue_from_utxos(
    &self,
    dag: &mut SpendDag,
    spend_processing: Option<Sender<(SignedSpend, u64, bool)>>,
    verify: bool,
) {
    let current_utxos = dag.get_utxos();
    self.spend_dag_continue_from(dag, current_utxos, spend_processing, verify)
        .await
}
/// Fetches the spend at `spend_addr` from the network and maps the result into
/// the crawler's internal classification (`InternalGetNetworkSpend`).
async fn crawl_spend(&self, spend_addr: SpendAddress) -> InternalGetNetworkSpend {
    let outcome = self.crawl_spend_from_network(spend_addr).await;
    match outcome {
        Ok(signed_spend) => {
            debug!("DAG crawling: fetched spend {spend_addr:?} from network");
            InternalGetNetworkSpend::Spend(Box::new(signed_spend))
        }
        // Conflicting spends at the same address: a double/burnt spend.
        Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => {
            debug!("DAG crawling: got a double spend(s) of len {} at {spend_addr:?} on the network", spends.len());
            InternalGetNetworkSpend::DoubleSpend(spends)
        }
        // No record at the address: it is an unspent output.
        Err(Error::Network(NetworkError::GetRecordError(GetRecordError::RecordNotFound))) => {
            debug!("DAG crawling: spend at {spend_addr:?} not found on the network");
            InternalGetNetworkSpend::NotFound
        }
        // Anything else is surfaced as an opaque error.
        Err(e) => {
            debug!(
                "DAG crawling: got an error for spend at {spend_addr:?} on the network: {e}"
            );
            InternalGetNetworkSpend::Error(e)
        }
    }
}
}
fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, NanoTokens)> {
let royalty_pubkeys: BTreeSet<_> = spend
.spend
.network_royalties()
.iter()
.map(|(_, _, der)| NETWORK_ROYALTIES_PK.new_unique_pubkey(der))
.collect();
let default_royalty_pubkeys: BTreeSet<_> = spend
.spend
.network_royalties()
.iter()
.map(|(_, _, der)| DEFAULT_NETWORK_ROYALTIES_PK.new_unique_pubkey(der))
.collect();
let spend_addr = spend.address();
let new_utxos: BTreeSet<_> = spend
.spend
.descendants
.iter()
.filter_map(|(unique_pubkey, amount)| {
if default_royalty_pubkeys.contains(unique_pubkey)
|| royalty_pubkeys.contains(unique_pubkey)
{
None
} else {
let addr = SpendAddress::from_unique_pubkey(unique_pubkey);
if amount.as_nano() > 100000 {
info!("Spend {spend_addr:?} has a big-UTXO {addr:?} with {amount}");
}
Some((addr, *amount))
}
})
.collect();
if let SpendReason::BetaRewardTracking(_) = spend.reason() {
Default::default()
} else {
trace!(
"Spend {spend_addr:?} original has {} outputs, tracking {} of them.",
spend.spend.descendants.len(),
new_utxos.len()
);
new_utxos
}
}