pepper_sync/
sync.rs

1//! Entrypoint for sync engine
2
3use std::cmp;
4use std::collections::{BTreeMap, HashMap};
5use std::io::{Read, Write};
6use std::sync::Arc;
7use std::sync::atomic::{self, AtomicBool, AtomicU8};
8use std::time::{Duration, SystemTime};
9
10use byteorder::{ReadBytesExt, WriteBytesExt};
11use tokio::sync::{Mutex, mpsc};
12
13use incrementalmerkletree::{Marking, Retention};
14use orchard::tree::MerkleHashOrchard;
15use shardtree::store::ShardStore;
16use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange};
17use zcash_client_backend::proto::service::RawTransaction;
18use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
19use zcash_keys::keys::UnifiedFullViewingKey;
20use zcash_primitives::transaction::{Transaction, TxId};
21use zcash_primitives::zip32::AccountId;
22use zcash_protocol::ShieldedProtocol;
23use zcash_protocol::consensus::{self, BlockHeight};
24
25use zingo_status::confirmation_status::ConfirmationStatus;
26
27use crate::client::{self, FetchRequest};
28use crate::error::{
29    ContinuityError, MempoolError, ScanError, ServerError, SyncError, SyncModeError,
30    SyncStatusError,
31};
32use crate::keys::transparent::TransparentAddressId;
33use crate::scan::ScanResults;
34use crate::scan::task::{Scanner, ScannerState};
35use crate::scan::transactions::scan_transaction;
36use crate::wallet::traits::{
37    SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
38};
39use crate::wallet::{
40    Locator, NullifierMap, OutputId, SyncMode, SyncState, WalletBlock, WalletTransaction,
41};
42use crate::witness::LocatedTreeData;
43
44#[cfg(not(feature = "darkside_test"))]
45use crate::witness;
46
47pub(crate) mod spend;
48pub(crate) mod state;
49pub(crate) mod transparent;
50
// Block range size used for verification. Not referenced in this part of the file.
// NOTE(review): presumably the number of blocks covered by each verification scan range - confirm against usage.
const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
/// Maximum number of blocks the wallet height may be ahead of the chain height at the start of sync.
///
/// When the wallet is ahead by more than this, sync returns a chain error instead of truncating the wallet data
/// back to the chain height.
pub(crate) const MAX_VERIFICATION_WINDOW: u32 = 100;
53
54/// A snapshot of the current state of sync. Useful for displaying the status of sync to a user / consumer.
55///
56/// `percentage_outputs_scanned` is a much more accurate indicator of sync completion than `percentage_blocks_scanned`.
57/// `percentage_total_outputs_scanned` is the percentage of outputs scanned from birthday to chain height.
58#[derive(Debug, Clone)]
59#[allow(missing_docs)]
60pub struct SyncStatus {
61    pub scan_ranges: Vec<ScanRange>,
62    pub sync_start_height: BlockHeight,
63    pub session_blocks_scanned: u32,
64    pub total_blocks_scanned: u32,
65    pub percentage_session_blocks_scanned: f32,
66    pub percentage_total_blocks_scanned: f32,
67    pub session_sapling_outputs_scanned: u32,
68    pub total_sapling_outputs_scanned: u32,
69    pub session_orchard_outputs_scanned: u32,
70    pub total_orchard_outputs_scanned: u32,
71    pub percentage_session_outputs_scanned: f32,
72    pub percentage_total_outputs_scanned: f32,
73}
74
75// TODO: complete display, scan ranges in raw form are too verbose
76impl std::fmt::Display for SyncStatus {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        write!(
79            f,
80            "percentage complete: {}",
81            self.percentage_total_outputs_scanned
82        )
83    }
84}
85
86impl From<SyncStatus> for json::JsonValue {
87    fn from(value: SyncStatus) -> Self {
88        let scan_ranges: Vec<json::JsonValue> = value
89            .scan_ranges
90            .iter()
91            .map(|range| {
92                json::object! {
93                    "priority" => format!("{:?}", range.priority()),
94                    "start_block" => range.block_range().start.to_string(),
95                    "end_block" => (range.block_range().end - 1).to_string(),
96                }
97            })
98            .collect();
99
100        json::object! {
101            "scan_ranges" => scan_ranges,
102            "sync_start_height" => u32::from(value.sync_start_height),
103            "session_blocks_scanned" => value.session_blocks_scanned,
104            "total_blocks_scanned" => value.total_blocks_scanned,
105            "percentage_session_blocks_scanned" => value.percentage_session_blocks_scanned,
106            "percentage_total_blocks_scanned" => value.percentage_total_blocks_scanned,
107            "session_sapling_outputs_scanned" => value.session_sapling_outputs_scanned,
108            "total_sapling_outputs_scanned" => value.total_sapling_outputs_scanned,
109            "session_orchard_outputs_scanned" => value.session_orchard_outputs_scanned,
110            "total_orchard_outputs_scanned" => value.total_orchard_outputs_scanned,
111            "percentage_session_outputs_scanned" => value.percentage_session_outputs_scanned,
112            "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
113        }
114    }
115}
116
117/// Returned when [`crate::sync::sync`] successfully completes.
118#[derive(Debug, Clone)]
119#[allow(missing_docs)]
120pub struct SyncResult {
121    pub sync_start_height: BlockHeight,
122    pub sync_end_height: BlockHeight,
123    pub blocks_scanned: u32,
124    pub sapling_outputs_scanned: u32,
125    pub orchard_outputs_scanned: u32,
126    pub percentage_total_outputs_scanned: f32,
127}
128
129impl std::fmt::Display for SyncResult {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        write!(
132            f,
133            "Sync completed succesfully:
134{{
135    sync start height: {}
136    sync end height: {}
137    blocks scanned: {}
138    sapling outputs scanned: {}
139    orchard outputs scanned: {}
140    percentage total outputs scanned: {}
141}}",
142            self.sync_start_height,
143            self.sync_end_height,
144            self.blocks_scanned,
145            self.sapling_outputs_scanned,
146            self.orchard_outputs_scanned,
147            self.percentage_total_outputs_scanned,
148        )
149    }
150}
151
152impl From<SyncResult> for json::JsonValue {
153    fn from(value: SyncResult) -> Self {
154        json::object! {
155            "sync_start_height" => u32::from(value.sync_start_height),
156            "sync_end_height" => u32::from(value.sync_end_height),
157            "blocks_scanned" => value.blocks_scanned,
158            "sapling_outputs_scanned" => value.sapling_outputs_scanned,
159            "orchard_outputs_scanned" => value.orchard_outputs_scanned,
160            "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
161        }
162    }
163}
164
165/// Sync configuration.
166#[derive(Default, Debug, Clone)]
167pub struct SyncConfig {
168    /// Transparent address discovery configuration.
169    pub transparent_address_discovery: TransparentAddressDiscovery,
170}
171
172impl SyncConfig {
173    fn serialized_version() -> u8 {
174        0
175    }
176
177    /// Deserialize into `reader`
178    pub fn read<R: Read>(mut reader: R) -> std::io::Result<Self> {
179        let _version = reader.read_u8()?;
180
181        let gap_limit = reader.read_u8()?;
182        let scopes = reader.read_u8()?;
183        Ok(Self {
184            transparent_address_discovery: TransparentAddressDiscovery {
185                gap_limit,
186                scopes: TransparentAddressDiscoveryScopes {
187                    external: scopes & 0b1 != 0,
188                    internal: scopes & 0b10 != 0,
189                    refund: scopes & 0b100 != 0,
190                },
191            },
192        })
193    }
194
195    /// Serialize into `writer`
196    pub fn write<W: Write>(&mut self, mut writer: W) -> std::io::Result<()> {
197        writer.write_u8(Self::serialized_version())?;
198        writer.write_u8(self.transparent_address_discovery.gap_limit)?;
199        let mut scopes = 0;
200        if self.transparent_address_discovery.scopes.external {
201            scopes |= 0b1;
202        };
203        if self.transparent_address_discovery.scopes.internal {
204            scopes |= 0b10;
205        };
206        if self.transparent_address_discovery.scopes.refund {
207            scopes |= 0b100;
208        };
209        writer.write_u8(scopes)?;
210
211        Ok(())
212    }
213}
214
215/// Transparent address configuration.
216///
217/// Sets which `scopes` will be searched for addresses in use, scanning relevant transactions, up to a given `gap_limit`.
218#[derive(Debug, Clone)]
219pub struct TransparentAddressDiscovery {
220    /// Sets the gap limit for transparent address discovery.
221    pub gap_limit: u8,
222    /// Sets the scopes for transparent address discovery.
223    pub scopes: TransparentAddressDiscoveryScopes,
224}
225
226impl Default for TransparentAddressDiscovery {
227    fn default() -> Self {
228        Self {
229            gap_limit: 10,
230            scopes: TransparentAddressDiscoveryScopes::default(),
231        }
232    }
233}
234
235impl TransparentAddressDiscovery {
236    /// Constructs a transparent address discovery config with a gap limit of 1 and ignoring the internal scope.
237    pub fn minimal() -> Self {
238        Self {
239            gap_limit: 1,
240            scopes: TransparentAddressDiscoveryScopes::default(),
241        }
242    }
243
244    /// Constructs a transparent address discovery config with a gap limit of 20 for all scopes.
245    pub fn recovery() -> Self {
246        Self {
247            gap_limit: 20,
248            scopes: TransparentAddressDiscoveryScopes::recovery(),
249        }
250    }
251
252    /// Disables transparent address discovery. Sync will only scan transparent outputs for addresses already in the
253    /// wallet in transactions that also contain shielded inputs or outputs relevant to the wallet.
254    pub fn disabled() -> Self {
255        Self {
256            gap_limit: 0,
257            scopes: TransparentAddressDiscoveryScopes {
258                external: false,
259                internal: false,
260                refund: false,
261            },
262        }
263    }
264}
265
/// Sets the active scopes for transparent address recovery.
#[derive(Clone, Debug)]
pub struct TransparentAddressDiscoveryScopes {
    /// External.
    pub external: bool,
    /// Internal.
    pub internal: bool,
    /// Refund.
    pub refund: bool,
}

