1use async_compression::tokio::bufread::GzipDecoder;
2use bytesize::ByteSize;
3use clap::{Parser, ValueEnum};
4use futures::StreamExt;
5use humantime::format_duration;
6use itertools::{Either, Itertools};
7use sha2::{Digest, Sha256};
8use soroban_ledger_snapshot::LedgerSnapshot;
use std::{
    collections::HashSet,
    fs,
    io::{self},
    path::{Path, PathBuf},
    str::FromStr,
    time::{Duration, Instant},
};
17use stellar_xdr::curr::{
18 self as xdr, AccountId, Asset, BucketEntry, ConfigSettingEntry, ContractExecutable, Frame,
19 Hash, LedgerEntryData, LedgerHeaderHistoryEntry, LedgerKey, Limited, Limits, ReadXdr,
20 ScAddress, ScContractInstance, ScVal,
21};
22use tokio::fs::OpenOptions;
23use tokio::io::BufReader;
24use tokio_util::io::StreamReader;
25use url::Url;
26
27use crate::{
28 commands::{config::data, global, HEADING_ARCHIVE},
29 config::{self, locator, network::passphrase},
30 print,
31 tx::builder,
32 utils::get_name_from_stellar_asset_contract_storage,
33};
34use crate::{config::address::UnresolvedMuxedAccount, utils::http};
35
// Serialization format for the generated snapshot file, selected via
// `--output`. JSON is currently the only variant and is the default; the
// snapshot is written with `LedgerSnapshot::write_file`.
// NOTE: comments here are deliberately `//` (not `///`) — clap's ValueEnum
// derive would otherwise surface doc comments as CLI help text.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, ValueEnum, Default)]
pub enum Output {
    #[default]
    Json,
}
41
/// Default path of the snapshot output file (`snapshot.json` in the current
/// working directory), used as the `--out` argument's default value.
fn default_out_path() -> PathBuf {
    // `PathBuf::from` is the idiomatic single-component constructor;
    // the previous `PathBuf::new().join(...)` built the same value in two steps.
    PathBuf::from("snapshot.json")
}
45
// Command arguments for creating a ledger snapshot from a history archive.
// NOTE: comments are `//` on purpose — clap's derive would turn `///` doc
// comments into CLI help text, which would change the binary's help output.
#[derive(Parser, Debug, Clone)]
#[group(skip)]
pub struct Cmd {
    // Checkpoint ledger sequence to snapshot at; when unset, the archive's
    // most recent checkpoint (from .well-known/stellar-history.json) is used.
    #[arg(long)]
    ledger: Option<u32>,

    // Accounts and/or contracts to include. Each value is resolved as a
    // contract address first, then as an account; unresolvable values are
    // silently dropped (see `resolve_address_sync`).
    #[arg(long = "address", help_heading = "Filter Options")]
    address: Vec<String>,

    // Wasm hashes whose contract-code entries should be included.
    #[arg(long = "wasm-hash", help_heading = "Filter Options")]
    wasm_hashes: Vec<Hash>,

    // Output serialization format (JSON only, see `Output`).
    #[arg(long, value_enum, default_value_t)]
    output: Output,

    // Path of the snapshot file to write; defaults to ./snapshot.json.
    #[arg(long, default_value=default_out_path().into_os_string())]
    out: PathBuf,

    // History archive base URL; when unset, a well-known default is derived
    // from the configured network's passphrase (see `Cmd::archive_url`).
    #[arg(long, help_heading = HEADING_ARCHIVE, env = "STELLAR_ARCHIVE_URL")]
    archive_url: Option<Url>,

    #[command(flatten)]
    locator: locator::Args,

