#![allow(dead_code)]
pub(crate) mod capability;
pub(crate) mod db;
pub(crate) mod entry;
pub(crate) mod migrations;
pub(crate) mod reader;
pub(crate) mod router;
use capability::*;
use db::{DbBackend, VERSION_DIRS};
use migrations::MigrationManager;
use reader::*;
use router::Router;
use tracing::{info, instrument};
use zebra_chain::parameters::NetworkKind;
use crate::{
chain_index::{finalised_state::db::v1::DB_VERSION_V1, source::BlockchainSourceError},
config::BlockCacheConfig,
error::FinalisedStateError,
BlockHash, BlockMetadata, BlockWithMetadata, ChainWork, Height, IndexedBlock, StatusType,
};
use std::{sync::Arc, time::Duration};
use tokio::time::{interval, MissedTickBehavior};
pub(crate) async fn build_indexed_block_from_source<S: BlockchainSource>(
source: &S,
network: zaino_common::Network,
sapling_activation_height: zebra_chain::block::Height,
nu5_activation_height: Option<zebra_chain::block::Height>,
height_int: u32,
parent_chainwork: ChainWork,
) -> Result<IndexedBlock, FinalisedStateError> {
let block = match source
.get_block(zebra_state::HashOrHeight::Height(
zebra_chain::block::Height(height_int),
))
.await?
{
Some(block) => block,
None => {
return Err(FinalisedStateError::BlockchainSourceError(
BlockchainSourceError::Unrecoverable(format!(
"error fetching block at height {height_int} from validator"
)),
));
}
};
let block_hash = BlockHash::from(block.hash().0);
let (sapling_opt, orchard_opt) = source.get_commitment_tree_roots(block_hash).await?;
let is_sapling_active = height_int >= sapling_activation_height.0;
let is_orchard_active = nu5_activation_height
.is_some_and(|nu5_activation_height| height_int >= nu5_activation_height.0);
let (sapling_root, sapling_size) = if is_sapling_active {
sapling_opt.ok_or_else(|| {
FinalisedStateError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(
format!("missing Sapling commitment tree root for block {block_hash}"),
))
})?
} else {
(zebra_chain::sapling::tree::Root::default(), 0)
};
let (orchard_root, orchard_size) = if is_orchard_active {
orchard_opt.ok_or_else(|| {
FinalisedStateError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(
format!("missing Orchard commitment tree root for block {block_hash}"),
))
})?
} else {
(zebra_chain::orchard::tree::Root::default(), 0)
};
let metadata = BlockMetadata::new(
sapling_root,
sapling_size as u32,
orchard_root,
orchard_size as u32,
parent_chainwork,
network.to_zebra_network(),
);
let block_with_metadata = BlockWithMetadata::new(block.as_ref(), metadata);
IndexedBlock::try_from(block_with_metadata).map_err(|_| {
FinalisedStateError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(format!(
"error building block data at height {height_int}"
)))
})
}
use super::source::BlockchainSource;
#[derive(Debug)]
pub(crate) struct ZainoDB {
db: Arc<Router>,
cfg: BlockCacheConfig,
}
impl ZainoDB {
#[instrument(name = "ZainoDB::spawn", skip(cfg, source), fields(db_version = cfg.db_version))]
pub(crate) async fn spawn<T>(
cfg: BlockCacheConfig,
source: T,
) -> Result<Self, FinalisedStateError>
where
T: BlockchainSource,
{
let version_opt = Self::try_find_current_db_version(&cfg).await;
let target_version = match cfg.db_version {
0 => DbVersion {
major: 0,
minor: 0,
patch: 0,
},
1 => DB_VERSION_V1,
x => {
return Err(FinalisedStateError::Custom(format!(
"unsupported database version: DbV{x}"
)));
}
};
let backend = match version_opt {
Some(version) => {
info!(version, "Opening ZainoDB from file");
match version {
0 => DbBackend::spawn_v0(&cfg).await?,
1 => DbBackend::spawn_v1(&cfg).await?,
_ => {
return Err(FinalisedStateError::Custom(format!(
"unsupported database version: DbV{version}"
)));
}
}
}
None => {
info!(version = %target_version, "Creating new ZainoDB");
match target_version.major() {
0 => DbBackend::spawn_v0(&cfg).await?,
1 => DbBackend::spawn_v1(&cfg).await?,
_ => {
return Err(FinalisedStateError::Custom(format!(
"unsupported database version: DbV{target_version}"
)));
}
}
}
};
let current_version = backend.get_metadata().await?.version();
let router = Arc::new(Router::new(Arc::new(backend)));
if version_opt.is_some() && current_version < target_version {
info!(
from_version = %current_version,
to_version = %target_version,
"Starting ZainoDB migration"
);
let mut migration_manager = MigrationManager {
router: Arc::clone(&router),
cfg: cfg.clone(),
current_version,
target_version,
source,
};
migration_manager.migrate().await?;
}
Ok(Self { db: router, cfg })
}
pub(crate) async fn shutdown(&self) -> Result<(), FinalisedStateError> {
self.db.shutdown().await
}
pub(crate) fn status(&self) -> StatusType {
self.db.status()
}
pub(crate) async fn wait_until_ready(&self) {
let mut ticker = interval(Duration::from_millis(100));
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
ticker.tick().await;
if self.db.status() == StatusType::Ready {
break;
}
}
}
pub(crate) fn to_reader(self: &Arc<Self>) -> DbReader {
DbReader {
inner: Arc::clone(self),
}
}
async fn try_find_current_db_version(cfg: &BlockCacheConfig) -> Option<u32> {
let legacy_dir = match cfg.network.to_zebra_network().kind() {
NetworkKind::Mainnet => "live",
NetworkKind::Testnet => "test",
NetworkKind::Regtest => "local",
};
let legacy_path = cfg.storage.database.path.join(legacy_dir);
if legacy_path.join("data.mdb").exists() && legacy_path.join("lock.mdb").exists() {
return Some(0);
}
let net_dir = match cfg.network.to_zebra_network().kind() {
NetworkKind::Mainnet => "mainnet",
NetworkKind::Testnet => "testnet",
NetworkKind::Regtest => "regtest",
};
let net_path = cfg.storage.database.path.join(net_dir);
if net_path.exists() && net_path.is_dir() {
for (i, version_dir) in VERSION_DIRS.iter().enumerate() {
let db_path = net_path.join(version_dir);
let data_file = db_path.join("data.mdb");
let lock_file = db_path.join("lock.mdb");
if data_file.exists() && lock_file.exists() {
let version = (i + 1) as u32;
return Some(version);
}
}
}
None
}
#[inline]
pub(crate) fn backend_for_cap(
&self,
cap: CapabilityRequest,
) -> Result<Arc<DbBackend>, FinalisedStateError> {
self.db.backend(cap)
}
pub(crate) async fn sync_to_height<T>(
&self,
height: Height,
source: &T,
) -> Result<(), FinalisedStateError>
where
T: BlockchainSource,
{
let result = self.db.write_blocks_to_height(height, source).await;
if result.is_ok() {
let env = self.db.backend(CapabilityRequest::WriteCore)?.env();
tokio::task::block_in_place(|| env.sync(true))
.map_err(FinalisedStateError::LmdbError)?;
}
result
}
pub(crate) async fn write_block(&self, b: IndexedBlock) -> Result<(), FinalisedStateError> {
self.db.write_block(b).await
}
pub(crate) async fn delete_block_at_height(
&self,
h: Height,
) -> Result<(), FinalisedStateError> {
self.db.delete_block_at_height(h).await
}
pub(crate) async fn delete_block(&self, b: &IndexedBlock) -> Result<(), FinalisedStateError> {
self.db.delete_block(b).await
}
pub(crate) async fn db_height(&self) -> Result<Option<Height>, FinalisedStateError> {
self.db.db_height().await
}
pub(crate) async fn get_block_height(
&self,
hash: BlockHash,
) -> Result<Option<Height>, FinalisedStateError> {
self.db.get_block_height(hash).await
}
pub(crate) async fn get_block_hash(
&self,
height: Height,
) -> Result<Option<BlockHash>, FinalisedStateError> {
self.db.get_block_hash(height).await
}
pub(crate) async fn get_metadata(&self) -> Result<DbMetadata, FinalisedStateError> {
self.db.get_metadata().await
}
}
#[cfg(test)]
impl ZainoDB {
pub(crate) fn router(&self) -> &Router {
&self.db
}
pub(crate) async fn spawn_with_target_version<T>(
cfg: BlockCacheConfig,
source: T,
target_version: DbVersion,
) -> Result<Self, FinalisedStateError>
where
T: BlockchainSource,
{
if target_version.major() > DB_VERSION_V1.major() {
return Err(FinalisedStateError::Custom(format!(
"unsupported database version: {target_version}"
)));
}
if target_version.major() == DB_VERSION_V1.major() && target_version > DB_VERSION_V1 {
return Err(FinalisedStateError::Custom(format!(
"unsupported database version: {target_version}"
)));
}
let version_opt = Self::try_find_current_db_version(&cfg).await;
let backend = match version_opt {
Some(version) => {
info!(version, "Opening ZainoDB from file");
match version {
0 => DbBackend::spawn_v0(&cfg).await?,
1 => DbBackend::spawn_v1(&cfg).await?,
_ => {
return Err(FinalisedStateError::Custom(format!(
"unsupported database version: DbV{version}"
)));
}
}
}
None => {
return Err(FinalisedStateError::Custom(
"expected existing v1.0.0 migration-test database, found no database"
.to_string(),
));
}
};
let current_version = backend.get_metadata().await?.version();
let router = Arc::new(Router::new(Arc::new(backend)));
if current_version < target_version {
info!(
from_version = %current_version,
to_version = %target_version,
"Starting ZainoDB migration"
);
let mut migration_manager = MigrationManager {
router: Arc::clone(&router),
cfg: cfg.clone(),
current_version,
target_version,
source,
};
migration_manager.migrate().await?;
}
let metadata = router.get_metadata().await?;
if metadata.version() != target_version {
return Err(FinalisedStateError::Custom(format!(
"database version mismatch after test spawn: expected {}, found {}",
target_version,
metadata.version()
)));
}
Ok(Self { db: router, cfg })
}
pub(crate) async fn build_clean_v1_0_0<T>(
cfg: &BlockCacheConfig,
source: T,
) -> Result<DbBackend, FinalisedStateError>
where
T: BlockchainSource,
{
let db = DbBackend::spawn_v1_0_0(cfg).await?;
db.wait_until_ready().await;
let tip = source.get_best_block_height().await?.ok_or_else(|| {
FinalisedStateError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(
"source has no best block height".to_string(),
))
})?;
let tip = Height::from(tip);
let mut parent_chainwork = ChainWork::from_u256(0.into());
for height in crate::chain_index::types::GENESIS_HEIGHT.0..=tip.0 {
let block = source
.get_block(zebra_state::HashOrHeight::Height(
zebra_chain::block::Height(height),
))
.await?
.ok_or_else(|| {
FinalisedStateError::BlockchainSourceError(
BlockchainSourceError::Unrecoverable(format!(
"source missing block at height {height}"
)),
)
})?;
let block_hash = BlockHash::from(block.hash().0);
let (sapling_opt, orchard_opt) = source.get_commitment_tree_roots(block_hash).await?;
let (sapling_root, sapling_size) = sapling_opt.ok_or_else(|| {
FinalisedStateError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(
format!("missing Sapling commitment tree root for block {block_hash}"),
))
})?;
let (orchard_root, orchard_size) = orchard_opt.ok_or_else(|| {
FinalisedStateError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(
format!("missing Orchard commitment tree root for block {block_hash}"),
))
})?;
let metadata = BlockMetadata::new(
sapling_root,
sapling_size as u32,
orchard_root,
orchard_size as u32,
parent_chainwork,
cfg.network.to_zebra_network(),
);
let block_with_metadata = BlockWithMetadata::new(block.as_ref(), metadata);
let chain_block = IndexedBlock::try_from(block_with_metadata).map_err(|_| {
FinalisedStateError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(
format!("error building block data at height {height}"),
))
})?;
parent_chainwork = chain_block.context.chainwork;
db.write_block_v1_0_0(chain_block).await?;
}
Ok(db)
}
pub(crate) async fn build_db_to_version<T>(
cfg: BlockCacheConfig,
source: T,
target_version: DbVersion,
) -> Result<Self, FinalisedStateError>
where
T: BlockchainSource,
{
let v1_0_0 = DbVersion::new(1, 0, 0);
if target_version < v1_0_0 {
return Err(FinalisedStateError::Custom(format!(
"target version {} is older than v1.0.0",
target_version
)));
}
let db = Self::build_clean_v1_0_0(&cfg, source.clone()).await?;
db.shutdown().await?;
drop(db);
Self::spawn_with_target_version(cfg, source, target_version).await
}
}