use std::path::Path;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{AddressBytes, AddressHash, Height, OutputType, TypeIndex, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, PAGE_SIZE, Reader, Stamp};
use crate::parallel_import;
mod addresses;
mod blocks;
mod inputs;
mod macros;
mod outputs;
mod scripts;
mod transactions;
pub use addresses::*;
pub use blocks::*;
pub use inputs::*;
pub use outputs::*;
pub use scripts::*;
pub use transactions::*;
use crate::Indexes;
#[derive(Clone, Traversable)]
pub struct Vecs {
db: Database,
pub blocks: BlocksVecs,
pub transactions: TransactionsVecs,
pub inputs: InputsVecs,
pub outputs: OutputsVecs,
pub addresses: AddressesVecs,
pub scripts: ScriptsVecs,
}
impl Vecs {
pub fn forced_import(parent: &Path, version: Version) -> Result<Self> {
tracing::debug!("Opening vecs database...");
let db = Database::open(&parent.join("vecs"))?;
tracing::debug!("Setting min len...");
db.set_min_len(PAGE_SIZE * 50_000_000)?;
tracing::debug!("Importing sub-vecs in parallel...");
let (blocks, transactions, inputs, outputs, addresses, scripts) = parallel_import! {
blocks = {
tracing::debug!("Importing BlocksVecs...");
let r = BlocksVecs::forced_import(&db, version);
tracing::debug!("BlocksVecs imported.");
r
},
transactions = {
tracing::debug!("Importing TransactionsVecs...");
let r = TransactionsVecs::forced_import(&db, version);
tracing::debug!("TransactionsVecs imported.");
r
},
inputs = {
tracing::debug!("Importing InputsVecs...");
let r = InputsVecs::forced_import(&db, version);
tracing::debug!("InputsVecs imported.");
r
},
outputs = {
tracing::debug!("Importing OutputsVecs...");
let r = OutputsVecs::forced_import(&db, version);
tracing::debug!("OutputsVecs imported.");
r
},
addresses = {
tracing::debug!("Importing AddressesVecs...");
let r = AddressesVecs::forced_import(&db, version);
tracing::debug!("AddressesVecs imported.");
r
},
scripts = {
tracing::debug!("Importing ScriptsVecs...");
let r = ScriptsVecs::forced_import(&db, version);
tracing::debug!("ScriptsVecs imported.");
r
},
};
tracing::debug!("Sub-vecs imported.");
let this = Self {
db,
blocks,
transactions,
inputs,
outputs,
addresses,
scripts,
};
this.db.retain_regions(
this.iter_any_exportable()
.flat_map(|v| v.region_names())
.collect(),
)?;
this.db.compact()?;
Ok(this)
}
pub fn rollback_if_needed(&mut self, starting_indexes: &Indexes) -> Result<()> {
let saved_height = starting_indexes.height.decremented().unwrap_or_default();
let stamp = Stamp::from(u64::from(saved_height));
self.blocks.truncate(starting_indexes.height, stamp)?;
self.transactions
.truncate(starting_indexes.height, starting_indexes.txindex, stamp)?;
self.inputs
.truncate(starting_indexes.height, starting_indexes.txinindex, stamp)?;
self.outputs
.truncate(starting_indexes.height, starting_indexes.txoutindex, stamp)?;
self.addresses.truncate(
starting_indexes.height,
starting_indexes.p2pk65addressindex,
starting_indexes.p2pk33addressindex,
starting_indexes.p2pkhaddressindex,
starting_indexes.p2shaddressindex,
starting_indexes.p2wpkhaddressindex,
starting_indexes.p2wshaddressindex,
starting_indexes.p2traddressindex,
starting_indexes.p2aaddressindex,
stamp,
)?;
self.scripts.truncate(
starting_indexes.height,
starting_indexes.emptyoutputindex,
starting_indexes.opreturnindex,
starting_indexes.p2msoutputindex,
starting_indexes.unknownoutputindex,
stamp,
)?;
Ok(())
}
pub fn get_addressbytes_by_type(
&self,
addresstype: OutputType,
typeindex: TypeIndex,
reader: &Reader,
) -> Result<Option<AddressBytes>> {
self.addresses
.get_bytes_by_type(addresstype, typeindex, reader)
}
pub fn push_bytes_if_needed(&mut self, index: TypeIndex, bytes: AddressBytes) -> Result<()> {
self.addresses.push_bytes_if_needed(index, bytes)
}
pub fn flush(&mut self, height: Height) -> Result<()> {
self.par_iter_mut_any_stored_vec()
.try_for_each(|vec| vec.stamped_write(Stamp::from(height)))?;
self.db.flush()?;
Ok(())
}
pub fn starting_height(&mut self) -> Height {
self.par_iter_mut_any_stored_vec()
.map(|vec| {
let h = Height::from(vec.stamp());
if h > Height::ZERO { h.incremented() } else { h }
})
.min()
.unwrap()
}
pub fn compact(&self) -> Result<()> {
self.db.compact()?;
Ok(())
}
pub fn reset(&mut self) -> Result<()> {
self.par_iter_mut_any_stored_vec()
.try_for_each(|vec| vec.any_reset())?;
Ok(())
}
pub fn iter_address_hashes_from(
&self,
address_type: OutputType,
height: Height,
) -> Result<Box<dyn Iterator<Item = AddressHash> + '_>> {
self.addresses.iter_hashes_from(address_type, height)
}
fn par_iter_mut_any_stored_vec(
&mut self,
) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
self.blocks
.par_iter_mut_any()
.chain(self.transactions.par_iter_mut_any())
.chain(self.inputs.par_iter_mut_any())
.chain(self.outputs.par_iter_mut_any())
.chain(self.addresses.par_iter_mut_any())
.chain(self.scripts.par_iter_mut_any())
}
pub fn db(&self) -> &Database {
&self.db
}
}