bitcoin_explorer/iter/
iter_connected.rs1use crate::api::BitcoinDB;
2use crate::iter::fetch_connected_async::{connect_outpoints, update_unspent_cache};
3#[cfg(not(feature = "on-disk-utxo"))]
4use crate::iter::util::VecMap;
5use crate::parser::proto::connected_proto::ConnectedBlock;
6#[cfg(not(feature = "on-disk-utxo"))]
7use crate::parser::proto::connected_proto::ConnectedTx;
8#[cfg(not(feature = "on-disk-utxo"))]
9use bitcoin::Txid;
10#[cfg(not(feature = "on-disk-utxo"))]
11use hash_hasher::HashedMap;
12#[cfg(feature = "on-disk-utxo")]
13use log::error;
14#[cfg(feature = "on-disk-utxo")]
15use num_cpus;
16use par_iter_sync::{IntoParallelIteratorSync, ParIterSync};
17#[cfg(feature = "on-disk-utxo")]
18use rocksdb::{Options, SliceTransform, DB};
19use std::sync::Arc;
20#[cfg(not(feature = "on-disk-utxo"))]
21use std::sync::Mutex;
22#[cfg(feature = "on-disk-utxo")]
23use tempdir::TempDir;
24
/// Fixed key length (`32 + 4 = 36`) used with the `on-disk-utxo` feature.
///
/// NOTE(review): presumably the byte length of a UTXO key, i.e. a 32-byte
/// transaction id followed by a 4-byte output index -- confirm against the
/// code that builds the keys.
#[cfg(feature = "on-disk-utxo")]
pub(crate) const KEY_LENGTH: u32 = 32 + 4;
28
29pub struct ConnectedBlockIter<TBlock> {
31 inner: ParIterSync<TBlock>,
32 #[cfg(feature = "on-disk-utxo")]
33 #[allow(dead_code)]
34 cache: Option<TempDir>,
35}
36
37impl<TBlock> ConnectedBlockIter<TBlock>
38where
39 TBlock: 'static + ConnectedBlock + Send,
40{
41 pub fn new(db: &BitcoinDB, end: usize) -> Self {
43 #[cfg(not(feature = "on-disk-utxo"))]
45 let unspent: Arc<
46 Mutex<HashedMap<Txid, Arc<Mutex<VecMap<<TBlock::Tx as ConnectedTx>::TOut>>>>>,
47 > = Arc::new(Mutex::new(HashedMap::default()));
48 #[cfg(feature = "on-disk-utxo")]
49 let cache_dir = {
50 match TempDir::new("rocks_db") {
51 Ok(tempdir) => tempdir,
52 Err(e) => {
53 error!("failed to create rocksDB tempdir for UTXO: {}", e);
54 return ConnectedBlockIter::null();
55 }
56 }
57 };
58 #[cfg(feature = "on-disk-utxo")]
59 let unspent = {
60 let mut options = Options::default();
61 options.create_if_missing(true);
63 options.set_max_background_jobs(num_cpus::get() as i32);
65 options.set_write_buffer_size(0x10000000);
67 options.set_level_zero_file_num_compaction_trigger(4);
69 options.set_max_bytes_for_level_base(0x40000000);
70 options.set_target_file_size_base(0x10000000);
72 options.set_max_bytes_for_level_multiplier(4.0);
74 options.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
76 Arc::new(match DB::open(&options, &cache_dir) {
77 Ok(db) => db,
78 Err(e) => {
79 error!("failed to create temp rocksDB for UTXO: {}", e);
80 return ConnectedBlockIter::null();
81 }
82 })
83 };
84 let heights = 0..end;
86 let db_copy = db.clone();
87 let unspent_copy = unspent.clone();
88
89 let output_iterator = heights
90 .into_par_iter_sync(move |height| {
91 update_unspent_cache::<TBlock>(&unspent_copy, &db_copy, height)
92 })
93 .into_par_iter_sync(move |blk| connect_outpoints(&unspent, blk));
94
95 ConnectedBlockIter {
96 inner: output_iterator,
97 #[cfg(feature = "on-disk-utxo")]
99 cache: Some(cache_dir),
100 }
101 }
102
103 #[cfg(feature = "on-disk-utxo")]
104 fn null() -> Self {
105 ConnectedBlockIter {
106 inner: Vec::new().into_par_iter_sync(|_: usize| Err(())),
107 #[cfg(feature = "on-disk-utxo")]
108 cache: None,
109 }
110 }
111}
112
113impl<TBlock> Iterator for ConnectedBlockIter<TBlock> {
114 type Item = TBlock;
115
116 fn next(&mut self) -> Option<Self::Item> {
117 self.inner.next()
118 }
119}
120
/// Checks the iterator returned by `ConnectedBlockIter::null()`; only built
/// for tests with the `on-disk-utxo` feature, because `null()` exists only
/// under that feature.
#[cfg(test)]
#[cfg(feature = "on-disk-utxo")]
mod test_empty {
    use crate::{ConnectedBlockIter, SConnectedBlock};

    /// The null iterator must keep returning `None`, even when polled
    /// repeatedly.
    #[test]
    fn test_empty() {
        let mut drained: ConnectedBlockIter<SConnectedBlock> = ConnectedBlockIter::null();
        for _attempt in 0..100 {
            assert!(drained.next().is_none());
        }
    }
}