use std::{
fs::{self, canonicalize, remove_dir_all, DirEntry, ReadDir},
io::ErrorKind,
path::{Path, PathBuf},
time::Duration,
};
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::task::{spawn_blocking, JoinHandle};
use tracing::Span;
use zebra_chain::{common::default_cache_dir, parameters::Network};
use crate::{
constants::{DATABASE_FORMAT_VERSION_FILE_NAME, STATE_DATABASE_KIND},
service::finalized_state::restorable_db_versions,
state_database_format_version_in_code, BoxError,
};
/// Configuration for Zebra's state storage.
///
/// serde `deny_unknown_fields` rejects unrecognised config keys, and `default`
/// fills any missing keys from [`Config::default`].
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
    /// Root directory for all cached databases; per-database paths are derived
    /// from it (see `Config::db_path`).
    pub cache_dir: PathBuf,

    /// When true, the state lives in a freshly generated temporary directory
    /// instead of `cache_dir`; old-database cleanup and non-finalized state
    /// backups are also disabled.
    pub ephemeral: bool,

    /// Whether to back up the non-finalized state under `cache_dir`
    /// (see `Config::non_finalized_state_backup_dir`).
    pub should_backup_non_finalized_state: bool,

    /// Whether outdated database versions are deleted from `cache_dir`
    /// (see `check_and_delete_old_databases`).
    pub delete_old_database: bool,

    /// Debug option: a block height to stop at, if set.
    // NOTE(review): not used in this file; confirm semantics at the use site.
    pub debug_stop_at_height: Option<u32>,

    /// Debug option: interval between validity checks, if set.
    /// Serialized in human-readable form (e.g. "5m") via `humantime_serde`.
    // NOTE(review): not used in this file; confirm semantics at the use site.
    #[serde(with = "humantime_serde")]
    pub debug_validity_check_interval: Option<Duration>,

    /// Debug option: skip the non-finalized state backup task, if true.
    // NOTE(review): not used in this file; confirm semantics at the use site.
    pub debug_skip_non_finalized_state_backup_task: bool,

    /// Elasticsearch endpoint URL (only with the `elasticsearch` feature).
    #[cfg(feature = "elasticsearch")]
    pub elasticsearch_url: String,

    /// Elasticsearch username (only with the `elasticsearch` feature).
    #[cfg(feature = "elasticsearch")]
    pub elasticsearch_username: String,

    /// Elasticsearch password (only with the `elasticsearch` feature).
    #[cfg(feature = "elasticsearch")]
    pub elasticsearch_password: String,
}
/// Creates a new temporary directory whose name starts with `prefix`, and
/// returns its path.
///
/// The directory is deliberately *not* removed on drop: `keep()` consumes the
/// `TempDir` guard and disables its automatic cleanup, so the caller owns the
/// directory's lifetime.
///
/// # Panics
///
/// If the temporary directory cannot be created.
fn gen_temp_path(prefix: &str) -> PathBuf {
    tempfile::Builder::new()
        .prefix(prefix)
        .tempdir()
        .expect("temporary directory is created successfully")
        .keep()
}
impl Config {
    /// Returns the directory of the `db_kind` database at format
    /// `major_version` on `network`.
    ///
    /// For ephemeral configs this generates a fresh temporary directory on
    /// every call; otherwise it is a stable
    /// `<cache_dir>/<db_kind>/v<major_version>/<network>` path.
    pub fn db_path(
        &self,
        db_kind: impl AsRef<str>,
        major_version: u64,
        network: &Network,
    ) -> PathBuf {
        let kind = db_kind.as_ref();
        let version_dir = format!("v{major_version}");
        let network_dir = network.lowercase_name();

        if self.ephemeral {
            return gen_temp_path(&format!("zebra-{kind}-{version_dir}-{network_dir}-"));
        }

        let mut path = self.cache_dir.clone();
        path.push(kind);
        path.push(&version_dir);
        path.push(&network_dir);
        path
    }

