soroban_cli/commands/snapshot/
create.rs

1use async_compression::tokio::bufread::GzipDecoder;
2use bytesize::ByteSize;
3use clap::{arg, Parser, ValueEnum};
4use futures::StreamExt;
5use humantime::format_duration;
6use itertools::{Either, Itertools};
7use sha2::{Digest, Sha256};
8use soroban_ledger_snapshot::LedgerSnapshot;
use std::{
    collections::HashSet,
    fs,
    io::{self},
    path::{Path, PathBuf},
    str::FromStr,
    time::{Duration, Instant},
};
17use stellar_xdr::curr::{
18    self as xdr, AccountId, Asset, BucketEntry, ConfigSettingEntry, ConfigSettingId,
19    ContractExecutable, Frame, Hash, LedgerEntry, LedgerEntryData, LedgerHeaderHistoryEntry,
20    LedgerKey, LedgerKeyAccount, LedgerKeyClaimableBalance, LedgerKeyConfigSetting,
21    LedgerKeyContractCode, LedgerKeyContractData, LedgerKeyData, LedgerKeyLiquidityPool,
22    LedgerKeyOffer, LedgerKeyTrustLine, LedgerKeyTtl, Limited, Limits, ReadXdr, ScAddress,
23    ScContractInstance, ScVal,
24};
25use tokio::fs::OpenOptions;
26use tokio::io::BufReader;
27use tokio_util::io::StreamReader;
28use url::Url;
29
30use crate::{
31    commands::{config::data, global, HEADING_ARCHIVE},
32    config::{self, locator, network::passphrase},
33    print,
34    tx::builder,
35    utils::get_name_from_stellar_asset_contract_storage,
36};
37use crate::{config::address::UnresolvedMuxedAccount, utils::http};
38
// Output format choices for the generated snapshot file. Only JSON is
// currently supported. (Plain `//` comments are used here so the clap-derived
// `--output` help text is not altered.)
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, ValueEnum, Default)]
pub enum Output {
    #[default]
    Json,
}
44
/// Default path the snapshot is written to when `--out` is not given.
fn default_out_path() -> PathBuf {
    // `PathBuf::from` constructs the path directly; the previous
    // `PathBuf::new().join("snapshot.json")` allocated an empty path first
    // for the same result.
    PathBuf::from("snapshot.json")
}
48
/// Create a ledger snapshot using a history archive.
///
/// Filters (address, wasm-hash) specify what ledger entries to include.
///
/// Account addresses include the account, and trustlines.
///
/// Contract addresses include the related wasm, contract data.
///
/// If a contract is a Stellar asset contract, it includes the asset issuer's
/// account and trust lines, but does not include all the trust lines of other
/// accounts holding the asset. To include them specify the addresses of
/// relevant accounts.
///
/// Any invalid contract id passed as `--address` will be ignored.
///
// The `///` comments on this struct and its fields double as the
// clap-generated CLI help text, so they are left untouched here.
#[derive(Parser, Debug, Clone)]
#[group(skip)]
#[command(arg_required_else_help = true)]
pub struct Cmd {
    /// The ledger sequence number to snapshot. Defaults to latest history archived ledger.
    #[arg(long)]
    ledger: Option<u32>,

    /// Account or contract address/alias to include in the snapshot.
    #[arg(long = "address", help_heading = "Filter Options")]
    address: Vec<String>,

    /// WASM hashes to include in the snapshot.
    #[arg(long = "wasm-hash", help_heading = "Filter Options")]
    wasm_hashes: Vec<Hash>,

    /// Format of the out file.
    // NOTE(review): `--output` is required on the command line even though
    // `Output` declares a `#[default]` variant; `#[arg(long, default_value_t)]`
    // would make it optional — confirm whether requiring it is intentional.
    #[arg(long)]
    output: Output,

    /// Out path that the snapshot is written to.
    #[arg(long, default_value=default_out_path().into_os_string())]
    out: PathBuf,

    /// Archive URL
    #[arg(long, help_heading = HEADING_ARCHIVE, env = "STELLAR_ARCHIVE_URL")]
    archive_url: Option<Url>,

    #[command(flatten)]
    locator: locator::Args,

