forest/tool/subcommands/
archive_cmd.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! Archives are key-value pairs encoded as
5//! [CAR files](https://ipld.io/specs/transport/car/carv1/). The key-value pairs
6//! represent a directed, acyclic graph (DAG). This graph is often a subset of a larger
7//! graph and references to missing keys are common.
8//!
9//! Each graph contains blocks, messages, state trees, and miscellaneous data
10//! such as compiled `WASM` code. The amount of data differs greatly in different
11//! kinds of archives. While there are no fixed definitions, there are three
//! common kinds of archives:
13//! - A full archive contains a complete graph with no missing nodes. These
14//!   archives are large (14 TiB for Filecoin's mainnet) and only used in special
15//!   situations.
16//! - A lite-archive typically has roughly 3 million blocks, 2000 complete sets of
17//!   state-roots, and 2000 sets of messages. These archives usually take up
18//!   roughly 100 GiB.
19//! - A diff-archive contains the subset of nodes that are _not_ shared by two
20//!   other archives. These archives are much smaller but can rarely be used on
21//!   their own. They are typically merged with other archives before use.
22//!
23//! The sub-commands in this module manipulate archive files without needing a
24//! running Forest-daemon or a separate database. Operations are carried out
25//! directly on CAR files.
26//!
27//! Additional reading: [`crate::db::car::plain`]
28
29use crate::blocks::Tipset;
30use crate::chain::{
31    ChainEpochDelta, ExportOptions, FilecoinSnapshotMetadata, FilecoinSnapshotVersion,
32    index::{ChainIndex, ResolveNullTipset},
33};
34use crate::cid_collections::CidHashSet;
35use crate::cli_shared::{snapshot, snapshot::TrustedVendor};
36use crate::daemon::bundle::load_actor_bundles;
37use crate::db::car::{AnyCar, ManyCar, forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL};
38use crate::f3::snapshot::F3SnapshotHeader;
39use crate::interpreter::VMTrace;
40use crate::ipld::{stream_chain, stream_graph};
41use crate::networks::{ChainConfig, NetworkChain, butterflynet, calibnet, mainnet};
42use crate::shim::address::CurrentNetwork;
43use crate::shim::clock::{ChainEpoch, EPOCH_DURATION_SECONDS, EPOCHS_IN_DAY};
44use crate::shim::fvm_shared_latest::address::Network;
45use crate::shim::machine::GLOBAL_MULTI_ENGINE;
46use crate::state_manager::{NO_CALLBACK, StateOutput, apply_block_messages};
47use crate::utils::db::car_stream::{CarBlock, CarBlockWrite as _, CarStream};
48use crate::utils::multihash::MultihashCode;
49use anyhow::{Context as _, bail};
50use chrono::DateTime;
51use cid::Cid;
52use clap::{Subcommand, ValueEnum};
53use dialoguer::{Confirm, theme::ColorfulTheme};
54use futures::{StreamExt as _, TryStreamExt as _};
55use fvm_ipld_blockstore::Blockstore;
56use fvm_ipld_encoding::DAG_CBOR;
57use indicatif::{ProgressBar, ProgressIterator, ProgressStyle};
58use itertools::Itertools;
59use multihash_derive::MultihashDigest as _;
60use sha2::Sha256;
61use std::fs::File;
62use std::io::{BufReader, Seek as _, SeekFrom};
63use std::ops::Range;
64use std::path::PathBuf;
65use std::sync::Arc;
66use tokio::io::{AsyncWriteExt, BufWriter};
67use tracing::info;
68
69#[derive(Debug, Clone, ValueEnum)]
70pub enum ExportMode {
71    /// Export all types of snapshots.
72    All,
73    /// Export only lite snapshots.
74    Lite,
75    /// Export only diff snapshots.
76    Diff,
77}
78
79impl ExportMode {
80    pub fn lite(&self) -> bool {
81        matches!(self, ExportMode::All | ExportMode::Lite)
82    }
83
84    pub fn diff(&self) -> bool {
85        matches!(self, ExportMode::All | ExportMode::Diff)
86    }
87}
88
89#[derive(Debug, Subcommand)]
90pub enum ArchiveCommands {
91    /// Show basic information about an archive.
92    Info {
93        /// Path to an archive (`.car` or `.car.zst`).
94        snapshot: PathBuf,
95    },
96    /// Show FRC-0108 metadata of an Filecoin snapshot archive.
97    Metadata {
98        /// Path to an archive (`.car` or `.car.zst`).
99        snapshot: PathBuf,
100    },
101    /// Show FRC-0108 header of a standalone F3 snapshot.
102    F3Header {
103        /// Path to a standalone F3 snapshot.
104        snapshot: PathBuf,
105    },
106    /// Trim a snapshot of the chain and write it to `<output_path>`
107    Export {
108        /// Snapshot input path. Currently supports only `.car` file format.
109        #[arg(required = true)]
110        snapshot_files: Vec<PathBuf>,
111        /// Snapshot output filename or directory. Defaults to
112        /// `./forest_snapshot_{chain}_{year}-{month}-{day}_height_{epoch}.car.zst`.
113        #[arg(short, long, default_value = ".", verbatim_doc_comment)]
114        output_path: PathBuf,
115        /// Latest epoch that has to be exported for this snapshot, the upper bound. This value
116        /// cannot be greater than the latest epoch available in the input snapshot.
117        #[arg(short, long)]
118        epoch: Option<ChainEpoch>,
119        /// How many state-roots to include. Lower limit is 900 for `calibnet` and `mainnet`.
120        #[arg(short, long, default_value_t = 2000)]
121        depth: ChainEpochDelta,
122        /// Do not include any values reachable from this epoch.
123        #[arg(long)]
124        diff: Option<ChainEpoch>,
125        /// How many state-roots to include when computing the diff set. All
126        /// state-roots are included if this flag is not set.
127        #[arg(long)]
128        diff_depth: Option<ChainEpochDelta>,
129        /// Overwrite output file without prompting.
130        #[arg(long, default_value_t = false)]
131        force: bool,
132    },
133    /// Print block headers at 30 day interval for a snapshot file
134    Checkpoints {
135        /// Path to snapshot file.
136        #[arg(required = true)]
137        snapshot_files: Vec<PathBuf>,
138    },
139    /// Merge snapshot archives into a single file. The output snapshot refers
140    /// to the heaviest tipset in the input set.
141    Merge {
142        /// Snapshot input paths. Supports `.car`, `.car.zst`, and `.forest.car.zst`.
143        #[arg(required = true)]
144        snapshot_files: Vec<PathBuf>,
145        /// Snapshot output filename or directory. Defaults to
146        /// `./forest_snapshot_{chain}_{year}-{month}-{day}_height_{epoch}.car.zst`.
147        #[arg(short, long, default_value = ".", verbatim_doc_comment)]
148        output_path: PathBuf,
149        /// Overwrite output file without prompting.
150        #[arg(long, default_value_t = false)]
151        force: bool,
152    },
153    /// Merge a v1 Filecoin snapshot with an F3 snapshot into a v2 Filecoin snapshot in `.forest.car.zst` format
154    MergeF3 {
155        /// Path to the v1 Filecoin snapshot
156        #[arg(long = "v1")]
157        filecoin_v1: PathBuf,
158        /// Path to the F3 snapshot
159        #[arg(long)]
160        f3: PathBuf,
161        /// Path to the snapshot output file in `.forest.car.zst` format
162        #[arg(long)]
163        output: PathBuf,
164    },
165    /// Show the difference between the canonical and computed state of a
166    /// tipset.
167    Diff {
168        /// Snapshot input paths. Supports `.car`, `.car.zst`, and `.forest.car.zst`.
169        #[arg(required = true)]
170        snapshot_files: Vec<PathBuf>,
171        /// Selected epoch to validate.
172        #[arg(long)]
173        epoch: ChainEpoch,
174        // Depth of diffing. Differences in trees below this depth will just be
175        // shown as different branch IDs.
176        #[arg(long)]
177        depth: Option<u64>,
178    },
179    /// Export lite and diff snapshots from one or more CAR files, and upload them
180    /// to an `S3` bucket.
181    SyncBucket {
182        #[arg(required = true)]
183        snapshot_files: Vec<PathBuf>,
184        /// `S3` endpoint URL.
185        #[arg(long, default_value = FOREST_ARCHIVE_S3_ENDPOINT)]
186        endpoint: String,
187        /// Don't generate or upload files, just show what would be done.
188        #[arg(long, default_value_t = false)]
189        dry_run: bool,
190        /// Export mode
191        #[arg(long, value_enum, default_value_t = ExportMode::All)]
192        export_mode: ExportMode,
193    },
194}
195
196impl ArchiveCommands {
197    pub async fn run(self) -> anyhow::Result<()> {
198        match self {
199            Self::Info { snapshot } => {
200                let store = AnyCar::try_from(snapshot.as_path())?;
201                let variant = store.variant().to_string();
202                let heaviest = store.heaviest_tipset()?;
203                let index_size_bytes = store.index_size_bytes();
204                let snapshot_version = if let Some(metadata) = store.metadata() {
205                    metadata.version
206                } else {
207                    FilecoinSnapshotVersion::V1
208                };
209                println!(
210                    "{}",
211                    ArchiveInfo::from_store(
212                        &store,
213                        variant,
214                        heaviest,
215                        snapshot_version,
216                        index_size_bytes
217                    )?
218                );
219                Ok(())
220            }
221            Self::Metadata { snapshot } => {
222                let store = AnyCar::try_from(snapshot.as_path())?;
223                if let Some(metadata) = store.metadata() {
224                    println!("{metadata}");
225                    if let Some(f3_cid) = metadata.f3_data {
226                        let mut f3_data = store
227                            .get_reader(f3_cid)?
228                            .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?;
229                        let f3_snap_header = F3SnapshotHeader::decode_from_snapshot(&mut f3_data)?;
230                        println!("{f3_snap_header}");
231                    }
232                } else {
233                    println!(
234                        "No metadata found (required by v2 snapshot) - this appears to be a v1 snapshot"
235                    );
236                }
237                Ok(())
238            }
239            Self::F3Header { snapshot } => {
240                let mut r = BufReader::new(File::open(&snapshot).with_context(|| {
241                    format!("failed to open F3 snapshot '{}'", snapshot.display())
242                })?);
243                let f3_snap_header =
244                    F3SnapshotHeader::decode_from_snapshot(&mut r).with_context(|| {
245                        format!(
246                            "failed to decode F3 snapshot header from '{}'",
247                            snapshot.display()
248                        )
249                    })?;
250                println!("{f3_snap_header}");
251                Ok(())
252            }
253            Self::Export {
254                snapshot_files,
255                output_path,
256                epoch,
257                depth,
258                diff,
259                diff_depth,
260                force,
261            } => {
262                let store = ManyCar::try_from(snapshot_files)?;
263                let heaviest_tipset = store.heaviest_tipset()?;
264                do_export(
265                    &store.into(),
266                    heaviest_tipset.into(),
267                    output_path,
268                    epoch,
269                    depth,
270                    diff,
271                    diff_depth,
272                    force,
273                )
274                .await
275            }
276            Self::Checkpoints {
277                snapshot_files: snapshot,
278            } => print_checkpoints(snapshot),
279            Self::Merge {
280                snapshot_files,
281                output_path,
282                force,
283            } => merge_snapshots(snapshot_files, output_path, force).await,
284            Self::MergeF3 {
285                filecoin_v1,
286                f3,
287                output,
288            } => merge_f3_snapshot(filecoin_v1, f3, output).await,
289            Self::Diff {
290                snapshot_files,
291                epoch,
292                depth,
293            } => show_tipset_diff(snapshot_files, epoch, depth).await,
294            Self::SyncBucket {
295                snapshot_files,
296                endpoint,
297                dry_run,
298                export_mode,
299            } => sync_bucket(snapshot_files, endpoint, dry_run, export_mode).await,
300        }
301    }
302}
303
/// Summary of a CAR archive gathered by scanning its chain of tipsets.
#[derive(Debug)]
pub struct ArchiveInfo {
    // CAR encoding variant, as reported by the backing store.
    variant: String,
    // Common network name ("mainnet", "calibnet", ...), or "unknown" when the
    // genesis block is not recognized.
    network: String,
    // Epoch of the heaviest (head) tipset.
    epoch: ChainEpoch,
    // Lowest epoch for which a state-root is present in the archive.
    tipsets: ChainEpoch,
    // Lowest epoch for which a message set is present in the archive.
    messages: ChainEpoch,
    // The heaviest tipset the archive refers to.
    head: Tipset,
    // FRC-0108 snapshot version (v1 when no metadata is embedded).
    snapshot_version: FilecoinSnapshotVersion,
    // Size in bytes of the CAR index, when the store provides one.
    index_size_bytes: Option<u32>,
}
315
316impl std::fmt::Display for ArchiveInfo {
317    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
318        writeln!(f, "CAR format:       {}", self.variant)?;
319        writeln!(f, "Snapshot version: {}", self.snapshot_version as u64)?;
320        writeln!(f, "Network:          {}", self.network)?;
321        writeln!(f, "Epoch:            {}", self.epoch)?;
322        writeln!(f, "State-roots:      {}", self.epoch - self.tipsets + 1)?;
323        writeln!(f, "Messages sets:    {}", self.epoch - self.messages + 1)?;
324        let head_tipset_key_string = self
325            .head
326            .cids()
327            .iter()
328            .map(Cid::to_string)
329            .join("\n                  ");
330        write!(f, "Head Tipset:      {head_tipset_key_string}")?;
331        if let Some(index_size_bytes) = self.index_size_bytes {
332            writeln!(f)?;
333            write!(
334                f,
335                "Index size:       {}",
336                human_bytes::human_bytes(index_size_bytes)
337            )?;
338        }
339        Ok(())
340    }
341}
342
impl ArchiveInfo {
    // Scan a CAR archive to identify which network it belongs to and how many
    // tipsets/messages are available. Progress is rendered to stdout.
    fn from_store(
        store: &impl Blockstore,
        variant: String,
        heaviest_tipset: Tipset,
        snapshot_version: FilecoinSnapshotVersion,
        index_size_bytes: Option<u32>,
    ) -> anyhow::Result<Self> {
        Self::from_store_with(
            store,
            variant,
            heaviest_tipset,
            snapshot_version,
            index_size_bytes,
            true,
        )
    }

