1use std::collections::{BTreeMap, HashMap};
4use std::ops::Range;
5use std::sync::Arc;
6use std::sync::atomic::{self, AtomicBool, AtomicU8};
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::{RwLock, mpsc};
10
11use incrementalmerkletree::{Marking, Retention};
12use orchard::tree::MerkleHashOrchard;
13use shardtree::store::ShardStore;
14use tonic::transport::Channel;
15use zcash_client_backend::proto::service::RawTransaction;
16use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
17use zcash_keys::keys::UnifiedFullViewingKey;
18use zcash_primitives::transaction::{Transaction, TxId};
19use zcash_protocol::ShieldedProtocol;
20use zcash_protocol::consensus::{self, BlockHeight};
21use zip32::AccountId;
22
23use zingo_status::confirmation_status::ConfirmationStatus;
24
25use crate::client::{self, FetchRequest};
26use crate::config::{PerformanceLevel, SyncConfig};
27use crate::error::{
28 ContinuityError, MempoolError, ScanError, ServerError, SyncError, SyncModeError,
29 SyncStatusError,
30};
31use crate::keys::transparent::TransparentAddressId;
32use crate::scan::ScanResults;
33use crate::scan::task::{Scanner, ScannerState};
34use crate::scan::transactions::scan_transaction;
35use crate::sync::state::truncate_scan_ranges;
36use crate::wallet::traits::{
37 SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
38};
39use crate::wallet::{
40 KeyIdInterface, NoteInterface, NullifierMap, OutputId, OutputInterface, ScanTarget, SyncMode,
41 SyncState, WalletBlock, WalletTransaction,
42};
43use crate::witness::LocatedTreeData;
44
45#[cfg(not(feature = "darkside_test"))]
46use crate::witness;
47
48pub(crate) mod spend;
49pub(crate) mod state;
50pub(crate) mod transparent;
51
// NOTE(review): not referenced in the visible part of this file; presumably a number of blocks
// after which an unconfirmed spend is treated as invalid — confirm at the use site.
const UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD: u32 = 3;
/// Largest tolerated gap, in blocks, between the wallet's last known chain height and a lower
/// chain height reported by the server.
///
/// `checked_wallet_height` returns a chain error when the gap is greater than or equal to this
/// value and truncates the wallet data to the server's chain height otherwise.
pub(crate) const MAX_REORG_ALLOWANCE: u32 = 100;
// NOTE(review): not referenced in the visible part of this file; presumably the number of blocks
// in a range scanned with `ScanPriority::Verify` — confirm at the use site.
const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
55
/// Snapshot of sync progress, as produced by [`sync_status`].
///
/// When the wallet's initial sync state has a sync start height of zero, [`sync_status`] returns
/// the scan ranges together with zeroes for every other field.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct SyncStatus {
    /// Copy of the scan ranges held in the wallet's sync state.
    pub scan_ranges: Vec<ScanRange>,
    /// Sync start height taken from the wallet's initial sync state.
    pub sync_start_height: BlockHeight,
    /// Total blocks scanned minus the blocks that were already scanned when the session started.
    pub session_blocks_scanned: u32,
    /// Number of scanned blocks calculated from the wallet's sync state.
    pub total_blocks_scanned: u32,
    /// `session_blocks_scanned` as a percentage of the blocks that were not yet scanned when the
    /// session started, clamped to `0.0..=100.0`.
    pub percentage_session_blocks_scanned: f32,
    /// `total_blocks_scanned` as a percentage of the blocks from the wallet birthday to the last
    /// known chain height (inclusive), clamped to `0.0..=100.0`.
    pub percentage_total_blocks_scanned: f32,
    /// Sapling outputs scanned minus those already scanned when the session started.
    pub session_sapling_outputs_scanned: u32,
    /// Number of scanned sapling outputs calculated from the wallet.
    pub total_sapling_outputs_scanned: u32,
    /// Orchard outputs scanned minus those already scanned when the session started.
    pub session_orchard_outputs_scanned: u32,
    /// Number of scanned orchard outputs calculated from the wallet.
    pub total_orchard_outputs_scanned: u32,
    /// Session outputs scanned (sapling + orchard) as a percentage of the outputs that were not
    /// yet scanned when the session started, clamped to `0.0..=100.0`.
    pub percentage_session_outputs_scanned: f32,
    /// Total outputs scanned (sapling + orchard) as a percentage of the outputs between the
    /// wallet's initial and final tree sizes, clamped to `0.0..=100.0`.
    ///
    /// This is the value shown by the `Display` implementation.
    pub percentage_total_outputs_scanned: f32,
}
76
77impl std::fmt::Display for SyncStatus {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 write!(
81 f,
82 "percentage complete: {}",
83 self.percentage_total_outputs_scanned
84 )
85 }
86}
87
88impl From<SyncStatus> for json::JsonValue {
89 fn from(value: SyncStatus) -> Self {
90 let scan_ranges: Vec<json::JsonValue> = value
91 .scan_ranges
92 .iter()
93 .map(|range| {
94 json::object! {
95 "priority" => format!("{:?}", range.priority()),
96 "start_block" => range.block_range().start.to_string(),
97 "end_block" => (range.block_range().end - 1).to_string(),
98 }
99 })
100 .collect();
101
102 json::object! {
103 "scan_ranges" => scan_ranges,
104 "sync_start_height" => u32::from(value.sync_start_height),
105 "session_blocks_scanned" => value.session_blocks_scanned,
106 "total_blocks_scanned" => value.total_blocks_scanned,
107 "percentage_session_blocks_scanned" => value.percentage_session_blocks_scanned,
108 "percentage_total_blocks_scanned" => value.percentage_total_blocks_scanned,
109 "session_sapling_outputs_scanned" => value.session_sapling_outputs_scanned,
110 "total_sapling_outputs_scanned" => value.total_sapling_outputs_scanned,
111 "session_orchard_outputs_scanned" => value.session_orchard_outputs_scanned,
112 "total_orchard_outputs_scanned" => value.total_orchard_outputs_scanned,
113 "percentage_session_outputs_scanned" => value.percentage_session_outputs_scanned,
114 "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
115 }
116 }
117}
118
/// Summary returned by [`sync`] when a sync session ends.
///
/// All values are derived from the [`SyncStatus`] taken at the end of the session.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct SyncResult {
    /// Sync start height reported by the final sync status.
    pub sync_start_height: BlockHeight,
    /// Last block of the final scan range (its exclusive end minus one).
    pub sync_end_height: BlockHeight,
    /// Blocks scanned during this session.
    pub blocks_scanned: u32,
    /// Sapling outputs scanned during this session.
    pub sapling_outputs_scanned: u32,
    /// Orchard outputs scanned during this session.
    pub orchard_outputs_scanned: u32,
    /// Percentage of total outputs scanned, as reported by the final sync status.
    pub percentage_total_outputs_scanned: f32,
}
130
131impl std::fmt::Display for SyncResult {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 write!(
134 f,
135 "Sync completed succesfully:
136{{
137 sync start height: {}
138 sync end height: {}
139 blocks scanned: {}
140 sapling outputs scanned: {}
141 orchard outputs scanned: {}
142 percentage total outputs scanned: {}
143}}",
144 self.sync_start_height,
145 self.sync_end_height,
146 self.blocks_scanned,
147 self.sapling_outputs_scanned,
148 self.orchard_outputs_scanned,
149 self.percentage_total_outputs_scanned,
150 )
151 }
152}
153
154impl From<SyncResult> for json::JsonValue {
155 fn from(value: SyncResult) -> Self {
156 json::object! {
157 "sync_start_height" => u32::from(value.sync_start_height),
158 "sync_end_height" => u32::from(value.sync_end_height),
159 "blocks_scanned" => value.blocks_scanned,
160 "sapling_outputs_scanned" => value.sapling_outputs_scanned,
161 "orchard_outputs_scanned" => value.orchard_outputs_scanned,
162 "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
163 }
164 }
165}
166
/// Priority / state tag attached to a [`ScanRange`].
///
/// `PartialOrd`/`Ord` are derived, so variants compare in declaration order: a variant declared
/// later compares greater than one declared earlier. Code in this module relies on that ordering
/// (for example `priority >= ScanPriority::FoundNote`), so do not reorder the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ScanPriority {
    // NOTE(review): presumably ranges whose nullifiers are currently being re-fetched — confirm.
    RefetchingNullifiers,
    // NOTE(review): presumably ranges currently being scanned — confirm.
    Scanning,
    // Ranges marked as scanned.
    Scanned,
    // NOTE(review): presumably ranges scanned without their nullifiers having been mapped;
    // `sync_status` caps percentages at 99 while any range has this priority or
    // `RefetchingNullifiers`.
    ScannedWithoutMapping,
    // NOTE(review): meaning not visible in this part of the file — confirm.
    Historic,
    // NOTE(review): meaning not visible in this part of the file — confirm.
    OpenAdjacent,
    // Lowest priority for which scanned outpoints are mapped when processing scan results.
    FoundNote,
    // NOTE(review): presumably ranges at the chain tip — confirm.
    ChainTip,
    // Greatest variant; a hash discontinuity at the start of a range with this priority is
    // handled as a re-org when processing scan results.
    Verify,
}
193
/// A half-open range of block heights paired with a [`ScanPriority`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScanRange {
    /// Block heights covered by this range; `end` is exclusive.
    block_range: Range<BlockHeight>,
    /// Priority associated with the range.
    priority: ScanPriority,
}
200
201impl std::fmt::Display for ScanRange {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 write!(
204 f,
205 "{:?}({}..{})",
206 self.priority, self.block_range.start, self.block_range.end,
207 )
208 }
209}
210
211impl ScanRange {
212 #[must_use]
214 pub fn from_parts(block_range: Range<BlockHeight>, priority: ScanPriority) -> Self {
215 assert!(
216 block_range.end >= block_range.start,
217 "{block_range:?} is invalid for ScanRange({priority:?})",
218 );
219 ScanRange {
220 block_range,
221 priority,
222 }
223 }
224
225 #[must_use]
227 pub fn block_range(&self) -> &Range<BlockHeight> {
228 &self.block_range
229 }
230
231 #[must_use]
233 pub fn priority(&self) -> ScanPriority {
234 self.priority
235 }
236
237 #[must_use]
239 pub fn is_empty(&self) -> bool {
240 self.block_range.is_empty()
241 }
242
243 #[must_use]
245 pub fn len(&self) -> usize {
246 usize::try_from(u32::from(self.block_range.end) - u32::from(self.block_range.start))
247 .expect("due to number of max blocks should always be valid usize")
248 }
249
250 #[must_use]
254 pub fn truncate_start(&self, block_height: BlockHeight) -> Option<Self> {
255 if block_height >= self.block_range.end || self.is_empty() {
256 None
257 } else {
258 Some(ScanRange {
259 block_range: self.block_range.start.max(block_height)..self.block_range.end,
260 priority: self.priority,
261 })
262 }
263 }
264
265 #[must_use]
269 pub fn truncate_end(&self, block_height: BlockHeight) -> Option<Self> {
270 if block_height <= self.block_range.start || self.is_empty() {
271 None
272 } else {
273 Some(ScanRange {
274 block_range: self.block_range.start..self.block_range.end.min(block_height),
275 priority: self.priority,
276 })
277 }
278 }
279
280 #[must_use]
284 pub fn split_at(&self, p: BlockHeight) -> Option<(Self, Self)> {
285 (p > self.block_range.start && p < self.block_range.end).then_some((
286 ScanRange {
287 block_range: self.block_range.start..p,
288 priority: self.priority,
289 },
290 ScanRange {
291 block_range: p..self.block_range.end,
292 priority: self.priority,
293 },
294 ))
295 }
296}
297
298pub async fn sync<P, W>(
309 client: CompactTxStreamerClient<Channel>,
310 consensus_parameters: &P,
311 wallet: Arc<RwLock<W>>,
312 sync_mode: Arc<AtomicU8>,
313 config: SyncConfig,
314) -> Result<SyncResult, SyncError<W::Error>>
315where
316 P: consensus::Parameters + Sync + Send + 'static,
317 W: SyncWallet
318 + SyncBlocks
319 + SyncTransactions
320 + SyncNullifiers
321 + SyncOutPoints
322 + SyncShardTrees
323 + Send,
324{
325 let mut sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
326 if sync_mode_enum == SyncMode::NotRunning {
327 sync_mode_enum = SyncMode::Running;
328 sync_mode.store(sync_mode_enum as u8, atomic::Ordering::Release);
329 } else {
330 return Err(SyncModeError::SyncAlreadyRunning.into());
331 }
332
333 tracing::info!("Starting sync...");
334
335 let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
337 let client_clone = client.clone();
338 let fetcher_handle =
339 tokio::spawn(
340 async move { client::fetch::fetch(fetch_request_receiver, client_clone).await },
341 );
342
343 let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(100);
345 let shutdown_mempool = Arc::new(AtomicBool::new(false));
346 let shutdown_mempool_clone = shutdown_mempool.clone();
347 let unprocessed_mempool_transactions_count = Arc::new(AtomicU8::new(0));
348 let unprocessed_mempool_transactions_count_clone =
349 unprocessed_mempool_transactions_count.clone();
350 let mempool_handle = tokio::spawn(async move {
351 mempool_monitor(
352 client,
353 mempool_transaction_sender,
354 unprocessed_mempool_transactions_count_clone,
355 shutdown_mempool_clone,
356 )
357 .await
358 });
359
360 let mut wallet_guard = wallet.write().await;
362
363 let chain_height = client::get_chain_height(fetch_request_sender.clone()).await?;
364 if chain_height == 0.into() {
365 return Err(SyncError::ServerError(ServerError::GenesisBlockOnly));
366 }
367 let last_known_chain_height =
368 checked_wallet_height(&mut *wallet_guard, chain_height, consensus_parameters)?;
369
370 let ufvks = wallet_guard
371 .get_unified_full_viewing_keys()
372 .map_err(SyncError::WalletError)?;
373
374 transparent::update_addresses_and_scan_targets(
375 consensus_parameters,
376 &mut *wallet_guard,
377 fetch_request_sender.clone(),
378 &ufvks,
379 last_known_chain_height,
380 chain_height,
381 config.transparent_address_discovery,
382 )
383 .await?;
384
385 #[cfg(not(feature = "darkside_test"))]
386 update_subtree_roots(
387 consensus_parameters,
388 fetch_request_sender.clone(),
389 &mut *wallet_guard,
390 )
391 .await?;
392
393 add_initial_frontier(
394 consensus_parameters,
395 fetch_request_sender.clone(),
396 &mut *wallet_guard,
397 )
398 .await?;
399
400 let initial_reorg_detection_start_height = state::update_scan_ranges(
401 consensus_parameters,
402 last_known_chain_height,
403 chain_height,
404 wallet_guard
405 .get_sync_state_mut()
406 .map_err(SyncError::WalletError)?,
407 );
408
409 state::set_initial_state(
410 consensus_parameters,
411 fetch_request_sender.clone(),
412 &mut *wallet_guard,
413 chain_height,
414 )
415 .await?;
416
417 expire_transactions(&mut *wallet_guard)?;
418
419 drop(wallet_guard);
420
421 let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel();
423 let mut scanner = Scanner::new(
424 consensus_parameters.clone(),
425 scan_results_sender,
426 fetch_request_sender.clone(),
427 ufvks.clone(),
428 );
429 scanner.launch(config.performance_level);
430
431 let mut nullifier_map_limit_exceeded = false;
434 let mut interval = tokio::time::interval(Duration::from_millis(50));
435 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
436 loop {
437 tokio::select! {
438 Some((scan_range, scan_results)) = scan_results_receiver.recv() => {
439 let mut wallet_guard = wallet.write().await;
440 process_scan_results(
441 consensus_parameters,
442 &mut *wallet_guard,
443 fetch_request_sender.clone(),
444 &ufvks,
445 scan_range,
446 scan_results,
447 initial_reorg_detection_start_height,
448 config.performance_level,
449 &mut nullifier_map_limit_exceeded,
450 )
451 .await?;
452 wallet_guard.set_save_flag().map_err(SyncError::WalletError)?;
453 drop(wallet_guard);
454 }
455
456 Some(raw_transaction) = mempool_transaction_receiver.recv() => {
457 let mut wallet_guard = wallet.write().await;
458 process_mempool_transaction(
459 consensus_parameters,
460 &ufvks,
461 &mut *wallet_guard,
462 raw_transaction,
463 )
464 .await?;
465 unprocessed_mempool_transactions_count.fetch_sub(1, atomic::Ordering::Release);
466 drop(wallet_guard);
467 }
468
469 _update_scanner = interval.tick() => {
470 sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
471 match sync_mode_enum {
472 SyncMode::Paused => {
473 let mut pause_interval = tokio::time::interval(Duration::from_secs(1));
474 pause_interval.tick().await;
475 while sync_mode_enum == SyncMode::Paused {
476 pause_interval.tick().await;
477 sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
478 }
479 },
480 SyncMode::Shutdown => {
481 let mut wallet_guard = wallet.write().await;
482 let sync_status = match sync_status(&*wallet_guard).await {
483 Ok(status) => status,
484 Err(SyncStatusError::WalletError(e)) => {
485 return Err(SyncError::WalletError(e));
486 }
487 Err(SyncStatusError::NoSyncData) => {
488 panic!("sync data must exist!");
489 }
490 };
491 wallet_guard
492 .set_save_flag()
493 .map_err(SyncError::WalletError)?;
494 drop(wallet_guard);
495 tracing::info!("Sync successfully shutdown.");
496
497 return Ok(SyncResult {
498 sync_start_height: sync_status.sync_start_height,
499 sync_end_height: (sync_status
500 .scan_ranges
501 .last()
502 .expect("should be non-empty after syncing")
503 .block_range()
504 .end
505 - 1),
506 blocks_scanned: sync_status.session_blocks_scanned,
507 sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
508 orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
509 percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
510 });
511 }
512 SyncMode::Running => (),
513 SyncMode::NotRunning => {
514 panic!("sync mode should not be manually set to NotRunning!");
515 },
516 }
517
518 scanner.update(&mut *wallet.write().await, shutdown_mempool.clone(), nullifier_map_limit_exceeded).await?;
519
520 if matches!(scanner.state, ScannerState::Shutdown) {
521 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
523 if is_shutdown(&scanner, unprocessed_mempool_transactions_count.clone())
524 {
525 tracing::info!("Sync successfully shutdown.");
526 break;
527 }
528 }
529 }
530 }
531 }
532
533 let mut wallet_guard = wallet.write().await;
534 let sync_status = match sync_status(&*wallet_guard).await {
535 Ok(status) => status,
536 Err(SyncStatusError::WalletError(e)) => {
537 return Err(SyncError::WalletError(e));
538 }
539 Err(SyncStatusError::NoSyncData) => {
540 panic!("sync data must exist!");
541 }
542 };
543 for transaction in wallet_guard
545 .get_wallet_transactions_mut()
546 .map_err(SyncError::WalletError)?
547 .values_mut()
548 {
549 for note in transaction.sapling_notes.as_mut_slice() {
550 note.refetch_nullifier_ranges = Vec::new();
551 }
552 for note in transaction.orchard_notes.as_mut_slice() {
553 note.refetch_nullifier_ranges = Vec::new();
554 }
555 }
556 wallet_guard
557 .set_save_flag()
558 .map_err(SyncError::WalletError)?;
559
560 drop(wallet_guard);
561 drop(scanner);
562 drop(fetch_request_sender);
563
564 match mempool_handle.await.expect("task panicked") {
565 Ok(()) => (),
566 Err(e @ MempoolError::ShutdownWithoutStream) => tracing::warn!("{e}"),
567 Err(e) => return Err(e.into()),
568 }
569 fetcher_handle.await.expect("task panicked");
570
571 Ok(SyncResult {
572 sync_start_height: sync_status.sync_start_height,
573 sync_end_height: (sync_status
574 .scan_ranges
575 .last()
576 .expect("should be non-empty after syncing")
577 .block_range()
578 .end
579 - 1),
580 blocks_scanned: sync_status.session_blocks_scanned,
581 sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
582 orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
583 percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
584 })
585}
586
587fn checked_wallet_height<W, P>(
603 wallet: &mut W,
604 chain_height: BlockHeight,
605 consensus_parameters: &P,
606) -> Result<BlockHeight, SyncError<W::Error>>
607where
608 W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
609 P: zcash_protocol::consensus::Parameters,
610{
611 let sync_state = wallet.get_sync_state().map_err(SyncError::WalletError)?;
612 if let Some(last_known_chain_height) = sync_state.last_known_chain_height() {
613 if last_known_chain_height > chain_height {
614 if last_known_chain_height - chain_height >= MAX_REORG_ALLOWANCE {
615 return Err(SyncError::ChainError(
619 u32::from(last_known_chain_height),
620 MAX_REORG_ALLOWANCE,
621 u32::from(chain_height),
622 ));
623 }
624 truncate_wallet_data(wallet, chain_height)?;
627 return Ok(chain_height);
628 }
629 Ok(last_known_chain_height)
631 } else {
632 let sapling_activation_height = consensus_parameters
634 .activation_height(consensus::NetworkUpgrade::Sapling)
635 .expect("sapling activation height should always return Some");
636 let birthday = wallet.get_birthday().map_err(SyncError::WalletError)?;
637 if birthday > chain_height {
638 return Err(SyncError::ChainError(
641 u32::from(birthday),
642 MAX_REORG_ALLOWANCE,
643 u32::from(chain_height),
644 ));
645 } else if birthday < sapling_activation_height {
646 return Err(SyncError::BirthdayBelowSapling(
647 u32::from(birthday),
648 u32::from(sapling_activation_height),
649 ));
650 }
651
652 Ok(birthday - 1)
653 }
654}
655
656pub async fn sync_status<W>(wallet: &W) -> Result<SyncStatus, SyncStatusError<W::Error>>
662where
663 W: SyncWallet + SyncBlocks,
664{
665 let (total_sapling_outputs_scanned, total_orchard_outputs_scanned) =
666 state::calculate_scanned_outputs(wallet).map_err(SyncStatusError::WalletError)?;
667 let total_outputs_scanned = total_sapling_outputs_scanned + total_orchard_outputs_scanned;
668
669 let sync_state = wallet
670 .get_sync_state()
671 .map_err(SyncStatusError::WalletError)?;
672 if sync_state.initial_sync_state.sync_start_height == 0.into() {
673 return Ok(SyncStatus {
674 scan_ranges: sync_state.scan_ranges.clone(),
675 sync_start_height: 0.into(),
676 session_blocks_scanned: 0,
677 total_blocks_scanned: 0,
678 percentage_session_blocks_scanned: 0.0,
679 percentage_total_blocks_scanned: 0.0,
680 session_sapling_outputs_scanned: 0,
681 session_orchard_outputs_scanned: 0,
682 total_sapling_outputs_scanned: 0,
683 total_orchard_outputs_scanned: 0,
684 percentage_session_outputs_scanned: 0.0,
685 percentage_total_outputs_scanned: 0.0,
686 });
687 }
688 let total_blocks_scanned = state::calculate_scanned_blocks(sync_state);
689
690 let birthday = sync_state
691 .wallet_birthday()
692 .ok_or(SyncStatusError::NoSyncData)?;
693 let last_known_chain_height = sync_state
694 .last_known_chain_height()
695 .ok_or(SyncStatusError::NoSyncData)?;
696 let total_blocks = last_known_chain_height - birthday + 1;
697 let total_sapling_outputs = sync_state
698 .initial_sync_state
699 .wallet_tree_bounds
700 .sapling_final_tree_size
701 - sync_state
702 .initial_sync_state
703 .wallet_tree_bounds
704 .sapling_initial_tree_size;
705 let total_orchard_outputs = sync_state
706 .initial_sync_state
707 .wallet_tree_bounds
708 .orchard_final_tree_size
709 - sync_state
710 .initial_sync_state
711 .wallet_tree_bounds
712 .orchard_initial_tree_size;
713 let total_outputs = total_sapling_outputs + total_orchard_outputs;
714
715 let session_blocks_scanned =
716 total_blocks_scanned - sync_state.initial_sync_state.previously_scanned_blocks;
717 let mut percentage_session_blocks_scanned = ((session_blocks_scanned as f32
718 / (total_blocks - sync_state.initial_sync_state.previously_scanned_blocks) as f32)
719 * 100.0)
720 .clamp(0.0, 100.0);
721 let mut percentage_total_blocks_scanned =
722 ((total_blocks_scanned as f32 / total_blocks as f32) * 100.0).clamp(0.0, 100.0);
723
724 let session_sapling_outputs_scanned = total_sapling_outputs_scanned
725 - sync_state
726 .initial_sync_state
727 .previously_scanned_sapling_outputs;
728 let session_orchard_outputs_scanned = total_orchard_outputs_scanned
729 - sync_state
730 .initial_sync_state
731 .previously_scanned_orchard_outputs;
732 let session_outputs_scanned = session_sapling_outputs_scanned + session_orchard_outputs_scanned;
733 let previously_scanned_outputs = sync_state
734 .initial_sync_state
735 .previously_scanned_sapling_outputs
736 + sync_state
737 .initial_sync_state
738 .previously_scanned_orchard_outputs;
739 let mut percentage_session_outputs_scanned = ((session_outputs_scanned as f32
740 / (total_outputs - previously_scanned_outputs) as f32)
741 * 100.0)
742 .clamp(0.0, 100.0);
743 let mut percentage_total_outputs_scanned =
744 ((total_outputs_scanned as f32 / total_outputs as f32) * 100.0).clamp(0.0, 100.0);
745
746 if sync_state.scan_ranges().iter().any(|scan_range| {
747 scan_range.priority() == ScanPriority::ScannedWithoutMapping
748 || scan_range.priority() == ScanPriority::RefetchingNullifiers
749 }) {
750 if percentage_session_blocks_scanned == 100.0 {
751 percentage_session_blocks_scanned = 99.0;
752 }
753 if percentage_total_blocks_scanned == 100.0 {
754 percentage_total_blocks_scanned = 99.0;
755 }
756 if percentage_session_outputs_scanned == 100.0 {
757 percentage_session_outputs_scanned = 99.0;
758 }
759 if percentage_total_outputs_scanned == 100.0 {
760 percentage_total_outputs_scanned = 99.0;
761 }
762 }
763
764 Ok(SyncStatus {
765 scan_ranges: sync_state.scan_ranges.clone(),
766 sync_start_height: sync_state.initial_sync_state.sync_start_height,
767 session_blocks_scanned,
768 total_blocks_scanned,
769 percentage_session_blocks_scanned,
770 percentage_total_blocks_scanned,
771 session_sapling_outputs_scanned,
772 total_sapling_outputs_scanned,
773 session_orchard_outputs_scanned,
774 total_orchard_outputs_scanned,
775 percentage_session_outputs_scanned,
776 percentage_total_outputs_scanned,
777 })
778}
779
780pub fn scan_pending_transaction<W>(
787 consensus_parameters: &impl consensus::Parameters,
788 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
789 wallet: &mut W,
790 transaction: Transaction,
791 status: ConfirmationStatus,
792 datetime: u32,
793) -> Result<(), SyncError<W::Error>>
794where
795 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
796{
797 if matches!(status, ConfirmationStatus::Confirmed(_)) {
798 panic!("this fn is for unconfirmed transactions only");
799 }
800
801 let mut pending_transaction_nullifiers = NullifierMap::new();
802 let mut pending_transaction_outpoints = BTreeMap::new();
803 let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
804 .get_transparent_addresses()
805 .map_err(SyncError::WalletError)?
806 .iter()
807 .map(|(id, address)| (address.clone(), *id))
808 .collect();
809 let pending_transaction = scan_transaction(
810 consensus_parameters,
811 ufvks,
812 transaction.txid(),
813 transaction,
814 status,
815 None,
816 &mut pending_transaction_nullifiers,
817 &mut pending_transaction_outpoints,
818 &transparent_addresses,
819 datetime,
820 )?;
821
822 let wallet_transactions = wallet
823 .get_wallet_transactions()
824 .map_err(SyncError::WalletError)?;
825 let transparent_output_ids = spend::collect_transparent_output_ids(wallet_transactions);
826 let transparent_spend_scan_targets = spend::detect_transparent_spends(
827 &mut pending_transaction_outpoints,
828 transparent_output_ids,
829 );
830 let (sapling_derived_nullifiers, orchard_derived_nullifiers) =
831 spend::collect_derived_nullifiers(wallet_transactions);
832 let (sapling_spend_scan_targets, orchard_spend_scan_targets) = spend::detect_shielded_spends(
833 &mut pending_transaction_nullifiers,
834 sapling_derived_nullifiers,
835 orchard_derived_nullifiers,
836 );
837
838 if pending_transaction.transparent_coins().is_empty()
840 && pending_transaction.sapling_notes().is_empty()
841 && pending_transaction.orchard_notes().is_empty()
842 && pending_transaction.outgoing_orchard_notes().is_empty()
843 && pending_transaction.outgoing_sapling_notes().is_empty()
844 && transparent_spend_scan_targets.is_empty()
845 && sapling_spend_scan_targets.is_empty()
846 && orchard_spend_scan_targets.is_empty()
847 {
848 return Ok(());
849 }
850
851 wallet
852 .insert_wallet_transaction(pending_transaction)
853 .map_err(SyncError::WalletError)?;
854 spend::update_spent_coins(
855 wallet
856 .get_wallet_transactions_mut()
857 .map_err(SyncError::WalletError)?,
858 transparent_spend_scan_targets,
859 );
860 spend::update_spent_notes(
861 wallet,
862 sapling_spend_scan_targets,
863 orchard_spend_scan_targets,
864 false,
865 )
866 .map_err(SyncError::WalletError)?;
867
868 Ok(())
869}
870
871pub fn add_scan_targets(sync_state: &mut SyncState, scan_targets: &[ScanTarget]) {
882 for scan_target in scan_targets {
883 sync_state.scan_targets.insert(*scan_target);
884 }
885}
886
887pub fn reset_spends(
893 wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
894 invalid_txids: Vec<TxId>,
895) {
896 wallet_transactions
897 .values_mut()
898 .flat_map(|transaction| transaction.orchard_notes_mut())
899 .filter(|output| {
900 output
901 .spending_transaction
902 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
903 })
904 .for_each(|output| {
905 output.set_spending_transaction(None);
906 });
907 wallet_transactions
908 .values_mut()
909 .flat_map(|transaction| transaction.sapling_notes_mut())
910 .filter(|output| {
911 output
912 .spending_transaction
913 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
914 })
915 .for_each(|output| {
916 output.set_spending_transaction(None);
917 });
918 wallet_transactions
919 .values_mut()
920 .flat_map(|transaction| transaction.transparent_coins_mut())
921 .filter(|output| {
922 output
923 .spending_transaction
924 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
925 })
926 .for_each(|output| {
927 output.set_spending_transaction(None);
928 });
929}
930
931pub fn set_transactions_failed(
935 wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
936 failed_txids: Vec<TxId>,
937) {
938 for failed_txid in failed_txids.iter() {
939 if let Some(transaction) = wallet_transactions.get_mut(failed_txid) {
940 let height = transaction.status().get_height();
941 transaction.update_status(
942 ConfirmationStatus::Failed(height),
943 SystemTime::now()
944 .duration_since(SystemTime::UNIX_EPOCH)
945 .expect("infalliable for such long time periods")
946 .as_secs() as u32,
947 );
948 }
949 }
950 reset_spends(wallet_transactions, failed_txids);
951}
952
953fn is_shutdown<P>(
955 scanner: &Scanner<P>,
956 mempool_unprocessed_transactions_count: Arc<AtomicU8>,
957) -> bool
958where
959 P: consensus::Parameters + Sync + Send + 'static,
960{
961 scanner.worker_poolsize() == 0
962 && mempool_unprocessed_transactions_count.load(atomic::Ordering::Acquire) == 0
963}
964
965#[allow(clippy::too_many_arguments)]
967async fn process_scan_results<W>(
968 consensus_parameters: &impl consensus::Parameters,
969 wallet: &mut W,
970 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
971 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
972 scan_range: ScanRange,
973 scan_results: Result<ScanResults, ScanError>,
974 initial_reorg_detection_start_height: BlockHeight,
975 performance_level: PerformanceLevel,
976 nullifier_map_limit_exceeded: &mut bool,
977) -> Result<(), SyncError<W::Error>>
978where
979 W: SyncWallet
980 + SyncBlocks
981 + SyncTransactions
982 + SyncNullifiers
983 + SyncOutPoints
984 + SyncShardTrees
985 + Send,
986{
987 match scan_results {
988 Ok(results) => {
989 let ScanResults {
990 mut nullifiers,
991 mut outpoints,
992 scanned_blocks,
993 wallet_transactions,
994 sapling_located_trees,
995 orchard_located_trees,
996 } = results;
997
998 if scan_range.priority() == ScanPriority::ScannedWithoutMapping {
999 let full_refetching_nullifiers_range = wallet
1002 .get_sync_state()
1003 .map_err(SyncError::WalletError)?
1004 .scan_ranges
1005 .iter()
1006 .find(|&wallet_scan_range| {
1007 wallet_scan_range
1008 .block_range()
1009 .contains(&scan_range.block_range().start)
1010 && wallet_scan_range
1011 .block_range()
1012 .contains(&(scan_range.block_range().end - 1))
1013 })
1014 .expect("wallet scan range containing scan range should exist!");
1015 if scan_range.block_range().start
1016 != full_refetching_nullifiers_range.block_range().start
1017 || scan_range.block_range().end
1018 != full_refetching_nullifiers_range.block_range().end
1019 {
1020 let mut missing_block_bounds = BTreeMap::new();
1021 for block_bound in [
1022 scan_range.block_range().start - 1,
1023 scan_range.block_range().start,
1024 scan_range.block_range().end - 1,
1025 scan_range.block_range().end,
1026 ] {
1027 if block_bound < full_refetching_nullifiers_range.block_range().start
1028 || block_bound >= full_refetching_nullifiers_range.block_range().end
1029 {
1030 continue;
1031 }
1032 if wallet.get_wallet_block(block_bound).is_err() {
1033 missing_block_bounds.insert(
1034 block_bound,
1035 WalletBlock::from_compact_block(
1036 consensus_parameters,
1037 fetch_request_sender.clone(),
1038 &client::get_compact_block(
1039 fetch_request_sender.clone(),
1040 block_bound,
1041 )
1042 .await?,
1043 )
1044 .await?,
1045 );
1046 }
1047 }
1048 if !missing_block_bounds.is_empty() {
1049 wallet
1050 .append_wallet_blocks(missing_block_bounds)
1051 .map_err(SyncError::WalletError)?;
1052 }
1053 }
1054
1055 let first_unscanned_range = wallet
1056 .get_sync_state()
1057 .map_err(SyncError::WalletError)?
1058 .scan_ranges
1059 .iter()
1060 .find(|scan_range| scan_range.priority() != ScanPriority::Scanned)
1061 .expect("the scan range being processed is not yet set to scanned so at least one unscanned range must exist");
1062 if !first_unscanned_range
1063 .block_range()
1064 .contains(&scan_range.block_range().start)
1065 || !first_unscanned_range
1066 .block_range()
1067 .contains(&(scan_range.block_range().end - 1))
1068 {
1069 state::reset_refetching_nullifiers_scan_range(
1073 wallet
1074 .get_sync_state_mut()
1075 .map_err(SyncError::WalletError)?,
1076 scan_range.block_range().clone(),
1077 );
1078 tracing::debug!(
1079 "Nullifiers discarded and will be re-fetched to avoid missing spends."
1080 );
1081
1082 return Ok(());
1083 }
1084
1085 spend::update_shielded_spends(
1086 consensus_parameters,
1087 wallet,
1088 fetch_request_sender.clone(),
1089 ufvks,
1090 &scanned_blocks,
1091 Some(&mut nullifiers),
1092 )
1093 .await?;
1094
1095 state::set_scanned_scan_range(
1096 wallet
1097 .get_sync_state_mut()
1098 .map_err(SyncError::WalletError)?,
1099 scan_range.block_range().clone(),
1100 true, );
1102 } else {
1103 if !*nullifier_map_limit_exceeded {
1105 let nullifier_map = wallet.get_nullifiers().map_err(SyncError::WalletError)?;
1106 if max_nullifier_map_size(performance_level).is_some_and(|max| {
1107 nullifier_map.orchard.len()
1108 + nullifier_map.sapling.len()
1109 + nullifiers.orchard.len()
1110 + nullifiers.sapling.len()
1111 > max
1112 }) {
1113 *nullifier_map_limit_exceeded = true;
1114 }
1115 }
1116 let mut map_nullifiers = !*nullifier_map_limit_exceeded;
1117
1118 let map_outpoints = scan_range.priority() >= ScanPriority::FoundNote;
1121
1122 for query_scan_range in wallet
1129 .get_sync_state()
1130 .map_err(SyncError::WalletError)?
1131 .scan_ranges()
1132 {
1133 let scan_priority = query_scan_range.priority();
1134 if scan_priority != ScanPriority::Scanned
1135 && scan_priority != ScanPriority::Scanning
1136 && scan_priority != ScanPriority::RefetchingNullifiers
1137 {
1138 break;
1139 }
1140
1141 if scan_priority == ScanPriority::Scanning
1142 && query_scan_range
1143 .block_range()
1144 .contains(&scan_range.block_range().start)
1145 && query_scan_range
1146 .block_range()
1147 .contains(&(scan_range.block_range().end - 1))
1148 {
1149 map_nullifiers = true;
1150 break;
1151 }
1152 }
1153
1154 update_wallet_data(
1155 consensus_parameters,
1156 wallet,
1157 fetch_request_sender.clone(),
1158 ufvks,
1159 &scan_range,
1160 if map_nullifiers {
1161 Some(&mut nullifiers)
1162 } else {
1163 None
1164 },
1165 if map_outpoints {
1166 Some(&mut outpoints)
1167 } else {
1168 None
1169 },
1170 wallet_transactions,
1171 sapling_located_trees,
1172 orchard_located_trees,
1173 )
1174 .await?;
1175 spend::update_transparent_spends(
1176 wallet,
1177 if map_outpoints {
1178 None
1179 } else {
1180 Some(&mut outpoints)
1181 },
1182 )
1183 .map_err(SyncError::WalletError)?;
1184 spend::update_shielded_spends(
1185 consensus_parameters,
1186 wallet,
1187 fetch_request_sender,
1188 ufvks,
1189 &scanned_blocks,
1190 if map_nullifiers {
1191 None
1192 } else {
1193 Some(&mut nullifiers)
1194 },
1195 )
1196 .await?;
1197 add_scanned_blocks(wallet, scanned_blocks, &scan_range)
1198 .map_err(SyncError::WalletError)?;
1199
1200 state::set_scanned_scan_range(
1201 wallet
1202 .get_sync_state_mut()
1203 .map_err(SyncError::WalletError)?,
1204 scan_range.block_range().clone(),
1205 map_nullifiers,
1206 );
1207 state::merge_scan_ranges(
1208 wallet
1209 .get_sync_state_mut()
1210 .map_err(SyncError::WalletError)?,
1211 ScanPriority::ScannedWithoutMapping,
1212 );
1213 }
1214
1215 state::merge_scan_ranges(
1216 wallet
1217 .get_sync_state_mut()
1218 .map_err(SyncError::WalletError)?,
1219 ScanPriority::Scanned,
1220 );
1221 remove_irrelevant_data(wallet).map_err(SyncError::WalletError)?;
1222 tracing::debug!("Scan results processed.");
1223 }
1224 Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => {
1225 tracing::warn!("Hash discontinuity detected before block {height}.");
1226 if height == scan_range.block_range().start
1227 && scan_range.priority() == ScanPriority::Verify
1228 {
1229 tracing::info!("Re-org detected.");
1230 let sync_state = wallet
1231 .get_sync_state_mut()
1232 .map_err(SyncError::WalletError)?;
1233 let last_known_chain_height = sync_state
1234 .last_known_chain_height()
1235 .expect("scan ranges should be non-empty in this scope");
1236
1237 state::set_scan_priority(
1239 sync_state,
1240 scan_range.block_range(),
1241 ScanPriority::Verify,
1242 );
1243
1244 let current_reorg_detection_start_height = state::set_verify_scan_range(
1246 sync_state,
1247 height - 1,
1248 state::VerifyEnd::VerifyHighest,
1249 )
1250 .block_range()
1251 .start;
1252 state::merge_scan_ranges(sync_state, ScanPriority::Verify);
1253
1254 if initial_reorg_detection_start_height - current_reorg_detection_start_height
1255 > MAX_REORG_ALLOWANCE
1256 {
1257 clear_wallet_data(wallet)?;
1258
1259 return Err(ServerError::ChainVerificationError.into());
1260 }
1261
1262 truncate_wallet_data(wallet, current_reorg_detection_start_height - 1)?;
1263
1264 state::set_initial_state(
1265 consensus_parameters,
1266 fetch_request_sender.clone(),
1267 wallet,
1268 last_known_chain_height,
1269 )
1270 .await?;
1271 } else {
1272 scan_results?;
1273 }
1274 }
1275 Err(e) => return Err(e.into()),
1276 }
1277
1278 Ok(())
1279}
1280
1281async fn process_mempool_transaction<W>(
1285 consensus_parameters: &impl consensus::Parameters,
1286 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1287 wallet: &mut W,
1288 raw_transaction: RawTransaction,
1289) -> Result<(), SyncError<W::Error>>
1290where
1291 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1292{
1293 let mempool_height = wallet
1295 .get_sync_state()
1296 .map_err(SyncError::WalletError)?
1297 .last_known_chain_height()
1298 .expect("wallet height must exist after sync is initialised")
1299 + 1;
1300
1301 let transaction = zcash_primitives::transaction::Transaction::read(
1302 &raw_transaction.data[..],
1303 consensus::BranchId::for_height(consensus_parameters, mempool_height),
1304 )
1305 .map_err(ServerError::InvalidTransaction)?;
1306
1307 tracing::debug!(
1308 "mempool received txid {} at height {}",
1309 transaction.txid(),
1310 mempool_height
1311 );
1312
1313 if let Some(tx) = wallet
1314 .get_wallet_transactions_mut()
1315 .map_err(SyncError::WalletError)?
1316 .get_mut(&transaction.txid())
1317 {
1318 tx.update_status(
1319 ConfirmationStatus::Mempool(mempool_height),
1320 SystemTime::now()
1321 .duration_since(SystemTime::UNIX_EPOCH)
1322 .expect("infalliable for such long time periods")
1323 .as_secs() as u32,
1324 );
1325
1326 return Ok(());
1327 }
1328
1329 scan_pending_transaction(
1330 consensus_parameters,
1331 ufvks,
1332 wallet,
1333 transaction,
1334 ConfirmationStatus::Mempool(mempool_height),
1335 SystemTime::now()
1336 .duration_since(SystemTime::UNIX_EPOCH)
1337 .expect("infalliable for such long time periods")
1338 .as_secs() as u32,
1339 )?;
1340
1341 Ok(())
1342}
1343
1344fn truncate_wallet_data<W>(
1346 wallet: &mut W,
1347 truncate_height: BlockHeight,
1348) -> Result<(), SyncError<W::Error>>
1349where
1350 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1351{
1352 let sync_state = wallet
1353 .get_sync_state_mut()
1354 .map_err(SyncError::WalletError)?;
1355 let highest_scanned_height = sync_state
1356 .highest_scanned_height()
1357 .expect("should be non-empty in this scope");
1358 let wallet_birthday = sync_state
1359 .wallet_birthday()
1360 .expect("should be non-empty in this scope");
1361 let checked_truncate_height = match truncate_height.cmp(&wallet_birthday) {
1362 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => truncate_height,
1363 std::cmp::Ordering::Less => consensus::H0,
1364 };
1365 truncate_scan_ranges(checked_truncate_height, sync_state);
1366
1367 if checked_truncate_height > highest_scanned_height {
1368 return Ok(());
1369 }
1370
1371 wallet
1372 .truncate_wallet_blocks(checked_truncate_height)
1373 .map_err(SyncError::WalletError)?;
1374 wallet
1375 .truncate_wallet_transactions(checked_truncate_height)
1376 .map_err(SyncError::WalletError)?;
1377 wallet
1378 .truncate_nullifiers(checked_truncate_height)
1379 .map_err(SyncError::WalletError)?;
1380 wallet
1381 .truncate_outpoints(checked_truncate_height)
1382 .map_err(SyncError::WalletError)?;
1383 match wallet.truncate_shard_trees(checked_truncate_height) {
1384 Ok(_) => Ok(()),
1385 Err(SyncError::TruncationError(height, pooltype)) => {
1386 clear_wallet_data(wallet)?;
1387
1388 Err(SyncError::TruncationError(height, pooltype))
1389 }
1390 Err(e) => Err(e),
1391 }?;
1392
1393 Ok(())
1394}
1395
1396fn clear_wallet_data<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1397where
1398 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1399{
1400 let scan_targets = wallet
1401 .get_wallet_transactions()
1402 .map_err(SyncError::WalletError)?
1403 .values()
1404 .filter_map(|transaction| {
1405 transaction
1406 .status()
1407 .get_confirmed_height()
1408 .map(|height| ScanTarget {
1409 block_height: height,
1410 txid: transaction.txid(),
1411 narrow_scan_area: true,
1412 })
1413 })
1414 .collect::<Vec<_>>();
1415 truncate_wallet_data(wallet, consensus::H0)?;
1416 wallet
1417 .get_wallet_transactions_mut()
1418 .map_err(SyncError::WalletError)?
1419 .clear();
1420 let sync_state = wallet
1421 .get_sync_state_mut()
1422 .map_err(SyncError::WalletError)?;
1423 add_scan_targets(sync_state, &scan_targets);
1424 wallet.set_save_flag().map_err(SyncError::WalletError)?;
1425
1426 Ok(())
1427}
1428
1429#[allow(clippy::too_many_arguments)]
1431async fn update_wallet_data<W>(
1432 consensus_parameters: &impl consensus::Parameters,
1433 wallet: &mut W,
1434 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1435 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1436 scan_range: &ScanRange,
1437 nullifiers: Option<&mut NullifierMap>,
1438 outpoints: Option<&mut BTreeMap<OutputId, ScanTarget>>,
1439 mut transactions: HashMap<TxId, WalletTransaction>,
1440 sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
1441 orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
1442) -> Result<(), SyncError<W::Error>>
1443where
1444 W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees + Send,
1445{
1446 let sync_state = wallet
1447 .get_sync_state_mut()
1448 .map_err(SyncError::WalletError)?;
1449 let highest_scanned_height = sync_state
1450 .highest_scanned_height()
1451 .expect("scan ranges should not be empty in this scope");
1452 for transaction in transactions.values() {
1453 state::update_found_note_shard_priority(
1454 consensus_parameters,
1455 sync_state,
1456 ShieldedProtocol::Sapling,
1457 transaction,
1458 );
1459 state::update_found_note_shard_priority(
1460 consensus_parameters,
1461 sync_state,
1462 ShieldedProtocol::Orchard,
1463 transaction,
1464 );
1465 }
1466 let refetch_nullifier_ranges = {
1474 let block_ranges: Vec<Range<BlockHeight>> = sync_state
1475 .scan_ranges()
1476 .iter()
1477 .filter(|&scan_range| {
1478 scan_range.priority() == ScanPriority::ScannedWithoutMapping
1479 || scan_range.priority() == ScanPriority::RefetchingNullifiers
1480 })
1481 .map(|scan_range| scan_range.block_range().clone())
1482 .collect();
1483
1484 block_ranges
1485 [block_ranges.partition_point(|range| range.start < scan_range.block_range().end)..]
1486 .to_vec()
1487 };
1488 for transaction in transactions.values_mut() {
1489 for note in transaction.sapling_notes.as_mut_slice() {
1490 note.refetch_nullifier_ranges = refetch_nullifier_ranges.clone();
1491 }
1492 for note in transaction.orchard_notes.as_mut_slice() {
1493 note.refetch_nullifier_ranges = refetch_nullifier_ranges.clone();
1494 }
1495 }
1496 for transaction in transactions.values() {
1497 discover_unified_addresses(wallet, ufvks, transaction).map_err(SyncError::WalletError)?;
1498 }
1499
1500 wallet
1501 .extend_wallet_transactions(transactions)
1502 .map_err(SyncError::WalletError)?;
1503 if let Some(nullifiers) = nullifiers {
1504 wallet
1505 .append_nullifiers(nullifiers)
1506 .map_err(SyncError::WalletError)?;
1507 }
1508 if let Some(outpoints) = outpoints {
1509 wallet
1510 .append_outpoints(outpoints)
1511 .map_err(SyncError::WalletError)?;
1512 }
1513 wallet
1514 .update_shard_trees(
1515 fetch_request_sender,
1516 scan_range,
1517 highest_scanned_height,
1518 sapling_located_trees,
1519 orchard_located_trees,
1520 )
1521 .await?;
1522
1523 Ok(())
1524}
1525
1526fn discover_unified_addresses<W>(
1527 wallet: &mut W,
1528 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1529 transaction: &WalletTransaction,
1530) -> Result<(), W::Error>
1531where
1532 W: SyncWallet,
1533{
1534 for note in transaction
1535 .orchard_notes()
1536 .iter()
1537 .filter(|¬e| note.key_id().scope == zip32::Scope::External)
1538 {
1539 let ivk = ufvks
1540 .get(¬e.key_id().account_id())
1541 .expect("ufvk must exist to decrypt this note")
1542 .orchard()
1543 .expect("fvk must exist to decrypt this note")
1544 .to_ivk(zip32::Scope::External);
1545
1546 wallet.add_orchard_address(
1547 note.key_id().account_id(),
1548 note.note().recipient(),
1549 ivk.diversifier_index(¬e.note().recipient())
1550 .expect("must be key used to create this address"),
1551 )?;
1552 }
1553 for note in transaction
1554 .sapling_notes()
1555 .iter()
1556 .filter(|¬e| note.key_id().scope == zip32::Scope::External)
1557 {
1558 let ivk = ufvks
1559 .get(¬e.key_id().account_id())
1560 .expect("ufvk must exist to decrypt this note")
1561 .sapling()
1562 .expect("fvk must exist to decrypt this note")
1563 .to_external_ivk();
1564
1565 wallet.add_sapling_address(
1566 note.key_id().account_id(),
1567 note.note().recipient(),
1568 ivk.decrypt_diversifier(¬e.note().recipient())
1569 .expect("must be key used to create this address"),
1570 )?;
1571 }
1572
1573 Ok(())
1574}
1575
1576fn remove_irrelevant_data<W>(wallet: &mut W) -> Result<(), W::Error>
1577where
1578 W: SyncWallet + SyncBlocks + SyncOutPoints + SyncNullifiers + SyncTransactions,
1579{
1580 let fully_scanned_height = wallet
1581 .get_sync_state()?
1582 .fully_scanned_height()
1583 .expect("scan ranges must be non-empty");
1584
1585 wallet
1586 .get_outpoints_mut()?
1587 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1588 wallet
1589 .get_nullifiers_mut()?
1590 .sapling
1591 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1592 wallet
1593 .get_nullifiers_mut()?
1594 .orchard
1595 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1596 wallet
1597 .get_sync_state_mut()?
1598 .scan_targets
1599 .retain(|scan_target| scan_target.block_height > fully_scanned_height);
1600 remove_irrelevant_blocks(wallet)?;
1601
1602 Ok(())
1603}
1604
1605fn remove_irrelevant_blocks<W>(wallet: &mut W) -> Result<(), W::Error>
1606where
1607 W: SyncWallet + SyncBlocks + SyncTransactions,
1608{
1609 let sync_state = wallet.get_sync_state()?;
1610 let highest_scanned_height = sync_state
1611 .highest_scanned_height()
1612 .expect("should be non-empty");
1613 let scanned_range_bounds = sync_state
1614 .scan_ranges()
1615 .iter()
1616 .filter(|scan_range| {
1617 scan_range.priority() == ScanPriority::Scanned
1618 || scan_range.priority() == ScanPriority::ScannedWithoutMapping
1619 || scan_range.priority() == ScanPriority::RefetchingNullifiers
1620 })
1621 .flat_map(|scanned_range| {
1622 vec![
1623 scanned_range.block_range().start,
1624 scanned_range.block_range().end - 1,
1625 ]
1626 })
1627 .collect::<Vec<_>>();
1628 let wallet_transaction_heights = wallet
1629 .get_wallet_transactions()?
1630 .values()
1631 .filter_map(|tx| tx.status().get_confirmed_height())
1632 .collect::<Vec<_>>();
1633
1634 wallet.get_wallet_blocks_mut()?.retain(|height, _| {
1635 *height >= highest_scanned_height.saturating_sub(MAX_REORG_ALLOWANCE)
1636 || scanned_range_bounds.contains(height)
1637 || wallet_transaction_heights.contains(height)
1638 });
1639
1640 Ok(())
1641}
1642
1643fn add_scanned_blocks<W>(
1644 wallet: &mut W,
1645 mut scanned_blocks: BTreeMap<BlockHeight, WalletBlock>,
1646 scan_range: &ScanRange,
1647) -> Result<(), W::Error>
1648where
1649 W: SyncWallet + SyncBlocks + SyncTransactions,
1650{
1651 let sync_state = wallet.get_sync_state()?;
1652 let highest_scanned_height = sync_state
1653 .highest_scanned_height()
1654 .expect("scan ranges must be non-empty");
1655
1656 let wallet_transaction_heights = wallet
1657 .get_wallet_transactions()?
1658 .values()
1659 .filter_map(|tx| tx.status().get_confirmed_height())
1660 .collect::<Vec<_>>();
1661
1662 scanned_blocks.retain(|height, _| {
1663 *height >= highest_scanned_height.saturating_sub(MAX_REORG_ALLOWANCE)
1664 || *height == scan_range.block_range().start
1665 || *height == scan_range.block_range().end - 1
1666 || wallet_transaction_heights.contains(height)
1667 });
1668
1669 wallet.append_wallet_blocks(scanned_blocks)?;
1670
1671 Ok(())
1672}
1673
1674#[cfg(not(feature = "darkside_test"))]
1675async fn update_subtree_roots<W>(
1676 consensus_parameters: &impl consensus::Parameters,
1677 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1678 wallet: &mut W,
1679) -> Result<(), SyncError<W::Error>>
1680where
1681 W: SyncWallet + SyncShardTrees,
1682{
1683 let sapling_start_index = wallet
1684 .get_shard_trees()
1685 .map_err(SyncError::WalletError)?
1686 .sapling
1687 .store()
1688 .get_shard_roots()
1689 .expect("infallible")
1690 .len() as u32;
1691 let orchard_start_index = wallet
1692 .get_shard_trees()
1693 .map_err(SyncError::WalletError)?
1694 .orchard
1695 .store()
1696 .get_shard_roots()
1697 .expect("infallible")
1698 .len() as u32;
1699 let (sapling_subtree_roots, orchard_subtree_roots) = futures::join!(
1700 client::get_subtree_roots(fetch_request_sender.clone(), sapling_start_index, 0, 0),
1701 client::get_subtree_roots(fetch_request_sender, orchard_start_index, 1, 0)
1702 );
1703
1704 let sapling_subtree_roots = sapling_subtree_roots?;
1705 let orchard_subtree_roots = orchard_subtree_roots?;
1706
1707 let sync_state = wallet
1708 .get_sync_state_mut()
1709 .map_err(SyncError::WalletError)?;
1710 state::add_shard_ranges(
1711 consensus_parameters,
1712 ShieldedProtocol::Sapling,
1713 sync_state,
1714 &sapling_subtree_roots,
1715 );
1716 state::add_shard_ranges(
1717 consensus_parameters,
1718 ShieldedProtocol::Orchard,
1719 sync_state,
1720 &orchard_subtree_roots,
1721 );
1722
1723 let shard_trees = wallet
1724 .get_shard_trees_mut()
1725 .map_err(SyncError::WalletError)?;
1726 witness::add_subtree_roots(sapling_subtree_roots, &mut shard_trees.sapling)?;
1727 witness::add_subtree_roots(orchard_subtree_roots, &mut shard_trees.orchard)?;
1728
1729 Ok(())
1730}
1731
1732async fn add_initial_frontier<W>(
1733 consensus_parameters: &impl consensus::Parameters,
1734 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1735 wallet: &mut W,
1736) -> Result<(), SyncError<W::Error>>
1737where
1738 W: SyncWallet + SyncShardTrees,
1739{
1740 let birthday = wallet.get_birthday().map_err(SyncError::WalletError)?;
1741 if birthday
1742 == consensus_parameters
1743 .activation_height(consensus::NetworkUpgrade::Sapling)
1744 .expect("sapling activation height should always return Some")
1745 {
1746 return Ok(());
1747 }
1748
1749 let shard_trees = wallet
1752 .get_shard_trees_mut()
1753 .map_err(SyncError::WalletError)?;
1754 if shard_trees
1755 .sapling
1756 .store()
1757 .checkpoint_count()
1758 .expect("infallible")
1759 == 1
1760 {
1761 let frontiers = client::get_frontiers(fetch_request_sender, birthday).await?;
1762 shard_trees
1763 .sapling
1764 .insert_frontier(
1765 frontiers.final_sapling_tree().clone(),
1766 Retention::Checkpoint {
1767 id: birthday,
1768 marking: Marking::None,
1769 },
1770 )
1771 .expect("infallible");
1772 shard_trees
1773 .orchard
1774 .insert_frontier(
1775 frontiers.final_orchard_tree().clone(),
1776 Retention::Checkpoint {
1777 id: birthday,
1778 marking: Marking::None,
1779 },
1780 )
1781 .expect("infallible");
1782 }
1783
1784 Ok(())
1785}
1786
1787async fn mempool_monitor(
1792 mut client: CompactTxStreamerClient<Channel>,
1793 mempool_transaction_sender: mpsc::Sender<RawTransaction>,
1794 unprocessed_transactions_count: Arc<AtomicU8>,
1795 shutdown_mempool: Arc<AtomicBool>,
1796) -> Result<(), MempoolError> {
1797 let mut interval = tokio::time::interval(Duration::from_secs(1));
1798 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1799 'main: loop {
1800 let response =
1801 client::get_mempool_transaction_stream(&mut client, shutdown_mempool.clone()).await;
1802
1803 match response {
1804 Ok(mut mempool_stream) => {
1805 interval.reset();
1806 loop {
1807 tokio::select! {
1808 mempool_stream_message = mempool_stream.message() => {
1809 match mempool_stream_message.unwrap_or(None) {
1810 Some(raw_transaction) => {
1811 let _ignore_error = mempool_transaction_sender
1812 .send(raw_transaction)
1813 .await;
1814 unprocessed_transactions_count.fetch_add(1, atomic::Ordering::Release);
1815 }
1816 None => {
1817 continue 'main;
1818 }
1819 }
1820
1821 }
1822
1823 _ = interval.tick() => {
1824 if shutdown_mempool.load(atomic::Ordering::Acquire) {
1825 break 'main;
1826 }
1827 }
1828 }
1829 }
1830 }
1831 Err(e @ MempoolError::ShutdownWithoutStream) => return Err(e),
1832 Err(MempoolError::ServerError(e)) => {
1833 tracing::warn!("Mempool stream request failed! Status: {e}.\nRetrying...");
1834 tokio::time::sleep(Duration::from_secs(3)).await;
1835 }
1836 }
1837 }
1838
1839 Ok(())
1840}
1841
1842fn expire_transactions<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1847where
1848 W: SyncWallet + SyncTransactions,
1849{
1850 let last_known_chain_height = wallet
1851 .get_sync_state()
1852 .map_err(SyncError::WalletError)?
1853 .last_known_chain_height()
1854 .expect("wallet height must exist after scan ranges have been updated");
1855 let wallet_transactions = wallet
1856 .get_wallet_transactions_mut()
1857 .map_err(SyncError::WalletError)?;
1858
1859 let expired_txids = wallet_transactions
1860 .values()
1861 .filter(|transaction| {
1862 transaction.status().is_pending()
1863 && last_known_chain_height >= transaction.transaction().expiry_height()
1864 })
1865 .map(super::wallet::WalletTransaction::txid)
1866 .collect::<Vec<_>>();
1867 set_transactions_failed(wallet_transactions, expired_txids);
1868
1869 let stuck_funds_txids = wallet_transactions
1870 .values()
1871 .filter(|transaction| {
1872 transaction.status().is_pending()
1873 && last_known_chain_height
1874 >= transaction.status().get_height() + UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD
1875 })
1876 .map(super::wallet::WalletTransaction::txid)
1877 .collect::<Vec<_>>();
1878 reset_spends(wallet_transactions, stuck_funds_txids);
1879
1880 Ok(())
1881}
1882
1883fn max_nullifier_map_size(performance_level: PerformanceLevel) -> Option<usize> {
1884 match performance_level {
1885 PerformanceLevel::Low => Some(0),
1886 PerformanceLevel::Medium => Some(125_000),
1887 PerformanceLevel::High => Some(2_000_000),
1888 PerformanceLevel::Maximum => None,
1889 }
1890}
1891
#[cfg(test)]
mod test {

