1use crate::blocks::Tipset;
30use crate::chain::{
31 ChainEpochDelta, ExportOptions, FilecoinSnapshotMetadata, FilecoinSnapshotVersion,
32 index::{ChainIndex, ResolveNullTipset},
33};
34use crate::cid_collections::CidHashSet;
35use crate::cli_shared::{snapshot, snapshot::TrustedVendor};
36use crate::daemon::bundle::load_actor_bundles;
37use crate::db::car::{AnyCar, ManyCar, forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL};
38use crate::f3::snapshot::F3SnapshotHeader;
39use crate::interpreter::VMTrace;
40use crate::ipld::{stream_chain, stream_graph};
41use crate::networks::{ChainConfig, NetworkChain, butterflynet, calibnet, mainnet};
42use crate::shim::address::CurrentNetwork;
43use crate::shim::clock::{ChainEpoch, EPOCH_DURATION_SECONDS, EPOCHS_IN_DAY};
44use crate::shim::fvm_shared_latest::address::Network;
45use crate::shim::machine::GLOBAL_MULTI_ENGINE;
46use crate::state_manager::{NO_CALLBACK, StateOutput, apply_block_messages};
47use crate::utils::db::car_stream::{CarBlock, CarBlockWrite as _, CarStream};
48use crate::utils::multihash::MultihashCode;
49use anyhow::{Context as _, bail};
50use chrono::DateTime;
51use cid::Cid;
52use clap::{Subcommand, ValueEnum};
53use dialoguer::{Confirm, theme::ColorfulTheme};
54use futures::{StreamExt as _, TryStreamExt as _};
55use fvm_ipld_blockstore::Blockstore;
56use fvm_ipld_encoding::DAG_CBOR;
57use indicatif::{ProgressBar, ProgressIterator, ProgressStyle};
58use itertools::Itertools;
59use multihash_derive::MultihashDigest as _;
60use sha2::Sha256;
61use std::fs::File;
62use std::io::{BufReader, Seek as _, SeekFrom};
63use std::ops::Range;
64use std::path::PathBuf;
65use std::sync::Arc;
66use tokio::io::{AsyncWriteExt, BufWriter};
67use tracing::info;
68
// Selects which snapshot flavors a `sync-bucket` run should consider.
// Note: `//` comments are used deliberately; `///` doc comments on ValueEnum
// variants would be surfaced in the generated CLI help.
#[derive(Debug, Clone, ValueEnum)]
pub enum ExportMode {
    // Consider both lite and diff snapshots.
    All,
    // Consider only lite snapshots.
    Lite,
    // Consider only diff snapshots.
    Diff,
}
78
79impl ExportMode {
80 pub fn lite(&self) -> bool {
81 matches!(self, ExportMode::All | ExportMode::Lite)
82 }
83
84 pub fn diff(&self) -> bool {
85 matches!(self, ExportMode::All | ExportMode::Diff)
86 }
87}
88
// Subcommands of the `archive` tool; each variant maps 1:1 to a handler in
// `ArchiveCommands::run`. Plain `//` comments are used so the clap-generated
// help text is left unchanged.
#[derive(Debug, Subcommand)]
pub enum ArchiveCommands {
    // Print a summary (CAR variant, network, epoch range, head) of a snapshot.
    Info {
        // Path to the snapshot CAR file.
        snapshot: PathBuf,
    },
    // Print the embedded v2 snapshot metadata (and F3 header, if linked).
    Metadata {
        snapshot: PathBuf,
    },
    // Print the header of a standalone F3 snapshot file.
    F3Header {
        snapshot: PathBuf,
    },
    // Export a (possibly trimmed/diffed) snapshot from one or more CAR files.
    Export {
        // Input CAR files; merged into a single store.
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        // Output file, or a directory to place a conventionally-named file in.
        #[arg(short, long, default_value = ".", verbatim_doc_comment)]
        output_path: PathBuf,
        // Target epoch; defaults to the heaviest tipset's epoch.
        #[arg(short, long)]
        epoch: Option<ChainEpoch>,
        // How many epochs of history to retain.
        #[arg(short, long, default_value_t = 2000)]
        depth: ChainEpochDelta,
        // Baseline epoch for diff exports; blocks reachable from it are omitted.
        #[arg(long)]
        diff: Option<ChainEpoch>,
        // History depth of the diff baseline walk.
        #[arg(long)]
        diff_depth: Option<ChainEpochDelta>,
        // Overwrite existing output without prompting.
        #[arg(long, default_value_t = false)]
        force: bool,
    },
    // List 30-day checkpoint tipset CIDs contained in the snapshot(s).
    Checkpoints {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
    },
    // Merge several CAR files into one forest.car.zst.
    Merge {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        #[arg(short, long, default_value = ".", verbatim_doc_comment)]
        output_path: PathBuf,
        #[arg(long, default_value_t = false)]
        force: bool,
    },
    // Embed an F3 snapshot into a v1 Filecoin snapshot, producing a v2 file.
    MergeF3 {
        // The v1 Filecoin snapshot.
        #[arg(long = "v1")]
        filecoin_v1: PathBuf,
        // The F3 snapshot to embed.
        #[arg(long)]
        f3: PathBuf,
        // Destination of the merged v2 snapshot.
        #[arg(long)]
        output: PathBuf,
    },
    // Recompute the state at an epoch and diff it against the recorded state.
    Diff {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        // Epoch whose messages are re-executed.
        #[arg(long)]
        epoch: ChainEpoch,
        // Maximum depth of the printed state diff.
        #[arg(long)]
        depth: Option<u64>,
    },
    // Export and upload snapshots missing from the public archive bucket.
    SyncBucket {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        // S3-compatible endpoint used for the credential check.
        #[arg(long, default_value = FOREST_ARCHIVE_S3_ENDPOINT)]
        endpoint: String,
        // Report what would be done without exporting or uploading.
        #[arg(long, default_value_t = false)]
        dry_run: bool,
        // Which snapshot flavors to synchronize.
        #[arg(long, value_enum, default_value_t = ExportMode::All)]
        export_mode: ExportMode,
    },
}
195
impl ArchiveCommands {
    /// Dispatches the selected `archive` subcommand to its handler.
    pub async fn run(self) -> anyhow::Result<()> {
        match self {
            Self::Info { snapshot } => {
                // Open the CAR (variant auto-detected) and summarize it.
                let store = AnyCar::try_from(snapshot.as_path())?;
                let variant = store.variant().to_string();
                let heaviest = store.heaviest_tipset()?;
                let index_size_bytes = store.index_size_bytes();
                // Snapshots without embedded metadata are treated as v1.
                let snapshot_version = if let Some(metadata) = store.metadata() {
                    metadata.version
                } else {
                    FilecoinSnapshotVersion::V1
                };
                println!(
                    "{}",
                    ArchiveInfo::from_store(
                        &store,
                        variant,
                        heaviest,
                        snapshot_version,
                        index_size_bytes
                    )?
                );
                Ok(())
            }
            Self::Metadata { snapshot } => {
                let store = AnyCar::try_from(snapshot.as_path())?;
                if let Some(metadata) = store.metadata() {
                    println!("{metadata}");
                    // If the metadata links F3 data, decode and print its
                    // snapshot header as well.
                    if let Some(f3_cid) = metadata.f3_data {
                        let mut f3_data = store
                            .get_reader(f3_cid)?
                            .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?;
                        let f3_snap_header = F3SnapshotHeader::decode_from_snapshot(&mut f3_data)?;
                        println!("{f3_snap_header}");
                    }
                } else {
                    println!(
                        "No metadata found (required by v2 snapshot) - this appears to be a v1 snapshot"
                    );
                }
                Ok(())
            }
            Self::F3Header { snapshot } => {
                // Input is a standalone F3 snapshot file, not a CAR.
                let mut r = BufReader::new(File::open(&snapshot).with_context(|| {
                    format!("failed to open F3 snapshot '{}'", snapshot.display())
                })?);
                let f3_snap_header =
                    F3SnapshotHeader::decode_from_snapshot(&mut r).with_context(|| {
                        format!(
                            "failed to decode F3 snapshot header from '{}'",
                            snapshot.display()
                        )
                    })?;
                println!("{f3_snap_header}");
                Ok(())
            }
            Self::Export {
                snapshot_files,
                output_path,
                epoch,
                depth,
                diff,
                diff_depth,
                force,
            } => {
                // Combine all inputs into one store rooted at the heaviest tipset.
                let store = ManyCar::try_from(snapshot_files)?;
                let heaviest_tipset = store.heaviest_tipset()?;
                do_export(
                    &store.into(),
                    heaviest_tipset.into(),
                    output_path,
                    epoch,
                    depth,
                    diff,
                    diff_depth,
                    force,
                )
                .await
            }
            Self::Checkpoints {
                snapshot_files: snapshot,
            } => print_checkpoints(snapshot),
            Self::Merge {
                snapshot_files,
                output_path,
                force,
            } => merge_snapshots(snapshot_files, output_path, force).await,
            Self::MergeF3 {
                filecoin_v1,
                f3,
                output,
            } => merge_f3_snapshot(filecoin_v1, f3, output).await,
            Self::Diff {
                snapshot_files,
                epoch,
                depth,
            } => show_tipset_diff(snapshot_files, epoch, depth).await,
            Self::SyncBucket {
                snapshot_files,
                endpoint,
                dry_run,
                export_mode,
            } => sync_bucket(snapshot_files, endpoint, dry_run, export_mode).await,
        }
    }
}
303
/// Summary of a snapshot produced by walking its chain; built by
/// [`ArchiveInfo::from_store`].
#[derive(Debug)]
pub struct ArchiveInfo {
    // CAR container variant as reported by the store.
    variant: String,
    // Common network name ("mainnet", "calibnet", ...) or "unknown".
    network: String,
    // Epoch of the heaviest tipset.
    epoch: ChainEpoch,
    // Lowest epoch whose state-root is present in the store.
    tipsets: ChainEpoch,
    // Lowest epoch whose messages are present in the store.
    messages: ChainEpoch,
    // The heaviest tipset itself.
    head: Tipset,
    // Snapshot format version (v1 when no metadata block is present).
    snapshot_version: FilecoinSnapshotVersion,
    // On-disk index size, when the CAR variant carries an index.
    index_size_bytes: Option<u32>,
}
315
impl std::fmt::Display for ArchiveInfo {
    /// Human-readable multi-line report; note the final line is written with
    /// `write!` so the output carries no trailing newline.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        writeln!(f, "CAR format: {}", self.variant)?;
        writeln!(f, "Snapshot version: {}", self.snapshot_version as u64)?;
        writeln!(f, "Network: {}", self.network)?;
        writeln!(f, "Epoch: {}", self.epoch)?;
        // `tipsets`/`messages` hold the *lowest* covered epochs, so the
        // covered counts are inclusive differences from the head epoch.
        writeln!(f, "State-roots: {}", self.epoch - self.tipsets + 1)?;
        writeln!(f, "Messages sets: {}", self.epoch - self.messages + 1)?;
        // One CID per line for multi-block tipsets.
        let head_tipset_key_string = self
            .head
            .cids()
            .iter()
            .map(Cid::to_string)
            .join("\n ");
        write!(f, "Head Tipset: {head_tipset_key_string}")?;
        if let Some(index_size_bytes) = self.index_size_bytes {
            writeln!(f)?;
            write!(
                f,
                "Index size: {}",
                human_bytes::human_bytes(index_size_bytes)
            )?;
        }
        Ok(())
    }
}
342
impl ArchiveInfo {
    /// Scans the chain in `store` (with a progress bar) and summarizes it.
    /// See [`ArchiveInfo::from_store_with`].
    fn from_store(
        store: &impl Blockstore,
        variant: String,
        heaviest_tipset: Tipset,
        snapshot_version: FilecoinSnapshotVersion,
        index_size_bytes: Option<u32>,
    ) -> anyhow::Result<Self> {
        Self::from_store_with(
            store,
            variant,
            heaviest_tipset,
            snapshot_version,
            index_size_bytes,
            true,
        )
    }

