1use crate::chain::FilecoinSnapshotVersion;
5use crate::chain_sync::chain_muxer::DEFAULT_RECENT_STATE_ROOTS;
6use crate::cli_shared::snapshot::{self, TrustedVendor};
7use crate::db::car::forest::new_forest_car_temp_path_in;
8use crate::networks::calibnet;
9use crate::rpc::chain::ForestChainExportDiffParams;
10use crate::rpc::types::ApiExportResult;
11use crate::rpc::{self, chain::ForestChainExportParams, prelude::*};
12use crate::shim::policy::policy_constants::CHAIN_FINALITY;
13use anyhow::Context as _;
14use chrono::DateTime;
15use clap::Subcommand;
16use indicatif::{ProgressBar, ProgressStyle};
17use std::{
18 path::{Path, PathBuf},
19 time::Duration,
20};
21use tokio::io::AsyncWriteExt;
22
// Output format selectable through `ExportStatus --format`.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into user-visible help text, so adding them would change
// the CLI output.
#[derive(Debug, Clone, clap::ValueEnum)]
pub enum Format {
    // The status returned by the node is printed via `serde_json::to_string_pretty`.
    Json,
    // Human-readable status line; this is the default for `ExportStatus`.
    Text,
}
28
// Snapshot-related CLI subcommands. Each variant is executed by
// `SnapshotCommands::run`, which talks to the node over RPC.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into user-visible help text, so adding them would change
// the CLI output.
#[derive(Debug, Subcommand)]
pub enum SnapshotCommands {
    // Asks the node to export a snapshot through the `ForestChainExport` RPC.
    Export {
        // Destination of the snapshot. When this is an existing directory, a file
        // name is generated inside it; otherwise the path is used as given.
        #[arg(short, long, default_value = ".", verbatim_doc_comment)]
        output_path: PathBuf,
        // Forwarded to the export RPC. A `.sha256sum` file is only written when
        // the node returns a hash with the result.
        #[arg(long)]
        skip_checksum: bool,
        // Forwarded to the export RPC. When set, the CLI neither persists the
        // temporary output file nor writes a checksum file.
        #[arg(long)]
        dry_run: bool,
        // Epoch of the tipset to export; the current chain head is used when omitted.
        #[arg(short, long)]
        tipset: Option<i64>,
        // Forwarded as `recent_roots`. Must be non-negative; a warning is logged
        // when it is smaller than `CHAIN_FINALITY`.
        #[arg(short, long, default_value_t = DEFAULT_RECENT_STATE_ROOTS)]
        depth: crate::chain::ChainEpochDelta,
        // Snapshot version to request from the node.
        #[arg(long, value_enum, default_value_t = FilecoinSnapshotVersion::V2)]
        format: FilecoinSnapshotVersion,
    },
    // Queries the node for the state of an export (`ForestChainExportStatus` RPC).
    ExportStatus {
        // Keep polling and render a progress bar until the export reaches 100%
        // or is reported as cancelled.
        #[arg(long)]
        wait: bool,
        // How to print the status when not waiting.
        #[arg(long, value_enum, default_value_t = Format::Text)]
        format: Format,
    },
    // Requests cancellation of a running export (`ForestChainExportCancel` RPC).
    ExportCancel {},
    // Asks the node to export a diff snapshot through the `ForestChainExportDiff` RPC.
    ExportDiff {
        // Destination of the diff snapshot. When this is an existing directory, a
        // file name of the form `forest_snapshot_diff_<chain>_<from>_<to>+<depth>.car.zst`
        // is generated inside it; otherwise the path is used as given.
        #[arg(short, long, default_value = ".", verbatim_doc_comment)]
        output_path: PathBuf,
        // Forwarded unchanged as `from` (presumably an epoch — confirm against the RPC).
        #[arg(long)]
        from: i64,
        // Forwarded unchanged as `to` (presumably an epoch — confirm against the RPC).
        #[arg(long)]
        to: i64,
        // Defaults to `from - to` when omitted; the effective value must be positive.
        #[arg(short, long)]
        depth: Option<crate::chain::ChainEpochDelta>,
    },
}
79
80impl SnapshotCommands {
81 pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> {
82 match self {
83 Self::Export {
84 output_path,
85 skip_checksum,
86 dry_run,
87 tipset,
88 depth,
89 format,
90 } => {
91 anyhow::ensure!(
92 depth >= 0,
93 "--depth must be non-negative; use 0 for spine-only snapshots"
94 );
95
96 if depth < CHAIN_FINALITY {
97 tracing::warn!(
98 "Depth {depth} should be no less than CHAIN_FINALITY {CHAIN_FINALITY} to export a valid lite snapshot"
99 );
100 }
101
102 let raw_network_name = StateNetworkName::call(&client, ()).await?;
103 let chain_name = if raw_network_name == calibnet::NETWORK_GENESIS_NAME {
106 calibnet::NETWORK_COMMON_NAME
107 } else {
108 raw_network_name.as_str()
109 };
110
111 let tipset = if let Some(epoch) = tipset {
112 client
114 .call(
115 ChainGetTipSetByHeight::request((epoch, Default::default()))?
116 .with_timeout(Duration::from_secs(60 * 15)),
117 )
118 .await?
119 } else {
120 ChainHead::call(&client, ()).await?
121 };
122
123 let output_path = match output_path.is_dir() {
124 true => output_path.join(snapshot::filename(
125 TrustedVendor::Forest,
126 chain_name,
127 DateTime::from_timestamp(tipset.min_ticket_block().timestamp as i64, 0)
128 .unwrap_or_default()
129 .naive_utc()
130 .date(),
131 tipset.epoch(),
132 true,
133 )),
134 false => output_path.clone(),
135 };
136
137 let output_dir = output_path.parent().context("invalid output path")?;
138 let temp_path = new_forest_car_temp_path_in(output_dir)?;
139
140 let params = ForestChainExportParams {
141 version: format,
142 epoch: tipset.epoch(),
143 recent_roots: depth,
144 output_path: temp_path.to_path_buf(),
145 tipset_keys: tipset.key().clone().into(),
146 skip_checksum,
147 dry_run,
148 };
149
150 let pb = ProgressBar::new_spinner().with_style(
151 ProgressStyle::with_template(
152 "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})",
153 )
154 .expect("indicatif template must be valid"),
155 ).with_message(format!("Exporting v{} snapshot to {} ...", format as u64, output_path.display()));
156 pb.enable_steady_tick(std::time::Duration::from_millis(80));
157 let handle = tokio::spawn({
158 let path: PathBuf = (&temp_path).into();
159 let pb = pb.clone();
160 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
161 async move {
162 loop {
163 interval.tick().await;
164 if let Ok(meta) = std::fs::metadata(&path) {
165 pb.set_position(meta.len());
166 }
167 }
168 }
169 });
170
171 let export_result = client
174 .call(ForestChainExport::request((params,))?.with_timeout(Duration::MAX))
175 .await?;
176
177 handle.abort();
178 pb.finish();
179 _ = handle.await;
180
181 if !dry_run {
182 match export_result.clone() {
183 ApiExportResult::Done(hash_opt) => {
184 temp_path.persist(&output_path)?;
186 if let Some(hash) = hash_opt {
187 save_checksum(&output_path, hash).await?;
188 }
189 }
190 ApiExportResult::Cancelled => { }
191 }
192 }
193
194 match export_result {
195 ApiExportResult::Done(_) => {
196 println!("Export completed.");
197 }
198 ApiExportResult::Cancelled => {
199 println!("Export cancelled.");
200 }
201 }
202 Ok(())
203 }
204 Self::ExportStatus { wait, format } => {
205 let result = client
206 .call(
207 ForestChainExportStatus::request(())?.with_timeout(Duration::from_secs(30)),
208 )
209 .await?;
210 if !result.exporting
211 && let Format::Text = format
212 {
213 if result.cancelled {
214 println!("No export in progress (last export was cancelled)");
215 } else {
216 println!("No export in progress");
217 }
218 return Ok(());
219 }
220 if wait {
221 let elapsed = chrono::Utc::now()
222 .signed_duration_since(result.start_time.unwrap_or_default())
223 .to_std()
224 .unwrap_or(Duration::ZERO);
225 let pb = ProgressBar::new(10000)
226 .with_elapsed(elapsed)
227 .with_message("Exporting");
228 pb.set_style(
229 ProgressStyle::with_template(
230 "[{elapsed_precise}] [{wide_bar}] {percent}% {msg} ",
231 )
232 .expect("indicatif template must be valid")
233 .progress_chars("#>-"),
234 );
235 loop {
236 let result = client
237 .call(
238 ForestChainExportStatus::request(())?
239 .with_timeout(Duration::from_secs(30)),
240 )
241 .await?;
242 if result.cancelled {
243 pb.set_message("Export cancelled");
244 pb.abandon();
245
246 return Ok(());
247 }
248 let position = (result.progress.clamp(0.0, 1.0) * 10000.0).trunc() as u64;
249 pb.set_position(position);
250
251 if position >= 10000 {
252 break;
253 }
254 tokio::time::sleep(Duration::from_millis(500)).await;
255 }
256 pb.finish_with_message("Export completed");
257
258 return Ok(());
259 }
260 match format {
261 Format::Text => {
262 println!("Exporting: {:.1}%", result.progress.clamp(0.0, 1.0) * 100.0);
263 }
264 Format::Json => {
265 println!("{}", serde_json::to_string_pretty(&result)?);
266 }
267 }
268
269 Ok(())
270 }
271 Self::ExportCancel {} => {
272 let result = client
273 .call(
274 ForestChainExportCancel::request(())?.with_timeout(Duration::from_secs(30)),
275 )
276 .await?;
277 if result {
278 println!("Export cancelled.");
279 } else {
280 println!("No export in progress to cancel.");
281 }
282 Ok(())
283 }
284 Self::ExportDiff {
285 output_path,
286 from,
287 to,
288 depth,
289 } => {
290 let raw_network_name = StateNetworkName::call(&client, ()).await?;
291
292 let chain_name = if raw_network_name == calibnet::NETWORK_GENESIS_NAME {
295 calibnet::NETWORK_COMMON_NAME
296 } else {
297 raw_network_name.as_str()
298 };
299
300 let depth = depth.unwrap_or_else(|| from - to);
301 anyhow::ensure!(depth > 0, "depth must be positive");
302
303 let output_path = match output_path.is_dir() {
304 true => output_path.join(format!(
305 "forest_snapshot_diff_{chain_name}_{from}_{to}+{depth}.car.zst"
306 )),
307 false => output_path.clone(),
308 };
309
310 let output_dir = output_path.parent().context("invalid output path")?;
311 let temp_path = new_forest_car_temp_path_in(output_dir)?;
312
313 let params = ForestChainExportDiffParams {
314 output_path: temp_path.to_path_buf(),
315 from,
316 to,
317 depth,
318 };
319
320 let pb = ProgressBar::new_spinner().with_style(
321 ProgressStyle::with_template(
322 "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})",
323 )
324 .expect("indicatif template must be valid"),
325 ).with_message(format!("Exporting {} ...", output_path.display()));
326 pb.enable_steady_tick(std::time::Duration::from_millis(80));
327 let handle = tokio::spawn({
328 let path: PathBuf = (&temp_path).into();
329 let pb = pb.clone();
330 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
331 async move {
332 loop {
333 interval.tick().await;
334 if let Ok(meta) = std::fs::metadata(&path) {
335 pb.set_position(meta.len());
336 }
337 }
338 }
339 });
340
341 client
344 .call(ForestChainExportDiff::request((params,))?.with_timeout(Duration::MAX))
345 .await?;
346
347 handle.abort();
348 pb.finish();
349 _ = handle.await;
350
351 temp_path.persist(output_path)?;
352 println!("Export completed.");
353 Ok(())
354 }
355 }
356 }
357}
358
359async fn save_checksum(source: &Path, encoded_hash: String) -> anyhow::Result<()> {
362 let checksum_file_content = format!(
363 "{encoded_hash} {}\n",
364 source
365 .file_name()
366 .and_then(std::ffi::OsStr::to_str)
367 .context("Failed to retrieve file name while saving checksum")?
368 );
369
370 let checksum_path = PathBuf::from(source).with_extension("sha256sum");
371
372 let mut checksum_file = tokio::fs::File::create(&checksum_path).await?;
373 checksum_file
374 .write_all(checksum_file_content.as_bytes())
375 .await?;
376 checksum_file.flush().await?;
377 Ok(())
378}