sn_client 0.110.4

Safe Network Client
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{Client, Error, SpendDag};

use dashmap::DashMap;
use futures::{
    future::join_all,
    stream::{self, StreamExt},
};
use sn_networking::{GetRecordError, NetworkError};
use sn_transfers::{
    NanoTokens, SignedSpend, SpendAddress, SpendReason, UniquePubkey, WalletError, WalletResult,
    DEFAULT_NETWORK_ROYALTIES_PK, GENESIS_SPEND_UNIQUE_KEY, NETWORK_ROYALTIES_PK,
};
use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::mpsc::Sender;

const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096;

enum InternalGetNetworkSpend {
    Spend(Box<SignedSpend>),
    DoubleSpend(Vec<SignedSpend>),
    NotFound,
    Error(Error),
}

impl Client {
    pub async fn new_dag_with_genesis_only(&self) -> WalletResult<SpendDag> {
        let genesis_addr = SpendAddress::from_unique_pubkey(&GENESIS_SPEND_UNIQUE_KEY);
        let mut dag = SpendDag::new(genesis_addr);
        match self.get_spend_from_network(genesis_addr).await {
            Ok(spend) => {
                dag.insert(genesis_addr, spend);
            }
            Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => {
                println!("Burnt spend detected at Genesis: {genesis_addr:?}");
                warn!("Burnt spend detected at Genesis: {genesis_addr:?}");
                for (i, spend) in spends.into_iter().enumerate() {
                    let reason = spend.reason();
                    let amount = spend.spend.amount();
                    let ancestors_len = spend.spend.ancestors.len();
                    let descendants_len = spend.spend.descendants.len();
                    let roy_len = spend.spend.network_royalties().len();
                    warn!(
                                "burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
                                spend.spend.ancestors, spend.spend.descendants
                            );
                    dag.insert(genesis_addr, spend);
                }
            }
            Err(e) => return Err(WalletError::FailedToGetSpend(e.to_string())),
        };

        Ok(dag)
    }

    /// Builds a SpendDag from a given SpendAddress recursively following descendants all the way to UTxOs
    /// Started from Genesis this gives the entire SpendDag of the Network at a certain point in time
    /// Once the DAG collected, optionally verifies and records errors in the DAG
    ///
    /// ```text
    ///                                   -> Spend7 ---> UTXO_11
    ///                                 /
    /// Genesis -> Spend1 -----> Spend2 ---> Spend5 ---> UTXO_10
    ///                   \
    ///                     ---> Spend3 ---> Spend6 ---> UTXO_9
    ///                     \
    ///                       -> Spend4 ---> UTXO_8
    ///
    /// ```
    pub async fn spend_dag_build_from(
        &self,
        spend_addr: SpendAddress,
        spend_processing: Option<Sender<(SignedSpend, u64, bool)>>,
        verify: bool,
    ) -> WalletResult<SpendDag> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(SPENDS_PROCESSING_BUFFER_SIZE);

        // start crawling from the given spend address
        let self_clone = self.clone();
        let crawl_handle =
            tokio::spawn(async move { self_clone.spend_dag_crawl_from(spend_addr, tx).await });

        // start DAG building from the spends gathered while crawling
        // forward spends to processing if provided
        let build_handle: tokio::task::JoinHandle<Result<SpendDag, WalletError>> =
            tokio::spawn(async move {
                debug!("Starting building DAG from {spend_addr:?}...");
                let now = std::time::Instant::now();
                let mut dag = SpendDag::new(spend_addr);
                while let Some(spend) = rx.recv().await {
                    let addr = spend.address();
                    debug!(
                        "Inserting spend at {addr:?} size: {}",
                        dag.all_spends().len()
                    );
                    dag.insert(addr, spend.clone());
                    if let Some(sender) = &spend_processing {
                        let outputs = spend.spend.descendants.len() as u64;
                        sender
                            .send((spend, outputs, false))
                            .await
                            .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
                    }
                }
                info!(
                    "Done gathering DAG of size: {} in {:?}",
                    dag.all_spends().len(),
                    now.elapsed()
                );
                Ok(dag)
            });