    /// Walks the chain backwards from `heaviest_tipset`, recording the lowest
    /// epochs for which state-roots and messages are still available, and
    /// identifying the network from the genesis block when possible.
    ///
    /// The walk stops early once neither coverage range can be extended
    /// further; at that point the genesis block is resolved directly.
    fn from_store_with(
        store: &impl Blockstore,
        variant: String,
        heaviest_tipset: Tipset,
        snapshot_version: FilecoinSnapshotVersion,
        index_size_bytes: Option<u32>,
        progress: bool,
    ) -> anyhow::Result<Self> {
        let head = heaviest_tipset;
        let root_epoch = head.epoch();

        // Iterator over ancestor tipsets, walking towards genesis.
        let tipsets = head.clone().chain(store);

        // Pair each tipset with its child so invariants between consecutive
        // tipsets can be checked. The first window pairs the head with itself.
        let windowed = std::iter::once(head.clone()).chain(tipsets).tuple_windows();

        let mut network: String = "unknown".into();
        // Coverage is extended downward only while it stays contiguous.
        let mut lowest_stateroot_epoch = root_epoch;
        let mut lowest_message_epoch = root_epoch;

        let iter = if progress {
            itertools::Either::Left(windowed.progress_count(root_epoch as u64))
        } else {
            itertools::Either::Right(windowed)
        };

        for (parent, tipset) in iter {
            // Epochs must strictly decrease while walking towards genesis
            // (exempting the initial head-with-itself window).
            if tipset.epoch() >= parent.epoch() && parent.epoch() != root_epoch {
                bail!("Broken invariant: non-sequential epochs");
            }

            if tipset.epoch() < 0 {
                bail!("Broken invariant: tipset with negative epoch");
            }

            // Extend state-root coverage only if it reached the parent.
            if lowest_stateroot_epoch == parent.epoch() && store.has(tipset.parent_state())? {
                lowest_stateroot_epoch = tipset.epoch();
            }
            // Same for message coverage, probed via the min-ticket block.
            if lowest_message_epoch == parent.epoch()
                && store.has(&tipset.min_ticket_block().messages)?
            {
                lowest_message_epoch = tipset.epoch();
            }

            // Maps a known genesis block CID to its common network name.
            let mut update_network_name = |block_cid: &Cid| {
                if block_cid == &*calibnet::GENESIS_CID {
                    network = calibnet::NETWORK_COMMON_NAME.into();
                } else if block_cid == &*mainnet::GENESIS_CID {
                    network = mainnet::NETWORK_COMMON_NAME.into();
                } else if block_cid == &*butterflynet::GENESIS_CID {
                    network = butterflynet::NETWORK_COMMON_NAME.into();
                }
            };

            // If the walk reached genesis itself, identify the network there.
            if tipset.epoch() == 0 {
                let block_cid = tipset.min_ticket_block().cid();
                update_network_name(block_cid);
            }

            // Once this tipset extends neither coverage range, no deeper
            // tipset can either; resolve the genesis block and stop early.
            let may_skip =
                lowest_stateroot_epoch != tipset.epoch() && lowest_message_epoch != tipset.epoch();
            if may_skip {
                let genesis_block = tipset.genesis(&store)?;
                update_network_name(genesis_block.cid());
                break;
            }
        }

        Ok(ArchiveInfo {
            variant,
            network,
            epoch: root_epoch,
            tipsets: lowest_stateroot_epoch,
            messages: lowest_message_epoch,
            head,
            snapshot_version,
            index_size_bytes,
        })
    }