    // Scan a CAR archive to identify which network it belongs to and how many
    // tipsets/messages are available. Progress is optionally rendered to
    // stdout.
    fn from_store_with(
        store: &impl Blockstore,
        variant: String,
        heaviest_tipset: Tipset,
        snapshot_version: FilecoinSnapshotVersion,
        index_size_bytes: Option<u32>,
        progress: bool,
    ) -> anyhow::Result<Self> {
        let head = heaviest_tipset;
        let root_epoch = head.epoch();

        // Walk of tipsets starting from the head towards genesis.
        let tipsets = head.clone().chain(store);

        // Pair each tipset with its predecessor in the walk: (parent, tipset).
        // Prepending `head` produces an initial pair whose parent is at the
        // head epoch; the epoch-ordering check below exempts that pair.
        let windowed = std::iter::once(head.clone()).chain(tipsets).tuple_windows();

        let mut network: String = "unknown".into();
        // Lowest epoch whose state-root / message set is present in the store.
        // Both start at the head and are walked downwards only while the data
        // remains contiguous.
        let mut lowest_stateroot_epoch = root_epoch;
        let mut lowest_message_epoch = root_epoch;

        let iter = if progress {
            itertools::Either::Left(windowed.progress_count(root_epoch as u64))
        } else {
            itertools::Either::Right(windowed)
        };

        for (parent, tipset) in iter {
            // Epochs must strictly decrease as we walk towards genesis (the
            // synthetic first pair, where parent is the head, is exempt).
            if tipset.epoch() >= parent.epoch() && parent.epoch() != root_epoch {
                bail!("Broken invariant: non-sequential epochs");
            }

            if tipset.epoch() < 0 {
                bail!("Broken invariant: tipset with negative epoch");
            }

            // Update the lowest-stateroot-epoch only if our parent also has a
            // state-root. The genesis state-root is usually available but we're
            // not interested in that.
            if lowest_stateroot_epoch == parent.epoch() && store.has(tipset.parent_state())? {
                lowest_stateroot_epoch = tipset.epoch();
            }
            if lowest_message_epoch == parent.epoch()
                && store.has(&tipset.min_ticket_block().messages)?
            {
                lowest_message_epoch = tipset.epoch();
            }

            // Map a genesis block CID to a well-known network name.
            let mut update_network_name = |block_cid: &Cid| {
                if block_cid == &*calibnet::GENESIS_CID {
                    network = calibnet::NETWORK_COMMON_NAME.into();
                } else if block_cid == &*mainnet::GENESIS_CID {
                    network = mainnet::NETWORK_COMMON_NAME.into();
                } else if block_cid == &*butterflynet::GENESIS_CID {
                    network = butterflynet::NETWORK_COMMON_NAME.into();
                }
            };

            if tipset.epoch() == 0 {
                let block_cid = tipset.min_ticket_block().cid();
                update_network_name(block_cid);
            }

            // If we've already found the lowest-stateroot-epoch and
            // lowest-message-epoch then we can skip scanning the rest of the
            // archive when we find a checkpoint.
            let may_skip =
                lowest_stateroot_epoch != tipset.epoch() && lowest_message_epoch != tipset.epoch();
            if may_skip {
                let genesis_block = tipset.genesis(&store)?;
                update_network_name(genesis_block.cid());
                break;
            }
        }

        Ok(ArchiveInfo {
            variant,
            network,
            epoch: root_epoch,
            tipsets: lowest_stateroot_epoch,
            messages: lowest_message_epoch,
            head,
            snapshot_version,
            index_size_bytes,
        })
    }

