1use std::collections::{BTreeMap, HashMap};
4use std::ops::Range;
5use std::sync::Arc;
6use std::sync::atomic::{self, AtomicBool, AtomicU8};
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::{RwLock, mpsc};
10
11use incrementalmerkletree::{Marking, Retention};
12use orchard::tree::MerkleHashOrchard;
13use shardtree::store::ShardStore;
14use zcash_client_backend::proto::service::RawTransaction;
15use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
16use zcash_keys::keys::UnifiedFullViewingKey;
17use zcash_primitives::transaction::{Transaction, TxId};
18use zcash_protocol::ShieldedProtocol;
19use zcash_protocol::consensus::{self, BlockHeight};
20use zip32::AccountId;
21
22use zingo_status::confirmation_status::ConfirmationStatus;
23
24use crate::client::{self, FetchRequest};
25use crate::config::{PerformanceLevel, SyncConfig};
26use crate::error::{
27 ContinuityError, MempoolError, ScanError, ServerError, SyncError, SyncModeError,
28 SyncStatusError,
29};
30use crate::keys::transparent::TransparentAddressId;
31use crate::scan::ScanResults;
32use crate::scan::task::{Scanner, ScannerState};
33use crate::scan::transactions::scan_transaction;
34use crate::sync::state::truncate_scan_ranges;
35use crate::wallet::traits::{
36 SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
37};
38use crate::wallet::{
39 KeyIdInterface, NoteInterface, NullifierMap, OutputId, OutputInterface, ScanTarget, SyncMode,
40 SyncState, WalletBlock, WalletTransaction,
41};
42use crate::witness::LocatedTreeData;
43
44#[cfg(not(feature = "darkside_test"))]
45use crate::witness;
46
47pub(crate) mod spend;
48pub(crate) mod state;
49pub(crate) mod transparent;
50
// NOTE(review): not referenced in the visible part of this file. Judging by the name, this is
// the number of blocks after which an unconfirmed spend is invalidated — confirm at the use site.
const UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD: u32 = 3;
/// Re-org tolerance, in blocks.
///
/// `checked_wallet_height` returns an error when the wallet's last known chain height is ahead
/// of the server's chain height by this many blocks or more; smaller gaps are handled by
/// truncating the wallet data back to the server's chain height.
pub(crate) const MAX_REORG_ALLOWANCE: u32 = 100;
// NOTE(review): not referenced in the visible part of this file. Presumably the size, in blocks,
// of the range that is re-scanned to verify chain continuity — confirm at the use site.
const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
54
/// Snapshot of sync progress, as produced by [`sync_status`].
///
/// "Session" values count work done since the initial sync state was recorded; "total" values
/// also include work recorded as previously scanned in that initial state.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct SyncStatus {
    /// Copy of the scan ranges held in the wallet's sync state.
    pub scan_ranges: Vec<ScanRange>,
    /// Sync start height taken from the initial sync state (zero if it has not been set yet).
    pub sync_start_height: BlockHeight,
    /// Blocks scanned in total minus the blocks that were already scanned initially.
    pub session_blocks_scanned: u32,
    /// Blocks scanned in total, as calculated from the sync state.
    pub total_blocks_scanned: u32,
    /// `session_blocks_scanned` as a percentage of the blocks that were not previously scanned.
    pub percentage_session_blocks_scanned: f32,
    /// `total_blocks_scanned` as a percentage of the blocks from birthday to last known chain height.
    pub percentage_total_blocks_scanned: f32,
    /// Sapling outputs scanned in total minus those previously scanned.
    pub session_sapling_outputs_scanned: u32,
    /// Sapling outputs scanned in total.
    pub total_sapling_outputs_scanned: u32,
    /// Orchard outputs scanned in total minus those previously scanned.
    pub session_orchard_outputs_scanned: u32,
    /// Orchard outputs scanned in total.
    pub total_orchard_outputs_scanned: u32,
    /// Session outputs (sapling + orchard) as a percentage of the outputs not previously scanned.
    pub percentage_session_outputs_scanned: f32,
    /// Total outputs scanned (sapling + orchard) as a percentage of all outputs within the
    /// wallet's tree bounds.
    pub percentage_total_outputs_scanned: f32,
}
75
76impl std::fmt::Display for SyncStatus {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "percentage complete: {}",
82 self.percentage_total_outputs_scanned
83 )
84 }
85}
86
87impl From<SyncStatus> for json::JsonValue {
88 fn from(value: SyncStatus) -> Self {
89 let scan_ranges: Vec<json::JsonValue> = value
90 .scan_ranges
91 .iter()
92 .map(|range| {
93 json::object! {
94 "priority" => format!("{:?}", range.priority()),
95 "start_block" => range.block_range().start.to_string(),
96 "end_block" => (range.block_range().end - 1).to_string(),
97 }
98 })
99 .collect();
100
101 json::object! {
102 "scan_ranges" => scan_ranges,
103 "sync_start_height" => u32::from(value.sync_start_height),
104 "session_blocks_scanned" => value.session_blocks_scanned,
105 "total_blocks_scanned" => value.total_blocks_scanned,
106 "percentage_session_blocks_scanned" => value.percentage_session_blocks_scanned,
107 "percentage_total_blocks_scanned" => value.percentage_total_blocks_scanned,
108 "session_sapling_outputs_scanned" => value.session_sapling_outputs_scanned,
109 "total_sapling_outputs_scanned" => value.total_sapling_outputs_scanned,
110 "session_orchard_outputs_scanned" => value.session_orchard_outputs_scanned,
111 "total_orchard_outputs_scanned" => value.total_orchard_outputs_scanned,
112 "percentage_session_outputs_scanned" => value.percentage_session_outputs_scanned,
113 "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
114 }
115 }
116}
117
/// Summary returned by [`sync`] when a sync session ends (either on completion or on shutdown).
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct SyncResult {
    /// Sync start height reported by the final [`SyncStatus`].
    pub sync_start_height: BlockHeight,
    /// Last block covered by the wallet's scan ranges (exclusive end of the last range minus 1).
    pub sync_end_height: BlockHeight,
    /// Blocks scanned during this session.
    pub blocks_scanned: u32,
    /// Sapling outputs scanned during this session.
    pub sapling_outputs_scanned: u32,
    /// Orchard outputs scanned during this session.
    pub orchard_outputs_scanned: u32,
    /// Total percentage of outputs scanned, including previous sessions.
    pub percentage_total_outputs_scanned: f32,
}
129
130impl std::fmt::Display for SyncResult {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 write!(
133 f,
134 "Sync completed succesfully:
135{{
136 sync start height: {}
137 sync end height: {}
138 blocks scanned: {}
139 sapling outputs scanned: {}
140 orchard outputs scanned: {}
141 percentage total outputs scanned: {}
142}}",
143 self.sync_start_height,
144 self.sync_end_height,
145 self.blocks_scanned,
146 self.sapling_outputs_scanned,
147 self.orchard_outputs_scanned,
148 self.percentage_total_outputs_scanned,
149 )
150 }
151}
152
153impl From<SyncResult> for json::JsonValue {
154 fn from(value: SyncResult) -> Self {
155 json::object! {
156 "sync_start_height" => u32::from(value.sync_start_height),
157 "sync_end_height" => u32::from(value.sync_end_height),
158 "blocks_scanned" => value.blocks_scanned,
159 "sapling_outputs_scanned" => value.sapling_outputs_scanned,
160 "orchard_outputs_scanned" => value.orchard_outputs_scanned,
161 "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
162 }
163 }
164}
165
/// Priority (and processing state) attached to a [`ScanRange`].
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so variants declared later compare
/// greater. Do not reorder variants: code compares priorities (e.g. `>= ScanPriority::FoundNote`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ScanPriority {
    /// While any range has this priority, `sync_status` caps reported percentages at 99%.
    // NOTE(review): presumably marks a range whose nullifiers are being re-fetched — confirm.
    RefetchingNullifiers,
    /// Range currently handed out for scanning (checked when deciding whether to map nullifiers).
    Scanning,
    /// Range that has been fully processed; adjacent ranges of this priority are merged.
    Scanned,
    /// Range scanned without its nullifiers being mapped; results for such ranges take a
    /// dedicated path in `process_scan_results`, and `sync_status` caps percentages at 99% while
    /// any exist.
    ScannedWithoutMapping,
    // NOTE(review): the remaining variants are described from their names only — confirm.
    /// Presumably older blocks with no special reason to be prioritised.
    Historic,
    /// Presumably a range adjacent to a range of interest.
    OpenAdjacent,
    /// Ranges with this priority or higher have their outpoints mapped when scan results are
    /// processed.
    FoundNote,
    /// Presumably the range nearest the chain tip.
    ChainTip,
    /// A hash discontinuity at the start of a range with this priority is treated as a re-org.
    Verify,
}
192
/// A half-open range of block heights paired with the priority it should be scanned at.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScanRange {
    // Half-open: `start` is included, `end` is excluded.
    block_range: Range<BlockHeight>,
    // Priority / processing state of this range.
    priority: ScanPriority,
}
199
200impl std::fmt::Display for ScanRange {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 write!(
203 f,
204 "{:?}({}..{})",
205 self.priority, self.block_range.start, self.block_range.end,
206 )
207 }
208}
209
210impl ScanRange {
211 #[must_use]
213 pub fn from_parts(block_range: Range<BlockHeight>, priority: ScanPriority) -> Self {
214 assert!(
215 block_range.end >= block_range.start,
216 "{block_range:?} is invalid for ScanRange({priority:?})",
217 );
218 ScanRange {
219 block_range,
220 priority,
221 }
222 }
223
224 #[must_use]
226 pub fn block_range(&self) -> &Range<BlockHeight> {
227 &self.block_range
228 }
229
230 #[must_use]
232 pub fn priority(&self) -> ScanPriority {
233 self.priority
234 }
235
236 #[must_use]
238 pub fn is_empty(&self) -> bool {
239 self.block_range.is_empty()
240 }
241
242 #[must_use]
244 pub fn len(&self) -> usize {
245 usize::try_from(u32::from(self.block_range.end) - u32::from(self.block_range.start))
246 .expect("due to number of max blocks should always be valid usize")
247 }
248
249 #[must_use]
253 pub fn truncate_start(&self, block_height: BlockHeight) -> Option<Self> {
254 if block_height >= self.block_range.end || self.is_empty() {
255 None
256 } else {
257 Some(ScanRange {
258 block_range: self.block_range.start.max(block_height)..self.block_range.end,
259 priority: self.priority,
260 })
261 }
262 }
263
264 #[must_use]
268 pub fn truncate_end(&self, block_height: BlockHeight) -> Option<Self> {
269 if block_height <= self.block_range.start || self.is_empty() {
270 None
271 } else {
272 Some(ScanRange {
273 block_range: self.block_range.start..self.block_range.end.min(block_height),
274 priority: self.priority,
275 })
276 }
277 }
278
279 #[must_use]
283 pub fn split_at(&self, p: BlockHeight) -> Option<(Self, Self)> {
284 (p > self.block_range.start && p < self.block_range.end).then_some((
285 ScanRange {
286 block_range: self.block_range.start..p,
287 priority: self.priority,
288 },
289 ScanRange {
290 block_range: p..self.block_range.end,
291 priority: self.priority,
292 },
293 ))
294 }
295}
296
/// Runs a sync session for `wallet` against the server behind `client`.
///
/// The session claims `sync_mode` (it must be `NotRunning`), spawns a fetcher task and a mempool
/// monitor task, prepares the wallet's sync state while holding the wallet write lock, launches
/// the scanner and then loops: processing scan results, processing mempool transactions and,
/// every 50ms, reacting to the current sync mode and updating the scanner.
///
/// `sync_mode` is re-read on every tick, so another holder of the `Arc` can pause the session
/// (`Paused`) or stop it early (`Shutdown`).
///
/// # Errors
///
/// Returns `SyncModeError::SyncAlreadyRunning` if `sync_mode` is not `NotRunning` on entry,
/// `ServerError::GenesisBlockOnly` if the server reports a chain height of zero, and otherwise
/// propagates any error from the wallet, the server requests, scanning or the mempool monitor.
/// A mempool monitor result of `MempoolError::ShutdownWithoutStream` is only logged as a warning.
///
/// # Panics
///
/// Panics if `sync_mode` is found set to `NotRunning` during the session, if the sync status
/// reports `NoSyncData`, if the wallet has no scan ranges when the result is built, or if the
/// mempool or fetcher task panicked.
pub async fn sync<P, W>(
    client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
    consensus_parameters: &P,
    wallet: Arc<RwLock<W>>,
    sync_mode: Arc<AtomicU8>,
    config: SyncConfig,
) -> Result<SyncResult, SyncError<W::Error>>
where
    P: consensus::Parameters + Sync + Send + 'static,
    W: SyncWallet
        + SyncBlocks
        + SyncTransactions
        + SyncNullifiers
        + SyncOutPoints
        + SyncShardTrees
        + Send,
{
    // Claim the shared sync mode: only `NotRunning` may transition to `Running`.
    let mut sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
    if sync_mode_enum == SyncMode::NotRunning {
        sync_mode_enum = SyncMode::Running;
        sync_mode.store(sync_mode_enum as u8, atomic::Ordering::Release);
    } else {
        return Err(SyncModeError::SyncAlreadyRunning.into());
    }

    tracing::info!("Starting sync...");

    // Fetcher task: serves the fetch requests sent through `fetch_request_sender`.
    let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
    let client_clone = client.clone();
    let fetcher_handle =
        tokio::spawn(
            async move { client::fetch::fetch(fetch_request_receiver, client_clone).await },
        );

    // Mempool monitor task: sends raw transactions over a bounded channel. The shared counter
    // is decremented below after each transaction is processed and checked by `is_shutdown`.
    let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(100);
    let shutdown_mempool = Arc::new(AtomicBool::new(false));
    let shutdown_mempool_clone = shutdown_mempool.clone();
    let unprocessed_mempool_transactions_count = Arc::new(AtomicU8::new(0));
    let unprocessed_mempool_transactions_count_clone =
        unprocessed_mempool_transactions_count.clone();
    let mempool_handle = tokio::spawn(async move {
        mempool_monitor(
            client,
            mempool_transaction_sender,
            unprocessed_mempool_transactions_count_clone,
            shutdown_mempool_clone,
        )
        .await
    });

    // The write lock is held for the whole preparation phase and released before scanning.
    let mut wallet_guard = wallet.write().await;

    let chain_height = client::get_chain_height(fetch_request_sender.clone()).await?;
    if chain_height == 0.into() {
        return Err(SyncError::ServerError(ServerError::GenesisBlockOnly));
    }
    // May truncate wallet data if the wallet is ahead of the server's chain height.
    let last_known_chain_height =
        checked_wallet_height(&mut *wallet_guard, chain_height, consensus_parameters)?;

    let ufvks = wallet_guard
        .get_unified_full_viewing_keys()
        .map_err(SyncError::WalletError)?;

    transparent::update_addresses_and_scan_targets(
        consensus_parameters,
        &mut *wallet_guard,
        fetch_request_sender.clone(),
        &ufvks,
        last_known_chain_height,
        chain_height,
        config.transparent_address_discovery,
    )
    .await?;

    // Subtree roots are not updated when built with the `darkside_test` feature.
    #[cfg(not(feature = "darkside_test"))]
    update_subtree_roots(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
    )
    .await?;

    add_initial_frontier(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
    )
    .await?;

    let initial_reorg_detection_start_height = state::update_scan_ranges(
        consensus_parameters,
        last_known_chain_height,
        chain_height,
        wallet_guard
            .get_sync_state_mut()
            .map_err(SyncError::WalletError)?,
    );

    state::set_initial_state(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
        chain_height,
    )
    .await?;

    expire_transactions(&mut *wallet_guard)?;

    drop(wallet_guard);

    // Scanner: reports `(scan_range, scan_results)` pairs through `scan_results_sender`.
    let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel();
    let mut scanner = Scanner::new(
        consensus_parameters.clone(),
        scan_results_sender,
        fetch_request_sender.clone(),
        ufvks.clone(),
    );
    scanner.launch(config.performance_level);

    // Set by `process_scan_results` once the nullifier map grows past the configured limit and
    // passed on to the scanner on every update.
    let mut nullifier_map_limit_exceeded = false;
    let mut interval = tokio::time::interval(Duration::from_millis(50));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        tokio::select! {
            // A scan task finished: fold its results into the wallet and flag it for saving.
            Some((scan_range, scan_results)) = scan_results_receiver.recv() => {
                let mut wallet_guard = wallet.write().await;
                process_scan_results(
                    consensus_parameters,
                    &mut *wallet_guard,
                    fetch_request_sender.clone(),
                    &ufvks,
                    scan_range,
                    scan_results,
                    initial_reorg_detection_start_height,
                    config.performance_level,
                    &mut nullifier_map_limit_exceeded,
                )
                .await?;
                wallet_guard.set_save_flag().map_err(SyncError::WalletError)?;
                drop(wallet_guard);
            }

            // A mempool transaction arrived: process it, then mark it as processed.
            Some(raw_transaction) = mempool_transaction_receiver.recv() => {
                let mut wallet_guard = wallet.write().await;
                process_mempool_transaction(
                    consensus_parameters,
                    &ufvks,
                    &mut *wallet_guard,
                    raw_transaction,
                )
                .await?;
                unprocessed_mempool_transactions_count.fetch_sub(1, atomic::Ordering::Release);
                drop(wallet_guard);
            }

            // Periodic tick: honour the externally controlled sync mode, then update the scanner.
            _update_scanner = interval.tick() => {
                sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
                match sync_mode_enum {
                    SyncMode::Paused => {
                        // Poll the sync mode once per second until it is no longer `Paused`.
                        // (A tokio interval's first tick completes immediately, hence the
                        // extra tick before the loop.)
                        let mut pause_interval = tokio::time::interval(Duration::from_secs(1));
                        pause_interval.tick().await;
                        while sync_mode_enum == SyncMode::Paused {
                            pause_interval.tick().await;
                            sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
                        }
                    },
                    SyncMode::Shutdown => {
                        // Early exit requested: flag the wallet for saving and report the
                        // progress made so far. The spawned tasks are not awaited on this path.
                        let mut wallet_guard = wallet.write().await;
                        let sync_status = match sync_status(&*wallet_guard).await {
                            Ok(status) => status,
                            Err(SyncStatusError::WalletError(e)) => {
                                return Err(SyncError::WalletError(e));
                            }
                            Err(SyncStatusError::NoSyncData) => {
                                panic!("sync data must exist!");
                            }
                        };
                        wallet_guard
                            .set_save_flag()
                            .map_err(SyncError::WalletError)?;
                        drop(wallet_guard);
                        tracing::info!("Sync successfully shutdown.");

                        return Ok(SyncResult {
                            sync_start_height: sync_status.sync_start_height,
                            // Scan ranges are half-open, so the last synced block is `end - 1`.
                            sync_end_height: (sync_status
                                .scan_ranges
                                .last()
                                .expect("should be non-empty after syncing")
                                .block_range()
                                .end
                                - 1),
                            blocks_scanned: sync_status.session_blocks_scanned,
                            sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
                            orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
                            percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
                        });
                    }
                    SyncMode::Running => (),
                    SyncMode::NotRunning => {
                        panic!("sync mode should not be manually set to NotRunning!");
                    },
                }

                scanner.update(&mut *wallet.write().await, shutdown_mempool.clone(), nullifier_map_limit_exceeded).await?;

                // Once the scanner reports shutdown, wait a second and leave the loop only when
                // no workers remain and every mempool transaction has been processed.
                if matches!(scanner.state, ScannerState::Shutdown) {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    if is_shutdown(&scanner, unprocessed_mempool_transactions_count.clone())
                    {
                        tracing::info!("Sync successfully shutdown.");
                        break;
                    }
                }
            }
        }
    }

    let mut wallet_guard = wallet.write().await;
    let sync_status = match sync_status(&*wallet_guard).await {
        Ok(status) => status,
        Err(SyncStatusError::WalletError(e)) => {
            return Err(SyncError::WalletError(e));
        }
        Err(SyncStatusError::NoSyncData) => {
            panic!("sync data must exist!");
        }
    };
    // Sync is complete: clear the pending nullifier re-fetch ranges on every shielded note.
    for transaction in wallet_guard
        .get_wallet_transactions_mut()
        .map_err(SyncError::WalletError)?
        .values_mut()
    {
        for note in transaction.sapling_notes.as_mut_slice() {
            note.refetch_nullifier_ranges = Vec::new();
        }
        for note in transaction.orchard_notes.as_mut_slice() {
            note.refetch_nullifier_ranges = Vec::new();
        }
    }
    wallet_guard
        .set_save_flag()
        .map_err(SyncError::WalletError)?;

    // Release the lock and drop the scanner and the fetch request sender before awaiting the
    // spawned tasks.
    // NOTE(review): presumably the fetcher only ends once all request senders are gone — confirm.
    drop(wallet_guard);
    drop(scanner);
    drop(fetch_request_sender);

    match mempool_handle.await.expect("task panicked") {
        Ok(()) => (),
        Err(e @ MempoolError::ShutdownWithoutStream) => tracing::warn!("{e}"),
        Err(e) => return Err(e.into()),
    }
    fetcher_handle.await.expect("task panicked");

    Ok(SyncResult {
        sync_start_height: sync_status.sync_start_height,
        // Scan ranges are half-open, so the last synced block is `end - 1`.
        sync_end_height: (sync_status
            .scan_ranges
            .last()
            .expect("should be non-empty after syncing")
            .block_range()
            .end
            - 1),
        blocks_scanned: sync_status.session_blocks_scanned,
        sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
        orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
        percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
    })
}
585
586fn checked_wallet_height<W, P>(
602 wallet: &mut W,
603 chain_height: BlockHeight,
604 consensus_parameters: &P,
605) -> Result<BlockHeight, SyncError<W::Error>>
606where
607 W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
608 P: zcash_protocol::consensus::Parameters,
609{
610 let sync_state = wallet.get_sync_state().map_err(SyncError::WalletError)?;
611 if let Some(last_known_chain_height) = sync_state.last_known_chain_height() {
612 if last_known_chain_height > chain_height {
613 if last_known_chain_height - chain_height >= MAX_REORG_ALLOWANCE {
614 return Err(SyncError::ChainError(
618 u32::from(last_known_chain_height),
619 MAX_REORG_ALLOWANCE,
620 u32::from(chain_height),
621 ));
622 }
623 truncate_wallet_data(wallet, chain_height)?;
626 return Ok(chain_height);
627 }
628 Ok(last_known_chain_height)
630 } else {
631 let sapling_activation_height = consensus_parameters
633 .activation_height(consensus::NetworkUpgrade::Sapling)
634 .expect("sapling activation height should always return Some");
635 let birthday = wallet.get_birthday().map_err(SyncError::WalletError)?;
636 if birthday > chain_height {
637 return Err(SyncError::ChainError(
640 u32::from(birthday),
641 MAX_REORG_ALLOWANCE,
642 u32::from(chain_height),
643 ));
644 } else if birthday < sapling_activation_height {
645 return Err(SyncError::BirthdayBelowSapling(
646 u32::from(birthday),
647 u32::from(sapling_activation_height),
648 ));
649 }
650
651 Ok(birthday - 1)
652 }
653}
654
655pub async fn sync_status<W>(wallet: &W) -> Result<SyncStatus, SyncStatusError<W::Error>>
661where
662 W: SyncWallet + SyncBlocks,
663{
664 let (total_sapling_outputs_scanned, total_orchard_outputs_scanned) =
665 state::calculate_scanned_outputs(wallet).map_err(SyncStatusError::WalletError)?;
666 let total_outputs_scanned = total_sapling_outputs_scanned + total_orchard_outputs_scanned;
667
668 let sync_state = wallet
669 .get_sync_state()
670 .map_err(SyncStatusError::WalletError)?;
671 if sync_state.initial_sync_state.sync_start_height == 0.into() {
672 return Ok(SyncStatus {
673 scan_ranges: sync_state.scan_ranges.clone(),
674 sync_start_height: 0.into(),
675 session_blocks_scanned: 0,
676 total_blocks_scanned: 0,
677 percentage_session_blocks_scanned: 0.0,
678 percentage_total_blocks_scanned: 0.0,
679 session_sapling_outputs_scanned: 0,
680 session_orchard_outputs_scanned: 0,
681 total_sapling_outputs_scanned: 0,
682 total_orchard_outputs_scanned: 0,
683 percentage_session_outputs_scanned: 0.0,
684 percentage_total_outputs_scanned: 0.0,
685 });
686 }
687 let total_blocks_scanned = state::calculate_scanned_blocks(sync_state);
688
689 let birthday = sync_state
690 .wallet_birthday()
691 .ok_or(SyncStatusError::NoSyncData)?;
692 let last_known_chain_height = sync_state
693 .last_known_chain_height()
694 .ok_or(SyncStatusError::NoSyncData)?;
695 let total_blocks = last_known_chain_height - birthday + 1;
696 let total_sapling_outputs = sync_state
697 .initial_sync_state
698 .wallet_tree_bounds
699 .sapling_final_tree_size
700 - sync_state
701 .initial_sync_state
702 .wallet_tree_bounds
703 .sapling_initial_tree_size;
704 let total_orchard_outputs = sync_state
705 .initial_sync_state
706 .wallet_tree_bounds
707 .orchard_final_tree_size
708 - sync_state
709 .initial_sync_state
710 .wallet_tree_bounds
711 .orchard_initial_tree_size;
712 let total_outputs = total_sapling_outputs + total_orchard_outputs;
713
714 let session_blocks_scanned =
715 total_blocks_scanned - sync_state.initial_sync_state.previously_scanned_blocks;
716 let mut percentage_session_blocks_scanned = ((session_blocks_scanned as f32
717 / (total_blocks - sync_state.initial_sync_state.previously_scanned_blocks) as f32)
718 * 100.0)
719 .clamp(0.0, 100.0);
720 let mut percentage_total_blocks_scanned =
721 ((total_blocks_scanned as f32 / total_blocks as f32) * 100.0).clamp(0.0, 100.0);
722
723 let session_sapling_outputs_scanned = total_sapling_outputs_scanned
724 - sync_state
725 .initial_sync_state
726 .previously_scanned_sapling_outputs;
727 let session_orchard_outputs_scanned = total_orchard_outputs_scanned
728 - sync_state
729 .initial_sync_state
730 .previously_scanned_orchard_outputs;
731 let session_outputs_scanned = session_sapling_outputs_scanned + session_orchard_outputs_scanned;
732 let previously_scanned_outputs = sync_state
733 .initial_sync_state
734 .previously_scanned_sapling_outputs
735 + sync_state
736 .initial_sync_state
737 .previously_scanned_orchard_outputs;
738 let mut percentage_session_outputs_scanned = ((session_outputs_scanned as f32
739 / (total_outputs - previously_scanned_outputs) as f32)
740 * 100.0)
741 .clamp(0.0, 100.0);
742 let mut percentage_total_outputs_scanned =
743 ((total_outputs_scanned as f32 / total_outputs as f32) * 100.0).clamp(0.0, 100.0);
744
745 if sync_state.scan_ranges().iter().any(|scan_range| {
746 scan_range.priority() == ScanPriority::ScannedWithoutMapping
747 || scan_range.priority() == ScanPriority::RefetchingNullifiers
748 }) {
749 if percentage_session_blocks_scanned == 100.0 {
750 percentage_session_blocks_scanned = 99.0;
751 }
752 if percentage_total_blocks_scanned == 100.0 {
753 percentage_total_blocks_scanned = 99.0;
754 }
755 if percentage_session_outputs_scanned == 100.0 {
756 percentage_session_outputs_scanned = 99.0;
757 }
758 if percentage_total_outputs_scanned == 100.0 {
759 percentage_total_outputs_scanned = 99.0;
760 }
761 }
762
763 Ok(SyncStatus {
764 scan_ranges: sync_state.scan_ranges.clone(),
765 sync_start_height: sync_state.initial_sync_state.sync_start_height,
766 session_blocks_scanned,
767 total_blocks_scanned,
768 percentage_session_blocks_scanned,
769 percentage_total_blocks_scanned,
770 session_sapling_outputs_scanned,
771 total_sapling_outputs_scanned,
772 session_orchard_outputs_scanned,
773 total_orchard_outputs_scanned,
774 percentage_session_outputs_scanned,
775 percentage_total_outputs_scanned,
776 })
777}
778
779pub fn scan_pending_transaction<W>(
786 consensus_parameters: &impl consensus::Parameters,
787 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
788 wallet: &mut W,
789 transaction: Transaction,
790 status: ConfirmationStatus,
791 datetime: u32,
792) -> Result<(), SyncError<W::Error>>
793where
794 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
795{
796 if matches!(status, ConfirmationStatus::Confirmed(_)) {
797 panic!("this fn is for unconfirmed transactions only");
798 }
799
800 let mut pending_transaction_nullifiers = NullifierMap::new();
801 let mut pending_transaction_outpoints = BTreeMap::new();
802 let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
803 .get_transparent_addresses()
804 .map_err(SyncError::WalletError)?
805 .iter()
806 .map(|(id, address)| (address.clone(), *id))
807 .collect();
808 let pending_transaction = scan_transaction(
809 consensus_parameters,
810 ufvks,
811 transaction.txid(),
812 transaction,
813 status,
814 None,
815 &mut pending_transaction_nullifiers,
816 &mut pending_transaction_outpoints,
817 &transparent_addresses,
818 datetime,
819 )?;
820
821 let wallet_transactions = wallet
822 .get_wallet_transactions()
823 .map_err(SyncError::WalletError)?;
824 let transparent_output_ids = spend::collect_transparent_output_ids(wallet_transactions);
825 let transparent_spend_scan_targets = spend::detect_transparent_spends(
826 &mut pending_transaction_outpoints,
827 transparent_output_ids,
828 );
829 let (sapling_derived_nullifiers, orchard_derived_nullifiers) =
830 spend::collect_derived_nullifiers(wallet_transactions);
831 let (sapling_spend_scan_targets, orchard_spend_scan_targets) = spend::detect_shielded_spends(
832 &mut pending_transaction_nullifiers,
833 sapling_derived_nullifiers,
834 orchard_derived_nullifiers,
835 );
836
837 if pending_transaction.transparent_coins().is_empty()
839 && pending_transaction.sapling_notes().is_empty()
840 && pending_transaction.orchard_notes().is_empty()
841 && pending_transaction.outgoing_orchard_notes().is_empty()
842 && pending_transaction.outgoing_sapling_notes().is_empty()
843 && transparent_spend_scan_targets.is_empty()
844 && sapling_spend_scan_targets.is_empty()
845 && orchard_spend_scan_targets.is_empty()
846 {
847 return Ok(());
848 }
849
850 wallet
851 .insert_wallet_transaction(pending_transaction)
852 .map_err(SyncError::WalletError)?;
853 spend::update_spent_coins(
854 wallet
855 .get_wallet_transactions_mut()
856 .map_err(SyncError::WalletError)?,
857 transparent_spend_scan_targets,
858 );
859 spend::update_spent_notes(
860 wallet,
861 sapling_spend_scan_targets,
862 orchard_spend_scan_targets,
863 false,
864 )
865 .map_err(SyncError::WalletError)?;
866
867 Ok(())
868}
869
870pub fn add_scan_targets(sync_state: &mut SyncState, scan_targets: &[ScanTarget]) {
881 for scan_target in scan_targets {
882 sync_state.scan_targets.insert(*scan_target);
883 }
884}
885
886pub fn reset_spends(
892 wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
893 invalid_txids: Vec<TxId>,
894) {
895 wallet_transactions
896 .values_mut()
897 .flat_map(|transaction| transaction.orchard_notes_mut())
898 .filter(|output| {
899 output
900 .spending_transaction
901 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
902 })
903 .for_each(|output| {
904 output.set_spending_transaction(None);
905 });
906 wallet_transactions
907 .values_mut()
908 .flat_map(|transaction| transaction.sapling_notes_mut())
909 .filter(|output| {
910 output
911 .spending_transaction
912 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
913 })
914 .for_each(|output| {
915 output.set_spending_transaction(None);
916 });
917 wallet_transactions
918 .values_mut()
919 .flat_map(|transaction| transaction.transparent_coins_mut())
920 .filter(|output| {
921 output
922 .spending_transaction
923 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
924 })
925 .for_each(|output| {
926 output.set_spending_transaction(None);
927 });
928}
929
930pub fn set_transactions_failed(
934 wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
935 failed_txids: Vec<TxId>,
936) {
937 for failed_txid in failed_txids.iter() {
938 if let Some(transaction) = wallet_transactions.get_mut(failed_txid) {
939 let height = transaction.status().get_height();
940 transaction.update_status(
941 ConfirmationStatus::Failed(height),
942 SystemTime::now()
943 .duration_since(SystemTime::UNIX_EPOCH)
944 .expect("infalliable for such long time periods")
945 .as_secs() as u32,
946 );
947 }
948 }
949 reset_spends(wallet_transactions, failed_txids);
950}
951
952fn is_shutdown<P>(
954 scanner: &Scanner<P>,
955 mempool_unprocessed_transactions_count: Arc<AtomicU8>,
956) -> bool
957where
958 P: consensus::Parameters + Sync + Send + 'static,
959{
960 scanner.worker_poolsize() == 0
961 && mempool_unprocessed_transactions_count.load(atomic::Ordering::Acquire) == 0
962}
963
964#[allow(clippy::too_many_arguments)]
966async fn process_scan_results<W>(
967 consensus_parameters: &impl consensus::Parameters,
968 wallet: &mut W,
969 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
970 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
971 scan_range: ScanRange,
972 scan_results: Result<ScanResults, ScanError>,
973 initial_reorg_detection_start_height: BlockHeight,
974 performance_level: PerformanceLevel,
975 nullifier_map_limit_exceeded: &mut bool,
976) -> Result<(), SyncError<W::Error>>
977where
978 W: SyncWallet
979 + SyncBlocks
980 + SyncTransactions
981 + SyncNullifiers
982 + SyncOutPoints
983 + SyncShardTrees
984 + Send,
985{
986 match scan_results {
987 Ok(results) => {
988 let ScanResults {
989 mut nullifiers,
990 mut outpoints,
991 scanned_blocks,
992 wallet_transactions,
993 sapling_located_trees,
994 orchard_located_trees,
995 } = results;
996
997 if scan_range.priority() == ScanPriority::ScannedWithoutMapping {
998 let full_refetching_nullifiers_range = wallet
1001 .get_sync_state()
1002 .map_err(SyncError::WalletError)?
1003 .scan_ranges
1004 .iter()
1005 .find(|&wallet_scan_range| {
1006 wallet_scan_range
1007 .block_range()
1008 .contains(&scan_range.block_range().start)
1009 && wallet_scan_range
1010 .block_range()
1011 .contains(&(scan_range.block_range().end - 1))
1012 })
1013 .expect("wallet scan range containing scan range should exist!");
1014 if scan_range.block_range().start
1015 != full_refetching_nullifiers_range.block_range().start
1016 || scan_range.block_range().end
1017 != full_refetching_nullifiers_range.block_range().end
1018 {
1019 let mut missing_block_bounds = BTreeMap::new();
1020 for block_bound in [
1021 scan_range.block_range().start - 1,
1022 scan_range.block_range().start,
1023 scan_range.block_range().end - 1,
1024 scan_range.block_range().end,
1025 ] {
1026 if block_bound < full_refetching_nullifiers_range.block_range().start
1027 || block_bound >= full_refetching_nullifiers_range.block_range().end
1028 {
1029 continue;
1030 }
1031 if wallet.get_wallet_block(block_bound).is_err() {
1032 missing_block_bounds.insert(
1033 block_bound,
1034 WalletBlock::from_compact_block(
1035 consensus_parameters,
1036 fetch_request_sender.clone(),
1037 &client::get_compact_block(
1038 fetch_request_sender.clone(),
1039 block_bound,
1040 )
1041 .await?,
1042 )
1043 .await?,
1044 );
1045 }
1046 }
1047 if !missing_block_bounds.is_empty() {
1048 wallet
1049 .append_wallet_blocks(missing_block_bounds)
1050 .map_err(SyncError::WalletError)?;
1051 }
1052 }
1053
1054 let first_unscanned_range = wallet
1055 .get_sync_state()
1056 .map_err(SyncError::WalletError)?
1057 .scan_ranges
1058 .iter()
1059 .find(|scan_range| scan_range.priority() != ScanPriority::Scanned)
1060 .expect("the scan range being processed is not yet set to scanned so at least one unscanned range must exist");
1061 if !first_unscanned_range
1062 .block_range()
1063 .contains(&scan_range.block_range().start)
1064 || !first_unscanned_range
1065 .block_range()
1066 .contains(&(scan_range.block_range().end - 1))
1067 {
1068 state::reset_refetching_nullifiers_scan_range(
1072 wallet
1073 .get_sync_state_mut()
1074 .map_err(SyncError::WalletError)?,
1075 scan_range.block_range().clone(),
1076 );
1077 tracing::debug!(
1078 "Nullifiers discarded and will be re-fetched to avoid missing spends."
1079 );
1080
1081 return Ok(());
1082 }
1083
1084 spend::update_shielded_spends(
1085 consensus_parameters,
1086 wallet,
1087 fetch_request_sender.clone(),
1088 ufvks,
1089 &scanned_blocks,
1090 Some(&mut nullifiers),
1091 )
1092 .await?;
1093
1094 state::set_scanned_scan_range(
1095 wallet
1096 .get_sync_state_mut()
1097 .map_err(SyncError::WalletError)?,
1098 scan_range.block_range().clone(),
1099 true, );
1101 } else {
1102 if !*nullifier_map_limit_exceeded {
1104 let nullifier_map = wallet.get_nullifiers().map_err(SyncError::WalletError)?;
1105 if max_nullifier_map_size(performance_level).is_some_and(|max| {
1106 nullifier_map.orchard.len()
1107 + nullifier_map.sapling.len()
1108 + nullifiers.orchard.len()
1109 + nullifiers.sapling.len()
1110 > max
1111 }) {
1112 *nullifier_map_limit_exceeded = true;
1113 }
1114 }
1115 let mut map_nullifiers = !*nullifier_map_limit_exceeded;
1116
1117 let map_outpoints = scan_range.priority() >= ScanPriority::FoundNote;
1120
1121 for query_scan_range in wallet
1128 .get_sync_state()
1129 .map_err(SyncError::WalletError)?
1130 .scan_ranges()
1131 {
1132 let scan_priority = query_scan_range.priority();
1133 if scan_priority != ScanPriority::Scanned
1134 && scan_priority != ScanPriority::Scanning
1135 && scan_priority != ScanPriority::RefetchingNullifiers
1136 {
1137 break;
1138 }
1139
1140 if scan_priority == ScanPriority::Scanning
1141 && query_scan_range
1142 .block_range()
1143 .contains(&scan_range.block_range().start)
1144 && query_scan_range
1145 .block_range()
1146 .contains(&(scan_range.block_range().end - 1))
1147 {
1148 map_nullifiers = true;
1149 break;
1150 }
1151 }
1152
1153 update_wallet_data(
1154 consensus_parameters,
1155 wallet,
1156 fetch_request_sender.clone(),
1157 ufvks,
1158 &scan_range,
1159 if map_nullifiers {
1160 Some(&mut nullifiers)
1161 } else {
1162 None
1163 },
1164 if map_outpoints {
1165 Some(&mut outpoints)
1166 } else {
1167 None
1168 },
1169 wallet_transactions,
1170 sapling_located_trees,
1171 orchard_located_trees,
1172 )
1173 .await?;
1174 spend::update_transparent_spends(
1175 wallet,
1176 if map_outpoints {
1177 None
1178 } else {
1179 Some(&mut outpoints)
1180 },
1181 )
1182 .map_err(SyncError::WalletError)?;
1183 spend::update_shielded_spends(
1184 consensus_parameters,
1185 wallet,
1186 fetch_request_sender,
1187 ufvks,
1188 &scanned_blocks,
1189 if map_nullifiers {
1190 None
1191 } else {
1192 Some(&mut nullifiers)
1193 },
1194 )
1195 .await?;
1196 add_scanned_blocks(wallet, scanned_blocks, &scan_range)
1197 .map_err(SyncError::WalletError)?;
1198
1199 state::set_scanned_scan_range(
1200 wallet
1201 .get_sync_state_mut()
1202 .map_err(SyncError::WalletError)?,
1203 scan_range.block_range().clone(),
1204 map_nullifiers,
1205 );
1206 state::merge_scan_ranges(
1207 wallet
1208 .get_sync_state_mut()
1209 .map_err(SyncError::WalletError)?,
1210 ScanPriority::ScannedWithoutMapping,
1211 );
1212 }
1213
1214 state::merge_scan_ranges(
1215 wallet
1216 .get_sync_state_mut()
1217 .map_err(SyncError::WalletError)?,
1218 ScanPriority::Scanned,
1219 );
1220 remove_irrelevant_data(wallet).map_err(SyncError::WalletError)?;
1221 tracing::debug!("Scan results processed.");
1222 }
1223 Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => {
1224 tracing::warn!("Hash discontinuity detected before block {height}.");
1225 if height == scan_range.block_range().start
1226 && scan_range.priority() == ScanPriority::Verify
1227 {
1228 tracing::info!("Re-org detected.");
1229 let sync_state = wallet
1230 .get_sync_state_mut()
1231 .map_err(SyncError::WalletError)?;
1232 let last_known_chain_height = sync_state
1233 .last_known_chain_height()
1234 .expect("scan ranges should be non-empty in this scope");
1235
1236 state::set_scan_priority(
1238 sync_state,
1239 scan_range.block_range(),
1240 ScanPriority::Verify,
1241 );
1242
1243 let current_reorg_detection_start_height = state::set_verify_scan_range(
1245 sync_state,
1246 height - 1,
1247 state::VerifyEnd::VerifyHighest,
1248 )
1249 .block_range()
1250 .start;
1251 state::merge_scan_ranges(sync_state, ScanPriority::Verify);
1252
1253 if initial_reorg_detection_start_height - current_reorg_detection_start_height
1254 > MAX_REORG_ALLOWANCE
1255 {
1256 clear_wallet_data(wallet)?;
1257
1258 return Err(ServerError::ChainVerificationError.into());
1259 }
1260
1261 truncate_wallet_data(wallet, current_reorg_detection_start_height - 1)?;
1262
1263 state::set_initial_state(
1264 consensus_parameters,
1265 fetch_request_sender.clone(),
1266 wallet,
1267 last_known_chain_height,
1268 )
1269 .await?;
1270 } else {
1271 scan_results?;
1272 }
1273 }
1274 Err(e) => return Err(e.into()),
1275 }
1276
1277 Ok(())
1278}
1279
1280async fn process_mempool_transaction<W>(
1284 consensus_parameters: &impl consensus::Parameters,
1285 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1286 wallet: &mut W,
1287 raw_transaction: RawTransaction,
1288) -> Result<(), SyncError<W::Error>>
1289where
1290 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1291{
1292 let mempool_height = wallet
1294 .get_sync_state()
1295 .map_err(SyncError::WalletError)?
1296 .last_known_chain_height()
1297 .expect("wallet height must exist after sync is initialised")
1298 + 1;
1299
1300 let transaction = zcash_primitives::transaction::Transaction::read(
1301 &raw_transaction.data[..],
1302 consensus::BranchId::for_height(consensus_parameters, mempool_height),
1303 )
1304 .map_err(ServerError::InvalidTransaction)?;
1305
1306 tracing::debug!(
1307 "mempool received txid {} at height {}",
1308 transaction.txid(),
1309 mempool_height
1310 );
1311
1312 if let Some(tx) = wallet
1313 .get_wallet_transactions_mut()
1314 .map_err(SyncError::WalletError)?
1315 .get_mut(&transaction.txid())
1316 {
1317 tx.update_status(
1318 ConfirmationStatus::Mempool(mempool_height),
1319 SystemTime::now()
1320 .duration_since(SystemTime::UNIX_EPOCH)
1321 .expect("infalliable for such long time periods")
1322 .as_secs() as u32,
1323 );
1324
1325 return Ok(());
1326 }
1327
1328 scan_pending_transaction(
1329 consensus_parameters,
1330 ufvks,
1331 wallet,
1332 transaction,
1333 ConfirmationStatus::Mempool(mempool_height),
1334 SystemTime::now()
1335 .duration_since(SystemTime::UNIX_EPOCH)
1336 .expect("infalliable for such long time periods")
1337 .as_secs() as u32,
1338 )?;
1339
1340 Ok(())
1341}
1342
1343fn truncate_wallet_data<W>(
1345 wallet: &mut W,
1346 truncate_height: BlockHeight,
1347) -> Result<(), SyncError<W::Error>>
1348where
1349 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1350{
1351 let sync_state = wallet
1352 .get_sync_state_mut()
1353 .map_err(SyncError::WalletError)?;
1354 let highest_scanned_height = sync_state
1355 .highest_scanned_height()
1356 .expect("should be non-empty in this scope");
1357 let wallet_birthday = sync_state
1358 .wallet_birthday()
1359 .expect("should be non-empty in this scope");
1360 let checked_truncate_height = match truncate_height.cmp(&wallet_birthday) {
1361 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => truncate_height,
1362 std::cmp::Ordering::Less => consensus::H0,
1363 };
1364 truncate_scan_ranges(checked_truncate_height, sync_state);
1365
1366 if checked_truncate_height > highest_scanned_height {
1367 return Ok(());
1368 }
1369
1370 wallet
1371 .truncate_wallet_blocks(checked_truncate_height)
1372 .map_err(SyncError::WalletError)?;
1373 wallet
1374 .truncate_wallet_transactions(checked_truncate_height)
1375 .map_err(SyncError::WalletError)?;
1376 wallet
1377 .truncate_nullifiers(checked_truncate_height)
1378 .map_err(SyncError::WalletError)?;
1379 wallet
1380 .truncate_outpoints(checked_truncate_height)
1381 .map_err(SyncError::WalletError)?;
1382 match wallet.truncate_shard_trees(checked_truncate_height) {
1383 Ok(_) => Ok(()),
1384 Err(SyncError::TruncationError(height, pooltype)) => {
1385 clear_wallet_data(wallet)?;
1386
1387 Err(SyncError::TruncationError(height, pooltype))
1388 }
1389 Err(e) => Err(e),
1390 }?;
1391
1392 Ok(())
1393}
1394
1395fn clear_wallet_data<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1396where
1397 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1398{
1399 let scan_targets = wallet
1400 .get_wallet_transactions()
1401 .map_err(SyncError::WalletError)?
1402 .values()
1403 .filter_map(|transaction| {
1404 transaction
1405 .status()
1406 .get_confirmed_height()
1407 .map(|height| ScanTarget {
1408 block_height: height,
1409 txid: transaction.txid(),
1410 narrow_scan_area: true,
1411 })
1412 })
1413 .collect::<Vec<_>>();
1414 truncate_wallet_data(wallet, consensus::H0)?;
1415 wallet
1416 .get_wallet_transactions_mut()
1417 .map_err(SyncError::WalletError)?
1418 .clear();
1419 let sync_state = wallet
1420 .get_sync_state_mut()
1421 .map_err(SyncError::WalletError)?;
1422 add_scan_targets(sync_state, &scan_targets);
1423 wallet.set_save_flag().map_err(SyncError::WalletError)?;
1424
1425 Ok(())
1426}
1427
1428#[allow(clippy::too_many_arguments)]
1430async fn update_wallet_data<W>(
1431 consensus_parameters: &impl consensus::Parameters,
1432 wallet: &mut W,
1433 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1434 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1435 scan_range: &ScanRange,
1436 nullifiers: Option<&mut NullifierMap>,
1437 outpoints: Option<&mut BTreeMap<OutputId, ScanTarget>>,
1438 mut transactions: HashMap<TxId, WalletTransaction>,
1439 sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
1440 orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
1441) -> Result<(), SyncError<W::Error>>
1442where
1443 W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees + Send,
1444{
1445 let sync_state = wallet
1446 .get_sync_state_mut()
1447 .map_err(SyncError::WalletError)?;
1448 let highest_scanned_height = sync_state
1449 .highest_scanned_height()
1450 .expect("scan ranges should not be empty in this scope");
1451 for transaction in transactions.values() {
1452 state::update_found_note_shard_priority(
1453 consensus_parameters,
1454 sync_state,
1455 ShieldedProtocol::Sapling,
1456 transaction,
1457 );
1458 state::update_found_note_shard_priority(
1459 consensus_parameters,
1460 sync_state,
1461 ShieldedProtocol::Orchard,
1462 transaction,
1463 );
1464 }
1465 let refetch_nullifier_ranges = {
1473 let block_ranges: Vec<Range<BlockHeight>> = sync_state
1474 .scan_ranges()
1475 .iter()
1476 .filter(|&scan_range| {
1477 scan_range.priority() == ScanPriority::ScannedWithoutMapping
1478 || scan_range.priority() == ScanPriority::RefetchingNullifiers
1479 })
1480 .map(|scan_range| scan_range.block_range().clone())
1481 .collect();
1482
1483 block_ranges
1484 [block_ranges.partition_point(|range| range.start < scan_range.block_range().end)..]
1485 .to_vec()
1486 };
1487 for transaction in transactions.values_mut() {
1488 for note in transaction.sapling_notes.as_mut_slice() {
1489 note.refetch_nullifier_ranges = refetch_nullifier_ranges.clone();
1490 }
1491 for note in transaction.orchard_notes.as_mut_slice() {
1492 note.refetch_nullifier_ranges = refetch_nullifier_ranges.clone();
1493 }
1494 }
1495 for transaction in transactions.values() {
1496 discover_unified_addresses(wallet, ufvks, transaction).map_err(SyncError::WalletError)?;
1497 }
1498
1499 wallet
1500 .extend_wallet_transactions(transactions)
1501 .map_err(SyncError::WalletError)?;
1502 if let Some(nullifiers) = nullifiers {
1503 wallet
1504 .append_nullifiers(nullifiers)
1505 .map_err(SyncError::WalletError)?;
1506 }
1507 if let Some(outpoints) = outpoints {
1508 wallet
1509 .append_outpoints(outpoints)
1510 .map_err(SyncError::WalletError)?;
1511 }
1512 wallet
1513 .update_shard_trees(
1514 fetch_request_sender,
1515 scan_range,
1516 highest_scanned_height,
1517 sapling_located_trees,
1518 orchard_located_trees,
1519 )
1520 .await?;
1521
1522 Ok(())
1523}
1524
1525fn discover_unified_addresses<W>(
1526 wallet: &mut W,
1527 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1528 transaction: &WalletTransaction,
1529) -> Result<(), W::Error>
1530where
1531 W: SyncWallet,
1532{
1533 for note in transaction
1534 .orchard_notes()
1535 .iter()
1536 .filter(|¬e| note.key_id().scope == zip32::Scope::External)
1537 {
1538 let ivk = ufvks
1539 .get(¬e.key_id().account_id())
1540 .expect("ufvk must exist to decrypt this note")
1541 .orchard()
1542 .expect("fvk must exist to decrypt this note")
1543 .to_ivk(zip32::Scope::External);
1544
1545 wallet.add_orchard_address(
1546 note.key_id().account_id(),
1547 note.note().recipient(),
1548 ivk.diversifier_index(¬e.note().recipient())
1549 .expect("must be key used to create this address"),
1550 )?;
1551 }
1552 for note in transaction
1553 .sapling_notes()
1554 .iter()
1555 .filter(|¬e| note.key_id().scope == zip32::Scope::External)
1556 {
1557 let ivk = ufvks
1558 .get(¬e.key_id().account_id())
1559 .expect("ufvk must exist to decrypt this note")
1560 .sapling()
1561 .expect("fvk must exist to decrypt this note")
1562 .to_external_ivk();
1563
1564 wallet.add_sapling_address(
1565 note.key_id().account_id(),
1566 note.note().recipient(),
1567 ivk.decrypt_diversifier(¬e.note().recipient())
1568 .expect("must be key used to create this address"),
1569 )?;
1570 }
1571
1572 Ok(())
1573}
1574
1575fn remove_irrelevant_data<W>(wallet: &mut W) -> Result<(), W::Error>
1576where
1577 W: SyncWallet + SyncBlocks + SyncOutPoints + SyncNullifiers + SyncTransactions,
1578{
1579 let fully_scanned_height = wallet
1580 .get_sync_state()?
1581 .fully_scanned_height()
1582 .expect("scan ranges must be non-empty");
1583
1584 wallet
1585 .get_outpoints_mut()?
1586 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1587 wallet
1588 .get_nullifiers_mut()?
1589 .sapling
1590 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1591 wallet
1592 .get_nullifiers_mut()?
1593 .orchard
1594 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1595 wallet
1596 .get_sync_state_mut()?
1597 .scan_targets
1598 .retain(|scan_target| scan_target.block_height > fully_scanned_height);
1599 remove_irrelevant_blocks(wallet)?;
1600
1601 Ok(())
1602}
1603
1604fn remove_irrelevant_blocks<W>(wallet: &mut W) -> Result<(), W::Error>
1605where
1606 W: SyncWallet + SyncBlocks + SyncTransactions,
1607{
1608 let sync_state = wallet.get_sync_state()?;
1609 let highest_scanned_height = sync_state
1610 .highest_scanned_height()
1611 .expect("should be non-empty");
1612 let scanned_range_bounds = sync_state
1613 .scan_ranges()
1614 .iter()
1615 .filter(|scan_range| {
1616 scan_range.priority() == ScanPriority::Scanned
1617 || scan_range.priority() == ScanPriority::ScannedWithoutMapping
1618 || scan_range.priority() == ScanPriority::RefetchingNullifiers
1619 })
1620 .flat_map(|scanned_range| {
1621 vec![
1622 scanned_range.block_range().start,
1623 scanned_range.block_range().end - 1,
1624 ]
1625 })
1626 .collect::<Vec<_>>();
1627 let wallet_transaction_heights = wallet
1628 .get_wallet_transactions()?
1629 .values()
1630 .filter_map(|tx| tx.status().get_confirmed_height())
1631 .collect::<Vec<_>>();
1632
1633 wallet.get_wallet_blocks_mut()?.retain(|height, _| {
1634 *height >= highest_scanned_height.saturating_sub(MAX_REORG_ALLOWANCE)
1635 || scanned_range_bounds.contains(height)
1636 || wallet_transaction_heights.contains(height)
1637 });
1638
1639 Ok(())
1640}
1641
1642fn add_scanned_blocks<W>(
1643 wallet: &mut W,
1644 mut scanned_blocks: BTreeMap<BlockHeight, WalletBlock>,
1645 scan_range: &ScanRange,
1646) -> Result<(), W::Error>
1647where
1648 W: SyncWallet + SyncBlocks + SyncTransactions,
1649{
1650 let sync_state = wallet.get_sync_state()?;
1651 let highest_scanned_height = sync_state
1652 .highest_scanned_height()
1653 .expect("scan ranges must be non-empty");
1654
1655 let wallet_transaction_heights = wallet
1656 .get_wallet_transactions()?
1657 .values()
1658 .filter_map(|tx| tx.status().get_confirmed_height())
1659 .collect::<Vec<_>>();
1660
1661 scanned_blocks.retain(|height, _| {
1662 *height >= highest_scanned_height.saturating_sub(MAX_REORG_ALLOWANCE)
1663 || *height == scan_range.block_range().start
1664 || *height == scan_range.block_range().end - 1
1665 || wallet_transaction_heights.contains(height)
1666 });
1667
1668 wallet.append_wallet_blocks(scanned_blocks)?;
1669
1670 Ok(())
1671}
1672
1673#[cfg(not(feature = "darkside_test"))]
1674async fn update_subtree_roots<W>(
1675 consensus_parameters: &impl consensus::Parameters,
1676 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1677 wallet: &mut W,
1678) -> Result<(), SyncError<W::Error>>
1679where
1680 W: SyncWallet + SyncShardTrees,
1681{
1682 let sapling_start_index = wallet
1683 .get_shard_trees()
1684 .map_err(SyncError::WalletError)?
1685 .sapling
1686 .store()
1687 .get_shard_roots()
1688 .expect("infallible")
1689 .len() as u32;
1690 let orchard_start_index = wallet
1691 .get_shard_trees()
1692 .map_err(SyncError::WalletError)?
1693 .orchard
1694 .store()
1695 .get_shard_roots()
1696 .expect("infallible")
1697 .len() as u32;
1698 let (sapling_subtree_roots, orchard_subtree_roots) = futures::join!(
1699 client::get_subtree_roots(fetch_request_sender.clone(), sapling_start_index, 0, 0),
1700 client::get_subtree_roots(fetch_request_sender, orchard_start_index, 1, 0)
1701 );
1702
1703 let sapling_subtree_roots = sapling_subtree_roots?;
1704 let orchard_subtree_roots = orchard_subtree_roots?;
1705
1706 let sync_state = wallet
1707 .get_sync_state_mut()
1708 .map_err(SyncError::WalletError)?;
1709 state::add_shard_ranges(
1710 consensus_parameters,
1711 ShieldedProtocol::Sapling,
1712 sync_state,
1713 &sapling_subtree_roots,
1714 );
1715 state::add_shard_ranges(
1716 consensus_parameters,
1717 ShieldedProtocol::Orchard,
1718 sync_state,
1719 &orchard_subtree_roots,
1720 );
1721
1722 let shard_trees = wallet
1723 .get_shard_trees_mut()
1724 .map_err(SyncError::WalletError)?;
1725 witness::add_subtree_roots(sapling_subtree_roots, &mut shard_trees.sapling)?;
1726 witness::add_subtree_roots(orchard_subtree_roots, &mut shard_trees.orchard)?;
1727
1728 Ok(())
1729}
1730
1731async fn add_initial_frontier<W>(
1732 consensus_parameters: &impl consensus::Parameters,
1733 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1734 wallet: &mut W,
1735) -> Result<(), SyncError<W::Error>>
1736where
1737 W: SyncWallet + SyncShardTrees,
1738{
1739 let birthday = wallet.get_birthday().map_err(SyncError::WalletError)?;
1740 if birthday
1741 == consensus_parameters
1742 .activation_height(consensus::NetworkUpgrade::Sapling)
1743 .expect("sapling activation height should always return Some")
1744 {
1745 return Ok(());
1746 }
1747
1748 let shard_trees = wallet
1751 .get_shard_trees_mut()
1752 .map_err(SyncError::WalletError)?;
1753 if shard_trees
1754 .sapling
1755 .store()
1756 .checkpoint_count()
1757 .expect("infallible")
1758 == 1
1759 {
1760 let frontiers = client::get_frontiers(fetch_request_sender, birthday).await?;
1761 shard_trees
1762 .sapling
1763 .insert_frontier(
1764 frontiers.final_sapling_tree().clone(),
1765 Retention::Checkpoint {
1766 id: birthday,
1767 marking: Marking::None,
1768 },
1769 )
1770 .expect("infallible");
1771 shard_trees
1772 .orchard
1773 .insert_frontier(
1774 frontiers.final_orchard_tree().clone(),
1775 Retention::Checkpoint {
1776 id: birthday,
1777 marking: Marking::None,
1778 },
1779 )
1780 .expect("infallible");
1781 }
1782
1783 Ok(())
1784}
1785
/// Streams mempool transactions from the server and forwards them over a channel.
///
/// The outer loop requests a mempool transaction stream. While a stream is open, every received
/// raw transaction is sent to `mempool_transaction_sender` and `unprocessed_transactions_count`
/// is incremented. When the stream yields no further message, or yields an error, a new stream
/// is requested. Once per second the `shutdown_mempool` flag is checked and the monitor stops
/// when it is set.
///
/// # Errors
///
/// Returns `MempoolError::ShutdownWithoutStream` when the stream request reports it. A
/// `MempoolError::ServerError` from the stream request is logged and the request is retried
/// after three seconds.
async fn mempool_monitor(
    mut client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
    mempool_transaction_sender: mpsc::Sender<RawTransaction>,
    unprocessed_transactions_count: Arc<AtomicU8>,
    shutdown_mempool: Arc<AtomicBool>,
) -> Result<(), MempoolError> {
    // Timer driving the periodic shutdown check while a stream is open.
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    'main: loop {
        let response =
            client::get_mempool_transaction_stream(&mut client, shutdown_mempool.clone()).await;

        match response {
            Ok(mut mempool_stream) => {
                interval.reset();
                loop {
                    tokio::select! {
                        mempool_stream_message = mempool_stream.message() => {
                            // A stream error is treated the same as the end of the stream.
                            match mempool_stream_message.unwrap_or(None) {
                                Some(raw_transaction) => {
                                    // The send result is deliberately ignored; the counter is
                                    // incremented whether or not the send succeeded.
                                    let _ignore_error = mempool_transaction_sender
                                        .send(raw_transaction)
                                        .await;
                                    unprocessed_transactions_count.fetch_add(1, atomic::Ordering::Release);
                                }
                                None => {
                                    // Request a fresh stream.
                                    continue 'main;
                                }
                            }

                        }

                        _ = interval.tick() => {
                            if shutdown_mempool.load(atomic::Ordering::Acquire) {
                                break 'main;
                            }
                        }
                    }
                }
            }
            Err(e @ MempoolError::ShutdownWithoutStream) => return Err(e),
            Err(MempoolError::ServerError(e)) => {
                tracing::warn!("Mempool stream request failed! Status: {e}.\nRetrying...");
                tokio::time::sleep(Duration::from_secs(3)).await;
            }
        }
    }

    Ok(())
}
1840
1841fn expire_transactions<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1846where
1847 W: SyncWallet + SyncTransactions,
1848{
1849 let last_known_chain_height = wallet
1850 .get_sync_state()
1851 .map_err(SyncError::WalletError)?
1852 .last_known_chain_height()
1853 .expect("wallet height must exist after scan ranges have been updated");
1854 let wallet_transactions = wallet
1855 .get_wallet_transactions_mut()
1856 .map_err(SyncError::WalletError)?;
1857
1858 let expired_txids = wallet_transactions
1859 .values()
1860 .filter(|transaction| {
1861 transaction.status().is_pending()
1862 && last_known_chain_height >= transaction.transaction().expiry_height()
1863 })
1864 .map(super::wallet::WalletTransaction::txid)
1865 .collect::<Vec<_>>();
1866 set_transactions_failed(wallet_transactions, expired_txids);
1867
1868 let stuck_funds_txids = wallet_transactions
1869 .values()
1870 .filter(|transaction| {
1871 transaction.status().is_pending()
1872 && last_known_chain_height
1873 >= transaction.status().get_height() + UNCONFIRMED_SPEND_INVALIDATION_THRESHOLD
1874 })
1875 .map(super::wallet::WalletTransaction::txid)
1876 .collect::<Vec<_>>();
1877 reset_spends(wallet_transactions, stuck_funds_txids);
1878
1879 Ok(())
1880}
1881
1882fn max_nullifier_map_size(performance_level: PerformanceLevel) -> Option<usize> {
1883 match performance_level {
1884 PerformanceLevel::Low => Some(0),
1885 PerformanceLevel::Medium => Some(125_000),
1886 PerformanceLevel::High => Some(2_000_000),
1887 PerformanceLevel::Maximum => None,
1888 }
1889}
1890
#[cfg(test)]
mod test {