    /// Epochs this archive can serve state for: from the lowest state-root
    /// epoch up to (excluding) the head epoch.
    fn epoch_range(&self) -> Range<ChainEpoch> {
        self.tipsets..self.epoch
    }
}
455
/// Prints the chain name followed by one `epoch: cid` checkpoint line per
/// 30-day interval found in the given snapshot(s).
fn print_checkpoints(snapshot_files: Vec<PathBuf>) -> anyhow::Result<()> {
    let store = ManyCar::try_from(snapshot_files).context("couldn't read input CAR file")?;
    let root = store.heaviest_tipset()?;

    // The network is identified from the genesis block's CID.
    let genesis = root.genesis(&store)?;
    let chain_name =
        NetworkChain::from_genesis(genesis.cid()).context("Unrecognizable genesis block")?;

    println!("{chain_name}:");
    for (epoch, cid) in list_checkpoints(&store, root) {
        println!(" {epoch}: {cid}");
    }
    Ok(())
}
472
473fn list_checkpoints(
474 db: &impl Blockstore,
475 root: Tipset,
476) -> impl Iterator<Item = (ChainEpoch, cid::Cid)> + '_ {
477 let interval = EPOCHS_IN_DAY * 30;
478 let mut target_epoch = root.epoch() - root.epoch() % interval;
479 root.chain(db).filter_map(move |tipset| {
480 if tipset.epoch() <= target_epoch && tipset.epoch() != 0 {
481 target_epoch -= interval;
482 Some((tipset.epoch(), *tipset.min_ticket_block().cid()))
483 } else {
484 None
485 }
486 })
487}
488
489fn build_output_path(
492 chain: String,
493 genesis_timestamp: u64,
494 epoch: ChainEpoch,
495 output_path: PathBuf,
496) -> PathBuf {
497 match output_path.is_dir() {
498 true => output_path.join(snapshot::filename(
499 TrustedVendor::Forest,
500 chain,
501 DateTime::from_timestamp(genesis_timestamp as i64 + epoch * EPOCH_DURATION_SECONDS, 0)
502 .unwrap_or_default()
503 .naive_utc()
504 .date(),
505 epoch,
506 true,
507 )),
508 false => output_path.clone(),
509 }
510}
511
#[allow(clippy::too_many_arguments)]
/// Exports a snapshot ending at `epoch_option` (default: head) with `depth`
/// epochs of history. When `diff` is given, blocks reachable from the diff
/// baseline tipset are pre-marked as seen and therefore omitted, producing a
/// diff snapshot.
///
/// Prompts before overwriting an existing file unless `force` is set.
pub async fn do_export(
    store: &Arc<impl Blockstore + Send + Sync + 'static>,
    root: Arc<Tipset>,
    output_path: PathBuf,
    epoch_option: Option<ChainEpoch>,
    depth: ChainEpochDelta,
    diff: Option<ChainEpoch>,
    diff_depth: Option<ChainEpochDelta>,
    force: bool,
) -> anyhow::Result<()> {
    let ts = root;

    let genesis = ts.genesis(store)?;
    let network = NetworkChain::from_genesis_or_devnet_placeholder(genesis.cid());

    let epoch = epoch_option.unwrap_or(ts.epoch());

    // The export must retain at least the chain-finality window (capped at
    // `epoch` so genesis-adjacent exports stay possible).
    let finality = ChainConfig::from_chain(&network)
        .policy
        .chain_finality
        .min(epoch);
    if depth < finality {
        bail!("For {}, depth has to be at least {}.", network, finality);
    }

    info!("looking up a tipset by epoch: {}", epoch);

    let index = ChainIndex::new(store.clone());

    let ts = index
        .tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder)
        .context("unable to get a tipset at given height")?;

    // For diff exports: walk the baseline chain first, collecting every CID it
    // reaches; those are excluded from the export below.
    let seen = if let Some(diff) = diff {
        let diff_ts: Arc<Tipset> = index
            .tipset_by_height(diff, ts.clone(), ResolveNullTipset::TakeOlder)
            .context("diff epoch must be smaller than target epoch")?;
        let diff_ts: &Tipset = &diff_ts;
        // How far below the baseline tipset the walk descends (0 = genesis).
        let diff_limit = diff_depth.map(|depth| diff_ts.epoch() - depth).unwrap_or(0);
        let mut stream = stream_chain(
            store.clone(),
            diff_ts.clone().chain_owned(store.clone()),
            diff_limit,
        );
        // Drain the stream purely for its side effect of recording seen CIDs.
        while stream.try_next().await?.is_some() {}
        stream.into_seen()
    } else {
        CidHashSet::default()
    };

    let output_path = build_output_path(network.to_string(), genesis.timestamp, epoch, output_path);

    // Interactive overwrite guard; a declined (or failed) prompt aborts
    // quietly with success.
    if !force && output_path.exists() {
        let have_permission = Confirm::with_theme(&ColorfulTheme::default())
            .with_prompt(format!(
                "{} will be overwritten. Continue?",
                output_path.to_string_lossy()
            ))
            .default(false)
            .interact()
            .unwrap_or(false);
        if !have_permission {
            return Ok(());
        }
    }

    let writer = tokio::fs::File::create(&output_path)
        .await
        .with_context(|| {
            format!(
                "unable to create a snapshot - is the output path '{}' correct?",
                output_path.to_str().unwrap_or_default()
            )
        })?;

    info!(
        "exporting snapshot at location: {}",
        output_path.to_str().unwrap_or_default()
    );

    // Spinner reporting bytes written and throughput while exporting.
    let pb = ProgressBar::new_spinner().with_style(
        ProgressStyle::with_template(
            "{spinner} exported {total_bytes} with {binary_bytes_per_sec} in {elapsed}",
        )
        .expect("indicatif template must be valid"),
    );
    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
    let writer = pb.wrap_async_write(writer);

    crate::chain::export::<Sha256>(
        store,
        &ts,
        depth,
        writer,
        Some(ExportOptions {
            skip_checksum: true,
            seen,
        }),
    )
    .await?;

    Ok(())
}
617
/// Merges several CAR files into a single forest.car.zst rooted at the
/// combined store's heaviest tipset, streaming every reachable block.
async fn merge_snapshots(
    snapshot_files: Vec<PathBuf>,
    output_path: PathBuf,
    force: bool,
) -> anyhow::Result<()> {
    use crate::db::car::forest;

    let store = ManyCar::try_from(snapshot_files)?;
    let heaviest_tipset = store.heaviest_tipset()?;
    let roots = heaviest_tipset.key().to_cids();

    // Interactive overwrite guard; a declined (or failed) prompt aborts
    // quietly with success.
    if !force && output_path.exists() {
        let have_permission = Confirm::with_theme(&ColorfulTheme::default())
            .with_prompt(format!(
                "{} will be overwritten. Continue?",
                output_path.to_string_lossy()
            ))
            .default(false)
            .interact()
            .unwrap_or(false);
        if !have_permission {
            return Ok(());
        }
    }

    let mut writer = BufWriter::new(tokio::fs::File::create(&output_path).await.context(
        format!(
            "unable to create a snapshot - is the output path '{}' correct?",
            output_path.to_str().unwrap_or_default()
        ),
    )?);

    // Stream all blocks reachable from the head down to epoch 0, compress
    // them into forest.car frames and write the result.
    let blocks = stream_graph(&store, heaviest_tipset.chain(&store), 0);
    let frames = forest::Encoder::compress_stream_default(blocks);
    forest::Encoder::write(&mut writer, roots, frames).await?;
    // Flush explicitly — dropping the BufWriter would swallow write errors.
    writer.flush().await.context("failed to flush")?;

    Ok(())
}
669
/// Embeds an F3 snapshot into a v1 Filecoin snapshot, producing a v2
/// forest.car.zst whose single root is a metadata block linking the chain
/// head and the F3 data.
async fn merge_f3_snapshot(filecoin: PathBuf, f3: PathBuf, output: PathBuf) -> anyhow::Result<()> {
    // Only v1 inputs are accepted; v2 snapshots already carry metadata.
    let store = AnyCar::try_from(filecoin.as_path())?;
    anyhow::ensure!(
        store.metadata().is_none(),
        "The filecoin snapshot is not in v1 format"
    );
    drop(store);

    let mut f3_data = File::open(f3)?;
    let f3_cid = crate::f3::snapshot::get_f3_snapshot_cid(&mut f3_data)?;

    // Re-open the CAR as a raw block stream; its v1 roots become the chain
    // head recorded in the new metadata block.
    let car_stream = CarStream::new_from_path(&filecoin).await?;
    let chain_head = car_stream.header_v1.roots.clone();

    println!("f3 snapshot cid: {f3_cid}");
    println!(
        "chain head: [{}]",
        chain_head.iter().map(|c| c.to_string()).join(", ")
    );

    // Build the v2 metadata block (DAG-CBOR, blake2b-256 CID).
    let snap_meta = FilecoinSnapshotMetadata::new_v2(chain_head, Some(f3_cid));
    let snap_meta_cbor_encoded = fvm_ipld_encoding::to_vec(&snap_meta)?;
    let snap_meta_block = CarBlock {
        cid: Cid::new_v1(
            DAG_CBOR,
            MultihashCode::Blake2b256.digest(&snap_meta_cbor_encoded),
        ),
        data: snap_meta_cbor_encoded,
    };

    // The metadata block is the sole root of the merged snapshot.
    let roots = nunny::vec![snap_meta_block.cid];
    // Frame 1: the metadata block on its own.
    let snap_meta_frame = {
        let mut encoder =
            crate::db::car::forest::new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
        snap_meta_block.write(&mut encoder)?;
        anyhow::Ok((
            vec![snap_meta_block.cid],
            crate::db::car::forest::finalize_frame(
                DEFAULT_FOREST_CAR_COMPRESSION_LEVEL,
                &mut encoder,
            )?,
        ))
    };
    // Frame 2: the entire F3 snapshot file as a single CAR block.
    let f3_frame = {
        let mut encoder =
            crate::db::car::forest::new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
        // Determine the file length by seeking to the end, then rewind so the
        // whole file is copied into the block.
        let f3_data_len = f3_data.seek(SeekFrom::End(0))?;
        f3_data.seek(SeekFrom::Start(0))?;
        encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
        anyhow::Ok((
            vec![f3_cid],
            crate::db::car::forest::finalize_frame(
                DEFAULT_FOREST_CAR_COMPRESSION_LEVEL,
                &mut encoder,
            )?,
        ))
    };

    // Metadata and F3 frames come first, followed by all original chain
    // blocks re-compressed from the v1 stream.
    let block_frames = crate::db::car::forest::Encoder::compress_stream_default(
        car_stream.map_err(anyhow::Error::from),
    );
    let frames = futures::stream::iter([snap_meta_frame, f3_frame]).chain(block_frames);

    // Write into a temp file in the target directory, then atomically
    // persist, so a failed merge never leaves a truncated output behind.
    let temp_output = {
        let mut dir = output.clone();
        if dir.pop() {
            tempfile::NamedTempFile::new_in(dir)?
        } else {
            // `output` has no parent component; use the working directory.
            tempfile::NamedTempFile::new_in(".")?
        }
    };
    let writer = tokio::io::BufWriter::new(tokio::fs::File::create(&temp_output).await?);
    let pb = ProgressBar::new_spinner().with_style(
        ProgressStyle::with_template(
            "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})",
        )
        .expect("indicatif template must be valid"),
    ).with_message(format!("Merging into {} ...", output.display()));
    pb.enable_steady_tick(std::time::Duration::from_secs(1));
    let mut writer = pb.wrap_async_write(writer);
    crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
    // Shut down to flush all buffered bytes before persisting the temp file.
    writer.shutdown().await?;
    temp_output.persist(&output)?;
    pb.finish();

    Ok(())
}
757
/// Re-executes the messages of the tipset at `epoch` and compares the computed
/// state-root against the one recorded by the child tipset at `epoch + 1`,
/// printing a colored state diff (limited to `depth`) on mismatch.
async fn show_tipset_diff(
    snapshot_files: Vec<PathBuf>,
    epoch: ChainEpoch,
    depth: Option<u64>,
) -> anyhow::Result<()> {
    use colored::*;

    let store = Arc::new(ManyCar::try_from(snapshot_files)?);

    // A child tipset above `epoch` must exist to read the expected state from.
    let heaviest_tipset = Arc::new(store.heaviest_tipset()?);
    if heaviest_tipset.epoch() <= epoch {
        anyhow::bail!(
            "Highest epoch must be at least 1 greater than the target epoch. \
             Highest epoch = {}, target epoch = {}.",
            heaviest_tipset.epoch(),
            epoch
        )
    }

    let genesis = heaviest_tipset.genesis(&store)?;
    let network = NetworkChain::from_genesis_or_devnet_placeholder(genesis.cid());
    // Actor bundles are required before the FVM can execute messages.
    load_actor_bundles(&store, &network).await?;

    let timestamp = genesis.timestamp;
    let chain_index = ChainIndex::new(Arc::clone(&store));
    let chain_config = ChainConfig::from_chain(&network);
    if chain_config.is_testnet() {
        // Testnets use the testnet address prefix globally.
        CurrentNetwork::set_global(Network::Testnet);
    }
    let beacon = Arc::new(chain_config.get_beacon_schedule(timestamp));
    let tipset = chain_index.tipset_by_height(
        epoch,
        Arc::clone(&heaviest_tipset),
        ResolveNullTipset::TakeOlder,
    )?;

    // The child records the state that resulted from executing `tipset`.
    let child_tipset = chain_index.tipset_by_height(
        epoch + 1,
        Arc::clone(&heaviest_tipset),
        ResolveNullTipset::TakeNewer,
    )?;

    // Replay the target tipset's messages to recompute the resulting state.
    let StateOutput { state_root, .. } = apply_block_messages(
        timestamp,
        Arc::new(chain_index),
        Arc::new(chain_config),
        beacon,
        &GLOBAL_MULTI_ENGINE,
        tipset,
        NO_CALLBACK,
        VMTrace::NotTraced,
    )?;

    if child_tipset.parent_state() != &state_root {
        // Mismatch: show both roots, then a structural diff of the states.
        println!(
            "{}",
            format!("- Expected state hash: {}", child_tipset.parent_state()).red()
        );
        println!("{}", format!("+ Computed state hash: {state_root}").green());

        crate::statediff::print_state_diff(
            &store,
            &state_root,
            child_tipset.parent_state(),
            depth,
        )?;
    } else {
        println!("Computed state matches expected state.");
    }

    Ok(())
}
834
835fn steps_in_range(
836 range: &Range<ChainEpoch>,
837 step_size: ChainEpochDelta,
838 offset: ChainEpochDelta,
839) -> impl Iterator<Item = ChainEpoch> {
840 let start = range.start / step_size;
841 (start..)
842 .map(move |x| x * step_size)
843 .skip_while(move |&x| x - offset < range.start)
844 .take_while(move |&x| x <= range.end)
845}
846
847fn epoch_to_date(genesis_timestamp: u64, epoch: ChainEpoch) -> anyhow::Result<String> {
848 Ok(
849 DateTime::from_timestamp(genesis_timestamp as i64 + epoch * EPOCH_DURATION_SECONDS, 0)
850 .unwrap_or_default()
851 .format("%Y-%m-%d")
852 .to_string(),
853 )
854}
855
856fn format_lite_snapshot(
857 network: &str,
858 genesis_timestamp: u64,
859 epoch: ChainEpoch,
860) -> anyhow::Result<String> {
861 Ok(format!(
862 "forest_snapshot_{network}_{date}_height_{epoch}.forest.car.zst",
863 date = epoch_to_date(genesis_timestamp, epoch)?,
864 epoch = epoch
865 ))
866}
867
868fn format_diff_snapshot(
869 network: &str,
870 genesis_timestamp: u64,
871 epoch: ChainEpoch,
872) -> anyhow::Result<String> {
873 Ok(format!(
874 "forest_diff_{network}_{date}_height_{epoch}+3000.forest.car.zst",
875 date = epoch_to_date(genesis_timestamp, epoch)?,
876 epoch = epoch - 3000
877 ))
878}
879
880async fn bucket_has_lite_snapshot(
884 network: &str,
885 genesis_timestamp: u64,
886 epoch: ChainEpoch,
887) -> anyhow::Result<bool> {
888 let url = format!(
889 "https://forest-internal.chainsafe.dev/{}/lite/{}",
890 network,
891 format_lite_snapshot(network, genesis_timestamp, epoch)?
892 );
893 let response = reqwest::Client::new().get(url).send().await?;
894 Ok(response.status().is_success())
895}
896
897async fn bucket_has_diff_snapshot(
901 network: &str,
902 genesis_timestamp: u64,
903 epoch: ChainEpoch,
904) -> anyhow::Result<bool> {
905 let url = format!(
906 "https://forest-internal.chainsafe.dev/{}/diff/{}",
907 network,
908 format_diff_snapshot(network, genesis_timestamp, epoch)?
909 );
910 let response = reqwest::Client::new().head(url).send().await?;
911 Ok(response.status().is_success())
912}
913
// S3-compatible endpoint backing the `forest-archive` bucket; used for the
// credential check and as the upload target.
const FOREST_ARCHIVE_S3_ENDPOINT: &str =
    "https://2238a825c5aca59233eab1f221f7aefb.r2.cloudflarestorage.com";
