Skip to main content

soroban_cli/commands/snapshot/
create.rs

1use async_compression::tokio::bufread::GzipDecoder;
2use bytesize::ByteSize;
3use clap::{Parser, ValueEnum};
4use futures::StreamExt;
5use humantime::format_duration;
6use itertools::{Either, Itertools};
7use sha2::{Digest, Sha256};
8use soroban_ledger_snapshot::LedgerSnapshot;
9use std::{
10    collections::HashSet,
11    fs,
12    io::{self},
13    path::PathBuf,
14    str::FromStr,
15    time::{Duration, Instant},
16};
17use stellar_xdr::curr::{
18    self as xdr, AccountId, Asset, BucketEntry, ConfigSettingEntry, ContractExecutable, Frame,
19    Hash, LedgerEntryData, LedgerHeaderHistoryEntry, LedgerKey, Limited, Limits, ReadXdr,
20    ScAddress, ScContractInstance, ScVal,
21};
22use tokio::fs::OpenOptions;
23use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
24use tokio_util::io::StreamReader;
25use url::Url;
26
27use crate::{
28    commands::{config::data, global, HEADING_ARCHIVE},
29    config::{self, locator, network::passphrase},
30    print,
31    tx::builder,
32    utils::get_name_from_stellar_asset_contract_storage,
33};
34use crate::{config::address::UnresolvedMuxedAccount, utils::http};
35
// Format of the out file selected with `--output`. JSON is the only variant
// and, through `#[default]`, also the default.
//
// NOTE(review): plain `//` comments are used here on purpose — with
// `ValueEnum` derived, `///` doc comments on variants are presumably picked up
// as CLI help text; confirm before converting these to doc comments.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, ValueEnum, Default)]
pub enum Output {
    #[default]
    Json,
}
41
/// Default value for `--out`: the relative path `snapshot.json`.
fn default_out_path() -> PathBuf {
    PathBuf::from("snapshot.json")
}
45
/// Create a ledger snapshot using a history archive.
///
/// Filters (address, wasm-hash) specify what ledger entries to include.
///
/// Account addresses include the account, and trustlines.
///
/// Contract addresses include the related wasm, contract data.
///
/// If a contract is a Stellar asset contract, it includes the asset issuer's
/// account and trust lines, but does not include all the trust lines of other
/// accounts holding the asset. To include them specify the addresses of
/// relevant accounts.
///
/// Any invalid contract id passed as `--address` will be ignored.
///
// NOTE(review): the `///` comments on this struct and its fields are presumably
// rendered by clap as `--help` text, so they are user-facing; maintainer notes
// below are therefore written as plain `//` comments.
#[derive(Parser, Debug, Clone)]
#[group(skip)]
pub struct Cmd {
    /// The ledger sequence number to snapshot. Defaults to latest history archived ledger.
    // When `None`, `get_history` fetches `.well-known/stellar-history.json`
    // from the archive instead of a ledger-specific history file.
    #[arg(long)]
    ledger: Option<u32>,

    /// Account or contract address/alias to include in the snapshot.
    // Each value is tried as a contract address/alias first and as an account
    // address/key name second; values that resolve to neither are dropped.
    #[arg(long = "address", help_heading = "Filter Options")]
    address: Vec<String>,

    /// WASM hashes to include in the snapshot.
    #[arg(long = "wasm-hash", help_heading = "Filter Options")]
    wasm_hashes: Vec<Hash>,

    /// Format of the out file.
    // NOTE(review): this field is not read by any code visible in this file.
    #[arg(long, value_enum, default_value_t)]
    output: Output,

    /// Out path that the snapshot is written to.
    // Defaults to the value returned by `default_out_path()`.
    #[arg(long, default_value=default_out_path().into_os_string())]
    out: PathBuf,

    /// Archive URL
    // When unset, `Cmd::archive_url` picks an archive based on the configured
    // network's passphrase.
    #[arg(long, help_heading = HEADING_ARCHIVE, env = "STELLAR_ARCHIVE_URL")]
    archive_url: Option<Url>,

    // Used to resolve key names and contract aliases, and to look up the
    // network configuration.
    #[command(flatten)]
    locator: locator::Args,