    // Epochs covered by state-roots in this archive: from the lowest epoch
    // with a state-root up to (excluding) the head epoch.
    fn epoch_range(&self) -> Range<ChainEpoch> {
        self.tipsets..self.epoch
    }
}
455
456// Print a mapping of epochs to block headers in yaml format. This mapping can
457// be used by Forest to quickly identify tipsets.
458fn print_checkpoints(snapshot_files: Vec<PathBuf>) -> anyhow::Result<()> {
459    let store = ManyCar::try_from(snapshot_files).context("couldn't read input CAR file")?;
460    let root = store.heaviest_tipset()?;
461
462    let genesis = root.genesis(&store)?;
463    let chain_name =
464        NetworkChain::from_genesis(genesis.cid()).context("Unrecognizable genesis block")?;
465
466    println!("{chain_name}:");
467    for (epoch, cid) in list_checkpoints(&store, root) {
468        println!("  {epoch}: {cid}");
469    }
470    Ok(())
471}
472
473fn list_checkpoints(
474    db: &impl Blockstore,
475    root: Tipset,
476) -> impl Iterator<Item = (ChainEpoch, cid::Cid)> + '_ {
477    let interval = EPOCHS_IN_DAY * 30;
478    let mut target_epoch = root.epoch() - root.epoch() % interval;
479    root.chain(db).filter_map(move |tipset| {
480        if tipset.epoch() <= target_epoch && tipset.epoch() != 0 {
481            target_epoch -= interval;
482            Some((tipset.epoch(), *tipset.min_ticket_block().cid()))
483        } else {
484            None
485        }
486    })
487}
488
489// This does nothing if the output path is a file. If it is a directory - it produces the following:
490// `./forest_snapshot_{chain}_{year}-{month}-{day}_height_{epoch}.car.zst`.
491fn build_output_path(
492    chain: String,
493    genesis_timestamp: u64,
494    epoch: ChainEpoch,
495    output_path: PathBuf,
496) -> PathBuf {
497    match output_path.is_dir() {
498        true => output_path.join(snapshot::filename(
499            TrustedVendor::Forest,
500            chain,
501            DateTime::from_timestamp(genesis_timestamp as i64 + epoch * EPOCH_DURATION_SECONDS, 0)
502                .unwrap_or_default()
503                .naive_utc()
504                .date(),
505            epoch,
506            true,
507        )),
508        false => output_path.clone(),
509    }
510}
511
/// Export a trimmed snapshot into `output_path`, ending at `epoch_option`
/// (defaulting to the root tipset's epoch) and containing `depth` state-roots.
///
/// When `diff` is set, everything reachable from the tipset at that epoch
/// (bounded by `diff_depth`) is first recorded as "seen" and excluded from the
/// output, producing a diff snapshot. Unless `force` is set, the user is
/// prompted before an existing output file is overwritten.
#[allow(clippy::too_many_arguments)]
pub async fn do_export(
    store: &Arc<impl Blockstore + Send + Sync + 'static>,
    root: Arc<Tipset>,
    output_path: PathBuf,
    epoch_option: Option<ChainEpoch>,
    depth: ChainEpochDelta,
    diff: Option<ChainEpoch>,
    diff_depth: Option<ChainEpochDelta>,
    force: bool,
) -> anyhow::Result<()> {
    let ts = root;

    let genesis = ts.genesis(store)?;
    let network = NetworkChain::from_genesis_or_devnet_placeholder(genesis.cid());

    let epoch = epoch_option.unwrap_or(ts.epoch());

    // Require at least the chain-finality window of state-roots (capped by the
    // target epoch itself).
    let finality = ChainConfig::from_chain(&network)
        .policy
        .chain_finality
        .min(epoch);
    if depth < finality {
        bail!("For {}, depth has to be at least {}.", network, finality);
    }

    info!("looking up a tipset by epoch: {}", epoch);

    let index = ChainIndex::new(store.clone());

    let ts = index
        .tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder)
        .context("unable to get a tipset at given height")?;

    // For diff snapshots: walk the chain from the diff tipset and record every
    // visited CID so those blocks can be omitted from the export below.
    let seen = if let Some(diff) = diff {
        let diff_ts: Arc<Tipset> = index
            .tipset_by_height(diff, ts.clone(), ResolveNullTipset::TakeOlder)
            .context("diff epoch must be smaller than target epoch")?;
        let diff_ts: &Tipset = &diff_ts;
        // Without `diff_depth`, walk all the way down to epoch 0.
        let diff_limit = diff_depth.map(|depth| diff_ts.epoch() - depth).unwrap_or(0);
        let mut stream = stream_chain(
            store.clone(),
            diff_ts.clone().chain_owned(store.clone()),
            diff_limit,
        );
        // Drain the stream purely for its side effect of populating `seen`.
        while stream.try_next().await?.is_some() {}
        stream.into_seen()
    } else {
        CidHashSet::default()
    };

    let output_path = build_output_path(network.to_string(), genesis.timestamp, epoch, output_path);

    // Ask before clobbering an existing file (unless `--force`).
    if !force && output_path.exists() {
        let have_permission = Confirm::with_theme(&ColorfulTheme::default())
            .with_prompt(format!(
                "{} will be overwritten. Continue?",
                output_path.to_string_lossy()
            ))
            .default(false)
            .interact()
            // e.g. not a tty (or some other error), so haven't got permission.
            .unwrap_or(false);
        if !have_permission {
            return Ok(());
        }
    }

    let writer = tokio::fs::File::create(&output_path)
        .await
        .with_context(|| {
            format!(
                "unable to create a snapshot - is the output path '{}' correct?",
                output_path.to_str().unwrap_or_default()
            )
        })?;

    info!(
        "exporting snapshot at location: {}",
        output_path.to_str().unwrap_or_default()
    );

    // Spinner-style progress bar: the total size is unknown up front.
    let pb = ProgressBar::new_spinner().with_style(
        ProgressStyle::with_template(
            "{spinner} exported {total_bytes} with {binary_bytes_per_sec} in {elapsed}",
        )
        .expect("indicatif template must be valid"),
    );
    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
    let writer = pb.wrap_async_write(writer);

    crate::chain::export::<Sha256>(
        store,
        &ts,
        depth,
        writer,
        Some(ExportOptions {
            skip_checksum: true,
            seen,
        }),
    )
    .await?;

    Ok(())
}
617
// TODO(lemmih): https://github.com/ChainSafe/forest/issues/3347
//               Testing with diff snapshots can be significantly improved
/// Merge a set of snapshots (diff snapshots or lite snapshots). The output
/// snapshot links to the heaviest tipset in the input set.
async fn merge_snapshots(
    snapshot_files: Vec<PathBuf>,
    output_path: PathBuf,
    force: bool,
) -> anyhow::Result<()> {
    use crate::db::car::forest;

    let store = ManyCar::try_from(snapshot_files)?;
    let heaviest_tipset = store.heaviest_tipset()?;
    // The merged CAR's roots are the heaviest tipset's block CIDs.
    let roots = heaviest_tipset.key().to_cids();

    // Ask before clobbering an existing file (unless `--force`).
    if !force && output_path.exists() {
        let have_permission = Confirm::with_theme(&ColorfulTheme::default())
            .with_prompt(format!(
                "{} will be overwritten. Continue?",
                output_path.to_string_lossy()
            ))
            .default(false)
            .interact()
            // e.g. not a tty (or some other error), so haven't got permission.
            .unwrap_or(false);
        if !have_permission {
            return Ok(());
        }
    }

    let mut writer = BufWriter::new(tokio::fs::File::create(&output_path).await.context(
        format!(
            "unable to create a snapshot - is the output path '{}' correct?",
            output_path.to_str().unwrap_or_default()
        ),
    )?);

    // Stream all available blocks from heaviest_tipset to genesis.
    let blocks = stream_graph(&store, heaviest_tipset.chain(&store), 0);

    // Encode Ipld key-value pairs in zstd frames
    let frames = forest::Encoder::compress_stream_default(blocks);

    // Write zstd frames and include a skippable index
    forest::Encoder::write(&mut writer, roots, frames).await?;

    // Flush to ensure everything has been successfully written
    writer.flush().await.context("failed to flush")?;

    Ok(())
}
669
/// Merge a v1 Filecoin snapshot and a standalone F3 snapshot into a single v2
/// snapshot in `.forest.car.zst` format. The v2 metadata block (carrying the
/// chain head and the F3 data CID) becomes the sole root of the output CAR.
async fn merge_f3_snapshot(filecoin: PathBuf, f3: PathBuf, output: PathBuf) -> anyhow::Result<()> {
    // A v2 snapshot already carries metadata; refuse anything but v1 input.
    let store = AnyCar::try_from(filecoin.as_path())?;
    anyhow::ensure!(
        store.metadata().is_none(),
        "The filecoin snapshot is not in v1 format"
    );
    // Release the store so the file can be re-read as a plain CAR stream below.
    drop(store);

    let mut f3_data = File::open(f3)?;
    let f3_cid = crate::f3::snapshot::get_f3_snapshot_cid(&mut f3_data)?;

    let car_stream = CarStream::new_from_path(&filecoin).await?;
    // The v1 CAR header roots are the chain head tipset keys.
    let chain_head = car_stream.header_v1.roots.clone();

    println!("f3 snapshot cid: {f3_cid}");
    println!(
        "chain head:      [{}]",
        chain_head.iter().map(|c| c.to_string()).join(", ")
    );

    // Build the v2 metadata block: DAG-CBOR encoded, addressed by blake2b-256.
    let snap_meta = FilecoinSnapshotMetadata::new_v2(chain_head, Some(f3_cid));
    let snap_meta_cbor_encoded = fvm_ipld_encoding::to_vec(&snap_meta)?;
    let snap_meta_block = CarBlock {
        cid: Cid::new_v1(
            DAG_CBOR,
            MultihashCode::Blake2b256.digest(&snap_meta_cbor_encoded),
        ),
        data: snap_meta_cbor_encoded,
    };

    let roots = nunny::vec![snap_meta_block.cid];
    // Compress the metadata block into its own zstd frame.
    let snap_meta_frame = {
        let mut encoder =
            crate::db::car::forest::new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
        snap_meta_block.write(&mut encoder)?;
        anyhow::Ok((
            vec![snap_meta_block.cid],
            crate::db::car::forest::finalize_frame(
                DEFAULT_FOREST_CAR_COMPRESSION_LEVEL,
                &mut encoder,
            )?,
        ))
    };
    // Compress the raw F3 snapshot bytes into a second frame. The seek
    // round-trip measures the file length, then rewinds for the copy.
    let f3_frame = {
        let mut encoder =
            crate::db::car::forest::new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
        let f3_data_len = f3_data.seek(SeekFrom::End(0))?;
        f3_data.seek(SeekFrom::Start(0))?;
        encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
        anyhow::Ok((
            vec![f3_cid],
            crate::db::car::forest::finalize_frame(
                DEFAULT_FOREST_CAR_COMPRESSION_LEVEL,
                &mut encoder,
            )?,
        ))
    };

    // All blocks from the v1 snapshot follow the two special frames.
    let block_frames = crate::db::car::forest::Encoder::compress_stream_default(
        car_stream.map_err(anyhow::Error::from),
    );
    let frames = futures::stream::iter([snap_meta_frame, f3_frame]).chain(block_frames);

    // Write to a temporary file next to the output, then persist (rename) it,
    // so a failed merge does not leave a truncated output file behind.
    let temp_output = {
        let mut dir = output.clone();
        if dir.pop() {
            tempfile::NamedTempFile::new_in(dir)?
        } else {
            // `output` has no parent component; fall back to the current dir.
            tempfile::NamedTempFile::new_in(".")?
        }
    };
    let writer = tokio::io::BufWriter::new(tokio::fs::File::create(&temp_output).await?);
    let pb = ProgressBar::new_spinner().with_style(
        ProgressStyle::with_template(
            "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})",
        )
        .expect("indicatif template must be valid"),
    ).with_message(format!("Merging into {} ...", output.display()));
    pb.enable_steady_tick(std::time::Duration::from_secs(1));
    let mut writer = pb.wrap_async_write(writer);
    crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
    writer.shutdown().await?;
    temp_output.persist(&output)?;
    pb.finish();

    Ok(())
}
757
/// Compute the tree of actor states for a given epoch and compare it to the
/// expected result (as encoded in the blockchain). Differences are printed
/// using the diff format (red for the blockchain state, green for the computed
/// state).
async fn show_tipset_diff(
    snapshot_files: Vec<PathBuf>,
    epoch: ChainEpoch,
    depth: Option<u64>,
) -> anyhow::Result<()> {
    use colored::*;

    let store = Arc::new(ManyCar::try_from(snapshot_files)?);

    // The child tipset at `epoch + 1` carries the expected state-root, so the
    // snapshot must reach at least one epoch past the target.
    let heaviest_tipset = Arc::new(store.heaviest_tipset()?);
    if heaviest_tipset.epoch() <= epoch {
        anyhow::bail!(
            "Highest epoch must be at least 1 greater than the target epoch. \
             Highest epoch = {}, target epoch = {}.",
            heaviest_tipset.epoch(),
            epoch
        )
    }

    let genesis = heaviest_tipset.genesis(&store)?;
    let network = NetworkChain::from_genesis_or_devnet_placeholder(genesis.cid());
    // Actor bundles are needed to re-execute messages below.
    load_actor_bundles(&store, &network).await?;

    let timestamp = genesis.timestamp;
    let chain_index = ChainIndex::new(Arc::clone(&store));
    let chain_config = ChainConfig::from_chain(&network);
    if chain_config.is_testnet() {
        CurrentNetwork::set_global(Network::Testnet);
    }
    let beacon = Arc::new(chain_config.get_beacon_schedule(timestamp));
    let tipset = chain_index.tipset_by_height(
        epoch,
        Arc::clone(&heaviest_tipset),
        ResolveNullTipset::TakeOlder,
    )?;

    let child_tipset = chain_index.tipset_by_height(
        epoch + 1,
        Arc::clone(&heaviest_tipset),
        ResolveNullTipset::TakeNewer,
    )?;

    // Re-execute the messages of `tipset` to compute its output state-root.
    let StateOutput { state_root, .. } = apply_block_messages(
        timestamp,
        Arc::new(chain_index),
        Arc::new(chain_config),
        beacon,
        &GLOBAL_MULTI_ENGINE,
        tipset,
        NO_CALLBACK,
        VMTrace::NotTraced,
    )?;

    // The child's parent-state is the canonical result; compare and, on
    // mismatch, print a colored state diff.
    if child_tipset.parent_state() != &state_root {
        println!(
            "{}",
            format!("- Expected state hash: {}", child_tipset.parent_state()).red()
        );
        println!("{}", format!("+ Computed state hash: {state_root}").green());

        crate::statediff::print_state_diff(
            &store,
            &state_root,
            child_tipset.parent_state(),
            depth,
        )?;
    } else {
        println!("Computed state matches expected state.");
    }

    Ok(())
}
834
835fn steps_in_range(
836    range: &Range<ChainEpoch>,
837    step_size: ChainEpochDelta,
838    offset: ChainEpochDelta,
839) -> impl Iterator<Item = ChainEpoch> {
840    let start = range.start / step_size;
841    (start..)
842        .map(move |x| x * step_size)
843        .skip_while(move |&x| x - offset < range.start)
844        .take_while(move |&x| x <= range.end)
845}
846
847fn epoch_to_date(genesis_timestamp: u64, epoch: ChainEpoch) -> anyhow::Result<String> {
848    Ok(
849        DateTime::from_timestamp(genesis_timestamp as i64 + epoch * EPOCH_DURATION_SECONDS, 0)
850            .unwrap_or_default()
851            .format("%Y-%m-%d")
852            .to_string(),
853    )
854}
855
856fn format_lite_snapshot(
857    network: &str,
858    genesis_timestamp: u64,
859    epoch: ChainEpoch,
860) -> anyhow::Result<String> {
861    Ok(format!(
862        "forest_snapshot_{network}_{date}_height_{epoch}.forest.car.zst",
863        date = epoch_to_date(genesis_timestamp, epoch)?,
864        epoch = epoch
865    ))
866}
867
868fn format_diff_snapshot(
869    network: &str,
870    genesis_timestamp: u64,
871    epoch: ChainEpoch,
872) -> anyhow::Result<String> {
873    Ok(format!(
874        "forest_diff_{network}_{date}_height_{epoch}+3000.forest.car.zst",
875        date = epoch_to_date(genesis_timestamp, epoch)?,
876        epoch = epoch - 3000
877    ))
878}
879
880// Check if
881// forest-internal.chainsafe.dev/{network}/lite/forest_snapshot_{network}_{date}_height_{epoch}.forest.car.zst
882// exists.
883async fn bucket_has_lite_snapshot(
884    network: &str,
885    genesis_timestamp: u64,
886    epoch: ChainEpoch,
887) -> anyhow::Result<bool> {
888    let url = format!(
889        "https://forest-internal.chainsafe.dev/{}/lite/{}",
890        network,
891        format_lite_snapshot(network, genesis_timestamp, epoch)?
892    );
893    let response = reqwest::Client::new().get(url).send().await?;
894    Ok(response.status().is_success())
895}
896
897// Check if
898// forest-internal.chainsafe.dev/{network}/diff/forest_diff_{network}_{date}_height_{epoch}+3000.forest.car.zst
899// exists.
900async fn bucket_has_diff_snapshot(
901    network: &str,
902    genesis_timestamp: u64,
903    epoch: ChainEpoch,
904) -> anyhow::Result<bool> {
905    let url = format!(
906        "https://forest-internal.chainsafe.dev/{}/diff/{}",
907        network,
908        format_diff_snapshot(network, genesis_timestamp, epoch)?
909    );
910    let response = reqwest::Client::new().head(url).send().await?;
911    Ok(response.status().is_success())
912}
913
/// Cloudflare R2 endpoint hosting the `forest-archive` S3 bucket that
/// `upload_to_forest_bucket` writes to.
const FOREST_ARCHIVE_S3_ENDPOINT: &str =
    "https://2238a825c5aca59233eab1f221f7aefb.r2.cloudflarestorage.com";
