use crate::api::BitcoinDB;
use crate::iter::fetch_connected_async::{connect_outpoints, update_unspent_cache};
#[cfg(not(feature = "on-disk-utxo"))]
use crate::iter::util::VecMap;
use crate::parser::proto::connected_proto::ConnectedBlock;
#[cfg(not(feature = "on-disk-utxo"))]
use crate::parser::proto::connected_proto::ConnectedTx;
#[cfg(not(feature = "on-disk-utxo"))]
use bitcoin::Txid;
#[cfg(not(feature = "on-disk-utxo"))]
use hash_hasher::HashedMap;
#[cfg(feature = "on-disk-utxo")]
use log::error;
#[cfg(feature = "on-disk-utxo")]
use num_cpus;
use par_iter_sync::{IntoParallelIteratorSync, ParIterSync};
#[cfg(feature = "on-disk-utxo")]
use rocksdb::{Options, SliceTransform, DB};
use std::sync::Arc;
#[cfg(not(feature = "on-disk-utxo"))]
use std::sync::Mutex;
#[cfg(feature = "on-disk-utxo")]
use tempdir::TempDir;
#[cfg(feature = "on-disk-utxo")]
pub(crate) const KEY_LENGTH: u32 = 32 + 4;
pub struct ConnectedBlockIter<TBlock> {
inner: ParIterSync<TBlock>,
#[cfg(feature = "on-disk-utxo")]
#[allow(dead_code)]
cache: Option<TempDir>,
}
impl<TBlock> ConnectedBlockIter<TBlock>
where
TBlock: 'static + ConnectedBlock + Send,
{
pub fn new(db: &BitcoinDB, end: usize) -> Self {
#[cfg(not(feature = "on-disk-utxo"))]
let unspent: Arc<
Mutex<HashedMap<Txid, Arc<Mutex<VecMap<<TBlock::Tx as ConnectedTx>::TOut>>>>>,
> = Arc::new(Mutex::new(HashedMap::default()));
#[cfg(feature = "on-disk-utxo")]
let cache_dir = {
match TempDir::new("rocks_db") {
Ok(tempdir) => tempdir,
Err(e) => {
error!("failed to create rocksDB tempdir for UTXO: {}", e);
return ConnectedBlockIter::null();
}
}
};
#[cfg(feature = "on-disk-utxo")]
let unspent = {
let mut options = Options::default();
options.create_if_missing(true);
options.set_max_background_jobs(num_cpus::get() as i32);
options.set_write_buffer_size(0x10000000);
options.set_level_zero_file_num_compaction_trigger(4);
options.set_max_bytes_for_level_base(0x40000000);
options.set_target_file_size_base(0x10000000);
options.set_max_bytes_for_level_multiplier(4.0);
options.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
Arc::new(match DB::open(&options, &cache_dir) {
Ok(db) => db,
Err(e) => {
error!("failed to create temp rocksDB for UTXO: {}", e);
return ConnectedBlockIter::null();
}
})
};
let heights = 0..end;
let db_copy = db.clone();
let unspent_copy = unspent.clone();
let output_iterator = heights
.into_par_iter_sync(move |height| {
update_unspent_cache::<TBlock>(&unspent_copy, &db_copy, height)
})
.into_par_iter_sync(move |blk| connect_outpoints(&unspent, blk));
ConnectedBlockIter {
inner: output_iterator,
#[cfg(feature = "on-disk-utxo")]
cache: Some(cache_dir),
}
}
#[cfg(feature = "on-disk-utxo")]
fn null() -> Self {
ConnectedBlockIter {
inner: Vec::new().into_par_iter_sync(|_: usize| Err(())),
#[cfg(feature = "on-disk-utxo")]
cache: None,
}
}
}
impl<TBlock> Iterator for ConnectedBlockIter<TBlock> {
type Item = TBlock;
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}
#[cfg(test)]
#[cfg(feature = "on-disk-utxo")]
mod test_empty {
use crate::{ConnectedBlockIter, SConnectedBlock};
#[test]
fn test_empty() {
let mut empty = ConnectedBlockIter::null();
for _ in 0..100 {
let b: Option<SConnectedBlock> = empty.next();
assert!(b.is_none());
}
}
}