bitcoin_explorer/iter/
iter_connected.rs1use crate::api::BitcoinDB;
2use crate::iter::fetch_connected_async::{connect_outpoints, update_unspent_cache};
3#[cfg(not(feature = "on-disk-utxo"))]
4use crate::iter::util::VecMap;
5use crate::parser::proto::connected_proto::ConnectedBlock;
6#[cfg(not(feature = "on-disk-utxo"))]
7use crate::parser::proto::connected_proto::ConnectedTx;
8#[cfg(not(feature = "on-disk-utxo"))]
9use bitcoin::Txid;
10#[cfg(not(feature = "on-disk-utxo"))]
11use hash_hasher::HashedMap;
12#[cfg(feature = "on-disk-utxo")]
13use log::error;
14#[cfg(feature = "on-disk-utxo")]
15use num_cpus;
16use par_iter_sync::{IntoParallelIteratorSync, ParIterSync};
17#[cfg(feature = "on-disk-utxo")]
18use rocksdb::{Options, PlainTableFactoryOptions, SliceTransform, DB};
19use std::sync::Arc;
20#[cfg(not(feature = "on-disk-utxo"))]
21use std::sync::Mutex;
22#[cfg(feature = "on-disk-utxo")]
23use tempdir::TempDir;
24
25#[cfg(feature = "on-disk-utxo")]
27pub(crate) const KEY_LENGTH: u32 = 32 + 4;
28
29pub struct ConnectedBlockIter<TBlock> {
31 inner: ParIterSync<TBlock>,
32 #[cfg(feature = "on-disk-utxo")]
33 #[allow(dead_code)]
34 cache: Option<TempDir>,
35}
36
37impl<TBlock> ConnectedBlockIter<TBlock>
38where
39 TBlock: 'static + ConnectedBlock + Send,
40{
41 pub fn new(db: &BitcoinDB, end: usize) -> Self {
43 #[cfg(not(feature = "on-disk-utxo"))]
45 let unspent: Arc<
46 Mutex<HashedMap<Txid, Arc<Mutex<VecMap<<TBlock::Tx as ConnectedTx>::TOut>>>>>,
47 > = Arc::new(Mutex::new(HashedMap::default()));
48 #[cfg(feature = "on-disk-utxo")]
49 let cache_dir = {
50 match TempDir::new("rocks_db") {
51 Ok(tempdir) => tempdir,
52 Err(e) => {
53 error!("failed to create rocksDB tempdir for UTXO: {}", e);
54 return ConnectedBlockIter::null();
55 }
56 }
57 };
58 #[cfg(feature = "on-disk-utxo")]
59 let unspent = {
60 let mut options = Options::default();
61 options.create_if_missing(true);
63 options.set_max_background_jobs(num_cpus::get() as i32);
65 options.set_write_buffer_size(0x10000000);
67 options.set_level_zero_file_num_compaction_trigger(4);
69 options.set_max_bytes_for_level_base(0x40000000);
70 options.set_target_file_size_base(0x10000000);
72 options.set_max_bytes_for_level_multiplier(4.0);
74 options.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
76 options.set_plain_table_factory(&PlainTableFactoryOptions {
78 user_key_length: KEY_LENGTH,
80 bloom_bits_per_key: 10,
81 hash_table_ratio: 0.75,
82 index_sparseness: 16,
83 });
84 Arc::new(match DB::open(&options, &cache_dir) {
85 Ok(db) => db,
86 Err(e) => {
87 error!("failed to create temp rocksDB for UTXO: {}", e);
88 return ConnectedBlockIter::null();
89 }
90 })
91 };
92 let heights = 0..end;
94 let db_copy = db.clone();
95 let unspent_copy = unspent.clone();
96
97 let output_iterator = heights
98 .into_par_iter_sync(move |height| {
99 update_unspent_cache::<TBlock>(&unspent_copy, &db_copy, height)
100 })
101 .into_par_iter_sync(move |blk| connect_outpoints(&unspent, blk));
102
103 ConnectedBlockIter {
104 inner: output_iterator,
105 #[cfg(feature = "on-disk-utxo")]
107 cache: Some(cache_dir),
108 }
109 }
110
111 #[cfg(feature = "on-disk-utxo")]
112 fn null() -> Self {
113 ConnectedBlockIter {
114 inner: Vec::new().into_par_iter_sync(|_: usize| Err(())),
115 #[cfg(feature = "on-disk-utxo")]
116 cache: None,
117 }
118 }
119}
120
121impl<TBlock> Iterator for ConnectedBlockIter<TBlock> {
122 type Item = TBlock;
123
124 fn next(&mut self) -> Option<Self::Item> {
125 self.inner.next()
126 }
127}
128
129#[cfg(test)]
130#[cfg(feature = "on-disk-utxo")]
131mod test_empty {
132 use crate::{ConnectedBlockIter, SConnectedBlock};
133
134 #[test]
135 fn test_empty() {
136 let mut empty = ConnectedBlockIter::null();
137 for _ in 0..100 {
138 let b: Option<SConnectedBlock> = empty.next();
139 assert!(b.is_none());
140 }
141 }
142}