use crate::{
chain_index::{
finalised_state::{
capability::{
BlockCoreExt, BlockShieldedExt, BlockTransparentExt, CompactBlockExt, DbCore,
DbMetadata, DbRead, DbVersion, DbWrite, IndexedBlockExt, MigrationStatus,
},
entry::{StoredEntryFixed, StoredEntryVar},
},
types::{TransactionHash, GENESIS_HEIGHT},
},
config::BlockCacheConfig,
error::FinalisedStateError,
BlockHash, BlockHeaderData, CommitmentTreeData, CompactBlockStream, CompactOrchardAction,
CompactSaplingOutput, CompactSaplingSpend, CompactSize, CompactTxData, FixedEncodedLen as _,
Height, IndexedBlock, NamedAtomicStatus, OrchardCompactTx, OrchardTxList, SaplingCompactTx,
SaplingTxList, StatusType, TransparentCompactTx, TransparentTxList, TxInCompact, TxLocation,
TxOutCompact, TxidList, ZainoVersionedSerde as _,
};
#[cfg(feature = "transparent_address_history_experimental")]
use crate::{
chain_index::{finalised_state::capability::TransparentHistExt, types::AddrEventBytes},
AddrHistRecord, AddrScript, Outpoint,
};
use zaino_proto::proto::{compact_formats::CompactBlock, utils::PoolTypeFilter};
use zebra_chain::parameters::NetworkKind;
use zebra_state::HashOrHeight;
use super::LmdbLifecycle;
use async_trait::async_trait;
use corez::io::{self, Read};
use dashmap::DashSet;
use lmdb::{
Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction as _, WriteFlags,
};
use sha2::{Digest, Sha256};
use std::{
collections::HashSet,
fs,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
#[cfg(feature = "transparent_address_history_experimental")]
use std::collections::HashMap;
pub(crate) mod validation;
pub(crate) mod read_core;
pub(crate) mod write_core;
pub(crate) mod block_core;
pub(crate) mod block_shielded;
pub(crate) mod block_transparent;
pub(crate) mod compact_block;
pub(crate) mod indexed_block;
#[cfg(feature = "transparent_address_history_experimental")]
pub(crate) mod transparent_address_history;
pub(crate) const DB_SCHEMA_V1_TEXT: &str = include_str!("db_schema_v1.txt");
pub(crate) const DB_SCHEMA_V1_HASH: [u8; 32] = [
0xa8, 0x19, 0x61, 0x50, 0xff, 0xb6, 0x9e, 0xf8, 0xb2, 0xb5, 0x31, 0x80, 0xdd, 0x90, 0xd0, 0x67,
0x41, 0x57, 0xfc, 0x51, 0x39, 0xa1, 0x3a, 0xbe, 0xce, 0x70, 0x4e, 0x51, 0x55, 0xc3, 0x3a, 0x0a,
];
pub(crate) const DB_VERSION_V1: DbVersion = DbVersion {
major: 1,
minor: 1,
patch: 0,
};
#[async_trait]
impl DbCore for DbV1 {
fn status(&self) -> StatusType {
LmdbLifecycle::status(self)
}
async fn shutdown(&self) -> Result<(), FinalisedStateError> {
LmdbLifecycle::shutdown(self).await
}
}
impl LmdbLifecycle for DbV1 {
fn env(&self) -> &Arc<Environment> {
&self.env
}
fn db_handler_slot(&self) -> &std::sync::Mutex<Option<tokio::task::JoinHandle<()>>> {
&self.db_handler
}
fn cancel_token(&self) -> &CancellationToken {
&self.cancel_token
}
fn status_atomic(&self) -> &NamedAtomicStatus {
&self.status
}
}
#[derive(Debug)]
pub(crate) struct DbV1 {
env: Arc<Environment>,
headers: Database,
txids: Database,
transparent: Database,
sapling: Database,
orchard: Database,
commitment_tree_data: Database,
heights: Database,
#[cfg(feature = "transparent_address_history_experimental")]
spent: Database,
#[cfg(feature = "transparent_address_history_experimental")]
address_history: Database,
metadata: Database,
validated_tip: Arc<AtomicU32>,
validated_set: DashSet<u32>,
db_handler: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
cancel_token: CancellationToken,
status: NamedAtomicStatus,
config: BlockCacheConfig,
}
impl DbV1 {
pub(crate) async fn spawn(config: &BlockCacheConfig) -> Result<Self, FinalisedStateError> {
info!("Launching ZainoDB");
let db_size_bytes = config.storage.database.size.to_byte_count();
let db_path_dir = match config.network.to_zebra_network().kind() {
NetworkKind::Mainnet => "mainnet",
NetworkKind::Testnet => "testnet",
NetworkKind::Regtest => "regtest",
};
let db_path = config.storage.database.path.join(db_path_dir).join("v1");
if !db_path.exists() {
fs::create_dir_all(&db_path)?;
}
let cpu_cnt = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let max_readers = u32::try_from((cpu_cnt * 32).clamp(512, 4096))
.expect("max_readers was clamped to fit in u32");
let env = Environment::new()
.set_max_dbs(12)
.set_map_size(db_size_bytes)
.set_max_readers(max_readers)
.set_flags(EnvironmentFlags::NO_TLS | EnvironmentFlags::NO_READAHEAD)
.open(&db_path)?;
let headers =
super::open_or_create_db(&env, "headers_1_0_0", DatabaseFlags::empty()).await?;
let txids = super::open_or_create_db(&env, "txids_1_0_0", DatabaseFlags::empty()).await?;
let transparent =
super::open_or_create_db(&env, "transparent_1_0_0", DatabaseFlags::empty()).await?;
let sapling =
super::open_or_create_db(&env, "sapling_1_0_0", DatabaseFlags::empty()).await?;
let orchard =
super::open_or_create_db(&env, "orchard_1_0_0", DatabaseFlags::empty()).await?;
let commitment_tree_data =
super::open_or_create_db(&env, "commitment_tree_data_1_0_0", DatabaseFlags::empty())
.await?;
let hashes = super::open_or_create_db(&env, "hashes_1_0_0", DatabaseFlags::empty()).await?;
let metadata = super::open_or_create_db(&env, "metadata", DatabaseFlags::empty()).await?;
let mut zaino_db: Self;
#[cfg(feature = "transparent_address_history_experimental")]
{
let spent =
super::open_or_create_db(&env, "spent_1_0_0", DatabaseFlags::empty()).await?;
let address_history = super::open_or_create_db(
&env,
"address_history_1_0_0",
DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED,
)
.await?;
zaino_db = Self {
env: Arc::new(env),
headers,
txids,
transparent,
sapling,
orchard,
commitment_tree_data,
heights: hashes,
spent,
address_history,
metadata,
validated_tip: Arc::new(AtomicU32::new(0)),
validated_set: DashSet::new(),
db_handler: std::sync::Mutex::new(None),
cancel_token: CancellationToken::new(),
status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning),
config: config.clone(),
};
}
#[cfg(not(feature = "transparent_address_history_experimental"))]
{
zaino_db = Self {
env: Arc::new(env),
headers,
txids,
transparent,
sapling,
orchard,
commitment_tree_data,
heights: hashes,
metadata,
validated_tip: Arc::new(AtomicU32::new(0)),
validated_set: DashSet::new(),
db_handler: std::sync::Mutex::new(None),
cancel_token: CancellationToken::new(),
status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning),
config: config.clone(),
};
}
zaino_db.check_schema_version().await?;
zaino_db.spawn_handler().await?;
Ok(zaino_db)
}
async fn spawn_handler(&mut self) -> Result<(), FinalisedStateError> {
let zaino_db = Self {
env: Arc::clone(&self.env),
headers: self.headers,
txids: self.txids,
transparent: self.transparent,
sapling: self.sapling,
orchard: self.orchard,
commitment_tree_data: self.commitment_tree_data,
heights: self.heights,
#[cfg(feature = "transparent_address_history_experimental")]
spent: self.spent,
#[cfg(feature = "transparent_address_history_experimental")]
address_history: self.address_history,
metadata: self.metadata,
validated_tip: Arc::clone(&self.validated_tip),
validated_set: self.validated_set.clone(),
db_handler: std::sync::Mutex::new(None),
cancel_token: self.cancel_token.clone(),
status: self.status.clone(),
config: self.config.clone(),
};
let handle = tokio::spawn({
let zaino_db = zaino_db;
async move {
zaino_db.status.store(StatusType::Syncing);
#[cfg(feature = "transparent_address_history_experimental")]
{
let (r1, r2, r3) = tokio::join!(
zaino_db.initial_spent_scan(),
zaino_db.initial_address_history_scan(),
zaino_db.initial_block_scan(),
);
for (desc, result) in [
("spent scan", r1),
("addrhist scan", r2),
("block scan", r3),
] {
if let Err(e) = result {
error!("initial {desc} failed: {e}");
zaino_db.status.store(StatusType::CriticalError);
return;
}
}
}
#[cfg(not(feature = "transparent_address_history_experimental"))]
{
if let Err(e) = zaino_db.initial_block_scan().await {
error!("initial block scan failed: {e}");
zaino_db.status.store(StatusType::CriticalError);
return;
}
}
info!(
"initial validation complete – tip={}",
zaino_db.validated_tip.load(Ordering::Relaxed)
);
zaino_db.status.store(StatusType::Ready);
let mut maintenance = interval(Duration::from_secs(60));
loop {
if zaino_db.status.load() == StatusType::Closing {
break;
}
let next_h = zaino_db.validated_tip.load(Ordering::Acquire) + 1;
let next_height = match Height::try_from(next_h) {
Ok(h) => h,
Err(_) => {
warn!("height overflow – validated_tip too large");
zaino_db.zaino_db_handler_sleep(&mut maintenance).await;
continue;
}
};
let hkey = match next_height.to_bytes() {
Ok(bytes) => bytes,
Err(e) => {
warn!("Failed to serialize height {}: {}", next_height, e);
zaino_db.zaino_db_handler_sleep(&mut maintenance).await;
continue;
}
};
let hash_opt = (|| -> Option<BlockHash> {
let ro = zaino_db.env.begin_ro_txn().ok()?;
let bytes = ro.get(zaino_db.headers, &hkey).ok()?;
let entry = StoredEntryVar::<BlockHeaderData>::deserialize(bytes).ok()?;
Some(entry.inner().context.index.hash)
})();
if let Some(hash) = hash_opt {
if let Err(e) = zaino_db.validate_block_blocking(next_height, hash) {
warn!("{e}");
}
continue;
}
zaino_db.zaino_db_handler_sleep(&mut maintenance).await;
}
}
});
*self.db_handler.lock().expect("db_handler mutex poisoned") = Some(handle);
Ok(())
}
#[cfg(feature = "transparent_address_history_experimental")]
async fn initial_spent_scan(&self) -> Result<(), FinalisedStateError> {
let env = self.env.clone();
let spent = self.spent;
tokio::task::spawn_blocking(move || {
let ro = env.begin_ro_txn()?;
let mut cursor = ro.open_ro_cursor(spent)?;
for (key_bytes, val_bytes) in cursor.iter() {
let entry = StoredEntryFixed::<TxLocation>::from_bytes(val_bytes).map_err(|e| {
FinalisedStateError::Custom(format!("corrupt spent entry: {e}"))
})?;
if !entry.verify(key_bytes) {
return Err(FinalisedStateError::Custom(
"spent record checksum mismatch".into(),
));
}
}
Ok(())
})
.await
.map_err(|e| FinalisedStateError::Custom(format!("Tokio task error: {e}")))?
}
#[cfg(feature = "transparent_address_history_experimental")]
async fn initial_address_history_scan(&self) -> Result<(), FinalisedStateError> {
let env = self.env.clone();
let address_history = self.address_history;
tokio::task::spawn_blocking(move || {
let ro = env.begin_ro_txn()?;
let mut cursor = ro.open_ro_cursor(address_history)?;
for (addr_bytes, record_bytes) in cursor.iter() {
let entry =
StoredEntryFixed::<AddrEventBytes>::from_bytes(record_bytes).map_err(|e| {
FinalisedStateError::Custom(format!("corrupt addrhist entry: {e}"))
})?;
if !entry.verify(addr_bytes) {
return Err(FinalisedStateError::Custom(
"addrhist record checksum mismatch".into(),
));
}
}
Ok(())
})
.await
.map_err(|e| FinalisedStateError::Custom(format!("spawn_blocking failed: {e}")))?
}
async fn initial_block_scan(&self) -> Result<(), FinalisedStateError> {
let zaino_db = Self {
env: Arc::clone(&self.env),
headers: self.headers,
txids: self.txids,
transparent: self.transparent,
sapling: self.sapling,
orchard: self.orchard,
commitment_tree_data: self.commitment_tree_data,
heights: self.heights,
#[cfg(feature = "transparent_address_history_experimental")]
spent: self.spent,
#[cfg(feature = "transparent_address_history_experimental")]
address_history: self.address_history,
metadata: self.metadata,
validated_tip: Arc::clone(&self.validated_tip),
validated_set: self.validated_set.clone(),
db_handler: std::sync::Mutex::new(None),
cancel_token: self.cancel_token.clone(),
status: self.status.clone(),
config: self.config.clone(),
};
tokio::task::spawn_blocking(move || {
let ro = zaino_db.env.begin_ro_txn()?;
let mut cursor = ro.open_ro_cursor(zaino_db.heights)?;
for (hash_bytes, height_entry_bytes) in cursor.iter() {
let hash = BlockHash::from_bytes(hash_bytes)?;
let height = *StoredEntryFixed::<Height>::from_bytes(height_entry_bytes)
.map_err(|e| FinalisedStateError::Custom(format!("corrupt height entry: {e}")))?
.inner();
zaino_db.validate_block_blocking(height, hash)?
}
Ok(())
})
.await
.map_err(|e| FinalisedStateError::Custom(format!("spawn_blocking failed: {e}")))?
}
}
impl Drop for DbV1 {
fn drop(&mut self) {
if let Some(handle) = self
.db_handler
.get_mut()
.expect("db_handler mutex poisoned")
.take()
{
handle.abort();
}
}
}