use std::{
env::temp_dir,
fs,
io::{self, Write},
sync::Arc,
};
use anyhow::anyhow;
use log::*;
use minotari_app_grpc::tari_rpc::readiness_status::State as ReadinessState;
use tari_common::{
configuration::Network,
exit_codes::{ExitCode, ExitError},
};
use tari_core::{
chain_storage::{
BlockchainBackend,
BlockchainDatabase,
BlockchainDatabaseConfig,
Validators,
async_db::AsyncBlockchainDb,
create_lmdb_database_with_stats_channel,
create_recovery_lmdb_database,
},
consensus::BaseNodeConsensusManager,
proof_of_work::randomx_factory::RandomXFactory,
validation::{
DifficultyCalculator,
block_body::{BlockBodyFullValidator, BlockBodyInternalConsistencyValidator},
header::HeaderFullValidator,
mocks::MockValidator,
},
};
use tari_transaction_components::crypto_factories::CryptoFactories;
use crate::{BaseNodeConfig, DatabaseType, grpc::readiness_grpc_server::ReadinessStatusHandler};
/// Log target attached to the log statements emitted by the recovery routines in this file.
pub const LOG_TARGET: &str = "base_node::app";
/// Prepares the configured database for a recovery run.
///
/// For the LMDB backend this delegates to `create_recovery_lmdb_database`, passing the configured
/// `lmdb_path`.
///
/// # Errors
/// Any failure reported by the backend is logged under [`LOG_TARGET`] and returned as an
/// [`ExitError`] carrying [`ExitCode::UnknownError`].
pub fn initiate_recover_db(config: &BaseNodeConfig) -> Result<(), ExitError> {
    match &config.db_type {
        DatabaseType::Lmdb => {
            let db_path = config.lmdb_path.as_path();
            if let Err(err) = create_recovery_lmdb_database(db_path) {
                error!(target: LOG_TARGET, "{err}");
                return Err(ExitError::new(ExitCode::UnknownError, err));
            }
        },
    }
    Ok(())
}
pub async fn run_recovery(
node_config: &BaseNodeConfig,
readiness_handler: ReadinessStatusHandler,
) -> Result<(), anyhow::Error> {
println!("Starting recovery mode");
let rules = BaseNodeConsensusManager::builder(node_config.network)
.build()
.map_err(|e| {
error!(target: LOG_TARGET, "Error configuring consensus manager: {e}");
anyhow!("Could not configure consensus manager: {e}")
})?;
let (temp_db, main_db, temp_path) = match &node_config.db_type {
DatabaseType::Lmdb => {
readiness_handler.send_readiness_status(ReadinessState::DatabaseInitializing);
let backend = create_lmdb_database_with_stats_channel(
&node_config.lmdb_path,
node_config.lmdb.clone(),
rules.clone(),
Some(readiness_handler.lmdb_migration_status_tx.clone()),
)
.map_err(|e| {
error!(target: LOG_TARGET, "Error opening db: {e}");
anyhow!("Could not open DB: {e}")
})?;
let temp_path = temp_dir().join("temp_recovery");
let temp = create_lmdb_database_with_stats_channel(
&temp_path,
node_config.lmdb.clone(),
rules.clone(),
Some(readiness_handler.lmdb_migration_status_tx.clone()),
)
.map_err(|e| {
error!(target: LOG_TARGET, "Error opening recovery db: {e}");
anyhow!("Could not open recovery DB: {e}")
})?;
(temp, backend, temp_path)
},
};
let factories = CryptoFactories::default();
let randomx_factory = RandomXFactory::new(node_config.max_randomx_vms);
let difficulty_calculator = DifficultyCalculator::new(rules.clone(), randomx_factory);
let validators = Validators::new(
BlockBodyFullValidator::new(rules.clone(), node_config.bypass_range_proof_verification),
HeaderFullValidator::new(rules.clone(), difficulty_calculator.clone()),
BlockBodyInternalConsistencyValidator::new(
rules.clone(),
node_config.bypass_range_proof_verification,
factories.clone(),
),
);
let db = BlockchainDatabase::new(
main_db,
rules.clone(),
validators,
node_config.storage,
difficulty_calculator,
)?;
db.start()?;
do_recovery(db.into(), temp_db, &readiness_handler).await?;
info!(
target: LOG_TARGET,
"Node has completed recovery mode, it will try to cleanup the db"
);
fs::remove_dir_all(&temp_path).map_err(|e| {
error!(target: LOG_TARGET, "Error opening recovery db: {e}");
anyhow!("Could not open recovery DB: {e}")
})
}
/// Replays the blocks stored in `source_backend` into `db`, one height at a time.
///
/// The source backend is wrapped in its own `BlockchainDatabase` that uses `Network::LocalNet`
/// consensus rules, a default database config and three `MockValidator::new(true)` validators
/// (presumably "always valid" — confirm against `MockValidator`), so it is only used to read
/// blocks. Validation of each replayed block is left to `db.add_block`.
///
/// Replay starts at height 1 and ends once the height reaches the source's best block height.
/// The end check runs after a block has been added, so height 1 is always attempted. Progress is
/// written to stdout as a single, in-place updated height, and
/// `ReadinessState::RecoveringRebuildingDatabase` is sent after every added block.
///
/// # Errors
/// Returns an error if the consensus manager or source database cannot be set up, the chain
/// metadata or a block cannot be read from the source, a block is rejected by `db`, or the
/// progress output cannot be flushed to stdout.
async fn do_recovery<D: BlockchainBackend + 'static>(
    db: AsyncBlockchainDb<D>,
    source_backend: D,
    readiness_status_handler: &ReadinessStatusHandler,
) -> Result<(), anyhow::Error> {
    let rules = BaseNodeConsensusManager::builder(Network::LocalNet)
        .build()
        .map_err(|e| {
            error!(target: LOG_TARGET, "Error creating consensus manager: {e}");
            anyhow!("Error creating consensus manager: {e}")
        })?;
    let validators = Validators::new(
        MockValidator::new(true),
        MockValidator::new(true),
        MockValidator::new(true),
    );
    let source_database = BlockchainDatabase::new(
        source_backend,
        rules.clone(),
        validators,
        BlockchainDatabaseConfig::default(),
        DifficultyCalculator::new(rules, Default::default()),
    )?;
    source_database.start()?;

    let max_height = source_database
        .get_chain_metadata()
        .map_err(|e| anyhow!("Could not get max chain height: {e}"))?
        .best_block_height();

    let mut counter = 1;
    print!("Starting recovery at height: ");
    loop {
        print!("{counter}");
        // stdout is typically line buffered and the progress line has no newline, so flush
        // explicitly. A flush failure is returned as an error rather than panicking.
        io::stdout()
            .flush()
            .map_err(|e| anyhow!("Could not flush recovery progress to stdout: {e}"))?;

        trace!(target: LOG_TARGET, "Asking for block with height: {counter}");
        let block = source_database
            .fetch_block(counter, true)
            .map_err(|e| anyhow!("Could not get block from recovery db: {e}"))?
            .into_block();

        trace!(target: LOG_TARGET, "Adding block: {block}");
        db.add_block(Arc::new(block))
            .await
            .map_err(|e| anyhow!("Stopped recovery at height {counter}, reason: {e}"))?;
        readiness_status_handler.send_readiness_status(ReadinessState::RecoveringRebuildingDatabase);

        if counter >= max_height {
            info!(target: LOG_TARGET, "Done with recovery, chain height {counter}");
            break;
        }

        // ANSI: move the cursor left by the width of the printed height, then erase to the end
        // of the line, so the next height overwrites the current one.
        print!("\x1B[{}D\x1B[K", counter.to_string().len());
        counter += 1;
    }
    Ok(())
}