Skip to main content

pepper_sync/
sync.rs

1//! Entrypoint for sync engine
2
3use std::collections::{BTreeMap, HashMap};
4use std::ops::Range;
5use std::sync::Arc;
6use std::sync::atomic::{self, AtomicBool, AtomicU8};
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::{RwLock, mpsc};
10
11use incrementalmerkletree::{Marking, Retention};
12use orchard::tree::MerkleHashOrchard;
13use shardtree::store::ShardStore;
14use zcash_client_backend::proto::service::RawTransaction;
15use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
16use zcash_keys::keys::UnifiedFullViewingKey;
17use zcash_primitives::transaction::{Transaction, TxId};
18use zcash_protocol::ShieldedProtocol;
19use zcash_protocol::consensus::{self, BlockHeight};
20use zip32::AccountId;
21
22use zingo_status::confirmation_status::ConfirmationStatus;
23
24use crate::client::{self, FetchRequest};
25use crate::config::{PerformanceLevel, SyncConfig};
26use crate::error::{
27    ContinuityError, MempoolError, ScanError, ServerError, SyncError, SyncModeError,
28    SyncStatusError,
29};
30use crate::keys::transparent::TransparentAddressId;
31use crate::scan::ScanResults;
32use crate::scan::task::{Scanner, ScannerState};
33use crate::scan::transactions::scan_transaction;
34use crate::sync::state::truncate_scan_ranges;
35use crate::wallet::traits::{
36    SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
37};
38use crate::wallet::{
39    KeyIdInterface, NoteInterface, NullifierMap, OutputId, OutputInterface, ScanTarget, SyncMode,
40    SyncState, WalletBlock, WalletTransaction,
41};
42use crate::witness::LocatedTreeData;
43
44#[cfg(not(feature = "darkside_test"))]
45use crate::witness;
46
47pub(crate) mod spend;
48pub(crate) mod state;
49pub(crate) mod transparent;
50
// NOTE(review): not referenced in the visible part of this file; presumably the number of blocks after which
// an unconfirmed spend is invalidated - confirm against its usage.
const UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD: u32 = 3;
/// The maximum number of blocks the wallet can truncate during re-org detection.
pub(crate) const MAX_REORG_ALLOWANCE: u32 = 100;
// NOTE(review): not referenced in the visible part of this file; presumably the size (in blocks) of the
// ranges created with `Verify` priority - confirm against its usage.
const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
54
55/// A snapshot of the current state of sync. Useful for displaying the status of sync to a user / consumer.
56///
57/// `percentage_outputs_scanned` is a much more accurate indicator of sync completion than `percentage_blocks_scanned`.
58/// `percentage_total_outputs_scanned` is the percentage of outputs scanned from birthday to chain height.
59#[derive(Debug, Clone)]
60#[allow(missing_docs)]
61pub struct SyncStatus {
62    pub scan_ranges: Vec<ScanRange>,
63    pub sync_start_height: BlockHeight,
64    pub session_blocks_scanned: u32,
65    pub total_blocks_scanned: u32,
66    pub percentage_session_blocks_scanned: f32,
67    pub percentage_total_blocks_scanned: f32,
68    pub session_sapling_outputs_scanned: u32,
69    pub total_sapling_outputs_scanned: u32,
70    pub session_orchard_outputs_scanned: u32,
71    pub total_orchard_outputs_scanned: u32,
72    pub percentage_session_outputs_scanned: f32,
73    pub percentage_total_outputs_scanned: f32,
74}
75
76// TODO: complete display, scan ranges in raw form are too verbose
77impl std::fmt::Display for SyncStatus {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(
80            f,
81            "percentage complete: {}",
82            self.percentage_total_outputs_scanned
83        )
84    }
85}
86
87impl From<SyncStatus> for json::JsonValue {
88    fn from(value: SyncStatus) -> Self {
89        let scan_ranges: Vec<json::JsonValue> = value
90            .scan_ranges
91            .iter()
92            .map(|range| {
93                json::object! {
94                    "priority" => format!("{:?}", range.priority()),
95                    "start_block" => range.block_range().start.to_string(),
96                    "end_block" => (range.block_range().end - 1).to_string(),
97                }
98            })
99            .collect();
100
101        json::object! {
102            "scan_ranges" => scan_ranges,
103            "sync_start_height" => u32::from(value.sync_start_height),
104            "session_blocks_scanned" => value.session_blocks_scanned,
105            "total_blocks_scanned" => value.total_blocks_scanned,
106            "percentage_session_blocks_scanned" => value.percentage_session_blocks_scanned,
107            "percentage_total_blocks_scanned" => value.percentage_total_blocks_scanned,
108            "session_sapling_outputs_scanned" => value.session_sapling_outputs_scanned,
109            "total_sapling_outputs_scanned" => value.total_sapling_outputs_scanned,
110            "session_orchard_outputs_scanned" => value.session_orchard_outputs_scanned,
111            "total_orchard_outputs_scanned" => value.total_orchard_outputs_scanned,
112            "percentage_session_outputs_scanned" => value.percentage_session_outputs_scanned,
113            "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
114        }
115    }
116}
117
118/// Returned when [`crate::sync::sync`] successfully completes.
119#[derive(Debug, Clone)]
120#[allow(missing_docs)]
121pub struct SyncResult {
122    pub sync_start_height: BlockHeight,
123    pub sync_end_height: BlockHeight,
124    pub blocks_scanned: u32,
125    pub sapling_outputs_scanned: u32,
126    pub orchard_outputs_scanned: u32,
127    pub percentage_total_outputs_scanned: f32,
128}
129
130impl std::fmt::Display for SyncResult {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        write!(
133            f,
134            "Sync completed succesfully:
135{{
136    sync start height: {}
137    sync end height: {}
138    blocks scanned: {}
139    sapling outputs scanned: {}
140    orchard outputs scanned: {}
141    percentage total outputs scanned: {}
142}}",
143            self.sync_start_height,
144            self.sync_end_height,
145            self.blocks_scanned,
146            self.sapling_outputs_scanned,
147            self.orchard_outputs_scanned,
148            self.percentage_total_outputs_scanned,
149        )
150    }
151}
152
153impl From<SyncResult> for json::JsonValue {
154    fn from(value: SyncResult) -> Self {
155        json::object! {
156            "sync_start_height" => u32::from(value.sync_start_height),
157            "sync_end_height" => u32::from(value.sync_end_height),
158            "blocks_scanned" => value.blocks_scanned,
159            "sapling_outputs_scanned" => value.sapling_outputs_scanned,
160            "orchard_outputs_scanned" => value.orchard_outputs_scanned,
161            "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
162        }
163    }
164}
165
/// Priority levels assigned to scan ranges.
///
/// The derived ordering follows the declaration order, so variants declared later compare as greater.
/// Do not reorder the variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ScanPriority {
    /// The nullifiers of this block range are currently being re-fetched.
    RefetchingNullifiers,
    /// The block range is currently being scanned.
    Scanning,
    /// The block range has already been scanned and will not be re-scanned.
    Scanned,
    /// The block range has already been scanned but its nullifiers were not mapped after scanning and spend
    /// detection, to reduce memory consumption and/or storage for non-linear scanning. These nullifiers will need
    /// to be re-fetched for final spend detection when this range is the lowest unscanned range in the wallet's
    /// list of scan ranges.
    ScannedWithoutMapping,
    /// The block range is to be scanned to advance the fully-scanned height.
    Historic,
    /// The block range is adjacent to heights at which the user opened the wallet.
    OpenAdjacent,
    /// The blocks must be scanned to complete note commitment tree shards adjacent to found notes.
    FoundNote,
    /// The blocks must be scanned to complete the latest note commitment tree shard.
    ChainTip,
    /// A previously scanned range that must be verified to check it is still in the main chain, has highest
    /// priority.
    Verify,
}
192
193/// A range of blocks to be scanned, along with its associated priority.
194#[derive(Debug, Clone, PartialEq, Eq)]
195pub struct ScanRange {
196    block_range: Range<BlockHeight>,
197    priority: ScanPriority,
198}
199
200impl std::fmt::Display for ScanRange {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        write!(
203            f,
204            "{:?}({}..{})",
205            self.priority, self.block_range.start, self.block_range.end,
206        )
207    }
208}
209
210impl ScanRange {
211    /// Constructs a scan range from its constituent parts.
212    #[must_use]
213    pub fn from_parts(block_range: Range<BlockHeight>, priority: ScanPriority) -> Self {
214        assert!(
215            block_range.end >= block_range.start,
216            "{block_range:?} is invalid for ScanRange({priority:?})",
217        );
218        ScanRange {
219            block_range,
220            priority,
221        }
222    }
223
224    /// Returns the range of block heights to be scanned.
225    #[must_use]
226    pub fn block_range(&self) -> &Range<BlockHeight> {
227        &self.block_range
228    }
229
230    /// Returns the priority with which the scan range should be scanned.
231    #[must_use]
232    pub fn priority(&self) -> ScanPriority {
233        self.priority
234    }
235
236    /// Returns whether or not the scan range is empty.
237    #[must_use]
238    pub fn is_empty(&self) -> bool {
239        self.block_range.is_empty()
240    }
241
242    /// Returns the number of blocks in the scan range.
243    #[must_use]
244    pub fn len(&self) -> usize {
245        usize::try_from(u32::from(self.block_range.end) - u32::from(self.block_range.start))
246            .expect("due to number of max blocks should always be valid usize")
247    }
248
249    /// Shifts the start of the block range to the right if `block_height >
250    /// self.block_range().start`. Returns `None` if the resulting range would
251    /// be empty (or the range was already empty).
252    #[must_use]
253    pub fn truncate_start(&self, block_height: BlockHeight) -> Option<Self> {
254        if block_height >= self.block_range.end || self.is_empty() {
255            None
256        } else {
257            Some(ScanRange {
258                block_range: self.block_range.start.max(block_height)..self.block_range.end,
259                priority: self.priority,
260            })
261        }
262    }
263
264    /// Shifts the end of the block range to the left if `block_height <
265    /// self.block_range().end`. Returns `None` if the resulting range would
266    /// be empty (or the range was already empty).
267    #[must_use]
268    pub fn truncate_end(&self, block_height: BlockHeight) -> Option<Self> {
269        if block_height <= self.block_range.start || self.is_empty() {
270            None
271        } else {
272            Some(ScanRange {
273                block_range: self.block_range.start..self.block_range.end.min(block_height),
274                priority: self.priority,
275            })
276        }
277    }
278
279    /// Splits this scan range at the specified height, such that the provided height becomes the
280    /// end of the first range returned and the start of the second. Returns `None` if
281    /// `p <= self.block_range().start || p >= self.block_range().end`.
282    #[must_use]
283    pub fn split_at(&self, p: BlockHeight) -> Option<(Self, Self)> {
284        (p > self.block_range.start && p < self.block_range.end).then_some((
285            ScanRange {
286                block_range: self.block_range.start..p,
287                priority: self.priority,
288            },
289            ScanRange {
290                block_range: p..self.block_range.end,
291                priority: self.priority,
292            },
293        ))
294    }
295}
296
297/// Syncs a wallet to the latest state of the blockchain.
298///
299/// `sync_mode` is intended to be stored in a struct that owns the wallet(s) (i.e. lightclient) and has a non-atomic
300/// counterpart [`crate::wallet::SyncMode`]. The sync engine will set the `sync_mode` to `Running` at the start of sync.
301/// However, the consumer is required to set the `sync_mode` back to `NotRunning` when sync is succussful or returns an
302/// error. This allows more flexibility and safety with sync task handles etc.
303/// `sync_mode` may also be set to `Paused` externally to pause scanning so the wallet lock can be acquired multiple
304/// times in quick sucession without the sync engine interrupting.
305/// Set `sync_mode` back to `Running` to resume scanning.
306/// Set `sync_mode` to `Shutdown` to stop the sync process.
307pub async fn sync<P, W>(
308    client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
309    consensus_parameters: &P,
310    wallet: Arc<RwLock<W>>,
311    sync_mode: Arc<AtomicU8>,
312    config: SyncConfig,
313) -> Result<SyncResult, SyncError<W::Error>>
314where
315    P: consensus::Parameters + Sync + Send + 'static,
316    W: SyncWallet
317        + SyncBlocks
318        + SyncTransactions
319        + SyncNullifiers
320        + SyncOutPoints
321        + SyncShardTrees
322        + Send,
323{
324    let mut sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
325    if sync_mode_enum == SyncMode::NotRunning {
326        sync_mode_enum = SyncMode::Running;
327        sync_mode.store(sync_mode_enum as u8, atomic::Ordering::Release);
328    } else {
329        return Err(SyncModeError::SyncAlreadyRunning.into());
330    }
331
332    tracing::info!("Starting sync...");
333
334    // create channel for sending fetch requests and launch fetcher task
335    let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
336    let client_clone = client.clone();
337    let fetcher_handle =
338        tokio::spawn(
339            async move { client::fetch::fetch(fetch_request_receiver, client_clone).await },
340        );
341
342    // create channel for receiving mempool transactions and launch mempool monitor
343    let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(100);
344    let shutdown_mempool = Arc::new(AtomicBool::new(false));
345    let shutdown_mempool_clone = shutdown_mempool.clone();
346    let unprocessed_mempool_transactions_count = Arc::new(AtomicU8::new(0));
347    let unprocessed_mempool_transactions_count_clone =
348        unprocessed_mempool_transactions_count.clone();
349    let mempool_handle = tokio::spawn(async move {
350        mempool_monitor(
351            client,
352            mempool_transaction_sender,
353            unprocessed_mempool_transactions_count_clone,
354            shutdown_mempool_clone,
355        )
356        .await
357    });
358
359    // pre-scan initialisation
360    let mut wallet_guard = wallet.write().await;
361
362    let chain_height = client::get_chain_height(fetch_request_sender.clone()).await?;
363    if chain_height == 0.into() {
364        return Err(SyncError::ServerError(ServerError::GenesisBlockOnly));
365    }
366    let last_known_chain_height =
367        checked_wallet_height(&mut *wallet_guard, chain_height, consensus_parameters)?;
368
369    let ufvks = wallet_guard
370        .get_unified_full_viewing_keys()
371        .map_err(SyncError::WalletError)?;
372
373    transparent::update_addresses_and_scan_targets(
374        consensus_parameters,
375        &mut *wallet_guard,
376        fetch_request_sender.clone(),
377        &ufvks,
378        last_known_chain_height,
379        chain_height,
380        config.transparent_address_discovery,
381    )
382    .await?;
383
384    #[cfg(not(feature = "darkside_test"))]
385    update_subtree_roots(
386        consensus_parameters,
387        fetch_request_sender.clone(),
388        &mut *wallet_guard,
389    )
390    .await?;
391
392    add_initial_frontier(
393        consensus_parameters,
394        fetch_request_sender.clone(),
395        &mut *wallet_guard,
396    )
397    .await?;
398
399    let initial_reorg_detection_start_height = state::update_scan_ranges(
400        consensus_parameters,
401        last_known_chain_height,
402        chain_height,
403        wallet_guard
404            .get_sync_state_mut()
405            .map_err(SyncError::WalletError)?,
406    );
407
408    state::set_initial_state(
409        consensus_parameters,
410        fetch_request_sender.clone(),
411        &mut *wallet_guard,
412        chain_height,
413    )
414    .await?;
415
416    expire_transactions(&mut *wallet_guard)?;
417
418    drop(wallet_guard);
419
420    // create channel for receiving scan results and launch scanner
421    let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel();
422    let mut scanner = Scanner::new(
423        consensus_parameters.clone(),
424        scan_results_sender,
425        fetch_request_sender.clone(),
426        ufvks.clone(),
427    );
428    scanner.launch(config.performance_level);
429
430    // TODO: implement an option for continuous scanning where it doesnt exit when complete
431
432    let mut nullifier_map_limit_exceeded = false;
433    let mut interval = tokio::time::interval(Duration::from_millis(50));
434    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
435    loop {
436        tokio::select! {
437            Some((scan_range, scan_results)) = scan_results_receiver.recv() => {
438                let mut wallet_guard = wallet.write().await;
439                process_scan_results(
440                    consensus_parameters,
441                    &mut *wallet_guard,
442                    fetch_request_sender.clone(),
443                    &ufvks,
444                    scan_range,
445                    scan_results,
446                    initial_reorg_detection_start_height,
447                    config.performance_level,
448                    &mut nullifier_map_limit_exceeded,
449                )
450                .await?;
451                wallet_guard.set_save_flag().map_err(SyncError::WalletError)?;
452                drop(wallet_guard);
453            }
454
455            Some(raw_transaction) = mempool_transaction_receiver.recv() => {
456                let mut wallet_guard = wallet.write().await;
457                process_mempool_transaction(
458                    consensus_parameters,
459                    &ufvks,
460                    &mut *wallet_guard,
461                    raw_transaction,
462                )
463                .await?;
464                unprocessed_mempool_transactions_count.fetch_sub(1, atomic::Ordering::Release);
465                drop(wallet_guard);
466            }
467
468            _update_scanner = interval.tick() => {
469                sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
470                match sync_mode_enum {
471                    SyncMode::Paused => {
472                        let mut pause_interval = tokio::time::interval(Duration::from_secs(1));
473                        pause_interval.tick().await;
474                        while sync_mode_enum == SyncMode::Paused {
475                            pause_interval.tick().await;
476                            sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
477                        }
478                    },
479                    SyncMode::Shutdown => {
480                        let mut wallet_guard = wallet.write().await;
481                        let sync_status = match sync_status(&*wallet_guard).await {
482                            Ok(status) => status,
483                            Err(SyncStatusError::WalletError(e)) => {
484                                return Err(SyncError::WalletError(e));
485                            }
486                            Err(SyncStatusError::NoSyncData) => {
487                                panic!("sync data must exist!");
488                            }
489                        };
490                        wallet_guard
491                            .set_save_flag()
492                            .map_err(SyncError::WalletError)?;
493                        drop(wallet_guard);
494                        tracing::info!("Sync successfully shutdown.");
495
496                        return Ok(SyncResult {
497                            sync_start_height: sync_status.sync_start_height,
498                            sync_end_height: (sync_status
499                                .scan_ranges
500                                .last()
501                                .expect("should be non-empty after syncing")
502                                .block_range()
503                                .end
504                                - 1),
505                            blocks_scanned: sync_status.session_blocks_scanned,
506                            sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
507                            orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
508                            percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
509                        });
510                    }
511                    SyncMode::Running => (),
512                    SyncMode::NotRunning => {
513                        panic!("sync mode should not be manually set to NotRunning!");
514                    },
515                }
516
517                scanner.update(&mut *wallet.write().await, shutdown_mempool.clone(), nullifier_map_limit_exceeded).await?;
518
519                if matches!(scanner.state, ScannerState::Shutdown) {
520                    // wait for mempool monitor to receive mempool transactions
521                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
522                    if is_shutdown(&scanner, unprocessed_mempool_transactions_count.clone())
523                    {
524                        tracing::info!("Sync successfully shutdown.");
525                        break;
526                    }
527                }
528            }
529        }
530    }
531
532    let mut wallet_guard = wallet.write().await;
533    let sync_status = match sync_status(&*wallet_guard).await {
534        Ok(status) => status,
535        Err(SyncStatusError::WalletError(e)) => {
536            return Err(SyncError::WalletError(e));
537        }
538        Err(SyncStatusError::NoSyncData) => {
539            panic!("sync data must exist!");
540        }
541    };
542    // once sync is complete, all nullifiers will have been re-fetched so this note metadata can be discarded.
543    for transaction in wallet_guard
544        .get_wallet_transactions_mut()
545        .map_err(SyncError::WalletError)?
546        .values_mut()
547    {
548        for note in transaction.sapling_notes.as_mut_slice() {
549            note.refetch_nullifier_ranges = Vec::new();
550        }
551        for note in transaction.orchard_notes.as_mut_slice() {
552            note.refetch_nullifier_ranges = Vec::new();
553        }
554    }
555    wallet_guard
556        .set_save_flag()
557        .map_err(SyncError::WalletError)?;
558
559    drop(wallet_guard);
560    drop(scanner);
561    drop(fetch_request_sender);
562
563    match mempool_handle.await.expect("task panicked") {
564        Ok(()) => (),
565        Err(e @ MempoolError::ShutdownWithoutStream) => tracing::warn!("{e}"),
566        Err(e) => return Err(e.into()),
567    }
568    fetcher_handle.await.expect("task panicked");
569
570    Ok(SyncResult {
571        sync_start_height: sync_status.sync_start_height,
572        sync_end_height: (sync_status
573            .scan_ranges
574            .last()
575            .expect("should be non-empty after syncing")
576            .block_range()
577            .end
578            - 1),
579        blocks_scanned: sync_status.session_blocks_scanned,
580        sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
581        orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
582        percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
583    })
584}
585
586/// This ensures that the wallet height used to calculate the lower bound for scan range creation is valid.
587/// The comparison takes two input heights and uses several constants to select the correct height.
588///
589/// The input parameter heights are:
590///
591///   (1) chain_height:
592///       * the best block-height reported by the proxy (zainod or lwd)
593///   (2) last_known_chain_height
594///       * the last max height the wallet recorded from earlier scans
595///
596/// The constants are:
597///   (1) MAX_REORG_ALLOWANCE:
598///       * the maximum number of blocks the wallet can truncate during re-org detection
599///   (2) Sapling Activation Height:
600///       * the lower bound on the wallet birthday
601fn checked_wallet_height<W, P>(
602    wallet: &mut W,
603    chain_height: BlockHeight,
604    consensus_parameters: &P,
605) -> Result<BlockHeight, SyncError<W::Error>>
606where
607    W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
608    P: zcash_protocol::consensus::Parameters,
609{
610    let sync_state = wallet.get_sync_state().map_err(SyncError::WalletError)?;
611    if let Some(last_known_chain_height) = sync_state.last_known_chain_height() {
612        if last_known_chain_height > chain_height {
613            if last_known_chain_height - chain_height >= MAX_REORG_ALLOWANCE {
614                // There's a human attention requiring problem, the wallet supplied
615                // last_known_chain_height is more than MAX_REORG_ALLOWANCE **above**
616                // the proxy's reported height.
617                return Err(SyncError::ChainError(
618                    u32::from(last_known_chain_height),
619                    MAX_REORG_ALLOWANCE,
620                    u32::from(chain_height),
621                ));
622            }
623            // The wallet reported height is above the current proxy height
624            // reset to the proxy height.
625            truncate_wallet_data(wallet, chain_height)?;
626            return Ok(chain_height);
627        }
628        // The last wallet reported height is equal or below the proxy height.
629        Ok(last_known_chain_height)
630    } else {
631        // This is the wallet's first sync. Use [birthday - 1] as wallet height.
632        let sapling_activation_height = consensus_parameters
633            .activation_height(consensus::NetworkUpgrade::Sapling)
634            .expect("sapling activation height should always return Some");
635        let birthday = wallet.get_birthday().map_err(SyncError::WalletError)?;
636        if birthday > chain_height {
637            // Human attention requiring error, a birthday *above* the proxy reported
638            // chain height has been provided.
639            return Err(SyncError::ChainError(
640                u32::from(birthday),
641                MAX_REORG_ALLOWANCE,
642                u32::from(chain_height),
643            ));
644        } else if birthday < sapling_activation_height {
645            return Err(SyncError::BirthdayBelowSapling(
646                u32::from(birthday),
647                u32::from(sapling_activation_height),
648            ));
649        }
650
651        Ok(birthday - 1)
652    }
653}
654
655/// Creates a [`self::SyncStatus`] from the wallet's current [`crate::wallet::SyncState`].
656/// If there is still nullifiers to be re-fetched when scanning is complete, the percentages will be overrided to 99%
657/// until sync is complete.
658///
659/// Intended to be called while [`self::sync`] is running in a separate task.
660pub async fn sync_status<W>(wallet: &W) -> Result<SyncStatus, SyncStatusError<W::Error>>
661where
662    W: SyncWallet + SyncBlocks,
663{
664    let (total_sapling_outputs_scanned, total_orchard_outputs_scanned) =
665        state::calculate_scanned_outputs(wallet).map_err(SyncStatusError::WalletError)?;
666    let total_outputs_scanned = total_sapling_outputs_scanned + total_orchard_outputs_scanned;
667
668    let sync_state = wallet
669        .get_sync_state()
670        .map_err(SyncStatusError::WalletError)?;
671    if sync_state.initial_sync_state.sync_start_height == 0.into() {
672        return Ok(SyncStatus {
673            scan_ranges: sync_state.scan_ranges.clone(),
674            sync_start_height: 0.into(),
675            session_blocks_scanned: 0,
676            total_blocks_scanned: 0,
677            percentage_session_blocks_scanned: 0.0,
678            percentage_total_blocks_scanned: 0.0,
679            session_sapling_outputs_scanned: 0,
680            session_orchard_outputs_scanned: 0,
681            total_sapling_outputs_scanned: 0,
682            total_orchard_outputs_scanned: 0,
683            percentage_session_outputs_scanned: 0.0,
684            percentage_total_outputs_scanned: 0.0,
685        });
686    }
687    let total_blocks_scanned = state::calculate_scanned_blocks(sync_state);
688
689    let birthday = sync_state
690        .wallet_birthday()
691        .ok_or(SyncStatusError::NoSyncData)?;
692    let last_known_chain_height = sync_state
693        .last_known_chain_height()
694        .ok_or(SyncStatusError::NoSyncData)?;
695    let total_blocks = last_known_chain_height - birthday + 1;
696    let total_sapling_outputs = sync_state
697        .initial_sync_state
698        .wallet_tree_bounds
699        .sapling_final_tree_size
700        - sync_state
701            .initial_sync_state
702            .wallet_tree_bounds
703            .sapling_initial_tree_size;
704    let total_orchard_outputs = sync_state
705        .initial_sync_state
706        .wallet_tree_bounds
707        .orchard_final_tree_size
708        - sync_state
709            .initial_sync_state
710            .wallet_tree_bounds
711            .orchard_initial_tree_size;
712    let total_outputs = total_sapling_outputs + total_orchard_outputs;
713
714    let session_blocks_scanned =
715        total_blocks_scanned - sync_state.initial_sync_state.previously_scanned_blocks;
716    let mut percentage_session_blocks_scanned = ((session_blocks_scanned as f32
717        / (total_blocks - sync_state.initial_sync_state.previously_scanned_blocks) as f32)
718        * 100.0)
719        .clamp(0.0, 100.0);
720    let mut percentage_total_blocks_scanned =
721        ((total_blocks_scanned as f32 / total_blocks as f32) * 100.0).clamp(0.0, 100.0);
722
723    let session_sapling_outputs_scanned = total_sapling_outputs_scanned
724        - sync_state
725            .initial_sync_state
726            .previously_scanned_sapling_outputs;
727    let session_orchard_outputs_scanned = total_orchard_outputs_scanned
728        - sync_state
729            .initial_sync_state
730            .previously_scanned_orchard_outputs;
731    let session_outputs_scanned = session_sapling_outputs_scanned + session_orchard_outputs_scanned;
732    let previously_scanned_outputs = sync_state
733        .initial_sync_state
734        .previously_scanned_sapling_outputs
735        + sync_state
736            .initial_sync_state
737            .previously_scanned_orchard_outputs;
738    let mut percentage_session_outputs_scanned = ((session_outputs_scanned as f32
739        / (total_outputs - previously_scanned_outputs) as f32)
740        * 100.0)
741        .clamp(0.0, 100.0);
742    let mut percentage_total_outputs_scanned =
743        ((total_outputs_scanned as f32 / total_outputs as f32) * 100.0).clamp(0.0, 100.0);
744
745    if sync_state.scan_ranges().iter().any(|scan_range| {
746        scan_range.priority() == ScanPriority::ScannedWithoutMapping
747            || scan_range.priority() == ScanPriority::RefetchingNullifiers
748    }) {
749        if percentage_session_blocks_scanned == 100.0 {
750            percentage_session_blocks_scanned = 99.0;
751        }
752        if percentage_total_blocks_scanned == 100.0 {
753            percentage_total_blocks_scanned = 99.0;
754        }
755        if percentage_session_outputs_scanned == 100.0 {
756            percentage_session_outputs_scanned = 99.0;
757        }
758        if percentage_total_outputs_scanned == 100.0 {
759            percentage_total_outputs_scanned = 99.0;
760        }
761    }
762
763    Ok(SyncStatus {
764        scan_ranges: sync_state.scan_ranges.clone(),
765        sync_start_height: sync_state.initial_sync_state.sync_start_height,
766        session_blocks_scanned,
767        total_blocks_scanned,
768        percentage_session_blocks_scanned,
769        percentage_total_blocks_scanned,
770        session_sapling_outputs_scanned,
771        total_sapling_outputs_scanned,
772        session_orchard_outputs_scanned,
773        total_orchard_outputs_scanned,
774        percentage_session_outputs_scanned,
775        percentage_total_outputs_scanned,
776    })
777}
778
779/// Scans a pending `transaction` of a given `status`, adding to the wallet and updating output spend statuses.
780///
781/// Used both internally for scanning mempool transactions and externally for scanning calculated and transmitted
782/// transactions during send.
783///
784/// Panics if `status` is of `Confirmed` variant.
785pub fn scan_pending_transaction<W>(
786    consensus_parameters: &impl consensus::Parameters,
787    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
788    wallet: &mut W,
789    transaction: Transaction,
790    status: ConfirmationStatus,
791    datetime: u32,
792) -> Result<(), SyncError<W::Error>>
793where
794    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
795{
796    if matches!(status, ConfirmationStatus::Confirmed(_)) {
797        panic!("this fn is for unconfirmed transactions only");
798    }
799
800    let mut pending_transaction_nullifiers = NullifierMap::new();
801    let mut pending_transaction_outpoints = BTreeMap::new();
802    let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
803        .get_transparent_addresses()
804        .map_err(SyncError::WalletError)?
805        .iter()
806        .map(|(id, address)| (address.clone(), *id))
807        .collect();
808    let pending_transaction = scan_transaction(
809        consensus_parameters,
810        ufvks,
811        transaction.txid(),
812        transaction,
813        status,
814        None,
815        &mut pending_transaction_nullifiers,
816        &mut pending_transaction_outpoints,
817        &transparent_addresses,
818        datetime,
819    )?;
820
821    let wallet_transactions = wallet
822        .get_wallet_transactions()
823        .map_err(SyncError::WalletError)?;
824    let transparent_output_ids = spend::collect_transparent_output_ids(wallet_transactions);
825    let transparent_spend_scan_targets = spend::detect_transparent_spends(
826        &mut pending_transaction_outpoints,
827        transparent_output_ids,
828    );
829    let (sapling_derived_nullifiers, orchard_derived_nullifiers) =
830        spend::collect_derived_nullifiers(wallet_transactions);
831    let (sapling_spend_scan_targets, orchard_spend_scan_targets) = spend::detect_shielded_spends(
832        &mut pending_transaction_nullifiers,
833        sapling_derived_nullifiers,
834        orchard_derived_nullifiers,
835    );
836
837    // return if transaction is not relevant to the wallet
838    if pending_transaction.transparent_coins().is_empty()
839        && pending_transaction.sapling_notes().is_empty()
840        && pending_transaction.orchard_notes().is_empty()
841        && pending_transaction.outgoing_orchard_notes().is_empty()
842        && pending_transaction.outgoing_sapling_notes().is_empty()
843        && transparent_spend_scan_targets.is_empty()
844        && sapling_spend_scan_targets.is_empty()
845        && orchard_spend_scan_targets.is_empty()
846    {
847        return Ok(());
848    }
849
850    wallet
851        .insert_wallet_transaction(pending_transaction)
852        .map_err(SyncError::WalletError)?;
853    spend::update_spent_coins(
854        wallet
855            .get_wallet_transactions_mut()
856            .map_err(SyncError::WalletError)?,
857        transparent_spend_scan_targets,
858    );
859    spend::update_spent_notes(
860        wallet,
861        sapling_spend_scan_targets,
862        orchard_spend_scan_targets,
863        false,
864    )
865    .map_err(SyncError::WalletError)?;
866
867    Ok(())
868}
869
870/// API for targetted scanning.
871///
872/// Allows `scan_targets` to be added externally to the wallet's `sync_state` and be prioritised for scanning. Each
873/// scan target must include the block height which will be used to prioritise the block range containing the note
874/// commitments to the surrounding orchard shard(s). If the block height is pre-orchard then the surrounding sapling
875/// shard(s) will be prioritised instead. The txid in each scan target may be omitted and set to [0u8; 32] in order to
876/// prioritise the surrounding blocks for scanning but be ignored when fetching specific relevant transactions to the
877/// wallet. However, in the case where a relevant spending transaction at a given height contains no decryptable
878/// incoming notes (change), only the nullifier will be mapped and this transaction will be scanned when the
879/// transaction containing the spent notes is scanned instead.
880pub fn add_scan_targets(sync_state: &mut SyncState, scan_targets: &[ScanTarget]) {
881    for scan_target in scan_targets {
882        sync_state.scan_targets.insert(*scan_target);
883    }
884}
885
886/// Resets the spending transaction field of all outputs that were previously spent but became unspent due to a
887/// spending transactions becoming invalid.
888///
889/// `invalid_txids` are the id's of the invalidated spending transactions. Any outputs in the `wallet_transactions`
890/// matching these spending transactions will be reset back to `None`.
891pub fn reset_spends(
892    wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
893    invalid_txids: Vec<TxId>,
894) {
895    wallet_transactions
896        .values_mut()
897        .flat_map(|transaction| transaction.orchard_notes_mut())
898        .filter(|output| {
899            output
900                .spending_transaction
901                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
902        })
903        .for_each(|output| {
904            output.set_spending_transaction(None);
905        });
906    wallet_transactions
907        .values_mut()
908        .flat_map(|transaction| transaction.sapling_notes_mut())
909        .filter(|output| {
910            output
911                .spending_transaction
912                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
913        })
914        .for_each(|output| {
915            output.set_spending_transaction(None);
916        });
917    wallet_transactions
918        .values_mut()
919        .flat_map(|transaction| transaction.transparent_coins_mut())
920        .filter(|output| {
921            output
922                .spending_transaction
923                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
924        })
925        .for_each(|output| {
926            output.set_spending_transaction(None);
927        });
928}
929
930/// Sets transactions associated with list of `failed_txids` in `wallet_transactions` to `Failed` status.
931///
932/// Sets the `spending_transaction` fields of any outputs spent in these transactions to `None`.
933pub fn set_transactions_failed(
934    wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
935    failed_txids: Vec<TxId>,
936) {
937    for failed_txid in failed_txids.iter() {
938        if let Some(transaction) = wallet_transactions.get_mut(failed_txid) {
939            let height = transaction.status().get_height();
940            transaction.update_status(
941                ConfirmationStatus::Failed(height),
942                SystemTime::now()
943                    .duration_since(SystemTime::UNIX_EPOCH)
944                    .expect("infalliable for such long time periods")
945                    .as_secs() as u32,
946            );
947        }
948    }
949    reset_spends(wallet_transactions, failed_txids);
950}
951
952/// Returns true if the scanner and mempool are shutdown.
953fn is_shutdown<P>(
954    scanner: &Scanner<P>,
955    mempool_unprocessed_transactions_count: Arc<AtomicU8>,
956) -> bool
957where
958    P: consensus::Parameters + Sync + Send + 'static,
959{
960    scanner.worker_poolsize() == 0
961        && mempool_unprocessed_transactions_count.load(atomic::Ordering::Acquire) == 0
962}
963
/// Scan post-processing.
///
/// Applies the outcome of scanning `scan_range` to the `wallet`.
///
/// When `scan_results` is `Ok`:
/// - If `scan_range` has `ScannedWithoutMapping` priority, any wallet blocks missing at the bounds of the range are
///   fetched and appended, shielded spends are updated from the scanned nullifiers and
///   `state::set_scanned_scan_range` is called for the range. If the range is not contained within the first
///   non-`Scanned` range of the wallet's sync state, the results are discarded instead,
///   `state::reset_refetching_nullifiers_scan_range` is called and the function returns early.
/// - Otherwise, wallet data, transparent and shielded spends and the scanned blocks are updated. Nullifiers are
///   only mapped to the wallet while the nullifier map size limit for `performance_level` has not been exceeded, or
///   when this range is the lowest range still to be scanned. Outpoints are only mapped for ranges with `FoundNote`
///   priority or higher. Nullifiers and outpoints that are not mapped are passed directly to the spend updates.
///
/// Unless returning early, `Scanned` scan ranges are then merged and `remove_irrelevant_data` is called.
///
/// When `scan_results` is a hash discontinuity at the start of a `Verify` priority range, this is handled as a
/// re-org: the verification range is extended downwards, wallet data above the block preceding the new
/// verification range is truncated and `state::set_initial_state` is called with the chain height that was last
/// known before truncation.
///
/// `nullifier_map_limit_exceeded` is set to `true` the first time the size limit would be exceeded and is never
/// reset by this function.
///
/// # Errors
///
/// Returns `ServerError::ChainVerificationError` (after clearing the wallet data) when the verification range has
/// been extended more than `MAX_REORG_ALLOWANCE` blocks below `initial_reorg_detection_start_height`. All other
/// scan errors, wallet errors and errors from fetching or updating data are propagated to the caller.
#[allow(clippy::too_many_arguments)]
async fn process_scan_results<W>(
    consensus_parameters: &impl consensus::Parameters,
    wallet: &mut W,
    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
    scan_range: ScanRange,
    scan_results: Result<ScanResults, ScanError>,
    initial_reorg_detection_start_height: BlockHeight,
    performance_level: PerformanceLevel,
    nullifier_map_limit_exceeded: &mut bool,
) -> Result<(), SyncError<W::Error>>
where
    W: SyncWallet
        + SyncBlocks
        + SyncTransactions
        + SyncNullifiers
        + SyncOutPoints
        + SyncShardTrees
        + Send,
{
    match scan_results {
        Ok(results) => {
            let ScanResults {
                mut nullifiers,
                mut outpoints,
                scanned_blocks,
                wallet_transactions,
                sapling_located_trees,
                orchard_located_trees,
            } = results;

            if scan_range.priority() == ScanPriority::ScannedWithoutMapping {
                // add missing block bounds in the case that nullifier batch limit was reached and the fetch nullifier
                // scan range was split.
                let full_refetching_nullifiers_range = wallet
                    .get_sync_state()
                    .map_err(SyncError::WalletError)?
                    .scan_ranges
                    .iter()
                    .find(|&wallet_scan_range| {
                        wallet_scan_range
                            .block_range()
                            .contains(&scan_range.block_range().start)
                            && wallet_scan_range
                                .block_range()
                                .contains(&(scan_range.block_range().end - 1))
                    })
                    .expect("wallet scan range containing scan range should exist!");
                // the processed range only differs from the wallet's range when it is a sub-range of it
                if scan_range.block_range().start
                    != full_refetching_nullifiers_range.block_range().start
                    || scan_range.block_range().end
                        != full_refetching_nullifiers_range.block_range().end
                {
                    let mut missing_block_bounds = BTreeMap::new();
                    // candidate bounds are the blocks either side of each edge of the processed range
                    for block_bound in [
                        scan_range.block_range().start - 1,
                        scan_range.block_range().start,
                        scan_range.block_range().end - 1,
                        scan_range.block_range().end,
                    ] {
                        // skip candidates that fall outside of the wallet's full scan range
                        if block_bound < full_refetching_nullifiers_range.block_range().start
                            || block_bound >= full_refetching_nullifiers_range.block_range().end
                        {
                            continue;
                        }
                        // an error from `get_wallet_block` is treated as the block being absent from the wallet
                        if wallet.get_wallet_block(block_bound).is_err() {
                            missing_block_bounds.insert(
                                block_bound,
                                WalletBlock::from_compact_block(
                                    consensus_parameters,
                                    fetch_request_sender.clone(),
                                    &client::get_compact_block(
                                        fetch_request_sender.clone(),
                                        block_bound,
                                    )
                                    .await?,
                                )
                                .await?,
                            );
                        }
                    }
                    if !missing_block_bounds.is_empty() {
                        wallet
                            .append_wallet_blocks(missing_block_bounds)
                            .map_err(SyncError::WalletError)?;
                    }
                }

                let first_unscanned_range = wallet
                    .get_sync_state()
                    .map_err(SyncError::WalletError)?
                    .scan_ranges
                    .iter()
                    .find(|scan_range| scan_range.priority() != ScanPriority::Scanned)
                    .expect("the scan range being processed is not yet set to scanned so at least one unscanned range must exist");
                if !first_unscanned_range
                    .block_range()
                    .contains(&scan_range.block_range().start)
                    || !first_unscanned_range
                        .block_range()
                        .contains(&(scan_range.block_range().end - 1))
                {
                    // in this rare edge case, a scanned `ScannedWithoutMapping` range was the highest priority yet it was not the first unscanned range so it must be discarded to avoid missing spends

                    // reset scan range from `RefetchingNullifiers` to `ScannedWithoutMapping`
                    state::reset_refetching_nullifiers_scan_range(
                        wallet
                            .get_sync_state_mut()
                            .map_err(SyncError::WalletError)?,
                        scan_range.block_range().clone(),
                    );
                    tracing::debug!(
                        "Nullifiers discarded and will be re-fetched to avoid missing spends."
                    );

                    // NOTE: returning here skips the scan range merge and data cleanup at the end of the `Ok` arm
                    return Ok(());
                }

                spend::update_shielded_spends(
                    consensus_parameters,
                    wallet,
                    fetch_request_sender.clone(),
                    ufvks,
                    &scanned_blocks,
                    Some(&mut nullifiers),
                )
                .await?;

                state::set_scanned_scan_range(
                    wallet
                        .get_sync_state_mut()
                        .map_err(SyncError::WalletError)?,
                    scan_range.block_range().clone(),
                    true, // NOTE: although nullifiers are not actually added to the wallet's nullifier map for efficiency, there is effectively no difference as spends are still updated using the `additional_nullifier_map` and would be removed on the following cleanup (`remove_irrelevant_data`) due to `ScannedWithoutMapping` ranges always being the first non-scanned range and therefore always raise the wallet's fully scanned height after processing.
                );
            } else {
                // nullifiers are not mapped if nullifier map size limit will be exceeded
                // once the flag is set it is not reset here, so the size check is skipped on subsequent calls
                if !*nullifier_map_limit_exceeded {
                    let nullifier_map = wallet.get_nullifiers().map_err(SyncError::WalletError)?;
                    if max_nullifier_map_size(performance_level).is_some_and(|max| {
                        nullifier_map.orchard.len()
                            + nullifier_map.sapling.len()
                            + nullifiers.orchard.len()
                            + nullifiers.sapling.len()
                            > max
                    }) {
                        *nullifier_map_limit_exceeded = true;
                    }
                }
                let mut map_nullifiers = !*nullifier_map_limit_exceeded;

                // all transparent spend locations are known before scanning so there is no need to map outpoints from untargeted ranges.
                // outpoints of untargeted ranges will still be checked before being discarded.
                let map_outpoints = scan_range.priority() >= ScanPriority::FoundNote;

                // always map nullifiers if scanning the lowest range to be scanned for final spend detection.
                // this will set the range to `Scanned` (as opposed to `ScannedWithoutMapping`) and prevent immediate
                // re-fetching of the nullifiers in this range. these will be immediately cleared after cleanup so will not
                // have an impact on memory or wallet file size.
                // the selected range is not the lowest range to be scanned unless all ranges before it are scanned or
                // scanning.
                for query_scan_range in wallet
                    .get_sync_state()
                    .map_err(SyncError::WalletError)?
                    .scan_ranges()
                {
                    let scan_priority = query_scan_range.priority();
                    // stop at the first range that is neither scanned nor currently being scanned / re-fetched
                    if scan_priority != ScanPriority::Scanned
                        && scan_priority != ScanPriority::Scanning
                        && scan_priority != ScanPriority::RefetchingNullifiers
                    {
                        break;
                    }

                    if scan_priority == ScanPriority::Scanning
                        && query_scan_range
                            .block_range()
                            .contains(&scan_range.block_range().start)
                        && query_scan_range
                            .block_range()
                            .contains(&(scan_range.block_range().end - 1))
                    {
                        map_nullifiers = true;
                        break;
                    }
                }

                update_wallet_data(
                    consensus_parameters,
                    wallet,
                    fetch_request_sender.clone(),
                    ufvks,
                    &scan_range,
                    if map_nullifiers {
                        Some(&mut nullifiers)
                    } else {
                        None
                    },
                    if map_outpoints {
                        Some(&mut outpoints)
                    } else {
                        None
                    },
                    wallet_transactions,
                    sapling_located_trees,
                    orchard_located_trees,
                )
                .await?;
                // outpoints that were not mapped to the wallet above are passed to the spend update directly
                spend::update_transparent_spends(
                    wallet,
                    if map_outpoints {
                        None
                    } else {
                        Some(&mut outpoints)
                    },
                )
                .map_err(SyncError::WalletError)?;
                // nullifiers that were not mapped to the wallet above are passed to the spend update directly
                spend::update_shielded_spends(
                    consensus_parameters,
                    wallet,
                    fetch_request_sender,
                    ufvks,
                    &scanned_blocks,
                    if map_nullifiers {
                        None
                    } else {
                        Some(&mut nullifiers)
                    },
                )
                .await?;
                add_scanned_blocks(wallet, scanned_blocks, &scan_range)
                    .map_err(SyncError::WalletError)?;

                state::set_scanned_scan_range(
                    wallet
                        .get_sync_state_mut()
                        .map_err(SyncError::WalletError)?,
                    scan_range.block_range().clone(),
                    map_nullifiers,
                );
                state::merge_scan_ranges(
                    wallet
                        .get_sync_state_mut()
                        .map_err(SyncError::WalletError)?,
                    ScanPriority::ScannedWithoutMapping,
                );
            }

            state::merge_scan_ranges(
                wallet
                    .get_sync_state_mut()
                    .map_err(SyncError::WalletError)?,
                ScanPriority::Scanned,
            );
            remove_irrelevant_data(wallet).map_err(SyncError::WalletError)?;
            tracing::debug!("Scan results processed.");
        }
        Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => {
            tracing::warn!("Hash discontinuity detected before block {height}.");
            // only a discontinuity at the very start of a `Verify` range is handled as a re-org
            if height == scan_range.block_range().start
                && scan_range.priority() == ScanPriority::Verify
            {
                tracing::info!("Re-org detected.");
                let sync_state = wallet
                    .get_sync_state_mut()
                    .map_err(SyncError::WalletError)?;
                // recorded before the scan ranges are modified and wallet data is truncated below
                let last_known_chain_height = sync_state
                    .last_known_chain_height()
                    .expect("scan ranges should be non-empty in this scope");

                // reset scan range from `Scanning` to `Verify`
                state::set_scan_priority(
                    sync_state,
                    scan_range.block_range(),
                    ScanPriority::Verify,
                );

                // extend verification range to VERIFY_BLOCK_RANGE_SIZE blocks below current verification range
                let current_reorg_detection_start_height = state::set_verify_scan_range(
                    sync_state,
                    height - 1,
                    state::VerifyEnd::VerifyHighest,
                )
                .block_range()
                .start;
                state::merge_scan_ranges(sync_state, ScanPriority::Verify);

                // give up once verification has been extended further below its initial start height than the
                // maximum re-org allowance
                if initial_reorg_detection_start_height - current_reorg_detection_start_height
                    > MAX_REORG_ALLOWANCE
                {
                    clear_wallet_data(wallet)?;

                    return Err(ServerError::ChainVerificationError.into());
                }

                truncate_wallet_data(wallet, current_reorg_detection_start_height - 1)?;

                state::set_initial_state(
                    consensus_parameters,
                    fetch_request_sender.clone(),
                    wallet,
                    last_known_chain_height,
                )
                .await?;
            } else {
                // `scan_results` is an `Err` in this arm so this propagates the hash discontinuity to the caller
                scan_results?;
            }
        }
        Err(e) => return Err(e.into()),
    }

    Ok(())
}
1279
1280/// Processes mempool transaction.
1281///
1282/// Scan the transaction and add to the wallet if relevant.
1283async fn process_mempool_transaction<W>(
1284    consensus_parameters: &impl consensus::Parameters,
1285    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1286    wallet: &mut W,
1287    raw_transaction: RawTransaction,
1288) -> Result<(), SyncError<W::Error>>
1289where
1290    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1291{
1292    // does not use raw transaction height due to lightwalletd off-by-one bug and potential to be zero
1293    let mempool_height = wallet
1294        .get_sync_state()
1295        .map_err(SyncError::WalletError)?
1296        .last_known_chain_height()
1297        .expect("wallet height must exist after sync is initialised")
1298        + 1;
1299
1300    let transaction = zcash_primitives::transaction::Transaction::read(
1301        &raw_transaction.data[..],
1302        consensus::BranchId::for_height(consensus_parameters, mempool_height),
1303    )
1304    .map_err(ServerError::InvalidTransaction)?;
1305
1306    tracing::debug!(
1307        "mempool received txid {} at height {}",
1308        transaction.txid(),
1309        mempool_height
1310    );
1311
1312    if let Some(tx) = wallet
1313        .get_wallet_transactions_mut()
1314        .map_err(SyncError::WalletError)?
1315        .get_mut(&transaction.txid())
1316    {
1317        tx.update_status(
1318            ConfirmationStatus::Mempool(mempool_height),
1319            SystemTime::now()
1320                .duration_since(SystemTime::UNIX_EPOCH)
1321                .expect("infalliable for such long time periods")
1322                .as_secs() as u32,
1323        );
1324
1325        return Ok(());
1326    }
1327
1328    scan_pending_transaction(
1329        consensus_parameters,
1330        ufvks,
1331        wallet,
1332        transaction,
1333        ConfirmationStatus::Mempool(mempool_height),
1334        SystemTime::now()
1335            .duration_since(SystemTime::UNIX_EPOCH)
1336            .expect("infalliable for such long time periods")
1337            .as_secs() as u32,
1338    )?;
1339
1340    Ok(())
1341}
1342
1343/// Removes wallet blocks, transactions, nullifiers, outpoints and shard tree data above the given `truncate_height`.
1344fn truncate_wallet_data<W>(
1345    wallet: &mut W,
1346    truncate_height: BlockHeight,
1347) -> Result<(), SyncError<W::Error>>
1348where
1349    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1350{
1351    let sync_state = wallet
1352        .get_sync_state_mut()
1353        .map_err(SyncError::WalletError)?;
1354    let highest_scanned_height = sync_state
1355        .highest_scanned_height()
1356        .expect("should be non-empty in this scope");
1357    let wallet_birthday = sync_state
1358        .wallet_birthday()
1359        .expect("should be non-empty in this scope");
1360    let checked_truncate_height = match truncate_height.cmp(&wallet_birthday) {
1361        std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => truncate_height,
1362        std::cmp::Ordering::Less => consensus::H0,
1363    };
1364    truncate_scan_ranges(checked_truncate_height, sync_state);
1365
1366    if checked_truncate_height > highest_scanned_height {
1367        return Ok(());
1368    }
1369
1370    wallet
1371        .truncate_wallet_blocks(checked_truncate_height)
1372        .map_err(SyncError::WalletError)?;
1373    wallet
1374        .truncate_wallet_transactions(checked_truncate_height)
1375        .map_err(SyncError::WalletError)?;
1376    wallet
1377        .truncate_nullifiers(checked_truncate_height)
1378        .map_err(SyncError::WalletError)?;
1379    wallet
1380        .truncate_outpoints(checked_truncate_height)
1381        .map_err(SyncError::WalletError)?;
1382    match wallet.truncate_shard_trees(checked_truncate_height) {
1383        Ok(_) => Ok(()),
1384        Err(SyncError::TruncationError(height, pooltype)) => {
1385            clear_wallet_data(wallet)?;
1386
1387            Err(SyncError::TruncationError(height, pooltype))
1388        }
1389        Err(e) => Err(e),
1390    }?;
1391
1392    Ok(())
1393}
1394
1395fn clear_wallet_data<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1396where
1397    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1398{
1399    let scan_targets = wallet
1400        .get_wallet_transactions()
1401        .map_err(SyncError::WalletError)?
1402        .values()
1403        .filter_map(|transaction| {
1404            transaction
1405                .status()
1406                .get_confirmed_height()
1407                .map(|height| ScanTarget {
1408                    block_height: height,
1409                    txid: transaction.txid(),
1410                    narrow_scan_area: true,
1411                })
1412        })
1413        .collect::<Vec<_>>();
1414    truncate_wallet_data(wallet, consensus::H0)?;
1415    wallet
1416        .get_wallet_transactions_mut()
1417        .map_err(SyncError::WalletError)?
1418        .clear();
1419    let sync_state = wallet
1420        .get_sync_state_mut()
1421        .map_err(SyncError::WalletError)?;
1422    add_scan_targets(sync_state, &scan_targets);
1423    wallet.set_save_flag().map_err(SyncError::WalletError)?;
1424
1425    Ok(())
1426}
1427
1428/// Updates the wallet with data from `scan_results`
1429#[allow(clippy::too_many_arguments)]
1430async fn update_wallet_data<W>(
1431    consensus_parameters: &impl consensus::Parameters,
1432    wallet: &mut W,
1433    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1434    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1435    scan_range: &ScanRange,
1436    nullifiers: Option<&mut NullifierMap>,
1437    outpoints: Option<&mut BTreeMap<OutputId, ScanTarget>>,
1438    mut transactions: HashMap<TxId, WalletTransaction>,
1439    sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
1440    orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
1441) -> Result<(), SyncError<W::Error>>
1442where
1443    W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees + Send,
1444{
1445    let sync_state = wallet
1446        .get_sync_state_mut()
1447        .map_err(SyncError::WalletError)?;
1448    let highest_scanned_height = sync_state
1449        .highest_scanned_height()
1450        .expect("scan ranges should not be empty in this scope");
1451    for transaction in transactions.values() {
1452        state::update_found_note_shard_priority(
1453            consensus_parameters,
1454            sync_state,
1455            ShieldedProtocol::Sapling,
1456            transaction,
1457        );
1458        state::update_found_note_shard_priority(
1459            consensus_parameters,
1460            sync_state,
1461            ShieldedProtocol::Orchard,
1462            transaction,
1463        );
1464    }
1465    // add all block ranges of scan ranges with `ScannedWithoutMapping` or `RefetchingNullifiers` priority above the
1466    // current scan range to each note to track which ranges need the nullifiers to be re-fetched before the note is
1467    // known to be unspent (in addition to all other ranges above the notes height being `Scanned`,
1468    // `ScannedWithoutMapping` or `RefetchingNullifiers` priority). this information is necessary as these ranges have been scanned but the
1469    // nullifiers have been discarded so must be re-fetched. if ranges are scanned but the nullifiers are discarded
1470    // (set to `ScannedWithoutMapping` priority) *after* this note has been added to the wallet, this is sufficient to
1471    // know this note has not been spent, even if this range is not set to `Scanned` priority.
1472    let refetch_nullifier_ranges = {
1473        let block_ranges: Vec<Range<BlockHeight>> = sync_state
1474            .scan_ranges()
1475            .iter()
1476            .filter(|&scan_range| {
1477                scan_range.priority() == ScanPriority::ScannedWithoutMapping
1478                    || scan_range.priority() == ScanPriority::RefetchingNullifiers
1479            })
1480            .map(|scan_range| scan_range.block_range().clone())
1481            .collect();
1482
1483        block_ranges
1484            [block_ranges.partition_point(|range| range.start < scan_range.block_range().end)..]
1485            .to_vec()
1486    };
1487    for transaction in transactions.values_mut() {
1488        for note in transaction.sapling_notes.as_mut_slice() {
1489            note.refetch_nullifier_ranges = refetch_nullifier_ranges.clone();
1490        }
1491        for note in transaction.orchard_notes.as_mut_slice() {
1492            note.refetch_nullifier_ranges = refetch_nullifier_ranges.clone();
1493        }
1494    }
1495    for transaction in transactions.values() {
1496        discover_unified_addresses(wallet, ufvks, transaction).map_err(SyncError::WalletError)?;
1497    }
1498
1499    wallet
1500        .extend_wallet_transactions(transactions)
1501        .map_err(SyncError::WalletError)?;
1502    if let Some(nullifiers) = nullifiers {
1503        wallet
1504            .append_nullifiers(nullifiers)
1505            .map_err(SyncError::WalletError)?;
1506    }
1507    if let Some(outpoints) = outpoints {
1508        wallet
1509            .append_outpoints(outpoints)
1510            .map_err(SyncError::WalletError)?;
1511    }
1512    wallet
1513        .update_shard_trees(
1514            fetch_request_sender,
1515            scan_range,
1516            highest_scanned_height,
1517            sapling_located_trees,
1518            orchard_located_trees,
1519        )
1520        .await?;
1521
1522    Ok(())
1523}
1524
1525fn discover_unified_addresses<W>(
1526    wallet: &mut W,
1527    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1528    transaction: &WalletTransaction,
1529) -> Result<(), W::Error>
1530where
1531    W: SyncWallet,
1532{
1533    for note in transaction
1534        .orchard_notes()
1535        .iter()
1536        .filter(|&note| note.key_id().scope == zip32::Scope::External)
1537    {
1538        let ivk = ufvks
1539            .get(&note.key_id().account_id())
1540            .expect("ufvk must exist to decrypt this note")
1541            .orchard()
1542            .expect("fvk must exist to decrypt this note")
1543            .to_ivk(zip32::Scope::External);
1544
1545        wallet.add_orchard_address(
1546            note.key_id().account_id(),
1547            note.note().recipient(),
1548            ivk.diversifier_index(&note.note().recipient())
1549                .expect("must be key used to create this address"),
1550        )?;
1551    }
1552    for note in transaction
1553        .sapling_notes()
1554        .iter()
1555        .filter(|&note| note.key_id().scope == zip32::Scope::External)
1556    {
1557        let ivk = ufvks
1558            .get(&note.key_id().account_id())
1559            .expect("ufvk must exist to decrypt this note")
1560            .sapling()
1561            .expect("fvk must exist to decrypt this note")
1562            .to_external_ivk();
1563
1564        wallet.add_sapling_address(
1565            note.key_id().account_id(),
1566            note.note().recipient(),
1567            ivk.decrypt_diversifier(&note.note().recipient())
1568                .expect("must be key used to create this address"),
1569        )?;
1570    }
1571
1572    Ok(())
1573}
1574
1575fn remove_irrelevant_data<W>(wallet: &mut W) -> Result<(), W::Error>
1576where
1577    W: SyncWallet + SyncBlocks + SyncOutPoints + SyncNullifiers + SyncTransactions,
1578{
1579    let fully_scanned_height = wallet
1580        .get_sync_state()?
1581        .fully_scanned_height()
1582        .expect("scan ranges must be non-empty");
1583
1584    wallet
1585        .get_outpoints_mut()?
1586        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1587    wallet
1588        .get_nullifiers_mut()?
1589        .sapling
1590        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1591    wallet
1592        .get_nullifiers_mut()?
1593        .orchard
1594        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1595    wallet
1596        .get_sync_state_mut()?
1597        .scan_targets
1598        .retain(|scan_target| scan_target.block_height > fully_scanned_height);
1599    remove_irrelevant_blocks(wallet)?;
1600
1601    Ok(())
1602}
1603
1604fn remove_irrelevant_blocks<W>(wallet: &mut W) -> Result<(), W::Error>
1605where
1606    W: SyncWallet + SyncBlocks + SyncTransactions,
1607{
1608    let sync_state = wallet.get_sync_state()?;
1609    let highest_scanned_height = sync_state
1610        .highest_scanned_height()
1611        .expect("should be non-empty");
1612    let scanned_range_bounds = sync_state
1613        .scan_ranges()
1614        .iter()
1615        .filter(|scan_range| {
1616            scan_range.priority() == ScanPriority::Scanned
1617                || scan_range.priority() == ScanPriority::ScannedWithoutMapping
1618                || scan_range.priority() == ScanPriority::RefetchingNullifiers
1619        })
1620        .flat_map(|scanned_range| {
1621            vec![
1622                scanned_range.block_range().start,
1623                scanned_range.block_range().end - 1,
1624            ]
1625        })
1626        .collect::<Vec<_>>();
1627    let wallet_transaction_heights = wallet
1628        .get_wallet_transactions()?
1629        .values()
1630        .filter_map(|tx| tx.status().get_confirmed_height())
1631        .collect::<Vec<_>>();
1632
1633    wallet.get_wallet_blocks_mut()?.retain(|height, _| {
1634        *height >= highest_scanned_height.saturating_sub(MAX_REORG_ALLOWANCE)
1635            || scanned_range_bounds.contains(height)
1636            || wallet_transaction_heights.contains(height)
1637    });
1638
1639    Ok(())
1640}
1641
1642fn add_scanned_blocks<W>(
1643    wallet: &mut W,
1644    mut scanned_blocks: BTreeMap<BlockHeight, WalletBlock>,
1645    scan_range: &ScanRange,
1646) -> Result<(), W::Error>
1647where
1648    W: SyncWallet + SyncBlocks + SyncTransactions,
1649{
1650    let sync_state = wallet.get_sync_state()?;
1651    let highest_scanned_height = sync_state
1652        .highest_scanned_height()
1653        .expect("scan ranges must be non-empty");
1654
1655    let wallet_transaction_heights = wallet
1656        .get_wallet_transactions()?
1657        .values()
1658        .filter_map(|tx| tx.status().get_confirmed_height())
1659        .collect::<Vec<_>>();
1660
1661    scanned_blocks.retain(|height, _| {
1662        *height >= highest_scanned_height.saturating_sub(MAX_REORG_ALLOWANCE)
1663            || *height == scan_range.block_range().start
1664            || *height == scan_range.block_range().end - 1
1665            || wallet_transaction_heights.contains(height)
1666    });
1667
1668    wallet.append_wallet_blocks(scanned_blocks)?;
1669
1670    Ok(())
1671}
1672
1673#[cfg(not(feature = "darkside_test"))]
1674async fn update_subtree_roots<W>(
1675    consensus_parameters: &impl consensus::Parameters,
1676    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1677    wallet: &mut W,
1678) -> Result<(), SyncError<W::Error>>
1679where
1680    W: SyncWallet + SyncShardTrees,
1681{
1682    let sapling_start_index = wallet
1683        .get_shard_trees()
1684        .map_err(SyncError::WalletError)?
1685        .sapling
1686        .store()
1687        .get_shard_roots()
1688        .expect("infallible")
1689        .len() as u32;
1690    let orchard_start_index = wallet
1691        .get_shard_trees()
1692        .map_err(SyncError::WalletError)?
1693        .orchard
1694        .store()
1695        .get_shard_roots()
1696        .expect("infallible")
1697        .len() as u32;
1698    let (sapling_subtree_roots, orchard_subtree_roots) = futures::join!(
1699        client::get_subtree_roots(fetch_request_sender.clone(), sapling_start_index, 0, 0),
1700        client::get_subtree_roots(fetch_request_sender, orchard_start_index, 1, 0)
1701    );
1702
1703    let sapling_subtree_roots = sapling_subtree_roots?;
1704    let orchard_subtree_roots = orchard_subtree_roots?;
1705
1706    let sync_state = wallet
1707        .get_sync_state_mut()
1708        .map_err(SyncError::WalletError)?;
1709    state::add_shard_ranges(
1710        consensus_parameters,
1711        ShieldedProtocol::Sapling,
1712        sync_state,
1713        &sapling_subtree_roots,
1714    );
1715    state::add_shard_ranges(
1716        consensus_parameters,
1717        ShieldedProtocol::Orchard,
1718        sync_state,
1719        &orchard_subtree_roots,
1720    );
1721
1722    let shard_trees = wallet
1723        .get_shard_trees_mut()
1724        .map_err(SyncError::WalletError)?;
1725    witness::add_subtree_roots(sapling_subtree_roots, &mut shard_trees.sapling)?;
1726    witness::add_subtree_roots(orchard_subtree_roots, &mut shard_trees.orchard)?;
1727
1728    Ok(())
1729}
1730
1731async fn add_initial_frontier<W>(
1732    consensus_parameters: &impl consensus::Parameters,
1733    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1734    wallet: &mut W,
1735) -> Result<(), SyncError<W::Error>>
1736where
1737    W: SyncWallet + SyncShardTrees,
1738{
1739    let birthday = wallet.get_birthday().map_err(SyncError::WalletError)?;
1740    if birthday
1741        == consensus_parameters
1742            .activation_height(consensus::NetworkUpgrade::Sapling)
1743            .expect("sapling activation height should always return Some")
1744    {
1745        return Ok(());
1746    }
1747
1748    // if the shard store only contains the first checkpoint added on initialisation, add frontiers to complete the
1749    // shard trees.
1750    let shard_trees = wallet
1751        .get_shard_trees_mut()
1752        .map_err(SyncError::WalletError)?;
1753    if shard_trees
1754        .sapling
1755        .store()
1756        .checkpoint_count()
1757        .expect("infallible")
1758        == 1
1759    {
1760        let frontiers = client::get_frontiers(fetch_request_sender, birthday).await?;
1761        shard_trees
1762            .sapling
1763            .insert_frontier(
1764                frontiers.final_sapling_tree().clone(),
1765                Retention::Checkpoint {
1766                    id: birthday,
1767                    marking: Marking::None,
1768                },
1769            )
1770            .expect("infallible");
1771        shard_trees
1772            .orchard
1773            .insert_frontier(
1774                frontiers.final_orchard_tree().clone(),
1775                Retention::Checkpoint {
1776                    id: birthday,
1777                    marking: Marking::None,
1778                },
1779            )
1780            .expect("infallible");
1781    }
1782
1783    Ok(())
1784}
1785
1786/// Sets up mempool stream.
1787///
1788/// If there is some raw transaction, send to be scanned.
1789/// If the mempool stream message is `None` (a block was mined) or the request failed, setup a new mempool stream.
1790async fn mempool_monitor(
1791    mut client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
1792    mempool_transaction_sender: mpsc::Sender<RawTransaction>,
1793    unprocessed_transactions_count: Arc<AtomicU8>,
1794    shutdown_mempool: Arc<AtomicBool>,
1795) -> Result<(), MempoolError> {
1796    let mut interval = tokio::time::interval(Duration::from_secs(1));
1797    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1798    'main: loop {
1799        let response =
1800            client::get_mempool_transaction_stream(&mut client, shutdown_mempool.clone()).await;
1801
1802        match response {
1803            Ok(mut mempool_stream) => {
1804                interval.reset();
1805                loop {
1806                    tokio::select! {
1807                        mempool_stream_message = mempool_stream.message() => {
1808                            match mempool_stream_message.unwrap_or(None) {
1809                                Some(raw_transaction) => {
1810                                     let _ignore_error = mempool_transaction_sender
1811                                        .send(raw_transaction)
1812                                        .await;
1813                                    unprocessed_transactions_count.fetch_add(1, atomic::Ordering::Release);
1814                                }
1815                                None => {
1816                                    continue 'main;
1817                                }
1818                            }
1819
1820                        }
1821
1822                        _ = interval.tick() => {
1823                            if shutdown_mempool.load(atomic::Ordering::Acquire) {
1824                                break 'main;
1825                            }
1826                        }
1827                    }
1828                }
1829            }
1830            Err(e @ MempoolError::ShutdownWithoutStream) => return Err(e),
1831            Err(MempoolError::ServerError(e)) => {
1832                tracing::warn!("Mempool stream request failed! Status: {e}.\nRetrying...");
1833                tokio::time::sleep(Duration::from_secs(3)).await;
1834            }
1835        }
1836    }
1837
1838    Ok(())
1839}
1840
1841/// Spends will be reset to free up funds if transaction has been unconfirmed for
1842/// `UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD` confirmed blocks.
1843/// Transaction status will then be set to `Failed` if it's still unconfirmed when the chain reaches it's expiry height.
1844// TODO: add config to pepper-sync to set UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD
1845fn expire_transactions<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1846where
1847    W: SyncWallet + SyncTransactions,
1848{
1849    let last_known_chain_height = wallet
1850        .get_sync_state()
1851        .map_err(SyncError::WalletError)?
1852        .last_known_chain_height()
1853        .expect("wallet height must exist after scan ranges have been updated");
1854    let wallet_transactions = wallet
1855        .get_wallet_transactions_mut()
1856        .map_err(SyncError::WalletError)?;
1857
1858    let expired_txids = wallet_transactions
1859        .values()
1860        .filter(|transaction| {
1861            transaction.status().is_pending()
1862                && last_known_chain_height >= transaction.transaction().expiry_height()
1863        })
1864        .map(super::wallet::WalletTransaction::txid)
1865        .collect::<Vec<_>>();
1866    set_transactions_failed(wallet_transactions, expired_txids);
1867
1868    let stuck_funds_txids = wallet_transactions
1869        .values()
1870        .filter(|transaction| {
1871            transaction.status().is_pending()
1872                && last_known_chain_height
1873                    >= transaction.status().get_height() + UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD
1874        })
1875        .map(super::wallet::WalletTransaction::txid)
1876        .collect::<Vec<_>>();
1877    reset_spends(wallet_transactions, stuck_funds_txids);
1878
1879    Ok(())
1880}
1881
1882fn max_nullifier_map_size(performance_level: PerformanceLevel) -> Option<usize> {
1883    match performance_level {
1884        PerformanceLevel::Low => Some(0),
1885        PerformanceLevel::Medium => Some(125_000),
1886        PerformanceLevel::High => Some(2_000_000),
1887        PerformanceLevel::Maximum => None,
1888    }
1889}
1890
#[cfg(test)]
mod test {