        // wait for both to finish
        let (crawl_res, build_res) = tokio::join!(crawl_handle, build_handle);
        crawl_res.map_err(|e| {
            WalletError::SpendProcessing(format!("Failed to Join crawling results {e}"))
        })??;
        let mut dag = build_res.map_err(|e| {
            WalletError::SpendProcessing(format!("Failed to Join DAG building results {e}"))
        })??;

        // verify the DAG
        if verify {
            info!("Now verifying SpendDAG from {spend_addr:?} and recording errors...");
            let start = std::time::Instant::now();
            if let Err(e) = dag.record_faults(&dag.source()) {
                let s = format!(
                    "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
                );
                error!("{s}");
                return Err(WalletError::Dag(s));
            }
            let elapsed = start.elapsed();
            info!("Finished verifying SpendDAG from {spend_addr:?} in {elapsed:?}");
        }

        Ok(dag)
    }

    /// Get spends from a set of given SpendAddresses
    /// Drain the addresses at the same layer first, then:
    ///     1, return failed_utxos for re-attempt (with insertion time stamp)
    ///     2, return fetched_address to avoid un-necessary re-attempts
    ///     3, return addrs_for_further_track for further track
    pub async fn crawl_to_next_utxos(
        &self,
        addrs_to_get: BTreeMap<SpendAddress, (u64, NanoTokens)>,
        sender: Sender<(SignedSpend, u64, bool)>,
        reattempt_seconds: u64,
    ) -> (
        BTreeMap<SpendAddress, (u64, Instant, NanoTokens)>,
        Vec<SpendAddress>,
        BTreeSet<(SpendAddress, NanoTokens)>,
    ) {
        // max concurrency for the tasks of fetching records from network.
        const MAX_CONCURRENT: usize = 64;

        let failed_utxos_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());
        let addrs_for_further_track_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());
        let fetched_addrs_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());

        stream::iter(addrs_to_get.into_iter())
            .map(|(addr, (failed_times, amount))| {
                let client_clone = self.clone();
                let sender_clone = sender.clone();

                let failed_utxos = Arc::clone(&failed_utxos_arc);
                let addrs_for_further_track = Arc::clone(&addrs_for_further_track_arc);
                let fetched_addrs = Arc::clone(&fetched_addrs_arc);
                async move {
                    let result = client_clone.crawl_spend(addr).await;

                    match result {
                        InternalGetNetworkSpend::Spend(spend) => {
                            let for_further_track = beta_track_analyze_spend(&spend);
                            let _ = sender_clone
                                .send((*spend, for_further_track.len() as u64, false))
                                .await;
                            for entry in for_further_track {
                                let _ = addrs_for_further_track.insert(entry, ());
                            }
                            fetched_addrs.insert(addr, ());
                        }
                        InternalGetNetworkSpend::DoubleSpend(spends) => {
                            warn!(
                                "Detected burnt spend regarding {addr:?} - {:?}",
                                spends.len()
                            );

                            for (i, spend) in spends.into_iter().enumerate() {
                                let reason = spend.reason();
                                let amount = spend.spend.amount();
                                let ancestors_len = spend.spend.ancestors.len();
                                let descendants_len = spend.spend.descendants.len();
                                let roy_len = spend.spend.network_royalties().len();
                                warn!("burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
                                spend.spend.ancestors, spend.spend.descendants);
                            }
                            fetched_addrs.insert(addr, ());
                        }
                        InternalGetNetworkSpend::NotFound => {
                            let reattempt_interval = if amount.as_nano() > 100000 {
                                info!("Not find spend of big-UTXO {addr:?} with {amount}");
                                reattempt_seconds
                            } else {
                                reattempt_seconds * (failed_times * 8 + 1)
                            };
                            failed_utxos.insert(
                                addr,
                                (
                                    failed_times + 1,
                                    Instant::now() + Duration::from_secs(reattempt_interval),
                                    amount,
                                ),
                            );
                        }
                        InternalGetNetworkSpend::Error(e) => {
                            warn!("Fetching spend {addr:?} with {amount:?} result in error {e:?}");
                            // Error of `NotEnoughCopies` could be re-attempted and succeed eventually.
                            failed_utxos.insert(
                                addr,
                                (
                                    failed_times + 1,
                                    Instant::now() + Duration::from_secs(reattempt_seconds),
                                    amount,
                                ),
                            );
                        }
                    }

                    (addr, amount)
                }
            })
            .buffer_unordered(MAX_CONCURRENT)
            .for_each(|(address, amount)| async move {
                info!("Completed fetching attempt of {address:?} with amount {amount:?}");
            })
            .await;

        let mut failed_utxos_result = BTreeMap::new();
        for entry in failed_utxos_arc.iter() {
            let key = entry.key();
            let val = entry.value();
            let _ = failed_utxos_result.insert(*key, *val);
        }

        let mut fetched_addrs = Vec::new();
        for entry in fetched_addrs_arc.iter() {
            let key = entry.key();
            fetched_addrs.push(*key);
        }

        let mut addrs_for_further_track = BTreeSet::new();
        for entry in addrs_for_further_track_arc.iter() {
            let key = entry.key();
            let _ = addrs_for_further_track.insert(*key);
        }

        (failed_utxos_result, fetched_addrs, addrs_for_further_track)
    }

    /// Crawls the Spend Dag from a given SpendAddress recursively
    /// following descendants all the way to UTXOs
    /// Returns the UTXOs reached
    pub async fn spend_dag_crawl_from(
        &self,
        spend_addr: SpendAddress,
        spend_processing: Sender<SignedSpend>,
    ) -> WalletResult<BTreeSet<SpendAddress>> {
        info!("Crawling spend DAG from {spend_addr:?}");
        let mut utxos = BTreeSet::new();

        // get first spend
        let mut descendants_to_follow = match self.crawl_spend(spend_addr).await {
            InternalGetNetworkSpend::Spend(spend) => {
                let spend = *spend;
                let descendants_to_follow = spend.spend.descendants.clone();

                spend_processing
                    .send(spend)
                    .await
                    .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
                descendants_to_follow
            }
            InternalGetNetworkSpend::DoubleSpend(spends) => {
                let mut descendants_to_follow = BTreeMap::new();
                for spend in spends.into_iter() {
                    descendants_to_follow.extend(spend.spend.descendants.clone());
                    spend_processing
                        .send(spend)
                        .await
                        .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
                }
                descendants_to_follow
            }
            InternalGetNetworkSpend::NotFound => {
                // the cashnote was not spent yet, so it's an UTXO
                info!("UTXO at {spend_addr:?}");
                utxos.insert(spend_addr);
                return Ok(utxos);
            }
            InternalGetNetworkSpend::Error(e) => {
                return Err(WalletError::FailedToGetSpend(e.to_string()));
            }
        };

        // use iteration instead of recursion to avoid stack overflow
        let mut known_descendants: BTreeSet<UniquePubkey> = BTreeSet::new();
        let mut gen: u32 = 0;
        let start = std::time::Instant::now();

        while !descendants_to_follow.is_empty() {
            let mut next_gen_descendants = BTreeMap::new();

            // list up all descendants
            let mut addrs = vec![];
            for (descendant, _amount) in descendants_to_follow.iter() {
                let addrs_to_follow = SpendAddress::from_unique_pubkey(descendant);
                info!("Gen {gen} - Following descendant : {descendant:?}");
                addrs.push(addrs_to_follow);
            }

            // get all spends in parallel
            let mut stream = futures::stream::iter(addrs.clone())
                .map(|a| async move { (self.crawl_spend(a).await, a) })
                .buffer_unordered(crate::MAX_CONCURRENT_TASKS);
            info!(
                "Gen {gen} - Getting {} spends from {} txs in batches of: {}",
                addrs.len(),
                descendants_to_follow.len(),
                crate::MAX_CONCURRENT_TASKS,
            );

            // insert spends in the dag as they are collected
            while let Some((get_spend, addr)) = stream.next().await {
                match get_spend {
                    InternalGetNetworkSpend::Spend(spend) => {
                        next_gen_descendants.extend(spend.spend.descendants.clone());
                        spend_processing
                            .send(*spend.clone())
                            .await
                            .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
                    }
                    InternalGetNetworkSpend::DoubleSpend(spends) => {
                        info!("Fetched double spend(s) of len {} at {addr:?} from network, following all of them.", spends.len());
                        for s in spends.into_iter() {
                            next_gen_descendants.extend(s.spend.descendants.clone());
                            spend_processing
                                .send(s.clone())
                                .await
                                .map_err(|e| WalletError::SpendProcessing(e.to_string()))?;
                        }
                    }
                    InternalGetNetworkSpend::NotFound => {
                        info!("Reached UTXO at {addr:?}");
                        utxos.insert(addr);
                    }
                    InternalGetNetworkSpend::Error(err) => {
                        error!("Failed to get spend at {addr:?} during DAG collection: {err:?}")
                    }
                }
            }

            // only follow descendants we haven't already gathered
            let followed_descendants: BTreeSet<UniquePubkey> =
                descendants_to_follow.keys().copied().collect();
            known_descendants.extend(followed_descendants);
            descendants_to_follow = next_gen_descendants
                .into_iter()
                .filter(|(key, _)| !known_descendants.contains(key))
                .collect();

            // go on to next gen
            gen += 1;
        }

        let elapsed = start.elapsed();
        info!("Finished crawling SpendDAG from {spend_addr:?} in {elapsed:?}");
        Ok(utxos)
    }

    /// Extends an existing SpendDag with a new SignedSpend,
    /// tracing back the ancestors of that Spend all the way to a known Spend in the DAG or else back to Genesis
    /// Verifies the DAG and records faults if any
    /// This is useful to keep a partial SpendDag to be able to verify that new spends come from Genesis
    ///
    /// ```text
    ///              ... --
    ///                     \
    ///              ... ----                  ... --
    ///                       \                       \
    /// Spend0 -> Spend1 -----> Spend2 ---> Spend5 ---> Spend2 ---> Genesis
    ///                   \                           /
    ///                    ---> Spend3 ---> Spend6 ->
    ///                     \            /
    ///                      -> Spend4 ->
    ///                                /
    ///                            ...
    ///
    /// ```
    pub async fn spend_dag_extend_until(
        &self,
        dag: &mut SpendDag,
        spend_addr: SpendAddress,
        new_spend: SignedSpend,
    ) -> WalletResult<()> {
        // check existence of spend in dag
        let is_new_spend = dag.insert(spend_addr, new_spend.clone());
        if !is_new_spend {
            return Ok(());
        }

        // use iteration instead of recursion to avoid stack overflow
        let mut ancestors_to_verify = new_spend.spend.ancestors.clone();
        let mut depth = 0;
        let mut known_ancestors = BTreeSet::from_iter([dag.source()]);
        let start = std::time::Instant::now();

        while !ancestors_to_verify.is_empty() {
            let mut next_gen_ancestors = BTreeSet::new();

            for ancestor in ancestors_to_verify {
                let addrs_to_verify = vec![SpendAddress::from_unique_pubkey(&ancestor)];
                debug!("Depth {depth} - checking parent : {ancestor:?} - {addrs_to_verify:?}");

                // get all parent spends in parallel
                let tasks: Vec<_> = addrs_to_verify
                    .iter()
                    .map(|a| self.crawl_spend(*a))
                    .collect();
                let mut spends = BTreeSet::new();
                for (spend_get, a) in join_all(tasks)
                    .await
                    .into_iter()
                    .zip(addrs_to_verify.clone())
                {
                    match spend_get {
                        InternalGetNetworkSpend::Spend(s) => {
                            spends.insert(*s);
                        }
                        InternalGetNetworkSpend::DoubleSpend(s) => {
                            spends.extend(s.into_iter());
                        }
                        InternalGetNetworkSpend::NotFound => {
                            return Err(WalletError::FailedToGetSpend(format!(
                                "Missing ancestor spend at {a:?}"
                            )))
                        }
                        InternalGetNetworkSpend::Error(e) => {
                            return Err(WalletError::FailedToGetSpend(format!(
                                "Failed to get ancestor spend at {a:?}: {e}"
                            )))
                        }
                    }
                }
                let spends_len = spends.len();
                debug!("Depth {depth} - Got {spends_len} spends for parent: {addrs_to_verify:?}");
                trace!("Spends for {addrs_to_verify:?} - {spends:?}");

                // add spends to the dag
                known_ancestors.extend(addrs_to_verify.clone());
                for (spend, addr) in spends.clone().into_iter().zip(addrs_to_verify) {
                    let is_new_spend = dag.insert(addr, spend.clone());

                    // no need to check this spend's parents if it was already in the DAG
                    if is_new_spend {
                        next_gen_ancestors.extend(spend.spend.ancestors.clone());
                    }
                }
            }

            // only verify parents we haven't already verified
            ancestors_to_verify = next_gen_ancestors
                .into_iter()
                .filter(|ancestor| {
                    !known_ancestors.contains(&SpendAddress::from_unique_pubkey(ancestor))
                })
                .collect();

            depth += 1;
            let elapsed = start.elapsed();
            let n = known_ancestors.len();
            info!("Now at depth {depth} - Collected spends from {n} transactions in {elapsed:?}");
        }

        let elapsed = start.elapsed();
        let n = known_ancestors.len();
        info!("Collected the DAG branch all the way to known spends or genesis! Through {depth} generations, collecting spends from {n} transactions in {elapsed:?}");

        // verify the DAG
        info!("Now verifying SpendDAG extended at {spend_addr:?} and recording errors...");
        let start = std::time::Instant::now();
        if let Err(e) = dag.record_faults(&dag.source()) {
            let s = format!(
                "Collected DAG starting at {spend_addr:?} is invalid, this is probably a bug: {e}"
            );
            error!("{s}");
            return Err(WalletError::Dag(s));
        }
        let elapsed = start.elapsed();
        info!("Finished verifying SpendDAG extended from {spend_addr:?} in {elapsed:?}");
        Ok(())
    }

    /// Extends an existing SpendDag starting from the given utxos
    /// If verify is true, records faults in the DAG
    pub async fn spend_dag_continue_from(
        &self,
        dag: &mut SpendDag,
        utxos: BTreeSet<SpendAddress>,
        spend_processing: Option<Sender<(SignedSpend, u64, bool)>>,
        verify: bool,
    ) {
        let main_dag_src = dag.source();
        info!(
            "Expanding spend DAG with source: {main_dag_src:?} from {} utxos",
            utxos.len()
        );

        let sender = spend_processing.clone();
        let tasks = utxos
            .iter()
            .map(|utxo| self.spend_dag_build_from(*utxo, sender.clone(), false));
        let sub_dags = join_all(tasks).await;
        for (res, addr) in sub_dags.into_iter().zip(utxos.into_iter()) {
            match res {
                Ok(sub_dag) => {
                    debug!("Gathered sub DAG from: {addr:?}");
                    if let Err(e) = dag.merge(sub_dag, verify) {
                        warn!("Failed to merge sub dag from {addr:?} into dag: {e}");
                    }
                }
                Err(e) => warn!("Failed to gather sub dag from {addr:?}: {e}"),
            };
        }

        info!("Done gathering spend DAG from utxos");
    }

    /// Extends an existing SpendDag starting from the utxos in this DAG
    /// Covers the entirety of currently existing Spends if the DAG was built from Genesis
    /// If verify is true, records faults in the DAG
    /// Stops gathering after max_depth generations
    pub async fn spend_dag_continue_from_utxos(
        &self,
        dag: &mut SpendDag,
        spend_processing: Option<Sender<(SignedSpend, u64, bool)>>,
        verify: bool,
    ) {
        let utxos = dag.get_utxos();
        self.spend_dag_continue_from(dag, utxos, spend_processing, verify)
            .await
    }

    /// Internal get spend helper for DAG purposes
    /// For crawling, a special fetch policy is deployed to improve the performance:
    ///   1. Expect `majority` copies as it is a `Spend`;
    ///   2. But don't retry as most will be `UTXO` which won't be found.
    async fn crawl_spend(&self, spend_addr: SpendAddress) -> InternalGetNetworkSpend {
        match self.crawl_spend_from_network(spend_addr).await {
            Ok(s) => {
                debug!("DAG crawling: fetched spend {spend_addr:?} from network");
                InternalGetNetworkSpend::Spend(Box::new(s))
            }
            Err(Error::Network(NetworkError::GetRecordError(GetRecordError::RecordNotFound))) => {
                debug!("DAG crawling: spend at {spend_addr:?} not found on the network");
                InternalGetNetworkSpend::NotFound
            }
            Err(Error::Network(NetworkError::DoubleSpendAttempt(spends))) => {
                debug!("DAG crawling: got a double spend(s) of len {} at {spend_addr:?} on the network", spends.len());
                InternalGetNetworkSpend::DoubleSpend(spends)
            }
            Err(e) => {
                debug!(
                    "DAG crawling: got an error for spend at {spend_addr:?} on the network: {e}"
                );
                InternalGetNetworkSpend::Error(e)
            }
        }
    }
}