    // Tests for `checked_wallet_height`, run against mock wallets.
    mod checked_height_validation {
        use zcash_protocol::consensus::BlockHeight;
        use zcash_protocol::local_consensus::LocalNetwork;
        // Local network with overwinter activating at height 1 and every later upgrade,
        // including sapling, activating at height 3.
        const LOCAL_NETWORK: LocalNetwork = LocalNetwork {
            overwinter: Some(BlockHeight::from_u32(1)),
            sapling: Some(BlockHeight::from_u32(3)),
            blossom: Some(BlockHeight::from_u32(3)),
            heartwood: Some(BlockHeight::from_u32(3)),
            canopy: Some(BlockHeight::from_u32(3)),
            nu5: Some(BlockHeight::from_u32(3)),
            nu6: Some(BlockHeight::from_u32(3)),
            nu6_1: Some(BlockHeight::from_u32(3)),
        };
        use crate::{error::SyncError, mocks::MockWalletError, sync::checked_wallet_height};
        // An error returned by the wallet's sync state getter must surface as
        // `SyncError::WalletError` carrying the same message.
        #[tokio::test]
        async fn get_sync_state_error() {
            let builder = crate::mocks::MockWalletBuilder::new();
            let test_error = "get_sync_state_error";
            let mut test_wallet = builder
                .get_sync_state_patch(Box::new(|_| {
                    Err(MockWalletError::AnErrorVariant(test_error.to_string()))
                }))
                .create_mock_wallet();
            let res =
                checked_wallet_height(&mut test_wallet, BlockHeight::from_u32(1), &LOCAL_NETWORK);
            assert!(matches!(
                res,
                Err(SyncError::WalletError(
                    crate::mocks::MockWalletError::AnErrorVariant(ref s)
                )) if s == test_error
            ));
        }