    /// Tests for `checked_wallet_height`, driven through the mock wallet.
    mod checked_height_validation {
        use zcash_protocol::consensus::BlockHeight;
        use zcash_protocol::local_consensus::LocalNetwork;

        use crate::error::SyncError;
        use crate::mocks::{MockWalletBuilder, MockWalletError};
        use crate::sync::checked_wallet_height;

        /// Local network with overwinter activating at height 1 and all later upgrades at
        /// height 3.
        const LOCAL_NETWORK: LocalNetwork = LocalNetwork {
            overwinter: Some(BlockHeight::from_u32(1)),
            sapling: Some(BlockHeight::from_u32(3)),
            blossom: Some(BlockHeight::from_u32(3)),
            heartwood: Some(BlockHeight::from_u32(3)),
            canopy: Some(BlockHeight::from_u32(3)),
            nu5: Some(BlockHeight::from_u32(3)),
            nu6: Some(BlockHeight::from_u32(3)),
            nu6_1: Some(BlockHeight::from_u32(3)),
        };

        /// An error returned when reading the sync state is passed through as a wallet error.
        #[tokio::test]
        async fn get_sync_state_error() {
            let test_error = "get_sync_state_error";
            let mut test_wallet = MockWalletBuilder::new()
                .get_sync_state_patch(Box::new(|_| {
                    Err(MockWalletError::AnErrorVariant(test_error.to_string()))
                }))
                .create_mock_wallet();

            let res =
                checked_wallet_height(&mut test_wallet, BlockHeight::from_u32(1), &LOCAL_NETWORK);

            assert!(matches!(
                res,
                Err(SyncError::WalletError(
                    MockWalletError::AnErrorVariant(ref s)
                )) if s == test_error
            ));
        }

