use super::{
capability::{
BlockCoreExt, Capability, DbCore as _, DbRead, DbVersion, DbWrite, MigrationStatus,
},
db::DbBackend,
router::Router,
};
use crate::{
chain_index::{
finalised_state::{
capability::{CapabilityRequest, DbMetadata},
db::v1::{DB_VERSION_V1, SYNC_CHECKPOINT_INTERVAL},
entry::{StoredEntryFixed, StoredEntryVar},
},
source::BlockchainSource,
types::GENESIS_HEIGHT,
},
config::BlockCacheConfig,
error::FinalisedStateError,
BlockHash, BlockMetadata, BlockWithMetadata, ChainWork, Height, IndexedBlock, Outpoint,
TransparentTxList, TxLocation, TxidList, ZainoVersionedSerde as _,
};
use lmdb::{Transaction, WriteFlags};
use zebra_chain::parameters::NetworkKind;
use async_trait::async_trait;
use std::sync::Arc;
use tracing::info;
/// Kind of database migration.
///
/// The variant names mirror the `major` / `minor` / `patch` fields of `DbVersion`.
/// NOTE(review): this enum is not referenced in the visible part of this file;
/// confirm how callers use it before relying on any particular semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationType {
/// Patch-level migration.
Patch,
/// Minor-level migration.
Minor,
/// Major-level migration.
Major,
}
/// A single database migration step between two fixed versions.
///
/// Implementors must supply [`Migration::CURRENT_VERSION`] and
/// [`Migration::TO_VERSION`]. The provided [`Migration::migrate`] body performs a
/// metadata-only upgrade; steps that need to rewrite data override it.
#[async_trait]
pub trait Migration<T: BlockchainSource> {
    /// Version this step migrates from.
    const CURRENT_VERSION: DbVersion;

    /// Version this step migrates to.
    const TO_VERSION: DbVersion;

    /// Returns [`Migration::CURRENT_VERSION`].
    fn current_version(&self) -> DbVersion {
        Self::CURRENT_VERSION
    }

    /// Returns [`Migration::TO_VERSION`].
    fn to_version(&self) -> DbVersion {
        Self::TO_VERSION
    }

    /// Default, metadata-only migration.
    ///
    /// Reads the metadata through `router`, stamps it with `TO_VERSION`, the v1
    /// schema hash and an `Empty` migration status, and writes it back. The
    /// configuration and blockchain source are not used by this default body.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while reading or updating the metadata.
    async fn migrate(
        &self,
        router: Arc<Router>,
        _cfg: BlockCacheConfig,
        _source: T,
    ) -> Result<(), FinalisedStateError> {
        use crate::chain_index::finalised_state::db::v1::DB_SCHEMA_V1_HASH;

        info!(
            "Starting metadata-only migration from {} to {}.",
            Self::CURRENT_VERSION,
            Self::TO_VERSION,
        );

        // Fetch, restamp and persist the metadata record in one pass.
        let mut stamped: DbMetadata = router.get_metadata().await?;
        stamped.migration_status = MigrationStatus::Empty;
        stamped.schema_hash = DB_SCHEMA_V1_HASH;
        stamped.version = Self::TO_VERSION;
        router.update_metadata(stamped).await?;

        info!(
            "Metadata-only migration from {} to {} complete.",
            Self::CURRENT_VERSION,
            Self::TO_VERSION,
        );
        Ok(())
    }
}
/// Drives the database from `current_version` up to `target_version` by running
/// migration steps one after another.
pub(super) struct MigrationManager<T: BlockchainSource> {
/// Router handed (as a cloned `Arc`) to every migration step.
pub(super) router: Arc<Router>,
/// Configuration cloned into every migration step.
pub(super) cfg: BlockCacheConfig,
/// Version the database is currently at; advanced after each successful step.
pub(super) current_version: DbVersion,
/// Version at which migration stops.
pub(super) target_version: DbVersion,
/// Blockchain source cloned into every migration step.
pub(super) source: T,
}
impl<T: BlockchainSource> MigrationManager<T> {
    /// Runs migration steps back to back while `current_version` is below
    /// `target_version`.
    ///
    /// After each successful step `current_version` is advanced to that step's
    /// target version.
    ///
    /// # Errors
    ///
    /// Returns an error if no step is registered for the current version, or if
    /// a step itself fails; `current_version` then stays at the last version
    /// that completed.
    pub(super) async fn migrate(&mut self) -> Result<(), FinalisedStateError> {
        while self.current_version < self.target_version {
            let step = self.get_migration()?;

            let router = Arc::clone(&self.router);
            let cfg = self.cfg.clone();
            let source = self.source.clone();
            step.migrate(router, cfg, source).await?;

            self.current_version = step.to_version::<T>();
        }
        Ok(())
    }

