bitcoin_explorer/iter/
iter_connected.rs

1use crate::api::BitcoinDB;
2use crate::iter::fetch_connected_async::{connect_outpoints, update_unspent_cache};
3#[cfg(not(feature = "on-disk-utxo"))]
4use crate::iter::util::VecMap;
5use crate::parser::proto::connected_proto::ConnectedBlock;
6#[cfg(not(feature = "on-disk-utxo"))]
7use crate::parser::proto::connected_proto::ConnectedTx;
8#[cfg(not(feature = "on-disk-utxo"))]
9use bitcoin::Txid;
10#[cfg(not(feature = "on-disk-utxo"))]
11use hash_hasher::HashedMap;
12#[cfg(feature = "on-disk-utxo")]
13use log::error;
14#[cfg(feature = "on-disk-utxo")]
15use num_cpus;
16use par_iter_sync::{IntoParallelIteratorSync, ParIterSync};
17#[cfg(feature = "on-disk-utxo")]
18use rocksdb::{Options, PlainTableFactoryOptions, SliceTransform, DB};
19use std::sync::Arc;
20#[cfg(not(feature = "on-disk-utxo"))]
21use std::sync::Mutex;
22#[cfg(feature = "on-disk-utxo")]
23use tempdir::TempDir;
24
25/// 32 (txid) + 4 (i32 out n)
26#[cfg(feature = "on-disk-utxo")]
27pub(crate) const KEY_LENGTH: u32 = 32 + 4;
28
29/// iterate through blocks, and connecting outpoints.
30pub struct ConnectedBlockIter<TBlock> {
31    inner: ParIterSync<TBlock>,
32    #[cfg(feature = "on-disk-utxo")]
33    #[allow(dead_code)]
34    cache: Option<TempDir>,
35}
36
37impl<TBlock> ConnectedBlockIter<TBlock>
38where
39    TBlock: 'static + ConnectedBlock + Send,
40{
41    /// the worker threads are dispatched in this `new` constructor!
42    pub fn new(db: &BitcoinDB, end: usize) -> Self {
43        // UTXO cache
44        #[cfg(not(feature = "on-disk-utxo"))]
45        let unspent: Arc<
46            Mutex<HashedMap<Txid, Arc<Mutex<VecMap<<TBlock::Tx as ConnectedTx>::TOut>>>>>,
47        > = Arc::new(Mutex::new(HashedMap::default()));
48        #[cfg(feature = "on-disk-utxo")]
49        let cache_dir = {
50            match TempDir::new("rocks_db") {
51                Ok(tempdir) => tempdir,
52                Err(e) => {
53                    error!("failed to create rocksDB tempdir for UTXO: {}", e);
54                    return ConnectedBlockIter::null();
55                }
56            }
57        };
58        #[cfg(feature = "on-disk-utxo")]
59        let unspent = {
60            let mut options = Options::default();
61            // create table
62            options.create_if_missing(true);
63            // config to more jobs
64            options.set_max_background_jobs(num_cpus::get() as i32);
65            // configure mem-table to a large value (256 MB)
66            options.set_write_buffer_size(0x10000000);
67            // configure l0 and l1 size, let them have the same size (1 GB)
68            options.set_level_zero_file_num_compaction_trigger(4);
69            options.set_max_bytes_for_level_base(0x40000000);
70            // 256MB file size
71            options.set_target_file_size_base(0x10000000);
72            // use a smaller compaction multiplier
73            options.set_max_bytes_for_level_multiplier(4.0);
74            // use 8-byte prefix (2 ^ 64 is far enough for transaction counts)
75            options.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
76            // set to plain-table for better performance
77            options.set_plain_table_factory(&PlainTableFactoryOptions {
78                // 32 (txid) + 4 (i32 out n)
79                user_key_length: KEY_LENGTH,
80                bloom_bits_per_key: 10,
81                hash_table_ratio: 0.75,
82                index_sparseness: 16,
83            });
84            Arc::new(match DB::open(&options, &cache_dir) {
85                Ok(db) => db,
86                Err(e) => {
87                    error!("failed to create temp rocksDB for UTXO: {}", e);
88                    return ConnectedBlockIter::null();
89                }
90            })
91        };
92        // all tasks
93        let heights = 0..end;
94        let db_copy = db.clone();
95        let unspent_copy = unspent.clone();
96
97        let output_iterator = heights
98            .into_par_iter_sync(move |height| {
99                update_unspent_cache::<TBlock>(&unspent_copy, &db_copy, height)
100            })
101            .into_par_iter_sync(move |blk| connect_outpoints(&unspent, blk));
102
103        ConnectedBlockIter {
104            inner: output_iterator,
105            // cache dir will be deleted when ConnectedBlockIter is dropped
106            #[cfg(feature = "on-disk-utxo")]
107            cache: Some(cache_dir),
108        }
109    }
110
111    #[cfg(feature = "on-disk-utxo")]
112    fn null() -> Self {
113        ConnectedBlockIter {
114            inner: Vec::new().into_par_iter_sync(|_: usize| Err(())),
115            #[cfg(feature = "on-disk-utxo")]
116            cache: None,
117        }
118    }
119}
120
121impl<TBlock> Iterator for ConnectedBlockIter<TBlock> {
122    type Item = TBlock;
123
124    fn next(&mut self) -> Option<Self::Item> {
125        self.inner.next()
126    }
127}
128
129#[cfg(test)]
130#[cfg(feature = "on-disk-utxo")]
131mod test_empty {
132    use crate::{ConnectedBlockIter, SConnectedBlock};
133
134    #[test]
135    fn test_empty() {
136        let mut empty = ConnectedBlockIter::null();
137        for _ in 0..100 {
138            let b: Option<SConnectedBlock> = empty.next();
139            assert!(b.is_none());
140        }
141    }
142}