1use std::{
16 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17 ops::RangeInclusive,
18};
19
20use rocksdb::ColumnFamily;
21use zebra_chain::{
22 amount::{self, Amount, Constraint, NonNegative},
23 block::Height,
24 parameters::Network,
25 transaction::{self, Transaction},
26 transparent::{self, Input},
27};
28
29use crate::{
30 request::FinalizedBlock,
31 service::finalized_state::{
32 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
33 disk_format::{
34 transparent::{
35 AddressBalanceLocation, AddressBalanceLocationChange, AddressBalanceLocationInner,
36 AddressBalanceLocationUpdates, AddressLocation, AddressTransaction,
37 AddressUnspentOutput, OutputLocation,
38 },
39 TransactionLocation,
40 },
41 zebra_db::ZebraDb,
42 },
43 BoxError, FromDisk, IntoDisk,
44};
45
46use super::super::TypedColumnFamily;
47
48pub const TX_LOC_BY_SPENT_OUT_LOC: &str = "tx_loc_by_spent_out_loc";
50
51pub const BALANCE_BY_TRANSPARENT_ADDR: &str = "balance_by_transparent_addr";
53
54pub const BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP: &str = "fetch_add_balance_and_received";
56
57pub fn fetch_add_balance_and_received(
59 _: &[u8],
60 existing_val: Option<&[u8]>,
61 operands: &rocksdb::MergeOperands,
62) -> Option<Vec<u8>> {
63 existing_val
68 .into_iter()
69 .chain(operands)
70 .map(AddressBalanceLocationChange::from_bytes)
71 .reduce(|a, b| (a + b).expect("address balance/received should not overflow"))
72 .map(|address_balance_location| address_balance_location.as_bytes().to_vec())
73}
74
75pub type TransactionLocationBySpentOutputLocationCf<'cf> =
80 TypedColumnFamily<'cf, OutputLocation, TransactionLocation>;
81
82impl ZebraDb {
83 pub(crate) fn tx_loc_by_spent_output_loc_cf(
87 &self,
88 ) -> TransactionLocationBySpentOutputLocationCf<'_> {
89 TransactionLocationBySpentOutputLocationCf::new(&self.db, TX_LOC_BY_SPENT_OUT_LOC)
90 .expect("column family was created when database was created")
91 }
92
93 pub fn tx_location_by_spent_output_location(
98 &self,
99 output_location: &OutputLocation,
100 ) -> Option<TransactionLocation> {
101 self.tx_loc_by_spent_output_loc_cf().zs_get(output_location)
102 }
103
104 pub fn address_balance_cf(&self) -> &ColumnFamily {
106 self.db.cf_handle(BALANCE_BY_TRANSPARENT_ADDR).unwrap()
107 }
108
109 #[allow(clippy::unwrap_in_result)]
112 pub fn address_balance_location(
113 &self,
114 address: &transparent::Address,
115 ) -> Option<AddressBalanceLocation> {
116 let balance_by_transparent_addr = self.address_balance_cf();
117
118 self.db.zs_get(&balance_by_transparent_addr, address)
119 }
120
121 pub fn address_balance(
124 &self,
125 address: &transparent::Address,
126 ) -> Option<(Amount<NonNegative>, u64)> {
127 self.address_balance_location(address)
128 .map(|abl| (abl.balance(), abl.received()))
129 }
130
131 pub fn address_location(&self, address: &transparent::Address) -> Option<AddressLocation> {
136 self.address_balance_location(address)
137 .map(|abl| abl.address_location())
138 }
139
140 pub fn output_location(&self, outpoint: &transparent::OutPoint) -> Option<OutputLocation> {
145 self.transaction_location(outpoint.hash)
146 .map(|transaction_location| {
147 OutputLocation::from_outpoint(transaction_location, outpoint)
148 })
149 }
150
151 pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::OrderedUtxo> {
154 let output_location = self.output_location(outpoint)?;
155
156 self.utxo_by_location(output_location)
157 }
158
159 pub fn spending_tx_loc(&self, outpoint: &transparent::OutPoint) -> Option<TransactionLocation> {
163 let output_location = self.output_location(outpoint)?;
164 self.tx_location_by_spent_output_location(&output_location)
165 }
166
167 #[allow(clippy::unwrap_in_result)]
170 pub fn utxo_by_location(
171 &self,
172 output_location: OutputLocation,
173 ) -> Option<transparent::OrderedUtxo> {
174 let utxo_by_out_loc = self.db.cf_handle("utxo_by_out_loc").unwrap();
175
176 let output = self.db.zs_get(&utxo_by_out_loc, &output_location)?;
177
178 let utxo = transparent::OrderedUtxo::new(
179 output,
180 output_location.height(),
181 output_location.transaction_index().as_usize(),
182 );
183
184 Some(utxo)
185 }
186
187 pub fn address_utxos(
190 &self,
191 address: &transparent::Address,
192 ) -> BTreeMap<OutputLocation, transparent::Output> {
193 let address_location = match self.address_location(address) {
194 Some(address_location) => address_location,
195 None => return BTreeMap::new(),
196 };
197
198 let output_locations = self.address_utxo_locations(address_location);
199
200 output_locations
202 .iter()
203 .filter_map(|&addr_out_loc| {
204 Some((
205 addr_out_loc.unspent_output_location(),
206 self.utxo_by_location(addr_out_loc.unspent_output_location())?
207 .utxo
208 .output,
209 ))
210 })
211 .collect()
212 }
213
214 pub fn address_utxo_locations(
217 &self,
218 address_location: AddressLocation,
219 ) -> BTreeSet<AddressUnspentOutput> {
220 let utxo_loc_by_transparent_addr_loc = self
221 .db
222 .cf_handle("utxo_loc_by_transparent_addr_loc")
223 .unwrap();
224
225 let mut addr_unspent_outputs = BTreeSet::new();
227
228 let mut unspent_output = AddressUnspentOutput::address_iterator_start(address_location);
230
231 loop {
232 unspent_output = match self
234 .db
235 .zs_next_key_value_from(&utxo_loc_by_transparent_addr_loc, &unspent_output)
236 {
237 Some((unspent_output, ())) => unspent_output,
238 None => break,
240 };
241
242 if unspent_output.address_location() != address_location {
244 break;
245 }
246
247 addr_unspent_outputs.insert(unspent_output);
248
249 unspent_output.address_iterator_next();
251 }
252
253 addr_unspent_outputs
254 }
255
256 #[allow(clippy::unwrap_in_result)]
258 pub fn tx_id_by_location(&self, tx_location: TransactionLocation) -> Option<transaction::Hash> {
259 let hash_by_tx_loc = self.db.cf_handle("hash_by_tx_loc").unwrap();
260
261 self.db.zs_get(&hash_by_tx_loc, &tx_location)
262 }
263
264 pub fn address_tx_ids(
271 &self,
272 address: &transparent::Address,
273 query_height_range: RangeInclusive<Height>,
274 ) -> BTreeMap<TransactionLocation, transaction::Hash> {
275 let address_location = match self.address_location(address) {
276 Some(address_location) => address_location,
277 None => return BTreeMap::new(),
278 };
279
280 if address_location.height() > *query_height_range.end() {
285 return BTreeMap::new();
286 }
287
288 let transaction_locations =
289 self.address_transaction_locations(address_location, query_height_range);
290
291 transaction_locations
292 .iter()
293 .map(|&tx_loc| {
294 (
295 tx_loc.transaction_location(),
296 self.tx_id_by_location(tx_loc.transaction_location())
297 .expect("transactions whose locations are stored must exist"),
298 )
299 })
300 .collect()
301 }
302
303 pub fn address_transaction_locations(
306 &self,
307 address_location: AddressLocation,
308 query_height_range: RangeInclusive<Height>,
309 ) -> BTreeSet<AddressTransaction> {
310 let tx_loc_by_transparent_addr_loc =
311 self.db.cf_handle("tx_loc_by_transparent_addr_loc").unwrap();
312
313 let transaction_location_range =
316 AddressTransaction::address_iterator_range(address_location, query_height_range);
317
318 self.db
319 .zs_forward_range_iter(&tx_loc_by_transparent_addr_loc, transaction_location_range)
320 .map(|(tx_loc, ())| tx_loc)
321 .collect()
322 }
323
324 pub fn partial_finalized_transparent_balance(
337 &self,
338 addresses: &HashSet<transparent::Address>,
339 ) -> (Amount<NonNegative>, u64) {
340 let balance: amount::Result<(Amount<NonNegative>, u64)> = addresses
341 .iter()
342 .filter_map(|address| self.address_balance(address))
343 .try_fold(
344 (Amount::zero(), 0),
345 |(a_balance, a_received): (Amount<NonNegative>, u64), (b_balance, b_received)| {
346 let received = a_received.saturating_add(b_received);
347 Ok(((a_balance + b_balance)?, received))
348 },
349 );
350
351 balance.expect(
352 "unexpected amount overflow: value balances are valid, so partial sum should be valid",
353 )
354 }
355
356 pub fn partial_finalized_address_utxos(
370 &self,
371 addresses: &HashSet<transparent::Address>,
372 ) -> BTreeMap<OutputLocation, transparent::Output> {
373 addresses
374 .iter()
375 .flat_map(|address| self.address_utxos(address))
376 .collect()
377 }
378
379 pub fn partial_finalized_transparent_tx_ids(
401 &self,
402 addresses: &HashSet<transparent::Address>,
403 query_height_range: RangeInclusive<Height>,
404 ) -> BTreeMap<TransactionLocation, transaction::Hash> {
405 addresses
406 .iter()
407 .flat_map(|address| self.address_tx_ids(address, query_height_range.clone()))
408 .collect()
409 }
410}
411
412impl DiskWriteBatch {
413 #[allow(clippy::too_many_arguments)]
423 pub fn prepare_transparent_transaction_batch(
424 &mut self,
425 zebra_db: &ZebraDb,
426 network: &Network,
427 finalized: &FinalizedBlock,
428 new_outputs_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
429 spent_utxos_by_outpoint: &HashMap<transparent::OutPoint, transparent::Utxo>,
430 spent_utxos_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
431 #[cfg(feature = "indexer")] out_loc_by_outpoint: &HashMap<
432 transparent::OutPoint,
433 OutputLocation,
434 >,
435 mut address_balances: AddressBalanceLocationUpdates,
436 ) -> Result<(), BoxError> {
437 let db = &zebra_db.db;
438 let FinalizedBlock { block, height, .. } = finalized;
439
440 self.prepare_new_transparent_outputs_batch(
442 db,
443 network,
444 new_outputs_by_out_loc,
445 &mut address_balances,
446 )?;
447 self.prepare_spent_transparent_outputs_batch(
448 db,
449 network,
450 spent_utxos_by_out_loc,
451 &mut address_balances,
452 )?;
453
454 for (tx_index, transaction) in block.transactions.iter().enumerate() {
456 let spending_tx_location = TransactionLocation::from_usize(*height, tx_index);
457
458 self.prepare_spending_transparent_tx_ids_batch(
459 zebra_db,
460 network,
461 spending_tx_location,
462 transaction,
463 spent_utxos_by_outpoint,
464 #[cfg(feature = "indexer")]
465 out_loc_by_outpoint,
466 &address_balances,
467 )?;
468 }
469
470 self.prepare_transparent_balances_batch(db, address_balances)
471 }
472
473 #[allow(clippy::unwrap_in_result)]
488 pub fn prepare_new_transparent_outputs_batch(
489 &mut self,
490 db: &DiskDb,
491 network: &Network,
492 new_outputs_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
493 address_balances: &mut AddressBalanceLocationUpdates,
494 ) -> Result<(), BoxError> {
495 let utxo_by_out_loc = db.cf_handle("utxo_by_out_loc").unwrap();
496 let utxo_loc_by_transparent_addr_loc =
497 db.cf_handle("utxo_loc_by_transparent_addr_loc").unwrap();
498 let tx_loc_by_transparent_addr_loc =
499 db.cf_handle("tx_loc_by_transparent_addr_loc").unwrap();
500
501 for (new_output_location, utxo) in new_outputs_by_out_loc {
503 let unspent_output = &utxo.output;
504 let receiving_address = unspent_output.address(network);
505
506 if let Some(receiving_address) = receiving_address {
508 fn update_addr_loc<
517 C: Constraint + Copy + std::fmt::Debug,
518 T: std::ops::DerefMut<Target = AddressBalanceLocationInner<C>>
519 + From<AddressBalanceLocationInner<C>>,
520 >(
521 addr_locs: &mut HashMap<transparent::Address, T>,
522 receiving_address: transparent::Address,
523 new_output_location: &OutputLocation,
524 unspent_output: &transparent::Output,
525 ) -> AddressLocation {
526 let addr_loc = addr_locs.entry(receiving_address).or_insert_with(|| {
527 AddressBalanceLocationInner::new(*new_output_location).into()
528 });
529
530 addr_loc
532 .receive_output(unspent_output)
533 .expect("balance overflow already checked");
534
535 addr_loc.address_location()
536 }
537
538 let receiving_address_location = match address_balances {
540 AddressBalanceLocationUpdates::Merge(balance_changes) => update_addr_loc(
541 balance_changes,
542 receiving_address,
543 new_output_location,
544 unspent_output,
545 ),
546 AddressBalanceLocationUpdates::Insert(balances) => update_addr_loc(
547 balances,
548 receiving_address,
549 new_output_location,
550 unspent_output,
551 ),
552 };
553
554 let address_unspent_output =
556 AddressUnspentOutput::new(receiving_address_location, *new_output_location);
557 self.zs_insert(
558 &utxo_loc_by_transparent_addr_loc,
559 address_unspent_output,
560 (),
561 );
562
563 let address_transaction = AddressTransaction::new(
566 receiving_address_location,
567 new_output_location.transaction_location(),
568 );
569 self.zs_insert(&tx_loc_by_transparent_addr_loc, address_transaction, ());
570 }
571
572 self.zs_insert(&utxo_by_out_loc, new_output_location, unspent_output);
576 }
577
578 Ok(())
579 }
580
581 #[allow(clippy::unwrap_in_result)]
595 pub fn prepare_spent_transparent_outputs_batch(
596 &mut self,
597 db: &DiskDb,
598 network: &Network,
599 spent_utxos_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
600 address_balances: &mut AddressBalanceLocationUpdates,
601 ) -> Result<(), BoxError> {
602 let utxo_by_out_loc = db.cf_handle("utxo_by_out_loc").unwrap();
603 let utxo_loc_by_transparent_addr_loc =
604 db.cf_handle("utxo_loc_by_transparent_addr_loc").unwrap();
605
606 for (spent_output_location, utxo) in spent_utxos_by_out_loc {
610 let spent_output = &utxo.output;
611 let sending_address = spent_output.address(network);
612
613 if let Some(sending_address) = sending_address {
615 fn update_addr_loc<
616 C: Constraint + Copy + std::fmt::Debug,
617 T: std::ops::DerefMut<Target = AddressBalanceLocationInner<C>>
618 + From<AddressBalanceLocationInner<C>>,
619 >(
620 addr_locs: &mut HashMap<transparent::Address, T>,
621 sending_address: transparent::Address,
622 spent_output: &transparent::Output,
623 ) -> AddressLocation {
624 let addr_loc = addr_locs
625 .get_mut(&sending_address)
626 .expect("spent outputs must already have an address balance");
627
628 addr_loc
630 .spend_output(spent_output)
631 .expect("balance underflow already checked");
632
633 addr_loc.address_location()
634 }
635
636 let address_location = match address_balances {
637 AddressBalanceLocationUpdates::Merge(balance_changes) => {
638 update_addr_loc(balance_changes, sending_address, spent_output)
639 }
640 AddressBalanceLocationUpdates::Insert(balances) => {
641 update_addr_loc(balances, sending_address, spent_output)
642 }
643 };
644
645 let address_spent_output =
647 AddressUnspentOutput::new(address_location, *spent_output_location);
648
649 self.zs_delete(&utxo_loc_by_transparent_addr_loc, address_spent_output);
650 }
651
652 self.zs_delete(&utxo_by_out_loc, spent_output_location);
654 }
655
656 Ok(())
657 }
658
659 #[allow(clippy::unwrap_in_result, clippy::too_many_arguments)]
671 pub fn prepare_spending_transparent_tx_ids_batch(
672 &mut self,
673 zebra_db: &ZebraDb,
674 network: &Network,
675 spending_tx_location: TransactionLocation,
676 transaction: &Transaction,
677 spent_utxos_by_outpoint: &HashMap<transparent::OutPoint, transparent::Utxo>,
678 #[cfg(feature = "indexer")] out_loc_by_outpoint: &HashMap<
679 transparent::OutPoint,
680 OutputLocation,
681 >,
682 address_balances: &AddressBalanceLocationUpdates,
683 ) -> Result<(), BoxError> {
684 let db = &zebra_db.db;
685 let tx_loc_by_transparent_addr_loc =
686 db.cf_handle("tx_loc_by_transparent_addr_loc").unwrap();
687
688 for spent_outpoint in transaction.inputs().iter().filter_map(Input::outpoint) {
692 let spent_utxo = spent_utxos_by_outpoint
693 .get(&spent_outpoint)
694 .expect("unexpected missing spent output");
695 let sending_address = spent_utxo.output.address(network);
696
697 if let Some(sending_address) = sending_address {
699 let sending_address_location = match address_balances {
700 AddressBalanceLocationUpdates::Merge(balance_changes) => balance_changes
701 .get(&sending_address)
702 .expect("spent outputs must already have an address balance")
703 .address_location(),
704 AddressBalanceLocationUpdates::Insert(balances) => balances
705 .get(&sending_address)
706 .expect("spent outputs must already have an address balance")
707 .address_location(),
708 };
709
710 let address_transaction =
716 AddressTransaction::new(sending_address_location, spending_tx_location);
717 self.zs_insert(&tx_loc_by_transparent_addr_loc, address_transaction, ());
718 }
719
720 #[cfg(feature = "indexer")]
721 {
722 let spent_output_location = out_loc_by_outpoint
723 .get(&spent_outpoint)
724 .expect("spent outpoints must already have output locations");
725
726 let _ = zebra_db
727 .tx_loc_by_spent_output_loc_cf()
728 .with_batch_for_writing(self)
729 .zs_insert(spent_output_location, &spending_tx_location);
730 }
731 }
732
733 Ok(())
734 }
735
736 #[allow(clippy::unwrap_in_result)]
745 pub fn prepare_transparent_balances_batch(
746 &mut self,
747 db: &DiskDb,
748 address_balances: AddressBalanceLocationUpdates,
749 ) -> Result<(), BoxError> {
750 let balance_by_transparent_addr = db.cf_handle(BALANCE_BY_TRANSPARENT_ADDR).unwrap();
751
752 match address_balances {
755 AddressBalanceLocationUpdates::Merge(balance_changes) => {
756 for (address, address_balance_location_change) in balance_changes.into_iter() {
757 self.zs_merge(
758 &balance_by_transparent_addr,
759 address,
760 address_balance_location_change,
761 );
762 }
763 }
764
765 AddressBalanceLocationUpdates::Insert(balances) => {
766 for (address, address_balance_location_change) in balances.into_iter() {
767 self.zs_insert(
768 &balance_by_transparent_addr,
769 address,
770 address_balance_location_change,
771 );
772 }
773 }
774 };
775
776 Ok(())
777 }
778}