        /// Cases where the sync state already holds scan ranges.
        mod last_known_chain_height {
            use std::ops::Range;

            use crate::sync::{MAX_REORG_ALLOWANCE, ScanPriority, ScanRange};
            use crate::wallet::SyncState;

            use super::*;

            const DEFAULT_START_HEIGHT: BlockHeight = BlockHeight::from_u32(1);
            const _DEFAULT_LAST_KNOWN_HEIGHT: BlockHeight = BlockHeight::from_u32(102);
            const DEFAULT_CHAIN_HEIGHT: BlockHeight = BlockHeight::from_u32(110);

            /// Builds a sync state holding a single scan range with `Scanned` priority.
            fn scanned_sync_state(block_range: Range<BlockHeight>) -> SyncState {
                SyncState {
                    scan_ranges: vec![ScanRange::from_parts(block_range, ScanPriority::Scanned)],
                    ..Default::default()
                }
            }

            /// A wallet height more than the reorg allowance above the chain height is rejected.
            #[tokio::test]
            async fn above_allowance() {
                const LAST_KNOWN_HEIGHT: BlockHeight = BlockHeight::from_u32(211);
                let state = scanned_sync_state(DEFAULT_START_HEIGHT..LAST_KNOWN_HEIGHT);
                let mut test_wallet = MockWalletBuilder::new()
                    .sync_state(state)
                    .create_mock_wallet();

                let res =
                    checked_wallet_height(&mut test_wallet, DEFAULT_CHAIN_HEIGHT, &LOCAL_NETWORK);

                let Err(e) = res else { panic!() };
                assert_eq!(
                    e.to_string(),
                    format!(
                        "wallet height {} is more than {} blocks ahead of best chain height {}",
                        LAST_KNOWN_HEIGHT - 1,
                        MAX_REORG_ALLOWANCE,
                        DEFAULT_CHAIN_HEIGHT
                    )
                );
            }