impl Default for TransparentAddressDiscoveryScopes {
    /// External and refund scopes active, internal scope inactive.
    fn default() -> Self {
        let (external, internal, refund) = (true, false, true);

        Self {
            external,
            internal,
            refund,
        }
    }
}

impl TransparentAddressDiscoveryScopes {
    /// Constructor with all scopes active.
    pub fn recovery() -> Self {
        let (external, internal, refund) = (true, true, true);

        Self {
            external,
            internal,
            refund,
        }
    }
}
297
298/// Syncs a wallet to the latest state of the blockchain.
299///
300/// `sync_mode` is intended to be stored in a struct that owns the wallet(s) (i.e. lightclient) and has a non-atomic
301/// counterpart [`crate::wallet::SyncMode`]. The sync engine will set the `sync_mode` to `Running` at the start of sync.
302/// However, the consumer is required to set the `sync_mode` back to `NotRunning` when sync is succussful or returns an
303/// error. This allows more flexibility and safety with sync task handles etc.
304/// `sync_mode` may also be set to `Paused` externally to pause scanning so the wallet lock can be acquired multiple
305/// times in quick sucession without the sync engine interrupting.
306/// Setting `sync_mode` back to `Running` will resume scanning.
307/// Setting `sync_mode` to `Shutdown` will stop the sync process.
308pub async fn sync<P, W>(
309    client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
310    consensus_parameters: &P,
311    wallet: Arc<Mutex<W>>,
312    sync_mode: Arc<AtomicU8>,
313    config: SyncConfig,
314) -> Result<SyncResult, SyncError<W::Error>>
315where
316    P: consensus::Parameters + Sync + Send + 'static,
317    W: SyncWallet
318        + SyncBlocks
319        + SyncTransactions
320        + SyncNullifiers
321        + SyncOutPoints
322        + SyncShardTrees
323        + Send,
324{
325    let mut sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
326    if sync_mode_enum == SyncMode::NotRunning {
327        sync_mode_enum = SyncMode::Running;
328        sync_mode.store(sync_mode_enum as u8, atomic::Ordering::Release);
329    } else {
330        return Err(SyncModeError::SyncAlreadyRunning.into());
331    }
332
333    tracing::info!("Starting sync...");
334
335    // create channel for sending fetch requests and launch fetcher task
336    let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
337    let client_clone = client.clone();
338    let fetcher_handle =
339        tokio::spawn(
340            async move { client::fetch::fetch(fetch_request_receiver, client_clone).await },
341        );
342
343    // create channel for receiving mempool transactions and launch mempool monitor
344    let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(100);
345    let shutdown_mempool = Arc::new(AtomicBool::new(false));
346    let shutdown_mempool_clone = shutdown_mempool.clone();
347    let unprocessed_mempool_transactions_count = Arc::new(AtomicU8::new(0));
348    let unprocessed_mempool_transactions_count_clone =
349        unprocessed_mempool_transactions_count.clone();
350    let mempool_handle = tokio::spawn(async move {
351        mempool_monitor(
352            client,
353            mempool_transaction_sender,
354            unprocessed_mempool_transactions_count_clone,
355            shutdown_mempool_clone,
356        )
357        .await
358    });
359
360    // pre-scan initialisation
361    let mut wallet_guard = wallet.lock().await;
362
363    let mut wallet_height = state::get_wallet_height(consensus_parameters, &*wallet_guard)
364        .map_err(SyncError::WalletError)?;
365    let chain_height = client::get_chain_height(fetch_request_sender.clone()).await?;
366    if wallet_height > chain_height {
367        if wallet_height - chain_height > MAX_VERIFICATION_WINDOW {
368            return Err(SyncError::ChainError(MAX_VERIFICATION_WINDOW));
369        }
370        truncate_wallet_data(&mut *wallet_guard, chain_height)?;
371        wallet_height = chain_height;
372    }
373
374    let ufvks = wallet_guard
375        .get_unified_full_viewing_keys()
376        .map_err(SyncError::WalletError)?;
377
378    transparent::update_addresses_and_locators(
379        consensus_parameters,
380        &mut *wallet_guard,
381        fetch_request_sender.clone(),
382        &ufvks,
383        wallet_height,
384        chain_height,
385        config.transparent_address_discovery,
386    )
387    .await?;
388
389    #[cfg(not(feature = "darkside_test"))]
390    update_subtree_roots(
391        consensus_parameters,
392        fetch_request_sender.clone(),
393        &mut *wallet_guard,
394    )
395    .await?;
396
397    add_initial_frontier(
398        consensus_parameters,
399        fetch_request_sender.clone(),
400        &mut *wallet_guard,
401    )
402    .await?;
403
404    state::update_scan_ranges(
405        consensus_parameters,
406        wallet_height,
407        chain_height,
408        wallet_guard
409            .get_sync_state_mut()
410            .map_err(SyncError::WalletError)?,
411    )
412    .await;
413
414    state::set_initial_state(
415        consensus_parameters,
416        fetch_request_sender.clone(),
417        &mut *wallet_guard,
418        chain_height,
419    )
420    .await?;
421
422    let initial_verification_height = wallet_guard
423        .get_sync_state()
424        .map_err(SyncError::WalletError)?
425        .highest_scanned_height()
426        .expect("scan ranges must be non-empty")
427        + 1;
428
429    drop(wallet_guard);
430
431    // create channel for receiving scan results and launch scanner
432    let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel();
433    let mut scanner = Scanner::new(
434        consensus_parameters.clone(),
435        scan_results_sender,
436        fetch_request_sender.clone(),
437        ufvks.clone(),
438    );
439    scanner.launch();
440
441    // TODO: implement an option for continuous scanning where it doesnt exit when complete
442
443    let mut interval = tokio::time::interval(Duration::from_millis(50));
444    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
445    loop {
446        tokio::select! {
447            Some((scan_range, scan_results)) = scan_results_receiver.recv() => {
448                let mut wallet_guard = wallet.lock().await;
449                process_scan_results(
450                    consensus_parameters,
451                    &mut *wallet_guard,
452                    fetch_request_sender.clone(),
453                    &ufvks,
454                    scan_range,
455                    scan_results,
456                    initial_verification_height,
457                )
458                .await?;
459                wallet_guard.set_save_flag().map_err(SyncError::WalletError)?;
460                drop(wallet_guard);
461            }
462
463            Some(raw_transaction) = mempool_transaction_receiver.recv() => {
464                let mut wallet_guard = wallet.lock().await;
465                process_mempool_transaction(
466                    consensus_parameters,
467                    &ufvks,
468                    &mut *wallet_guard,
469                    raw_transaction,
470                )
471                .await?;
472                unprocessed_mempool_transactions_count.fetch_sub(1, atomic::Ordering::Release);
473                drop(wallet_guard);
474            }
475
476            _update_scanner = interval.tick() => {
477                sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
478                match sync_mode_enum {
479                    SyncMode::Paused => {
480                        let mut pause_interval = tokio::time::interval(Duration::from_secs(1));
481                        pause_interval.tick().await;
482                        while sync_mode_enum == SyncMode::Paused {
483                            pause_interval.tick().await;
484                            sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
485                        }
486                    },
487                    SyncMode::Shutdown => {
488                        let mut wallet_guard = wallet.lock().await;
489                        let sync_status = match sync_status(&*wallet_guard).await {
490                            Ok(status) => status,
491                            Err(SyncStatusError::WalletError(e)) => {
492                                return Err(SyncError::WalletError(e));
493                            }
494                            Err(SyncStatusError::NoSyncData) => {
495                                panic!("sync data must exist!");
496                            }
497                        };
498                        wallet_guard
499                            .set_save_flag()
500                            .map_err(SyncError::WalletError)?;
501                        drop(wallet_guard);
502                        tracing::info!("Sync successfully shutdown.");
503
504                        return Ok(SyncResult {
505                            sync_start_height: sync_status.sync_start_height,
506                            sync_end_height: (sync_status
507                                .scan_ranges
508                                .last()
509                                .expect("should be non-empty after syncing")
510                                .block_range()
511                                .end
512                                - 1),
513                            blocks_scanned: sync_status.session_blocks_scanned,
514                            sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
515                            orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
516                            percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
517                        });
518                    }
519                    SyncMode::Running => (),
520                    SyncMode::NotRunning => {
521                        panic!("sync mode should not be manually set to NotRunning!");
522                    },
523                }
524
525                scanner.update(&mut *wallet.lock().await, shutdown_mempool.clone()).await?;
526
527                if matches!(scanner.state, ScannerState::Shutdown) {
528                    // wait for mempool monitor to receive mempool transactions
529                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
530                    if is_shutdown(&scanner, unprocessed_mempool_transactions_count.clone())
531                    {
532                        tracing::info!("Sync successfully shutdown.");
533                        break;
534                    }
535                }
536            }
537        }
538    }
539
540    let mut wallet_guard = wallet.lock().await;
541    let sync_status = match sync_status(&*wallet_guard).await {
542        Ok(status) => status,
543        Err(SyncStatusError::WalletError(e)) => {
544            return Err(SyncError::WalletError(e));
545        }
546        Err(SyncStatusError::NoSyncData) => {
547            panic!("sync data must exist!");
548        }
549    };
550    wallet_guard
551        .set_save_flag()
552        .map_err(SyncError::WalletError)?;
553
554    drop(wallet_guard);
555    drop(scanner);
556    drop(fetch_request_sender);
557
558    match mempool_handle.await.expect("task panicked") {
559        Ok(_) => (),
560        Err(e @ MempoolError::ShutdownWithoutStream) => tracing::warn!("{e}"),
561        Err(e) => return Err(e.into()),
562    }
563    fetcher_handle.await.expect("task panicked");
564
565    Ok(SyncResult {
566        sync_start_height: sync_status.sync_start_height,
567        sync_end_height: (sync_status
568            .scan_ranges
569            .last()
570            .expect("should be non-empty after syncing")
571            .block_range()
572            .end
573            - 1),
574        blocks_scanned: sync_status.session_blocks_scanned,
575        sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
576        orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
577        percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
578    })
579}
580
581/// Creates a [`self::SyncStatus`] from the wallet's current [`crate::wallet::SyncState`].
582///
583/// Intended to be called while [self::sync] is running in a separate task.
584pub async fn sync_status<W>(wallet: &W) -> Result<SyncStatus, SyncStatusError<W::Error>>
585where
586    W: SyncWallet + SyncBlocks,
587{
588    let (total_sapling_outputs_scanned, total_orchard_outputs_scanned) =
589        state::calculate_scanned_outputs(wallet).map_err(SyncStatusError::WalletError)?;
590    let total_outputs_scanned = total_sapling_outputs_scanned + total_orchard_outputs_scanned;
591
592    let sync_state = wallet
593        .get_sync_state()
594        .map_err(SyncStatusError::WalletError)?;
595    if sync_state.initial_sync_state.sync_start_height == 0.into() {
596        return Ok(SyncStatus {
597            scan_ranges: sync_state.scan_ranges.clone(),
598            sync_start_height: 0.into(),
599            session_blocks_scanned: 0,
600            total_blocks_scanned: 0,
601            percentage_session_blocks_scanned: 0.0,
602            percentage_total_blocks_scanned: 0.0,
603            session_sapling_outputs_scanned: 0,
604            session_orchard_outputs_scanned: 0,
605            total_sapling_outputs_scanned: 0,
606            total_orchard_outputs_scanned: 0,
607            percentage_session_outputs_scanned: 0.0,
608            percentage_total_outputs_scanned: 0.0,
609        });
610    }
611    let total_blocks_scanned = state::calculate_scanned_blocks(sync_state);
612
613    let birthday = sync_state
614        .wallet_birthday()
615        .ok_or(SyncStatusError::NoSyncData)?;
616    let wallet_height = sync_state
617        .wallet_height()
618        .ok_or(SyncStatusError::NoSyncData)?;
619    let total_blocks = wallet_height - birthday + 1;
620    let total_sapling_outputs = sync_state
621        .initial_sync_state
622        .wallet_tree_bounds
623        .sapling_final_tree_size
624        - sync_state
625            .initial_sync_state
626            .wallet_tree_bounds
627            .sapling_initial_tree_size;
628    let total_orchard_outputs = sync_state
629        .initial_sync_state
630        .wallet_tree_bounds
631        .orchard_final_tree_size
632        - sync_state
633            .initial_sync_state
634            .wallet_tree_bounds
635            .orchard_initial_tree_size;
636    let total_outputs = total_sapling_outputs + total_orchard_outputs;
637
638    let session_blocks_scanned =
639        total_blocks_scanned - sync_state.initial_sync_state.previously_scanned_blocks;
640    let percentage_session_blocks_scanned = (session_blocks_scanned as f32
641        / (total_blocks - sync_state.initial_sync_state.previously_scanned_blocks) as f32)
642        * 100.0;
643    let percentage_total_blocks_scanned =
644        (total_blocks_scanned as f32 / total_blocks as f32) * 100.0;
645
646    let session_sapling_outputs_scanned = total_sapling_outputs_scanned
647        - sync_state
648            .initial_sync_state
649            .previously_scanned_sapling_outputs;
650    let session_orchard_outputs_scanned = total_orchard_outputs_scanned
651        - sync_state
652            .initial_sync_state
653            .previously_scanned_orchard_outputs;
654    let session_outputs_scanned = session_sapling_outputs_scanned + session_orchard_outputs_scanned;
655    let previously_scanned_outputs = sync_state
656        .initial_sync_state
657        .previously_scanned_sapling_outputs
658        + sync_state
659            .initial_sync_state
660            .previously_scanned_orchard_outputs;
661    let percentage_session_outputs_scanned = (session_outputs_scanned as f32
662        / (total_outputs - previously_scanned_outputs) as f32)
663        * 100.0;
664    let percentage_total_outputs_scanned =
665        (total_outputs_scanned as f32 / total_outputs as f32) * 100.0;
666
667    Ok(SyncStatus {
668        scan_ranges: sync_state.scan_ranges.clone(),
669        sync_start_height: sync_state.initial_sync_state.sync_start_height,
670        session_blocks_scanned,
671        total_blocks_scanned,
672        percentage_session_blocks_scanned,
673        percentage_total_blocks_scanned,
674        session_sapling_outputs_scanned,
675        total_sapling_outputs_scanned,
676        session_orchard_outputs_scanned,
677        total_orchard_outputs_scanned,
678        percentage_session_outputs_scanned,
679        percentage_total_outputs_scanned,
680    })
681}
682
683/// Scans a pending `transaction` of a given `status`, adding to the wallet and updating output spend statuses.
684///
685/// Used both internally for scanning mempool transactions and externally for scanning calculated and transmitted
686/// transactions during send.
687///
688/// Fails if `status` is of `Confirmed` variant.
689pub fn scan_pending_transaction<W>(
690    consensus_parameters: &impl consensus::Parameters,
691    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
692    wallet: &mut W,
693    transaction: Transaction,
694    status: ConfirmationStatus,
695    datetime: u32,
696) -> Result<(), SyncError<W::Error>>
697where
698    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints,
699{
700    if matches!(status, ConfirmationStatus::Confirmed(_)) {
701        panic!("this fn is for unconfirmed transactions only");
702    }
703
704    let mut pending_transaction_nullifiers = NullifierMap::new();
705    let mut pending_transaction_outpoints = BTreeMap::new();
706    let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
707        .get_transparent_addresses()
708        .map_err(SyncError::WalletError)?
709        .iter()
710        .map(|(id, address)| (address.clone(), *id))
711        .collect();
712    let pending_transaction = scan_transaction(
713        consensus_parameters,
714        ufvks,
715        transaction.txid(),
716        transaction,
717        status,
718        None,
719        &mut pending_transaction_nullifiers,
720        &mut pending_transaction_outpoints,
721        &transparent_addresses,
722        datetime,
723    )?;
724
725    let wallet_transactions = wallet
726        .get_wallet_transactions()
727        .map_err(SyncError::WalletError)?;
728    let transparent_output_ids = spend::collect_transparent_output_ids(wallet_transactions);
729    let transparent_spend_locators = spend::detect_transparent_spends(
730        &mut pending_transaction_outpoints,
731        transparent_output_ids,
732    );
733    let (sapling_derived_nullifiers, orchard_derived_nullifiers) =
734        spend::collect_derived_nullifiers(wallet_transactions);
735    let (sapling_spend_locators, orchard_spend_locators) = spend::detect_shielded_spends(
736        &mut pending_transaction_nullifiers,
737        sapling_derived_nullifiers,
738        orchard_derived_nullifiers,
739    );
740
741    // return if transaction is not relevant to the wallet
742    if pending_transaction.transparent_coins().is_empty()
743        && pending_transaction.sapling_notes().is_empty()
744        && pending_transaction.orchard_notes().is_empty()
745        && pending_transaction.outgoing_orchard_notes().is_empty()
746        && pending_transaction.outgoing_sapling_notes().is_empty()
747        && transparent_spend_locators.is_empty()
748        && sapling_spend_locators.is_empty()
749        && orchard_spend_locators.is_empty()
750    {
751        return Ok(());
752    }
753
754    wallet
755        .insert_wallet_transaction(pending_transaction)
756        .map_err(SyncError::WalletError)?;
757    spend::update_spent_coins(
758        wallet
759            .get_wallet_transactions_mut()
760            .map_err(SyncError::WalletError)?,
761        transparent_spend_locators,
762    );
763    spend::update_spent_notes(
764        wallet
765            .get_wallet_transactions_mut()
766            .map_err(SyncError::WalletError)?,
767        sapling_spend_locators,
768        orchard_spend_locators,
769    );
770
771    Ok(())
772}
773
774/// API for targetted scanning.
775///
776/// Allows `scan_targets` to be added externally to the wallet's `sync_state` and be prioritised for scanning. Each
777/// scan target must include the block height which will be used to prioritise the block range containing the note
778/// commitments to the surrounding orchard shard(s). If the block height is pre-orchard then the surrounding sapling
779/// shard(s) will be prioritised instead. The txid in each scan target may be omitted and set to [0u8; 32] in order to
780/// prioritise the surrounding blocks for scanning but be ignored when fetching specific relevant transactions to the
781/// wallet. However, in the case where a relevant spending transaction at a given height contains no decryptable
782/// incoming notes (change), only the nullifier will be mapped and this transaction will be scanned when the
783/// transaction containing the spent notes is scanned instead.
784pub fn add_scan_targets(sync_state: &mut SyncState, scan_targets: &[Locator]) {
785    for scan_target in scan_targets {
786        sync_state.locators.insert(*scan_target);
787    }
788}
789
790/// Returns true if the scanner and mempool are shutdown.
791fn is_shutdown<P>(
792    scanner: &Scanner<P>,
793    mempool_unprocessed_transactions_count: Arc<AtomicU8>,
794) -> bool
795where
796    P: consensus::Parameters + Sync + Send + 'static,
797{
798    scanner.worker_poolsize() == 0
799        && mempool_unprocessed_transactions_count.load(atomic::Ordering::Acquire) == 0
800}
801
802/// Scan post-processing
803async fn process_scan_results<W>(
804    consensus_parameters: &impl consensus::Parameters,
805    wallet: &mut W,
806    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
807    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
808    scan_range: ScanRange,
809    scan_results: Result<ScanResults, ScanError>,
810    initial_verification_height: BlockHeight,
811) -> Result<(), SyncError<W::Error>>
812where
813    W: SyncWallet
814        + SyncBlocks
815        + SyncTransactions
816        + SyncNullifiers
817        + SyncOutPoints
818        + SyncShardTrees
819        + Send,
820{
821    match scan_results {
822        Ok(results) => {
823            let ScanResults {
824                nullifiers,
825                outpoints,
826                scanned_blocks,
827                wallet_transactions,
828                sapling_located_trees,
829                orchard_located_trees,
830            } = results;
831            update_wallet_data(
832                consensus_parameters,
833                wallet,
834                fetch_request_sender.clone(),
835                &scan_range,
836                nullifiers,
837                outpoints,
838                wallet_transactions,
839                sapling_located_trees,
840                orchard_located_trees,
841            )
842            .await?;
843            spend::update_transparent_spends(wallet).map_err(SyncError::WalletError)?;
844            spend::update_shielded_spends(
845                consensus_parameters,
846                wallet,
847                fetch_request_sender,
848                ufvks,
849                &scanned_blocks,
850            )
851            .await?;
852            add_scanned_blocks(wallet, scanned_blocks, &scan_range)
853                .map_err(SyncError::WalletError)?;
854            state::set_scanned_scan_range(
855                wallet
856                    .get_sync_state_mut()
857                    .map_err(SyncError::WalletError)?,
858                scan_range.block_range().clone(),
859            );
860            state::merge_scan_ranges(
861                wallet
862                    .get_sync_state_mut()
863                    .map_err(SyncError::WalletError)?,
864                ScanPriority::Scanned,
865            );
866            remove_irrelevant_data(wallet).map_err(SyncError::WalletError)?;
867            tracing::debug!("Scan results processed.");
868        }
869        Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => {
870            if height == scan_range.block_range().start
871                && scan_range.priority() == ScanPriority::Verify
872            {
873                tracing::info!("Re-org detected.");
874                let sync_state = wallet
875                    .get_sync_state_mut()
876                    .map_err(SyncError::WalletError)?;
877                let wallet_height = sync_state
878                    .wallet_height()
879                    .expect("scan ranges should be non-empty in this scope");
880
881                // reset scan range from `Ignored` to `Verify`
882                state::set_scan_priority(
883                    sync_state,
884                    scan_range.block_range(),
885                    ScanPriority::Verify,
886                );
887
888                // extend verification range to VERIFY_BLOCK_RANGE_SIZE blocks below current verifaction range
889                let scan_range_to_verify = state::set_verify_scan_range(
890                    sync_state,
891                    height - 1,
892                    state::VerifyEnd::VerifyHighest,
893                );
894                state::merge_scan_ranges(sync_state, ScanPriority::Verify);
895
896                truncate_wallet_data(wallet, scan_range_to_verify.block_range().start - 1)?;
897
898                if initial_verification_height - scan_range_to_verify.block_range().start
899                    > MAX_VERIFICATION_WINDOW
900                {
901                    return Err(ServerError::ChainVerificationError.into());
902                }
903
904                state::set_initial_state(
905                    consensus_parameters,
906                    fetch_request_sender.clone(),
907                    wallet,
908                    wallet_height,
909                )
910                .await?;
911            } else {
912                scan_results?;
913            }
914        }
915        Err(e) => return Err(e.into()),
916    }
917
918    Ok(())
919}
920
921/// Processes mempool transaction.
922///
923/// Scan the transaction and add to the wallet if relevant.
924async fn process_mempool_transaction<W>(
925    consensus_parameters: &impl consensus::Parameters,
926    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
927    wallet: &mut W,
928    raw_transaction: RawTransaction,
929) -> Result<(), SyncError<W::Error>>
930where
931    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints,
932{
933    let block_height = BlockHeight::from_u32(
934        u32::try_from(raw_transaction.height + 1).expect("should be valid u32"),
935    );
936    let transaction = zcash_primitives::transaction::Transaction::read(
937        &raw_transaction.data[..],
938        consensus::BranchId::for_height(consensus_parameters, block_height),
939    )
940    .map_err(ServerError::InvalidTransaction)?;
941
942    tracing::debug!(
943        "mempool received txid {} at height {}",
944        transaction.txid(),
945        block_height
946    );
947
948    if let Some(tx) = wallet
949        .get_wallet_transactions()
950        .map_err(SyncError::WalletError)?
951        .get(&transaction.txid())
952    {
953        if tx.status().is_confirmed() {
954            return Ok(());
955        }
956    }
957
958    scan_pending_transaction(
959        consensus_parameters,
960        ufvks,
961        wallet,
962        transaction,
963        ConfirmationStatus::Mempool(block_height),
964        SystemTime::now()
965            .duration_since(SystemTime::UNIX_EPOCH)
966            .expect("infalliable for such long time periods")
967            .as_secs() as u32,
968    )?;
969
970    Ok(())
971}
972
973/// Removes all wallet data above the given `truncate_height`.
974fn truncate_wallet_data<W>(
975    wallet: &mut W,
976    truncate_height: BlockHeight,
977) -> Result<(), SyncError<W::Error>>
978where
979    W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncShardTrees,
980{
981    let birthday = wallet
982        .get_sync_state()
983        .map_err(SyncError::WalletError)?
984        .wallet_birthday()
985        .expect("should be non-empty in this scope");
986    let checked_truncate_height = match truncate_height.cmp(&birthday) {
987        std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => truncate_height,
988        std::cmp::Ordering::Less => birthday,
989    };
990
991    wallet
992        .truncate_wallet_blocks(checked_truncate_height)
993        .map_err(SyncError::WalletError)?;
994    wallet
995        .truncate_wallet_transactions(checked_truncate_height)
996        .map_err(SyncError::WalletError)?;
997    wallet
998        .truncate_nullifiers(checked_truncate_height)
999        .map_err(SyncError::WalletError)?;
1000    wallet.truncate_shard_trees(checked_truncate_height)?;
1001
1002    Ok(())
1003}
1004
1005/// Updates the wallet with data from `scan_results`
1006#[allow(clippy::too_many_arguments)]
1007async fn update_wallet_data<W>(
1008    consensus_parameters: &impl consensus::Parameters,
1009    wallet: &mut W,
1010    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1011    scan_range: &ScanRange,
1012    nullifiers: NullifierMap,
1013    mut outpoints: BTreeMap<OutputId, Locator>,
1014    wallet_transactions: HashMap<TxId, WalletTransaction>,
1015    sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
1016    orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
1017) -> Result<(), SyncError<W::Error>>
1018where
1019    W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees + Send,
1020{
1021    let sync_state = wallet
1022        .get_sync_state_mut()
1023        .map_err(SyncError::WalletError)?;
1024    let wallet_height = sync_state
1025        .wallet_height()
1026        .expect("scan ranges should not be empty in this scope");
1027    for transaction in wallet_transactions.values() {
1028        state::update_found_note_shard_priority(
1029            consensus_parameters,
1030            sync_state,
1031            ShieldedProtocol::Sapling,
1032            transaction,
1033        );
1034        state::update_found_note_shard_priority(
1035            consensus_parameters,
1036            sync_state,
1037            ShieldedProtocol::Orchard,
1038            transaction,
1039        );
1040    }
1041
1042    wallet
1043        .extend_wallet_transactions(wallet_transactions)
1044        .map_err(SyncError::WalletError)?;
1045    wallet
1046        .append_nullifiers(nullifiers)
1047        .map_err(SyncError::WalletError)?;
1048    wallet
1049        .append_outpoints(&mut outpoints)
1050        .map_err(SyncError::WalletError)?;
1051    wallet
1052        .update_shard_trees(
1053            fetch_request_sender,
1054            scan_range,
1055            wallet_height,
1056            sapling_located_trees,
1057            orchard_located_trees,
1058        )
1059        .await?;
1060
1061    Ok(())
1062}
1063
1064fn remove_irrelevant_data<W>(wallet: &mut W) -> Result<(), W::Error>
1065where
1066    W: SyncWallet + SyncBlocks + SyncOutPoints + SyncNullifiers + SyncTransactions,
1067{
1068    let fully_scanned_height = wallet
1069        .get_sync_state()?
1070        .fully_scanned_height()
1071        .expect("scan ranges must be non-empty");
1072
1073    wallet
1074        .get_outpoints_mut()?
1075        .retain(|_, (height, _)| *height > fully_scanned_height);
1076    wallet
1077        .get_nullifiers_mut()?
1078        .sapling
1079        .retain(|_, (height, _)| *height > fully_scanned_height);
1080    wallet
1081        .get_nullifiers_mut()?
1082        .orchard
1083        .retain(|_, (height, _)| *height > fully_scanned_height);
1084    wallet
1085        .get_sync_state_mut()?
1086        .locators
1087        .retain(|(height, _)| *height > fully_scanned_height);
1088    remove_irrelevant_blocks(wallet)?;
1089
1090    Ok(())
1091}
1092
1093fn remove_irrelevant_blocks<W>(wallet: &mut W) -> Result<(), W::Error>
1094where
1095    W: SyncWallet + SyncBlocks + SyncTransactions,
1096{
1097    let sync_state = wallet.get_sync_state()?;
1098    let highest_scanned_height = sync_state
1099        .highest_scanned_height()
1100        .expect("should be non-empty");
1101    let scanned_range_bounds = sync_state
1102        .scan_ranges()
1103        .iter()
1104        .filter(|scan_range| scan_range.priority() == ScanPriority::Scanned)
1105        .flat_map(|scanned_range| {
1106            vec![
1107                scanned_range.block_range().start,
1108                scanned_range.block_range().end - 1,
1109            ]
1110        })
1111        .collect::<Vec<_>>();
1112    let wallet_transaction_heights = wallet
1113        .get_wallet_transactions()?
1114        .values()
1115        .filter_map(|tx| tx.status().get_confirmed_height())
1116        .collect::<Vec<_>>();
1117
1118    wallet.get_wallet_blocks_mut()?.retain(|height, _| {
1119        *height >= highest_scanned_height.saturating_sub(MAX_VERIFICATION_WINDOW)
1120            || scanned_range_bounds.contains(height)
1121            || wallet_transaction_heights.contains(height)
1122    });
1123
1124    Ok(())
1125}
1126
1127fn add_scanned_blocks<W>(
1128    wallet: &mut W,
1129    mut scanned_blocks: BTreeMap<BlockHeight, WalletBlock>,
1130    scan_range: &ScanRange,
1131) -> Result<(), W::Error>
1132where
1133    W: SyncWallet + SyncBlocks + SyncTransactions,
1134{
1135    let sync_state = wallet.get_sync_state()?;
1136    let highest_scanned_height = sync_state
1137        .highest_scanned_height()
1138        .expect("scan ranges must be non-empty");
1139
1140    let wallet_transaction_heights = wallet
1141        .get_wallet_transactions()?
1142        .values()
1143        .filter_map(|tx| tx.status().get_confirmed_height())
1144        .collect::<Vec<_>>();
1145
1146    scanned_blocks.retain(|height, _| {
1147        *height >= highest_scanned_height.saturating_sub(MAX_VERIFICATION_WINDOW)
1148            || *height == scan_range.block_range().start
1149            || *height == scan_range.block_range().end - 1
1150            || wallet_transaction_heights.contains(height)
1151    });
1152
1153    wallet.append_wallet_blocks(scanned_blocks)?;
1154
1155    Ok(())
1156}
1157
1158#[cfg(not(feature = "darkside_test"))]
1159async fn update_subtree_roots<W>(
1160    consensus_parameters: &impl consensus::Parameters,
1161    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1162    wallet: &mut W,
1163) -> Result<(), SyncError<W::Error>>
1164where
1165    W: SyncWallet + SyncShardTrees,
1166{
1167    let sapling_start_index = wallet
1168        .get_shard_trees()
1169        .map_err(SyncError::WalletError)?
1170        .sapling
1171        .store()
1172        .get_shard_roots()
1173        .expect("infallible")
1174        .len() as u32;
1175    let orchard_start_index = wallet
1176        .get_shard_trees()
1177        .map_err(SyncError::WalletError)?
1178        .orchard
1179        .store()
1180        .get_shard_roots()
1181        .expect("infallible")
1182        .len() as u32;
1183    let (sapling_subtree_roots, orchard_subtree_roots) = futures::join!(
1184        client::get_subtree_roots(fetch_request_sender.clone(), sapling_start_index, 0, 0),
1185        client::get_subtree_roots(fetch_request_sender, orchard_start_index, 1, 0)
1186    );
1187
1188    let sapling_subtree_roots = sapling_subtree_roots?;
1189    let orchard_subtree_roots = orchard_subtree_roots?;
1190
1191    let sync_state = wallet
1192        .get_sync_state_mut()
1193        .map_err(SyncError::WalletError)?;
1194    state::add_shard_ranges(
1195        consensus_parameters,
1196        ShieldedProtocol::Sapling,
1197        sync_state,
1198        &sapling_subtree_roots,
1199    );
1200    state::add_shard_ranges(
1201        consensus_parameters,
1202        ShieldedProtocol::Orchard,
1203        sync_state,
1204        &orchard_subtree_roots,
1205    );
1206
1207    let shard_trees = wallet
1208        .get_shard_trees_mut()
1209        .map_err(SyncError::WalletError)?;
1210    witness::add_subtree_roots(sapling_subtree_roots, &mut shard_trees.sapling)?;
1211    witness::add_subtree_roots(orchard_subtree_roots, &mut shard_trees.orchard)?;
1212
1213    Ok(())
1214}
1215
1216async fn add_initial_frontier<W>(
1217    consensus_parameters: &impl consensus::Parameters,
1218    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1219    wallet: &mut W,
1220) -> Result<(), SyncError<W::Error>>
1221where
1222    W: SyncWallet + SyncShardTrees,
1223{
1224    let birthday =
1225        checked_birthday(consensus_parameters, wallet).map_err(SyncError::WalletError)?;
1226    if birthday
1227        == consensus_parameters
1228            .activation_height(consensus::NetworkUpgrade::Sapling)
1229            .expect("sapling activation height should always return Some")
1230    {
1231        return Ok(());
1232    }
1233
1234    // if the shard store only contains the first checkpoint added on initialisation, add frontiers to complete the
1235    // shard trees.
1236    let shard_trees = wallet
1237        .get_shard_trees_mut()
1238        .map_err(SyncError::WalletError)?;
1239    if shard_trees
1240        .sapling
1241        .store()
1242        .checkpoint_count()
1243        .expect("infallible")
1244        == 1
1245    {
1246        let frontiers = client::get_frontiers(fetch_request_sender, birthday).await?;
1247        shard_trees
1248            .sapling
1249            .insert_frontier(
1250                frontiers.final_sapling_tree().clone(),
1251                Retention::Checkpoint {
1252                    id: birthday,
1253                    marking: Marking::None,
1254                },
1255            )
1256            .expect("infallible");
1257        shard_trees
1258            .orchard
1259            .insert_frontier(
1260                frontiers.final_orchard_tree().clone(),
1261                Retention::Checkpoint {
1262                    id: birthday,
1263                    marking: Marking::None,
1264                },
1265            )
1266            .expect("infallible");
1267    }
1268
1269    Ok(())
1270}
1271
1272/// Compares the wallet birthday to sapling activation height and returns the highest block height.
1273fn checked_birthday<W: SyncWallet>(
1274    consensus_parameters: &impl consensus::Parameters,
1275    wallet: &W,
1276) -> Result<BlockHeight, W::Error> {
1277    let wallet_birthday = wallet.get_birthday()?;
1278    let sapling_activation_height = consensus_parameters
1279        .activation_height(consensus::NetworkUpgrade::Sapling)
1280        .expect("sapling activation height should always return Some");
1281
1282    match wallet_birthday.cmp(&sapling_activation_height) {
1283        cmp::Ordering::Greater | cmp::Ordering::Equal => Ok(wallet_birthday),
1284        cmp::Ordering::Less => Ok(sapling_activation_height),
1285    }
1286}
1287
1288/// Sets up mempool stream.
1289///
1290/// If there is some raw transaction, send to be scanned.
1291/// If the mempool stream message is `None` (a block was mined) or the request failed, setup a new mempool stream.
1292async fn mempool_monitor(
1293    mut client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
1294    mempool_transaction_sender: mpsc::Sender<RawTransaction>,
1295    unprocessed_transactions_count: Arc<AtomicU8>,
1296    shutdown_mempool: Arc<AtomicBool>,
1297) -> Result<(), MempoolError> {
1298    let mut interval = tokio::time::interval(Duration::from_secs(1));
1299    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1300    'main: loop {
1301        let response =
1302            client::get_mempool_transaction_stream(&mut client, shutdown_mempool.clone()).await;
1303
1304        match response {
1305            Ok(mut mempool_stream) => {
1306                interval.reset();
1307                loop {
1308                    tokio::select! {
1309                        mempool_stream_message = mempool_stream.message() => {
1310                            match mempool_stream_message.unwrap_or(None) {
1311                                Some(raw_transaction) => {
1312                                     let _ignore_error = mempool_transaction_sender
1313                                        .send(raw_transaction)
1314                                        .await;
1315                                    unprocessed_transactions_count.fetch_add(1, atomic::Ordering::Release);
1316                                }
1317                                None => {
1318                                    continue 'main;
1319                                }
1320                            }
1321
1322                        }
1323
1324                        _ = interval.tick() => {
1325                            if shutdown_mempool.load(atomic::Ordering::Acquire) {
1326                                break 'main;
1327                            }
1328                        }
1329                    }
1330                }
1331            }
1332            Err(e @ MempoolError::ShutdownWithoutStream) => return Err(e),
1333            Err(MempoolError::ServerError(e)) => {
1334                tracing::warn!("Mempool stream request failed! Status: {e}.\nRetrying...");
1335                tokio::time::sleep(Duration::from_secs(3)).await;
1336            }
1337        }
1338    }
1339
1340    Ok(())
1341}