use crate::{
chain_index::{
finalised_state::{
capability::{
BlockCoreExt, BlockShieldedExt, BlockTransparentExt, CompactBlockExt, DbCore,
DbMetadata, DbRead, DbVersion, DbWrite, IndexedBlockExt, MigrationStatus,
TransparentHistExt,
},
entry::{StoredEntryFixed, StoredEntryVar},
},
types::{TransactionHash, GENESIS_HEIGHT},
},
config::BlockCacheConfig,
error::FinalisedStateError,
BlockHash, BlockHeaderData, CommitmentTreeData, CompactBlockStream, CompactOrchardAction,
CompactSaplingOutput, CompactSaplingSpend, CompactSize, CompactTxData, FixedEncodedLen as _,
Height, IndexedBlock, NamedAtomicStatus, OrchardCompactTx, OrchardTxList, Outpoint,
SaplingCompactTx, SaplingTxList, StatusType, TransparentCompactTx, TransparentTxList,
TxInCompact, TxLocation, TxOutCompact, TxidList, ZainoVersionedSerde as _,
};
#[cfg(feature = "transparent_address_history_experimental")]
use crate::{chain_index::types::AddrEventBytes, AddrHistRecord, AddrScript};
use zaino_proto::proto::{compact_formats::CompactBlock, utils::PoolTypeFilter};
use zebra_chain::parameters::NetworkKind;
use zebra_state::HashOrHeight;
use super::LmdbLifecycle;
use async_trait::async_trait;
use corez::io::{self, Read};
use dashmap::DashSet;
use lmdb::{
Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction as _, WriteFlags,
};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::{
collections::HashSet,
fs,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
pub(crate) mod validation;
pub(crate) mod read_core;
pub(crate) mod write_core;
pub(crate) mod block_core;
pub(crate) mod block_shielded;
pub(crate) mod block_transparent;
pub(crate) mod compact_block;
pub(crate) mod indexed_block;
pub(crate) mod transparent_address_history;
/// Contents of `db_schema_v1.txt`, embedded into the binary at compile time.
pub(crate) const DB_SCHEMA_V1_TEXT: &str = include_str!("db_schema_v1.txt");
/// 32-byte digest recorded for the v1 schema.
///
/// NOTE(review): presumably the SHA-256 of [`DB_SCHEMA_V1_TEXT`]; the code that
/// checks it is outside this section — confirm before relying on that.
pub(crate) const DB_SCHEMA_V1_HASH: [u8; 32] = [
0x11, 0xb2, 0x6a, 0x12, 0x08, 0x67, 0xf0, 0x42, 0xf6, 0x31, 0x45, 0xea, 0x87, 0xe7, 0x23, 0x75,
0x40, 0x3b, 0xf2, 0x14, 0xaa, 0x2b, 0x00, 0x12, 0xec, 0xa4, 0x4d, 0x00, 0xe9, 0x0b, 0x07, 0x9b,
];
/// Version constant for the v1 database layout (currently 1.2.0).
pub(crate) const DB_VERSION_V1: DbVersion = DbVersion {
major: 1,
minor: 2,
patch: 0,
};
/// Name of the LMDB sub-database opened into `DbV1::tx_out_set_info_accumulator`.
pub(crate) const TX_OUT_SET_INFO_ACCUMULATOR_DATABASE_NAME: &str =
"tx_out_set_info_accumulator_1_2_0";
/// Byte key associated with the tx-out-set info accumulator.
///
/// NOTE(review): the reads/writes that use this key are outside this section.
pub(crate) const TX_OUT_SET_INFO_ACCUMULATOR_KEY: &[u8] = b"tx_out_set_info_accumulator";
/// Byte key associated with the accumulator's built height.
///
/// NOTE(review): the reads/writes that use this key are outside this section.
pub(crate) const TX_OUT_SET_ACCUMULATOR_BUILT_HEIGHT_KEY: &[u8] =
b"_tx_out_set_accumulator_built_height";
/// Accumulator tuning value.
///
/// NOTE(review): by its name, the largest height gap still handled incrementally;
/// it is consumed outside this section — confirm.
pub(crate) const ACCUMULATOR_INCREMENTAL_MAX_GAP: u32 = 1_000;
/// Accumulator tuning value.
///
/// NOTE(review): by its name, the shard count used when building the accumulator;
/// it is consumed outside this section — confirm.
pub(crate) const ACCUMULATOR_BUILD_SHARDS: u16 = 1;
/// Sync tuning value.
///
/// NOTE(review): by its name, a checkpoint interval measured in blocks; it is
/// consumed outside this section — confirm.
pub(crate) const SYNC_CHECKPOINT_INTERVAL: u32 = 1000;
#[async_trait]
impl DbCore for DbV1 {
fn status(&self) -> StatusType {
LmdbLifecycle::status(self)
}
async fn shutdown(&self) -> Result<(), FinalisedStateError> {
LmdbLifecycle::shutdown(self).await
}
}
/// Exposes the pieces of [`DbV1`] that the shared [`LmdbLifecycle`] logic works with.
impl LmdbLifecycle for DbV1 {
/// The shared LMDB environment.
fn env(&self) -> &Arc<Environment> {
&self.env
}
/// Slot holding the join handle of the background handler task, if one was spawned.
fn db_handler_slot(&self) -> &std::sync::Mutex<Option<tokio::task::JoinHandle<()>>> {
&self.db_handler
}
/// The cancellation token stored on this database instance.
fn cancel_token(&self) -> &CancellationToken {
&self.cancel_token
}
/// The named atomic status cell of this database instance.
fn status_atomic(&self) -> &NamedAtomicStatus {
&self.status
}
}
/// Version-1 LMDB-backed finalised-state database.
///
/// Holds the shared environment, one handle per named sub-database (all opened in
/// `DbV1::spawn`), validation bookkeeping, and the state of the background handler task.
#[derive(Debug)]
pub(crate) struct DbV1 {
/// LMDB environment that all sub-database handles below belong to.
env: Arc<Environment>,
/// Sub-database `headers_1_0_0`; read by the background tasks as
/// height bytes -> stored `BlockHeaderData` entry.
headers: Database,
/// Sub-database `txids_1_0_0`.
txids: Database,
/// Sub-database `transparent_1_0_0`.
transparent: Database,
/// Sub-database `sapling_1_0_0`.
sapling: Database,
/// Sub-database `orchard_1_0_0`.
orchard: Database,
/// Sub-database `commitment_tree_data_1_0_0`.
commitment_tree_data: Database,
/// Sub-database opened under the name `hashes_1_0_0`.
heights: Database,
/// Sub-database `spent_1_0_0`; values are checksummed `TxLocation` entries
/// verified against their key by the initial spent scan.
spent: Database,
/// Sub-database `txid_location_1_0_0`.
txid_location: Database,
/// Sub-database named by `TX_OUT_SET_INFO_ACCUMULATOR_DATABASE_NAME`.
tx_out_set_info_accumulator: Database,
/// Sub-database `address_history_1_0_0`, opened with `DUP_SORT | DUP_FIXED`;
/// values are checksummed `AddrEventBytes` entries.
#[cfg(feature = "transparent_address_history_experimental")]
address_history: Database,
/// Sub-database `metadata`; the `DbMetadata` record lives under the key `b"metadata"`.
metadata: Database,
/// Validated tip as a raw `u32`; the background loop tries to validate
/// `validated_tip + 1` next. Shared (via `Arc`) with the handler/scan copies.
validated_tip: Arc<AtomicU32>,
/// Set of raw `u32` values used by validation.
///
/// NOTE(review): presumably heights validated ahead of `validated_tip`; it is
/// populated outside this section. It is cloned by value (not shared) into the
/// handler and block-scan copies of `DbV1`.
validated_set: DashSet<u32>,
/// Join handle of the background handler task; aborted when this value is dropped.
db_handler: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
/// Cancellation token exposed through `LmdbLifecycle::cancel_token`.
cancel_token: CancellationToken,
/// Lifecycle status (`Spawning` -> `Syncing` -> `Ready`, or `CriticalError`).
status: NamedAtomicStatus,
/// Copy of the configuration this database was spawned with.
config: BlockCacheConfig,
}
impl DbV1 {
pub(crate) async fn spawn(config: &BlockCacheConfig) -> Result<Self, FinalisedStateError> {
info!("Launching ZainoDB");
let db_size_bytes = config.storage.database.size.to_byte_count();
let db_path_dir = match config.network.to_zebra_network().kind() {
NetworkKind::Mainnet => "mainnet",
NetworkKind::Testnet => "testnet",
NetworkKind::Regtest => "regtest",
};
let db_path = config.storage.database.path.join(db_path_dir).join("v1");
if !db_path.exists() {
fs::create_dir_all(&db_path)?;
}
let cpu_cnt = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let max_readers = u32::try_from((cpu_cnt * 32).clamp(512, 4096))
.expect("max_readers was clamped to fit in u32");
let env = Environment::new()
.set_max_dbs(15)
.set_map_size(db_size_bytes)
.set_max_readers(max_readers)
.set_flags(
EnvironmentFlags::NO_TLS
| EnvironmentFlags::NO_READAHEAD
| EnvironmentFlags::NO_SYNC,
)
.open(&db_path)?;
let headers =
super::open_or_create_db(&env, "headers_1_0_0", DatabaseFlags::empty()).await?;
let txids = super::open_or_create_db(&env, "txids_1_0_0", DatabaseFlags::empty()).await?;
let transparent =
super::open_or_create_db(&env, "transparent_1_0_0", DatabaseFlags::empty()).await?;
let sapling =
super::open_or_create_db(&env, "sapling_1_0_0", DatabaseFlags::empty()).await?;
let orchard =
super::open_or_create_db(&env, "orchard_1_0_0", DatabaseFlags::empty()).await?;
let commitment_tree_data =
super::open_or_create_db(&env, "commitment_tree_data_1_0_0", DatabaseFlags::empty())
.await?;
let hashes = super::open_or_create_db(&env, "hashes_1_0_0", DatabaseFlags::empty()).await?;
let spent = super::open_or_create_db(&env, "spent_1_0_0", DatabaseFlags::empty()).await?;
let txid_location =
super::open_or_create_db(&env, "txid_location_1_0_0", DatabaseFlags::empty()).await?;
let tx_out_set_info_accumulator = super::open_or_create_db(
&env,
TX_OUT_SET_INFO_ACCUMULATOR_DATABASE_NAME,
DatabaseFlags::empty(),
)
.await?;
let metadata = super::open_or_create_db(&env, "metadata", DatabaseFlags::empty()).await?;
let mut zaino_db: Self;
#[cfg(feature = "transparent_address_history_experimental")]
{
let address_history = super::open_or_create_db(
&env,
"address_history_1_0_0",
DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED,
)
.await?;
zaino_db = Self {
env: Arc::new(env),
headers,
txids,
transparent,
sapling,
orchard,
commitment_tree_data,
heights: hashes,
spent,
txid_location,
tx_out_set_info_accumulator,
address_history,
metadata,
validated_tip: Arc::new(AtomicU32::new(0)),
validated_set: DashSet::new(),
db_handler: std::sync::Mutex::new(None),
cancel_token: CancellationToken::new(),
status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning),
config: config.clone(),
};
}
#[cfg(not(feature = "transparent_address_history_experimental"))]
{
zaino_db = Self {
env: Arc::new(env),
headers,
txids,
transparent,
sapling,
orchard,
commitment_tree_data,
heights: hashes,
spent,
txid_location,
tx_out_set_info_accumulator,
metadata,
validated_tip: Arc::new(AtomicU32::new(0)),
validated_set: DashSet::new(),
db_handler: std::sync::Mutex::new(None),
cancel_token: CancellationToken::new(),
status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning),
config: config.clone(),
};
}
zaino_db.check_schema_version().await?;
zaino_db.reconcile_alpha_txid_location_index().await?;
zaino_db.spawn_handler().await?;
Ok(zaino_db)
}
/// Spawns the background handler task and stores its join handle in `db_handler`.
///
/// The task works on a second `DbV1` value that points at the same environment
/// and sub-databases. It:
/// 1. sets the status to `Syncing` and runs the initial scans concurrently
///    (on any failure it logs, sets `CriticalError` and exits);
/// 2. sets the status to `Ready`;
/// 3. loops until the status is `Closing`, validating the block at
///    `validated_tip + 1` whenever a header for that height is stored, and
///    otherwise sleeping via `zaino_db_handler_sleep`.
///
/// A failed validation now backs off through the same sleep as the other
/// failure paths. Previously it retried the same height immediately and without
/// ever awaiting, which spun a runtime worker and left no point at which the
/// task could be aborted.
///
/// # Panics
/// Panics if the `db_handler` mutex is poisoned.
async fn spawn_handler(&mut self) -> Result<(), FinalisedStateError> {
    // `validated_tip` is shared through its `Arc`; `validated_set` is cloned by
    // value, so the task works on its own copy of the set.
    // NOTE(review): confirm the independent `validated_set` copy is intended.
    let zaino_db = Self {
        env: Arc::clone(&self.env),
        headers: self.headers,
        txids: self.txids,
        transparent: self.transparent,
        sapling: self.sapling,
        orchard: self.orchard,
        commitment_tree_data: self.commitment_tree_data,
        heights: self.heights,
        spent: self.spent,
        txid_location: self.txid_location,
        tx_out_set_info_accumulator: self.tx_out_set_info_accumulator,
        #[cfg(feature = "transparent_address_history_experimental")]
        address_history: self.address_history,
        metadata: self.metadata,
        validated_tip: Arc::clone(&self.validated_tip),
        validated_set: self.validated_set.clone(),
        db_handler: std::sync::Mutex::new(None),
        cancel_token: self.cancel_token.clone(),
        status: self.status.clone(),
        config: self.config.clone(),
    };

    let handle = tokio::spawn(async move {
        zaino_db.status.store(StatusType::Syncing);

        #[cfg(feature = "transparent_address_history_experimental")]
        {
            let (r1, r2, r3) = tokio::join!(
                zaino_db.initial_spent_scan(),
                zaino_db.initial_address_history_scan(),
                zaino_db.initial_block_scan(),
            );
            for (desc, result) in [
                ("spent scan", r1),
                ("addrhist scan", r2),
                ("block scan", r3),
            ] {
                if let Err(e) = result {
                    error!("initial {desc} failed: {e}");
                    zaino_db.status.store(StatusType::CriticalError);
                    return;
                }
            }
        }
        #[cfg(not(feature = "transparent_address_history_experimental"))]
        {
            let (r1, r2) =
                tokio::join!(zaino_db.initial_spent_scan(), zaino_db.initial_block_scan(),);
            for (desc, result) in [("spent scan", r1), ("block scan", r2)] {
                if let Err(e) = result {
                    error!("initial {desc} failed: {e}");
                    zaino_db.status.store(StatusType::CriticalError);
                    return;
                }
            }
        }

        info!(
            "initial validation complete – tip={}",
            zaino_db.validated_tip.load(Ordering::Relaxed)
        );
        zaino_db.status.store(StatusType::Ready);

        let mut maintenance = interval(Duration::from_secs(60));
        loop {
            if zaino_db.status.load() == StatusType::Closing {
                break;
            }

            // `checked_add` keeps a tip of `u32::MAX` from panicking (debug) or
            // wrapping to 0 (release); it is reported like any other overflow.
            let tip = zaino_db.validated_tip.load(Ordering::Acquire);
            let next_height = match tip.checked_add(1).map(|h| Height::try_from(h)) {
                Some(Ok(h)) => h,
                _ => {
                    warn!("height overflow – validated_tip too large");
                    zaino_db.zaino_db_handler_sleep(&mut maintenance).await;
                    continue;
                }
            };
            let hkey = match next_height.to_bytes() {
                Ok(bytes) => bytes,
                Err(e) => {
                    warn!("Failed to serialize height {}: {}", next_height, e);
                    zaino_db.zaino_db_handler_sleep(&mut maintenance).await;
                    continue;
                }
            };

            // Look up the header stored for `next_height`; any failure simply
            // means "nothing to validate yet". The read transaction ends with
            // the closure, before any await.
            let hash_opt = (|| -> Option<BlockHash> {
                let ro = zaino_db.env.begin_ro_txn().ok()?;
                let bytes = ro.get(zaino_db.headers, &hkey).ok()?;
                let entry = StoredEntryVar::<BlockHeaderData>::deserialize(bytes).ok()?;
                Some(entry.inner().context.index.hash)
            })();

            if let Some(hash) = hash_opt {
                // Reduce the result to a flag first so the error value is not
                // kept alive across the awaits below.
                let validated = match zaino_db.validate_block_blocking(next_height, hash) {
                    Ok(_) => true,
                    Err(e) => {
                        warn!("{e}");
                        false
                    }
                };
                if validated {
                    // Move straight on to the next height, but give the runtime
                    // a scheduling (and abort) point during long catch-ups.
                    tokio::task::yield_now().await;
                    continue;
                }
                // Validation failed and the tip did not advance: fall through
                // and back off instead of retrying the same block in a hot loop.
            }

            zaino_db.zaino_db_handler_sleep(&mut maintenance).await;
        }
    });

    *self.db_handler.lock().expect("db_handler mutex poisoned") = Some(handle);
    Ok(())
}
async fn initial_spent_scan(&self) -> Result<(), FinalisedStateError> {
let env = self.env.clone();
let spent = self.spent;
tokio::task::spawn_blocking(move || {
let ro = env.begin_ro_txn()?;
let mut cursor = ro.open_ro_cursor(spent)?;
for (key_bytes, val_bytes) in cursor.iter() {
let entry = StoredEntryFixed::<TxLocation>::from_bytes(val_bytes).map_err(|e| {
FinalisedStateError::Custom(format!("corrupt spent entry: {e}"))
})?;
if !entry.verify(key_bytes) {
return Err(FinalisedStateError::Custom(
"spent record checksum mismatch".into(),
));
}
}
Ok(())
})
.await
.map_err(|e| FinalisedStateError::Custom(format!("Tokio task error: {e}")))?
}
/// Checks every record of the address-history sub-database on a blocking thread.
///
/// Each value must decode as a `StoredEntryFixed<AddrEventBytes>` and its
/// checksum must verify against the record's key.
///
/// # Errors
/// Returns an LMDB error, a `Custom` error for the first corrupt or mismatching
/// record, or a `Custom` error if the blocking task could not be joined.
#[cfg(feature = "transparent_address_history_experimental")]
async fn initial_address_history_scan(&self) -> Result<(), FinalisedStateError> {
    let env = Arc::clone(&self.env);
    let history_db = self.address_history;

    let scan = move || -> Result<(), FinalisedStateError> {
        let txn = env.begin_ro_txn()?;
        let mut cursor = txn.open_ro_cursor(history_db)?;
        for (key, raw_value) in cursor.iter() {
            let record = match StoredEntryFixed::<AddrEventBytes>::from_bytes(raw_value) {
                Ok(record) => record,
                Err(e) => {
                    return Err(FinalisedStateError::Custom(format!(
                        "corrupt addrhist entry: {e}"
                    )))
                }
            };
            if record.verify(key) {
                continue;
            }
            return Err(FinalisedStateError::Custom(
                "addrhist record checksum mismatch".into(),
            ));
        }
        Ok(())
    };

    match tokio::task::spawn_blocking(scan).await {
        Ok(outcome) => outcome,
        Err(e) => Err(FinalisedStateError::Custom(format!(
            "spawn_blocking failed: {e}"
        ))),
    }
}
/// Walks the `headers` sub-database on a blocking thread and validates every
/// block found there via `validate_block_blocking`.
///
/// # Errors
/// Returns an LMDB error, a height-decoding error, a `Custom` error for a
/// corrupt header entry, the first validation error, or a `Custom` error if
/// the blocking task could not be joined.
async fn initial_block_scan(&self) -> Result<(), FinalisedStateError> {
    // `spawn_blocking` needs an owned `'static` value, so build a second `DbV1`
    // over the same environment and sub-databases. `validated_tip` is shared
    // through its `Arc`; `validated_set` is cloned by value.
    let scanner = Self {
        env: Arc::clone(&self.env),
        headers: self.headers,
        txids: self.txids,
        transparent: self.transparent,
        sapling: self.sapling,
        orchard: self.orchard,
        commitment_tree_data: self.commitment_tree_data,
        heights: self.heights,
        spent: self.spent,
        txid_location: self.txid_location,
        tx_out_set_info_accumulator: self.tx_out_set_info_accumulator,
        #[cfg(feature = "transparent_address_history_experimental")]
        address_history: self.address_history,
        metadata: self.metadata,
        validated_tip: Arc::clone(&self.validated_tip),
        validated_set: self.validated_set.clone(),
        db_handler: std::sync::Mutex::new(None),
        cancel_token: self.cancel_token.clone(),
        status: self.status.clone(),
        config: self.config.clone(),
    };

    let walk_headers = move || -> Result<(), FinalisedStateError> {
        let txn = scanner.env.begin_ro_txn()?;
        let mut cursor = txn.open_ro_cursor(scanner.headers)?;
        for (height_bytes, header_bytes) in cursor.iter() {
            let height = Height::from_bytes(height_bytes)?;
            let stored = match StoredEntryVar::<BlockHeaderData>::from_bytes(header_bytes) {
                Ok(stored) => stored,
                Err(e) => {
                    return Err(FinalisedStateError::Custom(format!(
                        "corrupt header entry: {e}"
                    )))
                }
            };
            let hash = *stored.inner().context.hash();
            scanner.validate_block_blocking(height, hash)?;
        }
        Ok(())
    };

    match tokio::task::spawn_blocking(walk_headers).await {
        Ok(outcome) => outcome,
        Err(e) => Err(FinalisedStateError::Custom(format!(
            "spawn_blocking failed: {e}"
        ))),
    }
}
/// Returns the handle of the `metadata` sub-database.
pub(crate) fn metadata_db(&self) -> Database {
self.metadata
}
/// Returns the handle of the `spent_1_0_0` sub-database.
pub(crate) fn spent_db(&self) -> Database {
self.spent
}
/// Returns the handle of the `txid_location_1_0_0` sub-database.
pub(crate) fn txid_location_db(&self) -> Database {
self.txid_location
}
/// Returns the handle of the `txids_1_0_0` sub-database.
pub(crate) fn txids_db(&self) -> Database {
self.txids
}
/// Returns the handle of the `transparent_1_0_0` sub-database.
pub(crate) fn transparent_db(&self) -> Database {
self.transparent
}
async fn reconcile_alpha_txid_location_index(&self) -> Result<(), FinalisedStateError> {
tokio::task::block_in_place(|| {
let mut txn = self.env.begin_rw_txn()?;
let raw = match txn.get(self.metadata, b"metadata") {
Ok(raw) => raw,
Err(lmdb::Error::NotFound) => return Ok(()),
Err(error) => return Err(FinalisedStateError::LmdbError(error)),
};
let stored = StoredEntryFixed::<DbMetadata>::from_bytes(raw).map_err(|error| {
FinalisedStateError::Custom(format!("corrupt metadata: {error}"))
})?;
if !stored.verify(b"metadata") {
return Err(FinalisedStateError::Custom(
"metadata checksum mismatch".to_string(),
));
}
let mut metadata = stored.item;
if metadata.version
< (DbVersion {
major: 1,
minor: 2,
patch: 0,
})
{
return Ok(());
}
let has_blocks = {
let mut cursor = txn.open_ro_cursor(self.headers)?;
cursor.iter().next().is_some()
};
let index_empty = {
let mut cursor = txn.open_ro_cursor(self.txid_location)?;
cursor.iter().next().is_none()
};
if !has_blocks || !index_empty {
return Ok(());
}
warn!(
"detected a 0.4.0-alpha.1 cache recorded at v{} with an unbuilt txid_location \
index; rolling the recorded version back to 1.1.0 so the corrected migration \
rebuilds it in place",
metadata.version
);
txn.clear_db(self.spent)?;
for key in [
b"_migration_txid_location_progress_1_2_0_next_height".as_slice(),
b"_migration_spent_progress_1_2_0_next_height".as_slice(),
] {
match txn.del(self.metadata, &key, None) {
Ok(()) | Err(lmdb::Error::NotFound) => {}
Err(error) => return Err(FinalisedStateError::LmdbError(error)),
}
}
metadata.version = DbVersion {
major: 1,
minor: 1,
patch: 0,
};
metadata.migration_status = MigrationStatus::Empty;
let entry = StoredEntryFixed::new(b"metadata", metadata);
txn.put(
self.metadata,
b"metadata",
&entry.to_bytes()?,
WriteFlags::empty(),
)?;
txn.commit()?;
Ok(())
})
}
/// Returns the handle of the sub-database named by
/// `TX_OUT_SET_INFO_ACCUMULATOR_DATABASE_NAME`.
pub(crate) fn tx_out_set_info_accumulator_db(&self) -> Database {
self.tx_out_set_info_accumulator
}
}
/// Aborts the background handler task, if this value owns one, when it is dropped.
///
/// The copies built for the handler and the block scan carry an empty slot, so
/// dropping them aborts nothing.
impl Drop for DbV1 {
    /// # Panics
    /// Panics if the `db_handler` mutex is poisoned.
    fn drop(&mut self) {
        let slot = self
            .db_handler
            .get_mut()
            .expect("db_handler mutex poisoned");
        if let Some(task) = slot.take() {
            task.abort();
        }
    }
}
/// Test-only constructors.
#[cfg(test)]
impl DbV1 {
    /// Opens the database like `DbV1::spawn`, but stamps it as a 1.0.0 cache.
    ///
    /// Instead of running the schema check and the alpha reconciliation, this
    /// writes a metadata record with version 1.0.0, an all-zero schema hash and
    /// `MigrationStatus::Empty`, then spawns the background handler.
    ///
    /// # Errors
    /// Returns the first error from directory creation, LMDB (including the
    /// `NO_OVERWRITE` put when a metadata record already exists), or handler
    /// start-up.
    pub(crate) async fn spawn_v1_0_0(
        config: &BlockCacheConfig,
    ) -> Result<Self, FinalisedStateError> {
        info!("Launching ZainoDB");

        let db_size_bytes = config.storage.database.size.to_byte_count();

        // One directory per network kind, with this schema generation under `v1`.
        let network_dir = match config.network.to_zebra_network().kind() {
            NetworkKind::Mainnet => "mainnet",
            NetworkKind::Testnet => "testnet",
            NetworkKind::Regtest => "regtest",
        };
        let db_path = config.storage.database.path.join(network_dir).join("v1");
        if !db_path.exists() {
            fs::create_dir_all(&db_path)?;
        }

        // Reader slots scale with the CPU count (32 per CPU), bounded to 512..=4096.
        let cpu_cnt = std::thread::available_parallelism().map_or(4, |n| n.get());
        let max_readers = u32::try_from((cpu_cnt * 32).clamp(512, 4096))
            .expect("max_readers was clamped to fit in u32");

        let env_flags = EnvironmentFlags::NO_TLS
            | EnvironmentFlags::NO_READAHEAD
            | EnvironmentFlags::NO_SYNC;
        let env = Environment::new()
            .set_max_dbs(15)
            .set_map_size(db_size_bytes)
            .set_max_readers(max_readers)
            .set_flags(env_flags)
            .open(&db_path)?;

        // Every sub-database except the address history uses empty flags.
        let open_plain = |name: &'static str| {
            super::open_or_create_db(&env, name, DatabaseFlags::empty())
        };