            /// A wallet height above the chain height but within the allowance resolves to the
            /// chain height.
            #[tokio::test]
            async fn above_chain_height_below_allowance() {
                let state =
                    scanned_sync_state(BlockHeight::from_u32(6)..BlockHeight::from_u32(10));
                let mut test_wallet = MockWalletBuilder::new()
                    .sync_state(state)
                    .create_mock_wallet();
                let chain_height = BlockHeight::from_u32(4);

                let res = checked_wallet_height(&mut test_wallet, chain_height, &LOCAL_NETWORK);

                assert_eq!(res.unwrap(), BlockHeight::from_u32(4));
            }

            /// Not yet implemented: only the mock wallet is constructed.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn equal_or_below_chain_height_and_above_sapling() {
                let state =
                    scanned_sync_state(BlockHeight::from_u32(1)..BlockHeight::from_u32(10));
                let _test_wallet = MockWalletBuilder::new()
                    .sync_state(state)
                    .create_mock_wallet();
            }

            /// Not yet implemented: only the mock wallet is constructed.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn equal_or_below_chain_height_and_below_sapling() {
                let state =
                    scanned_sync_state(BlockHeight::from_u32(1)..BlockHeight::from_u32(10));
                let _test_wallet = MockWalletBuilder::new()
                    .sync_state(state)
                    .create_mock_wallet();
            }

