Skip to main content

soroban_cli/commands/snapshot/
create.rs

1use async_compression::tokio::bufread::GzipDecoder;
2use bytesize::ByteSize;
3use clap::{Parser, ValueEnum};
4use futures::StreamExt;
5use humantime::format_duration;
6use itertools::{Either, Itertools};
7use sha2::{Digest, Sha256};
8use soroban_ledger_snapshot::LedgerSnapshot;
9use std::{
10    collections::HashSet,
11    fs,
12    io::{self},
13    path::PathBuf,
14    str::FromStr,
15    time::{Duration, Instant},
16};
17use stellar_xdr::{
18    self as xdr, AccountId, Asset, BucketEntry, ConfigSettingEntry, ContractExecutable, Frame,
19    Hash, LedgerEntryData, LedgerHeaderHistoryEntry, LedgerKey, Limited, Limits, ReadXdr,
20    ScAddress, ScContractInstance, ScVal,
21};
22use tokio::fs::OpenOptions;
23use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
24use tokio_util::io::StreamReader;
25use url::Url;
26
27use crate::{
28    commands::{config::data, global, HEADING_ARCHIVE},
29    config::{self, locator, network::passphrase},
30    print,
31    tx::builder,
32    utils::get_name_from_stellar_asset_contract_storage,
33};
34use crate::{
35    config::address::UnresolvedMuxedAccount,
36    utils::{http, url::redact_url},
37};
38
39#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, ValueEnum, Default)]
40pub enum Output {
41    #[default]
42    Json,
43}
44
/// Returns the default value of `--out`: the relative path `snapshot.json`.
fn default_out_path() -> PathBuf {
    PathBuf::from("snapshot.json")
}
48
49/// Create a ledger snapshot using a history archive.
50///
51/// Filters (address, wasm-hash) specify what ledger entries to include.
52///
53/// Account addresses include the account, and trustlines.
54///
55/// Contract addresses include the related wasm, contract data.
56///
57/// If a contract is a Stellar asset contract, it includes the asset issuer's
58/// account and trust lines, but does not include all the trust lines of other
59/// accounts holding the asset. To include them specify the addresses of
60/// relevant accounts.
61///
62/// Any invalid contract id passed as `--address` will be ignored.
63///
64#[derive(Parser, Debug, Clone)]
65#[group(skip)]
66pub struct Cmd {
67    /// The ledger sequence number to snapshot. Defaults to latest history archived ledger.
68    #[arg(long)]
69    ledger: Option<u32>,
70
71    /// Account or contract address/alias to include in the snapshot.
72    #[arg(long = "address", help_heading = "Filter Options")]
73    address: Vec<String>,
74
75    /// WASM hashes to include in the snapshot.
76    #[arg(long = "wasm-hash", help_heading = "Filter Options")]
77    wasm_hashes: Vec<Hash>,
78
79    /// Format of the out file.
80    #[arg(long, value_enum, default_value_t)]
81    output: Output,
82
83    /// Out path that the snapshot is written to.
84    #[arg(long, default_value=default_out_path().into_os_string())]
85    out: PathBuf,
86
87    /// Archive URL
88    #[arg(long, help_heading = HEADING_ARCHIVE, env = "STELLAR_ARCHIVE_URL")]
89    archive_url: Option<Url>,
90
91    #[command(flatten)]
92    locator: locator::Args,
93
94    #[command(flatten)]
95    network: config::network::Args,
96}
97
98#[derive(thiserror::Error, Debug)]
99pub enum Error {
100    #[error("wasm hash invalid: {0}")]
101    WasmHashInvalid(String),
102
103    #[error("downloading history: {0}")]
104    DownloadingHistory(reqwest::Error),
105
106    #[error("downloading history: got status code {0}")]
107    DownloadingHistoryGotStatusCode(reqwest::StatusCode),
108
109    #[error("json decoding history: {0}")]
110    JsonDecodingHistory(serde_json::Error),
111
112    #[error("opening cached bucket to read: {0}")]
113    ReadOpeningCachedBucket(io::Error),
114
115    #[error("parsing bucket url: {0}")]
116    ParsingBucketUrl(url::ParseError),
117
118    #[error("getting bucket: {0}")]
119    GettingBucket(reqwest::Error),
120
121    #[error("getting bucket: got status code {0}")]
122    GettingBucketGotStatusCode(reqwest::StatusCode),
123
124    #[error("opening cached bucket to write: {0}")]
125    WriteOpeningCachedBucket(io::Error),
126
127    #[error("streaming bucket: {0}")]
128    StreamingBucket(io::Error),
129
130    #[error("read XDR frame bucket entry: {0}")]
131    ReadXdrFrameBucketEntry(xdr::Error),
132
133    #[error("renaming temporary downloaded file to final destination: {0}")]
134    RenameDownloadFile(io::Error),
135
136    #[error("getting bucket directory: {0}")]
137    GetBucketDir(data::Error),
138
139    #[error("reading history http stream: {0}")]
140    ReadHistoryHttpStream(reqwest::Error),
141
142    #[error("writing ledger snapshot: {0}")]
143    WriteLedgerSnapshot(soroban_ledger_snapshot::Error),
144
145    #[error(transparent)]
146    Join(#[from] tokio::task::JoinError),
147
148    #[error(transparent)]
149    Network(#[from] config::network::Error),
150
151    #[error(transparent)]
152    Locator(#[from] locator::Error),
153
154    #[error(transparent)]
155    Config(#[from] config::Error),
156
157    #[error("archive url not configured")]
158    ArchiveUrlNotConfigured,
159
160    #[error("parsing asset name: {0}")]
161    ParseAssetName(String),
162
163    #[error(transparent)]
164    Asset(#[from] builder::asset::Error),
165
166    #[error("ledger not found in archive")]
167    LedgerNotFound,
168
169    #[error("xdr parsing error: {0}")]
170    Xdr(#[from] xdr::Error),
171
172    #[error("corrupted bucket file: expected hash {expected}, got {actual}")]
173    CorruptedBucket { expected: String, actual: String },
174
175    #[error("decompressed size exceeds maximum of {max}")]
176    DecompressedSizeLimitExceeded { max: ByteSize },
177}
178
/// Number of ledgers between two checkpoints of a history archive.
///
/// Public networks normally checkpoint every 64 ledgers, while local test
/// networks often use 8. The frequency cannot simply be detected, so 64 is
/// hardcoded; the value is used only to suggest nearby ledger numbers when
/// the user selects one that does not exist in the archive.
const CHECKPOINT_FREQUENCY: u32 = 64;

/// Maximum decompressed size for bucket files (10 GiB).
const MAX_BUCKET_DECOMPRESSED_SIZE: u64 = 10 << 30;

/// Maximum decompressed size for ledger header files (100 MiB).
const MAX_LEDGER_HEADER_DECOMPRESSED_SIZE: u64 = 100 << 20;
190
191impl Cmd {
192    #[allow(clippy::too_many_lines)]
193    pub async fn run(&self, global_args: &global::Args) -> Result<(), Error> {
194        let print = print::Print::new(global_args.quiet);
195        let start = Instant::now();
196
197        let archive_url = self.archive_url()?;
198        let history = get_history(&print, &archive_url, self.ledger).await?;
199
200        let ledger = history.current_ledger;
201        let network_passphrase = &history.network_passphrase;
202        let network_id = Sha256::digest(network_passphrase);
203
204        print.infoln(format!("Ledger: {ledger}"));
205        print.infoln(format!("Network Passphrase: {network_passphrase}"));
206        print.infoln(format!("Network id: {}", hex::encode(network_id)));
207
208        // Get ledger close time and base reserve from archive
209        let (ledger_close_time, base_reserve) =
210            match get_ledger_metadata_from_archive(&print, &archive_url, ledger).await {
211                Ok((close_time, reserve)) => {
212                    print.infoln(format!("Ledger Close Time: {close_time}"));
213                    print.infoln(format!("Base Reserve: {reserve}"));
214                    (close_time, reserve)
215                }
216                Err(e) => {
217                    print.warnln(format!("Failed to get ledger metadata from archive: {e}"));
218                    print.infoln("Using default values: close_time=0, base_reserve=1");
219                    (0u64, 1u32) // Default values
220                }
221            };
222
223        // Prepare a flat list of buckets to read. They'll be ordered by their
224        // level so that they can iterated higher level to lower level.
225        let buckets = history
226            .current_buckets
227            .iter()
228            .flat_map(|h| [h.curr.clone(), h.snap.clone()])
229            .filter(|b| b != "0000000000000000000000000000000000000000000000000000000000000000")
230            .collect::<Vec<_>>();
231
232        // Pre-cache the buckets.
233        for (i, bucket) in buckets.iter().enumerate() {
234            cache_bucket(&print, &archive_url, i, bucket).await?;
235        }
236
237        // The snapshot is what will be written to file at the end. Fields will
238        // be updated while parsing the history archive.
239        let mut snapshot = LedgerSnapshot {
240            protocol_version: 0,
241            sequence_number: ledger,
242            timestamp: ledger_close_time,
243            network_id: network_id.into(),
244            base_reserve,
245            min_persistent_entry_ttl: 0,
246            min_temp_entry_ttl: 0,
247            max_entry_ttl: 0,
248            ledger_entries: Vec::new(),
249        };
250
251        // Track ledger keys seen, so that we can ignore old versions of
252        // entries. Entries can appear in both higher level and lower level
253        // buckets, and to get the latest version of the entry the version in
254        // the higher level bucket should be used.
255        let mut seen = HashSet::new();
256
257        #[allow(clippy::items_after_statements)]
258        #[derive(Default)]
259        struct SearchInputs {
260            account_ids: HashSet<AccountId>,
261            contract_ids: HashSet<ScAddress>,
262            wasm_hashes: HashSet<Hash>,
263        }
264        impl SearchInputs {
265            pub fn is_empty(&self) -> bool {
266                self.account_ids.is_empty()
267                    && self.contract_ids.is_empty()
268                    && self.wasm_hashes.is_empty()
269            }
270        }
271
272        // Search the buckets using the user inputs as the starting inputs.
273        let (account_ids, contract_ids): (HashSet<AccountId>, HashSet<ScAddress>) = self
274            .address
275            .iter()
276            .cloned()
277            .filter_map(|a| self.resolve_address_sync(&a, network_passphrase))
278            .partition_map(|a| a);
279
280        let mut current = SearchInputs {
281            account_ids,
282            contract_ids,
283            wasm_hashes: self.wasm_hashes.iter().cloned().collect(),
284        };
285        let mut next = SearchInputs::default();
286
287        loop {
288            if current.is_empty() {
289                break;
290            }
291
292            print.infoln(format!(
293                "Searching for {} accounts, {} contracts, {} wasms",
294                current.account_ids.len(),
295                current.contract_ids.len(),
296                current.wasm_hashes.len(),
297            ));
298
299            for (i, bucket) in buckets.iter().enumerate() {
300                // Defined where the bucket will be read from, either from cache on
301                // disk, or streamed from the archive.
302                let cache_path = cache_bucket(&print, &archive_url, i, bucket).await?;
303                let file = std::fs::OpenOptions::new()
304                    .read(true)
305                    .open(&cache_path)
306                    .map_err(Error::ReadOpeningCachedBucket)?;
307
308                let message = format!("Searching bucket {i} {bucket}");
309                print.searchln(format!("{message}…"));
310
311                if let Ok(metadata) = file.metadata() {
312                    print.clear_previous_line();
313                    print.searchln(format!("{message} ({})", ByteSize(metadata.len())));
314                }
315
316                // Stream the bucket entries from the bucket, identifying
317                // entries that match the filters, and including only the
318                // entries that match in the snapshot.
319                let limited = &mut Limited::new(file, Limits::none());
320                let entries = Frame::<BucketEntry>::read_xdr_iter(limited);
321                let mut count_saved = 0;
322                for entry in entries {
323                    let Frame(entry) = entry.map_err(Error::ReadXdrFrameBucketEntry)?;
324                    let (key, val) = match entry {
325                        BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => {
326                            let k = l.to_key();
327                            (k, Some(l))
328                        }
329                        BucketEntry::Deadentry(k) => (k, None),
330                        BucketEntry::Metaentry(m) => {
331                            if m.ledger_version > snapshot.protocol_version {
332                                snapshot.protocol_version = m.ledger_version;
333                                print.infoln(format!(
334                                    "Protocol version: {}",
335                                    snapshot.protocol_version
336                                ));
337                            }
338                            continue;
339                        }
340                    };
341
342                    if seen.contains(&key) {
343                        continue;
344                    }
345
346                    let keep = match &key {
347                        LedgerKey::Account(k) => current.account_ids.contains(&k.account_id),
348                        LedgerKey::Trustline(k) => current.account_ids.contains(&k.account_id),
349                        LedgerKey::ContractData(k) => current.contract_ids.contains(&k.contract),
350                        LedgerKey::ContractCode(e) => current.wasm_hashes.contains(&e.hash),
351                        LedgerKey::ConfigSetting(_) => true,
352                        _ => false,
353                    };
354
355                    if !keep {
356                        continue;
357                    }
358
359                    seen.insert(key.clone());
360
361                    let Some(val) = val else {
362                        continue;
363                    };
364
365                    let include = match &val.data {
366                        LedgerEntryData::ConfigSetting(ConfigSettingEntry::StateArchival(
367                            state_archival,
368                        )) => {
369                            snapshot.min_persistent_entry_ttl = state_archival.min_persistent_ttl;
370                            snapshot.min_temp_entry_ttl = state_archival.min_temporary_ttl;
371                            snapshot.max_entry_ttl = state_archival.max_entry_ttl;
372                            false
373                        }
374
375                        LedgerEntryData::ContractData(e) => {
376                            // If a contract instance references contract
377                            // executable stored in another ledger entry, add
378                            // that ledger entry to the filter so that Wasm for
379                            // any filtered contract is collected too in the
380                            // second pass.
381                            if e.key == ScVal::LedgerKeyContractInstance {
382                                match &e.val {
383                                    ScVal::ContractInstance(ScContractInstance {
384                                        executable: ContractExecutable::Wasm(hash),
385                                        ..
386                                    }) if !current.wasm_hashes.contains(hash) => {
387                                        next.wasm_hashes.insert(hash.clone());
388                                        print.infoln(format!(
389                                            "Adding wasm {} to search",
390                                            hex::encode(hash)
391                                        ));
392                                    }
393                                    ScVal::ContractInstance(ScContractInstance {
394                                        executable: ContractExecutable::StellarAsset,
395                                        storage: Some(storage),
396                                    }) => {
397                                        if let Some(name) =
398                                            get_name_from_stellar_asset_contract_storage(storage)
399                                        {
400                                            let asset: builder::Asset = name.parse()?;
401                                            if let Some(issuer) = match asset
402                                                .resolve(&global_args.locator)?
403                                            {
404                                                Asset::Native => None,
405                                                Asset::CreditAlphanum4(a4) => Some(a4.issuer),
406                                                Asset::CreditAlphanum12(a12) => Some(a12.issuer),
407                                            } {
408                                                print.infoln(format!(
409                                                    "Adding asset issuer {issuer} to search"
410                                                ));
411                                                next.account_ids.insert(issuer);
412                                            }
413                                        }
414                                    }
415                                    _ => {}
416                                }
417                            }
418                            keep
419                        }
420                        _ => false,
421                    };
422                    if include {
423                        snapshot
424                            .ledger_entries
425                            .push((Box::new(key), (Box::new(val), Some(u32::MAX))));
426                        count_saved += 1;
427                    }
428                }
429                if count_saved > 0 {
430                    print.infoln(format!("Found {count_saved} entries"));
431                }
432            }
433            current = next;
434            next = SearchInputs::default();
435        }
436
437        // Write the snapshot to file.
438        snapshot
439            .write_file(&self.out)
440            .map_err(Error::WriteLedgerSnapshot)?;
441        print.saveln(format!(
442            "Saved {} entries to {:?}",
443            snapshot.ledger_entries.len(),
444            self.out
445        ));
446
447        let duration = Duration::from_secs(start.elapsed().as_secs());
448        print.checkln(format!("Completed in {}", format_duration(duration)));
449
450        Ok(())
451    }
452
453    fn archive_url(&self) -> Result<Url, Error> {
454        // Return the configured archive URL, or if one is not configured, guess
455        // at an appropriate archive URL given the network passphrase.
456        self.archive_url
457            .clone()
458            .or_else(|| {
459                self.network.get(&self.locator).ok().and_then(|network| {
460                    match network.network_passphrase.as_str() {
461                        passphrase::MAINNET => {
462                            Some("https://history.stellar.org/prd/core-live/core_live_001")
463                        }
464                        passphrase::TESTNET => {
465                            Some("https://history.stellar.org/prd/core-testnet/core_testnet_001")
466                        }
467                        passphrase::FUTURENET => Some("https://history-futurenet.stellar.org"),
468                        passphrase::LOCAL => Some("http://localhost:8000/archive"),
469                        _ => None,
470                    }
471                    .map(|s| Url::from_str(s).expect("archive url valid"))
472                })
473            })
474            .ok_or(Error::ArchiveUrlNotConfigured)
475    }
476
477    fn resolve_address_sync(
478        &self,
479        address: &str,
480        network_passphrase: &str,
481    ) -> Option<Either<AccountId, ScAddress>> {
482        if let Some(contract) = self.resolve_contract(address, network_passphrase) {
483            Some(Either::Right(contract))
484        } else {
485            self.resolve_account_sync(address).map(Either::Left)
486        }
487    }
488
489    // Resolve an account address to an account id. The address can be a
490    // G-address or a key name (as in `stellar keys address NAME`).
491    fn resolve_account_sync(&self, address: &str) -> Option<AccountId> {
492        let address: UnresolvedMuxedAccount = address.parse().ok()?;
493        let muxed_account = address.resolve_muxed_account(&self.locator, None).ok()?;
494        Some(muxed_account.account_id())
495    }
496
497    // Resolve a contract address to a contract id. The contract can be a
498    // C-address or a contract alias.
499    fn resolve_contract(&self, address: &str, network_passphrase: &str) -> Option<ScAddress> {
500        address.parse().ok().or_else(|| {
501            Some(ScAddress::Contract(stellar_xdr::ContractId(
502                self.locator
503                    .resolve_contract_id(address, network_passphrase)
504                    .ok()?
505                    .0
506                    .into(),
507            )))
508        })
509    }
510}
511
512/// Copy decompressed data from `reader` to `writer`, enforcing a maximum
513/// decompressed size. Returns an error if the decompressed output exceeds
514/// `max_bytes`.
515async fn copy_with_limit<R: AsyncRead + Unpin, W: tokio::io::AsyncWrite + Unpin>(
516    reader: R,
517    writer: &mut W,
518    max_bytes: u64,
519) -> Result<(), Error> {
520    let mut limited = reader.take(max_bytes);
521    tokio::io::copy(&mut limited, writer)
522        .await
523        .map_err(Error::StreamingBucket)?;
524
525    // If the underlying reader still has data, the limit was exceeded.
526    let mut decoder = limited.into_inner();
527    let mut overflow = [0u8; 1];
528    if decoder
529        .read(&mut overflow)
530        .await
531        .map_err(Error::StreamingBucket)?
532        > 0
533    {
534        return Err(Error::DecompressedSizeLimitExceeded {
535            max: ByteSize(max_bytes),
536        });
537    }
538    Ok(())
539}
540
/// Splits a ledger sequence number into the pieces used to build archive
/// paths.
///
/// Returns the ledger as eight lowercase hex digits, followed by its first,
/// second and third hex-digit pairs (the three most significant bytes).
fn ledger_to_path_components(ledger: u32) -> (String, String, String, String) {
    let [high, mid_high, mid_low, _] = ledger.to_be_bytes();
    (
        format!("{ledger:08x}"),
        format!("{high:02x}"),
        format!("{mid_high:02x}"),
        format!("{mid_low:02x}"),
    )
}
548
549async fn get_history(
550    print: &print::Print,
551    archive_url: &Url,
552    ledger: Option<u32>,
553) -> Result<History, Error> {
554    let archive_url = archive_url.to_string();
555    let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
556    let history_url = if let Some(ledger) = ledger {
557        let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) =
558            ledger_to_path_components(ledger);
559        format!("{archive_url}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json")
560    } else {
561        format!("{archive_url}/.well-known/stellar-history.json")
562    };
563    let history_url = Url::from_str(&history_url).unwrap();
564
565    print.globeln(format!(
566        "Downloading history {}",
567        redact_url(history_url.as_str())
568    ));
569
570    let response = http::client()
571        .get(history_url.as_str())
572        .send()
573        .await
574        .map_err(Error::DownloadingHistory)?;
575
576    if !response.status().is_success() {
577        // Check ledger is a checkpoint ledger and available in archives.
578        if let Some(ledger) = ledger {
579            let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;
580
581            if ledger_offset != 0 {
582                print.errorln(format!(
583                    "Ledger {ledger} may not be a checkpoint ledger, try {} or {}",
584                    ledger - ledger_offset,
585                    ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
586                ));
587            }
588        }
589        return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
590    }
591
592    let body = response
593        .bytes()
594        .await
595        .map_err(Error::ReadHistoryHttpStream)?;
596
597    print.clear_previous_line();
598    print.globeln(format!(
599        "Downloaded history {}",
600        redact_url(history_url.as_str())
601    ));
602
603    serde_json::from_slice::<History>(&body).map_err(Error::JsonDecodingHistory)
604}
605
606async fn get_ledger_metadata_from_archive(
607    print: &print::Print,
608    archive_url: &Url,
609    ledger: u32,
610) -> Result<(u64, u32), Error> {
611    let archive_url = archive_url.to_string();
612    let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
613
614    // Calculate the path to the ledger header file
615    let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) = ledger_to_path_components(ledger);
616    let ledger_url = format!(
617        "{archive_url}/ledger/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/ledger-{ledger_hex}.xdr.gz"
618    );
619
620    let ledger_url = Url::from_str(&ledger_url).map_err(Error::ParsingBucketUrl)?;
621
622    print.globeln(format!(
623        "Downloading ledger headers {}",
624        redact_url(ledger_url.as_str())
625    ));
626
627    let response = http::client()
628        .get(ledger_url.as_str())
629        .send()
630        .await
631        .map_err(Error::DownloadingHistory)?;
632
633    if !response.status().is_success() {
634        return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
635    }
636
637    // Cache the ledger file to disk like bucket files
638    let ledger_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
639    let cache_path = ledger_dir.join(format!("ledger-{ledger_hex}.xdr"));
640    let dl_path = cache_path.with_extension("dl");
641
642    let stream = response
643        .bytes_stream()
644        .map(|result| result.map_err(std::io::Error::other));
645    let stream_reader = StreamReader::new(stream);
646    let buf_reader = BufReader::new(stream_reader);
647    let decoder = GzipDecoder::new(buf_reader);
648
649    let mut file = OpenOptions::new()
650        .create(true)
651        .truncate(true)
652        .write(true)
653        .open(&dl_path)
654        .await
655        .map_err(Error::WriteOpeningCachedBucket)?;
656
657    if let Err(e) = copy_with_limit(decoder, &mut file, MAX_LEDGER_HEADER_DECOMPRESSED_SIZE).await {
658        let _ = fs::remove_file(&dl_path);
659        return Err(e);
660    }
661
662    fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
663    let _ = crate::config::locator::set_hardened_permissions(&cache_path);
664
665    print.clear_previous_line();
666    print.globeln(format!("Downloaded ledger headers for ledger {ledger}"));
667
668    // Now read the cached file
669    let file = std::fs::File::open(&cache_path).map_err(Error::ReadOpeningCachedBucket)?;
670    let limited = &mut Limited::new(file, Limits::none());
671
672    // Find the specific ledger header entry we need
673    let entries = Frame::<LedgerHeaderHistoryEntry>::read_xdr_iter(limited);
674    for entry in entries {
675        let Frame(header_entry) = entry.map_err(Error::Xdr)?;
676
677        if header_entry.header.ledger_seq == ledger {
678            let close_time = header_entry.header.scp_value.close_time.0;
679            let base_reserve = header_entry.header.base_reserve;
680
681            return Ok((close_time, base_reserve));
682        }
683    }
684
685    Err(Error::LedgerNotFound)
686}
687
688fn validate_bucket_hash(cache_path: &PathBuf, expected_hash: &str) -> Result<(), Error> {
689    let file = std::fs::File::open(cache_path).map_err(Error::ReadOpeningCachedBucket)?;
690    let mut hasher = Sha256::new();
691    std::io::copy(&mut std::io::BufReader::new(file), &mut hasher)
692        .map_err(Error::ReadOpeningCachedBucket)?;
693    let actual_hash = hex::encode(hasher.finalize());
694
695    if actual_hash != expected_hash {
696        return Err(Error::CorruptedBucket {
697            expected: expected_hash.to_string(),
698            actual: actual_hash,
699        });
700    }
701
702    Ok(())
703}
704
705async fn cache_bucket(
706    print: &print::Print,
707    archive_url: &Url,
708    bucket_index: usize,
709    bucket: &str,
710) -> Result<PathBuf, Error> {
711    let bucket_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
712    let cache_path = bucket_dir.join(format!("bucket-{bucket}.xdr"));
713
714    // Validate cached bucket if it exists
715    if cache_path.exists() {
716        if validate_bucket_hash(&cache_path, bucket).is_err() {
717            print.warnln(format!(
718                "Cached bucket {bucket} is corrupted, re-downloading"
719            ));
720            std::fs::remove_file(&cache_path).ok();
721        } else {
722            return Ok(cache_path);
723        }
724    }
725
726    if !cache_path.exists() {
727        let bucket_0 = &bucket[0..=1];
728        let bucket_1 = &bucket[2..=3];
729        let bucket_2 = &bucket[4..=5];
730        let bucket_url =
731            format!("{archive_url}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz");
732
733        print.globeln(format!("Downloading bucket {bucket_index} {bucket}…"));
734
735        let bucket_url = Url::from_str(&bucket_url).map_err(Error::ParsingBucketUrl)?;
736
737        let response = http::client()
738            .get(bucket_url.as_str())
739            .send()
740            .await
741            .map_err(Error::GettingBucket)?;
742
743        if !response.status().is_success() {
744            print.println("");
745            return Err(Error::GettingBucketGotStatusCode(response.status()));
746        }
747
748        if let Some(len) = response.content_length() {
749            print.clear_previous_line();
750            print.globeln(format!(
751                "Downloaded bucket {bucket_index} {bucket} ({})",
752                ByteSize(len)
753            ));
754        }
755
756        let stream = response
757            .bytes_stream()
758            .map(|result| result.map_err(std::io::Error::other));
759        let stream_reader = StreamReader::new(stream);
760        let buf_reader = BufReader::new(stream_reader);
761        let decoder = GzipDecoder::new(buf_reader);
762        let dl_path = cache_path.with_extension("dl");
763        let mut file = OpenOptions::new()
764            .create(true)
765            .truncate(true)
766            .write(true)
767            .open(&dl_path)
768            .await
769            .map_err(Error::WriteOpeningCachedBucket)?;
770
771        if let Err(e) = copy_with_limit(decoder, &mut file, MAX_BUCKET_DECOMPRESSED_SIZE).await {
772            let _ = fs::remove_file(&dl_path);
773            return Err(e);
774        }
775
776        fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
777        let _ = crate::config::locator::set_hardened_permissions(&cache_path);
778    }
779    Ok(cache_path)
780}
781
782#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
783#[serde(rename_all = "camelCase")]
784struct History {
785    current_ledger: u32,
786    current_buckets: Vec<HistoryBucket>,
787    network_passphrase: String,
788}
789
790#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
791#[serde(rename_all = "camelCase")]
792struct HistoryBucket {
793    curr: String,
794    snap: String,
795}
796
#[cfg(test)]
mod test {
    use super::*;

    /// Input shorter than the limit is copied in full.
    #[tokio::test]
    async fn test_copy_with_limit_under_limit() {
        let source: &[u8] = b"hello";
        let mut sink = Vec::new();

        let outcome = copy_with_limit(source, &mut sink, 10).await;

        outcome.unwrap();
        assert_eq!(sink, b"hello");
    }

    /// Input exactly as long as the limit is still accepted.
    #[tokio::test]
    async fn test_copy_with_limit_exact_limit() {
        let source: &[u8] = b"hello";
        let mut sink = Vec::new();

        let outcome = copy_with_limit(source, &mut sink, 5).await;

        outcome.unwrap();
        assert_eq!(sink, b"hello");
    }

    /// Input longer than the limit is rejected with the dedicated error.
    #[tokio::test]
    async fn test_copy_with_limit_over_limit() {
        let source: &[u8] = b"hello world, this exceeds the limit";
        let mut sink = Vec::new();

        let err = copy_with_limit(source, &mut sink, 10).await.unwrap_err();

        assert!(
            matches!(err, Error::DecompressedSizeLimitExceeded { .. }),
            "expected DecompressedSizeLimitExceeded, got: {err}"
        );
    }
}
827}