        let headers = open_plain("headers_1_0_0").await?;
        let txids = open_plain("txids_1_0_0").await?;
        let transparent = open_plain("transparent_1_0_0").await?;
        let sapling = open_plain("sapling_1_0_0").await?;
        let orchard = open_plain("orchard_1_0_0").await?;
        let commitment_tree_data = open_plain("commitment_tree_data_1_0_0").await?;
        let hashes = open_plain("hashes_1_0_0").await?;
        let spent = open_plain("spent_1_0_0").await?;
        let txid_location = open_plain("txid_location_1_0_0").await?;
        let tx_out_set_info_accumulator =
            open_plain(TX_OUT_SET_INFO_ACCUMULATOR_DATABASE_NAME).await?;
        let metadata = open_plain("metadata").await?;

        #[cfg(feature = "transparent_address_history_experimental")]
        let address_history = super::open_or_create_db(
            &env,
            "address_history_1_0_0",
            DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED,
        )
        .await?;

        // `mut` because `spawn_handler` takes `&mut self`.
        let mut zaino_db = Self {
            env: Arc::new(env),
            headers,
            txids,
            transparent,
            sapling,
            orchard,
            commitment_tree_data,
            heights: hashes,
            spent,
            txid_location,
            tx_out_set_info_accumulator,
            #[cfg(feature = "transparent_address_history_experimental")]
            address_history,
            metadata,
            validated_tip: Arc::new(AtomicU32::new(0)),
            validated_set: DashSet::new(),
            db_handler: std::sync::Mutex::new(None),
            cancel_token: CancellationToken::new(),
            status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning),
            config: config.clone(),
        };

        // Stamp the cache as 1.0.0; `NO_OVERWRITE` makes the put fail if a
        // metadata record is already present.
        tokio::task::block_in_place(|| -> Result<(), FinalisedStateError> {
            let mut txn = zaino_db.env.begin_rw_txn()?;
            let legacy_metadata = DbMetadata {
                version: DbVersion {
                    major: 1,
                    minor: 0,
                    patch: 0,
                },
                schema_hash: [0u8; 32],
                migration_status: MigrationStatus::Empty,
            };
            let entry = StoredEntryFixed::new(b"metadata", legacy_metadata);
            let encoded = entry.to_bytes()?;
            txn.put(
                zaino_db.metadata,
                b"metadata",
                &encoded,
                WriteFlags::NO_OVERWRITE,
            )?;
            txn.commit()?;
            Ok(())
        })?;

        zaino_db.spawn_handler().await?;
        Ok(zaino_db)
    }
}