1use async_compression::tokio::bufread::GzipDecoder;
2use bytesize::ByteSize;
3use clap::{arg, Parser, ValueEnum};
4use futures::StreamExt;
5use humantime::format_duration;
6use itertools::{Either, Itertools};
7use sha2::{Digest, Sha256};
8use soroban_ledger_snapshot::LedgerSnapshot;
9use std::{
10 collections::HashSet,
11 fs,
12 io::{self},
13 path::PathBuf,
14 str::FromStr,
15 time::{Duration, Instant},
16};
17use stellar_xdr::curr::{
18 self as xdr, AccountId, Asset, BucketEntry, ConfigSettingEntry, ConfigSettingId,
19 ContractExecutable, Frame, Hash, LedgerEntry, LedgerEntryData, LedgerKey, LedgerKeyAccount,
20 LedgerKeyClaimableBalance, LedgerKeyConfigSetting, LedgerKeyContractCode,
21 LedgerKeyContractData, LedgerKeyData, LedgerKeyLiquidityPool, LedgerKeyOffer,
22 LedgerKeyTrustLine, LedgerKeyTtl, Limited, Limits, ReadXdr, ScAddress, ScContractInstance,
23 ScVal,
24};
25use tokio::fs::OpenOptions;
26use tokio::io::BufReader;
27use tokio_util::io::StreamReader;
28use url::Url;
29
30use crate::{
31 commands::{config::data, global, HEADING_RPC},
32 config::{self, locator, network::passphrase},
33 print,
34 tx::builder,
35 utils::get_name_from_stellar_asset_contract_storage,
36};
37use crate::{config::address::UnresolvedMuxedAccount, utils::http};
38
39#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, ValueEnum)]
40pub enum Output {
41 Json,
42}
43
44impl Default for Output {
45 fn default() -> Self {
46 Self::Json
47 }
48}
49
/// Default destination of the snapshot file: the relative path `snapshot.json`.
///
/// Used as the clap `default_value` of `--out`.
fn default_out_path() -> PathBuf {
    PathBuf::from("snapshot.json")
}
53
// Arguments for building a ledger snapshot from a Stellar history archive (see `Cmd::run`).
//
// NOTE: plain `//` comments are used on purpose throughout this struct. clap derive turns
// `///` doc comments into `about`/help text, so adding them would change the CLI output.
#[derive(Parser, Debug, Clone)]
#[group(skip)]
#[command(arg_required_else_help = true)]
pub struct Cmd {
    // Ledger whose checkpoint history file is fetched. When unset, the archive's
    // `.well-known/stellar-history.json` (its latest state) is used instead.
    #[arg(long)]
    ledger: Option<u32>,
    // Accounts or contracts (strkeys or aliases) to include. Each value goes through
    // `resolve_address_sync`; values that resolve to neither are silently dropped.
    #[arg(long = "address", help_heading = "Filter Options")]
    address: Vec<String>,
    // Contract code (wasm) hashes to include in the first search pass.
    #[arg(long = "wasm-hash", help_heading = "Filter Options")]
    wasm_hashes: Vec<Hash>,
    // Output format. `Output::Json` is the only variant; `run` does not currently read it.
    #[arg(long)]
    output: Output,
    // Path the snapshot is written to (defaults to `snapshot.json`).
    #[arg(long, default_value=default_out_path().into_os_string())]
    out: PathBuf,
    // Config locator, used to resolve account/contract aliases and asset names.
    #[command(flatten)]
    locator: locator::Args,
    // Network selection. Only consulted to pick a default archive URL when `archive_url`
    // is not given.
    #[command(flatten)]
    network: config::network::Args,
    // History archive base URL. Overrides the per-network default chosen in
    // `Cmd::archive_url`; may also be set through `STELLAR_ARCHIVE_URL`.
    #[arg(long, help_heading = HEADING_RPC, env = "STELLAR_ARCHIVE_URL")]
    archive_url: Option<Url>,
}
96
/// Errors returned while building a ledger snapshot (see `Cmd::run`).
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A wasm hash was invalid. NOTE(review): not constructed in the code shown here.
    #[error("wasm hash invalid: {0}")]
    WasmHashInvalid(String),
    /// Sending the history JSON request failed (`get_history`).
    #[error("downloading history: {0}")]
    DownloadingHistory(reqwest::Error),
    /// The history JSON request returned a non-success HTTP status (`get_history`).
    #[error("downloading history: got status code {0}")]
    DownloadingHistoryGotStatusCode(reqwest::StatusCode),
    /// The history JSON body could not be deserialized into `History`.
    #[error("json decoding history: {0}")]
    JsonDecodingHistory(serde_json::Error),
    /// A cached bucket file could not be opened for reading during a search pass.
    #[error("opening cached bucket to read: {0}")]
    ReadOpeningCachedBucket(io::Error),
    /// The bucket download URL built in `cache_bucket` could not be parsed.
    #[error("parsing bucket url: {0}")]
    ParsingBucketUrl(url::ParseError),
    /// Sending a bucket download request failed.
    #[error("getting bucket: {0}")]
    GettingBucket(reqwest::Error),
    /// A bucket download request returned a non-success HTTP status.
    #[error("getting bucket: got status code {0}")]
    GettingBucketGotStatusCode(reqwest::StatusCode),
    /// The temporary `.dl` file for a bucket download could not be created.
    #[error("opening cached bucket to write: {0}")]
    WriteOpeningCachedBucket(io::Error),
    /// Streaming and gunzipping a bucket download into its temporary file failed.
    #[error("streaming bucket: {0}")]
    StreamingBucket(io::Error),
    /// A framed `BucketEntry` could not be decoded from a cached bucket file.
    #[error("read XDR frame bucket entry: {0}")]
    ReadXdrFrameBucketEntry(xdr::Error),
    /// Moving a finished `.dl` download to its cached bucket path failed.
    #[error("renaming temporary downloaded file to final destination: {0}")]
    RenameDownloadFile(io::Error),
    /// The local bucket cache directory could not be determined.
    #[error("getting bucket directory: {0}")]
    GetBucketDir(data::Error),
    /// Reading the history response body failed.
    #[error("reading history http stream: {0}")]
    ReadHistoryHttpStream(reqwest::Error),
    /// Writing the snapshot file to `--out` failed.
    #[error("writing ledger snapshot: {0}")]
    WriteLedgerSnapshot(soroban_ledger_snapshot::Error),
    /// Wraps a tokio task join error. NOTE(review): no task is spawned in the code shown here.
    #[error(transparent)]
    Join(#[from] tokio::task::JoinError),
    /// Wraps a network configuration error.
    #[error(transparent)]
    Network(#[from] config::network::Error),
    /// Wraps a config locator error.
    #[error(transparent)]
    Locator(#[from] locator::Error),
    /// Wraps a general config error.
    #[error(transparent)]
    Config(#[from] config::Error),
    /// Neither `--archive-url` nor a built-in default for the selected network was available.
    #[error("archive url not configured")]
    ArchiveUrlNotConfigured,
    /// An asset name could not be parsed. NOTE(review): not constructed in the code shown
    /// here; asset parse failures currently surface through `Error::Asset`.
    #[error("parsing asset name: {0}")]
    ParseAssetName(String),
    /// Parsing or resolving a Stellar Asset Contract's asset failed.
    #[error(transparent)]
    Asset(#[from] builder::asset::Error),
}
144
/// Number of ledgers between history archive checkpoints.
///
/// `get_history` treats a ledger as a checkpoint when `ledger + 1` is a multiple of this
/// value. It uses this to suggest nearby checkpoint ledgers when a requested history file
/// cannot be downloaded.
const CHECKPOINT_FREQUENCY: u32 = 64;
150
151impl Cmd {
152 #[allow(clippy::too_many_lines)]
153 pub async fn run(&self, global_args: &global::Args) -> Result<(), Error> {
154 let print = print::Print::new(global_args.quiet);
155 let start = Instant::now();
156
157 let archive_url = self.archive_url()?;
158 let history = get_history(&print, &archive_url, self.ledger).await?;
159
160 let ledger = history.current_ledger;
161 let network_passphrase = &history.network_passphrase;
162 let network_id = Sha256::digest(network_passphrase);
163
164 print.infoln(format!("Ledger: {ledger}"));
165 print.infoln(format!("Network Passphrase: {network_passphrase}"));
166 print.infoln(format!("Network id: {}", hex::encode(network_id)));
167
168 let buckets = history
171 .current_buckets
172 .iter()
173 .flat_map(|h| [h.curr.clone(), h.snap.clone()])
174 .filter(|b| b != "0000000000000000000000000000000000000000000000000000000000000000")
175 .collect::<Vec<_>>();
176
177 for (i, bucket) in buckets.iter().enumerate() {
179 cache_bucket(&print, &archive_url, i, bucket).await?;
180 }
181
182 let mut snapshot = LedgerSnapshot {
185 protocol_version: 0,
187 sequence_number: ledger,
188 timestamp: 0,
189 network_id: network_id.into(),
190 base_reserve: 1,
191 min_persistent_entry_ttl: 0,
192 min_temp_entry_ttl: 0,
193 max_entry_ttl: 0,
194 ledger_entries: Vec::new(),
195 };
196
197 let mut seen = HashSet::new();
202
203 #[allow(clippy::items_after_statements)]
204 #[derive(Default)]
205 struct SearchInputs {
206 account_ids: HashSet<AccountId>,
207 contract_ids: HashSet<ScAddress>,
208 wasm_hashes: HashSet<Hash>,
209 }
210 impl SearchInputs {
211 pub fn is_empty(&self) -> bool {
212 self.account_ids.is_empty()
213 && self.contract_ids.is_empty()
214 && self.wasm_hashes.is_empty()
215 }
216 }
217
218 let (account_ids, contract_ids): (HashSet<AccountId>, HashSet<ScAddress>) = self
220 .address
221 .iter()
222 .cloned()
223 .filter_map(|a| self.resolve_address_sync(&a, network_passphrase))
224 .partition_map(|a| a);
225
226 let mut current = SearchInputs {
227 account_ids,
228 contract_ids,
229 wasm_hashes: self.wasm_hashes.iter().cloned().collect(),
230 };
231 let mut next = SearchInputs::default();
232
233 loop {
234 if current.is_empty() {
235 break;
236 }
237
238 print.infoln(format!(
239 "Searching for {} accounts, {} contracts, {} wasms",
240 current.account_ids.len(),
241 current.contract_ids.len(),
242 current.wasm_hashes.len(),
243 ));
244
245 for (i, bucket) in buckets.iter().enumerate() {
246 let cache_path = cache_bucket(&print, &archive_url, i, bucket).await?;
249 let file = std::fs::OpenOptions::new()
250 .read(true)
251 .open(&cache_path)
252 .map_err(Error::ReadOpeningCachedBucket)?;
253
254 let message = format!("Searching bucket {i} {bucket}");
255 print.search(format!("{message}…"));
256
257 if let Ok(metadata) = file.metadata() {
258 print.clear_line();
259 print.searchln(format!("{message} ({})", ByteSize(metadata.len())));
260 }
261
262 let limited = &mut Limited::new(file, Limits::none());
266 let entries = Frame::<BucketEntry>::read_xdr_iter(limited);
267 let mut count_saved = 0;
268 for entry in entries {
269 let Frame(entry) = entry.map_err(Error::ReadXdrFrameBucketEntry)?;
270 let (key, val) = match entry {
271 BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => {
272 let k = data_into_key(&l);
273 (k, Some(l))
274 }
275 BucketEntry::Deadentry(k) => (k, None),
276 BucketEntry::Metaentry(m) => {
277 if m.ledger_version > snapshot.protocol_version {
278 snapshot.protocol_version = m.ledger_version;
279 print.infoln(format!(
280 "Protocol version: {}",
281 snapshot.protocol_version
282 ));
283 }
284 continue;
285 }
286 };
287 if seen.contains(&key) {
288 continue;
289 }
290 let keep = match &key {
291 LedgerKey::Account(k) => current.account_ids.contains(&k.account_id),
292 LedgerKey::Trustline(k) => current.account_ids.contains(&k.account_id),
293 LedgerKey::ContractData(k) => current.contract_ids.contains(&k.contract),
294 LedgerKey::ContractCode(e) => current.wasm_hashes.contains(&e.hash),
295 _ => false,
296 };
297 if !keep {
298 continue;
299 }
300 seen.insert(key.clone());
301 let Some(val) = val else { continue };
302 match &val.data {
303 LedgerEntryData::ContractData(e) => {
304 if keep && e.key == ScVal::LedgerKeyContractInstance {
310 match &e.val {
311 ScVal::ContractInstance(ScContractInstance {
312 executable: ContractExecutable::Wasm(hash),
313 ..
314 }) => {
315 if !current.wasm_hashes.contains(hash) {
316 next.wasm_hashes.insert(hash.clone());
317 print.infoln(format!(
318 "Adding wasm {} to search",
319 hex::encode(hash)
320 ));
321 }
322 }
323 ScVal::ContractInstance(ScContractInstance {
324 executable: ContractExecutable::StellarAsset,
325 storage: Some(storage),
326 }) => {
327 if let Some(name) =
328 get_name_from_stellar_asset_contract_storage(storage)
329 {
330 let asset: builder::Asset = name.parse()?;
331 if let Some(issuer) = match asset
332 .resolve(&global_args.locator)?
333 {
334 Asset::Native => None,
335 Asset::CreditAlphanum4(a4) => Some(a4.issuer),
336 Asset::CreditAlphanum12(a12) => Some(a12.issuer),
337 } {
338 print.infoln(format!(
339 "Adding asset issuer {issuer} to search"
340 ));
341 next.account_ids.insert(issuer);
342 }
343 }
344 }
345 _ => {}
346 }
347 }
348 keep
349 }
350 _ => false,
351 };
352 snapshot
353 .ledger_entries
354 .push((Box::new(key), (Box::new(val), Some(u32::MAX))));
355 count_saved += 1;
356 }
357 if count_saved > 0 {
358 print.infoln(format!("Found {count_saved} entries"));
359 }
360 }
361 current = next;
362 next = SearchInputs::default();
363 }
364
365 snapshot
367 .write_file(&self.out)
368 .map_err(Error::WriteLedgerSnapshot)?;
369 print.saveln(format!(
370 "Saved {} entries to {:?}",
371 snapshot.ledger_entries.len(),
372 self.out
373 ));
374
375 let duration = Duration::from_secs(start.elapsed().as_secs());
376 print.checkln(format!("Completed in {}", format_duration(duration)));
377
378 Ok(())
379 }
380
381 fn archive_url(&self) -> Result<Url, Error> {
382 self.archive_url
385 .clone()
386 .or_else(|| {
387 self.network.get(&self.locator).ok().and_then(|network| {
388 match network.network_passphrase.as_str() {
389 passphrase::MAINNET => {
390 Some("https://history.stellar.org/prd/core-live/core_live_001")
391 }
392 passphrase::TESTNET => {
393 Some("https://history.stellar.org/prd/core-testnet/core_testnet_001")
394 }
395 passphrase::FUTURENET => Some("https://history-futurenet.stellar.org"),
396 passphrase::LOCAL => Some("http://localhost:8000/archive"),
397 _ => None,
398 }
399 .map(|s| Url::from_str(s).expect("archive url valid"))
400 })
401 })
402 .ok_or(Error::ArchiveUrlNotConfigured)
403 }
404
405 #[allow(dead_code)]
406 async fn resolve_address(
407 &self,
408 address: &str,
409 network_passphrase: &str,
410 ) -> Option<Either<AccountId, ScAddress>> {
411 if let Some(contract) = self.resolve_contract(address, network_passphrase) {
412 Some(Either::Right(contract))
413 } else {
414 self.resolve_account(address).await.map(Either::Left)
415 }
416 }
417
418 fn resolve_address_sync(
419 &self,
420 address: &str,
421 network_passphrase: &str,
422 ) -> Option<Either<AccountId, ScAddress>> {
423 if let Some(contract) = self.resolve_contract(address, network_passphrase) {
424 Some(Either::Right(contract))
425 } else {
426 self.resolve_account_sync(address).map(Either::Left)
427 }
428 }
429
430 async fn resolve_account(&self, address: &str) -> Option<AccountId> {
434 let address: UnresolvedMuxedAccount = address.parse().ok()?;
435 Some(AccountId(xdr::PublicKey::PublicKeyTypeEd25519(
436 match address
437 .resolve_muxed_account(&self.locator, None)
438 .await
439 .ok()?
440 {
441 xdr::MuxedAccount::Ed25519(uint256) => uint256,
442 xdr::MuxedAccount::MuxedEd25519(xdr::MuxedAccountMed25519 { ed25519, .. }) => {
443 ed25519
444 }
445 },
446 )))
447 }
448
449 fn resolve_account_sync(&self, address: &str) -> Option<AccountId> {
452 let address: UnresolvedMuxedAccount = address.parse().ok()?;
453 let muxed_account = address
454 .resolve_muxed_account_sync(&self.locator, None)
455 .ok()?;
456 Some(muxed_account.account_id())
457 }
458 fn resolve_contract(&self, address: &str, network_passphrase: &str) -> Option<ScAddress> {
461 address.parse().ok().or_else(|| {
462 Some(ScAddress::Contract(stellar_xdr::curr::ContractId(
463 self.locator
464 .resolve_contract_id(address, network_passphrase)
465 .ok()?
466 .0
467 .into(),
468 )))
469 })
470 }
471}
472
473async fn get_history(
474 print: &print::Print,
475 archive_url: &Url,
476 ledger: Option<u32>,
477) -> Result<History, Error> {
478 let archive_url = archive_url.to_string();
479 let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
480 let history_url = if let Some(ledger) = ledger {
481 let ledger_hex = format!("{ledger:08x}");
482 let ledger_hex_0 = &ledger_hex[0..=1];
483 let ledger_hex_1 = &ledger_hex[2..=3];
484 let ledger_hex_2 = &ledger_hex[4..=5];
485 format!("{archive_url}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json")
486 } else {
487 format!("{archive_url}/.well-known/stellar-history.json")
488 };
489 let history_url = Url::from_str(&history_url).unwrap();
490
491 print.globe(format!("Downloading history {history_url}"));
492
493 let response = http::client()
494 .get(history_url.as_str())
495 .send()
496 .await
497 .map_err(Error::DownloadingHistory)?;
498
499 if !response.status().is_success() {
500 if let Some(ledger) = ledger {
502 let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;
503
504 if ledger_offset != 0 {
505 print.println("");
506 print.errorln(format!(
507 "Ledger {ledger} may not be a checkpoint ledger, try {} or {}",
508 ledger - ledger_offset,
509 ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
510 ));
511 }
512 }
513 return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
514 }
515
516 let body = response
517 .bytes()
518 .await
519 .map_err(Error::ReadHistoryHttpStream)?;
520
521 print.clear_line();
522 print.globeln(format!("Downloaded history {}", &history_url));
523
524 serde_json::from_slice::<History>(&body).map_err(Error::JsonDecodingHistory)
525}
526
527async fn cache_bucket(
528 print: &print::Print,
529 archive_url: &Url,
530 bucket_index: usize,
531 bucket: &str,
532) -> Result<PathBuf, Error> {
533 let bucket_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
534 let cache_path = bucket_dir.join(format!("bucket-{bucket}.xdr"));
535 if !cache_path.exists() {
536 let bucket_0 = &bucket[0..=1];
537 let bucket_1 = &bucket[2..=3];
538 let bucket_2 = &bucket[4..=5];
539 let bucket_url =
540 format!("{archive_url}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz");
541
542 print.globe(format!("Downloading bucket {bucket_index} {bucket}…"));
543
544 let bucket_url = Url::from_str(&bucket_url).map_err(Error::ParsingBucketUrl)?;
545
546 let response = http::client()
547 .get(bucket_url.as_str())
548 .send()
549 .await
550 .map_err(Error::GettingBucket)?;
551
552 if !response.status().is_success() {
553 print.println("");
554 return Err(Error::GettingBucketGotStatusCode(response.status()));
555 }
556
557 if let Some(len) = response.content_length() {
558 print.clear_line();
559 print.globe(format!(
560 "Downloaded bucket {bucket_index} {bucket} ({})",
561 ByteSize(len)
562 ));
563 }
564
565 print.println("");
566
567 let stream = response
568 .bytes_stream()
569 .map(|result| result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
570 let stream_reader = StreamReader::new(stream);
571 let buf_reader = BufReader::new(stream_reader);
572 let mut decoder = GzipDecoder::new(buf_reader);
573 let dl_path = cache_path.with_extension("dl");
574 let mut file = OpenOptions::new()
575 .create(true)
576 .truncate(true)
577 .write(true)
578 .open(&dl_path)
579 .await
580 .map_err(Error::WriteOpeningCachedBucket)?;
581 tokio::io::copy(&mut decoder, &mut file)
582 .await
583 .map_err(Error::StreamingBucket)?;
584 fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
585 }
586 Ok(cache_path)
587}
588
/// The fields of a history archive JSON document (`stellar-history.json` or
/// `history-xxxxxxxx.json`) that this command reads.
///
/// JSON keys are camelCase: `currentLedger`, `currentBuckets`, `networkPassphrase`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct History {
    /// Ledger sequence the document describes; becomes the snapshot's `sequence_number`.
    current_ledger: u32,
    /// Bucket list levels; each contributes a `curr` and a `snap` bucket hash.
    current_buckets: Vec<HistoryBucket>,
    /// Network passphrase; its SHA-256 digest becomes the snapshot's network id.
    network_passphrase: String,
}
596
/// One bucket list level from the history JSON.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct HistoryBucket {
    /// Hex hash of the level's `curr` bucket. An all-zero hash is filtered out by `Cmd::run`
    /// and never downloaded.
    curr: String,
    /// Hex hash of the level's `snap` bucket; same all-zero convention as `curr`.
    snap: String,
}
603
604fn data_into_key(d: &LedgerEntry) -> LedgerKey {
605 match &d.data {
607 LedgerEntryData::Account(e) => LedgerKey::Account(LedgerKeyAccount {
608 account_id: e.account_id.clone(),
609 }),
610 LedgerEntryData::Trustline(e) => LedgerKey::Trustline(LedgerKeyTrustLine {
611 account_id: e.account_id.clone(),
612 asset: e.asset.clone(),
613 }),
614 LedgerEntryData::Offer(e) => LedgerKey::Offer(LedgerKeyOffer {
615 seller_id: e.seller_id.clone(),
616 offer_id: e.offer_id,
617 }),
618 LedgerEntryData::Data(e) => LedgerKey::Data(LedgerKeyData {
619 account_id: e.account_id.clone(),
620 data_name: e.data_name.clone(),
621 }),
622 LedgerEntryData::ClaimableBalance(e) => {
623 LedgerKey::ClaimableBalance(LedgerKeyClaimableBalance {
624 balance_id: e.balance_id.clone(),
625 })
626 }
627 LedgerEntryData::LiquidityPool(e) => LedgerKey::LiquidityPool(LedgerKeyLiquidityPool {
628 liquidity_pool_id: e.liquidity_pool_id.clone(),
629 }),
630 LedgerEntryData::ContractData(e) => LedgerKey::ContractData(LedgerKeyContractData {
631 contract: e.contract.clone(),
632 key: e.key.clone(),
633 durability: e.durability,
634 }),
635 LedgerEntryData::ContractCode(e) => LedgerKey::ContractCode(LedgerKeyContractCode {
636 hash: e.hash.clone(),
637 }),
638 LedgerEntryData::ConfigSetting(e) => LedgerKey::ConfigSetting(LedgerKeyConfigSetting {
639 config_setting_id: match e {
640 ConfigSettingEntry::ContractMaxSizeBytes(_) => {
641 ConfigSettingId::ContractMaxSizeBytes
642 }
643 ConfigSettingEntry::ContractComputeV0(_) => ConfigSettingId::ContractComputeV0,
644 ConfigSettingEntry::ContractLedgerCostV0(_) => {
645 ConfigSettingId::ContractLedgerCostV0
646 }
647 ConfigSettingEntry::ContractHistoricalDataV0(_) => {
648 ConfigSettingId::ContractHistoricalDataV0
649 }
650 ConfigSettingEntry::ContractEventsV0(_) => ConfigSettingId::ContractEventsV0,
651 ConfigSettingEntry::ContractBandwidthV0(_) => ConfigSettingId::ContractBandwidthV0,
652 ConfigSettingEntry::ContractCostParamsCpuInstructions(_) => {
653 ConfigSettingId::ContractCostParamsCpuInstructions
654 }
655 ConfigSettingEntry::ContractCostParamsMemoryBytes(_) => {
656 ConfigSettingId::ContractCostParamsMemoryBytes
657 }
658 ConfigSettingEntry::ContractDataKeySizeBytes(_) => {
659 ConfigSettingId::ContractDataKeySizeBytes
660 }
661 ConfigSettingEntry::ContractDataEntrySizeBytes(_) => {
662 ConfigSettingId::ContractDataEntrySizeBytes
663 }
664 ConfigSettingEntry::StateArchival(_) => ConfigSettingId::StateArchival,
665 ConfigSettingEntry::ContractExecutionLanes(_) => {
666 ConfigSettingId::ContractExecutionLanes
667 }
668 ConfigSettingEntry::EvictionIterator(_) => ConfigSettingId::EvictionIterator,
669 ConfigSettingEntry::LiveSorobanStateSizeWindow(_) => {
670 ConfigSettingId::LiveSorobanStateSizeWindow
671 }
672 ConfigSettingEntry::ContractParallelComputeV0(_) => {
673 ConfigSettingId::ContractParallelComputeV0
674 }
675 ConfigSettingEntry::ContractLedgerCostExtV0(_) => {
676 ConfigSettingId::ContractLedgerCostExtV0
677 }
678 ConfigSettingEntry::ScpTiming(_) => ConfigSettingId::ScpTiming,
679 },
680 }),
681 LedgerEntryData::Ttl(e) => LedgerKey::Ttl(LedgerKeyTtl {
682 key_hash: e.key_hash.clone(),
683 }),
684 }
685}