1use std::{
13 collections::{BTreeMap, HashMap, HashSet},
14 ops::RangeBounds,
15 sync::Arc,
16};
17
18use chrono::{DateTime, Utc};
19use itertools::Itertools;
20
21use zebra_chain::{
22 amount::NonNegative,
23 block::{self, Block, Height},
24 orchard,
25 parallel::tree::NoteCommitmentTrees,
26 parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
27 sapling,
28 serialization::{CompactSizeMessage, TrustedPreallocate, ZcashSerialize as _},
29 transaction::{self, Transaction},
30 transparent,
31 value_balance::ValueBalance,
32};
33
34use crate::{
35 error::CommitCheckpointVerifiedError,
36 request::FinalizedBlock,
37 service::finalized_state::{
38 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
39 disk_format::{
40 block::TransactionLocation,
41 transparent::{AddressBalanceLocationUpdates, OutputLocation},
42 },
43 zebra_db::{metrics::block_precommit_metrics, ZebraDb},
44 FromDisk, RawBytes,
45 },
46 HashOrHeight,
47};
48
49#[cfg(feature = "indexer")]
50use crate::request::Spend;
51
52#[cfg(test)]
53mod tests;
54
55impl ZebraDb {
56 pub fn is_empty(&self) -> bool {
62 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
63 self.db.zs_is_empty(&hash_by_height)
64 }
65
66 #[allow(clippy::unwrap_in_result)]
71 pub fn tip(&self) -> Option<(block::Height, block::Hash)> {
72 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
73 self.db.zs_last_key_value(&hash_by_height)
74 }
75
76 #[allow(clippy::unwrap_in_result)]
78 pub fn contains_height(&self, height: block::Height) -> bool {
79 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
80
81 self.db.zs_contains(&hash_by_height, &height)
82 }
83
84 #[allow(clippy::unwrap_in_result)]
86 pub fn hash(&self, height: block::Height) -> Option<block::Hash> {
87 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
88 self.db.zs_get(&hash_by_height, &height)
89 }
90
91 #[allow(clippy::unwrap_in_result)]
93 pub fn contains_hash(&self, hash: block::Hash) -> bool {
94 let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
95
96 self.db.zs_contains(&height_by_hash, &hash)
97 }
98
99 #[allow(clippy::unwrap_in_result)]
101 pub fn height(&self, hash: block::Hash) -> Option<block::Height> {
102 let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
103 self.db.zs_get(&height_by_hash, &hash)
104 }
105
106 #[allow(dead_code)]
108 pub fn prev_block_hash_for_hash(&self, hash: block::Hash) -> Option<block::Hash> {
109 let height = self.height(hash)?;
110 let prev_height = height.previous().ok()?;
111
112 self.hash(prev_height)
113 }
114
115 #[allow(dead_code)]
117 pub fn prev_block_height_for_hash(&self, hash: block::Hash) -> Option<block::Height> {
118 let height = self.height(hash)?;
119
120 height.previous().ok()
121 }
122
123 #[allow(clippy::unwrap_in_result)]
128 pub fn block_header(&self, hash_or_height: HashOrHeight) -> Option<Arc<block::Header>> {
129 let block_header_by_height = self.db.cf_handle("block_header_by_height").unwrap();
131
132 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
133 let header = self.db.zs_get(&block_header_by_height, &height)?;
134
135 Some(header)
136 }
137
138 #[allow(clippy::unwrap_in_result)]
141 fn raw_block_header(&self, hash_or_height: HashOrHeight) -> Option<RawBytes> {
142 let block_header_by_height = self.db.cf_handle("block_header_by_height").unwrap();
144
145 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
146 let header: RawBytes = self.db.zs_get(&block_header_by_height, &height)?;
147
148 Some(header)
149 }
150
151 #[allow(clippy::unwrap_in_result)]
156 pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
157 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
159 let header = self.block_header(height.into())?;
160
161 let transactions = self
169 .transactions_by_height(height)
170 .map(|(_, tx)| tx)
171 .map(Arc::new)
172 .collect();
173
174 Some(Arc::new(Block {
175 header,
176 transactions,
177 }))
178 }
179
180 #[allow(clippy::unwrap_in_result)]
183 pub fn block_and_size(&self, hash_or_height: HashOrHeight) -> Option<(Arc<Block>, usize)> {
184 let (raw_header, raw_txs) = self.raw_block(hash_or_height)?;
185
186 let header = Arc::<block::Header>::from_bytes(raw_header.raw_bytes());
187 let txs: Vec<_> = raw_txs
188 .iter()
189 .map(|raw_tx| Arc::<Transaction>::from_bytes(raw_tx.raw_bytes()))
190 .collect();
191
192 let tx_count = CompactSizeMessage::try_from(txs.len())
197 .expect("must work for a previously serialized block");
198 let tx_raw = tx_count
199 .zcash_serialize_to_vec()
200 .expect("must work for a previously serialized block");
201 let size = raw_header.raw_bytes().len()
202 + raw_txs
203 .iter()
204 .map(|raw_tx| raw_tx.raw_bytes().len())
205 .sum::<usize>()
206 + tx_raw.len();
207
208 let block = Block {
209 header,
210 transactions: txs,
211 };
212 Some((Arc::new(block), size))
213 }
214
215 #[allow(clippy::unwrap_in_result)]
218 fn raw_block(&self, hash_or_height: HashOrHeight) -> Option<(RawBytes, Vec<RawBytes>)> {
219 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
221 let header = self.raw_block_header(height.into())?;
222
223 let transactions = self
226 .raw_transactions_by_height(height)
227 .map(|(_, tx)| tx)
228 .collect();
229
230 Some((header, transactions))
231 }
232
233 #[allow(clippy::unwrap_in_result)]
236 pub fn sapling_tree_by_hash_or_height(
237 &self,
238 hash_or_height: HashOrHeight,
239 ) -> Option<Arc<sapling::tree::NoteCommitmentTree>> {
240 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
241
242 self.sapling_tree_by_height(&height)
243 }
244
245 #[allow(clippy::unwrap_in_result)]
248 pub fn orchard_tree_by_hash_or_height(
249 &self,
250 hash_or_height: HashOrHeight,
251 ) -> Option<Arc<orchard::tree::NoteCommitmentTree>> {
252 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
253
254 self.orchard_tree_by_height(&height)
255 }
256
257 pub fn finalized_tip_hash(&self) -> block::Hash {
261 self.tip()
262 .map(|(_, hash)| hash)
263 .unwrap_or(GENESIS_PREVIOUS_BLOCK_HASH)
265 }
266
267 pub fn finalized_tip_height(&self) -> Option<block::Height> {
269 self.tip().map(|(height, _)| height)
270 }
271
272 pub fn tip_block(&self) -> Option<Arc<Block>> {
274 let (height, _hash) = self.tip()?;
275 self.block(height.into())
276 }
277
278 #[allow(clippy::unwrap_in_result)]
283 pub fn transaction(
284 &self,
285 hash: transaction::Hash,
286 ) -> Option<(Arc<Transaction>, Height, DateTime<Utc>)> {
287 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
288
289 let transaction_location = self.transaction_location(hash)?;
290
291 let block_time = self
292 .block_header(transaction_location.height.into())
293 .map(|header| header.time);
294
295 self.db
296 .zs_get(&tx_by_loc, &transaction_location)
297 .and_then(|tx| block_time.map(|time| (tx, transaction_location.height, time)))
298 }
299
300 #[allow(clippy::unwrap_in_result)]
302 pub fn transactions_by_height(
303 &self,
304 height: Height,
305 ) -> impl Iterator<Item = (TransactionLocation, Transaction)> + '_ {
306 self.transactions_by_location_range(
307 TransactionLocation::min_for_height(height)
308 ..=TransactionLocation::max_for_height(height),
309 )
310 }
311
312 #[allow(clippy::unwrap_in_result)]
315 fn raw_transactions_by_height(
316 &self,
317 height: Height,
318 ) -> impl Iterator<Item = (TransactionLocation, RawBytes)> + '_ {
319 self.raw_transactions_by_location_range(
320 TransactionLocation::min_for_height(height)
321 ..=TransactionLocation::max_for_height(height),
322 )
323 }
324
325 #[allow(clippy::unwrap_in_result)]
328 pub fn transactions_by_location_range<R>(
329 &self,
330 range: R,
331 ) -> impl Iterator<Item = (TransactionLocation, Transaction)> + '_
332 where
333 R: RangeBounds<TransactionLocation>,
334 {
335 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
336 self.db.zs_forward_range_iter(tx_by_loc, range)
337 }
338
339 #[allow(clippy::unwrap_in_result)]
342 pub fn raw_transactions_by_location_range<R>(
343 &self,
344 range: R,
345 ) -> impl Iterator<Item = (TransactionLocation, RawBytes)> + '_
346 where
347 R: RangeBounds<TransactionLocation>,
348 {
349 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
350 self.db.zs_forward_range_iter(tx_by_loc, range)
351 }
352
353 #[allow(clippy::unwrap_in_result)]
356 pub fn transaction_location(&self, hash: transaction::Hash) -> Option<TransactionLocation> {
357 let tx_loc_by_hash = self.db.cf_handle("tx_loc_by_hash").unwrap();
358 self.db.zs_get(&tx_loc_by_hash, &hash)
359 }
360
361 #[allow(clippy::unwrap_in_result)]
364 #[allow(dead_code)]
365 pub fn transaction_hash(&self, location: TransactionLocation) -> Option<transaction::Hash> {
366 let hash_by_tx_loc = self.db.cf_handle("hash_by_tx_loc").unwrap();
367 self.db.zs_get(&hash_by_tx_loc, &location)
368 }
369
/// Returns the hash of the transaction that spent the outpoint or revealed
/// the nullifier described by `spend`.
///
/// Returns `None` if no transaction location is found for `spend`, or if no
/// hash is stored for that location.
#[cfg(feature = "indexer")]
pub fn spending_transaction_hash(&self, spend: &Spend) -> Option<transaction::Hash> {
    // Each kind of spend has its own location lookup.
    let spending_tx_loc = match spend {
        Spend::OutPoint(outpoint) => self.spending_tx_loc(outpoint),
        Spend::Sprout(nullifier) => self.sprout_revealing_tx_loc(nullifier),
        Spend::Sapling(nullifier) => self.sapling_revealing_tx_loc(nullifier),
        Spend::Orchard(nullifier) => self.orchard_revealing_tx_loc(nullifier),
    };

    spending_tx_loc.and_then(|tx_loc| self.transaction_hash(tx_loc))
}
383
384 #[allow(clippy::unwrap_in_result)]
391 pub fn transaction_hashes_for_block(
392 &self,
393 hash_or_height: HashOrHeight,
394 ) -> Option<Arc<[transaction::Hash]>> {
395 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
397
398 let hash_by_tx_loc = self.db.cf_handle("hash_by_tx_loc").unwrap();
400
401 let mut transaction_hashes = Vec::new();
403
404 for tx_index in 0..=Transaction::max_allocation() {
405 let tx_loc = TransactionLocation::from_u64(height, tx_index);
406
407 if let Some(tx_hash) = self.db.zs_get(&hash_by_tx_loc, &tx_loc) {
408 transaction_hashes.push(tx_hash);
409 } else {
410 break;
411 }
412 }
413
414 Some(transaction_hashes.into())
415 }
416
417 #[allow(clippy::unwrap_in_result)]
432 pub(in super::super) fn write_block(
433 &mut self,
434 finalized: FinalizedBlock,
435 prev_note_commitment_trees: Option<NoteCommitmentTrees>,
436 network: &Network,
437 source: &str,
438 ) -> Result<block::Hash, CommitCheckpointVerifiedError> {
439 let tx_hash_indexes: HashMap<transaction::Hash, usize> = finalized
440 .transaction_hashes
441 .iter()
442 .enumerate()
443 .map(|(index, hash)| (*hash, index))
444 .collect();
445
446 let new_outputs_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo> = finalized
452 .new_outputs
453 .iter()
454 .map(|(outpoint, ordered_utxo)| {
455 (
456 lookup_out_loc(finalized.height, outpoint, &tx_hash_indexes),
457 ordered_utxo.utxo.clone(),
458 )
459 })
460 .collect();
461
462 let spent_utxos: Vec<(transparent::OutPoint, OutputLocation, transparent::Utxo)> =
464 finalized
465 .block
466 .transactions
467 .iter()
468 .flat_map(|tx| tx.inputs().iter())
469 .flat_map(|input| input.outpoint())
470 .map(|outpoint| {
471 (
472 outpoint,
473 self.output_location(&outpoint).unwrap_or_else(|| {
476 lookup_out_loc(finalized.height, &outpoint, &tx_hash_indexes)
477 }),
478 self.utxo(&outpoint)
479 .map(|ordered_utxo| ordered_utxo.utxo)
480 .or_else(|| {
481 finalized
482 .new_outputs
483 .get(&outpoint)
484 .map(|ordered_utxo| ordered_utxo.utxo.clone())
485 })
486 .expect("already checked UTXO was in state or block"),
487 )
488 })
489 .collect();
490
491 let spent_utxos_by_outpoint: HashMap<transparent::OutPoint, transparent::Utxo> =
492 spent_utxos
493 .iter()
494 .map(|(outpoint, _output_loc, utxo)| (*outpoint, utxo.clone()))
495 .collect();
496
497 #[cfg(feature = "indexer")]
499 let out_loc_by_outpoint: HashMap<transparent::OutPoint, OutputLocation> = spent_utxos
500 .iter()
501 .map(|(outpoint, out_loc, _utxo)| (*outpoint, *out_loc))
502 .collect();
503 let spent_utxos_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo> = spent_utxos
504 .into_iter()
505 .map(|(_outpoint, out_loc, utxo)| (out_loc, utxo))
506 .collect();
507
508 let changed_addresses: HashSet<transparent::Address> = spent_utxos_by_out_loc
510 .values()
511 .chain(
512 finalized
513 .new_outputs
514 .values()
515 .map(|ordered_utxo| &ordered_utxo.utxo),
516 )
517 .filter_map(|utxo| utxo.output.address(network))
518 .unique()
519 .collect();
520
521 fn read_addr_locs<T, F: Fn(&transparent::Address) -> Option<T>>(
524 changed_addresses: HashSet<transparent::Address>,
525 f: F,
526 ) -> HashMap<transparent::Address, T> {
527 changed_addresses
528 .into_iter()
529 .filter_map(|address| Some((address.clone(), f(&address)?)))
530 .collect()
531 }
532
533 let address_balances: AddressBalanceLocationUpdates = if self.finished_format_upgrades() {
544 AddressBalanceLocationUpdates::Insert(read_addr_locs(changed_addresses, |addr| {
545 self.address_balance_location(addr)
546 }))
547 } else {
548 AddressBalanceLocationUpdates::Merge(read_addr_locs(changed_addresses, |addr| {
549 Some(self.address_balance_location(addr)?.into_new_change())
550 }))
551 };
552
553 let mut batch = DiskWriteBatch::new();
554
555 batch.prepare_block_batch(
557 self,
558 network,
559 &finalized,
560 new_outputs_by_out_loc,
561 spent_utxos_by_outpoint,
562 spent_utxos_by_out_loc,
563 #[cfg(feature = "indexer")]
564 out_loc_by_outpoint,
565 address_balances,
566 self.finalized_value_pool(),
567 prev_note_commitment_trees,
568 )?;
569
570 let batch_start = std::time::Instant::now();
572 self.db
573 .write(batch)
574 .expect("unexpected rocksdb error while writing block");
575 metrics::histogram!("zebra.state.rocksdb.batch_commit.duration_seconds")
576 .record(batch_start.elapsed().as_secs_f64());
577
578 tracing::trace!(?source, "committed block from");
579
580 Ok(finalized.hash)
581 }
582
583 pub fn write_batch(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
585 self.db.write(batch)
586 }
587}
588
589fn lookup_out_loc(
593 height: Height,
594 outpoint: &transparent::OutPoint,
595 tx_hash_indexes: &HashMap<transaction::Hash, usize>,
596) -> OutputLocation {
597 let tx_index = tx_hash_indexes
598 .get(&outpoint.hash)
599 .expect("already checked UTXO was in state or block");
600
601 let tx_loc = TransactionLocation::from_usize(height, *tx_index);
602
603 OutputLocation::from_outpoint(tx_loc, outpoint)
604}
605
606impl DiskWriteBatch {
607 #[allow(clippy::too_many_arguments)]
620 pub fn prepare_block_batch(
621 &mut self,
622 zebra_db: &ZebraDb,
623 network: &Network,
624 finalized: &FinalizedBlock,
625 new_outputs_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo>,
626 spent_utxos_by_outpoint: HashMap<transparent::OutPoint, transparent::Utxo>,
627 spent_utxos_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo>,
628 #[cfg(feature = "indexer")] out_loc_by_outpoint: HashMap<
629 transparent::OutPoint,
630 OutputLocation,
631 >,
632 address_balances: AddressBalanceLocationUpdates,
633 value_pool: ValueBalance<NonNegative>,
634 prev_note_commitment_trees: Option<NoteCommitmentTrees>,
635 ) -> Result<(), CommitCheckpointVerifiedError> {
636 let db = &zebra_db.db;
637
638 self.prepare_block_header_and_transaction_data_batch(db, finalized);
640
641 self.prepare_shielded_transaction_batch(zebra_db, finalized);
648 self.prepare_trees_batch(zebra_db, finalized, prev_note_commitment_trees);
649
650 if !finalized.height.is_min() {
661 self.prepare_transparent_transaction_batch(
663 zebra_db,
664 network,
665 finalized,
666 &new_outputs_by_out_loc,
667 &spent_utxos_by_outpoint,
668 &spent_utxos_by_out_loc,
669 #[cfg(feature = "indexer")]
670 &out_loc_by_outpoint,
671 address_balances,
672 );
673 }
674
675 self.prepare_chain_value_pools_batch(
677 zebra_db,
678 finalized,
679 spent_utxos_by_outpoint,
680 value_pool,
681 )?;
682
683 block_precommit_metrics(&finalized.block, finalized.hash, finalized.height);
685
686 Ok(())
687 }
688
689 #[allow(clippy::unwrap_in_result)]
692 pub fn prepare_block_header_and_transaction_data_batch(
693 &mut self,
694 db: &DiskDb,
695 finalized: &FinalizedBlock,
696 ) {
697 let block_header_by_height = db.cf_handle("block_header_by_height").unwrap();
699 let hash_by_height = db.cf_handle("hash_by_height").unwrap();
700 let height_by_hash = db.cf_handle("height_by_hash").unwrap();
701
702 let tx_by_loc = db.cf_handle("tx_by_loc").unwrap();
704 let hash_by_tx_loc = db.cf_handle("hash_by_tx_loc").unwrap();
705 let tx_loc_by_hash = db.cf_handle("tx_loc_by_hash").unwrap();
706
707 let FinalizedBlock {
708 block,
709 hash,
710 height,
711 transaction_hashes,
712 ..
713 } = finalized;
714
715 self.zs_insert(&block_header_by_height, height, &block.header);
717
718 self.zs_insert(&hash_by_height, height, hash);
720 self.zs_insert(&height_by_hash, hash, height);
721
722 for (transaction_index, (transaction, transaction_hash)) in block
723 .transactions
724 .iter()
725 .zip(transaction_hashes.iter())
726 .enumerate()
727 {
728 let transaction_location = TransactionLocation::from_usize(*height, transaction_index);
729
730 self.zs_insert(&tx_by_loc, transaction_location, transaction);
732
733 self.zs_insert(&hash_by_tx_loc, transaction_location, transaction_hash);
735 self.zs_insert(&tx_loc_by_hash, transaction_hash, transaction_location);
736 }
737 }
738}