1use crate::chain::FilecoinSnapshotVersion;
5use crate::chain_sync::chain_muxer::DEFAULT_RECENT_STATE_ROOTS;
6use crate::cli_shared::snapshot::{self, TrustedVendor};
7use crate::db::car::forest::new_forest_car_temp_path_in;
8use crate::networks::calibnet;
9use crate::rpc::chain::ForestChainExportDiffParams;
10use crate::rpc::types::ApiExportResult;
11use crate::rpc::{self, chain::ForestChainExportParams, prelude::*};
12use crate::shim::policy::policy_constants::CHAIN_FINALITY;
13use anyhow::Context as _;
14use chrono::DateTime;
15use clap::Subcommand;
16use indicatif::{ProgressBar, ProgressStyle};
17use std::{
18 path::{Path, PathBuf},
19 time::Duration,
20};
21use tokio::io::AsyncWriteExt;
22
/// Output format used by [`SnapshotCommands::ExportStatus`] when reporting
/// the state of an export.
#[derive(Debug, Clone, clap::ValueEnum)]
pub enum Format {
    /// Pretty-printed JSON serialization of the status returned by the node
    Json,
    /// Human-readable text, e.g. a completion percentage
    Text,
}
28
/// Snapshot-related subcommands; each one is carried out through RPC calls to
/// a node (see [`SnapshotCommands::run`]).
#[derive(Debug, Subcommand)]
pub enum SnapshotCommands {
    /// Export a snapshot of the chain to `<output_path>`
    Export {
        /// Path of the snapshot file to write. If the path is an existing
        /// directory, a file name is generated and placed inside it.
        #[arg(short, long, default_value = ".", verbatim_doc_comment)]
        output_path: PathBuf,
        /// Skip the snapshot checksum. The flag is forwarded to the node; a
        /// `.sha256sum` file is only written when the node returns a hash.
        #[arg(long)]
        skip_checksum: bool,
        /// Dry run. The flag is forwarded to the node, and the result is
        /// neither moved to `<output_path>` nor accompanied by a checksum file.
        #[arg(long)]
        dry_run: bool,
        /// Epoch of the tipset to export. Defaults to the current chain head.
        #[arg(short, long)]
        tipset: Option<i64>,
        /// Number of recent state roots to include. Must be non-negative; a
        /// warning is logged when it is lower than `CHAIN_FINALITY`.
        #[arg(short, long, default_value_t = DEFAULT_RECENT_STATE_ROOTS)]
        depth: crate::chain::ChainEpochDelta,
        /// Snapshot format version to export
        #[arg(long, value_enum, default_value_t = FilecoinSnapshotVersion::V2)]
        format: FilecoinSnapshotVersion,
    },
    /// Show the status of the snapshot export running on the node
    ExportStatus {
        /// Keep polling the node and display a progress bar until the export
        /// completes or is cancelled
        #[arg(long)]
        wait: bool,
        /// Format of the status output
        #[arg(long, value_enum, default_value_t = Format::Text)]
        format: Format,
    },
    /// Cancel the snapshot export in progress, if any
    ExportCancel {},
    /// Export a diff snapshot between the `from` and `to` epochs to `<output_path>`
    ExportDiff {
        /// Path of the file to write. If the path is an existing directory, the file
        /// `forest_snapshot_diff_{chain}_{from}_{to}+{depth}.car.zst` is created inside it.
        #[arg(short, long, default_value = ".", verbatim_doc_comment)]
        output_path: PathBuf,
        /// The `from` epoch of the diff. Must be greater than `--to` when
        /// `--depth` is omitted.
        #[arg(long)]
        from: i64,
        /// The `to` epoch of the diff
        #[arg(long)]
        to: i64,
        /// Depth of the diff. Must be positive; defaults to `from - to`.
        #[arg(short, long)]
        depth: Option<crate::chain::ChainEpochDelta>,
    },
}
79
80impl SnapshotCommands {
81 pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
82 match self {
83 Self::Export {
84 output_path,
85 skip_checksum,
86 dry_run,
87 tipset,
88 depth,
89 format,
90 } => {
91 anyhow::ensure!(
92 depth >= 0,
93 "--depth must be non-negative; use 0 for spine-only snapshots"
94 );
95
96 if depth < CHAIN_FINALITY {
97 tracing::warn!(
98 "Depth {depth} should be no less than CHAIN_FINALITY {CHAIN_FINALITY} to export a valid lite snapshot"
99 );
100 }
101
102 let raw_network_name = StateNetworkName::call(&client, ()).await?;
103 let chain_name = if raw_network_name == calibnet::NETWORK_GENESIS_NAME {
106 calibnet::NETWORK_COMMON_NAME
107 } else {
108 raw_network_name.as_str()
109 };
110
111 let tipset = if let Some(epoch) = tipset {
112 client
114 .call(
115 ChainGetTipSetByHeight::request((epoch, Default::default()))?
116 .with_timeout(Duration::from_secs(60 * 15)),
117 )
118 .await?
119 } else {
120 ChainHead::call(&client, ()).await?
121 };
122
123 let output_path = match output_path.is_dir() {
124 true => output_path.join(snapshot::filename(
125 TrustedVendor::Forest,
126 chain_name,
127 DateTime::from_timestamp(tipset.min_ticket_block().timestamp as i64, 0)
128 .unwrap_or_default()
129 .naive_utc()
130 .date(),
131 tipset.epoch(),
132 true,
133 )),
134 false => output_path.clone(),
135 };
136
137 let output_dir = output_path.parent().context("invalid output path")?;
138 let temp_path = new_forest_car_temp_path_in(output_dir)?;
139
140 let params = ForestChainExportParams {
141 version: format,
142 epoch: tipset.epoch(),
143 recent_roots: depth,
144 output_path: temp_path.to_path_buf(),
145 tipset_keys: tipset.key().clone().into(),
146 include_receipts: false,
147 include_events: false,
148 skip_checksum,
149 dry_run,
150 };
151
152 let pb = ProgressBar::new_spinner().with_style(
153 ProgressStyle::with_template(
154 "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})",
155 )
156 .expect("indicatif template must be valid"),
157 ).with_message(format!("Exporting v{} snapshot to {} ...", format as u64, output_path.display()));
158 pb.enable_steady_tick(std::time::Duration::from_millis(80));
159 let handle = tokio::spawn({
160 let path: PathBuf = (&temp_path).into();
161 let pb = pb.clone();
162 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
163 async move {
164 loop {
165 interval.tick().await;
166 if let Ok(meta) = std::fs::metadata(&path) {
167 pb.set_position(meta.len());
168 }
169 }
170 }
171 });
172
173 let export_result = client
176 .call(ForestChainExport::request((params,))?.with_timeout(Duration::MAX))
177 .await?;
178
179 handle.abort();
180 pb.finish();
181 _ = handle.await;
182
183 if !dry_run {
184 match export_result.clone() {
185 ApiExportResult::Done(hash_opt) => {
186 temp_path.persist(&output_path)?;
188 if let Some(hash) = hash_opt {
189 save_checksum(&output_path, hash).await?;
190 }
191 }
192 ApiExportResult::Cancelled => { }
193 }
194 }
195
196 match export_result {
197 ApiExportResult::Done(_) => {
198 println!("Export completed.");
199 }
200 ApiExportResult::Cancelled => {
201 println!("Export cancelled.");
202 }
203 }
204 Ok(())
205 }
206 Self::ExportStatus { wait, format } => {
207 let result = client
208 .call(
209 ForestChainExportStatus::request(())?.with_timeout(Duration::from_secs(30)),
210 )
211 .await?;
212 if !result.exporting
213 && let Format::Text = format
214 {
215 if result.cancelled {
216 println!("No export in progress (last export was cancelled)");
217 } else {
218 println!("No export in progress");
219 }
220 return Ok(());
221 }
222 if wait {
223 let elapsed = chrono::Utc::now()
224 .signed_duration_since(result.start_time.unwrap_or_default())
225 .to_std()
226 .unwrap_or(Duration::ZERO);
227 let pb = ProgressBar::new(10000)
228 .with_elapsed(elapsed)
229 .with_message("Exporting");
230 pb.set_style(
231 ProgressStyle::with_template(
232 "[{elapsed_precise}] [{wide_bar}] {percent}% {msg} ",
233 )
234 .expect("indicatif template must be valid")
235 .progress_chars("#>-"),
236 );
237 loop {
238 let result = client
239 .call(
240 ForestChainExportStatus::request(())?
241 .with_timeout(Duration::from_secs(30)),
242 )
243 .await?;
244 if result.cancelled {
245 pb.set_message("Export cancelled");
246 pb.abandon();
247
248 return Ok(());
249 }
250 let position = (result.progress.clamp(0.0, 1.0) * 10000.0).trunc() as u64;
251 pb.set_position(position);
252
253 if position >= 10000 {
254 break;
255 }
256 tokio::time::sleep(Duration::from_millis(500)).await;
257 }
258 pb.finish_with_message("Export completed");
259
260 return Ok(());
261 }
262 match format {
263 Format::Text => {
264 println!("Exporting: {:.1}%", result.progress.clamp(0.0, 1.0) * 100.0);
265 }
266 Format::Json => {
267 println!("{}", serde_json::to_string_pretty(&result)?);
268 }
269 }
270
271 Ok(())
272 }
273 Self::ExportCancel {} => {
274 let result = client
275 .call(
276 ForestChainExportCancel::request(())?.with_timeout(Duration::from_secs(30)),
277 )
278 .await?;
279 if result {
280 println!("Export cancelled.");
281 } else {
282 println!("No export in progress to cancel.");
283 }
284 Ok(())
285 }
286 Self::ExportDiff {
287 output_path,
288 from,
289 to,
290 depth,
291 } => {
292 let raw_network_name = StateNetworkName::call(&client, ()).await?;
293
294 let chain_name = if raw_network_name == calibnet::NETWORK_GENESIS_NAME {
297 calibnet::NETWORK_COMMON_NAME
298 } else {
299 raw_network_name.as_str()
300 };
301
302 let depth = depth.unwrap_or_else(|| from - to);
303 anyhow::ensure!(depth > 0, "depth must be positive");
304
305 let output_path = match output_path.is_dir() {
306 true => output_path.join(format!(
307 "forest_snapshot_diff_{chain_name}_{from}_{to}+{depth}.car.zst"
308 )),
309 false => output_path.clone(),
310 };
311
312 let output_dir = output_path.parent().context("invalid output path")?;
313 let temp_path = new_forest_car_temp_path_in(output_dir)?;
314
315 let params = ForestChainExportDiffParams {
316 output_path: temp_path.to_path_buf(),
317 from,
318 to,
319 depth,
320 };
321
322 let pb = ProgressBar::new_spinner().with_style(
323 ProgressStyle::with_template(
324 "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})",
325 )
326 .expect("indicatif template must be valid"),
327 ).with_message(format!("Exporting {} ...", output_path.display()));
328 pb.enable_steady_tick(std::time::Duration::from_millis(80));
329 let handle = tokio::spawn({
330 let path: PathBuf = (&temp_path).into();
331 let pb = pb.clone();
332 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
333 async move {
334 loop {
335 interval.tick().await;
336 if let Ok(meta) = std::fs::metadata(&path) {
337 pb.set_position(meta.len());
338 }
339 }
340 }
341 });
342
343 client
346 .call(ForestChainExportDiff::request((params,))?.with_timeout(Duration::MAX))
347 .await?;
348
349 handle.abort();
350 pb.finish();
351 _ = handle.await;
352
353 temp_path.persist(output_path)?;
354 println!("Export completed.");
355 Ok(())
356 }
357 }
358 }
359}
360
361async fn save_checksum(source: &Path, encoded_hash: String) -> anyhow::Result<()> {
364 let checksum_file_content = format!(
365 "{encoded_hash} {}\n",
366 source
367 .file_name()
368 .and_then(std::ffi::OsStr::to_str)
369 .context("Failed to retrieve file name while saving checksum")?
370 );
371
372 let checksum_path = PathBuf::from(source).with_extension("sha256sum");
373
374 let mut checksum_file = tokio::fs::File::create(&checksum_path).await?;
375 checksum_file
376 .write_all(checksum_file_content.as_bytes())
377 .await?;
378 checksum_file.flush().await?;
379 Ok(())
380}