1use std::{
13 collections::{BTreeMap, HashMap, HashSet},
14 ops::RangeBounds,
15 sync::Arc,
16};
17
18use chrono::{DateTime, Utc};
19use itertools::Itertools;
20
21use zebra_chain::{
22 amount::NonNegative,
23 block::{self, Block, Height},
24 orchard,
25 parallel::tree::NoteCommitmentTrees,
26 parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
27 sapling,
28 serialization::{CompactSizeMessage, TrustedPreallocate, ZcashSerialize as _},
29 transaction::{self, Transaction},
30 transparent,
31 value_balance::ValueBalance,
32};
33
34use crate::{
35 request::FinalizedBlock,
36 service::finalized_state::{
37 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
38 disk_format::{
39 block::TransactionLocation,
40 transparent::{AddressBalanceLocationUpdates, OutputLocation},
41 },
42 zebra_db::{metrics::block_precommit_metrics, ZebraDb},
43 FromDisk, RawBytes,
44 },
45 BoxError, HashOrHeight,
46};
47
48#[cfg(feature = "indexer")]
49use crate::request::Spend;
50
51#[cfg(test)]
52mod tests;
53
54impl ZebraDb {
55 pub fn is_empty(&self) -> bool {
61 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
62 self.db.zs_is_empty(&hash_by_height)
63 }
64
65 #[allow(clippy::unwrap_in_result)]
70 pub fn tip(&self) -> Option<(block::Height, block::Hash)> {
71 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
72 self.db.zs_last_key_value(&hash_by_height)
73 }
74
75 #[allow(clippy::unwrap_in_result)]
77 pub fn contains_height(&self, height: block::Height) -> bool {
78 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
79
80 self.db.zs_contains(&hash_by_height, &height)
81 }
82
83 #[allow(clippy::unwrap_in_result)]
85 pub fn hash(&self, height: block::Height) -> Option<block::Hash> {
86 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
87 self.db.zs_get(&hash_by_height, &height)
88 }
89
90 #[allow(clippy::unwrap_in_result)]
92 pub fn contains_hash(&self, hash: block::Hash) -> bool {
93 let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
94
95 self.db.zs_contains(&height_by_hash, &hash)
96 }
97
98 #[allow(clippy::unwrap_in_result)]
100 pub fn height(&self, hash: block::Hash) -> Option<block::Height> {
101 let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
102 self.db.zs_get(&height_by_hash, &hash)
103 }
104
105 #[allow(dead_code)]
107 pub fn prev_block_hash_for_hash(&self, hash: block::Hash) -> Option<block::Hash> {
108 let height = self.height(hash)?;
109 let prev_height = height.previous().ok()?;
110
111 self.hash(prev_height)
112 }
113
114 #[allow(dead_code)]
116 pub fn prev_block_height_for_hash(&self, hash: block::Hash) -> Option<block::Height> {
117 let height = self.height(hash)?;
118
119 height.previous().ok()
120 }
121
122 #[allow(clippy::unwrap_in_result)]
127 pub fn block_header(&self, hash_or_height: HashOrHeight) -> Option<Arc<block::Header>> {
128 let block_header_by_height = self.db.cf_handle("block_header_by_height").unwrap();
130
131 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
132 let header = self.db.zs_get(&block_header_by_height, &height)?;
133
134 Some(header)
135 }
136
137 #[allow(clippy::unwrap_in_result)]
140 fn raw_block_header(&self, hash_or_height: HashOrHeight) -> Option<RawBytes> {
141 let block_header_by_height = self.db.cf_handle("block_header_by_height").unwrap();
143
144 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
145 let header: RawBytes = self.db.zs_get(&block_header_by_height, &height)?;
146
147 Some(header)
148 }
149
150 #[allow(clippy::unwrap_in_result)]
155 pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
156 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
158 let header = self.block_header(height.into())?;
159
160 let transactions = self
168 .transactions_by_height(height)
169 .map(|(_, tx)| tx)
170 .map(Arc::new)
171 .collect();
172
173 Some(Arc::new(Block {
174 header,
175 transactions,
176 }))
177 }
178
179 #[allow(clippy::unwrap_in_result)]
182 pub fn block_and_size(&self, hash_or_height: HashOrHeight) -> Option<(Arc<Block>, usize)> {
183 let (raw_header, raw_txs) = self.raw_block(hash_or_height)?;
184
185 let header = Arc::<block::Header>::from_bytes(raw_header.raw_bytes());
186 let txs: Vec<_> = raw_txs
187 .iter()
188 .map(|raw_tx| Arc::<Transaction>::from_bytes(raw_tx.raw_bytes()))
189 .collect();
190
191 let tx_count = CompactSizeMessage::try_from(txs.len())
196 .expect("must work for a previously serialized block");
197 let tx_raw = tx_count
198 .zcash_serialize_to_vec()
199 .expect("must work for a previously serialized block");
200 let size = raw_header.raw_bytes().len()
201 + raw_txs
202 .iter()
203 .map(|raw_tx| raw_tx.raw_bytes().len())
204 .sum::<usize>()
205 + tx_raw.len();
206
207 let block = Block {
208 header,
209 transactions: txs,
210 };
211 Some((Arc::new(block), size))
212 }
213
214 #[allow(clippy::unwrap_in_result)]
217 fn raw_block(&self, hash_or_height: HashOrHeight) -> Option<(RawBytes, Vec<RawBytes>)> {
218 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
220 let header = self.raw_block_header(height.into())?;
221
222 let transactions = self
225 .raw_transactions_by_height(height)
226 .map(|(_, tx)| tx)
227 .collect();
228
229 Some((header, transactions))
230 }
231
232 #[allow(clippy::unwrap_in_result)]
235 pub fn sapling_tree_by_hash_or_height(
236 &self,
237 hash_or_height: HashOrHeight,
238 ) -> Option<Arc<sapling::tree::NoteCommitmentTree>> {
239 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
240
241 self.sapling_tree_by_height(&height)
242 }
243
244 #[allow(clippy::unwrap_in_result)]
247 pub fn orchard_tree_by_hash_or_height(
248 &self,
249 hash_or_height: HashOrHeight,
250 ) -> Option<Arc<orchard::tree::NoteCommitmentTree>> {
251 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
252
253 self.orchard_tree_by_height(&height)
254 }
255
256 pub fn finalized_tip_hash(&self) -> block::Hash {
260 self.tip()
261 .map(|(_, hash)| hash)
262 .unwrap_or(GENESIS_PREVIOUS_BLOCK_HASH)
264 }
265
266 pub fn finalized_tip_height(&self) -> Option<block::Height> {
268 self.tip().map(|(height, _)| height)
269 }
270
271 pub fn tip_block(&self) -> Option<Arc<Block>> {
273 let (height, _hash) = self.tip()?;
274 self.block(height.into())
275 }
276
277 #[allow(clippy::unwrap_in_result)]
282 pub fn transaction(
283 &self,
284 hash: transaction::Hash,
285 ) -> Option<(Arc<Transaction>, Height, DateTime<Utc>)> {
286 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
287
288 let transaction_location = self.transaction_location(hash)?;
289
290 let block_time = self
291 .block_header(transaction_location.height.into())
292 .map(|header| header.time);
293
294 self.db
295 .zs_get(&tx_by_loc, &transaction_location)
296 .and_then(|tx| block_time.map(|time| (tx, transaction_location.height, time)))
297 }
298
299 #[allow(clippy::unwrap_in_result)]
301 pub fn transactions_by_height(
302 &self,
303 height: Height,
304 ) -> impl Iterator<Item = (TransactionLocation, Transaction)> + '_ {
305 self.transactions_by_location_range(
306 TransactionLocation::min_for_height(height)
307 ..=TransactionLocation::max_for_height(height),
308 )
309 }
310
311 #[allow(clippy::unwrap_in_result)]
314 fn raw_transactions_by_height(
315 &self,
316 height: Height,
317 ) -> impl Iterator<Item = (TransactionLocation, RawBytes)> + '_ {
318 self.raw_transactions_by_location_range(
319 TransactionLocation::min_for_height(height)
320 ..=TransactionLocation::max_for_height(height),
321 )
322 }
323
324 #[allow(clippy::unwrap_in_result)]
327 pub fn transactions_by_location_range<R>(
328 &self,
329 range: R,
330 ) -> impl Iterator<Item = (TransactionLocation, Transaction)> + '_
331 where
332 R: RangeBounds<TransactionLocation>,
333 {
334 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
335 self.db.zs_forward_range_iter(tx_by_loc, range)
336 }
337
338 #[allow(clippy::unwrap_in_result)]
341 pub fn raw_transactions_by_location_range<R>(
342 &self,
343 range: R,
344 ) -> impl Iterator<Item = (TransactionLocation, RawBytes)> + '_
345 where
346 R: RangeBounds<TransactionLocation>,
347 {
348 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
349 self.db.zs_forward_range_iter(tx_by_loc, range)
350 }
351
352 #[allow(clippy::unwrap_in_result)]
355 pub fn transaction_location(&self, hash: transaction::Hash) -> Option<TransactionLocation> {
356 let tx_loc_by_hash = self.db.cf_handle("tx_loc_by_hash").unwrap();
357 self.db.zs_get(&tx_loc_by_hash, &hash)
358 }
359
360 #[allow(clippy::unwrap_in_result)]
363 #[allow(dead_code)]
364 pub fn transaction_hash(&self, location: TransactionLocation) -> Option<transaction::Hash> {
365 let hash_by_tx_loc = self.db.cf_handle("hash_by_tx_loc").unwrap();
366 self.db.zs_get(&hash_by_tx_loc, &location)
367 }
368
/// Returns the hash of the transaction found for `spend`.
///
/// The transaction location is looked up with the method matching the
/// [`Spend`] variant (transparent outpoint, or Sprout/Sapling/Orchard
/// nullifier), then resolved to a hash with [`Self::transaction_hash`].
/// Returns `None` if either lookup finds nothing.
#[cfg(feature = "indexer")]
pub fn spending_transaction_hash(&self, spend: &Spend) -> Option<transaction::Hash> {
    let spending_tx_loc = match spend {
        Spend::OutPoint(outpoint) => self.spending_tx_loc(outpoint),
        Spend::Sprout(nullifier) => self.sprout_revealing_tx_loc(nullifier),
        Spend::Sapling(nullifier) => self.sapling_revealing_tx_loc(nullifier),
        Spend::Orchard(nullifier) => self.orchard_revealing_tx_loc(nullifier),
    }?;

    self.transaction_hash(spending_tx_loc)
}
382
383 #[allow(clippy::unwrap_in_result)]
390 pub fn transaction_hashes_for_block(
391 &self,
392 hash_or_height: HashOrHeight,
393 ) -> Option<Arc<[transaction::Hash]>> {
394 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
396
397 let hash_by_tx_loc = self.db.cf_handle("hash_by_tx_loc").unwrap();
399
400 let mut transaction_hashes = Vec::new();
402
403 for tx_index in 0..=Transaction::max_allocation() {
404 let tx_loc = TransactionLocation::from_u64(height, tx_index);
405
406 if let Some(tx_hash) = self.db.zs_get(&hash_by_tx_loc, &tx_loc) {
407 transaction_hashes.push(tx_hash);
408 } else {
409 break;
410 }
411 }
412
413 Some(transaction_hashes.into())
414 }
415
/// Writes `finalized` to the database and returns its block hash.
///
/// This gathers the transparent output and address data that the batch
/// needs (reading existing entries from the database where required),
/// prepares a [`DiskWriteBatch`] with
/// [`DiskWriteBatch::prepare_block_batch`], and writes that batch.
///
/// `source` is only used in the trace log emitted after the write.
///
/// # Errors
///
/// Returns an error if preparing the batch or writing it to the database
/// fails.
///
/// # Panics
///
/// Panics if a spent outpoint is found neither in the database nor in the
/// new outputs of `finalized`, or if an output location must be derived
/// from a transaction hash that is not in `finalized.transaction_hashes`.
#[allow(clippy::unwrap_in_result)]
pub(in super::super) fn write_block(
    &mut self,
    finalized: FinalizedBlock,
    prev_note_commitment_trees: Option<NoteCommitmentTrees>,
    network: &Network,
    source: &str,
) -> Result<block::Hash, BoxError> {
    // Index of each transaction hash within this block.
    let tx_hash_indexes: HashMap<transaction::Hash, usize> = finalized
        .transaction_hashes
        .iter()
        .enumerate()
        .map(|(index, hash)| (*hash, index))
        .collect();

    // Outputs created by this block, keyed by their location in this block.
    let new_outputs_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo> = finalized
        .new_outputs
        .iter()
        .map(|(outpoint, ordered_utxo)| {
            (
                lookup_out_loc(finalized.height, outpoint, &tx_hash_indexes),
                ordered_utxo.utxo.clone(),
            )
        })
        .collect();

    // Every output spent by this block's inputs, with its outpoint and location.
    let spent_utxos: Vec<(transparent::OutPoint, OutputLocation, transparent::Utxo)> =
        finalized
            .block
            .transactions
            .iter()
            .flat_map(|tx| tx.inputs().iter())
            .flat_map(|input| input.outpoint())
            .map(|outpoint| {
                (
                    outpoint,
                    // Use the stored location if the database has one;
                    // otherwise derive it from this block's transaction indexes.
                    self.output_location(&outpoint).unwrap_or_else(|| {
                        lookup_out_loc(finalized.height, &outpoint, &tx_hash_indexes)
                    }),
                    // Likewise, take the UTXO from the database, falling
                    // back to the outputs created by this block.
                    self.utxo(&outpoint)
                        .map(|ordered_utxo| ordered_utxo.utxo)
                        .or_else(|| {
                            finalized
                                .new_outputs
                                .get(&outpoint)
                                .map(|ordered_utxo| ordered_utxo.utxo.clone())
                        })
                        .expect("already checked UTXO was in state or block"),
                )
            })
            .collect();

    // Spent outputs keyed by outpoint.
    let spent_utxos_by_outpoint: HashMap<transparent::OutPoint, transparent::Utxo> =
        spent_utxos
            .iter()
            .map(|(outpoint, _output_loc, utxo)| (*outpoint, utxo.clone()))
            .collect();

    // Output location of each spent outpoint; only built for the indexer feature.
    #[cfg(feature = "indexer")]
    let out_loc_by_outpoint: HashMap<transparent::OutPoint, OutputLocation> = spent_utxos
        .iter()
        .map(|(outpoint, out_loc, _utxo)| (*outpoint, *out_loc))
        .collect();
    // Spent outputs keyed by output location; this consumes `spent_utxos`.
    let spent_utxos_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo> = spent_utxos
        .into_iter()
        .map(|(_outpoint, out_loc, utxo)| (out_loc, utxo))
        .collect();

    // Addresses of all outputs spent or created by this block.
    // Outputs for which `address()` returns `None` are skipped.
    let changed_addresses: HashSet<transparent::Address> = spent_utxos_by_out_loc
        .values()
        .chain(
            finalized
                .new_outputs
                .values()
                .map(|ordered_utxo| &ordered_utxo.utxo),
        )
        .filter_map(|utxo| utxo.output.address(network))
        .unique()
        .collect();

    // Applies `f` to each changed address, keeping only the addresses for
    // which `f` returns a value.
    fn read_addr_locs<T, F: Fn(&transparent::Address) -> Option<T>>(
        changed_addresses: HashSet<transparent::Address>,
        f: F,
    ) -> HashMap<transparent::Address, T> {
        changed_addresses
            .into_iter()
            .filter_map(|address| Some((address.clone(), f(&address)?)))
            .collect()
    }

    // The update variant depends on whether format upgrades have finished:
    // `Insert` carries the stored balance locations as read, while `Merge`
    // carries them converted with `into_new_change()`.
    let address_balances: AddressBalanceLocationUpdates = if self.finished_format_upgrades() {
        AddressBalanceLocationUpdates::Insert(read_addr_locs(changed_addresses, |addr| {
            self.address_balance_location(addr)
        }))
    } else {
        AddressBalanceLocationUpdates::Merge(read_addr_locs(changed_addresses, |addr| {
            Some(self.address_balance_location(addr)?.into_new_change())
        }))
    };

    let mut batch = DiskWriteBatch::new();

    // Fill the batch with every change for this block; nothing is written
    // to the database until `self.db.write` below.
    batch.prepare_block_batch(
        self,
        network,
        &finalized,
        new_outputs_by_out_loc,
        spent_utxos_by_outpoint,
        spent_utxos_by_out_loc,
        #[cfg(feature = "indexer")]
        out_loc_by_outpoint,
        address_balances,
        self.finalized_value_pool(),
        prev_note_commitment_trees,
    )?;

    self.db.write(batch)?;

    tracing::trace!(?source, "committed block from");

    Ok(finalized.hash)
}
574
575 pub fn write_batch(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
577 self.db.write(batch)
578 }
579}
580
581fn lookup_out_loc(
585 height: Height,
586 outpoint: &transparent::OutPoint,
587 tx_hash_indexes: &HashMap<transaction::Hash, usize>,
588) -> OutputLocation {
589 let tx_index = tx_hash_indexes
590 .get(&outpoint.hash)
591 .expect("already checked UTXO was in state or block");
592
593 let tx_loc = TransactionLocation::from_usize(height, *tx_index);
594
595 OutputLocation::from_outpoint(tx_loc, outpoint)
596}
597
598impl DiskWriteBatch {
/// Prepares this batch with all the database changes for `finalized`.
///
/// The steps run in order: block header and transaction data, shielded
/// transaction data, note commitment trees, transparent transaction data
/// (skipped when `finalized.height.is_min()`), and chain value pools.
/// Block pre-commit metrics are recorded last.
///
/// Nothing is written to the database here; the caller writes the batch.
///
/// # Errors
///
/// Returns the first error produced by any of the preparation steps.
#[allow(clippy::too_many_arguments)]
pub fn prepare_block_batch(
    &mut self,
    zebra_db: &ZebraDb,
    network: &Network,
    finalized: &FinalizedBlock,
    new_outputs_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo>,
    spent_utxos_by_outpoint: HashMap<transparent::OutPoint, transparent::Utxo>,
    spent_utxos_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo>,
    #[cfg(feature = "indexer")] out_loc_by_outpoint: HashMap<
        transparent::OutPoint,
        OutputLocation,
    >,
    address_balances: AddressBalanceLocationUpdates,
    value_pool: ValueBalance<NonNegative>,
    prev_note_commitment_trees: Option<NoteCommitmentTrees>,
) -> Result<(), BoxError> {
    let db = &zebra_db.db;

    // Block header, hash/height indexes, and transaction data.
    self.prepare_block_header_and_transaction_data_batch(db, finalized)?;

    // Shielded transaction data, then note commitment trees.
    self.prepare_shielded_transaction_batch(zebra_db, finalized)?;
    self.prepare_trees_batch(zebra_db, finalized, prev_note_commitment_trees)?;

    // Transparent data is skipped for the block at the minimum height
    // (presumably the genesis block; confirm against `Height::is_min`).
    if !finalized.height.is_min() {
        self.prepare_transparent_transaction_batch(
            zebra_db,
            network,
            finalized,
            &new_outputs_by_out_loc,
            &spent_utxos_by_outpoint,
            &spent_utxos_by_out_loc,
            #[cfg(feature = "indexer")]
            &out_loc_by_outpoint,
            address_balances,
        )?;
    }
    // Chain value pools; this step takes ownership of the spent UTXO map.
    self.prepare_chain_value_pools_batch(
        zebra_db,
        finalized,
        spent_utxos_by_outpoint,
        value_pool,
    )?;

    block_precommit_metrics(&finalized.block, finalized.hash, finalized.height);

    Ok(())
}
678
679 #[allow(clippy::unwrap_in_result)]
686 pub fn prepare_block_header_and_transaction_data_batch(
687 &mut self,
688 db: &DiskDb,
689 finalized: &FinalizedBlock,
690 ) -> Result<(), BoxError> {
691 let block_header_by_height = db.cf_handle("block_header_by_height").unwrap();
693 let hash_by_height = db.cf_handle("hash_by_height").unwrap();
694 let height_by_hash = db.cf_handle("height_by_hash").unwrap();
695
696 let tx_by_loc = db.cf_handle("tx_by_loc").unwrap();
698 let hash_by_tx_loc = db.cf_handle("hash_by_tx_loc").unwrap();
699 let tx_loc_by_hash = db.cf_handle("tx_loc_by_hash").unwrap();
700
701 let FinalizedBlock {
702 block,
703 hash,
704 height,
705 transaction_hashes,
706 ..
707 } = finalized;
708
709 self.zs_insert(&block_header_by_height, height, &block.header);
711
712 self.zs_insert(&hash_by_height, height, hash);
714 self.zs_insert(&height_by_hash, hash, height);
715
716 for (transaction_index, (transaction, transaction_hash)) in block
717 .transactions
718 .iter()
719 .zip(transaction_hashes.iter())
720 .enumerate()
721 {
722 let transaction_location = TransactionLocation::from_usize(*height, transaction_index);
723
724 self.zs_insert(&tx_by_loc, transaction_location, transaction);
726
727 self.zs_insert(&hash_by_tx_loc, transaction_location, transaction_hash);
729 self.zs_insert(&tx_loc_by_hash, transaction_hash, transaction_location);
730 }
731
732 Ok(())
733 }
734}