    #[command(flatten)]
    network: config::network::Args,
}
98
/// Errors that can occur while creating a ledger snapshot from a history
/// archive. Variant messages are rendered by `thiserror`'s `#[error]`
/// attributes.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("wasm hash invalid: {0}")]
    WasmHashInvalid(String),

    #[error("downloading history: {0}")]
    DownloadingHistory(reqwest::Error),

    #[error("downloading history: got status code {0}")]
    DownloadingHistoryGotStatusCode(reqwest::StatusCode),

    #[error("json decoding history: {0}")]
    JsonDecodingHistory(serde_json::Error),

    #[error("opening cached bucket to read: {0}")]
    ReadOpeningCachedBucket(io::Error),

    #[error("parsing bucket url: {0}")]
    ParsingBucketUrl(url::ParseError),

    #[error("getting bucket: {0}")]
    GettingBucket(reqwest::Error),

    #[error("getting bucket: got status code {0}")]
    GettingBucketGotStatusCode(reqwest::StatusCode),

    #[error("opening cached bucket to write: {0}")]
    WriteOpeningCachedBucket(io::Error),

    #[error("streaming bucket: {0}")]
    StreamingBucket(io::Error),

    #[error("read XDR frame bucket entry: {0}")]
    ReadXdrFrameBucketEntry(xdr::Error),

    #[error("renaming temporary downloaded file to final destination: {0}")]
    RenameDownloadFile(io::Error),

    #[error("getting bucket directory: {0}")]
    GetBucketDir(data::Error),

    #[error("reading history http stream: {0}")]
    ReadHistoryHttpStream(reqwest::Error),

    #[error("writing ledger snapshot: {0}")]
    WriteLedgerSnapshot(soroban_ledger_snapshot::Error),

    #[error(transparent)]
    Join(#[from] tokio::task::JoinError),

    #[error(transparent)]
    Network(#[from] config::network::Error),

    #[error(transparent)]
    Locator(#[from] locator::Error),

    #[error(transparent)]
    Config(#[from] config::Error),

    #[error("archive url not configured")]
    ArchiveUrlNotConfigured,

    #[error("parsing asset name: {0}")]
    ParseAssetName(String),

    #[error(transparent)]
    Asset(#[from] builder::asset::Error),

    #[error("ledger not found in archive")]
    LedgerNotFound,

    #[error("xdr parsing error: {0}")]
    Xdr(#[from] xdr::Error),

    #[error("corrupted bucket file: expected hash {expected}, got {actual}")]
    CorruptedBucket { expected: String, actual: String },
}
176
/// Checkpoint frequency is usually 64 ledgers, but in local test nets it'll
/// often be 8. There's no way to simply detect what frequency to expect ledgers
/// at, so it is hardcoded at 64, and this value is used only to help the user
/// select good ledger numbers when they select one that doesn't exist.
const CHECKPOINT_FREQUENCY: u32 = 64;
182
183impl Cmd {
    /// Create and write the ledger snapshot.
    ///
    /// Downloads the history archive state for the requested (or latest)
    /// checkpoint ledger, caches and scans every referenced bucket, collects
    /// the ledger entries matching the address/wasm-hash filters — expanding
    /// the filters across passes as contract instances reveal Wasm hashes and
    /// asset issuers — and writes the result to `self.out`.
    ///
    /// # Errors
    /// Returns an error if the archive cannot be reached, a bucket cannot be
    /// downloaded or parsed, or the snapshot file cannot be written.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&self, global_args: &global::Args) -> Result<(), Error> {
        let print = print::Print::new(global_args.quiet);
        let start = Instant::now();

        let archive_url = self.archive_url()?;
        let history = get_history(&print, &archive_url, self.ledger).await?;

        let ledger = history.current_ledger;
        let network_passphrase = &history.network_passphrase;
        // The network id is the SHA-256 of the network passphrase.
        let network_id = Sha256::digest(network_passphrase);

        print.infoln(format!("Ledger: {ledger}"));
        print.infoln(format!("Network Passphrase: {network_passphrase}"));
        print.infoln(format!("Network id: {}", hex::encode(network_id)));

        // Get ledger close time and base reserve from archive
        let (ledger_close_time, base_reserve) =
            match get_ledger_metadata_from_archive(&print, &archive_url, ledger).await {
                Ok((close_time, reserve)) => {
                    print.infoln(format!("Ledger Close Time: {close_time}"));
                    print.infoln(format!("Base Reserve: {reserve}"));
                    (close_time, reserve)
                }
                Err(e) => {
                    // Metadata is best-effort: warn and fall back to defaults
                    // rather than failing the whole snapshot.
                    print.warnln(format!("Failed to get ledger metadata from archive: {e}"));
                    print.infoln("Using default values: close_time=0, base_reserve=1");
                    (0u64, 1u32) // Default values
                }
            };

        // Prepare a flat list of buckets to read. They'll be ordered by their
        // level so that they can be iterated higher level to lower level.
        // The all-zero hash denotes an empty bucket and is skipped.
        let buckets = history
            .current_buckets
            .iter()
            .flat_map(|h| [h.curr.clone(), h.snap.clone()])
            .filter(|b| b != "0000000000000000000000000000000000000000000000000000000000000000")
            .collect::<Vec<_>>();

        // Pre-cache the buckets.
        for (i, bucket) in buckets.iter().enumerate() {
            cache_bucket(&print, &archive_url, i, bucket).await?;
        }

        // The snapshot is what will be written to file at the end. Fields will
        // be updated while parsing the history archive.
        let mut snapshot = LedgerSnapshot {
            protocol_version: 0,
            sequence_number: ledger,
            timestamp: ledger_close_time,
            network_id: network_id.into(),
            base_reserve,
            min_persistent_entry_ttl: 0,
            min_temp_entry_ttl: 0,
            max_entry_ttl: 0,
            ledger_entries: Vec::new(),
        };

        // Track ledger keys seen, so that we can ignore old versions of
        // entries. Entries can appear in both higher level and lower level
        // buckets, and to get the latest version of the entry the version in
        // the higher level bucket should be used.
        let mut seen = HashSet::new();

        // The set of things being searched for in a single pass over the
        // buckets; hits in one pass can queue more inputs for the next.
        #[allow(clippy::items_after_statements)]
        #[derive(Default)]
        struct SearchInputs {
            account_ids: HashSet<AccountId>,
            contract_ids: HashSet<ScAddress>,
            wasm_hashes: HashSet<Hash>,
        }
        impl SearchInputs {
            pub fn is_empty(&self) -> bool {
                self.account_ids.is_empty()
                    && self.contract_ids.is_empty()
                    && self.wasm_hashes.is_empty()
            }
        }

        // Search the buckets using the user inputs as the starting inputs.
        // Unresolvable addresses are silently dropped (documented behavior).
        let (account_ids, contract_ids): (HashSet<AccountId>, HashSet<ScAddress>) = self
            .address
            .iter()
            .cloned()
            .filter_map(|a| self.resolve_address_sync(&a, network_passphrase))
            .partition_map(|a| a);

        let mut current = SearchInputs {
            account_ids,
            contract_ids,
            wasm_hashes: self.wasm_hashes.iter().cloned().collect(),
        };
        let mut next = SearchInputs::default();

        // Each loop iteration is one full pass over every bucket; iteration
        // stops once a pass queues no new search inputs.
        loop {
            if current.is_empty() {
                break;
            }

            print.infoln(format!(
                "Searching for {} accounts, {} contracts, {} wasms",
                current.account_ids.len(),
                current.contract_ids.len(),
                current.wasm_hashes.len(),
            ));

            for (i, bucket) in buckets.iter().enumerate() {
                // Defined where the bucket will be read from, either from cache on
                // disk, or streamed from the archive.
                let cache_path = cache_bucket(&print, &archive_url, i, bucket).await?;
                let file = std::fs::OpenOptions::new()
                    .read(true)
                    .open(&cache_path)
                    .map_err(Error::ReadOpeningCachedBucket)?;

                let message = format!("Searching bucket {i} {bucket}");
                print.searchln(format!("{message}…"));

                if let Ok(metadata) = file.metadata() {
                    print.clear_previous_line();
                    print.searchln(format!("{message} ({})", ByteSize(metadata.len())));
                }

                // Stream the bucket entries from the bucket, identifying
                // entries that match the filters, and including only the
                // entries that match in the snapshot.
                let limited = &mut Limited::new(file, Limits::none());
                let entries = Frame::<BucketEntry>::read_xdr_iter(limited);
                let mut count_saved = 0;
                for entry in entries {
                    let Frame(entry) = entry.map_err(Error::ReadXdrFrameBucketEntry)?;
                    // Normalize each bucket entry to (key, Option<live entry>);
                    // dead entries carry a key but no value.
                    let (key, val) = match entry {
                        BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => {
                            let k = data_into_key(&l);
                            (k, Some(l))
                        }
                        BucketEntry::Deadentry(k) => (k, None),
                        BucketEntry::Metaentry(m) => {
                            // Meta entries carry the protocol version; record
                            // the highest one observed across all buckets.
                            if m.ledger_version > snapshot.protocol_version {
                                snapshot.protocol_version = m.ledger_version;
                                print.infoln(format!(
                                    "Protocol version: {}",
                                    snapshot.protocol_version
                                ));
                            }
                            continue;
                        }
                    };

                    // An earlier (higher-level) bucket already supplied the
                    // newest version of this key.
                    if seen.contains(&key) {
                        continue;
                    }

                    let keep = match &key {
                        LedgerKey::Account(k) => current.account_ids.contains(&k.account_id),
                        LedgerKey::Trustline(k) => current.account_ids.contains(&k.account_id),
                        LedgerKey::ContractData(k) => current.contract_ids.contains(&k.contract),
                        LedgerKey::ContractCode(e) => current.wasm_hashes.contains(&e.hash),
                        LedgerKey::ConfigSetting(_) => true,
                        _ => false,
                    };

                    if !keep {
                        continue;
                    }

                    // Mark the key seen even for dead entries, so older live
                    // versions in lower buckets are not resurrected.
                    seen.insert(key.clone());

                    let Some(val) = val else {
                        continue;
                    };

                    // NOTE(review): the boolean value of this match is
                    // discarded; only its side effects matter — updating the
                    // snapshot's TTL settings and queueing follow-up wasm
                    // hashes / asset issuers into `next`.
                    match &val.data {
                        LedgerEntryData::ConfigSetting(ConfigSettingEntry::StateArchival(
                            state_archival,
                        )) => {
                            snapshot.min_persistent_entry_ttl = state_archival.min_persistent_ttl;
                            snapshot.min_temp_entry_ttl = state_archival.min_temporary_ttl;
                            snapshot.max_entry_ttl = state_archival.max_entry_ttl;
                            false
                        }

                        LedgerEntryData::ContractData(e) => {
                            // If a contract instance references contract
                            // executable stored in another ledger entry, add
                            // that ledger entry to the filter so that Wasm for
                            // any filtered contract is collected too in the
                            // second pass.
                            if e.key == ScVal::LedgerKeyContractInstance {
                                match &e.val {
                                    ScVal::ContractInstance(ScContractInstance {
                                        executable: ContractExecutable::Wasm(hash),
                                        ..
                                    }) => {
                                        if !current.wasm_hashes.contains(hash) {
                                            next.wasm_hashes.insert(hash.clone());
                                            print.infoln(format!(
                                                "Adding wasm {} to search",
                                                hex::encode(hash)
                                            ));
                                        }
                                    }
                                    ScVal::ContractInstance(ScContractInstance {
                                        executable: ContractExecutable::StellarAsset,
                                        storage: Some(storage),
                                    }) => {
                                        // Stellar asset contracts pull in the
                                        // issuer account on the next pass.
                                        if let Some(name) =
                                            get_name_from_stellar_asset_contract_storage(storage)
                                        {
                                            let asset: builder::Asset = name.parse()?;
                                            if let Some(issuer) = match asset
                                                .resolve(&global_args.locator)?
                                            {
                                                Asset::Native => None,
                                                Asset::CreditAlphanum4(a4) => Some(a4.issuer),
                                                Asset::CreditAlphanum12(a12) => Some(a12.issuer),
                                            } {
                                                print.infoln(format!(
                                                    "Adding asset issuer {issuer} to search"
                                                ));
                                                next.account_ids.insert(issuer);
                                            }
                                        }
                                    }
                                    _ => {}
                                }
                            }
                            keep
                        }
                        _ => false,
                    };
                    // `Some(u32::MAX)` marks the entry's live-until ledger as
                    // effectively unexpiring in the snapshot.
                    snapshot
                        .ledger_entries
                        .push((Box::new(key), (Box::new(val), Some(u32::MAX))));
                    count_saved += 1;
                }
                if count_saved > 0 {
                    print.infoln(format!("Found {count_saved} entries"));
                }
            }
            current = next;
            next = SearchInputs::default();
        }

        // Write the snapshot to file.
        snapshot
            .write_file(&self.out)
            .map_err(Error::WriteLedgerSnapshot)?;
        print.saveln(format!(
            "Saved {} entries to {:?}",
            snapshot.ledger_entries.len(),
            self.out
        ));

        // Truncate to whole seconds for a tidy duration display.
        let duration = Duration::from_secs(start.elapsed().as_secs());
        print.checkln(format!("Completed in {}", format_duration(duration)));

        Ok(())
    }
444
445    fn archive_url(&self) -> Result<Url, Error> {
446        // Return the configured archive URL, or if one is not configured, guess
447        // at an appropriate archive URL given the network passphrase.
448        self.archive_url
449            .clone()
450            .or_else(|| {
451                self.network.get(&self.locator).ok().and_then(|network| {
452                    match network.network_passphrase.as_str() {
453                        passphrase::MAINNET => {
454                            Some("https://history.stellar.org/prd/core-live/core_live_001")
455                        }
456                        passphrase::TESTNET => {
457                            Some("https://history.stellar.org/prd/core-testnet/core_testnet_001")
458                        }
459                        passphrase::FUTURENET => Some("https://history-futurenet.stellar.org"),
460                        passphrase::LOCAL => Some("http://localhost:8000/archive"),
461                        _ => None,
462                    }
463                    .map(|s| Url::from_str(s).expect("archive url valid"))
464                })
465            })
466            .ok_or(Error::ArchiveUrlNotConfigured)
467    }
468
469    fn resolve_address_sync(
470        &self,
471        address: &str,
472        network_passphrase: &str,
473    ) -> Option<Either<AccountId, ScAddress>> {
474        if let Some(contract) = self.resolve_contract(address, network_passphrase) {
475            Some(Either::Right(contract))
476        } else {
477            self.resolve_account_sync(address).map(Either::Left)
478        }
479    }
480
481    // Resolve an account address to an account id. The address can be a
482    // G-address or a key name (as in `stellar keys address NAME`).
483    fn resolve_account_sync(&self, address: &str) -> Option<AccountId> {
484        let address: UnresolvedMuxedAccount = address.parse().ok()?;
485        let muxed_account = address
486            .resolve_muxed_account_sync(&self.locator, None)
487            .ok()?;
488        Some(muxed_account.account_id())
489    }
490
491    // Resolve a contract address to a contract id. The contract can be a
492    // C-address or a contract alias.
493    fn resolve_contract(&self, address: &str, network_passphrase: &str) -> Option<ScAddress> {
494        address.parse().ok().or_else(|| {
495            Some(ScAddress::Contract(stellar_xdr::curr::ContractId(
496                self.locator
497                    .resolve_contract_id(address, network_passphrase)
498                    .ok()?
499                    .0
500                    .into(),
501            )))
502        })
503    }
504}
505
/// Split a ledger sequence into the hex path components used by history
/// archives: the full 8-character zero-padded hex string, plus its first
/// three byte pairs, which name the directory levels the file is sharded
/// under.
fn ledger_to_path_components(ledger: u32) -> (String, String, String, String) {
    let hex = format!("{ledger:08x}");
    let level_0 = hex[0..2].to_string();
    let level_1 = hex[2..4].to_string();
    let level_2 = hex[4..6].to_string();
    (hex, level_0, level_1, level_2)
}
513
514async fn get_history(
515    print: &print::Print,
516    archive_url: &Url,
517    ledger: Option<u32>,
518) -> Result<History, Error> {
519    let archive_url = archive_url.to_string();
520    let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
521    let history_url = if let Some(ledger) = ledger {
522        let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) =
523            ledger_to_path_components(ledger);
524        format!("{archive_url}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json")
525    } else {
526        format!("{archive_url}/.well-known/stellar-history.json")
527    };
528    let history_url = Url::from_str(&history_url).unwrap();
529
530    print.globeln(format!("Downloading history {history_url}"));
531
532    let response = http::client()
533        .get(history_url.as_str())
534        .send()
535        .await
536        .map_err(Error::DownloadingHistory)?;
537
538    if !response.status().is_success() {
539        // Check ledger is a checkpoint ledger and available in archives.
540        if let Some(ledger) = ledger {
541            let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;
542
543            if ledger_offset != 0 {
544                print.errorln(format!(
545                    "Ledger {ledger} may not be a checkpoint ledger, try {} or {}",
546                    ledger - ledger_offset,
547                    ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
548                ));
549            }
550        }
551        return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
552    }
553
554    let body = response
555        .bytes()
556        .await
557        .map_err(Error::ReadHistoryHttpStream)?;
558
559    print.clear_previous_line();
560    print.globeln(format!("Downloaded history {}", &history_url));
561
562    serde_json::from_slice::<History>(&body).map_err(Error::JsonDecodingHistory)
563}
564
/// Download the ledger-header file covering `ledger`'s checkpoint and extract
/// the close time and base reserve from the header entry matching `ledger`.
///
/// The gzipped header file is decompressed while streaming and cached on disk
/// in the same directory as cached buckets.
///
/// # Errors
/// Returns an error on any network or IO failure, and `Error::LedgerNotFound`
/// if the file contains no header for exactly `ledger`.
async fn get_ledger_metadata_from_archive(
    print: &print::Print,
    archive_url: &Url,
    ledger: u32,
) -> Result<(u64, u32), Error> {
    let archive_url = archive_url.to_string();
    // Normalize away a trailing slash so the path join below doesn't produce `//`.
    let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);

    // Calculate the path to the ledger header file
    let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) = ledger_to_path_components(ledger);
    let ledger_url = format!(
        "{archive_url}/ledger/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/ledger-{ledger_hex}.xdr.gz"
    );

    print.globeln(format!("Downloading ledger headers {ledger_url}"));

    let ledger_url = Url::from_str(&ledger_url).map_err(Error::ParsingBucketUrl)?;
    let response = http::client()
        .get(ledger_url.as_str())
        .send()
        .await
        .map_err(Error::DownloadingHistory)?;

    if !response.status().is_success() {
        return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
    }

    // Cache the ledger file to disk like bucket files
    let ledger_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
    let cache_path = ledger_dir.join(format!("ledger-{ledger_hex}.xdr"));
    // Download to a temporary `.dl` path and rename at the end, so a partial
    // download never sits at the final cache path.
    let dl_path = cache_path.with_extension("dl");

    // Gunzip the HTTP body on the fly while streaming it to disk.
    let stream = response
        .bytes_stream()
        .map(|result| result.map_err(std::io::Error::other));
    let stream_reader = StreamReader::new(stream);
    let buf_reader = BufReader::new(stream_reader);
    let mut decoder = GzipDecoder::new(buf_reader);

    let mut file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(&dl_path)
        .await
        .map_err(Error::WriteOpeningCachedBucket)?;

    tokio::io::copy(&mut decoder, &mut file)
        .await
        .map_err(Error::StreamingBucket)?;

    fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;

    print.clear_previous_line();
    print.globeln(format!("Downloaded ledger headers for ledger {ledger}"));

    // Now read the cached file
    let file = std::fs::File::open(&cache_path).map_err(Error::ReadOpeningCachedBucket)?;
    let limited = &mut Limited::new(file, Limits::none());

    // Find the specific ledger header entry we need
    let entries = Frame::<LedgerHeaderHistoryEntry>::read_xdr_iter(limited);
    for entry in entries {
        let Frame(header_entry) = entry.map_err(Error::Xdr)?;

        if header_entry.header.ledger_seq == ledger {
            let close_time = header_entry.header.scp_value.close_time.0;
            let base_reserve = header_entry.header.base_reserve;

            return Ok((close_time, base_reserve));
        }
    }

    Err(Error::LedgerNotFound)
}
640
641fn validate_bucket_hash(cache_path: &PathBuf, expected_hash: &str) -> Result<(), Error> {
642    let file = std::fs::File::open(cache_path).map_err(Error::ReadOpeningCachedBucket)?;
643    let mut hasher = Sha256::new();
644    std::io::copy(&mut std::io::BufReader::new(file), &mut hasher)
645        .map_err(Error::ReadOpeningCachedBucket)?;
646    let actual_hash = hex::encode(hasher.finalize());
647
648    if actual_hash != expected_hash {
649        return Err(Error::CorruptedBucket {
650            expected: expected_hash.to_string(),
651            actual: actual_hash,
652        });
653    }
654
655    Ok(())
656}
657
/// Return the on-disk path of a cached bucket, downloading and gunzipping it
/// from the archive first when it is missing or fails hash validation.
///
/// `bucket` is the bucket's hex hash string, which serves both as its path in
/// the archive and as the expected content hash of the cached file;
/// `bucket_index` is used only for progress messages.
///
/// # Errors
/// Returns an error on cache-directory, network, or IO failures.
async fn cache_bucket(
    print: &print::Print,
    archive_url: &Url,
    bucket_index: usize,
    bucket: &str,
) -> Result<PathBuf, Error> {
    let bucket_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
    let cache_path = bucket_dir.join(format!("bucket-{bucket}.xdr"));

    // Validate cached bucket if it exists
    if cache_path.exists() {
        if validate_bucket_hash(&cache_path, bucket).is_err() {
            // A corrupted cache file is deleted (best-effort) and re-fetched
            // by the download branch below.
            print.warnln(format!(
                "Cached bucket {bucket} is corrupted, re-downloading"
            ));
            std::fs::remove_file(&cache_path).ok();
        } else {
            return Ok(cache_path);
        }
    }

    if !cache_path.exists() {
        // Archives shard buckets into directories named after the first three
        // byte pairs of the bucket's hex hash.
        let bucket_0 = &bucket[0..=1];
        let bucket_1 = &bucket[2..=3];
        let bucket_2 = &bucket[4..=5];
        let bucket_url =
            format!("{archive_url}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz");

        print.globeln(format!("Downloading bucket {bucket_index} {bucket}…"));

        let bucket_url = Url::from_str(&bucket_url).map_err(Error::ParsingBucketUrl)?;

        let response = http::client()
            .get(bucket_url.as_str())
            .send()
            .await
            .map_err(Error::GettingBucket)?;

        if !response.status().is_success() {
            print.println("");
            return Err(Error::GettingBucketGotStatusCode(response.status()));
        }

        // Note: this reports the (compressed) size as soon as headers arrive,
        // before the body has actually finished streaming below.
        if let Some(len) = response.content_length() {
            print.clear_previous_line();
            print.globeln(format!(
                "Downloaded bucket {bucket_index} {bucket} ({})",
                ByteSize(len)
            ));
        }

        // Stream the response through a gzip decoder into a temporary `.dl`
        // file, then rename it into place so a partial download never
        // occupies the final cache path.
        let stream = response
            .bytes_stream()
            .map(|result| result.map_err(std::io::Error::other));
        let stream_reader = StreamReader::new(stream);
        let buf_reader = BufReader::new(stream_reader);
        let mut decoder = GzipDecoder::new(buf_reader);
        let dl_path = cache_path.with_extension("dl");
        let mut file = OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&dl_path)
            .await
            .map_err(Error::WriteOpeningCachedBucket)?;
        tokio::io::copy(&mut decoder, &mut file)
            .await
            .map_err(Error::StreamingBucket)?;
        fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
        // NOTE(review): a freshly downloaded bucket is not re-validated
        // against `bucket` here — only pre-existing cache files are checked.
        // Consider calling `validate_bucket_hash` after the rename.
    }
    Ok(cache_path)
}
730
/// The subset of a history archive state file that this command reads.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct History {
    /// Ledger sequence of the checkpoint this state file describes.
    current_ledger: u32,
    /// One curr/snap bucket pair per bucket-list level, in the order the
    /// archive lists them (scanned first-to-last by `Cmd::run`).
    current_buckets: Vec<HistoryBucket>,
    /// Network passphrase; its SHA-256 is the network id.
    network_passphrase: String,
}
738
/// A single bucket-list level's pair of bucket hashes in a history archive
/// state file.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct HistoryBucket {
    /// Hex hash of the level's `curr` bucket.
    curr: String,
    /// Hex hash of the level's `snap` bucket.
    snap: String,
}
745
/// Derive the `LedgerKey` that identifies a `LedgerEntry`.
///
/// Each `LedgerEntryData` variant maps to the corresponding `LedgerKey`
/// variant, built from the entry's identifying fields only.
fn data_into_key(d: &LedgerEntry) -> LedgerKey {
    // TODO: Move this function into stellar-xdr.
    match &d.data {
        LedgerEntryData::Account(e) => LedgerKey::Account(LedgerKeyAccount {
            account_id: e.account_id.clone(),
        }),
        LedgerEntryData::Trustline(e) => LedgerKey::Trustline(LedgerKeyTrustLine {
            account_id: e.account_id.clone(),
            asset: e.asset.clone(),
        }),
        LedgerEntryData::Offer(e) => LedgerKey::Offer(LedgerKeyOffer {
            seller_id: e.seller_id.clone(),
            offer_id: e.offer_id,
        }),
        LedgerEntryData::Data(e) => LedgerKey::Data(LedgerKeyData {
            account_id: e.account_id.clone(),
            data_name: e.data_name.clone(),
        }),
        LedgerEntryData::ClaimableBalance(e) => {
            LedgerKey::ClaimableBalance(LedgerKeyClaimableBalance {
                balance_id: e.balance_id.clone(),
            })
        }
        LedgerEntryData::LiquidityPool(e) => LedgerKey::LiquidityPool(LedgerKeyLiquidityPool {
            liquidity_pool_id: e.liquidity_pool_id.clone(),
        }),
        LedgerEntryData::ContractData(e) => LedgerKey::ContractData(LedgerKeyContractData {
            contract: e.contract.clone(),
            key: e.key.clone(),
            durability: e.durability,
        }),
        LedgerEntryData::ContractCode(e) => LedgerKey::ContractCode(LedgerKeyContractCode {
            hash: e.hash.clone(),
        }),
        // Config settings have no payload in their key beyond the setting id,
        // which must be recovered from the entry's variant.
        LedgerEntryData::ConfigSetting(e) => LedgerKey::ConfigSetting(LedgerKeyConfigSetting {
            config_setting_id: match e {
                ConfigSettingEntry::ContractMaxSizeBytes(_) => {
                    ConfigSettingId::ContractMaxSizeBytes
                }
                ConfigSettingEntry::ContractComputeV0(_) => ConfigSettingId::ContractComputeV0,
                ConfigSettingEntry::ContractLedgerCostV0(_) => {
                    ConfigSettingId::ContractLedgerCostV0
                }
                ConfigSettingEntry::ContractHistoricalDataV0(_) => {
                    ConfigSettingId::ContractHistoricalDataV0
                }
                ConfigSettingEntry::ContractEventsV0(_) => ConfigSettingId::ContractEventsV0,
                ConfigSettingEntry::ContractBandwidthV0(_) => ConfigSettingId::ContractBandwidthV0,
                ConfigSettingEntry::ContractCostParamsCpuInstructions(_) => {
                    ConfigSettingId::ContractCostParamsCpuInstructions
                }
                ConfigSettingEntry::ContractCostParamsMemoryBytes(_) => {
                    ConfigSettingId::ContractCostParamsMemoryBytes
                }
                ConfigSettingEntry::ContractDataKeySizeBytes(_) => {
                    ConfigSettingId::ContractDataKeySizeBytes
                }
                ConfigSettingEntry::ContractDataEntrySizeBytes(_) => {
                    ConfigSettingId::ContractDataEntrySizeBytes
                }
                ConfigSettingEntry::StateArchival(_) => ConfigSettingId::StateArchival,
                ConfigSettingEntry::ContractExecutionLanes(_) => {
                    ConfigSettingId::ContractExecutionLanes
                }
                ConfigSettingEntry::EvictionIterator(_) => ConfigSettingId::EvictionIterator,
                ConfigSettingEntry::LiveSorobanStateSizeWindow(_) => {
                    ConfigSettingId::LiveSorobanStateSizeWindow
                }
                ConfigSettingEntry::ContractParallelComputeV0(_) => {
                    ConfigSettingId::ContractParallelComputeV0
                }
                ConfigSettingEntry::ContractLedgerCostExtV0(_) => {
                    ConfigSettingId::ContractLedgerCostExtV0
                }
                ConfigSettingEntry::ScpTiming(_) => ConfigSettingId::ScpTiming,
            },
        }),
        LedgerEntryData::Ttl(e) => LedgerKey::Ttl(LedgerKeyTtl {
            key_hash: e.key_hash.clone(),
        }),
    }
}