1use std::{
16 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17 ops::RangeInclusive,
18};
19
20use rocksdb::ColumnFamily;
21use zebra_chain::{
22 amount::{self, Amount, Constraint, NonNegative},
23 block::Height,
24 parameters::Network,
25 transaction::{self, Transaction},
26 transparent::{self, Input},
27};
28
29use crate::{
30 request::FinalizedBlock,
31 service::finalized_state::{
32 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
33 disk_format::{
34 transparent::{
35 AddressBalanceLocation, AddressBalanceLocationChange, AddressBalanceLocationInner,
36 AddressBalanceLocationUpdates, AddressLocation, AddressTransaction,
37 AddressUnspentOutput, OutputLocation,
38 },
39 TransactionLocation,
40 },
41 zebra_db::ZebraDb,
42 },
43 FromDisk, IntoDisk,
44};
45
46use super::super::TypedColumnFamily;
47
/// Name of the column family opened by [`ZebraDb::tx_loc_by_spent_output_loc_cf`].
pub const TX_LOC_BY_SPENT_OUT_LOC: &str = "tx_loc_by_spent_out_loc";

/// Name of the column family that [`ZebraDb::address_balance_cf`] looks up, and that
/// `prepare_transparent_balances_batch` writes address balance records to.
pub const BALANCE_BY_TRANSPARENT_ADDR: &str = "balance_by_transparent_addr";

/// Name string for the balance merge operator.
///
/// NOTE(review): presumably registered together with [`fetch_add_balance_and_received`]
/// where the database is opened; the registration is not visible in this file, so confirm there.
pub const BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP: &str = "fetch_add_balance_and_received";
56
57pub fn fetch_add_balance_and_received(
59 _: &[u8],
60 existing_val: Option<&[u8]>,
61 operands: &rocksdb::MergeOperands,
62) -> Option<Vec<u8>> {
63 existing_val
68 .into_iter()
69 .chain(operands)
70 .map(AddressBalanceLocationChange::from_bytes)
71 .reduce(|a, b| (a + b).expect("address balance/received should not overflow"))
72 .map(|address_balance_location| address_balance_location.as_bytes().to_vec())
73}
74
/// A [`TypedColumnFamily`] that is looked up by [`OutputLocation`] and yields a
/// [`TransactionLocation`].
pub type TransactionLocationBySpentOutputLocationCf<'cf> =
    TypedColumnFamily<'cf, OutputLocation, TransactionLocation>;
