use std::{
collections::{BTreeMap, HashMap},
fs::DirEntry,
io::{self, ErrorKind},
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use hex::ToHex;
use zebra_chain::{
amount::{Amount, DeferredPoolBalanceChange},
block::{self, Block, Height},
serialization::{ZcashDeserializeInto, ZcashSerialize},
};
use crate::{
ContextuallyVerifiedBlock, IntoDisk, NonFinalizedState, SemanticallyVerifiedBlock,
WatchReceiver, ZebraDb,
};
#[cfg(not(test))]
use crate::service::write::validate_and_commit_non_finalized;
/// The minimum amount of time the backup task waits between two consecutive updates of the
/// non-finalized state backup directory (5 seconds).
pub(crate) const MIN_DURATION_BETWEEN_BACKUP_UPDATES: Duration = Duration::from_secs(5);
/// Reads the blocks stored in the backup directory at `backup_dir_path` and commits them to
/// `non_finalized_state` in ascending height order.
///
/// Blocks that fail to commit are logged and skipped, so the returned state contains only the
/// backup blocks that could be committed.
///
/// In test builds the blocks are committed directly to the non-finalized state; otherwise they
/// go through `validate_and_commit_non_finalized`.
pub(super) fn restore_backup(
    mut non_finalized_state: NonFinalizedState,
    backup_dir_path: &Path,
    finalized_state: &ZebraDb,
) -> NonFinalizedState {
    // Group the blocks by height so that iterating the map visits them from lowest to highest.
    let mut blocks_by_height: BTreeMap<Height, Vec<SemanticallyVerifiedBlock>> = BTreeMap::new();
    read_non_finalized_blocks_from_backup(backup_dir_path, finalized_state)
        .for_each(|block| blocks_by_height.entry(block.height).or_default().push(block));

    let ordered_blocks = blocks_by_height
        .into_iter()
        .flat_map(|(height, blocks)| blocks.into_iter().map(move |block| (height, block)));

    for (height, block) in ordered_blocks {
        #[cfg(test)]
        let outcome = {
            let extends_known_chain =
                non_finalized_state.any_chain_contains(&block.block.header.previous_block_hash);

            if extends_known_chain {
                non_finalized_state.commit_block(block, finalized_state)
            } else {
                non_finalized_state.commit_new_chain(block, finalized_state)
            }
        };

        #[cfg(not(test))]
        let outcome =
            validate_and_commit_non_finalized(finalized_state, &mut non_finalized_state, block);

        if let Err(commit_error) = outcome {
            tracing::warn!(
                ?commit_error,
                ?height,
                "failed to commit non-finalized block from backup directory"
            );
        }
    }

    non_finalized_state
}
/// Brings the backup directory at `backup_dir_path` in line with `non_finalized_state`.
///
/// `backup_blocks` maps the hash of every block file currently in the backup directory to its
/// path. Blocks in the non-finalized state that have no entry in `backup_blocks` are written to
/// the directory, and files whose hash is not in the non-finalized state are deleted.
///
/// Failures to write or delete individual files are logged and otherwise ignored.
pub(super) fn update_non_finalized_state_backup(
    backup_dir_path: &Path,
    non_finalized_state: &NonFinalizedState,
    mut backup_blocks: HashMap<block::Hash, PathBuf>,
) {
    // The same block hash can be yielded more than once (for example, if a block is present in
    // more than one chain). Once a hash has been removed from `backup_blocks`, a later occurrence
    // would look like a block without a backup and be needlessly rewritten, so each hash is only
    // handled the first time it is seen.
    let mut seen_hashes = std::collections::HashSet::new();

    for block in non_finalized_state
        .chain_iter()
        .flat_map(|chain| chain.blocks.values())
    {
        if !seen_hashes.insert(block.hash) {
            continue;
        }

        // Removing the entry also marks the existing file as still in use, so that it is not
        // deleted below.
        if backup_blocks.remove(&block.hash).is_none() {
            write_backup_block(backup_dir_path, block);
        }
    }

    // Whatever is left refers to blocks that are no longer in the non-finalized state.
    for outdated_backup_block_path in backup_blocks.into_values() {
        if let Err(delete_error) = std::fs::remove_file(outdated_backup_block_path) {
            tracing::warn!(?delete_error, "failed to delete backup block file");
        }
    }
}
/// Keeps the backup directory at `backup_dir_path` up to date with the latest non-finalized
/// state received on `non_finalized_state_receiver`.
///
/// Each iteration lists the current backup files, waits for both a change of the non-finalized
/// state and [`MIN_DURATION_BETWEEN_BACKUP_UPDATES`], and then updates the backup directory.
/// All filesystem access happens on blocking threads.
///
/// Returns once waiting for a change fails, after logging the error.
///
/// # Panics
///
/// If one of the blocking tasks panics or is cancelled.
pub(super) async fn run_backup_task(
    mut non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
    backup_dir_path: PathBuf,
) {
    let err = loop {
        // Created before listing the directory, so the time spent listing counts towards the
        // minimum interval between updates.
        let rate_limit = tokio::time::sleep(MIN_DURATION_BETWEEN_BACKUP_UPDATES);

        let backup_blocks: HashMap<block::Hash, PathBuf> = {
            let backup_dir_path = backup_dir_path.clone();
            // The directory iterator is lazy: it has to be drained inside the blocking closure,
            // otherwise the directory reads and the deletion of invalid files would run on the
            // async executor thread.
            tokio::task::spawn_blocking(move || {
                list_backup_dir_entries(&backup_dir_path).collect::<HashMap<_, _>>()
            })
            .await
            .expect("failed to join blocking task when reading in backup task")
        };

        if let (Err(err), _) = tokio::join!(non_finalized_state_receiver.changed(), rate_limit) {
            break err;
        };

        let latest_non_finalized_state = non_finalized_state_receiver.cloned_watch_data();

        let backup_dir_path = backup_dir_path.clone();
        tokio::task::spawn_blocking(move || {
            update_non_finalized_state_backup(
                &backup_dir_path,
                &latest_non_finalized_state,
                backup_blocks,
            );
        })
        .await
        .expect("failed to join blocking task when writing in backup task");
    };

    tracing::warn!(
        ?err,
        "got recv error waiting on non-finalized state change, is Zebra shutting down?"
    )
}
/// The data stored in a single non-finalized state backup file.
#[derive(Clone, Debug, PartialEq, Eq)]
struct NonFinalizedBlockBackup {
/// The backed up block.
block: Arc<Block>,
/// The deferred amount of the block's chain value pool change.
deferred_pool_balance_change: Amount,
}
impl From<&ContextuallyVerifiedBlock> for NonFinalizedBlockBackup {
fn from(cv_block: &ContextuallyVerifiedBlock) -> Self {
Self {
block: cv_block.block.clone(),
deferred_pool_balance_change: cv_block.chain_value_pool_change.deferred_amount(),
}
}
}
impl NonFinalizedBlockBackup {
    /// Encodes this backup as the bytes of the deferred pool balance change followed by the
    /// serialized block.
    ///
    /// # Panics
    ///
    /// If the block cannot be serialized.
    fn as_bytes(&self) -> Vec<u8> {
        let block_bytes = self
            .block
            .zcash_serialize_to_vec()
            .expect("verified block header version should be valid");

        // Append the block to the amount bytes instead of concatenating both into a third buffer.
        let mut bytes = self.deferred_pool_balance_change.as_bytes().to_vec();
        bytes.extend_from_slice(&block_bytes);

        bytes
    }

    /// Decodes a backup from bytes in the format produced by [`Self::as_bytes`]: the first
    /// `size_of::<Amount>()` bytes are the deferred pool balance change, the rest is the block.
    ///
    /// # Errors
    ///
    /// - [`ErrorKind::InvalidInput`] if `bytes` is shorter than `size_of::<Amount>()`,
    /// - [`ErrorKind::InvalidData`] if the block or the amount cannot be decoded.
    //
    // The only `expect()` converts the slice returned by `split_at_checked()` into a fixed-size
    // array. NOTE(review): this relies on `size_of::<Amount>()` being 8 — confirm.
    #[allow(clippy::unwrap_in_result)]
    fn from_bytes(bytes: Vec<u8>) -> Result<Self, io::Error> {
        // Build the error lazily, so nothing is allocated when the input is long enough.
        let (deferred_pool_balance_change_bytes, block_bytes) = bytes
            .split_at_checked(size_of::<Amount>())
            .ok_or_else(|| io::Error::new(ErrorKind::InvalidInput, "input is too short"))?;

        Ok(Self {
            block: Arc::new(
                block_bytes
                    .zcash_deserialize_into()
                    .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
            ),
            deferred_pool_balance_change: Amount::from_bytes(
                deferred_pool_balance_change_bytes
                    .try_into()
                    .expect("slice from `split_at_checked()` should fit in [u8; 8]"),
            )
            .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
        })
    }
}
fn write_backup_block(backup_dir_path: &Path, block: &ContextuallyVerifiedBlock) {
let backup_block_file_name: String = block.hash.encode_hex();
let backup_block_file_path = backup_dir_path.join(backup_block_file_name);
let non_finalized_block_backup: NonFinalizedBlockBackup = block.into();
if let Err(err) = std::fs::write(
backup_block_file_path,
non_finalized_block_backup.as_bytes(),
) {
tracing::warn!(?err, "failed to write non-finalized state backup block");
}
}
/// Returns an iterator over the blocks stored in the backup directory at `backup_dir_path`,
/// skipping files whose hash is already contained in `finalized_state`.
///
/// Files that cannot be read or decoded, and blocks without a coinbase height, are logged and
/// skipped. A block whose hash differs from the hash in its file name is logged but still
/// returned.
fn read_non_finalized_blocks_from_backup<'a>(
    backup_dir_path: &Path,
    finalized_state: &'a ZebraDb,
) -> impl Iterator<Item = SemanticallyVerifiedBlock> + 'a {
    list_backup_dir_entries(backup_dir_path)
        .filter(|&(block_hash, _)| !finalized_state.contains_hash(block_hash))
        .filter_map(|(expected_block_hash, file_path)| {
            let file_contents = match std::fs::read(file_path) {
                Ok(contents) => contents,
                Err(err) => {
                    tracing::warn!(?err, "failed to open non-finalized state backup block file");
                    return None;
                }
            };

            let backup = match NonFinalizedBlockBackup::from_bytes(file_contents) {
                Ok(backup) => backup,
                Err(err) => {
                    tracing::warn!(
                        ?err,
                        "failed to deserialize non-finalized backup data into block"
                    );
                    return None;
                }
            };

            if backup.block.coinbase_height().is_none() {
                // Bound to `block` so the logged field keeps its name.
                let block = backup;
                tracing::warn!(
                    ?block,
                    "invalid non-finalized backup block, missing coinbase height"
                );
                return None;
            }

            let NonFinalizedBlockBackup {
                block,
                deferred_pool_balance_change,
            } = backup;

            let deferred_pool_balance_change =
                DeferredPoolBalanceChange::new(deferred_pool_balance_change);
            let block = SemanticallyVerifiedBlock::from(block)
                .with_deferred_pool_balance_change(Some(deferred_pool_balance_change));

            if block.hash != expected_block_hash {
                tracing::warn!(
                    block_hash = ?block.hash,
                    ?expected_block_hash,
                    "wrong block hash in file name"
                );
            }

            Some(block)
        })
}
/// Returns an iterator over the block hash and file path of every entry in the backup
/// directory at `backup_dir_path` whose file name parses as a block hash.
///
/// Entries are processed as the iterator is advanced; entries with an invalid file name are
/// skipped after an attempt to delete them.
///
/// # Panics
///
/// If the backup directory cannot be read.
pub(super) fn list_backup_dir_entries(
    backup_dir_path: &Path,
) -> impl Iterator<Item = (block::Hash, PathBuf)> {
    let dir_entries = read_backup_dir(backup_dir_path);

    dir_entries.filter_map(process_backup_dir_entry)
}
/// Opens the backup directory at `backup_dir_path` and returns an iterator over its entries.
///
/// Entries that fail to be read are logged and skipped.
///
/// # Panics
///
/// If the directory itself cannot be opened for reading.
fn read_backup_dir(backup_dir_path: &Path) -> impl Iterator<Item = DirEntry> {
    let dir_reader = std::fs::read_dir(backup_dir_path)
        .expect("failed to read non-finalized state backup directory");

    dir_reader.filter_map(|entry| {
        entry
            .inspect_err(|io_err| {
                tracing::warn!(
                    ?io_err,
                    "failed to read DirEntry in non-finalized state backup dir"
                );
            })
            .ok()
    })
}
/// Parses the file name of a backup directory entry as a block hash.
///
/// Returns the parsed hash together with the path of the entry. If the file name is not valid
/// Unicode or is not a block hash, the problem is logged, an attempt is made to delete the file,
/// and `None` is returned.
fn process_backup_dir_entry(entry: DirEntry) -> Option<(block::Hash, PathBuf)> {
    /// Tries to delete the file behind `entry`, logging a failure to do so.
    fn remove_invalid_file(entry: &DirEntry) {
        if let Err(delete_error) = std::fs::remove_file(entry.path()) {
            tracing::warn!(?delete_error, "failed to delete backup block file");
        }
    }

    let file_name = match entry.file_name().into_string() {
        Ok(file_name) => file_name,
        Err(err) => {
            tracing::warn!(
                ?err,
                "failed to convert OsString to String, attempting to delete file"
            );
            remove_invalid_file(&entry);
            return None;
        }
    };

    match file_name.parse::<block::Hash>() {
        Ok(block_hash) => Some((block_hash, entry.path())),
        Err(err) => {
            tracing::warn!(
                ?err,
                "failed to parse hex-encoded block hash from file name, attempting to delete file"
            );
            remove_invalid_file(&entry);
            None
        }
    }
}