bitcoin_explorer/iter/
iter_connected.rs

1use crate::api::BitcoinDB;
2use crate::iter::fetch_connected_async::{connect_outpoints, update_unspent_cache};
3#[cfg(not(feature = "on-disk-utxo"))]
4use crate::iter::util::VecMap;
5use crate::parser::proto::connected_proto::ConnectedBlock;
6#[cfg(not(feature = "on-disk-utxo"))]
7use crate::parser::proto::connected_proto::ConnectedTx;
8#[cfg(not(feature = "on-disk-utxo"))]
9use bitcoin::Txid;
10#[cfg(not(feature = "on-disk-utxo"))]
11use hash_hasher::HashedMap;
12#[cfg(feature = "on-disk-utxo")]
13use log::error;
14#[cfg(feature = "on-disk-utxo")]
15use num_cpus;
16use par_iter_sync::{IntoParallelIteratorSync, ParIterSync};
17#[cfg(feature = "on-disk-utxo")]
18use rocksdb::{Options, SliceTransform, DB};
19use std::sync::Arc;
20#[cfg(not(feature = "on-disk-utxo"))]
21use std::sync::Mutex;
22#[cfg(feature = "on-disk-utxo")]
23use tempdir::TempDir;
24
/// Byte length of a key built from a txid (32 bytes) followed by an
/// `i32` output index (4 bytes).
#[cfg(feature = "on-disk-utxo")]
pub(crate) const KEY_LENGTH: u32 = {
    const TXID_BYTES: u32 = 32;
    const OUT_N_BYTES: u32 = 4;
    TXID_BYTES + OUT_N_BYTES
};
28
29/// iterate through blocks, and connecting outpoints.
30pub struct ConnectedBlockIter<TBlock> {
31    inner: ParIterSync<TBlock>,
32    #[cfg(feature = "on-disk-utxo")]
33    #[allow(dead_code)]
34    cache: Option<TempDir>,
35}
36
37impl<TBlock> ConnectedBlockIter<TBlock>
38where
39    TBlock: 'static + ConnectedBlock + Send,
40{
41    /// the worker threads are dispatched in this `new` constructor!
42    pub fn new(db: &BitcoinDB, end: usize) -> Self {
43        // UTXO cache
44        #[cfg(not(feature = "on-disk-utxo"))]
45        let unspent: Arc<
46            Mutex<HashedMap<Txid, Arc<Mutex<VecMap<<TBlock::Tx as ConnectedTx>::TOut>>>>>,
47        > = Arc::new(Mutex::new(HashedMap::default()));
48        #[cfg(feature = "on-disk-utxo")]
49        let cache_dir = {
50            match TempDir::new("rocks_db") {
51                Ok(tempdir) => tempdir,
52                Err(e) => {
53                    error!("failed to create rocksDB tempdir for UTXO: {}", e);
54                    return ConnectedBlockIter::null();
55                }
56            }
57        };
58        #[cfg(feature = "on-disk-utxo")]
59        let unspent = {
60            let mut options = Options::default();
61            // create table
62            options.create_if_missing(true);
63            // config to more jobs
64            options.set_max_background_jobs(num_cpus::get() as i32);
65            // configure mem-table to a large value (256 MB)
66            options.set_write_buffer_size(0x10000000);
67            // configure l0 and l1 size, let them have the same size (1 GB)
68            options.set_level_zero_file_num_compaction_trigger(4);
69            options.set_max_bytes_for_level_base(0x40000000);
70            // 256MB file size
71            options.set_target_file_size_base(0x10000000);
72            // use a smaller compaction multiplier
73            options.set_max_bytes_for_level_multiplier(4.0);
74            // use 8-byte prefix (2 ^ 64 is far enough for transaction counts)
75            options.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
76            Arc::new(match DB::open(&options, &cache_dir) {
77                Ok(db) => db,
78                Err(e) => {
79                    error!("failed to create temp rocksDB for UTXO: {}", e);
80                    return ConnectedBlockIter::null();
81                }
82            })
83        };
84        // all tasks
85        let heights = 0..end;
86        let db_copy = db.clone();
87        let unspent_copy = unspent.clone();
88
89        let output_iterator = heights
90            .into_par_iter_sync(move |height| {
91                update_unspent_cache::<TBlock>(&unspent_copy, &db_copy, height)
92            })
93            .into_par_iter_sync(move |blk| connect_outpoints(&unspent, blk));
94
95        ConnectedBlockIter {
96            inner: output_iterator,
97            // cache dir will be deleted when ConnectedBlockIter is dropped
98            #[cfg(feature = "on-disk-utxo")]
99            cache: Some(cache_dir),
100        }
101    }
102
103    #[cfg(feature = "on-disk-utxo")]
104    fn null() -> Self {
105        ConnectedBlockIter {
106            inner: Vec::new().into_par_iter_sync(|_: usize| Err(())),
107            #[cfg(feature = "on-disk-utxo")]
108            cache: None,
109        }
110    }
111}
112
113impl<TBlock> Iterator for ConnectedBlockIter<TBlock> {
114    type Item = TBlock;
115
116    fn next(&mut self) -> Option<Self::Item> {
117        self.inner.next()
118    }
119}
120
#[cfg(test)]
#[cfg(feature = "on-disk-utxo")]
mod test_empty {
    use crate::{ConnectedBlockIter, SConnectedBlock};

    /// The fallback iterator must keep returning `None`, no matter how many
    /// times it is polled.
    #[test]
    fn test_empty() {
        let mut empty: ConnectedBlockIter<SConnectedBlock> = ConnectedBlockIter::null();
        let mut polls = 0;
        while polls < 100 {
            assert!(empty.next().is_none());
            polls += 1;
        }
    }
}