            /// Not yet implemented: only the mock wallet is constructed.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn below_sapling() {
                let state =
                    scanned_sync_state(BlockHeight::from_u32(1)..BlockHeight::from_u32(10));
                let _test_wallet = MockWalletBuilder::new()
                    .sync_state(state)
                    .create_mock_wallet();
            }
        }

        /// Cases where the mock wallet is built without a sync state being supplied.
        mod no_last_known_chain_height {
            use super::*;

            /// An error returned when reading the birthday is passed through as a wallet error.
            #[tokio::test]
            async fn get_bday_error() {
                let test_error = "get_bday_error";
                let mut test_wallet = MockWalletBuilder::new()
                    .get_birthday_patch(Box::new(|_| {
                        Err(MockWalletError::AnErrorVariant(test_error.to_string()))
                    }))
                    .create_mock_wallet();

                let res = checked_wallet_height(
                    &mut test_wallet,
                    BlockHeight::from_u32(1),
                    &LOCAL_NETWORK,
                );

                assert!(matches!(
                    res,
                    Err(SyncError::WalletError(
                        MockWalletError::AnErrorVariant(ref s)
                    )) if s == test_error
                ));
            }

            /// In progress: a birthday above the chain height is expected to be rejected.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn raw_bday_above_chain_height() {
                let mut test_wallet = MockWalletBuilder::new()
                    .birthday(BlockHeight::from_u32(15))
                    .create_mock_wallet();

                let res = checked_wallet_height(
                    &mut test_wallet,
                    BlockHeight::from_u32(1),
                    &LOCAL_NETWORK,
                );

                let Err(e) = res else { panic!() };
                assert_eq!(
                    e.to_string(),
                    format!(
                        "wallet height is more than {} blocks ahead of best chain height",
                        15 - 1
                    )
                );
            }

            /// Birthday compared against the sapling activation height of the local network.
            mod sapling_height {
                use super::*;

                /// A birthday above sapling activation resolves to the block before the birthday.
                #[tokio::test]
                async fn raw_bday_above() {
                    let mut test_wallet = MockWalletBuilder::new()
                        .birthday(BlockHeight::from_u32(4))
                        .create_mock_wallet();

                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );

                    assert_eq!(res.unwrap(), BlockHeight::from_u32(4 - 1));
                }

                /// A birthday equal to sapling activation resolves to the block before the
                /// birthday.
                #[tokio::test]
                async fn raw_bday_equal() {
                    let mut test_wallet = MockWalletBuilder::new()
                        .birthday(BlockHeight::from_u32(3))
                        .create_mock_wallet();

                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );

                    assert_eq!(res.unwrap(), BlockHeight::from_u32(3 - 1));
                }

                /// A birthday below sapling activation is rejected.
                #[tokio::test]
                async fn raw_bday_below() {
                    let mut test_wallet = MockWalletBuilder::new()
                        .birthday(BlockHeight::from_u32(1))
                        .create_mock_wallet();

                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );

                    assert!(matches!(res, Err(SyncError::BirthdayBelowSapling(1, 3))));
                }
            }
        }
    }
}