    // Network selection; only consulted to guess an archive URL when
    // `archive_url` is not given.
    #[command(flatten)]
    network: config::network::Args,
}
94
/// Errors returned while creating a ledger snapshot.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// NOTE(review): not constructed by any code visible in this file.
    #[error("wasm hash invalid: {0}")]
    WasmHashInvalid(String),

    /// The HTTP request for the history file or the ledger header file failed.
    #[error("downloading history: {0}")]
    DownloadingHistory(reqwest::Error),

    /// The history or ledger header request returned a non-success status.
    #[error("downloading history: got status code {0}")]
    DownloadingHistoryGotStatusCode(reqwest::StatusCode),

    /// The history file body could not be decoded as JSON.
    #[error("json decoding history: {0}")]
    JsonDecodingHistory(serde_json::Error),

    /// A cached bucket or ledger header file could not be opened or read.
    #[error("opening cached bucket to read: {0}")]
    ReadOpeningCachedBucket(io::Error),

    /// The bucket (or ledger header) URL built from the archive URL did not parse.
    #[error("parsing bucket url: {0}")]
    ParsingBucketUrl(url::ParseError),

    /// The HTTP request for a bucket failed.
    #[error("getting bucket: {0}")]
    GettingBucket(reqwest::Error),

    /// The bucket request returned a non-success status.
    #[error("getting bucket: got status code {0}")]
    GettingBucketGotStatusCode(reqwest::StatusCode),

    /// The temporary download file could not be opened for writing.
    #[error("opening cached bucket to write: {0}")]
    WriteOpeningCachedBucket(io::Error),

    /// An I/O error occurred while streaming decompressed data to disk.
    #[error("streaming bucket: {0}")]
    StreamingBucket(io::Error),

    /// A framed `BucketEntry` could not be read from a bucket file.
    #[error("read XDR frame bucket entry: {0}")]
    ReadXdrFrameBucketEntry(xdr::Error),

    /// The temporary download file could not be renamed to its cache path.
    #[error("renaming temporary downloaded file to final destination: {0}")]
    RenameDownloadFile(io::Error),

    /// The directory used to cache buckets could not be determined.
    #[error("getting bucket directory: {0}")]
    GetBucketDir(data::Error),

    /// The history response body could not be read.
    #[error("reading history http stream: {0}")]
    ReadHistoryHttpStream(reqwest::Error),

    /// The snapshot could not be written to the output path.
    #[error("writing ledger snapshot: {0}")]
    WriteLedgerSnapshot(soroban_ledger_snapshot::Error),

    #[error(transparent)]
    Join(#[from] tokio::task::JoinError),

    #[error(transparent)]
    Network(#[from] config::network::Error),

    #[error(transparent)]
    Locator(#[from] locator::Error),

    #[error(transparent)]
    Config(#[from] config::Error),

    /// No `--archive-url` was given and none could be guessed from the network.
    #[error("archive url not configured")]
    ArchiveUrlNotConfigured,

    /// NOTE(review): not constructed by any code visible in this file.
    #[error("parsing asset name: {0}")]
    ParseAssetName(String),

    /// Parsing or resolving a Stellar asset contract's asset failed.
    #[error(transparent)]
    Asset(#[from] builder::asset::Error),

    /// The downloaded ledger header file has no entry for the requested ledger.
    #[error("ledger not found in archive")]
    LedgerNotFound,

    /// A framed ledger header entry could not be decoded.
    #[error("xdr parsing error: {0}")]
    Xdr(#[from] xdr::Error),

    /// The SHA-256 of a bucket file does not match the hash it is named after.
    #[error("corrupted bucket file: expected hash {expected}, got {actual}")]
    CorruptedBucket { expected: String, actual: String },

    /// A gzip stream decompressed to more than the allowed number of bytes.
    #[error("decompressed size exceeds maximum of {max}")]
    DecompressedSizeLimitExceeded { max: ByteSize },
}
175
/// Checkpoint frequency is usually 64 ledgers, but in local test nets it'll
/// often be 8. There's no way to simply detect what frequency to expect ledgers
/// at, so it is hardcoded at 64, and this value is used only to help the user
/// select good ledger numbers when they select one that doesn't exist.
const CHECKPOINT_FREQUENCY: u32 = 64;

/// Maximum decompressed size for bucket files (10 GiB).
const MAX_BUCKET_DECOMPRESSED_SIZE: u64 = 10 * 1024 * 1024 * 1024;

/// Maximum decompressed size for ledger header files (100 MiB).
const MAX_LEDGER_HEADER_DECOMPRESSED_SIZE: u64 = 100 * 1024 * 1024;
187
188impl Cmd {
189    #[allow(clippy::too_many_lines)]
190    pub async fn run(&self, global_args: &global::Args) -> Result<(), Error> {
191        let print = print::Print::new(global_args.quiet);
192        let start = Instant::now();
193
194        let archive_url = self.archive_url()?;
195        let history = get_history(&print, &archive_url, self.ledger).await?;
196
197        let ledger = history.current_ledger;
198        let network_passphrase = &history.network_passphrase;
199        let network_id = Sha256::digest(network_passphrase);
200
201        print.infoln(format!("Ledger: {ledger}"));
202        print.infoln(format!("Network Passphrase: {network_passphrase}"));
203        print.infoln(format!("Network id: {}", hex::encode(network_id)));
204
205        // Get ledger close time and base reserve from archive
206        let (ledger_close_time, base_reserve) =
207            match get_ledger_metadata_from_archive(&print, &archive_url, ledger).await {
208                Ok((close_time, reserve)) => {
209                    print.infoln(format!("Ledger Close Time: {close_time}"));
210                    print.infoln(format!("Base Reserve: {reserve}"));
211                    (close_time, reserve)
212                }
213                Err(e) => {
214                    print.warnln(format!("Failed to get ledger metadata from archive: {e}"));
215                    print.infoln("Using default values: close_time=0, base_reserve=1");
216                    (0u64, 1u32) // Default values
217                }
218            };
219
220        // Prepare a flat list of buckets to read. They'll be ordered by their
221        // level so that they can iterated higher level to lower level.
222        let buckets = history
223            .current_buckets
224            .iter()
225            .flat_map(|h| [h.curr.clone(), h.snap.clone()])
226            .filter(|b| b != "0000000000000000000000000000000000000000000000000000000000000000")
227            .collect::<Vec<_>>();
228
229        // Pre-cache the buckets.
230        for (i, bucket) in buckets.iter().enumerate() {
231            cache_bucket(&print, &archive_url, i, bucket).await?;
232        }
233
234        // The snapshot is what will be written to file at the end. Fields will
235        // be updated while parsing the history archive.
236        let mut snapshot = LedgerSnapshot {
237            protocol_version: 0,
238            sequence_number: ledger,
239            timestamp: ledger_close_time,
240            network_id: network_id.into(),
241            base_reserve,
242            min_persistent_entry_ttl: 0,
243            min_temp_entry_ttl: 0,
244            max_entry_ttl: 0,
245            ledger_entries: Vec::new(),
246        };
247
248        // Track ledger keys seen, so that we can ignore old versions of
249        // entries. Entries can appear in both higher level and lower level
250        // buckets, and to get the latest version of the entry the version in
251        // the higher level bucket should be used.
252        let mut seen = HashSet::new();
253
254        #[allow(clippy::items_after_statements)]
255        #[derive(Default)]
256        struct SearchInputs {
257            account_ids: HashSet<AccountId>,
258            contract_ids: HashSet<ScAddress>,
259            wasm_hashes: HashSet<Hash>,
260        }
261        impl SearchInputs {
262            pub fn is_empty(&self) -> bool {
263                self.account_ids.is_empty()
264                    && self.contract_ids.is_empty()
265                    && self.wasm_hashes.is_empty()
266            }
267        }
268
269        // Search the buckets using the user inputs as the starting inputs.
270        let (account_ids, contract_ids): (HashSet<AccountId>, HashSet<ScAddress>) = self
271            .address
272            .iter()
273            .cloned()
274            .filter_map(|a| self.resolve_address_sync(&a, network_passphrase))
275            .partition_map(|a| a);
276
277        let mut current = SearchInputs {
278            account_ids,
279            contract_ids,
280            wasm_hashes: self.wasm_hashes.iter().cloned().collect(),
281        };
282        let mut next = SearchInputs::default();
283
284        loop {
285            if current.is_empty() {
286                break;
287            }
288
289            print.infoln(format!(
290                "Searching for {} accounts, {} contracts, {} wasms",
291                current.account_ids.len(),
292                current.contract_ids.len(),
293                current.wasm_hashes.len(),
294            ));
295
296            for (i, bucket) in buckets.iter().enumerate() {
297                // Defined where the bucket will be read from, either from cache on
298                // disk, or streamed from the archive.
299                let cache_path = cache_bucket(&print, &archive_url, i, bucket).await?;
300                let file = std::fs::OpenOptions::new()
301                    .read(true)
302                    .open(&cache_path)
303                    .map_err(Error::ReadOpeningCachedBucket)?;
304
305                let message = format!("Searching bucket {i} {bucket}");
306                print.searchln(format!("{message}…"));
307
308                if let Ok(metadata) = file.metadata() {
309                    print.clear_previous_line();
310                    print.searchln(format!("{message} ({})", ByteSize(metadata.len())));
311                }
312
313                // Stream the bucket entries from the bucket, identifying
314                // entries that match the filters, and including only the
315                // entries that match in the snapshot.
316                let limited = &mut Limited::new(file, Limits::none());
317                let entries = Frame::<BucketEntry>::read_xdr_iter(limited);
318                let mut count_saved = 0;
319                for entry in entries {
320                    let Frame(entry) = entry.map_err(Error::ReadXdrFrameBucketEntry)?;
321                    let (key, val) = match entry {
322                        BucketEntry::Liveentry(l) | BucketEntry::Initentry(l) => {
323                            let k = l.to_key();
324                            (k, Some(l))
325                        }
326                        BucketEntry::Deadentry(k) => (k, None),
327                        BucketEntry::Metaentry(m) => {
328                            if m.ledger_version > snapshot.protocol_version {
329                                snapshot.protocol_version = m.ledger_version;
330                                print.infoln(format!(
331                                    "Protocol version: {}",
332                                    snapshot.protocol_version
333                                ));
334                            }
335                            continue;
336                        }
337                    };
338
339                    if seen.contains(&key) {
340                        continue;
341                    }
342
343                    let keep = match &key {
344                        LedgerKey::Account(k) => current.account_ids.contains(&k.account_id),
345                        LedgerKey::Trustline(k) => current.account_ids.contains(&k.account_id),
346                        LedgerKey::ContractData(k) => current.contract_ids.contains(&k.contract),
347                        LedgerKey::ContractCode(e) => current.wasm_hashes.contains(&e.hash),
348                        LedgerKey::ConfigSetting(_) => true,
349                        _ => false,
350                    };
351
352                    if !keep {
353                        continue;
354                    }
355
356                    seen.insert(key.clone());
357
358                    let Some(val) = val else {
359                        continue;
360                    };
361
362                    let include = match &val.data {
363                        LedgerEntryData::ConfigSetting(ConfigSettingEntry::StateArchival(
364                            state_archival,
365                        )) => {
366                            snapshot.min_persistent_entry_ttl = state_archival.min_persistent_ttl;
367                            snapshot.min_temp_entry_ttl = state_archival.min_temporary_ttl;
368                            snapshot.max_entry_ttl = state_archival.max_entry_ttl;
369                            false
370                        }
371
372                        LedgerEntryData::ContractData(e) => {
373                            // If a contract instance references contract
374                            // executable stored in another ledger entry, add
375                            // that ledger entry to the filter so that Wasm for
376                            // any filtered contract is collected too in the
377                            // second pass.
378                            if e.key == ScVal::LedgerKeyContractInstance {
379                                match &e.val {
380                                    ScVal::ContractInstance(ScContractInstance {
381                                        executable: ContractExecutable::Wasm(hash),
382                                        ..
383                                    }) if !current.wasm_hashes.contains(hash) => {
384                                        next.wasm_hashes.insert(hash.clone());
385                                        print.infoln(format!(
386                                            "Adding wasm {} to search",
387                                            hex::encode(hash)
388                                        ));
389                                    }
390                                    ScVal::ContractInstance(ScContractInstance {
391                                        executable: ContractExecutable::StellarAsset,
392                                        storage: Some(storage),
393                                    }) => {
394                                        if let Some(name) =
395                                            get_name_from_stellar_asset_contract_storage(storage)
396                                        {
397                                            let asset: builder::Asset = name.parse()?;
398                                            if let Some(issuer) = match asset
399                                                .resolve(&global_args.locator)?
400                                            {
401                                                Asset::Native => None,
402                                                Asset::CreditAlphanum4(a4) => Some(a4.issuer),
403                                                Asset::CreditAlphanum12(a12) => Some(a12.issuer),
404                                            } {
405                                                print.infoln(format!(
406                                                    "Adding asset issuer {issuer} to search"
407                                                ));
408                                                next.account_ids.insert(issuer);
409                                            }
410                                        }
411                                    }
412                                    _ => {}
413                                }
414                            }
415                            keep
416                        }
417                        _ => false,
418                    };
419                    if include {
420                        snapshot
421                            .ledger_entries
422                            .push((Box::new(key), (Box::new(val), Some(u32::MAX))));
423                        count_saved += 1;
424                    }
425                }
426                if count_saved > 0 {
427                    print.infoln(format!("Found {count_saved} entries"));
428                }
429            }
430            current = next;
431            next = SearchInputs::default();
432        }
433
434        // Write the snapshot to file.
435        snapshot
436            .write_file(&self.out)
437            .map_err(Error::WriteLedgerSnapshot)?;
438        print.saveln(format!(
439            "Saved {} entries to {:?}",
440            snapshot.ledger_entries.len(),
441            self.out
442        ));
443
444        let duration = Duration::from_secs(start.elapsed().as_secs());
445        print.checkln(format!("Completed in {}", format_duration(duration)));
446
447        Ok(())
448    }
449
450    fn archive_url(&self) -> Result<Url, Error> {
451        // Return the configured archive URL, or if one is not configured, guess
452        // at an appropriate archive URL given the network passphrase.
453        self.archive_url
454            .clone()
455            .or_else(|| {
456                self.network.get(&self.locator).ok().and_then(|network| {
457                    match network.network_passphrase.as_str() {
458                        passphrase::MAINNET => {
459                            Some("https://history.stellar.org/prd/core-live/core_live_001")
460                        }
461                        passphrase::TESTNET => {
462                            Some("https://history.stellar.org/prd/core-testnet/core_testnet_001")
463                        }
464                        passphrase::FUTURENET => Some("https://history-futurenet.stellar.org"),
465                        passphrase::LOCAL => Some("http://localhost:8000/archive"),
466                        _ => None,
467                    }
468                    .map(|s| Url::from_str(s).expect("archive url valid"))
469                })
470            })
471            .ok_or(Error::ArchiveUrlNotConfigured)
472    }
473
474    fn resolve_address_sync(
475        &self,
476        address: &str,
477        network_passphrase: &str,
478    ) -> Option<Either<AccountId, ScAddress>> {
479        if let Some(contract) = self.resolve_contract(address, network_passphrase) {
480            Some(Either::Right(contract))
481        } else {
482            self.resolve_account_sync(address).map(Either::Left)
483        }
484    }
485
486    // Resolve an account address to an account id. The address can be a
487    // G-address or a key name (as in `stellar keys address NAME`).
488    fn resolve_account_sync(&self, address: &str) -> Option<AccountId> {
489        let address: UnresolvedMuxedAccount = address.parse().ok()?;
490        let muxed_account = address.resolve_muxed_account(&self.locator, None).ok()?;
491        Some(muxed_account.account_id())
492    }
493
494    // Resolve a contract address to a contract id. The contract can be a
495    // C-address or a contract alias.
496    fn resolve_contract(&self, address: &str, network_passphrase: &str) -> Option<ScAddress> {
497        address.parse().ok().or_else(|| {
498            Some(ScAddress::Contract(stellar_xdr::curr::ContractId(
499                self.locator
500                    .resolve_contract_id(address, network_passphrase)
501                    .ok()?
502                    .0
503                    .into(),
504            )))
505        })
506    }
507}
508
509/// Copy decompressed data from `reader` to `writer`, enforcing a maximum
510/// decompressed size. Returns an error if the decompressed output exceeds
511/// `max_bytes`.
512async fn copy_with_limit<R: AsyncRead + Unpin, W: tokio::io::AsyncWrite + Unpin>(
513    reader: R,
514    writer: &mut W,
515    max_bytes: u64,
516) -> Result<(), Error> {
517    let mut limited = reader.take(max_bytes);
518    tokio::io::copy(&mut limited, writer)
519        .await
520        .map_err(Error::StreamingBucket)?;
521
522    // If the underlying reader still has data, the limit was exceeded.
523    let mut decoder = limited.into_inner();
524    let mut overflow = [0u8; 1];
525    if decoder
526        .read(&mut overflow)
527        .await
528        .map_err(Error::StreamingBucket)?
529        > 0
530    {
531        return Err(Error::DecompressedSizeLimitExceeded {
532            max: ByteSize(max_bytes),
533        });
534    }
535    Ok(())
536}
537
/// Split a ledger sequence number into the pieces used to build archive paths.
///
/// Returns the ledger as eight lowercase, zero-padded hex digits, followed by
/// its first, second and third pair of hex digits.
fn ledger_to_path_components(ledger: u32) -> (String, String, String, String) {
    let hex = format!("{ledger:08x}");
    // A `u32` always formats to exactly eight hex digits here, so the three
    // two-character slices are always in range.
    let pair = |start: usize| hex[start..start + 2].to_owned();
    let (first, second, third) = (pair(0), pair(2), pair(4));
    (hex, first, second, third)
}
545
546async fn get_history(
547    print: &print::Print,
548    archive_url: &Url,
549    ledger: Option<u32>,
550) -> Result<History, Error> {
551    let archive_url = archive_url.to_string();
552    let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
553    let history_url = if let Some(ledger) = ledger {
554        let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) =
555            ledger_to_path_components(ledger);
556        format!("{archive_url}/history/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/history-{ledger_hex}.json")
557    } else {
558        format!("{archive_url}/.well-known/stellar-history.json")
559    };
560    let history_url = Url::from_str(&history_url).unwrap();
561
562    print.globeln(format!("Downloading history {history_url}"));
563
564    let response = http::client()
565        .get(history_url.as_str())
566        .send()
567        .await
568        .map_err(Error::DownloadingHistory)?;
569
570    if !response.status().is_success() {
571        // Check ledger is a checkpoint ledger and available in archives.
572        if let Some(ledger) = ledger {
573            let ledger_offset = (ledger + 1) % CHECKPOINT_FREQUENCY;
574
575            if ledger_offset != 0 {
576                print.errorln(format!(
577                    "Ledger {ledger} may not be a checkpoint ledger, try {} or {}",
578                    ledger - ledger_offset,
579                    ledger + (CHECKPOINT_FREQUENCY - ledger_offset),
580                ));
581            }
582        }
583        return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
584    }
585
586    let body = response
587        .bytes()
588        .await
589        .map_err(Error::ReadHistoryHttpStream)?;
590
591    print.clear_previous_line();
592    print.globeln(format!("Downloaded history {}", &history_url));
593
594    serde_json::from_slice::<History>(&body).map_err(Error::JsonDecodingHistory)
595}
596
597async fn get_ledger_metadata_from_archive(
598    print: &print::Print,
599    archive_url: &Url,
600    ledger: u32,
601) -> Result<(u64, u32), Error> {
602    let archive_url = archive_url.to_string();
603    let archive_url = archive_url.strip_suffix('/').unwrap_or(&archive_url);
604
605    // Calculate the path to the ledger header file
606    let (ledger_hex, ledger_hex_0, ledger_hex_1, ledger_hex_2) = ledger_to_path_components(ledger);
607    let ledger_url = format!(
608        "{archive_url}/ledger/{ledger_hex_0}/{ledger_hex_1}/{ledger_hex_2}/ledger-{ledger_hex}.xdr.gz"
609    );
610
611    print.globeln(format!("Downloading ledger headers {ledger_url}"));
612
613    let ledger_url = Url::from_str(&ledger_url).map_err(Error::ParsingBucketUrl)?;
614    let response = http::client()
615        .get(ledger_url.as_str())
616        .send()
617        .await
618        .map_err(Error::DownloadingHistory)?;
619
620    if !response.status().is_success() {
621        return Err(Error::DownloadingHistoryGotStatusCode(response.status()));
622    }
623
624    // Cache the ledger file to disk like bucket files
625    let ledger_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
626    let cache_path = ledger_dir.join(format!("ledger-{ledger_hex}.xdr"));
627    let dl_path = cache_path.with_extension("dl");
628
629    let stream = response
630        .bytes_stream()
631        .map(|result| result.map_err(std::io::Error::other));
632    let stream_reader = StreamReader::new(stream);
633    let buf_reader = BufReader::new(stream_reader);
634    let decoder = GzipDecoder::new(buf_reader);
635
636    let mut file = OpenOptions::new()
637        .create(true)
638        .truncate(true)
639        .write(true)
640        .open(&dl_path)
641        .await
642        .map_err(Error::WriteOpeningCachedBucket)?;
643
644    if let Err(e) = copy_with_limit(decoder, &mut file, MAX_LEDGER_HEADER_DECOMPRESSED_SIZE).await {
645        let _ = fs::remove_file(&dl_path);
646        return Err(e);
647    }
648
649    fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
650    let _ = crate::config::locator::set_hardened_permissions(&cache_path);
651
652    print.clear_previous_line();
653    print.globeln(format!("Downloaded ledger headers for ledger {ledger}"));
654
655    // Now read the cached file
656    let file = std::fs::File::open(&cache_path).map_err(Error::ReadOpeningCachedBucket)?;
657    let limited = &mut Limited::new(file, Limits::none());
658
659    // Find the specific ledger header entry we need
660    let entries = Frame::<LedgerHeaderHistoryEntry>::read_xdr_iter(limited);
661    for entry in entries {
662        let Frame(header_entry) = entry.map_err(Error::Xdr)?;
663
664        if header_entry.header.ledger_seq == ledger {
665            let close_time = header_entry.header.scp_value.close_time.0;
666            let base_reserve = header_entry.header.base_reserve;
667
668            return Ok((close_time, base_reserve));
669        }
670    }
671
672    Err(Error::LedgerNotFound)
673}
674
675fn validate_bucket_hash(cache_path: &PathBuf, expected_hash: &str) -> Result<(), Error> {
676    let file = std::fs::File::open(cache_path).map_err(Error::ReadOpeningCachedBucket)?;
677    let mut hasher = Sha256::new();
678    std::io::copy(&mut std::io::BufReader::new(file), &mut hasher)
679        .map_err(Error::ReadOpeningCachedBucket)?;
680    let actual_hash = hex::encode(hasher.finalize());
681
682    if actual_hash != expected_hash {
683        return Err(Error::CorruptedBucket {
684            expected: expected_hash.to_string(),
685            actual: actual_hash,
686        });
687    }
688
689    Ok(())
690}
691
692async fn cache_bucket(
693    print: &print::Print,
694    archive_url: &Url,
695    bucket_index: usize,
696    bucket: &str,
697) -> Result<PathBuf, Error> {
698    let bucket_dir = data::bucket_dir().map_err(Error::GetBucketDir)?;
699    let cache_path = bucket_dir.join(format!("bucket-{bucket}.xdr"));
700
701    // Validate cached bucket if it exists
702    if cache_path.exists() {
703        if validate_bucket_hash(&cache_path, bucket).is_err() {
704            print.warnln(format!(
705                "Cached bucket {bucket} is corrupted, re-downloading"
706            ));
707            std::fs::remove_file(&cache_path).ok();
708        } else {
709            return Ok(cache_path);
710        }
711    }
712
713    if !cache_path.exists() {
714        let bucket_0 = &bucket[0..=1];
715        let bucket_1 = &bucket[2..=3];
716        let bucket_2 = &bucket[4..=5];
717        let bucket_url =
718            format!("{archive_url}/bucket/{bucket_0}/{bucket_1}/{bucket_2}/bucket-{bucket}.xdr.gz");
719
720        print.globeln(format!("Downloading bucket {bucket_index} {bucket}…"));
721
722        let bucket_url = Url::from_str(&bucket_url).map_err(Error::ParsingBucketUrl)?;
723
724        let response = http::client()
725            .get(bucket_url.as_str())
726            .send()
727            .await
728            .map_err(Error::GettingBucket)?;
729
730        if !response.status().is_success() {
731            print.println("");
732            return Err(Error::GettingBucketGotStatusCode(response.status()));
733        }
734
735        if let Some(len) = response.content_length() {
736            print.clear_previous_line();
737            print.globeln(format!(
738                "Downloaded bucket {bucket_index} {bucket} ({})",
739                ByteSize(len)
740            ));
741        }
742
743        let stream = response
744            .bytes_stream()
745            .map(|result| result.map_err(std::io::Error::other));
746        let stream_reader = StreamReader::new(stream);
747        let buf_reader = BufReader::new(stream_reader);
748        let decoder = GzipDecoder::new(buf_reader);
749        let dl_path = cache_path.with_extension("dl");
750        let mut file = OpenOptions::new()
751            .create(true)
752            .truncate(true)
753            .write(true)
754            .open(&dl_path)
755            .await
756            .map_err(Error::WriteOpeningCachedBucket)?;
757
758        if let Err(e) = copy_with_limit(decoder, &mut file, MAX_BUCKET_DECOMPRESSED_SIZE).await {
759            let _ = fs::remove_file(&dl_path);
760            return Err(e);
761        }
762
763        fs::rename(&dl_path, &cache_path).map_err(Error::RenameDownloadFile)?;
764        let _ = crate::config::locator::set_hardened_permissions(&cache_path);
765    }
766    Ok(cache_path)
767}
768
/// The subset of the history archive state JSON that this command reads.
///
/// Field names are mapped from camelCase JSON keys (`currentLedger`,
/// `currentBuckets`, `networkPassphrase`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct History {
    /// Ledger sequence number the snapshot is taken at.
    current_ledger: u32,
    /// Buckets to search, in the order they appear in the file.
    current_buckets: Vec<HistoryBucket>,
    /// Passphrase whose SHA-256 is used as the snapshot's network id.
    network_passphrase: String,
}
776
/// One entry of `History::current_buckets`.
///
/// Both fields are bucket hashes in string form; they are used to build the
/// bucket download URL and cache file name. An all-zero hash is skipped by
/// `Cmd::run`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct HistoryBucket {
    curr: String,
    snap: String,
}
783
#[cfg(test)]
mod test {
    use super::*;

    /// Input shorter than the limit is copied through unchanged.
    #[tokio::test]
    async fn test_copy_with_limit_under_limit() {
        let mut sink = Vec::new();
        let source: &[u8] = b"hello";
        copy_with_limit(source, &mut sink, 10).await.unwrap();
        assert_eq!(sink, b"hello");
    }

    /// Input exactly as long as the limit is still accepted.
    #[tokio::test]
    async fn test_copy_with_limit_exact_limit() {
        let mut sink = Vec::new();
        let source: &[u8] = b"hello";
        copy_with_limit(source, &mut sink, 5).await.unwrap();
        assert_eq!(sink, b"hello");
    }

    /// Input longer than the limit is rejected with the size-limit error.
    #[tokio::test]
    async fn test_copy_with_limit_over_limit() {
        let mut sink = Vec::new();
        let source: &[u8] = b"hello world, this exceeds the limit";
        let err = copy_with_limit(source, &mut sink, 10).await.unwrap_err();
        assert!(
            matches!(err, Error::DecompressedSizeLimitExceeded { .. }),
            "expected DecompressedSizeLimitExceeded, got: {err}"
        );
    }
}
814}