    /// Looks up the migration step that starts at `current_version`.
    ///
    /// # Errors
    ///
    /// Returns `FinalisedStateError::Custom` when no step is registered for the
    /// current `(major, minor, patch)` triple.
    fn get_migration(&self) -> Result<MigrationStep, FinalisedStateError> {
        let version = &self.current_version;
        let step = match (version.major, version.minor, version.patch) {
            (0, 0, 0) => MigrationStep::Migration0To1(Migration0To1),
            (1, 0, 0) => MigrationStep::Migration1_0_0To1_1_0(Migration1_0_0To1_1_0),
            (1, 1, 0) => MigrationStep::Migration1_1_0To1_2_0(Migration1_1_0To1_2_0),
            _ => {
                return Err(FinalisedStateError::Custom(format!(
                    "Missing migration from version {}",
                    self.current_version
                )))
            }
        };
        Ok(step)
    }
}
/// Closed set of migration steps the manager can run, one variant per step type.
enum MigrationStep {
/// Step from version 0.0.0 to the v1 database version.
Migration0To1(Migration0To1),
/// Step from version 1.0.0 to 1.1.0.
Migration1_0_0To1_1_0(Migration1_0_0To1_1_0),
/// Step from version 1.1.0 to 1.2.0.
Migration1_1_0To1_2_0(Migration1_1_0To1_2_0),
}
impl MigrationStep {
    /// Returns the `TO_VERSION` constant of the wrapped migration step.
    fn to_version<T: BlockchainSource>(&self) -> DbVersion {
        match self {
            Self::Migration0To1(_) => <Migration0To1 as Migration<T>>::TO_VERSION,
            Self::Migration1_0_0To1_1_0(_) => <Migration1_0_0To1_1_0 as Migration<T>>::TO_VERSION,
            Self::Migration1_1_0To1_2_0(_) => <Migration1_1_0To1_2_0 as Migration<T>>::TO_VERSION,
        }
    }

    /// Forwards to the wrapped step's `Migration::migrate` implementation.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped step returns.
    async fn migrate<T: BlockchainSource>(
        &self,
        router: Arc<Router>,
        cfg: BlockCacheConfig,
        source: T,
    ) -> Result<(), FinalisedStateError> {
        match self {
            Self::Migration0To1(inner) => inner.migrate(router, cfg, source).await,
            Self::Migration1_0_0To1_1_0(inner) => inner.migrate(router, cfg, source).await,
            Self::Migration1_1_0To1_2_0(inner) => inner.migrate(router, cfg, source).await,
        }
    }
}
/// Migration step from the v0 database (version 0.0.0) to the v1 schema.
///
/// A v1 database is built as a "shadow" next to the running v0 primary by
/// re-fetching every block from the blockchain source, and is then promoted to
/// primary. The old v0 database is shut down and deleted from disk in a
/// background task.
struct Migration0To1;

