1use crate::chain::FilecoinSnapshotVersion;
5use crate::chain_sync::chain_muxer::DEFAULT_RECENT_STATE_ROOTS;
6use crate::cli_shared::snapshot::{self, TrustedVendor};
7use crate::db::car::forest::new_forest_car_temp_path_in;
8use crate::networks::calibnet;
9use crate::rpc::chain::ForestChainExportDiffParams;
10use crate::rpc::types::ApiExportResult;
11use crate::rpc::{self, chain::ForestChainExportParams, prelude::*};
12use crate::shim::policy::policy_constants::CHAIN_FINALITY;
13use anyhow::Context as _;
14use chrono::DateTime;
15use clap::Subcommand;
16use indicatif::{ProgressBar, ProgressStyle};
17use std::{
18 path::{Path, PathBuf},
19 time::Duration,
20};
21use tokio::io::AsyncWriteExt;
22
23#[derive(Debug, Clone, clap::ValueEnum)]
24pub enum Format {
25 Json,
26 Text,
27}
28
29#[derive(Debug, Subcommand)]
30pub enum SnapshotCommands {
31 Export {
33 #[arg(short, long, default_value = ".", verbatim_doc_comment)]
35 output_path: PathBuf,
36 #[arg(long)]
38 skip_checksum: bool,
39 #[arg(long)]
41 dry_run: bool,
42 #[arg(short, long)]
44 tipset: Option<i64>,
45 #[arg(short, long, default_value_t = DEFAULT_RECENT_STATE_ROOTS)]
47 depth: crate::chain::ChainEpochDelta,
48 #[arg(long, value_enum, default_value_t = FilecoinSnapshotVersion::V2)]
50 format: FilecoinSnapshotVersion,
51 },
52 ExportStatus {
54 #[arg(long)]
56 wait: bool,
57 #[arg(long, value_enum, default_value_t = Format::Text)]
59 format: Format,
60 },
61 ExportCancel {},
63 ExportDiff {
65 #[arg(short, long, default_value = ".", verbatim_doc_comment)]
67 output_path: PathBuf,
68 #[arg(long)]
70 from: i64,
71 #[arg(long)]
73 to: i64,
74 #[arg(short, long)]
76 depth: Option<crate::chain::ChainEpochDelta>,
77 },
78}
79
80impl SnapshotCommands {
81 pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
82 match self {
83 Self::Export {
84 output_path,
85 skip_checksum,
86 dry_run,
87 tipset,
88 depth,
89 format,
90 } => {
91 anyhow::ensure!(
92 depth >= 0,
93 "--depth must be non-negative; use 0 for spine-only snapshots"
94 );
95
96 if depth < CHAIN_FINALITY {
97 tracing::warn!(
98 "Depth {depth} should be no less than CHAIN_FINALITY {CHAIN_FINALITY} to export a valid lite snapshot"
99 );
100 }
101
102 let raw_network_name = StateNetworkName::call(&client, ()).await?;
103 let chain_name = if raw_network_name == calibnet::NETWORK_GENESIS_NAME {
106 calibnet::NETWORK_COMMON_NAME
107 } else {
108 raw_network_name.as_str()
109 };
110
111 let tipset = if let Some(epoch) = tipset {
112 client
114 .call(
115 ChainGetTipSetByHeight::request((epoch, Default::default()))?
116 .with_timeout(Duration::from_secs(60 * 15)),
117 )
118 .await?
119 } else {
120 ChainHead::call(&client, ()).await?
121 };
122
123 let output_path = match output_path.is_dir() {
124 true => output_path.join(snapshot::filename(
125 TrustedVendor::Forest,
126 chain_name,
127 DateTime::from_timestamp(tipset.min_ticket_block().timestamp as i64, 0)
128 .unwrap_or_default()
129 .naive_utc()
130 .date(),
131 tipset.epoch(),
132 true,
133 )),
134 false => output_path.clone(),
135 };
136
137 let output_dir = output_path.parent().context("invalid output path")?;
138 let temp_path = new_forest_car_temp_path_in(output_dir)?;
139
140 let params = ForestChainExportParams {
141 version: format,
142 epoch: tipset.epoch(),
143 recent_roots: depth,
144 output_path: temp_path.to_path_buf(),
145 tipset_keys: tipset.key().clone().into(),
146 include_receipts: false,
147 include_events: false,
148 include_tipset_keys: false,
149 skip_checksum,
150 dry_run,
151 };
152
153 let pb = ProgressBar::new_spinner().with_style(
154 ProgressStyle::with_template(
155 "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})",
156 )
157 .expect("indicatif template must be valid"),
158 ).with_message(format!("Exporting v{} snapshot to {} ...", format as u64, output_path.display()));
159 pb.enable_steady_tick(std::time::Duration::from_millis(80));
160 let handle = tokio::spawn({
161 let path: PathBuf = (&temp_path).into();
162 let pb = pb.clone();
163 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
164 async move {
165 loop {
166 interval.tick().await;
167 if let Ok(meta) = std::fs::metadata(&path) {
168 pb.set_position(meta.len());
169 }
170 }
171 }
172 });
173
174 let export_result = client
177 .call(ForestChainExport::request((params,))?.with_timeout(Duration::MAX))
178 .await?;
179
180 handle.abort();
181 pb.finish();
182 _ = handle.await;
183
184 if !dry_run {
185 match export_result.clone() {
186 ApiExportResult::Done(hash_opt) => {
187 temp_path.persist(&output_path)?;
189 if let Some(hash) = hash_opt {
190 save_checksum(&output_path, hash).await?;
191 }
192 }
193 ApiExportResult::Cancelled => { }
194 }
195 }
196
197 match export_result {
198 ApiExportResult::Done(_) => {
199 println!("Export completed.");
200 }
201 ApiExportResult::Cancelled => {
202 println!("Export cancelled.");
203 }
204 }
205 Ok(())
206 }
207 Self::ExportStatus { wait, format } => {
208 let result = client
209 .call(
210 ForestChainExportStatus::request(())?.with_timeout(Duration::from_secs(30)),
211 )
212 .await?;
213 if !result.exporting
214 && let Format::Text = format
215 {
216 if result.cancelled {
217 println!("No export in progress (last export was cancelled)");
218 } else {
219 println!("No export in progress");
220 }
221 return Ok(());
222 }
223 if wait {
224 let elapsed = chrono::Utc::now()
225 .signed_duration_since(result.start_time.unwrap_or_default())
226 .to_std()
227 .unwrap_or(Duration::ZERO);
228 let pb = ProgressBar::new(10000)
229 .with_elapsed(elapsed)
230 .with_message("Exporting");
231 pb.set_style(
232 ProgressStyle::with_template(
233 "[{elapsed_precise}] [{wide_bar}] {percent}% {msg} ",
234 )
235 .expect("indicatif template must be valid")
236 .progress_chars("#>-"),
237 );
238 loop {
239 let result = client
240 .call(
241 ForestChainExportStatus::request(())?
242 .with_timeout(Duration::from_secs(30)),
243 )
244 .await?;
245 if result.cancelled {
246 pb.set_message("Export cancelled");
247 pb.abandon();
248
249 return Ok(());
250 }
251 let position = (result.progress.clamp(0.0, 1.0) * 10000.0).trunc() as u64;
252 pb.set_position(position);
253
254 if position >= 10000 {
255 break;
256 }
257 tokio::time::sleep(Duration::from_millis(500)).await;
258 }
259 pb.finish_with_message("Export completed");
260
261 return Ok(());
262 }
263 match format {
264 Format::Text => {
265 println!("Exporting: {:.1}%", result.progress.clamp(0.0, 1.0) * 100.0);
266 }
267 Format::Json => {
268 println!("{}", serde_json::to_string_pretty(&result)?);
269 }
270 }
271
272 Ok(())
273 }
274 Self::ExportCancel {} => {
275 let result = client
276 .call(
277 ForestChainExportCancel::request(())?.with_timeout(Duration::from_secs(30)),
278 )
279 .await?;
280 if result {
281 println!("Export cancelled.");
282 } else {
283 println!("No export in progress to cancel.");
284 }
285 Ok(())
286 }
287 Self::ExportDiff {
288 output_path,
289 from,
290 to,
291 depth,
292 } => {
293 let raw_network_name = StateNetworkName::call(&client, ()).await?;
294
295 let chain_name = if raw_network_name == calibnet::NETWORK_GENESIS_NAME {
298 calibnet::NETWORK_COMMON_NAME
299 } else {
300 raw_network_name.as_str()
301 };
302
303 let depth = depth.unwrap_or_else(|| from - to);
304 anyhow::ensure!(depth > 0, "depth must be positive");
305
306 let output_path = match output_path.is_dir() {
307 true => output_path.join(format!(
308 "forest_snapshot_diff_{chain_name}_{from}_{to}+{depth}.car.zst"
309 )),
310 false => output_path.clone(),
311 };
312
313 let output_dir = output_path.parent().context("invalid output path")?;
314 let temp_path = new_forest_car_temp_path_in(output_dir)?;
315
316 let params = ForestChainExportDiffParams {
317 output_path: temp_path.to_path_buf(),
318 from,
319 to,
320 depth,
321 };
322
323 let pb = ProgressBar::new_spinner().with_style(
324 ProgressStyle::with_template(
325 "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})",
326 )
327 .expect("indicatif template must be valid"),
328 ).with_message(format!("Exporting {} ...", output_path.display()));
329 pb.enable_steady_tick(std::time::Duration::from_millis(80));
330 let handle = tokio::spawn({
331 let path: PathBuf = (&temp_path).into();
332 let pb = pb.clone();
333 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
334 async move {
335 loop {
336 interval.tick().await;
337 if let Ok(meta) = std::fs::metadata(&path) {
338 pb.set_position(meta.len());
339 }
340 }
341 }
342 });
343
344 client
347 .call(ForestChainExportDiff::request((params,))?.with_timeout(Duration::MAX))
348 .await?;
349
350 handle.abort();
351 pb.finish();
352 _ = handle.await;
353
354 temp_path.persist(output_path)?;
355 println!("Export completed.");
356 Ok(())
357 }
358 }
359 }
360}
361
362async fn save_checksum(source: &Path, encoded_hash: String) -> anyhow::Result<()> {
365 let checksum_file_content = format!(
366 "{encoded_hash} {}\n",
367 source
368 .file_name()
369 .and_then(std::ffi::OsStr::to_str)
370 .context("Failed to retrieve file name while saving checksum")?
371 );
372
373 let checksum_path = PathBuf::from(source).with_extension("sha256sum");
374
375 let mut checksum_file = tokio::fs::File::create(&checksum_path).await?;
376 checksum_file
377 .write_all(checksum_file_content.as_bytes())
378 .await?;
379 checksum_file.flush().await?;
380 Ok(())
381}