use std::{path::Path, sync::Arc};
use crossbeam_channel::bounded;
use semver::Version;
use zebra_chain::parameters::Network;
use crate::{
config::database_format_version_on_disk,
constants::RESTORABLE_DB_VERSIONS,
service::finalized_state::{
disk_db::DiskDb,
disk_format::{
block::MAX_ON_DISK_HEIGHT,
upgrade::{DbFormatChange, DbFormatChangeThreadHandle},
},
},
write_database_format_version_to_disk, BoxError, Config,
};
pub mod block;
pub mod chain;
pub mod metrics;
pub mod shielded;
pub mod transparent;
#[cfg(any(test, feature = "proptest-impl", feature = "shielded-scan"))]
pub mod arbitrary;
#[derive(Clone, Debug)]
pub struct ZebraDb {
config: Arc<Config>,
debug_skip_format_upgrades: bool,
format_change_handle: Option<DbFormatChangeThreadHandle>,
db: DiskDb,
}
impl ZebraDb {
pub fn new(
config: &Config,
db_kind: impl AsRef<str>,
format_version_in_code: &Version,
network: &Network,
debug_skip_format_upgrades: bool,
column_families_in_code: impl IntoIterator<Item = String>,
read_only: bool,
) -> ZebraDb {
let disk_version = database_format_version_on_disk(
config,
&db_kind,
format_version_in_code.major,
network,
)
.expect("unable to read database format version file");
DiskDb::try_reusing_previous_db_after_major_upgrade(
&RESTORABLE_DB_VERSIONS,
format_version_in_code,
config,
&db_kind,
network,
);
let format_change = DbFormatChange::open_database(format_version_in_code, disk_version);
let debug_skip_format_upgrades = read_only
|| ((cfg!(test) || cfg!(feature = "shielded-scan")) && debug_skip_format_upgrades);
let mut db = ZebraDb {
config: Arc::new(config.clone()),
debug_skip_format_upgrades,
format_change_handle: None,
db: DiskDb::new(
config,
db_kind,
format_version_in_code,
network,
column_families_in_code,
read_only,
),
};
db.spawn_format_change(format_change);
db
}
pub fn spawn_format_change(&mut self, format_change: DbFormatChange) {
if self.debug_skip_format_upgrades {
return;
}
let initial_tip_height = self.finalized_tip_height();
let upgrade_db = self.clone();
let format_change_handle =
format_change.spawn_format_change(upgrade_db, initial_tip_height);
self.format_change_handle = Some(format_change_handle);
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn db_kind(&self) -> String {
self.db.db_kind()
}
pub fn format_version_in_code(&self) -> Version {
self.db.format_version_in_code()
}
pub fn major_version(&self) -> u64 {
self.db.major_version()
}
pub fn format_version_on_disk(&self) -> Result<Option<Version>, BoxError> {
database_format_version_on_disk(
self.config(),
self.db_kind(),
self.major_version(),
&self.network(),
)
}
pub(crate) fn update_format_version_on_disk(
&self,
new_version: &Version,
) -> Result<(), BoxError> {
write_database_format_version_to_disk(
self.config(),
self.db_kind(),
new_version,
&self.network(),
)
}
pub fn network(&self) -> Network {
self.db.network()
}
pub fn path(&self) -> &Path {
self.db.path()
}
pub fn check_for_panics(&mut self) {
if let Some(format_change_handle) = self.format_change_handle.as_mut() {
format_change_handle.check_for_panics();
}
}
pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
self.db.try_catch_up_with_primary()
}
pub fn shutdown(&mut self, force: bool) {
let is_shutdown = force || self.db.shared_database_owners() <= 1;
if !self.debug_skip_format_upgrades && is_shutdown {
if let Some(format_change_handle) = self.format_change_handle.as_mut() {
format_change_handle.force_cancel();
}
let disk_version = database_format_version_on_disk(
&self.config,
self.db_kind(),
self.major_version(),
&self.network(),
)
.expect("unexpected invalid or unreadable database version file");
if let Some(disk_version) = disk_version {
let (_never_cancel_handle, never_cancel_receiver) = bounded(1);
if disk_version >= self.db.format_version_in_code() {
DbFormatChange::check_new_blocks(self)
.run_format_change_or_check(
self,
None,
&never_cancel_receiver,
)
.expect("cancel handle is never used");
}
}
}
self.check_for_panics();
self.db.shutdown(force);
}
pub(crate) fn check_max_on_disk_tip_height(&self) -> Result<(), String> {
if let Some((tip_height, tip_hash)) = self.tip() {
if tip_height.0 > MAX_ON_DISK_HEIGHT.0 / 2 {
let err = Err(format!(
"unexpectedly large tip height, database format upgrade required: \
tip height: {tip_height:?}, tip hash: {tip_hash:?}, \
max height: {MAX_ON_DISK_HEIGHT:?}"
));
error!(?err);
return err;
}
}
Ok(())
}
pub fn print_db_metrics(&self) {
self.db.print_db_metrics();
}
pub fn size(&self) -> u64 {
self.db.size()
}
}
impl Drop for ZebraDb {
fn drop(&mut self) {
self.shutdown(false);
}
}