1use std::cmp;
4use std::collections::{BTreeMap, HashMap};
5use std::io::{Read, Write};
6use std::sync::Arc;
7use std::sync::atomic::{self, AtomicBool, AtomicU8};
8use std::time::{Duration, SystemTime};
9
10use byteorder::{ReadBytesExt, WriteBytesExt};
11use tokio::sync::{Mutex, mpsc};
12
13use incrementalmerkletree::{Marking, Retention};
14use orchard::tree::MerkleHashOrchard;
15use shardtree::store::ShardStore;
16use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange};
17use zcash_client_backend::proto::service::RawTransaction;
18use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
19use zcash_keys::keys::UnifiedFullViewingKey;
20use zcash_primitives::transaction::{Transaction, TxId};
21use zcash_primitives::zip32::AccountId;
22use zcash_protocol::ShieldedProtocol;
23use zcash_protocol::consensus::{self, BlockHeight};
24
25use zingo_status::confirmation_status::ConfirmationStatus;
26
27use crate::client::{self, FetchRequest};
28use crate::error::{
29 ContinuityError, MempoolError, ScanError, ServerError, SyncError, SyncModeError,
30 SyncStatusError,
31};
32use crate::keys::transparent::TransparentAddressId;
33use crate::scan::ScanResults;
34use crate::scan::task::{Scanner, ScannerState};
35use crate::scan::transactions::scan_transaction;
36use crate::wallet::traits::{
37 SyncBlocks, SyncNullifiers, SyncOutPoints, SyncShardTrees, SyncTransactions, SyncWallet,
38};
39use crate::wallet::{
40 Locator, NullifierMap, OutputId, SyncMode, SyncState, WalletBlock, WalletTransaction,
41};
42use crate::witness::LocatedTreeData;
43
44#[cfg(not(feature = "darkside_test"))]
45use crate::witness;
46
/// Spend detection: collecting output ids / derived nullifiers and marking wallet coins and notes
/// as spent.
pub(crate) mod spend;
/// Sync state helpers: wallet height, scan ranges and priorities, and scan progress counters.
pub(crate) mod state;
/// Transparent address and locator updates performed at the start of a sync.
pub(crate) mod transparent;
50
// NOTE(review): not referenced in the visible part of this file; presumably the number of blocks
// in a chain-verification scan range - confirm against its users.
const VERIFY_BLOCK_RANGE_SIZE: u32 = 10;
/// Maximum number of blocks the wallet will verify / roll back.
///
/// Used as the limit on how far the wallet height may exceed the chain height at sync start, as
/// the limit on how far a re-org may push verification below the initial verification height, and
/// as the size of the window of most recent wallet blocks that is always retained.
pub(crate) const MAX_VERIFICATION_WINDOW: u32 = 100;
53
/// A snapshot of sync progress, as computed by [`sync_status`].
///
/// "Session" values count only work done since the initial sync state was recorded; "total"
/// values count everything scanned so far. Percentages are expressed in the range 0-100.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct SyncStatus {
    // Copy of the wallet's current scan ranges.
    pub scan_ranges: Vec<ScanRange>,
    // Height recorded in the initial sync state; 0 means no sync data has been recorded yet.
    pub sync_start_height: BlockHeight,
    // Total blocks scanned minus the blocks already scanned when the initial state was recorded.
    pub session_blocks_scanned: u32,
    pub total_blocks_scanned: u32,
    pub percentage_session_blocks_scanned: f32,
    pub percentage_total_blocks_scanned: f32,
    // Output counters follow the same session/total split as the block counters.
    pub session_sapling_outputs_scanned: u32,
    pub total_sapling_outputs_scanned: u32,
    pub session_orchard_outputs_scanned: u32,
    pub total_orchard_outputs_scanned: u32,
    // Sapling and orchard outputs combined.
    pub percentage_session_outputs_scanned: f32,
    pub percentage_total_outputs_scanned: f32,
}
74
75impl std::fmt::Display for SyncStatus {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 write!(
79 f,
80 "percentage complete: {}",
81 self.percentage_total_outputs_scanned
82 )
83 }
84}
85
86impl From<SyncStatus> for json::JsonValue {
87 fn from(value: SyncStatus) -> Self {
88 let scan_ranges: Vec<json::JsonValue> = value
89 .scan_ranges
90 .iter()
91 .map(|range| {
92 json::object! {
93 "priority" => format!("{:?}", range.priority()),
94 "start_block" => range.block_range().start.to_string(),
95 "end_block" => (range.block_range().end - 1).to_string(),
96 }
97 })
98 .collect();
99
100 json::object! {
101 "scan_ranges" => scan_ranges,
102 "sync_start_height" => u32::from(value.sync_start_height),
103 "session_blocks_scanned" => value.session_blocks_scanned,
104 "total_blocks_scanned" => value.total_blocks_scanned,
105 "percentage_session_blocks_scanned" => value.percentage_session_blocks_scanned,
106 "percentage_total_blocks_scanned" => value.percentage_total_blocks_scanned,
107 "session_sapling_outputs_scanned" => value.session_sapling_outputs_scanned,
108 "total_sapling_outputs_scanned" => value.total_sapling_outputs_scanned,
109 "session_orchard_outputs_scanned" => value.session_orchard_outputs_scanned,
110 "total_orchard_outputs_scanned" => value.total_orchard_outputs_scanned,
111 "percentage_session_outputs_scanned" => value.percentage_session_outputs_scanned,
112 "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
113 }
114 }
115}
116
/// Summary returned by [`sync`] when a sync session ends.
///
/// All counters are the session values of the [`SyncStatus`] taken when the session ended.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct SyncResult {
    pub sync_start_height: BlockHeight,
    // Last block of the final scan range (the range end is exclusive).
    pub sync_end_height: BlockHeight,
    pub blocks_scanned: u32,
    pub sapling_outputs_scanned: u32,
    pub orchard_outputs_scanned: u32,
    // Percentage (0-100) of all wallet outputs scanned, not just this session's.
    pub percentage_total_outputs_scanned: f32,
}
128
129impl std::fmt::Display for SyncResult {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 write!(
132 f,
133 "Sync completed succesfully:
134{{
135 sync start height: {}
136 sync end height: {}
137 blocks scanned: {}
138 sapling outputs scanned: {}
139 orchard outputs scanned: {}
140 percentage total outputs scanned: {}
141}}",
142 self.sync_start_height,
143 self.sync_end_height,
144 self.blocks_scanned,
145 self.sapling_outputs_scanned,
146 self.orchard_outputs_scanned,
147 self.percentage_total_outputs_scanned,
148 )
149 }
150}
151
152impl From<SyncResult> for json::JsonValue {
153 fn from(value: SyncResult) -> Self {
154 json::object! {
155 "sync_start_height" => u32::from(value.sync_start_height),
156 "sync_end_height" => u32::from(value.sync_end_height),
157 "blocks_scanned" => value.blocks_scanned,
158 "sapling_outputs_scanned" => value.sapling_outputs_scanned,
159 "orchard_outputs_scanned" => value.orchard_outputs_scanned,
160 "percentage_total_outputs_scanned" => value.percentage_total_outputs_scanned,
161 }
162 }
163}
164
/// Configuration passed to [`sync`].
#[derive(Default, Debug, Clone)]
pub struct SyncConfig {
    /// Transparent address discovery settings used when updating addresses at the start of a sync.
    pub transparent_address_discovery: TransparentAddressDiscovery,
}
171
172impl SyncConfig {
173 fn serialized_version() -> u8 {
174 0
175 }
176
177 pub fn read<R: Read>(mut reader: R) -> std::io::Result<Self> {
179 let _version = reader.read_u8()?;
180
181 let gap_limit = reader.read_u8()?;
182 let scopes = reader.read_u8()?;
183 Ok(Self {
184 transparent_address_discovery: TransparentAddressDiscovery {
185 gap_limit,
186 scopes: TransparentAddressDiscoveryScopes {
187 external: scopes & 0b1 != 0,
188 internal: scopes & 0b10 != 0,
189 refund: scopes & 0b100 != 0,
190 },
191 },
192 })
193 }
194
195 pub fn write<W: Write>(&mut self, mut writer: W) -> std::io::Result<()> {
197 writer.write_u8(Self::serialized_version())?;
198 writer.write_u8(self.transparent_address_discovery.gap_limit)?;
199 let mut scopes = 0;
200 if self.transparent_address_discovery.scopes.external {
201 scopes |= 0b1;
202 };
203 if self.transparent_address_discovery.scopes.internal {
204 scopes |= 0b10;
205 };
206 if self.transparent_address_discovery.scopes.refund {
207 scopes |= 0b100;
208 };
209 writer.write_u8(scopes)?;
210
211 Ok(())
212 }
213}
214
/// Transparent address discovery settings: a gap limit and the set of scopes to discover.
///
/// See the constructors for the available presets.
#[derive(Debug, Clone)]
pub struct TransparentAddressDiscovery {
    /// Gap limit used during discovery. The `disabled` preset sets this to 0.
    pub gap_limit: u8,
    /// Scopes for which addresses are discovered.
    pub scopes: TransparentAddressDiscoveryScopes,
}
225
226impl Default for TransparentAddressDiscovery {
227 fn default() -> Self {
228 Self {
229 gap_limit: 10,
230 scopes: TransparentAddressDiscoveryScopes::default(),
231 }
232 }
233}
234
235impl TransparentAddressDiscovery {
236 pub fn minimal() -> Self {
238 Self {
239 gap_limit: 1,
240 scopes: TransparentAddressDiscoveryScopes::default(),
241 }
242 }
243
244 pub fn recovery() -> Self {
246 Self {
247 gap_limit: 20,
248 scopes: TransparentAddressDiscoveryScopes::recovery(),
249 }
250 }
251
252 pub fn disabled() -> Self {
255 Self {
256 gap_limit: 0,
257 scopes: TransparentAddressDiscoveryScopes {
258 external: false,
259 internal: false,
260 refund: false,
261 },
262 }
263 }
264}
265
/// Flags selecting which transparent scopes take part in address discovery.
#[derive(Debug, Clone)]
pub struct TransparentAddressDiscoveryScopes {
    /// Discover addresses in the external scope.
    pub external: bool,
    /// Discover addresses in the internal scope.
    pub internal: bool,
    /// Discover addresses in the refund scope.
    pub refund: bool,
}
276
277impl Default for TransparentAddressDiscoveryScopes {
278 fn default() -> Self {
279 Self {
280 external: true,
281 internal: false,
282 refund: true,
283 }
284 }
285}
286
287impl TransparentAddressDiscoveryScopes {
288 pub fn recovery() -> Self {
290 Self {
291 external: true,
292 internal: true,
293 refund: true,
294 }
295 }
296}
297
/// Syncs a wallet with the blockchain served by `client`.
///
/// Spawns a fetcher task and a mempool monitor task, prepares the wallet's sync state while
/// holding the wallet lock, then launches a [`Scanner`] and loops: processing scan results,
/// processing mempool transactions and, every 50ms, checking `sync_mode` and updating the scanner.
///
/// `sync_mode` must hold `SyncMode::NotRunning` on entry; it is set to `SyncMode::Running` by this
/// function. While syncing, storing `SyncMode::Paused` suspends the loop (polled once per second)
/// and storing `SyncMode::Shutdown` makes the function return early with the progress so far.
///
/// # Errors
///
/// Returns `SyncModeError::SyncAlreadyRunning` if `sync_mode` is not `NotRunning` on entry,
/// `SyncError::ChainError` if the wallet is more than [`MAX_VERIFICATION_WINDOW`] blocks ahead of
/// the chain, and otherwise propagates wallet, server, scan and mempool errors.
///
/// # Panics
///
/// Panics if `sync_mode` is set to `NotRunning` while syncing, if the wallet holds no sync data
/// when the final status is computed, or if a spawned task panicked.
pub async fn sync<P, W>(
    client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
    consensus_parameters: &P,
    wallet: Arc<Mutex<W>>,
    sync_mode: Arc<AtomicU8>,
    config: SyncConfig,
) -> Result<SyncResult, SyncError<W::Error>>
where
    P: consensus::Parameters + Sync + Send + 'static,
    W: SyncWallet
        + SyncBlocks
        + SyncTransactions
        + SyncNullifiers
        + SyncOutPoints
        + SyncShardTrees
        + Send,
{
    // Only one sync may run at a time.
    // NOTE(review): the check and the store are two separate atomic operations, and this function
    // never stores `NotRunning` back (including on error paths) - presumably the caller resets the
    // mode; confirm.
    let mut sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
    if sync_mode_enum == SyncMode::NotRunning {
        sync_mode_enum = SyncMode::Running;
        sync_mode.store(sync_mode_enum as u8, atomic::Ordering::Release);
    } else {
        return Err(SyncModeError::SyncAlreadyRunning.into());
    }

    tracing::info!("Starting sync...");

    // Fetcher task: serves the fetch requests sent on `fetch_request_sender`.
    let (fetch_request_sender, fetch_request_receiver) = mpsc::unbounded_channel();
    let client_clone = client.clone();
    let fetcher_handle =
        tokio::spawn(
            async move { client::fetch::fetch(fetch_request_receiver, client_clone).await },
        );

    // Mempool monitor task: sends raw mempool transactions over a bounded channel. The shared
    // counter is decremented below each time a mempool transaction has been processed and is
    // checked by `is_shutdown`.
    let (mempool_transaction_sender, mut mempool_transaction_receiver) = mpsc::channel(100);
    let shutdown_mempool = Arc::new(AtomicBool::new(false));
    let shutdown_mempool_clone = shutdown_mempool.clone();
    let unprocessed_mempool_transactions_count = Arc::new(AtomicU8::new(0));
    let unprocessed_mempool_transactions_count_clone =
        unprocessed_mempool_transactions_count.clone();
    let mempool_handle = tokio::spawn(async move {
        mempool_monitor(
            client,
            mempool_transaction_sender,
            unprocessed_mempool_transactions_count_clone,
            shutdown_mempool_clone,
        )
        .await
    });

    // The wallet lock is held for the whole of the pre-scan setup below.
    let mut wallet_guard = wallet.lock().await;

    // If the wallet is ahead of the chain, roll the wallet back to the chain height, unless the
    // difference exceeds the maximum verification window.
    let mut wallet_height = state::get_wallet_height(consensus_parameters, &*wallet_guard)
        .map_err(SyncError::WalletError)?;
    let chain_height = client::get_chain_height(fetch_request_sender.clone()).await?;
    if wallet_height > chain_height {
        if wallet_height - chain_height > MAX_VERIFICATION_WINDOW {
            return Err(SyncError::ChainError(MAX_VERIFICATION_WINDOW));
        }
        truncate_wallet_data(&mut *wallet_guard, chain_height)?;
        wallet_height = chain_height;
    }

    let ufvks = wallet_guard
        .get_unified_full_viewing_keys()
        .map_err(SyncError::WalletError)?;

    transparent::update_addresses_and_locators(
        consensus_parameters,
        &mut *wallet_guard,
        fetch_request_sender.clone(),
        &ufvks,
        wallet_height,
        chain_height,
        config.transparent_address_discovery,
    )
    .await?;

    // Skipped when built with the `darkside_test` feature.
    #[cfg(not(feature = "darkside_test"))]
    update_subtree_roots(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
    )
    .await?;

    add_initial_frontier(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
    )
    .await?;

    state::update_scan_ranges(
        consensus_parameters,
        wallet_height,
        chain_height,
        wallet_guard
            .get_sync_state_mut()
            .map_err(SyncError::WalletError)?,
    )
    .await;

    state::set_initial_state(
        consensus_parameters,
        fetch_request_sender.clone(),
        &mut *wallet_guard,
        chain_height,
    )
    .await?;

    // Reference height for the re-org depth check in `process_scan_results`.
    let initial_verification_height = wallet_guard
        .get_sync_state()
        .map_err(SyncError::WalletError)?
        .highest_scanned_height()
        .expect("scan ranges must be non-empty")
        + 1;

    // Release the wallet before scanning starts; it is re-locked per event in the loop below.
    drop(wallet_guard);

    let (scan_results_sender, mut scan_results_receiver) = mpsc::unbounded_channel();
    let mut scanner = Scanner::new(
        consensus_parameters.clone(),
        scan_results_sender,
        fetch_request_sender.clone(),
        ufvks.clone(),
    );
    scanner.launch();

    let mut interval = tokio::time::interval(Duration::from_millis(50));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        tokio::select! {
            // A scan task finished a range: apply its results to the wallet and flag it for saving.
            Some((scan_range, scan_results)) = scan_results_receiver.recv() => {
                let mut wallet_guard = wallet.lock().await;
                process_scan_results(
                    consensus_parameters,
                    &mut *wallet_guard,
                    fetch_request_sender.clone(),
                    &ufvks,
                    scan_range,
                    scan_results,
                    initial_verification_height,
                )
                .await?;
                wallet_guard.set_save_flag().map_err(SyncError::WalletError)?;
                drop(wallet_guard);
            }

            // The mempool monitor delivered a transaction: scan it as pending and mark it processed.
            Some(raw_transaction) = mempool_transaction_receiver.recv() => {
                let mut wallet_guard = wallet.lock().await;
                process_mempool_transaction(
                    consensus_parameters,
                    &ufvks,
                    &mut *wallet_guard,
                    raw_transaction,
                )
                .await?;
                unprocessed_mempool_transactions_count.fetch_sub(1, atomic::Ordering::Release);
                drop(wallet_guard);
            }

            // Periodic tick: react to sync mode changes, then update the scanner.
            _update_scanner = interval.tick() => {
                sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
                match sync_mode_enum {
                    SyncMode::Paused => {
                        // Block this loop, re-reading the mode once per second, until it is no
                        // longer `Paused`. The tick before the loop consumes the interval's first
                        // tick, which tokio completes immediately.
                        let mut pause_interval = tokio::time::interval(Duration::from_secs(1));
                        pause_interval.tick().await;
                        while sync_mode_enum == SyncMode::Paused {
                            pause_interval.tick().await;
                            sync_mode_enum = SyncMode::from_atomic_u8(sync_mode.clone())?;
                        }
                    },
                    SyncMode::Shutdown => {
                        // Early exit requested: report the progress made so far.
                        // NOTE(review): this path returns without awaiting `mempool_handle` or
                        // `fetcher_handle`, unlike the normal exit below - confirm intended.
                        let mut wallet_guard = wallet.lock().await;
                        let sync_status = match sync_status(&*wallet_guard).await {
                            Ok(status) => status,
                            Err(SyncStatusError::WalletError(e)) => {
                                return Err(SyncError::WalletError(e));
                            }
                            Err(SyncStatusError::NoSyncData) => {
                                panic!("sync data must exist!");
                            }
                        };
                        wallet_guard
                            .set_save_flag()
                            .map_err(SyncError::WalletError)?;
                        drop(wallet_guard);
                        tracing::info!("Sync successfully shutdown.");

                        return Ok(SyncResult {
                            sync_start_height: sync_status.sync_start_height,
                            sync_end_height: (sync_status
                                .scan_ranges
                                .last()
                                .expect("should be non-empty after syncing")
                                .block_range()
                                .end
                                - 1),
                            blocks_scanned: sync_status.session_blocks_scanned,
                            sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
                            orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
                            percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
                        });
                    }
                    SyncMode::Running => (),
                    SyncMode::NotRunning => {
                        panic!("sync mode should not be manually set to NotRunning!");
                    },
                }

                scanner.update(&mut *wallet.lock().await, shutdown_mempool.clone()).await?;

                // Once the scanner reports shutdown, wait a second and leave the loop only when no
                // scan workers remain and every mempool transaction has been processed.
                if matches!(scanner.state, ScannerState::Shutdown) {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    if is_shutdown(&scanner, unprocessed_mempool_transactions_count.clone())
                    {
                        tracing::info!("Sync successfully shutdown.");
                        break;
                    }
                }
            }
        }
    }

    // Normal exit: compute the final status and flag the wallet for saving.
    let mut wallet_guard = wallet.lock().await;
    let sync_status = match sync_status(&*wallet_guard).await {
        Ok(status) => status,
        Err(SyncStatusError::WalletError(e)) => {
            return Err(SyncError::WalletError(e));
        }
        Err(SyncStatusError::NoSyncData) => {
            panic!("sync data must exist!");
        }
    };
    wallet_guard
        .set_save_flag()
        .map_err(SyncError::WalletError)?;

    // The scanner and the fetch request sender are dropped before the fetcher task is awaited;
    // presumably the fetcher ends once all senders are gone - confirm in `client::fetch`.
    drop(wallet_guard);
    drop(scanner);
    drop(fetch_request_sender);

    // A mempool monitor that shut down without a stream is only logged; other errors are returned.
    match mempool_handle.await.expect("task panicked") {
        Ok(_) => (),
        Err(e @ MempoolError::ShutdownWithoutStream) => tracing::warn!("{e}"),
        Err(e) => return Err(e.into()),
    }
    fetcher_handle.await.expect("task panicked");

    Ok(SyncResult {
        sync_start_height: sync_status.sync_start_height,
        sync_end_height: (sync_status
            .scan_ranges
            .last()
            .expect("should be non-empty after syncing")
            .block_range()
            .end
            - 1),
        blocks_scanned: sync_status.session_blocks_scanned,
        sapling_outputs_scanned: sync_status.session_sapling_outputs_scanned,
        orchard_outputs_scanned: sync_status.session_orchard_outputs_scanned,
        percentage_total_outputs_scanned: sync_status.percentage_total_outputs_scanned,
    })
}
580
581pub async fn sync_status<W>(wallet: &W) -> Result<SyncStatus, SyncStatusError<W::Error>>
585where
586 W: SyncWallet + SyncBlocks,
587{
588 let (total_sapling_outputs_scanned, total_orchard_outputs_scanned) =
589 state::calculate_scanned_outputs(wallet).map_err(SyncStatusError::WalletError)?;
590 let total_outputs_scanned = total_sapling_outputs_scanned + total_orchard_outputs_scanned;
591
592 let sync_state = wallet
593 .get_sync_state()
594 .map_err(SyncStatusError::WalletError)?;
595 if sync_state.initial_sync_state.sync_start_height == 0.into() {
596 return Ok(SyncStatus {
597 scan_ranges: sync_state.scan_ranges.clone(),
598 sync_start_height: 0.into(),
599 session_blocks_scanned: 0,
600 total_blocks_scanned: 0,
601 percentage_session_blocks_scanned: 0.0,
602 percentage_total_blocks_scanned: 0.0,
603 session_sapling_outputs_scanned: 0,
604 session_orchard_outputs_scanned: 0,
605 total_sapling_outputs_scanned: 0,
606 total_orchard_outputs_scanned: 0,
607 percentage_session_outputs_scanned: 0.0,
608 percentage_total_outputs_scanned: 0.0,
609 });
610 }
611 let total_blocks_scanned = state::calculate_scanned_blocks(sync_state);
612
613 let birthday = sync_state
614 .wallet_birthday()
615 .ok_or(SyncStatusError::NoSyncData)?;
616 let wallet_height = sync_state
617 .wallet_height()
618 .ok_or(SyncStatusError::NoSyncData)?;
619 let total_blocks = wallet_height - birthday + 1;
620 let total_sapling_outputs = sync_state
621 .initial_sync_state
622 .wallet_tree_bounds
623 .sapling_final_tree_size
624 - sync_state
625 .initial_sync_state
626 .wallet_tree_bounds
627 .sapling_initial_tree_size;
628 let total_orchard_outputs = sync_state
629 .initial_sync_state
630 .wallet_tree_bounds
631 .orchard_final_tree_size
632 - sync_state
633 .initial_sync_state
634 .wallet_tree_bounds
635 .orchard_initial_tree_size;
636 let total_outputs = total_sapling_outputs + total_orchard_outputs;
637
638 let session_blocks_scanned =
639 total_blocks_scanned - sync_state.initial_sync_state.previously_scanned_blocks;
640 let percentage_session_blocks_scanned = (session_blocks_scanned as f32
641 / (total_blocks - sync_state.initial_sync_state.previously_scanned_blocks) as f32)
642 * 100.0;
643 let percentage_total_blocks_scanned =
644 (total_blocks_scanned as f32 / total_blocks as f32) * 100.0;
645
646 let session_sapling_outputs_scanned = total_sapling_outputs_scanned
647 - sync_state
648 .initial_sync_state
649 .previously_scanned_sapling_outputs;
650 let session_orchard_outputs_scanned = total_orchard_outputs_scanned
651 - sync_state
652 .initial_sync_state
653 .previously_scanned_orchard_outputs;
654 let session_outputs_scanned = session_sapling_outputs_scanned + session_orchard_outputs_scanned;
655 let previously_scanned_outputs = sync_state
656 .initial_sync_state
657 .previously_scanned_sapling_outputs
658 + sync_state
659 .initial_sync_state
660 .previously_scanned_orchard_outputs;
661 let percentage_session_outputs_scanned = (session_outputs_scanned as f32
662 / (total_outputs - previously_scanned_outputs) as f32)
663 * 100.0;
664 let percentage_total_outputs_scanned =
665 (total_outputs_scanned as f32 / total_outputs as f32) * 100.0;
666
667 Ok(SyncStatus {
668 scan_ranges: sync_state.scan_ranges.clone(),
669 sync_start_height: sync_state.initial_sync_state.sync_start_height,
670 session_blocks_scanned,
671 total_blocks_scanned,
672 percentage_session_blocks_scanned,
673 percentage_total_blocks_scanned,
674 session_sapling_outputs_scanned,
675 total_sapling_outputs_scanned,
676 session_orchard_outputs_scanned,
677 total_orchard_outputs_scanned,
678 percentage_session_outputs_scanned,
679 percentage_total_outputs_scanned,
680 })
681}
682
683pub fn scan_pending_transaction<W>(
690 consensus_parameters: &impl consensus::Parameters,
691 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
692 wallet: &mut W,
693 transaction: Transaction,
694 status: ConfirmationStatus,
695 datetime: u32,
696) -> Result<(), SyncError<W::Error>>
697where
698 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints,
699{
700 if matches!(status, ConfirmationStatus::Confirmed(_)) {
701 panic!("this fn is for unconfirmed transactions only");
702 }
703
704 let mut pending_transaction_nullifiers = NullifierMap::new();
705 let mut pending_transaction_outpoints = BTreeMap::new();
706 let transparent_addresses: HashMap<String, TransparentAddressId> = wallet
707 .get_transparent_addresses()
708 .map_err(SyncError::WalletError)?
709 .iter()
710 .map(|(id, address)| (address.clone(), *id))
711 .collect();
712 let pending_transaction = scan_transaction(
713 consensus_parameters,
714 ufvks,
715 transaction.txid(),
716 transaction,
717 status,
718 None,
719 &mut pending_transaction_nullifiers,
720 &mut pending_transaction_outpoints,
721 &transparent_addresses,
722 datetime,
723 )?;
724
725 let wallet_transactions = wallet
726 .get_wallet_transactions()
727 .map_err(SyncError::WalletError)?;
728 let transparent_output_ids = spend::collect_transparent_output_ids(wallet_transactions);
729 let transparent_spend_locators = spend::detect_transparent_spends(
730 &mut pending_transaction_outpoints,
731 transparent_output_ids,
732 );
733 let (sapling_derived_nullifiers, orchard_derived_nullifiers) =
734 spend::collect_derived_nullifiers(wallet_transactions);
735 let (sapling_spend_locators, orchard_spend_locators) = spend::detect_shielded_spends(
736 &mut pending_transaction_nullifiers,
737 sapling_derived_nullifiers,
738 orchard_derived_nullifiers,
739 );
740
741 if pending_transaction.transparent_coins().is_empty()
743 && pending_transaction.sapling_notes().is_empty()
744 && pending_transaction.orchard_notes().is_empty()
745 && pending_transaction.outgoing_orchard_notes().is_empty()
746 && pending_transaction.outgoing_sapling_notes().is_empty()
747 && transparent_spend_locators.is_empty()
748 && sapling_spend_locators.is_empty()
749 && orchard_spend_locators.is_empty()
750 {
751 return Ok(());
752 }
753
754 wallet
755 .insert_wallet_transaction(pending_transaction)
756 .map_err(SyncError::WalletError)?;
757 spend::update_spent_coins(
758 wallet
759 .get_wallet_transactions_mut()
760 .map_err(SyncError::WalletError)?,
761 transparent_spend_locators,
762 );
763 spend::update_spent_notes(
764 wallet
765 .get_wallet_transactions_mut()
766 .map_err(SyncError::WalletError)?,
767 sapling_spend_locators,
768 orchard_spend_locators,
769 );
770
771 Ok(())
772}
773
774pub fn add_scan_targets(sync_state: &mut SyncState, scan_targets: &[Locator]) {
785 for scan_target in scan_targets {
786 sync_state.locators.insert(*scan_target);
787 }
788}
789
790fn is_shutdown<P>(
792 scanner: &Scanner<P>,
793 mempool_unprocessed_transactions_count: Arc<AtomicU8>,
794) -> bool
795where
796 P: consensus::Parameters + Sync + Send + 'static,
797{
798 scanner.worker_poolsize() == 0
799 && mempool_unprocessed_transactions_count.load(atomic::Ordering::Acquire) == 0
800}
801
/// Applies the outcome of scanning `scan_range` to the wallet.
///
/// On success the scan results are stored, spends are updated, the range is marked as scanned and
/// data that is no longer needed is removed.
///
/// A hash discontinuity at the first block of a `Verify` range is treated as a re-org: wallet data
/// is truncated and an earlier range is set up for verification. Any other scan error is returned.
///
/// # Errors
///
/// Returns `ServerError::ChainVerificationError` if a re-org pushes verification more than
/// [`MAX_VERIFICATION_WINDOW`] blocks below `initial_verification_height`, and propagates wallet,
/// server and scan errors.
///
/// # Panics
///
/// Panics if the wallet has no scan ranges when a re-org is handled.
async fn process_scan_results<W>(
    consensus_parameters: &impl consensus::Parameters,
    wallet: &mut W,
    fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
    ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
    scan_range: ScanRange,
    scan_results: Result<ScanResults, ScanError>,
    initial_verification_height: BlockHeight,
) -> Result<(), SyncError<W::Error>>
where
    W: SyncWallet
        + SyncBlocks
        + SyncTransactions
        + SyncNullifiers
        + SyncOutPoints
        + SyncShardTrees
        + Send,
{
    match scan_results {
        Ok(results) => {
            let ScanResults {
                nullifiers,
                outpoints,
                scanned_blocks,
                wallet_transactions,
                sapling_located_trees,
                orchard_located_trees,
            } = results;
            // Store the scanned data first; spend detection below works on the updated wallet.
            update_wallet_data(
                consensus_parameters,
                wallet,
                fetch_request_sender.clone(),
                &scan_range,
                nullifiers,
                outpoints,
                wallet_transactions,
                sapling_located_trees,
                orchard_located_trees,
            )
            .await?;
            spend::update_transparent_spends(wallet).map_err(SyncError::WalletError)?;
            spend::update_shielded_spends(
                consensus_parameters,
                wallet,
                fetch_request_sender,
                ufvks,
                &scanned_blocks,
            )
            .await?;
            add_scanned_blocks(wallet, scanned_blocks, &scan_range)
                .map_err(SyncError::WalletError)?;
            // Mark the range as scanned and merge it with adjacent scanned ranges.
            state::set_scanned_scan_range(
                wallet
                    .get_sync_state_mut()
                    .map_err(SyncError::WalletError)?,
                scan_range.block_range().clone(),
            );
            state::merge_scan_ranges(
                wallet
                    .get_sync_state_mut()
                    .map_err(SyncError::WalletError)?,
                ScanPriority::Scanned,
            );
            remove_irrelevant_data(wallet).map_err(SyncError::WalletError)?;
            tracing::debug!("Scan results processed.");
        }
        Err(ScanError::ContinuityError(ContinuityError::HashDiscontinuity { height, .. })) => {
            // Only a discontinuity at the very start of a `Verify` range is handled as a re-org.
            if height == scan_range.block_range().start
                && scan_range.priority() == ScanPriority::Verify
            {
                tracing::info!("Re-org detected.");
                let sync_state = wallet
                    .get_sync_state_mut()
                    .map_err(SyncError::WalletError)?;
                // Captured before the scan ranges are modified; used to reset the initial state.
                let wallet_height = sync_state
                    .wallet_height()
                    .expect("scan ranges should be non-empty in this scope");

                // Set the failed range back to `Verify` priority so it gets scanned again.
                state::set_scan_priority(
                    sync_state,
                    scan_range.block_range(),
                    ScanPriority::Verify,
                );

                // Set up a verification range ending at the block below the discontinuity.
                let scan_range_to_verify = state::set_verify_scan_range(
                    sync_state,
                    height - 1,
                    state::VerifyEnd::VerifyHighest,
                );
                state::merge_scan_ranges(sync_state, ScanPriority::Verify);

                // NOTE(review): wallet data is truncated before the verification window check
                // below, so the error path returns with data already removed - confirm intended.
                truncate_wallet_data(wallet, scan_range_to_verify.block_range().start - 1)?;

                if initial_verification_height - scan_range_to_verify.block_range().start
                    > MAX_VERIFICATION_WINDOW
                {
                    return Err(ServerError::ChainVerificationError.into());
                }

                state::set_initial_state(
                    consensus_parameters,
                    fetch_request_sender.clone(),
                    wallet,
                    wallet_height,
                )
                .await?;
            } else {
                // Not a re-org at a verification boundary: propagate the discontinuity error.
                scan_results?;
            }
        }
        Err(e) => return Err(e.into()),
    }

    Ok(())
}
920
921async fn process_mempool_transaction<W>(
925 consensus_parameters: &impl consensus::Parameters,
926 ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
927 wallet: &mut W,
928 raw_transaction: RawTransaction,
929) -> Result<(), SyncError<W::Error>>
930where
931 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints,
932{
933 let block_height = BlockHeight::from_u32(
934 u32::try_from(raw_transaction.height + 1).expect("should be valid u32"),
935 );
936 let transaction = zcash_primitives::transaction::Transaction::read(
937 &raw_transaction.data[..],
938 consensus::BranchId::for_height(consensus_parameters, block_height),
939 )
940 .map_err(ServerError::InvalidTransaction)?;
941
942 tracing::debug!(
943 "mempool received txid {} at height {}",
944 transaction.txid(),
945 block_height
946 );
947
948 if let Some(tx) = wallet
949 .get_wallet_transactions()
950 .map_err(SyncError::WalletError)?
951 .get(&transaction.txid())
952 {
953 if tx.status().is_confirmed() {
954 return Ok(());
955 }
956 }
957
958 scan_pending_transaction(
959 consensus_parameters,
960 ufvks,
961 wallet,
962 transaction,
963 ConfirmationStatus::Mempool(block_height),
964 SystemTime::now()
965 .duration_since(SystemTime::UNIX_EPOCH)
966 .expect("infalliable for such long time periods")
967 .as_secs() as u32,
968 )?;
969
970 Ok(())
971}
972
973fn truncate_wallet_data<W>(
975 wallet: &mut W,
976 truncate_height: BlockHeight,
977) -> Result<(), SyncError<W::Error>>
978where
979 W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncShardTrees,
980{
981 let birthday = wallet
982 .get_sync_state()
983 .map_err(SyncError::WalletError)?
984 .wallet_birthday()
985 .expect("should be non-empty in this scope");
986 let checked_truncate_height = match truncate_height.cmp(&birthday) {
987 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => truncate_height,
988 std::cmp::Ordering::Less => birthday,
989 };
990
991 wallet
992 .truncate_wallet_blocks(checked_truncate_height)
993 .map_err(SyncError::WalletError)?;
994 wallet
995 .truncate_wallet_transactions(checked_truncate_height)
996 .map_err(SyncError::WalletError)?;
997 wallet
998 .truncate_nullifiers(checked_truncate_height)
999 .map_err(SyncError::WalletError)?;
1000 wallet.truncate_shard_trees(checked_truncate_height)?;
1001
1002 Ok(())
1003}
1004
1005#[allow(clippy::too_many_arguments)]
1007async fn update_wallet_data<W>(
1008 consensus_parameters: &impl consensus::Parameters,
1009 wallet: &mut W,
1010 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1011 scan_range: &ScanRange,
1012 nullifiers: NullifierMap,
1013 mut outpoints: BTreeMap<OutputId, Locator>,
1014 wallet_transactions: HashMap<TxId, WalletTransaction>,
1015 sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
1016 orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
1017) -> Result<(), SyncError<W::Error>>
1018where
1019 W: SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints + SyncShardTrees + Send,
1020{
1021 let sync_state = wallet
1022 .get_sync_state_mut()
1023 .map_err(SyncError::WalletError)?;
1024 let wallet_height = sync_state
1025 .wallet_height()
1026 .expect("scan ranges should not be empty in this scope");
1027 for transaction in wallet_transactions.values() {
1028 state::update_found_note_shard_priority(
1029 consensus_parameters,
1030 sync_state,
1031 ShieldedProtocol::Sapling,
1032 transaction,
1033 );
1034 state::update_found_note_shard_priority(
1035 consensus_parameters,
1036 sync_state,
1037 ShieldedProtocol::Orchard,
1038 transaction,
1039 );
1040 }
1041
1042 wallet
1043 .extend_wallet_transactions(wallet_transactions)
1044 .map_err(SyncError::WalletError)?;
1045 wallet
1046 .append_nullifiers(nullifiers)
1047 .map_err(SyncError::WalletError)?;
1048 wallet
1049 .append_outpoints(&mut outpoints)
1050 .map_err(SyncError::WalletError)?;
1051 wallet
1052 .update_shard_trees(
1053 fetch_request_sender,
1054 scan_range,
1055 wallet_height,
1056 sapling_located_trees,
1057 orchard_located_trees,
1058 )
1059 .await?;
1060
1061 Ok(())
1062}
1063
1064fn remove_irrelevant_data<W>(wallet: &mut W) -> Result<(), W::Error>
1065where
1066 W: SyncWallet + SyncBlocks + SyncOutPoints + SyncNullifiers + SyncTransactions,
1067{
1068 let fully_scanned_height = wallet
1069 .get_sync_state()?
1070 .fully_scanned_height()
1071 .expect("scan ranges must be non-empty");
1072
1073 wallet
1074 .get_outpoints_mut()?
1075 .retain(|_, (height, _)| *height > fully_scanned_height);
1076 wallet
1077 .get_nullifiers_mut()?
1078 .sapling
1079 .retain(|_, (height, _)| *height > fully_scanned_height);
1080 wallet
1081 .get_nullifiers_mut()?
1082 .orchard
1083 .retain(|_, (height, _)| *height > fully_scanned_height);
1084 wallet
1085 .get_sync_state_mut()?
1086 .locators
1087 .retain(|(height, _)| *height > fully_scanned_height);
1088 remove_irrelevant_blocks(wallet)?;
1089
1090 Ok(())
1091}
1092
1093fn remove_irrelevant_blocks<W>(wallet: &mut W) -> Result<(), W::Error>
1094where
1095 W: SyncWallet + SyncBlocks + SyncTransactions,
1096{
1097 let sync_state = wallet.get_sync_state()?;
1098 let highest_scanned_height = sync_state
1099 .highest_scanned_height()
1100 .expect("should be non-empty");
1101 let scanned_range_bounds = sync_state
1102 .scan_ranges()
1103 .iter()
1104 .filter(|scan_range| scan_range.priority() == ScanPriority::Scanned)
1105 .flat_map(|scanned_range| {
1106 vec![
1107 scanned_range.block_range().start,
1108 scanned_range.block_range().end - 1,
1109 ]
1110 })
1111 .collect::<Vec<_>>();
1112 let wallet_transaction_heights = wallet
1113 .get_wallet_transactions()?
1114 .values()
1115 .filter_map(|tx| tx.status().get_confirmed_height())
1116 .collect::<Vec<_>>();
1117
1118 wallet.get_wallet_blocks_mut()?.retain(|height, _| {
1119 *height >= highest_scanned_height.saturating_sub(MAX_VERIFICATION_WINDOW)
1120 || scanned_range_bounds.contains(height)
1121 || wallet_transaction_heights.contains(height)
1122 });
1123
1124 Ok(())
1125}
1126
1127fn add_scanned_blocks<W>(
1128 wallet: &mut W,
1129 mut scanned_blocks: BTreeMap<BlockHeight, WalletBlock>,
1130 scan_range: &ScanRange,
1131) -> Result<(), W::Error>
1132where
1133 W: SyncWallet + SyncBlocks + SyncTransactions,
1134{
1135 let sync_state = wallet.get_sync_state()?;
1136 let highest_scanned_height = sync_state
1137 .highest_scanned_height()
1138 .expect("scan ranges must be non-empty");
1139
1140 let wallet_transaction_heights = wallet
1141 .get_wallet_transactions()?
1142 .values()
1143 .filter_map(|tx| tx.status().get_confirmed_height())
1144 .collect::<Vec<_>>();
1145
1146 scanned_blocks.retain(|height, _| {
1147 *height >= highest_scanned_height.saturating_sub(MAX_VERIFICATION_WINDOW)
1148 || *height == scan_range.block_range().start
1149 || *height == scan_range.block_range().end - 1
1150 || wallet_transaction_heights.contains(height)
1151 });
1152
1153 wallet.append_wallet_blocks(scanned_blocks)?;
1154
1155 Ok(())
1156}
1157
1158#[cfg(not(feature = "darkside_test"))]
1159async fn update_subtree_roots<W>(
1160 consensus_parameters: &impl consensus::Parameters,
1161 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1162 wallet: &mut W,
1163) -> Result<(), SyncError<W::Error>>
1164where
1165 W: SyncWallet + SyncShardTrees,
1166{
1167 let sapling_start_index = wallet
1168 .get_shard_trees()
1169 .map_err(SyncError::WalletError)?
1170 .sapling
1171 .store()
1172 .get_shard_roots()
1173 .expect("infallible")
1174 .len() as u32;
1175 let orchard_start_index = wallet
1176 .get_shard_trees()
1177 .map_err(SyncError::WalletError)?
1178 .orchard
1179 .store()
1180 .get_shard_roots()
1181 .expect("infallible")
1182 .len() as u32;
1183 let (sapling_subtree_roots, orchard_subtree_roots) = futures::join!(
1184 client::get_subtree_roots(fetch_request_sender.clone(), sapling_start_index, 0, 0),
1185 client::get_subtree_roots(fetch_request_sender, orchard_start_index, 1, 0)
1186 );
1187
1188 let sapling_subtree_roots = sapling_subtree_roots?;
1189 let orchard_subtree_roots = orchard_subtree_roots?;
1190
1191 let sync_state = wallet
1192 .get_sync_state_mut()
1193 .map_err(SyncError::WalletError)?;
1194 state::add_shard_ranges(
1195 consensus_parameters,
1196 ShieldedProtocol::Sapling,
1197 sync_state,
1198 &sapling_subtree_roots,
1199 );
1200 state::add_shard_ranges(
1201 consensus_parameters,
1202 ShieldedProtocol::Orchard,
1203 sync_state,
1204 &orchard_subtree_roots,
1205 );
1206
1207 let shard_trees = wallet
1208 .get_shard_trees_mut()
1209 .map_err(SyncError::WalletError)?;
1210 witness::add_subtree_roots(sapling_subtree_roots, &mut shard_trees.sapling)?;
1211 witness::add_subtree_roots(orchard_subtree_roots, &mut shard_trees.orchard)?;
1212
1213 Ok(())
1214}
1215
1216async fn add_initial_frontier<W>(
1217 consensus_parameters: &impl consensus::Parameters,
1218 fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
1219 wallet: &mut W,
1220) -> Result<(), SyncError<W::Error>>
1221where
1222 W: SyncWallet + SyncShardTrees,
1223{
1224 let birthday =
1225 checked_birthday(consensus_parameters, wallet).map_err(SyncError::WalletError)?;
1226 if birthday
1227 == consensus_parameters
1228 .activation_height(consensus::NetworkUpgrade::Sapling)
1229 .expect("sapling activation height should always return Some")
1230 {
1231 return Ok(());
1232 }
1233
1234 let shard_trees = wallet
1237 .get_shard_trees_mut()
1238 .map_err(SyncError::WalletError)?;
1239 if shard_trees
1240 .sapling
1241 .store()
1242 .checkpoint_count()
1243 .expect("infallible")
1244 == 1
1245 {
1246 let frontiers = client::get_frontiers(fetch_request_sender, birthday).await?;
1247 shard_trees
1248 .sapling
1249 .insert_frontier(
1250 frontiers.final_sapling_tree().clone(),
1251 Retention::Checkpoint {
1252 id: birthday,
1253 marking: Marking::None,
1254 },
1255 )
1256 .expect("infallible");
1257 shard_trees
1258 .orchard
1259 .insert_frontier(
1260 frontiers.final_orchard_tree().clone(),
1261 Retention::Checkpoint {
1262 id: birthday,
1263 marking: Marking::None,
1264 },
1265 )
1266 .expect("infallible");
1267 }
1268
1269 Ok(())
1270}
1271
1272fn checked_birthday<W: SyncWallet>(
1274 consensus_parameters: &impl consensus::Parameters,
1275 wallet: &W,
1276) -> Result<BlockHeight, W::Error> {
1277 let wallet_birthday = wallet.get_birthday()?;
1278 let sapling_activation_height = consensus_parameters
1279 .activation_height(consensus::NetworkUpgrade::Sapling)
1280 .expect("sapling activation height should always return Some");
1281
1282 match wallet_birthday.cmp(&sapling_activation_height) {
1283 cmp::Ordering::Greater | cmp::Ordering::Equal => Ok(wallet_birthday),
1284 cmp::Ordering::Less => Ok(sapling_activation_height),
1285 }
1286}
1287
/// Forwards raw transactions from the server's mempool stream to `mempool_transaction_sender`.
///
/// The stream is requested in a loop. While a stream is open, every received transaction is sent
/// on the channel and `unprocessed_transactions_count` is incremented. When the stream yields no
/// further message (or yields an error), a new stream is requested. The `shutdown_mempool` flag
/// is polled on a one second interval while a stream is open.
///
/// # Errors
///
/// Returns [`MempoolError::ShutdownWithoutStream`] when the stream request reports it. Server
/// errors from the stream request are logged and retried after three seconds.
///
/// Returns `Ok(())` once the shutdown flag is observed while a stream is open.
async fn mempool_monitor(
    mut client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
    mempool_transaction_sender: mpsc::Sender<RawTransaction>,
    unprocessed_transactions_count: Arc<AtomicU8>,
    shutdown_mempool: Arc<AtomicBool>,
) -> Result<(), MempoolError> {
    // Timer used to poll the shutdown flag once per second while a stream is being read.
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // Outer loop: one iteration per stream request.
    'main: loop {
        let response =
            client::get_mempool_transaction_stream(&mut client, shutdown_mempool.clone()).await;

        match response {
            Ok(mut mempool_stream) => {
                // Restart the polling timer for the freshly opened stream.
                interval.reset();
                // Inner loop: wait for the next stream message or the next timer tick.
                loop {
                    tokio::select! {
                        mempool_stream_message = mempool_stream.message() => {
                            // A stream error is mapped to `None`, i.e. handled like the end of
                            // the stream.
                            match mempool_stream_message.unwrap_or(None) {
                                Some(raw_transaction) => {
                                    // The send result is deliberately discarded.
                                    let _ignore_error = mempool_transaction_sender
                                        .send(raw_transaction)
                                        .await;
                                    // NOTE(review): the counter is incremented even when the send
                                    // above failed - confirm this is intended by the receiver.
                                    unprocessed_transactions_count.fetch_add(1, atomic::Ordering::Release);
                                }
                                None => {
                                    // No more messages on this stream: request a new one.
                                    continue 'main;
                                }
                            }

                        }

                        _ = interval.tick() => {
                            // Leave both loops and return `Ok(())` once shutdown is requested.
                            if shutdown_mempool.load(atomic::Ordering::Acquire) {
                                break 'main;
                            }
                        }
                    }
                }
            }
            // This variant is handed straight back to the caller.
            Err(e @ MempoolError::ShutdownWithoutStream) => return Err(e),
            // A failed stream request is logged and retried after a three second pause.
            Err(MempoolError::ServerError(e)) => {
                tracing::warn!("Mempool stream request failed! Status: {e}.\nRetrying...");
                tokio::time::sleep(Duration::from_secs(3)).await;
            }
        }
    }

    Ok(())
}