916
917/// Check if the AWS CLI is installed and correctly configured.
918fn check_aws_config(endpoint: &str) -> anyhow::Result<()> {
919    let status = std::process::Command::new("aws")
920        .arg("help")
921        .stdout(std::process::Stdio::null())
922        .status()
923        .map_err(|e| anyhow::anyhow!("Failed to execute 'aws help': {}", e))?;
924
925    if !status.success() {
926        bail!(
927            "'aws help' failed with status code: {}. Please ensure that the AWS CLI is installed and configured.",
928            status
929        );
930    }
931
932    let status = std::process::Command::new("aws")
933        .args(["s3", "ls", "s3://forest-archive/", "--endpoint", endpoint])
934        .stdout(std::process::Stdio::null())
935        .status()
936        .map_err(|e| anyhow::anyhow!("Failed to execute 'aws s3 ls': {}", e))?;
937
938    if !status.success() {
939        bail!(
940            "'aws s3 ls' failed with status code: {}. Please check your AWS credentials.",
941            status
942        );
943    }
944    Ok(())
945}
946
947/// Use the AWS CLI to upload a snapshot file to the `S3` bucket.
948fn upload_to_forest_bucket(path: PathBuf, network: &str, tag: &str) -> anyhow::Result<()> {
949    let status = std::process::Command::new("aws")
950        .args([
951            "s3",
952            "cp",
953            "--acl",
954            "public-read",
955            path.to_str().unwrap(),
956            &format!("s3://forest-archive/{network}/{tag}/"),
957            "--endpoint",
958            FOREST_ARCHIVE_S3_ENDPOINT,
959        ])
960        .status()
961        .map_err(|e| anyhow::anyhow!("Failed to execute 'aws s3 cp': {}", e))?;
962
963    if !status.success() {
964        bail!(
965            "'aws s3 cp' failed with status code: {}. Upload failed.",
966            status
967        );
968    }
969    Ok(())
970}
971
972/// Given a block store, export a lite snapshot for a given epoch.
973async fn export_lite_snapshot(
974    store: Arc<impl Blockstore + Send + Sync + 'static>,
975    root: Tipset,
976    network: &str,
977    genesis_timestamp: u64,
978    epoch: ChainEpoch,
979) -> anyhow::Result<PathBuf> {
980    let output_path: PathBuf = format_lite_snapshot(network, genesis_timestamp, epoch)?.into();
981
982    // Skip if file already exists
983    if output_path.exists() {
984        return Ok(output_path);
985    }
986
987    let depth = 900;
988    let diff = None;
989    let diff_depth = None;
990    let force = false;
991    do_export(
992        &store,
993        root.into(),
994        output_path.clone(),
995        Some(epoch),
996        depth,
997        diff,
998        diff_depth,
999        force,
1000    )
1001    .await?;
1002    Ok(output_path)
1003}
1004
1005/// Given a block store, export a diff snapshot for a given epoch.
1006async fn export_diff_snapshot(
1007    store: Arc<impl Blockstore + Send + Sync + 'static>,
1008    root: Tipset,
1009    network: &str,
1010    genesis_timestamp: u64,
1011    epoch: ChainEpoch,
1012) -> anyhow::Result<PathBuf> {
1013    let output_path: PathBuf = format_diff_snapshot(network, genesis_timestamp, epoch)?.into();
1014
1015    // Skip if file already exists
1016    if output_path.exists() {
1017        return Ok(output_path);
1018    }
1019
1020    let depth = 3_000;
1021    let diff = Some(epoch - depth);
1022    let diff_depth = Some(900);
1023    let force = false;
1024    do_export(
1025        &store,
1026        root.into(),
1027        output_path.clone(),
1028        Some(epoch),
1029        depth,
1030        diff,
1031        diff_depth,
1032        force,
1033    )
1034    .await?;
1035    Ok(output_path)
1036}
1037
1038// This command is used for keeping the S3 bucket of archival snapshots
1039// up-to-date. It takes a set of snapshot files and queries the S3 bucket to see
1040// what is missing. If the input set of snapshot files can be used to generate
1041// missing lite or diff snapshots, they'll be generated and uploaded to the S3
1042// bucket.
1043async fn sync_bucket(
1044    snapshot_files: Vec<PathBuf>,
1045    endpoint: String,
1046    dry_run: bool,
1047    export_mode: ExportMode,
1048) -> anyhow::Result<()> {
1049    check_aws_config(&endpoint)?;
1050
1051    let store = Arc::new(ManyCar::try_from(snapshot_files)?);
1052    let heaviest_tipset = store.heaviest_tipset()?;
1053
1054    let info = ArchiveInfo::from_store(
1055        &store,
1056        "ManyCAR".to_string(),
1057        heaviest_tipset.clone(),
1058        FilecoinSnapshotVersion::V1,
1059        None,
1060    )?;
1061
1062    let genesis_timestamp = heaviest_tipset.genesis(&store)?.timestamp;
1063
1064    let range = info.epoch_range();
1065
1066    println!("Network: {}", info.network);
1067    println!("Range:   {} to {}", range.start, range.end);
1068    if export_mode.lite() {
1069        println!("Lites:",);
1070        for epoch in steps_in_range(&range, 30_000, 800) {
1071            println!(
1072                "  {}: {}",
1073                epoch,
1074                bucket_has_lite_snapshot(&info.network, genesis_timestamp, epoch).await?
1075            );
1076        }
1077    }
1078    if export_mode.diff() {
1079        println!("Diffs:");
1080        for epoch in steps_in_range(&range, 3_000, 3_800) {
1081            println!(
1082                "  {}: {}",
1083                epoch,
1084                bucket_has_diff_snapshot(&info.network, genesis_timestamp, epoch).await?
1085            );
1086        }
1087    }
1088
1089    if export_mode.lite() {
1090        for epoch in steps_in_range(&range, 30_000, 800) {
1091            if !bucket_has_lite_snapshot(&info.network, genesis_timestamp, epoch).await? {
1092                println!("  {epoch}: Exporting lite snapshot",);
1093                if !dry_run {
1094                    let output_path = export_lite_snapshot(
1095                        store.clone(),
1096                        heaviest_tipset.clone(),
1097                        &info.network,
1098                        genesis_timestamp,
1099                        epoch,
1100                    )
1101                    .await?;
1102                    upload_to_forest_bucket(output_path, &info.network, "lite")?;
1103                } else {
1104                    println!("  {epoch}: Would upload lite snapshot to S3");
1105                }
1106            }
1107        }
1108    }
1109
1110    if export_mode.diff() {
1111        for epoch in steps_in_range(&range, 3_000, 3_800) {
1112            if !bucket_has_diff_snapshot(&info.network, genesis_timestamp, epoch).await? {
1113                println!("  {epoch}: Exporting diff snapshot",);
1114                if !dry_run {
1115                    let output_path = export_diff_snapshot(
1116                        store.clone(),
1117                        heaviest_tipset.clone(),
1118                        &info.network,
1119                        genesis_timestamp,
1120                        epoch,
1121                    )
1122                    .await?;
1123                    upload_to_forest_bucket(output_path, &info.network, "diff")?;
1124                } else {
1125                    println!("  {epoch}: Would upload diff snapshot to S3");
1126                }
1127            }
1128        }
1129    }
1130    Ok(())
1131}
1132
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::car::AnyCar;
    use crate::utils::db::car_stream::CarStream;
    use tempfile::TempDir;
    use tokio::io::BufReader;

    /// Extracts the genesis block's timestamp from an in-memory genesis CAR.
    fn genesis_timestamp(genesis_car: &'static [u8]) -> u64 {
        let db = crate::db::car::PlainCar::try_from(genesis_car).unwrap();
        let ts = db.heaviest_tipset().unwrap();
        ts.genesis(&db).unwrap().timestamp
    }

    // With offset 800, candidate 30_000 fails `x - offset >= range.start`
    // (29_200 < 30_000) and is skipped; 60_000 is kept (end is inclusive).
    #[test]
    fn steps_in_range_1() {
        let range = 30_000..60_001;
        let lite = steps_in_range(&range, 30_000, 800);
        assert_eq!(lite.collect::<Vec<_>>(), vec![60_000]);
    }

    // Starting the range 800 epochs earlier makes 30_000 qualify as well.
    #[test]
    fn steps_in_range_2() {
        let range = (30_000 - 800)..60_001;
        let lite = steps_in_range(&range, 30_000, 800);
        assert_eq!(lite.collect::<Vec<_>>(), vec![30_000, 60_000]);
    }

    // End-to-end: export the calibnet genesis tipset to a temp directory and
    // confirm the produced file parses as a CAR stream.
    #[tokio::test]
    async fn export() {
        let output_path = TempDir::new().unwrap();
        let store = AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap();
        let heaviest_tipset = store.heaviest_tipset().unwrap();
        do_export(
            &store.into(),
            heaviest_tipset.into(),
            output_path.path().into(),
            Some(0),
            1,
            None,
            None,
            false,
        )
        .await
        .unwrap();
        let file = tokio::fs::File::open(build_output_path(
            NetworkChain::Calibnet.to_string(),
            genesis_timestamp(calibnet::DEFAULT_GENESIS),
            0,
            output_path.path().into(),
        ))
        .await
        .unwrap();
        CarStream::new(BufReader::new(file)).await.unwrap();
    }

    // `ArchiveInfo` derived from the calibnet genesis CAR reports the
    // expected network name and epoch 0.
    #[test]
    fn archive_info_calibnet() {
        let store = AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap();
        let variant = store.variant().to_string();
        let ts = store.heaviest_tipset().unwrap();
        let index_size_bytes = store.index_size_bytes();
        let info = ArchiveInfo::from_store(
            &store,
            variant,
            ts,
            FilecoinSnapshotVersion::V1,
            index_size_bytes,
        )
        .unwrap();
        assert_eq!(info.network, "calibnet");
        assert_eq!(info.epoch, 0);
    }

    // Same check against the mainnet genesis CAR.
    #[test]
    fn archive_info_mainnet() {
        let store = AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap();
        let variant = store.variant().to_string();
        let ts = store.heaviest_tipset().unwrap();
        let index_size_bytes = store.index_size_bytes();
        let info = ArchiveInfo::from_store(
            &store,
            variant,
            ts,
            FilecoinSnapshotVersion::V1,
            index_size_bytes,
        )
        .unwrap();
        assert_eq!(info.network, "mainnet");
        assert_eq!(info.epoch, 0);
    }
}