pepper_sync/
sync.rs

1//! Entrypoint for sync engine
2
3use std::cmp;
4use std::collections::{BTreeMap, HashMap};
5use std::ops::Range;
6use std::sync::Arc;
7use std::sync::atomic::{self, AtomicBool, AtomicU8};
8use std::time::{Duration, SystemTime};
9
10use tokio::sync::{RwLock, mpsc};
11
12use incrementalmerkletree::{Marking, Retention};
13use orchard::tree::MerkleHashOrchard;
14use shardtree::store::ShardStore;
15use zcash_client_backend::proto::service::RawTransaction;
16use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
17use zcash_keys::keys::UnifiedFullViewingKey;
18use zcash_primitives::transaction::{Transaction, TxId};
19use zcash_primitives::zip32::AccountId;
20use zcash_protocol::ShieldedProtocol;
21use zcash_protocol::consensus::{self, BlockHeight};
22
23use zingo_status::confirmation_status::ConfirmationStatus;
24
25use crate::client::{self, FetchRequest};
26use crate::config::SyncConfig;
27use crate::error::{
28    ContinuityError, MempoolError, ScanError, ServerError, SyncError, SyncModeError,
29    SyncStatusError,
30};
31use crate::keys::transparent::TransparentAddressId;
32use crate::scan::ScanResults;
33use crate::scan::task::{Scanner, ScannerState};
34use crate::scan::transactions::scan_transaction;
35use crate::wallet::traits::{
36    SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
37};
38use crate::wallet::{
39    KeyIdInterface, NoteInterface, NullifierMap, OutputId, OutputInterface, ScanTarget, SyncMode,
40    SyncState, WalletBlock, WalletTransaction,
41};
42use crate::witness::LocatedTreeData;
43
44#[cfg(not(feature = "darkside_test"))]
45use crate::witness;
46
47pub(crate) mod spend;
48pub(crate) mod state;
49pub(crate) mod transparent;
50
// NOTE(review): not referenced in the visible part of this file. Presumably the number of blocks after which a
// spend only seen in the mempool is treated as invalid — confirm at the use site.
const MEMPOOL_SPEND_INVALIDATION_THRESHOLD: u32 = 3;
/// Largest number of blocks the wallet height may be ahead of the server's chain height at the start of sync.
///
/// [`sync`] returns `SyncError::ChainError` carrying this value when the gap is equal to or greater than it;
/// smaller gaps are handled by truncating the wallet data down to the chain height.
pub(crate) const MAX_VERIFICATION_WINDOW: u32 = 100;
// NOTE(review): not referenced in the visible part of this file. Presumably the number of blocks in a range that is
// re-scanned with `ScanPriority::Verify` — confirm at the use site.
const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
54
/// A snapshot of the current state of sync. Useful for displaying the status of sync to a user / consumer.
///
/// `percentage_outputs_scanned` is a much more accurate indicator of sync completion than `percentage_blocks_scanned`.
/// `percentage_total_outputs_scanned` is the percentage of outputs scanned from birthday to chain height.
///
/// "Session" values count only what was scanned since the current sync session started, i.e. the total minus the
/// amount recorded as previously scanned in the wallet's initial sync state. All percentages are clamped to the
/// range `0.0..=100.0` by [`sync_status`].
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct SyncStatus {
    // Copy of the wallet's scan ranges at the time the snapshot was taken.
    pub scan_ranges: Vec<ScanRange>,
    // Sync start height recorded in the wallet's initial sync state; zero means no session data is available and
    // all counters in the snapshot are zero.
    pub sync_start_height: BlockHeight,
    // Blocks scanned during the current session.
    pub session_blocks_scanned: u32,
    // Blocks scanned in total, including previous sessions.
    pub total_blocks_scanned: u32,
    // Session blocks scanned relative to the blocks that were still unscanned when the session started.
    pub percentage_session_blocks_scanned: f32,
    // Total blocks scanned relative to all blocks from wallet birthday to wallet height (inclusive).
    pub percentage_total_blocks_scanned: f32,
    // Sapling outputs scanned during the current session.
    pub session_sapling_outputs_scanned: u32,
    // Sapling outputs scanned in total, including previous sessions.
    pub total_sapling_outputs_scanned: u32,
    // Orchard outputs scanned during the current session.
    pub session_orchard_outputs_scanned: u32,
    // Orchard outputs scanned in total, including previous sessions.
    pub total_orchard_outputs_scanned: u32,
    // Session outputs (sapling + orchard) scanned relative to the outputs that were still unscanned when the
    // session started.
    pub percentage_session_outputs_scanned: f32,
    // Total outputs (sapling + orchard) scanned relative to all outputs within the wallet's tree bounds.
    pub percentage_total_outputs_scanned: f32,
}
75
76// TODO: complete display, scan ranges in raw form are too verbose
77impl std::fmt::Display for SyncStatus {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(
80            f,
81            "percentage complete: {}",
82            self.percentage_total_outputs_scanned
83        )
84    }
85}
86
87impl From<SyncStatus> for json::JsonValue {
88    fn from(value: SyncStatus) -> Self {
89        let scan_ranges: Vec<json::JsonValue> = value
90            .scan_ranges
91            .iter()
92            .map(|range| {
93                json::object! {
94                    "priority" => format!("{:?}", range.priority()),
95                    "start_block" => range.block_range().start.to_string(),
96                    "end_block" => (range.block_range().end - 1).to_string(),
97                }
98            })
99            .collect();
100
101        json::object! {
102            "scan_ranges" => scan_ranges,
103            "sync_start_height" => u32::from(value.sync_start_height),
104            "session_blocks_scanned" => value.session_blocks_scanned,
105            "total_blocks_scanned" => value.total_blocks_scanned,
106            "percentage_session_blocks_scanned" => value.percentage_session_blocks_scanned,
107            "percentage_total_blocks_scanned" => value.percentage_total_blocks_scanned,
108            "session_sapling_outputs_scanned" => value.session_sapling_outputs_scanned,
109            "total_sapling_outputs_scanned" => value.total_sapling_outputs_scanned,
110            "session_orchard_outputs_scanned" => value.session_orchard_outputs_scanned,
111            "total_orchard_outputs_scanned" => value.total_orchard_outputs_scanned,
112            "percentage_session_outputs_scanned" => value.percentage_session_outputs_scanned,
113            "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
114        }
115    }
116}
117
/// Returned when [`crate::sync::sync`] successfully completes.
///
/// Also returned when sync is stopped early by setting the sync mode to `Shutdown`. The scanned counters cover the
/// finished sync session only, they are filled from the "session" values of [`SyncStatus`].
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct SyncResult {
    // Sync start height taken from the sync status at the end of sync.
    pub sync_start_height: BlockHeight,
    // Last block (inclusive) of the final scan range held by the wallet at the end of sync.
    pub sync_end_height: BlockHeight,
    // Blocks scanned during the session.
    pub blocks_scanned: u32,
    // Sapling outputs scanned during the session.
    pub sapling_outputs_scanned: u32,
    // Orchard outputs scanned during the session.
    pub orchard_outputs_scanned: u32,
    // Percentage of all outputs (not only this session's) that have been scanned.
    pub percentage_total_outputs_scanned: f32,
}
129
130impl std::fmt::Display for SyncResult {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        write!(
133            f,
134            "Sync completed succesfully:
135{{
136    sync start height: {}
137    sync end height: {}
138    blocks scanned: {}
139    sapling outputs scanned: {}
140    orchard outputs scanned: {}
141    percentage total outputs scanned: {}
142}}",
143            self.sync_start_height,
144            self.sync_end_height,
145            self.blocks_scanned,
146            self.sapling_outputs_scanned,
147            self.orchard_outputs_scanned,
148            self.percentage_total_outputs_scanned,
149        )
150    }
151}
152
153impl From<SyncResult> for json::JsonValue {
154    fn from(value: SyncResult) -> Self {
155        json::object! {
156            "sync_start_height" => u32::from(value.sync_start_height),
157            "sync_end_height" => u32::from(value.sync_end_height),
158            "blocks_scanned" => value.blocks_scanned,
159            "sapling_outputs_scanned" => value.sapling_outputs_scanned,
160            "orchard_outputs_scanned" => value.orchard_outputs_scanned,
161            "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
162        }
163    }
164}
165
/// Scanning range priority levels.
///
/// The derived `PartialOrd` / `Ord` implementations compare variants by declaration order, so a variant declared
/// later compares greater than one declared earlier (`Verify` being the greatest). Reordering the variants
/// therefore changes how priorities compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ScanPriority {
    /// Block ranges that are currently being scanned.
    Scanning,
    /// Block ranges that have already been scanned will not be re-scanned.
    Scanned,
    /// Block ranges that have already been scanned. The nullifiers from this range were not mapped after scanning and
    /// spend detection to reduce memory consumption and/or storage for non-linear scanning. These nullifiers will need
    /// to be re-fetched for final spend detection when this range is the lowest unscanned range in the wallet's list
    /// of scan ranges.
    ScannedWithoutMapping,
    /// Block ranges to be scanned to advance the fully-scanned height.
    Historic,
    /// Block ranges adjacent to heights at which the user opened the wallet.
    OpenAdjacent,
    /// Blocks that must be scanned to complete note commitment tree shards adjacent to found notes.
    FoundNote,
    /// Blocks that must be scanned to complete the latest note commitment tree shard.
    ChainTip,
    /// A previously scanned range that must be verified to check it is still in the
    /// main chain, has highest priority.
    Verify,
}
190
/// A range of blocks to be scanned, along with its associated priority.
///
/// The block range is half-open: `block_range.start` is the first block in the range and `block_range.end` is one
/// past the last block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScanRange {
    // Half-open range of block heights covered by this scan range.
    block_range: Range<BlockHeight>,
    // Priority with which the block range should be scanned.
    priority: ScanPriority,
}
197
198impl std::fmt::Display for ScanRange {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        write!(
201            f,
202            "{:?}({}..{})",
203            self.priority, self.block_range.start, self.block_range.end,
204        )
205    }
206}
207
208impl ScanRange {
209    /// Constructs a scan range from its constituent parts.
210    #[must_use]
211    pub fn from_parts(block_range: Range<BlockHeight>, priority: ScanPriority) -> Self {
212        assert!(
213            block_range.end >= block_range.start,
214            "{block_range:?} is invalid for ScanRange({priority:?})",
215        );
216        ScanRange {
217            block_range,
218            priority,
219        }
220    }
221
222    /// Returns the range of block heights to be scanned.
223    #[must_use]
224    pub fn block_range(&self) -> &Range<BlockHeight> {
225        &self.block_range
226    }
227
228    /// Returns the priority with which the scan range should be scanned.
229    #[must_use]
230    pub fn priority(&self) -> ScanPriority {
231        self.priority
232    }
233
234    /// Returns whether or not the scan range is empty.
235    #[must_use]
236    pub fn is_empty(&self) -> bool {
237        self.block_range.is_empty()
238    }
239
240    /// Returns the number of blocks in the scan range.
241    #[must_use]
242    pub fn len(&self) -> usize {
243        usize::try_from(u32::from(self.block_range.end) - u32::from(self.block_range.start))
244            .expect("due to number of max blocks should always be valid usize")
245    }
246
247    /// Shifts the start of the block range to the right if `block_height >
248    /// self.block_range().start`. Returns `None` if the resulting range would
249    /// be empty (or the range was already empty).
250    #[must_use]
251    pub fn truncate_start(&self, block_height: BlockHeight) -> Option<Self> {
252        if block_height >= self.block_range.end || self.is_empty() {
253            None
254        } else {
255            Some(ScanRange {
256                block_range: self.block_range.start.max(block_height)..self.block_range.end,
257                priority: self.priority,
258            })
259        }
260    }
261
262    /// Shifts the end of the block range to the left if `block_height <
263    /// self.block_range().end`. Returns `None` if the resulting range would
264    /// be empty (or the range was already empty).
265    #[must_use]
266    pub fn truncate_end(&self, block_height: BlockHeight) -> Option<Self> {
267        if block_height <= self.block_range.start || self.is_empty() {
268            None
269        } else {
270            Some(ScanRange {
271                block_range: self.block_range.start..self.block_range.end.min(block_height),
272                priority: self.priority,
273            })
274        }
275    }
276
277    /// Splits this scan range at the specified height, such that the provided height becomes the
278    /// end of the first range returned and the start of the second. Returns `None` if
279    /// `p <= self.block_range().start || p >= self.block_range().end`.
280    #[must_use]
281    pub fn split_at(&self, p: BlockHeight) -> Option<(Self, Self)> {
282        (p > self.block_range.start && p < self.block_range.end).then_some((
283            ScanRange {
284                block_range: self.block_range.start..p,
285                priority: self.priority,
286            },
287            ScanRange {
288                block_range: p..self.block_range.end,
289                priority: self.priority,
290            },
291        ))
292    }
293}
294
/// Syncs a wallet to the latest state of the blockchain.
///
/// `sync_mode` is intended to be stored in a struct that owns the wallet(s) (i.e. lightclient) and has a non-atomic
/// counterpart [`crate::wallet::SyncMode`]. The sync engine will set the `sync_mode` to `Running` at the start of sync.
/// However, the consumer is required to set the `sync_mode` back to `NotRunning` when sync is successful or returns an
/// error. This allows more flexibility and safety with sync task handles etc.
/// `sync_mode` may also be set to `Paused` externally to pause scanning so the wallet lock can be acquired multiple
/// times in quick succession without the sync engine interrupting.
/// Set `sync_mode` back to `Running` to resume scanning.
/// Set `sync_mode` to `Shutdown` to stop the sync process.
///
/// The function spawns a fetcher task and a mempool monitor task, initialises the wallet's sync state while holding
/// the wallet write lock, launches the scanner and then loops, processing scan results and mempool transactions
/// and updating the scanner every 50 milliseconds, until the scanner and mempool monitor have shut down or
/// `sync_mode` is set to `Shutdown`.
///
/// # Errors
///
/// Among others (errors of the called sync stages are propagated with `?`):
/// - `SyncModeError::SyncAlreadyRunning` if `sync_mode` is not `NotRunning` when called.
/// - `ServerError::GenesisBlockOnly` if the server reports a chain height of zero.
/// - `SyncError::ChainError` if the wallet height is ahead of the chain height by [`MAX_VERIFICATION_WINDOW`] blocks
///   or more.
/// - `SyncError::WalletError` if a wallet trait method fails.
///
/// # Panics
///
/// Panics if `sync_mode` is set to `NotRunning` while sync is running, if the sync state holds no sync data or no
/// scan ranges when the result is created, or if the mempool monitor or fetcher task panicked.
pub async fn sync<P, W>(
    client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
    consensus_parameters: &P,
    wallet: Arc<RwLock<W>>,
    sync_mode: Arc<AtomicU8>,
    config: SyncConfig,
) -> Result<SyncResult, SyncError<W::Error>>
where
    P: consensus::Parameters + Sync + Send + 'static,
    W: SyncWallet
        + SyncBlocks
        + SyncTransactions
        + SyncNullifiers
        + SyncOutPoints
        + SyncShardTrees
        + Send,
{
    // sync may only be started from the `NotRunning` mode, any other mode means a sync is already in progress
    let mut sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
    if sync_mode_enum == SyncMode::NotRunning {
        sync_mode_enum = SyncMode::Running;
        sync_mode.store(sync_mode_enum as u8, atomic::Ordering::Release);
    } else {
        return Err(SyncModeError::SyncAlreadyRunning.into());
    }

    tracing::info!("Starting sync...");

    // create channel for sending fetch requests and launch fetcher task
    let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
    let client_clone = client.clone();
    let fetcher_handle =
        tokio::spawn(
            async move { client::fetch::fetch(fetch_request_receiver, client_clone).await },
        );

    // create channel for receiving mempool transactions and launch mempool monitor
    let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(100);
    let shutdown_mempool = Arc::new(AtomicBool::new(false));
    let shutdown_mempool_clone = shutdown_mempool.clone();
    // shared with the mempool monitor; decremented below each time a received mempool transaction has been processed
    let unprocessed_mempool_transactions_count = Arc::new(AtomicU8::new(0));
    let unprocessed_mempool_transactions_count_clone =
        unprocessed_mempool_transactions_count.clone();
    let mempool_handle = tokio::spawn(async move {
        mempool_monitor(
            client,
            mempool_transaction_sender,
            unprocessed_mempool_transactions_count_clone,
            shutdown_mempool_clone,
        )
        .await
    });

    // pre-scan initialisation
    // the wallet write lock is held for the whole initialisation and released before the scanner is launched
    let mut wallet_guard = wallet.write().await;

    let mut wallet_height = state::get_wallet_height(consensus_parameters, &*wallet_guard)
        .map_err(SyncError::WalletError)?;
    let chain_height = client::get_chain_height(fetch_request_sender.clone()).await?;
    if chain_height == 0.into() {
        return Err(SyncError::ServerError(ServerError::GenesisBlockOnly));
    }
    // the wallet is ahead of the server's chain: fail if the gap reaches the maximum verification window,
    // otherwise drop the wallet data above the chain height and continue from there
    if wallet_height > chain_height {
        if wallet_height - chain_height >= MAX_VERIFICATION_WINDOW {
            return Err(SyncError::ChainError(MAX_VERIFICATION_WINDOW));
        }
        truncate_wallet_data(&mut *wallet_guard, chain_height)?;
        wallet_height = chain_height;
    }

    let ufvks = wallet_guard
        .get_unified_full_viewing_keys()
        .map_err(SyncError::WalletError)?;

    transparent::update_addresses_and_scan_targets(
        consensus_parameters,
        &mut *wallet_guard,
        fetch_request_sender.clone(),
        &ufvks,
        wallet_height,
        chain_height,
        config.transparent_address_discovery,
    )
    .await?;

    // skipped when built with the `darkside_test` feature
    #[cfg(not(feature = "darkside_test"))]
    update_subtree_roots(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
    )
    .await?;

    add_initial_frontier(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
    )
    .await?;

    state::update_scan_ranges(
        consensus_parameters,
        wallet_height,
        chain_height,
        wallet_guard
            .get_sync_state_mut()
            .map_err(SyncError::WalletError)?,
    )
    .await;

    state::set_initial_state(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
        chain_height,
    )
    .await?;

    // one block above the highest height that was already scanned before this session
    let initial_verification_height = wallet_guard
        .get_sync_state()
        .map_err(SyncError::WalletError)?
        .highest_scanned_height()
        .expect("scan ranges must be non-empty")
        + 1;

    reset_invalid_spends(&mut *wallet_guard)?;

    drop(wallet_guard);

    // create channel for receiving scan results and launch scanner
    let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel();
    let mut scanner = Scanner::new(
        consensus_parameters.clone(),
        scan_results_sender,
        fetch_request_sender.clone(),
        ufvks.clone(),
    );
    scanner.launch(config.performance_level);

    // TODO: implement an option for continuous scanning where it doesnt exit when complete

    // main loop: handles scan results, mempool transactions and a 50ms scanner update tick, whichever is ready
    let mut interval = tokio::time::interval(Duration::from_millis(50));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        tokio::select! {
            // results of a scanned range: the wallet write lock is only held while the results are processed
            Some((scan_range, scan_results)) = scan_results_receiver.recv() => {
                let mut wallet_guard = wallet.write().await;
                process_scan_results(
                    consensus_parameters,
                    &mut *wallet_guard,
                    fetch_request_sender.clone(),
                    &ufvks,
                    scan_range,
                    scan_results,
                    initial_verification_height,
                )
                .await?;
                wallet_guard.set_save_flag().map_err(SyncError::WalletError)?;
                drop(wallet_guard);
            }

            // transaction received from the mempool monitor
            Some(raw_transaction) = mempool_transaction_receiver.recv() => {
                let mut wallet_guard = wallet.write().await;
                process_mempool_transaction(
                    consensus_parameters,
                    &ufvks,
                    &mut *wallet_guard,
                    raw_transaction,
                )
                .await?;
                // the transaction is processed, it no longer counts as unprocessed for the shutdown check
                unprocessed_mempool_transactions_count.fetch_sub(1, atomic::Ordering::Release);
                drop(wallet_guard);
            }

            // periodic tick: react to external sync mode changes, then update the scanner
            _update_scanner = interval.tick() => {
                sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
                match sync_mode_enum {
                    SyncMode::Paused => {
                        // poll the sync mode once per second until it is no longer `Paused`. The extra tick before
                        // the loop consumes the interval's first tick, which tokio completes immediately.
                        // Whatever mode is read when pausing ends is only matched on the next 50ms tick, the
                        // scanner update below runs first.
                        let mut pause_interval = tokio::time::interval(Duration::from_secs(1));
                        pause_interval.tick().await;
                        while sync_mode_enum == SyncMode::Paused {
                            pause_interval.tick().await;
                            sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
                        }
                    },
                    SyncMode::Shutdown => {
                        // externally requested shutdown: flag the wallet for saving and return the result of the
                        // session so far. The fetcher and mempool task handles are not awaited on this path.
                        let mut wallet_guard = wallet.write().await;
                        let sync_status = match sync_status(&*wallet_guard).await {
                            Ok(status) => status,
                            Err(SyncStatusError::WalletError(e)) => {
                                return Err(SyncError::WalletError(e));
                            }
                            Err(SyncStatusError::NoSyncData) => {
                                panic!("sync data must exist!");
                            }
                        };
                        wallet_guard
                            .set_save_flag()
                            .map_err(SyncError::WalletError)?;
                        drop(wallet_guard);
                        tracing::info!("Sync successfully shutdown.");

                        return Ok(SyncResult {
                            sync_start_height: sync_status.sync_start_height,
                            // scan range ends are exclusive, so the last block is the end of the final range minus one
                            sync_end_height: (sync_status
                                .scan_ranges
                                .last()
                                .expect("should be non-empty after syncing")
                                .block_range()
                                .end
                                - 1),
                            blocks_scanned: sync_status.session_blocks_scanned,
                            sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
                            orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
                            percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
                        });
                    }
                    SyncMode::Running => (),
                    SyncMode::NotRunning => {
                        panic!("sync mode should not be manually set to NotRunning!");
                    },
                }

                // the wallet write lock is only held for the duration of the scanner update
                scanner.update(&mut *wallet.write().await, shutdown_mempool.clone(), config.performance_level).await?;

                if matches!(scanner.state, ScannerState::Shutdown) {
                    // wait for mempool monitor to receive mempool transactions
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    if is_shutdown(&scanner, unprocessed_mempool_transactions_count.clone())
                    {
                        tracing::info!("Sync successfully shutdown.");
                        break;
                    }
                }
            }
        }
    }

    // sync completed: take a final status snapshot and flag the wallet for saving
    let mut wallet_guard = wallet.write().await;
    let sync_status = match sync_status(&*wallet_guard).await {
        Ok(status) => status,
        Err(SyncStatusError::WalletError(e)) => {
            return Err(SyncError::WalletError(e));
        }
        Err(SyncStatusError::NoSyncData) => {
            panic!("sync data must exist!");
        }
    };
    wallet_guard
        .set_save_flag()
        .map_err(SyncError::WalletError)?;

    // the scanner and this function's fetch request sender are dropped before the task handles are awaited
    drop(wallet_guard);
    drop(scanner);
    drop(fetch_request_sender);

    // a mempool monitor that shut down without a stream is only logged as a warning, other errors fail the sync
    match mempool_handle.await.expect("task panicked") {
        Ok(()) => (),
        Err(e @ MempoolError::ShutdownWithoutStream) => tracing::warn!("{e}"),
        Err(e) => return Err(e.into()),
    }
    fetcher_handle.await.expect("task panicked");

    Ok(SyncResult {
        sync_start_height: sync_status.sync_start_height,
        // scan range ends are exclusive, so the last block is the end of the final range minus one
        sync_end_height: (sync_status
            .scan_ranges
            .last()
            .expect("should be non-empty after syncing")
            .block_range()
            .end
            - 1),
        blocks_scanned: sync_status.session_blocks_scanned,
        sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
        orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
        percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
    })
}
582
583/// Creates a [`self::SyncStatus`] from the wallet's current [`crate::wallet::SyncState`].
584///
585/// Intended to be called while [`self::sync`] is running in a separate task.
586pub async fn sync_status<W>(wallet: &W) -> Result<SyncStatus, SyncStatusError<W::Error>>
587where
588    W: SyncWallet + SyncBlocks,
589{
590    let (total_sapling_outputs_scanned, total_orchard_outputs_scanned) =
591        state::calculate_scanned_outputs(wallet).map_err(SyncStatusError::WalletError)?;
592    let total_outputs_scanned = total_sapling_outputs_scanned + total_orchard_outputs_scanned;
593
594    let sync_state = wallet
595        .get_sync_state()
596        .map_err(SyncStatusError::WalletError)?;
597    if sync_state.initial_sync_state.sync_start_height == 0.into() {
598        return Ok(SyncStatus {
599            scan_ranges: sync_state.scan_ranges.clone(),
600            sync_start_height: 0.into(),
601            session_blocks_scanned: 0,
602            total_blocks_scanned: 0,
603            percentage_session_blocks_scanned: 0.0,
604            percentage_total_blocks_scanned: 0.0,
605            session_sapling_outputs_scanned: 0,
606            session_orchard_outputs_scanned: 0,
607            total_sapling_outputs_scanned: 0,
608            total_orchard_outputs_scanned: 0,
609            percentage_session_outputs_scanned: 0.0,
610            percentage_total_outputs_scanned: 0.0,
611        });
612    }
613    let total_blocks_scanned = state::calculate_scanned_blocks(sync_state);
614
615    let birthday = sync_state
616        .wallet_birthday()
617        .ok_or(SyncStatusError::NoSyncData)?;
618    let wallet_height = sync_state
619        .wallet_height()
620        .ok_or(SyncStatusError::NoSyncData)?;
621    let total_blocks = wallet_height - birthday + 1;
622    let total_sapling_outputs = sync_state
623        .initial_sync_state
624        .wallet_tree_bounds
625        .sapling_final_tree_size
626        - sync_state
627            .initial_sync_state
628            .wallet_tree_bounds
629            .sapling_initial_tree_size;
630    let total_orchard_outputs = sync_state
631        .initial_sync_state
632        .wallet_tree_bounds
633        .orchard_final_tree_size
634        - sync_state
635            .initial_sync_state
636            .wallet_tree_bounds
637            .orchard_initial_tree_size;
638    let total_outputs = total_sapling_outputs + total_orchard_outputs;
639
640    let session_blocks_scanned =
641        total_blocks_scanned - sync_state.initial_sync_state.previously_scanned_blocks;
642    let percentage_session_blocks_scanned = ((session_blocks_scanned as f32
643        / (total_blocks - sync_state.initial_sync_state.previously_scanned_blocks) as f32)
644        * 100.0)
645        .clamp(0.0, 100.0);
646    let percentage_total_blocks_scanned =
647        ((total_blocks_scanned as f32 / total_blocks as f32) * 100.0).clamp(0.0, 100.0);
648
649    let session_sapling_outputs_scanned = total_sapling_outputs_scanned
650        - sync_state
651            .initial_sync_state
652            .previously_scanned_sapling_outputs;
653    let session_orchard_outputs_scanned = total_orchard_outputs_scanned
654        - sync_state
655            .initial_sync_state
656            .previously_scanned_orchard_outputs;
657    let session_outputs_scanned = session_sapling_outputs_scanned + session_orchard_outputs_scanned;
658    let previously_scanned_outputs = sync_state
659        .initial_sync_state
660        .previously_scanned_sapling_outputs
661        + sync_state
662            .initial_sync_state
663            .previously_scanned_orchard_outputs;
664    let percentage_session_outputs_scanned = ((session_outputs_scanned as f32
665        / (total_outputs - previously_scanned_outputs) as f32)
666        * 100.0)
667        .clamp(0.0, 100.0);
668    let percentage_total_outputs_scanned =
669        ((total_outputs_scanned as f32 / total_outputs as f32) * 100.0).clamp(0.0, 100.0);
670
671    Ok(SyncStatus {
672        scan_ranges: sync_state.scan_ranges.clone(),
673        sync_start_height: sync_state.initial_sync_state.sync_start_height,
674        session_blocks_scanned,
675        total_blocks_scanned,
676        percentage_session_blocks_scanned,
677        percentage_total_blocks_scanned,
678        session_sapling_outputs_scanned,
679        total_sapling_outputs_scanned,
680        session_orchard_outputs_scanned,
681        total_orchard_outputs_scanned,
682        percentage_session_outputs_scanned,
683        percentage_total_outputs_scanned,
684    })
685}
686
687/// Scans a pending `transaction` of a given `status`, adding to the wallet and updating output spend statuses.
688///
689/// Used both internally for scanning mempool transactions and externally for scanning calculated and transmitted
690/// transactions during send.
691///
692/// Panics if `status` is of `Confirmed` variant.
693pub fn scan_pending_transaction<W>(
694    consensus_parameters: &impl consensus::Parameters,
695    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
696    wallet: &mut W,
697    transaction: Transaction,
698    status: ConfirmationStatus,
699    datetime: u32,
700) -> Result<(), SyncError<W::Error>>
701where
702    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
703{
704    if matches!(status, ConfirmationStatus::Confirmed(_)) {
705        panic!("this fn is for unconfirmed transactions only");
706    }
707
708    let mut pending_transaction_nullifiers = NullifierMap::new();
709    let mut pending_transaction_outpoints = BTreeMap::new();
710    let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
711        .get_transparent_addresses()
712        .map_err(SyncError::WalletError)?
713        .iter()
714        .map(|(id, address)| (address.clone(), *id))
715        .collect();
716    let pending_transaction = scan_transaction(
717        consensus_parameters,
718        ufvks,
719        transaction.txid(),
720        transaction,
721        status,
722        None,
723        &mut pending_transaction_nullifiers,
724        &mut pending_transaction_outpoints,
725        &transparent_addresses,
726        datetime,
727    )?;
728
729    let wallet_transactions = wallet
730        .get_wallet_transactions()
731        .map_err(SyncError::WalletError)?;
732    let transparent_output_ids = spend::collect_transparent_output_ids(wallet_transactions);
733    let transparent_spend_scan_targets = spend::detect_transparent_spends(
734        &mut pending_transaction_outpoints,
735        transparent_output_ids,
736    );
737    let (sapling_derived_nullifiers, orchard_derived_nullifiers) =
738        spend::collect_derived_nullifiers(wallet_transactions);
739    let (sapling_spend_scan_targets, orchard_spend_scan_targets) = spend::detect_shielded_spends(
740        &mut pending_transaction_nullifiers,
741        sapling_derived_nullifiers,
742        orchard_derived_nullifiers,
743    );
744
745    // return if transaction is not relevant to the wallet
746    if pending_transaction.transparent_coins().is_empty()
747        && pending_transaction.sapling_notes().is_empty()
748        && pending_transaction.orchard_notes().is_empty()
749        && pending_transaction.outgoing_orchard_notes().is_empty()
750        && pending_transaction.outgoing_sapling_notes().is_empty()
751        && transparent_spend_scan_targets.is_empty()
752        && sapling_spend_scan_targets.is_empty()
753        && orchard_spend_scan_targets.is_empty()
754    {
755        return Ok(());
756    }
757
758    wallet
759        .insert_wallet_transaction(pending_transaction)
760        .map_err(SyncError::WalletError)?;
761    spend::update_spent_coins(
762        wallet
763            .get_wallet_transactions_mut()
764            .map_err(SyncError::WalletError)?,
765        transparent_spend_scan_targets,
766    );
767    spend::update_spent_notes(
768        wallet,
769        sapling_spend_scan_targets,
770        orchard_spend_scan_targets,
771        false,
772    )
773    .map_err(SyncError::WalletError)?;
774
775    Ok(())
776}
777
778/// API for targetted scanning.
779///
780/// Allows `scan_targets` to be added externally to the wallet's `sync_state` and be prioritised for scanning. Each
781/// scan target must include the block height which will be used to prioritise the block range containing the note
782/// commitments to the surrounding orchard shard(s). If the block height is pre-orchard then the surrounding sapling
783/// shard(s) will be prioritised instead. The txid in each scan target may be omitted and set to [0u8; 32] in order to
784/// prioritise the surrounding blocks for scanning but be ignored when fetching specific relevant transactions to the
785/// wallet. However, in the case where a relevant spending transaction at a given height contains no decryptable
786/// incoming notes (change), only the nullifier will be mapped and this transaction will be scanned when the
787/// transaction containing the spent notes is scanned instead.
788pub fn add_scan_targets(sync_state: &mut SyncState, scan_targets: &[ScanTarget]) {
789    for scan_target in scan_targets {
790        sync_state.scan_targets.insert(*scan_target);
791    }
792}
793
794/// Resets the spending transaction field of all outputs that were previously spent but became unspent due to a
795/// spending transactions becoming invalid.
796///
797/// `invalid_txids` are the id's of the invalidated spending transactions. Any outputs in the `wallet_transactions`
798/// matching these spending transactions will be reset back to `None`.
799pub fn reset_spends(
800    wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
801    invalid_txids: Vec<TxId>,
802) {
803    wallet_transactions
804        .values_mut()
805        .flat_map(|transaction| transaction.orchard_notes_mut())
806        .filter(|output| {
807            output
808                .spending_transaction
809                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
810        })
811        .for_each(|output| {
812            output.set_spending_transaction(None);
813        });
814    wallet_transactions
815        .values_mut()
816        .flat_map(|transaction| transaction.sapling_notes_mut())
817        .filter(|output| {
818            output
819                .spending_transaction
820                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
821        })
822        .for_each(|output| {
823            output.set_spending_transaction(None);
824        });
825    wallet_transactions
826        .values_mut()
827        .flat_map(|transaction| transaction.transparent_coins_mut())
828        .filter(|output| {
829            output
830                .spending_transaction
831                .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
832        })
833        .for_each(|output| {
834            output.set_spending_transaction(None);
835        });
836}
837
838/// Returns true if the scanner and mempool are shutdown.
839fn is_shutdown<P>(
840    scanner: &Scanner<P>,
841    mempool_unprocessed_transactions_count: Arc<AtomicU8>,
842) -> bool
843where
844    P: consensus::Parameters + Sync + Send + 'static,
845{
846    scanner.worker_poolsize() == 0
847        && mempool_unprocessed_transactions_count.load(atomic::Ordering::Acquire) == 0
848}
849
/// Scan post-processing
///
/// Applies the outcome of scanning `scan_range` to the `wallet`.
///
/// On `Ok` scan results:
/// - if `scan_range` has priority `ScannedWithoutMapping`, only shielded spends are updated using the scanned
///   nullifiers, any wallet blocks missing at the bounds of the range are fetched and stored, and the range is
///   marked as scanned.
/// - otherwise the wallet data (transactions, nullifiers, outpoints, shard trees) is updated, transparent and
///   shielded spends are updated, the scanned blocks are stored and the range is marked as scanned.
///
/// In both cases scan ranges are then merged and data no longer relevant to the wallet is removed.
///
/// On a `HashDiscontinuity` error at the start height of a `Verify` range, a re-org is assumed: the verification
/// range is extended downwards, the wallet data is truncated to just below the new verification start and the
/// initial sync state is set again. If the new verification start is more than `MAX_VERIFICATION_WINDOW` blocks
/// below `initial_verification_height`, all wallet data is cleared and a chain verification error is returned.
///
/// Any other scan error is returned to the caller.
async fn process_scan_results<W>(
    consensus_parameters: &impl consensus::Parameters,
    wallet: &mut W,
    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
    scan_range: ScanRange,
    scan_results: Result<ScanResults, ScanError>,
    initial_verification_height: BlockHeight,
) -> Result<(), SyncError<W::Error>>
where
    W: SyncWallet
        + SyncBlocks
        + SyncTransactions
        + SyncNullifiers
        + SyncOutPoints
        + SyncShardTrees
        + Send,
{
    match scan_results {
        Ok(results) => {
            let ScanResults {
                mut nullifiers,
                outpoints,
                scanned_blocks,
                wallet_transactions,
                sapling_located_trees,
                orchard_located_trees,
                map_nullifiers,
            } = results;

            if scan_range.priority() == ScanPriority::ScannedWithoutMapping {
                // this range is only re-processed for spend detection, no other wallet data is updated
                spend::update_shielded_spends(
                    consensus_parameters,
                    wallet,
                    fetch_request_sender.clone(),
                    ufvks,
                    &scanned_blocks,
                    Some(&mut nullifiers),
                )
                .await?;

                // add missing block bounds in the case that nullifier batch limit was reached and scan range was split
                let mut missing_block_bounds = BTreeMap::new();
                for block_bound in [
                    scan_range.block_range().start,
                    scan_range.block_range().end - 1,
                ] {
                    if wallet.get_wallet_block(block_bound).is_err() {
                        missing_block_bounds.insert(
                            block_bound,
                            WalletBlock::from_compact_block(
                                consensus_parameters,
                                fetch_request_sender.clone(),
                                &client::get_compact_block(
                                    fetch_request_sender.clone(),
                                    block_bound,
                                )
                                .await?,
                            )
                            .await?,
                        );
                    }
                }
                if !missing_block_bounds.is_empty() {
                    wallet
                        .append_wallet_blocks(missing_block_bounds)
                        .map_err(SyncError::WalletError)?;
                }

                // NOTE(review): the final argument presumably flags that nullifiers are accounted for in this
                // range (the other branch passes `map_nullifiers` here) - confirm against `state`
                state::set_scanned_scan_range(
                    wallet
                        .get_sync_state_mut()
                        .map_err(SyncError::WalletError)?,
                    scan_range.block_range().clone(),
                    true,
                );
            } else {
                // the scanned nullifiers are handed to exactly one consumer: appended to the wallet's nullifier
                // map when `map_nullifiers` is set, otherwise passed to shielded spend detection below
                update_wallet_data(
                    consensus_parameters,
                    wallet,
                    fetch_request_sender.clone(),
                    ufvks,
                    &scan_range,
                    if map_nullifiers {
                        Some(&mut nullifiers)
                    } else {
                        None
                    },
                    outpoints,
                    wallet_transactions,
                    sapling_located_trees,
                    orchard_located_trees,
                )
                .await?;
                spend::update_transparent_spends(wallet).map_err(SyncError::WalletError)?;
                spend::update_shielded_spends(
                    consensus_parameters,
                    wallet,
                    fetch_request_sender,
                    ufvks,
                    &scanned_blocks,
                    if map_nullifiers {
                        None
                    } else {
                        Some(&mut nullifiers)
                    },
                )
                .await?;
                add_scanned_blocks(wallet, scanned_blocks, &scan_range)
                    .map_err(SyncError::WalletError)?;

                state::set_scanned_scan_range(
                    wallet
                        .get_sync_state_mut()
                        .map_err(SyncError::WalletError)?,
                    scan_range.block_range().clone(),
                    map_nullifiers,
                );
                state::merge_scan_ranges(
                    wallet
                        .get_sync_state_mut()
                        .map_err(SyncError::WalletError)?,
                    ScanPriority::ScannedWithoutMapping,
                );
            }

            // common tail for both branches: merge scanned ranges and drop data that is no longer needed
            state::merge_scan_ranges(
                wallet
                    .get_sync_state_mut()
                    .map_err(SyncError::WalletError)?,
                ScanPriority::Scanned,
            );
            remove_irrelevant_data(wallet).map_err(SyncError::WalletError)?;
            tracing::debug!("Scan results processed.");
        }
        Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => {
            // a discontinuity is only handled as a re-org when it is found at the first block of a `Verify` range
            if height == scan_range.block_range().start
                && scan_range.priority() == ScanPriority::Verify
            {
                tracing::info!("Re-org detected.");
                let sync_state = wallet
                    .get_sync_state_mut()
                    .map_err(SyncError::WalletError)?;
                // captured before the wallet data is truncated so it can be passed to `set_initial_state` below
                let wallet_height = sync_state
                    .wallet_height()
                    .expect("scan ranges should be non-empty in this scope");

                // reset scan range from `Scanning` to `Verify`
                state::set_scan_priority(
                    sync_state,
                    scan_range.block_range(),
                    ScanPriority::Verify,
                );

                // extend verification range to VERIFY_BLOCK_RANGE_SIZE blocks below current verification range
                let verification_start = state::set_verify_scan_range(
                    sync_state,
                    height - 1,
                    state::VerifyEnd::VerifyHighest,
                )
                .block_range()
                .start;
                state::merge_scan_ranges(sync_state, ScanPriority::Verify);

                // give up verifying once the verification start has moved too far below where it began
                if initial_verification_height - verification_start > MAX_VERIFICATION_WINDOW {
                    clear_wallet_data(wallet)?;

                    return Err(ServerError::ChainVerificationError.into());
                }

                // keep only wallet data below the block range that is about to be re-verified
                truncate_wallet_data(wallet, verification_start - 1)?;

                state::set_initial_state(
                    consensus_parameters,
                    fetch_request_sender.clone(),
                    wallet,
                    wallet_height,
                )
                .await?;
            } else {
                // discontinuity that is not handled as a re-org, propagate the original error
                scan_results?;
            }
        }
        Err(e) => return Err(e.into()),
    }

    Ok(())
}
1039
1040/// Processes mempool transaction.
1041///
1042/// Scan the transaction and add to the wallet if relevant.
1043async fn process_mempool_transaction<W>(
1044    consensus_parameters: &impl consensus::Parameters,
1045    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1046    wallet: &mut W,
1047    raw_transaction: RawTransaction,
1048) -> Result<(), SyncError<W::Error>>
1049where
1050    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1051{
1052    // does not use raw transaction height due to lightwalletd off-by-one bug and potential to be zero
1053    let mempool_height = wallet
1054        .get_sync_state()
1055        .map_err(SyncError::WalletError)?
1056        .wallet_height()
1057        .expect("wallet height must exist after sync is initialised")
1058        + 1;
1059
1060    let transaction = zcash_primitives::transaction::Transaction::read(
1061        &raw_transaction.data[..],
1062        consensus::BranchId::for_height(consensus_parameters, mempool_height),
1063    )
1064    .map_err(ServerError::InvalidTransaction)?;
1065
1066    tracing::debug!(
1067        "mempool received txid {} at height {}",
1068        transaction.txid(),
1069        mempool_height
1070    );
1071
1072    if let Some(tx) = wallet
1073        .get_wallet_transactions()
1074        .map_err(SyncError::WalletError)?
1075        .get(&transaction.txid())
1076        && (tx.status().is_confirmed() || matches!(tx.status(), ConfirmationStatus::Mempool(_)))
1077    {
1078        return Ok(());
1079    }
1080
1081    scan_pending_transaction(
1082        consensus_parameters,
1083        ufvks,
1084        wallet,
1085        transaction,
1086        ConfirmationStatus::Mempool(mempool_height),
1087        SystemTime::now()
1088            .duration_since(SystemTime::UNIX_EPOCH)
1089            .expect("infalliable for such long time periods")
1090            .as_secs() as u32,
1091    )?;
1092
1093    Ok(())
1094}
1095
1096/// Removes wallet blocks, transactions, nullifiers, outpoints and shard tree data above the given `truncate_height`.
1097fn truncate_wallet_data<W>(
1098    wallet: &mut W,
1099    truncate_height: BlockHeight,
1100) -> Result<(), SyncError<W::Error>>
1101where
1102    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1103{
1104    let birthday = wallet
1105        .get_sync_state()
1106        .map_err(SyncError::WalletError)?
1107        .wallet_birthday()
1108        .expect("should be non-empty in this scope");
1109    let checked_truncate_height = match truncate_height.cmp(&birthday) {
1110        std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => truncate_height,
1111        std::cmp::Ordering::Less => consensus::H0,
1112    };
1113
1114    wallet
1115        .truncate_wallet_blocks(checked_truncate_height)
1116        .map_err(SyncError::WalletError)?;
1117    wallet
1118        .truncate_wallet_transactions(checked_truncate_height)
1119        .map_err(SyncError::WalletError)?;
1120    wallet
1121        .truncate_nullifiers(checked_truncate_height)
1122        .map_err(SyncError::WalletError)?;
1123    wallet
1124        .truncate_outpoints(checked_truncate_height)
1125        .map_err(SyncError::WalletError)?;
1126    match wallet.truncate_shard_trees(checked_truncate_height) {
1127        Ok(_) => Ok(()),
1128        Err(SyncError::TruncationError(height, pooltype)) => {
1129            clear_wallet_data(wallet)?;
1130
1131            Err(SyncError::TruncationError(height, pooltype))
1132        }
1133        Err(e) => Err(e),
1134    }?;
1135
1136    Ok(())
1137}
1138
1139fn clear_wallet_data<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1140where
1141    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1142{
1143    let scan_targets = wallet
1144        .get_wallet_transactions()
1145        .map_err(SyncError::WalletError)?
1146        .values()
1147        .filter_map(|transaction| {
1148            transaction
1149                .status()
1150                .get_confirmed_height()
1151                .map(|height| ScanTarget {
1152                    block_height: height,
1153                    txid: transaction.txid(),
1154                    narrow_scan_area: true,
1155                })
1156        })
1157        .collect::<Vec<_>>();
1158    let sync_state = wallet
1159        .get_sync_state_mut()
1160        .map_err(SyncError::WalletError)?;
1161    *sync_state = SyncState::new();
1162    add_scan_targets(sync_state, &scan_targets);
1163
1164    truncate_wallet_data(wallet, consensus::H0)
1165}
1166
1167/// Updates the wallet with data from `scan_results`
1168#[allow(clippy::too_many_arguments)]
1169async fn update_wallet_data<W>(
1170    consensus_parameters: &impl consensus::Parameters,
1171    wallet: &mut W,
1172    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1173    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1174    scan_range: &ScanRange,
1175    nullifiers: Option<&mut NullifierMap>,
1176    mut outpoints: BTreeMap<OutputId, ScanTarget>,
1177    transactions: HashMap<TxId, WalletTransaction>,
1178    sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
1179    orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
1180) -> Result<(), SyncError<W::Error>>
1181where
1182    W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees + Send,
1183{
1184    let sync_state = wallet
1185        .get_sync_state_mut()
1186        .map_err(SyncError::WalletError)?;
1187    let highest_scanned_height = sync_state
1188        .highest_scanned_height()
1189        .expect("scan ranges should not be empty in this scope");
1190    for transaction in transactions.values() {
1191        state::update_found_note_shard_priority(
1192            consensus_parameters,
1193            sync_state,
1194            ShieldedProtocol::Sapling,
1195            transaction,
1196        );
1197        state::update_found_note_shard_priority(
1198            consensus_parameters,
1199            sync_state,
1200            ShieldedProtocol::Orchard,
1201            transaction,
1202        );
1203    }
1204    for transaction in transactions.values() {
1205        discover_unified_addresses(wallet, ufvks, transaction).map_err(SyncError::WalletError)?;
1206    }
1207
1208    wallet
1209        .extend_wallet_transactions(transactions)
1210        .map_err(SyncError::WalletError)?;
1211    if let Some(nullifiers) = nullifiers {
1212        wallet
1213            .append_nullifiers(nullifiers)
1214            .map_err(SyncError::WalletError)?;
1215    }
1216    wallet
1217        .append_outpoints(&mut outpoints)
1218        .map_err(SyncError::WalletError)?;
1219    wallet
1220        .update_shard_trees(
1221            fetch_request_sender,
1222            scan_range,
1223            highest_scanned_height,
1224            sapling_located_trees,
1225            orchard_located_trees,
1226        )
1227        .await?;
1228
1229    Ok(())
1230}
1231
1232fn discover_unified_addresses<W>(
1233    wallet: &mut W,
1234    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1235    transaction: &WalletTransaction,
1236) -> Result<(), W::Error>
1237where
1238    W: SyncWallet,
1239{
1240    for note in transaction
1241        .orchard_notes()
1242        .iter()
1243        .filter(|&note| note.key_id().scope == zip32::Scope::External)
1244    {
1245        let ivk = ufvks
1246            .get(&note.key_id().account_id())
1247            .expect("ufvk must exist to decrypt this note")
1248            .orchard()
1249            .expect("fvk must exist to decrypt this note")
1250            .to_ivk(zip32::Scope::External);
1251
1252        wallet.add_orchard_address(
1253            note.key_id().account_id(),
1254            note.note().recipient(),
1255            ivk.diversifier_index(&note.note().recipient())
1256                .expect("must be key used to create this address"),
1257        )?;
1258    }
1259    for note in transaction
1260        .sapling_notes()
1261        .iter()
1262        .filter(|&note| note.key_id().scope == zip32::Scope::External)
1263    {
1264        let ivk = ufvks
1265            .get(&note.key_id().account_id())
1266            .expect("ufvk must exist to decrypt this note")
1267            .sapling()
1268            .expect("fvk must exist to decrypt this note")
1269            .to_external_ivk();
1270
1271        wallet.add_sapling_address(
1272            note.key_id().account_id(),
1273            note.note().recipient(),
1274            ivk.decrypt_diversifier(&note.note().recipient())
1275                .expect("must be key used to create this address"),
1276        )?;
1277    }
1278
1279    Ok(())
1280}
1281
1282fn remove_irrelevant_data<W>(wallet: &mut W) -> Result<(), W::Error>
1283where
1284    W: SyncWallet + SyncBlocks + SyncOutPoints + SyncNullifiers + SyncTransactions,
1285{
1286    let fully_scanned_height = wallet
1287        .get_sync_state()?
1288        .fully_scanned_height()
1289        .expect("scan ranges must be non-empty");
1290
1291    wallet
1292        .get_outpoints_mut()?
1293        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1294    wallet
1295        .get_nullifiers_mut()?
1296        .sapling
1297        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1298    wallet
1299        .get_nullifiers_mut()?
1300        .orchard
1301        .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1302    wallet
1303        .get_sync_state_mut()?
1304        .scan_targets
1305        .retain(|scan_target| scan_target.block_height > fully_scanned_height);
1306    remove_irrelevant_blocks(wallet)?;
1307
1308    Ok(())
1309}
1310
1311fn remove_irrelevant_blocks<W>(wallet: &mut W) -> Result<(), W::Error>
1312where
1313    W: SyncWallet + SyncBlocks + SyncTransactions,
1314{
1315    let sync_state = wallet.get_sync_state()?;
1316    let highest_scanned_height = sync_state
1317        .highest_scanned_height()
1318        .expect("should be non-empty");
1319    let scanned_range_bounds = sync_state
1320        .scan_ranges()
1321        .iter()
1322        .filter(|scan_range| {
1323            scan_range.priority() == ScanPriority::Scanned
1324                || scan_range.priority() == ScanPriority::ScannedWithoutMapping
1325                || scan_range.priority() == ScanPriority::Scanning
1326        })
1327        .flat_map(|scanned_range| {
1328            vec![
1329                scanned_range.block_range().start,
1330                scanned_range.block_range().end - 1,
1331            ]
1332        })
1333        .collect::<Vec<_>>();
1334    let wallet_transaction_heights = wallet
1335        .get_wallet_transactions()?
1336        .values()
1337        .filter_map(|tx| tx.status().get_confirmed_height())
1338        .collect::<Vec<_>>();
1339
1340    wallet.get_wallet_blocks_mut()?.retain(|height, _| {
1341        *height >= highest_scanned_height.saturating_sub(MAX_VERIFICATION_WINDOW)
1342            || scanned_range_bounds.contains(height)
1343            || wallet_transaction_heights.contains(height)
1344    });
1345
1346    Ok(())
1347}
1348
1349fn add_scanned_blocks<W>(
1350    wallet: &mut W,
1351    mut scanned_blocks: BTreeMap<BlockHeight, WalletBlock>,
1352    scan_range: &ScanRange,
1353) -> Result<(), W::Error>
1354where
1355    W: SyncWallet + SyncBlocks + SyncTransactions,
1356{
1357    let sync_state = wallet.get_sync_state()?;
1358    let highest_scanned_height = sync_state
1359        .highest_scanned_height()
1360        .expect("scan ranges must be non-empty");
1361
1362    let wallet_transaction_heights = wallet
1363        .get_wallet_transactions()?
1364        .values()
1365        .filter_map(|tx| tx.status().get_confirmed_height())
1366        .collect::<Vec<_>>();
1367
1368    scanned_blocks.retain(|height, _| {
1369        *height >= highest_scanned_height.saturating_sub(MAX_VERIFICATION_WINDOW)
1370            || *height == scan_range.block_range().start
1371            || *height == scan_range.block_range().end - 1
1372            || wallet_transaction_heights.contains(height)
1373    });
1374
1375    wallet.append_wallet_blocks(scanned_blocks)?;
1376
1377    Ok(())
1378}
1379
1380#[cfg(not(feature = "darkside_test"))]
1381async fn update_subtree_roots<W>(
1382    consensus_parameters: &impl consensus::Parameters,
1383    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1384    wallet: &mut W,
1385) -> Result<(), SyncError<W::Error>>
1386where
1387    W: SyncWallet + SyncShardTrees,
1388{
1389    let sapling_start_index = wallet
1390        .get_shard_trees()
1391        .map_err(SyncError::WalletError)?
1392        .sapling
1393        .store()
1394        .get_shard_roots()
1395        .expect("infallible")
1396        .len() as u32;
1397    let orchard_start_index = wallet
1398        .get_shard_trees()
1399        .map_err(SyncError::WalletError)?
1400        .orchard
1401        .store()
1402        .get_shard_roots()
1403        .expect("infallible")
1404        .len() as u32;
1405    let (sapling_subtree_roots, orchard_subtree_roots) = futures::join!(
1406        client::get_subtree_roots(fetch_request_sender.clone(), sapling_start_index, 0, 0),
1407        client::get_subtree_roots(fetch_request_sender, orchard_start_index, 1, 0)
1408    );
1409
1410    let sapling_subtree_roots = sapling_subtree_roots?;
1411    let orchard_subtree_roots = orchard_subtree_roots?;
1412
1413    let sync_state = wallet
1414        .get_sync_state_mut()
1415        .map_err(SyncError::WalletError)?;
1416    state::add_shard_ranges(
1417        consensus_parameters,
1418        ShieldedProtocol::Sapling,
1419        sync_state,
1420        &sapling_subtree_roots,
1421    );
1422    state::add_shard_ranges(
1423        consensus_parameters,
1424        ShieldedProtocol::Orchard,
1425        sync_state,
1426        &orchard_subtree_roots,
1427    );
1428
1429    let shard_trees = wallet
1430        .get_shard_trees_mut()
1431        .map_err(SyncError::WalletError)?;
1432    witness::add_subtree_roots(sapling_subtree_roots, &mut shard_trees.sapling)?;
1433    witness::add_subtree_roots(orchard_subtree_roots, &mut shard_trees.orchard)?;
1434
1435    Ok(())
1436}
1437
1438async fn add_initial_frontier<W>(
1439    consensus_parameters: &impl consensus::Parameters,
1440    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1441    wallet: &mut W,
1442) -> Result<(), SyncError<W::Error>>
1443where
1444    W: SyncWallet + SyncShardTrees,
1445{
1446    let birthday =
1447        checked_birthday(consensus_parameters, wallet).map_err(SyncError::WalletError)?;
1448    if birthday
1449        == consensus_parameters
1450            .activation_height(consensus::NetworkUpgrade::Sapling)
1451            .expect("sapling activation height should always return Some")
1452    {
1453        return Ok(());
1454    }
1455
1456    // if the shard store only contains the first checkpoint added on initialisation, add frontiers to complete the
1457    // shard trees.
1458    let shard_trees = wallet
1459        .get_shard_trees_mut()
1460        .map_err(SyncError::WalletError)?;
1461    if shard_trees
1462        .sapling
1463        .store()
1464        .checkpoint_count()
1465        .expect("infallible")
1466        == 1
1467    {
1468        let frontiers = client::get_frontiers(fetch_request_sender, birthday).await?;
1469        shard_trees
1470            .sapling
1471            .insert_frontier(
1472                frontiers.final_sapling_tree().clone(),
1473                Retention::Checkpoint {
1474                    id: birthday,
1475                    marking: Marking::None,
1476                },
1477            )
1478            .expect("infallible");
1479        shard_trees
1480            .orchard
1481            .insert_frontier(
1482                frontiers.final_orchard_tree().clone(),
1483                Retention::Checkpoint {
1484                    id: birthday,
1485                    marking: Marking::None,
1486                },
1487            )
1488            .expect("infallible");
1489    }
1490
1491    Ok(())
1492}
1493
1494/// Compares the wallet birthday to sapling activation height and returns the highest block height.
1495fn checked_birthday<W: SyncWallet>(
1496    consensus_parameters: &impl consensus::Parameters,
1497    wallet: &W,
1498) -> Result<BlockHeight, W::Error> {
1499    let wallet_birthday = wallet.get_birthday()?;
1500    let sapling_activation_height = consensus_parameters
1501        .activation_height(consensus::NetworkUpgrade::Sapling)
1502        .expect("sapling activation height should always return Some");
1503
1504    match wallet_birthday.cmp(&sapling_activation_height) {
1505        cmp::Ordering::Greater | cmp::Ordering::Equal => Ok(wallet_birthday),
1506        cmp::Ordering::Less => Ok(sapling_activation_height),
1507    }
1508}
1509
1510/// Sets up mempool stream.
1511///
1512/// If there is some raw transaction, send to be scanned.
1513/// If the mempool stream message is `None` (a block was mined) or the request failed, setup a new mempool stream.
1514async fn mempool_monitor(
1515    mut client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
1516    mempool_transaction_sender: mpsc::Sender<RawTransaction>,
1517    unprocessed_transactions_count: Arc<AtomicU8>,
1518    shutdown_mempool: Arc<AtomicBool>,
1519) -> Result<(), MempoolError> {
1520    let mut interval = tokio::time::interval(Duration::from_secs(1));
1521    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1522    'main: loop {
1523        let response =
1524            client::get_mempool_transaction_stream(&mut client, shutdown_mempool.clone()).await;
1525
1526        match response {
1527            Ok(mut mempool_stream) => {
1528                interval.reset();
1529                loop {
1530                    tokio::select! {
1531                        mempool_stream_message = mempool_stream.message() => {
1532                            match mempool_stream_message.unwrap_or(None) {
1533                                Some(raw_transaction) => {
1534                                     let _ignore_error = mempool_transaction_sender
1535                                        .send(raw_transaction)
1536                                        .await;
1537                                    unprocessed_transactions_count.fetch_add(1, atomic::Ordering::Release);
1538                                }
1539                                None => {
1540                                    continue 'main;
1541                                }
1542                            }
1543
1544                        }
1545
1546                        _ = interval.tick() => {
1547                            if shutdown_mempool.load(atomic::Ordering::Acquire) {
1548                                break 'main;
1549                            }
1550                        }
1551                    }
1552                }
1553            }
1554            Err(e @ MempoolError::ShutdownWithoutStream) => return Err(e),
1555            Err(MempoolError::ServerError(e)) => {
1556                tracing::warn!("Mempool stream request failed! Status: {e}.\nRetrying...");
1557                tokio::time::sleep(Duration::from_secs(3)).await;
1558            }
1559        }
1560    }
1561
1562    Ok(())
1563}
1564
1565fn reset_invalid_spends<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1566where
1567    W: SyncWallet + SyncTransactions,
1568{
1569    let wallet_height = wallet
1570        .get_sync_state()
1571        .map_err(SyncError::WalletError)?
1572        .wallet_height()
1573        .expect("wallet height must exist after scan ranges have been updated");
1574    let wallet_transactions = wallet
1575        .get_wallet_transactions_mut()
1576        .map_err(SyncError::WalletError)?;
1577
1578    let invalid_txids = wallet_transactions
1579        .values()
1580        .filter(|transaction| {
1581            matches!(transaction.status(), ConfirmationStatus::Mempool(_))
1582                && transaction.status().get_height()
1583                    <= wallet_height - MEMPOOL_SPEND_INVALIDATION_THRESHOLD
1584        })
1585        .map(super::wallet::WalletTransaction::txid)
1586        .chain(
1587            wallet_transactions
1588                .values()
1589                .filter(|transaction| {
1590                    (matches!(transaction.status(), ConfirmationStatus::Calculated(_))
1591                        || matches!(transaction.status(), ConfirmationStatus::Transmitted(_)))
1592                        && wallet_height >= transaction.transaction().expiry_height()
1593                })
1594                .map(super::wallet::WalletTransaction::txid),
1595        )
1596        .collect::<Vec<_>>();
1597    reset_spends(wallet_transactions, invalid_txids);
1598
1599    Ok(())
1600}