use std::{
error::Error,
io,
path::{Path, PathBuf},
};
use amaru_kernel::{BlockHeader, EraHistory, Hash, HeaderHash, IsHeader, NetworkName, Nonce, Point, from_cbor};
use amaru_ledger::{
bootstrap::import_initial_snapshot,
store::{EpochTransitionProgress, Store, TransactionalContext},
};
use amaru_ouroboros::{ChainStore, Nonces};
use amaru_progress_bar::new_terminal_progress_bar;
use amaru_stores::rocksdb::{RocksDB, RocksDbConfig, consensus::RocksDBStore};
use async_compression::tokio::bufread::GzipDecoder;
use futures_util::TryStreamExt;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tokio::{
fs::{self, File},
io::BufReader,
};
use tokio_util::io::StreamReader;
use tracing::info;
use crate::{default_initial_nonces, default_snapshots_dir, get_bootstrap_file, get_bootstrap_headers};
/// One entry of the `snapshots.json` configuration file, describing a
/// downloadable ledger snapshot.
#[derive(Debug, Deserialize)]
struct Snapshot {
    // Epoch the snapshot belongs to (used for logging only in this module).
    epoch: u64,
    // Chain point in its string form; also used as the on-disk file stem
    // (`<point>.cbor`).
    point: String,
    // HTTP(S) URL of the gzip-compressed snapshot.
    url: String,
}
/// Errors that can occur while locating, downloading, or storing bootstrap
/// snapshots.
#[derive(Debug, thiserror::Error)]
pub enum BootstrapError {
    /// A required configuration file was not found.
    #[error("Missing configuration file {0}")]
    MissingConfigFile(PathBuf),
    /// The snapshots configuration file exists but could not be read.
    #[error("Can not read Snapshot configuration file {0}: {1}")]
    ReadSnapshotsFile(PathBuf, io::Error),
    /// The target snapshots directory could not be created.
    #[error("Can not create snapshots directory {0}: {1}")]
    CreateSnapshotsDir(PathBuf, io::Error),
    /// The snapshots configuration file is not valid JSON (or not the
    /// expected shape).
    #[error("Failed to parse snapshots JSON file {0}")]
    MalformedSnapshotsFile(serde_json::Error),
    /// Any I/O failure while writing snapshots to disk.
    #[error("Unable to store snapshots on disk: {0}")]
    Io(#[from] std::io::Error),
    /// The HTTP request itself failed (connection, TLS, timeout, ...).
    #[error("Failed to download snapshot at url {0}: {1}")]
    DownloadError(String, reqwest::Error),
    /// The server answered with a non-success HTTP status code.
    #[error("Failed to download snapshot from {0}: HTTP status code {1}")]
    DownloadInvalidStatusCode(String, reqwest::StatusCode),
}
/// Download (and gunzip) every snapshot listed in `snapshots_content` — the
/// raw bytes of a `snapshots.json` file — into `snapshots_dir`.
///
/// Downloads are staged into a `.partial` file and atomically renamed into
/// place, so an interrupted download never leaves a truncated `.cbor` file
/// at the final path. Snapshots already present on disk are skipped.
///
/// # Errors
///
/// Returns a [`BootstrapError`] if the directory cannot be created, the JSON
/// is malformed, an HTTP request fails or returns a non-success status, or
/// any disk I/O fails.
async fn download_snapshots(snapshots_content: Vec<u8>, snapshots_dir: &Path) -> Result<(), BootstrapError> {
    fs::create_dir_all(snapshots_dir)
        .await
        .map_err(|e| BootstrapError::CreateSnapshotsDir(snapshots_dir.to_path_buf(), e))?;

    let snapshots: Vec<Snapshot> =
        serde_json::from_slice(&snapshots_content).map_err(BootstrapError::MalformedSnapshotsFile)?;

    // One client for all downloads so the connection pool is re-used.
    let client = reqwest::Client::new();
    for snapshot in &snapshots {
        info!(epoch=%snapshot.epoch, point=%snapshot.point,
            "Downloading snapshot",
        );

        let filename = format!("{}.cbor", snapshot.point);
        let target_path = snapshots_dir.join(&filename);
        if target_path.exists() {
            info!(filename=%filename, "Snapshot already exists, skipping");
            continue;
        }

        let response = client
            .get(&snapshot.url)
            .send()
            .await
            .map_err(|e| BootstrapError::DownloadError(snapshot.url.clone(), e))?;
        if !response.status().is_success() {
            return Err(BootstrapError::DownloadInvalidStatusCode(snapshot.url.clone(), response.status()));
        }

        // Stream-decompress into `<target>.partial`, fsync, then rename: the
        // final path only ever holds a complete file.
        let (tmp_path, file) = uncompress_to_temp_file(&target_path, response).await?;
        file.sync_all().await?;
        tokio::fs::rename(&tmp_path, &target_path).await?;
        info!(target_path=%target_path.display(), "Downloaded snapshot");
    }
    info!("All snapshots downloaded and decompressed successfully");
    Ok(())
}
async fn uncompress_to_temp_file(
target_path: &Path,
response: reqwest::Response,
) -> Result<(PathBuf, File), BootstrapError> {
let tmp_path = target_path.with_extension("partial");
let mut file = File::create(&tmp_path).await?;
let raw_stream_reader = StreamReader::new(response.bytes_stream().map_err(io::Error::other));
let buffered_reader = BufReader::new(raw_stream_reader);
let mut decoded_stream = GzipDecoder::new(buffered_reader);
tokio::io::copy(&mut decoded_stream, &mut file).await?;
Ok((tmp_path, file))
}
/// End-to-end bootstrap for `network`: download the configured snapshots,
/// import them into the ledger store under `ledger_dir`, then seed the chain
/// store under `chain_dir` with the initial nonces and bootstrap headers.
pub async fn bootstrap(network: NetworkName, ledger_dir: PathBuf, chain_dir: PathBuf) -> Result<(), Box<dyn Error>> {
    let snapshot_file_name = "snapshots.json";
    let snapshots_dir: PathBuf = default_snapshots_dir(network).into();
    // `get_bootstrap_file` yields the file's raw bytes; `None` means the
    // file does not exist for this network, which is a hard error here.
    let snapshots_file = get_bootstrap_file(network, snapshot_file_name)?
        .ok_or(BootstrapError::MissingConfigFile(snapshot_file_name.into()))?;
    download_snapshots(snapshots_file, &snapshots_dir).await?;
    import_snapshots_from_directory(network, &ledger_dir, &snapshots_dir).await?;
    // NOTE(review): `network.into()` presumably yields this network's
    // `&EraHistory` — confirm against `amaru_kernel`.
    import_nonces(network.into(), &chain_dir, default_initial_nonces(network)?).await?;
    import_headers_for_network(&chain_dir, get_bootstrap_headers(network)?.collect::<Vec<_>>()).await?;
    Ok(())
}
fn deserialize_point<'de, D>(deserializer: D) -> Result<Point, D::Error>
where
D: Deserializer<'de>,
{
let buf = <&str>::deserialize(deserializer)?;
Point::try_from(buf).map_err(|e| serde::de::Error::custom(format!("cannot convert vector to point: {:?}", e)))
}
/// Serde helper: serialize a [`Point`] using its `Display` form.
///
/// `collect_str` streams the `Display` implementation straight into the
/// serializer, avoiding the intermediate `String` allocation that
/// `serialize_str(&point.to_string())` would incur.
fn serialize_point<S: Serializer>(point: &Point, s: S) -> Result<S::Ok, S::Error> {
    s.collect_str(point)
}
/// Initial nonce state imported into the chain store so validation can start
/// from a known point instead of replaying from genesis.
#[derive(Debug, Serialize, Deserialize)]
pub struct InitialNonces {
    /// Chain point these nonces correspond to (serialized as a string via
    /// the `serialize_point`/`deserialize_point` helpers).
    #[serde(serialize_with = "serialize_point", deserialize_with = "deserialize_point")]
    pub at: Point,
    /// Active nonce at `at`.
    pub active: Nonce,
    /// Evolving nonce at `at`.
    pub evolving: Nonce,
    /// Candidate nonce at `at`.
    pub candidate: Nonce,
    /// Tail header hash — NOTE(review): presumably the last header hash of
    /// the previous epoch; confirm against `amaru_ouroboros::Nonces`.
    pub tail: HeaderHash,
}
/// Import the initial nonces into the chain store, keyed by the header hash
/// of `initial_nonce.at`, so that chain validation can resume from there.
///
/// # Errors
///
/// Fails if the chain store cannot be opened/migrated, the slot cannot be
/// mapped to an epoch, or the write to the store fails.
pub async fn import_nonces(
    era_history: &EraHistory,
    chain_db_path: &PathBuf,
    initial_nonce: InitialNonces,
) -> Result<(), Box<dyn Error>> {
    // A plain borrow of the concrete store used through the `ChainStore`
    // trait pins the header type parameter without the heap allocation a
    // `Box<dyn ChainStore<BlockHeader>>` would incur.
    let store = RocksDBStore::open_and_migrate(&RocksDbConfig::new(chain_db_path.into()))?;
    let db: &dyn ChainStore<BlockHeader> = &store;

    let header_hash = Hash::from(&initial_nonce.at);
    info!(point.id = %header_hash, point.slot = %initial_nonce.at.slot_or_default(), "importing nonces");

    // Nonces carry the epoch they belong to; derive it from the point's slot.
    let epoch = {
        let slot = initial_nonce.at.slot_or_default();
        era_history.slot_to_epoch_unchecked_horizon(slot)?
    };
    let nonces = Nonces {
        epoch,
        active: initial_nonce.active,
        evolving: initial_nonce.evolving,
        candidate: initial_nonce.candidate,
        tail: initial_nonce.tail,
    };
    db.put_nonces(&header_hash, &nonces)?;
    Ok(())
}
/// Decode each CBOR-encoded header and store it in the chain store under
/// `chain_dir`.
///
/// Panics if a header fails to decode (acknowledged by the
/// `allow(clippy::unwrap_used)` below): bootstrap headers are shipped with
/// the binary, so a decode failure is a packaging bug, not a runtime
/// condition.
#[allow(clippy::unwrap_used)]
pub async fn import_headers_for_network(chain_dir: &PathBuf, headers: Vec<Vec<u8>>) -> Result<(), Box<dyn Error>> {
    // Open (and migrate if necessary) the chain store the headers go into.
    let db = RocksDBStore::open_and_migrate(&RocksDbConfig::new(chain_dir.into()))?;

    for raw in headers {
        let decoded: BlockHeader = from_cbor(&raw).unwrap();
        let hash = decoded.hash();
        // Log only the first 8 characters of the hash for readability.
        let short_hash = hash.to_string().chars().take(8).collect::<String>();
        info!(hash = short_hash, "inserting header");
        db.store_header(&decoded)?;
    }
    Ok(())
}
/// Find every `*.cbor` snapshot in `snapshot_dir` and import them into the
/// ledger store under `ledger_dir`, oldest slot first.
pub async fn import_snapshots_from_directory(
    network: NetworkName,
    ledger_dir: &PathBuf,
    snapshot_dir: &PathBuf,
) -> Result<(), Box<dyn std::error::Error>> {
    // Collect the `.cbor` files, silently skipping unreadable dir entries.
    let mut snapshots: Vec<PathBuf> = Vec::new();
    for entry in std::fs::read_dir(snapshot_dir)? {
        let Ok(entry) = entry else { continue };
        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) == Some("cbor") {
            snapshots.push(path);
        }
    }
    // Imports must happen in chronological order: sort by slot number.
    sort_snapshots_by_slot(&mut snapshots);
    import_snapshots(network, &snapshots, ledger_dir).await
}
/// Sort snapshot paths in-place by the slot number encoded as the leading
/// `<slot>.` component of the file name. Names that do not parse as a slot
/// number sort last (key `u64::MAX`).
fn sort_snapshots_by_slot(snapshots: &mut [PathBuf]) {
    let slot_of = |path: &PathBuf| -> u64 {
        let parsed = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(|name| name.split('.').next())
            .map(str::parse::<u64>);
        match parsed {
            Some(Ok(slot)) => slot,
            // Missing/non-UTF-8/non-numeric names go to the end.
            _ => u64::MAX,
        }
    };
    snapshots.sort_by_key(slot_of);
}
/// Import every snapshot in `snapshots`, in order, into the ledger store
/// under `ledger_dir`, stopping at the first failure.
///
/// `snapshots` is taken as a slice rather than `&Vec<_>` so any contiguous
/// collection of paths can be passed; existing `&Vec<PathBuf>` call sites
/// keep working through deref coercion.
pub async fn import_snapshots(
    network: NetworkName,
    snapshots: &[PathBuf],
    ledger_dir: &PathBuf,
) -> Result<(), Box<dyn std::error::Error>> {
    info!(count = snapshots.len(), "Importing snapshots");
    for snapshot in snapshots {
        import_snapshot(network, snapshot, ledger_dir).await?;
    }
    info!("Imported snapshots");
    Ok(())
}
/// Errors specific to importing snapshots from disk.
#[derive(Debug, thiserror::Error)]
pub enum ImportError {
    /// The snapshot file stem could not be parsed as a chain point.
    #[error("malformed date: {}", .0)]
    MalformedDate(String),
    /// The snapshot path has no parent directory (so era-history files
    /// cannot be located next to it).
    #[error("invalid snapshot file: {0}")]
    InvalidSnapshotFile(PathBuf),
    /// Neither (or both) of the mutually exclusive CLI options was given.
    #[error(
        "You must provide either a single .cbor snapshot file (--snapshot) or a directory containing multiple .cbor snapshots (--snapshot-dir)"
    )]
    IncorrectUsage,
}
/// Import a single ledger snapshot file into a fresh ledger database under
/// `ledger_dir`, then take the post-import epoch snapshot and record the
/// epoch-transition progress.
///
/// The snapshot's file stem must encode a chain point; for testnets, a
/// matching `history.<slot>.<hash>.json` must sit next to the snapshot.
/// Any pre-existing `live` database under `ledger_dir` is wiped first.
#[expect(clippy::unwrap_used)]
pub async fn import_snapshot(
    network: NetworkName,
    snapshot: &PathBuf,
    ledger_dir: &PathBuf,
) -> Result<(), Box<dyn std::error::Error>> {
    info!(snapshot=%snapshot.display(), "Importing snapshot");
    // The file stem encodes the point. The `unwrap` panics on a stem-less or
    // non-UTF-8 path (acknowledged by `expect(clippy::unwrap_used)` above).
    let point = Point::try_from(snapshot.as_path().file_stem().and_then(|s| s.to_str()).unwrap())
        .map_err(ImportError::MalformedDate)?;
    std::fs::create_dir_all(ledger_dir)?;
    // Start from a clean slate: drop any previous live ledger database.
    if std::fs::exists(ledger_dir.join("live"))? {
        std::fs::remove_dir_all(ledger_dir.join("live"))?;
    }
    let db = RocksDB::empty(&RocksDbConfig::new(ledger_dir.into()))?;
    let mut file = std::fs::File::open(snapshot)?;
    // Era-history files (testnets) live next to the snapshot file.
    let dir = snapshot.parent().ok_or(ImportError::InvalidSnapshotFile(snapshot.into()))?;
    let era_history = make_era_history(dir, &point, network)?;
    // Run the import on a dedicated thread with a 10 MB stack — presumably
    // because decoding the snapshot needs more stack than the default;
    // TODO confirm the required depth.
    let builder = std::thread::Builder::new().stack_size(10_000_000);
    let (db, epoch) = builder
        .spawn(move || {
            import_initial_snapshot(&db, &mut file, &point, &era_history, network, new_terminal_progress_bar, true)
                // Convert the error to a String — presumably so the result is
                // Send/'static across the thread join; confirm.
                .map_err(|e| e.to_string())
                .map(|epoch| (db, epoch))
        })
        // `unwrap` on spawn (OS thread creation) and on `join` (import thread
        // panicked); the trailing `?` propagates the import's own error.
        .unwrap()
        .join()
        .unwrap()?;
    // Persist the epoch snapshot, then record that the transition's snapshot
    // step completed, inside a store transaction.
    db.next_snapshot(epoch)?;
    let transaction = db.create_transaction();
    transaction.try_epoch_transition(None, Some(EpochTransitionProgress::SnapshotTaken))?;
    transaction.commit()?;
    info!(epoch=%epoch, "Imported snapshot");
    Ok(())
}
/// Build the [`EraHistory`] needed to interpret `point`'s slot on `network`.
///
/// Well-known networks ship their era history with the binary; testnets must
/// provide a `history.<slot>.<hash>.json` file in `dir`, next to the
/// snapshot being imported.
fn make_era_history(dir: &Path, point: &Point, network: NetworkName) -> Result<EraHistory, Box<dyn std::error::Error>> {
    match network {
        NetworkName::Mainnet | NetworkName::Preprod | NetworkName::Preview => {
            // Built-in era history for the well-known networks.
            Ok(<&EraHistory>::from(network).clone())
        }
        NetworkName::Testnet(_) => {
            let history_file =
                dir.join(format!("history.{}.{}.json", point.slot_or_default(), point.hash()));
            if history_file.is_file() {
                let bytes = std::fs::read(&history_file)?;
                Ok(serde_json::from_slice(&bytes)?)
            } else {
                Err(format!("cannot import testnet era history from {:?}", history_file).into())
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, str::FromStr, time::Duration};

    use amaru_kernel::{Hash, HeaderHash, NetworkName, Point, Slot};

    use crate::bootstrap::{make_era_history, sort_snapshots_by_slot};

    // Fixed typo in the test name: "tesnet" -> "testnet".
    #[test]
    fn make_era_history_for_testnet_given_file_exists() {
        let dir = PathBuf::from("tests/data/");
        let hash: HeaderHash =
            Hash::from_str("4df4505d862586f9e2c533c5fbb659f04402664db1b095aba969728abfb77301").unwrap();
        let point = Point::Specific(56073562.into(), hash);
        let history = make_era_history(&dir, &point, NetworkName::Testnet(14)).expect("fail to make era history");
        assert_eq!(
            Duration::from_secs(5100000),
            history.slot_to_relative_time_unchecked_horizon(Slot::from(5100000)).unwrap()
        );
    }

    #[test]
    fn sort_snapshot_file_names_by_slot_number() {
        let mut paths = [
            PathBuf::from("172786.932b9688167139cf4792e97ae4771b6dc762ad25752908cce7b24c2917847516.cbor"),
            PathBuf::from("259174.a07da7616822a1ccb4811e907b1f3a3c5274365908a241f4d5ffab2a69eb8802.cbor"),
            PathBuf::from("86392.1d38de4ffae6090c24151578d331b1021adb8f37d158011616db4d47d1704968.cbor"),
        ];
        sort_snapshots_by_slot(&mut paths);
        // The smallest slot number must sort first.
        assert_eq!(
            PathBuf::from("86392.1d38de4ffae6090c24151578d331b1021adb8f37d158011616db4d47d1704968.cbor"),
            paths[0]
        );
    }
}