Skip to main content

pepper_sync/
sync.rs

1//! Entrypoint for sync engine
2
3use std::collections::{BTreeMap, HashMap};
4use std::ops::Range;
5use std::sync::Arc;
6use std::sync::atomic::{self, AtomicBool, AtomicU8};
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::{RwLock, mpsc};
10
11use incrementalmerkletree::{Marking, Retention};
12use orchard::tree::MerkleHashOrchard;
13use shardtree::store::ShardStore;
14use tonic::transport::Channel;
15use zcash_client_backend::proto::service::RawTransaction;
16use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
17use zcash_keys::keys::UnifiedFullViewingKey;
18use zcash_primitives::transaction::{Transaction, TxId};
19use zcash_protocol::ShieldedProtocol;
20use zcash_protocol::consensus::{self, BlockHeight};
21use zip32::AccountId;
22
23use zingo_status::confirmation_status::ConfirmationStatus;
24
25use crate::client::{self, FetchRequest};
26use crate::config::{PerformanceLevel, SyncConfig};
27use crate::error::{
28    ContinuityError, MempoolError, ScanError, ServerError, SyncError, SyncModeError,
29    SyncStatusError,
30};
31use crate::keys::transparent::TransparentAddressId;
32use crate::scan::ScanResults;
33use crate::scan::task::{Scanner, ScannerState};
34use crate::scan::transactions::scan_transaction;
35use crate::sync::state::truncate_scan_ranges;
36use crate::wallet::traits::{
37    SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
38};
39use crate::wallet::{
40    KeyIdInterface, NoteInterface, NullifierMap, OutputId, OutputInterface, ScanTarget, SyncMode,
41    SyncState, WalletBlock, WalletTransaction,
42};
43use crate::witness::LocatedTreeData;
44
45#[cfg(not(feature = "darkside_test"))]
46use crate::witness;
47
48pub(crate) mod spend;
49pub(crate) mod state;
50pub(crate) mod transparent;
51
52const UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD: u32 = 3;
53pub(crate) const MAX_REORG_ALLOWANCE: u32 = 100;
54const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
55
56/// A snapshot of the current state of sync. Useful for displaying the status of sync to a user / consumer.
57///
58/// `percentage_outputs_scanned` is a much more accurate indicator of sync completion than `percentage_blocks_scanned`.
59/// `percentage_total_outputs_scanned` is the percentage of outputs scanned from birthday to chain height.
60#[derive(Debug, Clone)]
61#[allow(missing_docs)]
62pub struct SyncStatus {
63    pub scan_ranges: Vec<ScanRange>,
64    pub sync_start_height: BlockHeight,
65    pub session_blocks_scanned: u32,
66    pub total_blocks_scanned: u32,
67    pub percentage_session_blocks_scanned: f32,
68    pub percentage_total_blocks_scanned: f32,
69    pub session_sapling_outputs_scanned: u32,
70    pub total_sapling_outputs_scanned: u32,
71    pub session_orchard_outputs_scanned: u32,
72    pub total_orchard_outputs_scanned: u32,
73    pub percentage_session_outputs_scanned: f32,
74    pub percentage_total_outputs_scanned: f32,
75}
76
77// TODO: complete display, scan ranges in raw form are too verbose
78impl std::fmt::Display for SyncStatus {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        write!(
81            f,
82            "percentage complete: {}",
83            self.percentage_total_outputs_scanned
84        )
85    }
86}
87
88impl From<SyncStatus> for json::JsonValue {
89    fn from(value: SyncStatus) -> Self {
90        let scan_ranges: Vec<json::JsonValue> = value
91            .scan_ranges
92            .iter()
93            .map(|range| {
94                json::object! {
95                    "priority" => format!("{:?}", range.priority()),
96                    "start_block" => range.block_range().start.to_string(),
97                    "end_block" => (range.block_range().end - 1).to_string(),
98                }
99            })
100            .collect();
101
102        json::object! {
103            "scan_ranges" => scan_ranges,
104            "sync_start_height" => u32::from(value.sync_start_height),
105            "session_blocks_scanned" => value.session_blocks_scanned,
106            "total_blocks_scanned" => value.total_blocks_scanned,
107            "percentage_session_blocks_scanned" => value.percentage_session_blocks_scanned,
108            "percentage_total_blocks_scanned" => value.percentage_total_blocks_scanned,
109            "session_sapling_outputs_scanned" => value.session_sapling_outputs_scanned,
110            "total_sapling_outputs_scanned" => value.total_sapling_outputs_scanned,
111            "session_orchard_outputs_scanned" => value.session_orchard_outputs_scanned,
112            "total_orchard_outputs_scanned" => value.total_orchard_outputs_scanned,
113            "percentage_session_outputs_scanned" => value.percentage_session_outputs_scanned,
114            "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
115        }
116    }
117}
118
119/// Returned when [`crate::sync::sync`] successfully completes.
120#[derive(Debug, Clone)]
121#[allow(missing_docs)]
122pub struct SyncResult {
123    pub sync_start_height: BlockHeight,
124    pub sync_end_height: BlockHeight,
125    pub blocks_scanned: u32,
126    pub sapling_outputs_scanned: u32,
127    pub orchard_outputs_scanned: u32,
128    pub percentage_total_outputs_scanned: f32,
129}
130
131impl std::fmt::Display for SyncResult {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        write!(
134            f,
135            "Sync completed succesfully:
136{{
137    sync start height: {}
138    sync end height: {}
139    blocks scanned: {}
140    sapling outputs scanned: {}
141    orchard outputs scanned: {}
142    percentage total outputs scanned: {}
143}}",
144            self.sync_start_height,
145            self.sync_end_height,
146            self.blocks_scanned,
147            self.sapling_outputs_scanned,
148            self.orchard_outputs_scanned,
149            self.percentage_total_outputs_scanned,
150        )
151    }
152}
153
154impl From<SyncResult> for json::JsonValue {
155    fn from(value: SyncResult) -> Self {
156        json::object! {
157            "sync_start_height" => u32::from(value.sync_start_height),
158            "sync_end_height" => u32::from(value.sync_end_height),
159            "blocks_scanned" => value.blocks_scanned,
160            "sapling_outputs_scanned" => value.sapling_outputs_scanned,
161            "orchard_outputs_scanned" => value.orchard_outputs_scanned,
162            "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
163        }
164    }
165}
166
167/// Scanning range priority levels.
168#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
169pub enum ScanPriority {
170    /// Block ranges that are currently refetching nullifiers.
171    RefetchingNullifiers,
172    /// Block ranges that are currently being scanned.
173    Scanning,
174    /// Block ranges that have already been scanned will not be re-scanned.
175    Scanned,
176    /// Block ranges that have already been scanned. The nullifiers from this range were not mapped after scanning and
177    /// spend detection to reduce memory consumption and/or storage for non-linear scanning. These nullifiers will need
178    /// to be re-fetched for final spend detection when this range is the lowest unscanned range in the wallet's list
179    /// of scan ranges.
180    ScannedWithoutMapping,
181    /// Block ranges to be scanned to advance the fully-scanned height.
182    Historic,
183    /// Block ranges adjacent to heights at which the user opened the wallet.
184    OpenAdjacent,
185    /// Blocks that must be scanned to complete note commitment tree shards adjacent to found notes.
186    FoundNote,
187    /// Blocks that must be scanned to complete the latest note commitment tree shard.
188    ChainTip,
189    /// A previously scanned range that must be verified to check it is still in the
190    /// main chain, has highest priority.
191    Verify,
192}
193
194/// A range of blocks to be scanned, along with its associated priority.
195#[derive(Debug, Clone, PartialEq, Eq)]
196pub struct ScanRange {
197    block_range: Range<BlockHeight>,
198    priority: ScanPriority,
199}
200
201impl std::fmt::Display for ScanRange {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        write!(
204            f,
205            "{:?}({}..{})",
206            self.priority, self.block_range.start, self.block_range.end,
207        )
208    }
209}
210
211impl ScanRange {
212    /// Constructs a scan range from its constituent parts.
213    #[must_use]
214    pub fn from_parts(block_range: Range<BlockHeight>, priority: ScanPriority) -> Self {
215        assert!(
216            block_range.end >= block_range.start,
217            "{block_range:?} is invalid for ScanRange({priority:?})",
218        );
219        ScanRange {
220            block_range,
221            priority,
222        }
223    }
224
225    /// Returns the range of block heights to be scanned.
226    #[must_use]
227    pub fn block_range(&self) -> &Range<BlockHeight> {
228        &self.block_range
229    }
230
231    /// Returns the priority with which the scan range should be scanned.
232    #[must_use]
233    pub fn priority(&self) -> ScanPriority {
234        self.priority
235    }
236
237    /// Returns whether or not the scan range is empty.
238    #[must_use]
239    pub fn is_empty(&self) -> bool {
240        self.block_range.is_empty()
241    }
242
243    /// Returns the number of blocks in the scan range.
244    #[must_use]
245    pub fn len(&self) -> usize {
246        usize::try_from(u32::from(self.block_range.end) - u32::from(self.block_range.start))
247            .expect("due to number of max blocks should always be valid usize")
248    }
249
250    /// Shifts the start of the block range to the right if `block_height >
251    /// self.block_range().start`. Returns `None` if the resulting range would
252    /// be empty (or the range was already empty).
253    #[must_use]
254    pub fn truncate_start(&self, block_height: BlockHeight) -> Option<Self> {
255        if block_height >= self.block_range.end || self.is_empty() {
256            None
257        } else {
258            Some(ScanRange {
259                block_range: self.block_range.start.max(block_height)..self.block_range.end,
260                priority: self.priority,
261            })
262        }
263    }
264
265    /// Shifts the end of the block range to the left if `block_height <
266    /// self.block_range().end`. Returns `None` if the resulting range would
267    /// be empty (or the range was already empty).
268    #[must_use]
269    pub fn truncate_end(&self, block_height: BlockHeight) -> Option<Self> {
270        if block_height <= self.block_range.start || self.is_empty() {
271            None
272        } else {
273            Some(ScanRange {
274                block_range: self.block_range.start..self.block_range.end.min(block_height),
275                priority: self.priority,
276            })
277        }
278    }
279
280    /// Splits this scan range at the specified height, such that the provided height becomes the
281    /// end of the first range returned and the start of the second. Returns `None` if
282    /// `p <= self.block_range().start || p >= self.block_range().end`.
283    #[must_use]
284    pub fn split_at(&self, p: BlockHeight) -> Option<(Self, Self)> {
285        (p > self.block_range.start && p < self.block_range.end).then_some((
286            ScanRange {
287                block_range: self.block_range.start..p,
288                priority: self.priority,
289            },
290            ScanRange {
291                block_range: p..self.block_range.end,
292                priority: self.priority,
293            },
294        ))
295    }
296}
297
298/// Syncs a wallet to the latest state of the blockchain.
299///
300/// `sync_mode` is intended to be stored in a struct that owns the wallet(s) (i.e. lightclient) and has a non-atomic
301/// counterpart [`crate::wallet::SyncMode`]. The sync engine will set the `sync_mode` to `Running` at the start of sync.
302/// However, the consumer is required to set the `sync_mode` back to `NotRunning` when sync is succussful or returns an
303/// error. This allows more flexibility and safety with sync task handles etc.
304/// `sync_mode` may also be set to `Paused` externally to pause scanning so the wallet lock can be acquired multiple
305/// times in quick sucession without the sync engine interrupting.
306/// Set `sync_mode` back to `Running` to resume scanning.
307/// Set `sync_mode` to `Shutdown` to stop the sync process.
308pub async fn sync<P, W>(
309    client: CompactTxStreamerClient<Channel>,
310    consensus_parameters: &P,
311    wallet: Arc<RwLock<W>>,
312    sync_mode: Arc<AtomicU8>,
313    config: SyncConfig,
314) -> Result<SyncResult, SyncError<W::Error>>
315where
316    P: consensus::Parameters + Sync + Send + 'static,
317    W: SyncWallet
318        + SyncBlocks
319        + SyncTransactions
320        + SyncNullifiers
321        + SyncOutPoints
322        + SyncShardTrees
323        + Send,
324{
325    let mut sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
326    if sync_mode_enum == SyncMode::NotRunning {
327        sync_mode_enum = SyncMode::Running;
328        sync_mode.store(sync_mode_enum as u8, atomic::Ordering::Release);
329    } else {
330        return Err(SyncModeError::SyncAlreadyRunning.into());
331    }
332
333    tracing::info!("Starting sync...");
334
335    // create channel for sending fetch requests and launch fetcher task
336    let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
337    let client_clone = client.clone();
338    let fetcher_handle =
339        tokio::spawn(
340            async move { client::fetch::fetch(fetch_request_receiver, client_clone).await },
341        );
342
343    // create channel for receiving mempool transactions and launch mempool monitor
344    let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(100);
345    let shutdown_mempool = Arc::new(AtomicBool::new(false));
346    let shutdown_mempool_clone = shutdown_mempool.clone();
347    let unprocessed_mempool_transactions_count = Arc::new(AtomicU8::new(0));
348    let unprocessed_mempool_transactions_count_clone =
349        unprocessed_mempool_transactions_count.clone();
350    let mempool_handle = tokio::spawn(async move {
351        mempool_monitor(
352            client,
353            mempool_transaction_sender,
354            unprocessed_mempool_transactions_count_clone,
355            shutdown_mempool_clone,
356        )
357        .await
358    });
359
360    // pre-scan initialisation
361    let mut wallet_guard = wallet.write().await;
362
363    let chain_height = client::get_chain_height(fetch_request_sender.clone()).await?;
364    if chain_height == 0.into() {
365        return Err(SyncError::ServerError(ServerError::GenesisBlockOnly));
366    }
367    let last_known_chain_height =
368        checked_wallet_height(&mut *wallet_guard, chain_height, consensus_parameters)?;
369
370    let ufvks = wallet_guard
371        .get_unified_full_viewing_keys()
372        .map_err(SyncError::WalletError)?;
373
374    transparent::update_addresses_and_scan_targets(
375        consensus_parameters,
376        &mut *wallet_guard,
377        fetch_request_sender.clone(),
378        &ufvks,
379        last_known_chain_height,
380        chain_height,
381        config.transparent_address_discovery,
382    )
383    .await?;
384
385    #[cfg(not(feature = "darkside_test"))]
386    update_subtree_roots(
387        consensus_parameters,
388        fetch_request_sender.clone(),
389        &mut *wallet_guard,
390    )
391    .await?;
392
393    add_initial_frontier(
394        consensus_parameters,
395        fetch_request_sender.clone(),
396        &mut *wallet_guard,
397    )
398    .await?;
399
400    let initial_reorg_detection_start_height = state::update_scan_ranges(
401        consensus_parameters,
402        last_known_chain_height,
403        chain_height,
404        wallet_guard
405            .get_sync_state_mut()
406            .map_err(SyncError::WalletError)?,
407    );
408
409    state::set_initial_state(
410        consensus_parameters,
411        fetch_request_sender.clone(),
412        &mut *wallet_guard,
413        chain_height,
414    )
415    .await?;
416
417    expire_transactions(&mut *wallet_guard)?;
418
419    drop(wallet_guard);
420
421    // create channel for receiving scan results and launch scanner
422    let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel();
423    let mut scanner = Scanner::new(
424        consensus_parameters.clone(),
425        scan_results_sender,
426        fetch_request_sender.clone(),
427        ufvks.clone(),
428    );
429    scanner.launch(config.performance_level);
430
431    // TODO: implement an option for continuous scanning where it doesnt exit when complete
432
433    let mut nullifier_map_limit_exceeded = false;
434    let mut interval = tokio::time::interval(Duration::from_millis(50));
435    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
436    loop {
437        tokio::select! {
438            Some((scan_range, scan_results)) = scan_results_receiver.recv() => {
439                let mut wallet_guard = wallet.write().await;
440                process_scan_results(
441                    consensus_parameters,
442                    &mut *wallet_guard,
443                    fetch_request_sender.clone(),
444                    &ufvks,
445                    scan_range,
446                    scan_results,
447                    initial_reorg_detection_start_height,
448                    config.performance_level,
449                    &mut nullifier_map_limit_exceeded,
450                )
451                .await?;
452                wallet_guard.set_save_flag().map_err(SyncError::WalletError)?;
453                drop(wallet_guard);
454            }
455
456            Some(raw_transaction) = mempool_transaction_receiver.recv() => {
457                let mut wallet_guard = wallet.write().await;
458                process_mempool_transaction(
459                    consensus_parameters,
460                    &ufvks,
461                    &mut *wallet_guard,
462                    raw_transaction,
463                )
464                .await?;
465                unprocessed_mempool_transactions_count.fetch_sub(1, atomic::Ordering::Release);
466                drop(wallet_guard);
467            }
468
469            _update_scanner = interval.tick() => {
470                sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
471                match sync_mode_enum {
472                    SyncMode::Paused => {
473                        let mut pause_interval = tokio::time::interval(Duration::from_secs(1));
474                        pause_interval.tick().await;
475                        while sync_mode_enum == SyncMode::Paused {
476                            pause_interval.tick().await;
477                            sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
478                        }
479                    },
480                    SyncMode::Shutdown => {
481                        let mut wallet_guard = wallet.write().await;
482                        let sync_status = match sync_status(&*wallet_guard).await {
483                            Ok(status) => status,
484                            Err(SyncStatusError::WalletError(e)) => {
485                                return Err(SyncError::WalletError(e));
486                            }
487                            Err(SyncStatusError::NoSyncData) => {
488                                panic!("sync data must exist!");
489                            }
490                        };
491                        wallet_guard
492                            .set_save_flag()
493                            .map_err(SyncError::WalletError)?;
494                        drop(wallet_guard);
495                        tracing::info!("Sync successfully shutdown.");
496
497                        return Ok(SyncResult {
498                            sync_start_height: sync_status.sync_start_height,
499                            sync_end_height: (sync_status
500                                .scan_ranges
501                                .last()
502                                .expect("should be non-empty after syncing")
503                                .block_range()
504                                .end
505                                - 1),
506                            blocks_scanned: sync_status.session_blocks_scanned,
507                            sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
508                            orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
509                            percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
510                        });
511                    }
512                    SyncMode::Running => (),
513                    SyncMode::NotRunning => {
514                        panic!("sync mode should not be manually set to NotRunning!");
515                    },
516                }
517
518                scanner.update(&mut *wallet.write().await, shutdown_mempool.clone(), nullifier_map_limit_exceeded).await?;
519
520                if matches!(scanner.state, ScannerState::Shutdown) {
521                    // wait for mempool monitor to receive mempool transactions
522                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
523                    if is_shutdown(&scanner, unprocessed_mempool_transactions_count.clone())
524                    {
525                        tracing::info!("Sync successfully shutdown.");
526                        break;
527                    }
528                }
529            }
530        }
531    }
532
533    let mut wallet_guard = wallet.write().await;
534    let sync_status = match sync_status(&*wallet_guard).await {
535        Ok(status) => status,
536        Err(SyncStatusError::WalletError(e)) => {
537            return Err(SyncError::WalletError(e));
538        }
539        Err(SyncStatusError::NoSyncData) => {
540            panic!("sync data must exist!");
541        }
542    };
543    // once sync is complete, all nullifiers will have been re-fetched so this note metadata can be discarded.
544    for transaction in wallet_guard
545        .get_wallet_transactions_mut()
546        .map_err(SyncError::WalletError)?
547        .values_mut()
548    {
549        for note in transaction.sapling_notes.as_mut_slice() {
550            note.refetch_nullifier_ranges = Vec::new();
551        }
552        for note in transaction.orchard_notes.as_mut_slice() {
553            note.refetch_nullifier_ranges = Vec::new();
554        }
555    }
556    wallet_guard
557        .set_save_flag()
558        .map_err(SyncError::WalletError)?;
559
560    drop(wallet_guard);
561    drop(scanner);
562    drop(fetch_request_sender);
563
564    match mempool_handle.await.expect("task panicked") {
565        Ok(()) => (),
566        Err(e @ MempoolError::ShutdownWithoutStream) => tracing::warn!("{e}"),
567        Err(e) => return Err(e.into()),
568    }
569    fetcher_handle.await.expect("task panicked");
570
571    Ok(SyncResult {
572        sync_start_height: sync_status.sync_start_height,
573        sync_end_height: (sync_status
574            .scan_ranges
575            .last()
576            .expect("should be non-empty after syncing")
577            .block_range()
578            .end
579            - 1),
580        blocks_scanned: sync_status.session_blocks_scanned,
581        sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
582        orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
583        percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
584    })
585}
586
587/// This ensures that the wallet height used to calculate the lower bound for scan range creation is valid.
588/// The comparison takes two input heights and uses several constants to select the correct height.
589///
590/// The input parameter heights are:
591///
592///   (1) chain_height:
593///       * the best block-height reported by the proxy (zainod or lwd)
594///   (2) last_known_chain_height
595///       * the last max height the wallet recorded from earlier scans
596///
597/// The constants are:
598///   (1) MAX_REORG_ALLOWANCE:
599///       * the maximum number of blocks the wallet can truncate during re-org detection
600///   (2) Sapling Activation Height:
601///       * the lower bound on the wallet birthday
602fn checked_wallet_height<W, P>(
603    wallet: &mut W,
604    chain_height: BlockHeight,
605    consensus_parameters: &P,
606) -> Result<BlockHeight, SyncError<W::Error>>
607where
608    W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
609    P: zcash_protocol::consensus::Parameters,
610{
611    let sync_state = wallet.get_sync_state().map_err(SyncError::WalletError)?;
612    if let Some(last_known_chain_height) = sync_state.last_known_chain_height() {
613        if last_known_chain_height > chain_height {
614            if last_known_chain_height - chain_height >= MAX_REORG_ALLOWANCE {
615                // There's a human attention requiring problem, the wallet supplied
616                // last_known_chain_height is more than MAX_REORG_ALLOWANCE **above**
617                // the proxy's reported height.
618                return Err(SyncError::ChainError(
619                    u32::from(last_known_chain_height),
620                    MAX_REORG_ALLOWANCE,
621                    u32::from(chain_height),
622                ));
623            }
624            // The wallet reported height is above the current proxy height
625            // reset to the proxy height.
626            truncate_wallet_data(wallet, chain_height)?;
627            return Ok(chain_height);
628        }
629        // The last wallet reported height is equal or below the proxy height.
630        Ok(last_known_chain_height)
631    } else {
632        // This is the wallet's first sync. Use [birthday - 1] as wallet height.
633        let sapling_activation_height = consensus_parameters
634            .activation_height(consensus::NetworkUpgrade::Sapling)
635            .expect("sapling activation height should always return Some");
636        let birthday = wallet.get_birthday().map_err(SyncError::WalletError)?;
637        if birthday > chain_height {
638            // Human attention requiring error, a birthday *above* the proxy reported
639            // chain height has been provided.
640            return Err(SyncError::ChainError(
641                u32::from(birthday),
642                MAX_REORG_ALLOWANCE,
643                u32::from(chain_height),
644            ));
645        } else if birthday < sapling_activation_height {
646            return Err(SyncError::BirthdayBelowSapling(
647                u32::from(birthday),
648                u32::from(sapling_activation_height),
649            ));
650        }
651
652        Ok(birthday - 1)
653    }
654}
655
656/// Creates a [`self::SyncStatus`] from the wallet's current [`crate::wallet::SyncState`].
657/// If there is still nullifiers to be re-fetched when scanning is complete, the percentages will be overrided to 99%
658/// until sync is complete.
659///
660/// Intended to be called while [`self::sync`] is running in a separate task.
661pub async fn sync_status<W>(wallet: &W) -> Result<SyncStatus, SyncStatusError<W::Error>>
662where
663    W: SyncWallet + SyncBlocks,
664{
665    let (total_sapling_outputs_scanned, total_orchard_outputs_scanned) =
666        state::calculate_scanned_outputs(wallet).map_err(SyncStatusError::WalletError)?;
667    let total_outputs_scanned = total_sapling_outputs_scanned + total_orchard_outputs_scanned;
668
669    let sync_state = wallet
670        .get_sync_state()
671        .map_err(SyncStatusError::WalletError)?;
672    if sync_state.initial_sync_state.sync_start_height == 0.into() {
673        return Ok(SyncStatus {
674            scan_ranges: sync_state.scan_ranges.clone(),
675            sync_start_height: 0.into(),
676            session_blocks_scanned: 0,
677            total_blocks_scanned: 0,
678            percentage_session_blocks_scanned: 0.0,
679            percentage_total_blocks_scanned: 0.0,
680            session_sapling_outputs_scanned: 0,
681            session_orchard_outputs_scanned: 0,
682            total_sapling_outputs_scanned: 0,
683            total_orchard_outputs_scanned: 0,
684            percentage_session_outputs_scanned: 0.0,
685            percentage_total_outputs_scanned: 0.0,
686        });
687    }
688    let total_blocks_scanned = state::calculate_scanned_blocks(sync_state);
689
690    let birthday = sync_state
691        .wallet_birthday()
692        .ok_or(SyncStatusError::NoSyncData)?;
693    let last_known_chain_height = sync_state
694        .last_known_chain_height()
695        .ok_or(SyncStatusError::NoSyncData)?;
696    let total_blocks = last_known_chain_height - birthday + 1;
697    let total_sapling_outputs = sync_state
698        .initial_sync_state
699        .wallet_tree_bounds
700        .sapling_final_tree_size
701        - sync_state
702            .initial_sync_state
703            .wallet_tree_bounds
704            .sapling_initial_tree_size;
705    let total_orchard_outputs = sync_state
706        .initial_sync_state
707        .wallet_tree_bounds
708        .orchard_final_tree_size
709        - sync_state
710            .initial_sync_state
711            .wallet_tree_bounds
712            .orchard_initial_tree_size;
713    let total_outputs = total_sapling_outputs + total_orchard_outputs;
714
715    let session_blocks_scanned =
716        total_blocks_scanned - sync_state.initial_sync_state.previously_scanned_blocks;
717    let mut percentage_session_blocks_scanned = ((session_blocks_scanned as f32
718        / (total_blocks - sync_state.initial_sync_state.previously_scanned_blocks) as f32)
719        * 100.0)
720        .clamp(0.0, 100.0);
721    let mut percentage_total_blocks_scanned =
722        ((total_blocks_scanned as f32 / total_blocks as f32) * 100.0).clamp(0.0, 100.0);
723
724    let session_sapling_outputs_scanned = total_sapling_outputs_scanned
725        - sync_state
726            .initial_sync_state
727            .previously_scanned_sapling_outputs;
728    let session_orchard_outputs_scanned = total_orchard_outputs_scanned
729        - sync_state
730            .initial_sync_state
731            .previously_scanned_orchard_outputs;
732    let session_outputs_scanned = session_sapling_outputs_scanned + session_orchard_outputs_scanned;
733    let previously_scanned_outputs = sync_state
734        .initial_sync_state
735        .previously_scanned_sapling_outputs
736        + sync_state
737            .initial_sync_state
738            .previously_scanned_orchard_outputs;
739    let mut percentage_session_outputs_scanned = ((session_outputs_scanned as f32
740        / (total_outputs - previously_scanned_outputs) as f32)
741        * 100.0)
742        .clamp(0.0, 100.0);
743    let mut percentage_total_outputs_scanned =
744        ((total_outputs_scanned as f32 / total_outputs as f32) * 100.0).clamp(0.0, 100.0);
745
746    if sync_state.scan_ranges().iter().any(|scan_range| {
747        scan_range.priority() == ScanPriority::ScannedWithoutMapping
748            || scan_range.priority() == ScanPriority::RefetchingNullifiers
749    }) {
750        if percentage_session_blocks_scanned == 100.0 {
751            percentage_session_blocks_scanned = 99.0;
752        }
753        if percentage_total_blocks_scanned == 100.0 {
754            percentage_total_blocks_scanned = 99.0;
755        }
756        if percentage_session_outputs_scanned == 100.0 {
757            percentage_session_outputs_scanned = 99.0;
758        }
759        if percentage_total_outputs_scanned == 100.0 {
760            percentage_total_outputs_scanned = 99.0;
761        }
762    }
763
764    Ok(SyncStatus {
765        scan_ranges: sync_state.scan_ranges.clone(),
766        sync_start_height: sync_state.initial_sync_state.sync_start_height,
767        session_blocks_scanned,
768        total_blocks_scanned,
769        percentage_session_blocks_scanned,
770        percentage_total_blocks_scanned,
771        session_sapling_outputs_scanned,
772        total_sapling_outputs_scanned,
773        session_orchard_outputs_scanned,
774        total_orchard_outputs_scanned,
775        percentage_session_outputs_scanned,
776        percentage_total_outputs_scanned,
777    })
778}
779
780/// Scans a pending `transaction` of a given `status`, adding to the wallet and updating output spend statuses.
781///
782/// Used both internally for scanning mempool transactions and externally for scanning calculated and transmitted
783/// transactions during send.
784///
785/// Panics if `status` is of `Confirmed` variant.
786pub fn scan_pending_transaction<W>(
787    consensus_parameters: &impl consensus::Parameters,
788    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
789    wallet: &mut W,
790    transaction: Transaction,
791    status: ConfirmationStatus,
792    datetime: u32,
793) -> Result<(), SyncError<W::Error>>
794where
795    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
796{
797    if matches!(status, ConfirmationStatus::Confirmed(_)) {
798        panic!("this fn is for unconfirmed transactions only");
799    }
800
801    let mut pending_transaction_nullifiers = NullifierMap::new();
802    let mut pending_transaction_outpoints = BTreeMap::new();
803    let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
804        .get_transparent_addresses()
805        .map_err(SyncError::WalletError)?
806        .iter()
807        .map(|(id, address)| (address.clone(), *id))
808        .collect();
809    let pending_transaction = scan_transaction(
810        consensus_parameters,
811        ufvks,
812        transaction.txid(),
813        transaction,
814        status,
815        None,
816        &mut pending_transaction_nullifiers,
817        &mut pending_transaction_outpoints,
818        &transparent_addresses,
819        datetime,
820    )?;
821
822    let wallet_transactions = wallet
823        .get_wallet_transactions()
824        .map_err(SyncError::WalletError)?;
825    let transparent_output_ids = spend::collect_transparent_output_ids(wallet_transactions);
826    let transparent_spend_scan_targets = spend::detect_transparent_spends(
827        &mut pending_transaction_outpoints,
828        transparent_output_ids,
829    );
830    let (sapling_derived_nullifiers, orchard_derived_nullifiers) =
831        spend::collect_derived_nullifiers(wallet_transactions);
832    let (sapling_spend_scan_targets, orchard_spend_scan_targets) = spend::detect_shielded_spends(
833        &mut pending_transaction_nullifiers,
834        sapling_derived_nullifiers,
835        orchard_derived_nullifiers,
836    );
837
838    // return if transaction is not relevant to the wallet
839    if pending_transaction.transparent_coins().is_empty()
840        && pending_transaction.sapling_notes().is_empty()
841        && pending_transaction.orchard_notes().is_empty()
842        && pending_transaction.outgoing_orchard_notes().is_empty()
843        && pending_transaction.outgoing_sapling_notes().is_empty()
844        && transparent_spend_scan_targets.is_empty()
845        && sapling_spend_scan_targets.is_empty()
846        && orchard_spend_scan_targets.is_empty()
847    {
848        return Ok(());
849    }
850
851    wallet
852        .insert_wallet_transaction(pending_transaction)
853        .map_err(SyncError::WalletError)?;
854    spend::update_spent_coins(
855        wallet
856            .get_wallet_transactions_mut()
857            .map_err(SyncError::WalletError)?,
858        transparent_spend_scan_targets,
859    );
860    spend::update_spent_notes(
861        wallet,
862        sapling_spend_scan_targets,
863        orchard_spend_scan_targets,
864        false,
865    )
866    .map_err(SyncError::WalletError)?;
867
868    Ok(())
869}
870
871/// API for targetted scanning.
872///
873/// Allows `scan_targets` to be added externally to the wallet's `sync_state` and be prioritised for scanning. Each
874/// scan target must include the block height which will be used to prioritise the block range containing the note
875/// commitments to the surrounding orchard shard(s). If the block height is pre-orchard then the surrounding sapling
876/// shard(s) will be prioritised instead. The txid in each scan target may be omitted and set to [0u8; 32] in order to
877/// prioritise the surrounding blocks for scanning but be ignored when fetching specific relevant transactions to the
878/// wallet. However, in the case where a relevant spending transaction at a given height contains no decryptable
879/// incoming notes (change), only the nullifier will be mapped and this transaction will be scanned when the
880/// transaction containing the spent notes is scanned instead.
881pub fn add_scan_targets(sync_state: &mut SyncState, scan_targets: &[ScanTarget]) {
882    for scan_target in scan_targets {
883        sync_state.scan_targets.insert(*scan_target);
884    }
885}
886
887/// Resets the spending transaction field of all outputs that were previously spent but became unspent due to a
888/// spending transactions becoming invalid.
889///
890/// `invalid_txids` are the id's of the invalidated spending transactions. Any outputs in the `wallet_transactions`
891/// matching these spending transactions will be reset back to `None`.
892pub fn reset_spends(
893    wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
894    invalid_txids: Vec<TxId>,
895) {
896    wallet_transactions
897        .values_mut()
898        .flat_map(|transaction| transaction.orchard_notes_mut())
899        .filter(|output| {
900            output
901                .spending_transaction
902                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
903        })
904        .for_each(|output| {
905            output.set_spending_transaction(None);
906        });
907    wallet_transactions
908        .values_mut()
909        .flat_map(|transaction| transaction.sapling_notes_mut())
910        .filter(|output| {
911            output
912                .spending_transaction
913                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
914        })
915        .for_each(|output| {
916            output.set_spending_transaction(None);
917        });
918    wallet_transactions
919        .values_mut()
920        .flat_map(|transaction| transaction.transparent_coins_mut())
921        .filter(|output| {
922            output
923                .spending_transaction
924                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
925        })
926        .for_each(|output| {
927            output.set_spending_transaction(None);
928        });
929}
930
931/// Sets transactions associated with list of `failed_txids` in `wallet_transactions` to `Failed` status.
932///
933/// Sets the `spending_transaction` fields of any outputs spent in these transactions to `None`.
934pub fn set_transactions_failed(
935    wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
936    failed_txids: Vec<TxId>,
937) {
938    for failed_txid in failed_txids.iter() {
939        if let Some(transaction) = wallet_transactions.get_mut(failed_txid) {
940            let height = transaction.status().get_height();
941            transaction.update_status(
942                ConfirmationStatus::Failed(height),
943                SystemTime::now()
944                    .duration_since(SystemTime::UNIX_EPOCH)
945                    .expect("infalliable for such long time periods")
946                    .as_secs() as u32,
947            );
948        }
949    }
950    reset_spends(wallet_transactions, failed_txids);
951}
952
953/// Returns true if the scanner and mempool are shutdown.
954fn is_shutdown<P>(
955    scanner: &Scanner<P>,
956    mempool_unprocessed_transactions_count: Arc<AtomicU8>,
957) -> bool
958where
959    P: consensus::Parameters + Sync + Send + 'static,
960{
961    scanner.worker_poolsize() == 0
962        && mempool_unprocessed_transactions_count.load(atomic::Ordering::Acquire) == 0
963}
964
965/// Scan post-processing
966#[allow(clippy::too_many_arguments)]
967async fn process_scan_results<W>(
968    consensus_parameters: &impl consensus::Parameters,
969    wallet: &mut W,
970    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
971    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
972    scan_range: ScanRange,
973    scan_results: Result<ScanResults, ScanError>,
974    initial_reorg_detection_start_height: BlockHeight,
975    performance_level: PerformanceLevel,
976    nullifier_map_limit_exceeded: &mut bool,
977) -> Result<(), SyncError<W::Error>>
978where
979    W: SyncWallet
980        + SyncBlocks
981        + SyncTransactions
982        + SyncNullifiers
983        + SyncOutPoints
984        + SyncShardTrees
985        + Send,
986{
987    match scan_results {
988        Ok(results) => {
989            let ScanResults {
990                mut nullifiers,
991                mut outpoints,
992                scanned_blocks,
993                wallet_transactions,
994                sapling_located_trees,
995                orchard_located_trees,
996            } = results;
997
998            if scan_range.priority() == ScanPriority::ScannedWithoutMapping {
999                // add missing block bounds in the case that nullifier batch limit was reached and the fetch nullifier
1000                // scan range was split.
1001                let full_refetching_nullifiers_range = wallet
1002                    .get_sync_state()
1003                    .map_err(SyncError::WalletError)?
1004                    .scan_ranges
1005                    .iter()
1006                    .find(|&wallet_scan_range| {
1007                        wallet_scan_range
1008                            .block_range()
1009                            .contains(&scan_range.block_range().start)
1010                            && wallet_scan_range
1011                                .block_range()
1012                                .contains(&(scan_range.block_range().end - 1))
1013                    })
1014                    .expect("wallet scan range containing scan range should exist!");
1015                if scan_range.block_range().start
1016                    != full_refetching_nullifiers_range.block_range().start
1017                    || scan_range.block_range().end
1018                        != full_refetching_nullifiers_range.block_range().end
1019                {
1020                    let mut missing_block_bounds = BTreeMap::new();
1021                    for block_bound in [
1022                        scan_range.block_range().start - 1,
1023                        scan_range.block_range().start,
1024                        scan_range.block_range().end - 1,
1025                        scan_range.block_range().end,
1026                    ] {
1027                        if block_bound < full_refetching_nullifiers_range.block_range().start
1028                            || block_bound >= full_refetching_nullifiers_range.block_range().end
1029                        {
1030                            continue;
1031                        }
1032                        if wallet.get_wallet_block(block_bound).is_err() {
1033                            missing_block_bounds.insert(
1034                                block_bound,
1035                                WalletBlock::from_compact_block(
1036                                    consensus_parameters,
1037                                    fetch_request_sender.clone(),
1038                                    &client::get_compact_block(
1039                                        fetch_request_sender.clone(),
1040                                        block_bound,
1041                                    )
1042                                    .await?,
1043                                )
1044                                .await?,
1045                            );
1046                        }
1047                    }
1048                    if !missing_block_bounds.is_empty() {
1049                        wallet
1050                            .append_wallet_blocks(missing_block_bounds)
1051                            .map_err(SyncError::WalletError)?;
1052                    }
1053                }
1054
1055                let first_unscanned_range = wallet
1056                    .get_sync_state()
1057                    .map_err(SyncError::WalletError)?
1058                    .scan_ranges
1059                    .iter()
1060                    .find(|scan_range| scan_range.priority() != ScanPriority::Scanned)
1061                    .expect("the scan range being processed is not yet set to scanned so at least one unscanned range must exist");
1062                if !first_unscanned_range
1063                    .block_range()
1064                    .contains(&scan_range.block_range().start)
1065                    || !first_unscanned_range
1066                        .block_range()
1067                        .contains(&(scan_range.block_range().end - 1))
1068                {
1069                    // in this rare edge case, a scanned `ScannedWithoutMapping` range was the highest priority yet it was not the first unscanned range so it must be discarded to avoid missing spends
1070
1071                    // reset scan range from `RefetchingNullifiers` to `ScannedWithoutMapping`
1072                    state::reset_refetching_nullifiers_scan_range(
1073                        wallet
1074                            .get_sync_state_mut()
1075                            .map_err(SyncError::WalletError)?,
1076                        scan_range.block_range().clone(),
1077                    );
1078                    tracing::debug!(
1079                        "Nullifiers discarded and will be re-fetched to avoid missing spends."
1080                    );
1081
1082                    return Ok(());
1083                }
1084
1085                spend::update_shielded_spends(
1086                    consensus_parameters,
1087                    wallet,
1088                    fetch_request_sender.clone(),
1089                    ufvks,
1090                    &scanned_blocks,
1091                    Some(&mut nullifiers),
1092                )
1093                .await?;
1094
1095                state::set_scanned_scan_range(
1096                    wallet
1097                        .get_sync_state_mut()
1098                        .map_err(SyncError::WalletError)?,
1099                    scan_range.block_range().clone(),
1100                    true, // NOTE: although nullifiers are not actually added to the wallet's nullifier map for efficiency, there is effectively no difference as spends are still updated using the `additional_nullifier_map` and would be removed on the following cleanup (`remove_irrelevant_data`) due to `ScannedWithoutMapping` ranges always being the first non-scanned range and therefore always raise the wallet's fully scanned height after processing.
1101                );
1102            } else {
1103                // nullifiers are not mapped if nullifier map size limit will be exceeded
1104                if !*nullifier_map_limit_exceeded {
1105                    let nullifier_map = wallet.get_nullifiers().map_err(SyncError::WalletError)?;
1106                    if max_nullifier_map_size(performance_level).is_some_and(|max| {
1107                        nullifier_map.orchard.len()
1108                            + nullifier_map.sapling.len()
1109                            + nullifiers.orchard.len()
1110                            + nullifiers.sapling.len()
1111                            > max
1112                    }) {
1113                        *nullifier_map_limit_exceeded = true;
1114                    }
1115                }
1116                let mut map_nullifiers = !*nullifier_map_limit_exceeded;
1117
1118                // all transparent spend locations are known before scanning so there is no need to map outpoints from untargetted ranges.
1119                // outpoints of untargetted ranges will still be checked before being discarded.
1120                let map_outpoints = scan_range.priority() >= ScanPriority::FoundNote;
1121
1122                // always map nullifiers if scanning the lowest range to be scanned for final spend detection.
1123                // this will set the range to `Scanned` (as oppose to `ScannedWithoutMapping`) and prevent immediate
1124                // re-fetching of the nullifiers in this range. these will be immediately cleared after cleanup so will not
1125                // have an impact on memory or wallet file size.
1126                // the selected range is not the lowest range to be scanned unless all ranges before it are scanned or
1127                // scanning.
1128                for query_scan_range in wallet
1129                    .get_sync_state()
1130                    .map_err(SyncError::WalletError)?
1131                    .scan_ranges()
1132                {
1133                    let scan_priority = query_scan_range.priority();
1134                    if scan_priority != ScanPriority::Scanned
1135                        && scan_priority != ScanPriority::Scanning
1136                        && scan_priority != ScanPriority::RefetchingNullifiers
1137                    {
1138                        break;
1139                    }
1140
1141                    if scan_priority == ScanPriority::Scanning
1142                        && query_scan_range
1143                            .block_range()
1144                            .contains(&scan_range.block_range().start)
1145                        && query_scan_range
1146                            .block_range()
1147                            .contains(&(scan_range.block_range().end - 1))
1148                    {
1149                        map_nullifiers = true;
1150                        break;
1151                    }
1152                }
1153
1154                update_wallet_data(
1155                    consensus_parameters,
1156                    wallet,
1157                    fetch_request_sender.clone(),
1158                    ufvks,
1159                    &scan_range,
1160                    if map_nullifiers {
1161                        Some(&mut nullifiers)
1162                    } else {
1163                        None
1164                    },
1165                    if map_outpoints {
1166                        Some(&mut outpoints)
1167                    } else {
1168                        None
1169                    },
1170                    wallet_transactions,
1171                    sapling_located_trees,
1172                    orchard_located_trees,
1173                )
1174                .await?;
1175                spend::update_transparent_spends(
1176                    wallet,
1177                    if map_outpoints {
1178                        None
1179                    } else {
1180                        Some(&mut outpoints)
1181                    },
1182                )
1183                .map_err(SyncError::WalletError)?;
1184                spend::update_shielded_spends(
1185                    consensus_parameters,
1186                    wallet,
1187                    fetch_request_sender,
1188                    ufvks,
1189                    &scanned_blocks,
1190                    if map_nullifiers {
1191                        None
1192                    } else {
1193                        Some(&mut nullifiers)
1194                    },
1195                )
1196                .await?;
1197                add_scanned_blocks(wallet, scanned_blocks, &scan_range)
1198                    .map_err(SyncError::WalletError)?;
1199
1200                state::set_scanned_scan_range(
1201                    wallet
1202                        .get_sync_state_mut()
1203                        .map_err(SyncError::WalletError)?,
1204                    scan_range.block_range().clone(),
1205                    map_nullifiers,
1206                );
1207                state::merge_scan_ranges(
1208                    wallet
1209                        .get_sync_state_mut()
1210                        .map_err(SyncError::WalletError)?,
1211                    ScanPriority::ScannedWithoutMapping,
1212                );
1213            }
1214
1215            state::merge_scan_ranges(
1216                wallet
1217                    .get_sync_state_mut()
1218                    .map_err(SyncError::WalletError)?,
1219                ScanPriority::Scanned,
1220            );
1221            remove_irrelevant_data(wallet).map_err(SyncError::WalletError)?;
1222            tracing::debug!("Scan results processed.");
1223        }
1224        Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => {
1225            tracing::warn!("Hash discontinuity detected before block {height}.");
1226            if height == scan_range.block_range().start
1227                && scan_range.priority() == ScanPriority::Verify
1228            {
1229                tracing::info!("Re-org detected.");
1230                let sync_state = wallet
1231                    .get_sync_state_mut()
1232                    .map_err(SyncError::WalletError)?;
1233                let last_known_chain_height = sync_state
1234                    .last_known_chain_height()
1235                    .expect("scan ranges should be non-empty in this scope");
1236
1237                // reset scan range from `Scanning` to `Verify`
1238                state::set_scan_priority(
1239                    sync_state,
1240                    scan_range.block_range(),
1241                    ScanPriority::Verify,
1242                );
1243
1244                // extend verification range to VERIFY_BLOCK_RANGE_SIZE blocks below current verification range
1245                let current_reorg_detection_start_height = state::set_verify_scan_range(
1246                    sync_state,
1247                    height - 1,
1248                    state::VerifyEnd::VerifyHighest,
1249                )
1250                .block_range()
1251                .start;
1252                state::merge_scan_ranges(sync_state, ScanPriority::Verify);
1253
1254                if initial_reorg_detection_start_height - current_reorg_detection_start_height
1255                    > MAX_REORG_ALLOWANCE
1256                {
1257                    clear_wallet_data(wallet)?;
1258
1259                    return Err(ServerError::ChainVerificationError.into());
1260                }
1261
1262                truncate_wallet_data(wallet, current_reorg_detection_start_height - 1)?;
1263
1264                state::set_initial_state(
1265                    consensus_parameters,
1266                    fetch_request_sender.clone(),
1267                    wallet,
1268                    last_known_chain_height,
1269                )
1270                .await?;
1271            } else {
1272                scan_results?;
1273            }
1274        }
1275        Err(e) => return Err(e.into()),
1276    }
1277
1278    Ok(())
1279}
1280
1281/// Processes mempool transaction.
1282///
1283/// Scan the transaction and add to the wallet if relevant.
1284async fn process_mempool_transaction<W>(
1285    consensus_parameters: &impl consensus::Parameters,
1286    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1287    wallet: &mut W,
1288    raw_transaction: RawTransaction,
1289) -> Result<(), SyncError<W::Error>>
1290where
1291    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1292{
1293    // does not use raw transaction height due to lightwalletd off-by-one bug and potential to be zero
1294    let mempool_height = wallet
1295        .get_sync_state()
1296        .map_err(SyncError::WalletError)?
1297        .last_known_chain_height()
1298        .expect("wallet height must exist after sync is initialised")
1299        + 1;
1300
1301    let transaction = zcash_primitives::transaction::Transaction::read(
1302        &raw_transaction.data[..],
1303        consensus::BranchId::for_height(consensus_parameters, mempool_height),
1304    )
1305    .map_err(ServerError::InvalidTransaction)?;
1306
1307    tracing::debug!(
1308        "mempool received txid {} at height {}",
1309        transaction.txid(),
1310        mempool_height
1311    );
1312
1313    if let Some(tx) = wallet
1314        .get_wallet_transactions_mut()
1315        .map_err(SyncError::WalletError)?
1316        .get_mut(&transaction.txid())
1317    {
1318        tx.update_status(
1319            ConfirmationStatus::Mempool(mempool_height),
1320            SystemTime::now()
1321                .duration_since(SystemTime::UNIX_EPOCH)
1322                .expect("infalliable for such long time periods")
1323                .as_secs() as u32,
1324        );
1325
1326        return Ok(());
1327    }
1328
1329    scan_pending_transaction(
1330        consensus_parameters,
1331        ufvks,
1332        wallet,
1333        transaction,
1334        ConfirmationStatus::Mempool(mempool_height),
1335        SystemTime::now()
1336            .duration_since(SystemTime::UNIX_EPOCH)
1337            .expect("infalliable for such long time periods")
1338            .as_secs() as u32,
1339    )?;
1340
1341    Ok(())
1342}
1343
1344/// Removes wallet blocks, transactions, nullifiers, outpoints and shard tree data above the given `truncate_height`.
1345fn truncate_wallet_data<W>(
1346    wallet: &mut W,
1347    truncate_height: BlockHeight,
1348) -> Result<(), SyncError<W::Error>>
1349where
1350    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1351{
1352    let sync_state = wallet
1353        .get_sync_state_mut()
1354        .map_err(SyncError::WalletError)?;
1355    let highest_scanned_height = sync_state
1356        .highest_scanned_height()
1357        .expect("should be non-empty in this scope");
1358    let wallet_birthday = sync_state
1359        .wallet_birthday()
1360        .expect("should be non-empty in this scope");
1361    let checked_truncate_height = match truncate_height.cmp(&wallet_birthday) {
1362        std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => truncate_height,
1363        std::cmp::Ordering::Less => consensus::H0,
1364    };
1365    truncate_scan_ranges(checked_truncate_height, sync_state);
1366
1367    if checked_truncate_height > highest_scanned_height {
1368        return Ok(());
1369    }
1370
1371    wallet
1372        .truncate_wallet_blocks(checked_truncate_height)
1373        .map_err(SyncError::WalletError)?;
1374    wallet
1375        .truncate_wallet_transactions(checked_truncate_height)
1376        .map_err(SyncError::WalletError)?;
1377    wallet
1378        .truncate_nullifiers(checked_truncate_height)
1379        .map_err(SyncError::WalletError)?;
1380    wallet
1381        .truncate_outpoints(checked_truncate_height)
1382        .map_err(SyncError::WalletError)?;
1383    match wallet.truncate_shard_trees(checked_truncate_height) {
1384        Ok(_) => Ok(()),
1385        Err(SyncError::TruncationError(height, pooltype)) => {
1386            clear_wallet_data(wallet)?;
1387
1388            Err(SyncError::TruncationError(height, pooltype))
1389        }
1390        Err(e) => Err(e),
1391    }?;
1392
1393    Ok(())
1394}
1395
1396fn clear_wallet_data<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1397where
1398    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1399{
1400    let scan_targets = wallet
1401        .get_wallet_transactions()
1402        .map_err(SyncError::WalletError)?
1403        .values()
1404        .filter_map(|transaction| {
1405            transaction
1406                .status()
1407                .get_confirmed_height()
1408                .map(|height| ScanTarget {
1409                    block_height: height,
1410                    txid: transaction.txid(),
1411                    narrow_scan_area: true,
1412                })
1413        })
1414        .collect::<Vec<_>>();
1415    truncate_wallet_data(wallet, consensus::H0)?;
1416    wallet
1417        .get_wallet_transactions_mut()
1418        .map_err(SyncError::WalletError)?
1419        .clear();
1420    let sync_state = wallet
1421        .get_sync_state_mut()
1422        .map_err(SyncError::WalletError)?;
1423    add_scan_targets(sync_state, &scan_targets);
1424    wallet.set_save_flag().map_err(SyncError::WalletError)?;
1425
1426    Ok(())
1427}
1428
1429/// Updates the wallet with data from `scan_results`
1430#[allow(clippy::too_many_arguments)]
1431async fn update_wallet_data<W>(
1432    consensus_parameters: &impl consensus::Parameters,
1433    wallet: &mut W,
1434    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1435    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1436    scan_range: &ScanRange,
1437    nullifiers: Option<&mut NullifierMap>,
1438    outpoints: Option<&mut BTreeMap<OutputId, ScanTarget>>,
1439    mut transactions: HashMap<TxId, WalletTransaction>,
1440    sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
1441    orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
1442) -> Result<(), SyncError<W::Error>>
1443where
1444    W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees + Send,
1445{
1446    let sync_state = wallet
1447        .get_sync_state_mut()
1448        .map_err(SyncError::WalletError)?;
1449    let highest_scanned_height = sync_state
1450        .highest_scanned_height()
1451        .expect("scan ranges should not be empty in this scope");
1452    for transaction in transactions.values() {
1453        state::update_found_note_shard_priority(
1454            consensus_parameters,
1455            sync_state,
1456            ShieldedProtocol::Sapling,
1457            transaction,
1458        );
1459        state::update_found_note_shard_priority(
1460            consensus_parameters,
1461            sync_state,
1462            ShieldedProtocol::Orchard,
1463            transaction,
1464        );
1465    }
1466    // add all block ranges of scan ranges with `ScannedWithoutMapping` or `RefetchingNullifiers` priority above the
1467    // current scan range to each note to track which ranges need the nullifiers to be re-fetched before the note is
1468    // known to be unspent (in addition to all other ranges above the notes height being `Scanned`,
1469    // `ScannedWithoutMapping` or `RefetchingNullifiers` priority). this information is necessary as these ranges have been scanned but the
1470    // nullifiers have been discarded so must be re-fetched. if ranges are scanned but the nullifiers are discarded
1471    // (set to `ScannedWithoutMapping` priority) *after* this note has been added to the wallet, this is sufficient to
1472    // know this note has not been spent, even if this range is not set to `Scanned` priority.
1473    let refetch_nullifier_ranges = {
1474        let block_ranges: Vec<Range<BlockHeight>> = sync_state
1475            .scan_ranges()
1476            .iter()
1477            .filter(|&scan_range| {
1478                scan_range.priority() == ScanPriority::ScannedWithoutMapping
1479                    || scan_range.priority() == ScanPriority::RefetchingNullifiers
1480            })
1481            .map(|scan_range| scan_range.block_range().clone())
1482            .collect();
1483
1484        block_ranges
1485            [block_ranges.partition_point(|range| range.start < scan_range.block_range().end)..]
1486            .to_vec()
1487    };
1488    for transaction in transactions.values_mut() {
1489        for note in transaction.sapling_notes.as_mut_slice() {
1490            note.refetch_nullifier_ranges = refetch_nullifier_ranges.clone();
1491        }
1492        for note in transaction.orchard_notes.as_mut_slice() {
1493            note.refetch_nullifier_ranges = refetch_nullifier_ranges.clone();
1494        }
1495    }
1496    for transaction in transactions.values() {
1497        discover_unified_addresses(wallet, ufvks, transaction).map_err(SyncError::WalletError)?;
1498    }
1499
1500    wallet
1501        .extend_wallet_transactions(transactions)
1502        .map_err(SyncError::WalletError)?;
1503    if let Some(nullifiers) = nullifiers {
1504        wallet
1505            .append_nullifiers(nullifiers)
1506            .map_err(SyncError::WalletError)?;
1507    }
1508    if let Some(outpoints) = outpoints {
1509        wallet
1510            .append_outpoints(outpoints)
1511            .map_err(SyncError::WalletError)?;
1512    }
1513    wallet
1514        .update_shard_trees(
1515            fetch_request_sender,
1516            scan_range,
1517            highest_scanned_height,
1518            sapling_located_trees,
1519            orchard_located_trees,
1520        )
1521        .await?;
1522
1523    Ok(())
1524}
1525
1526fn discover_unified_addresses<W>(
1527    wallet: &mut W,
1528    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1529    transaction: &WalletTransaction,
1530) -> Result<(), W::Error>
1531where
1532    W: SyncWallet,
1533{
1534    for note in transaction
1535        .orchard_notes()
1536        .iter()
1537        .filter(|&note| note.key_id().scope == zip32::Scope::External)
1538    {
1539        let ivk = ufvks
1540            .get(&note.key_id().account_id())
1541            .expect("ufvk must exist to decrypt this note")
1542            .orchard()
1543            .expect("fvk must exist to decrypt this note")
1544            .to_ivk(zip32::Scope::External);
1545
1546        wallet.add_orchard_address(
1547            note.key_id().account_id(),
1548            note.note().recipient(),
1549            ivk.diversifier_index(&note.note().recipient())
1550                .expect("must be key used to create this address"),
1551        )?;
1552    }
1553    for note in transaction
1554        .sapling_notes()
1555        .iter()
1556        .filter(|&note| note.key_id().scope == zip32::Scope::External)
1557    {
1558        let ivk = ufvks
1559            .get(&note.key_id().account_id())
1560            .expect("ufvk must exist to decrypt this note")
1561            .sapling()
1562            .expect("fvk must exist to decrypt this note")
1563            .to_external_ivk();
1564
1565        wallet.add_sapling_address(
1566            note.key_id().account_id(),
1567            note.note().recipient(),
1568            ivk.decrypt_diversifier(&note.note().recipient())
1569                .expect("must be key used to create this address"),
1570        )?;
1571    }
1572
1573    Ok(())
1574}
1575
1576fn remove_irrelevant_data<W>(wallet: &mut W) -> Result<(), W::Error>
1577where
1578    W: SyncWallet + SyncBlocks + SyncOutPoints + SyncNullifiers + SyncTransactions,
1579{
1580    let fully_scanned_height = wallet
1581        .get_sync_state()?
1582        .fully_scanned_height()
1583        .expect("scan ranges must be non-empty");
1584
1585    wallet
1586        .get_outpoints_mut()?
1587        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1588    wallet
1589        .get_nullifiers_mut()?
1590        .sapling
1591        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1592    wallet
1593        .get_nullifiers_mut()?
1594        .orchard
1595        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1596    wallet
1597        .get_sync_state_mut()?
1598        .scan_targets
1599        .retain(|scan_target| scan_target.block_height > fully_scanned_height);
1600    remove_irrelevant_blocks(wallet)?;
1601
1602    Ok(())
1603}
1604
1605fn remove_irrelevant_blocks<W>(wallet: &mut W) -> Result<(), W::Error>
1606where
1607    W: SyncWallet + SyncBlocks + SyncTransactions,
1608{
1609    let sync_state = wallet.get_sync_state()?;
1610    let highest_scanned_height = sync_state
1611        .highest_scanned_height()
1612        .expect("should be non-empty");
1613    let scanned_range_bounds = sync_state
1614        .scan_ranges()
1615        .iter()
1616        .filter(|scan_range| {
1617            scan_range.priority() == ScanPriority::Scanned
1618                || scan_range.priority() == ScanPriority::ScannedWithoutMapping
1619                || scan_range.priority() == ScanPriority::RefetchingNullifiers
1620        })
1621        .flat_map(|scanned_range| {
1622            vec![
1623                scanned_range.block_range().start,
1624                scanned_range.block_range().end - 1,
1625            ]
1626        })
1627        .collect::<Vec<_>>();
1628    let wallet_transaction_heights = wallet
1629        .get_wallet_transactions()?
1630        .values()
1631        .filter_map(|tx| tx.status().get_confirmed_height())
1632        .collect::<Vec<_>>();
1633
1634    wallet.get_wallet_blocks_mut()?.retain(|height, _| {
1635        *height >= highest_scanned_height.saturating_sub(MAX_REORG_ALLOWANCE)
1636            || scanned_range_bounds.contains(height)
1637            || wallet_transaction_heights.contains(height)
1638    });
1639
1640    Ok(())
1641}
1642
1643fn add_scanned_blocks<W>(
1644    wallet: &mut W,
1645    mut scanned_blocks: BTreeMap<BlockHeight, WalletBlock>,
1646    scan_range: &ScanRange,
1647) -> Result<(), W::Error>
1648where
1649    W: SyncWallet + SyncBlocks + SyncTransactions,
1650{
1651    let sync_state = wallet.get_sync_state()?;
1652    let highest_scanned_height = sync_state
1653        .highest_scanned_height()
1654        .expect("scan ranges must be non-empty");
1655
1656    let wallet_transaction_heights = wallet
1657        .get_wallet_transactions()?
1658        .values()
1659        .filter_map(|tx| tx.status().get_confirmed_height())
1660        .collect::<Vec<_>>();
1661
1662    scanned_blocks.retain(|height, _| {
1663        *height >= highest_scanned_height.saturating_sub(MAX_REORG_ALLOWANCE)
1664            || *height == scan_range.block_range().start
1665            || *height == scan_range.block_range().end - 1
1666            || wallet_transaction_heights.contains(height)
1667    });
1668
1669    wallet.append_wallet_blocks(scanned_blocks)?;
1670
1671    Ok(())
1672}
1673
1674#[cfg(not(feature = "darkside_test"))]
1675async fn update_subtree_roots<W>(
1676    consensus_parameters: &impl consensus::Parameters,
1677    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1678    wallet: &mut W,
1679) -> Result<(), SyncError<W::Error>>
1680where
1681    W: SyncWallet + SyncShardTrees,
1682{
1683    let sapling_start_index = wallet
1684        .get_shard_trees()
1685        .map_err(SyncError::WalletError)?
1686        .sapling
1687        .store()
1688        .get_shard_roots()
1689        .expect("infallible")
1690        .len() as u32;
1691    let orchard_start_index = wallet
1692        .get_shard_trees()
1693        .map_err(SyncError::WalletError)?
1694        .orchard
1695        .store()
1696        .get_shard_roots()
1697        .expect("infallible")
1698        .len() as u32;
1699    let (sapling_subtree_roots, orchard_subtree_roots) = futures::join!(
1700        client::get_subtree_roots(fetch_request_sender.clone(), sapling_start_index, 0, 0),
1701        client::get_subtree_roots(fetch_request_sender, orchard_start_index, 1, 0)
1702    );
1703
1704    let sapling_subtree_roots = sapling_subtree_roots?;
1705    let orchard_subtree_roots = orchard_subtree_roots?;
1706
1707    let sync_state = wallet
1708        .get_sync_state_mut()
1709        .map_err(SyncError::WalletError)?;
1710    state::add_shard_ranges(
1711        consensus_parameters,
1712        ShieldedProtocol::Sapling,
1713        sync_state,
1714        &sapling_subtree_roots,
1715    );
1716    state::add_shard_ranges(
1717        consensus_parameters,
1718        ShieldedProtocol::Orchard,
1719        sync_state,
1720        &orchard_subtree_roots,
1721    );
1722
1723    let shard_trees = wallet
1724        .get_shard_trees_mut()
1725        .map_err(SyncError::WalletError)?;
1726    witness::add_subtree_roots(sapling_subtree_roots, &mut shard_trees.sapling)?;
1727    witness::add_subtree_roots(orchard_subtree_roots, &mut shard_trees.orchard)?;
1728
1729    Ok(())
1730}
1731
1732async fn add_initial_frontier<W>(
1733    consensus_parameters: &impl consensus::Parameters,
1734    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1735    wallet: &mut W,
1736) -> Result<(), SyncError<W::Error>>
1737where
1738    W: SyncWallet + SyncShardTrees,
1739{
1740    let birthday = wallet.get_birthday().map_err(SyncError::WalletError)?;
1741    if birthday
1742        == consensus_parameters
1743            .activation_height(consensus::NetworkUpgrade::Sapling)
1744            .expect("sapling activation height should always return Some")
1745    {
1746        return Ok(());
1747    }
1748
1749    // if the shard store only contains the first checkpoint added on initialisation, add frontiers to complete the
1750    // shard trees.
1751    let shard_trees = wallet
1752        .get_shard_trees_mut()
1753        .map_err(SyncError::WalletError)?;
1754    if shard_trees
1755        .sapling
1756        .store()
1757        .checkpoint_count()
1758        .expect("infallible")
1759        == 1
1760    {
1761        let frontiers = client::get_frontiers(fetch_request_sender, birthday).await?;
1762        shard_trees
1763            .sapling
1764            .insert_frontier(
1765                frontiers.final_sapling_tree().clone(),
1766                Retention::Checkpoint {
1767                    id: birthday,
1768                    marking: Marking::None,
1769                },
1770            )
1771            .expect("infallible");
1772        shard_trees
1773            .orchard
1774            .insert_frontier(
1775                frontiers.final_orchard_tree().clone(),
1776                Retention::Checkpoint {
1777                    id: birthday,
1778                    marking: Marking::None,
1779                },
1780            )
1781            .expect("infallible");
1782    }
1783
1784    Ok(())
1785}
1786
1787/// Sets up mempool stream.
1788///
1789/// If there is some raw transaction, send to be scanned.
1790/// If the mempool stream message is `None` (a block was mined) or the request failed, setup a new mempool stream.
1791async fn mempool_monitor(
1792    mut client: CompactTxStreamerClient<Channel>,
1793    mempool_transaction_sender: mpsc::Sender<RawTransaction>,
1794    unprocessed_transactions_count: Arc<AtomicU8>,
1795    shutdown_mempool: Arc<AtomicBool>,
1796) -> Result<(), MempoolError> {
1797    let mut interval = tokio::time::interval(Duration::from_secs(1));
1798    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1799    'main: loop {
1800        let response =
1801            client::get_mempool_transaction_stream(&mut client, shutdown_mempool.clone()).await;
1802
1803        match response {
1804            Ok(mut mempool_stream) => {
1805                interval.reset();
1806                loop {
1807                    tokio::select! {
1808                        mempool_stream_message = mempool_stream.message() => {
1809                            match mempool_stream_message.unwrap_or(None) {
1810                                Some(raw_transaction) => {
1811                                     let _ignore_error = mempool_transaction_sender
1812                                        .send(raw_transaction)
1813                                        .await;
1814                                    unprocessed_transactions_count.fetch_add(1, atomic::Ordering::Release);
1815                                }
1816                                None => {
1817                                    continue 'main;
1818                                }
1819                            }
1820
1821                        }
1822
1823                        _ = interval.tick() => {
1824                            if shutdown_mempool.load(atomic::Ordering::Acquire) {
1825                                break 'main;
1826                            }
1827                        }
1828                    }
1829                }
1830            }
1831            Err(e @ MempoolError::ShutdownWithoutStream) => return Err(e),
1832            Err(MempoolError::ServerError(e)) => {
1833                tracing::warn!("Mempool stream request failed! Status: {e}.\nRetrying...");
1834                tokio::time::sleep(Duration::from_secs(3)).await;
1835            }
1836        }
1837    }
1838
1839    Ok(())
1840}
1841
1842/// Spends will be reset to free up funds if transaction has been unconfirmed for
1843/// `UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD` confirmed blocks.
1844/// Transaction status will then be set to `Failed` if it's still unconfirmed when the chain reaches it's expiry height.
1845// TODO: add config to pepper-sync to set UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD
1846fn expire_transactions<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1847where
1848    W: SyncWallet + SyncTransactions,
1849{
1850    let last_known_chain_height = wallet
1851        .get_sync_state()
1852        .map_err(SyncError::WalletError)?
1853        .last_known_chain_height()
1854        .expect("wallet height must exist after scan ranges have been updated");
1855    let wallet_transactions = wallet
1856        .get_wallet_transactions_mut()
1857        .map_err(SyncError::WalletError)?;
1858
1859    let expired_txids = wallet_transactions
1860        .values()
1861        .filter(|transaction| {
1862            transaction.status().is_pending()
1863                && last_known_chain_height >= transaction.transaction().expiry_height()
1864        })
1865        .map(super::wallet::WalletTransaction::txid)
1866        .collect::<Vec<_>>();
1867    set_transactions_failed(wallet_transactions, expired_txids);
1868
1869    let stuck_funds_txids = wallet_transactions
1870        .values()
1871        .filter(|transaction| {
1872            transaction.status().is_pending()
1873                && last_known_chain_height
1874                    >= transaction.status().get_height() + UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD
1875        })
1876        .map(super::wallet::WalletTransaction::txid)
1877        .collect::<Vec<_>>();
1878    reset_spends(wallet_transactions, stuck_funds_txids);
1879
1880    Ok(())
1881}
1882
1883fn max_nullifier_map_size(performance_level: PerformanceLevel) -> Option<usize> {
1884    match performance_level {
1885        PerformanceLevel::Low => Some(0),
1886        PerformanceLevel::Medium => Some(125_000),
1887        PerformanceLevel::High => Some(2_000_000),
1888        PerformanceLevel::Maximum => None,
1889    }
1890}
1891
1892#[cfg(test)]
1893mod test {
1894
1895    mod checked_height_validation {
1896        use zcash_protocol::consensus::BlockHeight;
1897        use zcash_protocol::local_consensus::LocalNetwork;
1898        const LOCAL_NETWORK: LocalNetwork = LocalNetwork {
1899            overwinter: Some(BlockHeight::from_u32(1)),
1900            sapling: Some(BlockHeight::from_u32(3)),
1901            blossom: Some(BlockHeight::from_u32(3)),
1902            heartwood: Some(BlockHeight::from_u32(3)),
1903            canopy: Some(BlockHeight::from_u32(3)),
1904            nu5: Some(BlockHeight::from_u32(3)),
1905            nu6: Some(BlockHeight::from_u32(3)),
1906            nu6_1: Some(BlockHeight::from_u32(3)),
1907        };
1908        use crate::{error::SyncError, mocks::MockWalletError, sync::checked_wallet_height};
1909        // It's possible an error from an implementor's get_sync_state could bubble up to checked_wallet_height
1910        // this test shows that such an error is raies wrapped in a WalletError and return as the Err variant
1911        #[tokio::test]
1912        async fn get_sync_state_error() {
1913            let builder = crate::mocks::MockWalletBuilder::new();
1914            let test_error = "get_sync_state_error";
1915            let mut test_wallet = builder
1916                .get_sync_state_patch(Box::new(|_| {
1917                    Err(MockWalletError::AnErrorVariant(test_error.to_string()))
1918                }))
1919                .create_mock_wallet();
1920            let res =
1921                checked_wallet_height(&mut test_wallet, BlockHeight::from_u32(1), &LOCAL_NETWORK);
1922            assert!(matches!(
1923                res,
1924                Err(SyncError::WalletError(
1925                    crate::mocks::MockWalletError::AnErrorVariant(ref s)
1926                )) if s == test_error
1927            ));
1928        }
1929
1930        mod last_known_chain_height {
1931            use crate::{
1932                sync::{MAX_REORG_ALLOWANCE, ScanRange},
1933                wallet::SyncState,
1934            };
1935            const DEFAULT_START_HEIGHT: BlockHeight = BlockHeight::from_u32(1);
1936            const _DEFAULT_LAST_KNOWN_HEIGHT: BlockHeight = BlockHeight::from_u32(102);
1937            const DEFAULT_CHAIN_HEIGHT: BlockHeight = BlockHeight::from_u32(110);
1938
1939            use super::*;
1940            #[tokio::test]
1941            async fn above_allowance() {
1942                const LAST_KNOWN_HEIGHT: BlockHeight = BlockHeight::from_u32(211);
1943                let lkch = vec![ScanRange::from_parts(
1944                    DEFAULT_START_HEIGHT..LAST_KNOWN_HEIGHT,
1945                    crate::sync::ScanPriority::Scanned,
1946                )];
1947                let state = SyncState {
1948                    scan_ranges: lkch,
1949                    ..Default::default()
1950                };
1951                let builder = crate::mocks::MockWalletBuilder::new();
1952                let mut test_wallet = builder.sync_state(state).create_mock_wallet();
1953                let res =
1954                    checked_wallet_height(&mut test_wallet, DEFAULT_CHAIN_HEIGHT, &LOCAL_NETWORK);
1955                if let Err(e) = res {
1956                    assert_eq!(
1957                        e.to_string(),
1958                        format!(
1959                            "wallet height {} is more than {} blocks ahead of best chain height {}",
1960                            LAST_KNOWN_HEIGHT - 1,
1961                            MAX_REORG_ALLOWANCE,
1962                            DEFAULT_CHAIN_HEIGHT
1963                        )
1964                    );
1965                } else {
1966                    panic!()
1967                }
1968            }
1969            #[tokio::test]
1970            async fn above_chain_height_below_allowance() {
1971                // The hain_height is received from the proxy
1972                // truncate uses the wallet scan start height
1973                // as a
1974                let lkch = vec![ScanRange::from_parts(
1975                    BlockHeight::from_u32(6)..BlockHeight::from_u32(10),
1976                    crate::sync::ScanPriority::Scanned,
1977                )];
1978                let state = SyncState {
1979                    scan_ranges: lkch,
1980                    ..Default::default()
1981                };
1982                let builder = crate::mocks::MockWalletBuilder::new();
1983                let mut test_wallet = builder.sync_state(state).create_mock_wallet();
1984                let chain_height = BlockHeight::from_u32(4);
1985                // This will trigger a call to truncate_wallet_data with
1986                // chain_height and start_height inferred from the wallet.
1987                // chain must be greater than by this time which hits the Greater cmp
1988                // match
1989                let res = checked_wallet_height(&mut test_wallet, chain_height, &LOCAL_NETWORK);
1990                assert_eq!(res.unwrap(), BlockHeight::from_u32(4));
1991            }
1992            #[ignore = "in progress"]
1993            #[tokio::test]
1994            async fn equal_or_below_chain_height_and_above_sapling() {
1995                let lkch = vec![ScanRange::from_parts(
1996                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
1997                    crate::sync::ScanPriority::Scanned,
1998                )];
1999                let state = SyncState {
2000                    scan_ranges: lkch,
2001                    ..Default::default()
2002                };
2003                let builder = crate::mocks::MockWalletBuilder::new();
2004                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
2005            }
2006            #[ignore = "in progress"]
2007            #[tokio::test]
2008            async fn equal_or_below_chain_height_and_below_sapling() {
2009                // This case requires that the wallet have a scan_start_below sapling
2010                // which is an unexpected state.
2011                let lkch = vec![ScanRange::from_parts(
2012                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
2013                    crate::sync::ScanPriority::Scanned,
2014                )];
2015                let state = SyncState {
2016                    scan_ranges: lkch,
2017                    ..Default::default()
2018                };
2019                let builder = crate::mocks::MockWalletBuilder::new();
2020                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
2021            }
2022            #[ignore = "in progress"]
2023            #[tokio::test]
2024            async fn below_sapling() {
2025                let lkch = vec![ScanRange::from_parts(
2026                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
2027                    crate::sync::ScanPriority::Scanned,
2028                )];
2029                let state = SyncState {
2030                    scan_ranges: lkch,
2031                    ..Default::default()
2032                };
2033                let builder = crate::mocks::MockWalletBuilder::new();
2034                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
2035            }
2036        }
2037        mod no_last_known_chain_height {
2038            use super::*;
2039            // If there are know scan_ranges in the SyncState
2040            #[tokio::test]
2041            async fn get_bday_error() {
2042                let test_error = "get_bday_error";
2043                let builder = crate::mocks::MockWalletBuilder::new();
2044                let mut test_wallet = builder
2045                    .get_birthday_patch(Box::new(|_| {
2046                        Err(crate::mocks::MockWalletError::AnErrorVariant(
2047                            test_error.to_string(),
2048                        ))
2049                    }))
2050                    .create_mock_wallet();
2051                let res = checked_wallet_height(
2052                    &mut test_wallet,
2053                    BlockHeight::from_u32(1),
2054                    &LOCAL_NETWORK,
2055                );
2056                assert!(matches!(
2057                    res,
2058                    Err(SyncError::WalletError(
2059                        crate::mocks::MockWalletError::AnErrorVariant(ref s)
2060                    )) if s == test_error
2061                ));
2062            }
2063            #[ignore = "in progress"]
2064            #[tokio::test]
2065            async fn raw_bday_above_chain_height() {
2066                let builder = crate::mocks::MockWalletBuilder::new();
2067                let mut test_wallet = builder
2068                    .birthday(BlockHeight::from_u32(15))
2069                    .create_mock_wallet();
2070                let res = checked_wallet_height(
2071                    &mut test_wallet,
2072                    BlockHeight::from_u32(1),
2073                    &LOCAL_NETWORK,
2074                );
2075                if let Err(e) = res {
2076                    assert_eq!(
2077                        e.to_string(),
2078                        format!(
2079                            "wallet height is more than {} blocks ahead of best chain height",
2080                            15 - 1
2081                        )
2082                    );
2083                } else {
2084                    panic!()
2085                }
2086            }
2087            mod sapling_height {
2088                use super::*;
2089                #[tokio::test]
2090                async fn raw_bday_above() {
2091                    let builder = crate::mocks::MockWalletBuilder::new();
2092                    let mut test_wallet = builder
2093                        .birthday(BlockHeight::from_u32(4))
2094                        .create_mock_wallet();
2095                    let res = checked_wallet_height(
2096                        &mut test_wallet,
2097                        BlockHeight::from_u32(5),
2098                        &LOCAL_NETWORK,
2099                    );
2100                    assert_eq!(res.unwrap(), BlockHeight::from_u32(4 - 1));
2101                }
2102                #[tokio::test]
2103                async fn raw_bday_equal() {
2104                    let builder = crate::mocks::MockWalletBuilder::new();
2105                    let mut test_wallet = builder
2106                        .birthday(BlockHeight::from_u32(3))
2107                        .create_mock_wallet();
2108                    let res = checked_wallet_height(
2109                        &mut test_wallet,
2110                        BlockHeight::from_u32(5),
2111                        &LOCAL_NETWORK,
2112                    );
2113                    assert_eq!(res.unwrap(), BlockHeight::from_u32(3 - 1));
2114                }
2115                #[tokio::test]
2116                async fn raw_bday_below() {
2117                    let builder = crate::mocks::MockWalletBuilder::new();
2118                    let mut test_wallet = builder
2119                        .birthday(BlockHeight::from_u32(1))
2120                        .create_mock_wallet();
2121                    let res = checked_wallet_height(
2122                        &mut test_wallet,
2123                        BlockHeight::from_u32(5),
2124                        &LOCAL_NETWORK,
2125                    );
2126                    assert!(matches!(res, Err(SyncError::BirthdayBelowSapling(1, 3))));
2127                }
2128            }
2129        }
2130    }
2131}