    // Tests for `checked_wallet_height`, run against mock wallets on a local network where every upgrade from
    // sapling onwards activates at height 3.
    mod checked_height_validation {
        use zcash_protocol::consensus::BlockHeight;
        use zcash_protocol::local_consensus::LocalNetwork;
        const LOCAL_NETWORK: LocalNetwork = LocalNetwork {
            overwinter: Some(BlockHeight::from_u32(1)),
            sapling: Some(BlockHeight::from_u32(3)),
            blossom: Some(BlockHeight::from_u32(3)),
            heartwood: Some(BlockHeight::from_u32(3)),
            canopy: Some(BlockHeight::from_u32(3)),
            nu5: Some(BlockHeight::from_u32(3)),
            nu6: Some(BlockHeight::from_u32(3)),
            nu6_1: Some(BlockHeight::from_u32(3)),
        };
        use crate::{error::SyncError, mocks::MockWalletError, sync::checked_wallet_height};
        // It's possible an error from an implementor's get_sync_state could bubble up to checked_wallet_height.
        // This test shows that such an error is raised wrapped in a WalletError and returned as the Err variant.
        #[tokio::test]
        async fn get_sync_state_error() {
            let builder = crate::mocks::MockWalletBuilder::new();
            let test_error = "get_sync_state_error";
            let mut test_wallet = builder
                .get_sync_state_patch(Box::new(|_| {
                    Err(MockWalletError::AnErrorVariant(test_error.to_string()))
                }))
                .create_mock_wallet();
            let res =
                checked_wallet_height(&mut test_wallet, BlockHeight::from_u32(1), &LOCAL_NETWORK);
            assert!(matches!(
                res,
                Err(SyncError::WalletError(
                    crate::mocks::MockWalletError::AnErrorVariant(ref s)
                )) if s == test_error
            ));
        }

