1use std::cmp;
4use std::collections::{BTreeMap, HashMap};
5use std::ops::Range;
6use std::sync::Arc;
7use std::sync::atomic::{self, AtomicBool, AtomicU8};
8use std::time::{Duration, SystemTime};
9
10use tokio::sync::{RwLock, mpsc};
11
12use incrementalmerkletree::{Marking, Retention};
13use orchard::tree::MerkleHashOrchard;
14use shardtree::store::ShardStore;
15use zcash_client_backend::proto::service::RawTransaction;
16use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
17use zcash_keys::keys::UnifiedFullViewingKey;
18use zcash_primitives::transaction::{Transaction, TxId};
19use zcash_primitives::zip32::AccountId;
20use zcash_protocol::ShieldedProtocol;
21use zcash_protocol::consensus::{self, BlockHeight};
22
23use zingo_status::confirmation_status::ConfirmationStatus;
24
25use crate::client::{self, FetchRequest};
26use crate::config::SyncConfig;
27use crate::error::{
28 ContinuityError, MempoolError, ScanError, ServerError, SyncError, SyncModeError,
29 SyncStatusError,
30};
31use crate::keys::transparent::TransparentAddressId;
32use crate::scan::ScanResults;
33use crate::scan::task::{Scanner, ScannerState};
34use crate::scan::transactions::scan_transaction;
35use crate::wallet::traits::{
36 SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
37};
38use crate::wallet::{
39 KeyIdInterface, NoteInterface, NullifierMap, OutputId, OutputInterface, ScanTarget, SyncMode,
40 SyncState, WalletBlock, WalletTransaction,
41};
42use crate::witness::LocatedTreeData;
43
44#[cfg(not(feature = "darkside_test"))]
45use crate::witness;
46
47pub(crate) mod spend;
48pub(crate) mod state;
49pub(crate) mod transparent;
50
51const MEMPOOL_SPEND_INVALIDATION_THRESHOLD: u32 = 3;
52pub(crate) const MAX_VERIFICATION_WINDOW: u32 = 100;
53const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
54
55#[derive(Debug, Clone)]
60#[allow(missing_docs)]
61pub struct SyncStatus {
62 pub scan_ranges: Vec<ScanRange>,
63 pub sync_start_height: BlockHeight,
64 pub session_blocks_scanned: u32,
65 pub total_blocks_scanned: u32,
66 pub percentage_session_blocks_scanned: f32,
67 pub percentage_total_blocks_scanned: f32,
68 pub session_sapling_outputs_scanned: u32,
69 pub total_sapling_outputs_scanned: u32,
70 pub session_orchard_outputs_scanned: u32,
71 pub total_orchard_outputs_scanned: u32,
72 pub percentage_session_outputs_scanned: f32,
73 pub percentage_total_outputs_scanned: f32,
74}
75
76impl std::fmt::Display for SyncStatus {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "percentage complete: {}",
82 self.percentage_total_outputs_scanned
83 )
84 }
85}
86
87impl From<SyncStatus> for json::JsonValue {
88 fn from(value: SyncStatus) -> Self {
89 let scan_ranges: Vec<json::JsonValue> = value
90 .scan_ranges
91 .iter()
92 .map(|range| {
93 json::object! {
94 "priority" => format!("{:?}", range.priority()),
95 "start_block" => range.block_range().start.to_string(),
96 "end_block" => (range.block_range().end - 1).to_string(),
97 }
98 })
99 .collect();
100
101 json::object! {
102 "scan_ranges" => scan_ranges,
103 "sync_start_height" => u32::from(value.sync_start_height),
104 "session_blocks_scanned" => value.session_blocks_scanned,
105 "total_blocks_scanned" => value.total_blocks_scanned,
106 "percentage_session_blocks_scanned" => value.percentage_session_blocks_scanned,
107 "percentage_total_blocks_scanned" => value.percentage_total_blocks_scanned,
108 "session_sapling_outputs_scanned" => value.session_sapling_outputs_scanned,
109 "total_sapling_outputs_scanned" => value.total_sapling_outputs_scanned,
110 "session_orchard_outputs_scanned" => value.session_orchard_outputs_scanned,
111 "total_orchard_outputs_scanned" => value.total_orchard_outputs_scanned,
112 "percentage_session_outputs_scanned" => value.percentage_session_outputs_scanned,
113 "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
114 }
115 }
116}
117
118#[derive(Debug, Clone)]
120#[allow(missing_docs)]
121pub struct SyncResult {
122 pub sync_start_height: BlockHeight,
123 pub sync_end_height: BlockHeight,
124 pub blocks_scanned: u32,
125 pub sapling_outputs_scanned: u32,
126 pub orchard_outputs_scanned: u32,
127 pub percentage_total_outputs_scanned: f32,
128}
129
130impl std::fmt::Display for SyncResult {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 write!(
133 f,
134 "Sync completed succesfully:
135{{
136 sync start height: {}
137 sync end height: {}
138 blocks scanned: {}
139 sapling outputs scanned: {}
140 orchard outputs scanned: {}
141 percentage total outputs scanned: {}
142}}",
143 self.sync_start_height,
144 self.sync_end_height,
145 self.blocks_scanned,
146 self.sapling_outputs_scanned,
147 self.orchard_outputs_scanned,
148 self.percentage_total_outputs_scanned,
149 )
150 }
151}
152
153impl From<SyncResult> for json::JsonValue {
154 fn from(value: SyncResult) -> Self {
155 json::object! {
156 "sync_start_height" => u32::from(value.sync_start_height),
157 "sync_end_height" => u32::from(value.sync_end_height),
158 "blocks_scanned" => value.blocks_scanned,
159 "sapling_outputs_scanned" => value.sapling_outputs_scanned,
160 "orchard_outputs_scanned" => value.orchard_outputs_scanned,
161 "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
162 }
163 }
164}
165
166#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
168pub enum ScanPriority {
169 Scanning,
171 Scanned,
173 ScannedWithoutMapping,
178 Historic,
180 OpenAdjacent,
182 FoundNote,
184 ChainTip,
186 Verify,
189}
190
191#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct ScanRange {
194 block_range: Range<BlockHeight>,
195 priority: ScanPriority,
196}
197
198impl std::fmt::Display for ScanRange {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 write!(
201 f,
202 "{:?}({}..{})",
203 self.priority, self.block_range.start, self.block_range.end,
204 )
205 }
206}
207
208impl ScanRange {
209 #[must_use]
211 pub fn from_parts(block_range: Range<BlockHeight>, priority: ScanPriority) -> Self {
212 assert!(
213 block_range.end >= block_range.start,
214 "{block_range:?} is invalid for ScanRange({priority:?})",
215 );
216 ScanRange {
217 block_range,
218 priority,
219 }
220 }
221
222 #[must_use]
224 pub fn block_range(&self) -> &Range<BlockHeight> {
225 &self.block_range
226 }
227
228 #[must_use]
230 pub fn priority(&self) -> ScanPriority {
231 self.priority
232 }
233
234 #[must_use]
236 pub fn is_empty(&self) -> bool {
237 self.block_range.is_empty()
238 }
239
240 #[must_use]
242 pub fn len(&self) -> usize {
243 usize::try_from(u32::from(self.block_range.end) - u32::from(self.block_range.start))
244 .expect("due to number of max blocks should always be valid usize")
245 }
246
247 #[must_use]
251 pub fn truncate_start(&self, block_height: BlockHeight) -> Option<Self> {
252 if block_height >= self.block_range.end || self.is_empty() {
253 None
254 } else {
255 Some(ScanRange {
256 block_range: self.block_range.start.max(block_height)..self.block_range.end,
257 priority: self.priority,
258 })
259 }
260 }
261
262 #[must_use]
266 pub fn truncate_end(&self, block_height: BlockHeight) -> Option<Self> {
267 if block_height <= self.block_range.start || self.is_empty() {
268 None
269 } else {
270 Some(ScanRange {
271 block_range: self.block_range.start..self.block_range.end.min(block_height),
272 priority: self.priority,
273 })
274 }
275 }
276
277 #[must_use]
281 pub fn split_at(&self, p: BlockHeight) -> Option<(Self, Self)> {
282 (p > self.block_range.start && p < self.block_range.end).then_some((
283 ScanRange {
284 block_range: self.block_range.start..p,
285 priority: self.priority,
286 },
287 ScanRange {
288 block_range: p..self.block_range.end,
289 priority: self.priority,
290 },
291 ))
292 }
293}
294
295pub async fn sync<P, W>(
306 client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
307 consensus_parameters: &P,
308 wallet: Arc<RwLock<W>>,
309 sync_mode: Arc<AtomicU8>,
310 config: SyncConfig,
311) -> Result<SyncResult, SyncError<W::Error>>
312where
313 P: consensus::Parameters + Sync + Send + 'static,
314 W: SyncWallet
315 + SyncBlocks
316 + SyncTransactions
317 + SyncNullifiers
318 + SyncOutPoints
319 + SyncShardTrees
320 + Send,
321{
322 let mut sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
323 if sync_mode_enum == SyncMode::NotRunning {
324 sync_mode_enum = SyncMode::Running;
325 sync_mode.store(sync_mode_enum as u8, atomic::Ordering::Release);
326 } else {
327 return Err(SyncModeError::SyncAlreadyRunning.into());
328 }
329
330 tracing::info!("Starting sync...");
331
332 let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
334 let client_clone = client.clone();
335 let fetcher_handle =
336 tokio::spawn(
337 async move { client::fetch::fetch(fetch_request_receiver, client_clone).await },
338 );
339
340 let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(100);
342 let shutdown_mempool = Arc::new(AtomicBool::new(false));
343 let shutdown_mempool_clone = shutdown_mempool.clone();
344 let unprocessed_mempool_transactions_count = Arc::new(AtomicU8::new(0));
345 let unprocessed_mempool_transactions_count_clone =
346 unprocessed_mempool_transactions_count.clone();
347 let mempool_handle = tokio::spawn(async move {
348 mempool_monitor(
349 client,
350 mempool_transaction_sender,
351 unprocessed_mempool_transactions_count_clone,
352 shutdown_mempool_clone,
353 )
354 .await
355 });
356
357 let mut wallet_guard = wallet.write().await;
359
360 let mut wallet_height = state::get_wallet_height(consensus_parameters, &*wallet_guard)
361 .map_err(SyncError::WalletError)?;
362 let chain_height = client::get_chain_height(fetch_request_sender.clone()).await?;
363 if chain_height == 0.into() {
364 return Err(SyncError::ServerError(ServerError::GenesisBlockOnly));
365 }
366 if wallet_height > chain_height {
367 if wallet_height - chain_height >= MAX_VERIFICATION_WINDOW {
368 return Err(SyncError::ChainError(MAX_VERIFICATION_WINDOW));
369 }
370 truncate_wallet_data(&mut *wallet_guard, chain_height)?;
371 wallet_height = chain_height;
372 }
373
374 let ufvks = wallet_guard
375 .get_unified_full_viewing_keys()
376 .map_err(SyncError::WalletError)?;
377
378 transparent::update_addresses_and_scan_targets(
379 consensus_parameters,
380 &mut *wallet_guard,
381 fetch_request_sender.clone(),
382 &ufvks,
383 wallet_height,
384 chain_height,
385 config.transparent_address_discovery,
386 )
387 .await?;
388
389 #[cfg(not(feature = "darkside_test"))]
390 update_subtree_roots(
391 consensus_parameters,
392 fetch_request_sender.clone(),
393 &mut *wallet_guard,
394 )
395 .await?;
396
397 add_initial_frontier(
398 consensus_parameters,
399 fetch_request_sender.clone(),
400 &mut *wallet_guard,
401 )
402 .await?;
403
404 state::update_scan_ranges(
405 consensus_parameters,
406 wallet_height,
407 chain_height,
408 wallet_guard
409 .get_sync_state_mut()
410 .map_err(SyncError::WalletError)?,
411 )
412 .await;
413
414 state::set_initial_state(
415 consensus_parameters,
416 fetch_request_sender.clone(),
417 &mut *wallet_guard,
418 chain_height,
419 )
420 .await?;
421
422 let initial_verification_height = wallet_guard
423 .get_sync_state()
424 .map_err(SyncError::WalletError)?
425 .highest_scanned_height()
426 .expect("scan ranges must be non-empty")
427 + 1;
428
429 reset_invalid_spends(&mut *wallet_guard)?;
430
431 drop(wallet_guard);
432
433 let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel();
435 let mut scanner = Scanner::new(
436 consensus_parameters.clone(),
437 scan_results_sender,
438 fetch_request_sender.clone(),
439 ufvks.clone(),
440 );
441 scanner.launch(config.performance_level);
442
443 let mut interval = tokio::time::interval(Duration::from_millis(50));
446 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
447 loop {
448 tokio::select! {
449 Some((scan_range, scan_results)) = scan_results_receiver.recv() => {
450 let mut wallet_guard = wallet.write().await;
451 process_scan_results(
452 consensus_parameters,
453 &mut *wallet_guard,
454 fetch_request_sender.clone(),
455 &ufvks,
456 scan_range,
457 scan_results,
458 initial_verification_height,
459 )
460 .await?;
461 wallet_guard.set_save_flag().map_err(SyncError::WalletError)?;
462 drop(wallet_guard);
463 }
464
465 Some(raw_transaction) = mempool_transaction_receiver.recv() => {
466 let mut wallet_guard = wallet.write().await;
467 process_mempool_transaction(
468 consensus_parameters,
469 &ufvks,
470 &mut *wallet_guard,
471 raw_transaction,
472 )
473 .await?;
474 unprocessed_mempool_transactions_count.fetch_sub(1, atomic::Ordering::Release);
475 drop(wallet_guard);
476 }
477
478 _update_scanner = interval.tick() => {
479 sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
480 match sync_mode_enum {
481 SyncMode::Paused => {
482 let mut pause_interval = tokio::time::interval(Duration::from_secs(1));
483 pause_interval.tick().await;
484 while sync_mode_enum == SyncMode::Paused {
485 pause_interval.tick().await;
486 sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
487 }
488 },
489 SyncMode::Shutdown => {
490 let mut wallet_guard = wallet.write().await;
491 let sync_status = match sync_status(&*wallet_guard).await {
492 Ok(status) => status,
493 Err(SyncStatusError::WalletError(e)) => {
494 return Err(SyncError::WalletError(e));
495 }
496 Err(SyncStatusError::NoSyncData) => {
497 panic!("sync data must exist!");
498 }
499 };
500 wallet_guard
501 .set_save_flag()
502 .map_err(SyncError::WalletError)?;
503 drop(wallet_guard);
504 tracing::info!("Sync successfully shutdown.");
505
506 return Ok(SyncResult {
507 sync_start_height: sync_status.sync_start_height,
508 sync_end_height: (sync_status
509 .scan_ranges
510 .last()
511 .expect("should be non-empty after syncing")
512 .block_range()
513 .end
514 - 1),
515 blocks_scanned: sync_status.session_blocks_scanned,
516 sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
517 orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
518 percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
519 });
520 }
521 SyncMode::Running => (),
522 SyncMode::NotRunning => {
523 panic!("sync mode should not be manually set to NotRunning!");
524 },
525 }
526
527 scanner.update(&mut *wallet.write().await, shutdown_mempool.clone(), config.performance_level).await?;
528
529 if matches!(scanner.state, ScannerState::Shutdown) {
530 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
532 if is_shutdown(&scanner, unprocessed_mempool_transactions_count.clone())
533 {
534 tracing::info!("Sync successfully shutdown.");
535 break;
536 }
537 }
538 }
539 }
540 }
541
542 let mut wallet_guard = wallet.write().await;
543 let sync_status = match sync_status(&*wallet_guard).await {
544 Ok(status) => status,
545 Err(SyncStatusError::WalletError(e)) => {
546 return Err(SyncError::WalletError(e));
547 }
548 Err(SyncStatusError::NoSyncData) => {
549 panic!("sync data must exist!");
550 }
551 };
552 wallet_guard
553 .set_save_flag()
554 .map_err(SyncError::WalletError)?;
555
556 drop(wallet_guard);
557 drop(scanner);
558 drop(fetch_request_sender);
559
560 match mempool_handle.await.expect("task panicked") {
561 Ok(()) => (),
562 Err(e @ MempoolError::ShutdownWithoutStream) => tracing::warn!("{e}"),
563 Err(e) => return Err(e.into()),
564 }
565 fetcher_handle.await.expect("task panicked");
566
567 Ok(SyncResult {
568 sync_start_height: sync_status.sync_start_height,
569 sync_end_height: (sync_status
570 .scan_ranges
571 .last()
572 .expect("should be non-empty after syncing")
573 .block_range()
574 .end
575 - 1),
576 blocks_scanned: sync_status.session_blocks_scanned,
577 sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
578 orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
579 percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
580 })
581}
582
583pub async fn sync_status<W>(wallet: &W) -> Result<SyncStatus, SyncStatusError<W::Error>>
587where
588 W: SyncWallet + SyncBlocks,
589{
590 let (total_sapling_outputs_scanned, total_orchard_outputs_scanned) =
591 state::calculate_scanned_outputs(wallet).map_err(SyncStatusError::WalletError)?;
592 let total_outputs_scanned = total_sapling_outputs_scanned + total_orchard_outputs_scanned;
593
594 let sync_state = wallet
595 .get_sync_state()
596 .map_err(SyncStatusError::WalletError)?;
597 if sync_state.initial_sync_state.sync_start_height == 0.into() {
598 return Ok(SyncStatus {
599 scan_ranges: sync_state.scan_ranges.clone(),
600 sync_start_height: 0.into(),
601 session_blocks_scanned: 0,
602 total_blocks_scanned: 0,
603 percentage_session_blocks_scanned: 0.0,
604 percentage_total_blocks_scanned: 0.0,
605 session_sapling_outputs_scanned: 0,
606 session_orchard_outputs_scanned: 0,
607 total_sapling_outputs_scanned: 0,
608 total_orchard_outputs_scanned: 0,
609 percentage_session_outputs_scanned: 0.0,
610 percentage_total_outputs_scanned: 0.0,
611 });
612 }
613 let total_blocks_scanned = state::calculate_scanned_blocks(sync_state);
614
615 let birthday = sync_state
616 .wallet_birthday()
617 .ok_or(SyncStatusError::NoSyncData)?;
618 let wallet_height = sync_state
619 .wallet_height()
620 .ok_or(SyncStatusError::NoSyncData)?;
621 let total_blocks = wallet_height - birthday + 1;
622 let total_sapling_outputs = sync_state
623 .initial_sync_state
624 .wallet_tree_bounds
625 .sapling_final_tree_size
626 - sync_state
627 .initial_sync_state
628 .wallet_tree_bounds
629 .sapling_initial_tree_size;
630 let total_orchard_outputs = sync_state
631 .initial_sync_state
632 .wallet_tree_bounds
633 .orchard_final_tree_size
634 - sync_state
635 .initial_sync_state
636 .wallet_tree_bounds
637 .orchard_initial_tree_size;
638 let total_outputs = total_sapling_outputs + total_orchard_outputs;
639
640 let session_blocks_scanned =
641 total_blocks_scanned - sync_state.initial_sync_state.previously_scanned_blocks;
642 let percentage_session_blocks_scanned = ((session_blocks_scanned as f32
643 / (total_blocks - sync_state.initial_sync_state.previously_scanned_blocks) as f32)
644 * 100.0)
645 .clamp(0.0, 100.0);
646 let percentage_total_blocks_scanned =
647 ((total_blocks_scanned as f32 / total_blocks as f32) * 100.0).clamp(0.0, 100.0);
648
649 let session_sapling_outputs_scanned = total_sapling_outputs_scanned
650 - sync_state
651 .initial_sync_state
652 .previously_scanned_sapling_outputs;
653 let session_orchard_outputs_scanned = total_orchard_outputs_scanned
654 - sync_state
655 .initial_sync_state
656 .previously_scanned_orchard_outputs;
657 let session_outputs_scanned = session_sapling_outputs_scanned + session_orchard_outputs_scanned;
658 let previously_scanned_outputs = sync_state
659 .initial_sync_state
660 .previously_scanned_sapling_outputs
661 + sync_state
662 .initial_sync_state
663 .previously_scanned_orchard_outputs;
664 let percentage_session_outputs_scanned = ((session_outputs_scanned as f32
665 / (total_outputs - previously_scanned_outputs) as f32)
666 * 100.0)
667 .clamp(0.0, 100.0);
668 let percentage_total_outputs_scanned =
669 ((total_outputs_scanned as f32 / total_outputs as f32) * 100.0).clamp(0.0, 100.0);
670
671 Ok(SyncStatus {
672 scan_ranges: sync_state.scan_ranges.clone(),
673 sync_start_height: sync_state.initial_sync_state.sync_start_height,
674 session_blocks_scanned,
675 total_blocks_scanned,
676 percentage_session_blocks_scanned,
677 percentage_total_blocks_scanned,
678 session_sapling_outputs_scanned,
679 total_sapling_outputs_scanned,
680 session_orchard_outputs_scanned,
681 total_orchard_outputs_scanned,
682 percentage_session_outputs_scanned,
683 percentage_total_outputs_scanned,
684 })
685}
686
687pub fn scan_pending_transaction<W>(
694 consensus_parameters: &impl consensus::Parameters,
695 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
696 wallet: &mut W,
697 transaction: Transaction,
698 status: ConfirmationStatus,
699 datetime: u32,
700) -> Result<(), SyncError<W::Error>>
701where
702 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
703{
704 if matches!(status, ConfirmationStatus::Confirmed(_)) {
705 panic!("this fn is for unconfirmed transactions only");
706 }
707
708 let mut pending_transaction_nullifiers = NullifierMap::new();
709 let mut pending_transaction_outpoints = BTreeMap::new();
710 let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
711 .get_transparent_addresses()
712 .map_err(SyncError::WalletError)?
713 .iter()
714 .map(|(id, address)| (address.clone(), *id))
715 .collect();
716 let pending_transaction = scan_transaction(
717 consensus_parameters,
718 ufvks,
719 transaction.txid(),
720 transaction,
721 status,
722 None,
723 &mut pending_transaction_nullifiers,
724 &mut pending_transaction_outpoints,
725 &transparent_addresses,
726 datetime,
727 )?;
728
729 let wallet_transactions = wallet
730 .get_wallet_transactions()
731 .map_err(SyncError::WalletError)?;
732 let transparent_output_ids = spend::collect_transparent_output_ids(wallet_transactions);
733 let transparent_spend_scan_targets = spend::detect_transparent_spends(
734 &mut pending_transaction_outpoints,
735 transparent_output_ids,
736 );
737 let (sapling_derived_nullifiers, orchard_derived_nullifiers) =
738 spend::collect_derived_nullifiers(wallet_transactions);
739 let (sapling_spend_scan_targets, orchard_spend_scan_targets) = spend::detect_shielded_spends(
740 &mut pending_transaction_nullifiers,
741 sapling_derived_nullifiers,
742 orchard_derived_nullifiers,
743 );
744
745 if pending_transaction.transparent_coins().is_empty()
747 && pending_transaction.sapling_notes().is_empty()
748 && pending_transaction.orchard_notes().is_empty()
749 && pending_transaction.outgoing_orchard_notes().is_empty()
750 && pending_transaction.outgoing_sapling_notes().is_empty()
751 && transparent_spend_scan_targets.is_empty()
752 && sapling_spend_scan_targets.is_empty()
753 && orchard_spend_scan_targets.is_empty()
754 {
755 return Ok(());
756 }
757
758 wallet
759 .insert_wallet_transaction(pending_transaction)
760 .map_err(SyncError::WalletError)?;
761 spend::update_spent_coins(
762 wallet
763 .get_wallet_transactions_mut()
764 .map_err(SyncError::WalletError)?,
765 transparent_spend_scan_targets,
766 );
767 spend::update_spent_notes(
768 wallet,
769 sapling_spend_scan_targets,
770 orchard_spend_scan_targets,
771 false,
772 )
773 .map_err(SyncError::WalletError)?;
774
775 Ok(())
776}
777
778pub fn add_scan_targets(sync_state: &mut SyncState, scan_targets: &[ScanTarget]) {
789 for scan_target in scan_targets {
790 sync_state.scan_targets.insert(*scan_target);
791 }
792}
793
794pub fn reset_spends(
800 wallet_transactions: &mut HashMap<TxId, WalletTransaction>,
801 invalid_txids: Vec<TxId>,
802) {
803 wallet_transactions
804 .values_mut()
805 .flat_map(|transaction| transaction.orchard_notes_mut())
806 .filter(|output| {
807 output
808 .spending_transaction
809 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
810 })
811 .for_each(|output| {
812 output.set_spending_transaction(None);
813 });
814 wallet_transactions
815 .values_mut()
816 .flat_map(|transaction| transaction.sapling_notes_mut())
817 .filter(|output| {
818 output
819 .spending_transaction
820 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
821 })
822 .for_each(|output| {
823 output.set_spending_transaction(None);
824 });
825 wallet_transactions
826 .values_mut()
827 .flat_map(|transaction| transaction.transparent_coins_mut())
828 .filter(|output| {
829 output
830 .spending_transaction
831 .is_some_and(|spending_txid| invalid_txids.contains(&spending_txid))
832 })
833 .for_each(|output| {
834 output.set_spending_transaction(None);
835 });
836}
837
838fn is_shutdown<P>(
840 scanner: &Scanner<P>,
841 mempool_unprocessed_transactions_count: Arc<AtomicU8>,
842) -> bool
843where
844 P: consensus::Parameters + Sync + Send + 'static,
845{
846 scanner.worker_poolsize() == 0
847 && mempool_unprocessed_transactions_count.load(atomic::Ordering::Acquire) == 0
848}
849
850async fn process_scan_results<W>(
852 consensus_parameters: &impl consensus::Parameters,
853 wallet: &mut W,
854 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
855 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
856 scan_range: ScanRange,
857 scan_results: Result<ScanResults, ScanError>,
858 initial_verification_height: BlockHeight,
859) -> Result<(), SyncError<W::Error>>
860where
861 W: SyncWallet
862 + SyncBlocks
863 + SyncTransactions
864 + SyncNullifiers
865 + SyncOutPoints
866 + SyncShardTrees
867 + Send,
868{
869 match scan_results {
870 Ok(results) => {
871 let ScanResults {
872 mut nullifiers,
873 outpoints,
874 scanned_blocks,
875 wallet_transactions,
876 sapling_located_trees,
877 orchard_located_trees,
878 map_nullifiers,
879 } = results;
880
881 if scan_range.priority() == ScanPriority::ScannedWithoutMapping {
882 spend::update_shielded_spends(
883 consensus_parameters,
884 wallet,
885 fetch_request_sender.clone(),
886 ufvks,
887 &scanned_blocks,
888 Some(&mut nullifiers),
889 )
890 .await?;
891
892 let mut missing_block_bounds = BTreeMap::new();
894 for block_bound in [
895 scan_range.block_range().start,
896 scan_range.block_range().end - 1,
897 ] {
898 if wallet.get_wallet_block(block_bound).is_err() {
899 missing_block_bounds.insert(
900 block_bound,
901 WalletBlock::from_compact_block(
902 consensus_parameters,
903 fetch_request_sender.clone(),
904 &client::get_compact_block(
905 fetch_request_sender.clone(),
906 block_bound,
907 )
908 .await?,
909 )
910 .await?,
911 );
912 }
913 }
914 if !missing_block_bounds.is_empty() {
915 wallet
916 .append_wallet_blocks(missing_block_bounds)
917 .map_err(SyncError::WalletError)?;
918 }
919
920 state::set_scanned_scan_range(
921 wallet
922 .get_sync_state_mut()
923 .map_err(SyncError::WalletError)?,
924 scan_range.block_range().clone(),
925 true,
926 );
927 } else {
928 update_wallet_data(
929 consensus_parameters,
930 wallet,
931 fetch_request_sender.clone(),
932 ufvks,
933 &scan_range,
934 if map_nullifiers {
935 Some(&mut nullifiers)
936 } else {
937 None
938 },
939 outpoints,
940 wallet_transactions,
941 sapling_located_trees,
942 orchard_located_trees,
943 )
944 .await?;
945 spend::update_transparent_spends(wallet).map_err(SyncError::WalletError)?;
946 spend::update_shielded_spends(
947 consensus_parameters,
948 wallet,
949 fetch_request_sender,
950 ufvks,
951 &scanned_blocks,
952 if map_nullifiers {
953 None
954 } else {
955 Some(&mut nullifiers)
956 },
957 )
958 .await?;
959 add_scanned_blocks(wallet, scanned_blocks, &scan_range)
960 .map_err(SyncError::WalletError)?;
961
962 state::set_scanned_scan_range(
963 wallet
964 .get_sync_state_mut()
965 .map_err(SyncError::WalletError)?,
966 scan_range.block_range().clone(),
967 map_nullifiers,
968 );
969 state::merge_scan_ranges(
970 wallet
971 .get_sync_state_mut()
972 .map_err(SyncError::WalletError)?,
973 ScanPriority::ScannedWithoutMapping,
974 );
975 }
976
977 state::merge_scan_ranges(
978 wallet
979 .get_sync_state_mut()
980 .map_err(SyncError::WalletError)?,
981 ScanPriority::Scanned,
982 );
983 remove_irrelevant_data(wallet).map_err(SyncError::WalletError)?;
984 tracing::debug!("Scan results processed.");
985 }
986 Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => {
987 if height == scan_range.block_range().start
988 && scan_range.priority() == ScanPriority::Verify
989 {
990 tracing::info!("Re-org detected.");
991 let sync_state = wallet
992 .get_sync_state_mut()
993 .map_err(SyncError::WalletError)?;
994 let wallet_height = sync_state
995 .wallet_height()
996 .expect("scan ranges should be non-empty in this scope");
997
998 state::set_scan_priority(
1000 sync_state,
1001 scan_range.block_range(),
1002 ScanPriority::Verify,
1003 );
1004
1005 let verification_start = state::set_verify_scan_range(
1007 sync_state,
1008 height - 1,
1009 state::VerifyEnd::VerifyHighest,
1010 )
1011 .block_range()
1012 .start;
1013 state::merge_scan_ranges(sync_state, ScanPriority::Verify);
1014
1015 if initial_verification_height - verification_start > MAX_VERIFICATION_WINDOW {
1016 clear_wallet_data(wallet)?;
1017
1018 return Err(ServerError::ChainVerificationError.into());
1019 }
1020
1021 truncate_wallet_data(wallet, verification_start - 1)?;
1022
1023 state::set_initial_state(
1024 consensus_parameters,
1025 fetch_request_sender.clone(),
1026 wallet,
1027 wallet_height,
1028 )
1029 .await?;
1030 } else {
1031 scan_results?;
1032 }
1033 }
1034 Err(e) => return Err(e.into()),
1035 }
1036
1037 Ok(())
1038}
1039
1040async fn process_mempool_transaction<W>(
1044 consensus_parameters: &impl consensus::Parameters,
1045 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1046 wallet: &mut W,
1047 raw_transaction: RawTransaction,
1048) -> Result<(), SyncError<W::Error>>
1049where
1050 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1051{
1052 let mempool_height = wallet
1054 .get_sync_state()
1055 .map_err(SyncError::WalletError)?
1056 .wallet_height()
1057 .expect("wallet height must exist after sync is initialised")
1058 + 1;
1059
1060 let transaction = zcash_primitives::transaction::Transaction::read(
1061 &raw_transaction.data[..],
1062 consensus::BranchId::for_height(consensus_parameters, mempool_height),
1063 )
1064 .map_err(ServerError::InvalidTransaction)?;
1065
1066 tracing::debug!(
1067 "mempool received txid {} at height {}",
1068 transaction.txid(),
1069 mempool_height
1070 );
1071
1072 if let Some(tx) = wallet
1073 .get_wallet_transactions()
1074 .map_err(SyncError::WalletError)?
1075 .get(&transaction.txid())
1076 && (tx.status().is_confirmed() || matches!(tx.status(), ConfirmationStatus::Mempool(_)))
1077 {
1078 return Ok(());
1079 }
1080
1081 scan_pending_transaction(
1082 consensus_parameters,
1083 ufvks,
1084 wallet,
1085 transaction,
1086 ConfirmationStatus::Mempool(mempool_height),
1087 SystemTime::now()
1088 .duration_since(SystemTime::UNIX_EPOCH)
1089 .expect("infalliable for such long time periods")
1090 .as_secs() as u32,
1091 )?;
1092
1093 Ok(())
1094}
1095
1096fn truncate_wallet_data<W>(
1098 wallet: &mut W,
1099 truncate_height: BlockHeight,
1100) -> Result<(), SyncError<W::Error>>
1101where
1102 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1103{
1104 let birthday = wallet
1105 .get_sync_state()
1106 .map_err(SyncError::WalletError)?
1107 .wallet_birthday()
1108 .expect("should be non-empty in this scope");
1109 let checked_truncate_height = match truncate_height.cmp(&birthday) {
1110 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => truncate_height,
1111 std::cmp::Ordering::Less => consensus::H0,
1112 };
1113
1114 wallet
1115 .truncate_wallet_blocks(checked_truncate_height)
1116 .map_err(SyncError::WalletError)?;
1117 wallet
1118 .truncate_wallet_transactions(checked_truncate_height)
1119 .map_err(SyncError::WalletError)?;
1120 wallet
1121 .truncate_nullifiers(checked_truncate_height)
1122 .map_err(SyncError::WalletError)?;
1123 wallet
1124 .truncate_outpoints(checked_truncate_height)
1125 .map_err(SyncError::WalletError)?;
1126 match wallet.truncate_shard_trees(checked_truncate_height) {
1127 Ok(_) => Ok(()),
1128 Err(SyncError::TruncationError(height, pooltype)) => {
1129 clear_wallet_data(wallet)?;
1130
1131 Err(SyncError::TruncationError(height, pooltype))
1132 }
1133 Err(e) => Err(e),
1134 }?;
1135
1136 Ok(())
1137}
1138
1139fn clear_wallet_data<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1140where
1141 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees,
1142{
1143 let scan_targets = wallet
1144 .get_wallet_transactions()
1145 .map_err(SyncError::WalletError)?
1146 .values()
1147 .filter_map(|transaction| {
1148 transaction
1149 .status()
1150 .get_confirmed_height()
1151 .map(|height| ScanTarget {
1152 block_height: height,
1153 txid: transaction.txid(),
1154 narrow_scan_area: true,
1155 })
1156 })
1157 .collect::<Vec<_>>();
1158 let sync_state = wallet
1159 .get_sync_state_mut()
1160 .map_err(SyncError::WalletError)?;
1161 *sync_state = SyncState::new();
1162 add_scan_targets(sync_state, &scan_targets);
1163
1164 truncate_wallet_data(wallet, consensus::H0)
1165}
1166
1167#[allow(clippy::too_many_arguments)]
1169async fn update_wallet_data<W>(
1170 consensus_parameters: &impl consensus::Parameters,
1171 wallet: &mut W,
1172 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1173 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1174 scan_range: &ScanRange,
1175 nullifiers: Option<&mut NullifierMap>,
1176 mut outpoints: BTreeMap<OutputId, ScanTarget>,
1177 transactions: HashMap<TxId, WalletTransaction>,
1178 sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
1179 orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
1180) -> Result<(), SyncError<W::Error>>
1181where
1182 W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees + Send,
1183{
1184 let sync_state = wallet
1185 .get_sync_state_mut()
1186 .map_err(SyncError::WalletError)?;
1187 let highest_scanned_height = sync_state
1188 .highest_scanned_height()
1189 .expect("scan ranges should not be empty in this scope");
1190 for transaction in transactions.values() {
1191 state::update_found_note_shard_priority(
1192 consensus_parameters,
1193 sync_state,
1194 ShieldedProtocol::Sapling,
1195 transaction,
1196 );
1197 state::update_found_note_shard_priority(
1198 consensus_parameters,
1199 sync_state,
1200 ShieldedProtocol::Orchard,
1201 transaction,
1202 );
1203 }
1204 for transaction in transactions.values() {
1205 discover_unified_addresses(wallet, ufvks, transaction).map_err(SyncError::WalletError)?;
1206 }
1207
1208 wallet
1209 .extend_wallet_transactions(transactions)
1210 .map_err(SyncError::WalletError)?;
1211 if let Some(nullifiers) = nullifiers {
1212 wallet
1213 .append_nullifiers(nullifiers)
1214 .map_err(SyncError::WalletError)?;
1215 }
1216 wallet
1217 .append_outpoints(&mut outpoints)
1218 .map_err(SyncError::WalletError)?;
1219 wallet
1220 .update_shard_trees(
1221 fetch_request_sender,
1222 scan_range,
1223 highest_scanned_height,
1224 sapling_located_trees,
1225 orchard_located_trees,
1226 )
1227 .await?;
1228
1229 Ok(())
1230}
1231
1232fn discover_unified_addresses<W>(
1233 wallet: &mut W,
1234 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
1235 transaction: &WalletTransaction,
1236) -> Result<(), W::Error>
1237where
1238 W: SyncWallet,
1239{
1240 for note in transaction
1241 .orchard_notes()
1242 .iter()
1243 .filter(|¬e| note.key_id().scope == zip32::Scope::External)
1244 {
1245 let ivk = ufvks
1246 .get(¬e.key_id().account_id())
1247 .expect("ufvk must exist to decrypt this note")
1248 .orchard()
1249 .expect("fvk must exist to decrypt this note")
1250 .to_ivk(zip32::Scope::External);
1251
1252 wallet.add_orchard_address(
1253 note.key_id().account_id(),
1254 note.note().recipient(),
1255 ivk.diversifier_index(¬e.note().recipient())
1256 .expect("must be key used to create this address"),
1257 )?;
1258 }
1259 for note in transaction
1260 .sapling_notes()
1261 .iter()
1262 .filter(|¬e| note.key_id().scope == zip32::Scope::External)
1263 {
1264 let ivk = ufvks
1265 .get(¬e.key_id().account_id())
1266 .expect("ufvk must exist to decrypt this note")
1267 .sapling()
1268 .expect("fvk must exist to decrypt this note")
1269 .to_external_ivk();
1270
1271 wallet.add_sapling_address(
1272 note.key_id().account_id(),
1273 note.note().recipient(),
1274 ivk.decrypt_diversifier(¬e.note().recipient())
1275 .expect("must be key used to create this address"),
1276 )?;
1277 }
1278
1279 Ok(())
1280}
1281
1282fn remove_irrelevant_data<W>(wallet: &mut W) -> Result<(), W::Error>
1283where
1284 W: SyncWallet + SyncBlocks + SyncOutPoints + SyncNullifiers + SyncTransactions,
1285{
1286 let fully_scanned_height = wallet
1287 .get_sync_state()?
1288 .fully_scanned_height()
1289 .expect("scan ranges must be non-empty");
1290
1291 wallet
1292 .get_outpoints_mut()?
1293 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1294 wallet
1295 .get_nullifiers_mut()?
1296 .sapling
1297 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1298 wallet
1299 .get_nullifiers_mut()?
1300 .orchard
1301 .retain(|_, scan_target| scan_target.block_height > fully_scanned_height);
1302 wallet
1303 .get_sync_state_mut()?
1304 .scan_targets
1305 .retain(|scan_target| scan_target.block_height > fully_scanned_height);
1306 remove_irrelevant_blocks(wallet)?;
1307
1308 Ok(())
1309}
1310
1311fn remove_irrelevant_blocks<W>(wallet: &mut W) -> Result<(), W::Error>
1312where
1313 W: SyncWallet + SyncBlocks + SyncTransactions,
1314{
1315 let sync_state = wallet.get_sync_state()?;
1316 let highest_scanned_height = sync_state
1317 .highest_scanned_height()
1318 .expect("should be non-empty");
1319 let scanned_range_bounds = sync_state
1320 .scan_ranges()
1321 .iter()
1322 .filter(|scan_range| {
1323 scan_range.priority() == ScanPriority::Scanned
1324 || scan_range.priority() == ScanPriority::ScannedWithoutMapping
1325 || scan_range.priority() == ScanPriority::Scanning
1326 })
1327 .flat_map(|scanned_range| {
1328 vec![
1329 scanned_range.block_range().start,
1330 scanned_range.block_range().end - 1,
1331 ]
1332 })
1333 .collect::<Vec<_>>();
1334 let wallet_transaction_heights = wallet
1335 .get_wallet_transactions()?
1336 .values()
1337 .filter_map(|tx| tx.status().get_confirmed_height())
1338 .collect::<Vec<_>>();
1339
1340 wallet.get_wallet_blocks_mut()?.retain(|height, _| {
1341 *height >= highest_scanned_height.saturating_sub(MAX_VERIFICATION_WINDOW)
1342 || scanned_range_bounds.contains(height)
1343 || wallet_transaction_heights.contains(height)
1344 });
1345
1346 Ok(())
1347}
1348
1349fn add_scanned_blocks<W>(
1350 wallet: &mut W,
1351 mut scanned_blocks: BTreeMap<BlockHeight, WalletBlock>,
1352 scan_range: &ScanRange,
1353) -> Result<(), W::Error>
1354where
1355 W: SyncWallet + SyncBlocks + SyncTransactions,
1356{
1357 let sync_state = wallet.get_sync_state()?;
1358 let highest_scanned_height = sync_state
1359 .highest_scanned_height()
1360 .expect("scan ranges must be non-empty");
1361
1362 let wallet_transaction_heights = wallet
1363 .get_wallet_transactions()?
1364 .values()
1365 .filter_map(|tx| tx.status().get_confirmed_height())
1366 .collect::<Vec<_>>();
1367
1368 scanned_blocks.retain(|height, _| {
1369 *height >= highest_scanned_height.saturating_sub(MAX_VERIFICATION_WINDOW)
1370 || *height == scan_range.block_range().start
1371 || *height == scan_range.block_range().end - 1
1372 || wallet_transaction_heights.contains(height)
1373 });
1374
1375 wallet.append_wallet_blocks(scanned_blocks)?;
1376
1377 Ok(())
1378}
1379
1380#[cfg(not(feature = "darkside_test"))]
1381async fn update_subtree_roots<W>(
1382 consensus_parameters: &impl consensus::Parameters,
1383 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1384 wallet: &mut W,
1385) -> Result<(), SyncError<W::Error>>
1386where
1387 W: SyncWallet + SyncShardTrees,
1388{
1389 let sapling_start_index = wallet
1390 .get_shard_trees()
1391 .map_err(SyncError::WalletError)?
1392 .sapling
1393 .store()
1394 .get_shard_roots()
1395 .expect("infallible")
1396 .len() as u32;
1397 let orchard_start_index = wallet
1398 .get_shard_trees()
1399 .map_err(SyncError::WalletError)?
1400 .orchard
1401 .store()
1402 .get_shard_roots()
1403 .expect("infallible")
1404 .len() as u32;
1405 let (sapling_subtree_roots, orchard_subtree_roots) = futures::join!(
1406 client::get_subtree_roots(fetch_request_sender.clone(), sapling_start_index, 0, 0),
1407 client::get_subtree_roots(fetch_request_sender, orchard_start_index, 1, 0)
1408 );
1409
1410 let sapling_subtree_roots = sapling_subtree_roots?;
1411 let orchard_subtree_roots = orchard_subtree_roots?;
1412
1413 let sync_state = wallet
1414 .get_sync_state_mut()
1415 .map_err(SyncError::WalletError)?;
1416 state::add_shard_ranges(
1417 consensus_parameters,
1418 ShieldedProtocol::Sapling,
1419 sync_state,
1420 &sapling_subtree_roots,
1421 );
1422 state::add_shard_ranges(
1423 consensus_parameters,
1424 ShieldedProtocol::Orchard,
1425 sync_state,
1426 &orchard_subtree_roots,
1427 );
1428
1429 let shard_trees = wallet
1430 .get_shard_trees_mut()
1431 .map_err(SyncError::WalletError)?;
1432 witness::add_subtree_roots(sapling_subtree_roots, &mut shard_trees.sapling)?;
1433 witness::add_subtree_roots(orchard_subtree_roots, &mut shard_trees.orchard)?;
1434
1435 Ok(())
1436}
1437
1438async fn add_initial_frontier<W>(
1439 consensus_parameters: &impl consensus::Parameters,
1440 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1441 wallet: &mut W,
1442) -> Result<(), SyncError<W::Error>>
1443where
1444 W: SyncWallet + SyncShardTrees,
1445{
1446 let birthday =
1447 checked_birthday(consensus_parameters, wallet).map_err(SyncError::WalletError)?;
1448 if birthday
1449 == consensus_parameters
1450 .activation_height(consensus::NetworkUpgrade::Sapling)
1451 .expect("sapling activation height should always return Some")
1452 {
1453 return Ok(());
1454 }
1455
1456 let shard_trees = wallet
1459 .get_shard_trees_mut()
1460 .map_err(SyncError::WalletError)?;
1461 if shard_trees
1462 .sapling
1463 .store()
1464 .checkpoint_count()
1465 .expect("infallible")
1466 == 1
1467 {
1468 let frontiers = client::get_frontiers(fetch_request_sender, birthday).await?;
1469 shard_trees
1470 .sapling
1471 .insert_frontier(
1472 frontiers.final_sapling_tree().clone(),
1473 Retention::Checkpoint {
1474 id: birthday,
1475 marking: Marking::None,
1476 },
1477 )
1478 .expect("infallible");
1479 shard_trees
1480 .orchard
1481 .insert_frontier(
1482 frontiers.final_orchard_tree().clone(),
1483 Retention::Checkpoint {
1484 id: birthday,
1485 marking: Marking::None,
1486 },
1487 )
1488 .expect("infallible");
1489 }
1490
1491 Ok(())
1492}
1493
1494fn checked_birthday<W: SyncWallet>(
1496 consensus_parameters: &impl consensus::Parameters,
1497 wallet: &W,
1498) -> Result<BlockHeight, W::Error> {
1499 let wallet_birthday = wallet.get_birthday()?;
1500 let sapling_activation_height = consensus_parameters
1501 .activation_height(consensus::NetworkUpgrade::Sapling)
1502 .expect("sapling activation height should always return Some");
1503
1504 match wallet_birthday.cmp(&sapling_activation_height) {
1505 cmp::Ordering::Greater | cmp::Ordering::Equal => Ok(wallet_birthday),
1506 cmp::Ordering::Less => Ok(sapling_activation_height),
1507 }
1508}
1509
1510async fn mempool_monitor(
1515 mut client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
1516 mempool_transaction_sender: mpsc::Sender<RawTransaction>,
1517 unprocessed_transactions_count: Arc<AtomicU8>,
1518 shutdown_mempool: Arc<AtomicBool>,
1519) -> Result<(), MempoolError> {
1520 let mut interval = tokio::time::interval(Duration::from_secs(1));
1521 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1522 'main: loop {
1523 let response =
1524 client::get_mempool_transaction_stream(&mut client, shutdown_mempool.clone()).await;
1525
1526 match response {
1527 Ok(mut mempool_stream) => {
1528 interval.reset();
1529 loop {
1530 tokio::select! {
1531 mempool_stream_message = mempool_stream.message() => {
1532 match mempool_stream_message.unwrap_or(None) {
1533 Some(raw_transaction) => {
1534 let _ignore_error = mempool_transaction_sender
1535 .send(raw_transaction)
1536 .await;
1537 unprocessed_transactions_count.fetch_add(1, atomic::Ordering::Release);
1538 }
1539 None => {
1540 continue 'main;
1541 }
1542 }
1543
1544 }
1545
1546 _ = interval.tick() => {
1547 if shutdown_mempool.load(atomic::Ordering::Acquire) {
1548 break 'main;
1549 }
1550 }
1551 }
1552 }
1553 }
1554 Err(e @ MempoolError::ShutdownWithoutStream) => return Err(e),
1555 Err(MempoolError::ServerError(e)) => {
1556 tracing::warn!("Mempool stream request failed! Status: {e}.\nRetrying...");
1557 tokio::time::sleep(Duration::from_secs(3)).await;
1558 }
1559 }
1560 }
1561
1562 Ok(())
1563}
1564
1565fn reset_invalid_spends<W>(wallet: &mut W) -> Result<(), SyncError<W::Error>>
1566where
1567 W: SyncWallet + SyncTransactions,
1568{
1569 let wallet_height = wallet
1570 .get_sync_state()
1571 .map_err(SyncError::WalletError)?
1572 .wallet_height()
1573 .expect("wallet height must exist after scan ranges have been updated");
1574 let wallet_transactions = wallet
1575 .get_wallet_transactions_mut()
1576 .map_err(SyncError::WalletError)?;
1577
1578 let invalid_txids = wallet_transactions
1579 .values()
1580 .filter(|transaction| {
1581 matches!(transaction.status(), ConfirmationStatus::Mempool(_))
1582 && transaction.status().get_height()
1583 <= wallet_height - MEMPOOL_SPEND_INVALIDATION_THRESHOLD
1584 })
1585 .map(super::wallet::WalletTransaction::txid)
1586 .chain(
1587 wallet_transactions
1588 .values()
1589 .filter(|transaction| {
1590 (matches!(transaction.status(), ConfirmationStatus::Calculated(_))
1591 || matches!(transaction.status(), ConfirmationStatus::Transmitted(_)))
1592 && wallet_height >= transaction.transaction().expiry_height()
1593 })
1594 .map(super::wallet::WalletTransaction::txid),
1595 )
1596 .collect::<Vec<_>>();
1597 reset_spends(wallet_transactions, invalid_txids);
1598
1599 Ok(())
1600}