    /// Returns the directory used to back up the non-finalized state on
    /// `network`, or `None` when backups are disabled (ephemeral configs, or
    /// `should_backup_non_finalized_state` set to false).
    pub fn non_finalized_state_backup_dir(&self, network: &Network) -> Option<PathBuf> {
        let backups_enabled = !self.ephemeral && self.should_backup_non_finalized_state;

        backups_enabled.then(|| {
            self.cache_dir
                .join("non_finalized_state")
                .join(network.lowercase_name())
        })
    }

    /// Returns the path of the format version file for the `db_kind` database
    /// at format `major_version` on `network`.
    pub fn version_file_path(
        &self,
        db_kind: impl AsRef<str>,
        major_version: u64,
        network: &Network,
    ) -> PathBuf {
        self.db_path(db_kind, major_version, network)
            .join(DATABASE_FORMAT_VERSION_FILE_NAME)
    }

    /// Returns a default config that stores its state in a temporary
    /// directory.
    pub fn ephemeral() -> Config {
        Config {
            ephemeral: true,
            ..Default::default()
        }
    }
}
impl Default for Config {
    fn default() -> Self {
        Self {
            // Platform-specific cache location, provided by zebra-chain.
            cache_dir: default_cache_dir(),
            ephemeral: false,
            // Backups and old-version cleanup are on by default.
            should_backup_non_finalized_state: true,
            delete_old_database: true,
            // Debug options are all disabled by default.
            debug_stop_at_height: None,
            debug_validity_check_interval: None,
            debug_skip_non_finalized_state_backup_task: false,
            // Default local Elasticsearch endpoint and stock credentials.
            #[cfg(feature = "elasticsearch")]
            elasticsearch_url: "https://localhost:9200".to_string(),
            #[cfg(feature = "elasticsearch")]
            elasticsearch_username: "elastic".to_string(),
            #[cfg(feature = "elasticsearch")]
            elasticsearch_password: "".to_string(),
        }
    }
}
/// Spawns a blocking task that deletes outdated *state* databases from
/// `config.cache_dir` on `network`, using the state database kind and the
/// major format version compiled into this binary.
///
/// See [`check_and_delete_old_databases`] for details.
pub fn check_and_delete_old_state_databases(config: &Config, network: &Network) -> JoinHandle<()> {
    check_and_delete_old_databases(
        config,
        STATE_DATABASE_KIND,
        state_database_format_version_in_code().major,
        network,
    )
}
/// Spawns a blocking tokio task that deletes outdated `db_kind` databases on
/// `network` (see `delete_old_databases`), then logs completion.
///
/// The caller's tracing span is propagated into the spawned task so cleanup
/// logs keep their context. Per tokio's `spawn_blocking` contract, dropping
/// the returned handle detaches the task rather than cancelling it.
pub fn check_and_delete_old_databases(
    config: &Config,
    db_kind: impl AsRef<str>,
    major_version: u64,
    network: &Network,
) -> JoinHandle<()> {
    let current_span = Span::current();

    // Clone everything the blocking closure needs to own ('static).
    let config = config.clone();
    let db_kind = db_kind.as_ref().to_string();
    let network = network.clone();

    spawn_blocking(move || {
        current_span.in_scope(|| {
            delete_old_databases(config, db_kind, major_version, &network);
            info!("finished old database version cleanup task");
        })
    })
}
/// Deletes outdated versions of the `db_kind` database on `network` from
/// `config.cache_dir`.
///
/// Does nothing for ephemeral configs, or when `delete_old_database` is off.
/// The current `major_version` and restorable versions are kept
/// (see `check_and_delete_database`).
fn delete_old_databases(config: Config, db_kind: String, major_version: u64, network: &Network) {
    if config.ephemeral || !config.delete_old_database {
        return;
    }

    info!(db_kind, "checking for old database versions");

    let restorable_db_versions = restorable_db_versions();

    let mut db_path = config.db_path(&db_kind, major_version, network);

    // Walk upwards from `<cache_dir>/<db_kind>/v<major_version>/<network>`,
    // asserting each expected component so a path-construction change
    // elsewhere can't make this function scan (and delete inside) the wrong
    // directory tree.
    assert_eq!(
        db_path.file_name(),
        Some(network.lowercase_name().as_ref()),
        "unexpected database network path structure"
    );
    assert!(db_path.pop());

    assert_eq!(
        db_path.file_name(),
        Some(format!("v{major_version}").as_ref()),
        "unexpected database version path structure"
    );
    assert!(db_path.pop());

    assert_eq!(
        db_path.file_name(),
        Some(db_kind.as_ref()),
        "unexpected database kind path structure"
    );

    // `db_path` is now `<cache_dir>/<db_kind>`: check each entry in it,
    // logging every directory that was actually deleted.
    if let Some(db_kind_dir) = read_dir(&db_path) {
        for entry in db_kind_dir.flatten() {
            let deleted_db =
                check_and_delete_database(&config, major_version, &restorable_db_versions, &entry);

            if let Some(deleted_db) = deleted_db {
                info!(?deleted_db, "deleted outdated {db_kind} database directory");
            }
        }
    }
}
/// Returns an iterator over the entries of `dir` if it exists and can be
/// read, otherwise `None`.
fn read_dir(dir: &Path) -> Option<ReadDir> {
    if !dir.exists() {
        return None;
    }

    dir.read_dir().ok()
}
/// Deletes `entry` if it is an outdated `v<number>` database directory inside
/// `config.cache_dir`, returning the deleted path on success.
///
/// Returns `None` (without deleting) when:
/// - `entry` is not a directory named `v<number>`,
/// - its version is the current `major_version` or newer,
/// - its version must be kept because it relates to a restorable version
///   (see the note on the `v - 1` offset below),
/// - either path fails to canonicalize, the entry lies outside the cache
///   directory, or the delete itself fails.
fn check_and_delete_database(
    config: &Config,
    major_version: u64,
    restorable_db_versions: &[u64],
    entry: &DirEntry,
) -> Option<PathBuf> {
    let dir_name = parse_dir_name(entry)?;
    let dir_major_version = parse_major_version(&dir_name)?;

    // Keep the current version and anything newer.
    if dir_major_version >= major_version {
        return None;
    }

    // Keep versions exactly one below a restorable version.
    // NOTE(review): the `v - 1` offset is not explained here — confirm against
    // `restorable_db_versions()`; it would underflow if a listed version were 0.
    if restorable_db_versions
        .iter()
        .map(|v| v - 1)
        .any(|v| v == dir_major_version)
    {
        return None;
    }

    let outdated_path = entry.path();

    // Canonicalize both paths, then refuse to delete anything that resolves
    // outside the cache directory (guards against symlinks/misconfiguration).
    let cache_path = canonicalize(&config.cache_dir).ok()?;
    let outdated_path = canonicalize(outdated_path).ok()?;

    if !outdated_path.starts_with(&cache_path) {
        info!(
            skipped_path = ?outdated_path,
            ?cache_path,
            "skipped cleanup of outdated state directory: state is outside cache directory",
        );
        return None;
    }

    // Best-effort delete: an error here is reported as "nothing deleted".
    remove_dir_all(&outdated_path).ok().map(|()| outdated_path)
}
/// Returns `entry`'s file name as a `String`, if `entry` is a directory with
/// a valid-UTF-8 name. Returns `None` otherwise, or if the file type can't
/// be read.
fn parse_dir_name(entry: &DirEntry) -> Option<String> {
    let file_type = entry.file_type().ok()?;

    if !file_type.is_dir() {
        return None;
    }

    entry.file_name().into_string().ok()
}
/// Parses a database directory name of the form `v<number>` into its major
/// version number. Returns `None` for any other name.
fn parse_major_version(dir_name: &str) -> Option<u64> {
    let version = dir_name.strip_prefix('v')?;
    version.parse().ok()
}
/// Returns the format version of the *state* database on disk for `network`,
/// if any, using the state database kind and the major format version
/// compiled into this binary.
///
/// See [`database_format_version_on_disk`] for details.
pub fn state_database_format_version_on_disk(
    config: &Config,
    network: &Network,
) -> Result<Option<Version>, BoxError> {
    database_format_version_on_disk(
        config,
        STATE_DATABASE_KIND,
        state_database_format_version_in_code().major,
        network,
    )
}
/// Returns the format version of the `db_kind` database on disk for
/// `network`, if any. Returns `Ok(None)` when there is no database on disk.
///
/// NOTE(review): for ephemeral configs, `version_file_path` and `db_path`
/// each generate a *different* fresh temp directory here, so the version file
/// would never be found — confirm this is only called for on-disk configs.
pub fn database_format_version_on_disk(
    config: &Config,
    db_kind: impl AsRef<str>,
    major_version: u64,
    network: &Network,
) -> Result<Option<Version>, BoxError> {
    let version_path = config.version_file_path(&db_kind, major_version, network);
    let db_path = config.db_path(db_kind, major_version, network);

    database_format_version_at_path(&version_path, &db_path, major_version)
}
/// Reads the database format version from the file at `version_path`, falling
/// back to the database directory at `db_path`.
///
/// Returns:
/// - `Ok(Some(version))` when the version file exists and parses, or when it
///   is missing but the database directory exists (treated as
///   `major_version.0.0`),
/// - `Ok(None)` when neither the version file nor the database exists,
/// - `Err(_)` on any other I/O error, or if the version file can't be parsed.
pub(crate) fn database_format_version_at_path(
    version_path: &Path,
    db_path: &Path,
    major_version: u64,
) -> Result<Option<Version>, BoxError> {
    match fs::read_to_string(version_path) {
        Ok(version) => {
            // Try the full `major.minor.patch` form first; files that only
            // contain `minor.patch` are retried with `major_version` prepended.
            let parsed = version.parse().or_else(|err| {
                format!("{major_version}.{version}")
                    .parse()
                    .map_err(|err2| format!("failed to parse format version: {err}, {err2}"))
            })?;

            Ok(Some(parsed))
        }

        // No version file: an existing database directory implies format
        // `major_version.0.0`; a missing one means there is no database.
        Err(version_file_err) if version_file_err.kind() == ErrorKind::NotFound => {
            match fs::metadata(db_path) {
                Ok(_metadata) => Ok(Some(Version::new(major_version, 0, 0))),
                Err(db_err) if db_err.kind() == ErrorKind::NotFound => Ok(None),
                Err(db_err) => Err(db_err.into()),
            }
        }

        Err(version_file_err) => Err(version_file_err.into()),
    }
}
#[allow(unused_imports)]
pub(crate) use hidden::{
write_database_format_version_to_disk, write_state_database_format_version_to_disk,
};
/// Inner module holding the version-file writers; re-exported near the top of
/// this file so callers use the top-level names.
pub(crate) mod hidden {
    #![allow(dead_code)]