#[async_trait]
impl<T: BlockchainSource> Migration<T> for Migration0To1 {
    const CURRENT_VERSION: DbVersion = DbVersion {
        major: 0,
        minor: 0,
        patch: 0,
    };
    const TO_VERSION: DbVersion = DB_VERSION_V1;

    /// Builds the v1 shadow database and promotes it to primary.
    ///
    /// The build is resumable: if the shadow already holds blocks, building
    /// continues from the block after the shadow tip, seeded with that tip's
    /// chainwork. If the shadow metadata already reports
    /// `MigrationStatus::Complete`, the build is skipped and only promotion runs.
    ///
    /// # Errors
    ///
    /// Returns an error if the shadow backend cannot be spawned, a block or its
    /// commitment tree roots cannot be fetched from `source`, a block cannot be
    /// converted or written, or syncing / promoting the shadow fails.
    async fn migrate(
        &self,
        router: Arc<Router>,
        cfg: BlockCacheConfig,
        source: T,
    ) -> Result<(), FinalisedStateError> {
        info!("Starting v0 to v1 migration.");

        // Open (or re-open) the v1 database and register it as the router's
        // shadow with an empty capability set.
        let shadow = Arc::new(DbBackend::spawn_v1(&cfg).await?);
        router.set_shadow(Arc::clone(&shadow), Capability::empty());

        let migration_status = shadow.get_metadata().await?.migration_status();
        match migration_status {
            MigrationStatus::Empty
            | MigrationStatus::PartialBuidInProgress
            | MigrationStatus::PartialBuildComplete
            | MigrationStatus::FinalBuildInProgress => {
                // Work out where to resume. With an existing shadow tip we carry
                // on from the next height using the tip's chainwork; otherwise
                // we start at genesis with zero chainwork.
                let mut parent_chain_work = ChainWork::from_u256(0.into());
                let shadow_db_height_opt = shadow.db_height().await?;
                let mut shadow_db_height = shadow_db_height_opt.unwrap_or(GENESIS_HEIGHT);
                let mut build_start_height = if shadow_db_height_opt.is_some() {
                    parent_chain_work = shadow
                        .get_block_header(shadow_db_height)
                        .await?
                        .context
                        .chainwork;
                    shadow_db_height + 1
                } else {
                    shadow_db_height
                };
                let mut primary_db_height = router.db_height().await?.unwrap_or(GENESIS_HEIGHT);

                info!(
                    "Starting shadow database build, current database tips: v0:{} v1:{}",
                    primary_db_height, shadow_db_height
                );

                // Catch-up loop: the primary tip is re-read after every pass, so
                // passes repeat until the shadow has reached it.
                loop {
                    if shadow_db_height >= primary_db_height {
                        break;
                    }

                    for height in (build_start_height.0)..=primary_db_height.0 {
                        let block = source
                            .get_block(zebra_state::HashOrHeight::Height(
                                zebra_chain::block::Height(height),
                            ))
                            .await?
                            .ok_or_else(|| {
                                FinalisedStateError::Custom(format!(
                                    "block not found at height {height}"
                                ))
                            })?;
                        let hash = BlockHash::from(block.hash().0);

                        let (sapling_root_data, orchard_root_data) =
                            source.get_commitment_tree_roots(hash).await?;
                        let (sapling_root, sapling_root_size) =
                            sapling_root_data.ok_or_else(|| {
                                FinalisedStateError::Custom(format!(
                                    "sapling commitment tree data missing for block {hash:?} at height {height}"
                                ))
                            })?;
                        let (orchard_root, orchard_root_size) =
                            orchard_root_data.ok_or_else(|| {
                                FinalisedStateError::Custom(format!(
                                    "orchard commitment tree data missing for block {hash:?} at height {height}"
                                ))
                            })?;

                        // NOTE(review): the `as u32` casts truncate silently if
                        // the source reports a size wider than u32 — confirm the
                        // source type and consider a checked conversion.
                        let metadata = BlockMetadata::new(
                            sapling_root,
                            sapling_root_size as u32,
                            orchard_root,
                            orchard_root_size as u32,
                            parent_chain_work,
                            cfg.network.to_zebra_network(),
                        );
                        let block_with_metadata = BlockWithMetadata::new(block.as_ref(), metadata);
                        let chain_block =
                            IndexedBlock::try_from(block_with_metadata).map_err(|_| {
                                FinalisedStateError::Custom(
                                    "Failed to build chain block".to_string(),
                                )
                            })?;

                        // The next block's metadata is seeded with this block's
                        // chainwork.
                        parent_chain_work = *chain_block.chainwork();
                        shadow.write_block(chain_block).await?;
                    }

                    // Yield to the runtime between passes. This must be an async
                    // sleep: a blocking `std::thread::sleep` inside an async fn
                    // stalls the executor worker thread and every task on it.
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

                    shadow_db_height = shadow.db_height().await?.unwrap_or(Height(0));
                    build_start_height = shadow_db_height + 1;
                    primary_db_height = router.db_height().await?.unwrap_or(Height(0));
                }

                // Record completion so a restart skips straight to promotion.
                let mut metadata = shadow.get_metadata().await?;
                metadata.migration_status = MigrationStatus::Complete;
                shadow.update_metadata(metadata).await?;
                info!("v1 database build complete.");
            }
            MigrationStatus::Complete => {
                // Shadow build already finished; only promotion is left.
            }
        }

        info!("promoting v1 database to primary.");
        // Force the shadow environment to disk before it becomes the primary.
        shadow.env().sync(true)?;
        let db_v0 = router.promote_shadow()?;

        // Retire the old primary in the background: wait until this task holds
        // the only remaining strong reference, shut it down, then delete its
        // network-specific directory.
        tokio::spawn(async move {
            while Arc::strong_count(&db_v0) > 1 {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
            if let Err(e) = db_v0.shutdown().await {
                tracing::warn!("Old primary shutdown failed: {e}");
            }

            let db_path_dir = match cfg.network.to_zebra_network().kind() {
                NetworkKind::Mainnet => "live",
                NetworkKind::Testnet => "test",
                NetworkKind::Regtest => "local",
            };
            let db_path = cfg.storage.database.path.join(db_path_dir);

            info!("Wiping v0 database from disk.");
            // Deletion is best-effort: a failure is logged and does not fail the
            // migration.
            match tokio::fs::remove_dir_all(&db_path).await {
                Ok(_) => tracing::info!("Deleted old database at {}", db_path.display()),
                Err(e) => tracing::error!(
                    "Failed to delete old database at {}: {}",
                    db_path.display(),
                    e
                ),
            }
        });

        info!("v0 to v1 migration complete.");
        Ok(())
    }
}
/// Migration step from database version 1.0.0 to 1.1.0.
///
/// Only the version constants are supplied here, so the trait's provided
/// `migrate` body (the metadata-only migration) is what runs for this step.
struct Migration1_0_0To1_1_0;
#[async_trait]
impl<T: BlockchainSource> Migration<T> for Migration1_0_0To1_1_0 {
// Version this step starts from.
const CURRENT_VERSION: DbVersion = DbVersion {
major: 1,
minor: 0,
patch: 0,
};
// Version recorded once this step has run.
const TO_VERSION: DbVersion = DbVersion {
major: 1,
minor: 1,
patch: 0,
};
}
fn flush_migration_spent_batch(
env: &lmdb::Environment,
spent_db: lmdb::Database,
metadata_db: lmdb::Database,
progress_key: &[u8],
buffer: &mut Vec<(Vec<u8>, TxLocation)>,
up_to_height: Height,
) -> Result<(), FinalisedStateError> {
buffer.sort_by(|a, b| a.0.cmp(&b.0));
let mut txn = env.begin_rw_txn()?;
for (outpoint_bytes, tx_location) in buffer.iter() {
let entry_bytes = StoredEntryFixed::new(outpoint_bytes, *tx_location).to_bytes()?;
match txn.put(
spent_db,
outpoint_bytes,
&entry_bytes,
WriteFlags::NO_OVERWRITE,
) {
Ok(()) => {}
Err(lmdb::Error::KeyExist) => {
let existing = txn
.get(spent_db, outpoint_bytes)
.map_err(FinalisedStateError::LmdbError)?;
if existing != entry_bytes {
return Err(FinalisedStateError::Custom(format!(
"conflicting existing spent entry during batched migration for outpoint {}",
hex::encode(outpoint_bytes)
)));
}
}
Err(error) => return Err(FinalisedStateError::LmdbError(error)),
}
}
let progress = StoredEntryFixed::new(progress_key, up_to_height + 1);
txn.put(
metadata_db,
&progress_key,
&progress.to_bytes()?,
WriteFlags::empty(),
)?;
txn.commit()?;
env.sync(true)?;
buffer.clear();
Ok(())
}
/// Migration step from database version 1.1.0 to 1.2.0.
///
/// Works in place on the backend obtained from the router and runs three stages:
///
/// * Stage A builds the `txid_location` index from the per-height txid lists.
/// * Stage B backfills the spent index from the per-height transparent tx lists.
/// * Stage C rebuilds the txout-set accumulator.
///
/// Stages A and B each persist a "next height" progress entry in the metadata
/// database so an interrupted migration resumes where it stopped.
struct Migration1_1_0To1_2_0;
#[async_trait]
impl<T: BlockchainSource> Migration<T> for Migration1_1_0To1_2_0 {
const CURRENT_VERSION: DbVersion = DbVersion {
major: 1,
minor: 1,
patch: 0,
};
const TO_VERSION: DbVersion = DbVersion {
major: 1,
minor: 2,
patch: 0,
};
/// Runs the three-stage 1.1.0 -> 1.2.0 migration and then stamps the metadata
/// with the new version.
///
/// The blockchain source is not used; all data is read from the existing
/// database.
///
/// # Errors
///
/// Returns an error on any LMDB failure, on a corrupt or checksum-mismatched
/// stored entry, on a conflicting pre-existing index entry, on a duplicate
/// transparent spend within one block, or if rebuilding the accumulator or
/// updating the metadata fails.
async fn migrate(
&self,
router: Arc<Router>,
cfg: BlockCacheConfig,
_source: T,
) -> Result<(), FinalisedStateError> {
// Metadata-db keys under which each stage stores the next height to process.
const MIGRATION_TXID_LOCATION_PROGRESS_KEY: &[u8] =
b"_migration_txid_location_progress_1_2_0_next_height";
const MIGRATION_SPENT_PROGRESS_KEY: &[u8] = b"_migration_spent_progress_1_2_0_next_height";
info!("Starting v1.1.0 → v1.2.0 migration.");
// NOTE(review): presumably withdraws the transparent-history capability from the
// primary while its indexes are rebuilt — confirm against `Router`. The matching
// `extend_primary_caps` call only runs on the success path at the end.
router.limit_primary_caps(Capability::TRANSPARENT_HIST_EXT);
let backend = router.backend(CapabilityRequest::WriteCore)?;
let env = backend.env();
let metadata_db = backend.metadata_db()?;
let txids_db = backend.txids_db()?;
let transparent_db = backend.transparent_db()?;
let spent_db = backend.spent_db()?;
let txid_location_db = backend.txid_location_db()?;
// Mark the migration as started, but only from the `Empty` state so a resumed
// run does not overwrite an already-recorded status.
{
let mut metadata: DbMetadata = router.get_metadata().await?;
if metadata.migration_status == MigrationStatus::Empty {
metadata.migration_status = MigrationStatus::PartialBuidInProgress;
router.update_metadata(metadata).await?;
}
}
// Reads a stage's stored "next height" from the metadata db.
// Returns `Ok(None)` when the key is absent, and an error when the entry cannot
// be decoded or fails verification against its key.
let read_progress = |key: &[u8]| -> Result<Option<u32>, FinalisedStateError> {
let txn = env.begin_ro_txn()?;
match txn.get(metadata_db, &key) {
Ok(bytes) => {
let entry = StoredEntryFixed::<Height>::from_bytes(bytes).map_err(|error| {
FinalisedStateError::Custom(format!(
"corrupt v1.2.0 migration progress entry: {error}"
))
})?;
if !entry.verify(key) {
return Err(FinalisedStateError::Custom(
"v1.2.0 migration progress checksum mismatch".to_string(),
));
}
Ok(Some(entry.inner().0))
}
Err(lmdb::Error::NotFound) => Ok(None),
Err(error) => Err(FinalisedStateError::LmdbError(error)),
}
};
// All three stages are skipped when the database reports no tip height.
if let Some(db_tip) = router.db_height().await? {
let db_tip = db_tip.0;
// ---- Stage A: build the txid -> TxLocation index, one height at a time ----
let mut next_height =
read_progress(MIGRATION_TXID_LOCATION_PROGRESS_KEY)?.unwrap_or(GENESIS_HEIGHT.0);
info!(
resume_height = next_height,
db_tip, "v1.2.0 migration Stage A: building txid_location index"
);
let stage_a_started = std::time::Instant::now();
while next_height <= db_tip {
let height = Height::try_from(next_height)
.map_err(|error| FinalisedStateError::Custom(error.to_string()))?;
let height_bytes = height.to_bytes()?;
// Load and verify this height's txid list in a short read-only transaction.
let txids = {
let txn = env.begin_ro_txn()?;
let raw = txn
.get(txids_db, &height_bytes)
.map_err(FinalisedStateError::LmdbError)?;
let entry = StoredEntryVar::<TxidList>::from_bytes(raw).map_err(|error| {
FinalisedStateError::Custom(format!("txids corrupt data: {error}"))
})?;
if !entry.verify(&height_bytes) {
return Err(FinalisedStateError::Custom(
"txids checksum mismatch".to_string(),
));
}
entry.inner().txids().to_vec()
};
// Pair every txid with its (height, index-in-block) location.
let mut entries: Vec<([u8; 32], TxLocation)> = Vec::with_capacity(txids.len());
for (tx_index, txid) in txids.iter().enumerate() {
let tx_index = u16::try_from(tx_index).map_err(|_| {
FinalisedStateError::Custom(format!(
"transaction index out of range at height {}",
height.0
))
})?;
entries.push(((*txid).into(), TxLocation::new(height.0, tx_index)));
}
// Insert in key order.
entries.sort_by_key(|entry| entry.0);
// Index entries and the progress marker are written in the same transaction,
// so the stored progress never runs ahead of the committed data.
{
let mut txn = env.begin_rw_txn()?;
for (txid_bytes, tx_location) in &entries {
let entry_bytes =
StoredEntryFixed::new(txid_bytes, *tx_location).to_bytes()?;
match txn.put(
txid_location_db,
txid_bytes,
&entry_bytes,
WriteFlags::NO_OVERWRITE,
) {
Ok(()) => {}
// Key already present: accept it only if the stored entry decodes,
// verifies and holds the same location.
Err(lmdb::Error::KeyExist) => {
let existing_bytes = txn
.get(txid_location_db, txid_bytes)
.map_err(FinalisedStateError::LmdbError)?;
let existing_entry =
StoredEntryFixed::<TxLocation>::from_bytes(existing_bytes)
.map_err(|error| {
FinalisedStateError::Custom(format!(
"corrupt existing txid_location entry: {error}"
))
})?;
if !existing_entry.verify(txid_bytes) {
return Err(FinalisedStateError::Custom(
"existing txid_location entry checksum mismatch"
.to_string(),
));
}
if existing_entry.inner() != tx_location {
return Err(FinalisedStateError::Custom(format!(
"conflicting txid_location entry at height {}",
height.0
)));
}
}
Err(error) => return Err(FinalisedStateError::LmdbError(error)),
}
}
// Store `height + 1` as the next height to process.
let progress =
StoredEntryFixed::new(MIGRATION_TXID_LOCATION_PROGRESS_KEY, height + 1);
txn.put(
metadata_db,
&MIGRATION_TXID_LOCATION_PROGRESS_KEY,
&progress.to_bytes()?,
WriteFlags::empty(),
)?;
txn.commit()?;
}
// Force a sync to disk only at checkpoint heights rather than on every height.
if next_height % SYNC_CHECKPOINT_INTERVAL == 0 {
env.sync(true)?;
}
if next_height % 50_000 == 0 {
info!(
height = next_height,
db_tip,
elapsed = ?stage_a_started.elapsed(),
"v1.2.0 migration Stage A progress"
);
}
next_height = height.0 + 1;
}
env.sync(true)?;
info!(
db_tip,
elapsed = ?stage_a_started.elapsed(),
"v1.2.0 migration Stage A complete"
);
// ---- Stage B: backfill the spent index (outpoint -> spending TxLocation) ----
// If no progress entry exists yet, one pointing at genesis is written first.
let mut next_height_to_migrate = match read_progress(MIGRATION_SPENT_PROGRESS_KEY)? {
Some(height) => height,
None => {
let mut txn = env.begin_rw_txn()?;
let progress =
StoredEntryFixed::new(MIGRATION_SPENT_PROGRESS_KEY, GENESIS_HEIGHT);
txn.put(
metadata_db,
&MIGRATION_SPENT_PROGRESS_KEY,
&progress.to_bytes()?,
WriteFlags::empty(),
)?;
txn.commit()?;
GENESIS_HEIGHT.0
}
};
// Re-read the tip for Stage B, falling back to the Stage A tip if none is reported.
let db_tip = router
.db_height()
.await?
.map(|height| height.0)
.unwrap_or(db_tip);
info!(
resume_height = next_height_to_migrate,
db_tip, "v1.2.0 migration Stage B: backfilling spent index"
);
let stage_b_started = std::time::Instant::now();
// Flush threshold in bytes, clamped to at least 1.
let batch_budget = cfg.storage.database.sync_write_batch_bytes.max(1);
let mut spent_buffer: Vec<(Vec<u8>, TxLocation)> = Vec::new();
let mut spent_buffer_bytes: u64 = 0;
while next_height_to_migrate <= db_tip {
let height = Height::try_from(next_height_to_migrate)
.map_err(|error| FinalisedStateError::Custom(error.to_string()))?;
let height_bytes = height.to_bytes()?;
// Load and verify this height's transparent transaction list.
let transparent_tx_list = {
let txn = env.begin_ro_txn()?;
let raw = txn
.get(transparent_db, &height_bytes)
.map_err(FinalisedStateError::LmdbError)?;
let entry =
StoredEntryVar::<TransparentTxList>::from_bytes(raw).map_err(|error| {
FinalisedStateError::Custom(format!(
"transparent corrupt data: {error}"
))
})?;
if !entry.verify(&height_bytes) {
return Err(FinalisedStateError::Custom(
"transparent checksum mismatch".to_string(),
));
}
entry.inner().clone()
};
let transparent = transparent_tx_list.tx().to_vec();
// Collect every outpoint spent in this block, keyed by outpoint.
let mut spent_map = std::collections::HashMap::new();
for (tx_index, tx_opt) in transparent.iter().enumerate() {
// `None` slots are skipped.
let Some(transparent_tx) = tx_opt else {
continue;
};
let tx_index = u16::try_from(tx_index).map_err(|_| {
FinalisedStateError::Custom(format!(
"transaction index out of range at height {}",
height.0
))
})?;
let tx_location = TxLocation::new(height.0, tx_index);
for input in transparent_tx.inputs() {
// Inputs with a null prevout spend no previous output.
if input.is_null_prevout() {
continue;
}
let outpoint = Outpoint::new(*input.prevout_txid(), input.prevout_index());
// The same outpoint spent twice within one block is an error.
if spent_map.insert(outpoint, tx_location).is_some() {
return Err(FinalisedStateError::Custom(format!(
"duplicate transparent spend for outpoint {:?} at height {}",
outpoint, height.0
)));
}
}
}
// Queue this block's spends; the batch is sorted by key when it is flushed.
for (outpoint, tx_location) in &spent_map {
let outpoint_bytes = outpoint.to_bytes()?;
// NOTE(review): the extra 64 looks like a fixed per-entry size estimate — confirm.
spent_buffer_bytes =
spent_buffer_bytes.saturating_add(outpoint_bytes.len() as u64 + 64);
spent_buffer.push((outpoint_bytes, *tx_location));
}
// Flush once the estimated batch size reaches the budget; the flush also stores
// the resume height for this stage.
if spent_buffer_bytes >= batch_budget {
flush_migration_spent_batch(
&env,
spent_db,
metadata_db,
MIGRATION_SPENT_PROGRESS_KEY,
&mut spent_buffer,
height,
)?;
spent_buffer_bytes = 0;
}
if next_height_to_migrate % 10_000 == 0 {
info!(
height = next_height_to_migrate,
db_tip,
elapsed = ?stage_b_started.elapsed(),
"v1.2.0 migration Stage B progress"
);
}
next_height_to_migrate = height.0 + 1;
}
// Flush whatever is left over after the last full batch, recording the tip as done.
if !spent_buffer.is_empty() {
let tip_height = Height::try_from(db_tip)
.map_err(|error| FinalisedStateError::Custom(error.to_string()))?;
flush_migration_spent_batch(
&env,
spent_db,
metadata_db,
MIGRATION_SPENT_PROGRESS_KEY,
&mut spent_buffer,
tip_height,
)?;
}
info!(
db_tip,
elapsed = ?stage_b_started.elapsed(),
"v1.2.0 migration Stage B complete"
);
// ---- Stage C: rebuild the txout-set accumulator ----
let stage_c_started = std::time::Instant::now();
info!(
db_tip,
"v1.2.0 migration Stage C: building txout-set accumulator"
);
backend.rebuild_tx_out_set_accumulator().await?;
info!(
db_tip,
elapsed = ?stage_c_started.elapsed(),
"v1.2.0 migration Stage C complete"
);
}
// Sync the stage output to disk before the metadata is stamped with the new version.
env.sync(true)?;
let mut metadata: DbMetadata = router.get_metadata().await?;
metadata.version = <Self as Migration<T>>::TO_VERSION;
metadata.schema_hash = crate::chain_index::finalised_state::db::v1::DB_SCHEMA_V1_HASH;
metadata.migration_status = MigrationStatus::Empty;
router.update_metadata(metadata).await?;
env.sync(true)?;
// Remove both progress markers; a marker that is already absent is not an error.
{
let mut txn = env.begin_rw_txn()?;
for key in [
MIGRATION_TXID_LOCATION_PROGRESS_KEY,
MIGRATION_SPENT_PROGRESS_KEY,
] {
match txn.del(metadata_db, &key, None) {
Ok(()) | Err(lmdb::Error::NotFound) => {}
Err(error) => return Err(FinalisedStateError::LmdbError(error)),
}
}
txn.commit()?;
}
env.sync(true)?;
// Counterpart of the `limit_primary_caps` call made at the start of the migration.
router.extend_primary_caps(Capability::TRANSPARENT_HIST_EXT);
info!("v1.1.0 to v1.2.0 migration complete.");
Ok(())
}
}