use crate::storage::database::{Database, Tree};
use anyhow::Result;
use blvm_protocol::segwit::Witness;
use blvm_protocol::{Block, BlockHeader, Hash};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[cfg(feature = "block-compression")]
use zstd;
/// Lightweight per-block metadata stored alongside the block body so callers
/// can answer cheap queries (e.g. tx count) without deserializing the block.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockMetadata {
    // Number of transactions in the block (`block.transactions.len()` at store time).
    pub n_tx: u32,
}
/// Byte length of a height-prefixed row key: 8-byte big-endian height + 32-byte hash.
pub const BLOCK_HEIGHT_ROW_KEY_LEN: usize = 40;

/// Builds the composite row key `be(height) || block_hash` used for
/// height-keyed block / header / witness / metadata rows.
///
/// Big-endian height makes the keys sort in ascending height order.
#[inline]
pub fn block_height_row_key(height: u64, block_hash: &Hash) -> [u8; BLOCK_HEIGHT_ROW_KEY_LEN] {
    let mut key = [0u8; BLOCK_HEIGHT_ROW_KEY_LEN];
    let (height_part, hash_part) = key.split_at_mut(8);
    height_part.copy_from_slice(&height.to_be_bytes());
    hash_part.copy_from_slice(block_hash.as_slice());
    key
}
/// Block storage facade over a set of trees in the backing database.
///
/// Each field is an `Arc`ed tree handle, so the store itself is cheap to clone.
pub struct BlockStore {
    #[allow(dead_code)]
    db: Arc<dyn Database>,
    // Serialized (optionally zstd-compressed) block bodies.
    blocks: Arc<dyn Tree>,
    // Serialized block headers.
    headers: Arc<dyn Tree>,
    // be(height) -> block hash.
    height_index: Arc<dyn Tree>,
    // block hash -> be(height); reverse of `height_index`.
    hash_to_height: Arc<dyn Tree>,
    // Serialized (optionally compressed) per-block witness stacks.
    witnesses: Arc<dyn Tree>,
    // Small ring of the most recent headers, keyed by be(height).
    recent_headers: Arc<dyn Tree>,
    // Serialized `BlockMetadata` rows.
    block_metadata: Arc<dyn Tree>,
    #[cfg(feature = "block-compression")]
    block_compression_enabled: bool,
    #[cfg(feature = "block-compression")]
    block_compression_level: u32,
    #[cfg(feature = "witness-compression")]
    witness_compression_enabled: bool,
    #[cfg(feature = "witness-compression")]
    witness_compression_level: u32,
    // Optional fallback reader over Bitcoin Core's block files.
    #[cfg(feature = "rocksdb")]
    bitcoin_core_reader: Option<Arc<crate::storage::bitcoin_core_blocks::BitcoinCoreBlockReader>>,
}
// Manual Clone impl: every handle is an `Arc`, so cloning is a refcount bump
// per field; the explicit `Arc::clone` calls make that cheapness visible.
impl Clone for BlockStore {
    fn clone(&self) -> Self {
        Self {
            db: Arc::clone(&self.db),
            blocks: Arc::clone(&self.blocks),
            headers: Arc::clone(&self.headers),
            height_index: Arc::clone(&self.height_index),
            hash_to_height: Arc::clone(&self.hash_to_height),
            witnesses: Arc::clone(&self.witnesses),
            recent_headers: Arc::clone(&self.recent_headers),
            block_metadata: Arc::clone(&self.block_metadata),
            #[cfg(feature = "block-compression")]
            block_compression_enabled: self.block_compression_enabled,
            #[cfg(feature = "block-compression")]
            block_compression_level: self.block_compression_level,
            #[cfg(feature = "witness-compression")]
            witness_compression_enabled: self.witness_compression_enabled,
            #[cfg(feature = "witness-compression")]
            witness_compression_level: self.witness_compression_level,
            #[cfg(feature = "rocksdb")]
            bitcoin_core_reader: self.bitcoin_core_reader.clone(),
        }
    }
}
impl BlockStore {
/// Opens a `BlockStore` with compression disabled.
///
/// Compression levels default to 3 (blocks) and 2 (witnesses) but are inert
/// while the corresponding `*_enabled` flags are `false`.
pub fn new(db: Arc<dyn Database>) -> Result<Self> {
    Self::new_with_compression(
        db,
        #[cfg(feature = "block-compression")]
        false,
        #[cfg(feature = "block-compression")]
        3,
        #[cfg(feature = "witness-compression")]
        false,
        #[cfg(feature = "witness-compression")]
        2,
    )
}
/// Like [`BlockStore::new`] (compression disabled, default levels) but also
/// wires in an optional reader over Bitcoin Core's block files, used by
/// `get_block` as a read fallback when a block is absent from the database.
#[cfg(feature = "rocksdb")]
pub fn new_with_bitcoin_core_reader(
    db: Arc<dyn Database>,
    block_reader: Option<Arc<crate::storage::bitcoin_core_blocks::BitcoinCoreBlockReader>>,
) -> Result<Self> {
    Self::new_with_compression_and_reader(
        db,
        #[cfg(feature = "block-compression")]
        false,
        #[cfg(feature = "block-compression")]
        3,
        #[cfg(feature = "witness-compression")]
        false,
        #[cfg(feature = "witness-compression")]
        2,
        block_reader,
    )
}
/// Opens a `BlockStore` with explicit compression settings and no
/// Bitcoin Core fallback reader.
pub fn new_with_compression(
    db: Arc<dyn Database>,
    #[cfg(feature = "block-compression")] block_compression_enabled: bool,
    #[cfg(feature = "block-compression")] block_compression_level: u32,
    #[cfg(feature = "witness-compression")] witness_compression_enabled: bool,
    #[cfg(feature = "witness-compression")] witness_compression_level: u32,
) -> Result<Self> {
    Self::new_with_compression_and_reader(
        db,
        #[cfg(feature = "block-compression")]
        block_compression_enabled,
        #[cfg(feature = "block-compression")]
        block_compression_level,
        #[cfg(feature = "witness-compression")]
        witness_compression_enabled,
        #[cfg(feature = "witness-compression")]
        witness_compression_level,
        #[cfg(feature = "rocksdb")]
        None,
    )
}
/// Shared constructor behind all the public `new*` variants: opens the seven
/// trees ("blocks", "headers", "height_index", "hash_to_height", "witnesses",
/// "recent_headers", "block_metadata") and assembles the store.
fn new_with_compression_and_reader(
    db: Arc<dyn Database>,
    #[cfg(feature = "block-compression")] block_compression_enabled: bool,
    #[cfg(feature = "block-compression")] block_compression_level: u32,
    #[cfg(feature = "witness-compression")] witness_compression_enabled: bool,
    #[cfg(feature = "witness-compression")] witness_compression_level: u32,
    #[cfg(feature = "rocksdb")] bitcoin_core_reader: Option<
        Arc<crate::storage::bitcoin_core_blocks::BitcoinCoreBlockReader>,
    >,
) -> Result<Self> {
    let blocks = Arc::from(db.open_tree("blocks")?);
    let headers = Arc::from(db.open_tree("headers")?);
    let height_index = Arc::from(db.open_tree("height_index")?);
    let hash_to_height = Arc::from(db.open_tree("hash_to_height")?);
    let witnesses = Arc::from(db.open_tree("witnesses")?);
    let recent_headers = Arc::from(db.open_tree("recent_headers")?);
    let block_metadata = Arc::from(db.open_tree("block_metadata")?);
    Ok(Self {
        db,
        blocks,
        headers,
        height_index,
        hash_to_height,
        witnesses,
        recent_headers,
        block_metadata,
        #[cfg(feature = "block-compression")]
        block_compression_enabled,
        #[cfg(feature = "block-compression")]
        block_compression_level,
        #[cfg(feature = "witness-compression")]
        witness_compression_enabled,
        #[cfg(feature = "witness-compression")]
        witness_compression_level,
        #[cfg(feature = "rocksdb")]
        bitcoin_core_reader,
    })
}
/// Stores a block keyed directly by its hash (no height-prefixed rows).
///
/// Writes three hash-keyed rows: the (optionally zstd-compressed) body, the
/// bincode-encoded header, and [`BlockMetadata`] with the tx count. Unlike
/// [`Self::store_block_with_witness`], this touches neither the height index
/// nor the witnesses / recent-headers trees.
pub fn store_block(&self, block: &Block) -> Result<()> {
    let block_hash = self.block_hash(block);
    let block_data = bincode::serialize(block)?;
    // No explicit "compressed" flag is stored: readers sniff the zstd frame
    // magic via `is_compressed` to decide whether to decompress.
    #[cfg(feature = "block-compression")]
    let data_to_store = if self.block_compression_enabled {
        zstd::encode_all(&block_data[..], self.block_compression_level as i32)
            .map_err(|e| anyhow::anyhow!("Block compression failed: {}", e))?
    } else {
        block_data
    };
    #[cfg(not(feature = "block-compression"))]
    let data_to_store = block_data;
    self.blocks.insert(block_hash.as_slice(), &data_to_store)?;
    let header_data = bincode::serialize(&block.header)?;
    self.headers.insert(block_hash.as_slice(), &header_data)?;
    let metadata = BlockMetadata {
        n_tx: block.transactions.len() as u32,
    };
    let metadata_data = bincode::serialize(&metadata)?;
    self.block_metadata
        .insert(block_hash.as_slice(), &metadata_data)?;
    Ok(())
}
/// Stores a block, its per-transaction witness stacks, and metadata under the
/// height-prefixed row key `be(height) || hash`, then refreshes the
/// recent-headers ring via [`Self::store_recent_header`].
///
/// The witness row is written only when `witnesses` is non-empty.
/// NOTE(review): the height<->hash index is NOT written here; presumably
/// callers pair this with `store_height` so the height-keyed rows are
/// findable again — confirm against call sites.
pub fn store_block_with_witness(
    &self,
    block: &Block,
    witnesses: &[Vec<Witness>],
    height: u64,
) -> Result<()> {
    let block_hash = self.block_hash(block);
    let row_key = block_height_row_key(height, &block_hash);
    let block_data = bincode::serialize(block)?;
    // Readers detect compression by sniffing the zstd magic (`is_compressed`).
    #[cfg(feature = "block-compression")]
    let data_to_store = if self.block_compression_enabled {
        zstd::encode_all(&block_data[..], self.block_compression_level as i32)
            .map_err(|e| anyhow::anyhow!("Block compression failed: {}", e))?
    } else {
        block_data
    };
    #[cfg(not(feature = "block-compression"))]
    let data_to_store = block_data;
    self.blocks.insert(row_key.as_slice(), &data_to_store)?;
    let header_data = bincode::serialize(&block.header)?;
    self.headers.insert(row_key.as_slice(), &header_data)?;
    let metadata = BlockMetadata {
        n_tx: block.transactions.len() as u32,
    };
    let metadata_data = bincode::serialize(&metadata)?;
    self.block_metadata
        .insert(row_key.as_slice(), &metadata_data)?;
    if !witnesses.is_empty() {
        let witness_data = bincode::serialize(witnesses)?;
        #[cfg(feature = "witness-compression")]
        let witness_blob = if self.witness_compression_enabled {
            zstd::encode_all(&witness_data[..], self.witness_compression_level as i32)
                .map_err(|e| anyhow::anyhow!("Witness compression failed: {}", e))?
        } else {
            witness_data
        };
        #[cfg(not(feature = "witness-compression"))]
        let witness_blob = witness_data;
        self.witnesses.insert(row_key.as_slice(), &witness_blob)?;
    }
    self.store_recent_header(height, &block.header)?;
    Ok(())
}
/// Stores witness stacks keyed directly by block hash (not the
/// height-prefixed key), optionally zstd-compressed.
pub fn store_witness(&self, block_hash: &Hash, witness: &[Vec<Witness>]) -> Result<()> {
    let witness_data = bincode::serialize(witness)?;
    // As with blocks, compression is self-describing via the zstd magic prefix.
    #[cfg(feature = "witness-compression")]
    let data_to_store = if self.witness_compression_enabled {
        zstd::encode_all(&witness_data[..], self.witness_compression_level as i32)
            .map_err(|e| anyhow::anyhow!("Witness compression failed: {}", e))?
    } else {
        witness_data
    };
    #[cfg(not(feature = "witness-compression"))]
    let data_to_store = witness_data;
    self.witnesses
        .insert(block_hash.as_slice(), &data_to_store)?;
    Ok(())
}
/// Loads a block's witness stacks, trying the height-prefixed row first and
/// falling back to the bare hash-keyed row (as written by `store_witness`).
pub fn get_witness(&self, block_hash: &Hash) -> Result<Option<Vec<Vec<Witness>>>> {
    // Preferred path: height-prefixed key, reachable only when the hash is
    // present in the hash->height index.
    if let Some(h) = self.get_height_by_hash(block_hash)? {
        let k = block_height_row_key(h, block_hash);
        if let Some(data) = self.witnesses.get(&k)? {
            // Rows written with or without compression both decode: the zstd
            // magic prefix tells us which form this row is in.
            #[cfg(feature = "witness-compression")]
            let witness_data = if Self::is_compressed(&data) {
                zstd::decode_all(&data[..])
                    .map_err(|e| anyhow::anyhow!("Witness decompression failed: {}", e))?
            } else {
                data
            };
            #[cfg(not(feature = "witness-compression"))]
            let witness_data = data;
            let witnesses: Vec<Vec<Witness>> = bincode::deserialize(&witness_data)?;
            return Ok(Some(witnesses));
        }
    }
    // Fallback: hash-keyed row.
    if let Some(data) = self.witnesses.get(block_hash.as_slice())? {
        #[cfg(feature = "witness-compression")]
        let witness_data = if Self::is_compressed(&data) {
            zstd::decode_all(&data[..])
                .map_err(|e| anyhow::anyhow!("Witness decompression failed: {}", e))?
        } else {
            data
        };
        #[cfg(not(feature = "witness-compression"))]
        let witness_data = data;
        let witnesses: Vec<Vec<Witness>> = bincode::deserialize(&witness_data)?;
        Ok(Some(witnesses))
    } else {
        Ok(None)
    }
}
/// Inserts `header` into the recent-headers ring (keyed by big-endian height)
/// and evicts the entry 12 heights back, so roughly the last 12 are retained.
pub fn store_recent_header(&self, height: u64, header: &BlockHeader) -> Result<()> {
    let encoded = bincode::serialize(header)?;
    self.recent_headers.insert(&height.to_be_bytes(), &encoded)?;
    // `checked_sub(12)` succeeds exactly when height >= 12, i.e. height > 11.
    if let Some(stale) = height.checked_sub(12) {
        self.recent_headers.remove(&stale.to_be_bytes())?;
    }
    Ok(())
}
/// Batch variant of [`Self::store_recent_header`] for initial block download:
/// stages all inserts/evictions in one batch and commits without the WAL.
pub fn store_recent_headers_ibd_batch(&self, entries: &[(u64, &BlockHeader)]) -> Result<()> {
    if entries.is_empty() {
        return Ok(());
    }
    let mut batch = self.recent_headers.batch()?;
    for &(height, header) in entries {
        batch.put(&height.to_be_bytes(), &bincode::serialize(header)?);
        // Same 12-entry retention as the single-header path.
        if let Some(stale) = height.checked_sub(12) {
            batch.delete(&stale.to_be_bytes());
        }
    }
    batch.commit_no_wal()?;
    Ok(())
}
/// IBD bulk-flush fast path for RocksDB backends.
///
/// If the backing database downcasts to `RocksDBDatabase`, hands all
/// pre-serialized rows to a single no-WAL write and returns `Ok(true)`;
/// returns `Ok(false)` when the backend is some other implementation so the
/// caller can fall back to the generic path.
#[cfg(feature = "rocksdb")]
pub(crate) fn try_ibd_flush_rocksdb_unified(
    &self,
    flush_order: &[usize],
    heights: &[u64],
    block_hashes: &[Hash],
    block_data: &[Vec<u8>],
    header_data: &[Arc<Vec<u8>>],
    witness_blobs: &[Option<Vec<u8>>],
    metadata_blobs: &[Vec<u8>],
    recent_entries: &[(u64, Vec<u8>)],
) -> Result<bool> {
    use crate::storage::database::rocksdb_impl::RocksDBDatabase;
    let Some(rocks) = self.db.as_ref().as_any().downcast_ref::<RocksDBDatabase>() else {
        return Ok(false);
    };
    rocks.write_ibd_blockstore_flush_no_wal(
        flush_order,
        heights,
        block_hashes,
        block_data,
        header_data,
        witness_blobs,
        metadata_blobs,
        recent_entries,
    )?;
    Ok(true)
}
/// IBD bulk-flush fast path for redb backends; mirrors
/// [`Self::try_ibd_flush_rocksdb_unified`] (returns `Ok(false)` when the
/// backend is not `RedbDatabase`).
#[cfg(feature = "redb")]
pub(crate) fn try_ibd_flush_redb_unified(
    &self,
    flush_order: &[usize],
    heights: &[u64],
    block_hashes: &[Hash],
    block_data: &[Vec<u8>],
    header_data: &[Arc<Vec<u8>>],
    witness_blobs: &[Option<Vec<u8>>],
    metadata_blobs: &[Vec<u8>],
    recent_entries: &[(u64, Vec<u8>)],
) -> Result<bool> {
    use crate::storage::database::redb_impl::RedbDatabase;
    let Some(redb) = self.db.as_ref().as_any().downcast_ref::<RedbDatabase>() else {
        return Ok(false);
    };
    redb.write_ibd_blockstore_flush_no_wal(
        flush_order,
        heights,
        block_hashes,
        block_data,
        header_data,
        witness_blobs,
        metadata_blobs,
        recent_entries,
    )?;
    Ok(true)
}
/// IBD bulk-flush fast path for TidesDB backends; mirrors
/// [`Self::try_ibd_flush_rocksdb_unified`] (returns `Ok(false)` when the
/// backend is not `TidesDBDatabase`).
#[cfg(feature = "tidesdb")]
pub(crate) fn try_ibd_flush_tidesdb_unified(
    &self,
    flush_order: &[usize],
    heights: &[u64],
    block_hashes: &[Hash],
    block_data: &[Vec<u8>],
    header_data: &[Arc<Vec<u8>>],
    witness_blobs: &[Option<Vec<u8>>],
    metadata_blobs: &[Vec<u8>],
    recent_entries: &[(u64, Vec<u8>)],
) -> Result<bool> {
    use crate::storage::database::tidesdb_impl::TidesDBDatabase;
    let Some(tdb) = self.db.as_ref().as_any().downcast_ref::<TidesDBDatabase>() else {
        return Ok(false);
    };
    tdb.write_ibd_blockstore_flush_no_wal(
        flush_order,
        heights,
        block_hashes,
        block_data,
        header_data,
        witness_blobs,
        metadata_blobs,
        recent_entries,
    )?;
    Ok(true)
}
/// Returns up to `count` of the most recent headers, in ascending height order.
///
/// The tip height is taken from the last entry of the height index, then
/// headers are read downward out of the `recent_headers` ring; heights whose
/// entry is missing or undecodable are skipped silently.
pub fn get_recent_headers(&self, count: usize) -> Result<Vec<BlockHeader>> {
    // Take the last Ok entry of the height index lazily. The previous code
    // collected the ENTIRE index into a Vec and reversed it just to read one
    // row — O(chain length) time and memory per call. `.flatten().last()`
    // yields the same item (the last Ok entry) with O(1) memory.
    let Some((height_bytes, _hash)) = self.height_index.iter().flatten().last() else {
        return Ok(Vec::new());
    };
    let mut be = [0u8; 8];
    be.copy_from_slice(&height_bytes);
    let mut height = u64::from_be_bytes(be);
    let mut headers = Vec::with_capacity(count);
    for _ in 0..count {
        if let Some(data) = self.recent_headers.get(&height.to_be_bytes())? {
            if let Ok(header) = bincode::deserialize::<BlockHeader>(&data) {
                headers.push(header);
            }
        }
        if height == 0 {
            break;
        }
        height -= 1;
    }
    // Collected newest-to-oldest; callers expect ascending height order.
    headers.reverse();
    Ok(headers)
}
/// Loads a block body by hash.
///
/// Lookup order: (1) the height-prefixed row, when the hash is present in the
/// hash->height index; (2) the bare hash-keyed row; (3) on rocksdb builds,
/// the optional Bitcoin Core block-file reader.
pub fn get_block(&self, hash: &Hash) -> Result<Option<Block>> {
    if let Some(h) = self.get_height_by_hash(hash)? {
        let k = block_height_row_key(h, hash);
        if let Some(data) = self.blocks.get(&k)? {
            // Compressed rows are identified by the zstd magic prefix.
            #[cfg(feature = "block-compression")]
            let block_data = if Self::is_compressed(&data) {
                zstd::decode_all(&data[..])
                    .map_err(|e| anyhow::anyhow!("Block decompression failed: {}", e))?
            } else {
                data
            };
            #[cfg(not(feature = "block-compression"))]
            let block_data = data;
            let block: Block = bincode::deserialize(&block_data)?;
            return Ok(Some(block));
        }
    }
    if let Some(data) = self.blocks.get(hash.as_slice())? {
        #[cfg(feature = "block-compression")]
        let block_data = if Self::is_compressed(&data) {
            zstd::decode_all(&data[..])
                .map_err(|e| anyhow::anyhow!("Block decompression failed: {}", e))?
        } else {
            data
        };
        #[cfg(not(feature = "block-compression"))]
        let block_data = data;
        let block: Block = bincode::deserialize(&block_data)?;
        Ok(Some(block))
    } else {
        // Last resort: read straight out of Bitcoin Core's block files.
        #[cfg(feature = "rocksdb")]
        {
            if let Some(reader) = &self.bitcoin_core_reader {
                return reader.read_block(hash);
            }
        }
        Ok(None)
    }
}
/// Returns true when `data` begins with the zstd frame magic number
/// (0xFD2FB528, which appears on disk as the bytes 0x28 0xB5 0x2F 0xFD).
///
/// Gated on EITHER compression feature: `get_witness` calls this under
/// `witness-compression`, so gating it on `block-compression` alone breaks
/// builds that enable only `witness-compression`.
#[cfg(any(feature = "block-compression", feature = "witness-compression"))]
fn is_compressed(data: &[u8]) -> bool {
    // `starts_with` returns false for slices shorter than the prefix,
    // matching the original explicit length check.
    data.starts_with(&[0x28, 0xB5, 0x2F, 0xFD])
}
/// Persists a single header keyed directly by block hash.
pub fn store_header(&self, hash: &Hash, header: &BlockHeader) -> Result<()> {
    let encoded = bincode::serialize(header)?;
    self.headers.insert(hash.as_slice(), &encoded)?;
    Ok(())
}
/// Loads a header by hash: height-prefixed row first, then the hash-keyed row.
pub fn get_header(&self, hash: &Hash) -> Result<Option<BlockHeader>> {
    if let Some(height) = self.get_height_by_hash(hash)? {
        let key = block_height_row_key(height, hash);
        if let Some(raw) = self.headers.get(&key)? {
            return Ok(Some(bincode::deserialize(&raw)?));
        }
    }
    match self.headers.get(hash.as_slice())? {
        Some(raw) => Ok(Some(bincode::deserialize(&raw)?)),
        None => Ok(None),
    }
}
/// Records the height<->hash mapping in both directions
/// (`height_index`: be(height) -> hash, `hash_to_height`: hash -> be(height)).
pub fn store_height(&self, height: u64, hash: &Hash) -> Result<()> {
    let be_height = height.to_be_bytes();
    self.height_index.insert(&be_height, hash.as_slice())?;
    self.hash_to_height.insert(hash.as_slice(), &be_height)?;
    Ok(())
}
/// Writes a header plus both height mappings for every entry, using one batch
/// per tree. Note: the three batches commit separately, so the write is not
/// atomic across trees.
pub fn store_headers_batch(&self, entries: &[(Hash, BlockHeader, u64)]) -> Result<()> {
    if entries.is_empty() {
        return Ok(());
    }
    let mut headers_b = self.headers.batch()?;
    let mut by_height = self.height_index.batch()?;
    let mut by_hash = self.hash_to_height.batch()?;
    for (hash, header, height) in entries {
        let encoded = bincode::serialize(header)?;
        let be_height = height.to_be_bytes();
        headers_b.put(hash.as_slice(), &encoded);
        by_height.put(&be_height, hash.as_slice());
        by_hash.put(hash.as_slice(), &be_height);
    }
    headers_b.commit()?;
    by_height.commit()?;
    by_hash.commit()?;
    Ok(())
}
/// Looks up the block hash stored for `height` in the height index.
pub fn get_hash_by_height(&self, height: u64) -> Result<Option<Hash>> {
    match self.height_index.get(&height.to_be_bytes())? {
        Some(raw) => {
            // Stored value is the raw 32-byte hash.
            let mut hash = [0u8; 32];
            hash.copy_from_slice(&raw);
            Ok(Some(hash))
        }
        None => Ok(None),
    }
}
/// Returns the highest height present in the height index, or `None` when
/// even height 0 is absent.
///
/// Uses a galloping probe (doubling `hi`) followed by binary search, so it
/// costs O(log tip) point lookups instead of a full scan.
/// NOTE(review): this assumes heights are populated contiguously from 0 — a
/// gap would make the probe stop early. Confirm writers guarantee this.
pub fn highest_stored_height(&self) -> Result<Option<u64>> {
    if self.get_hash_by_height(0)?.is_none() {
        return Ok(None);
    }
    let mut lo = 0u64;
    let mut hi = 1u64;
    // Gallop: double `hi` until it overshoots the tip (or passes the 2B cap).
    while self.get_hash_by_height(hi)?.is_some() {
        lo = hi;
        hi = hi.saturating_mul(2);
        if hi > 2_000_000_000 {
            break;
        }
    }
    // If we stopped at the cap while `hi` is still present, report it as-is.
    if self.get_hash_by_height(hi)?.is_some() {
        return Ok(Some(hi));
    }
    // Invariant here: `lo` is present, `hi` is absent. Bisect the boundary.
    while lo + 1 < hi {
        let mid = lo + (hi - lo) / 2;
        if self.get_hash_by_height(mid)?.is_some() {
            lo = mid;
        } else {
            hi = mid;
        }
    }
    Ok(Some(lo))
}
/// Looks up the height recorded for `hash` in the reverse index.
pub fn get_height_by_hash(&self, hash: &Hash) -> Result<Option<u64>> {
    match self.hash_to_height.get(hash.as_slice())? {
        Some(raw) => {
            // Stored value is the height as 8 big-endian bytes.
            let mut be = [0u8; 8];
            be.copy_from_slice(&raw);
            Ok(Some(u64::from_be_bytes(be)))
        }
        None => Ok(None),
    }
}
/// Builds the reply to a `getheaders`-style request: finds the fork point
/// from `locator`, then returns headers for heights above it, capped at
/// `max_headers` (minimum 1) and cut short at `hash_stop` unless it is all
/// zeroes ("no stop hash").
pub fn build_headers_response(
    &self,
    locator: &[Hash],
    hash_stop: &Hash,
    max_headers: usize,
) -> Result<Vec<BlockHeader>> {
    // No stored tip -> nothing to serve.
    let Some(tip_h) = self.highest_stored_height()? else {
        return Ok(Vec::new());
    };
    // Fork point: the first locator hash that is on our best chain, i.e. its
    // recorded height maps back to the very same hash. An empty locator
    // means "start from genesis".
    let fork_h: Option<u64> = if locator.is_empty() {
        Some(0)
    } else {
        let mut found = None;
        for hash in locator {
            if let Some(h) = self.get_height_by_hash(hash)? {
                if self.get_hash_by_height(h)? == Some(*hash) {
                    found = Some(h);
                    break;
                }
            }
        }
        found
    };
    // No common ancestor with the requester -> empty response.
    let Some(fork) = fork_h else {
        return Ok(Vec::new());
    };
    let start = fork.saturating_add(1);
    if start > tip_h {
        return Ok(Vec::new());
    }
    let mut out = Vec::new();
    let stop_all_zero = hash_stop.iter().all(|&b| b == 0);
    let cap = max_headers.max(1);
    for height in start..=tip_h {
        if out.len() >= cap {
            break;
        }
        // Stop at the first gap in the index or missing header row.
        let Some(hash) = self.get_hash_by_height(height)? else {
            break;
        };
        let Some(hdr) = self.get_header(&hash)? else {
            break;
        };
        out.push(hdr);
        // `hash_stop` is inclusive: its header is pushed before stopping.
        if !stop_all_zero && hash == *hash_stop {
            break;
        }
    }
    Ok(out)
}
/// Loads a block's [`BlockMetadata`]: height-prefixed row first, then the
/// hash-keyed row written by `store_block`.
pub fn get_block_metadata(&self, hash: &Hash) -> Result<Option<BlockMetadata>> {
    if let Some(height) = self.get_height_by_hash(hash)? {
        let key = block_height_row_key(height, hash);
        if let Some(raw) = self.block_metadata.get(&key)? {
            return Ok(Some(bincode::deserialize(&raw)?));
        }
    }
    match self.block_metadata.get(hash.as_slice())? {
        Some(raw) => Ok(Some(bincode::deserialize(&raw)?)),
        None => Ok(None),
    }
}
/// Collects the blocks stored for heights `start..=end` (inclusive); heights
/// with no index entry or no stored body are skipped.
pub fn get_blocks_by_height_range(&self, start: u64, end: u64) -> Result<Vec<Block>> {
    let mut out = Vec::new();
    for height in start..=end {
        let Some(hash) = self.get_hash_by_height(height)? else {
            continue;
        };
        if let Some(block) = self.get_block(&hash)? {
            out.push(block);
        }
    }
    Ok(out)
}
/// True when a body for this block exists under either key form.
/// Alias for [`Self::has_block_body`].
pub fn has_block(&self, hash: &Hash) -> Result<bool> {
    self.has_block_body(hash)
}
/// Number of rows in the blocks tree.
/// NOTE(review): a block stored via both `store_block` (hash key) and
/// `store_block_with_witness` (height key) would count twice — confirm
/// callers use a single scheme per block.
pub fn block_count(&self) -> Result<usize> {
    self.blocks.len()
}
/// Public wrapper over [`Self::block_hash`]: double-SHA256 of the
/// 80-byte serialized header.
pub fn get_block_hash(&self, block: &Block) -> Hash {
    self.block_hash(block)
}
/// Computes the block hash: double-SHA256 over the 80-byte wire-format header
/// (version, prev hash, merkle root, timestamp, bits, nonce — integer fields
/// little-endian).
#[inline]
fn block_hash(&self, block: &Block) -> Hash {
    use crate::storage::hashing::double_sha256;
    let hdr = &block.header;
    let mut buf = [0u8; 80];
    buf[0..4].copy_from_slice(&(hdr.version as i32).to_le_bytes());
    buf[4..36].copy_from_slice(&hdr.prev_block_hash);
    buf[36..68].copy_from_slice(&hdr.merkle_root);
    // NOTE(review): the `as u32` casts truncate if these fields are wider
    // than 32 bits — matches the 4-byte wire layout, but verify field types.
    buf[68..72].copy_from_slice(&(hdr.timestamp as u32).to_le_bytes());
    buf[72..76].copy_from_slice(&(hdr.bits as u32).to_le_bytes());
    buf[76..80].copy_from_slice(&(hdr.nonce as u32).to_le_bytes());
    double_sha256(&buf)
}
/// Deletes the stored body for a block under both possible keys:
/// the height-prefixed row (when indexed) and the bare hash-keyed row.
pub fn remove_block_body(&self, hash: &Hash) -> Result<()> {
    if let Some(height) = self.get_height_by_hash(hash)? {
        self.blocks.remove(&block_height_row_key(height, hash))?;
    }
    self.blocks.remove(hash.as_slice())?;
    Ok(())
}
/// Deletes a block's witness row under both possible keys
/// (height-prefixed and bare hash).
pub fn remove_witness(&self, hash: &Hash) -> Result<()> {
    if let Some(height) = self.get_height_by_hash(hash)? {
        self.witnesses.remove(&block_height_row_key(height, hash))?;
    }
    self.witnesses.remove(hash.as_slice())?;
    Ok(())
}
/// Removes the block body indexed at `height`; a no-op (still `Ok`) when no
/// hash is recorded for that height.
pub fn remove_block_by_height(&self, height: u64) -> Result<()> {
    match self.get_hash_by_height(height)? {
        Some(hash) => self.remove_block_body(&hash),
        None => Ok(()),
    }
}
/// Best-effort removal of block bodies for heights `start..=end` (inclusive).
///
/// Returns the number of heights that actually had an indexed hash and whose
/// body removal succeeded. Per-height errors are skipped rather than
/// propagated so one bad row cannot abort the sweep.
pub fn remove_blocks_by_height_range(&self, start: u64, end: u64) -> Result<u64> {
    let mut removed = 0u64;
    for height in start..=end {
        // Count only heights that resolve to a stored hash: an empty height
        // was never "removed", so it must not inflate the returned count.
        if let Ok(Some(hash)) = self.get_hash_by_height(height) {
            if self.remove_block_body(&hash).is_ok() {
                removed += 1;
            }
        }
    }
    Ok(removed)
}
/// Checks whether a body exists for this block under the height-prefixed key
/// or, failing that, the bare hash key.
pub fn has_block_body(&self, hash: &Hash) -> Result<bool> {
    if let Some(height) = self.get_height_by_hash(hash)? {
        if self.blocks.contains_key(&block_height_row_key(height, hash))? {
            return Ok(true);
        }
    }
    self.blocks.contains_key(hash.as_slice())
}
// Raw tree handles for callers that need direct iteration/maintenance.
// NOTE(review): these accessors are infallible today; the `Result` return
// appears to be kept for API stability — confirm before simplifying.
/// Handle to the block-bodies tree.
pub fn blocks_tree(&self) -> Result<Arc<dyn Tree>> {
    Ok(Arc::clone(&self.blocks))
}
/// Handle to the witnesses tree.
pub fn witnesses_tree(&self) -> Result<Arc<dyn Tree>> {
    Ok(Arc::clone(&self.witnesses))
}
/// Handle to the height -> hash index tree.
pub fn height_tree(&self) -> Result<Arc<dyn Tree>> {
    Ok(Arc::clone(&self.height_index))
}
/// Handle to the hash -> height index tree.
pub fn hash_to_height_tree(&self) -> Result<Arc<dyn Tree>> {
    Ok(Arc::clone(&self.hash_to_height))
}
/// Handle to the headers tree.
pub fn headers_tree(&self) -> Result<Arc<dyn Tree>> {
    Ok(Arc::clone(&self.headers))
}
/// Handle to the block-metadata tree.
pub fn metadata_tree(&self) -> Result<Arc<dyn Tree>> {
    Ok(Arc::clone(&self.block_metadata))
}
}