916
917fn check_aws_config(endpoint: &str) -> anyhow::Result<()> {
919 let status = std::process::Command::new("aws")
920 .arg("help")
921 .stdout(std::process::Stdio::null())
922 .status()
923 .map_err(|e| anyhow::anyhow!("Failed to execute 'aws help': {}", e))?;
924
925 if !status.success() {
926 bail!(
927 "'aws help' failed with status code: {}. Please ensure that the AWS CLI is installed and configured.",
928 status
929 );
930 }
931
932 let status = std::process::Command::new("aws")
933 .args(["s3", "ls", "s3://forest-archive/", "--endpoint", endpoint])
934 .stdout(std::process::Stdio::null())
935 .status()
936 .map_err(|e| anyhow::anyhow!("Failed to execute 'aws s3 ls': {}", e))?;
937
938 if !status.success() {
939 bail!(
940 "'aws s3 ls' failed with status code: {}. Please check your AWS credentials.",
941 status
942 );
943 }
944 Ok(())
945}
946
947fn upload_to_forest_bucket(path: PathBuf, network: &str, tag: &str) -> anyhow::Result<()> {
949 let status = std::process::Command::new("aws")
950 .args([
951 "s3",
952 "cp",
953 "--acl",
954 "public-read",
955 path.to_str().unwrap(),
956 &format!("s3://forest-archive/{network}/{tag}/"),
957 "--endpoint",
958 FOREST_ARCHIVE_S3_ENDPOINT,
959 ])
960 .status()
961 .map_err(|e| anyhow::anyhow!("Failed to execute 'aws s3 cp': {}", e))?;
962
963 if !status.success() {
964 bail!(
965 "'aws s3 cp' failed with status code: {}. Upload failed.",
966 status
967 );
968 }
969 Ok(())
970}
971
/// Exports the lite snapshot for `epoch` into the current directory (unless a
/// file from a previous run already exists) and returns its path.
async fn export_lite_snapshot(
    store: Arc<impl Blockstore + Send + Sync + 'static>,
    root: Tipset,
    network: &str,
    genesis_timestamp: u64,
    epoch: ChainEpoch,
) -> anyhow::Result<PathBuf> {
    let output_path: PathBuf = format_lite_snapshot(network, genesis_timestamp, epoch)?.into();

    // Reuse output left over from a previous (possibly interrupted) run.
    if output_path.exists() {
        return Ok(output_path);
    }

    // Lite snapshots carry 900 epochs of history and no diff baseline.
    let depth = 900;
    let diff = None;
    let diff_depth = None;
    let force = false;
    do_export(
        &store,
        root.into(),
        output_path.clone(),
        Some(epoch),
        depth,
        diff,
        diff_depth,
        force,
    )
    .await?;
    Ok(output_path)
}
1004
/// Exports the diff snapshot ending at `epoch` into the current directory
/// (unless a file from a previous run already exists) and returns its path.
async fn export_diff_snapshot(
    store: Arc<impl Blockstore + Send + Sync + 'static>,
    root: Tipset,
    network: &str,
    genesis_timestamp: u64,
    epoch: ChainEpoch,
) -> anyhow::Result<PathBuf> {
    let output_path: PathBuf = format_diff_snapshot(network, genesis_timestamp, epoch)?.into();

    // Reuse output left over from a previous (possibly interrupted) run.
    if output_path.exists() {
        return Ok(output_path);
    }

    // Diff snapshots cover a 3000-epoch window; blocks reachable from the
    // window start (with a 900-epoch baseline walk) are excluded.
    let depth = 3_000;
    let diff = Some(epoch - depth);
    let diff_depth = Some(900);
    let force = false;
    do_export(
        &store,
        root.into(),
        output_path.clone(),
        Some(epoch),
        depth,
        diff,
        diff_depth,
        force,
    )
    .await?;
    Ok(output_path)
}
1037
/// Compares the epoch range covered by the local snapshot(s) against the
/// public archive bucket and exports + uploads any missing lite and/or diff
/// snapshots, as selected by `export_mode`. With `dry_run`, only reports what
/// would be uploaded.
async fn sync_bucket(
    snapshot_files: Vec<PathBuf>,
    endpoint: String,
    dry_run: bool,
    export_mode: ExportMode,
) -> anyhow::Result<()> {
    // Fail fast if the AWS CLI or its credentials are unusable.
    check_aws_config(&endpoint)?;

    let store = Arc::new(ManyCar::try_from(snapshot_files)?);
    let heaviest_tipset = store.heaviest_tipset()?;

    let info = ArchiveInfo::from_store(
        &store,
        "ManyCAR".to_string(),
        heaviest_tipset.clone(),
        FilecoinSnapshotVersion::V1,
        None,
    )?;

    let genesis_timestamp = heaviest_tipset.genesis(&store)?.timestamp;

    // Epochs for which this store can actually produce snapshots.
    let range = info.epoch_range();

    println!("Network: {}", info.network);
    println!("Range: {} to {}", range.start, range.end);
    // First pass: report bucket coverage. Lite snapshots sit every 30k epochs
    // (offset 800), diff snapshots every 3k epochs (offset 3800).
    if export_mode.lite() {
        println!("Lites:",);
        for epoch in steps_in_range(&range, 30_000, 800) {
            println!(
                " {}: {}",
                epoch,
                bucket_has_lite_snapshot(&info.network, genesis_timestamp, epoch).await?
            );
        }
    }
    if export_mode.diff() {
        println!("Diffs:");
        for epoch in steps_in_range(&range, 3_000, 3_800) {
            println!(
                " {}: {}",
                epoch,
                bucket_has_diff_snapshot(&info.network, genesis_timestamp, epoch).await?
            );
        }
    }

    // Second pass: export and upload whatever the bucket is missing.
    if export_mode.lite() {
        for epoch in steps_in_range(&range, 30_000, 800) {
            if !bucket_has_lite_snapshot(&info.network, genesis_timestamp, epoch).await? {
                println!(" {epoch}: Exporting lite snapshot",);
                if !dry_run {
                    let output_path = export_lite_snapshot(
                        store.clone(),
                        heaviest_tipset.clone(),
                        &info.network,
                        genesis_timestamp,
                        epoch,
                    )
                    .await?;
                    upload_to_forest_bucket(output_path, &info.network, "lite")?;
                } else {
                    println!(" {epoch}: Would upload lite snapshot to S3");
                }
            }
        }
    }

    if export_mode.diff() {
        for epoch in steps_in_range(&range, 3_000, 3_800) {
            if !bucket_has_diff_snapshot(&info.network, genesis_timestamp, epoch).await? {
                println!(" {epoch}: Exporting diff snapshot",);
                if !dry_run {
                    let output_path = export_diff_snapshot(
                        store.clone(),
                        heaviest_tipset.clone(),
                        &info.network,
                        genesis_timestamp,
                        epoch,
                    )
                    .await?;
                    upload_to_forest_bucket(output_path, &info.network, "diff")?;
                } else {
                    println!(" {epoch}: Would upload diff snapshot to S3");
                }
            }
        }
    }
    Ok(())
}
1132
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::car::AnyCar;
    use crate::utils::db::car_stream::CarStream;
    use tempfile::TempDir;
    use tokio::io::BufReader;

    // Reads the genesis block timestamp out of a bundled genesis CAR.
    fn genesis_timestamp(genesis_car: &'static [u8]) -> u64 {
        let db = crate::db::car::PlainCar::try_from(genesis_car).unwrap();
        let ts = db.heaviest_tipset().unwrap();
        ts.genesis(&db).unwrap().timestamp
    }

    #[test]
    fn steps_in_range_1() {
        // 30_000 is excluded because 30_000 - 800 falls below the range start.
        let range = 30_000..60_001;
        let lite = steps_in_range(&range, 30_000, 800);
        assert_eq!(lite.collect::<Vec<_>>(), vec![60_000]);
    }

    #[test]
    fn steps_in_range_2() {
        // Lowering the start by the offset makes 30_000 eligible as well.
        let range = (30_000 - 800)..60_001;
        let lite = steps_in_range(&range, 30_000, 800);
        assert_eq!(lite.collect::<Vec<_>>(), vec![30_000, 60_000]);
    }

    #[tokio::test]
    async fn export() {
        // Exporting the genesis tipset of the bundled calibnet CAR into a
        // temp directory must produce a parseable CAR at the conventional
        // snapshot path.
        let output_path = TempDir::new().unwrap();
        let store = AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap();
        let heaviest_tipset = store.heaviest_tipset().unwrap();
        do_export(
            &store.into(),
            heaviest_tipset.into(),
            output_path.path().into(),
            Some(0),
            1,
            None,
            None,
            false,
        )
        .await
        .unwrap();
        let file = tokio::fs::File::open(build_output_path(
            NetworkChain::Calibnet.to_string(),
            genesis_timestamp(calibnet::DEFAULT_GENESIS),
            0,
            output_path.path().into(),
        ))
        .await
        .unwrap();
        // Parsing succeeds iff the exported file is a well-formed CAR.
        CarStream::new(BufReader::new(file)).await.unwrap();
    }

    #[test]
    fn archive_info_calibnet() {
        // The calibnet genesis CAR must be recognized as calibnet at epoch 0.
        let store = AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap();
        let variant = store.variant().to_string();
        let ts = store.heaviest_tipset().unwrap();
        let index_size_bytes = store.index_size_bytes();
        let info = ArchiveInfo::from_store(
            &store,
            variant,
            ts,
            FilecoinSnapshotVersion::V1,
            index_size_bytes,
        )
        .unwrap();
        assert_eq!(info.network, "calibnet");
        assert_eq!(info.epoch, 0);
    }

    #[test]
    fn archive_info_mainnet() {
        // The mainnet genesis CAR must be recognized as mainnet at epoch 0.
        let store = AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap();
        let variant = store.variant().to_string();
        let ts = store.heaviest_tipset().unwrap();
        let index_size_bytes = store.index_size_bytes();
        let info = ArchiveInfo::from_store(
            &store,
            variant,
            ts,
            FilecoinSnapshotVersion::V1,
            index_size_bytes,
        )
        .unwrap();
        assert_eq!(info.network, "mainnet");
        assert_eq!(info.epoch, 0);
    }
}