81
82impl ZebraDb {
83 pub(crate) fn tx_loc_by_spent_output_loc_cf(
87 &self,
88 ) -> TransactionLocationBySpentOutputLocationCf<'_> {
89 TransactionLocationBySpentOutputLocationCf::new(&self.db, TX_LOC_BY_SPENT_OUT_LOC)
90 .expect("column family was created when database was created")
91 }
92
93 pub fn tx_location_by_spent_output_location(
98 &self,
99 output_location: &OutputLocation,
100 ) -> Option<TransactionLocation> {
101 self.tx_loc_by_spent_output_loc_cf().zs_get(output_location)
102 }
103
104 pub fn address_balance_cf(&self) -> &ColumnFamily {
106 self.db.cf_handle(BALANCE_BY_TRANSPARENT_ADDR).unwrap()
107 }
108
109 #[allow(clippy::unwrap_in_result)]
112 pub fn address_balance_location(
113 &self,
114 address: &transparent::Address,
115 ) -> Option<AddressBalanceLocation> {
116 let balance_by_transparent_addr = self.address_balance_cf();
117
118 self.db.zs_get(&balance_by_transparent_addr, address)
119 }
120
121 pub fn address_balance(
124 &self,
125 address: &transparent::Address,
126 ) -> Option<(Amount<NonNegative>, u64)> {
127 self.address_balance_location(address)
128 .map(|abl| (abl.balance(), abl.received()))
129 }
130
131 pub fn address_location(&self, address: &transparent::Address) -> Option<AddressLocation> {
136 self.address_balance_location(address)
137 .map(|abl| abl.address_location())
138 }
139
140 pub fn output_location(&self, outpoint: &transparent::OutPoint) -> Option<OutputLocation> {
145 self.transaction_location(outpoint.hash)
146 .map(|transaction_location| {
147 OutputLocation::from_outpoint(transaction_location, outpoint)
148 })
149 }
150
151 pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::OrderedUtxo> {
154 let output_location = self.output_location(outpoint)?;
155
156 self.utxo_by_location(output_location)
157 }
158
159 pub fn spending_tx_loc(&self, outpoint: &transparent::OutPoint) -> Option<TransactionLocation> {
163 let output_location = self.output_location(outpoint)?;
164 self.tx_location_by_spent_output_location(&output_location)
165 }
166
167 #[allow(clippy::unwrap_in_result)]
170 pub fn utxo_by_location(
171 &self,
172 output_location: OutputLocation,
173 ) -> Option<transparent::OrderedUtxo> {
174 let utxo_by_out_loc = self.db.cf_handle("utxo_by_out_loc").unwrap();
175
176 let output = self.db.zs_get(&utxo_by_out_loc, &output_location)?;
177
178 let utxo = transparent::OrderedUtxo::new(
179 output,
180 output_location.height(),
181 output_location.transaction_index().as_usize(),
182 );
183
184 Some(utxo)
185 }
186
187 pub fn address_utxos(
190 &self,
191 address: &transparent::Address,
192 ) -> BTreeMap<OutputLocation, transparent::Output> {
193 let address_location = match self.address_location(address) {
194 Some(address_location) => address_location,
195 None => return BTreeMap::new(),
196 };
197
198 let output_locations = self.address_utxo_locations(address_location);
199
200 output_locations
202 .iter()
203 .filter_map(|&addr_out_loc| {
204 Some((
205 addr_out_loc.unspent_output_location(),
206 self.utxo_by_location(addr_out_loc.unspent_output_location())?
207 .utxo
208 .output,
209 ))
210 })
211 .collect()
212 }
213
214 pub fn address_utxo_locations(
217 &self,
218 address_location: AddressLocation,
219 ) -> BTreeSet<AddressUnspentOutput> {
220 let utxo_loc_by_transparent_addr_loc = self
221 .db
222 .cf_handle("utxo_loc_by_transparent_addr_loc")
223 .unwrap();
224
225 let mut addr_unspent_outputs = BTreeSet::new();
227
228 let mut unspent_output = AddressUnspentOutput::address_iterator_start(address_location);
230
231 loop {
232 unspent_output = match self
234 .db
235 .zs_next_key_value_from(&utxo_loc_by_transparent_addr_loc, &unspent_output)
236 {
237 Some((unspent_output, ())) => unspent_output,
238 None => break,
240 };
241
242 if unspent_output.address_location() != address_location {
244 break;
245 }
246
247 addr_unspent_outputs.insert(unspent_output);
248
249 unspent_output.address_iterator_next();
251 }
252
253 addr_unspent_outputs
254 }
255
256 #[allow(clippy::unwrap_in_result)]
258 pub fn tx_id_by_location(&self, tx_location: TransactionLocation) -> Option<transaction::Hash> {
259 let hash_by_tx_loc = self.db.cf_handle("hash_by_tx_loc").unwrap();
260
261 self.db.zs_get(&hash_by_tx_loc, &tx_location)
262 }
263
264 pub fn address_tx_ids(
271 &self,
272 address: &transparent::Address,
273 query_height_range: RangeInclusive<Height>,
274 ) -> BTreeMap<TransactionLocation, transaction::Hash> {
275 let address_location = match self.address_location(address) {
276 Some(address_location) => address_location,
277 None => return BTreeMap::new(),
278 };
279
280 if address_location.height() > *query_height_range.end() {
285 return BTreeMap::new();
286 }
287
288 let transaction_locations =
289 self.address_transaction_locations(address_location, query_height_range);
290
291 transaction_locations
292 .iter()
293 .map(|&tx_loc| {
294 (
295 tx_loc.transaction_location(),
296 self.tx_id_by_location(tx_loc.transaction_location())
297 .expect("transactions whose locations are stored must exist"),
298 )
299 })
300 .collect()
301 }
302
303 pub fn address_transaction_locations(
306 &self,
307 address_location: AddressLocation,
308 query_height_range: RangeInclusive<Height>,
309 ) -> BTreeSet<AddressTransaction> {
310 let tx_loc_by_transparent_addr_loc =
311 self.db.cf_handle("tx_loc_by_transparent_addr_loc").unwrap();
312
313 let transaction_location_range =
316 AddressTransaction::address_iterator_range(address_location, query_height_range);
317
318 self.db
319 .zs_forward_range_iter(&tx_loc_by_transparent_addr_loc, transaction_location_range)
320 .map(|(tx_loc, ())| tx_loc)
321 .collect()
322 }
323
324 pub fn partial_finalized_transparent_balance(
337 &self,
338 addresses: &HashSet<transparent::Address>,
339 ) -> (Amount<NonNegative>, u64) {
340 let balance: amount::Result<(Amount<NonNegative>, u64)> = addresses
341 .iter()
342 .filter_map(|address| self.address_balance(address))
343 .try_fold(
344 (Amount::zero(), 0),
345 |(a_balance, a_received): (Amount<NonNegative>, u64), (b_balance, b_received)| {
346 let received = a_received.saturating_add(b_received);
347 Ok(((a_balance + b_balance)?, received))
348 },
349 );
350
351 balance.expect(
352 "unexpected amount overflow: value balances are valid, so partial sum should be valid",
353 )
354 }
355
356 pub fn partial_finalized_address_utxos(
370 &self,
371 addresses: &HashSet<transparent::Address>,
372 ) -> BTreeMap<OutputLocation, transparent::Output> {
373 addresses
374 .iter()
375 .flat_map(|address| self.address_utxos(address))
376 .collect()
377 }
378
379 pub fn partial_finalized_transparent_tx_ids(
401 &self,
402 addresses: &HashSet<transparent::Address>,
403 query_height_range: RangeInclusive<Height>,
404 ) -> BTreeMap<TransactionLocation, transaction::Hash> {
405 addresses
406 .iter()
407 .flat_map(|address| self.address_tx_ids(address, query_height_range.clone()))
408 .collect()
409 }
410}
411
412impl DiskWriteBatch {
413 #[allow(clippy::too_many_arguments)]
419 pub fn prepare_transparent_transaction_batch(
420 &mut self,
421 zebra_db: &ZebraDb,
422 network: &Network,
423 finalized: &FinalizedBlock,
424 new_outputs_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
425 spent_utxos_by_outpoint: &HashMap<transparent::OutPoint, transparent::Utxo>,
426 spent_utxos_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
427 #[cfg(feature = "indexer")] out_loc_by_outpoint: &HashMap<
428 transparent::OutPoint,
429 OutputLocation,
430 >,
431 mut address_balances: AddressBalanceLocationUpdates,
432 ) {
433 let db = &zebra_db.db;
434 let FinalizedBlock { block, height, .. } = finalized;
435
436 self.prepare_new_transparent_outputs_batch(
438 db,
439 network,
440 new_outputs_by_out_loc,
441 &mut address_balances,
442 );
443 self.prepare_spent_transparent_outputs_batch(
444 db,
445 network,
446 spent_utxos_by_out_loc,
447 &mut address_balances,
448 );
449
450 for (tx_index, transaction) in block.transactions.iter().enumerate() {
452 let spending_tx_location = TransactionLocation::from_usize(*height, tx_index);
453
454 self.prepare_spending_transparent_tx_ids_batch(
455 zebra_db,
456 network,
457 spending_tx_location,
458 transaction,
459 spent_utxos_by_outpoint,
460 #[cfg(feature = "indexer")]
461 out_loc_by_outpoint,
462 &address_balances,
463 );
464 }
465
466 self.prepare_transparent_balances_batch(db, address_balances);
467 }
468
469 #[allow(clippy::unwrap_in_result)]
484 pub fn prepare_new_transparent_outputs_batch(
485 &mut self,
486 db: &DiskDb,
487 network: &Network,
488 new_outputs_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
489 address_balances: &mut AddressBalanceLocationUpdates,
490 ) {
491 let utxo_by_out_loc = db.cf_handle("utxo_by_out_loc").unwrap();
492 let utxo_loc_by_transparent_addr_loc =
493 db.cf_handle("utxo_loc_by_transparent_addr_loc").unwrap();
494 let tx_loc_by_transparent_addr_loc =
495 db.cf_handle("tx_loc_by_transparent_addr_loc").unwrap();
496
497 for (new_output_location, utxo) in new_outputs_by_out_loc {
499 let unspent_output = &utxo.output;
500 let receiving_address = unspent_output.address(network);
501
502 if let Some(receiving_address) = receiving_address {
504 fn update_addr_loc<
513 C: Constraint + Copy + std::fmt::Debug,
514 T: std::ops::DerefMut<Target = AddressBalanceLocationInner<C>>
515 + From<AddressBalanceLocationInner<C>>,
516 >(
517 addr_locs: &mut HashMap<transparent::Address, T>,
518 receiving_address: transparent::Address,
519 new_output_location: &OutputLocation,
520 unspent_output: &transparent::Output,
521 ) -> AddressLocation {
522 let addr_loc = addr_locs.entry(receiving_address).or_insert_with(|| {
523 AddressBalanceLocationInner::new(*new_output_location).into()
524 });
525
526 addr_loc
528 .receive_output(unspent_output)
529 .expect("balance overflow already checked");
530
531 addr_loc.address_location()
532 }
533
534 let receiving_address_location = match address_balances {
536 AddressBalanceLocationUpdates::Merge(balance_changes) => update_addr_loc(
537 balance_changes,
538 receiving_address,
539 new_output_location,
540 unspent_output,
541 ),
542 AddressBalanceLocationUpdates::Insert(balances) => update_addr_loc(
543 balances,
544 receiving_address,
545 new_output_location,
546 unspent_output,
547 ),
548 };
549
550 let address_unspent_output =
552 AddressUnspentOutput::new(receiving_address_location, *new_output_location);
553 self.zs_insert(
554 &utxo_loc_by_transparent_addr_loc,
555 address_unspent_output,
556 (),
557 );
558
559 let address_transaction = AddressTransaction::new(
562 receiving_address_location,
563 new_output_location.transaction_location(),
564 );
565 self.zs_insert(&tx_loc_by_transparent_addr_loc, address_transaction, ());
566 }
567
568 self.zs_insert(&utxo_by_out_loc, new_output_location, unspent_output);
572 }
573 }
574
575 #[allow(clippy::unwrap_in_result)]
589 pub fn prepare_spent_transparent_outputs_batch(
590 &mut self,
591 db: &DiskDb,
592 network: &Network,
593 spent_utxos_by_out_loc: &BTreeMap<OutputLocation, transparent::Utxo>,
594 address_balances: &mut AddressBalanceLocationUpdates,
595 ) {
596 let utxo_by_out_loc = db.cf_handle("utxo_by_out_loc").unwrap();
597 let utxo_loc_by_transparent_addr_loc =
598 db.cf_handle("utxo_loc_by_transparent_addr_loc").unwrap();
599
600 for (spent_output_location, utxo) in spent_utxos_by_out_loc {
604 let spent_output = &utxo.output;
605 let sending_address = spent_output.address(network);
606
607 if let Some(sending_address) = sending_address {
609 fn update_addr_loc<
610 C: Constraint + Copy + std::fmt::Debug,
611 T: std::ops::DerefMut<Target = AddressBalanceLocationInner<C>>
612 + From<AddressBalanceLocationInner<C>>,
613 >(
614 addr_locs: &mut HashMap<transparent::Address, T>,
615 sending_address: transparent::Address,
616 spent_output: &transparent::Output,
617 ) -> AddressLocation {
618 let addr_loc = addr_locs
619 .get_mut(&sending_address)
620 .expect("spent outputs must already have an address balance");
621
622 addr_loc
624 .spend_output(spent_output)
625 .expect("balance underflow already checked");
626
627 addr_loc.address_location()
628 }
629
630 let address_location = match address_balances {
631 AddressBalanceLocationUpdates::Merge(balance_changes) => {
632 update_addr_loc(balance_changes, sending_address, spent_output)
633 }
634 AddressBalanceLocationUpdates::Insert(balances) => {
635 update_addr_loc(balances, sending_address, spent_output)
636 }
637 };
638
639 let address_spent_output =
641 AddressUnspentOutput::new(address_location, *spent_output_location);
642
643 self.zs_delete(&utxo_loc_by_transparent_addr_loc, address_spent_output);
644 }
645
646 self.zs_delete(&utxo_by_out_loc, spent_output_location);
648 }
649 }
650
651 #[allow(clippy::unwrap_in_result, clippy::too_many_arguments)]
663 pub fn prepare_spending_transparent_tx_ids_batch(
664 &mut self,
665 zebra_db: &ZebraDb,
666 network: &Network,
667 spending_tx_location: TransactionLocation,
668 transaction: &Transaction,
669 spent_utxos_by_outpoint: &HashMap<transparent::OutPoint, transparent::Utxo>,
670 #[cfg(feature = "indexer")] out_loc_by_outpoint: &HashMap<
671 transparent::OutPoint,
672 OutputLocation,
673 >,
674 address_balances: &AddressBalanceLocationUpdates,
675 ) {
676 let db = &zebra_db.db;
677 let tx_loc_by_transparent_addr_loc =
678 db.cf_handle("tx_loc_by_transparent_addr_loc").unwrap();
679
680 for spent_outpoint in transaction.inputs().iter().filter_map(Input::outpoint) {
684 let spent_utxo = spent_utxos_by_outpoint
685 .get(&spent_outpoint)
686 .expect("unexpected missing spent output");
687 let sending_address = spent_utxo.output.address(network);
688
689 if let Some(sending_address) = sending_address {
691 let sending_address_location = match address_balances {
692 AddressBalanceLocationUpdates::Merge(balance_changes) => balance_changes
693 .get(&sending_address)
694 .expect("spent outputs must already have an address balance")
695 .address_location(),
696 AddressBalanceLocationUpdates::Insert(balances) => balances
697 .get(&sending_address)
698 .expect("spent outputs must already have an address balance")
699 .address_location(),
700 };
701
702 let address_transaction =
708 AddressTransaction::new(sending_address_location, spending_tx_location);
709 self.zs_insert(&tx_loc_by_transparent_addr_loc, address_transaction, ());
710 }
711
712 #[cfg(feature = "indexer")]
713 {
714 let spent_output_location = out_loc_by_outpoint
715 .get(&spent_outpoint)
716 .expect("spent outpoints must already have output locations");
717
718 let _ = zebra_db
719 .tx_loc_by_spent_output_loc_cf()
720 .with_batch_for_writing(self)
721 .zs_insert(spent_output_location, &spending_tx_location);
722 }
723 }
724 }
725
726 #[allow(clippy::unwrap_in_result)]
735 pub fn prepare_transparent_balances_batch(
736 &mut self,
737 db: &DiskDb,
738 address_balances: AddressBalanceLocationUpdates,
739 ) {
740 let balance_by_transparent_addr = db.cf_handle(BALANCE_BY_TRANSPARENT_ADDR).unwrap();
741
742 match address_balances {
745 AddressBalanceLocationUpdates::Merge(balance_changes) => {
746 for (address, address_balance_location_change) in balance_changes.into_iter() {
747 self.zs_merge(
748 &balance_by_transparent_addr,
749 address,
750 address_balance_location_change,
751 );
752 }
753 }
754
755 AddressBalanceLocationUpdates::Insert(balances) => {
756 for (address, address_balance_location_change) in balances.into_iter() {
757 self.zs_insert(
758 &balance_by_transparent_addr,
759 address,
760 address_balance_location_change,
761 );
762 }
763 }
764 };
765 }
766}