use async_compression::tokio::bufread::GzipDecoder;
use bytesize::ByteSize;
use clap::{Parser, ValueEnum};
use futures::StreamExt;
use humantime::format_duration;
use itertools::{Either, Itertools};
use sha2::{Digest, Sha256};
use soroban_ledger_snapshot::LedgerSnapshot;
use std::{
    collections::HashSet,
    fs,
    io::{self},
    path::{Path, PathBuf},
    str::FromStr,
    time::{Duration, Instant},
};
use stellar_xdr::curr::{
self as xdr, AccountId, Asset, BucketEntry, ConfigSettingEntry, ContractExecutable, Frame,
Hash, LedgerEntryData, LedgerHeaderHistoryEntry, LedgerKey, Limited, Limits, ReadXdr,
ScAddress, ScContractInstance, ScVal,
};
use tokio::fs::OpenOptions;
use tokio::io::BufReader;
use tokio_util::io::StreamReader;
use url::Url;
use crate::{
commands::{config::data, global, HEADING_ARCHIVE},
config::{self, locator, network::passphrase},
print,
tx::builder,
utils::get_name_from_stellar_asset_contract_storage,
};
use crate::{config::address::UnresolvedMuxedAccount, utils::http};
// Output format for the generated snapshot. Only JSON is currently
// supported, and it is the default.
//
// `//` comments are used deliberately: doc comments on a clap `ValueEnum`
// become user-visible help text, which must not change.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, ValueEnum, Default)]
pub enum Output {
    #[default]
    Json,
}
// Default destination for the generated snapshot: `snapshot.json` relative
// to the current working directory.
fn default_out_path() -> PathBuf {
    PathBuf::from("snapshot.json")
}
// CLI arguments for the snapshot command.
//
// `//` comments are used deliberately: clap derives turn `///` doc comments
// into user-visible `--help` text, which must not change.
#[derive(Parser, Debug, Clone)]
#[group(skip)]
pub struct Cmd {
    // Checkpoint ledger sequence to snapshot; when omitted the archive's
    // latest checkpoint (`.well-known/stellar-history.json`) is used.
    #[arg(long)]
    ledger: Option<u32>,
    // Account/contract addresses whose entries should be kept.
    #[arg(long = "address", help_heading = "Filter Options")]
    address: Vec<String>,
    // Contract wasm hashes whose code entries should be kept.
    #[arg(long = "wasm-hash", help_heading = "Filter Options")]
    wasm_hashes: Vec<Hash>,
    // Output format (currently JSON only).
    #[arg(long, value_enum, default_value_t)]
    output: Output,
    // Destination file for the snapshot; defaults to ./snapshot.json.
    #[arg(long, default_value=default_out_path().into_os_string())]
    out: PathBuf,
    // History archive base URL; when unset, a well-known archive is chosen
    // from the configured network's passphrase (see `Cmd::archive_url`).
    #[arg(long, help_heading = HEADING_ARCHIVE, env = "STELLAR_ARCHIVE_URL")]
    archive_url: Option<Url>,
    #[command(flatten)]
    locator: locator::Args,
    #[command(flatten)]
    network: config::network::Args,
}
/// Errors produced while building a ledger snapshot from a history archive.
///
/// Variants map one-to-one onto the failure points in `Cmd::run` and the
/// download helpers below; the `#[error]` strings are the user-facing text.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("wasm hash invalid: {0}")]
    WasmHashInvalid(String),
    // History JSON download/decoding failures.
    #[error("downloading history: {0}")]
    DownloadingHistory(reqwest::Error),
    #[error("downloading history: got status code {0}")]
    DownloadingHistoryGotStatusCode(reqwest::StatusCode),
    #[error("json decoding history: {0}")]
    JsonDecodingHistory(serde_json::Error),
    // Bucket caching and streaming failures.
    #[error("opening cached bucket to read: {0}")]
    ReadOpeningCachedBucket(io::Error),
    #[error("parsing bucket url: {0}")]
    ParsingBucketUrl(url::ParseError),
    #[error("getting bucket: {0}")]
    GettingBucket(reqwest::Error),
    #[error("getting bucket: got status code {0}")]
    GettingBucketGotStatusCode(reqwest::StatusCode),
    #[error("opening cached bucket to write: {0}")]
    WriteOpeningCachedBucket(io::Error),
    #[error("streaming bucket: {0}")]
    StreamingBucket(io::Error),
    #[error("read XDR frame bucket entry: {0}")]
    ReadXdrFrameBucketEntry(xdr::Error),
    #[error("renaming temporary downloaded file to final destination: {0}")]
    RenameDownloadFile(io::Error),
    #[error("getting bucket directory: {0}")]
    GetBucketDir(data::Error),
    #[error("reading history http stream: {0}")]
    ReadHistoryHttpStream(reqwest::Error),
    #[error("writing ledger snapshot: {0}")]
    WriteLedgerSnapshot(soroban_ledger_snapshot::Error),
    // Wrapped errors from other subsystems.
    #[error(transparent)]
    Join(#[from] tokio::task::JoinError),
    #[error(transparent)]
    Network(#[from] config::network::Error),
    #[error(transparent)]
    Locator(#[from] locator::Error),
    #[error(transparent)]
    Config(#[from] config::Error),
    #[error("archive url not configured")]
    ArchiveUrlNotConfigured,
    #[error("parsing asset name: {0}")]
    ParseAssetName(String),
    #[error(transparent)]
    Asset(#[from] builder::asset::Error),
    #[error("ledger not found in archive")]
    LedgerNotFound,
    #[error("xdr parsing error: {0}")]
    Xdr(#[from] xdr::Error),
    // Raised when a cached/downloaded bucket file's SHA-256 does not match
    // its name (bucket files are named by the hash of their contents).
    #[error("corrupted bucket file: expected hash {expected}, got {actual}")]
    CorruptedBucket { expected: String, actual: String },
}
const CHECKPOINT_FREQUENCY: u32 = 64;
impl Cmd {
    /// Builds a `LedgerSnapshot` from a history archive and writes it to
    /// `self.out`.
    ///
    /// Downloads the history JSON for the requested (or latest) checkpoint,
    /// caches every referenced bucket locally, then repeatedly scans the
    /// buckets keeping entries that match the requested addresses / wasm
    /// hashes. Wasm hashes and asset issuers discovered inside matching
    /// contract instances are added to the next search pass, until no new
    /// inputs are found.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&self, global_args: &global::Args) -> Result<(), Error> {
        let print = print::Print::new(global_args.quiet);
        let start = Instant::now();
        let archive_url = self.archive_url()?;
        let history = get_history(&print, &archive_url, self.ledger).await?;
        let ledger = history.current_ledger;
        let network_passphrase = &history.network_passphrase;
        // The network id is the SHA-256 digest of the network passphrase.
        let network_id = Sha256::digest(network_passphrase);
        print.infoln(format!("Ledger: {ledger}"));
        print.infoln(format!("Network Passphrase: {network_passphrase}"));
        print.infoln(format!("Network id: {}", hex::encode(network_id)));
        // Close time and base reserve come from the archive's ledger-headers
        // file; fall back to defaults if that download or parse fails so the
        // snapshot can still be produced.
        let (ledger_close_time, base_reserve) =
            match get_ledger_metadata_from_archive(&print, &archive_url, ledger).await {
                Ok((close_time, reserve)) => {
                    print.infoln(format!("Ledger Close Time: {close_time}"));
                    print.infoln(format!("Base Reserve: {reserve}"));
                    (close_time, reserve)
                }
                Err(e) => {
                    print.warnln(format!("Failed to get ledger metadata from archive: {e}"));
                    print.infoln("Using default values: close_time=0, base_reserve=1");
                    (0u64, 1u32)
                }
            };
        // Collect the curr/snap bucket hashes for every bucket-list level,
        // skipping the all-zero hash which denotes an empty bucket.
        let buckets = history
            .current_buckets
            .iter()
            .flat_map(|h| [h.curr.clone(), h.snap.clone()])
            .filter(|b| b != "0000000000000000000000000000000000000000000000000000000000000000")
            .collect::<Vec<_>>();
        // Pre-download all buckets so later search passes hit the cache.
        for (i, bucket) in buckets.iter().enumerate() {
            cache_bucket(&print, &archive_url, i, bucket).await?;
        }
        // Protocol version and TTL settings are filled in while scanning the
        // buckets' meta/config entries below.
        let mut snapshot = LedgerSnapshot {
            protocol_version: 0,
            sequence_number: ledger,
            timestamp: ledger_close_time,
            network_id: network_id.into(),
            base_reserve,
            min_persistent_entry_ttl: 0,
            min_temp_entry_ttl: 0,
            max_entry_ttl: 0,
            ledger_entries: Vec::new(),
        };
        // Keys already decided on: the first matching occurrence across the
        // bucket iteration wins, and dead entries also shadow later ones.
        let mut seen = HashSet::new();
        #[allow(clippy::items_after_statements)]
        #[derive(Default)]
        // The set of things one search pass looks for.
        struct SearchInputs {
            account_ids: HashSet<AccountId>,
            contract_ids: HashSet<ScAddress>,
            wasm_hashes: HashSet<Hash>,
        }
        impl SearchInputs {
            pub fn is_empty(&self) -> bool {
                self.account_ids.is_empty()
                    && self.contract_ids.is_empty()
                    && self.wasm_hashes.is_empty()
            }
        }
        // Split user-provided addresses into account ids and contract
        // addresses; addresses that resolve as neither are silently dropped.
        let (account_ids, contract_ids): (HashSet<AccountId>, HashSet<ScAddress>) = self
            .address
            .iter()
            .cloned()
            .filter_map(|a| self.resolve_address_sync(&a, network_passphrase))
            .partition_map(|a| a);
        let mut current = SearchInputs {
            account_ids,
            contract_ids,
            wasm_hashes: self.wasm_hashes.iter().cloned().collect(),
        };
        let mut next = SearchInputs::default();
        // Transitive-closure loop: scan all buckets for `current`; anything
        // newly discovered goes into `next` and triggers another pass.
        loop {
            if current.is_empty() {
                break;
            }
            print.infoln(format!(
                "Searching for {} accounts, {} contracts, {} wasms",
                current.account_ids.len(),
                current.contract_ids.len(),
                current.wasm_hashes.len(),
            ));
            for (i, bucket) in buckets.iter().enumerate() {
                // Already downloaded above; this returns the cached path.
                let cache_path = cache_bucket(&print, &archive_url, i, bucket).await?;
                let file = std::fs::OpenOptions::new()
                    .read(true)
                    .open(&cache_path)
                    .map_err(Error::ReadOpeningCachedBucket)?;
                let message = format!("Searching bucket {i} {bucket}");
                print.searchln(format!("{message}…"));
                // Best-effort: re-print the progress line with the file size.
                if let Ok(metadata) = file.metadata() {
                    print.clear_previous_line();
                    print.searchln(format!("{message} ({})", ByteSize(metadata.len())));
                }
                // A bucket file is a stream of XDR-framed `BucketEntry`s.
                let limited = &mut Limited::new(file, Limits::none());
                let entries = Frame::<BucketEntry>::read_xdr_iter(limited);
                let mut count_saved = 0;
                for entry in entries {
                    let Frame(entry) = entry.map_err(Error::ReadXdrFrameBucketEntry)?;
                    let (key, val) = match entry {
                        BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => {
                            let k = l.to_key();
                            (k, Some(l))
                        }
                        // Dead entries carry only a key; keeping them in
                        // `seen` prevents older live versions from being
                        // resurrected from later buckets.
                        BucketEntry::Deadentry(k) => (k, None),
                        BucketEntry::Metaentry(m) => {
                            // Track the highest protocol version seen in any
                            // bucket's meta entry.
                            if m.ledger_version > snapshot.protocol_version {
                                snapshot.protocol_version = m.ledger_version;
                                print.infoln(format!(
                                    "Protocol version: {}",
                                    snapshot.protocol_version
                                ));
                            }
                            continue;
                        }
                    };
                    if seen.contains(&key) {
                        continue;
                    }
                    // Keep entries matching the current search inputs; config
                    // settings are always kept (they feed the TTL fields).
                    let keep = match &key {
                        LedgerKey::Account(k) => current.account_ids.contains(&k.account_id),
                        LedgerKey::Trustline(k) => current.account_ids.contains(&k.account_id),
                        LedgerKey::ContractData(k) => current.contract_ids.contains(&k.contract),
                        LedgerKey::ContractCode(e) => current.wasm_hashes.contains(&e.hash),
                        LedgerKey::ConfigSetting(_) => true,
                        _ => false,
                    };
                    if !keep {
                        continue;
                    }
                    seen.insert(key.clone());
                    // Dead entry for a matching key: recorded in `seen`, but
                    // nothing to store.
                    let Some(val) = val else {
                        continue;
                    };
                    let include = match &val.data {
                        // State-archival settings populate the snapshot's TTL
                        // fields but are not stored as ledger entries.
                        LedgerEntryData::ConfigSetting(ConfigSettingEntry::StateArchival(
                            state_archival,
                        )) => {
                            snapshot.min_persistent_entry_ttl = state_archival.min_persistent_ttl;
                            snapshot.min_temp_entry_ttl = state_archival.min_temporary_ttl;
                            snapshot.max_entry_ttl = state_archival.max_entry_ttl;
                            false
                        }
                        LedgerEntryData::ContractData(e) => {
                            // A contract instance references its executable:
                            // queue newly-seen wasm hashes for the next pass,
                            // and for Stellar Asset contracts queue the asset
                            // issuer's account.
                            if e.key == ScVal::LedgerKeyContractInstance {
                                match &e.val {
                                    ScVal::ContractInstance(ScContractInstance {
                                        executable: ContractExecutable::Wasm(hash),
                                        ..
                                    }) => {
                                        if !current.wasm_hashes.contains(hash) {
                                            next.wasm_hashes.insert(hash.clone());
                                            print.infoln(format!(
                                                "Adding wasm {} to search",
                                                hex::encode(hash)
                                            ));
                                        }
                                    }
                                    ScVal::ContractInstance(ScContractInstance {
                                        executable: ContractExecutable::StellarAsset,
                                        storage: Some(storage),
                                    }) => {
                                        if let Some(name) =
                                            get_name_from_stellar_asset_contract_storage(storage)
                                        {
                                            let asset: builder::Asset = name.parse()?;
                                            // Native assets have no issuer.
                                            if let Some(issuer) = match asset
                                                .resolve(&global_args.locator)?
                                            {
                                                Asset::Native => None,
                                                Asset::CreditAlphanum4(a4) => Some(a4.issuer),
                                                Asset::CreditAlphanum12(a12) => Some(a12.issuer),
                                            } {
                                                print.infoln(format!(
                                                    "Adding asset issuer {issuer} to search"
                                                ));
                                                next.account_ids.insert(issuer);
                                            }
                                        }
                                    }
                                    _ => {}
                                }
                            }
                            // `keep` is necessarily true here (checked above).
                            keep
                        }
                        _ => false,
                    };
                    if include {
                        // NOTE(review): `Some(u32::MAX)` appears to mark the
                        // entry as never expiring in the snapshot — confirm
                        // against soroban-ledger-snapshot's live-until
                        // semantics.
                        snapshot
                            .ledger_entries
                            .push((Box::new(key), (Box::new(val), Some(u32::MAX))));
                        count_saved += 1;
                    }
                }
                if count_saved > 0 {
                    print.infoln(format!("Found {count_saved} entries"));
                }
            }
            // The next pass searches only the newly discovered inputs.
            current = next;
            next = SearchInputs::default();
        }
        snapshot
            .write_file(&self.out)
            .map_err(Error::WriteLedgerSnapshot)?;
        print.saveln(format!(
            "Saved {} entries to {:?}",
            snapshot.ledger_entries.len(),
            self.out
        ));
        // Truncate to whole seconds for a tidy duration display.
        let duration = Duration::from_secs(start.elapsed().as_secs());
        print.checkln(format!("Completed in {}", format_duration(duration)));
        Ok(())
    }

    /// Resolves the archive URL: the explicit flag/env value first, otherwise
    /// a well-known archive chosen by the configured network's passphrase.
    ///
    /// # Errors
    /// Returns `Error::ArchiveUrlNotConfigured` when no URL is set and the
    /// network passphrase matches no known archive.
    fn archive_url(&self) -> Result<Url, Error> {
        self.archive_url
            .clone()
            .or_else(|| {
                self.network.get(&self.locator).ok().and_then(|network| {
                    match network.network_passphrase.as_str() {
                        passphrase::MAINNET => {
                            Some("https://history.stellar.org/prd/core-live/core_live_001")
                        }
                        passphrase::TESTNET => {
                            Some("https://history.stellar.org/prd/core-testnet/core_testnet_001")
                        }
                        passphrase::FUTURENET => Some("https://history-futurenet.stellar.org"),
                        passphrase::LOCAL => Some("http://localhost:8000/archive"),
                        _ => None,
                    }
                    // The hard-coded URLs above are known-valid, so the
                    // expect cannot trigger.
                    .map(|s| Url::from_str(s).expect("archive url valid"))
                })
            })
            .ok_or(Error::ArchiveUrlNotConfigured)
    }

    /// Interprets `address` as a contract address first, then as an account;
    /// returns `None` when it resolves as neither.
    fn resolve_address_sync(
        &self,
        address: &str,
        network_passphrase: &str,
    ) -> Option<Either<AccountId, ScAddress>> {
        if let Some(contract) = self.resolve_contract(address, network_passphrase) {
            Some(Either::Right(contract))
        } else {
            self.resolve_account_sync(address).map(Either::Left)
        }
    }

    /// Resolves an account address (or locally configured identity) to an
    /// `AccountId`; muxed accounts collapse to their underlying account id.
    fn resolve_account_sync(&self, address: &str) -> Option<AccountId> {
        let address: UnresolvedMuxedAccount = address.parse().ok()?;
        let muxed_account = address
            .resolve_muxed_account_sync(&self.locator, None)
            .ok()?;
        Some(muxed_account.account_id())
    }

    /// Resolves a contract address: a direct parse first, falling back to a
    /// contract-alias lookup via the locator for the given network.
    fn resolve_contract(&self, address: &str, network_passphrase: &str) -> Option<ScAddress> {
        address.parse().ok().or_else(|| {
            Some(ScAddress::Contract(stellar_xdr::curr::ContractId(
                self.locator
                    .resolve_contract_id(address, network_passphrase)
                    .ok()?
                    .0
                    .into(),
            )))
        })
    }
}
/// Splits a ledger sequence into the path components used by history
/// archives: the full 8-digit lowercase hex sequence plus its first three
/// byte pairs (used as `xx/yy/zz` directory shards).
fn ledger_to_path_components(ledger: u32) -> (String, String, String, String) {
    let hex = format!("{ledger:08x}");
    let shard_0 = hex[..2].to_string();
    let shard_1 = hex[2..4].to_string();
    let shard_2 = hex[4..6].to_string();
    (hex, shard_0, shard_1, shard_2)
}
async fn get_history(
print: &print::Print,
archive_url: &Url,
ledger: Option<u32>,
) -> Result<History, Error> {
let archive_url = archive_url.to_string();
let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
let history_url = if let Some(ledger) = ledger {
let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) =
ledger_to_path_components(ledger);
format!("{archive_url}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json")
} else {
format!("{archive_url}/.well-known/stellar-history.json")
};
let history_url = Url::from_str(&history_url).unwrap();
print.globeln(format!("Downloading history {history_url}"));
let response = http::client()
.get(history_url.as_str())
.send()
.await
.map_err(Error::DownloadingHistory)?;
if !response.status().is_success() {
if let Some(ledger) = ledger {
let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;
if ledger_offset != 0 {
print.errorln(format!(
"Ledger {ledger} may not be a checkpoint ledger, try {} or {}",
ledger - ledger_offset,
ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
));
}
}
return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
}
let body = response
.bytes()
.await
.map_err(Error::ReadHistoryHttpStream)?;
print.clear_previous_line();
print.globeln(format!("Downloaded history {}", &history_url));
serde_json::from_slice::<History>(&body).map_err(Error::JsonDecodingHistory)
}
/// Downloads the ledger-headers file containing `ledger` from the archive
/// and extracts `(close_time, base_reserve)` for that exact ledger.
///
/// The gzipped XDR stream is decoded into a cached file, which is then
/// scanned as `Frame<LedgerHeaderHistoryEntry>` records until the matching
/// sequence number is found.
///
/// # Errors
/// Fails on download/HTTP/IO errors, XDR decoding errors, or with
/// `Error::LedgerNotFound` when no header in the file matches `ledger`.
async fn get_ledger_metadata_from_archive(
    print: &print::Print,
    archive_url: &Url,
    ledger: u32,
) -> Result<(u64, u32), Error> {
    let archive_url = archive_url.to_string();
    // Strip any trailing slash so the joins below don't produce "//".
    let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
    // Ledger files are sharded by the first three byte pairs of the
    // 8-hex-digit sequence: ledger/xx/yy/zz/ledger-<hex>.xdr.gz
    let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) = ledger_to_path_components(ledger);
    let ledger_url = format!(
        "{archive_url}/ledger/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/ledger-{ledger_hex}.xdr.gz"
    );
    print.globeln(format!("Downloading ledger headers {ledger_url}"));
    let ledger_url = Url::from_str(&ledger_url).map_err(Error::ParsingBucketUrl)?;
    let response = http::client()
        .get(ledger_url.as_str())
        .send()
        .await
        .map_err(Error::DownloadingHistory)?;
    if !response.status().is_success() {
        return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
    }
    let ledger_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
    let cache_path = ledger_dir.join(format!("ledger-{ledger_hex}.xdr"));
    // Download into a temporary `.dl` file and rename afterwards, so a
    // partial download never lands at the final cache path.
    let dl_path = cache_path.with_extension("dl");
    // Stream the gzipped response body through the decoder into the file.
    let stream = response
        .bytes_stream()
        .map(|result| result.map_err(std::io::Error::other));
    let stream_reader = StreamReader::new(stream);
    let buf_reader = BufReader::new(stream_reader);
    let mut decoder = GzipDecoder::new(buf_reader);
    let mut file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(&dl_path)
        .await
        .map_err(Error::WriteOpeningCachedBucket)?;
    tokio::io::copy(&mut decoder, &mut file)
        .await
        .map_err(Error::StreamingBucket)?;
    // NOTE(review): the file is renamed without an explicit flush/sync;
    // tokio::fs::File may still have buffered writes in flight when dropped —
    // confirm, or consider `file.sync_all().await` before the rename.
    fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
    print.clear_previous_line();
    print.globeln(format!("Downloaded ledger headers for ledger {ledger}"));
    // Scan the decoded file for the header of the requested ledger.
    let file = std::fs::File::open(&cache_path).map_err(Error::ReadOpeningCachedBucket)?;
    let limited = &mut Limited::new(file, Limits::none());
    let entries = Frame::<LedgerHeaderHistoryEntry>::read_xdr_iter(limited);
    for entry in entries {
        let Frame(header_entry) = entry.map_err(Error::Xdr)?;
        if header_entry.header.ledger_seq == ledger {
            let close_time = header_entry.header.scp_value.close_time.0;
            let base_reserve = header_entry.header.base_reserve;
            return Ok((close_time, base_reserve));
        }
    }
    Err(Error::LedgerNotFound)
}
fn validate_bucket_hash(cache_path: &PathBuf, expected_hash: &str) -> Result<(), Error> {
let file = std::fs::File::open(cache_path).map_err(Error::ReadOpeningCachedBucket)?;
let mut hasher = Sha256::new();
std::io::copy(&mut std::io::BufReader::new(file), &mut hasher)
.map_err(Error::ReadOpeningCachedBucket)?;
let actual_hash = hex::encode(hasher.finalize());
if actual_hash != expected_hash {
return Err(Error::CorruptedBucket {
expected: expected_hash.to_string(),
actual: actual_hash,
});
}
Ok(())
}
async fn cache_bucket(
print: &print::Print,
archive_url: &Url,
bucket_index: usize,
bucket: &str,
) -> Result<PathBuf, Error> {
let bucket_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
let cache_path = bucket_dir.join(format!("bucket-{bucket}.xdr"));
if cache_path.exists() {
if validate_bucket_hash(&cache_path, bucket).is_err() {
print.warnln(format!(
"Cached bucket {bucket} is corrupted, re-downloading"
));
std::fs::remove_file(&cache_path).ok();
} else {
return Ok(cache_path);
}
}
if !cache_path.exists() {
let bucket_0 = &bucket[0..=1];
let bucket_1 = &bucket[2..=3];
let bucket_2 = &bucket[4..=5];
let bucket_url =
format!("{archive_url}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz");
print.globeln(format!("Downloading bucket {bucket_index} {bucket}…"));
let bucket_url = Url::from_str(&bucket_url).map_err(Error::ParsingBucketUrl)?;
let response = http::client()
.get(bucket_url.as_str())
.send()
.await
.map_err(Error::GettingBucket)?;
if !response.status().is_success() {
print.println("");
return Err(Error::GettingBucketGotStatusCode(response.status()));
}
if let Some(len) = response.content_length() {
print.clear_previous_line();
print.globeln(format!(
"Downloaded bucket {bucket_index} {bucket} ({})",
ByteSize(len)
));
}
let stream = response
.bytes_stream()
.map(|result| result.map_err(std::io::Error::other));
let stream_reader = StreamReader::new(stream);
let buf_reader = BufReader::new(stream_reader);
let mut decoder = GzipDecoder::new(buf_reader);
let dl_path = cache_path.with_extension("dl");
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&dl_path)
.await
.map_err(Error::WriteOpeningCachedBucket)?;
tokio::io::copy(&mut decoder, &mut file)
.await
.map_err(Error::StreamingBucket)?;
fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
}
Ok(cache_path)
}
/// Subset of a Stellar history archive "history" JSON file
/// (`.well-known/stellar-history.json` or `history/xx/yy/zz/history-….json`)
/// consumed by this command; fields not listed here are ignored when
/// deserializing.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct History {
    /// Checkpoint ledger sequence this history state refers to.
    current_ledger: u32,
    /// One entry per bucket-list level, each naming two bucket files.
    current_buckets: Vec<HistoryBucket>,
    /// Passphrase identifying the network the archive serves.
    network_passphrase: String,
}
/// One bucket-list level from the history JSON: the current and snapshot
/// bucket file hashes (hex strings; an all-zero hash denotes an empty
/// bucket).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct HistoryBucket {
    /// Hash naming the level's current bucket file.
    curr: String,
    /// Hash naming the level's snapshot bucket file.
    snap: String,
}