        // Cases where the sync state is built with scan ranges.
        mod last_known_chain_height {
            use crate::{
                sync::{MAX_REORG_ALLOWANCE, ScanRange},
                wallet::SyncState,
            };
            const DEFAULT_START_HEIGHT: BlockHeight = BlockHeight::from_u32(1);
            const _DEFAULT_LAST_KNOWN_HEIGHT: BlockHeight = BlockHeight::from_u32(102);
            const DEFAULT_CHAIN_HEIGHT: BlockHeight = BlockHeight::from_u32(110);

            use super::*;
            // A single scanned range ending at `LAST_KNOWN_HEIGHT` (exclusive) with a chain
            // height of `DEFAULT_CHAIN_HEIGHT` must produce the "more than ... blocks ahead"
            // error, reported with `LAST_KNOWN_HEIGHT - 1` as the wallet height.
            #[tokio::test]
            async fn above_allowance() {
                const LAST_KNOWN_HEIGHT: BlockHeight = BlockHeight::from_u32(211);
                let lkch = vec![ScanRange::from_parts(
                    DEFAULT_START_HEIGHT..LAST_KNOWN_HEIGHT,
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut test_wallet = builder.sync_state(state).create_mock_wallet();
                let res =
                    checked_wallet_height(&mut test_wallet, DEFAULT_CHAIN_HEIGHT, &LOCAL_NETWORK);
                if let Err(e) = res {
                    assert_eq!(
                        e.to_string(),
                        format!(
                            "wallet height {} is more than {} blocks ahead of best chain height {}",
                            LAST_KNOWN_HEIGHT - 1,
                            MAX_REORG_ALLOWANCE,
                            DEFAULT_CHAIN_HEIGHT
                        )
                    );
                } else {
                    panic!()
                }
            }
            // A scanned range of 6..10 with a chain height of 4 must return height 4, the
            // chain height that was passed in.
            #[tokio::test]
            async fn above_chain_height_below_allowance() {
                let lkch = vec![ScanRange::from_parts(
                    BlockHeight::from_u32(6)..BlockHeight::from_u32(10),
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut test_wallet = builder.sync_state(state).create_mock_wallet();
                let chain_height = BlockHeight::from_u32(4);
                let res = checked_wallet_height(&mut test_wallet, chain_height, &LOCAL_NETWORK);
                assert_eq!(res.unwrap(), BlockHeight::from_u32(4));
            }
            // Placeholder: only builds the mock wallet, no assertions yet.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn equal_or_below_chain_height_and_above_sapling() {
                let lkch = vec![ScanRange::from_parts(
                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
            }
            // Placeholder: only builds the mock wallet, no assertions yet.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn equal_or_below_chain_height_and_below_sapling() {
                let lkch = vec![ScanRange::from_parts(
                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
            }
            // Placeholder: only builds the mock wallet, no assertions yet.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn below_sapling() {
                let lkch = vec![ScanRange::from_parts(
                    BlockHeight::from_u32(1)..BlockHeight::from_u32(10),
                    crate::sync::ScanPriority::Scanned,
                )];
                let state = SyncState {
                    scan_ranges: lkch,
                    ..Default::default()
                };
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut _test_wallet = builder.sync_state(state).create_mock_wallet();
            }
        }
        // Cases where the mock wallet is built without a sync state, configured only through
        // its birthday.
        mod no_last_known_chain_height {
            use super::*;
            // An error returned by the wallet's birthday getter must surface as
            // `SyncError::WalletError` carrying the same message.
            #[tokio::test]
            async fn get_bday_error() {
                let test_error = "get_bday_error";
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut test_wallet = builder
                    .get_birthday_patch(Box::new(|_| {
                        Err(crate::mocks::MockWalletError::AnErrorVariant(
                            test_error.to_string(),
                        ))
                    }))
                    .create_mock_wallet();
                let res = checked_wallet_height(
                    &mut test_wallet,
                    BlockHeight::from_u32(1),
                    &LOCAL_NETWORK,
                );
                assert!(matches!(
                    res,
                    Err(SyncError::WalletError(
                        crate::mocks::MockWalletError::AnErrorVariant(ref s)
                    )) if s == test_error
                ));
            }
            // In progress: birthday 15 with chain height 1 is expected to produce an error.
            #[ignore = "in progress"]
            #[tokio::test]
            async fn raw_bday_above_chain_height() {
                let builder = crate::mocks::MockWalletBuilder::new();
                let mut test_wallet = builder
                    .birthday(BlockHeight::from_u32(15))
                    .create_mock_wallet();
                let res = checked_wallet_height(
                    &mut test_wallet,
                    BlockHeight::from_u32(1),
                    &LOCAL_NETWORK,
                );
                if let Err(e) = res {
                    assert_eq!(
                        e.to_string(),
                        format!(
                            "wallet height is more than {} blocks ahead of best chain height",
                            15 - 1
                        )
                    );
                } else {
                    panic!()
                }
            }
            // Birthday compared with the sapling activation height (3 in `LOCAL_NETWORK`).
            mod sapling_height {
                use super::*;
                // Birthday 4 is above sapling activation: the result is birthday - 1.
                #[tokio::test]
                async fn raw_bday_above() {
                    let builder = crate::mocks::MockWalletBuilder::new();
                    let mut test_wallet = builder
                        .birthday(BlockHeight::from_u32(4))
                        .create_mock_wallet();
                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );
                    assert_eq!(res.unwrap(), BlockHeight::from_u32(4 - 1));
                }
                // Birthday 3 equals sapling activation: the result is birthday - 1.
                #[tokio::test]
                async fn raw_bday_equal() {
                    let builder = crate::mocks::MockWalletBuilder::new();
                    let mut test_wallet = builder
                        .birthday(BlockHeight::from_u32(3))
                        .create_mock_wallet();
                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );
                    assert_eq!(res.unwrap(), BlockHeight::from_u32(3 - 1));
                }
                // Birthday 1 is below sapling activation: `BirthdayBelowSapling(1, 3)` is
                // expected.
                #[tokio::test]
                async fn raw_bday_below() {
                    let builder = crate::mocks::MockWalletBuilder::new();
                    let mut test_wallet = builder
                        .birthday(BlockHeight::from_u32(1))
                        .create_mock_wallet();
                    let res = checked_wallet_height(
                        &mut test_wallet,
                        BlockHeight::from_u32(5),
                        &LOCAL_NETWORK,
                    );
                    assert!(matches!(res, Err(SyncError::BirthdayBelowSapling(1, 3))));
                }
            }
        }
    }
}