    #[command(flatten)]
    network: config::network::Args,
}
94
/// Errors produced while building a snapshot from a history archive:
/// network/download failures, local cache I/O, XDR decoding, configuration
/// lookup, and address/asset resolution problems. The `#[error]` strings are
/// the user-facing messages.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("wasm hash invalid: {0}")]
    WasmHashInvalid(String),

    #[error("downloading history: {0}")]
    DownloadingHistory(reqwest::Error),

    #[error("downloading history: got status code {0}")]
    DownloadingHistoryGotStatusCode(reqwest::StatusCode),

    #[error("json decoding history: {0}")]
    JsonDecodingHistory(serde_json::Error),

    #[error("opening cached bucket to read: {0}")]
    ReadOpeningCachedBucket(io::Error),

    #[error("parsing bucket url: {0}")]
    ParsingBucketUrl(url::ParseError),

    #[error("getting bucket: {0}")]
    GettingBucket(reqwest::Error),

    #[error("getting bucket: got status code {0}")]
    GettingBucketGotStatusCode(reqwest::StatusCode),

    #[error("opening cached bucket to write: {0}")]
    WriteOpeningCachedBucket(io::Error),

    #[error("streaming bucket: {0}")]
    StreamingBucket(io::Error),

    #[error("read XDR frame bucket entry: {0}")]
    ReadXdrFrameBucketEntry(xdr::Error),

    #[error("renaming temporary downloaded file to final destination: {0}")]
    RenameDownloadFile(io::Error),

    #[error("getting bucket directory: {0}")]
    GetBucketDir(data::Error),

    #[error("reading history http stream: {0}")]
    ReadHistoryHttpStream(reqwest::Error),

    #[error("writing ledger snapshot: {0}")]
    WriteLedgerSnapshot(soroban_ledger_snapshot::Error),

    #[error(transparent)]
    Join(#[from] tokio::task::JoinError),

    #[error(transparent)]
    Network(#[from] config::network::Error),

    #[error(transparent)]
    Locator(#[from] locator::Error),

    #[error(transparent)]
    Config(#[from] config::Error),

    #[error("archive url not configured")]
    ArchiveUrlNotConfigured,

    #[error("parsing asset name: {0}")]
    ParseAssetName(String),

    #[error(transparent)]
    Asset(#[from] builder::asset::Error),

    // Raised when a requested ledger's header is absent from the downloaded
    // ledger-header file (see `get_ledger_metadata_from_archive`).
    #[error("ledger not found in archive")]
    LedgerNotFound,

    #[error("xdr parsing error: {0}")]
    Xdr(#[from] xdr::Error),

    // A local bucket file's SHA-256 does not match the bucket's
    // content-addressed name (see `validate_bucket_hash`).
    #[error("corrupted bucket file: expected hash {expected}, got {actual}")]
    CorruptedBucket { expected: String, actual: String },
}
172
/// Interval, in ledgers, between checkpoints in a Stellar history archive.
/// Used to suggest nearby checkpoint ledgers when a requested ledger has no
/// history file (see `get_history`).
const CHECKPOINT_FREQUENCY: u32 = 64;
178
179impl Cmd {
180 #[allow(clippy::too_many_lines)]
181 pub async fn run(&self, global_args: &global::Args) -> Result<(), Error> {
182 let print = print::Print::new(global_args.quiet);
183 let start = Instant::now();
184
185 let archive_url = self.archive_url()?;
186 let history = get_history(&print, &archive_url, self.ledger).await?;
187
188 let ledger = history.current_ledger;
189 let network_passphrase = &history.network_passphrase;
190 let network_id = Sha256::digest(network_passphrase);
191
192 print.infoln(format!("Ledger: {ledger}"));
193 print.infoln(format!("Network Passphrase: {network_passphrase}"));
194 print.infoln(format!("Network id: {}", hex::encode(network_id)));
195
196 let (ledger_close_time, base_reserve) =
198 match get_ledger_metadata_from_archive(&print, &archive_url, ledger).await {
199 Ok((close_time, reserve)) => {
200 print.infoln(format!("Ledger Close Time: {close_time}"));
201 print.infoln(format!("Base Reserve: {reserve}"));
202 (close_time, reserve)
203 }
204 Err(e) => {
205 print.warnln(format!("Failed to get ledger metadata from archive: {e}"));
206 print.infoln("Using default values: close_time=0, base_reserve=1");
207 (0u64, 1u32) }
209 };
210
211 let buckets = history
214 .current_buckets
215 .iter()
216 .flat_map(|h| [h.curr.clone(), h.snap.clone()])
217 .filter(|b| b != "0000000000000000000000000000000000000000000000000000000000000000")
218 .collect::<Vec<_>>();
219
220 for (i, bucket) in buckets.iter().enumerate() {
222 cache_bucket(&print, &archive_url, i, bucket).await?;
223 }
224
225 let mut snapshot = LedgerSnapshot {
228 protocol_version: 0,
229 sequence_number: ledger,
230 timestamp: ledger_close_time,
231 network_id: network_id.into(),
232 base_reserve,
233 min_persistent_entry_ttl: 0,
234 min_temp_entry_ttl: 0,
235 max_entry_ttl: 0,
236 ledger_entries: Vec::new(),
237 };
238
239 let mut seen = HashSet::new();
244
245 #[allow(clippy::items_after_statements)]
246 #[derive(Default)]
247 struct SearchInputs {
248 account_ids: HashSet<AccountId>,
249 contract_ids: HashSet<ScAddress>,
250 wasm_hashes: HashSet<Hash>,
251 }
252 impl SearchInputs {
253 pub fn is_empty(&self) -> bool {
254 self.account_ids.is_empty()
255 && self.contract_ids.is_empty()
256 && self.wasm_hashes.is_empty()
257 }
258 }
259
260 let (account_ids, contract_ids): (HashSet<AccountId>, HashSet<ScAddress>) = self
262 .address
263 .iter()
264 .cloned()
265 .filter_map(|a| self.resolve_address_sync(&a, network_passphrase))
266 .partition_map(|a| a);
267
268 let mut current = SearchInputs {
269 account_ids,
270 contract_ids,
271 wasm_hashes: self.wasm_hashes.iter().cloned().collect(),
272 };
273 let mut next = SearchInputs::default();
274
275 loop {
276 if current.is_empty() {
277 break;
278 }
279
280 print.infoln(format!(
281 "Searching for {} accounts, {} contracts, {} wasms",
282 current.account_ids.len(),
283 current.contract_ids.len(),
284 current.wasm_hashes.len(),
285 ));
286
287 for (i, bucket) in buckets.iter().enumerate() {
288 let cache_path = cache_bucket(&print, &archive_url, i, bucket).await?;
291 let file = std::fs::OpenOptions::new()
292 .read(true)
293 .open(&cache_path)
294 .map_err(Error::ReadOpeningCachedBucket)?;
295
296 let message = format!("Searching bucket {i} {bucket}");
297 print.searchln(format!("{message}…"));
298
299 if let Ok(metadata) = file.metadata() {
300 print.clear_previous_line();
301 print.searchln(format!("{message} ({})", ByteSize(metadata.len())));
302 }
303
304 let limited = &mut Limited::new(file, Limits::none());
308 let entries = Frame::<BucketEntry>::read_xdr_iter(limited);
309 let mut count_saved = 0;
310 for entry in entries {
311 let Frame(entry) = entry.map_err(Error::ReadXdrFrameBucketEntry)?;
312 let (key, val) = match entry {
313 BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => {
314 let k = l.to_key();
315 (k, Some(l))
316 }
317 BucketEntry::Deadentry(k) => (k, None),
318 BucketEntry::Metaentry(m) => {
319 if m.ledger_version > snapshot.protocol_version {
320 snapshot.protocol_version = m.ledger_version;
321 print.infoln(format!(
322 "Protocol version: {}",
323 snapshot.protocol_version
324 ));
325 }
326 continue;
327 }
328 };
329
330 if seen.contains(&key) {
331 continue;
332 }
333
334 let keep = match &key {
335 LedgerKey::Account(k) => current.account_ids.contains(&k.account_id),
336 LedgerKey::Trustline(k) => current.account_ids.contains(&k.account_id),
337 LedgerKey::ContractData(k) => current.contract_ids.contains(&k.contract),
338 LedgerKey::ContractCode(e) => current.wasm_hashes.contains(&e.hash),
339 LedgerKey::ConfigSetting(_) => true,
340 _ => false,
341 };
342
343 if !keep {
344 continue;
345 }
346
347 seen.insert(key.clone());
348
349 let Some(val) = val else {
350 continue;
351 };
352
353 let include = match &val.data {
354 LedgerEntryData::ConfigSetting(ConfigSettingEntry::StateArchival(
355 state_archival,
356 )) => {
357 snapshot.min_persistent_entry_ttl = state_archival.min_persistent_ttl;
358 snapshot.min_temp_entry_ttl = state_archival.min_temporary_ttl;
359 snapshot.max_entry_ttl = state_archival.max_entry_ttl;
360 false
361 }
362
363 LedgerEntryData::ContractData(e) => {
364 if e.key == ScVal::LedgerKeyContractInstance {
370 match &e.val {
371 ScVal::ContractInstance(ScContractInstance {
372 executable: ContractExecutable::Wasm(hash),
373 ..
374 }) => {
375 if !current.wasm_hashes.contains(hash) {
376 next.wasm_hashes.insert(hash.clone());
377 print.infoln(format!(
378 "Adding wasm {} to search",
379 hex::encode(hash)
380 ));
381 }
382 }
383 ScVal::ContractInstance(ScContractInstance {
384 executable: ContractExecutable::StellarAsset,
385 storage: Some(storage),
386 }) => {
387 if let Some(name) =
388 get_name_from_stellar_asset_contract_storage(storage)
389 {
390 let asset: builder::Asset = name.parse()?;
391 if let Some(issuer) = match asset
392 .resolve(&global_args.locator)?
393 {
394 Asset::Native => None,
395 Asset::CreditAlphanum4(a4) => Some(a4.issuer),
396 Asset::CreditAlphanum12(a12) => Some(a12.issuer),
397 } {
398 print.infoln(format!(
399 "Adding asset issuer {issuer} to search"
400 ));
401 next.account_ids.insert(issuer);
402 }
403 }
404 }
405 _ => {}
406 }
407 }
408 keep
409 }
410 _ => false,
411 };
412 if include {
413 snapshot
414 .ledger_entries
415 .push((Box::new(key), (Box::new(val), Some(u32::MAX))));
416 count_saved += 1;
417 }
418 }
419 if count_saved > 0 {
420 print.infoln(format!("Found {count_saved} entries"));
421 }
422 }
423 current = next;
424 next = SearchInputs::default();
425 }
426
427 snapshot
429 .write_file(&self.out)
430 .map_err(Error::WriteLedgerSnapshot)?;
431 print.saveln(format!(
432 "Saved {} entries to {:?}",
433 snapshot.ledger_entries.len(),
434 self.out
435 ));
436
437 let duration = Duration::from_secs(start.elapsed().as_secs());
438 print.checkln(format!("Completed in {}", format_duration(duration)));
439
440 Ok(())
441 }
442
443 fn archive_url(&self) -> Result<Url, Error> {
444 self.archive_url
447 .clone()
448 .or_else(|| {
449 self.network.get(&self.locator).ok().and_then(|network| {
450 match network.network_passphrase.as_str() {
451 passphrase::MAINNET => {
452 Some("https://history.stellar.org/prd/core-live/core_live_001")
453 }
454 passphrase::TESTNET => {
455 Some("https://history.stellar.org/prd/core-testnet/core_testnet_001")
456 }
457 passphrase::FUTURENET => Some("https://history-futurenet.stellar.org"),
458 passphrase::LOCAL => Some("http://localhost:8000/archive"),
459 _ => None,
460 }
461 .map(|s| Url::from_str(s).expect("archive url valid"))
462 })
463 })
464 .ok_or(Error::ArchiveUrlNotConfigured)
465 }
466
467 fn resolve_address_sync(
468 &self,
469 address: &str,
470 network_passphrase: &str,
471 ) -> Option<Either<AccountId, ScAddress>> {
472 if let Some(contract) = self.resolve_contract(address, network_passphrase) {
473 Some(Either::Right(contract))
474 } else {
475 self.resolve_account_sync(address).map(Either::Left)
476 }
477 }
478
479 fn resolve_account_sync(&self, address: &str) -> Option<AccountId> {
482 let address: UnresolvedMuxedAccount = address.parse().ok()?;
483 let muxed_account = address
484 .resolve_muxed_account_sync(&self.locator, None)
485 .ok()?;
486 Some(muxed_account.account_id())
487 }
488
489 fn resolve_contract(&self, address: &str, network_passphrase: &str) -> Option<ScAddress> {
492 address.parse().ok().or_else(|| {
493 Some(ScAddress::Contract(stellar_xdr::curr::ContractId(
494 self.locator
495 .resolve_contract_id(address, network_passphrase)
496 .ok()?
497 .0
498 .into(),
499 )))
500 })
501 }
502}
503
/// Split a ledger sequence into the path components used by history-archive
/// file layouts: the full 8-digit lowercase hex sequence plus its first three
/// byte pairs, e.g. `0x00abcdef` → `("00abcdef", "00", "ab", "cd")`.
fn ledger_to_path_components(ledger: u32) -> (String, String, String, String) {
    let hex = format!("{ledger:08x}");
    (
        hex.clone(),
        hex[0..2].to_string(),
        hex[2..4].to_string(),
        hex[4..6].to_string(),
    )
}
511
512async fn get_history(
513 print: &print::Print,
514 archive_url: &Url,
515 ledger: Option<u32>,
516) -> Result<History, Error> {
517 let archive_url = archive_url.to_string();
518 let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
519 let history_url = if let Some(ledger) = ledger {
520 let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) =
521 ledger_to_path_components(ledger);
522 format!("{archive_url}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json")
523 } else {
524 format!("{archive_url}/.well-known/stellar-history.json")
525 };
526 let history_url = Url::from_str(&history_url).unwrap();
527
528 print.globeln(format!("Downloading history {history_url}"));
529
530 let response = http::client()
531 .get(history_url.as_str())
532 .send()
533 .await
534 .map_err(Error::DownloadingHistory)?;
535
536 if !response.status().is_success() {
537 if let Some(ledger) = ledger {
539 let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;
540
541 if ledger_offset != 0 {
542 print.errorln(format!(
543 "Ledger {ledger} may not be a checkpoint ledger, try {} or {}",
544 ledger - ledger_offset,
545 ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
546 ));
547 }
548 }
549 return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
550 }
551
552 let body = response
553 .bytes()
554 .await
555 .map_err(Error::ReadHistoryHttpStream)?;
556
557 print.clear_previous_line();
558 print.globeln(format!("Downloaded history {}", &history_url));
559
560 serde_json::from_slice::<History>(&body).map_err(Error::JsonDecodingHistory)
561}
562
/// Download the checkpoint's ledger-header file and extract the close time
/// and base reserve for `ledger`, returned as `(close_time, base_reserve)`.
///
/// The gzipped header stream is decompressed into the local bucket cache
/// directory (via a temporary `.dl` file that is renamed into place), then
/// scanned as framed XDR `LedgerHeaderHistoryEntry` values for the header
/// whose sequence matches `ledger`.
///
/// # Errors
/// Fails on download/decompression/cache-write errors, on XDR decode errors,
/// and with [`Error::LedgerNotFound`] when no header in the file matches.
async fn get_ledger_metadata_from_archive(
    print: &print::Print,
    archive_url: &Url,
    ledger: u32,
) -> Result<(u64, u32), Error> {
    let archive_url = archive_url.to_string();
    // Avoid a double slash when the configured archive URL ends with "/".
    let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);

    // Ledger header files are sharded by the hex ledger sequence:
    // ledger/<aa>/<bb>/<cc>/ledger-<hex>.xdr.gz
    let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) = ledger_to_path_components(ledger);
    let ledger_url = format!(
        "{archive_url}/ledger/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/ledger-{ledger_hex}.xdr.gz"
    );

    print.globeln(format!("Downloading ledger headers {ledger_url}"));

    let ledger_url = Url::from_str(&ledger_url).map_err(Error::ParsingBucketUrl)?;
    let response = http::client()
        .get(ledger_url.as_str())
        .send()
        .await
        .map_err(Error::DownloadingHistory)?;

    if !response.status().is_success() {
        return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
    }

    // Decompress the response straight to disk, writing to a temporary `.dl`
    // path first so a partial download never shadows a complete file.
    let ledger_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
    let cache_path = ledger_dir.join(format!("ledger-{ledger_hex}.xdr"));
    let dl_path = cache_path.with_extension("dl");

    let stream = response
        .bytes_stream()
        .map(|result| result.map_err(std::io::Error::other));
    let stream_reader = StreamReader::new(stream);
    let buf_reader = BufReader::new(stream_reader);
    let mut decoder = GzipDecoder::new(buf_reader);

    let mut file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(&dl_path)
        .await
        .map_err(Error::WriteOpeningCachedBucket)?;

    tokio::io::copy(&mut decoder, &mut file)
        .await
        .map_err(Error::StreamingBucket)?;

    fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;

    print.clear_previous_line();
    print.globeln(format!("Downloaded ledger headers for ledger {ledger}"));

    // Scan the framed XDR header entries for the requested sequence number.
    let file = std::fs::File::open(&cache_path).map_err(Error::ReadOpeningCachedBucket)?;
    let limited = &mut Limited::new(file, Limits::none());

    let entries = Frame::<LedgerHeaderHistoryEntry>::read_xdr_iter(limited);
    for entry in entries {
        let Frame(header_entry) = entry.map_err(Error::Xdr)?;

        if header_entry.header.ledger_seq == ledger {
            let close_time = header_entry.header.scp_value.close_time.0;
            let base_reserve = header_entry.header.base_reserve;

            return Ok((close_time, base_reserve));
        }
    }

    Err(Error::LedgerNotFound)
}
638
639fn validate_bucket_hash(cache_path: &PathBuf, expected_hash: &str) -> Result<(), Error> {
640 let file = std::fs::File::open(cache_path).map_err(Error::ReadOpeningCachedBucket)?;
641 let mut hasher = Sha256::new();
642 std::io::copy(&mut std::io::BufReader::new(file), &mut hasher)
643 .map_err(Error::ReadOpeningCachedBucket)?;
644 let actual_hash = hex::encode(hasher.finalize());
645
646 if actual_hash != expected_hash {
647 return Err(Error::CorruptedBucket {
648 expected: expected_hash.to_string(),
649 actual: actual_hash,
650 });
651 }
652
653 Ok(())
654}
655
656async fn cache_bucket(
657 print: &print::Print,
658 archive_url: &Url,
659 bucket_index: usize,
660 bucket: &str,
661) -> Result<PathBuf, Error> {
662 let bucket_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
663 let cache_path = bucket_dir.join(format!("bucket-{bucket}.xdr"));
664
665 if cache_path.exists() {
667 if validate_bucket_hash(&cache_path, bucket).is_err() {
668 print.warnln(format!(
669 "Cached bucket {bucket} is corrupted, re-downloading"
670 ));
671 std::fs::remove_file(&cache_path).ok();
672 } else {
673 return Ok(cache_path);
674 }
675 }
676
677 if !cache_path.exists() {
678 let bucket_0 = &bucket[0..=1];
679 let bucket_1 = &bucket[2..=3];
680 let bucket_2 = &bucket[4..=5];
681 let bucket_url =
682 format!("{archive_url}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz");
683
684 print.globeln(format!("Downloading bucket {bucket_index} {bucket}…"));
685
686 let bucket_url = Url::from_str(&bucket_url).map_err(Error::ParsingBucketUrl)?;
687
688 let response = http::client()
689 .get(bucket_url.as_str())
690 .send()
691 .await
692 .map_err(Error::GettingBucket)?;
693
694 if !response.status().is_success() {
695 print.println("");
696 return Err(Error::GettingBucketGotStatusCode(response.status()));
697 }
698
699 if let Some(len) = response.content_length() {
700 print.clear_previous_line();
701 print.globeln(format!(
702 "Downloaded bucket {bucket_index} {bucket} ({})",
703 ByteSize(len)
704 ));
705 }
706
707 let stream = response
708 .bytes_stream()
709 .map(|result| result.map_err(std::io::Error::other));
710 let stream_reader = StreamReader::new(stream);
711 let buf_reader = BufReader::new(stream_reader);
712 let mut decoder = GzipDecoder::new(buf_reader);
713 let dl_path = cache_path.with_extension("dl");
714 let mut file = OpenOptions::new()
715 .create(true)
716 .truncate(true)
717 .write(true)
718 .open(&dl_path)
719 .await
720 .map_err(Error::WriteOpeningCachedBucket)?;
721 tokio::io::copy(&mut decoder, &mut file)
722 .await
723 .map_err(Error::StreamingBucket)?;
724 fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
725 }
726 Ok(cache_path)
727}
728
/// Subset of a history archive state JSON file needed by this command.
/// Field names map from camelCase keys in the archive's JSON.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct History {
    // Checkpoint ledger sequence this state describes.
    current_ledger: u32,
    // curr/snap bucket hash pairs; together they name the bucket files that
    // make up the ledger state at this checkpoint.
    current_buckets: Vec<HistoryBucket>,
    // Passphrase identifying the network this archive belongs to.
    network_passphrase: String,
}
736
/// A curr/snap pair of bucket hashes from the history file. Hashes are
/// 64-char hex strings; an all-zero hash denotes an empty bucket and is
/// filtered out before download.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct HistoryBucket {
    curr: String,
    snap: String,
}