        // Cases where the wallet's sync state is created with a scan range.
        mod last_known_chain_height {
            use crate::{
                sync::{MAX_REORG_ALLOWANCE, ScanRange},
                wallet::SyncState,
            };
            const DEFAULT_START_HEIGHT: BlockHeight = BlockHeight::from_u32(1);
            const _DEFAULT_LAST_KNOWN_HEIGHT: BlockHeight = BlockHeight::from_u32(102);
            const DEFAULT_CHAIN_HEIGHT: BlockHeight = BlockHeight::from_u32(110);

            use super::*;
            // Expects an error whose message reports the wallet height (`LAST_KNOWN_HEIGHT - 1`),
            // `MAX_REORG_ALLOWANCE` and the chain height.
            #[tokio::test]
            async fn above_allowance() {
                const LAST_KNOWN_HEIGHT: BlockHeight = BlockHeight::from_u32(211);
                let lkch = vec![ScanRange::from_parts(
                    DEFAULT_START_HEIGHT..LAST_KNOWN_HEIGHT,
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut test_wallet = builder.sync_state(state).create_mock_wallet();
                let res =
                    checked_wallet_height(&mut test_wallet, DEFAULT_CHAIN_HEIGHT, &LOCAL_NETWORK);
                if let Err(e) = res {
                    assert_eq!(
                        e.to_string(),
                        format!(
                            "wallet height {} is more than {} blocks ahead of best chain height {}",
                            LAST_KNOWN_HEIGHT - 1,
                            MAX_REORG_ALLOWANCE,
                            DEFAULT_CHAIN_HEIGHT
                        )
                    );
                } else {
                    panic!()
                }
            }
            // Expects the returned height to equal the chain height (4) when the wallet's scan range (6..10) lies
            // above it.
            #[tokio::test]
            async fn above_chain_height_below_allowance() {
                // The chain_height is received from the proxy.
                // Truncate uses the wallet scan start height.
                // NOTE(review): the original comment ended mid-sentence ("as a"); intent to be confirmed.
                let lkch = vec![ScanRange::from_parts(
                    BlockHeight::from_u32(6)..BlockHeight::from_u32(10),
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut test_wallet = builder.sync_state(state).create_mock_wallet();
                let chain_height = BlockHeight::from_u32(4);
                // This will trigger a call to truncate_wallet_data with
                // chain_height and start_height inferred from the wallet.
                // NOTE(review): presumably the wallet height is greater than the chain height at this point,
                // which hits the `Greater` arm of the comparison match — confirm against `checked_wallet_height`.
                let res = checked_wallet_height(&mut test_wallet, chain_height, &LOCAL_NETWORK);
                assert_eq!(res.unwrap(), BlockHeight::from_u32(4));
            }
            // Unfinished: only builds the mock wallet, nothing is asserted yet.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn equal_or_below_chain_height_and_above_sapling() {
                let lkch = vec![ScanRange::from_parts(
                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
            }
            // Unfinished: only builds the mock wallet, nothing is asserted yet.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn equal_or_below_chain_height_and_below_sapling() {
                // This case requires that the wallet has a scan start below sapling activation,
                // which is an unexpected state.
                let lkch = vec![ScanRange::from_parts(
                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
            }
            // Unfinished: only builds the mock wallet, nothing is asserted yet.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn below_sapling() {
                let lkch = vec![ScanRange::from_parts(
                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
            }
        }
        mod no_last_known_chain_height {
            use super::*;
            // Cases where no sync state is supplied to the mock wallet builder.
            // NOTE(review): the original comment read "If there are know scan_ranges in the SyncState";
            // presumably "no scan_ranges" was meant — confirm.
            #[tokio::test]
            async fn get_bday_error() {
                let test_error = "get_bday_error";
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut test_wallet = builder
                    .get_birthday_patch(Box::new(|_| {
                        Err(crate::mocks::MockWalletError::AnErrorVariant(
                            test_error.to_string(),
                        ))
                    }))
                    .create_mock_wallet();
                let res = checked_wallet_height(
                    &mut test_wallet,
                    BlockHeight::from_u32(1),
                    &LOCAL_NETWORK,
                );
                assert!(matches!(
                    res,
                    Err(SyncError::WalletError(
                        crate::mocks::MockWalletError::AnErrorVariant(ref s)
                    )) if s == test_error
                ));
            }
            // Birthday (15) above the chain height (1); ignored while in progress.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn raw_bday_above_chain_height() {
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut test_wallet = builder
                    .birthday(BlockHeight::from_u32(15))
                    .create_mock_wallet();
                let res = checked_wallet_height(
                    &mut test_wallet,
                    BlockHeight::from_u32(1),
                    &LOCAL_NETWORK,
                );
                if let Err(e) = res {
                    assert_eq!(
                        e.to_string(),
                        format!(
                            "wallet height is more than {} blocks ahead of best chain height",
                            15 - 1
                        )
                    );
                } else {
                    panic!()
                }
            }
            // Birthday compared against the sapling activation height (3) with a chain height of 5.
            mod sapling_height {
                use super::*;
                // Birthday above sapling activation: expects birthday - 1.
                #[tokio::test]
                async fn raw_bday_above() {
                    let builder = crate::mocks::MockWalletBuilder::new();
                    let mut test_wallet = builder
                        .birthday(BlockHeight::from_u32(4))
                        .create_mock_wallet();
                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );
                    assert_eq!(res.unwrap(), BlockHeight::from_u32(4 - 1));
                }
                // Birthday equal to sapling activation: expects birthday - 1.
                #[tokio::test]
                async fn raw_bday_equal() {
                    let builder = crate::mocks::MockWalletBuilder::new();
                    let mut test_wallet = builder
                        .birthday(BlockHeight::from_u32(3))
                        .create_mock_wallet();
                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );
                    assert_eq!(res.unwrap(), BlockHeight::from_u32(3 - 1));
                }
                // Birthday below sapling activation: expects `SyncError::BirthdayBelowSapling`.
                #[tokio::test]
                async fn raw_bday_below() {
                    let builder = crate::mocks::MockWalletBuilder::new();
                    let mut test_wallet = builder
                        .birthday(BlockHeight::from_u32(1))
                        .create_mock_wallet();
                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );
                    assert!(matches!(res, Err(SyncError::BirthdayBelowSapling(1, 3))));
                }
            }
        }
    }
}