    use zebra_chain::common::atomic_write;

    use super::*;

    /// Writes `changed_version` to the *state* database format version file
    /// on disk for `network`, using the state database kind and the major
    /// format version compiled into this binary.
    ///
    /// See [`write_database_format_version_to_disk`] for details.
    pub fn write_state_database_format_version_to_disk(
        config: &Config,
        changed_version: &Version,
        network: &Network,
    ) -> Result<(), BoxError> {
        write_database_format_version_to_disk(
            config,
            STATE_DATABASE_KIND,
            state_database_format_version_in_code().major,
            changed_version,
            network,
        )
    }

    /// Writes `changed_version` to the `db_kind` database format version file
    /// on disk for `network`, via an atomic file write.
    pub fn write_database_format_version_to_disk(
        config: &Config,
        db_kind: impl AsRef<str>,
        major_version_in_code: u64,
        changed_version: &Version,
        network: &Network,
    ) -> Result<(), BoxError> {
        // The double `?` propagates two error layers, so `atomic_write`
        // apparently returns a nested `Result`.
        // NOTE(review): confirm the outer/inner error meanings against
        // `zebra_chain::common::atomic_write`.
        atomic_write(
            config.version_file_path(db_kind, major_version_in_code, network),
            changed_version.to_string().as_bytes(),
        )??;

        Ok(())
    }
}