/// Helper function to analyze spend for beta_tracking optimization.
/// returns the new_utxos that needs to be further tracked.
fn beta_track_analyze_spend(spend: &SignedSpend) -> BTreeSet<(SpendAddress, NanoTokens)> {
    // Filter out royalty outputs
    let royalty_pubkeys: BTreeSet<_> = spend
        .spend
        .network_royalties()
        .iter()
        .map(|(_, _, der)| NETWORK_ROYALTIES_PK.new_unique_pubkey(der))
        .collect();
    let default_royalty_pubkeys: BTreeSet<_> = spend
        .spend
        .network_royalties()
        .iter()
        .map(|(_, _, der)| DEFAULT_NETWORK_ROYALTIES_PK.new_unique_pubkey(der))
        .collect();

    let spend_addr = spend.address();
    let new_utxos: BTreeSet<_> = spend
        .spend
        .descendants
        .iter()
        .filter_map(|(unique_pubkey, amount)| {
            if default_royalty_pubkeys.contains(unique_pubkey)
                || royalty_pubkeys.contains(unique_pubkey)
            {
                None
            } else {
                let addr = SpendAddress::from_unique_pubkey(unique_pubkey);

                if amount.as_nano() > 100000 {
                    info!("Spend {spend_addr:?} has a big-UTXO {addr:?} with {amount}");
                }

                Some((addr, *amount))
            }
        })
        .collect();

    if let SpendReason::BetaRewardTracking(_) = spend.reason() {
        // Do not track down forwarded payment further
        Default::default()
    } else {
        trace!(
            "Spend {spend_addr:?} original has {} outputs, tracking {} of them.",
            spend.spend.descendants.len(),
            new_utxos.len()
        );
        new_utxos
    }
}